!/usr/bin/env python3
"""
Fetch unread emails from Gmail via IMAP using an App Password.
Returns structured JSON for downstream processing.
"""
import imaplib
import email
import json
import os
import sys
from email.header import decode_header
from datetime import datetime
GMAIL_USER = os.environ.get("GMAIL_USER", "hoffmann.family.manager@gmail.com")
GMAIL_APP_PASSWORD = os.environ.get("GMAIL_APP_PASSWORD")
if not GMAIL_APP_PASSWORD:
print(json.dumps({"error": "GMAIL_APP_PASSWORD env var not set"}))
sys.exit(1)
IMAP_SERVER = "imap.gmail.com"
IMAP_PORT = 993
def decode_str(s):
"""Decode email header value."""
if s is None:
return ""
decoded = decode_header(s)
parts = []
for data, charset in decoded:
if isinstance(data, bytes):
parts.append(data.decode(charset or "utf-8", errors="replace"))
else:
parts.append(data)
return "".join(parts)
def get_body(msg):
"""Extract plain text body from email message."""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
# Fallback: try HTML
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/html":
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
else:
payload = msg.get_payload(decode=True)
charset = msg.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace") if payload else ""
def fetch_unread():
"""Connect to Gmail, fetch unread messages, return as JSON."""
mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
try:
mail.login(GMAIL_USER, GMAIL_APP_PASSWORD)
mail.select("INBOX")
# Search for unread messages
status, message_ids = mail.search(None, "UNSEEN")
if status != "OK":
print(json.dumps({"error": "Search failed", "status": status}))
return
id_list = message_ids[0].split()
if not id_list:
print(json.dumps({"emails": [], "count": 0}))
return
emails = []
for msg_id in id_list:
status, msg_data = mail.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
email_entry = {
"id": msg_id.decode(),
"from": decode_str(msg.get("From")),
"to": decode_str(msg.get("To")),
"subject": decode_str(msg.get("Subject")),
"date": msg.get("Date"),
"body": get_body(msg).strip(),
}
emails.append(email_entry)
print(json.dumps({"emails": emails, "count": len(emails)}, indent=2))
finally:
mail.logout()
if name == "main":
try:
fetch_unread()
except imaplib.IMAP4.error as e:
print(json.dumps({"error": f"IMAP error: {e}"}))
sys.exit(1)
except Exception as e:
print(json.dumps({"error": f"Unexpected: {e}"}))
sys.exit(1)