"""Gmail IMAP email fetching.""" import imaplib import email import re from email.header import decode_header from html.parser import HTMLParser from family_assistant.config import ( GMAIL_USER, GMAIL_APP_PASSWORD, IMAP_SERVER, IMAP_PORT, ) class _HTMLToText(HTMLParser): """Lightweight HTML-to-plaintext converter. Strips tags, collapses whitespace, and adds newlines around block elements so the LLM gets clean text instead of 44KB of markup noise. """ BLOCK_TAGS = frozenset([ 'p', 'div', 'br', 'hr', 'tr', 'li', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'blockquote', 'table', 'section', 'article', 'header', 'footer', 'main', 'aside', 'details', 'summary', 'figure', 'figcaption', ]) SKIP_TAGS = frozenset(['style', 'script', 'head']) def __init__(self): super().__init__() self._pieces = [] self._skip_depth = 0 def handle_starttag(self, tag, attrs): tag = tag.lower() if tag in self.SKIP_TAGS: self._skip_depth += 1 elif tag in self.BLOCK_TAGS: self._pieces.append('\n') elif tag == 'li': self._pieces.append('\n• ') def handle_endtag(self, tag): tag = tag.lower() if tag in self.SKIP_TAGS: self._skip_depth = max(0, self._skip_depth - 1) elif tag in self.BLOCK_TAGS: self._pieces.append('\n') def handle_data(self, data): if self._skip_depth > 0: return self._pieces.append(data) def handle_entityref(self, name): if self._skip_depth > 0: return entities = {'nbsp': ' ', 'amp': '&', 'lt': '<', 'gt': '>', '#39': "'", 'quot': '"'} self._pieces.append(entities.get(name, f'&{name};')) def handle_charref(self, name): if self._skip_depth > 0: return self._pieces.append(f'&#{name};') def get_text(self): text = ''.join(self._pieces) # Collapse runs of whitespace but keep intentional newlines text = re.sub(r'[ \t]+', ' ', text) text = re.sub(r'\n{3,}', '\n\n', text) return text.strip() def html_to_text(html): """Convert HTML to clean plaintext for LLM consumption.""" if not html or '<' not in html: return html or '' parser = _HTMLToText() parser.feed(html) return parser.get_text() def decode_str(s): """Decode an email header value, handling encoded words.""" if s is None: return "" decoded = decode_header(s) parts = [] for data, charset in decoded: if isinstance(data, bytes): parts.append(data.decode(charset or "utf-8", errors="replace")) else: parts.append(data) return "".join(parts) def get_body(msg): """Extract the text body from an email message. Prefers text/plain, falls back to text/html (converted to plaintext). """ if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) charset = part.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") if payload else "" for part in msg.walk(): if part.get_content_type() == "text/html": payload = part.get_payload(decode=True) charset = part.get_content_charset() or "utf-8" html = payload.decode(charset, errors="replace") if payload else "" return html_to_text(html) else: payload = msg.get_payload(decode=True) charset = msg.get_content_charset() or "utf-8" text = payload.decode(charset, errors="replace") if payload else "" # If the single part is HTML, convert it if msg.get_content_type() == "text/html": return html_to_text(text) return text def fetch_unread(mark_read=False): """Fetch unread emails from Gmail via IMAP. If mark_read=True, marks all fetched emails as seen after retrieval. This prevents re-processing the same emails on subsequent runs. """ if not GMAIL_APP_PASSWORD: return {"error": "GMAIL_APP_PASSWORD env var not set"} mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) try: mail.login(GMAIL_USER, GMAIL_APP_PASSWORD) mail.select("INBOX") # Only fetch truly unread emails — NEVER fall back to ALL status, message_ids = mail.search(None, "UNSEEN") if status != "OK": return {"emails": [], "count": 0} id_list = message_ids[0].split() if message_ids[0] else [] if not id_list: return {"emails": [], "count": 0} emails = [] for msg_id in id_list: status, msg_data = mail.fetch(msg_id, "(RFC822)") if status != "OK": continue raw = msg_data[0][1] msg = email.message_from_bytes(raw) emails.append({ "id": msg_id.decode(), "from": decode_str(msg.get("From")), "to": decode_str(msg.get("To")), "subject": decode_str(msg.get("Subject")), "date": msg.get("Date"), "body": get_body(msg).strip(), }) # Mark all fetched emails as read to prevent re-processing if mark_read: for msg_id in id_list: mail.store(msg_id, "+FLAGS", "\\Seen") return {"emails": emails, "count": len(emails)} finally: mail.logout() def fetch_since(since_date: str, max_emails: int = 100) -> dict: """Fetch emails received since a given IMAP date string. Used for backfilling the Family Brain with recently processed emails. Does NOT mark emails as read — they were already processed. Args: since_date: IMAP date string, e.g. "17-Mar-2026" max_emails: Maximum emails to fetch Returns: Dict with list of emails and count. """ if not GMAIL_APP_PASSWORD: return {"error": "GMAIL_APP_PASSWORD env var not set"} mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) try: mail.login(GMAIL_USER, GMAIL_APP_PASSWORD) mail.select("INBOX") status, message_ids = mail.search(None, f'SINCE "{since_date}"') if status != "OK": return {"emails": [], "count": 0} id_list = message_ids[0].split() if message_ids[0] else [] # Process newest first for backfill id_list = list(reversed(id_list))[:max_emails] if not id_list: return {"emails": [], "count": 0} emails = [] for msg_id in id_list: status, msg_data = mail.fetch(msg_id, "(RFC822)") if status != "OK": continue raw = msg_data[0][1] msg = email.message_from_bytes(raw) emails.append({ "id": msg_id.decode(), "from": decode_str(msg.get("From")), "to": decode_str(msg.get("To")), "subject": decode_str(msg.get("Subject")), "date": msg.get("Date"), "body": get_body(msg).strip(), }) return {"emails": emails, "count": len(emails)} finally: mail.logout()