"""Event Graph — persistent store for briefing event routing decisions.

Stores extracted event data, type classification (calendar_event /
coordination / info), routing decisions, and coordination flags for
Phase 7 HBM planning.

Tables:
    - event_graph: Full extraction + routing metadata for every persisted
      document. Documents classified as ``info`` are not stored.
"""

import json
import math
import re
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

from icarus.core.config.staging import DATA_DIR

DB_PATH = DATA_DIR / "icarus.db"

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

SCHEMA = """
CREATE TABLE IF NOT EXISTS event_graph (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    document_id TEXT UNIQUE NOT NULL,
    type TEXT NOT NULL CHECK(type IN ('calendar_event', 'coordination', 'info')),

    -- Extracted data
    dates TEXT,                 -- JSON array ["YYYY-MM-DD", ...]
    times TEXT,                 -- JSON array ["HH:MM", ...]
    assigned_to TEXT,           -- JSON array ["matt", "aundrea", "sully", "harper"]
    child TEXT,                 -- "sully" | "harper" | "both" | null
    context TEXT,               -- "transport" | "care_coverage" | "medical" | etc

    -- Routing decision
    calendar_destination TEXT,  -- "radicale" | null
    coordination_logged BOOLEAN DEFAULT 0,
    recipients TEXT,            -- JSON array of routed family members

    -- Metadata
    confidence REAL CHECK(confidence >= 0 AND confidence <= 1),
    needs_confirmation BOOLEAN DEFAULT 0,
    blocking BOOLEAN DEFAULT 0,

    -- Processing state
    created_at TEXT NOT NULL,   -- ISO8601
    processed_at TEXT,          -- ISO8601
    action_taken TEXT,          -- "calendar_created" | "confirmed" | null

    -- Raw data
    raw_extracted TEXT,         -- JSON full extraction

    FOREIGN KEY(document_id) REFERENCES briefing_events(doc_id)
);

CREATE INDEX IF NOT EXISTS idx_event_graph_type ON event_graph(type);
CREATE INDEX IF NOT EXISTS idx_event_graph_dates ON event_graph(dates);
CREATE INDEX IF NOT EXISTS idx_event_graph_created ON event_graph(created_at);
CREATE INDEX IF NOT EXISTS idx_event_graph_assigned ON event_graph(assigned_to);
"""

# Upsert keyed on document_id. Unlike INSERT OR REPLACE (delete + insert),
# this keeps the row's id, its original created_at and any action_taken
# recorded since the first write. Requires SQLite >= 3.24 (UPSERT syntax).
_UPSERT_EVENT_SQL = """
    INSERT INTO event_graph (
        document_id, type, dates, times, assigned_to, child, context,
        calendar_destination, coordination_logged, recipients,
        confidence, needs_confirmation, blocking,
        created_at, processed_at, raw_extracted
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(document_id) DO UPDATE SET
        type = excluded.type,
        dates = excluded.dates,
        times = excluded.times,
        assigned_to = excluded.assigned_to,
        child = excluded.child,
        context = excluded.context,
        calendar_destination = excluded.calendar_destination,
        coordination_logged = excluded.coordination_logged,
        recipients = excluded.recipients,
        confidence = excluded.confidence,
        needs_confirmation = excluded.needs_confirmation,
        blocking = excluded.blocking,
        processed_at = excluded.processed_at,
        raw_extracted = excluded.raw_extracted
"""

# Columns stored as JSON text and decoded by the query helpers.
_JSON_FIELDS = ("dates", "times", "assigned_to", "recipients", "raw_extracted")


def _get_connection() -> sqlite3.Connection:
    """Open a new connection to the Icarus SQLite database.

    Creates ``DATA_DIR`` if it does not exist and sets ``sqlite3.Row`` as the
    row factory so columns can be read by name. The caller owns the returned
    connection and is responsible for closing it.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    """Create the event_graph table and its indexes if missing. Called on startup."""
    # sqlite3's own context manager only commits/rolls back; ``closing``
    # is what actually releases the connection.
    with closing(_get_connection()) as conn:
        with conn:
            conn.executescript(SCHEMA)


# ---------------------------------------------------------------------------
# Extraction helpers
# ---------------------------------------------------------------------------

# Keys probed, in priority order, inside an extraction's ``key_details``.
_TIME_KEYS = ("time", "Time", "start", "Start")
_DATE_KEYS = ("date", "Date")
_LOCATION_KEYS = ("location", "Location")

# Phrases in suggested actions that signal a request for someone's help.
_COORDINATION_SIGNALS = (
    "cover", "pick up", "grab", "drop off", "swap",
    "can you", "need someone", "who can",
)

# Phrases in suggested actions that make a coordination item blocking.
_BLOCKING_SIGNALS = ("cover", "care", "supervision")

# "2:00", "10:30", "14:00", "2 PM", "2pm", "2 p.m." (case-insensitive).
_SPECIFIC_TIME_RE = re.compile(
    r"\b\d{1,2}:\d{2}\b|\b\d{1,2}\s*(?:[ap]m\b|[ap]\.m\.?)",
    re.IGNORECASE,
)

# Whole-word child name matching ("sharper" must not match "harper").
_SULLY_RE = re.compile(r"\b(?:sully|sullivan)\b")
_HARPER_RE = re.compile(r"\bharper\b")

# Context keywords, checked in this order; the first matching context wins.
# Matching is whole-word (plus an optional plural "s"), so inflected forms
# that should still count are listed explicitly. Whole-word matching keeps
# e.g. "car" from matching "care" or "bus" from matching "business".
_CONTEXT_KEYWORDS: dict[str, tuple[str, ...]] = {
    "transport": (
        "pick up", "pick-up", "drop off", "drop-off", "drive", "car",
        "carpool", "bus", "buses", "ride", "grab at",
    ),
    "care_coverage": (
        "cover", "covering", "covered", "coverage", "watch", "watching",
        "stay with", "babysit", "babysitting", "babysitter", "care",
        "childcare", "daycare", "supervision",
    ),
    "medical": (
        "doctor", "dentist", "appointment", "checkup", "clinic", "hospital",
        "physical",
    ),
    "school": (
        "school", "class", "classes", "teacher", "field trip", "permission",
        "grade", "conference",
    ),
    "activity": (
        "practice", "game", "lesson", "recital", "tournament", "rehearsal",
        "club",
    ),
}

_CONTEXT_PATTERNS: dict[str, re.Pattern] = {
    ctx: re.compile(
        r"\b(?:" + "|".join(re.escape(kw) for kw in keywords) + r")s?\b"
    )
    for ctx, keywords in _CONTEXT_KEYWORDS.items()
}


def _key_details(extraction: dict) -> dict:
    """Return ``extraction["key_details"]`` if it is a dict, else ``{}``."""
    details = extraction.get("key_details", {})
    return details if isinstance(details, dict) else {}


def _first_value(details: dict, keys: tuple[str, ...]) -> Any:
    """Return the first truthy ``details[key]`` among *keys*, else ``""``."""
    for key in keys:
        value = details.get(key)
        if value:
            return value
    return ""


def _as_list(value: Any) -> list:
    """Normalise a scalar-or-sequence value to a list (falsy -> ``[]``)."""
    if not value:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _text(value: Any) -> str:
    """Return ``str(value)``, mapping ``None`` to ``""``."""
    return "" if value is None else str(value)


def _actions_text(extraction: dict) -> str:
    """Join ``suggested_actions`` into one lowercase string.

    Accepts a list of actions, a single action string, or nothing. A bare
    string is treated as one action rather than being split into characters.
    """
    actions = _as_list(extraction.get("suggested_actions"))
    return " ".join(_text(action) for action in actions).lower()


def _coerce_confidence(raw: Any, default: float = 0.5) -> float:
    """Return *raw* as a float in [0, 1], or *default* if it is unusable.

    ``None``, non-numeric and non-finite values fall back to *default*.
    Out-of-range numbers are clamped, because the schema's CHECK constraint
    would otherwise reject the whole write.
    """
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return default
    if not math.isfinite(value):
        return default
    return min(1.0, max(0.0, value))


# ---------------------------------------------------------------------------
# Type Classification
# ---------------------------------------------------------------------------

def classify_event_type(extraction: dict) -> str:
    """Classify a briefing extraction as calendar_event, coordination, or info.

    Rules:
        - calendar_event: has a specific time AND a location/venue.
        - coordination: has dates and either no specific time, or a
          coordination request in the suggested actions ("cover",
          "pick up", "can you", ...).
        - info: everything else.

    Dates come from ``key_details["date"/"Date"]`` or a top-level ``dates``
    entry (the same sources ``EventGraphWriter.write`` persists). Times come
    from ``key_details["time"/"Time"/"start"/"Start"]``.

    Args:
        extraction: Raw extraction dict from the briefing generator
            (``key_details``, ``suggested_actions``, ...).

    Returns:
        One of ``"calendar_event"``, ``"coordination"``, ``"info"``.
    """
    details = _key_details(extraction)

    time_val = _first_value(details, _TIME_KEYS)
    has_specific_time = bool(time_val) and _is_specific_time(str(time_val))

    location_val = _first_value(details, _LOCATION_KEYS)
    has_venue = bool(str(location_val).strip())

    date_val = _first_value(details, _DATE_KEYS)
    has_dates = bool(str(date_val).strip()) or bool(
        _as_list(extraction.get("dates"))
    )

    action_text = _actions_text(extraction)
    is_coordination_intent = any(
        signal in action_text for signal in _COORDINATION_SIGNALS
    )

    if has_specific_time and has_venue:
        return "calendar_event"
    if has_dates and (not has_specific_time or is_coordination_intent):
        return "coordination"
    return "info"


def _is_specific_time(time_str: str) -> bool:
    """Return True if *time_str* names a clock time rather than a vague period.

    "2:00 PM" -> True, "10:30" -> True, "3 p.m." -> True,
    "morning" -> False, "after school" -> False.
    """
    return _SPECIFIC_TIME_RE.search(time_str) is not None


# ---------------------------------------------------------------------------
# EventGraphWriter
# ---------------------------------------------------------------------------

class EventGraphWriter:
    """Persist briefing results to the Event Graph.

    Idempotent: re-processing the same ``document_id`` updates the existing
    row in place. The row keeps its ``id``, its original ``created_at`` and
    any ``action_taken``, while the extracted/routing fields and
    ``processed_at`` are refreshed.
    """

    def write(self, document_id: str, briefing: dict, extraction: dict) -> bool:
        """Write a briefing to the Event Graph.

        Documents classified as ``info`` are not persisted (and any existing
        row for the same ``document_id`` is left untouched).

        Args:
            document_id: Unique document identifier (filename_hash or similar).
            briefing: The briefing card dict. Currently not read by this
                method; kept for interface compatibility.
            extraction: Raw extraction dict supplying every stored field.

        Returns:
            True once the row is written, or when there is nothing to persist.

        Raises:
            sqlite3.Error: if the write fails; the transaction is rolled back.
        """
        event_type = classify_event_type(extraction)

        # Info-only documents don't need persistence; not an error.
        if event_type == "info":
            return True

        details = _key_details(extraction)

        # Dates: a top-level ``dates`` entry wins over key_details["date"].
        dates = _as_list(extraction.get("dates"))
        if not dates:
            date_val = _first_value(details, _DATE_KEYS)
            dates = [str(date_val)] if date_val else []

        # Times: same keys classify_event_type used to detect a specific time.
        time_val = _first_value(details, _TIME_KEYS)
        times = [str(time_val)] if time_val else []

        # Assigned family members, always stored as a JSON array.
        assigned_to = _as_list(extraction.get("assigned_to"))
        if not assigned_to:
            assigned_to = _as_list(details.get("who") or extraction.get("who"))

        child = self._infer_child(extraction)
        context = self._infer_context(extraction)
        confidence = _coerce_confidence(extraction.get("confidence", 0.5))

        # Routing decisions and coordination flags.
        is_coordination = event_type == "coordination"
        calendar_destination = (
            "radicale" if event_type == "calendar_event" else None
        )
        recipients = list(assigned_to)
        actions_text = _actions_text(extraction)
        blocking = is_coordination and any(
            signal in actions_text for signal in _BLOCKING_SIGNALS
        )

        now = datetime.now(timezone.utc).isoformat()
        params = (
            document_id,
            event_type,
            json.dumps(dates, default=str),
            json.dumps(times, default=str),
            json.dumps(assigned_to, default=str),
            child,
            context,
            calendar_destination,
            is_coordination,        # coordination_logged
            json.dumps(recipients, default=str),
            confidence,
            is_coordination,        # needs_confirmation
            blocking,
            now,                    # created_at (kept on conflict)
            now,                    # processed_at
            json.dumps(extraction, default=str),
        )

        with closing(_get_connection()) as conn:
            # ``with conn`` commits on success and rolls back on error.
            with conn:
                conn.execute(_UPSERT_EVENT_SQL, params)
        return True

    def _infer_child(self, extraction: dict) -> Optional[str]:
        """Infer which child is involved from the extraction.

        Searches ``key_details["who"]``, ``title`` and ``summary`` for whole-word
        mentions of "sully"/"sullivan" and "harper" (case-insensitive).

        Returns:
            ``"both"``, ``"sully"``, ``"harper"``, or None if neither is named.
        """
        text = " ".join((
            str(_key_details(extraction).get("who", "")),
            _text(extraction.get("title")),
            _text(extraction.get("summary")),
        )).lower()

        has_sully = _SULLY_RE.search(text) is not None
        has_harper = _HARPER_RE.search(text) is not None

        if has_sully and has_harper:
            return "both"
        if has_sully:
            return "sully"
        if has_harper:
            return "harper"
        return None

    def _infer_context(self, extraction: dict) -> Optional[str]:
        """Infer the context type from the extraction.

        Searches ``key_details``, ``title``, ``summary`` and the suggested
        actions for whole-word keywords; contexts are tried in
        ``_CONTEXT_KEYWORDS`` order and the first match wins.

        Returns:
            A context name such as ``"transport"`` or ``"care_coverage"``, or
            None if no keyword matches.
        """
        text = " ".join((
            str(_key_details(extraction)),
            _text(extraction.get("title")),
            _text(extraction.get("summary")),
            _actions_text(extraction),
        )).lower()

        for ctx, pattern in _CONTEXT_PATTERNS.items():
            if pattern.search(text):
                return ctx
        return None


# ---------------------------------------------------------------------------
# Query helpers
# ---------------------------------------------------------------------------

def _row_to_dict(row: sqlite3.Row) -> dict:
    """Convert an event_graph row to a dict, decoding its JSON columns.

    Values that fail to decode are left as stored (best-effort, so one bad
    row doesn't break a dashboard listing).
    """
    record = dict(row)
    for field in _JSON_FIELDS:
        raw = record.get(field)
        if raw:
            try:
                record[field] = json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                pass  # keep the raw stored value
    return record


def get_recent_events(days: int = 7, limit: int = 50) -> list[dict]:
    """Return recent Event Graph entries for dashboard display.

    ``created_at`` is written by ``EventGraphWriter.write`` as a UTC ISO-8601
    string, so the cutoff is built in the same format and compared as text.
    This keeps the comparison exact and lets SQLite use the created_at index.

    Args:
        days: How many days back to look.
        limit: Maximum rows to return.

    Returns:
        Event dicts, newest first, with JSON columns decoded.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
    with closing(_get_connection()) as conn:
        rows = conn.execute(
            """
            SELECT * FROM event_graph
            WHERE created_at > ?
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (cutoff, limit),
        ).fetchall()
        return [_row_to_dict(row) for row in rows]


def get_coordination_items() -> list[dict]:
    """Return coordination items that need confirmation (for HBM).

    Returns:
        Rows with type='coordination' and needs_confirmation=1, blocking
        items first, then newest first, with JSON columns decoded.
    """
    with closing(_get_connection()) as conn:
        rows = conn.execute(
            """
            SELECT * FROM event_graph
            WHERE type = 'coordination' AND needs_confirmation = 1
            ORDER BY blocking DESC, created_at DESC
            """
        ).fetchall()
        return [_row_to_dict(row) for row in rows]


def get_stats() -> dict:
    """Return aggregate Event Graph statistics.

    Returns:
        Dict with ``total`` rows, ``by_type`` counts, ``coordination_open``
        (coordination rows still needing confirmation) and ``blocking``.
    """
    with closing(_get_connection()) as conn:
        total = conn.execute("SELECT COUNT(*) FROM event_graph").fetchone()[0]
        by_type = {
            row["type"]: row["cnt"]
            for row in conn.execute(
                "SELECT type, COUNT(*) as cnt FROM event_graph GROUP BY type"
            )
        }
        coordination_open = conn.execute(
            "SELECT COUNT(*) FROM event_graph WHERE type='coordination' AND needs_confirmation=1"
        ).fetchone()[0]
        blocking_count = conn.execute(
            "SELECT COUNT(*) FROM event_graph WHERE blocking=1"
        ).fetchone()[0]

    return {
        "total": total,
        "by_type": by_type,
        "coordination_open": coordination_open,
        "blocking": blocking_count,
    }