"""Document store — persist parsed event details for calendar action buttons. Stores extracted event data from briefing cards so that when a user taps "Add to Calendar", we can retrieve the full event details (summary, start, end, location, description) without re-parsing the document. Tables: - briefing_events: Extracted event details linked to a document hash """ import json import hashlib import sqlite3 from datetime import datetime, timezone from pathlib import Path from icarus.core.config.staging import DATA_DIR DB_PATH = DATA_DIR / "icarus.db" # TTL for stored events — clean up after 24 hours EVENT_TTL_HOURS = 24 # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- SCHEMA = """ CREATE TABLE IF NOT EXISTS briefing_events ( doc_id TEXT PRIMARY KEY, event_hash TEXT NOT NULL, summary TEXT NOT NULL, start_time TEXT, end_time TEXT, location TEXT DEFAULT '', description TEXT DEFAULT '', category TEXT DEFAULT 'event', who TEXT DEFAULT '[]', source_filename TEXT DEFAULT '', full_briefing TEXT DEFAULT '{}', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP NOT NULL ); CREATE INDEX IF NOT EXISTS idx_events_expires ON briefing_events(expires_at); CREATE INDEX IF NOT EXISTS idx_events_hash ON briefing_events(event_hash); """ def _get_connection() -> sqlite3.Connection: DATA_DIR.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row return conn def init_db(): """Initialize the briefing_events table. Called on startup.""" with _get_connection() as conn: conn.executescript(SCHEMA) def _clean_expired(): """Remove expired event entries.""" with _get_connection() as conn: conn.execute( "DELETE FROM briefing_events WHERE expires_at < datetime('now')" ) conn.commit() def _generate_doc_id(filename: str, text: str) -> str: """Generate a stable document ID from filename + content hash.""" content_hash = hashlib.sha256(text.encode()).hexdigest()[:12] safe_name = "".join(c if c.isalnum() else "_" for c in filename[:32]) return f"{safe_name}_{content_hash}" def _generate_event_hash(summary: str, start_time: str) -> str: """Generate a short hash for event dedup in callback_data. Telegram callback_data has a 64-byte limit, so we keep this short. """ raw = f"{summary}|{start_time}" return hashlib.sha256(raw.encode()).hexdigest()[:10] def store_briefing_event( summary: str, start_time: str = "", end_time: str = "", location: str = "", description: str = "", category: str = "event", who: list | None = None, source_filename: str = "", full_briefing: dict | None = None, ) -> dict: """Store a parsed event from a briefing card. Returns a dict with doc_id and event_hash for building callback_data. """ _clean_expired() doc_id = _generate_doc_id(source_filename, summary) event_hash = _generate_event_hash(summary, start_time) who_json = json.dumps(who or []) briefing_json = json.dumps(full_briefing or {}) # Compute expiry from datetime import timedelta from zoneinfo import ZoneInfo expires_at = datetime.now(timezone.utc) + timedelta(hours=EVENT_TTL_HOURS) with _get_connection() as conn: conn.execute( """ INSERT OR REPLACE INTO briefing_events (doc_id, event_hash, summary, start_time, end_time, location, description, category, who, source_filename, full_briefing, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( doc_id, event_hash, summary, start_time, end_time, location, description, category, who_json, source_filename, briefing_json, expires_at.isoformat(), ), ) conn.commit() return { "doc_id": doc_id, "event_hash": event_hash, } def get_event_by_hash(doc_id: str, event_hash: str) -> dict | None: """Retrieve a stored event by doc_id and event_hash. Returns the event dict or None if not found/expired. """ _clean_expired() with _get_connection() as conn: row = conn.execute( """ SELECT * FROM briefing_events WHERE doc_id = ? AND event_hash = ? AND expires_at > datetime('now') """, (doc_id, event_hash), ).fetchone() if not row: return None result = dict(row) result["who"] = json.loads(result.get("who", "[]")) result["full_briefing"] = json.loads(result.get("full_briefing", "{}")) return result def get_events_by_doc_id(doc_id: str) -> list[dict]: """Retrieve all stored events for a document. Returns list of event dicts, or empty list if none found. """ _clean_expired() with _get_connection() as conn: rows = conn.execute( """ SELECT * FROM briefing_events WHERE doc_id = ? AND expires_at > datetime('now') ORDER BY created_at """, (doc_id,), ).fetchall() results = [] for row in rows: result = dict(row) result["who"] = json.loads(result.get("who", "[]")) result["full_briefing"] = json.loads(result.get("full_briefing", "{}")) results.append(result) return results