"""Pipeline orchestration: fetch emails → classify → parse → route to calendar/reminder/info."""
import json
import os
import sys
import traceback
from datetime import datetime, timedelta, date
from zoneinfo import ZoneInfo
from family_assistant.config import CHICAGO_TZ, GMAIL_APP_PASSWORD
from family_assistant.email_fetcher import fetch_unread
from family_assistant.appointment_parser import parse_email_with_llm
from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm
from family_assistant.family_brain import ingest_email, ingest_newsletter
from family_assistant.calendar_sync import (
event_exists,
create_event,
create_recurring_event,
find_and_cancel_event,
)
from family_assistant.conflict_engine import detect_conflicts
from family_assistant.hermes import push_pipeline_results, hermes_notify, _send_telegram
# ---------------------------------------------------------------------------
# Auth Failure Circuit Breaker
# ---------------------------------------------------------------------------
# Tracks consecutive auth failures to prevent error spam.
# After MAX_AUTH_FAILURES consecutive failures, the IMAP pipeline is paused
# and Matt is alerted once. Resets on first success or manual reset.
_AUTH_FAILURE_FILE = os.path.expanduser("~/.family_assistant/auth_circuit_breaker.json")
MAX_AUTH_FAILURES = 3 # Pause after this many consecutive failures
def _read_circuit_breaker():
"""Read circuit breaker state from disk."""
try:
with open(_AUTH_FAILURE_FILE, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}
def _write_circuit_breaker(state):
"""Write circuit breaker state to disk."""
os.makedirs(os.path.dirname(_AUTH_FAILURE_FILE), exist_ok=True)
with open(_AUTH_FAILURE_FILE, "w") as f:
json.dump(state, f, indent=2)
def _record_auth_failure(error_msg: str) -> bool:
"""Record an auth failure. Returns True if circuit breaker just tripped.
After MAX_AUTH_FAILURES consecutive failures, sets paused=True and
alerts Matt once. Subsequent calls return False (already paused).
"""
state = _read_circuit_breaker()
state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1
just_tripped = False
if state["consecutive_failures"] >= MAX_AUTH_FAILURES and not state.get("paused"):
state["paused"] = True
just_tripped = True
_write_circuit_breaker(state)
if just_tripped:
_alert_admin(
"🔴 IMAP Circuit Breaker Tripped",
f"Gmail auth failed {state['consecutive_failures']} times in a row. "
f"IMAP pipeline is now PAUSED to stop error spam.\n\n"
f"To reset: python -m family_assistant reset-circuit\n"
f"Or: rm {_AUTH_FAILURE_FILE}",
)
return just_tripped
def _record_auth_success():
"""Record a successful auth — resets the circuit breaker."""
state = _read_circuit_breaker()
if state.get("consecutive_failures", 0) > 0 or state.get("paused"):
was_paused = state.get("paused", False)
state["consecutive_failures"] = 0
state["paused"] = False
_write_circuit_breaker(state)
if was_paused:
_alert_admin(
"🟢 IMAP Circuit Breaker Reset",
"Gmail auth succeeded. IMAP pipeline is back online.",
)
def is_imap_paused() -> bool:
"""Check if the IMAP pipeline is paused due to auth failures."""
return _read_circuit_breaker().get("paused", False)
def reset_circuit_breaker():
"""Manually reset the circuit breaker."""
state = {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}
_write_circuit_breaker(state)
print("✅ Circuit breaker reset. IMAP pipeline re-enabled.")
def _resolve_location(location_str):
"""Resolve a location string through the cache to get the canonical name.
Prevents LLM hallucinations like 'at Golrusk' → 'PetSmart' by preferring
the known-locations cache. Returns the canonical cached name if found,
otherwise returns the original string unchanged.
"""
if not location_str or not location_str.strip():
return location_str
try:
from family_assistant.location_cache import resolve
result = resolve(location_str.strip(), use_home_bias=True)
if result and result.get("name"):
return result["name"]
except Exception:
pass
return location_str
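
# Example behavior (the cached mapping below is hypothetical):
#
#     _resolve_location("Golrusk")   # -> canonical cached name, if one exists
#     _resolve_location("Room 204")  # -> "Room 204" when there is no cache hit
#     _resolve_location("")          # -> "" (passed through unchanged)
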
from family_assistant.url_fetcher import enrich_body_with_urls
# ---------------------------------------------------------------------------
# Global error handler — no silent deaths
# ---------------------------------------------------------------------------
_DEV_TARGET = os.environ.get("TELEGRAM_DEV_ID", "")
def _alert_admin(title: str, detail: str):
"""Push an error alert to Matt's DM. Best-effort, never raises."""
if not _DEV_TARGET:
print(f" [ALERT] {title}: {detail}", file=sys.stderr)
return
# Strip markdown special characters from detail to prevent Telegram parse failures.
# Brackets, backticks, asterisks, underscores in error strings break Markdown mode.
import re
    safe_detail = re.sub(r'[\[\]`*_]', '', detail[:800])
    msg = f"⚠️ *{title}*\n\n{safe_detail}"
try:
_send_telegram(msg, target=_DEV_TARGET)
except Exception:
print(f" [ALERT] Failed to send admin alert: {title}", file=sys.stderr)
def _serialize_result(parsed):
"""Convert datetime objects and other non-serializable types to strings."""
result = {}
for key, value in parsed.items():
if isinstance(value, datetime):
result[key] = value.isoformat()
elif isinstance(value, date) and not isinstance(value, datetime):
result[key] = value.isoformat()
elif isinstance(value, list):
result[key] = [
v.isoformat() if isinstance(v, (datetime, date)) else v
for v in value
]
elif isinstance(value, dict):
result[key] = {
k: v.isoformat() if isinstance(v, (datetime, date)) else v
for k, v in value.items()
}
else:
result[key] = value
return result
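
# Example (a sketch; the name is hypothetical): datetimes and dates become
# ISO strings, everything else passes through untouched.
#
#     _serialize_result({"start": datetime(2026, 3, 18, 9, 30), "who": ["Ada"]})
#     # -> {"start": "2026-03-18T09:30:00", "who": ["Ada"]}
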
def _triage_email(subject, body, from_addr):
"""Classify an email as 'appointment' or 'newsletter'.
Uses the local LLM to make the classification. Falls back to 'appointment'
on errors (simpler parsing path, existing parser handles it).
"""
try:
return classify_email(subject, body, from_addr)
except Exception as e:
print(f" [Triage] Classification error: {e}, defaulting to appointment", file=sys.stderr)
return "appointment"
def _route_newsletter_items(items, dry_run=False, notify=True, email_subject=""):
"""Route newsletter items to their appropriate destinations.
Routing based on relevance:
- high relevance events/reminders/actions → Google Calendar
- low relevance items of any type → Telegram Info Digest (no calendar)
- info items → Telegram Info Digest (regardless of relevance)
- rejected items (per family.yaml rules) → skipped with note
Returns a dict with routed items and any errors.
"""
result = {
"events_created": [],
"reminders_created": [],
"actions_created": [],
"info_items": [], # high-relevance info + low-relevance items of any type
"low_relevance_items": [], # explicit low-relevance tracking
"rejected_items": [], # items filtered by rejection rules
"auto_filtered": [], # items shadow-filtered to low relevance
"errors": [],
}
# Shadow-filter rejected items — they stay in the list but get flagged
# as low_relevance with a note about which rule caught them.
# The user still sees what was filtered in the digest and can restore items.
    from .rejection_engine import shadow_filter_items
items = shadow_filter_items(items, scope="newsletter")
# Collect auto-filtered items for notification
for item in items:
if item.get("_auto_filtered"):
who_str = ", ".join(item.get("who", []))
result["auto_filtered"].append({
"type": item.get("original_type", "?"),
"summary": item.get("summary", ""),
"who": who_str,
"rule": item.get("reason", ""),
})
print(f" Auto-filtered: {item.get('summary', '?')} → low relevance", file=sys.stderr)
for item in items:
item_type = item.get("type", "info")
relevance = item.get("relevance", "high")
reason = item.get("reason", "No reason provided")
# LOW RELEVANCE → always goes to Telegram Info Digest, never calendar
if relevance == "low":
who_str = ", ".join(item.get("who", []))
result["low_relevance_items"].append({
"type": item_type,
"summary": item.get("summary", ""),
"who": who_str,
"description": item.get("description", ""),
"reason": reason,
})
# Also add to info_items for Telegram summary
result["info_items"].append({
"summary": f"[{item_type}] {item['summary']}",
"who": who_str,
"description": f"{item.get('description', '')} (low relevance: {reason})",
})
continue
# INFO type → Telegram Info Digest regardless of relevance
if item_type == "info":
who_str = ", ".join(item.get("who", []))
result["info_items"].append({
"summary": item["summary"],
"who": who_str,
"description": item.get("description", ""),
})
continue
# HIGH RELEVANCE items → route to calendar
if item_type == "event":
# Same path as appointments — create on calendar with dedup
start_dt = item.get("start")
if not start_dt:
result["errors"].append(f"Event missing start: {item.get('summary', '?')}")
continue
end_dt = item.get("end")
duration_minutes = item.get("duration_minutes", 60)
if not end_dt:
end_dt = start_dt + timedelta(minutes=duration_minutes)
who_str = ", ".join(item.get("who", []))
summary = item["summary"]
if who_str:
summary = f"{summary} ({who_str})"
if dry_run:
result["events_created"].append({
"status": "DRY_RUN",
"summary": summary,
"start": start_dt.isoformat(),
"end": end_dt.isoformat(),
"location": _resolve_location(item.get("location", "")),
})
continue
existing = event_exists(summary, start_dt)
if existing:
result["events_created"].append({
"status": "DUPLICATE_SKIPPED",
"summary": existing.get("summary", summary),
"start": existing["start"].get("dateTime", start_dt.isoformat()),
"end": existing["end"].get("dateTime", end_dt.isoformat()),
"existing_id": existing["id"],
})
print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
continue
try:
# Check if this is a recurring event
is_recurring = item.get("is_recurring", False)
recurrence = item.get("recurrence") if is_recurring else None
if is_recurring and recurrence:
event = create_recurring_event(
summary=summary,
start_dt=start_dt,
end_dt=end_dt,
recurrence=recurrence,
description=item.get("description", ""),
location=_resolve_location(item.get("location", "")),
)
else:
event = create_event(
summary=summary,
start_dt=start_dt,
end_dt=end_dt,
description=item.get("description", ""),
location=_resolve_location(item.get("location", "")),
)
result["events_created"].append({
"status": "CREATED",
"id": event["id"],
"summary": event.get("summary", summary),
"start": event["start"].get("dateTime", start_dt.isoformat()),
"end": event["end"].get("dateTime", end_dt.isoformat()),
"link": event.get("htmlLink", ""),
"recurring": is_recurring,
"location": _resolve_location(item.get("location", "")),
"_claimed_day_of_week": item.get("claimed_day_of_week", ""),
})
# Immediate per-event success notification
if notify and not dry_run:
hermes_notify(result["events_created"][-1], email_subject=email_subject)
except Exception as e:
result["errors"].append(f"Calendar create error: {e}")
elif item_type == "reminder":
# All-day event with [REMINDER] prefix
due = item.get("due")
who_str = ", ".join(item.get("who", []))
summary = f"[REMINDER] {item['summary']}"
if who_str:
summary += f" ({who_str})"
if due:
due_str = due.isoformat() if isinstance(due, date) else str(due)
else:
due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")
description = item.get("description", "")
if due:
description = f"Due: {due_str}\n" + description
if dry_run:
result["reminders_created"].append({
"status": "DRY_RUN",
"summary": summary,
"date": due_str,
})
continue
try:
# Create as all-day event (date only, no time)
event = create_event(
summary=summary,
start_dt=due_str, # date string for all-day event
end_dt=due_str,
description=description,
)
result["reminders_created"].append({
"status": "CREATED",
"id": event["id"],
"summary": event.get("summary", summary),
"date": due_str,
})
except Exception as e:
result["errors"].append(f"Reminder create error: {e}")
elif item_type == "action_item":
# All-day event with [ACTION] prefix
due = item.get("due")
who_str = ", ".join(item.get("who", []))
summary = f"[ACTION] {item['summary']}"
if who_str:
summary += f" ({who_str})"
if due:
due_str = due.isoformat() if isinstance(due, date) else str(due)
else:
due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")
description = item.get("description", "")
if due:
description = f"Due: {due_str}\n" + description
if dry_run:
result["actions_created"].append({
"status": "DRY_RUN",
"summary": summary,
"date": due_str,
"description": description,
})
continue
try:
event = create_event(
summary=summary,
start_dt=due_str,
end_dt=due_str,
description=description,
)
result["actions_created"].append({
"status": "CREATED",
"id": event["id"],
"summary": event.get("summary", summary),
"date": due_str,
"description": description,
})
except Exception as e:
result["errors"].append(f"Action create error: {e}")
return result
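
# Shape of one newsletter item as the router above reads it (illustrative;
# the keys mirror the item.get(...) calls in the function, the values are
# invented):
#
#     {
#         "type": "event",              # or "reminder", "action_item", "info"
#         "relevance": "high",          # "low" routes to the digest only
#         "summary": "Spirit Week kickoff",
#         "who": ["student"],
#         "start": datetime(...),       # required for events
#         "duration_minutes": 60,       # used when "end" is absent
#         "is_recurring": False,
#         "due": date(...),             # reminders / action items
#     }
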
def format_info_summary(info_items, low_relevance_items=None, source_subject=""):
"""Format info items into a Telegram-friendly summary string."""
if not info_items and not low_relevance_items:
return ""
lines = [f"📋 Newsletter Summary{': ' + source_subject if source_subject else ''}"]
for item in info_items:
who = f" ({item['who']})" if item.get('who') else ""
lines.append(f" • {item['summary']}{who}: {item['description']}")
if low_relevance_items:
lines.append("\n 📌 Low relevance (not added to calendar):")
for item in low_relevance_items:
who = f" ({item['who']})" if item.get('who') else ""
lines.append(f" • [{item.get('type', '?')}] {item['summary']}{who} — {item.get('reason', '')}")
return "\n".join(lines)
def process_emails(dry_run=False, notify=True, quiet=False):
"""Fetch unread emails, parse appointments via LLM, create calendar events.
All fetched emails are marked as read after processing, regardless of
whether they contained appointments. This prevents re-processing the same
emails on subsequent heartbeat runs.
Includes auth failure circuit breaker: after 3 consecutive Gmail auth
failures, the IMAP pipeline is paused to prevent error spam.
Args:
dry_run: If True, don't create calendar events or mark emails read.
notify: If True, push results to Telegram via Hermes.
quiet: If True, suppress family group notifications (only DM alerts).
"""
# Check circuit breaker first — skip IMAP if paused
if is_imap_paused():
return {
"emails_scanned": 0,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": ["IMAP_PAUSED: Gmail auth circuit breaker is tripped. Run 'family-assistant reset-circuit' to re-enable."],
}
# Skip IMAP entirely if no credentials configured (webhook-only mode)
if not GMAIL_APP_PASSWORD:
return {
"emails_scanned": 0,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": ["IMAP_SKIPPED: No Gmail credentials (webhook-only mode)"],
}
try:
result = _process_emails_inner(dry_run=dry_run, notify=notify, quiet=quiet)
# Success — reset circuit breaker
_record_auth_success()
return result
except Exception as e:
# Check if this is an auth failure
error_str = str(e).lower()
is_auth_failure = any(
sig in error_str
for sig in ["authentication", "invalid_grant", "credentials",
"auth failed", "login failed", "[auth]",
"[alert]", "suspen", "disabled", "access denied"]
)
if is_auth_failure:
            _record_auth_failure(str(e)[:200])
# Whether we just tripped or are still counting — return silently
return {
"emails_scanned": 0,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": [
"IMAP_PAUSED: circuit breaker tripped" if is_imap_paused()
else "IMAP_AUTH_FAILED: Gmail auth failed (circuit breaker counting)"
],
}
        # Non-auth failure — alert Matt with a stack trace
tb = traceback.format_exc()
_alert_admin(
"Pipeline Error",
f"process_emails() crashed:\n\n```\n{tb[:600]}\n```",
)
return {
"emails_scanned": 0,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": [f"UNHANDLED: {e}"],
}
def _process_emails_inner(dry_run=False, notify=True, quiet=False):
"""Internal implementation — wrapped by process_emails() error handler."""
result = {
"emails_scanned": 0,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": [],
}
# Fetch and mark as read in one pass — no second connection needed
mail_data = fetch_unread(mark_read=not dry_run)
if "error" in mail_data:
result["errors"].append(mail_data["error"])
return result
emails = mail_data.get("emails", [])
result["emails_scanned"] = len(emails)
if not emails:
return result
for em in emails:
subject = em.get("subject", "")
body = em.get("body", "")
from_addr = em.get("from", "")
date_str = em.get("date", "")
print(f" Processing: {subject[:60]}...", file=sys.stderr)
# Step 1: Triage — classify email type
email_class = _triage_email(subject, body, from_addr)
print(f" Classified as: {email_class}", file=sys.stderr)
if email_class == "newsletter":
# Enrich body with URL content (e.g. Smore links)
enriched_body = enrich_body_with_urls(body)
# Parse as newsletter with multi-type extraction
items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
if not items:
print(f" No items extracted from newsletter", file=sys.stderr)
continue
result["newsletter_items"].append({
"email_from": from_addr,
"email_subject": subject,
"items_count": len(items),
"item_types": [i.get("type", "info") for i in items],
})
# Route each item type to its destination
routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
result["events_created"].extend(routed["events_created"])
result["errors"].extend(routed["errors"])
# Collect reminders/actions/info for reporting
if routed["reminders_created"]:
result.setdefault("reminders_created", []).extend(routed["reminders_created"])
if routed["actions_created"]:
result.setdefault("actions_created", []).extend(routed["actions_created"])
if routed.get("low_relevance_items"):
result["low_relevance_items"].extend(routed["low_relevance_items"])
# Build info summary for Telegram push
info_items = routed.get("info_items", [])
low_items = routed.get("low_relevance_items", [])
if info_items or low_items:
info_block = format_info_summary(
info_items,
low_relevance_items=low_items,
source_subject=subject,
)
if result["info_summary"]:
result["info_summary"] += "\n\n"
result["info_summary"] += info_block
# Ingest into Family Brain for RAG retrieval
if not dry_run:
try:
ingest_newsletter(
subject=subject,
body=enriched_body,
from_addr=from_addr,
email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
items=items,
)
print(f" Ingested newsletter into Family Brain", file=sys.stderr)
except Exception as e:
print(f" Family Brain ingestion error: {e}", file=sys.stderr)
continue
# Step 2: Parse as appointment (existing path)
parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)
for parsed in parsed_list:
serialized = _serialize_result(parsed)
if parsed["type"] == "cancellation":
# Act on cancellation: find and delete the matching event
cancel_summary = parsed.get("summary", "")
cancel_start = parsed.get("start")
cancel_result = None
if cancel_start:
if dry_run:
cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()}
print(f" [DRY RUN] Would cancel: {cancel_summary} at {cancel_start.isoformat()}", file=sys.stderr)
else:
cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
if cancel_result is None:
cancel_result = {
"status": "CANCELLATION_NO_MATCH",
"summary": cancel_summary,
"start": cancel_start.isoformat() if cancel_start else None,
}
print(f" No matching event found for cancellation: {cancel_summary}", file=sys.stderr)
result["cancellations"].append({
"email_from": em["from"],
"email_subject": em["subject"],
"parsed": serialized,
"action": cancel_result,
})
continue
# It's an appointment
result["appointments_found"].append({
"email_from": em["from"],
"email_subject": em["subject"],
"parsed": serialized,
})
start_dt = parsed.get("start")
if not start_dt:
result["errors"].append(
f"Could not resolve datetime for: {parsed['summary']}"
)
continue
end_dt = parsed.get("end")
if not end_dt:
end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])
who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
summary = parsed["summary"]
if who_str:
summary = f"{summary} ({who_str})"
if dry_run:
is_recurring = parsed.get("is_recurring", False)
recurrence = parsed.get("recurrence") if is_recurring else None
result["events_created"].append({
"status": "DRY_RUN",
"summary": summary,
"start": start_dt.isoformat(),
"end": end_dt.isoformat(),
"location": _resolve_location(parsed["location"]),
"recurring": is_recurring,
"recurrence": recurrence,
})
else:
# Duplicate check: skip if event already exists on calendar
existing = event_exists(summary, start_dt)
if existing:
result["events_created"].append({
"status": "DUPLICATE_SKIPPED",
"summary": existing.get("summary", summary),
"start": existing["start"].get("dateTime", start_dt.isoformat()),
"end": existing["end"].get("dateTime", end_dt.isoformat()),
"existing_id": existing["id"],
})
print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
continue
try:
# Check if this is a recurring event
is_recurring = parsed.get("is_recurring", False)
recurrence = parsed.get("recurrence") if is_recurring else None
if is_recurring and recurrence:
event = create_recurring_event(
summary=summary,
start_dt=start_dt,
end_dt=end_dt,
recurrence=recurrence,
description=parsed["description"],
location=_resolve_location(parsed["location"]),
)
else:
event = create_event(
summary=summary,
start_dt=start_dt,
end_dt=end_dt,
description=parsed["description"],
location=_resolve_location(parsed["location"]),
)
result["events_created"].append({
"status": "CREATED",
"id": event["id"],
"summary": event.get("summary", summary),
"start": event["start"].get("dateTime", start_dt.isoformat()),
"end": event["end"].get("dateTime", end_dt.isoformat()),
"link": event.get("htmlLink", ""),
"recurring": is_recurring,
})
except Exception as e:
result["errors"].append(f"Calendar create error: {e}")
# Ingest appointment email into Family Brain
if not dry_run:
try:
ingest_email(
subject=subject,
body=body,
from_addr=from_addr,
email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
parsed_items=parsed_list,
)
print(f" Ingested appointment email into Family Brain", file=sys.stderr)
except Exception as e:
print(f" Family Brain ingestion error: {e}", file=sys.stderr)
# Conflict detection: check newly created events against the calendar
created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")]
if created_events and not dry_run:
try:
conflicts = detect_conflicts(events=[{"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")} for e in created_events])
result["conflicts"] = conflicts
if conflicts:
print(f" ⚠️ {len(conflicts)} scheduling conflict(s) detected", file=sys.stderr)
for c in conflicts:
print(f" {c['event1']['summary']} conflicts with {c['event2']['summary']} ({c['overlap_minutes']}min overlap)", file=sys.stderr)
except Exception as e:
result["conflicts"] = []
result["errors"].append(f"Conflict detection error: {e}")
else:
result["conflicts"] = []
# Push results to Telegram via Hermes
if notify and not dry_run:
try:
hermes_stats = push_pipeline_results(result, quiet=quiet)
result["hermes"] = hermes_stats
except Exception as e:
result["errors"].append(f"Hermes notification error: {e}")
return result
def process_webhook_email(email_data: dict, dry_run=False, notify=True):
"""Process a single email received via the Cloudflare webhook.
This is the push-pipeline path: email arrives via the webhook address,
Cloudflare Worker POSTs to hook.hoffdesk.com/webhook, which calls this.
Reuses the same extraction and routing logic as process_emails(),
but skips the IMAP fetch entirely.
Args:
email_data: Dict with keys: from, to, subject, date, body_text,
body_html, has_attachments, attachments, dedup_key,
message_id, source ('webhook')
dry_run: If True, don't create calendar events.
notify: If True, push results to Telegram via Hermes.
Returns:
Dict with processing results.
"""
try:
return _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify)
except Exception as e:
tb = traceback.format_exc()
_alert_admin(
"Webhook Pipeline Error",
f"process_webhook_email() crashed:\n\n```\n{tb[:600]}\n```",
)
return {
"emails_scanned": 1,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": [f"UNHANDLED: {e}"],
}
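
# Minimal webhook payload sketch (a subset of the keys documented in the
# docstring above; addresses and content are placeholders):
#
#     process_webhook_email({
#         "from": "office@school.example",
#         "to": "family@hook.hoffdesk.com",
#         "subject": "Early release Friday",
#         "date": "2026-03-18T08:00:00-05:00",
#         "body_text": "School dismisses at 1pm this Friday...",
#         "source": "webhook",
#     }, dry_run=True)
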
def _process_webhook_email_inner(email_data: dict, dry_run=False, notify=True):
"""Internal implementation — wrapped by process_webhook_email() error handler."""
result = {
"emails_scanned": 1,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": [],
}
subject = email_data.get("subject", "")
body = email_data.get("body_text", "")
from_addr = email_data.get("from", "")
date_str = email_data.get("date", "")
print(f" [Webhook] Processing: {subject[:60]}...", file=sys.stderr)
# Step 1: Triage
email_class = _triage_email(subject, body, from_addr)
print(f" Classified as: {email_class}", file=sys.stderr)
if email_class == "newsletter":
enriched_body = enrich_body_with_urls(body)
items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
if not items:
print(f" No items extracted from newsletter", file=sys.stderr)
return result
result["newsletter_items"].append({
"email_from": from_addr,
"email_subject": subject,
"items_count": len(items),
"item_types": [i.get("type", "info") for i in items],
})
routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
result["events_created"].extend(routed["events_created"])
result["errors"].extend(routed["errors"])
if routed.get("reminders_created"):
result.setdefault("reminders_created", []).extend(routed["reminders_created"])
if routed.get("actions_created"):
result.setdefault("actions_created", []).extend(routed["actions_created"])
if routed.get("low_relevance_items"):
result["low_relevance_items"].extend(routed["low_relevance_items"])
info_items = routed.get("info_items", [])
low_items = routed.get("low_relevance_items", [])
if info_items or low_items:
result["info_summary"] = format_info_summary(
info_items, low_relevance_items=low_items, source_subject=subject,
)
if not dry_run:
try:
ingest_newsletter(
subject=subject, body=enriched_body, from_addr=from_addr,
email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
items=items,
)
print(f" Ingested newsletter into Family Brain", file=sys.stderr)
except Exception as e:
print(f" Family Brain ingestion error: {e}", file=sys.stderr)
return _finalize_result(result, dry_run, notify)
# Step 2: Parse as appointment
parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)
for parsed in parsed_list:
serialized = _serialize_result(parsed)
if parsed["type"] == "cancellation":
cancel_summary = parsed.get("summary", "")
cancel_start = parsed.get("start")
cancel_result = None
if cancel_start:
if dry_run:
cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary,
"start": cancel_start.isoformat()}
else:
cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
if cancel_result is None:
cancel_result = {"status": "CANCELLATION_NO_MATCH", "summary": cancel_summary,
"start": cancel_start.isoformat() if cancel_start else None}
result["cancellations"].append({
"email_from": from_addr, "email_subject": subject,
"parsed": serialized, "action": cancel_result,
})
continue
result["appointments_found"].append({
"email_from": from_addr, "email_subject": subject, "parsed": serialized,
})
start_dt = parsed.get("start")
if not start_dt:
result["errors"].append(f"Could not resolve datetime for: {parsed['summary']}")
continue
end_dt = parsed.get("end")
if not end_dt:
end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])
who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
summary = parsed["summary"]
if who_str:
summary = f"{summary} ({who_str})"
if dry_run:
is_recurring = parsed.get("is_recurring", False)
recurrence = parsed.get("recurrence") if is_recurring else None
result["events_created"].append({
"status": "DRY_RUN", "summary": summary,
"start": start_dt.isoformat(), "end": end_dt.isoformat(),
"location": _resolve_location(parsed["location"]), "recurring": is_recurring,
"recurrence": recurrence,
})
else:
existing = event_exists(summary, start_dt)
if existing:
result["events_created"].append({
"status": "DUPLICATE_SKIPPED",
"summary": existing.get("summary", summary),
"start": existing["start"].get("dateTime", start_dt.isoformat()),
"end": existing["end"].get("dateTime", end_dt.isoformat()),
"existing_id": existing.get("id"),
})
print(f" Skipping duplicate: {summary}", file=sys.stderr)
continue
try:
is_recurring = parsed.get("is_recurring", False)
recurrence = parsed.get("recurrence") if is_recurring else None
if is_recurring and recurrence:
event = create_recurring_event(
summary=summary, start_dt=start_dt, end_dt=end_dt,
recurrence=recurrence, description=parsed["description"],
location=_resolve_location(parsed["location"]),
)
else:
event = create_event(
summary=summary, start_dt=start_dt, end_dt=end_dt,
description=parsed["description"], location=_resolve_location(parsed["location"]),
)
result["events_created"].append({
"status": "CREATED", "id": event["id"],
"summary": event.get("summary", summary),
"start": event["start"].get("dateTime", start_dt.isoformat()),
"end": event["end"].get("dateTime", end_dt.isoformat()),
"link": event.get("htmlLink", ""), "recurring": is_recurring,
"location": _resolve_location(parsed.get("location", "")),
"_claimed_day_of_week": parsed.get("claimed_day_of_week", ""),
})
# Immediate per-event success notification
if notify and not dry_run:
hermes_notify(result["events_created"][-1], email_subject=subject)
except Exception as e:
result["errors"].append(f"Calendar create error: {e}")
# Ingest into Family Brain
if not dry_run:
try:
ingest_email(
subject=subject, body=body, from_addr=from_addr,
email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
parsed_items=parsed_list,
)
print(f" Ingested appointment email into Family Brain", file=sys.stderr)
except Exception as e:
print(f" Family Brain ingestion error: {e}", file=sys.stderr)
return _finalize_result(result, dry_run, notify)
def _finalize_result(result, dry_run, notify):
"""Shared finalization: conflict detection + Hermes push."""
created_events = [e for e in result["events_created"]
if e.get("status") == "CREATED" and e.get("id")]
if created_events and not dry_run:
try:
conflicts = detect_conflicts(
events=[{"id": e["id"], "summary": e.get("summary", ""),
"start": e.get("start", ""), "end": e.get("end", "")}
for e in created_events]
)
result["conflicts"] = conflicts
except Exception as e:
result["conflicts"] = []
result["errors"].append(f"Conflict detection error: {e}")
else:
result["conflicts"] = []
if notify and not dry_run:
try:
hermes_stats = push_pipeline_results(result)
result["hermes"] = hermes_stats
except Exception as e:
result["errors"].append(f"Hermes notification error: {e}")
return result
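
# A conflict record, assuming detect_conflicts returns dicts shaped like the
# fields read in _process_emails_inner (summaries are invented):
#
#     {"event1": {"summary": "Dentist (Ada)"},
#      "event2": {"summary": "Soccer practice"},
#      "overlap_minutes": 30}
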
def backfill_brain(days: int = 30) -> dict:
"""Backfill the Family Brain with emails from the last N days.
Re-processes already-seen emails purely for ChromaDB ingestion.
Does NOT create calendar events or send notifications.
Args:
days: Number of days to look back
Returns:
Dict with ingestion stats.
"""
try:
return _backfill_brain_inner(days=days)
except Exception as e:
tb = traceback.format_exc()
_alert_admin(
"Backfill Error",
f"backfill_brain() crashed:\n\n```\n{tb[:600]}\n```",
)
return {
"emails_scanned": 0,
"ingested_emails": 0,
"ingested_newsletters": 0,
"errors": [f"UNHANDLED: {e}"],
"skipped": 0,
}
def _backfill_brain_inner(days: int = 30) -> dict:
"""Internal implementation — wrapped by backfill_brain() error handler."""
    # Only fetch_since needs importing here — the parsers and ingest helpers
    # are already imported at module level.
    from family_assistant.email_fetcher import fetch_since
# IMAP SINCE format: DD-Mon-YYYY (e.g. 18-Mar-2026)
since_dt = datetime.now(CHICAGO_TZ) - timedelta(days=days)
imap_date = since_dt.strftime("%d-%b-%Y")
print(f"Backfilling Family Brain: emails since {imap_date} ({days} days)", file=sys.stderr)
result = {
"emails_scanned": 0,
"ingested_emails": 0,
"ingested_newsletters": 0,
"errors": [],
"skipped": 0,
}
mail_data = fetch_since(imap_date)
if "error" in mail_data:
result["errors"].append(mail_data["error"])
return result
emails = mail_data.get("emails", [])
result["emails_scanned"] = len(emails)
if not emails:
print(" No emails found in date range", file=sys.stderr)
return result
for em in emails:
subject = em.get("subject", "")
body = em.get("body", "")
from_addr = em.get("from", "")
date_str = em.get("date", "")
print(f" Processing: {subject[:60]}...", file=sys.stderr)
# Classify
email_class = _triage_email(subject, body, from_addr)
print(f" Classified as: {email_class}", file=sys.stderr)
try:
if email_class == "newsletter":
enriched_body = enrich_body_with_urls(body)
items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
ingest_newsletter(
subject=subject,
body=enriched_body,
from_addr=from_addr,
email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
items=items or [],
)
result["ingested_newsletters"] += 1
print(f" ✅ Ingested newsletter", file=sys.stderr)
else:
# Appointment email
parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)
                # Serialize datetimes before passing to ingest — reuse the
                # shared helper instead of re-implementing the conversion
                safe_parsed = [_serialize_result(p) for p in (parsed_list or [])]
ingest_email(
subject=subject,
body=body,
from_addr=from_addr,
email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
parsed_items=safe_parsed,
)
result["ingested_emails"] += 1
print(f" ✅ Ingested email", file=sys.stderr)
except Exception as e:
result["errors"].append(f"{subject[:40]}: {e}")
print(f" ❌ Error: {e}", file=sys.stderr)
print(
f"\nBackfill complete: {result['ingested_emails']} emails + "
f"{result['ingested_newsletters']} newsletters ingested",
file=sys.stderr,
)
return result
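
# Typical invocation (a sketch; the stats keys are the ones built above):
#
#     stats = backfill_brain(days=7)
#     print(f"{stats['ingested_emails']} emails, "
#           f"{stats['ingested_newsletters']} newsletters ingested")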