"""Gmail IMAP email fetching."""
import imaplib
import email
import re
from email.header import decode_header
from html.parser import HTMLParser
from family_assistant.config import (
GMAIL_USER,
GMAIL_APP_PASSWORD,
IMAP_SERVER,
IMAP_PORT,
)
class _HTMLToText(HTMLParser):
    """Lightweight HTML-to-plaintext converter.

    Strips tags, drops everything inside ``<style>``, ``<script>`` and
    ``<head>``, puts newlines around block-level elements and prefixes list
    items with a bullet, so the LLM gets clean text instead of markup noise.

    Usage: ``feed()`` the HTML, ``close()`` the parser, then call
    ``get_text()``.
    """

    BLOCK_TAGS = frozenset([
        'p', 'div', 'br', 'hr', 'tr', 'li', 'h1', 'h2', 'h3', 'h4',
        'h5', 'h6', 'blockquote', 'table', 'section', 'article', 'header',
        'footer', 'main', 'aside', 'details', 'summary', 'figure', 'figcaption',
    ])
    SKIP_TAGS = frozenset(['style', 'script', 'head'])

    # Fallback table for handle_entityref(). HTMLParser defaults to
    # convert_charrefs=True, in which case references are already decoded
    # before they reach handle_data() and this table is not consulted.
    _ENTITIES = {
        'nbsp': ' ', 'amp': '&', 'lt': '<', 'gt': '>', '#39': "'", 'quot': '"',
    }

    def __init__(self):
        super().__init__()
        self._pieces = []      # text fragments in document order
        self._skip_depth = 0   # > 0 while inside a SKIP_TAGS element

    def handle_starttag(self, tag, attrs):
        """Open a skipped region, or emit a line break / bullet for block tags."""
        tag = tag.lower()
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1
        elif tag == 'li':
            # Must be tested before BLOCK_TAGS: 'li' is a member of that set,
            # so checking the set first made this branch unreachable and list
            # items never received their bullet.
            self._pieces.append('\n\u2022 ')
        elif tag in self.BLOCK_TAGS:
            self._pieces.append('\n')

    def handle_endtag(self, tag):
        """Close a skipped region, or emit a line break after a block tag."""
        tag = tag.lower()
        if tag in self.SKIP_TAGS:
            # Clamp at zero so a stray closing tag cannot unbalance the counter.
            self._skip_depth = max(0, self._skip_depth - 1)
        elif tag in self.BLOCK_TAGS:
            self._pieces.append('\n')

    def handle_data(self, data):
        """Collect text content unless it sits inside a skipped element."""
        if self._skip_depth > 0:
            return
        # With convert_charrefs=True "&nbsp;" arrives here as U+00A0; map it
        # to a plain space so get_text() can collapse it like other spacing.
        self._pieces.append(data.replace('\xa0', ' '))

    def handle_entityref(self, name):
        """Translate a named entity (only called when convert_charrefs is off)."""
        if self._skip_depth > 0:
            return
        self._pieces.append(self._ENTITIES.get(name, f'&{name};'))

    def handle_charref(self, name):
        """Decode a numeric reference (only called when convert_charrefs is off).

        ``name`` is the text between ``&#`` and ``;``: decimal digits, or
        ``x``/``X`` followed by hex digits. Undecodable references are kept
        verbatim.
        """
        if self._skip_depth > 0:
            return
        try:
            if name[:1] in ('x', 'X'):
                char = chr(int(name[1:], 16))
            else:
                char = chr(int(name))
        except (ValueError, OverflowError):
            char = f'&#{name};'
        self._pieces.append(char)

    def get_text(self):
        """Return the collected text with whitespace normalised.

        Runs of spaces/tabs become one space, spaces touching a newline are
        removed, three or more consecutive newlines become a single blank
        line, and leading/trailing whitespace is stripped.
        """
        text = ''.join(self._pieces)
        # Email bodies commonly use CRLF; normalise so the rules below apply.
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        # Collapse runs of whitespace but keep intentional newlines
        text = re.sub(r'[ \t]+', ' ', text)
        # Whitespace-only lines would otherwise hide runs of blank lines from
        # the newline collapse below.
        text = re.sub(r' ?\n ?', '\n', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()
def html_to_text(html):
    """Convert HTML to clean plaintext for LLM consumption.

    Args:
        html: HTML source as a string; may be ``None`` or empty.

    Returns:
        The extracted text. Input that is empty or contains no ``<`` is
        returned unchanged (``None`` becomes ``''``).
    """
    if not html or '<' not in html:
        return html or ''
    parser = _HTMLToText()
    parser.feed(html)
    # feed() may hold back trailing text that looks like the start of an
    # unfinished character reference (e.g. "...Q&A" at the very end of the
    # document); close() forces that buffered text through to the handlers.
    parser.close()
    return parser.get_text()
def decode_str(s):
    """Decode an email header value, handling RFC 2047 encoded words.

    Args:
        s: Raw header value, or ``None`` when the header is absent.

    Returns:
        The decoded header as a single string; ``""`` for ``None``.
    """
    if s is None:
        return ""
    parts = []
    for data, charset in decode_header(s):
        if not isinstance(data, bytes):
            parts.append(data)
            continue
        try:
            parts.append(data.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # The header names a charset Python has no codec for; fall back
            # to UTF-8 rather than failing on a single odd header.
            parts.append(data.decode("utf-8", errors="replace"))
    return "".join(parts)
def _decode_part(part):
    """Return the decoded text payload of a single (non-multipart) part."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The declared charset has no Python codec; fall back to UTF-8
        # instead of failing on the whole message.
        return payload.decode("utf-8", errors="replace")


def get_body(msg):
    """Extract the text body from an email message.

    Prefers text/plain, falls back to text/html (converted to plaintext).

    Args:
        msg: A parsed ``email.message.Message``.

    Returns:
        The body text. Always a string: ``""`` when the message has no
        text/plain or text/html part, or when the chosen part is empty.
    """
    if not msg.is_multipart():
        text = _decode_part(msg)
        # If the single part is HTML, convert it
        if msg.get_content_type() == "text/html":
            return html_to_text(text)
        return text

    parts = list(msg.walk())
    for part in parts:
        if part.get_content_type() == "text/plain":
            return _decode_part(part)
    for part in parts:
        if part.get_content_type() == "text/html":
            return html_to_text(_decode_part(part))
    # No textual part at all (e.g. attachments only). Return an empty string
    # rather than falling through to None, which callers cannot .strip().
    return ""
def fetch_unread(mark_read=False):
    """Fetch unread emails from Gmail via IMAP.

    Messages are retrieved without touching their flags. If mark_read=True,
    every message that was successfully retrieved is then flagged as seen,
    which prevents re-processing the same emails on subsequent runs.
    Messages whose fetch failed stay unread so a later run can retry them.

    Args:
        mark_read: Whether to flag the returned messages as seen.

    Returns:
        ``{"emails": [...], "count": n}`` where each email is a dict with the
        keys ``id``, ``from``, ``to``, ``subject``, ``date`` and ``body``;
        or ``{"error": "..."}`` when no app password is configured.
    """
    if not GMAIL_APP_PASSWORD:
        return {"error": "GMAIL_APP_PASSWORD env var not set"}

    mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
    try:
        mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
        mail.select("INBOX")

        # Only fetch truly unread emails - NEVER fall back to ALL
        status, message_ids = mail.search(None, "UNSEEN")
        if status != "OK":
            return {"emails": [], "count": 0}
        id_list = message_ids[0].split() if message_ids and message_ids[0] else []
        if not id_list:
            return {"emails": [], "count": 0}

        emails = []
        fetched_ids = []
        for msg_id in id_list:
            # BODY.PEEK[] returns the same full message as RFC822, but unlike
            # RFC822 it does not implicitly set \Seen. Without PEEK every
            # message would be marked read even when mark_read is False.
            status, msg_data = mail.fetch(msg_id, "(BODY.PEEK[])")
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                # Failed or empty response (e.g. message expunged meanwhile).
                continue
            msg = email.message_from_bytes(msg_data[0][1])
            emails.append({
                "id": msg_id.decode(),
                "from": decode_str(msg.get("From")),
                "to": decode_str(msg.get("To")),
                "subject": decode_str(msg.get("Subject")),
                "date": msg.get("Date"),
                "body": (get_body(msg) or "").strip(),
            })
            fetched_ids.append(msg_id)

        # Mark only the emails we actually returned as read, so a message
        # whose fetch failed is not silently lost.
        if mark_read:
            for msg_id in fetched_ids:
                mail.store(msg_id, "+FLAGS", "\\Seen")

        return {"emails": emails, "count": len(emails)}
    finally:
        mail.logout()
def fetch_since(since_date: str, max_emails: int = 100) -> dict:
    """Fetch emails received since a given IMAP date string.

    Used for backfilling the Family Brain with recently processed emails.
    Does NOT mark emails as read - they were already processed. To make
    that hold, the mailbox is opened read-only and messages are fetched
    with a non-flag-setting request.

    Args:
        since_date: IMAP date string, e.g. "17-Mar-2026". It is inserted
            into the SEARCH command as-is, so callers must pass a
            well-formed date.
        max_emails: Maximum emails to fetch (newest first).

    Returns:
        Dict with list of emails and count, or ``{"error": "..."}`` when no
        app password is configured.
    """
    if not GMAIL_APP_PASSWORD:
        return {"error": "GMAIL_APP_PASSWORD env var not set"}

    mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
    try:
        mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
        # Read-only select: nothing done in this session can change flags.
        mail.select("INBOX", readonly=True)

        status, message_ids = mail.search(None, f'SINCE "{since_date}"')
        if status != "OK":
            return {"emails": [], "count": 0}
        id_list = message_ids[0].split() if message_ids and message_ids[0] else []

        # Process newest first for backfill
        id_list = id_list[::-1][:max_emails]
        if not id_list:
            return {"emails": [], "count": 0}

        emails = []
        for msg_id in id_list:
            # BODY.PEEK[] yields the full message like RFC822 but never sets
            # \Seen, matching the "does not mark as read" contract.
            status, msg_data = mail.fetch(msg_id, "(BODY.PEEK[])")
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                # Failed or empty response (e.g. message expunged meanwhile).
                continue
            msg = email.message_from_bytes(msg_data[0][1])
            emails.append({
                "id": msg_id.decode(),
                "from": decode_str(msg.get("From")),
                "to": decode_str(msg.get("To")),
                "subject": decode_str(msg.get("Subject")),
                "date": msg.get("Date"),
                "body": (get_body(msg) or "").strip(),
            })

        return {"emails": emails, "count": len(emails)}
    finally:
        mail.logout()