"""iCloud IMAP email fetcher ā standalone for testing, then integration."""
import os
import imaplib
import email
from email.header import decode_header
import re
from html.parser import HTMLParser
from datetime import datetime, timezone
from typing import Optional
# iCloud IMAP config
# Account credentials; each one can be overridden through the environment
# variable of the same name.
# NOTE(review): the fallback address is hard-coded here — consider requiring
# the variable instead of shipping a default.
ICLOUD_EMAIL = os.environ.get("ICLOUD_EMAIL", "Matthew.hoffmann89@gmail.com")
ICLOUD_APP_PASSWORD = os.environ.get("ICLOUD_APP_PASSWORD", "")

# Host and port of the IMAP endpoint the fetcher connects to.
IMAP_SERVER, IMAP_PORT = "imap.mail.me.com", 993
class _HTMLToText(HTMLParser):
"""Lightweight HTML-to-plaintext converter."""
BLOCK_TAGS = frozenset([
'p', 'div', 'br', 'hr', 'tr', 'li', 'h1', 'h2', 'h3', 'h4',
'h5', 'h6', 'blockquote', 'table', 'section', 'article',
])
SKIP_TAGS = frozenset(['style', 'script', 'head'])
def __init__(self):
super().__init__()
self._pieces = []
self._skip_depth = 0
def handle_starttag(self, tag, attrs):
tag = tag.lower()
if tag in self.SKIP_TAGS:
self._skip_depth += 1
elif tag in self.BLOCK_TAGS:
self._pieces.append('\n')
elif tag == 'li':
self._pieces.append('\n⢠')
def handle_endtag(self, tag):
tag = tag.lower()
if tag in self.SKIP_TAGS:
self._skip_depth = max(0, self._skip_depth - 1)
elif tag in self.BLOCK_TAGS:
self._pieces.append('\n')
def handle_data(self, data):
if self._skip_depth > 0:
return
self._pieces.append(data)
def get_text(self):
text = ''.join(self._pieces)
text = re.sub(r'[ \t]+', ' ', text)
text = re.sub(r'\n{3,}', '\n\n', text)
return text.strip()
def html_to_text(html: str) -> str:
    """Convert HTML to clean plaintext.

    Args:
        html: Markup to convert. Falsy values and strings that contain no
            ``<`` are treated as already-plain text.

    Returns:
        The plaintext rendering produced by ``_HTMLToText``; ``''`` for falsy
        input; the input unchanged when it contains no markup or when parsing
        fails.
    """
    if not html or '<' not in html:
        return html or ''
    parser = _HTMLToText()
    try:
        parser.feed(html)
        # feed() may hold back trailing text (for example a tail ending in an
        # unterminated "&..." sequence such as "AT&T") while it waits for more
        # input; close() forces that buffered text through handle_data().
        parser.close()
        return parser.get_text()
    except Exception:
        # Deliberate best effort: a message body we cannot parse is still
        # more useful raw than missing.
        return html
def decode_email_header(header: Optional[str]) -> str:
    """Decode a MIME-encoded email header into a plain string.

    Args:
        header: Raw header value, possibly containing encoded words such as
            ``=?utf-8?b?...?=``. ``None`` and ``''`` are accepted.

    Returns:
        The decoded header text, or ``''`` when ``header`` is falsy.
        Undecodable bytes are replaced rather than raising.
    """
    if not header:
        return ""
    decoded = []
    for content, charset in decode_header(header):
        if not isinstance(content, bytes):
            # Headers without encoded words come back as str already.
            decoded.append(content)
            continue
        try:
            decoded.append(content.decode(charset or 'utf-8', errors='replace'))
        except (LookupError, ValueError):
            # The charset label comes from the message and may be unknown or
            # malformed; with errors='replace' those are the failures left,
            # so fall back to UTF-8 for them only.
            decoded.append(content.decode('utf-8', errors='replace'))
    return ''.join(decoded)
def _decode_part_payload(part: "email.message.Message") -> str:
    """Decode one MIME part's payload to text using its declared charset."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except (LookupError, ValueError):
        # Unknown or malformed charset label: fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _extract_body_text(msg: "email.message.Message") -> str:
    """Pick the best plaintext body: inline text/plain, else converted HTML."""
    if not msg.is_multipart():
        text = _decode_part_payload(msg)
        if msg.get_content_type() == "text/html":
            return html_to_text(text)
        return text

    html_fallback = ""
    for part in msg.walk():
        # A text file sent as an attachment is not the message body.
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            text = _decode_part_payload(part)
            if text.strip():
                return text
        elif content_type == "text/html" and not html_fallback:
            html_fallback = html_to_text(_decode_part_payload(part))
    return html_fallback


def _split_fetch_response(msg_data: list) -> "tuple[Optional[bytes], bytes]":
    """Split a FETCH response into (raw message bytes or None, metadata bytes)."""
    raw_email = None
    meta_chunks = []
    for item in msg_data:
        if isinstance(item, tuple):
            # (envelope-with-literal-size, literal-bytes)
            meta_chunks.append(item[0])
            if raw_email is None and len(item) > 1 and isinstance(item[1], bytes):
                raw_email = item[1]
        elif isinstance(item, bytes):
            # Data items the server sent after the literal, e.g. FLAGS.
            meta_chunks.append(item)
    return raw_email, b" ".join(meta_chunks)


def fetch_latest_emails(limit: int = 10, unread_only: bool = False) -> list[dict]:
    """Fetch the latest emails from the iCloud INBOX over IMAP.

    Messages are fetched with ``BODY.PEEK[]`` so that reading them does not
    mark them as seen; the flags are requested in the same round trip.

    Args:
        limit: Maximum number of messages to return. Values <= 0 return an
            empty list without connecting.
        unread_only: When true, only messages matching ``UNSEEN`` are listed.

    Returns:
        A list of dicts, last-listed message first (assumes the server lists
        search results oldest first), each with:
        - uid: str, the IMAP UID of the message in INBOX
        - subject: str, decoded Subject header
        - from: str, decoded From header
        - date: str, the raw Date header value (not normalised)
        - body_text: str, plaintext body truncated to 5000 characters
        - is_unread: bool, true when the Seen flag is absent

    Raises:
        ValueError: ``ICLOUD_APP_PASSWORD`` is empty.
        ConnectionError: INBOX could not be selected.
        imaplib.IMAP4.error: Propagated from imaplib, e.g. a rejected login.
    """
    if limit <= 0:
        # uids[-0:] would select *every* message, so handle this up front.
        return []
    if not ICLOUD_APP_PASSWORD:
        raise ValueError("ICLOUD_APP_PASSWORD not set")

    emails = []
    # The context manager logs out on every exit path, including a failed
    # login, so the connection is never leaked.
    with imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) as mail:
        mail.login(ICLOUD_EMAIL, ICLOUD_APP_PASSWORD)

        status, _ = mail.select("INBOX")
        if status != "OK":
            raise ConnectionError("Could not select INBOX")

        # UID SEARCH returns identifiers that stay valid after this session
        # ends, unlike the sequence numbers a plain SEARCH returns.
        criterion = "UNSEEN" if unread_only else "ALL"
        status, found = mail.uid("search", None, criterion)
        if status != "OK" or not found or not found[0]:
            return []

        listing = found[0].decode() if isinstance(found[0], bytes) else str(found[0])
        uids = listing.split()

        # Keep the last `limit` entries and walk them newest first.
        for uid in reversed(uids[-limit:]):
            status, msg_data = mail.uid("fetch", uid, "(FLAGS BODY.PEEK[])")
            if status != "OK" or not msg_data:
                continue

            raw_email, meta = _split_fetch_response(msg_data)
            if raw_email is None:
                # No message literal in the response; nothing to parse.
                continue

            msg = email.message_from_bytes(raw_email)
            emails.append({
                "uid": uid,
                "subject": decode_email_header(msg.get("Subject", "")),
                "from": decode_email_header(msg.get("From", "")),
                "date": msg.get("Date", ""),
                "body_text": _extract_body_text(msg)[:5000],  # Limit size
                "is_unread": b"\\Seen" not in meta,
            })
    return emails
# Manual smoke test: run this file directly to fetch and print a few messages.
# The guard compares __name__ so that importing the module does nothing.
if __name__ == "__main__":
    # Test the fetcher
    import json
    try:
        emails = fetch_latest_emails(limit=5, unread_only=False)
        print(f"Fetched {len(emails)} emails")
        for e in emails:
            print(f"\n--- {e['subject'][:60]} ---")
            print(f"From: {e['from'][:50]}")
            print(f"Unread: {e['is_unread']}")
            print(f"Body preview: {e['body_text'][:200]}...")
    except Exception as ex:
        # Top-level boundary of a manual test script: report and exit quietly.
        print(f"Error: {ex}")