📄 shadow_bot.py 58,744 bytes Monday 15:49 📋 Raw

"""Shadow Mode Bot - Silent Telegram observer with SPEAK_HARD_DISABLED.

This bot observes messages from the Family Logistics group,
runs tripwire detection, performs LLM extraction, and logs everything
to the shadow database. It NEVER sends messages.

Hard constraints:
- SPEAK_ENABLED = False (compile-time constant, cannot be overridden)
- No reply to any message
- No confirmation buttons
- No Event Graph writes
- Isolated shadow database only
"""

import os
import re
import json
import time
import logging
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta

import httpx

from .shadow_database import ShadowDatabase
from .calendar_validator import CalendarValidator, CalendarCheckResult

SPEAK IS HARD-DISABLED - This is a compile-time safety constant.

The bot will NEVER send messages. To re-enable speaking, code changes

are required (removing this assertion would cause runtime errors).

SPEAK_ENABLED = False

VERBOSE ADMIN DM - When True, sends a DM to admin for EVERY processing

event (tripwire, extraction, calendar check, brain ingest).

This is for staging validation only — never enable in production.

VERBOSE_ADMIN_DM = os.environ.get("VERBOSE_ADMIN_DM", "false").lower() == "true"

Tripwire patterns for coordination signal detection

Tier 1: Fast regex/keyword matching (runs on every message)

Design principles (v2 — 2026-05-04 calibration):

1. Time signals are capped (max of all time scores, not additive)

2. Combos are limited to max 2 to prevent score inflation

3. Noisy words removed from broad patterns ("grab", "get", "change")

4. Implicit assignment patterns added ("X is going with you")

5. Suggestion intent patterns added ("we should", "let's")

6. Event recall is for referencing existing events, not task reminders

TRIPWIRE_PATTERNS = {
# Time/schedule indicators
"time_specific": r"\b(\d{1,2}:\d{2}\s(AM|PM|am|pm)?|\d{1,2}\s(AM|PM|am|pm))\b|\b(morning|afternoon|evening|tonight)\b",
"time_military": r"\b(0\d{3}|1\d{3}|2[0-3]\d{2})\b",
"day_reference": r"\b(today|tomorrow|tonight|yesterday|monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b",
"vague_time": r"\b(this week|next week|this weekend|next weekend)\b",
"date_reference": r"\b(\d{1,2}/\d{1,2}|\d{1,2}-\d{1,2}|\b(january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{1,2})\b",

# Meal/meeting/social indicators
"meal_meeting": r"\b(breakfast|lunch|brunch|dinner|drinks|coffee|meet|meeting|hang out|get together)\b",

# Direct coordination intent (explicit requests)
"can_you": r"\b(can you|could you|would you|will you|are you able to)\b",
"need_someone": r"\b(need someone|need a|looking for|anyone available|who can)\b",
"pickup_dropoff": r"\b(pick up|pickup|drop off|dropoff)\b",
"coverage_offer": r"\b(i can (take|grab|do)|i(?:'ll| will) (take|grab|do)|let me (take|grab|do)|i could take)\b",
"swap_change": r"\b(swap|switch|reschedule)\b",

# Implicit assignment — someone is going/assigned somewhere
"implicit_assign": r"\b(is going (with|to)|are going (with|to)|going with (you|me|us)|has to (go|be|get)|needs to (go|be|get))\b",

# Possessive family mention — "Sully's haircut", "Harper's dance"
"family_possessive": r"\b(Sully's|Sullivan's|Harper's|Maggie's|Matt's|Aundrea's)\b",

# Suggestion/planning intent — "we should", "let's", "how about"
"suggestion_intent": r"\b(we should|let's|how about|i was thinking|i'd like to|we could|we need to|we have to)\b",

# Location indicators (proper nouns + known venues)
"location": r"\b(at|to)\s+(?:the\s+)?([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})|\b(school|practice|game|dentist|doctor|hospital|vet)\b",
"school_practice": r"\b(school|practice|game|match|dentist|doctor|appointment|OT|therapy)\b",

# Offer/help — narrowed to avoid catching casual chat
"offer_help": r"\b(do you want me to|should i|i'll take|i'll grab|i'll get|let me know|i could take)\b",

# Event recall / confirmation patterns — triggers CalendarValidator directly
# NOTE: "remember to" is a task reminder, NOT an event recall.
# Only "remember we have" (referencing a known event) is event recall.
"event_recall": r"\b(remember we have|don't forget (we|that)|are we still on for|looking forward to)\b",
"event_ack": r"\b(yes, i have|i'll be there|we're going to|see you at)\b",

}

Coordination keywords for combo scoring (narrowed to reduce false positives)

COORDINATION_KEYWORDS = [
"can you", "could you", "would you", "will you",
"need someone", "need a", "looking for", "anyone available",
"pick up", "drop off",
"cover", "watch", "stay with", "babysit", "fill in",
"swap", "switch", "reschedule",
"who can",
"breakfast", "lunch", "dinner", "drinks", "coffee", "meet",
]

Family member patterns (will be loaded from config)

FAMILY_MEMBER_ALIASES = {
"matt": ["matt", "matthew", "hoffmann"],
"aundrea": ["aundrea", "mom"],
"sullivan": ["sully", "sullivan"],
"harper": ["harper"],
"maggie": ["maggie", "dog"],
}

Family member patterns (will be loaded from config)

FAMILY_MEMBER_ALIASES = {
"matt": ["matt", "matthew", "hoffmann"],
"aundrea": ["aundrea", "mom"],
"sullivan": ["sully", "sullivan"],
"harper": ["harper"],
"maggie": ["maggie", "dog"],
}

Coordination signal scoring

COORDINATION_KEYWORDS = [
"can you", "could you", "would you", "will you",
"need someone", "need a", "looking for", "anyone available",
"pick up", "drop off", "grab", "get",
"cover", "watch", "stay with", "babysit", "fill in",
"swap", "switch", "change", "move",
"who can", "are you", "will you be",
"breakfast", "lunch", "dinner", "coffee", "meet", "hang out",
]

@dataclass
class TripwireResult:
"""Result of Tier 1 tripwire matching."""
fired: bool
score: float
matches: List[str]
confidence: str # "high", "medium", "low"

@dataclass
class ExtractionResult:
"""Result of Tier 2 LLM extraction."""
extraction_type: str
who: List[str]
what: str
when: str
where: str
confidence: float
needs_confirmation: bool
raw_extraction: Dict[str, Any]

class ShadowBot:
"""Silent observer bot for shadow mode.

NEVER SPEAKS. Only observes, extracts, and logs.
"""

def __init__(
    self,
    db: ShadowDatabase,
    bot_token: str,
    family_context: Optional[Dict[str, Any]] = None,
    llm_url: Optional[str] = None,
    llm_model: Optional[str] = None,
    admin_chat_id: Optional[str] = None,
    telegram_api_url: str = "https://api.telegram.org/bot",
    verbose_admin_dm: bool = False,
):
    self.db = db
    self.bot_token = bot_token
    self.base_url = f"{telegram_api_url}{bot_token}"
    self.family_context = family_context or {}
    self.llm_url = llm_url
    self.llm_model = llm_model or "qwen2.5-coder:7b"
    self.admin_chat_id = admin_chat_id  # For sending Admin DMs
    self.verbose_admin_dm = verbose_admin_dm  # Firehose mode for staging validation

    # Initialize calendar validator
    self.calendar_validator = CalendarValidator()

    # Compile tripwire patterns
    self._compiled_patterns = {
        name: re.compile(pattern, re.IGNORECASE)
        for name, pattern in TRIPWIRE_PATTERNS.items()
    }

    # Load family member aliases from context
    self._load_family_aliases()

    logging.info("[ShadowBot] Initialized - SPEAK_ENABLED=%s VERBOSE_ADMIN_DM=%s",
                 SPEAK_ENABLED, self.verbose_admin_dm)

def _load_family_aliases(self):
    """Load family member aliases from family context."""
    members = self.family_context.get("family", {}).get("members", [])
    for member in members:
        name = member.get("name", "").lower()
        if name:
            aliases = [name]
            for nick in member.get("nicknames", []):
                aliases.append(nick.lower())
            # Also add role-based aliases
            role = member.get("role", "").lower()
            if role in ["dad", "mom", "son", "daughter"]:
                aliases.append(role)
            FAMILY_MEMBER_ALIASES[name] = aliases

# -----------------------------------------------------------------------
# Core Message Processing
# -----------------------------------------------------------------------

async def process_message(self, update: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Process a Telegram message update.

    Returns processing result dict or None if not processed.
    """
    message = update.get("message", {})
    if not message:
        return None

    # Skip non-text messages for now (photos, documents handled separately if needed)
    if "text" not in message:
        logging.debug("[ShadowBot] Skipping non-text message")
        return None

    # Extract message metadata
    msg_id = str(message.get("message_id", ""))
    chat = message.get("chat", {})
    chat_id = str(chat.get("id", ""))
    chat_title = chat.get("title", "")

    sender = message.get("from", {})
    sender_id = str(sender.get("id", ""))
    sender_name = sender.get("first_name", "") + " " + sender.get("last_name", "")
    sender_name = sender_name.strip()
    sender_username = sender.get("username", "")

    text = message.get("text", "")
    timestamp = datetime.fromtimestamp(
        message.get("date", 0), tz=timezone.utc
    )

    logging.info("[ShadowBot] Processing message %s from %s", msg_id, sender_name or sender_username)

    # Stage 1: Store raw message
    shadow_msg_id = self.db.store_message(
        message_id=msg_id,
        chat_id=chat_id,
        sender_id=sender_id,
        message_text=text,
        sender_name=sender_name,
        sender_username=sender_username,
        chat_title=chat_title,
        timestamp=timestamp,
    )

    if shadow_msg_id < 0:
        logging.warning("[ShadowBot] Failed to store message %s", msg_id)
        return None

    # Stage 2: Run tripwire detection
    start_time = time.time()
    tripwire = self._run_tripwire(text)

    # Track pipeline results for verbose DM firehose
    _vd_calendar_status = ""
    _vd_calendar_title = ""
    _vd_calendar_fuzzy = 0.0

    self.db.update_tripwire_result(
        shadow_message_id=shadow_msg_id,
        fired=tripwire.fired,
        score=tripwire.score,
        matches=tripwire.matches,
    )

    logging.info("[ShadowBot] Tripwire fired=%s score=%.2f confidence=%s matches=%s",
                 tripwire.fired, tripwire.score, tripwire.confidence, tripwire.matches)

    # VERBOSE DM: Tripwire result (every message, even if tripwire didn't fire)
    if self.verbose_admin_dm:
        await self._send_verbose_dm_tripwire(
            sender_name=sender_name or sender_username,
            text=text[:200],  # Truncate long messages
            fired=tripwire.fired,
            score=tripwire.score,
            matches=tripwire.matches,
            confidence=tripwire.confidence,
        )

    # === EVENT RECALL DETECTION (Tier 1 Expansion) ===
    # Check if this is an event recall/confirmation (not coordination)
    has_event_recall = any(m in tripwire.matches for m in ["event_recall", "event_ack"])
    has_coordination = any(m in tripwire.matches for m in ["can_you", "need_someone", "pickup_dropoff", "coverage_offer", "offer_help", "swap_change"])

    if tripwire.fired and has_event_recall and not has_coordination:
        # Event recall detected — bypass LLM, route to CalendarValidator directly
        logging.info("[ShadowBot] Event recall detected — bypassing LLM extraction, routing to CalendarValidator")

        # Build simple extracted event from message text
        extracted_event = {
            "what": text,  # Use full text as what (CalendarValidator will fuzzy match)
            "when": self._extract_date_from_text(text),
            "where": "",
        }

        # Run calendar validation directly
        try:
            calendar_check = self.calendar_validator.check_event(extracted_event)

            if calendar_check.status == "MATCH":
                # Event acknowledged — log silently
                logging.info("[ShadowBot] Event recall MATCH — '%s' found in calendar (score=%.2f)",
                           calendar_check.event_title, calendar_check.fuzzy_score)
                # Store minimal extraction record
                extraction_id = self.db.store_extraction(
                    message_id=msg_id,
                    shadow_message_id=shadow_msg_id,
                    extraction_type="event_ack",
                    extracted_who=[],
                    extracted_what=text,
                    extracted_when=extracted_event["when"],
                    extracted_where="",
                    confidence=0.5,
                    needs_confirmation=False,
                )
                self.db.update_calendar_check(
                    extraction_id=extraction_id,
                    status="match",
                    event_id=calendar_check.event_id,
                    event_title=calendar_check.event_title,
                    event_time=calendar_check.event_start,
                    fuzzy_score=calendar_check.fuzzy_score,
                )

                # Ingest to brain
                await self._ingest_to_brain(
                    extraction_id=extraction_id,
                    extraction_result=ExtractionResult(
                        extraction_type="event_recall",
                        who=[],
                        what=text,
                        when=extracted_event["when"],
                        where="",
                        raw_extraction=extracted_event,
                        confidence=0.5,
                        needs_confirmation=False,
                    ),
                    message_text=text,
                    sender_name=sender_name,
                    sender_username=sender_username,
                    chat_title=chat_title,
                    msg_id=msg_id,
                    calendar_check_status="match",
                    calendar_event_id=calendar_check.event_id,
                    calendar_event_title=calendar_check.event_title,
                    fuzzy_match_score=calendar_check.fuzzy_score,
                )

            elif calendar_check.status == "NO_MATCH":
                # Event not in calendar — alert admin
                logging.info("[ShadowBot] Event recall NO_MATCH — '%s' not found in calendar", text)
                extraction_id = self.db.store_extraction(
                    message_id=msg_id,
                    shadow_message_id=shadow_msg_id,
                    extraction_type="event_recall",
                    extracted_who=[],
                    extracted_what=text,
                    extracted_when=extracted_event["when"],
                    extracted_where="",
                    confidence=0.5,
                    needs_confirmation=True,
                )
                self.db.update_calendar_check(
                    extraction_id=extraction_id,
                    status="no_match",
                    event_id=None,
                    event_title=None,
                    event_time=None,
                    fuzzy_score=0.0,
                )

                # Send admin DM about missing event
                await self._send_admin_dm_event_recall(
                    shadow_msg_id=shadow_msg_id,
                    extraction_id=extraction_id,
                    extracted_event=extracted_event,
                )

                # Ingest to brain
                await self._ingest_to_brain(
                    extraction_id=extraction_id,
                    extraction_result=ExtractionResult(
                        extraction_type="event_recall",
                        who=[],
                        what=text,
                        when=extracted_event["when"],
                        where="",
                        raw_extraction=extracted_event,
                        confidence=0.5,
                        needs_confirmation=True,
                    ),
                    message_text=text,
                    sender_name=sender_name,
                    sender_username=sender_username,
                    chat_title=chat_title,
                    msg_id=msg_id,
                    calendar_check_status="no_match",
                )

            # CONFLICT status handled same as MATCH for event recalls
            elif calendar_check.status == "CONFLICT":
                logging.info("[ShadowBot] Event recall CONFLICT — logging for review")
                extraction_id = self.db.store_extraction(
                    message_id=msg_id,
                    shadow_message_id=shadow_msg_id,
                    extraction_type="event_recall",
                    extracted_who=[],
                    extracted_what=text,
                    extracted_when=extracted_event["when"],
                    extracted_where="",
                    confidence=0.5,
                    needs_confirmation=True,
                )
                self.db.update_calendar_check(
                    extraction_id=extraction_id,
                    status="conflict",
                    event_id=calendar_check.event_id,
                    event_title=calendar_check.event_title,
                    event_time=calendar_check.event_start,
                    fuzzy_score=calendar_check.fuzzy_score,
                )

                # Ingest to brain
                await self._ingest_to_brain(
                    extraction_id=extraction_id,
                    extraction_result=ExtractionResult(
                        extraction_type="event_recall",
                        who=[],
                        what=text,
                        when=extracted_event["when"],
                        where="",
                        raw_extraction=extracted_event,
                        confidence=0.5,
                        needs_confirmation=True,
                    ),
                    message_text=text,
                    sender_name=sender_name,
                    sender_username=sender_username,
                    chat_title=chat_title,
                    msg_id=msg_id,
                    calendar_check_status="conflict",
                    calendar_event_id=calendar_check.event_id,
                    calendar_event_title=calendar_check.event_title,
                    fuzzy_match_score=calendar_check.fuzzy_score,
                )

        except Exception as e:
            logging.exception("[ShadowBot] Calendar validation failed for event recall: %s", e)

        # Skip normal LLM extraction pipeline
        if self.verbose_admin_dm:
            await self._send_verbose_dm_extraction(
                extraction_result=ExtractionResult(
                    extraction_type="event_recall",
                    who=[],
                    what=text,
                    when=extracted_event.get("when", ""),
                    where="",
                    raw_extraction=extracted_event,
                    confidence=0.5,
                    needs_confirmation=calendar_check.status != "MATCH" if 'calendar_check' in dir() else True,
                ),
                calendar_status=_vd_calendar_status,
                calendar_title=_vd_calendar_title,
                calendar_fuzzy=_vd_calendar_fuzzy,
            )
        return {
            "shadow_message_id": shadow_msg_id,
            "tripwire": tripwire,
            "extraction": None,
            "entities": [],
        }

    # Stage 3: If tripwire fires (normal coordination), run LLM extraction
    extraction_result = None
    entities_detected = []
    normalized_entities = []

    if tripwire.fired and self.llm_url:
        try:
            extraction_result = await self._run_llm_extraction(
                text, tripwire, 
                chat_id=chat_id, 
                message_id=msg_id
            )

            # Detect entities in the extraction
            entities_detected, normalized_entities = self._detect_entities(text, extraction_result)

            # Store extraction
            extraction_id = self.db.store_extraction(
                message_id=msg_id,
                shadow_message_id=shadow_msg_id,
                extraction_type=extraction_result.extraction_type,
                extracted_who=extraction_result.who,
                extracted_what=extraction_result.what,
                extracted_when=extraction_result.when,
                extracted_where=extraction_result.where,
                confidence=extraction_result.confidence,
                needs_confirmation=extraction_result.needs_confirmation,
            )

            logging.info("[ShadowBot] Extraction stored id=%s type=%s confidence=%.2f",
                         extraction_id, extraction_result.extraction_type, extraction_result.confidence)

            # VERBOSE DM: LLM extraction result
            if self.verbose_admin_dm:
                await self._send_verbose_dm_extraction(extraction_result=extraction_result)

            # Stage 3b: Calendar validation (Enriched Shadow Mode)
            # Skip calendar check for task_reminder items (shopping lists, etc.)
            if extraction_result.extraction_type == "task_reminder":
                _vd_calendar_status = "task_reminder"
                logging.info("[ShadowBot] Task reminder detected — skipping calendar check")
                self.db.update_calendar_check(
                    extraction_id=extraction_id,
                    status="task_reminder",
                    event_id=None,
                    event_title=None,
                    event_time=None,
                    fuzzy_score=0.0,
                )

                # Ingest task reminders to brain too (for shopping/errand retrieval)
                await self._ingest_to_brain(
                    extraction_id=extraction_id,
                    extraction_result=extraction_result,
                    message_text=text,
                    sender_name=sender_name,
                    sender_username=sender_username,
                    chat_title=chat_title,
                    msg_id=msg_id,
                    calendar_check_status="task_reminder",
                )
            elif extraction_result.confidence >= 0.7:
                try:
                    calendar_check = self.calendar_validator.check_event(
                        extraction_result.raw_extraction
                    )

                    # Update database with calendar check result
                    self.db.update_calendar_check(
                        extraction_id=extraction_id,
                        status=calendar_check.status.lower(),  # match, no_match, conflict
                        event_id=calendar_check.event_id,
                        event_title=calendar_check.event_title,
                        event_time=calendar_check.event_start,
                        fuzzy_score=calendar_check.fuzzy_score,
                    )

                    logging.info("[ShadowBot] Calendar check: %s (score=%.2f)",
                                 calendar_check.status, calendar_check.fuzzy_score)

                    # Ingest MATCH results to brain
                    if calendar_check.status == "MATCH":
                        _vd_calendar_status = "match"
                        _vd_calendar_title = calendar_check.event_title or ""
                        _vd_calendar_fuzzy = calendar_check.fuzzy_score
                        await self._ingest_to_brain(
                            extraction_id=extraction_id,
                            extraction_result=extraction_result,
                            message_text=text,
                            sender_name=sender_name,
                            sender_username=sender_username,
                            chat_title=chat_title,
                            msg_id=msg_id,
                            calendar_check_status="match",
                            calendar_event_id=calendar_check.event_id,
                            calendar_event_title=calendar_check.event_title,
                            fuzzy_match_score=calendar_check.fuzzy_score,
                        )

                    # Handle NO_MATCH and CONFLICT with Admin DM
                    if calendar_check.status == "NO_MATCH":
                        _vd_calendar_status = "no_match"
                        await self._send_admin_dm_no_match(
                            shadow_msg_id=shadow_msg_id,
                            extraction_id=extraction_id,
                            extracted_event=extraction_result.raw_extraction,
                        )

                        # Ingest to brain
                        await self._ingest_to_brain(
                            extraction_id=extraction_id,
                            extraction_result=extraction_result,
                            message_text=text,
                            sender_name=sender_name,
                            sender_username=sender_username,
                            chat_title=chat_title,
                            msg_id=msg_id,
                            calendar_check_status="no_match",
                        )
                    elif calendar_check.status == "CONFLICT":
                        _vd_calendar_status = "conflict"
                        _vd_calendar_title = calendar_check.event_title or ""
                        _vd_calendar_fuzzy = calendar_check.fuzzy_score
                        await self._send_admin_dm_conflict(
                            shadow_msg_id=shadow_msg_id,
                            extraction_id=extraction_id,
                            extracted_event=extraction_result.raw_extraction,
                            calendar_event=calendar_check.matched_event,
                            conflict_description=calendar_check.conflict_description,
                        )

                        # Ingest to brain
                        await self._ingest_to_brain(
                            extraction_id=extraction_id,
                            extraction_result=extraction_result,
                            message_text=text,
                            sender_name=sender_name,
                            sender_username=sender_username,
                            chat_title=chat_title,
                            msg_id=msg_id,
                            calendar_check_status="conflict",
                            calendar_event_id=calendar_check.event_id,
                            calendar_event_title=calendar_check.event_title,
                            fuzzy_match_score=calendar_check.fuzzy_score,
                        )

                except Exception as e:
                    logging.exception("[ShadowBot] Calendar validation failed: %s", e)
                    # Don't fail the whole pipeline if calendar check fails

        except Exception as e:
            logging.exception("[ShadowBot] Extraction failed: %s", e)
            extraction_result = None

    # Stage 4: Update message with extraction results
    processing_duration = int((time.time() - start_time) * 1000)

    self.db.update_extraction_result(
        shadow_message_id=shadow_msg_id,
        extraction=extraction_result.raw_extraction if extraction_result else {},
        confidence=extraction_result.confidence if extraction_result else 0.0,
        model=self.llm_model,
        entities=entities_detected,
        normalized_entities=normalized_entities,
        processing_duration_ms=processing_duration,
        error=None,
    )

    # VERBOSE DM: Final pipeline summary (normal coordination path)
    # This sends a DM for every processed message showing the full pipeline result
    if self.verbose_admin_dm and extraction_result:
        await self._send_verbose_dm_extraction(
            extraction_result=extraction_result,
            calendar_status=_vd_calendar_status,
            calendar_title=_vd_calendar_title,
            calendar_fuzzy=_vd_calendar_fuzzy,
        )

    return {
        "shadow_message_id": shadow_msg_id,
        "tripwire": tripwire,
        "extraction": extraction_result,
        "entities": normalized_entities,
    }

# -----------------------------------------------------------------------
# Tier 1: Tripwire Detection
# -----------------------------------------------------------------------

def _run_tripwire(self, text: str) -> TripwireResult:
    """Run regex/keyword tripwire on message text.

    Scoring Strategy (v2  2026-05-04 calibration):
    - Time signals: capped at max() to prevent Chronometer Spam
    - Base patterns: 0.10-0.25 per match
    - Combo bonuses: max 2 combos can fire (prevents inflation)
    - Family member presence: boosts when combined with action
    - Coordination keywords: 0.08 each (cap 0.24)
    - Fire threshold: 0.4 (catches coordination, ignores chat)
    """
    text_lower = text.lower()
    matches = []
    score = 0.0

    # Define time signals before use
    time_signals = ["time_specific", "time_military", "day_reference", "date_reference", "vague_time"]

    # Pattern matching - base scores (v2)
    pattern_scores = {
        "time_specific": 0.15,
        "time_military": 0.12,
        "day_reference": 0.15,
        "date_reference": 0.15,
        "vague_time": 0.08,
        "meal_meeting": 0.10,
        "can_you": 0.25,
        "need_someone": 0.25,
        "pickup_dropoff": 0.25,
        "coverage_offer": 0.20,
        "offer_help": 0.15,
        "event_recall": 0.25,
        "event_ack": 0.15,
        "swap_change": 0.20,
        "implicit_assign": 0.20,
        "family_possessive": 0.15,
        "suggestion_intent": 0.15,
        "location": 0.10,
        "school_practice": 0.10,
        "coordination_question": 0.10,
    }

    # Validate time_signals exist in pattern_scores
    assert all(m in pattern_scores for m in time_signals), f"Missing time signal in pattern_scores"

    for name, pattern in self._compiled_patterns.items():
        if pattern.search(text):
            matches.append(name)

    # === CAP TIME SIGNALS - prevent Chronometer Spam ===
    # Use max() to prevent date+day+time from stacking into false positives
    time_score = max(
        [pattern_scores.get(m, 0) for m in time_signals if m in matches],
        default=0
    )
    # Start score with capped time value
    score = time_score
    # Track which time signals matched for combo detection
    matched_time_signals = [m for m in matches if m in time_signals]
    # Remove time signals from matches list to prevent double-counting
    matches = [m for m in matches if m not in time_signals]

    # Continue with non-time pattern scores
    for name in matches:
        if name in pattern_scores:
            score += pattern_scores.get(name, 0.10)

    # === COMBO BONUSES (max 2 can fire, prevents inflation) ===
    combo_count = 0
    max_combos = 2

    # Define what counts as "coordination intent"
    has_time = len(matched_time_signals) > 0
    has_action = any(m in matches for m in [
        "can_you", "need_someone", "pickup_dropoff", "coverage_offer", "offer_help"
    ])
    has_implicit = "implicit_assign" in matches
    has_suggestion = "suggestion_intent" in matches
    has_family_possessive = "family_possessive" in matches
    has_coordination = has_action or has_implicit or has_suggestion

    # Combo 1: Time + Coordination Intent = Clear coordination
    if has_time and has_coordination and combo_count < max_combos:
        score += 0.20
        matches.append("combo:time+coord")
        combo_count += 1

    # Combo 2: Coordination Intent + Family Member = Direct personal coordination
    has_family = any(name.lower() in text_lower for name in FAMILY_MEMBER_ALIASES.keys())
    if has_coordination and has_family and combo_count < max_combos:
        score += 0.15
        matches.append("combo:coord+family")
        combo_count += 1

    # Combo 2b: Time + Possessive family = Family schedule reference
    # "Sully's haircut tomorrow" — not coordination intent, but family schedule relevant
    if has_time and has_family_possessive and combo_count < max_combos:
        score += 0.20
        matches.append("combo:time+family_possessive")
        combo_count += 1

    # Combo 3: Time + Meal/Meeting = Social coordination (requires question or action)
    if has_time and "meal_meeting" in matches and (has_coordination or "?" in text) and combo_count < max_combos:
        score += 0.15
        matches.append("combo:time+meal")
        combo_count += 1

    # Coordination keyword bonus (individual words, reduced weight)
    coord_matches = [kw for kw in COORDINATION_KEYWORDS if kw in text_lower]
    if coord_matches:
        score += min(len(coord_matches) * 0.08, 0.24)
        matches.extend([f"coord:{kw}" for kw in coord_matches[:3]])

    # Cap score at 1.0
    score = min(score, 1.0)

    # Confidence levels
    if score >= 0.7:
        confidence = "high"
    elif score >= 0.4:
        confidence = "medium"
    else:
        confidence = "low"

    # Fire threshold - catches combos, ignores chat
    fired = score >= 0.4

    return TripwireResult(
        fired=fired,
        score=round(score, 2),
        matches=matches,
        confidence=confidence,
    )

# -----------------------------------------------------------------------
# Tier 2: LLM Extraction
# -----------------------------------------------------------------------

async def _run_llm_extraction(
    self,
    text: str,
    tripwire: TripwireResult,
    chat_id: Optional[str] = None,
    message_id: Optional[str] = None,
) -> ExtractionResult:
    """Run LLM extraction to extract 4W data with Rolling Context Window.

    Extracts: Who, What, When, Where from coordination messages.
    Uses previous messages for coreference resolution.
    """
    # Build context block from recent messages
    context_messages = []
    if chat_id and self.db:
        # Fetch last 5 messages before current one (within 15 min window)
        context_messages = self.db.get_recent_messages(
            chat_id=chat_id,
            before_message_id=message_id,
            limit=5,
            within_minutes=15,
        )

    # Build extraction prompt
    family_members_str = ", ".join(
        name.title() for name in FAMILY_MEMBER_ALIASES.keys()
    )

    # Build context block string
    context_str = ""
    if context_messages:
        context_str = "RECENT CONVERSATION CONTEXT:\n"
        for msg in reversed(context_messages):  # Oldest first
            sender = msg.get("sender_name", "Unknown")
            msg_text = msg.get("message_text", "")
            if msg_text:
                context_str += f"{sender}: {msg_text}\n"
        context_str += "\n"

    prompt = f"""You are analyzing a family logistics message to extract coordination details.

FAMILY MEMBERS: {family_members_str}

{context_str}CURRENT MESSAGE:
{text}

Extract the following as JSON:
{{
"coordination_type": "coordination" | "schedule" | "reminder" | "task_reminder" | "info",
"who": ["list of people mentioned or implied"],
"what": "the EXACT event/activity name from the message",
"when": "when is this happening (be specific: date and/or time, or empty if not stated)",
"where": "where is this happening",
"confidence": 0.0-1.0,
"needs_confirmation": true | false,
"reasoning": "brief explanation"
}}

CRITICAL CLASSIFICATION RULES:
- "task_reminder": Use for shopping lists, grocery items, "remember to...", "don't forget...", household tasks without specific dates. These do NOT go on the calendar.
- "reminder": Use for time-specific reminders ("remind me tomorrow at 3pm about...").
- "schedule": Use for events with specific dates/times (appointments, practices, meetings).
- "coordination": Use for coverage requests, swaps, or who-does-what logistics.
- Extract the FINAL, AGREED-UPON state from the conversation context
- If the current message corrects a previous statement (e.g., "Actually, it's..."), use the CORRECTED information
- what: Preserve original phrasing. Do NOT add words like "check", "meeting", etc.
- when: Use the most specific date/time mentioned across the conversation
- where: Use the most specific location mentioned across the conversation
- Ignore retracted or corrected statements
- confidence: higher if all fields are clearly stated, lower if inference needed

Respond with ONLY the JSON object."""

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                self.llm_url,
                json={
                    "model": self.llm_model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.1,
                    "stream": False,
                },
            )
            resp.raise_for_status()
            data = resp.json()

            # Parse response
            content = data.get("choices", [{}])[0].get("message", {}).get("content", "")

            # Extract JSON from response (handle markdown code blocks)
            json_match = re.search(r'\{[\s\S]*\}', content)
            if json_match:
                content = json_match.group(0)

            extraction = json.loads(content)

            return ExtractionResult(
                extraction_type=extraction.get("coordination_type", "info"),
                who=extraction.get("who", []),
                what=extraction.get("what", ""),
                when=extraction.get("when", ""),
                where=extraction.get("where", ""),
                confidence=extraction.get("confidence", 0.5),
                needs_confirmation=extraction.get("needs_confirmation", True),
                raw_extraction=extraction,
            )

    except Exception as e:
        logging.exception("[ShadowBot] LLM extraction failed: %s", e)
        # Return fallback extraction
        return ExtractionResult(
            extraction_type="unclear",
            who=[],
            what="",
            when="",
            where="",
            confidence=0.0,
            needs_confirmation=True,
            raw_extraction={"error": str(e)},
        )

# -----------------------------------------------------------------------
# Entity Detection
# -----------------------------------------------------------------------

def _detect_entities(
    self,
    text: str,
    extraction: ExtractionResult
) -> Tuple[List[str], List[str]]:
    """Detect and normalize family member entities.

    Returns: (raw_entities, normalized_entities)
    """
    text_lower = text.lower()
    raw_detected = []
    normalized = []

    # Check for each family member's aliases
    for canonical, aliases in FAMILY_MEMBER_ALIASES.items():
        for alias in aliases:
            # Use word boundary matching
            pattern = r'\b' + re.escape(alias) + r'\b'
            if re.search(pattern, text_lower):
                raw_detected.append(alias)
                if canonical not in normalized:
                    normalized.append(canonical)
                break

    # Also check extraction who field
    for who in extraction.who:
        who_lower = who.lower()
        for canonical, aliases in FAMILY_MEMBER_ALIASES.items():
            if who_lower in aliases or who_lower == canonical:
                if canonical not in normalized:
                    normalized.append(canonical)
                    raw_detected.append(who)
                break

    return raw_detected, normalized

# -----------------------------------------------------------------------
# Event Recall  Admin DM Exhaust Methods
# -----------------------------------------------------------------------

async def _send_admin_dm_event_recall(
    self,
    shadow_msg_id: int,
    extraction_id: int,
    extracted_event: Dict[str, Any],
):
    """Send DM to admin about event not found in calendar.

    Called when event recall validation finds NO_MATCH.
    """
    if not self.admin_chat_id:
        logging.warning("[ShadowBot] No admin_chat_id configured, skipping DM")
        return

    what = extracted_event.get("what", "Unknown")
    when = extracted_event.get("when", "Unknown")

    message = f"""🔍 Shadow Mode  Event Recall Not Found

Message: "{what}"
When: {when}

Calendar: ❌ No matching event found

The family is discussing an event that does not appear on the calendar.
Would you like me to add it?

⚠️ This is a simulation. Calendar not modified."""

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            url = f"{self.base_url}/sendMessage"
            resp = await client.post(
                url,
                json={
                    "chat_id": self.admin_chat_id,
                    "text": message,
                    "parse_mode": "HTML",
                },
            )
            resp.raise_for_status()

            # Mark DM as sent in database
            self.db.update_admin_dm_sent(extraction_id, "event_recall")

            logging.info("[ShadowBot] Admin DM sent for event recall NO_MATCH extraction_id=%s", extraction_id)

    except Exception as e:
        logging.exception("[ShadowBot] Failed to send admin DM: %s", e)

# -----------------------------------------------------------------------
# Helper: Extract date from text for event recall bypass
# -----------------------------------------------------------------------

def _extract_date_from_text(self, text: str) -> str:
    """Extract date reference from text for event recall calendar check.

    Uses the same patterns as CalendarValidator._parse_date_range
    but returns a simple string for fuzzy matching.
    """
    text_lower = text.lower()
    now = datetime.now()

    # Handle relative dates
    if "today" in text_lower:
        return "today"
    elif "tomorrow" in text_lower:
        return "tomorrow"

    # Try to parse day of week
    days = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
    for day_name in days:
        if day_name in text_lower:
            return day_name

    # Try to parse "May 3" or "may 3"
    month_pattern = r"(january|february|march|april|may|june|july|august|september|october|november|december)\s+(\d{1,2})"
    match = re.search(month_pattern, text_lower)
    if match:
        return f"{match.group(1)} {match.group(2)}"

    # Try to parse "5/3" or "05/03"
    date_pattern = r"(\d{1,2})[/-](\d{1,2})"
    match = re.search(date_pattern, text_lower)
    if match:
        return f"{match.group(1)}/{match.group(2)}"

    # Try to parse military time as hint for "today"
    military_pattern = r"\b(0\d{3}|1\d{3}|2[0-3]\d{2})\b"
    if re.search(military_pattern, text_lower):
        return "today"

    # Try to parse AM/PM time as hint for "today"
    time_pattern = r"\b(\d{1,2}:\d{2}\s*(AM|PM|am|pm)?|\d{1,2}\s*(AM|PM|am|pm))\b"
    if re.search(time_pattern, text_lower):
        return "today"

    return ""

# -----------------------------------------------------------------------
# Admin DM Exhaust Methods (Enriched Shadow Mode)
# -----------------------------------------------------------------------

async def _send_admin_dm_no_match(
    self,
    shadow_msg_id: int,
    extraction_id: int,
    extracted_event: Dict[str, Any],
):
    """Send DM to admin about proposed calendar action (NO_MATCH).

    Called when calendar validation finds no matching event.
    """
    if not self.admin_chat_id:
        logging.warning("[ShadowBot] No admin_chat_id configured, skipping DM")
        return

    what = extracted_event.get("what", "Unknown")
    when = extracted_event.get("when", "Unknown")
    who = extracted_event.get("who", [])
    who_str = ", ".join(who) if who else "Unknown"

    message = f"""🔍 Shadow Mode  Proposed Calendar Action

Message ID: {shadow_msg_id}
Extracted: {what}
When: {when}
Who: {who_str}

Calendar: ❌ No matching event found

I would have added:
📅 "{what}" — {when}

Accurate? [Yes] [No] [Edit]

⚠️ This is a simulation in shadow mode. Calendar not modified.
Reply with /create_event {extraction_id} to create the event on the real calendar."""

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            url = f"{self.base_url}/sendMessage"
            resp = await client.post(
                url,
                json={
                    "chat_id": self.admin_chat_id,
                    "text": message,
                    "parse_mode": "HTML",
                },
            )
            resp.raise_for_status()

            # Also send a compact confirmation message with inline buttons
            # (even in shadow mode, we can send buttons for admin interaction)
            button_url = f"{self.base_url}/sendMessage"
            await client.post(
                button_url,
                json={
                    "chat_id": self.admin_chat_id,
                    "text": f'📅 Create calendar event for "{what}"?',
                    "parse_mode": "HTML",
                    "reply_markup": {
                        "inline_keyboard": [
                            [
                                {"text": "✅ Create Event", "callback_data": f"create_event:{extraction_id}"},
                                {"text": "❌ Skip", "callback_data": f"skip_event:{extraction_id}"},
                            ]
                        ]
                    },
                },
            )

    except Exception as e:
        logging.exception("[ShadowBot] Failed to send admin DM: %s", e)

async def _send_admin_dm_conflict(
    self,
    shadow_msg_id: int,
    extraction_id: int,
    extracted_event: Dict[str, Any],
    calendar_event: Dict[str, Any],
    conflict_description: str,
):
    """Send DM to admin about calendar conflict (CONFLICT).

    Called when calendar validation finds a conflicting event.
    """
    if not self.admin_chat_id:
        logging.warning("[ShadowBot] No admin_chat_id configured, skipping DM")
        return

    what = extracted_event.get("what", "Unknown")
    when = extracted_event.get("when", "Unknown")

    cal_summary = calendar_event.get("summary", "Unknown")
    cal_start = calendar_event.get("start", {})
    cal_time = cal_start.get("dateTime", cal_start.get("date", "Unknown"))

    message = f"""🔍 Shadow Mode  Calendar Conflict Detected

Message ID: {shadow_msg_id}
Extracted: {what} at {when}
Calendar: 📅 "{cal_summary}" at {cal_time}

Conflict: {conflict_description}

Which is correct?
[📅 Keep Calendar]
[💬 Use Message]
[❓ I'm Not Sure]

⚠️ This is a simulation. Calendar not modified."""

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            url = f"{self.base_url}/sendMessage"
            resp = await client.post(
                url,
                json={
                    "chat_id": self.admin_chat_id,
                    "text": message,
                    "parse_mode": "HTML",
                },
            )
            resp.raise_for_status()

            # Mark DM as sent in database
            self.db.update_admin_dm_sent(extraction_id, "conflict")

            logging.info("[ShadowBot] Admin DM sent for CONFLICT extraction_id=%s", extraction_id)

    except Exception as e:
        logging.exception("[ShadowBot] Failed to send admin DM: %s", e)

# -----------------------------------------------------------------------
# Brain Ingest  POST extraction to staging API
# -----------------------------------------------------------------------

async def _ingest_to_brain(
    self,
    extraction_id: int,
    extraction_result: "ExtractionResult",
    message_text: str,
    sender_name: str,
    sender_username: str,
    chat_title: str,
    msg_id: str,
    calendar_check_status: str = "pending",
    calendar_event_id: str | None = None,
    calendar_event_title: str | None = None,
    fuzzy_match_score: float | None = None,
) -> dict | None:
    """POST extraction to staging API /brain/ingest.

    Sends the processed extraction + calendar check result to
    the Family Brain (ChromaDB) via the staging API.
    """
    payload = {
        "extraction_id": extraction_id,
        "message_id": str(msg_id),
        "sender_name": sender_name,
        "sender_username": sender_username,
        "chat_title": chat_title,
        "message_text": message_text,
        "extraction_type": extraction_result.extraction_type,
        "extracted_who": extraction_result.who,
        "extracted_what": extraction_result.what,
        "extracted_when": extraction_result.when,
        "extracted_where": extraction_result.where,
        "confidence": extraction_result.confidence,
        "needs_confirmation": extraction_result.needs_confirmation,
        "calendar_check_status": calendar_check_status,
        "calendar_event_id": calendar_event_id,
        "calendar_event_title": calendar_event_title,
        "fuzzy_match_score": fuzzy_match_score,
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                "http://127.0.0.1:8001/brain/ingest",
                json=payload,
            )

            if resp.status_code == 200:
                result = resp.json()
                logging.info(
                    "[ShadowBot] Brain ingest OK: extraction_id=%s chunks=%s time=%sms",
                    extraction_id, result.get("chunks"), result.get("ingest_time_ms")
                )
                return result
            else:
                logging.warning(
                    "[ShadowBot] Brain ingest failed (HTTP %s): %s",
                    resp.status_code, resp.text[:500]
                )
                return None

    except httpx.ConnectError:
        logging.warning(
            "[ShadowBot] Brain ingest skipped — staging API not available "
            "(extraction_id=%s)", extraction_id
        )
        return None
    except Exception as e:
        logging.exception("[ShadowBot] Brain ingest error: %s", e)
        return None

# -----------------------------------------------------------------------
# Verbose Admin DM Methods (Staging Validation)
# -----------------------------------------------------------------------

async def _send_verbose_dm(self, text: str):
    """Send a verbose DM to admin chat. Only used in VERBOSE_ADMIN_DM mode.

    These DMs go to admin_chat_id (your private DM), NEVER to the Family
    Logistics group. This is the firehose — every pipeline event gets a DM.
    """
    if not self.admin_chat_id or not self.verbose_admin_dm:
        return

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            url = f"{self.base_url}/sendMessage"
            resp = await client.post(
                url,
                json={
                    "chat_id": self.admin_chat_id,
                    "text": text,
                    "parse_mode": "HTML",
                },
            )
            resp.raise_for_status()
    except Exception as e:
        logging.warning("[ShadowBot] Verbose DM failed: %s", e)

async def _send_verbose_dm_tripwire(
    self,
    sender_name: str,
    text: str,
    fired: bool,
    score: float,
    matches: list,
    confidence: str,
):
    """Send tripwire result DM."""
    emoji = "🔴" if fired else "⚪"
    matches_str = ", ".join(matches) if matches else "none"
    msg = (
        f"{emoji} <b>Tripwire</b> | {sender_name}\n"
        f"\n"
        f"\"{text}\"\n"
        f"\n"
        f"Score: {score:.2f} | {confidence}\n"
        f"Matches: {matches_str}\n"
        f"Fired: {'YES → Tier 2' if fired else 'NO (dropped)'}"
    )
    await self._send_verbose_dm(msg)

async def _send_verbose_dm_extraction(
    self,
    extraction_result: Optional[ExtractionResult],
    calendar_status: str = "",
    calendar_title: str = "",
    calendar_fuzzy: float = 0.0,
    brain_ingested: bool = False,
    brain_chunks: int = 0,
):
    """Send extraction + calendar + brain result DM."""
    if not extraction_result:
        await self._send_verbose_dm("⚪ <b>Extraction</b> | No LLM call (tripwire not fired)")
        return

    who_str = ", ".join(extraction_result.who) if extraction_result.who else "—"
    cal_emoji = {"match": "✅", "no_match": "❌", "conflict": "⚠️", "task_reminder": "📋", "pending": "⏳"}.get(calendar_status, "❓")
    brain_str = f"🧠 {brain_chunks} chunks" if brain_ingested else "🧠 skipped"

    msg = (
        f"🔵 <b>Extraction</b> | {extraction_result.extraction_type}\n"
        f"\n"
        f"What: {extraction_result.what}\n"
        f"When: {extraction_result.when or '—'}\n"
        f"Who: {who_str}\n"
        f"Where: {extraction_result.where or '—'}\n"
        f"\n"
        f"Confidence: {extraction_result.confidence:.2f}\n"
        f"Confirm needed: {'Yes' if extraction_result.needs_confirmation else 'No'}\n"
        f"\n"
        f"{cal_emoji} Calendar: {calendar_status or 'not checked'}"
    )
    if calendar_title:
        msg += f" → \"{calendar_title}\" (score={calendar_fuzzy:.2f})"
    msg += f"\n{brain_str}"

    await self._send_verbose_dm(msg)

# -----------------------------------------------------------------------
# Safety: Speak Prevention
# -----------------------------------------------------------------------

async def _send_message(self, *args, **kwargs):
    """SAFETY: Prevent any message sending in shadow mode.

    This method should never be called. If called, it logs an error
    and returns a failure response.
    """
    logging.error("[ShadowBot] ATTEMPTED TO SEND MESSAGE IN SHADOW MODE - BLOCKED")
    logging.error("[ShadowBot] Call args: %s kwargs: %s", args, kwargs)
    return {"ok": False, "error": "SPEAK_DISABLED_IN_SHADOW_MODE"}

async def _answer_callback(self, *args, **kwargs):
    """SAFETY: Prevent any callback answering."""
    logging.error("[ShadowBot] ATTEMPTED TO ANSWER CALLBACK IN SHADOW MODE - BLOCKED")
    return {"ok": False, "error": "SPEAK_DISABLED_IN_SHADOW_MODE"}