#!/usr/bin/env python3 """Replay email — Pull raw payload from spool and reprocess. Usage: replay_email.py --id 12 # Replay log ID 12 replay_email.py --list # Show recent logs replay_email.py --list --limit 10 # Show last 10 replay_email.py --stats # Show spool statistics replay_email.py --dry-run --id 12 # Preview without replaying """ import argparse import asyncio import json import sys from pathlib import Path # Add paths for imports sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent.parent.parent)) try: from ingress_spooler import ( get_log, get_raw_payload, list_recent_logs, get_stats, log_ingress ) except ImportError: # Fallback to direct import sys.path.insert(0, '/home/hoffmann_admin/.openclaw/workspace/services') from icarus.core.ingress_spooler import ( get_log, get_raw_payload, list_recent_logs, get_stats, log_ingress ) # Note: process_email_payload would be imported from your pipeline module # For now, this is a stub that just logs the replay attempt def process_email_payload_stub(payload: dict) -> dict: """Stub for pipeline processing. Implement by importing actual processor.""" return {"status": "stub", "note": "Pipeline integration pending"} def print_log_entry(log: dict): """Pretty print a log entry.""" print(f"\n{'='*60}") print(f"Log ID: {log['id']}") print(f"Received: {log['received_at']}") print(f"Source: {log['source']}") if log.get('replayed_from_id'): print(f"Replayed from: {log['replayed_from_id']}") print(f"Result: {log.get('processing_result', 'unknown')}") if log.get('error_message'): print(f"Error: {log['error_message']}") # Show payload preview if log.get('raw_payload'): payload = json.loads(log['raw_payload']) print(f"\nPayload Preview:") if 'from' in payload: print(f" From: {payload['from']}") if 'subject' in payload: print(f" Subject: {payload['subject']}") if 'attachments' in payload: print(f" Attachments: {len(payload['attachments'])}") print('='*60) async def replay_log(log_id: int, dry_run: bool = False): """Replay a specific log entry.""" log = get_log(log_id) if not log: print(f"Error: Log ID {log_id} not found", file=sys.stderr) return 1 payload = get_raw_payload(log_id) if not payload: print(f"Error: Could not parse payload for log {log_id}", file=sys.stderr) return 1 print(f"Replaying log ID: {log_id}") print(f"Original received: {log['received_at']}") print(f"Source: {log['source']}") if dry_run: print("\n--- DRY RUN ---") print("Payload that would be processed:") print(json.dumps(payload, indent=2, default=str)) return 0 # Log this replay attempt replay_id = log_ingress(payload, source="replay", replayed_from_id=log_id) print(f"Created replay log ID: {replay_id}") # Process the payload try: # TODO: Replace with actual pipeline import # from your_module import process_email_payload result = process_email_payload_stub(payload) print(f"\nProcessing result:") print(json.dumps(result, indent=2, default=str)) return 0 except Exception as e: print(f"\nProcessing failed: {e}", file=sys.stderr) import traceback traceback.print_exc() return 1 def main(): parser = argparse.ArgumentParser( description="Replay email payloads from ingress spool" ) parser.add_argument('--id', type=int, help='Log ID to replay') parser.add_argument('--list', action='store_true', help='List recent logs') parser.add_argument('--limit', type=int, default=20, help='Limit for --list') parser.add_argument('--stats', action='store_true', help='Show statistics') parser.add_argument('--dry-run', action='store_true', help='Preview without replaying') args = parser.parse_args() if args.stats: stats = get_stats() print("\nIngress Spool Statistics:") print(f" Total logs: {stats['total_logs']}") print(f" Today: {stats['today']}") print(f" Failed: {stats['failed']}") print(f" DB path: {stats['db_path']}") return 0 if args.list: logs = list_recent_logs(limit=args.limit) print(f"\nRecent logs (last {len(logs)}):") for log in logs: print_log_entry(log) return 0 if args.id: return asyncio.run(replay_log(args.id, dry_run=args.dry_run)) parser.print_help() return 1 if __name__ == '__main__': sys.exit(main())