
"""Pipeline orchestration: fetch emails → classify → parse → route to calendar/reminder/info."""

import json
import os
import sys
import traceback
from datetime import datetime, timedelta, date
from zoneinfo import ZoneInfo

from family_assistant.config import CHICAGO_TZ, GMAIL_APP_PASSWORD
from family_assistant.email_fetcher import fetch_unread
from family_assistant.appointment_parser import parse_email_with_llm
from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm
from family_assistant.family_brain import ingest_email, ingest_newsletter
from family_assistant.calendar_sync import (
    event_exists,
    create_event,
    create_recurring_event,
    find_and_cancel_event,
)
from family_assistant.conflict_engine import detect_conflicts
from family_assistant.hermes import push_pipeline_results, hermes_notify, _send_telegram

# ---------------------------------------------------------------------------
# Auth Failure Circuit Breaker
# ---------------------------------------------------------------------------
# Tracks consecutive auth failures to prevent error spam.
# After MAX_AUTH_FAILURES consecutive failures, the IMAP pipeline is paused
# and Matt is alerted once. Resets on first success or manual reset.

_AUTH_FAILURE_FILE = os.path.expanduser("~/.family_assistant/auth_circuit_breaker.json")
MAX_AUTH_FAILURES = 3  # Pause after this many consecutive failures
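# On-disk state is a small JSON blob; a tripped breaker looks like:
#   {"consecutive_failures": 3, "paused": true, "last_alert_ts": null}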

def _read_circuit_breaker():
    """Read circuit breaker state from disk."""
    try:
        with open(_AUTH_FAILURE_FILE, "r") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}

def _write_circuit_breaker(state):
    """Write circuit breaker state to disk."""
    os.makedirs(os.path.dirname(_AUTH_FAILURE_FILE), exist_ok=True)
    with open(_AUTH_FAILURE_FILE, "w") as f:
        json.dump(state, f, indent=2)

def _record_auth_failure(error_msg: str) -> bool:
    """Record an auth failure. Returns True if circuit breaker just tripped.

    After MAX_AUTH_FAILURES consecutive failures, sets paused=True and
    alerts Matt once. Subsequent calls return False (already paused).
    """
    state = _read_circuit_breaker()
    state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1

    just_tripped = False
    if state["consecutive_failures"] >= MAX_AUTH_FAILURES and not state.get("paused"):
        state["paused"] = True
        just_tripped = True

    _write_circuit_breaker(state)

    if just_tripped:
        _alert_admin(
            "🔴 IMAP Circuit Breaker Tripped",
            f"Gmail auth failed {state['consecutive_failures']} times in a row. "
            f"IMAP pipeline is now PAUSED to stop error spam.\n\n"
            f"To reset: python -m family_assistant reset-circuit\n"
            f"Or: rm {_AUTH_FAILURE_FILE}",
        )
    return just_tripped

def _record_auth_success():
    """Record a successful auth — resets the circuit breaker."""
    state = _read_circuit_breaker()
    if state.get("consecutive_failures", 0) > 0 or state.get("paused"):
        was_paused = state.get("paused", False)
        state["consecutive_failures"] = 0
        state["paused"] = False
        _write_circuit_breaker(state)
        if was_paused:
            _alert_admin(
                "🟢 IMAP Circuit Breaker Reset",
                "Gmail auth succeeded. IMAP pipeline is back online.",
            )

def is_imap_paused() -> bool:
    """Check if the IMAP pipeline is paused due to auth failures."""
    return _read_circuit_breaker().get("paused", False)

def reset_circuit_breaker():
    """Manually reset the circuit breaker."""
    state = {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}
    _write_circuit_breaker(state)
    print("✅ Circuit breaker reset. IMAP pipeline re-enabled.")

def _resolve_location(location_str):
    """Resolve a location string through the cache to get the canonical name.

    Prevents LLM hallucinations like 'at Golrusk' → 'PetSmart' by preferring
    the known-locations cache. Returns the canonical cached name if found,
    otherwise returns the original string unchanged.
    """
    if not location_str or not location_str.strip():
        return location_str
    try:
        from family_assistant.location_cache import resolve
        result = resolve(location_str.strip(), use_home_bias=True)
        if result and result.get("name"):
            return result["name"]
    except Exception:
        pass
    return location_str
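
# Illustrative: with a hypothetical cache entry mapping "Golrusk" to
# "Golrusk Pet Well Being", _resolve_location("Golrusk") returns the canonical
# cached name; strings the cache doesn't know pass through unchanged.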

from family_assistant.url_fetcher import enrich_body_with_urls

# ---------------------------------------------------------------------------
# Global error handler — no silent deaths
# ---------------------------------------------------------------------------

_DEV_TARGET = os.environ.get("TELEGRAM_DEV_ID", "")

def _alert_admin(title: str, detail: str):
    """Push an error alert to Matt's DM. Best-effort, never raises."""
    if not _DEV_TARGET:
        print(f"[ALERT] {title}: {detail}", file=sys.stderr)
        return
    # Strip Markdown special characters from detail to prevent Telegram parse failures.
    # Brackets, backticks, asterisks, underscores in error strings break Markdown mode.
    import re
    safe_detail = re.sub(r'[*_\[\]`]', '', detail[:800])
    msg = f"⚠️ *{title}*\n\n{safe_detail}"
    try:
        _send_telegram(msg, target=_DEV_TARGET)
    except Exception:
        print(f"[ALERT] Failed to send admin alert: {title}", file=sys.stderr)

def _serialize_result(parsed):
    """Convert datetime objects and other non-serializable types to strings."""
    result = {}
    for key, value in parsed.items():
        if isinstance(value, datetime):
            result[key] = value.isoformat()
        elif isinstance(value, date) and not isinstance(value, datetime):
            result[key] = value.isoformat()
        elif isinstance(value, list):
            result[key] = [
                v.isoformat() if isinstance(v, (datetime, date)) else v
                for v in value
            ]
        elif isinstance(value, dict):
            result[key] = {
                k: v.isoformat() if isinstance(v, (datetime, date)) else v
                for k, v in value.items()
            }
        else:
            result[key] = value
    return result
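
# Illustrative (values hypothetical): {"start": datetime(2026, 4, 20, 15, 0), "who": ["Ada"]}
# serializes to {"start": "2026-04-20T15:00:00", "who": ["Ada"]}.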

def _triage_email(subject, body, from_addr):
    """Classify an email as 'appointment' or 'newsletter'.

    Uses the local LLM to make the classification. Falls back to 'appointment'
    on errors (simpler parsing path, existing parser handles it).
    """
    try:
        return classify_email(subject, body, from_addr)
    except Exception as e:
        print(f"  [Triage] Classification error: {e}, defaulting to appointment", file=sys.stderr)
        return "appointment"

def _route_newsletter_items(items, dry_run=False, notify=True, email_subject=""):
    """Route newsletter items to their appropriate destinations.

    Routing based on relevance:
    - high relevance events/reminders/actions → Google Calendar
    - low relevance items of any type → Telegram Info Digest (no calendar)
    - info items → Telegram Info Digest (regardless of relevance)
    - rejected items (per family.yaml rules) → skipped with note

    Returns a dict with routed items and any errors.
    """
result = {
    "events_created": [],
    "reminders_created": [],
    "actions_created": [],
    "info_items": [],      # high-relevance info + low-relevance items of any type
    "low_relevance_items": [],  # explicit low-relevance tracking
    "rejected_items": [],  # items filtered by rejection rules
    "auto_filtered": [],      # items shadow-filtered to low relevance
    "errors": [],
}

# Shadow-filter rejected items — they stay in the list but get flagged
# as low_relevance with a note about which rule caught them.
# The user still sees what was filtered in the digest and can restore items.
from .rejection_engine import shadow_filter_items, should_reject
items = shadow_filter_items(items, scope="newsletter")

# Collect auto-filtered items for notification
for item in items:
    if item.get("_auto_filtered"):
        who_str = ", ".join(item.get("who", []))
        result["auto_filtered"].append({
            "type": item.get("original_type", "?"),
            "summary": item.get("summary", ""),
            "who": who_str,
            "rule": item.get("reason", ""),
        })
        print(f"    Auto-filtered: {item.get('summary', '?')} → low relevance", file=sys.stderr)

for item in items:
    item_type = item.get("type", "info")
    relevance = item.get("relevance", "high")
    reason = item.get("reason", "No reason provided")

    # LOW RELEVANCE → always goes to Telegram Info Digest, never calendar
    if relevance == "low":
        who_str = ", ".join(item.get("who", []))
        result["low_relevance_items"].append({
            "type": item_type,
            "summary": item.get("summary", ""),
            "who": who_str,
            "description": item.get("description", ""),
            "reason": reason,
        })
        # Also add to info_items for Telegram summary
        result["info_items"].append({
            "summary": f"[{item_type}] {item['summary']}",
            "who": who_str,
            "description": f"{item.get('description', '')} (low relevance: {reason})",
        })
        continue

    # INFO type → Telegram Info Digest regardless of relevance
    if item_type == "info":
        who_str = ", ".join(item.get("who", []))
        result["info_items"].append({
            "summary": item["summary"],
            "who": who_str,
            "description": item.get("description", ""),
        })
        continue

    # HIGH RELEVANCE items → route to calendar
    if item_type == "event":
        # Same path as appointments — create on calendar with dedup
        start_dt = item.get("start")
        if not start_dt:
            result["errors"].append(f"Event missing start: {item.get('summary', '?')}")
            continue

        end_dt = item.get("end")
        duration_minutes = item.get("duration_minutes", 60)
        if not end_dt:
            end_dt = start_dt + timedelta(minutes=duration_minutes)

        who_str = ", ".join(item.get("who", []))
        summary = item["summary"]
        if who_str:
            summary = f"{summary} ({who_str})"

        if dry_run:
            result["events_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "start": start_dt.isoformat(),
                "end": end_dt.isoformat(),
                "location": _resolve_location(item.get("location", "")),
            })
            continue

        existing = event_exists(summary, start_dt)
        if existing:
            result["events_created"].append({
                "status": "DUPLICATE_SKIPPED",
                "summary": existing.get("summary", summary),
                "start": existing["start"].get("dateTime", start_dt.isoformat()),
                "end": existing["end"].get("dateTime", end_dt.isoformat()),
                "existing_id": existing["id"],
            })
            print(f"    Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
            continue

        try:
            # Check if this is a recurring event
            is_recurring = item.get("is_recurring", False)
            recurrence = item.get("recurrence") if is_recurring else None

            if is_recurring and recurrence:
                event = create_recurring_event(
                    summary=summary,
                    start_dt=start_dt,
                    end_dt=end_dt,
                    recurrence=recurrence,
                    description=item.get("description", ""),
                    location=_resolve_location(item.get("location", "")),
                )
            else:
                event = create_event(
                    summary=summary,
                    start_dt=start_dt,
                    end_dt=end_dt,
                    description=item.get("description", ""),
                    location=_resolve_location(item.get("location", "")),
                )
            result["events_created"].append({
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "start": event["start"].get("dateTime", start_dt.isoformat()),
                "end": event["end"].get("dateTime", end_dt.isoformat()),
                "link": event.get("htmlLink", ""),
                "recurring": is_recurring,
                "location": _resolve_location(item.get("location", "")),
                "_claimed_day_of_week": item.get("claimed_day_of_week", ""),
            })
            # Immediate per-event success notification
            if notify and not dry_run:
                hermes_notify(result["events_created"][-1], email_subject=email_subject)
        except Exception as e:
            result["errors"].append(f"Calendar create error: {e}")

    elif item_type == "reminder":
        # All-day event with [REMINDER] prefix
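        # Illustrative (hypothetical item): {"type": "reminder", "due": date(2026, 4, 24),
        # "summary": "Return library books", "who": ["Ada"]} becomes an all-day
        # event titled "[REMINDER] Return library books (Ada)" dated 2026-04-24.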
        due = item.get("due")
        who_str = ", ".join(item.get("who", []))
        summary = f"[REMINDER] {item['summary']}"
        if who_str:
            summary += f" ({who_str})"
        if due:
            due_str = due.isoformat() if isinstance(due, date) else str(due)
        else:
            due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")

        description = item.get("description", "")
        if due:
            description = f"Due: {due_str}\n" + description

        if dry_run:
            result["reminders_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "date": due_str,
            })
            continue

        try:
            # Create as all-day event (date only, no time)
            event = create_event(
                summary=summary,
                start_dt=due_str,  # date string for all-day event
                end_dt=due_str,
                description=description,
            )
            result["reminders_created"].append({
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "date": due_str,
            })
        except Exception as e:
            result["errors"].append(f"Reminder create error: {e}")

    elif item_type == "action_item":
        # All-day event with [ACTION] prefix
        due = item.get("due")
        who_str = ", ".join(item.get("who", []))
        summary = f"[ACTION] {item['summary']}"
        if who_str:
            summary += f" ({who_str})"
        if due:
            due_str = due.isoformat() if isinstance(due, date) else str(due)
        else:
            due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")

        description = item.get("description", "")
        if due:
            description = f"Due: {due_str}\n" + description

        if dry_run:
            result["actions_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "date": due_str,
                "description": description,
            })
            continue

        try:
            event = create_event(
                summary=summary,
                start_dt=due_str,
                end_dt=due_str,
                description=description,
            )
            result["actions_created"].append({
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "date": due_str,
                "description": description,
            })
        except Exception as e:
            result["errors"].append(f"Action create error: {e}")


return result

def format_info_summary(info_items, low_relevance_items=None, source_subject=""):
    """Format info items into a Telegram-friendly summary string."""
    if not info_items and not low_relevance_items:
        return ""
    lines = [f"📋 Newsletter Summary{': ' + source_subject if source_subject else ''}"]
    for item in info_items:
        who = f" ({item['who']})" if item.get('who') else ""
        lines.append(f" • {item['summary']}{who}: {item['description']}")
    if low_relevance_items:
        lines.append("\n 📌 Low relevance (not added to calendar):")
        for item in low_relevance_items:
            who = f" ({item['who']})" if item.get('who') else ""
            lines.append(f" • [{item.get('type', '?')}] {item['summary']}{who} — {item.get('reason', '')}")
    return "\n".join(lines)

def process_emails(dry_run=False, notify=True, quiet=False):
"""Fetch unread emails, parse appointments via LLM, create calendar events.

All fetched emails are marked as read after processing, regardless of
whether they contained appointments. This prevents re-processing the same
emails on subsequent heartbeat runs.

Includes auth failure circuit breaker: after 3 consecutive Gmail auth
failures, the IMAP pipeline is paused to prevent error spam.

Args:
    dry_run: If True, don't create calendar events or mark emails read.
    notify: If True, push results to Telegram via Hermes.
    quiet: If True, suppress family group notifications (only DM alerts).
"""
# Check circuit breaker first — skip IMAP if paused
if is_imap_paused():
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": ["IMAP_PAUSED: Gmail auth circuit breaker is tripped. Run 'family-assistant reset-circuit' to re-enable."],
    }

# Skip IMAP entirely if no credentials configured (webhook-only mode)
if not GMAIL_APP_PASSWORD:
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": ["IMAP_SKIPPED: No Gmail credentials (webhook-only mode)"],
    }

try:
    result = _process_emails_inner(dry_run=dry_run, notify=notify, quiet=quiet)
    # Success — reset circuit breaker
    _record_auth_success()
    return result
except Exception as e:
    # Check if this is an auth failure
    error_str = str(e).lower()
    is_auth_failure = any(
        sig in error_str
        for sig in ["authentication", "invalid_grant", "credentials",
                    "auth failed", "login failed", "[auth]",
                    "[alert]", "suspen", "disabled", "access denied"]
    )
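    # e.g. Gmail's "imaplib.error: [AUTHENTICATIONFAILED] Invalid credentials"
    # matches the "authentication" and "credentials" signatures above.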

    if is_auth_failure:
        just_tripped = _record_auth_failure(str(e)[:200])
        # Whether we just tripped or are still counting — return silently
        return {
            "emails_scanned": 0,
            "appointments_found": [],
            "cancellations": [],
            "events_created": [],
            "newsletter_items": [],
            "info_summary": "",
            "low_relevance_items": [],
            "errors": [
                "IMAP_PAUSED: circuit breaker tripped" if is_imap_paused()
                else "IMAP_AUTH_FAILED: Gmail auth failed (circuit breaker counting)"
            ],
        }

    # Non-auth failure (auth failures returned above) — alert with stack trace
    tb = traceback.format_exc()
    _alert_admin(
        "Pipeline Error",
        f"process_emails() crashed:\n\n```\n{tb[:600]}\n```",
    )
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": [f"UNHANDLED: {e}"],
    }

def _process_emails_inner(dry_run=False, notify=True, quiet=False):
"""Internal implementation — wrapped by process_emails() error handler."""
result = {
    "emails_scanned": 0,
    "appointments_found": [],
    "cancellations": [],
    "events_created": [],
    "newsletter_items": [],
    "info_summary": "",
    "low_relevance_items": [],
    "errors": [],
}

# Fetch and mark as read in one pass — no second connection needed
mail_data = fetch_unread(mark_read=not dry_run)
if "error" in mail_data:
    result["errors"].append(mail_data["error"])
    return result

emails = mail_data.get("emails", [])
result["emails_scanned"] = len(emails)

if not emails:
    return result

for em in emails:
    subject = em.get("subject", "")
    body = em.get("body", "")
    from_addr = em.get("from", "")
    date_str = em.get("date", "")

    print(f"  Processing: {subject[:60]}...", file=sys.stderr)

    # Step 1: Triage — classify email type
    email_class = _triage_email(subject, body, from_addr)
    print(f"    Classified as: {email_class}", file=sys.stderr)

    if email_class == "newsletter":
        # Enrich body with URL content (e.g. Smore links)
        enriched_body = enrich_body_with_urls(body)

        # Parse as newsletter with multi-type extraction
        items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
        if not items:
            print(f"    No items extracted from newsletter", file=sys.stderr)
            continue

        result["newsletter_items"].append({
            "email_from": from_addr,
            "email_subject": subject,
            "items_count": len(items),
            "item_types": [i.get("type", "info") for i in items],
        })

        # Route each item type to its destination
        routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
        result["events_created"].extend(routed["events_created"])
        result["errors"].extend(routed["errors"])

        # Collect reminders/actions/info for reporting
        if routed["reminders_created"]:
            result.setdefault("reminders_created", []).extend(routed["reminders_created"])
        if routed["actions_created"]:
            result.setdefault("actions_created", []).extend(routed["actions_created"])
        if routed.get("low_relevance_items"):
            result["low_relevance_items"].extend(routed["low_relevance_items"])

        # Build info summary for Telegram push
        info_items = routed.get("info_items", [])
        low_items = routed.get("low_relevance_items", [])
        if info_items or low_items:
            info_block = format_info_summary(
                info_items,
                low_relevance_items=low_items,
                source_subject=subject,
            )
            if result["info_summary"]:
                result["info_summary"] += "\n\n"
            result["info_summary"] += info_block

        # Ingest into Family Brain for RAG retrieval
        if not dry_run:
            try:
                ingest_newsletter(
                    subject=subject,
                    body=enriched_body,
                    from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    items=items,
                )
                print(f"    Ingested newsletter into Family Brain", file=sys.stderr)
            except Exception as e:
                print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

        continue

    # Step 2: Parse as appointment (existing path)
    parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)

    for parsed in parsed_list:
        serialized = _serialize_result(parsed)
        if parsed["type"] == "cancellation":
            # Act on cancellation: find and delete the matching event
            cancel_summary = parsed.get("summary", "")
            cancel_start = parsed.get("start")
            cancel_result = None
            if cancel_start:
                if dry_run:
                    cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()}
                    print(f"    [DRY RUN] Would cancel: {cancel_summary} at {cancel_start.isoformat()}", file=sys.stderr)
                else:
                    cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
            if cancel_result is None:
                cancel_result = {
                    "status": "CANCELLATION_NO_MATCH",
                    "summary": cancel_summary,
                    "start": cancel_start.isoformat() if cancel_start else None,
                }
                print(f"    No matching event found for cancellation: {cancel_summary}", file=sys.stderr)
            result["cancellations"].append({
                "email_from": em["from"],
                "email_subject": em["subject"],
                "parsed": serialized,
                "action": cancel_result,
            })
            continue

        # It's an appointment
        result["appointments_found"].append({
            "email_from": em["from"],
            "email_subject": em["subject"],
            "parsed": serialized,
        })

        start_dt = parsed.get("start")
        if not start_dt:
            result["errors"].append(
                f"Could not resolve datetime for: {parsed['summary']}"
            )
            continue

        end_dt = parsed.get("end")
        if not end_dt:
            end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])

        who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
        summary = parsed["summary"]
        if who_str:
            summary = f"{summary} ({who_str})"

        if dry_run:
            is_recurring = parsed.get("is_recurring", False)
            recurrence = parsed.get("recurrence") if is_recurring else None
            result["events_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "start": start_dt.isoformat(),
                "end": end_dt.isoformat(),
                "location": _resolve_location(parsed["location"]),
                "recurring": is_recurring,
                "recurrence": recurrence,
            })
        else:
            # Duplicate check: skip if event already exists on calendar
            existing = event_exists(summary, start_dt)
            if existing:
                result["events_created"].append({
                    "status": "DUPLICATE_SKIPPED",
                    "summary": existing.get("summary", summary),
                    "start": existing["start"].get("dateTime", start_dt.isoformat()),
                    "end": existing["end"].get("dateTime", end_dt.isoformat()),
                    "existing_id": existing["id"],
                })
                print(f"    Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
                continue

            try:
                # Check if this is a recurring event
                is_recurring = parsed.get("is_recurring", False)
                recurrence = parsed.get("recurrence") if is_recurring else None

                if is_recurring and recurrence:
                    event = create_recurring_event(
                        summary=summary,
                        start_dt=start_dt,
                        end_dt=end_dt,
                        recurrence=recurrence,
                        description=parsed["description"],
                        location=_resolve_location(parsed["location"]),
                    )
                else:
                    event = create_event(
                        summary=summary,
                        start_dt=start_dt,
                        end_dt=end_dt,
                        description=parsed["description"],
                        location=_resolve_location(parsed["location"]),
                    )
                result["events_created"].append({
                    "status": "CREATED",
                    "id": event["id"],
                    "summary": event.get("summary", summary),
                    "start": event["start"].get("dateTime", start_dt.isoformat()),
                    "end": event["end"].get("dateTime", end_dt.isoformat()),
                    "link": event.get("htmlLink", ""),
                    "recurring": is_recurring,
                })
            except Exception as e:
                result["errors"].append(f"Calendar create error: {e}")

        # Ingest appointment email into Family Brain
        if not dry_run:
            try:
                ingest_email(
                    subject=subject,
                    body=body,
                    from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    parsed_items=parsed_list,
                )
                print(f"    Ingested appointment email into Family Brain", file=sys.stderr)
            except Exception as e:
                print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

# Conflict detection: check newly created events against the calendar
created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")]
if created_events and not dry_run:
    try:
        conflicts = detect_conflicts(events=[{"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")} for e in created_events])
        result["conflicts"] = conflicts
        if conflicts:
            print(f"  ⚠️  {len(conflicts)} scheduling conflict(s) detected", file=sys.stderr)
            for c in conflicts:
                print(f"    {c['event1']['summary']} conflicts with {c['event2']['summary']} ({c['overlap_minutes']}min overlap)", file=sys.stderr)
    except Exception as e:
        result["conflicts"] = []
        result["errors"].append(f"Conflict detection error: {e}")
else:
    result["conflicts"] = []

# Push results to Telegram via Hermes
if notify and not dry_run:
    try:
        hermes_stats = push_pipeline_results(result, quiet=quiet)
        result["hermes"] = hermes_stats
    except Exception as e:
        result["errors"].append(f"Hermes notification error: {e}")

return result

def process_webhook_email(email_data: dict, dry_run=False, notify=True):
"""Process a single email received via the Cloudflare webhook.

This is the push-pipeline path: email arrives via the webhook address,
Cloudflare Worker POSTs to hook.hoffdesk.com/webhook, which calls this.

Reuses the same extraction and routing logic as process_emails(),
but skips the IMAP fetch entirely.

Args:
    email_data: Dict with keys: from, to, subject, date, body_text,
                body_html, has_attachments, attachments, dedup_key,
                message_id, source ('webhook')
    dry_run: If True, don't create calendar events.
    notify: If True, push results to Telegram via Hermes.

Returns:
    Dict with processing results.
"""
try:
    return _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify)
except Exception as e:
    tb = traceback.format_exc()
    _alert_admin(
        "Webhook Pipeline Error",
        f"process_webhook_email() crashed:\n\n```\n{tb[:600]}\n```",
    )
    return {
        "emails_scanned": 1,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": [f"UNHANDLED: {e}"],
    }

def _process_webhook_email_inner(email_data: dict, dry_run=False, notify=True):
"""Internal implementation — wrapped by process_webhook_email() error handler."""
result = {
    "emails_scanned": 1,
    "appointments_found": [],
    "cancellations": [],
    "events_created": [],
    "newsletter_items": [],
    "info_summary": "",
    "low_relevance_items": [],
    "errors": [],
}

subject = email_data.get("subject", "")
body = email_data.get("body_text", "")
from_addr = email_data.get("from", "")
date_str = email_data.get("date", "")

print(f"  [Webhook] Processing: {subject[:60]}...", file=sys.stderr)

# Step 1: Triage
email_class = _triage_email(subject, body, from_addr)
print(f"    Classified as: {email_class}", file=sys.stderr)

if email_class == "newsletter":
    enriched_body = enrich_body_with_urls(body)
    items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
    if not items:
        print(f"    No items extracted from newsletter", file=sys.stderr)
        return result

    result["newsletter_items"].append({
        "email_from": from_addr,
        "email_subject": subject,
        "items_count": len(items),
        "item_types": [i.get("type", "info") for i in items],
    })

    routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
    result["events_created"].extend(routed["events_created"])
    result["errors"].extend(routed["errors"])

    if routed.get("reminders_created"):
        result.setdefault("reminders_created", []).extend(routed["reminders_created"])
    if routed.get("actions_created"):
        result.setdefault("actions_created", []).extend(routed["actions_created"])
    if routed.get("low_relevance_items"):
        result["low_relevance_items"].extend(routed["low_relevance_items"])

    info_items = routed.get("info_items", [])
    low_items = routed.get("low_relevance_items", [])
    if info_items or low_items:
        result["info_summary"] = format_info_summary(
            info_items, low_relevance_items=low_items, source_subject=subject,
        )

    if not dry_run:
        try:
            ingest_newsletter(
                subject=subject, body=enriched_body, from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                items=items,
            )
            print(f"    Ingested newsletter into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

    return _finalize_result(result, dry_run, notify)

# Step 2: Parse as appointment
parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)

for parsed in parsed_list:
    serialized = _serialize_result(parsed)

    if parsed["type"] == "cancellation":
        cancel_summary = parsed.get("summary", "")
        cancel_start = parsed.get("start")
        cancel_result = None
        if cancel_start:
            if dry_run:
                cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary,
                                 "start": cancel_start.isoformat()}
            else:
                cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
        if cancel_result is None:
            cancel_result = {"status": "CANCELLATION_NO_MATCH", "summary": cancel_summary,
                             "start": cancel_start.isoformat() if cancel_start else None}
        result["cancellations"].append({
            "email_from": from_addr, "email_subject": subject,
            "parsed": serialized, "action": cancel_result,
        })
        continue

    result["appointments_found"].append({
        "email_from": from_addr, "email_subject": subject, "parsed": serialized,
    })

    start_dt = parsed.get("start")
    if not start_dt:
        result["errors"].append(f"Could not resolve datetime for: {parsed['summary']}")
        continue

    end_dt = parsed.get("end")
    if not end_dt:
        end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])

    who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
    summary = parsed["summary"]
    if who_str:
        summary = f"{summary} ({who_str})"

    if dry_run:
        is_recurring = parsed.get("is_recurring", False)
        recurrence = parsed.get("recurrence") if is_recurring else None
        result["events_created"].append({
            "status": "DRY_RUN", "summary": summary,
            "start": start_dt.isoformat(), "end": end_dt.isoformat(),
            "location": _resolve_location(parsed["location"]), "recurring": is_recurring,
            "recurrence": recurrence,
        })
    else:
        existing = event_exists(summary, start_dt)
        if existing:
            result["events_created"].append({
                "status": "DUPLICATE_SKIPPED",
                "summary": existing.get("summary", summary),
                "start": existing["start"].get("dateTime", start_dt.isoformat()),
                "end": existing["end"].get("dateTime", end_dt.isoformat()),
                "existing_id": existing.get("id"),
            })
            print(f"    Skipping duplicate: {summary}", file=sys.stderr)
            continue

        try:
            is_recurring = parsed.get("is_recurring", False)
            recurrence = parsed.get("recurrence") if is_recurring else None

            if is_recurring and recurrence:
                event = create_recurring_event(
                    summary=summary, start_dt=start_dt, end_dt=end_dt,
                    recurrence=recurrence, description=parsed["description"],
                    location=_resolve_location(parsed["location"]),
                )
            else:
                event = create_event(
                    summary=summary, start_dt=start_dt, end_dt=end_dt,
                    description=parsed["description"], location=_resolve_location(parsed["location"]),
                )
            result["events_created"].append({
                "status": "CREATED", "id": event["id"],
                "summary": event.get("summary", summary),
                "start": event["start"].get("dateTime", start_dt.isoformat()),
                "end": event["end"].get("dateTime", end_dt.isoformat()),
                "link": event.get("htmlLink", ""), "recurring": is_recurring,
                "location": _resolve_location(parsed.get("location", "")),
                "_claimed_day_of_week": parsed.get("claimed_day_of_week", ""),
            })
            # Immediate per-event success notification
            if notify and not dry_run:
                hermes_notify(result["events_created"][-1], email_subject=subject)
        except Exception as e:
            result["errors"].append(f"Calendar create error: {e}")

    # Ingest into Family Brain
    if not dry_run:
        try:
            ingest_email(
                subject=subject, body=body, from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                parsed_items=parsed_list,
            )
            print(f"    Ingested appointment email into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

return _finalize_result(result, dry_run, notify)

def _finalize_result(result, dry_run, notify):
    """Shared finalization: conflict detection + Hermes push."""
    created_events = [e for e in result["events_created"]
                      if e.get("status") == "CREATED" and e.get("id")]
    if created_events and not dry_run:
        try:
            conflicts = detect_conflicts(
                events=[{"id": e["id"], "summary": e.get("summary", ""),
                         "start": e.get("start", ""), "end": e.get("end", "")}
                        for e in created_events]
            )
            result["conflicts"] = conflicts
        except Exception as e:
            result["conflicts"] = []
            result["errors"].append(f"Conflict detection error: {e}")
    else:
        result["conflicts"] = []

    if notify and not dry_run:
        try:
            hermes_stats = push_pipeline_results(result)
            result["hermes"] = hermes_stats
        except Exception as e:
            result["errors"].append(f"Hermes notification error: {e}")

    return result

def backfill_brain(days: int = 30) -> dict:
"""Backfill the Family Brain with emails from the last N days.

Re-processes already-seen emails purely for ChromaDB ingestion.
Does NOT create calendar events or send notifications.

Args:
    days: Number of days to look back

Returns:
    Dict with ingestion stats.
"""
try:
    return _backfill_brain_inner(days=days)
except Exception as e:
    tb = traceback.format_exc()
    _alert_admin(
        "Backfill Error",
        f"backfill_brain() crashed:\n\n```\n{tb[:600]}\n```",
    )
    return {
        "emails_scanned": 0,
        "ingested_emails": 0,
        "ingested_newsletters": 0,
        "errors": [f"UNHANDLED: {e}"],
        "skipped": 0,
    }
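
# Typical use (sketch; counts hypothetical):
#   stats = backfill_brain(days=30)
#   # -> {"emails_scanned": 214, "ingested_emails": 180, "ingested_newsletters": 30, ...}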

def _backfill_brain_inner(days: int = 30) -> dict:
"""Internal implementation — wrapped by backfill_brain() error handler."""
from family_assistant.email_fetcher import fetch_since
from family_assistant.family_brain import ingest_email, ingest_newsletter
from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm
from family_assistant.appointment_parser import parse_email_with_llm

# IMAP SINCE format: DD-Mon-YYYY (e.g. 18-Mar-2026)
since_dt = datetime.now(CHICAGO_TZ) - timedelta(days=days)
imap_date = since_dt.strftime("%d-%b-%Y")

print(f"Backfilling Family Brain: emails since {imap_date} ({days} days)", file=sys.stderr)

result = {
    "emails_scanned": 0,
    "ingested_emails": 0,
    "ingested_newsletters": 0,
    "errors": [],
    "skipped": 0,
}

mail_data = fetch_since(imap_date)
if "error" in mail_data:
    result["errors"].append(mail_data["error"])
    return result

emails = mail_data.get("emails", [])
result["emails_scanned"] = len(emails)

if not emails:
    print("  No emails found in date range", file=sys.stderr)
    return result

for em in emails:
    subject = em.get("subject", "")
    body = em.get("body", "")
    from_addr = em.get("from", "")
    date_str = em.get("date", "")

    print(f"  Processing: {subject[:60]}...", file=sys.stderr)

    # Classify
    email_class = _triage_email(subject, body, from_addr)
    print(f"    Classified as: {email_class}", file=sys.stderr)

    try:
        if email_class == "newsletter":
            enriched_body = enrich_body_with_urls(body)
            items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
            ingest_newsletter(
                subject=subject,
                body=enriched_body,
                from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                items=items or [],
            )
            result["ingested_newsletters"] += 1
            print(f"    ✅ Ingested newsletter", file=sys.stderr)

        else:
            # Appointment email
            parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)
            # Serialize datetimes before passing to ingest
            safe_parsed = []
            for p in (parsed_list or []):
                item = {}
                for k, v in p.items():
                    if isinstance(v, datetime):
                        item[k] = v.isoformat()
                    elif isinstance(v, date) and not isinstance(v, datetime):
                        item[k] = v.isoformat()
                    else:
                        item[k] = v
                safe_parsed.append(item)
            ingest_email(
                subject=subject,
                body=body,
                from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                parsed_items=safe_parsed,
            )
            result["ingested_emails"] += 1
            print(f"    ✅ Ingested email", file=sys.stderr)
    except Exception as e:
        result["errors"].append(f"{subject[:40]}: {e}")
        print(f"    ❌ Error: {e}", file=sys.stderr)

print(
    f"\nBackfill complete: {result['ingested_emails']} emails + "
    f"{result['ingested_newsletters']} newsletters ingested",
    file=sys.stderr,
)
return result