"""Document store — persist parsed event details for calendar action buttons.
Stores extracted event data from briefing cards so that when a user taps
"Add to Calendar", we can retrieve the full event details (summary, start,
end, location, description) without re-parsing the document.
Tables:
- briefing_events: Extracted event details linked to a document hash
"""
import json
import hashlib
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from icarus.core.config.staging import DATA_DIR
# SQLite database file opened by this module's connection helper.
DB_PATH = DATA_DIR / "icarus.db"

# TTL for stored events — clean up after 24 hours
EVENT_TTL_HOURS = 24
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
# DDL script for the briefing_events table and its two indexes.
# Every statement uses IF NOT EXISTS, so the script can be executed again
# against a database that already contains these objects.
# - doc_id is the primary key (one row per doc_id).
# - who and full_briefing are TEXT columns whose defaults are the JSON
#   literals '[]' and '{}'.
# - expires_at is required; idx_events_expires and idx_events_hash index the
#   expires_at and event_hash columns respectively.
SCHEMA = """
CREATE TABLE IF NOT EXISTS briefing_events (
doc_id TEXT PRIMARY KEY,
event_hash TEXT NOT NULL,
summary TEXT NOT NULL,
start_time TEXT,
end_time TEXT,
location TEXT DEFAULT '',
description TEXT DEFAULT '',
category TEXT DEFAULT 'event',
who TEXT DEFAULT '[]',
source_filename TEXT DEFAULT '',
full_briefing TEXT DEFAULT '{}',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_expires ON briefing_events(expires_at);
CREATE INDEX IF NOT EXISTS idx_events_hash ON briefing_events(event_hash);
"""
def _get_connection() -> sqlite3.Connection:
    """Open a new connection to the SQLite file at ``DB_PATH``.

    The data directory is created first if it is missing. The returned
    connection yields ``sqlite3.Row`` rows (columns addressable by name) and
    is opened with ``check_same_thread=False``. The caller owns the
    connection.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Initialize the briefing_events table and its indexes. Called on startup.

    Executes the ``SCHEMA`` script, whose statements all use IF NOT EXISTS,
    so calling this more than once is harmless.
    """
    conn = _get_connection()
    try:
        # ``with conn`` only commits or rolls back the transaction; it does
        # not close the connection, so that is done explicitly below.
        with conn:
            conn.executescript(SCHEMA)
    finally:
        conn.close()
def _clean_expired():
    """Remove event rows whose ``expires_at`` is in the past (UTC)."""
    conn = _get_connection()
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # not close the connection, hence the explicit close() below.
        with conn:
            # expires_at may hold ISO-8601 text with a 'T' separator and a
            # UTC offset, while datetime('now') yields 'YYYY-MM-DD HH:MM:SS'.
            # Comparing the raw text would sort 'T' after ' ' and keep rows
            # alive until the calendar date changes, so normalize the column
            # through datetime() before comparing.
            conn.execute(
                "DELETE FROM briefing_events "
                "WHERE datetime(expires_at) < datetime('now')"
            )
    finally:
        conn.close()
def generate_doc_id(filename: str, text: str) -> str:
    """Build a stable document ID from a filename and the document text.

    The ID is ``<name>_<digest>``: ``<name>`` is the first 32 characters of
    ``filename`` with every non-alphanumeric character dropped, and
    ``<digest>`` is the first 12 hex digits of the SHA-256 of ``text``.
    """
    digest = hashlib.sha256(text.encode()).hexdigest()
    prefix = "".join(filter(str.isalnum, filename[:32]))
    return f"{prefix}_{digest[:12]}"
def _generate_event_hash(summary: str, start_time: str) -> str:
"""Generate a short hash for event dedup in callback_data.
Telegram callback_data has a 64-byte limit, so we keep this short.
"""
raw = f"{summary}|{start_time}"
return hashlib.sha256(raw.encode()).hexdigest()[:10]
def store_briefing_event(
    summary: str,
    start_time: str = "",
    end_time: str = "",
    location: str = "",
    description: str = "",
    category: str = "event",
    who: list | None = None,
    source_filename: str = "",
    full_briefing: dict | None = None,
) -> dict:
    """Store a parsed event from a briefing card.

    Expired rows are purged first. The row's ``doc_id`` is derived from
    ``source_filename`` and ``summary``; because ``doc_id`` is the primary
    key and the statement is INSERT OR REPLACE, storing an event that maps to
    an existing ``doc_id`` overwrites that row.

    Args:
        summary: Event title; also feeds both generated identifiers.
        start_time: Event start, stored as given; feeds the event hash.
        end_time: Event end, stored as given.
        location: Event location.
        description: Free-text description.
        category: Event category label.
        who: Participants; stored as a JSON array (``None`` becomes ``[]``).
        source_filename: Name of the originating document.
        full_briefing: Whole briefing payload; stored as a JSON object
            (``None`` becomes ``{}``).

    Returns:
        A dict with ``doc_id`` and ``event_hash`` for building callback_data.
    """
    from datetime import timedelta

    _clean_expired()

    doc_id = generate_doc_id(source_filename, summary)
    event_hash = _generate_event_hash(summary, start_time)

    # Store the expiry as UTC 'YYYY-MM-DD HH:MM:SS', the same text layout
    # SQLite's datetime('now') and CURRENT_TIMESTAMP produce, so that textual
    # comparisons against datetime('now') order correctly.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=EVENT_TTL_HOURS)
    row = (
        doc_id,
        event_hash,
        summary,
        start_time,
        end_time,
        location,
        description,
        category,
        json.dumps(who or []),
        source_filename,
        json.dumps(full_briefing or {}),
        expires_at.strftime("%Y-%m-%d %H:%M:%S"),
    )

    conn = _get_connection()
    try:
        # ``with conn`` commits on success / rolls back on error but leaves
        # the connection open; close it explicitly afterwards.
        with conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO briefing_events
                (doc_id, event_hash, summary, start_time, end_time, location,
                description, category, who, source_filename, full_briefing, expires_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                row,
            )
    finally:
        conn.close()

    return {
        "doc_id": doc_id,
        "event_hash": event_hash,
    }
def get_event_by_hash(doc_id: str, event_hash: str) -> dict | None:
    """Retrieve a stored event by doc_id and event_hash.

    Expired rows are purged before the lookup, and the query itself also
    excludes rows whose expiry has passed.

    Returns:
        The event as a dict with ``who`` and ``full_briefing`` decoded from
        JSON, or ``None`` if no unexpired row matches.
    """
    _clean_expired()

    conn = _get_connection()
    try:
        # datetime(expires_at) normalizes ISO-8601 values ('T' separator,
        # UTC offset) to the 'YYYY-MM-DD HH:MM:SS' form of datetime('now');
        # comparing the raw text would mis-order the two layouts.
        row = conn.execute(
            """
            SELECT * FROM briefing_events
            WHERE doc_id = ? AND event_hash = ?
            AND datetime(expires_at) > datetime('now')
            """,
            (doc_id, event_hash),
        ).fetchone()
    finally:
        # The sqlite3 context manager would not close the connection.
        conn.close()

    if row is None:
        return None

    result = dict(row)
    # The keys always exist, so a .get() default never applies; fall back
    # with ``or`` so a NULL column decodes to an empty container.
    result["who"] = json.loads(result.get("who") or "[]")
    result["full_briefing"] = json.loads(result.get("full_briefing") or "{}")
    return result
def get_events_by_doc_id(doc_id: str) -> list[dict]:
    """Retrieve all stored, unexpired events for a document.

    Expired rows are purged before the lookup, and the query itself also
    excludes rows whose expiry has passed.

    Returns:
        Event dicts ordered by ``created_at`` with ``who`` and
        ``full_briefing`` decoded from JSON, or an empty list if none match.
    """
    _clean_expired()

    conn = _get_connection()
    try:
        # datetime(expires_at) normalizes ISO-8601 values ('T' separator,
        # UTC offset) to the 'YYYY-MM-DD HH:MM:SS' form of datetime('now');
        # comparing the raw text would mis-order the two layouts.
        rows = conn.execute(
            """
            SELECT * FROM briefing_events
            WHERE doc_id = ?
            AND datetime(expires_at) > datetime('now')
            ORDER BY created_at
            """,
            (doc_id,),
        ).fetchall()
    finally:
        # The sqlite3 context manager would not close the connection.
        conn.close()

    events = []
    for row in rows:
        event = dict(row)
        # The keys always exist, so a .get() default never applies; fall
        # back with ``or`` so a NULL column decodes to an empty container.
        event["who"] = json.loads(event.get("who") or "[]")
        event["full_briefing"] = json.loads(event.get("full_briefing") or "{}")
        events.append(event)
    return events