#!/usr/bin/env python3

"""Replay email — Pull raw payload from spool and reprocess.

Usage:
replay_email.py --id 12 # Replay log ID 12
replay_email.py --list # Show recent logs
replay_email.py --list --limit 10 # Show last 10
replay_email.py --stats # Show spool statistics
replay_email.py --dry-run --id 12 # Preview without replaying
"""

import argparse
import asyncio
import json
import sys
from pathlib import Path

# Add paths for imports so ingress_spooler resolves whether this script is
# run from its own directory or from the repository root.
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

try:
    from ingress_spooler import (
        get_log,
        get_raw_payload,
        list_recent_logs,
        get_stats,
        log_ingress,
    )
except ImportError:
    # Fallback to direct import: resolve the module through the absolute
    # services tree when the local-directory import is unavailable.
    sys.path.insert(0, '/home/hoffmann_admin/.openclaw/workspace/services')
    from icarus.core.ingress_spooler import (
        get_log,
        get_raw_payload,
        list_recent_logs,
        get_stats,
        log_ingress,
    )

# NOTE: process_email_payload would be imported from your pipeline module.
# For now, this is a stub that just logs the replay attempt.

def process_email_payload_stub(payload: dict) -> dict:
    """Placeholder for the real pipeline processor.

    Swap this out by importing the actual processing function; the
    *payload* argument is accepted but currently ignored.
    """
    result = {"status": "stub", "note": "Pipeline integration pending"}
    return result

def print_log_entry(log: dict):
    """Pretty print a log entry, including a short payload preview.

    Args:
        log: Spool row dict with at least 'id', 'received_at' and 'source';
            optional keys: 'replayed_from_id', 'processing_result',
            'error_message', and 'raw_payload' (a JSON string).
    """
    print(f"\n{'='*60}")
    print(f"Log ID: {log['id']}")
    print(f"Received: {log['received_at']}")
    print(f"Source: {log['source']}")
    if log.get('replayed_from_id'):
        print(f"Replayed from: {log['replayed_from_id']}")
    print(f"Result: {log.get('processing_result', 'unknown')}")
    if log.get('error_message'):
        print(f"Error: {log['error_message']}")

    # Show payload preview
    if log.get('raw_payload'):
        try:
            payload = json.loads(log['raw_payload'])
        except json.JSONDecodeError:
            payload = None  # corrupt row: skip the preview rather than crash a --list run
        if isinstance(payload, dict):
            print("\nPayload Preview:")
            if 'from' in payload:
                print(f"  From: {payload['from']}")
            if 'subject' in payload:
                print(f"  Subject: {payload['subject']}")
            if 'attachments' in payload:
                print(f"  Attachments: {len(payload['attachments'])}")
    print('='*60)

async def replay_log(log_id: int, dry_run: bool = False):
    """Replay a specific log entry.

    Fetches the stored raw payload, records the replay attempt as a new
    spool entry, then runs the payload through the (stub) pipeline.

    Args:
        log_id: ID of the spool row to replay.
        dry_run: When True, print the payload instead of replaying it.

    Returns:
        Process exit code: 0 on success, 1 on failure.
    """
    log = get_log(log_id)
    if not log:
        print(f"Error: Log ID {log_id} not found", file=sys.stderr)
        return 1

    payload = get_raw_payload(log_id)
    if not payload:
        print(f"Error: Could not parse payload for log {log_id}", file=sys.stderr)
        return 1

    print(f"Replaying log ID: {log_id}")
    print(f"Original received: {log['received_at']}")
    print(f"Source: {log['source']}")

    if dry_run:
        print("\n--- DRY RUN ---")
        print("Payload that would be processed:")
        print(json.dumps(payload, indent=2, default=str))
        return 0

    # Log this replay attempt so the replay itself is auditable in the spool.
    replay_id = log_ingress(payload, source="replay", replayed_from_id=log_id)
    print(f"Created replay log ID: {replay_id}")

    # Process the payload
    try:
        # TODO: Replace with actual pipeline import
        # from your_module import process_email_payload
        result = process_email_payload_stub(payload)
        print("\nProcessing result:")
        print(json.dumps(result, indent=2, default=str))
        return 0
    except Exception as e:
        # Broad catch is deliberate: report the failure and honor the
        # exit-code contract instead of letting the CLI crash with a traceback.
        print(f"\nProcessing failed: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1

def main():
    """CLI entry point. Returns a process exit code (0 success, 1 failure)."""
    parser = argparse.ArgumentParser(
        description="Replay email payloads from ingress spool"
    )
    parser.add_argument('--id', type=int, help='Log ID to replay')
    parser.add_argument('--list', action='store_true', help='List recent logs')
    parser.add_argument('--limit', type=int, default=20, help='Limit for --list')
    parser.add_argument('--stats', action='store_true', help='Show statistics')
    parser.add_argument('--dry-run', action='store_true', help='Preview without replaying')

    args = parser.parse_args()

    if args.stats:
        stats = get_stats()
        print("\nIngress Spool Statistics:")
        print(f"  Total logs: {stats['total_logs']}")
        print(f"  Today: {stats['today']}")
        print(f"  Failed: {stats['failed']}")
        print(f"  DB path: {stats['db_path']}")
        return 0

    if args.list:
        logs = list_recent_logs(limit=args.limit)
        print(f"\nRecent logs (last {len(logs)}):")
        for log in logs:
            print_log_entry(log)
        return 0

    # Compare against None, not truthiness, so a log ID of 0 is still replayable.
    if args.id is not None:
        return asyncio.run(replay_log(args.id, dry_run=args.dry_run))

    parser.print_help()
    return 1

# Script entry guard: run the CLI only when executed directly, not on import.
if __name__ == '__main__':
    sys.exit(main())