"""Replay test payloads through memory loops for debugging.
Hooks into SQLite Ingress Spooler for test replay.
Sovereign: Zero imports from costco_route.
"""
import sqlite3
import json
from pathlib import Path
from typing import Optional
from icarus.core.memory.engine import lookup_assignment, learn_assignment
from icarus.core.ingress_spooler import get_raw_payload
def replay_payload(log_id: int, dry_run: bool = False) -> dict:
"""Replay a payload through the memory system.
Args:
log_id: ID in ingress spooler database
dry_run: If True, preview without saving to memory
Returns:
Replay results with before/after comparison
"""
# Fetch payload from spooler
payload = get_raw_payload(log_id)
if not payload:
return {"error": f"Payload {log_id} not found"}
# Get log metadata
from icarus.core.ingress_spooler import get_log
log = get_log(log_id)
# Extract pattern from document content
content = payload.get("body", "")
subject = payload.get("subject", "")
pattern = subject or content[:100] # Use subject or first 100 chars
# Check memory BEFORE
memory_before = lookup_assignment(pattern)
# Simulate classification (would call actual pipeline in production)
# For replay, we use the inferred members from family_loader
from icarus.core.family_loader import get_family_config
family_config = get_family_config()
inferred = family_config.infer_recipients(content)
if inferred:
suggested_member = inferred[0]["member_id"]
confidence = inferred[0]["confidence"]
else:
suggested_member = "family"
confidence = 0.5
# If not dry_run, save to memory
if not dry_run:
# Generate document hash
import hashlib
doc_hash = hashlib.sha256(content.encode()).hexdigest()[:32]
learn_assignment(
document_hash=doc_hash,
member_id=suggested_member,
pattern=pattern,
notes=f"Replayed from log {log_id}"
)
# Check memory AFTER
memory_after = lookup_assignment(pattern)
return {
"log_id": log_id,
"source": log.get("source", "unknown"),
"pattern": pattern,
"suggested_member": suggested_member,
"confidence": confidence,
"memory_before": memory_before,
"memory_after": memory_after,
"dry_run": dry_run,
"timestamp": log.get("received_at", "unknown"),
}
def replay_and_learn(log_id: int, correct_member: str) -> dict:
"""Replay a payload and save the correct assignment.
For user calibration: after reviewing a document, the user
confirms the correct family member.
Args:
log_id: ID in ingress spooler database
correct_member: Verified family member ID
Returns:
Confirmation of learning
"""
payload = get_raw_payload(log_id)
if not payload:
return {"error": f"Payload {log_id} not found"}
content = payload.get("body", "")
subject = payload.get("subject", "")
pattern = subject or content[:100]
# Generate document hash
import hashlib
doc_hash = hashlib.sha256(content.encode()).hexdigest()[:32]
# Learn the assignment
learn_assignment(
document_hash=doc_hash,
member_id=correct_member,
pattern=pattern,
notes=f"User calibration from log {log_id}"
)
return {
"status": "learned",
"log_id": log_id,
"pattern": pattern,
"member_id": correct_member,
"doc_hash": doc_hash,
}
def batch_replay(log_ids: list[int], dry_run: bool = False) -> list[dict]:
"""Replay multiple payloads.
Args:
log_ids: List of log IDs to replay
dry_run: If True, preview without saving
Returns:
List of replay results
"""
results = []
for log_id in log_ids:
result = replay_payload(log_id, dry_run)
results.append(result)
return results
def compare_replays(log_id: int, member_a: str, member_b: str) -> dict:
"""Compare two member assignments for the same document.
Useful for testing inference accuracy.
Args:
log_id: ID in ingress spooler database
member_a: First member to test
member_b: Second member to test
Returns:
Comparison results
"""
payload = get_raw_payload(log_id)
if not payload:
return {"error": f"Payload {log_id} not found"}
content = payload.get("body", "")
# Check keyword matches for both members
from icarus.core.family_loader import get_family_config
family_config = get_family_config()
member_a_info = family_config.get_member(member_a)
member_b_info = family_config.get_member(member_b)
# Simple keyword scoring
def score_member(content: str, member_id: str) -> int:
score = 0
content_lower = content.lower()
if member_id in content_lower:
score += 10
# Check member-specific keywords
keywords = {
"sully": ["first grade", "mrs. smith", "sullivan"],
"harper": ["pre-k", "ms. johnson", "harper"],
"aundrea": ["hospital", "aundrea"],
"matt": ["software", "matt"],
}.get(member_id, [])
for kw in keywords:
if kw in content_lower:
score += 5
return score
score_a = score_member(content, member_a)
score_b = score_member(content, member_b)
return {
"log_id": log_id,
"member_a": {"id": member_a, "name": member_a_info["name"], "score": score_a},
"member_b": {"id": member_b, "name": member_b_info["name"], "score": score_b},
"winner": member_a if score_a > score_b else member_b if score_b > score_a else "tie",
"confidence": max(score_a, score_b) / (score_a + score_b + 1),
}