"""Replay test payloads through memory loops for debugging. Hooks into SQLite Ingress Spooler for test replay. Sovereign: Zero imports from costco_route. """ import sqlite3 import json from pathlib import Path from typing import Optional from icarus.core.memory.engine import lookup_assignment, learn_assignment from icarus.core.ingress_spooler import get_raw_payload def replay_payload(log_id: int, dry_run: bool = False) -> dict: """Replay a payload through the memory system. Args: log_id: ID in ingress spooler database dry_run: If True, preview without saving to memory Returns: Replay results with before/after comparison """ # Fetch payload from spooler payload = get_raw_payload(log_id) if not payload: return {"error": f"Payload {log_id} not found"} # Get log metadata from icarus.core.ingress_spooler import get_log log = get_log(log_id) # Extract pattern from document content content = payload.get("body", "") subject = payload.get("subject", "") pattern = subject or content[:100] # Use subject or first 100 chars # Check memory BEFORE memory_before = lookup_assignment(pattern) # Simulate classification (would call actual pipeline in production) # For replay, we use the inferred members from family_loader from icarus.core.family_loader import get_family_config family_config = get_family_config() inferred = family_config.infer_recipients(content) if inferred: suggested_member = inferred[0]["member_id"] confidence = inferred[0]["confidence"] else: suggested_member = "family" confidence = 0.5 # If not dry_run, save to memory if not dry_run: # Generate document hash import hashlib doc_hash = hashlib.sha256(content.encode()).hexdigest()[:32] learn_assignment( document_hash=doc_hash, member_id=suggested_member, pattern=pattern, notes=f"Replayed from log {log_id}" ) # Check memory AFTER memory_after = lookup_assignment(pattern) return { "log_id": log_id, "source": log.get("source", "unknown"), "pattern": pattern, "suggested_member": suggested_member, "confidence": confidence, "memory_before": memory_before, "memory_after": memory_after, "dry_run": dry_run, "timestamp": log.get("received_at", "unknown"), } def replay_and_learn(log_id: int, correct_member: str) -> dict: """Replay a payload and save the correct assignment. For user calibration: after reviewing a document, the user confirms the correct family member. Args: log_id: ID in ingress spooler database correct_member: Verified family member ID Returns: Confirmation of learning """ payload = get_raw_payload(log_id) if not payload: return {"error": f"Payload {log_id} not found"} content = payload.get("body", "") subject = payload.get("subject", "") pattern = subject or content[:100] # Generate document hash import hashlib doc_hash = hashlib.sha256(content.encode()).hexdigest()[:32] # Learn the assignment learn_assignment( document_hash=doc_hash, member_id=correct_member, pattern=pattern, notes=f"User calibration from log {log_id}" ) return { "status": "learned", "log_id": log_id, "pattern": pattern, "member_id": correct_member, "doc_hash": doc_hash, } def batch_replay(log_ids: list[int], dry_run: bool = False) -> list[dict]: """Replay multiple payloads. Args: log_ids: List of log IDs to replay dry_run: If True, preview without saving Returns: List of replay results """ results = [] for log_id in log_ids: result = replay_payload(log_id, dry_run) results.append(result) return results def compare_replays(log_id: int, member_a: str, member_b: str) -> dict: """Compare two member assignments for the same document. Useful for testing inference accuracy. Args: log_id: ID in ingress spooler database member_a: First member to test member_b: Second member to test Returns: Comparison results """ payload = get_raw_payload(log_id) if not payload: return {"error": f"Payload {log_id} not found"} content = payload.get("body", "") # Check keyword matches for both members from icarus.core.family_loader import get_family_config family_config = get_family_config() member_a_info = family_config.get_member(member_a) member_b_info = family_config.get_member(member_b) # Simple keyword scoring def score_member(content: str, member_id: str) -> int: score = 0 content_lower = content.lower() if member_id in content_lower: score += 10 # Check member-specific keywords keywords = { "sully": ["first grade", "mrs. smith", "sullivan"], "harper": ["pre-k", "ms. johnson", "harper"], "aundrea": ["hospital", "aundrea"], "matt": ["software", "matt"], }.get(member_id, []) for kw in keywords: if kw in content_lower: score += 5 return score score_a = score_member(content, member_a) score_b = score_member(content, member_b) return { "log_id": log_id, "member_a": {"id": member_a, "name": member_a_info["name"], "score": score_a}, "member_b": {"id": member_b, "name": member_b_info["name"], "score": score_b}, "winner": member_a if score_a > score_b else member_b if score_b > score_a else "tie", "confidence": max(score_a, score_b) / (score_a + score_b + 1), }