"""Event Graph — canonical family state store. Persistent store for extracted events, coordination items, and info. Single source of truth for the family dashboard and HBM. Tables: - events: All extracted events (calendar, coordination, info) - extractions: Raw extraction results for debugging """ import json import sqlite3 from datetime import datetime, timezone from pathlib import Path from typing import Optional from icarus.config import get_data_dir # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- SCHEMA = """ CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, source_id TEXT UNIQUE, -- dedup key: hash of source message source TEXT NOT NULL DEFAULT 'telegram', -- telegram | email type TEXT NOT NULL CHECK(type IN ('calendar_event', 'coordination', 'info')), summary TEXT NOT NULL, dates TEXT, -- JSON array ["YYYY-MM-DD", ...] times TEXT, -- JSON array ["HH:MM", ...] people TEXT, -- JSON array of member ids context TEXT DEFAULT 'other', -- transport | medical | school | social | care_coverage | other location TEXT, action_needed TEXT, calendar_uid TEXT, -- Radicale UID if pushed to calendar confidence REAL DEFAULT 0.0, needs_confirmation INTEGER DEFAULT 0, created_at TEXT NOT NULL, source_text TEXT, -- raw message text (truncated) raw_extraction TEXT -- JSON ); CREATE TABLE IF NOT EXISTS extractions ( id INTEGER PRIMARY KEY AUTOINCREMENT, source_id TEXT NOT NULL, tripwire_score REAL, tripwire_patterns TEXT, -- JSON array llm_output TEXT, extraction_result TEXT, -- JSON duration_ms INTEGER, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_events_type ON events(type); CREATE INDEX IF NOT EXISTS idx_events_dates ON events(dates); CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at); CREATE INDEX IF NOT EXISTS idx_events_source_id ON events(source_id); """ # --------------------------------------------------------------------------- # Database helpers # --------------------------------------------------------------------------- def _ensure_db(): """Ensure DB path exists and schema is created. Returns connection.""" data_dir = get_data_dir() data_dir.mkdir(parents=True, exist_ok=True) db_path = data_dir / "icarus.db" conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row conn.executescript(SCHEMA) conn.commit() return conn # --------------------------------------------------------------------------- # Event operations # --------------------------------------------------------------------------- def _make_source_id(source: str, text: str) -> str: """Generate a deterministic source_id for dedup.""" import hashlib return hashlib.sha256(f"{source}:{text}".encode()).hexdigest()[:32] def insert_event( source: str, source_text: str, event_type: str, summary: str, dates: Optional[list[str]] = None, times: Optional[list[str]] = None, people: Optional[list[str]] = None, context: str = "other", location: Optional[str] = None, action_needed: Optional[str] = None, confidence: float = 0.0, needs_confirmation: bool = False, raw_extraction: Optional[dict] = None, source_id: Optional[str] = None, ) -> str: """Insert an event into the graph. Returns source_id.""" sid = source_id or _make_source_id(source, source_text) now = datetime.now(timezone.utc).isoformat() with _ensure_db() as conn: try: conn.execute( """INSERT INTO events ( source_id, source, type, summary, dates, times, people, context, location, action_needed, confidence, needs_confirmation, created_at, source_text, raw_extraction ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( sid, source, event_type, summary, json.dumps(dates or []), json.dumps(times or []), json.dumps(people or []), context, location, action_needed, confidence, int(needs_confirmation), now, source_text[:1000], json.dumps(raw_extraction) if raw_extraction else None, ), ) conn.commit() except sqlite3.IntegrityError: pass # Duplicate source_id — already processed return sid def insert_extraction_log( source_id: str, tripwire_score: float, tripwire_patterns: list[str], llm_output: str, extraction_result: dict, duration_ms: int, ): """Log a tripwire + extraction event for debugging/analytics.""" now = datetime.now(timezone.utc).isoformat() with _ensure_db() as conn: conn.execute( """INSERT INTO extractions ( source_id, tripwire_score, tripwire_patterns, llm_output, extraction_result, duration_ms, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?)""", ( source_id, tripwire_score, json.dumps(tripwire_patterns), llm_output[:2000], json.dumps(extraction_result), duration_ms, now, ), ) conn.commit() # --------------------------------------------------------------------------- # Queries # --------------------------------------------------------------------------- def get_recent_events(days: int = 7, limit: int = 50) -> list[dict]: """Get recent events, ordered by creation time descending.""" with _ensure_db() as conn: rows = conn.execute( """SELECT * FROM events WHERE created_at >= datetime('now', ? || ' days', 'utc') ORDER BY created_at DESC LIMIT ?""", (f"-{days}", limit), ).fetchall() return [dict(r) for r in rows] def get_coordination_items() -> list[dict]: """Get outstanding coordination items (needs attention).""" with _ensure_db() as conn: rows = conn.execute( """SELECT * FROM events WHERE type = 'coordination' AND action_needed IS NOT NULL AND calendar_uid IS NULL ORDER BY created_at DESC LIMIT 20""" ).fetchall() return [dict(r) for r in rows] def get_stats() -> dict: """Get aggregate stats for dashboard.""" with _ensure_db() as conn: total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] by_type = conn.execute( "SELECT type, COUNT(*) as cnt FROM events GROUP BY type" ).fetchall() recent_24h = conn.execute( "SELECT COUNT(*) FROM events WHERE created_at >= datetime('now', '-1 day', 'utc')" ).fetchone()[0] extractions_24h = conn.execute( "SELECT COUNT(*) FROM extractions WHERE created_at >= datetime('now', '-1 day', 'utc')" ).fetchone()[0] return { "total_events": total, "by_type": {r["type"]: r["cnt"] for r in by_type}, "events_24h": recent_24h, "extractions_24h": extractions_24h, }