# shadow_database.py

"""Shadow Database — Isolated storage for shadow mode message capture.

Completely separate from production and staging databases.
Stores: messages, extractions, tripwire results, and validation metrics.
"""

import sqlite3
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List

# Shadow database path — completely isolated

# Default on-disk location of the shadow database, under the user's home directory.
DEFAULT_SHADOW_DB_PATH = Path.home().joinpath(".icarus", "shadow", "shadow.db")

# Schema version for migrations

# NOTE(review): SCHEMA_SQL inserts the literal 1 into schema_version on its own
# rather than reading this constant — keep the two values in sync when bumping.
SCHEMA_VERSION: int = 1

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# Full DDL for the shadow database. Every statement is idempotent
# (CREATE ... IF NOT EXISTS / INSERT OR IGNORE), so the script can be executed
# on every start-up against both fresh and already-initialized databases.
SCHEMA_SQL = """
-- Schema version tracking
CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER PRIMARY KEY,
    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Raw messages captured from Telegram
CREATE TABLE IF NOT EXISTS shadow_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT NOT NULL,
    chat_id TEXT NOT NULL,
    chat_title TEXT,
    sender_id TEXT NOT NULL,
    sender_name TEXT,
    sender_username TEXT,
    message_text TEXT,
    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,

    -- Tripwire results
    tripwire_fired BOOLEAN DEFAULT 0,
    tripwire_score REAL,
    tripwire_matches TEXT,  -- JSON array of matched patterns

    -- LLM extraction results
    extraction_json TEXT,  -- Full extraction JSON
    extraction_confidence REAL,
    extraction_model TEXT,

    -- Entity detection
    entities_detected TEXT,  -- JSON array of detected entities
    entities_normalized TEXT,  -- JSON array of normalized entity names

    -- Processing metadata
    processed_at DATETIME,
    processing_duration_ms INTEGER,
    error TEXT,

    -- Unique constraint on message_id per chat
    UNIQUE(message_id, chat_id)
);

CREATE INDEX IF NOT EXISTS idx_shadow_messages_chat ON shadow_messages(chat_id);
CREATE INDEX IF NOT EXISTS idx_shadow_messages_sender ON shadow_messages(sender_id);
CREATE INDEX IF NOT EXISTS idx_shadow_messages_timestamp ON shadow_messages(timestamp);
CREATE INDEX IF NOT EXISTS idx_shadow_messages_tripwire ON shadow_messages(tripwire_fired);

-- Extracted coordination/scheduling data
CREATE TABLE IF NOT EXISTS shadow_extractions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT NOT NULL,
    shadow_message_id INTEGER NOT NULL,
    extraction_type TEXT NOT NULL, -- 'coordination', 'schedule', 'reminder', 'info'

    -- Extracted 4W data
    extracted_who TEXT,  -- JSON array
    extracted_what TEXT,
    extracted_when TEXT,
    extracted_where TEXT,

    -- Confidence and status
    confidence REAL CHECK(confidence >= 0 AND confidence <= 1),
    needs_confirmation BOOLEAN DEFAULT 0,
    status TEXT DEFAULT 'pending',  -- 'pending', 'validated', 'rejected', 'unclear'

    -- Validation metadata (filled during manual review)
    validated_by TEXT,
    validated_at DATETIME,
    validation_notes TEXT,

    -- Ground truth (for metrics calculation)
    ground_truth_type TEXT,  -- What it actually was
    ground_truth_who TEXT,   -- JSON array of actual involved parties

    -- Calendar validation results (Enriched Shadow Mode)
    calendar_check_status TEXT,  -- 'match' | 'no_match' | 'conflict'
    calendar_event_id TEXT,
    calendar_event_title TEXT,
    calendar_event_time TEXT,
    fuzzy_match_score REAL,
    admin_dm_sent BOOLEAN DEFAULT 0,
    admin_response TEXT,  -- 'yes' | 'no' | 'edit' | 'timeout'
    admin_dm_sent_at DATETIME,
    admin_response_at DATETIME,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY(shadow_message_id) REFERENCES shadow_messages(id),
    UNIQUE(message_id, extraction_type)
);

CREATE INDEX IF NOT EXISTS idx_shadow_extractions_type ON shadow_extractions(extraction_type);
CREATE INDEX IF NOT EXISTS idx_shadow_extractions_status ON shadow_extractions(status);
CREATE INDEX IF NOT EXISTS idx_shadow_extractions_confidence ON shadow_extractions(confidence);
-- Child-key index: every JOIN to shadow_messages goes through this column
CREATE INDEX IF NOT EXISTS idx_shadow_extractions_message ON shadow_extractions(shadow_message_id);

-- Daily metrics summary (for validation reports)
CREATE TABLE IF NOT EXISTS shadow_metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date DATE NOT NULL UNIQUE,

    -- Volume metrics
    total_messages INTEGER DEFAULT 0,
    tripwire_fired_count INTEGER DEFAULT 0,
    extractions_created INTEGER DEFAULT 0,

    -- Tripwire precision/recall (calculated during validation)
    tripwire_true_positives INTEGER DEFAULT 0,
    tripwire_false_positives INTEGER DEFAULT 0,
    tripwire_false_negatives INTEGER DEFAULT 0,

    -- Entity recognition accuracy
    entity_correct INTEGER DEFAULT 0,
    entity_incorrect INTEGER DEFAULT 0,
    entity_missed INTEGER DEFAULT 0,

    -- Notes
    notes TEXT,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_shadow_metrics_date ON shadow_metrics(date);

-- Insert schema version
INSERT OR IGNORE INTO schema_version (version) VALUES (1);
"""

class ShadowDatabase:
    """Database operations for shadow mode.

    Completely isolated from production/staging databases.

    Every public method opens its own short-lived SQLite connection through
    ``_get_connection`` and closes it before returning; no connection is kept
    on the instance.
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Open (creating if necessary) the shadow database.

        Args:
            db_path: Location of the SQLite file. Falls back to
                ``DEFAULT_SHADOW_DB_PATH`` when omitted. Missing parent
                directories are created.
        """
        self.db_path = Path(db_path) if db_path else DEFAULT_SHADOW_DB_PATH
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    # -----------------------------------------------------------------------
    # Connection helpers
    # -----------------------------------------------------------------------

    def _get_connection(self) -> sqlite3.Connection:
        """Get a database connection with row factory.

        Rows come back as ``sqlite3.Row`` and foreign-key enforcement is
        switched on. The caller is responsible for closing the connection.
        """
        conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        conn.row_factory = sqlite3.Row
        # Enable foreign keys
        conn.execute("PRAGMA foreign_keys = ON")
        return conn

    def _init_db(self):
        """Initialize database schema by running ``SCHEMA_SQL``."""
        conn = self._get_connection()
        try:
            conn.executescript(SCHEMA_SQL)
            conn.commit()
        finally:
            # ``with conn:`` only manages the transaction; it never closes the
            # connection, so close explicitly.
            conn.close()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _execute_write(self, sql: str, params=()) -> Optional[int]:
        """Run one write statement, commit, and return ``cursor.lastrowid``."""
        conn = self._get_connection()
        try:
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.lastrowid
        finally:
            conn.close()

    def _fetch_all(self, sql: str, params=()) -> List[Dict[str, Any]]:
        """Run one query and return every row as a plain dict."""
        conn = self._get_connection()
        try:
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
        finally:
            conn.close()

    # -----------------------------------------------------------------------
    # Message Operations
    # -----------------------------------------------------------------------

    def store_message(
        self,
        message_id: str,
        chat_id: str,
        sender_id: str,
        message_text: Optional[str] = None,
        sender_name: Optional[str] = None,
        sender_username: Optional[str] = None,
        chat_title: Optional[str] = None,
        timestamp: Optional[datetime] = None,
    ) -> int:
        """Store a raw message from Telegram.

        The insert is ``INSERT OR IGNORE``: storing the same
        ``(message_id, chat_id)`` twice keeps the first row untouched.

        Args:
            timestamp: Message time; stored via ``isoformat()``. Defaults to
                the current UTC time.

        Returns:
            The shadow_messages row id (of the new or the pre-existing row),
            or -1 if the row cannot be found after the insert.
        """
        stored_at = timestamp.isoformat() if timestamp else self._utc_now_iso()
        conn = self._get_connection()
        try:
            conn.execute(
                """
                INSERT OR IGNORE INTO shadow_messages
                (message_id, chat_id, sender_id, message_text, sender_name,
                 sender_username, chat_title, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    message_id,
                    chat_id,
                    sender_id,
                    message_text,
                    sender_name,
                    sender_username,
                    chat_title,
                    stored_at,
                ),
            )
            conn.commit()

            # lastrowid is not meaningful when the insert was ignored, so look
            # the row up by its unique key instead.
            row = conn.execute(
                "SELECT id FROM shadow_messages WHERE message_id = ? AND chat_id = ?",
                (message_id, chat_id),
            ).fetchone()
            return row["id"] if row else -1
        finally:
            conn.close()

    def update_tripwire_result(
        self,
        shadow_message_id: int,
        fired: bool,
        score: Optional[float] = None,
        matches: Optional[List[str]] = None,
    ):
        """Update tripwire results for a message.

        ``matches`` is stored as a JSON array; an empty or missing list is
        stored as NULL.
        """
        self._execute_write(
            """
            UPDATE shadow_messages
            SET tripwire_fired = ?, tripwire_score = ?, tripwire_matches = ?
            WHERE id = ?
            """,
            (
                1 if fired else 0,
                score,
                json.dumps(matches) if matches else None,
                shadow_message_id,
            ),
        )

    def update_extraction_result(
        self,
        shadow_message_id: int,
        extraction: Dict[str, Any],
        confidence: float,
        model: str,
        entities: List[str],
        normalized_entities: List[str],
        processing_duration_ms: Optional[int] = None,
        error: Optional[str] = None,
    ):
        """Update LLM extraction results for a message.

        ``extraction``, ``entities`` and ``normalized_entities`` are stored as
        JSON text; ``processed_at`` is set to the current UTC time.
        """
        self._execute_write(
            """
            UPDATE shadow_messages
            SET extraction_json = ?, extraction_confidence = ?, extraction_model = ?,
                entities_detected = ?, entities_normalized = ?,
                processed_at = ?, processing_duration_ms = ?, error = ?
            WHERE id = ?
            """,
            (
                json.dumps(extraction),
                confidence,
                model,
                json.dumps(entities),
                json.dumps(normalized_entities),
                self._utc_now_iso(),
                processing_duration_ms,
                error,
                shadow_message_id,
            ),
        )

    # -----------------------------------------------------------------------
    # Extraction Operations
    # -----------------------------------------------------------------------

    def store_extraction(
        self,
        message_id: str,
        shadow_message_id: int,
        extraction_type: str,
        extracted_who: Optional[List[str]] = None,
        extracted_what: Optional[str] = None,
        extracted_when: Optional[str] = None,
        extracted_where: Optional[str] = None,
        confidence: float = 0.0,
        needs_confirmation: bool = False,
    ) -> int:
        """Store an extracted coordination/schedule item.

        Returns the extraction row id.
        """
        # NOTE(review): INSERT OR REPLACE deletes any earlier row with the same
        # (message_id, extraction_type) before inserting, so re-storing an
        # extraction yields a new id and resets every column not listed here
        # (status, validation, calendar and admin-DM fields) to its default.
        # Confirm that reset is intended.
        return self._execute_write(
            """
            INSERT OR REPLACE INTO shadow_extractions
            (message_id, shadow_message_id, extraction_type, extracted_who,
             extracted_what, extracted_when, extracted_where, confidence, needs_confirmation)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                message_id,
                shadow_message_id,
                extraction_type,
                json.dumps(extracted_who) if extracted_who else None,
                extracted_what,
                extracted_when,
                extracted_where,
                confidence,
                1 if needs_confirmation else 0,
            ),
        )

    def update_validation(
        self,
        extraction_id: int,
        status: str,
        validated_by: Optional[str] = None,
        notes: Optional[str] = None,
        ground_truth_type: Optional[str] = None,
        ground_truth_who: Optional[List[str]] = None,
    ):
        """Update validation status for an extraction.

        ``validated_at`` is set to the current UTC time; ``ground_truth_who``
        is stored as a JSON array (NULL when empty or missing).
        """
        self._execute_write(
            """
            UPDATE shadow_extractions
            SET status = ?, validated_by = ?, validated_at = ?,
                validation_notes = ?, ground_truth_type = ?, ground_truth_who = ?
            WHERE id = ?
            """,
            (
                status,
                validated_by,
                self._utc_now_iso(),
                notes,
                ground_truth_type,
                json.dumps(ground_truth_who) if ground_truth_who else None,
                extraction_id,
            ),
        )

    # -----------------------------------------------------------------------
    # Query Operations
    # -----------------------------------------------------------------------

    def get_recent_messages(
        self,
        chat_id: str,
        before_message_id: Optional[str] = None,
        limit: int = 5,
        within_minutes: int = 15,
    ) -> List[Dict[str, Any]]:
        """Get recent messages from a chat for context building.

        Args:
            chat_id: The chat ID to query
            before_message_id: Only return messages before this message ID
            limit: Maximum number of messages to return
            within_minutes: Only return messages within this many minutes of now

        Returns:
            List of message dictionaries, newest first
        """
        # Stored timestamps are ISO-8601 ("...T...+00:00") while
        # datetime('now', ...) yields "YYYY-MM-DD HH:MM:SS". Comparing them as
        # raw text lets every message from the current day through, so both
        # sides are normalized with datetime() first.
        query = """
            SELECT * FROM shadow_messages
            WHERE chat_id = ?
            AND datetime(timestamp) >= datetime('now', '-' || ? || ' minutes')
        """
        params: List[Any] = [chat_id, within_minutes]

        if before_message_id:
            # NOTE(review): message_id is TEXT, so this is a lexicographic
            # comparison ("9" sorts after "10"). Verify against the id format
            # callers actually store.
            query += " AND message_id < ?"
            params.append(before_message_id)

        # id breaks ties between messages that share the same second.
        query += " ORDER BY datetime(timestamp) DESC, id DESC LIMIT ?"
        params.append(limit)

        return self._fetch_all(query, params)

    def get_extractions(
        self,
        status: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """Get extractions, optionally filtered by status, newest first."""
        query = "SELECT * FROM shadow_extractions"
        params: List[Any] = []
        if status:
            query += " WHERE status = ?"
            params.append(status)
        query += " ORDER BY created_at DESC LIMIT ?"
        params.append(limit)
        return self._fetch_all(query, params)

    def get_daily_stats(self, date: Optional[str] = None) -> Dict[str, Any]:
        """Get statistics for a specific date (YYYY-MM-DD) or today.

        Returns the stored ``shadow_metrics`` row when one exists for the
        date. Otherwise the counts are computed from ``shadow_messages`` and
        returned without being stored; that dict only has the keys ``date``,
        ``total_messages``, ``tripwire_fired_count`` and
        ``extractions_created``.
        """
        if date is None:
            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        conn = self._get_connection()
        try:
            # Prefer a previously computed metrics row
            row = conn.execute(
                "SELECT * FROM shadow_metrics WHERE date = ?",
                (date,),
            ).fetchone()
            if row:
                return dict(row)

            # Calculate from raw data
            stats = conn.execute(
                """
                SELECT
                    COUNT(*) as total_messages,
                    SUM(CASE WHEN tripwire_fired = 1 THEN 1 ELSE 0 END) as tripwire_count,
                    SUM(CASE WHEN extraction_json IS NOT NULL THEN 1 ELSE 0 END) as extraction_count
                FROM shadow_messages
                WHERE DATE(timestamp) = ?
                """,
                (date,),
            ).fetchone()

            # SUM() over zero rows yields NULL, hence the "or 0".
            return {
                "date": date,
                "total_messages": stats["total_messages"] or 0,
                "tripwire_fired_count": stats["tripwire_count"] or 0,
                "extractions_created": stats["extraction_count"] or 0,
            }
        finally:
            conn.close()

    def get_unvalidated_extractions(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get extractions awaiting manual validation.

        Each row carries the source message's text, sender name and timestamp
        (as ``message_timestamp``); highest confidence first.
        """
        return self._fetch_all(
            """
            SELECT e.*, m.message_text, m.sender_name, m.timestamp as message_timestamp
            FROM shadow_extractions e
            JOIN shadow_messages m ON e.shadow_message_id = m.id
            WHERE e.status = 'pending'
            ORDER BY e.confidence DESC, m.timestamp DESC
            LIMIT ?
            """,
            (limit,),
        )

    def get_extraction_by_id(self, extraction_id: int) -> Optional[Dict[str, Any]]:
        """Get a single extraction by its ID, or None when it does not exist."""
        rows = self._fetch_all(
            "SELECT * FROM shadow_extractions WHERE id = ?",
            (extraction_id,),
        )
        return rows[0] if rows else None

    def update_calendar_check(
        self,
        extraction_id: int,
        status: str,
        event_id: Optional[str] = None,
        event_title: Optional[str] = None,
        event_time: Optional[str] = None,
        fuzzy_score: Optional[float] = None,
    ):
        """Update calendar validation results for an extraction.

        Args:
            extraction_id: The extraction row id
            status: 'match', 'no_match', or 'conflict'
            event_id: Matched calendar event ID (if any)
            event_title: Matched calendar event title (if any)
            event_time: Matched calendar event time (if any)
            fuzzy_score: Fuzzy matching score (0.0-1.0)
        """
        self._execute_write(
            """
            UPDATE shadow_extractions
            SET calendar_check_status = ?,
                calendar_event_id = ?,
                calendar_event_title = ?,
                calendar_event_time = ?,
                fuzzy_match_score = ?
            WHERE id = ?
            """,
            (
                status,
                event_id,
                event_title,
                event_time,
                fuzzy_score,
                extraction_id,
            ),
        )

    def update_admin_dm_sent(
        self,
        extraction_id: int,
        dm_type: str,  # 'no_match' or 'conflict'
    ):
        """Mark that an admin DM was sent for this extraction.

        ``dm_type`` is accepted for interface compatibility but is not
        persisted; only the sent flag and the UTC send time are written.
        """
        self._execute_write(
            """
            UPDATE shadow_extractions
            SET admin_dm_sent = 1,
                admin_dm_sent_at = ?
            WHERE id = ?
            """,
            (
                self._utc_now_iso(),
                extraction_id,
            ),
        )

    def update_admin_response(
        self,
        extraction_id: int,
        response: str,  # 'yes', 'no', 'edit', 'timeout'
    ):
        """Store admin response to a DM together with the UTC response time."""
        self._execute_write(
            """
            UPDATE shadow_extractions
            SET admin_response = ?,
                admin_response_at = ?
            WHERE id = ?
            """,
            (
                response,
                self._utc_now_iso(),
                extraction_id,
            ),
        )

    def get_extractions_needing_calendar_check(
        self,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """Get extractions that need calendar validation (no check yet).

        Only 'coordination' and 'schedule' extractions with a confidence of at
        least 0.7 are returned, highest confidence first.
        """
        return self._fetch_all(
            """
            SELECT e.*, m.message_text, m.sender_name, m.timestamp as message_timestamp
            FROM shadow_extractions e
            JOIN shadow_messages m ON e.shadow_message_id = m.id
            WHERE e.calendar_check_status IS NULL
              AND e.extraction_type IN ('coordination', 'schedule')
              AND e.confidence >= 0.7
            ORDER BY e.confidence DESC, m.timestamp DESC
            LIMIT ?
            """,
            (limit,),
        )

    def compute_daily_metrics(self, date: Optional[str] = None) -> Dict[str, Any]:
        """Compute and store daily aggregation metrics.

        Counts the day's messages, fired tripwires and extractions from the
        raw tables and upserts them into shadow_metrics for trend tracking.
        The precision/recall and entity-accuracy columns are not touched.

        Args:
            date: Day to aggregate (YYYY-MM-DD); defaults to today in UTC.

        Returns the metrics dict.
        """
        if date is None:
            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        conn = self._get_connection()
        try:
            # Count messages
            msg_stats = conn.execute(
                """
                SELECT
                    COUNT(*) as total_messages,
                    SUM(CASE WHEN tripwire_fired = 1 THEN 1 ELSE 0 END) as tripwire_fired_count
                FROM shadow_messages
                WHERE DATE(timestamp) = ?
                """,
                (date,),
            ).fetchone()

            # Count extractions (attributed to the day of their source message)
            ext_stats = conn.execute(
                """
                SELECT COUNT(*) as extraction_count
                FROM shadow_extractions e
                JOIN shadow_messages m ON m.id = e.shadow_message_id
                WHERE DATE(m.timestamp) = ?
                """,
                (date,),
            ).fetchone()

            total = msg_stats["total_messages"] or 0
            fired = msg_stats["tripwire_fired_count"] or 0
            extracted = ext_stats["extraction_count"] or 0

            # Upsert into metrics table
            conn.execute(
                """
                INSERT INTO shadow_metrics
                    (date, total_messages, tripwire_fired_count, extractions_created, updated_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(date) DO UPDATE SET
                    total_messages = excluded.total_messages,
                    tripwire_fired_count = excluded.tripwire_fired_count,
                    extractions_created = excluded.extractions_created,
                    updated_at = excluded.updated_at
                """,
                (date, total, fired, extracted, self._utc_now_iso()),
            )
            conn.commit()

            result = {
                "date": date,
                "total_messages": total,
                "tripwire_fired_count": fired,
                "extractions_created": extracted,
            }
            logging.info("[ShadowMetrics] Daily metrics for %s: %d msgs, %d fired, %d extractions",
                         date, total, fired, extracted)
            return result
        finally:
            conn.close()

    def export_for_review(self, start_date: str, end_date: str) -> List[Dict[str, Any]]:
        """Export messages and extractions for a date range.

        Args:
            start_date: First day to include (YYYY-MM-DD).
            end_date: Last day to include (YYYY-MM-DD).

        Returns:
            One dict per message/extraction pair (messages without an
            extraction appear once with NULL extraction fields), newest first.
            ``message_id`` is the shadow_messages row id,
            ``extraction_confidence`` is the message-level confidence and
            ``confidence`` is the per-extraction confidence.
        """
        # The per-extraction confidence previously reused the alias
        # "extraction_confidence", which the message-level column already
        # holds, so it never reached the result dict. It gets its own key.
        return self._fetch_all(
            """
            SELECT
                m.id as message_id,
                m.message_text,
                m.sender_name,
                m.timestamp,
                m.tripwire_fired,
                m.tripwire_matches,
                m.extraction_confidence,
                e.extraction_type,
                e.extracted_who,
                e.extracted_what,
                e.extracted_when,
                e.extracted_where,
                e.confidence as confidence,
                e.status
            FROM shadow_messages m
            LEFT JOIN shadow_extractions e ON m.id = e.shadow_message_id
            WHERE DATE(m.timestamp) BETWEEN ? AND ?
            ORDER BY m.timestamp DESC
            """,
            (start_date, end_date),
        )

def init_shadow_db(db_path: Optional[Path] = None) -> ShadowDatabase:
    """Initialize and return a ShadowDatabase instance.

    Args:
        db_path: Location of the SQLite file; passed through unchanged to
            ``ShadowDatabase``.
    """
    return ShadowDatabase(db_path)