#!/usr/bin/env python3
"""Replay email — Pull raw payload from spool and reprocess.
Usage:
replay_email.py --id 12 # Replay log ID 12
replay_email.py --list # Show recent logs
replay_email.py --list --limit 10 # Show last 10
replay_email.py --stats # Show spool statistics
replay_email.py --dry-run --id 12 # Preview without replaying
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path
# Add paths for imports: this script's own directory and the directory two
# levels above it (presumably the package root; confirm against the repo
# layout), so the ingress_spooler import below can resolve either way.
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
try:
from ingress_spooler import (
get_log,
get_raw_payload,
list_recent_logs,
get_stats,
log_ingress
)
except ImportError:
# Fallback to direct import
sys.path.insert(0, '/home/hoffmann_admin/.openclaw/workspace/services')
from icarus.core.ingress_spooler import (
get_log,
get_raw_payload,
list_recent_logs,
get_stats,
log_ingress
)
# Note: process_email_payload would be imported from your pipeline module.
# For now, a stub stands in for it and returns a fixed placeholder result.
def process_email_payload_stub(payload: dict) -> dict:
    """Placeholder for the real pipeline processor.

    The payload is accepted but not inspected; a fixed marker result is
    returned so callers can tell that no real processing took place.
    """
    placeholder = dict(status="stub", note="Pipeline integration pending")
    return placeholder
def print_log_entry(log: dict):
    """Pretty-print one ingress log entry to stdout.

    Prints the ID, receive time and source, then the optional replay origin,
    processing result and error message, followed by a short preview of the
    stored payload when one is present.

    Args:
        log: Mapping that must contain ``id``, ``received_at`` and
            ``source``. ``replayed_from_id``, ``processing_result``,
            ``error_message`` and ``raw_payload`` (JSON text) are optional.

    Raises:
        KeyError: If one of the three required keys is missing.
    """
    print(f"\n{'='*60}")
    print(f"Log ID: {log['id']}")
    print(f"Received: {log['received_at']}")
    print(f"Source: {log['source']}")
    if log.get('replayed_from_id'):
        print(f"Replayed from: {log['replayed_from_id']}")
    print(f"Result: {log.get('processing_result', 'unknown')}")
    if log.get('error_message'):
        print(f"Error: {log['error_message']}")

    # Show payload preview
    raw = log.get('raw_payload')
    if raw:
        try:
            payload = json.loads(raw)
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError. A single corrupt spool
            # entry must not abort the listing of all the other entries.
            payload = None
            print(f"\nPayload Preview: <unparseable: {exc}>")
        else:
            print("\nPayload Preview:")

        # Only a JSON object has the named fields previewed below.
        if isinstance(payload, dict):
            if 'from' in payload:
                print(f" From: {payload['from']}")
            if 'subject' in payload:
                print(f" Subject: {payload['subject']}")
            if 'attachments' in payload:
                # Tolerate an explicit null attachments field.
                print(f" Attachments: {len(payload['attachments'] or [])}")
    print('='*60)
async def replay_log(log_id: int, dry_run: bool = False):
    """Re-run one spooled entry through the (currently stubbed) processor.

    Looks up the log row and its payload, prints a short header, and then
    either dumps the payload (dry run) or records a new ingress log with
    source ``"replay"`` and hands the payload to the processing stub.

    Args:
        log_id: ID of the spooled log entry to replay.
        dry_run: When true, only print the payload; nothing is logged or
            processed.

    Returns:
        0 on success or after a dry run; 1 when the entry or its payload is
        missing, or when processing raises.
    """
    entry = get_log(log_id)
    if not entry:
        print(f"Error: Log ID {log_id} not found", file=sys.stderr)
        return 1

    body = get_raw_payload(log_id)
    if not body:
        print(f"Error: Could not parse payload for log {log_id}", file=sys.stderr)
        return 1

    print(f"Replaying log ID: {log_id}")
    print(f"Original received: {entry['received_at']}")
    print(f"Source: {entry['source']}")

    if dry_run:
        print("\n--- DRY RUN ---")
        print("Payload that would be processed:")
        print(json.dumps(body, indent=2, default=str))
        return 0

    # Record the replay attempt before processing so it is traceable even
    # if the processor fails.
    new_log_id = log_ingress(body, source="replay", replayed_from_id=log_id)
    print(f"Created replay log ID: {new_log_id}")

    # Run the payload through the processor.
    try:
        # TODO: Replace with actual pipeline import
        # from your_module import process_email_payload
        outcome = process_email_payload_stub(body)
        print("\nProcessing result:")
        print(json.dumps(outcome, indent=2, default=str))
    except Exception as exc:
        print(f"\nProcessing failed: {exc}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1
    return 0
def main(argv=None):
    """Parse command-line options and dispatch to the requested action.

    Exactly one action runs, checked in this order: ``--stats``, ``--list``,
    ``--id``. With none of them, the help text is printed.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so existing ``main()`` callers are
            unaffected.

    Returns:
        Process exit status: 0 for ``--stats`` and ``--list``, the result of
        ``replay_log`` for ``--id``, and 1 when no action was requested.
    """
    parser = argparse.ArgumentParser(
        description="Replay email payloads from ingress spool"
    )
    parser.add_argument('--id', type=int, help='Log ID to replay')
    parser.add_argument('--list', action='store_true', help='List recent logs')
    parser.add_argument('--limit', type=int, default=20, help='Limit for --list')
    parser.add_argument('--stats', action='store_true', help='Show statistics')
    parser.add_argument('--dry-run', action='store_true', help='Preview without replaying')
    args = parser.parse_args(argv)

    if args.stats:
        stats = get_stats()
        print("\nIngress Spool Statistics:")
        print(f" Total logs: {stats['total_logs']}")
        print(f" Today: {stats['today']}")
        print(f" Failed: {stats['failed']}")
        print(f" DB path: {stats['db_path']}")
        return 0

    if args.list:
        logs = list_recent_logs(limit=args.limit)
        print(f"\nRecent logs (last {len(logs)}):")
        for log in logs:
            print_log_entry(log)
        return 0

    # Compare against None rather than testing truthiness: "--id 0" is an
    # explicit request and must not fall through to the help text.
    if args.id is not None:
        return asyncio.run(replay_log(args.id, dry_run=args.dry_run))

    parser.print_help()
    return 1
# Run the CLI only when executed as a script, not when imported, and pass
# main()'s return value on as the process exit status.
if __name__ == '__main__':
    sys.exit(main())