# icloud_fetcher.py (7,073 bytes, Apr 29, 2026)

"""iCloud IMAP email fetcher — standalone for testing, then integration."""
import os
import imaplib
import email
from email.header import decode_header
import re
from html.parser import HTMLParser
from datetime import datetime, timezone
from typing import Optional

# iCloud IMAP config

# Account credentials come from the environment so that secrets stay out of
# the source tree. An empty app password means "not configured".
# NOTE(review): the fallback login address is hard-coded — confirm that this
# default is intended rather than requiring ICLOUD_EMAIL to be set.
ICLOUD_EMAIL = os.environ.get("ICLOUD_EMAIL", "Matthew.hoffmann89@gmail.com")
ICLOUD_APP_PASSWORD = os.environ.get("ICLOUD_APP_PASSWORD", "")

# Server endpoint; the connection is opened with IMAP4_SSL on this host/port.
IMAP_SERVER = "imap.mail.me.com"
IMAP_PORT = 993

class _HTMLToText(HTMLParser):
    """Lightweight HTML-to-plaintext converter.

    Collects the text content of a document, inserts line breaks around
    block-level elements, prefixes list items with a bullet, and drops
    everything inside ``<style>``, ``<script>`` and ``<head>``.
    """

    # Elements whose start and end both force a line break.
    BLOCK_TAGS = frozenset([
        'p', 'div', 'br', 'hr', 'tr', 'li', 'h1', 'h2', 'h3', 'h4',
        'h5', 'h6', 'blockquote', 'table', 'section', 'article',
    ])
    # Elements whose content is never shown to the reader.
    SKIP_TAGS = frozenset(['style', 'script', 'head'])

    def __init__(self):
        super().__init__()
        self._pieces = []      # text fragments in document order
        self._skip_depth = 0   # > 0 while inside a SKIP_TAGS element

    def handle_starttag(self, tag, attrs):
        tag = tag.lower()
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1
        elif tag == 'li':
            # Must be tested before BLOCK_TAGS: 'li' is also a block tag, so
            # checking the set first would make the bullet unreachable.
            self._pieces.append('\n• ')
        elif tag in self.BLOCK_TAGS:
            self._pieces.append('\n')

    def handle_endtag(self, tag):
        tag = tag.lower()
        if tag in self.SKIP_TAGS:
            # Clamp at zero so a stray closing tag cannot go negative.
            self._skip_depth = max(0, self._skip_depth - 1)
        elif tag in self.BLOCK_TAGS:
            self._pieces.append('\n')

    def handle_data(self, data):
        if self._skip_depth > 0:
            return
        self._pieces.append(data)

    def get_text(self):
        """Return the collected text with whitespace normalised."""
        text = ''.join(self._pieces)
        text = re.sub(r'[ \t]+', ' ', text)      # collapse runs of spaces/tabs
        text = re.sub(r'\n{3,}', '\n\n', text)   # at most one blank line
        return text.strip()


def html_to_text(html: str) -> str:
    """Convert HTML to clean plaintext.

    Args:
        html: Markup to convert. ``None``/empty input yields ``''``; input
            that contains no ``<`` is returned unchanged.

    Returns:
        The visible text of the document, or the original ``html`` string if
        the parser raises.
    """
    if not html or '<' not in html:
        return html or ''
    parser = _HTMLToText()
    try:
        parser.feed(html)
        # close() flushes text the parser is still buffering (for example a
        # trailing fragment ending in an unterminated '&'); without it that
        # text would be silently dropped.
        parser.close()
        return parser.get_text()
    except Exception:
        # Deliberate best-effort: never let malformed markup break the caller.
        return html

def decode_email_header(header: Optional[str]) -> str:
    """Decode a MIME-encoded (RFC 2047) email header into a plain string.

    Args:
        header: Raw header value, possibly containing ``=?charset?enc?...?=``
            encoded-words. ``None`` or an empty value yields ``''``.

    Returns:
        The decoded header text. Undecodable bytes are replaced rather than
        raising, an unknown charset label falls back to UTF-8, and a header
        whose encoded-words cannot be parsed is returned as-is.
    """
    if not header:
        return ""

    from email.errors import HeaderParseError

    try:
        parts = decode_header(header)
    except HeaderParseError:
        # Malformed encoded-word (e.g. broken base64): keep the raw text so a
        # single bad header does not abort processing of the whole message.
        return str(header)

    decoded = []
    for content, charset in parts:
        if not isinstance(content, bytes):
            decoded.append(content)
            continue
        try:
            decoded.append(content.decode(charset or 'utf-8', errors='replace'))
        except (LookupError, ValueError):
            # Charset label Python does not know, or one that is unusable as
            # a text codec.
            decoded.append(content.decode('utf-8', errors='replace'))
    return ''.join(decoded)

def _decode_part_payload(part: "email.message.Message") -> str:
    """Return the transfer-decoded payload of one MIME part as text ('' if none)."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except (LookupError, ValueError):
        # Unknown or unusable charset label: fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _extract_body_text(msg: "email.message.Message") -> str:
    """Return the best plaintext body: text/plain if present, else converted HTML."""
    if not msg.is_multipart():
        text = _decode_part_payload(msg)
        if msg.get_content_type() == "text/html":
            return html_to_text(text)
        return text

    html_fallback = ""
    for part in msg.walk():
        # An attached .txt/.html file is not the message body.
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            text = _decode_part_payload(part)
            if text:
                return text
        elif content_type == "text/html" and not html_fallback:
            html_fallback = html_to_text(_decode_part_payload(part))
    return html_fallback


def _raw_message_bytes(fetch_response) -> Optional[bytes]:
    """Pull the raw RFC 822 bytes out of an imaplib FETCH response, or None."""
    if not fetch_response:
        return None
    # The message literal arrives as an (envelope, bytes) tuple; scan for it
    # because other untagged data may precede it in the response list.
    for item in fetch_response:
        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
            return item[1]
    # Fallback kept from the original implementation: a bare bytes response
    # is used directly.
    first = fetch_response[0]
    if isinstance(first, bytes) and first:
        return first
    return None


def _is_unread(flag_response) -> bool:
    """Return True unless the FLAGS response contains the Seen flag."""
    for item in flag_response or ():
        if isinstance(item, tuple):
            item = item[0]
        if isinstance(item, bytes) and b"\\Seen" in item:
            return False
    return True


def _iso_date(date_header) -> str:
    """Convert a Date header to ISO 8601; return the raw text if unparseable."""
    raw = str(date_header) if date_header else ""
    if not raw:
        return ""

    from email.utils import parsedate_to_datetime

    try:
        return parsedate_to_datetime(raw).isoformat()
    except (TypeError, ValueError, OverflowError):
        return raw


def fetch_latest_emails(limit: int = 10, unread_only: bool = False) -> list[dict]:
    """Fetch the latest emails from the iCloud IMAP inbox, newest first.

    Args:
        limit: Maximum number of messages to return. Values <= 0 return an
            empty list without contacting the server.
        unread_only: When True, only messages without the Seen flag are
            considered.

    Returns:
        A list of dicts with:
        - uid: str (IMAP UID of the message)
        - subject: str
        - from: str
        - date: str (ISO 8601; the raw Date header if it cannot be parsed)
        - body_text: str (truncated to 5000 characters)
        - is_unread: bool (state before this call fetched the message)

    Raises:
        ValueError: If ICLOUD_APP_PASSWORD is not configured.
        ConnectionError: If INBOX cannot be selected.
        imaplib.IMAP4.error: On login or protocol failures.

    Note:
        Message bodies are retrieved with RFC822, which per the IMAP
        specification marks the message as seen on the server. The flags are
        therefore read *before* the body so ``is_unread`` is meaningful.
    """
    if not ICLOUD_APP_PASSWORD:
        raise ValueError("ICLOUD_APP_PASSWORD not set")
    if limit <= 0:
        # uids[-0:] would otherwise select every message in the mailbox.
        return []

    emails = []

    # Connect to iCloud IMAP
    mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
    try:
        # Login happens inside the try so a failed login still closes the
        # connection via logout().
        mail.login(ICLOUD_EMAIL, ICLOUD_APP_PASSWORD)

        status, _ = mail.select("INBOX")
        if status != "OK":
            raise ConnectionError("Could not select INBOX")

        # UID SEARCH returns stable identifiers; plain SEARCH returns
        # sequence numbers that shift whenever a message is expunged.
        criterion = "UNSEEN" if unread_only else "ALL"
        status, found = mail.uid("search", None, criterion)
        if status != "OK" or not found or not found[0]:
            return []

        raw_ids = found[0]
        uids_str = raw_ids.decode() if isinstance(raw_ids, bytes) else str(raw_ids)
        uids = uids_str.split()

        # The server lists UIDs ascending: keep the last N, newest first.
        for uid in reversed(uids[-limit:]):
            # Read flags first: the RFC822 fetch below sets the Seen flag.
            _, flag_data = mail.uid("fetch", uid, "(FLAGS)")
            is_unread = _is_unread(flag_data)

            status, msg_data = mail.uid("fetch", uid, "(RFC822)")
            if status != "OK":
                continue

            raw_email = _raw_message_bytes(msg_data)
            if raw_email is None:
                continue

            msg = email.message_from_bytes(raw_email)

            emails.append({
                "uid": uid,
                "subject": decode_email_header(msg.get("Subject", "")),
                "from": decode_email_header(msg.get("From", "")),
                "date": _iso_date(msg.get("Date", "")),
                "body_text": _extract_body_text(msg)[:5000],  # Limit size
                "is_unread": is_unread,
            })
    finally:
        mail.logout()

    return emails

if __name__ == "__main__":
    # Manual smoke test: fetch a handful of messages and print a short
    # preview of each. Runs only when the file is executed as a script.
    try:
        emails = fetch_latest_emails(limit=5, unread_only=False)
        print(f"Fetched {len(emails)} emails")
        for e in emails:
            print(f"\n--- {e['subject'][:60]} ---")
            print(f"From: {e['from'][:50]}")
            print(f"Unread: {e['is_unread']}")
            print(f"Body preview: {e['body_text'][:200]}...")
    except Exception as ex:
        # Top-level boundary of the script: report the failure instead of a
        # traceback.
        print(f"Error: {ex}")