# event_graph.py

"""Event Graph — canonical family state store.

Persistent store for extracted events, coordination items, and info.
Single source of truth for the family dashboard and HBM.

Tables:
- events: All extracted events (calendar, coordination, info)
- extractions: Raw extraction results for debugging
"""

import json
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

from icarus.config import get_data_dir

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# DDL executed by _ensure_db() every time it opens a connection. Every
# statement uses IF NOT EXISTS, so re-applying it to an existing database is
# a no-op.
#
# events.source_id deliberately has no explicit index: its UNIQUE constraint
# already makes SQLite maintain an automatic unique index on that column, and
# a second index on the same column only adds write and storage overhead.
# Databases created before this change may still contain a redundant
# idx_events_source_id index; it is harmless and can be dropped manually.
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id TEXT UNIQUE,                    -- dedup key: hash of source message
    source TEXT NOT NULL DEFAULT 'telegram',  -- telegram | email
    type TEXT NOT NULL CHECK(type IN ('calendar_event', 'coordination', 'info')),

    summary TEXT NOT NULL,
    dates TEXT,                      -- JSON array ["YYYY-MM-DD", ...]
    times TEXT,                      -- JSON array ["HH:MM", ...]
    people TEXT,                     -- JSON array of member ids
    context TEXT DEFAULT 'other',    -- transport | medical | school | social | care_coverage | other
    location TEXT,
    action_needed TEXT,

    calendar_uid TEXT,               -- Radicale UID if pushed to calendar
    confidence REAL DEFAULT 0.0,
    needs_confirmation INTEGER DEFAULT 0,

    created_at TEXT NOT NULL,        -- ISO-8601 UTC timestamp
    source_text TEXT,                -- raw message text (truncated)
    raw_extraction TEXT              -- JSON
);

CREATE TABLE IF NOT EXISTS extractions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id TEXT NOT NULL,
    tripwire_score REAL,
    tripwire_patterns TEXT,          -- JSON array
    llm_output TEXT,
    extraction_result TEXT,          -- JSON
    duration_ms INTEGER,
    created_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_events_type ON events(type);
CREATE INDEX IF NOT EXISTS idx_events_dates ON events(dates);
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at);
"""

# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------

def _ensure_db():
    """Open the Icarus SQLite database, creating the file and schema if needed.

    The database lives at ``icarus.db`` inside ``get_data_dir()``; that
    directory is created first if it is missing, and ``SCHEMA`` is applied on
    every call (all of its statements are ``IF NOT EXISTS``).

    Returns:
        A new ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects.
        The caller owns it and must close it. Using the connection itself as
        a context manager (``with conn:``) only commits or rolls back; it does
        not close the connection.

    Raises:
        sqlite3.Error: if the database cannot be opened or the schema cannot
            be applied. In the latter case the connection is closed before
            the error propagates.
    """
    data_dir = get_data_dir()
    data_dir.mkdir(parents=True, exist_ok=True)
    db_path = data_dir / "icarus.db"
    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        conn.executescript(SCHEMA)
        conn.commit()
    except BaseException:
        # The caller never receives the handle on failure, so close it here
        # instead of leaking it.
        conn.close()
        raise
    return conn

# ---------------------------------------------------------------------------
# Event operations
# ---------------------------------------------------------------------------

def _make_source_id(source: str, text: str) -> str:
"""Generate a deterministic source_id for dedup."""
import hashlib
return hashlib.sha256(f"{source}:{text}".encode()).hexdigest()[:32]

def insert_event(
    source: str,
    source_text: str,
    event_type: str,
    summary: str,
    dates: Optional[list[str]] = None,
    times: Optional[list[str]] = None,
    people: Optional[list[str]] = None,
    context: str = "other",
    location: Optional[str] = None,
    action_needed: Optional[str] = None,
    confidence: float = 0.0,
    needs_confirmation: bool = False,
    raw_extraction: Optional[dict] = None,
    source_id: Optional[str] = None,
) -> str:
    """Insert one extracted event into the ``events`` table.

    Args:
        source: Origin channel, stored in ``events.source``.
        source_text: Raw message text. Truncated to 1000 characters for
            storage. When ``source_id`` is not given, it is hashed together
            with ``source`` to build the dedup key.
        event_type: Must be ``calendar_event``, ``coordination`` or ``info``
            (enforced by the table's CHECK constraint).
        summary: Short description; required (NOT NULL column).
        dates, times, people: Optional lists, stored as JSON arrays
            (``[]`` when omitted).
        context, location, action_needed, confidence: Stored as given.
        needs_confirmation: Stored as 0/1.
        raw_extraction: Stored as JSON. A falsy value (``None`` or ``{}``)
            is stored as NULL.
        source_id: Explicit dedup key; takes precedence over the computed hash.

    Returns:
        The dedup key. It is returned even when a row with that key already
        existed and nothing new was written.

    Raises:
        sqlite3.IntegrityError: for any constraint violation other than a
            duplicate ``source_id``, e.g. an ``event_type`` outside the
            allowed set or a ``None`` summary. Previously such errors were
            swallowed along with duplicates, silently dropping the event.
    """
    sid = source_id or _make_source_id(source, source_text)
    now = datetime.now(timezone.utc).isoformat()
    row = (
        sid, source, event_type, summary,
        json.dumps(dates or []),
        json.dumps(times or []),
        json.dumps(people or []),
        context, location, action_needed,
        confidence, int(needs_confirmation),
        now, source_text[:1000],
        json.dumps(raw_extraction) if raw_extraction else None,
    )

    # closing(): the sqlite3 connection context manager only manages the
    # transaction; it never closes the connection.
    with closing(_ensure_db()) as conn:
        try:
            conn.execute(
                """INSERT INTO events (
                    source_id, source, type, summary,
                    dates, times, people, context, location, action_needed,
                    confidence, needs_confirmation,
                    created_at, source_text, raw_extraction
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                row,
            )
        except sqlite3.IntegrityError as exc:
            # A duplicate source_id means the message was already processed,
            # which is the only violation treated as benign. CHECK / NOT NULL
            # failures indicate bad input and must not be hidden.
            if "UNIQUE constraint failed: events.source_id" not in str(exc):
                raise
        else:
            conn.commit()

    return sid

def insert_extraction_log(
    source_id: str,
    tripwire_score: float,
    tripwire_patterns: list[str],
    llm_output: str,
    extraction_result: dict,
    duration_ms: int,
):
    """Append one tripwire + extraction record to the ``extractions`` table.

    Intended for debugging/analytics. No deduplication is performed, so
    logging the same ``source_id`` twice writes two rows.

    Args:
        source_id: Dedup key of the processed message.
        tripwire_score: Stored as given.
        tripwire_patterns: Stored as a JSON array.
        llm_output: Raw model output, truncated to 2000 characters. ``None``
            is stored as NULL instead of raising ``TypeError`` on slicing.
        extraction_result: Stored as JSON.
        duration_ms: Stored as given.
    """
    now = datetime.now(timezone.utc).isoformat()
    truncated_output = llm_output[:2000] if llm_output is not None else None

    # closing(): `with conn:` alone would not close the connection.
    with closing(_ensure_db()) as conn:
        conn.execute(
            """INSERT INTO extractions (
                source_id, tripwire_score, tripwire_patterns,
                llm_output, extraction_result, duration_ms, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (
                source_id,
                tripwire_score,
                json.dumps(tripwire_patterns),
                truncated_output,
                json.dumps(extraction_result),
                duration_ms,
                now,
            ),
        )
        conn.commit()

# ---------------------------------------------------------------------------
# Queries
# ---------------------------------------------------------------------------

def get_recent_events(days: int = 7, limit: int = 50) -> list[dict]:
    """Return events created within the last ``days`` days, newest first.

    Args:
        days: Length of the look-back window, measured back from now (UTC).
        limit: Maximum number of rows returned.

    Returns:
        Up to ``limit`` rows as plain dicts keyed by column name. JSON-encoded
        columns (``dates``, ``times``, ``people``, ``raw_extraction``) are
        returned as stored, i.e. as strings.
    """
    # insert_event() writes created_at with datetime.isoformat(), i.e.
    # "YYYY-MM-DDTHH:MM:SS[.ffffff]+00:00", so the cutoff is built the same
    # way to make the text comparison chronological. SQLite's datetime()
    # yields "YYYY-MM-DD HH:MM:SS" (space, not "T"). Since "T" > " ", every
    # row from the cutoff *day* compared as newer than the cutoff. The
    # trailing 'utc' modifier also expects local-time input, not 'now'.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()

    # closing(): `with conn:` alone would not close the connection.
    with closing(_ensure_db()) as conn:
        rows = conn.execute(
            """SELECT * FROM events
               WHERE created_at >= ?
               ORDER BY created_at DESC
               LIMIT ?""",
            (cutoff, limit),
        ).fetchall()

    return [dict(r) for r in rows]

def get_coordination_items(limit: int = 20) -> list[dict]:
    """Return outstanding coordination items, newest first.

    An item is outstanding when its type is ``coordination``, it has an
    ``action_needed`` value, and it has not been pushed to the calendar
    (``calendar_uid`` is NULL).

    Args:
        limit: Maximum number of rows returned (default 20).

    Returns:
        Matching rows as plain dicts keyed by column name.
    """
    # closing(): `with conn:` alone would not close the connection.
    with closing(_ensure_db()) as conn:
        rows = conn.execute(
            """SELECT * FROM events
               WHERE type = 'coordination'
                 AND action_needed IS NOT NULL
                 AND calendar_uid IS NULL
               ORDER BY created_at DESC
               LIMIT ?""",
            (limit,),
        ).fetchall()

    return [dict(r) for r in rows]

def get_stats() -> dict:
    """Return aggregate counts for the dashboard.

    Returns:
        A dict with:
        - ``total_events``: number of rows in ``events``;
        - ``by_type``: event type -> row count (only types that occur);
        - ``events_24h``: events whose ``created_at`` is within the last 24 hours;
        - ``extractions_24h``: extraction log rows within the last 24 hours.
    """
    # created_at is stored as datetime.isoformat() text ("...T...+00:00").
    # Build the cutoff in the same format so the string comparison is
    # chronological. SQLite's datetime() output uses a space separator and
    # would count every row from the cutoff day.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()

    # closing(): `with conn:` alone would not close the connection.
    with closing(_ensure_db()) as conn:
        total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
        by_type = conn.execute(
            "SELECT type, COUNT(*) AS cnt FROM events GROUP BY type"
        ).fetchall()
        recent_24h = conn.execute(
            "SELECT COUNT(*) FROM events WHERE created_at >= ?",
            (cutoff,),
        ).fetchone()[0]
        extractions_24h = conn.execute(
            "SELECT COUNT(*) FROM extractions WHERE created_at >= ?",
            (cutoff,),
        ).fetchone()[0]

    return {
        "total_events": total,
        "by_type": {r["type"]: r["cnt"] for r in by_type},
        "events_24h": recent_24h,
        "extractions_24h": extractions_24h,
    }