"""iCloud IMAP email fetcher — standalone for testing, then integration."""

import os
import imaplib
import email
from email.header import decode_header
from email.message import Message
from email.utils import parsedate_to_datetime
import re
from html.parser import HTMLParser
from datetime import datetime, timezone
from typing import Optional

# iCloud IMAP config
ICLOUD_EMAIL = os.getenv("ICLOUD_EMAIL", "Matthew.hoffmann89@gmail.com")
ICLOUD_APP_PASSWORD = os.getenv("ICLOUD_APP_PASSWORD", "")
IMAP_SERVER = "imap.mail.me.com"
IMAP_PORT = 993


class _HTMLToText(HTMLParser):
    """Lightweight HTML-to-plaintext converter.

    Text inside ``SKIP_TAGS`` elements is dropped, ``BLOCK_TAGS`` boundaries
    become newlines, and each ``<li>`` is introduced by a bullet.
    """

    BLOCK_TAGS = frozenset([
        'p', 'div', 'br', 'hr', 'tr', 'li', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
        'blockquote', 'table', 'section', 'article',
    ])
    SKIP_TAGS = frozenset(['style', 'script', 'head'])

    def __init__(self):
        super().__init__()
        self._pieces = []     # text fragments in document order
        self._skip_depth = 0  # > 0 while inside a SKIP_TAGS element

    def handle_starttag(self, tag, attrs):
        """Emit a separator (or bullet) for block-level start tags."""
        tag = tag.lower()
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1
        elif self._skip_depth > 0:
            # Nothing inside a skipped element may leak into the output.
            return
        elif tag == 'li':
            # Must be tested before BLOCK_TAGS: 'li' is a member of that set,
            # so the bullet would otherwise never be emitted.
            self._pieces.append('\n• ')
        elif tag in self.BLOCK_TAGS:
            self._pieces.append('\n')

    def handle_endtag(self, tag):
        """Close a skipped element or terminate a block with a newline."""
        tag = tag.lower()
        if tag in self.SKIP_TAGS:
            # Clamp at zero so a stray closing tag cannot unbalance the count.
            self._skip_depth = max(0, self._skip_depth - 1)
        elif self._skip_depth == 0 and tag in self.BLOCK_TAGS:
            self._pieces.append('\n')

    def handle_data(self, data):
        """Collect text unless it sits inside a skipped element."""
        if self._skip_depth > 0:
            return
        self._pieces.append(data)

    def get_text(self):
        """Return the collected text with whitespace normalised.

        Runs of spaces/tabs collapse to one space, three or more consecutive
        newlines collapse to two, and the result is stripped.
        """
        text = ''.join(self._pieces)
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()


def html_to_text(html: str) -> str:
    """Convert HTML to clean plaintext.

    Args:
        html: Markup to convert. Falsy input, or input containing no ``<``,
            is returned unchanged (``''`` for falsy input).

    Returns:
        The plaintext rendering, or the original ``html`` if parsing fails.
    """
    if not html or '<' not in html:
        return html or ''
    parser = _HTMLToText()
    try:
        parser.feed(html)
        # close() flushes text the parser is still buffering (for example a
        # trailing "AT&T", held back as a possibly incomplete char reference).
        parser.close()
        return parser.get_text()
    except Exception:
        # Best effort: hand back the raw markup rather than failing the caller.
        return html


def decode_email_header(header: Optional[str]) -> str:
    """Decode MIME-encoded email headers.

    Args:
        header: Raw header value, possibly containing RFC 2047 encoded words.

    Returns:
        The decoded header text, or ``""`` when ``header`` is empty or None.
    """
    if not header:
        return ""
    result = []
    for content, charset in decode_header(header):
        if isinstance(content, bytes):
            try:
                result.append(content.decode(charset or 'utf-8', errors='replace'))
            except (LookupError, ValueError):
                # Unknown or unusable charset label: fall back to UTF-8.
                result.append(content.decode('utf-8', errors='replace'))
        else:
            result.append(content)
    return ''.join(result)


def _decode_part(part: Message) -> str:
    """Return the decoded text payload of a single (non-multipart) part."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    # Honour the charset declared on the part; UTF-8 is only the default.
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except (LookupError, ValueError):
        return payload.decode('utf-8', errors='replace')


def _extract_body_text(msg: Message) -> str:
    """Pick a plaintext body from a message, falling back to converted HTML."""
    if not msg.is_multipart():
        text = _decode_part(msg)
        if msg.get_content_type() == "text/html":
            return html_to_text(text)
        return text

    html_fallback = ""
    for part in msg.walk():
        # Attached files are not the message body, whatever their type.
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            text = _decode_part(part)
            if text:
                return text
        elif content_type == "text/html" and not html_fallback:
            html_fallback = html_to_text(_decode_part(part))
    return html_fallback


def _raw_message_bytes(msg_data):
    """Return the raw message from a FETCH response, or None if absent."""
    if not msg_data or not msg_data[0]:
        return None
    first = msg_data[0]
    if isinstance(first, tuple):
        # Usual shape: (b'<seq> (RFC822 {size}', b'<raw message>')
        return first[1]
    if isinstance(first, bytes):
        # Sometimes iCloud returns bytes directly
        return first
    return None


def _is_unread(flag_data) -> bool:
    """Return True when a FLAGS fetch response does not mention \\Seen."""
    for item in flag_data or []:
        meta = item[0] if isinstance(item, tuple) else item
        if isinstance(meta, bytes) and b"\\Seen" in meta:
            return False
    return True


def _to_iso_date(raw_date) -> str:
    """Convert an RFC 2822 Date header to ISO 8601; keep it raw if unparseable."""
    raw = str(raw_date or "").strip()
    if not raw:
        return ""
    try:
        return parsedate_to_datetime(raw).isoformat()
    except (TypeError, ValueError):
        return raw


def fetch_latest_emails(limit: int = 10, unread_only: bool = False) -> list[dict]:
    """Fetch latest emails from iCloud IMAP.

    Args:
        limit: Maximum number of messages to return. Zero or a negative
            value returns an empty list without connecting.
        unread_only: When True, only messages matching UNSEEN are considered.

    Returns:
        A list of dicts, newest first, each with:
        - uid: str — identifier returned by IMAP SEARCH. NOTE: this is a
          message sequence number, not a persistent IMAP UID.
        - subject: str
        - from: str
        - date: str (ISO 8601; the raw Date header if it cannot be parsed)
        - body_text: str (truncated to 5000 characters)
        - is_unread: bool (state before this call fetched the message)

    Raises:
        ValueError: If ICLOUD_APP_PASSWORD is not set.
        ConnectionError: If the INBOX cannot be selected.
    """
    if limit <= 0:
        # uids[-0:] would select *every* message, so handle this explicitly.
        return []

    if not ICLOUD_APP_PASSWORD:
        raise ValueError("ICLOUD_APP_PASSWORD not set")

    emails = []

    # The context manager logs out even when login or a later command fails.
    with imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) as mail:
        mail.login(ICLOUD_EMAIL, ICLOUD_APP_PASSWORD)

        status, _ = mail.select("INBOX")
        if status != "OK":
            raise ConnectionError("Could not select INBOX")

        criterion = "UNSEEN" if unread_only else "ALL"
        status, messages = mail.search(None, criterion)
        if status != "OK" or not messages or not messages[0]:
            return []

        id_field = messages[0]
        id_text = id_field.decode() if isinstance(id_field, bytes) else str(id_field)
        message_ids = id_text.split()

        # SEARCH lists ids oldest-first: keep the last N and walk them backwards.
        for msg_id in reversed(message_ids[-limit:]):
            # Read the flags BEFORE the body: fetching RFC822 on a read-write
            # mailbox sets \Seen, which would make every message look read.
            _, flag_data = mail.fetch(msg_id, "(FLAGS)")
            is_unread = _is_unread(flag_data)

            status, msg_data = mail.fetch(msg_id, "(RFC822)")
            if status != "OK":
                continue

            raw_email = _raw_message_bytes(msg_data)
            if raw_email is None:
                continue

            msg = email.message_from_bytes(raw_email)

            emails.append({
                "uid": msg_id,
                "subject": decode_email_header(msg.get("Subject", "")),
                "from": decode_email_header(msg.get("From", "")),
                "date": _to_iso_date(msg.get("Date", "")),
                "body_text": _extract_body_text(msg)[:5000],  # Limit size
                "is_unread": is_unread,
            })

    return emails


if __name__ == "__main__":
    # Test the fetcher
    import json

    try:
        emails = fetch_latest_emails(limit=5, unread_only=False)
        print(f"Fetched {len(emails)} emails")
        for e in emails:
            print(f"\n--- {e['subject'][:60]} ---")
            print(f"From: {e['from'][:50]}")
            print(f"Unread: {e['is_unread']}")
            print(f"Body preview: {e['body_text'][:200]}...")
    except Exception as ex:
        print(f"Error: {ex}")