"""Shadow Mode Bot - Silent Telegram observer with SPEAK_HARD_DISABLED.
This bot observes messages from the Family Logistics group,
runs tripwire detection, performs LLM extraction, and logs everything
to the shadow database. It NEVER posts to the observed group; its only outbound Telegram messages are DMs to the configured admin.
Hard constraints:
- SPEAK_ENABLED = False (hard-coded module constant; not configurable via environment or arguments)
- No reply to any message
- No confirmation buttons
- No Event Graph writes
- Isolated shadow database only
"""
import html
import json
import logging
import math
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, List, Optional, Tuple

import httpx

from .shadow_database import ShadowDatabase
from .calendar_validator import CalendarValidator, CalendarCheckResult
# Speech is hard-disabled. SPEAK_ENABLED is a literal in the source rather than
# an environment/config setting, so letting the bot talk again requires a code
# change. Python cannot make a module global truly immutable, so this is a
# convention, not an enforcement mechanism.
# NOTE(review): an earlier comment mentioned an assertion guarding this flag;
# none is visible in this section of the module — confirm one exists elsewhere.
SPEAK_ENABLED = False

# Staging-only firehose flag. It is read once at import time from the
# VERBOSE_ADMIN_DM environment variable and is True only when that value equals
# "true" case-insensitively. When enabled, every processing event (tripwire,
# extraction, calendar check, brain ingest) is DM'd to the admin.
# Never enable this in production.
_verbose_env_value = os.getenv("VERBOSE_ADMIN_DM", "false")
VERBOSE_ADMIN_DM = _verbose_env_value.lower() == "true"
# Tripwire patterns for coordination signal detection
# Tier 1: Fast regex/keyword matching (runs on every message)
#
# Design principles (v2 — 2026-05-04 calibration):
# 1. Time signals are capped (max of all time scores, not additive)
# 2. Combos are limited to max 2 to prevent score inflation
# 3. Noisy words removed from broad patterns ("grab", "get", "change")
# 4. Implicit assignment patterns added ("X is going with you")
# 5. Suggestion intent patterns added ("we should", "let's")
# 6. Event recall is for referencing existing events, not task reminders
#
# ShadowBot compiles every pattern with re.IGNORECASE and searches the
# ORIGINAL (not lower-cased) message text. Any sub-pattern that must stay
# case-sensitive therefore needs a scoped inline flag: (?-i:...).
TRIPWIRE_PATTERNS = {
    # Time/schedule indicators
    "time_specific": r"\b(\d{1,2}:\d{2}\s*(AM|PM|am|pm)?|\d{1,2}\s*(AM|PM|am|pm))\b|\b(morning|afternoon|evening|tonight)\b",
    # NOTE(review): this also matches any 4-digit number 0000-2399, including
    # years such as "2025" — confirm this is acceptable for calibration.
    "time_military": r"\b(0\d{3}|1\d{3}|2[0-3]\d{2})\b",
    "day_reference": r"\b(today|tomorrow|tonight|yesterday|monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b",
    "vague_time": r"\b(this week|next week|this weekend|next weekend)\b",
    "date_reference": r"\b(\d{1,2}/\d{1,2}|\d{1,2}-\d{1,2}|\b(january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{1,2})\b",
    # Meal/meeting/social indicators
    "meal_meeting": r"\b(breakfast|lunch|brunch|dinner|drinks|coffee|meet|meeting|hang out|get together)\b",
    # Direct coordination intent (explicit requests)
    "can_you": r"\b(can you|could you|would you|will you|are you able to)\b",
    "need_someone": r"\b(need someone|need a|looking for|anyone available|who can)\b",
    "pickup_dropoff": r"\b(pick up|pickup|drop off|dropoff)\b",
    "coverage_offer": r"\b(i can (take|grab|do)|i(?:'ll| will) (take|grab|do)|let me (take|grab|do)|i could take)\b",
    "swap_change": r"\b(swap|switch|reschedule)\b",
    # Implicit assignment — someone is going/assigned somewhere
    "implicit_assign": r"\b(is going (with|to)|are going (with|to)|going with (you|me|us)|has to (go|be|get)|needs to (go|be|get))\b",
    # Possessive family mention — "Sully's haircut", "Harper's dance"
    "family_possessive": r"\b(Sully's|Sullivan's|Harper's|Maggie's|Matt's|Aundrea's)\b",
    # Suggestion/planning intent — "we should", "let's", "how about"
    "suggestion_intent": r"\b(we should|let's|how about|i was thinking|i'd like to|we could|we need to|we have to)\b",
    # Location indicators (proper nouns + known venues).
    # The capitalised-name part is wrapped in (?-i:...) because the pattern is
    # compiled with re.IGNORECASE; without it [A-Z][a-z]+ matched ANY word, so
    # "to go" / "to be" fired as locations on almost every message.
    "location": r"\b(at|to)\s+(?:the\s+)?(?-i:([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2}))|\b(school|practice|game|dentist|doctor|hospital|vet)\b",
    "school_practice": r"\b(school|practice|game|match|dentist|doctor|appointment|OT|therapy)\b",
    # Offer/help — narrowed to avoid catching casual chat
    "offer_help": r"\b(do you want me to|should i|i'll take|i'll grab|i'll get|let me know|i could take)\b",
    # Event recall / confirmation patterns — triggers CalendarValidator directly
    # NOTE: "remember to" is a task reminder, NOT an event recall.
    # Only "remember we have" (referencing a known event) is event recall.
    "event_recall": r"\b(remember we have|don't forget (we|that)|are we still on for|looking forward to)\b",
    "event_ack": r"\b(yes, i have|i'll be there|we're going to|see you at)\b",
}
# Coordination keywords for combo scoring (v2: narrowed to reduce false positives).
# _run_tripwire matches them as plain substrings of the lower-cased text.
# NOTE: a stale, broader copy of this list ("grab", "get", "change", "move",
# "are you", "will you be", "hang out") used to be redefined further down the
# module and silently replaced this one, undoing the v2 calibration. Keep a
# single definition.
COORDINATION_KEYWORDS = [
    "can you", "could you", "would you", "will you",
    "need someone", "need a", "looking for", "anyone available",
    "pick up", "drop off",
    "cover", "watch", "stay with", "babysit", "fill in",
    "swap", "switch", "reschedule",
    "who can",
    "breakfast", "lunch", "dinner", "drinks", "coffee", "meet",
]
# Default family member aliases: canonical name -> lower-case aliases.
# ShadowBot._load_family_aliases adds/overwrites entries in this dict from the
# family context at bot construction time (module-level, shared state).
FAMILY_MEMBER_ALIASES = {
    "matt": ["matt", "matthew", "hoffmann"],
    "aundrea": ["aundrea", "mom"],
    "sullivan": ["sully", "sullivan"],
    "harper": ["harper"],
    "maggie": ["maggie", "dog"],
}
@dataclass
class TripwireResult:
    """Result of Tier 1 tripwire matching.

    Produced by ShadowBot._run_tripwire.
    """
    fired: bool  # True when score >= 0.4 (the fire threshold in _run_tripwire)
    score: float  # combined score, capped at 1.0 and rounded to 2 decimals
    matches: List[str]  # non-time pattern names plus "combo:*" / "coord:*" markers
    confidence: str # "high", "medium", "low"
@dataclass
class ExtractionResult:
    """Result of Tier 2 LLM extraction (or of the event-recall bypass).

    Built by ShadowBot._run_llm_extraction from the model's JSON reply, and
    directly by ShadowBot.process_message for event recalls.
    """
    # LLM "coordination_type" (default "info"), "event_recall", or "unclear" on failure
    extraction_type: str
    who: List[str]  # people mentioned or implied
    what: str
    when: str  # free-text date/time as extracted; not parsed here
    where: str
    confidence: float  # 0.0-1.0 as requested from the LLM; 0.5 for event recalls
    needs_confirmation: bool
    raw_extraction: Dict[str, Any]  # source dict: LLM JSON, {"error": ...}, or the recall event
class ShadowBot:
"""Silent observer bot for shadow mode.
NEVER SPEAKS. Only observes, extracts, and logs.
"""
def __init__(
    self,
    db: ShadowDatabase,
    bot_token: str,
    family_context: Optional[Dict[str, Any]] = None,
    llm_url: Optional[str] = None,
    llm_model: Optional[str] = None,
    admin_chat_id: Optional[str] = None,
    telegram_api_url: str = "https://api.telegram.org/bot",
    verbose_admin_dm: bool = False,
):
    """Wire up storage, endpoints, the calendar validator and tripwire regexes.

    Args:
        db: Shadow database used for every write.
        bot_token: Telegram bot token; appended to ``telegram_api_url``.
        family_context: Optional family config; members are merged into the
            module-level alias table.
        llm_url: Chat-completions endpoint; LLM extraction is skipped if unset.
        llm_model: Model name, defaulting to "qwen2.5-coder:7b".
        admin_chat_id: Chat that receives admin DMs (None disables them).
        telegram_api_url: Bot API prefix, overridable for tests/proxies.
        verbose_admin_dm: Staging firehose — DM the admin on every stage.
    """
    # Storage and Telegram endpoint
    self.db = db
    self.bot_token = bot_token
    self.base_url = "{}{}".format(telegram_api_url, bot_token)
    # Context and LLM settings
    self.family_context = family_context if family_context else {}
    self.llm_url = llm_url
    self.llm_model = llm_model if llm_model else "qwen2.5-coder:7b"
    # Admin DM routing; verbose mode is for staging validation only
    self.admin_chat_id = admin_chat_id
    self.verbose_admin_dm = verbose_admin_dm

    self.calendar_validator = CalendarValidator()

    # Pre-compile every tripwire regex once, case-insensitively
    compiled = {}
    for pattern_name, regex_source in TRIPWIRE_PATTERNS.items():
        compiled[pattern_name] = re.compile(regex_source, re.IGNORECASE)
    self._compiled_patterns = compiled

    self._load_family_aliases()

    logging.info(
        "[ShadowBot] Initialized - SPEAK_ENABLED=%s VERBOSE_ADMIN_DM=%s",
        SPEAK_ENABLED,
        self.verbose_admin_dm,
    )
def _load_family_aliases(self):
"""Load family member aliases from family context."""
members = self.family_context.get("family", {}).get("members", [])
for member in members:
name = member.get("name", "").lower()
if name:
aliases = [name]
for nick in member.get("nicknames", []):
aliases.append(nick.lower())
# Also add role-based aliases
role = member.get("role", "").lower()
if role in ["dad", "mom", "son", "daughter"]:
aliases.append(role)
FAMILY_MEMBER_ALIASES[name] = aliases
# -----------------------------------------------------------------------
# Core Message Processing
# -----------------------------------------------------------------------
async def process_message(self, update: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Process a Telegram message update through the silent shadow pipeline.

    Stages:
      1. Store the raw message in the shadow database.
      2. Run Tier 1 tripwire scoring and record the result.
      2b. Event recall/acknowledgement without coordination intent: skip the
          LLM and validate directly against the calendar.
      3. Otherwise, if the tripwire fired and an LLM URL is configured, run
         Tier 2 extraction, entity detection and calendar validation.
      4. Record the extraction result and processing time on the message.

    This method never replies in the observed chat.

    Args:
        update: Telegram update dict. Only ``update["message"]`` carrying a
            ``"text"`` field is processed.

    Returns:
        ``{"shadow_message_id", "tripwire", "extraction", "entities"}``, or
        ``None`` when the update is skipped or the message could not be stored.
    """
    message = update.get("message", {})
    if not message:
        return None
    # Skip non-text messages for now (photos, documents handled separately if needed)
    if "text" not in message:
        logging.debug("[ShadowBot] Skipping non-text message")
        return None

    # Extract message metadata
    msg_id = str(message.get("message_id", ""))
    chat = message.get("chat", {})
    chat_id = str(chat.get("id", ""))
    chat_title = chat.get("title", "")
    sender = message.get("from", {})
    sender_id = str(sender.get("id", ""))
    sender_name = (sender.get("first_name", "") + " " + sender.get("last_name", "")).strip()
    sender_username = sender.get("username", "")
    text = message.get("text", "")
    timestamp = datetime.fromtimestamp(message.get("date", 0), tz=timezone.utc)
    logging.info("[ShadowBot] Processing message %s from %s", msg_id, sender_name or sender_username)

    # Stage 1: Store raw message
    shadow_msg_id = self.db.store_message(
        message_id=msg_id,
        chat_id=chat_id,
        sender_id=sender_id,
        message_text=text,
        sender_name=sender_name,
        sender_username=sender_username,
        chat_title=chat_title,
        timestamp=timestamp,
    )
    if shadow_msg_id < 0:
        logging.warning("[ShadowBot] Failed to store message %s", msg_id)
        return None

    # Stage 2: Run tripwire detection
    start_time = time.time()
    tripwire = self._run_tripwire(text)
    self.db.update_tripwire_result(
        shadow_message_id=shadow_msg_id,
        fired=tripwire.fired,
        score=tripwire.score,
        matches=tripwire.matches,
    )
    logging.info("[ShadowBot] Tripwire fired=%s score=%.2f confidence=%s matches=%s",
                 tripwire.fired, tripwire.score, tripwire.confidence, tripwire.matches)
    # VERBOSE DM: Tripwire result (every message, even if tripwire didn't fire)
    if self.verbose_admin_dm:
        await self._send_verbose_dm_tripwire(
            sender_name=sender_name or sender_username,
            text=text[:200],  # Truncate long messages
            fired=tripwire.fired,
            score=tripwire.score,
            matches=tripwire.matches,
            confidence=tripwire.confidence,
        )

    # === EVENT RECALL DETECTION (Tier 1 Expansion) ===
    # An event recall/confirmation with no coordination intent bypasses the LLM.
    has_event_recall = any(m in tripwire.matches for m in ("event_recall", "event_ack"))
    has_coordination = any(m in tripwire.matches for m in (
        "can_you", "need_someone", "pickup_dropoff", "coverage_offer", "offer_help", "swap_change",
    ))
    if tripwire.fired and has_event_recall and not has_coordination:
        await self._process_event_recall(
            text=text,
            msg_id=msg_id,
            shadow_msg_id=shadow_msg_id,
            sender_name=sender_name,
            sender_username=sender_username,
            chat_title=chat_title,
        )
        # Skip normal LLM extraction pipeline
        return {
            "shadow_message_id": shadow_msg_id,
            "tripwire": tripwire,
            "extraction": None,
            "entities": [],
        }

    # Stage 3: If tripwire fires (normal coordination), run LLM extraction
    extraction_result = None
    entities_detected = []
    normalized_entities = []
    calendar_status, calendar_title, calendar_fuzzy = "", "", 0.0
    if tripwire.fired and self.llm_url:
        try:
            extraction_result = await self._run_llm_extraction(
                text, tripwire,
                chat_id=chat_id,
                message_id=msg_id,
            )
            # Detect entities in the extraction
            entities_detected, normalized_entities = self._detect_entities(text, extraction_result)
            extraction_id = self.db.store_extraction(
                message_id=msg_id,
                shadow_message_id=shadow_msg_id,
                extraction_type=extraction_result.extraction_type,
                extracted_who=extraction_result.who,
                extracted_what=extraction_result.what,
                extracted_when=extraction_result.when,
                extracted_where=extraction_result.where,
                confidence=extraction_result.confidence,
                needs_confirmation=extraction_result.needs_confirmation,
            )
            logging.info("[ShadowBot] Extraction stored id=%s type=%s confidence=%.2f",
                         extraction_id, extraction_result.extraction_type, extraction_result.confidence)
            # Only one verbose extraction DM is sent per message: the
            # summary after Stage 4, which also carries the calendar outcome.
            calendar_status, calendar_title, calendar_fuzzy = await self._validate_extraction_calendar(
                extraction_id=extraction_id,
                extraction_result=extraction_result,
                text=text,
                msg_id=msg_id,
                shadow_msg_id=shadow_msg_id,
                sender_name=sender_name,
                sender_username=sender_username,
                chat_title=chat_title,
            )
        except Exception as e:
            logging.exception("[ShadowBot] Extraction failed: %s", e)
            extraction_result = None

    # Stage 4: Update message with extraction results
    processing_duration = int((time.time() - start_time) * 1000)
    self.db.update_extraction_result(
        shadow_message_id=shadow_msg_id,
        extraction=extraction_result.raw_extraction if extraction_result else {},
        confidence=extraction_result.confidence if extraction_result else 0.0,
        model=self.llm_model,
        entities=entities_detected,
        normalized_entities=normalized_entities,
        processing_duration_ms=processing_duration,
        error=None,
    )
    # VERBOSE DM: Final pipeline summary (normal coordination path)
    if self.verbose_admin_dm and extraction_result:
        await self._send_verbose_dm_extraction(
            extraction_result=extraction_result,
            calendar_status=calendar_status,
            calendar_title=calendar_title,
            calendar_fuzzy=calendar_fuzzy,
        )
    return {
        "shadow_message_id": shadow_msg_id,
        "tripwire": tripwire,
        "extraction": extraction_result,
        "entities": normalized_entities,
    }

async def _process_event_recall(
    self,
    *,
    text: str,
    msg_id: str,
    shadow_msg_id: int,
    sender_name: str,
    sender_username: str,
    chat_title: str,
) -> None:
    """Stage 2b: validate an event recall/acknowledgement directly against the calendar.

    Bypasses the LLM. The whole message text is the event "what" (the
    CalendarValidator fuzzy-matches it), and _extract_date_from_text supplies
    a coarse "when". Depending on the calendar status:
      - MATCH: store an "event_ack" extraction and ingest it to the brain.
      - NO_MATCH: store an "event_recall" extraction, DM the admin, then ingest.
      - CONFLICT: store an "event_recall" extraction and ingest it for review.
    Calendar/DB/ingest failures are logged and swallowed. In verbose mode one
    summary DM, including the calendar outcome, is sent at the end.
    """
    logging.info("[ShadowBot] Event recall detected — bypassing LLM extraction, routing to CalendarValidator")
    when = self._extract_date_from_text(text)
    extracted_event = {
        "what": text,  # Use full text as what (CalendarValidator will fuzzy match)
        "when": when,
        "where": "",
    }
    ingest_common = {
        "message_text": text,
        "sender_name": sender_name,
        "sender_username": sender_username,
        "chat_title": chat_title,
        "msg_id": msg_id,
    }

    def recall_result(needs_confirmation: bool) -> ExtractionResult:
        # Event recalls carry a fixed, neutral confidence of 0.5.
        return ExtractionResult(
            extraction_type="event_recall",
            who=[],
            what=text,
            when=when,
            where="",
            raw_extraction=extracted_event,
            confidence=0.5,
            needs_confirmation=needs_confirmation,
        )

    def store(extraction_type: str, needs_confirmation: bool):
        return self.db.store_extraction(
            message_id=msg_id,
            shadow_message_id=shadow_msg_id,
            extraction_type=extraction_type,
            extracted_who=[],
            extracted_what=text,
            extracted_when=when,
            extracted_where="",
            confidence=0.5,
            needs_confirmation=needs_confirmation,
        )

    calendar_check = None
    vd_status, vd_title, vd_fuzzy = "", "", 0.0
    try:
        calendar_check = self.calendar_validator.check_event(extracted_event)
        if calendar_check.status == "MATCH":
            # Event acknowledged — log silently
            logging.info("[ShadowBot] Event recall MATCH — '%s' found in calendar (score=%.2f)",
                         calendar_check.event_title, calendar_check.fuzzy_score)
            vd_status = "match"
            vd_title = calendar_check.event_title or ""
            vd_fuzzy = calendar_check.fuzzy_score
            extraction_id = store("event_ack", False)
            self.db.update_calendar_check(
                extraction_id=extraction_id,
                status="match",
                event_id=calendar_check.event_id,
                event_title=calendar_check.event_title,
                event_time=calendar_check.event_start,
                fuzzy_score=calendar_check.fuzzy_score,
            )
            await self._ingest_to_brain(
                extraction_id=extraction_id,
                extraction_result=recall_result(False),
                calendar_check_status="match",
                calendar_event_id=calendar_check.event_id,
                calendar_event_title=calendar_check.event_title,
                fuzzy_match_score=calendar_check.fuzzy_score,
                **ingest_common,
            )
        elif calendar_check.status == "NO_MATCH":
            # Event not in calendar — alert admin
            logging.info("[ShadowBot] Event recall NO_MATCH — '%s' not found in calendar", text)
            vd_status = "no_match"
            extraction_id = store("event_recall", True)
            self.db.update_calendar_check(
                extraction_id=extraction_id,
                status="no_match",
                event_id=None,
                event_title=None,
                event_time=None,
                fuzzy_score=0.0,
            )
            await self._send_admin_dm_event_recall(
                shadow_msg_id=shadow_msg_id,
                extraction_id=extraction_id,
                extracted_event=extracted_event,
            )
            await self._ingest_to_brain(
                extraction_id=extraction_id,
                extraction_result=recall_result(True),
                calendar_check_status="no_match",
                **ingest_common,
            )
        elif calendar_check.status == "CONFLICT":
            logging.info("[ShadowBot] Event recall CONFLICT — logging for review")
            vd_status = "conflict"
            vd_title = calendar_check.event_title or ""
            vd_fuzzy = calendar_check.fuzzy_score
            extraction_id = store("event_recall", True)
            self.db.update_calendar_check(
                extraction_id=extraction_id,
                status="conflict",
                event_id=calendar_check.event_id,
                event_title=calendar_check.event_title,
                event_time=calendar_check.event_start,
                fuzzy_score=calendar_check.fuzzy_score,
            )
            await self._ingest_to_brain(
                extraction_id=extraction_id,
                extraction_result=recall_result(True),
                calendar_check_status="conflict",
                calendar_event_id=calendar_check.event_id,
                calendar_event_title=calendar_check.event_title,
                fuzzy_match_score=calendar_check.fuzzy_score,
                **ingest_common,
            )
    except Exception as e:
        logging.exception("[ShadowBot] Calendar validation failed for event recall: %s", e)

    if self.verbose_admin_dm:
        # No calendar result (validator raised) is treated as needing confirmation.
        needs_confirmation = calendar_check is None or calendar_check.status != "MATCH"
        await self._send_verbose_dm_extraction(
            extraction_result=recall_result(needs_confirmation),
            calendar_status=vd_status,
            calendar_title=vd_title,
            calendar_fuzzy=vd_fuzzy,
        )

async def _validate_extraction_calendar(
    self,
    *,
    extraction_id: Any,
    extraction_result: ExtractionResult,
    text: str,
    msg_id: str,
    shadow_msg_id: int,
    sender_name: str,
    sender_username: str,
    chat_title: str,
) -> Tuple[str, str, float]:
    """Stage 3b: calendar validation for a stored LLM extraction (Enriched Shadow Mode).

    - "task_reminder" extractions skip the calendar lookup. They are recorded
      with status "task_reminder" and ingested to the brain.
    - Extractions with confidence >= 0.7 are checked with CalendarValidator and
      the status is stored. MATCH, NO_MATCH and CONFLICT are ingested;
      NO_MATCH and CONFLICT also DM the admin. Failures in this branch are
      logged and swallowed so the pipeline continues.
    - Anything else records nothing.

    Returns:
        ``(status, event_title, fuzzy_score)`` for the verbose summary DM, or
        ``("", "", 0.0)`` when nothing was checked.

    Raises:
        Errors from the task_reminder DB/ingest calls propagate to the caller.
    """
    ingest_common = {
        "message_text": text,
        "sender_name": sender_name,
        "sender_username": sender_username,
        "chat_title": chat_title,
        "msg_id": msg_id,
    }
    status_out, title_out, fuzzy_out = "", "", 0.0

    # Skip calendar check for task_reminder items (shopping lists, etc.)
    if extraction_result.extraction_type == "task_reminder":
        status_out = "task_reminder"
        logging.info("[ShadowBot] Task reminder detected — skipping calendar check")
        self.db.update_calendar_check(
            extraction_id=extraction_id,
            status="task_reminder",
            event_id=None,
            event_title=None,
            event_time=None,
            fuzzy_score=0.0,
        )
        # Ingest task reminders to brain too (for shopping/errand retrieval)
        await self._ingest_to_brain(
            extraction_id=extraction_id,
            extraction_result=extraction_result,
            calendar_check_status="task_reminder",
            **ingest_common,
        )
        return status_out, title_out, fuzzy_out

    if extraction_result.confidence >= 0.7:
        try:
            calendar_check = self.calendar_validator.check_event(extraction_result.raw_extraction)
            self.db.update_calendar_check(
                extraction_id=extraction_id,
                status=calendar_check.status.lower(),  # match, no_match, conflict
                event_id=calendar_check.event_id,
                event_title=calendar_check.event_title,
                event_time=calendar_check.event_start,
                fuzzy_score=calendar_check.fuzzy_score,
            )
            logging.info("[ShadowBot] Calendar check: %s (score=%.2f)",
                         calendar_check.status, calendar_check.fuzzy_score)
            if calendar_check.status == "MATCH":
                status_out = "match"
                title_out = calendar_check.event_title or ""
                fuzzy_out = calendar_check.fuzzy_score
                await self._ingest_to_brain(
                    extraction_id=extraction_id,
                    extraction_result=extraction_result,
                    calendar_check_status="match",
                    calendar_event_id=calendar_check.event_id,
                    calendar_event_title=calendar_check.event_title,
                    fuzzy_match_score=calendar_check.fuzzy_score,
                    **ingest_common,
                )
            elif calendar_check.status == "NO_MATCH":
                status_out = "no_match"
                await self._send_admin_dm_no_match(
                    shadow_msg_id=shadow_msg_id,
                    extraction_id=extraction_id,
                    extracted_event=extraction_result.raw_extraction,
                )
                await self._ingest_to_brain(
                    extraction_id=extraction_id,
                    extraction_result=extraction_result,
                    calendar_check_status="no_match",
                    **ingest_common,
                )
            elif calendar_check.status == "CONFLICT":
                status_out = "conflict"
                title_out = calendar_check.event_title or ""
                fuzzy_out = calendar_check.fuzzy_score
                await self._send_admin_dm_conflict(
                    shadow_msg_id=shadow_msg_id,
                    extraction_id=extraction_id,
                    extracted_event=extraction_result.raw_extraction,
                    calendar_event=calendar_check.matched_event,
                    conflict_description=calendar_check.conflict_description,
                )
                await self._ingest_to_brain(
                    extraction_id=extraction_id,
                    extraction_result=extraction_result,
                    calendar_check_status="conflict",
                    calendar_event_id=calendar_check.event_id,
                    calendar_event_title=calendar_check.event_title,
                    fuzzy_match_score=calendar_check.fuzzy_score,
                    **ingest_common,
                )
        except Exception as e:
            # Don't fail the whole pipeline if calendar check fails
            logging.exception("[ShadowBot] Calendar validation failed: %s", e)
    return status_out, title_out, fuzzy_out
# -----------------------------------------------------------------------
# Tier 1: Tripwire Detection
# -----------------------------------------------------------------------
def _run_tripwire(self, text: str) -> TripwireResult:
    """Run the Tier 1 regex/keyword tripwire on message text.

    Scoring Strategy (v2 — 2026-05-04 calibration):
    - Time signals: only the single highest time score counts (max, not sum)
      to prevent Chronometer Spam from date+day+time stacking.
    - Base patterns: 0.10-0.25 per matching non-time pattern.
    - Combo bonuses: max 2 combos can fire (prevents inflation).
    - Family member presence: boosts when combined with coordination intent.
      A member counts as present when any alias (or the canonical name) from
      FAMILY_MEMBER_ALIASES appears as a whole word.
    - Coordination keywords: 0.08 each (cap 0.24), substring match.
    - Fire threshold: 0.4 (catches coordination, ignores chat); confidence is
      "high" at >= 0.7, "medium" at >= 0.4, else "low".

    Returns:
        TripwireResult whose score is capped at 1.0 and rounded to 2 decimals.
        ``matches`` holds the non-time pattern names, then any ``combo:*``
        markers, then up to three ``coord:*`` markers. Time-signal names are
        deliberately not included.
    """
    text_lower = text.lower()
    time_signals = ["time_specific", "time_military", "day_reference", "date_reference", "vague_time"]
    # Base score per pattern name (v2). Patterns absent here contribute 0.
    pattern_scores = {
        "time_specific": 0.15,
        "time_military": 0.12,
        "day_reference": 0.15,
        "date_reference": 0.15,
        "vague_time": 0.08,
        "meal_meeting": 0.10,
        "can_you": 0.25,
        "need_someone": 0.25,
        "pickup_dropoff": 0.25,
        "coverage_offer": 0.20,
        "offer_help": 0.15,
        "event_recall": 0.25,
        "event_ack": 0.15,
        "swap_change": 0.20,
        "implicit_assign": 0.20,
        "family_possessive": 0.15,
        "suggestion_intent": 0.15,
        "location": 0.10,
        "school_practice": 0.10,
    }
    missing = [m for m in time_signals if m not in pattern_scores]
    assert not missing, f"Missing time signal(s) in pattern_scores: {missing}"

    # Patterns search the original text; they are compiled case-insensitively.
    matches = [name for name, pattern in self._compiled_patterns.items() if pattern.search(text)]

    # === CAP TIME SIGNALS - prevent Chronometer Spam ===
    matched_time_signals = [m for m in matches if m in time_signals]
    score = max((pattern_scores.get(m, 0) for m in matched_time_signals), default=0)
    # Remove time signals from matches so they are not counted twice
    matches = [m for m in matches if m not in time_signals]
    # Sequential addition keeps float results identical to the calibration run
    for name in matches:
        score += pattern_scores.get(name, 0.0)

    # === COMBO BONUSES (max 2 can fire, prevents inflation) ===
    combo_count = 0
    max_combos = 2
    has_time = bool(matched_time_signals)
    has_action = any(m in matches for m in (
        "can_you", "need_someone", "pickup_dropoff", "coverage_offer", "offer_help",
    ))
    has_implicit = "implicit_assign" in matches
    has_suggestion = "suggestion_intent" in matches
    has_family_possessive = "family_possessive" in matches
    has_coordination = has_action or has_implicit or has_suggestion

    # Combo 1: Time + Coordination Intent = Clear coordination
    if has_time and has_coordination and combo_count < max_combos:
        score += 0.20
        matches.append("combo:time+coord")
        combo_count += 1
    # Combo 2: Coordination Intent + Family Member = Direct personal coordination.
    # Whole-word alias matching: "Sully"/"mom" count, while "matter" does not
    # count as "matt" (the old check was a substring test on canonical keys only).
    has_family = any(
        re.search(r"\b" + re.escape(alias.lower()) + r"\b", text_lower)
        for canonical, aliases in FAMILY_MEMBER_ALIASES.items()
        for alias in (canonical, *aliases)
        if alias
    )
    if has_coordination and has_family and combo_count < max_combos:
        score += 0.15
        matches.append("combo:coord+family")
        combo_count += 1
    # Combo 2b: Time + Possessive family = Family schedule reference
    # "Sully's haircut tomorrow" — not coordination intent, but family schedule relevant
    if has_time and has_family_possessive and combo_count < max_combos:
        score += 0.20
        matches.append("combo:time+family_possessive")
        combo_count += 1
    # Combo 3: Time + Meal/Meeting = Social coordination (requires question or action)
    if has_time and "meal_meeting" in matches and (has_coordination or "?" in text) and combo_count < max_combos:
        score += 0.15
        matches.append("combo:time+meal")
        combo_count += 1

    # Coordination keyword bonus (individual words, reduced weight)
    coord_matches = [kw for kw in COORDINATION_KEYWORDS if kw in text_lower]
    if coord_matches:
        score += min(len(coord_matches) * 0.08, 0.24)
        matches.extend(f"coord:{kw}" for kw in coord_matches[:3])

    score = min(score, 1.0)
    if score >= 0.7:
        confidence = "high"
    elif score >= 0.4:
        confidence = "medium"
    else:
        confidence = "low"
    return TripwireResult(
        fired=score >= 0.4,  # Fire threshold - catches combos, ignores chat
        score=round(score, 2),
        matches=matches,
        confidence=confidence,
    )
# -----------------------------------------------------------------------
# Tier 2: LLM Extraction
# -----------------------------------------------------------------------
async def _run_llm_extraction(
self,
text: str,
tripwire: TripwireResult,
chat_id: Optional[str] = None,
message_id: Optional[str] = None,
) -> ExtractionResult:
"""Run LLM extraction to extract 4W data with Rolling Context Window.
Extracts: Who, What, When, Where from coordination messages.
Uses previous messages for coreference resolution.
"""
# Build context block from recent messages
context_messages = []
if chat_id and self.db:
# Fetch last 5 messages before current one (within 15 min window)
context_messages = self.db.get_recent_messages(
chat_id=chat_id,
before_message_id=message_id,
limit=5,
within_minutes=15,
)
# Build extraction prompt
family_members_str = ", ".join(
name.title() for name in FAMILY_MEMBER_ALIASES.keys()
)
# Build context block string
context_str = ""
if context_messages:
context_str = "RECENT CONVERSATION CONTEXT:\n"
for msg in reversed(context_messages): # Oldest first
sender = msg.get("sender_name", "Unknown")
msg_text = msg.get("message_text", "")
if msg_text:
context_str += f"{sender}: {msg_text}\n"
context_str += "\n"
prompt = f"""You are analyzing a family logistics message to extract coordination details.
FAMILY MEMBERS: {family_members_str}
{context_str}CURRENT MESSAGE:
{text}
Extract the following as JSON:
{{
"coordination_type": "coordination" | "schedule" | "reminder" | "task_reminder" | "info",
"who": ["list of people mentioned or implied"],
"what": "the EXACT event/activity name from the message",
"when": "when is this happening (be specific: date and/or time, or empty if not stated)",
"where": "where is this happening",
"confidence": 0.0-1.0,
"needs_confirmation": true | false,
"reasoning": "brief explanation"
}}
CRITICAL CLASSIFICATION RULES:
- "task_reminder": Use for shopping lists, grocery items, "remember to...", "don't forget...", household tasks without specific dates. These do NOT go on the calendar.
- "reminder": Use for time-specific reminders ("remind me tomorrow at 3pm about...").
- "schedule": Use for events with specific dates/times (appointments, practices, meetings).
- "coordination": Use for coverage requests, swaps, or who-does-what logistics.
- Extract the FINAL, AGREED-UPON state from the conversation context
- If the current message corrects a previous statement (e.g., "Actually, it's..."), use the CORRECTED information
- what: Preserve original phrasing. Do NOT add words like "check", "meeting", etc.
- when: Use the most specific date/time mentioned across the conversation
- where: Use the most specific location mentioned across the conversation
- Ignore retracted or corrected statements
- confidence: higher if all fields are clearly stated, lower if inference needed
Respond with ONLY the JSON object."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
self.llm_url,
json={
"model": self.llm_model,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"stream": False,
},
)
resp.raise_for_status()
data = resp.json()
# Parse response
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
# Extract JSON from response (handle markdown code blocks)
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
content = json_match.group(0)
extraction = json.loads(content)
return ExtractionResult(
extraction_type=extraction.get("coordination_type", "info"),
who=extraction.get("who", []),
what=extraction.get("what", ""),
when=extraction.get("when", ""),
where=extraction.get("where", ""),
confidence=extraction.get("confidence", 0.5),
needs_confirmation=extraction.get("needs_confirmation", True),
raw_extraction=extraction,
)
except Exception as e:
logging.exception("[ShadowBot] LLM extraction failed: %s", e)
# Return fallback extraction
return ExtractionResult(
extraction_type="unclear",
who=[],
what="",
when="",
where="",
confidence=0.0,
needs_confirmation=True,
raw_extraction={"error": str(e)},
)
# -----------------------------------------------------------------------
# Entity Detection
# -----------------------------------------------------------------------
def _detect_entities(
self,
text: str,
extraction: ExtractionResult
) -> Tuple[List[str], List[str]]:
"""Detect and normalize family member entities.
Returns: (raw_entities, normalized_entities)
"""
text_lower = text.lower()
raw_detected = []
normalized = []
# Check for each family member's aliases
for canonical, aliases in FAMILY_MEMBER_ALIASES.items():
for alias in aliases:
# Use word boundary matching
pattern = r'\b' + re.escape(alias) + r'\b'
if re.search(pattern, text_lower):
raw_detected.append(alias)
if canonical not in normalized:
normalized.append(canonical)
break
# Also check extraction who field
for who in extraction.who:
who_lower = who.lower()
for canonical, aliases in FAMILY_MEMBER_ALIASES.items():
if who_lower in aliases or who_lower == canonical:
if canonical not in normalized:
normalized.append(canonical)
raw_detected.append(who)
break
return raw_detected, normalized
# -----------------------------------------------------------------------
# Event Recall — Admin DM Exhaust Methods
# -----------------------------------------------------------------------
async def _send_admin_dm_event_recall(
    self,
    shadow_msg_id: int,
    extraction_id: int,
    extracted_event: Dict[str, Any],
):
    """Send a DM to the admin about a recalled event that is not on the calendar.

    Called when event-recall validation finds NO_MATCH. Does nothing (beyond a
    warning) when no admin chat is configured. On success, the DM is recorded
    via ``db.update_admin_dm_sent(extraction_id, "event_recall")``. Send
    failures are logged, not raised.

    The message is sent with ``parse_mode="HTML"``, so the interpolated
    family text is HTML-escaped. Otherwise a message containing "&", "<" or
    ">" would make Telegram reject the DM as unparsable markup.

    Args:
        shadow_msg_id: Shadow message id (currently unused in the DM body).
        extraction_id: Extraction row to mark as DM'd.
        extracted_event: Dict with "what" (message text) and "when".
    """
    if not self.admin_chat_id:
        logging.warning("[ShadowBot] No admin_chat_id configured, skipping DM")
        return
    what = html.escape(str(extracted_event.get("what", "Unknown")), quote=False)
    when = html.escape(str(extracted_event.get("when", "Unknown")), quote=False)
    message = "\n".join([
        "🔍 Shadow Mode — Event Recall Not Found",
        f'Message: "{what}"',
        f"When: {when}",
        "Calendar: ❌ No matching event found",
        "The family is discussing an event that does not appear on the calendar.",
        "Would you like me to add it?",
        "⚠️ This is a simulation. Calendar not modified.",
    ])
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            url = f"{self.base_url}/sendMessage"
            resp = await client.post(
                url,
                json={
                    "chat_id": self.admin_chat_id,
                    "text": message,
                    "parse_mode": "HTML",
                },
            )
            resp.raise_for_status()
        # Mark DM as sent in database
        self.db.update_admin_dm_sent(extraction_id, "event_recall")
        logging.info("[ShadowBot] Admin DM sent for event recall NO_MATCH extraction_id=%s", extraction_id)
    except Exception as e:
        logging.exception("[ShadowBot] Failed to send admin DM: %s", e)
# -----------------------------------------------------------------------
# Helper: Extract date from text for event recall bypass
# -----------------------------------------------------------------------
def _extract_date_from_text(self, text: str) -> str:
"""Extract date reference from text for event recall calendar check.
Uses the same patterns as CalendarValidator._parse_date_range
but returns a simple string for fuzzy matching.
"""
text_lower = text.lower()
now = datetime.now()
# Handle relative dates
if "today" in text_lower:
return "today"
elif "tomorrow" in text_lower:
return "tomorrow"
# Try to parse day of week
days = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
for day_name in days:
if day_name in text_lower:
return day_name
# Try to parse "May 3" or "may 3"
month_pattern = r"(january|february|march|april|may|june|july|august|september|october|november|december)\s+(\d{1,2})"
match = re.search(month_pattern, text_lower)
if match:
return f"{match.group(1)} {match.group(2)}"
# Try to parse "5/3" or "05/03"
date_pattern = r"(\d{1,2})[/-](\d{1,2})"
match = re.search(date_pattern, text_lower)
if match:
return f"{match.group(1)}/{match.group(2)}"
# Try to parse military time as hint for "today"
military_pattern = r"\b(0\d{3}|1\d{3}|2[0-3]\d{2})\b"
if re.search(military_pattern, text_lower):
return "today"
# Try to parse AM/PM time as hint for "today"
time_pattern = r"\b(\d{1,2}:\d{2}\s*(AM|PM|am|pm)?|\d{1,2}\s*(AM|PM|am|pm))\b"
if re.search(time_pattern, text_lower):
return "today"
return ""
# -----------------------------------------------------------------------
# Admin DM Exhaust Methods (Enriched Shadow Mode)
# -----------------------------------------------------------------------
async def _send_admin_dm_no_match(
self,
shadow_msg_id: int,
extraction_id: int,
extracted_event: Dict[str, Any],
):
"""Send DM to admin about proposed calendar action (NO_MATCH).
Called when calendar validation finds no matching event.
"""
if not self.admin_chat_id:
logging.warning("[ShadowBot] No admin_chat_id configured, skipping DM")
return
what = extracted_event.get("what", "Unknown")
when = extracted_event.get("when", "Unknown")
who = extracted_event.get("who", [])
who_str = ", ".join(who) if who else "Unknown"
message = f"""🔍 Shadow Mode — Proposed Calendar Action
Message ID: {shadow_msg_id}
Extracted: {what}
When: {when}
Who: {who_str}
Calendar: ❌ No matching event found
I would have added:
📅 "{what}" — {when}
Accurate? [Yes] [No] [Edit]
⚠️ This is a simulation in shadow mode. Calendar not modified.
Reply with /create_event {extraction_id} to create the event on the real calendar."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
url = f"{self.base_url}/sendMessage"
resp = await client.post(
url,
json={
"chat_id": self.admin_chat_id,
"text": message,
"parse_mode": "HTML",
},
)
resp.raise_for_status()
# Also send a compact confirmation message with inline buttons
# (even in shadow mode, we can send buttons for admin interaction)
button_url = f"{self.base_url}/sendMessage"
await client.post(
button_url,
json={
"chat_id": self.admin_chat_id,
"text": f'📅 Create calendar event for "{what}"?',
"parse_mode": "HTML",
"reply_markup": {
"inline_keyboard": [
[
{"text": "✅ Create Event", "callback_data": f"create_event:{extraction_id}"},
{"text": "❌ Skip", "callback_data": f"skip_event:{extraction_id}"},
]
]
},
},
)
except Exception as e:
logging.exception("[ShadowBot] Failed to send admin DM: %s", e)
async def _send_admin_dm_conflict(
self,
shadow_msg_id: int,
extraction_id: int,
extracted_event: Dict[str, Any],
calendar_event: Dict[str, Any],
conflict_description: str,
):
"""Send DM to admin about calendar conflict (CONFLICT).
Called when calendar validation finds a conflicting event.
"""
if not self.admin_chat_id:
logging.warning("[ShadowBot] No admin_chat_id configured, skipping DM")
return
what = extracted_event.get("what", "Unknown")
when = extracted_event.get("when", "Unknown")
cal_summary = calendar_event.get("summary", "Unknown")
cal_start = calendar_event.get("start", {})
cal_time = cal_start.get("dateTime", cal_start.get("date", "Unknown"))
message = f"""🔍 Shadow Mode — Calendar Conflict Detected
Message ID: {shadow_msg_id}
Extracted: {what} at {when}
Calendar: 📅 "{cal_summary}" at {cal_time}
Conflict: {conflict_description}
Which is correct?
[📅 Keep Calendar]
[💬 Use Message]
[❓ I'm Not Sure]
⚠️ This is a simulation. Calendar not modified."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
url = f"{self.base_url}/sendMessage"
resp = await client.post(
url,
json={
"chat_id": self.admin_chat_id,
"text": message,
"parse_mode": "HTML",
},
)
resp.raise_for_status()
# Mark DM as sent in database
self.db.update_admin_dm_sent(extraction_id, "conflict")
logging.info("[ShadowBot] Admin DM sent for CONFLICT extraction_id=%s", extraction_id)
except Exception as e:
logging.exception("[ShadowBot] Failed to send admin DM: %s", e)
# -----------------------------------------------------------------------
# Brain Ingest — POST extraction to staging API
# -----------------------------------------------------------------------
async def _ingest_to_brain(
    self,
    extraction_id: int,
    extraction_result: "ExtractionResult",
    message_text: str,
    sender_name: str,
    sender_username: str,
    chat_title: str,
    msg_id: str,
    calendar_check_status: str = "pending",
    calendar_event_id: str | None = None,
    calendar_event_title: str | None = None,
    fuzzy_match_score: float | None = None,
) -> dict | None:
    """Forward one processed extraction to the staging API's ``/brain/ingest``.

    The JSON body bundles three things: the source message metadata, the
    extraction fields and the calendar-check outcome. It is POSTed to the fixed
    endpoint ``http://127.0.0.1:8001/brain/ingest`` with a 10-second timeout.
    The original notes describe the receiver as the Family Brain (ChromaDB).

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None``. Non-200
        replies, an unreachable API (``httpx.ConnectError``) and any other
        error are logged rather than raised.
    """
    extraction = extraction_result
    body = {
        # Source message metadata.
        "extraction_id": extraction_id,
        "message_id": str(msg_id),
        "sender_name": sender_name,
        "sender_username": sender_username,
        "chat_title": chat_title,
        "message_text": message_text,
        # Extraction fields.
        "extraction_type": extraction.extraction_type,
        "extracted_who": extraction.who,
        "extracted_what": extraction.what,
        "extracted_when": extraction.when,
        "extracted_where": extraction.where,
        "confidence": extraction.confidence,
        "needs_confirmation": extraction.needs_confirmation,
        # Calendar-check outcome.
        "calendar_check_status": calendar_check_status,
        "calendar_event_id": calendar_event_id,
        "calendar_event_title": calendar_event_title,
        "fuzzy_match_score": fuzzy_match_score,
    }
    endpoint = "http://127.0.0.1:8001/brain/ingest"
    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            response = await http.post(endpoint, json=body)
            if response.status_code != 200:
                logging.warning(
                    "[ShadowBot] Brain ingest failed (HTTP %s): %s",
                    response.status_code,
                    response.text[:500],
                )
                return None
            ingest_info = response.json()
            logging.info(
                "[ShadowBot] Brain ingest OK: extraction_id=%s chunks=%s time=%sms",
                extraction_id,
                ingest_info.get("chunks"),
                ingest_info.get("ingest_time_ms"),
            )
            return ingest_info
    except httpx.ConnectError:
        # The staging API is optional; treat "not running" as a soft skip.
        logging.warning(
            "[ShadowBot] Brain ingest skipped — staging API not available "
            "(extraction_id=%s)",
            extraction_id,
        )
        return None
    except Exception as err:
        logging.exception("[ShadowBot] Brain ingest error: %s", err)
        return None
# -----------------------------------------------------------------------
# Verbose Admin DM Methods (Staging Validation)
# -----------------------------------------------------------------------
async def _send_verbose_dm(self, text: str):
"""Send a verbose DM to admin chat. Only used in VERBOSE_ADMIN_DM mode.
These DMs go to admin_chat_id (your private DM), NEVER to the Family
Logistics group. This is the firehose — every pipeline event gets a DM.
"""
if not self.admin_chat_id or not self.verbose_admin_dm:
return
try:
async with httpx.AsyncClient(timeout=10.0) as client:
url = f"{self.base_url}/sendMessage"
resp = await client.post(
url,
json={
"chat_id": self.admin_chat_id,
"text": text,
"parse_mode": "HTML",
},
)
resp.raise_for_status()
except Exception as e:
logging.warning("[ShadowBot] Verbose DM failed: %s", e)
async def _send_verbose_dm_tripwire(
self,
sender_name: str,
text: str,
fired: bool,
score: float,
matches: list,
confidence: str,
):
"""Send tripwire result DM."""
emoji = "🔴" if fired else "⚪"
matches_str = ", ".join(matches) if matches else "none"
msg = (
f"{emoji} Tripwire | {sender_name}\n"
f"\n"
f"\"{text}\"\n"
f"\n"
f"Score: {score:.2f} | {confidence}\n"
f"Matches: {matches_str}\n"
f"Fired: {'YES → Tier 2' if fired else 'NO (dropped)'}"
)
await self._send_verbose_dm(msg)
async def _send_verbose_dm_extraction(
self,
extraction_result: Optional[ExtractionResult],
calendar_status: str = "",
calendar_title: str = "",
calendar_fuzzy: float = 0.0,
brain_ingested: bool = False,
brain_chunks: int = 0,
):
"""Send extraction + calendar + brain result DM."""
if not extraction_result:
await self._send_verbose_dm("⚪ Extraction | No LLM call (tripwire not fired)")
return
who_str = ", ".join(extraction_result.who) if extraction_result.who else "—"
cal_emoji = {"match": "✅", "no_match": "❌", "conflict": "⚠️", "task_reminder": "📋", "pending": "⏳"}.get(calendar_status, "❓")
brain_str = f"🧠 {brain_chunks} chunks" if brain_ingested else "🧠 skipped"
msg = (
f"🔵 Extraction | {extraction_result.extraction_type}\n"
f"\n"
f"What: {extraction_result.what}\n"
f"When: {extraction_result.when or '—'}\n"
f"Who: {who_str}\n"
f"Where: {extraction_result.where or '—'}\n"
f"\n"
f"Confidence: {extraction_result.confidence:.2f}\n"
f"Confirm needed: {'Yes' if extraction_result.needs_confirmation else 'No'}\n"
f"\n"
f"{cal_emoji} Calendar: {calendar_status or 'not checked'}"
)
if calendar_title:
msg += f" → \"{calendar_title}\" (score={calendar_fuzzy:.2f})"
msg += f"\n{brain_str}"
await self._send_verbose_dm(msg)
# -----------------------------------------------------------------------
# Safety: Speak Prevention
# -----------------------------------------------------------------------
async def _send_message(self, *args, **kwargs):
"""SAFETY: Prevent any message sending in shadow mode.
This method should never be called. If called, it logs an error
and returns a failure response.
"""
logging.error("[ShadowBot] ATTEMPTED TO SEND MESSAGE IN SHADOW MODE - BLOCKED")
logging.error("[ShadowBot] Call args: %s kwargs: %s", args, kwargs)
return {"ok": False, "error": "SPEAK_DISABLED_IN_SHADOW_MODE"}
async def _answer_callback(self, *args, **kwargs):
"""SAFETY: Prevent any callback answering."""
logging.error("[ShadowBot] ATTEMPTED TO ANSWER CALLBACK IN SHADOW MODE - BLOCKED")
return {"ok": False, "error": "SPEAK_DISABLED_IN_SHADOW_MODE"}