"""IMAP Proxy - Zero-config email ingress for Icarus. Phase 6.2: IMAP IDLE client for iCloud Mail (and other providers). - Outbound-only connection (appliance-initiated) - App-specific password support - IDLE monitoring for real-time email arrival - Integrates with existing document pipeline """ import asyncio import email import hashlib import imaplib import json import logging import os import threading import time from dataclasses import dataclass, field from datetime import datetime, timezone from email.header import decode_header from email.utils import parsedate_to_datetime from pathlib import Path from typing import Callable, Dict, List, Optional, Any logger = logging.getLogger(__name__) # HTML stripping (reused from family_assistant) from family_assistant.email_fetcher import html_to_text # --------------------------------------------------------------------------- # Provider presets # --------------------------------------------------------------------------- PROVIDERS = { "icloud": { "host": "imap.mail.me.com", "port": 993, "label": "iCloud Mail", }, "gmail": { "host": "imap.gmail.com", "port": 993, "label": "Gmail", }, "outlook": { "host": "outlook.office365.com", "port": 993, "label": "Outlook", }, "yahoo": { "host": "imap.mail.yahoo.com", "port": 993, "label": "Yahoo", }, } @dataclass class IMAPConfig: """Configuration for a single IMAP account.""" provider: str = "icloud" host: str = "" port: int = 993 username: str = "" password: str = "" # App-specific password mailbox: str = "INBOX" idle_timeout: int = 60 * 5 # 5 min IDLE refresh poll_interval: int = 30 # fallback poll if IDLE unsupported enabled: bool = True def __post_init__(self): if not self.host and self.provider in PROVIDERS: preset = PROVIDERS[self.provider] self.host = preset["host"] self.port = preset["port"] @classmethod def from_env(cls, prefix: str = "IMAP") -> "IMAPConfig": """Load config from environment variables. Expected vars: {PREFIX}_PROVIDER, {PREFIX}_USER, {PREFIX}_PASSWORD, optional: {PREFIX}_HOST, {PREFIX}_PORT, {PREFIX}_MAILBOX, {PREFIX}_ENABLED. """ provider = os.getenv(f"{prefix}_PROVIDER", "icloud") return cls( provider=provider, host=os.getenv(f"{prefix}_HOST", ""), port=int(os.getenv(f"{prefix}_PORT", "993")), username=os.getenv(f"{prefix}_USER", ""), password=os.getenv(f"{prefix}_PASSWORD", ""), mailbox=os.getenv(f"{prefix}_MAILBOX", "INBOX"), enabled=os.getenv(f"{prefix}_ENABLED", "true").lower() == "true", ) def decode_str(s): """Decode an email header value.""" if s is None: return "" decoded = decode_header(s) parts = [] for data, charset in decoded: if isinstance(data, bytes): parts.append(data.decode(charset or "utf-8", errors="replace")) else: parts.append(data) return "".join(parts) def get_body(msg) -> str: """Extract text body from email message.""" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) charset = part.get_content_charset() or "utf-8" if payload is None: return "" if isinstance(payload, bytes): return payload.decode(charset, errors="replace") elif isinstance(payload, int): return str(payload) else: return str(payload) for part in msg.walk(): if part.get_content_type() == "text/html": payload = part.get_payload(decode=True) charset = part.get_content_charset() or "utf-8" if payload is None: return "" if isinstance(payload, bytes): html = payload.decode(charset, errors="replace") elif isinstance(payload, int): html = str(payload) else: html = str(payload) return html_to_text(html) else: payload = msg.get_payload(decode=True) charset = msg.get_content_charset() or "utf-8" if payload is None: return "" if isinstance(payload, bytes): text = payload.decode(charset, errors="replace") elif isinstance(payload, int): text = str(payload) else: text = str(payload) if msg.get_content_type() == "text/html": return html_to_text(text) return text class IMAPProxy: """IMAP IDLE proxy for zero-config email ingress. Connects outbound to IMAP server, monitors for new mail via IDLE, and dispatches to a handler callback for pipeline processing. """ def __init__( self, config: IMAPConfig, handler: Optional[Callable[[Dict], None]] = None, metrics_callback: Optional[Callable] = None, ): self.config = config self.handler = handler self.metrics_callback = metrics_callback self._connection: Optional[imaplib.IMAP4_SSL] = None self._running = False self._thread: Optional[threading.Thread] = None self._last_error: Optional[str] = None self._connected_at: Optional[datetime] = None self._emails_fetched: int = 0 self._last_fetch_at: Optional[datetime] = None self._status: str = "disconnected" self._lock = threading.Lock() # Circuit breaker state self._auth_failures: int = 0 self._consecutive_errors: int = 0 self._last_alert_at: Optional[datetime] = None self._alert_cooldown_minutes: int = 30 # Don't spam alerts # Deduplication: track processed message IDs self._processed_ids_file = Path("/tmp/icarus-imap-processed.json") self._processed_ids: set = self._load_processed_ids() def _load_processed_ids(self) -> set: """Load set of already-processed message IDs from disk.""" try: if self._processed_ids_file.exists(): data = json.loads(self._processed_ids_file.read_text()) return set(data.get("processed_ids", [])) except Exception as e: logger.warning(f"Could not load processed IDs: {e}") return set() def _save_processed_id(self, msg_id: str): """Save a message ID as processed to disk.""" self._processed_ids.add(msg_id) try: self._processed_ids_file.write_text(json.dumps({ "processed_ids": list(self._processed_ids), "last_updated": datetime.now(timezone.utc).isoformat(), }, indent=2)) except Exception as e: logger.warning(f"Could not save processed ID: {e}") def _is_already_processed(self, msg_id: str) -> bool: """Check if a message ID has already been processed.""" return msg_id in self._processed_ids def _alert_on_failure(self, error_type: str, error_msg: str): """Send Telegram alert on persistent failures (circuit breaker).""" try: # Check cooldown if self._last_alert_at: minutes_since = (datetime.now(timezone.utc) - self._last_alert_at).total_seconds() / 60 if minutes_since < self._alert_cooldown_minutes: return # Still in cooldown # Only alert on repeated failures if self._consecutive_errors < 3 and error_type not in ["auth_error"]: return # Import here to avoid circular deps from shared.notify import TelegramNotifier telegram = TelegramNotifier() # Async send in sync context import asyncio async def _send_alert(): msg = f"🚨 IMAP Proxy Alert\n\n" msg += f"Error Type: {error_type}\n" msg += f"Account: {self.config.username}\n" msg += f"Provider: {self.config.provider}\n" msg += f"Consecutive Errors: {self._consecutive_errors}\n\n" msg += f"Error: {error_msg[:200]}\n\n" if error_type == "auth_error": msg += "🔑 Action Required: Check app-specific password" else: msg += "🔧 Action: Check IMAP proxy status" await telegram.to_matt(msg) try: asyncio.run(_send_alert()) self._last_alert_at = datetime.now(timezone.utc) logger.warning(f"Sent IMAP alert to Matt: {error_type}") except Exception as e: logger.error(f"Failed to send alert: {e}") except Exception as e: logger.error(f"Alert system failed: {e}") # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def start(self): """Start the IMAP IDLE listener in a background thread.""" if self._running: logger.warning("IMAP proxy already running") return self._running = True self._thread = threading.Thread(target=self._run_loop, daemon=True) self._thread.start() logger.info(f"IMAP proxy started for {self.config.username} @ {self.config.host}") def startup_sync(self, limit: int = 10) -> int: """Sync recent emails on startup to catch any that arrived while offline. This fetches the last N emails (regardless of read status) and processes any that haven't been seen yet. Used to ensure forwarded emails (which arrive as SEEN from iPhone) are properly handled. Returns: Number of new emails processed. """ if not self._connect(): logger.error("Failed to connect for startup sync") return 0 logger.info(f"Running startup sync: checking last {limit} emails...") self._fetch_all_messages(limit=limit) # Return number of new emails found conn = self._connection if conn: try: conn.select(self.config.mailbox) status, msg_ids = conn.search(None, "ALL") if status == "OK" and msg_ids[0]: id_list = msg_ids[0].split() recent_ids = id_list[-limit:] if len(id_list) > limit else id_list new_count = sum(1 for msg_id in recent_ids if not self._is_already_processed( msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id) )) logger.info(f"Startup sync complete. {new_count} unprocessed emails in last {len(recent_ids)} messages.") return new_count except Exception as e: logger.warning(f"Error during startup sync check: {e}") return 0 def stop(self): """Gracefully stop the IDLE listener.""" self._running = False if self._connection: try: self._connection.close() except Exception: pass self._status = "stopped" logger.info("IMAP proxy stopped") def get_status(self) -> Dict[str, Any]: """Return current proxy status for dashboard.""" uptime = None if self._connected_at: uptime = (datetime.now(timezone.utc) - self._connected_at).total_seconds() provider_label = PROVIDERS.get(self.config.provider, {}).get("label", self.config.provider) return { "provider": self.config.provider, "provider_label": provider_label, "host": self.config.host, "username": self.config.username, "status": self._status, "connected_at": self._connected_at.isoformat() if self._connected_at else None, "uptime_seconds": uptime, "emails_fetched": self._emails_fetched, "last_fetch_at": self._last_fetch_at.isoformat() if self._last_fetch_at else None, "last_error": self._last_error, "enabled": self.config.enabled, "idle_timeout": self.config.idle_timeout, } # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _connect(self) -> bool: """Establish IMAP connection.""" try: self._connection = imaplib.IMAP4_SSL(self.config.host, self.config.port) self._connection.login(self.config.username, self.config.password) self._connection.select(self.config.mailbox) self._connected_at = datetime.now(timezone.utc) self._status = "connected" self._last_error = None self._consecutive_errors = 0 # Reset on success logger.info(f"Connected to {self.config.host} as {self.config.username}") return True except imaplib.IMAP4.error as e: self._status = "auth_error" self._last_error = f"Auth failed: {e}" self._auth_failures += 1 self._consecutive_errors += 1 logger.error(f"IMAP auth error: {e}") # Alert on auth errors immediately self._alert_on_failure("auth_error", str(e)) return False except Exception as e: self._status = "connection_error" self._last_error = str(e) self._consecutive_errors += 1 logger.error(f"IMAP connection error: {e}") # Alert on repeated connection errors self._alert_on_failure("connection_error", str(e)) return False def _run_loop(self): """Main IDLE loop — runs in background thread.""" while self._running: if not self._connect(): # Auth errors need human intervention; connection errors retry if self._status == "auth_error": logger.error("Auth error — stopping proxy. Check app-specific password.") self._running = False break time.sleep(30) continue try: self._idle_loop() except Exception as e: self._status = "error" self._last_error = str(e) self._consecutive_errors += 1 logger.error(f"IDLE loop error: {e}") self._alert_on_failure("idle_error", str(e)) time.sleep(10) def _idle_loop(self): """Monitor for new emails via IMAP IDLE.""" conn = self._connection if not conn: return while self._running: # Check for ALL messages (not just unseen) — handles forwarded emails self._fetch_all_messages() # Enter IDLE mode try: conn.select(self.config.mailbox) conn.send(b"%s IDLE\r\n" % conn._new_tag().encode()) # Wait for IDLE responses start = time.time() while self._running and (time.time() - start) < self.config.idle_timeout: # Check for new data (non-blocking read with timeout) conn.socket().settimeout(5) try: data = conn.readline() if b"EXISTS" in data or b"RECENT" in data: # New mail! Break out of IDLE to process conn.send(b"DONE\r\n") conn.readline() # consume IDLE terminated break except socket.timeout: continue except Exception: break # Refresh IDLE if timeout reached if self._running: try: conn.send(b"DONE\r\n") conn.readline() except Exception: pass except Exception as e: logger.debug(f"IDLE cycle: {e}") # Reconnect if needed if not self._check_connection(): return def _fetch_all_messages(self, limit: int = 50): """Fetch and process ALL emails (not just unseen), with deduplication. This handles forwarded emails that arrive as SEEN (already read). Uses message ID hashing to avoid reprocessing the same email. """ conn = self._connection if not conn: return try: conn.select(self.config.mailbox) # Fetch ALL messages (both seen and unseen) status, msg_ids = conn.search(None, "ALL") if status != "OK" or not msg_ids[0]: return id_list = msg_ids[0].split() # Process most recent messages first, limit to avoid overload id_list = id_list[-limit:] if len(id_list) > limit else id_list processed_count = 0 for msg_id in id_list: # Handle both bytes and int message IDs if isinstance(msg_id, bytes): msg_id_str = msg_id.decode() elif isinstance(msg_id, int): msg_id_str = str(msg_id) else: msg_id_str = str(msg_id) # Deduplication: skip if already processed if self._is_already_processed(msg_id_str): continue status, msg_data = conn.fetch(msg_id_str, "(RFC822)") if status != "OK": continue raw = msg_data[0][1] msg = email.message_from_bytes(raw) # Generate unique hash for this email subject = decode_str(msg.get("Subject", "")) sender = decode_str(msg.get("From", "")) date = msg.get("Date", "") body_preview = get_body(msg)[:500] # First 500 chars for hash unique_key = hashlib.sha256( f"{subject}|{sender}|{date}|{body_preview}".encode() ).hexdigest() # Skip if already processed (double-check with content hash) if self._is_already_processed(unique_key): # Also mark the msg_id as processed so we skip it next time self._save_processed_id(msg_id_str) continue email_data = { "id": msg_id_str, "message_hash": unique_key, "from": sender, "to": decode_str(msg.get("To")), "subject": subject, "date": date, "body": get_body(msg).strip(), "provider": self.config.provider, "fetched_at": datetime.now(timezone.utc).isoformat(), } # Dispatch to handler if self.handler: try: self.handler(email_data) except Exception as e: logger.error(f"Handler error for email {msg_id}: {e}") self._consecutive_errors += 1 self._alert_on_failure("handler_error", f"Handler failed for email {msg_id}: {e}") # Mark as seen (so we don't re-fetch as "new" on next cycle) # Encode msg_id_str back to bytes for IMAP command if needed if isinstance(msg_id, bytes): store_msg_id = msg_id else: store_msg_id = msg_id_str.encode() conn.store(store_msg_id, "+FLAGS", "\\Seen") # Save both IDs to dedupe file self._save_processed_id(msg_id_str) self._save_processed_id(unique_key) with self._lock: self._emails_fetched += 1 self._last_fetch_at = datetime.now(timezone.utc) processed_count += 1 if self.metrics_callback: try: self.metrics_callback(email_data) except Exception: pass if processed_count > 0: logger.info(f"Processed {processed_count} new email(s) from {limit} checked") except Exception as e: logger.error(f"Fetch error: {e}") def _check_connection(self) -> bool: """Verify IMAP connection is alive.""" try: if self._connection: self._connection.noop() return True except Exception: pass self._status = "disconnected" return False # --------------------------------------------------------------------------- # Dashboard metrics module # --------------------------------------------------------------------------- _METRICS_FILE = Path("/tmp/icarus-imap-metrics.json") _recent_emails: List[Dict] = [] _MAX_RECENT = 50 def record_email_processed(email_data: Dict): """Record a processed email for dashboard metrics.""" global _recent_emails entry = { "subject": email_data.get("subject", "(no subject)")[:80], "from": email_data.get("from", "unknown"), "provider": email_data.get("provider", "unknown"), "fetched_at": email_data.get("fetched_at", datetime.now(timezone.utc).isoformat()), } _recent_emails.append(entry) if len(_recent_emails) > _MAX_RECENT: _recent_emails = _recent_emails[-_MAX_RECENT:] # Persist to file for cross-process dashboard reads try: _METRICS_FILE.write_text(json.dumps({ "recent_emails": _recent_emails, "total_processed": len(_recent_emails), "last_updated": datetime.now(timezone.utc).isoformat(), }, indent=2)) except Exception: pass def get_metrics() -> Dict: """Get IMAP metrics for dashboard.""" try: if _METRICS_FILE.exists(): return json.loads(_METRICS_FILE.read_text()) except Exception: pass return { "recent_emails": _recent_emails, "total_processed": len(_recent_emails), "last_updated": datetime.now(timezone.utc).isoformat(), } # Socket import for IDLE timeout import socket import hashlib