# event_graph.py

"""Event Graph — persistent store for briefing event routing decisions.

Stores extracted event data, type classification (calendar_event / coordination / info),
routing decisions, and coordination flags for Phase 7 HBM planning.

Tables:
- event_graph: Full extraction + routing metadata for every processed document
"""

import json
import re
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional

from icarus.core.config.staging import DATA_DIR

# SQLite database file opened by _get_connection(); it lives directly under DATA_DIR.
DB_PATH = DATA_DIR / "icarus.db"

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# DDL for the Event Graph store; init_db() runs it with executescript().
# Every statement uses IF NOT EXISTS, so running it against a database that
# already has the table and indexes changes nothing.
# List-valued columns (dates, times, assigned_to, recipients) and
# raw_extracted hold JSON text, as the inline column notes describe.
# NOTE(review): SQLite enforces FOREIGN KEY clauses only when
# PRAGMA foreign_keys is enabled on the connection, and nothing in this
# module enables it — confirm whether the reference to briefing_events is
# meant to be enforced.
SCHEMA = """
CREATE TABLE IF NOT EXISTS event_graph (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id TEXT UNIQUE NOT NULL,
type TEXT NOT NULL CHECK(type IN ('calendar_event', 'coordination', 'info')),

-- Extracted data
dates TEXT,                -- JSON array ["YYYY-MM-DD", ...]
times TEXT,                -- JSON array ["HH:MM", ...]
assigned_to TEXT,           -- JSON array ["matt", "aundrea", "sully", "harper"]
child TEXT,                -- "sully" | "harper" | "both" | null
context TEXT,              -- "transport" | "care_coverage" | "medical" | etc

-- Routing decision
calendar_destination TEXT,  -- "radicale" | null
coordination_logged BOOLEAN DEFAULT 0,
recipients TEXT,           -- JSON array of routed family members

-- Metadata
confidence REAL CHECK(confidence >= 0 AND confidence <= 1),
needs_confirmation BOOLEAN DEFAULT 0,
blocking BOOLEAN DEFAULT 0,

-- Processing state
created_at TEXT NOT NULL,   -- ISO8601
processed_at TEXT,          -- ISO8601
action_taken TEXT,         -- "calendar_created" | "confirmed" | null

-- Raw data
raw_extracted TEXT,        -- JSON full extraction

FOREIGN KEY(document_id) REFERENCES briefing_events(doc_id)

);

CREATE INDEX IF NOT EXISTS idx_event_graph_type ON event_graph(type);
CREATE INDEX IF NOT EXISTS idx_event_graph_dates ON event_graph(dates);
CREATE INDEX IF NOT EXISTS idx_event_graph_created ON event_graph(created_at);
CREATE INDEX IF NOT EXISTS idx_event_graph_assigned ON event_graph(assigned_to);
"""

def _get_connection() -> sqlite3.Connection:
    """Open a new connection to the database file at DB_PATH.

    DATA_DIR is created first (including missing parents) so that the
    connect call can create the file. Rows are returned as ``sqlite3.Row``,
    which allows access by column name. The connection is opened with
    ``check_same_thread=False``.

    Returns:
        A fresh connection; the caller is responsible for closing it.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    db_file = str(DB_PATH)
    connection = sqlite3.connect(db_file, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection

def init_db():
    """Create the event_graph table and its indexes if missing. Called on startup.

    The connection is closed explicitly in ``finally``: using a sqlite3
    connection as a context manager only commits or rolls back the
    transaction, it does not close the connection.
    """
    conn = _get_connection()
    try:
        with conn:
            conn.executescript(SCHEMA)
    finally:
        conn.close()

# ---------------------------------------------------------------------------
# Type Classification
# ---------------------------------------------------------------------------

def classify_event_type(extraction: dict) -> str:
    """Classify a briefing extraction as calendar_event, coordination, or info.

    Rules, checked in this order:
    - calendar_event: has a specific clock time AND a location
    - coordination: has a date but no specific time, or has a date and the
      suggested actions read like a request for help ("can you", "pick up", ...)
    - info: everything else

    Args:
        extraction: Raw extraction dict from the briefing generator. Reads
            ``key_details`` (keys time/Time/start/Start, location/Location,
            date/Date) and ``suggested_actions``. A ``key_details`` value
            that is not a dict is treated as empty.

    Returns:
        One of: "calendar_event", "coordination", "info"
    """
    key_details = extraction.get("key_details", {})
    if not isinstance(key_details, dict):
        key_details = {}

    def first_present(*keys):
        """Return the first truthy value stored under any of ``keys``, else ""."""
        for key in keys:
            value = key_details.get(key)
            if value:
                return value
        return ""

    # A time only counts when it names an actual clock time ("2:00 PM"),
    # not a vague period ("morning", "after school").
    time_val = first_present("time", "Time", "start", "Start")
    has_specific_time = bool(time_val and _is_specific_time(str(time_val)))

    location_val = first_present("location", "Location")
    has_venue = bool(location_val and str(location_val).strip())

    date_val = first_present("date", "Date")
    has_dates = bool(date_val and str(date_val).strip())

    # Normalise suggested_actions before joining: a bare string would be
    # joined character by character (so no signal could ever match), and
    # None or non-string items would make str.join raise.
    raw_actions = extraction.get("suggested_actions") or []
    if isinstance(raw_actions, str) or not isinstance(raw_actions, (list, tuple)):
        raw_actions = [raw_actions]
    action_text = " ".join(
        str(action) for action in raw_actions if action is not None
    ).lower()

    # Phrases suggesting that someone is being asked to help out.
    coordination_signals = (
        "cover", "pick up", "grab", "drop off", "swap",
        "can you", "need someone", "who can",
    )
    is_coordination_intent = any(sig in action_text for sig in coordination_signals)

    if has_specific_time and has_venue:
        return "calendar_event"
    if has_dates and not has_specific_time:
        return "coordination"
    if has_dates and is_coordination_intent:
        return "coordination"
    return "info"

def _is_specific_time(time_str: str) -> bool:
"""Check if a time string contains a specific hour:minute, not just a range.

"2:00 PM"  True
"morning"  False
"after school"  False
"10:30"  True
"""
import re
# Match patterns like "2:00", "10:30", "14:00", "2 PM", etc.
specific_pattern = re.compile(r'\b\d{1,2}:\d{2}\b|\b\d{1,2}\s*(?:AM|PM|am|pm)\b')
return bool(specific_pattern.search(time_str))

# ---------------------------------------------------------------------------
# EventGraphWriter
# ---------------------------------------------------------------------------

class EventGraphWriter:
    """Persist briefing results to the Event Graph.

    Idempotent: re-processing the same document_id updates the existing row
    in place. The row's ``id``, ``created_at`` and ``action_taken`` are left
    untouched by an update; every other column is overwritten.
    """

    # Substrings of the suggested actions that mark a coordination item as blocking.
    _BLOCKING_SIGNALS = ("cover", "care", "supervision")

    # Context rules, checked in order; the first context with a hit wins.
    # Each entry is (context, substring keywords, whole-word keywords).
    # "car", "bus" and "ride" are matched as whole words only: as substrings
    # they also fire on "care", "busy" and "bride", which sent care requests
    # to "transport". "carpool" is listed explicitly so it still counts.
    _CONTEXT_RULES = (
        (
            "transport",
            ("pick up", "drop off", "drive", "carpool", "grab at", "pick-up", "drop-off"),
            ("car", "cars", "bus", "buses", "ride", "rides"),
        ),
        (
            "care_coverage",
            ("cover", "watch", "stay with", "babysit", "care", "supervision"),
            (),
        ),
        (
            "medical",
            ("doctor", "dentist", "appointment", "checkup", "clinic", "hospital", "physical"),
            (),
        ),
        (
            "school",
            ("school", "class", "teacher", "field trip", "permission", "grade", "conference"),
            (),
        ),
        (
            "activity",
            ("practice", "game", "lesson", "recital", "tournament", "rehearsal", "club"),
            (),
        ),
    )

    def write(self, document_id: str, briefing: dict, extraction: dict) -> bool:
        """Write a briefing to the Event Graph.

        Documents classified as "info" are not stored. For the other types a
        row is inserted, or the existing row for ``document_id`` is updated.
        The update uses SQLite's ``ON CONFLICT ... DO UPDATE`` clause, which
        needs SQLite 3.24 or newer.

        Args:
            document_id: Unique document identifier (filename_hash or similar)
            briefing: The briefing card dict. Not read by this method; kept
                for interface compatibility.
            extraction: Raw extraction dict; the source of every stored field.

        Returns:
            True if the row was written, or if the document was "info" and
            there was nothing to persist.

        Raises:
            Exception: Any error raised while writing is re-raised after the
                transaction has been rolled back.
        """
        event_type = classify_event_type(extraction)

        # Info-only documents are not persisted; that is not an error.
        if event_type == "info":
            return True

        key_details = extraction.get("key_details", {})
        if not isinstance(key_details, dict):
            key_details = {}

        # Dates: an explicit top-level "dates" entry wins over key_details.
        date_val = key_details.get("date") or key_details.get("Date") or ""
        dates = [str(date_val)] if date_val else []
        if extraction.get("dates"):
            dates = self._as_list(extraction["dates"])

        # Times: accept the same keys classify_event_type() looks at, so an
        # event classified through "start" does not end up with no time.
        time_val = (
            key_details.get("time")
            or key_details.get("Time")
            or key_details.get("start")
            or key_details.get("Start")
            or ""
        )
        times = [str(time_val)] if time_val else []

        # Assigned family members, falling back to the "who" fields.
        # Always a list, so the stored JSON is always an array.
        assigned_to = self._as_list(extraction.get("assigned_to"))
        if not assigned_to:
            assigned_to = self._as_list(
                key_details.get("who") or extraction.get("who")
            )

        child = self._infer_child(extraction)
        context = self._infer_context(extraction)
        confidence = self._coerce_confidence(extraction.get("confidence", 0.5))

        # Routing decisions
        is_coordination = event_type == "coordination"
        calendar_destination = "radicale" if event_type == "calendar_event" else None
        coordination_logged = is_coordination
        recipients = list(assigned_to)

        # Coordination flags. Signals are matched case-insensitively, the
        # same way classify_event_type() matches its own signals.
        needs_confirmation = is_coordination
        action_text = " ".join(self._action_strings(extraction)).lower()
        blocking = is_coordination and any(
            sig in action_text for sig in self._BLOCKING_SIGNALS
        )

        now = datetime.now(timezone.utc).isoformat()

        conn = _get_connection()
        try:
            # Upsert rather than INSERT OR REPLACE: REPLACE deletes the old
            # row first, which assigns a new id, resets created_at and
            # clears action_taken. created_at is only used on first insert.
            conn.execute(
                """
                INSERT INTO event_graph (
                    document_id, type, dates, times, assigned_to, child,
                    context, calendar_destination, coordination_logged,
                    recipients, confidence, needs_confirmation, blocking,
                    created_at, processed_at, raw_extracted
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(document_id) DO UPDATE SET
                    type = excluded.type,
                    dates = excluded.dates,
                    times = excluded.times,
                    assigned_to = excluded.assigned_to,
                    child = excluded.child,
                    context = excluded.context,
                    calendar_destination = excluded.calendar_destination,
                    coordination_logged = excluded.coordination_logged,
                    recipients = excluded.recipients,
                    confidence = excluded.confidence,
                    needs_confirmation = excluded.needs_confirmation,
                    blocking = excluded.blocking,
                    processed_at = excluded.processed_at,
                    raw_extracted = excluded.raw_extracted
                """,
                (
                    document_id,
                    event_type,
                    json.dumps(dates),
                    json.dumps(times),
                    json.dumps(assigned_to),
                    child,
                    context,
                    calendar_destination,
                    coordination_logged,
                    json.dumps(recipients),
                    confidence,
                    needs_confirmation,
                    blocking,
                    now,
                    now,
                    json.dumps(extraction, default=str),
                ),
            )
            conn.commit()
            return True
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    @staticmethod
    def _as_list(value) -> list:
        """Return ``value`` as a list: empty/None -> [], scalar -> [scalar]."""
        if not value:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _as_text(value) -> str:
        """Return ``value`` as a string, mapping None and other empty values to ""."""
        return str(value) if value else ""

    @staticmethod
    def _coerce_confidence(raw) -> float:
        """Return ``raw`` as a float within [0, 1].

        Missing or unparseable values fall back to 0.5. Out-of-range values
        are clamped, because the table's CHECK constraint would otherwise
        reject the whole row.
        """
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return 0.5
        return min(1.0, max(0.0, value))

    @classmethod
    def _action_strings(cls, extraction: dict) -> list:
        """Return ``suggested_actions`` as a list of strings, skipping None items."""
        raw = extraction.get("suggested_actions")
        return [str(item) for item in cls._as_list(raw) if item is not None]

    def _infer_child(self, extraction: dict) -> Optional[str]:
        """Infer which child is involved from the extraction.

        Looks for the children's names in ``key_details["who"]``, ``title``
        and ``summary`` (case-insensitive substring match).

        Returns:
            "sully", "harper", "both", or None when neither name appears.
        """
        parts = []
        key_details = extraction.get("key_details", {})
        if isinstance(key_details, dict):
            parts.append(self._as_text(key_details.get("who")))
        # title/summary may be present but None; _as_text avoids a TypeError.
        parts.append(self._as_text(extraction.get("title")))
        parts.append(self._as_text(extraction.get("summary")))
        text = " ".join(parts).lower()

        has_sully = "sully" in text or "sullivan" in text
        has_harper = "harper" in text

        if has_sully and has_harper:
            return "both"
        if has_sully:
            return "sully"
        if has_harper:
            return "harper"
        return None

    def _infer_context(self, extraction: dict) -> Optional[str]:
        """Infer the context type from the extraction.

        Searches ``key_details``, ``title``, ``summary`` and
        ``suggested_actions`` (lower-cased) for the keywords in
        ``_CONTEXT_RULES`` and returns the first context that matches.

        Returns:
            "transport", "care_coverage", "medical", "school", "activity",
            or None when no keyword is found.
        """
        parts = []
        key_details = extraction.get("key_details", {})
        if isinstance(key_details, dict):
            parts.append(str(key_details))
        parts.append(self._as_text(extraction.get("title")))
        parts.append(self._as_text(extraction.get("summary")))
        parts.extend(self._action_strings(extraction))
        text = " ".join(parts).lower()

        # Set of standalone words, used for the whole-word keywords.
        words = set(re.findall(r"[a-z]+", text))

        for context, substrings, whole_words in self._CONTEXT_RULES:
            if any(kw in text for kw in substrings):
                return context
            if not words.isdisjoint(whole_words):
                return context

        return None

# ---------------------------------------------------------------------------
# Query helpers
# ---------------------------------------------------------------------------

def get_recent_events(days: int = 7, limit: int = 50) -> list[dict]:
    """Return recent Event Graph entries for dashboard display.

    Args:
        days: How many days back to look
        limit: Maximum rows to return

    Returns:
        List of event dicts, newest first. The JSON columns (dates, times,
        assigned_to, recipients, raw_extracted) are decoded; a value that is
        not valid JSON is returned as the stored text.
    """
    # created_at is written as a Python ISO-8601 string
    # ("YYYY-MM-DDTHH:MM:SS.ffffff+00:00"), so the cutoff is built in the
    # same format. SQLite's datetime('now', ...) yields
    # "YYYY-MM-DD HH:MM:SS"; compared as text, 'T' sorts after ' ', so every
    # row from the cutoff day passed regardless of its time of day.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()

    conn = _get_connection()
    try:
        rows = conn.execute(
            """
            SELECT * FROM event_graph
            WHERE created_at > ?
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (cutoff, limit),
        ).fetchall()

        results = []
        for row in rows:
            r = dict(row)
            # Parse JSON fields
            for field in ("dates", "times", "assigned_to", "recipients", "raw_extracted"):
                if r.get(field):
                    try:
                        r[field] = json.loads(r[field])
                    except (json.JSONDecodeError, TypeError):
                        # Best effort: keep the stored text when it is not JSON.
                        pass
            results.append(r)
        return results
    finally:
        conn.close()

def get_coordination_items() -> list[dict]:
    """Return coordination items that still need confirmation (for HBM).

    Selects rows with type 'coordination' and needs_confirmation = 1,
    blocking rows first and newest first within each group. The JSON
    columns are decoded; a value that fails to decode is left as stored.
    """
    json_columns = ("dates", "times", "assigned_to", "recipients", "raw_extracted")
    query = """
        SELECT * FROM event_graph
        WHERE type = 'coordination' AND needs_confirmation = 1
        ORDER BY blocking DESC, created_at DESC
        """

    db = _get_connection()
    try:
        items = []
        for record in db.execute(query).fetchall():
            item = dict(record)
            for column in json_columns:
                encoded = item.get(column)
                if not encoded:
                    continue
                try:
                    item[column] = json.loads(encoded)
                except (json.JSONDecodeError, TypeError):
                    # Not valid JSON: keep the stored text unchanged.
                    continue
            items.append(item)
        return items
    finally:
        db.close()

def get_stats() -> dict:
    """Return aggregate Event Graph statistics.

    Returns:
        Dict with:
        - "total": number of rows in event_graph
        - "by_type": mapping of type -> row count
        - "coordination_open": coordination rows with needs_confirmation = 1
        - "blocking": rows with blocking = 1 (any type)
    """
    conn = _get_connection()
    try:
        total = conn.execute("SELECT COUNT(*) FROM event_graph").fetchone()[0]

        by_type = {
            row["type"]: row["cnt"]
            for row in conn.execute(
                "SELECT type, COUNT(*) as cnt FROM event_graph GROUP BY type"
            )
        }

        coordination_open = conn.execute(
            "SELECT COUNT(*) FROM event_graph WHERE type='coordination' AND needs_confirmation=1"
        ).fetchone()[0]

        blocking_count = conn.execute(
            "SELECT COUNT(*) FROM event_graph WHERE blocking=1"
        ).fetchone()[0]

        return {
            "total": total,
            "by_type": by_type,
            "coordination_open": coordination_open,
            "blocking": blocking_count,
        }
    finally:
        conn.close()