# ingress_spooler.py

"""Ingress Spooler — SQLite logging layer for Icarus webhook payloads.

Writes raw payloads immediately on arrival, before any processing.
Supports replay for debugging without external dependencies.
"""

import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Resolve the data directory (works standalone or when imported as a module).
if __name__ == "__main__":
    DATA_DIR = Path.home() / ".icarus" / "staging"
else:
    try:
        from icarus.core.config.staging import DATA_DIR
    except ImportError:
        # icarus package not installed/importable: fall back to the default.
        DATA_DIR = Path.home() / ".icarus" / "staging"

# Ensure the spool directory exists before any database access.
SPOOL_DIR = DATA_DIR / "ingress_spool"
SPOOL_DIR.mkdir(parents=True, exist_ok=True)

DB_PATH = SPOOL_DIR / "ingress_logs.db"

def _init_db() -> None:
    """Create the ingress_logs table and its indexes if they do not exist.

    Idempotent: every statement uses IF NOT EXISTS, so it is safe to call
    on every write path.
    """
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS ingress_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                received_at TEXT NOT NULL,
                source TEXT NOT NULL,
                raw_payload TEXT NOT NULL,
                processing_started_at TEXT,
                processing_completed_at TEXT,
                processing_result TEXT,
                error_message TEXT,
                replayed_from_id INTEGER,
                FOREIGN KEY (replayed_from_id) REFERENCES ingress_logs(id)
            )
        """)

        # Indexes for the two common access patterns: newest-first listing
        # and filtering by source.
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_received_at
            ON ingress_logs(received_at DESC)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_source
            ON ingress_logs(source)
        """)
        conn.commit()

def log_ingress(
    raw_payload: dict,
    source: str = "webhook",
    replayed_from_id: Optional[int] = None,
) -> int:
    """Log a raw payload immediately on arrival, before any processing.

    Args:
        raw_payload: Complete raw JSON payload from the webhook.
        source: Origin tag: 'webhook', 'replay', 'manual', etc.
        replayed_from_id: If this is a replay, the original log ID.

    Returns:
        The ID of the inserted record.
    """
    _init_db()

    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
    received_at = datetime.now(timezone.utc).isoformat()
    payload_json = json.dumps(raw_payload, ensure_ascii=False)

    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.execute(
            """
            INSERT INTO ingress_logs 
            (received_at, source, raw_payload, replayed_from_id)
            VALUES (?, ?, ?, ?)
            """,
            (received_at, source, payload_json, replayed_from_id),
        )
        conn.commit()
        return cursor.lastrowid

def mark_processing_started(log_id: int) -> None:
    """Record the time at which processing of a logged payload began.

    Args:
        log_id: ID of the ingress_logs row to update.
    """
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
    started_at = datetime.now(timezone.utc).isoformat()
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            "UPDATE ingress_logs SET processing_started_at = ? WHERE id = ?",
            (started_at, log_id),
        )
        conn.commit()

def mark_processing_completed(
    log_id: int,
    result: str = "success",
    error_message: Optional[str] = None,
) -> None:
    """Record the completion time and outcome of processing.

    Args:
        log_id: ID of the ingress_logs row to update.
        result: Outcome tag, e.g. 'success' or 'error'.
        error_message: Failure detail when result is an error; None otherwise.
    """
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
    completed_at = datetime.now(timezone.utc).isoformat()
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            """
            UPDATE ingress_logs
            SET processing_completed_at = ?, processing_result = ?, error_message = ?
            WHERE id = ?
            """,
            (completed_at, result, error_message, log_id),
        )
        conn.commit()

def get_log(log_id: int) -> Optional[dict]:
    """Retrieve a specific log entry by ID.

    Args:
        log_id: ID of the ingress_logs row to fetch.

    Returns:
        The full row as a dict, or None if no such row exists.
    """
    # Ensure the schema exists (consistent with log_ingress/get_stats) so a
    # fresh database yields None instead of "no such table".
    _init_db()
    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row  # enables dict(row) below
        row = conn.execute(
            "SELECT * FROM ingress_logs WHERE id = ?",
            (log_id,),
        ).fetchone()

        return dict(row) if row else None

def list_recent_logs(limit: int = 50, source: Optional[str] = None) -> list:
    """List recent log entries, newest first.

    Args:
        limit: Maximum number of rows to return.
        source: If given, restrict results to this source tag.

    Returns:
        A list of dicts with id, received_at, source, processing_result,
        and error_message keys.
    """
    # Ensure the schema exists so a fresh database yields [] rather than
    # "no such table".
    _init_db()

    # Build one parameterized statement instead of two near-identical
    # branches; the optional WHERE clause is the only difference.
    query = (
        "SELECT id, received_at, source, processing_result, error_message "
        "FROM ingress_logs "
    )
    params: tuple = ()
    if source:
        query += "WHERE source = ? "
        params = (source,)
    query += "ORDER BY received_at DESC LIMIT ?"
    params += (limit,)

    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(query, params).fetchall()

    return [dict(row) for row in rows]

def get_raw_payload(log_id: int) -> Optional[dict]:
    """Return the stored raw payload for *log_id*, decoded from JSON.

    Returns None when the log entry is missing or has no stored payload.
    """
    entry = get_log(log_id)
    if not entry:
        return None
    payload = entry.get('raw_payload')
    if not payload:
        return None
    return json.loads(payload)

def get_stats() -> dict:
    """Return summary statistics over the ingress log database.

    Returns:
        Dict with total_logs, today (count received today, UTC), failed
        (rows whose processing_result is 'error'), and db_path.
    """
    _init_db()
    with sqlite3.connect(DB_PATH) as conn:
        total = conn.execute("SELECT COUNT(*) FROM ingress_logs").fetchone()[0]

        # received_at is stored as an ISO-8601 string, so a date-prefix LIKE
        # matches everything logged today. Aware UTC; utcnow() is deprecated.
        today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
        today_count = conn.execute(
            "SELECT COUNT(*) FROM ingress_logs WHERE received_at LIKE ?",
            (f'{today}%',)
        ).fetchone()[0]

        failed = conn.execute(
            "SELECT COUNT(*) FROM ingress_logs WHERE processing_result = 'error'"
        ).fetchone()[0]

        return {
            "total_logs": total,
            "today": today_count,
            "failed": failed,
            "db_path": str(DB_PATH)
        }