"""Event Graph — persistent store for briefing event routing decisions.
Stores extracted event data, type classification (calendar_event / coordination / info),
routing decisions, and coordination flags for Phase 7 HBM planning.
Tables:
- event_graph: Full extraction + routing metadata for every processed document
"""
import json
import re
import sqlite3
from datetime import datetime, timezone
from typing import Optional

from icarus.core.config.staging import DATA_DIR
# Single shared SQLite database file under the staging data directory.
DB_PATH = DATA_DIR / "icarus.db"
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
# DDL executed via conn.executescript() in init_db(). Every statement uses
# IF NOT EXISTS, so re-running on each startup is safe.
# NOTE(review): idx_event_graph_dates / idx_event_graph_assigned index
# JSON-encoded text columns, so they only help exact-string comparisons,
# not per-element lookups — confirm that is the intent.
SCHEMA = """
CREATE TABLE IF NOT EXISTS event_graph (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id TEXT UNIQUE NOT NULL,
type TEXT NOT NULL CHECK(type IN ('calendar_event', 'coordination', 'info')),
-- Extracted data
dates TEXT, -- JSON array ["YYYY-MM-DD", ...]
times TEXT, -- JSON array ["HH:MM", ...]
assigned_to TEXT, -- JSON array ["matt", "aundrea", "sully", "harper"]
child TEXT, -- "sully" | "harper" | "both" | null
context TEXT, -- "transport" | "care_coverage" | "medical" | etc
-- Routing decision
calendar_destination TEXT, -- "radicale" | null
coordination_logged BOOLEAN DEFAULT 0,
recipients TEXT, -- JSON array of routed family members
-- Metadata
confidence REAL CHECK(confidence >= 0 AND confidence <= 1),
needs_confirmation BOOLEAN DEFAULT 0,
blocking BOOLEAN DEFAULT 0,
-- Processing state
created_at TEXT NOT NULL, -- ISO8601
processed_at TEXT, -- ISO8601
action_taken TEXT, -- "calendar_created" | "confirmed" | null
-- Raw data
raw_extracted TEXT, -- JSON full extraction
FOREIGN KEY(document_id) REFERENCES briefing_events(doc_id)
);
CREATE INDEX IF NOT EXISTS idx_event_graph_type ON event_graph(type);
CREATE INDEX IF NOT EXISTS idx_event_graph_dates ON event_graph(dates);
CREATE INDEX IF NOT EXISTS idx_event_graph_created ON event_graph(created_at);
CREATE INDEX IF NOT EXISTS idx_event_graph_assigned ON event_graph(assigned_to);
"""
def _get_connection() -> sqlite3.Connection:
    """Open a fresh connection to the Icarus SQLite database.

    Creates DATA_DIR on first use. Rows are returned as sqlite3.Row so
    callers can index columns by name. check_same_thread=False disables
    sqlite3's same-thread guard; each caller gets its own short-lived
    connection and is responsible for closing it.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the event_graph table and indexes if absent. Called on startup.

    Safe to call repeatedly: SCHEMA uses IF NOT EXISTS throughout.
    """
    # Bug fix: `with sqlite3.connect(...)` only manages the transaction —
    # it never closes the connection, leaking one handle per call. Use the
    # same try/finally close pattern as the rest of this module.
    conn = _get_connection()
    try:
        conn.executescript(SCHEMA)
        conn.commit()
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# Type Classification
# ---------------------------------------------------------------------------
def classify_event_type(extraction: dict) -> str:
"""Classify a briefing extraction as calendar_event, coordination, or info.
Rules:
- calendar_event: Has specific time AND location/venue
- coordination: Has dates but NO specific time
- info: No actionable dates or just reference
Args:
extraction: Raw extraction dict from briefing generator (key_details, etc.)
Returns:
One of: "calendar_event", "coordination", "info"
"""
key_details = extraction.get("key_details", {})
if not isinstance(key_details, dict):
key_details = {}
# Check for time — specific hour/minute
time_val = (
key_details.get("time")
or key_details.get("Time")
or key_details.get("start")
or key_details.get("Start")
or ""
)
has_specific_time = bool(time_val and _is_specific_time(str(time_val)))
# Check for venue/location
location_val = (
key_details.get("location")
or key_details.get("Location")
or ""
)
has_venue = bool(location_val and str(location_val).strip())
# Check for dates
date_val = (
key_details.get("date")
or key_details.get("Date")
or ""
)
has_dates = bool(date_val and str(date_val).strip())
# Check for day-of-week mentions (coordination signal)
category = extraction.get("category", "info")
suggested_actions = extraction.get("suggested_actions", [])
action_text = " ".join(suggested_actions).lower() if suggested_actions else ""
# Coordination signals: "cover", "pick up", "grab", "drop off", "swap"
coordination_signals = ["cover", "pick up", "grab", "drop off", "swap",
"can you", "need someone", "who can"]
is_coordination_intent = any(sig in action_text for sig in coordination_signals)
if has_specific_time and has_venue:
return "calendar_event"
elif has_dates and not has_specific_time:
return "coordination"
elif has_dates and is_coordination_intent:
return "coordination"
else:
return "info"
def _is_specific_time(time_str: str) -> bool:
"""Check if a time string contains a specific hour:minute, not just a range.
"2:00 PM" → True
"morning" → False
"after school" → False
"10:30" → True
"""
import re
# Match patterns like "2:00", "10:30", "14:00", "2 PM", etc.
specific_pattern = re.compile(r'\b\d{1,2}:\d{2}\b|\b\d{1,2}\s*(?:AM|PM|am|pm)\b')
return bool(specific_pattern.search(time_str))
# ---------------------------------------------------------------------------
# EventGraphWriter
# ---------------------------------------------------------------------------
class EventGraphWriter:
    """Persist briefing results to the Event Graph.

    Idempotent: re-processing the same document_id updates the existing row
    in place; the original id, created_at, and action_taken are preserved.
    """

    def write(self, document_id: str, briefing: dict, extraction: dict) -> bool:
        """Write a briefing to the Event Graph.

        Args:
            document_id: Unique document identifier (filename_hash or similar).
            briefing: The briefing card dict (title, summary, key_details, etc.).
                Currently unread; kept for interface stability.
            extraction: Raw extraction dict for classification and metadata.

        Returns:
            True if written successfully (including the no-persist "info" case).

        Raises:
            sqlite3.Error: On database failure; the transaction is rolled
                back before re-raising.
        """
        event_type = classify_event_type(extraction)

        # Skip info-only documents — they don't need persistence.
        if event_type == "info":
            return True  # Not an error, just nothing to persist

        key_details = extraction.get("key_details", {})
        if not isinstance(key_details, dict):
            key_details = {}

        # Dates: single key_details date, overridden by an explicit top-level
        # "dates" value (list or scalar).
        date_val = key_details.get("date") or key_details.get("Date") or ""
        dates = [str(date_val)] if date_val else []
        if extraction.get("dates"):
            dates = extraction["dates"] if isinstance(extraction["dates"], list) else [extraction["dates"]]

        # Times
        time_val = key_details.get("time") or key_details.get("Time") or ""
        times = [str(time_val)] if time_val else []

        # Assigned family members; fall back to "who" hints when absent.
        assigned_to = extraction.get("assigned_to", [])
        if not assigned_to:
            who_raw = key_details.get("who") or extraction.get("who", [])
            if who_raw:
                assigned_to = who_raw if isinstance(who_raw, list) else [who_raw]

        child = self._infer_child(extraction)
        context = self._infer_context(extraction)

        # Confidence: tolerate missing or malformed values (default 0.5)
        # instead of raising on e.g. "high".
        try:
            confidence = float(extraction.get("confidence", 0.5))
        except (TypeError, ValueError):
            confidence = 0.5

        # Routing decisions
        calendar_destination = "radicale" if event_type == "calendar_event" else None
        coordination_logged = event_type == "coordination"
        recipients = assigned_to if assigned_to else []

        # Coordination flags: care/supervision language makes the item blocking.
        needs_confirmation = event_type == "coordination"
        blocking = event_type == "coordination" and any(
            sig in str(extraction.get("suggested_actions", []))
            for sig in ["cover", "care", "supervision"]
        )

        now = datetime.now(timezone.utc).isoformat()
        conn = _get_connection()
        try:
            # True upsert. The previous INSERT OR REPLACE deletes and
            # re-inserts the row, which wiped action_taken, reset created_at,
            # and changed the AUTOINCREMENT id on every reprocess. ON CONFLICT
            # DO UPDATE keeps the original row and only refreshes the
            # extraction-derived columns.
            conn.execute(
                """
                INSERT INTO event_graph (
                    document_id, type, dates, times, assigned_to, child,
                    context, calendar_destination, coordination_logged,
                    recipients, confidence, needs_confirmation, blocking,
                    created_at, processed_at, raw_extracted
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(document_id) DO UPDATE SET
                    type = excluded.type,
                    dates = excluded.dates,
                    times = excluded.times,
                    assigned_to = excluded.assigned_to,
                    child = excluded.child,
                    context = excluded.context,
                    calendar_destination = excluded.calendar_destination,
                    coordination_logged = excluded.coordination_logged,
                    recipients = excluded.recipients,
                    confidence = excluded.confidence,
                    needs_confirmation = excluded.needs_confirmation,
                    blocking = excluded.blocking,
                    processed_at = excluded.processed_at,
                    raw_extracted = excluded.raw_extracted
                """,
                (
                    document_id,
                    event_type,
                    json.dumps(dates),
                    json.dumps(times),
                    json.dumps(assigned_to),
                    child,
                    context,
                    calendar_destination,
                    coordination_logged,
                    json.dumps(recipients),
                    confidence,
                    needs_confirmation,
                    blocking,
                    now,
                    now,
                    json.dumps(extraction, default=str),
                ),
            )
            conn.commit()
            return True
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _infer_child(self, extraction: dict) -> Optional[str]:
        """Infer which child is involved: "sully", "harper", "both", or None.

        Scans key_details["who"], title, and summary (case-insensitive).
        """
        key_details = extraction.get("key_details", {})
        parts = []
        if isinstance(key_details, dict):
            parts.append(str(key_details.get("who", "")))
        # str() coercion: title/summary may be non-string in raw extractions.
        parts.append(str(extraction.get("title", "")))
        parts.append(str(extraction.get("summary", "")))
        text = " ".join(parts).lower()
        # "sullivan" also covers possessives such as "sullivan's".
        has_sully = "sully" in text or "sullivan" in text
        has_harper = "harper" in text
        if has_sully and has_harper:
            return "both"
        if has_sully:
            return "sully"
        if has_harper:
            return "harper"
        return None

    def _infer_context(self, extraction: dict) -> Optional[str]:
        """Infer the context type ("transport", "medical", ...) or None.

        Keyword search over key_details, title, summary, and suggested
        actions; the first matching category (in declaration order) wins.
        """
        key_details = extraction.get("key_details", {})
        parts = []
        if isinstance(key_details, dict):
            parts.append(str(key_details))
        parts.append(str(extraction.get("title", "")))
        parts.append(str(extraction.get("summary", "")))
        # str() per entry tolerates non-string actions; `or []` tolerates None.
        actions = extraction.get("suggested_actions") or []
        parts.append(" ".join(str(a) for a in actions))
        text = " ".join(parts).lower()
        context_map = {
            "transport": ["pick up", "drop off", "drive", "car", "bus", "ride", "grab at", "pick-up", "drop-off"],
            "care_coverage": ["cover", "watch", "stay with", "babysit", "care", "supervision"],
            "medical": ["doctor", "dentist", "appointment", "checkup", "clinic", "hospital", "physical"],
            "school": ["school", "class", "teacher", "field trip", "permission", "grade", "conference"],
            "activity": ["practice", "game", "lesson", "recital", "tournament", "rehearsal", "club"],
        }
        for ctx, keywords in context_map.items():
            if any(kw in text for kw in keywords):
                return ctx
        return None
# ---------------------------------------------------------------------------
# Query helpers
# ---------------------------------------------------------------------------
def get_recent_events(days: int = 7, limit: int = 50) -> list[dict]:
    """Return recent Event Graph entries for dashboard display.

    Args:
        days: How many days back to look.
        limit: Maximum rows to return.

    Returns:
        List of event dicts, newest first, with JSON columns decoded.
    """
    conn = _get_connection()
    try:
        # Bug fix: created_at is stored as datetime.isoformat() — ISO8601
        # with a 'T' separator and UTC offset — while datetime('now', ...)
        # yields 'YYYY-MM-DD HH:MM:SS'. Comparing those raw strings is wrong
        # on the boundary day ('T' > ' '), so normalize the stored value
        # through datetime() before comparing.
        rows = conn.execute(
            """
            SELECT * FROM event_graph
            WHERE datetime(created_at) > datetime('now', ?)
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (f"-{days} days", limit),
        ).fetchall()
        results = []
        for row in rows:
            r = dict(row)
            # Decode JSON-encoded columns; leave undecodable values as-is.
            for field in ("dates", "times", "assigned_to", "recipients", "raw_extracted"):
                if r.get(field):
                    try:
                        r[field] = json.loads(r[field])
                    except (json.JSONDecodeError, TypeError):
                        pass
            results.append(r)
        return results
    finally:
        conn.close()
def get_coordination_items() -> list[dict]:
    """Return coordination items that need confirmation (for HBM).

    Selects rows with type='coordination' and needs_confirmation=1,
    blocking items first, newest first within each group. JSON-encoded
    columns are decoded into Python values where possible.
    """
    json_fields = ("dates", "times", "assigned_to", "recipients", "raw_extracted")
    conn = _get_connection()
    try:
        cursor = conn.execute(
            """
            SELECT * FROM event_graph
            WHERE type = 'coordination' AND needs_confirmation = 1
            ORDER BY blocking DESC, created_at DESC
            """
        )
        items = []
        for record in cursor.fetchall():
            item = dict(record)
            for key in json_fields:
                raw = item.get(key)
                if not raw:
                    continue
                try:
                    item[key] = json.loads(raw)
                except (json.JSONDecodeError, TypeError):
                    pass  # leave the raw value untouched
            items.append(item)
        return items
    finally:
        conn.close()
def get_stats() -> dict:
    """Return aggregate Event Graph statistics.

    Returns:
        Dict with keys:
            total: total row count
            by_type: mapping of type -> count
            coordination_open: coordination rows awaiting confirmation
            blocking: rows flagged as blocking
    """
    conn = _get_connection()
    try:
        # COUNT(*) rather than SQLite's non-standard zero-argument COUNT(),
        # for consistency with the other queries in this module.
        total = conn.execute("SELECT COUNT(*) FROM event_graph").fetchone()[0]
        by_type = {
            row["type"]: row["cnt"]
            for row in conn.execute(
                "SELECT type, COUNT(*) as cnt FROM event_graph GROUP BY type"
            )
        }
        coordination_open = conn.execute(
            "SELECT COUNT(*) FROM event_graph WHERE type='coordination' AND needs_confirmation=1"
        ).fetchone()[0]
        blocking_count = conn.execute(
            "SELECT COUNT(*) FROM event_graph WHERE blocking=1"
        ).fetchone()[0]
        return {
            "total": total,
            "by_type": by_type,
            "coordination_open": coordination_open,
            "blocking": blocking_count,
        }
    finally:
        conn.close()