📄 replay.py 5,887 bytes Apr 26, 2026 📋 Raw

"""Replay test payloads through memory loops for debugging.

Hooks into SQLite Ingress Spooler for test replay.
Sovereign: Zero imports from costco_route.
"""

import sqlite3
import json
from pathlib import Path
from typing import Optional

from icarus.core.memory.engine import lookup_assignment, learn_assignment
from icarus.core.ingress_spooler import get_raw_payload

def replay_payload(log_id: int, dry_run: bool = False) -> dict:
"""Replay a payload through the memory system.

Args:
    log_id: ID in ingress spooler database
    dry_run: If True, preview without saving to memory

Returns:
    Replay results with before/after comparison
"""
# Fetch payload from spooler
payload = get_raw_payload(log_id)
if not payload:
    return {"error": f"Payload {log_id} not found"}

# Get log metadata
from icarus.core.ingress_spooler import get_log
log = get_log(log_id)

# Extract pattern from document content
content = payload.get("body", "")
subject = payload.get("subject", "")
pattern = subject or content[:100]  # Use subject or first 100 chars

# Check memory BEFORE
memory_before = lookup_assignment(pattern)

# Simulate classification (would call actual pipeline in production)
# For replay, we use the inferred members from family_loader
from icarus.core.family_loader import get_family_config
family_config = get_family_config()
inferred = family_config.infer_recipients(content)

if inferred:
    suggested_member = inferred[0]["member_id"]
    confidence = inferred[0]["confidence"]
else:
    suggested_member = "family"
    confidence = 0.5

# If not dry_run, save to memory
if not dry_run:
    # Generate document hash
    import hashlib
    doc_hash = hashlib.sha256(content.encode()).hexdigest()[:32]
    learn_assignment(
        document_hash=doc_hash,
        member_id=suggested_member,
        pattern=pattern,
        notes=f"Replayed from log {log_id}"
    )

# Check memory AFTER
memory_after = lookup_assignment(pattern)

return {
    "log_id": log_id,
    "source": log.get("source", "unknown"),
    "pattern": pattern,
    "suggested_member": suggested_member,
    "confidence": confidence,
    "memory_before": memory_before,
    "memory_after": memory_after,
    "dry_run": dry_run,
    "timestamp": log.get("received_at", "unknown"),
}

def replay_and_learn(log_id: int, correct_member: str) -> dict:
"""Replay a payload and save the correct assignment.

For user calibration: after reviewing a document, the user
confirms the correct family member.

Args:
    log_id: ID in ingress spooler database
    correct_member: Verified family member ID

Returns:
    Confirmation of learning
"""
payload = get_raw_payload(log_id)
if not payload:
    return {"error": f"Payload {log_id} not found"}

content = payload.get("body", "")
subject = payload.get("subject", "")
pattern = subject or content[:100]

# Generate document hash
import hashlib
doc_hash = hashlib.sha256(content.encode()).hexdigest()[:32]

# Learn the assignment
learn_assignment(
    document_hash=doc_hash,
    member_id=correct_member,
    pattern=pattern,
    notes=f"User calibration from log {log_id}"
)

return {
    "status": "learned",
    "log_id": log_id,
    "pattern": pattern,
    "member_id": correct_member,
    "doc_hash": doc_hash,
}

def batch_replay(log_ids: list[int], dry_run: bool = False) -> list[dict]:
"""Replay multiple payloads.

Args:
    log_ids: List of log IDs to replay
    dry_run: If True, preview without saving

Returns:
    List of replay results
"""
results = []
for log_id in log_ids:
    result = replay_payload(log_id, dry_run)
    results.append(result)
return results

def compare_replays(log_id: int, member_a: str, member_b: str) -> dict:
"""Compare two member assignments for the same document.

Useful for testing inference accuracy.

Args:
    log_id: ID in ingress spooler database
    member_a: First member to test
    member_b: Second member to test

Returns:
    Comparison results
"""
payload = get_raw_payload(log_id)
if not payload:
    return {"error": f"Payload {log_id} not found"}

content = payload.get("body", "")

# Check keyword matches for both members
from icarus.core.family_loader import get_family_config
family_config = get_family_config()

member_a_info = family_config.get_member(member_a)
member_b_info = family_config.get_member(member_b)

# Simple keyword scoring
def score_member(content: str, member_id: str) -> int:
    score = 0
    content_lower = content.lower()

    if member_id in content_lower:
        score += 10

    # Check member-specific keywords
    keywords = {
        "sully": ["first grade", "mrs. smith", "sullivan"],
        "harper": ["pre-k", "ms. johnson", "harper"],
        "aundrea": ["hospital", "aundrea"],
        "matt": ["software", "matt"],
    }.get(member_id, [])

    for kw in keywords:
        if kw in content_lower:
            score += 5

    return score

score_a = score_member(content, member_a)
score_b = score_member(content, member_b)

return {
    "log_id": log_id,
    "member_a": {"id": member_a, "name": member_a_info["name"], "score": score_a},
    "member_b": {"id": member_b, "name": member_b_info["name"], "score": score_b},
    "winner": member_a if score_a > score_b else member_b if score_b > score_a else "tie",
    "confidence": max(score_a, score_b) / (score_a + score_b + 1),
}