"""Shadow Database — Isolated storage for shadow mode message capture.

Completely separate from production and staging databases.
Stores: messages, extractions, tripwire results, and validation metrics.
"""

import sqlite3
import json
import logging
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List

logger = logging.getLogger(__name__)

# Shadow database path — completely isolated
DEFAULT_SHADOW_DB_PATH = Path.home() / ".icarus" / "shadow" / "shadow.db"

# Schema version for migrations
SCHEMA_VERSION = 1

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# NOTE(review): shadow_messages is unique on (message_id, chat_id), i.e.
# message_id alone is not assumed to be globally unique, yet shadow_extractions
# is unique on (message_id, extraction_type) with no chat component. Two chats
# reusing the same message_id would therefore overwrite each other's
# extractions via store_extraction's INSERT OR REPLACE. Changing the key needs
# a migration for existing databases (CREATE TABLE IF NOT EXISTS will not alter
# them) — TODO confirm intended behavior.
SCHEMA_SQL = """
-- Schema version tracking
CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER PRIMARY KEY,
    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Raw messages captured from Telegram
CREATE TABLE IF NOT EXISTS shadow_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT NOT NULL,
    chat_id TEXT NOT NULL,
    chat_title TEXT,
    sender_id TEXT NOT NULL,
    sender_name TEXT,
    sender_username TEXT,
    message_text TEXT,
    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,

    -- Tripwire results
    tripwire_fired BOOLEAN DEFAULT 0,
    tripwire_score REAL,
    tripwire_matches TEXT,  -- JSON array of matched patterns

    -- LLM extraction results
    extraction_json TEXT,  -- Full extraction JSON
    extraction_confidence REAL,
    extraction_model TEXT,

    -- Entity detection
    entities_detected TEXT,  -- JSON array of detected entities
    entities_normalized TEXT,  -- JSON array of normalized entity names

    -- Processing metadata
    processed_at DATETIME,
    processing_duration_ms INTEGER,
    error TEXT,

    -- Unique constraint on message_id per chat
    UNIQUE(message_id, chat_id)
);

CREATE INDEX IF NOT EXISTS idx_shadow_messages_chat ON shadow_messages(chat_id);
CREATE INDEX IF NOT EXISTS idx_shadow_messages_sender ON shadow_messages(sender_id);
CREATE INDEX IF NOT EXISTS idx_shadow_messages_timestamp ON shadow_messages(timestamp);
CREATE INDEX IF NOT EXISTS idx_shadow_messages_tripwire ON shadow_messages(tripwire_fired);

-- Extracted coordination/scheduling data
CREATE TABLE IF NOT EXISTS shadow_extractions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT NOT NULL,
    shadow_message_id INTEGER NOT NULL,
    extraction_type TEXT NOT NULL,  -- 'coordination', 'schedule', 'reminder', 'info'

    -- Extracted 4W data
    extracted_who TEXT,  -- JSON array
    extracted_what TEXT,
    extracted_when TEXT,
    extracted_where TEXT,

    -- Confidence and status
    confidence REAL CHECK(confidence >= 0 AND confidence <= 1),
    needs_confirmation BOOLEAN DEFAULT 0,
    status TEXT DEFAULT 'pending',  -- 'pending', 'validated', 'rejected', 'unclear'

    -- Validation metadata (filled during manual review)
    validated_by TEXT,
    validated_at DATETIME,
    validation_notes TEXT,

    -- Ground truth (for metrics calculation)
    ground_truth_type TEXT,  -- What it actually was
    ground_truth_who TEXT,  -- JSON array of actual involved parties

    -- Calendar validation results (Enriched Shadow Mode)
    calendar_check_status TEXT,  -- 'match' | 'no_match' | 'conflict'
    calendar_event_id TEXT,
    calendar_event_title TEXT,
    calendar_event_time TEXT,
    fuzzy_match_score REAL,
    admin_dm_sent BOOLEAN DEFAULT 0,
    admin_response TEXT,  -- 'yes' | 'no' | 'edit' | 'timeout'
    admin_dm_sent_at DATETIME,
    admin_response_at DATETIME,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY(shadow_message_id) REFERENCES shadow_messages(id),
    UNIQUE(message_id, extraction_type)
);

CREATE INDEX IF NOT EXISTS idx_shadow_extractions_type ON shadow_extractions(extraction_type);
CREATE INDEX IF NOT EXISTS idx_shadow_extractions_status ON shadow_extractions(status);
CREATE INDEX IF NOT EXISTS idx_shadow_extractions_confidence ON shadow_extractions(confidence);

-- Daily metrics summary (for validation reports)
CREATE TABLE IF NOT EXISTS shadow_metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date DATE NOT NULL UNIQUE,

    -- Volume metrics
    total_messages INTEGER DEFAULT 0,
    tripwire_fired_count INTEGER DEFAULT 0,
    extractions_created INTEGER DEFAULT 0,

    -- Tripwire precision/recall (calculated during validation)
    tripwire_true_positives INTEGER DEFAULT 0,
    tripwire_false_positives INTEGER DEFAULT 0,
    tripwire_false_negatives INTEGER DEFAULT 0,

    -- Entity recognition accuracy
    entity_correct INTEGER DEFAULT 0,
    entity_incorrect INTEGER DEFAULT 0,
    entity_missed INTEGER DEFAULT 0,

    -- Notes
    notes TEXT,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_shadow_metrics_date ON shadow_metrics(date);

-- Insert schema version
INSERT OR IGNORE INTO schema_version (version) VALUES (1);
"""


class ShadowDatabase:
    """Database operations for shadow mode.

    Completely isolated from production/staging databases. Every public
    method opens its own short-lived SQLite connection and closes it before
    returning.
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Open (creating if necessary) the shadow database.

        Args:
            db_path: Path to the SQLite file. Defaults to
                ``DEFAULT_SHADOW_DB_PATH``. Parent directories are created.
        """
        self.db_path = Path(db_path) if db_path else DEFAULT_SHADOW_DB_PATH
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _get_connection(self) -> sqlite3.Connection:
        """Open a new connection with ``sqlite3.Row`` rows and FK enforcement.

        The caller owns the returned connection and is responsible for
        closing it.
        """
        conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        conn.row_factory = sqlite3.Row
        # Foreign keys are off by default in SQLite and must be enabled
        # per connection.
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    def _init_db(self) -> None:
        """Create tables/indexes if missing and record the schema version."""
        # ``with conn:`` on a sqlite3 connection only commits/rolls back — it
        # does not close the connection — so use ``closing`` to release it.
        with closing(self._get_connection()) as conn:
            conn.executescript(SCHEMA_SQL)
            conn.commit()

    # -----------------------------------------------------------------------
    # Message Operations
    # -----------------------------------------------------------------------

    def store_message(
        self,
        message_id: str,
        chat_id: str,
        sender_id: str,
        message_text: Optional[str] = None,
        sender_name: Optional[str] = None,
        sender_username: Optional[str] = None,
        chat_title: Optional[str] = None,
        timestamp: Optional[datetime] = None,
    ) -> int:
        """Store a raw message from Telegram.

        Uses ``INSERT OR IGNORE``: if a row with the same
        ``(message_id, chat_id)`` already exists it is left untouched and its
        id is returned.

        Args:
            message_id: Message identifier (unique per chat).
            chat_id: Chat identifier.
            sender_id: Sender identifier.
            message_text: Message body, if any.
            sender_name: Display name of the sender.
            sender_username: Username of the sender.
            chat_title: Title of the chat.
            timestamp: When the message was sent; defaults to the current
                UTC time. Stored as an ISO-8601 string.

        Returns:
            The shadow_messages row id, or -1 if the row cannot be found
            after the insert.
        """
        stored_ts = (timestamp or datetime.now(timezone.utc)).isoformat()
        with closing(self._get_connection()) as conn:
            conn.execute(
                """
                INSERT OR IGNORE INTO shadow_messages
                    (message_id, chat_id, sender_id, message_text,
                     sender_name, sender_username, chat_title, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    message_id,
                    chat_id,
                    sender_id,
                    message_text,
                    sender_name,
                    sender_username,
                    chat_title,
                    stored_ts,
                ),
            )
            conn.commit()

            # Look the id up explicitly: on an ignored duplicate insert,
            # lastrowid would not point at the existing row.
            row = conn.execute(
                "SELECT id FROM shadow_messages WHERE message_id = ? AND chat_id = ?",
                (message_id, chat_id),
            ).fetchone()
            return row["id"] if row else -1

    def update_tripwire_result(
        self,
        shadow_message_id: int,
        fired: bool,
        score: Optional[float] = None,
        matches: Optional[List[str]] = None,
    ):
        """Update tripwire results for a message.

        Args:
            shadow_message_id: shadow_messages row id.
            fired: Whether the tripwire fired (stored as 0/1).
            score: Optional tripwire score.
            matches: Matched patterns; stored as a JSON array, or NULL when
                empty/None.
        """
        with closing(self._get_connection()) as conn:
            conn.execute(
                """
                UPDATE shadow_messages
                SET tripwire_fired = ?,
                    tripwire_score = ?,
                    tripwire_matches = ?
                WHERE id = ?
                """,
                (
                    1 if fired else 0,
                    score,
                    json.dumps(matches) if matches else None,
                    shadow_message_id,
                ),
            )
            conn.commit()

    def update_extraction_result(
        self,
        shadow_message_id: int,
        extraction: Dict[str, Any],
        confidence: float,
        model: str,
        entities: List[str],
        normalized_entities: List[str],
        processing_duration_ms: Optional[int] = None,
        error: Optional[str] = None,
    ):
        """Update LLM extraction results for a message.

        ``extraction``, ``entities`` and ``normalized_entities`` are stored as
        JSON; ``processed_at`` is set to the current UTC time.
        """
        with closing(self._get_connection()) as conn:
            conn.execute(
                """
                UPDATE shadow_messages
                SET extraction_json = ?,
                    extraction_confidence = ?,
                    extraction_model = ?,
                    entities_detected = ?,
                    entities_normalized = ?,
                    processed_at = ?,
                    processing_duration_ms = ?,
                    error = ?
                WHERE id = ?
                """,
                (
                    json.dumps(extraction),
                    confidence,
                    model,
                    json.dumps(entities),
                    json.dumps(normalized_entities),
                    datetime.now(timezone.utc).isoformat(),
                    processing_duration_ms,
                    error,
                    shadow_message_id,
                ),
            )
            conn.commit()

    # -----------------------------------------------------------------------
    # Extraction Operations
    # -----------------------------------------------------------------------

    def store_extraction(
        self,
        message_id: str,
        shadow_message_id: int,
        extraction_type: str,
        extracted_who: Optional[List[str]] = None,
        extracted_what: Optional[str] = None,
        extracted_when: Optional[str] = None,
        extracted_where: Optional[str] = None,
        confidence: float = 0.0,
        needs_confirmation: bool = False,
    ) -> int:
        """Store an extracted coordination/schedule item.

        Uses ``INSERT OR REPLACE`` on the ``(message_id, extraction_type)``
        unique key. A conflicting row is deleted and a fresh one inserted, so
        validation, calendar and admin columns of the previous row are reset
        and the row receives a new id.

        Raises:
            sqlite3.IntegrityError: If ``confidence`` is outside [0, 1] or
                ``shadow_message_id`` does not reference an existing message.

        Returns:
            The extraction row id.
        """
        with closing(self._get_connection()) as conn:
            cursor = conn.execute(
                """
                INSERT OR REPLACE INTO shadow_extractions
                    (message_id, shadow_message_id, extraction_type,
                     extracted_who, extracted_what, extracted_when, extracted_where,
                     confidence, needs_confirmation)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    message_id,
                    shadow_message_id,
                    extraction_type,
                    json.dumps(extracted_who) if extracted_who else None,
                    extracted_what,
                    extracted_when,
                    extracted_where,
                    confidence,
                    1 if needs_confirmation else 0,
                ),
            )
            conn.commit()
            return cursor.lastrowid

    def update_validation(
        self,
        extraction_id: int,
        status: str,
        validated_by: Optional[str] = None,
        notes: Optional[str] = None,
        ground_truth_type: Optional[str] = None,
        ground_truth_who: Optional[List[str]] = None,
    ):
        """Update validation status for an extraction.

        ``validated_at`` is set to the current UTC time; ``ground_truth_who``
        is stored as a JSON array, or NULL when empty/None.
        """
        with closing(self._get_connection()) as conn:
            conn.execute(
                """
                UPDATE shadow_extractions
                SET status = ?,
                    validated_by = ?,
                    validated_at = ?,
                    validation_notes = ?,
                    ground_truth_type = ?,
                    ground_truth_who = ?
                WHERE id = ?
                """,
                (
                    status,
                    validated_by,
                    datetime.now(timezone.utc).isoformat(),
                    notes,
                    ground_truth_type,
                    json.dumps(ground_truth_who) if ground_truth_who else None,
                    extraction_id,
                ),
            )
            conn.commit()

    # -----------------------------------------------------------------------
    # Query Operations
    # -----------------------------------------------------------------------

    def get_recent_messages(self, chat_id: str, before_message_id: Optional[str] = None,
                            limit: int = 5, within_minutes: int = 15) -> List[Dict[str, Any]]:
        """Get recent messages from a chat for context building.

        Args:
            chat_id: The chat ID to query.
            before_message_id: Only return messages whose id is lower than
                this one. Integer-like ids are compared numerically (so
                "9" < "10"); other values fall back to text comparison.
            limit: Maximum number of messages to return.
            within_minutes: Only return messages within this many minutes of
                now (UTC).

        Returns:
            List of message dictionaries, newest first.
        """
        # Stored timestamps are ISO-8601 ("YYYY-MM-DDTHH:MM:SS...+00:00") while
        # datetime('now', ...) yields "YYYY-MM-DD HH:MM:SS". Comparing the raw
        # strings lets any same-day row pass ('T' sorts after ' '), so both
        # sides are normalised through datetime() first.
        query = """
            SELECT * FROM shadow_messages
            WHERE chat_id = ?
              AND datetime(timestamp) >= datetime('now', '-' || ? || ' minutes')
        """
        params: List[Any] = [chat_id, within_minutes]

        if before_message_id:
            try:
                before_num = int(before_message_id)
            except ValueError:
                query += " AND message_id < ?"
                params.append(before_message_id)
            else:
                # message_id is a TEXT column; compare numerically so that
                # e.g. "9" counts as before "10".
                query += " AND CAST(message_id AS INTEGER) < ?"
                params.append(before_num)

        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        with closing(self._get_connection()) as conn:
            rows = conn.execute(query, params).fetchall()
            return [dict(row) for row in rows]

    def get_extractions(
        self,
        status: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """Get extractions, newest first, optionally filtered by status."""
        query = "SELECT * FROM shadow_extractions"
        params: List[Any] = []
        if status:
            query += " WHERE status = ?"
            params.append(status)
        query += " ORDER BY created_at DESC LIMIT ?"
        params.append(limit)

        with closing(self._get_connection()) as conn:
            rows = conn.execute(query, params).fetchall()
            return [dict(row) for row in rows]

    def get_daily_stats(self, date: Optional[str] = None) -> Dict[str, Any]:
        """Get statistics for a specific date (YYYY-MM-DD) or today (UTC).

        If a shadow_metrics row exists for the date it is returned as-is (all
        columns). Otherwise a lightweight summary is computed from
        shadow_messages and returned WITHOUT being persisted. In that fallback,
        ``extractions_created`` counts messages with ``extraction_json`` set,
        whereas ``compute_daily_metrics`` counts shadow_extractions rows.
        """
        if date is None:
            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        with closing(self._get_connection()) as conn:
            # Prefer a previously stored metrics row, if any.
            row = conn.execute(
                "SELECT * FROM shadow_metrics WHERE date = ?",
                (date,),
            ).fetchone()

            if row:
                return dict(row)

            # Fall back to computing from raw data (not stored).
            stats = conn.execute(
                """
                SELECT
                    COUNT(*) as total_messages,
                    SUM(CASE WHEN tripwire_fired = 1 THEN 1 ELSE 0 END) as tripwire_count,
                    SUM(CASE WHEN extraction_json IS NOT NULL THEN 1 ELSE 0 END) as extraction_count
                FROM shadow_messages
                WHERE DATE(timestamp) = ?
                """,
                (date,),
            ).fetchone()

            # SUM() over zero rows is NULL, hence the ``or 0``.
            return {
                "date": date,
                "total_messages": stats["total_messages"] or 0,
                "tripwire_fired_count": stats["tripwire_count"] or 0,
                "extractions_created": stats["extraction_count"] or 0,
            }

    def get_unvalidated_extractions(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get pending extractions (with source message text) for manual review.

        Ordered by extraction confidence, then message timestamp, both
        descending.
        """
        with closing(self._get_connection()) as conn:
            rows = conn.execute(
                """
                SELECT e.*, m.message_text, m.sender_name, m.timestamp as message_timestamp
                FROM shadow_extractions e
                JOIN shadow_messages m ON e.shadow_message_id = m.id
                WHERE e.status = 'pending'
                ORDER BY e.confidence DESC, m.timestamp DESC
                LIMIT ?
                """,
                (limit,),
            ).fetchall()
            return [dict(row) for row in rows]

    def get_extraction_by_id(self, extraction_id: int) -> Optional[Dict[str, Any]]:
        """Get a single extraction by its ID, or None if it does not exist."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                "SELECT * FROM shadow_extractions WHERE id = ?",
                (extraction_id,),
            ).fetchone()
            return dict(row) if row else None

    def update_calendar_check(
        self,
        extraction_id: int,
        status: str,
        event_id: Optional[str] = None,
        event_title: Optional[str] = None,
        event_time: Optional[str] = None,
        fuzzy_score: Optional[float] = None,
    ):
        """Update calendar validation results for an extraction.

        Args:
            extraction_id: The extraction row id
            status: 'match', 'no_match', or 'conflict'
            event_id: Matched calendar event ID (if any)
            event_title: Matched calendar event title (if any)
            event_time: Matched calendar event time (if any)
            fuzzy_score: Fuzzy matching score (0.0-1.0)
        """
        with closing(self._get_connection()) as conn:
            conn.execute(
                """
                UPDATE shadow_extractions
                SET calendar_check_status = ?,
                    calendar_event_id = ?,
                    calendar_event_title = ?,
                    calendar_event_time = ?,
                    fuzzy_match_score = ?
                WHERE id = ?
                """,
                (
                    status,
                    event_id,
                    event_title,
                    event_time,
                    fuzzy_score,
                    extraction_id,
                ),
            )
            conn.commit()

    def update_admin_dm_sent(
        self,
        extraction_id: int,
        dm_type: str,  # 'no_match' or 'conflict'
    ):
        """Mark that an admin DM was sent for this extraction.

        Sets ``admin_dm_sent = 1`` and ``admin_dm_sent_at`` to the current
        UTC time. ``dm_type`` is accepted for API compatibility but is not
        persisted — the schema has no column for it.
        """
        with closing(self._get_connection()) as conn:
            conn.execute(
                """
                UPDATE shadow_extractions
                SET admin_dm_sent = 1,
                    admin_dm_sent_at = ?
                WHERE id = ?
                """,
                (
                    datetime.now(timezone.utc).isoformat(),
                    extraction_id,
                ),
            )
            conn.commit()

    def update_admin_response(
        self,
        extraction_id: int,
        response: str,  # 'yes', 'no', 'edit', 'timeout'
    ):
        """Store an admin response to a DM, stamped with the current UTC time."""
        with closing(self._get_connection()) as conn:
            conn.execute(
                """
                UPDATE shadow_extractions
                SET admin_response = ?,
                    admin_response_at = ?
                WHERE id = ?
                """,
                (
                    response,
                    datetime.now(timezone.utc).isoformat(),
                    extraction_id,
                ),
            )
            conn.commit()

    def get_extractions_needing_calendar_check(
        self,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """Get extractions that still need calendar validation.

        Selects coordination/schedule extractions with confidence >= 0.7 and
        no ``calendar_check_status`` yet.
        """
        with closing(self._get_connection()) as conn:
            rows = conn.execute(
                """
                SELECT e.*, m.message_text, m.sender_name, m.timestamp as message_timestamp
                FROM shadow_extractions e
                JOIN shadow_messages m ON e.shadow_message_id = m.id
                WHERE e.calendar_check_status IS NULL
                  AND e.extraction_type IN ('coordination', 'schedule')
                  AND e.confidence >= 0.7
                ORDER BY e.confidence DESC, m.timestamp DESC
                LIMIT ?
                """,
                (limit,),
            ).fetchall()
            return [dict(row) for row in rows]

    def compute_daily_metrics(self, date: Optional[str] = None) -> Dict[str, Any]:
        """Compute daily volume counts and upsert them into shadow_metrics.

        Only ``total_messages``, ``tripwire_fired_count``,
        ``extractions_created`` (shadow_extractions rows whose source message
        falls on ``date``) and ``updated_at`` are written. The precision/recall
        and entity-accuracy columns are not touched here.

        Args:
            date: Day to aggregate as YYYY-MM-DD; defaults to today (UTC).

        Returns:
            The computed counts as a dict.
        """
        if date is None:
            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        with closing(self._get_connection()) as conn:
            # Count messages
            msg_stats = conn.execute(
                """
                SELECT
                    COUNT(*) as total_messages,
                    SUM(CASE WHEN tripwire_fired = 1 THEN 1 ELSE 0 END) as tripwire_fired_count
                FROM shadow_messages
                WHERE DATE(timestamp) = ?
                """,
                (date,),
            ).fetchone()

            # Count extractions
            ext_stats = conn.execute(
                """
                SELECT COUNT(*) as extraction_count
                FROM shadow_extractions e
                JOIN shadow_messages m ON m.id = e.shadow_message_id
                WHERE DATE(m.timestamp) = ?
                """,
                (date,),
            ).fetchone()

            total = msg_stats["total_messages"] or 0
            fired = msg_stats["tripwire_fired_count"] or 0
            extracted = ext_stats["extraction_count"] or 0

            # Upsert into metrics table
            conn.execute(
                """
                INSERT INTO shadow_metrics (date, total_messages, tripwire_fired_count,
                                            extractions_created, updated_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(date) DO UPDATE SET
                    total_messages = excluded.total_messages,
                    tripwire_fired_count = excluded.tripwire_fired_count,
                    extractions_created = excluded.extractions_created,
                    updated_at = excluded.updated_at
                """,
                (date, total, fired, extracted, datetime.now(timezone.utc).isoformat()),
            )
            conn.commit()

        result = {
            "date": date,
            "total_messages": total,
            "tripwire_fired_count": fired,
            "extractions_created": extracted,
        }
        logger.info("[ShadowMetrics] Daily metrics for %s: %d msgs, %d fired, %d extractions",
                    date, total, fired, extracted)
        return result

    def export_for_review(self, start_date: str, end_date: str) -> List[Dict[str, Any]]:
        """Export messages and their extractions for a date range (inclusive).

        One row per (message, extraction) pair. Messages without extractions
        appear once with NULL extraction fields. Note that ``message_id`` in
        the result is the shadow_messages row id. ``extraction_confidence`` is
        the message-level LLM confidence; ``confidence`` is the
        shadow_extractions row's confidence.
        """
        with closing(self._get_connection()) as conn:
            # Every output column needs a distinct name: sqlite3.Row resolves
            # duplicate names to the first match, so dict(row) would otherwise
            # silently drop one of the values.
            rows = conn.execute(
                """
                SELECT
                    m.id as message_id,
                    m.message_text,
                    m.sender_name,
                    m.timestamp,
                    m.tripwire_fired,
                    m.tripwire_matches,
                    m.extraction_confidence as extraction_confidence,
                    e.extraction_type,
                    e.extracted_who,
                    e.extracted_what,
                    e.extracted_when,
                    e.extracted_where,
                    e.confidence as confidence,
                    e.status
                FROM shadow_messages m
                LEFT JOIN shadow_extractions e ON m.id = e.shadow_message_id
                WHERE DATE(m.timestamp) BETWEEN ? AND ?
                ORDER BY m.timestamp DESC
                """,
                (start_date, end_date),
            ).fetchall()
            return [dict(row) for row in rows]


def init_shadow_db(db_path: Optional[Path] = None) -> ShadowDatabase:
    """Initialize and return a ShadowDatabase instance."""
    return ShadowDatabase(db_path)