# proxy.py

"""IMAP Proxy - Zero-config email ingress for Icarus.

Phase 6.2: IMAP IDLE client for iCloud Mail (and other providers).
- Outbound-only connection (appliance-initiated)
- App-specific password support
- IDLE monitoring for real-time email arrival
- Integrates with existing document pipeline
"""

import asyncio
import email
import hashlib
import imaplib
import json
import logging
import os
import socket
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from email.header import decode_header
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional, Any

# Module-level logger; `name` was undefined — use the module's import path.
logger = logging.getLogger(__name__)

# HTML stripping (reused from family_assistant)

from family_assistant.email_fetcher import html_to_text

# ---------------------------------------------------------------------------
# Provider presets
# ---------------------------------------------------------------------------

# Preset host/label pairs for common consumer IMAP providers.
# All of them speak implicit-SSL IMAP on port 993.
PROVIDERS = {
    key: {"host": host, "port": 993, "label": label}
    for key, host, label in (
        ("icloud", "imap.mail.me.com", "iCloud Mail"),
        ("gmail", "imap.gmail.com", "Gmail"),
        ("outlook", "outlook.office365.com", "Outlook"),
        ("yahoo", "imap.mail.yahoo.com", "Yahoo"),
    )
}

@dataclass
class IMAPConfig:
    """Configuration for a single IMAP account."""

    provider: str = "icloud"
    host: str = ""
    port: int = 993
    username: str = ""
    password: str = ""  # App-specific password
    mailbox: str = "INBOX"
    idle_timeout: int = 60 * 5  # 5 min IDLE refresh
    poll_interval: int = 30  # fallback poll if IDLE unsupported
    enabled: bool = True

    def __post_init__(self):
        # Fill host/port from the provider preset when no explicit host given.
        if not self.host and self.provider in PROVIDERS:
            preset = PROVIDERS[self.provider]
            self.host = preset["host"]
            self.port = preset["port"]

    @classmethod
    def from_env(cls, prefix: str = "IMAP") -> "IMAPConfig":
        """Load config from environment variables.

        Expected vars: {PREFIX}_PROVIDER, {PREFIX}_USER, {PREFIX}_PASSWORD,
        optional: {PREFIX}_HOST, {PREFIX}_PORT, {PREFIX}_MAILBOX, {PREFIX}_ENABLED.
        """
        provider = os.getenv(f"{prefix}_PROVIDER", "icloud")
        return cls(
            provider=provider,
            host=os.getenv(f"{prefix}_HOST", ""),
            port=int(os.getenv(f"{prefix}_PORT", "993")),
            username=os.getenv(f"{prefix}_USER", ""),
            password=os.getenv(f"{prefix}_PASSWORD", ""),
            mailbox=os.getenv(f"{prefix}_MAILBOX", "INBOX"),
            enabled=os.getenv(f"{prefix}_ENABLED", "true").lower() == "true",
        )

def decode_str(s):
    """Decode a (possibly RFC 2047-encoded) email header value to text.

    Returns "" for None. Undecodable bytes are replaced rather than raised.
    """
    if s is None:
        return ""
    parts = []
    for data, charset in decode_header(s):
        if isinstance(data, bytes):
            # Fall back to UTF-8 when the header does not declare a charset.
            parts.append(data.decode(charset or "utf-8", errors="replace"))
        else:
            parts.append(data)
    return "".join(parts)

def get_body(msg) -> str:
    """Extract a text body from an email.message.Message.

    Preference order: first text/plain part, then first text/html part
    (stripped to text via html_to_text), then the single-part payload.
    Returns "" when no decodable payload is found.
    """
    if msg.is_multipart():
        # Prefer a plain-text part.
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                payload = part.get_payload(decode=True)
                charset = part.get_content_charset() or "utf-8"
                if payload is None:
                    return ""
                if isinstance(payload, bytes):
                    return payload.decode(charset, errors="replace")
                # Non-bytes payloads (e.g. int) are stringified as-is.
                return str(payload)
        # No plain part — fall back to the first HTML part, stripped to text.
        for part in msg.walk():
            if part.get_content_type() == "text/html":
                payload = part.get_payload(decode=True)
                charset = part.get_content_charset() or "utf-8"
                if payload is None:
                    return ""
                if isinstance(payload, bytes):
                    html = payload.decode(charset, errors="replace")
                else:
                    html = str(payload)
                return html_to_text(html)
        # Multipart with neither text/plain nor text/html (e.g. attachments
        # only): return "" instead of the implicit None the old code produced.
        return ""
    else:
        payload = msg.get_payload(decode=True)
        charset = msg.get_content_charset() or "utf-8"
        if payload is None:
            return ""
        if isinstance(payload, bytes):
            text = payload.decode(charset, errors="replace")
        else:
            text = str(payload)
        if msg.get_content_type() == "text/html":
            return html_to_text(text)
        return text

class IMAPProxy:
    """IMAP IDLE proxy for zero-config email ingress.

    Connects outbound to IMAP server, monitors for new mail via IDLE,
    and dispatches to a handler callback for pipeline processing.
    Runs its connection loop in a daemon thread; see start()/stop().
    """

    def __init__(
        self,
        config: "IMAPConfig",
        handler: Optional[Callable[[Dict], None]] = None,
        metrics_callback: Optional[Callable] = None,
    ):
        self.config = config
        self.handler = handler
        self.metrics_callback = metrics_callback
        self._connection: Optional[imaplib.IMAP4_SSL] = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._last_error: Optional[str] = None
        self._connected_at: Optional[datetime] = None
        self._emails_fetched: int = 0
        self._last_fetch_at: Optional[datetime] = None
        self._status: str = "disconnected"
        self._lock = threading.Lock()
        # Circuit breaker state
        self._auth_failures: int = 0
        self._consecutive_errors: int = 0
        self._last_alert_at: Optional[datetime] = None
        self._alert_cooldown_minutes: int = 30  # Don't spam alerts

        # Deduplication: track processed message IDs across restarts.
        self._processed_ids_file = Path("/tmp/icarus-imap-processed.json")
        self._processed_ids: set = self._load_processed_ids()

    def _load_processed_ids(self) -> set:
        """Load set of already-processed message IDs from disk."""
        try:
            if self._processed_ids_file.exists():
                data = json.loads(self._processed_ids_file.read_text())
                return set(data.get("processed_ids", []))
        except Exception as e:
            # Best-effort: a corrupt/missing file just means we start fresh.
            logger.warning(f"Could not load processed IDs: {e}")
        return set()

    def _save_processed_id(self, msg_id: str):
        """Save a message ID as processed to disk.

        NOTE(review): the persisted set grows without bound — consider
        pruning old entries if the mailbox is large.
        """
        self._processed_ids.add(msg_id)
        try:
            self._processed_ids_file.write_text(json.dumps({
                "processed_ids": list(self._processed_ids),
                "last_updated": datetime.now(timezone.utc).isoformat(),
            }, indent=2))
        except Exception as e:
            logger.warning(f"Could not save processed ID: {e}")

    def _is_already_processed(self, msg_id: str) -> bool:
        """Check if a message ID has already been processed."""
        return msg_id in self._processed_ids

    def _alert_on_failure(self, error_type: str, error_msg: str):
        """Send Telegram alert on persistent failures (circuit breaker).

        Auth errors alert immediately; other error types only after three
        consecutive failures, and never more often than the cooldown allows.
        Never raises — alerting is strictly best-effort.
        """
        try:
            # Check cooldown
            if self._last_alert_at:
                minutes_since = (datetime.now(timezone.utc) - self._last_alert_at).total_seconds() / 60
                if minutes_since < self._alert_cooldown_minutes:
                    return  # Still in cooldown

            # Only alert on repeated failures
            if self._consecutive_errors < 3 and error_type not in ["auth_error"]:
                return

            # Import here to avoid circular deps
            from shared.notify import TelegramNotifier
            telegram = TelegramNotifier()

            async def _send_alert():
                msg = f"🚨 <b>IMAP Proxy Alert</b>\n\n"
                msg += f"<b>Error Type:</b> {error_type}\n"
                msg += f"<b>Account:</b> {self.config.username}\n"
                msg += f"<b>Provider:</b> {self.config.provider}\n"
                msg += f"<b>Consecutive Errors:</b> {self._consecutive_errors}\n\n"
                msg += f"<b>Error:</b> {error_msg[:200]}\n\n"

                if error_type == "auth_error":
                    msg += "🔑 <b>Action Required:</b> Check app-specific password"
                else:
                    msg += "🔧 <b>Action:</b> Check IMAP proxy status"

                await telegram.to_matt(msg)

            try:
                # Run the async send from this (sync, background-thread) context.
                asyncio.run(_send_alert())
                self._last_alert_at = datetime.now(timezone.utc)
                logger.warning(f"Sent IMAP alert to Matt: {error_type}")
            except Exception as e:
                logger.error(f"Failed to send alert: {e}")

        except Exception as e:
            logger.error(f"Alert system failed: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self):
        """Start the IMAP IDLE listener in a background thread."""
        if self._running:
            logger.warning("IMAP proxy already running")
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
        logger.info(f"IMAP proxy started for {self.config.username} @ {self.config.host}")

    def startup_sync(self, limit: int = 10) -> int:
        """Sync recent emails on startup to catch any that arrived while offline.

        This fetches the last N emails (regardless of read status) and processes
        any that haven't been seen yet. Used to ensure forwarded emails (which
        arrive as SEEN from iPhone) are properly handled.

        Returns: Number of new emails processed.
        """
        if not self._connect():
            logger.error("Failed to connect for startup sync")
            return 0

        logger.info(f"Running startup sync: checking last {limit} emails...")
        self._fetch_all_messages(limit=limit)

        # Re-query the mailbox to count how many of the most recent messages
        # are still unprocessed (fetch above marks what it handled).
        conn = self._connection
        if conn:
            try:
                conn.select(self.config.mailbox)
                status, msg_ids = conn.search(None, "ALL")
                if status == "OK" and msg_ids[0]:
                    id_list = msg_ids[0].split()
                    recent_ids = id_list[-limit:] if len(id_list) > limit else id_list
                    new_count = sum(1 for msg_id in recent_ids if not self._is_already_processed(
                        msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id)
                    ))
                    logger.info(f"Startup sync complete. {new_count} unprocessed emails in last {len(recent_ids)} messages.")
                    return new_count
            except Exception as e:
                logger.warning(f"Error during startup sync check: {e}")
        return 0

    def stop(self):
        """Gracefully stop the IDLE listener.

        NOTE(review): closes the selected mailbox but does not LOGOUT; the
        server will drop the connection on its own timeout.
        """
        self._running = False
        if self._connection:
            try:
                self._connection.close()
            except Exception:
                pass
        self._status = "stopped"
        logger.info("IMAP proxy stopped")

    def get_status(self) -> Dict[str, Any]:
        """Return current proxy status for dashboard."""
        uptime = None
        if self._connected_at:
            uptime = (datetime.now(timezone.utc) - self._connected_at).total_seconds()

        provider_label = PROVIDERS.get(self.config.provider, {}).get("label", self.config.provider)

        return {
            "provider": self.config.provider,
            "provider_label": provider_label,
            "host": self.config.host,
            "username": self.config.username,
            "status": self._status,
            "connected_at": self._connected_at.isoformat() if self._connected_at else None,
            "uptime_seconds": uptime,
            "emails_fetched": self._emails_fetched,
            "last_fetch_at": self._last_fetch_at.isoformat() if self._last_fetch_at else None,
            "last_error": self._last_error,
            "enabled": self.config.enabled,
            "idle_timeout": self.config.idle_timeout,
        }

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _connect(self) -> bool:
        """Establish IMAP connection. Returns True on success."""
        try:
            self._connection = imaplib.IMAP4_SSL(self.config.host, self.config.port)
            self._connection.login(self.config.username, self.config.password)
            self._connection.select(self.config.mailbox)
            self._connected_at = datetime.now(timezone.utc)
            self._status = "connected"
            self._last_error = None
            self._consecutive_errors = 0  # Reset on success
            logger.info(f"Connected to {self.config.host} as {self.config.username}")
            return True
        except imaplib.IMAP4.error as e:
            # imaplib raises IMAP4.error for protocol-level failures (auth).
            self._status = "auth_error"
            self._last_error = f"Auth failed: {e}"
            self._auth_failures += 1
            self._consecutive_errors += 1
            logger.error(f"IMAP auth error: {e}")
            # Alert on auth errors immediately
            self._alert_on_failure("auth_error", str(e))
            return False
        except Exception as e:
            self._status = "connection_error"
            self._last_error = str(e)
            self._consecutive_errors += 1
            logger.error(f"IMAP connection error: {e}")
            # Alert on repeated connection errors
            self._alert_on_failure("connection_error", str(e))
            return False

    def _run_loop(self):
        """Main IDLE loop — runs in background thread."""
        while self._running:
            if not self._connect():
                # Auth errors need human intervention; connection errors retry
                if self._status == "auth_error":
                    logger.error("Auth error — stopping proxy. Check app-specific password.")
                    self._running = False
                    break
                time.sleep(30)
                continue

            try:
                self._idle_loop()
            except Exception as e:
                self._status = "error"
                self._last_error = str(e)
                self._consecutive_errors += 1
                logger.error(f"IDLE loop error: {e}")
                self._alert_on_failure("idle_error", str(e))
                time.sleep(10)

    def _idle_loop(self):
        """Monitor for new emails via IMAP IDLE (RFC 2177).

        Uses imaplib's private _new_tag() and raw send()/readline() because
        imaplib has no native IDLE support.
        """
        conn = self._connection
        if not conn:
            return

        while self._running:
            # Check for ALL messages (not just unseen) — handles forwarded emails
            self._fetch_all_messages()

            # Enter IDLE mode
            try:
                conn.select(self.config.mailbox)
                conn.send(b"%s IDLE\r\n" % conn._new_tag().encode())
                # Wait for IDLE responses
                start = time.time()
                while self._running and (time.time() - start) < self.config.idle_timeout:
                    # Check for new data (non-blocking read with timeout)
                    conn.socket().settimeout(5)
                    try:
                        data = conn.readline()
                        if b"EXISTS" in data or b"RECENT" in data:
                            # New mail! Break out of IDLE to process
                            conn.send(b"DONE\r\n")
                            conn.readline()  # consume IDLE terminated
                            break
                    except socket.timeout:
                        continue
                    except Exception:
                        break

                # Refresh IDLE if timeout reached.
                # NOTE(review): this may send a second DONE after the break
                # above; the server's error reply is consumed/ignored.
                if self._running:
                    try:
                        conn.send(b"DONE\r\n")
                        conn.readline()
                    except Exception:
                        pass

            except Exception as e:
                logger.debug(f"IDLE cycle: {e}")
                # Reconnect if needed
                if not self._check_connection():
                    return

    def _fetch_all_messages(self, limit: int = 50):
        """Fetch and process ALL emails (not just unseen), with deduplication.

        This handles forwarded emails that arrive as SEEN (already read).
        Uses message ID hashing to avoid reprocessing the same email.
        """
        conn = self._connection
        if not conn:
            return

        try:
            conn.select(self.config.mailbox)
            # Fetch ALL messages (both seen and unseen)
            status, msg_ids = conn.search(None, "ALL")
            if status != "OK" or not msg_ids[0]:
                return

            id_list = msg_ids[0].split()
            # Process most recent messages first, limit to avoid overload
            id_list = id_list[-limit:] if len(id_list) > limit else id_list

            processed_count = 0
            for msg_id in id_list:
                # Handle both bytes and int message IDs
                if isinstance(msg_id, bytes):
                    msg_id_str = msg_id.decode()
                else:
                    msg_id_str = str(msg_id)

                # Deduplication: skip if already processed
                if self._is_already_processed(msg_id_str):
                    continue

                status, msg_data = conn.fetch(msg_id_str, "(RFC822)")
                if status != "OK":
                    continue

                raw = msg_data[0][1]
                msg = email.message_from_bytes(raw)

                # Generate unique content hash — sequence numbers can shift,
                # so dedupe on (subject, sender, date, body prefix) too.
                subject = decode_str(msg.get("Subject", ""))
                sender = decode_str(msg.get("From", ""))
                date = msg.get("Date", "")
                body_preview = get_body(msg)[:500]  # First 500 chars for hash
                unique_key = hashlib.sha256(
                    f"{subject}|{sender}|{date}|{body_preview}".encode()
                ).hexdigest()

                # Skip if already processed (double-check with content hash)
                if self._is_already_processed(unique_key):
                    # Also mark the msg_id as processed so we skip it next time
                    self._save_processed_id(msg_id_str)
                    continue

                email_data = {
                    "id": msg_id_str,
                    "message_hash": unique_key,
                    "from": sender,
                    "to": decode_str(msg.get("To")),
                    "subject": subject,
                    "date": date,
                    "body": get_body(msg).strip(),
                    "provider": self.config.provider,
                    "fetched_at": datetime.now(timezone.utc).isoformat(),
                }

                # Dispatch to handler
                if self.handler:
                    try:
                        self.handler(email_data)
                    except Exception as e:
                        logger.error(f"Handler error for email {msg_id}: {e}")
                        self._consecutive_errors += 1
                        self._alert_on_failure("handler_error", f"Handler failed for email {msg_id}: {e}")

                # Mark as seen (so we don't re-fetch as "new" on next cycle)
                # Encode msg_id_str back to bytes for IMAP command if needed
                if isinstance(msg_id, bytes):
                    store_msg_id = msg_id
                else:
                    store_msg_id = msg_id_str.encode()
                conn.store(store_msg_id, "+FLAGS", "\\Seen")

                # Save both IDs to dedupe file
                self._save_processed_id(msg_id_str)
                self._save_processed_id(unique_key)

                with self._lock:
                    self._emails_fetched += 1
                    self._last_fetch_at = datetime.now(timezone.utc)

                processed_count += 1

                if self.metrics_callback:
                    try:
                        self.metrics_callback(email_data)
                    except Exception:
                        pass

            if processed_count > 0:
                logger.info(f"Processed {processed_count} new email(s) from {limit} checked")

        except Exception as e:
            logger.error(f"Fetch error: {e}")

    def _check_connection(self) -> bool:
        """Verify IMAP connection is alive via NOOP."""
        try:
            if self._connection:
                self._connection.noop()
                return True
        except Exception:
            pass
        self._status = "disconnected"
        return False

# ---------------------------------------------------------------------------
# Dashboard metrics module
# ---------------------------------------------------------------------------

# Dashboard metrics state: in-memory ring of recent emails, mirrored to a
# JSON file so other processes (the dashboard) can read it.
_METRICS_FILE = Path("/tmp/icarus-imap-metrics.json")
_recent_emails: List[Dict] = []
_MAX_RECENT = 50


def record_email_processed(email_data: Dict):
    """Record a processed email for dashboard metrics.

    Keeps only the last _MAX_RECENT entries in memory and persists them
    to _METRICS_FILE (best-effort; write failures are ignored).
    """
    global _recent_emails
    entry = {
        "subject": email_data.get("subject", "(no subject)")[:80],
        "from": email_data.get("from", "unknown"),
        "provider": email_data.get("provider", "unknown"),
        "fetched_at": email_data.get("fetched_at", datetime.now(timezone.utc).isoformat()),
    }
    _recent_emails.append(entry)
    if len(_recent_emails) > _MAX_RECENT:
        _recent_emails = _recent_emails[-_MAX_RECENT:]

    # Persist to file for cross-process dashboard reads
    try:
        _METRICS_FILE.write_text(json.dumps({
            "recent_emails": _recent_emails,
            # NOTE: this is the size of the capped list, not a lifetime total.
            "total_processed": len(_recent_emails),
            "last_updated": datetime.now(timezone.utc).isoformat(),
        }, indent=2))
    except Exception:
        pass


def get_metrics() -> Dict:
    """Get IMAP metrics for dashboard.

    Prefers the persisted file (cross-process view); falls back to this
    process's in-memory state when the file is missing or unreadable.
    """
    try:
        if _METRICS_FILE.exists():
            return json.loads(_METRICS_FILE.read_text())
    except Exception:
        pass
    return {
        "recent_emails": _recent_emails,
        "total_processed": len(_recent_emails),
        "last_updated": datetime.now(timezone.utc).isoformat(),
    }

# Socket import for IDLE timeout

import socket
import hashlib