# email_fetcher.py (7,342 bytes, Apr 18, 2026)

"""Gmail IMAP email fetching."""

import imaplib
import email
import re
from email.header import decode_header
from html.parser import HTMLParser

from family_assistant.config import (
GMAIL_USER,
GMAIL_APP_PASSWORD,
IMAP_SERVER,
IMAP_PORT,
)

class _HTMLToText(HTMLParser):
"""Lightweight HTML-to-plaintext converter.

Strips tags, collapses whitespace, and adds newlines around block elements
so the LLM gets clean text instead of 44KB of markup noise.
"""

BLOCK_TAGS = frozenset([
    'p', 'div', 'br', 'hr', 'tr', 'li', 'h1', 'h2', 'h3', 'h4',
    'h5', 'h6', 'blockquote', 'table', 'section', 'article', 'header',
    'footer', 'main', 'aside', 'details', 'summary', 'figure', 'figcaption',
])
SKIP_TAGS = frozenset(['style', 'script', 'head'])

def __init__(self):
    super().__init__()
    self._pieces = []
    self._skip_depth = 0

def handle_starttag(self, tag, attrs):
    tag = tag.lower()
    if tag in self.SKIP_TAGS:
        self._skip_depth += 1
    elif tag in self.BLOCK_TAGS:
        self._pieces.append('\n')
    elif tag == 'li':
        self._pieces.append('\n• ')

def handle_endtag(self, tag):
    tag = tag.lower()
    if tag in self.SKIP_TAGS:
        self._skip_depth = max(0, self._skip_depth - 1)
    elif tag in self.BLOCK_TAGS:
        self._pieces.append('\n')

def handle_data(self, data):
    if self._skip_depth > 0:
        return
    self._pieces.append(data)

def handle_entityref(self, name):
    if self._skip_depth > 0:
        return
    entities = {'nbsp': ' ', 'amp': '&', 'lt': '<', 'gt': '>', '#39': "'", 'quot': '"'}
    self._pieces.append(entities.get(name, f'&{name};'))

def handle_charref(self, name):
    if self._skip_depth > 0:
        return
    self._pieces.append(f'&#{name};')

def get_text(self):
    text = ''.join(self._pieces)
    # Collapse runs of whitespace but keep intentional newlines
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()

def html_to_text(html):
    """Convert HTML to clean plaintext for LLM consumption.

    Input that is empty or contains no ``<`` is returned unchanged (``None``
    becomes an empty string); everything else is run through ``_HTMLToText``.
    """
    if not html or '<' not in html:
        return html or ''
    parser = _HTMLToText()
    parser.feed(html)
    # feed() may hold back trailing text (e.g. when it ends in an unterminated
    # "&"); close() forces the parser to process whatever is still buffered.
    parser.close()
    return parser.get_text()

def decode_str(s):
    """Decode an email header value, handling encoded words.

    Args:
        s: Raw header value, or None when the header is absent.

    Returns:
        The decoded header as a string; an empty string when ``s`` is None.
        Undecodable bytes are replaced rather than raising.
    """
    if s is None:
        return ""
    parts = []
    for data, charset in decode_header(s):
        if not isinstance(data, bytes):
            # Headers without encoded words come back as plain str.
            parts.append(data)
            continue
        try:
            parts.append(data.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # The header names a charset Python does not know; fall back to
            # UTF-8 instead of letting one bad header abort the caller.
            parts.append(data.decode("utf-8", errors="replace"))
    return "".join(parts)

def _decode_part_text(part):
    """Return the decoded text payload of a single non-multipart part."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Declared charset is unknown to Python; fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def get_body(msg):
    """Extract the text body from an email message.

    Prefers text/plain, falls back to text/html (converted to plaintext).

    Args:
        msg: A parsed ``email.message.Message``.

    Returns:
        The body text. Always a string: a multipart message with neither a
        text/plain nor a text/html part yields an empty string.
    """
    if not msg.is_multipart():
        text = _decode_part_text(msg)
        # If the single part is HTML, convert it
        if msg.get_content_type() == "text/html":
            return html_to_text(text)
        return text

    first_html = None
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type == "text/plain":
            return _decode_part_text(part)
        if content_type == "text/html" and first_html is None:
            # Remember the first HTML part in case no plain-text part exists.
            first_html = part

    if first_html is not None:
        return html_to_text(_decode_part_text(first_html))
    # No textual part at all (e.g. attachments only).
    return ""

def fetch_unread(mark_read=False):
    """Fetch unread emails from Gmail via IMAP.

    Messages are retrieved with ``BODY.PEEK[]`` so that fetching alone does
    not set the ``\\Seen`` flag; whether they end up marked as read is decided
    solely by ``mark_read``.

    Args:
        mark_read: If True, marks every successfully fetched email as seen
            after retrieval. This prevents re-processing the same emails on
            subsequent runs. Messages whose fetch failed are left unread so
            they are picked up again next time.

    Returns:
        ``{"emails": [...], "count": n}`` on success (each email is a dict
        with ``id``, ``from``, ``to``, ``subject``, ``date`` and ``body``),
        or ``{"error": "..."}`` when no app password is configured.
    """
    if not GMAIL_APP_PASSWORD:
        return {"error": "GMAIL_APP_PASSWORD env var not set"}

    # The context manager logs out when the block exits, including on error.
    with imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) as mail:
        mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
        mail.select("INBOX")

        # Only fetch truly unread emails -- NEVER fall back to ALL
        status, message_ids = mail.search(None, "UNSEEN")
        if status != "OK" or not message_ids or not message_ids[0]:
            return {"emails": [], "count": 0}

        emails = []
        fetched_ids = []
        for msg_id in message_ids[0].split():
            # PEEK avoids the implicit \Seen that a plain RFC822 fetch sets.
            status, msg_data = mail.fetch(msg_id, "(BODY.PEEK[])")
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                # Failed fetch or a response without a message literal.
                continue
            msg = email.message_from_bytes(msg_data[0][1])
            emails.append({
                "id": msg_id.decode(),
                "from": decode_str(msg.get("From")),
                "to": decode_str(msg.get("To")),
                "subject": decode_str(msg.get("Subject")),
                "date": msg.get("Date"),
                "body": (get_body(msg) or "").strip(),
            })
            fetched_ids.append(msg_id)

        # Mark only the emails we actually retrieved, to prevent re-processing
        if mark_read:
            for msg_id in fetched_ids:
                mail.store(msg_id, "+FLAGS", "\\Seen")

        return {"emails": emails, "count": len(emails)}

def fetch_since(since_date: str, max_emails: int = 100) -> dict:
    """Fetch emails received since a given IMAP date string.

    Used for backfilling the Family Brain with recently processed emails.
    Does NOT mark emails as read: messages are retrieved with ``BODY.PEEK[]``,
    which leaves the ``\\Seen`` flag untouched.

    Args:
        since_date: IMAP date string, e.g. "17-Mar-2026"
        max_emails: Maximum emails to fetch

    Returns:
        Dict with list of emails and count, or ``{"error": "..."}`` when no
        app password is configured.
    """
    if not GMAIL_APP_PASSWORD:
        return {"error": "GMAIL_APP_PASSWORD env var not set"}

    # The context manager logs out when the block exits, including on error.
    with imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) as mail:
        mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
        mail.select("INBOX")

        status, message_ids = mail.search(None, f'SINCE "{since_date}"')
        if status != "OK" or not message_ids or not message_ids[0]:
            return {"emails": [], "count": 0}

        # Process newest first for backfill
        id_list = message_ids[0].split()[::-1][:max_emails]
        if not id_list:
            return {"emails": [], "count": 0}

        emails = []
        for msg_id in id_list:
            # PEEK avoids the implicit \Seen that a plain RFC822 fetch sets.
            status, msg_data = mail.fetch(msg_id, "(BODY.PEEK[])")
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                # Failed fetch or a response without a message literal.
                continue
            msg = email.message_from_bytes(msg_data[0][1])
            emails.append({
                "id": msg_id.decode(),
                "from": decode_str(msg.get("From")),
                "to": decode_str(msg.get("To")),
                "subject": decode_str(msg.get("Subject")),
                "date": msg.get("Date"),
                "body": (get_body(msg) or "").strip(),
            })

        return {"emails": emails, "count": len(emails)}