"""Ingress Spooler — SQLite logging layer for Icarus webhook payloads.
Writes raw payloads immediately on arrival, before any processing.
Supports replay for debugging without external dependencies.
"""
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Resolve the data directory (works standalone or as an imported module).
# NOTE: restored the stripped '#' comment markers and the '__name__' /
# '__main__' dunders — the original lines were a SyntaxError as written.
if __name__ == "__main__":
    DATA_DIR = Path.home() / ".icarus" / "staging"
else:
    try:
        from icarus.core.config.staging import DATA_DIR
    except ImportError:
        # Fall back to the default location when the icarus package is
        # not importable (e.g. running this file in isolation).
        DATA_DIR = Path.home() / ".icarus" / "staging"

# Ensure the spool directory exists before any database access.
SPOOL_DIR = DATA_DIR / "ingress_spool"
SPOOL_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = SPOOL_DIR / "ingress_logs.db"
def _init_db():
    """Create the ingress_logs table and its indexes if they do not exist.

    Idempotent: every statement uses IF NOT EXISTS, so this is safe to
    call before every write.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS ingress_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            received_at TEXT NOT NULL,
            source TEXT NOT NULL,
            raw_payload TEXT NOT NULL,
            processing_started_at TEXT,
            processing_completed_at TEXT,
            processing_result TEXT,
            error_message TEXT,
            replayed_from_id INTEGER,
            FOREIGN KEY (replayed_from_id) REFERENCES ingress_logs(id)
        )
        """,
        # Index for fast "most recent first" lookups.
        """
        CREATE INDEX IF NOT EXISTS idx_received_at
        ON ingress_logs(received_at DESC)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_source
        ON ingress_logs(source)
        """,
    )
    with sqlite3.connect(DB_PATH) as conn:
        for statement in schema_statements:
            conn.execute(statement)
        conn.commit()
def log_ingress(
    raw_payload: dict,
    source: str = "webhook",
    replayed_from_id: Optional[int] = None
) -> int:
    """Log raw payload immediately on arrival.

    Args:
        raw_payload: Complete raw JSON payload from webhook.
        source: 'webhook', 'replay', 'manual', etc.
        replayed_from_id: If this is a replay, the original log ID.

    Returns:
        log_id: The ID of the inserted record.
    """
    _init_db()
    # datetime.utcnow() is deprecated (Python 3.12+). Take an aware UTC
    # timestamp and drop tzinfo so the stored ISO string keeps the same
    # naive-UTC format as existing rows (no "+00:00" suffix).
    received_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    payload_json = json.dumps(raw_payload, ensure_ascii=False)
    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.execute(
            """
            INSERT INTO ingress_logs
            (received_at, source, raw_payload, replayed_from_id)
            VALUES (?, ?, ?, ?)
            """,
            (received_at, source, payload_json, replayed_from_id),
        )
        conn.commit()
        return cursor.lastrowid
def mark_processing_started(log_id: int):
    """Record the time at which processing of a logged payload began.

    Args:
        log_id: ID of the row previously returned by log_ingress.
    """
    # utcnow() is deprecated; produce the same naive-UTC ISO format
    # as log_ingress so timestamp columns stay string-comparable.
    started_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            "UPDATE ingress_logs SET processing_started_at = ? WHERE id = ?",
            (started_at, log_id),
        )
        conn.commit()
def mark_processing_completed(
    log_id: int,
    result: str = "success",
    error_message: Optional[str] = None
):
    """Record completion of processing for a logged payload.

    Args:
        log_id: ID of the row previously returned by log_ingress.
        result: Outcome label, e.g. 'success' or 'error'
            (get_stats counts rows where this equals 'error').
        error_message: Optional failure detail; None on success.
    """
    # utcnow() is deprecated; keep the naive-UTC ISO format used elsewhere.
    completed_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            """
            UPDATE ingress_logs
            SET processing_completed_at = ?, processing_result = ?, error_message = ?
            WHERE id = ?
            """,
            (completed_at, result, error_message, log_id),
        )
        conn.commit()
def get_log(log_id: int) -> Optional[dict]:
    """Fetch one log row as a plain dict, or None when the ID is unknown."""
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.execute(
            "SELECT * FROM ingress_logs WHERE id = ?",
            (log_id,)
        )
        row = cursor.fetchone()
    return dict(row) if row is not None else None
def list_recent_logs(limit: int = 50, source: Optional[str] = None) -> list:
    """List recent log entries, newest first.

    Args:
        limit: Maximum number of rows to return.
        source: If given, only rows whose source column matches.

    Returns:
        List of dicts with id, received_at, source, processing_result,
        and error_message keys.
    """
    # Build the query once instead of duplicating the SELECT in two
    # branches; the optional WHERE clause is added conditionally.
    query = (
        "SELECT id, received_at, source, processing_result, error_message"
        " FROM ingress_logs"
    )
    params: tuple = ()
    if source:
        query += " WHERE source = ?"
        params += (source,)
    query += " ORDER BY received_at DESC LIMIT ?"
    params += (limit,)
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(query, params).fetchall()
    return [dict(row) for row in rows]
def get_raw_payload(log_id: int) -> Optional[dict]:
    """Deserialize and return the stored payload for replay, if present."""
    entry = get_log(log_id)
    if entry is None:
        return None
    payload = entry.get('raw_payload')
    return json.loads(payload) if payload else None
def get_stats() -> dict:
    """Get summary statistics for the ingress spool.

    Returns:
        Dict with total_logs, today (count received today, UTC),
        failed (rows with processing_result = 'error'), and db_path.
    """
    _init_db()
    with sqlite3.connect(DB_PATH) as conn:
        total = conn.execute("SELECT COUNT(*) FROM ingress_logs").fetchone()[0]
        # utcnow() is deprecated; today's UTC date still matches the
        # naive-UTC received_at prefix used in the LIKE pattern below.
        today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
        today_count = conn.execute(
            "SELECT COUNT(*) FROM ingress_logs WHERE received_at LIKE ?",
            (f'{today}%',)
        ).fetchone()[0]
        failed = conn.execute(
            "SELECT COUNT(*) FROM ingress_logs WHERE processing_result = 'error'"
        ).fetchone()[0]
    return {
        "total_logs": total,
        "today": today_count,
        "failed": failed,
        "db_path": str(DB_PATH)
    }