"""Event Graph — canonical family state store.
Persistent store for extracted events, coordination items, and info.
Single source of truth for the family dashboard and HBM.
Tables:
- events: All extracted events (calendar, coordination, info)
- extractions: Raw extraction results for debugging
"""
import json
import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

from icarus.config import get_data_dir
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
# DDL run by _ensure_db() on every connection. Every statement uses
# IF NOT EXISTS, so re-running it against an existing database is a no-op.
#
# There is deliberately no explicit index on events.source_id: the UNIQUE
# constraint already makes SQLite build an automatic index on that column,
# and a second identical index would only add write and storage cost.
# Databases that already contain idx_events_source_id keep it (nothing drops
# it), which is harmless.
# NOTE(review): `dates` holds a JSON array string, so idx_events_dates can
# only serve whole-string matches/ranges — confirm it is actually used.
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id TEXT UNIQUE, -- dedup key: hash of source message
    source TEXT NOT NULL DEFAULT 'telegram', -- telegram | email
    type TEXT NOT NULL CHECK(type IN ('calendar_event', 'coordination', 'info')),
    summary TEXT NOT NULL,
    dates TEXT, -- JSON array ["YYYY-MM-DD", ...]
    times TEXT, -- JSON array ["HH:MM", ...]
    people TEXT, -- JSON array of member ids
    context TEXT DEFAULT 'other', -- transport | medical | school | social | care_coverage | other
    location TEXT,
    action_needed TEXT,
    calendar_uid TEXT, -- Radicale UID if pushed to calendar
    confidence REAL DEFAULT 0.0,
    needs_confirmation INTEGER DEFAULT 0,
    created_at TEXT NOT NULL,
    source_text TEXT, -- raw message text (truncated)
    raw_extraction TEXT -- JSON
);
CREATE TABLE IF NOT EXISTS extractions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id TEXT NOT NULL,
    tripwire_score REAL,
    tripwire_patterns TEXT, -- JSON array
    llm_output TEXT,
    extraction_result TEXT, -- JSON
    duration_ms INTEGER,
    created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(type);
CREATE INDEX IF NOT EXISTS idx_events_dates ON events(dates);
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at);
"""
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
class _ClosingConnection(sqlite3.Connection):
    """sqlite3 connection whose ``with`` block also closes it.

    The stock ``sqlite3.Connection`` context manager only commits (or rolls
    back on error) and leaves the connection open, so each
    ``with _ensure_db() as conn:`` would otherwise leak a handle until the
    object is garbage-collected.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Keep the stock commit-on-success / rollback-on-error behavior.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def _ensure_db():
    """Open the Icarus SQLite database, creating it and its schema if needed.

    The database file is ``icarus.db`` inside ``get_data_dir()``; the
    directory is created on demand and ``SCHEMA`` is (re-)applied on every
    call. Rows come back as ``sqlite3.Row`` objects.

    Returns:
        A ``sqlite3.Connection`` meant to be used as
        ``with _ensure_db() as conn:``. Leaving the ``with`` block commits
        (or rolls back on error) and then closes the connection. Callers that
        do not use ``with`` must call ``close()`` themselves.
    """
    data_dir = get_data_dir()
    data_dir.mkdir(parents=True, exist_ok=True)
    db_path = data_dir / "icarus.db"
    conn = sqlite3.connect(str(db_path), factory=_ClosingConnection)
    conn.row_factory = sqlite3.Row
    try:
        conn.executescript(SCHEMA)
        conn.commit()
    except BaseException:
        # Don't leak the handle if schema setup fails.
        conn.close()
        raise
    return conn
# ---------------------------------------------------------------------------
# Event operations
# ---------------------------------------------------------------------------
def _make_source_id(source: str, text: str) -> str:
"""Generate a deterministic source_id for dedup."""
import hashlib
return hashlib.sha256(f"{source}:{text}".encode()).hexdigest()[:32]
def insert_event(
    source: str,
    source_text: str,
    event_type: str,
    summary: str,
    dates: Optional[list[str]] = None,
    times: Optional[list[str]] = None,
    people: Optional[list[str]] = None,
    context: str = "other",
    location: Optional[str] = None,
    action_needed: Optional[str] = None,
    confidence: float = 0.0,
    needs_confirmation: bool = False,
    raw_extraction: Optional[dict] = None,
    source_id: Optional[str] = None,
) -> str:
    """Insert an event into the graph and return its ``source_id``.

    The insert is idempotent per ``source_id``. If a row with that id already
    exists, nothing is written and the id is returned as usual.

    Args:
        source: Channel the message came from (the schema comment lists
            ``telegram`` / ``email``). It is also mixed into the derived id.
        source_text: Raw message text. Only the first 1000 characters are
            stored.
        event_type: ``'calendar_event'``, ``'coordination'`` or ``'info'``
            (enforced by the table's CHECK constraint).
        summary: Short description of the event (NOT NULL in the schema).
        dates: Date strings, stored as a JSON array (``[]`` when None).
        times: Time strings, stored as a JSON array (``[]`` when None).
        people: Member ids, stored as a JSON array (``[]`` when None).
        context: Category label, ``"other"`` by default.
        location: Optional free-text location.
        action_needed: Optional description of a required follow-up.
        confidence: Extraction confidence score.
        needs_confirmation: Stored as 0/1.
        raw_extraction: Stored as JSON. Falsy values (None, ``{}``) are
            stored as NULL.
        source_id: Explicit dedup key. When omitted or empty, it is derived
            from ``source`` and ``source_text``.

    Returns:
        The ``source_id`` of the new or already-existing row.

    Raises:
        sqlite3.IntegrityError: If the row violates a constraint other than
            ``source_id`` uniqueness, e.g. an unknown ``event_type`` or a
            NULL ``summary``. Such rows are not stored, so reporting success
            would silently lose data.
    """
    sid = source_id or _make_source_id(source, source_text)
    now = datetime.now(timezone.utc).isoformat()
    with _ensure_db() as conn:
        try:
            conn.execute(
                """INSERT INTO events (
                    source_id, source, type, summary,
                    dates, times, people, context, location, action_needed,
                    confidence, needs_confirmation,
                    created_at, source_text, raw_extraction
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    sid, source, event_type, summary,
                    json.dumps(dates or []),
                    json.dumps(times or []),
                    json.dumps(people or []),
                    context, location, action_needed,
                    confidence, int(needs_confirmation),
                    now, source_text[:1000],
                    json.dumps(raw_extraction) if raw_extraction else None,
                ),
            )
        except sqlite3.IntegrityError:
            # A duplicate source_id means this message was already processed.
            # Other constraint failures (CHECK on type, NOT NULL) also raise
            # IntegrityError. Confirm the row really exists before treating
            # the error as a harmless duplicate.
            existing = conn.execute(
                "SELECT 1 FROM events WHERE source_id = ?", (sid,)
            ).fetchone()
            if existing is None:
                raise
        else:
            conn.commit()
    return sid
def insert_extraction_log(
    source_id: str,
    tripwire_score: float,
    tripwire_patterns: list[str],
    llm_output: str,
    extraction_result: dict,
    duration_ms: int,
):
    """Append one tripwire + LLM extraction run to the ``extractions`` table.

    Used for debugging and analytics. ``tripwire_patterns`` and
    ``extraction_result`` are stored as JSON text, and ``llm_output`` is cut
    to its first 2000 characters. The row is stamped with the current UTC
    time as an ISO 8601 string.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    sql = """INSERT INTO extractions (
source_id, tripwire_score, tripwire_patterns,
llm_output, extraction_result, duration_ms, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)"""
    with _ensure_db() as db:
        # Build the row inside the `with` so any serialization error surfaces
        # at the same point as before (after the DB is opened).
        row = (
            source_id,
            tripwire_score,
            json.dumps(tripwire_patterns),
            llm_output[:2000],
            json.dumps(extraction_result),
            duration_ms,
            timestamp,
        )
        db.execute(sql, row)
        db.commit()
# ---------------------------------------------------------------------------
# Queries
# ---------------------------------------------------------------------------
def get_recent_events(days: int = 7, limit: int = 50) -> list[dict]:
    """Return events created within the last ``days`` days, newest first.

    Args:
        days: Length of the look-back window in days.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` event rows as plain dicts. JSON columns are returned
        as their stored text.
    """
    # insert_event() writes created_at as datetime.now(timezone.utc).isoformat(),
    # i.e. "YYYY-MM-DDTHH:MM:SS[.ffffff]+00:00". Build the cutoff the same way
    # so the string comparison is meaningful. SQLite's datetime() returns
    # "YYYY-MM-DD HH:MM:SS" (space instead of 'T'), which made every row from
    # the boundary day match. Its 'utc' modifier also treats its input as
    # local time, which can shift a 'now'-based window on non-UTC hosts
    # (version-dependent).
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
    with _ensure_db() as conn:
        rows = conn.execute(
            """SELECT * FROM events
               WHERE created_at >= ?
               ORDER BY created_at DESC LIMIT ?""",
            (cutoff, limit),
        ).fetchall()
    return [dict(r) for r in rows]
def get_coordination_items(limit: int = 20) -> list[dict]:
    """Return outstanding coordination items, newest first.

    An item is outstanding when its type is ``'coordination'``, it has an
    ``action_needed`` value, and it has not been pushed to the calendar
    (``calendar_uid`` is NULL).

    Args:
        limit: Maximum number of rows to return (default 20).

    Returns:
        Matching event rows as plain dicts.
    """
    with _ensure_db() as conn:
        rows = conn.execute(
            """SELECT * FROM events
               WHERE type = 'coordination'
                 AND action_needed IS NOT NULL
                 AND calendar_uid IS NULL
               ORDER BY created_at DESC
               LIMIT ?""",
            (limit,),
        ).fetchall()
    return [dict(r) for r in rows]
def get_stats() -> dict:
    """Return aggregate event/extraction counts for the dashboard.

    Returns:
        A dict with keys:
          - ``"total_events"``: number of rows in ``events``
          - ``"by_type"``: mapping of event type -> row count
          - ``"events_24h"``: events created in the last 24 hours
          - ``"extractions_24h"``: extraction-log rows created in the last
            24 hours
    """
    # Both tables get created_at from datetime.now(timezone.utc).isoformat()
    # ("YYYY-MM-DDTHH:MM:SS[.ffffff]+00:00"), so compare against a cutoff in
    # that same format. SQLite's datetime('now', '-1 day', 'utc') uses a space
    # separator, which matched every row from the whole previous day (up to
    # ~48h). Its 'utc' modifier can also shift the window on non-UTC hosts.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
    with _ensure_db() as conn:
        total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
        by_type = conn.execute(
            "SELECT type, COUNT(*) AS cnt FROM events GROUP BY type"
        ).fetchall()
        events_24h = conn.execute(
            "SELECT COUNT(*) FROM events WHERE created_at >= ?", (cutoff,)
        ).fetchone()[0]
        extractions_24h = conn.execute(
            "SELECT COUNT(*) FROM extractions WHERE created_at >= ?", (cutoff,)
        ).fetchone()[0]
    return {
        "total_events": total,
        "by_type": {r["type"]: r["cnt"] for r in by_type},
        "events_24h": events_24h,
        "extractions_24h": extractions_24h,
    }