"""IMAP Proxy - Zero-config email ingress for Icarus.
Phase 6.2: IMAP IDLE client for iCloud Mail (and other providers).
- Outbound-only connection (appliance-initiated)
- App-specific password support
- IDLE monitoring for real-time email arrival
- Integrates with existing document pipeline
"""
import asyncio
import email
import hashlib
import imaplib
import json
import logging
import os
import socket
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from email.header import decode_header
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional, Any
# Module-level logger (fixed: `getLogger(name)` referenced an undefined
# `name`; the conventional per-module logger uses __name__).
logger = logging.getLogger(__name__)

# HTML stripping (reused from family_assistant)
from family_assistant.email_fetcher import html_to_text

# ---------------------------------------------------------------------------
# Provider presets
# ---------------------------------------------------------------------------
# Connection presets for common IMAP providers.
# All of them speak IMAP over implicit TLS on port 993.
PROVIDERS = {
    key: {"host": host, "port": 993, "label": label}
    for key, (host, label) in {
        "icloud": ("imap.mail.me.com", "iCloud Mail"),
        "gmail": ("imap.gmail.com", "Gmail"),
        "outlook": ("outlook.office365.com", "Outlook"),
        "yahoo": ("imap.mail.yahoo.com", "Yahoo"),
    }.items()
}
@dataclass
class IMAPConfig:
    """Connection settings for a single IMAP account.

    When ``host`` is left empty and ``provider`` matches a preset in
    ``PROVIDERS``, the host and port are filled in automatically.
    """

    provider: str = "icloud"
    host: str = ""
    port: int = 993
    username: str = ""
    password: str = ""  # App-specific password
    mailbox: str = "INBOX"
    idle_timeout: int = 60 * 5  # 5 min IDLE refresh
    poll_interval: int = 30  # fallback poll if IDLE unsupported
    enabled: bool = True

    def __post_init__(self):
        # Resolve host/port from the provider preset when no explicit host set.
        preset = PROVIDERS.get(self.provider)
        if preset and not self.host:
            self.host = preset["host"]
            self.port = preset["port"]

    @classmethod
    def from_env(cls, prefix: str = "IMAP") -> "IMAPConfig":
        """Load config from environment variables.
        Expected vars: {PREFIX}_PROVIDER, {PREFIX}_USER, {PREFIX}_PASSWORD,
        optional: {PREFIX}_HOST, {PREFIX}_PORT, {PREFIX}_MAILBOX, {PREFIX}_ENABLED.
        """
        env = os.getenv
        return cls(
            provider=env(f"{prefix}_PROVIDER", "icloud"),
            host=env(f"{prefix}_HOST", ""),
            port=int(env(f"{prefix}_PORT", "993")),
            username=env(f"{prefix}_USER", ""),
            password=env(f"{prefix}_PASSWORD", ""),
            mailbox=env(f"{prefix}_MAILBOX", "INBOX"),
            enabled=env(f"{prefix}_ENABLED", "true").lower() == "true",
        )
def decode_str(s: Optional[str]) -> str:
    """Decode a (possibly RFC 2047 encoded) email header value to text.

    Returns "" for None. Charsets Python does not recognize (e.g. the
    "unknown-8bit" label that decode_header can report) fall back to UTF-8
    with replacement characters instead of raising LookupError.
    """
    if s is None:
        return ""
    parts = []
    for data, charset in decode_header(s):
        if isinstance(data, bytes):
            try:
                parts.append(data.decode(charset or "utf-8", errors="replace"))
            except LookupError:
                # Sender advertised a charset Python doesn't know.
                parts.append(data.decode("utf-8", errors="replace"))
        else:
            parts.append(data)
    return "".join(parts)
def get_body(msg) -> str:
    """Extract a plain-text body from an email.message.Message.

    Preference order: first inline text/plain part, then the first text/html
    part (stripped via html_to_text), then the bare payload of a
    non-multipart message (HTML-stripped if it is text/html).

    Fixes over the previous version: a text/plain part whose decoded payload
    is None no longer short-circuits to "" (we fall through to the HTML
    part), and text/plain *attachments* are no longer mistaken for the body.
    """
    if msg.is_multipart():
        # Prefer an inline text/plain part; skip attachments.
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            if part.get_content_disposition() == "attachment":
                continue
            text = _decode_payload(part)
            if text is not None:
                return text
        # Fall back to the first text/html part, converted to text.
        for part in msg.walk():
            if part.get_content_type() != "text/html":
                continue
            html = _decode_payload(part)
            if html is not None:
                return html_to_text(html)
        return ""
    text = _decode_payload(msg)
    if text is None:
        return ""
    if msg.get_content_type() == "text/html":
        return html_to_text(text)
    return text


def _decode_payload(part) -> Optional[str]:
    """Decode one MIME part's payload to str, or None when there is none."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or "utf-8"
    if isinstance(payload, bytes):
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:  # sender advertised an unknown charset
            return payload.decode("utf-8", errors="replace")
    return str(payload)
class IMAPProxy:
    """IMAP IDLE proxy for zero-config email ingress.

    Connects outbound to an IMAP server (appliance-initiated, so no inbound
    firewall rules are needed), monitors the mailbox for new mail via IDLE,
    and dispatches each new message to a handler callback for pipeline
    processing.  All network activity runs on a daemon background thread.
    """

    def __init__(
        self,
        config: IMAPConfig,
        handler: Optional[Callable[[Dict], None]] = None,
        metrics_callback: Optional[Callable] = None,
    ):
        self.config = config
        self.handler = handler  # called with one email_data dict per new email
        self.metrics_callback = metrics_callback  # best-effort metrics hook
        self._connection: Optional[imaplib.IMAP4_SSL] = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._last_error: Optional[str] = None
        self._connected_at: Optional[datetime] = None
        self._emails_fetched: int = 0
        self._last_fetch_at: Optional[datetime] = None
        self._status: str = "disconnected"
        self._lock = threading.Lock()
        # Circuit breaker state
        self._auth_failures: int = 0
        self._consecutive_errors: int = 0
        self._last_alert_at: Optional[datetime] = None
        self._alert_cooldown_minutes: int = 30  # Don't spam alerts
        # Deduplication: track processed message IDs (persisted across restarts)
        self._processed_ids_file = Path("/tmp/icarus-imap-processed.json")
        self._processed_ids: set = self._load_processed_ids()

    def _load_processed_ids(self) -> set:
        """Load the set of already-processed message IDs from disk.

        Returns an empty set when the file is missing or unreadable.
        """
        try:
            if self._processed_ids_file.exists():
                data = json.loads(self._processed_ids_file.read_text())
                return set(data.get("processed_ids", []))
        except Exception as e:
            logger.warning(f"Could not load processed IDs: {e}")
        return set()

    def _save_processed_id(self, msg_id: str):
        """Mark *msg_id* as processed and persist the full set to disk."""
        self._processed_ids.add(msg_id)
        try:
            self._processed_ids_file.write_text(json.dumps({
                "processed_ids": list(self._processed_ids),
                "last_updated": datetime.now(timezone.utc).isoformat(),
            }, indent=2))
        except Exception as e:
            # Best-effort persistence; the in-memory set still dedupes
            # for the lifetime of this process.
            logger.warning(f"Could not save processed ID: {e}")

    def _is_already_processed(self, msg_id: str) -> bool:
        """Check if a message ID has already been processed."""
        return msg_id in self._processed_ids

    def _alert_on_failure(self, error_type: str, error_msg: str):
        """Send a Telegram alert on persistent failures (circuit breaker).

        Auth errors alert immediately; other error types only after three
        consecutive failures.  A cooldown prevents alert spam.
        """
        try:
            # Check cooldown
            if self._last_alert_at:
                minutes_since = (datetime.now(timezone.utc) - self._last_alert_at).total_seconds() / 60
                if minutes_since < self._alert_cooldown_minutes:
                    return  # Still in cooldown
            # Only alert on repeated failures
            if self._consecutive_errors < 3 and error_type not in ["auth_error"]:
                return
            # Import here to avoid circular deps
            from shared.notify import TelegramNotifier
            telegram = TelegramNotifier()
            # Async send in sync context
            import asyncio

            async def _send_alert():
                msg = f"🚨 <b>IMAP Proxy Alert</b>\n\n"
                msg += f"<b>Error Type:</b> {error_type}\n"
                msg += f"<b>Account:</b> {self.config.username}\n"
                msg += f"<b>Provider:</b> {self.config.provider}\n"
                msg += f"<b>Consecutive Errors:</b> {self._consecutive_errors}\n\n"
                msg += f"<b>Error:</b> {error_msg[:200]}\n\n"
                if error_type == "auth_error":
                    msg += "🔑 <b>Action Required:</b> Check app-specific password"
                else:
                    msg += "🔧 <b>Action:</b> Check IMAP proxy status"
                await telegram.to_matt(msg)

            try:
                asyncio.run(_send_alert())
                self._last_alert_at = datetime.now(timezone.utc)
                logger.warning(f"Sent IMAP alert to Matt: {error_type}")
            except Exception as e:
                logger.error(f"Failed to send alert: {e}")
        except Exception as e:
            # Alerting must never take down the proxy itself.
            logger.error(f"Alert system failed: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def start(self):
        """Start the IMAP IDLE listener in a background thread."""
        if self._running:
            logger.warning("IMAP proxy already running")
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
        logger.info(f"IMAP proxy started for {self.config.username} @ {self.config.host}")

    def startup_sync(self, limit: int = 10) -> int:
        """Sync recent emails on startup to catch any that arrived while offline.

        Fetches the last *limit* emails regardless of read status and processes
        any that haven't been seen yet.  Forwarded emails (e.g. from iPhone)
        often arrive already flagged SEEN, so an UNSEEN search would miss them.

        Returns: Number of new emails processed.
        """
        if not self._connect():
            logger.error("Failed to connect for startup sync")
            return 0
        logger.info(f"Running startup sync: checking last {limit} emails...")
        # BUGFIX: the previous implementation re-counted unprocessed IDs
        # *after* processing them, so it always reported ~0 regardless of how
        # many were handled.  Use the count _fetch_all_messages returns.
        new_count = self._fetch_all_messages(limit=limit)
        logger.info(f"Startup sync complete. {new_count} new email(s) processed from last {limit} messages.")
        return new_count

    def stop(self):
        """Gracefully stop the IDLE listener."""
        self._running = False
        if self._connection:
            try:
                self._connection.close()
            except Exception:
                pass
        self._status = "stopped"
        logger.info("IMAP proxy stopped")

    def get_status(self) -> Dict[str, Any]:
        """Return current proxy status for dashboard."""
        uptime = None
        if self._connected_at:
            uptime = (datetime.now(timezone.utc) - self._connected_at).total_seconds()
        provider_label = PROVIDERS.get(self.config.provider, {}).get("label", self.config.provider)
        return {
            "provider": self.config.provider,
            "provider_label": provider_label,
            "host": self.config.host,
            "username": self.config.username,
            "status": self._status,
            "connected_at": self._connected_at.isoformat() if self._connected_at else None,
            "uptime_seconds": uptime,
            "emails_fetched": self._emails_fetched,
            "last_fetch_at": self._last_fetch_at.isoformat() if self._last_fetch_at else None,
            "last_error": self._last_error,
            "enabled": self.config.enabled,
            "idle_timeout": self.config.idle_timeout,
        }

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------
    def _connect(self) -> bool:
        """Establish IMAP connection; returns True on success."""
        try:
            self._connection = imaplib.IMAP4_SSL(self.config.host, self.config.port)
            self._connection.login(self.config.username, self.config.password)
            self._connection.select(self.config.mailbox)
            self._connected_at = datetime.now(timezone.utc)
            self._status = "connected"
            self._last_error = None
            self._consecutive_errors = 0  # Reset on success
            logger.info(f"Connected to {self.config.host} as {self.config.username}")
            return True
        except imaplib.IMAP4.error as e:
            self._status = "auth_error"
            self._last_error = f"Auth failed: {e}"
            self._auth_failures += 1
            self._consecutive_errors += 1
            logger.error(f"IMAP auth error: {e}")
            # Alert on auth errors immediately
            self._alert_on_failure("auth_error", str(e))
            return False
        except Exception as e:
            self._status = "connection_error"
            self._last_error = str(e)
            self._consecutive_errors += 1
            logger.error(f"IMAP connection error: {e}")
            # Alert on repeated connection errors
            self._alert_on_failure("connection_error", str(e))
            return False

    def _run_loop(self):
        """Main IDLE loop — runs in background thread."""
        while self._running:
            if not self._connect():
                # Auth errors need human intervention; connection errors retry
                if self._status == "auth_error":
                    logger.error("Auth error — stopping proxy. Check app-specific password.")
                    self._running = False
                    break
                time.sleep(30)
                continue
            try:
                self._idle_loop()
            except Exception as e:
                self._status = "error"
                self._last_error = str(e)
                self._consecutive_errors += 1
                logger.error(f"IDLE loop error: {e}")
                self._alert_on_failure("idle_error", str(e))
                time.sleep(10)

    def _idle_loop(self):
        """Monitor for new emails via IMAP IDLE (RFC 2177).

        imaplib has no native IDLE support, so the IDLE command is issued
        manually using imaplib's private tag generator.
        """
        conn = self._connection
        if not conn:
            return
        while self._running:
            # Check for ALL messages (not just unseen) — handles forwarded emails
            self._fetch_all_messages()
            # Enter IDLE mode
            try:
                conn.select(self.config.mailbox)
                # BUGFIX: IMAP4._new_tag() returns *bytes* in Python 3; the old
                # code called .encode() on it, raising AttributeError on every
                # cycle and silently degrading IDLE into a busy poll loop.
                tag = conn._new_tag()
                conn.send(tag + b" IDLE\r\n")
                # Wait for IDLE responses
                done_sent = False
                start = time.time()
                while self._running and (time.time() - start) < self.config.idle_timeout:
                    # Short socket timeout so we re-check _running regularly
                    conn.socket().settimeout(5)
                    try:
                        data = conn.readline()
                        if b"EXISTS" in data or b"RECENT" in data:
                            # New mail! Break out of IDLE to process
                            conn.send(b"DONE\r\n")
                            conn.readline()  # consume IDLE terminated
                            done_sent = True
                            break
                    except socket.timeout:
                        continue
                    except Exception:
                        break
                # Terminate IDLE if the refresh timeout was reached.
                # (BUGFIX: previously DONE was sent a second time after new
                # mail had already terminated IDLE, which is protocol-invalid.)
                if self._running and not done_sent:
                    try:
                        conn.send(b"DONE\r\n")
                        conn.readline()
                    except Exception:
                        pass
                # Clear the 5s read timeout so subsequent FETCHes can block
                # for as long as they need (large messages, slow servers).
                try:
                    conn.socket().settimeout(None)
                except Exception:
                    pass
            except Exception as e:
                logger.debug(f"IDLE cycle: {e}")
            # Reconnect if needed
            if not self._check_connection():
                return

    def _fetch_all_messages(self, limit: int = 50) -> int:
        """Fetch and process ALL emails (not just unseen), with deduplication.

        This handles forwarded emails that arrive as SEEN (already read).
        Dedupes on both the IMAP sequence id and a content hash (subject,
        sender, date, body preview) persisted to disk.

        Returns: number of newly processed emails (0 on error).
        """
        conn = self._connection
        if not conn:
            return 0
        processed_count = 0
        try:
            conn.select(self.config.mailbox)
            # Fetch ALL messages (both seen and unseen)
            status, msg_ids = conn.search(None, "ALL")
            if status != "OK" or not msg_ids[0]:
                return 0
            id_list = msg_ids[0].split()
            # Process most recent messages first, limit to avoid overload
            id_list = id_list[-limit:] if len(id_list) > limit else id_list
            for msg_id in id_list:
                # Normalize the sequence id to str (imaplib returns bytes)
                if isinstance(msg_id, bytes):
                    msg_id_str = msg_id.decode()
                else:
                    msg_id_str = str(msg_id)
                # Deduplication: skip if already processed
                if self._is_already_processed(msg_id_str):
                    continue
                status, msg_data = conn.fetch(msg_id_str, "(RFC822)")
                if status != "OK":
                    continue
                raw = msg_data[0][1]
                msg = email.message_from_bytes(raw)
                # Generate unique content hash for this email
                subject = decode_str(msg.get("Subject", ""))
                sender = decode_str(msg.get("From", ""))
                date = msg.get("Date", "")
                body_preview = get_body(msg)[:500]  # First 500 chars for hash
                unique_key = hashlib.sha256(
                    f"{subject}|{sender}|{date}|{body_preview}".encode()
                ).hexdigest()
                # Skip if already processed (double-check with content hash;
                # sequence ids can shift when the mailbox is compacted)
                if self._is_already_processed(unique_key):
                    # Also mark the msg_id as processed so we skip it next time
                    self._save_processed_id(msg_id_str)
                    continue
                email_data = {
                    "id": msg_id_str,
                    "message_hash": unique_key,
                    "from": sender,
                    "to": decode_str(msg.get("To")),
                    "subject": subject,
                    "date": date,
                    "body": get_body(msg).strip(),
                    "provider": self.config.provider,
                    "fetched_at": datetime.now(timezone.utc).isoformat(),
                }
                # Dispatch to handler
                if self.handler:
                    try:
                        self.handler(email_data)
                    except Exception as e:
                        logger.error(f"Handler error for email {msg_id}: {e}")
                        self._consecutive_errors += 1
                        self._alert_on_failure("handler_error", f"Handler failed for email {msg_id}: {e}")
                # Mark as seen (so we don't re-fetch as "new" on next cycle)
                store_msg_id = msg_id if isinstance(msg_id, bytes) else msg_id_str.encode()
                conn.store(store_msg_id, "+FLAGS", "\\Seen")
                # Save both IDs to dedupe file
                self._save_processed_id(msg_id_str)
                self._save_processed_id(unique_key)
                with self._lock:
                    self._emails_fetched += 1
                    self._last_fetch_at = datetime.now(timezone.utc)
                processed_count += 1
                if self.metrics_callback:
                    try:
                        self.metrics_callback(email_data)
                    except Exception:
                        pass
            if processed_count > 0:
                logger.info(f"Processed {processed_count} new email(s) from {limit} checked")
            return processed_count
        except Exception as e:
            logger.error(f"Fetch error: {e}")
            return processed_count

    def _check_connection(self) -> bool:
        """Verify the IMAP connection is alive via NOOP."""
        try:
            if self._connection:
                self._connection.noop()
                return True
        except Exception:
            pass
        self._status = "disconnected"
        return False
# ---------------------------------------------------------------------------
# Dashboard metrics module
# ---------------------------------------------------------------------------
_METRICS_FILE = Path("/tmp/icarus-imap-metrics.json")  # cross-process handoff file
_recent_emails: List[Dict] = []  # in-process ring buffer of recent emails
_MAX_RECENT = 50  # cap on entries kept in memory and on disk


def record_email_processed(email_data: Dict):
    """Record a processed email for dashboard metrics."""
    global _recent_emails
    _recent_emails.append({
        "subject": email_data.get("subject", "(no subject)")[:80],
        "from": email_data.get("from", "unknown"),
        "provider": email_data.get("provider", "unknown"),
        "fetched_at": email_data.get("fetched_at", datetime.now(timezone.utc).isoformat()),
    })
    if len(_recent_emails) > _MAX_RECENT:
        _recent_emails = _recent_emails[-_MAX_RECENT:]
    # Persist to file for cross-process dashboard reads (best-effort).
    snapshot = {
        "recent_emails": _recent_emails,
        "total_processed": len(_recent_emails),
        "last_updated": datetime.now(timezone.utc).isoformat(),
    }
    try:
        _METRICS_FILE.write_text(json.dumps(snapshot, indent=2))
    except Exception:
        pass


def get_metrics() -> Dict:
    """Get IMAP metrics for dashboard.

    Prefers the on-disk snapshot (written possibly by another process);
    falls back to this process's in-memory buffer.
    """
    try:
        if _METRICS_FILE.exists():
            return json.loads(_METRICS_FILE.read_text())
    except Exception:
        pass
    return {
        "recent_emails": _recent_emails,
        "total_processed": len(_recent_emails),
        "last_updated": datetime.now(timezone.utc).isoformat(),
    }
# Socket import for IDLE timeout
import socket
import hashlib