📄 pipeline.py 54,230 bytes Apr 27, 2026 📋 Raw

"""Pipeline orchestration: fetch emails → classify → parse → route to calendar/reminder/info."""

import json
import os
import sys
import traceback
from datetime import datetime, timedelta, date
from zoneinfo import ZoneInfo

from family_assistant.config import CHICAGO_TZ, GMAIL_APP_PASSWORD
from family_assistant.email_fetcher import fetch_unread
from family_assistant.appointment_parser import parse_email_with_llm
from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm
from family_assistant.family_brain import ingest_email, ingest_newsletter
from family_assistant.calendar_sync import (
event_exists,
create_event,
create_recurring_event,
find_and_cancel_event,
)
from family_assistant.conflict_engine import detect_conflicts
from family_assistant.hermes import push_pipeline_results, hermes_notify, _send_telegram, format_payment_alert

---------------------------------------------------------------------------

Payment Alert Detection

---------------------------------------------------------------------------

Fast keyword pre-scan: if ANY of these appear in subject or body,

the LLM classifier is strongly biased toward payment_alert.

_PAYMENT_ALERT_KEYWORDS = (
"payment failed",
"card declined",
"declined", # catch "payment declined" or "transaction declined"
"transaction failed",
"card expired",
"update your payment",
"payment method",
"subscription suspended",
"account will be canceled",
"your payment could not be processed",
"payment was unsuccessful",
"unpaid invoice",
"past due",
"overdue payment",
)

def _scan_payment_alert(subject, body):
"""Quick keyword scan for payment alert signals.

Returns True if any payment alert keyword is found in subject or body.
This runs BEFORE the LLM classification to provide a fast signal.
"""
text = f"{subject} {body}".lower()
return any(kw in text for kw in _PAYMENT_ALERT_KEYWORDS)

def parse_payment_alert_with_llm(subject, body, from_addr="", date_str=""):
"""Extract payment alert details from an email using the LLM.

Returns a dict with merchant, alert_type, amount, deadline,
action_needed, summary  or None if not a real payment alert.
"""
from family_assistant.config import LLM_URL, LLM_MODEL, LLM_TIMEOUT, MAX_BODY_CHARS, load_prompts
import requests as _requests

prompts = load_prompts()
system_template = prompts.get("payment_alert_extract", "")
if not system_template:
    # Fallback if prompt not found
    print("  [PaymentAlert] payment_alert_extract prompt not found", file=sys.stderr)
    return None

today = datetime.now(CHICAGO_TZ)
today_str = today.strftime("%Y-%m-%d")
system_msg = system_template.format(today=today_str)

trimmed_body = body[:MAX_BODY_CHARS] if body else ""
user_msg = f"Subject: {subject}\nFrom: {from_addr}\nDate: {date_str}\n\n{trimmed_body}"

payload = {
    "model": LLM_MODEL,
    "messages": [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ],
    "temperature": 0,
}
try:
    resp = _requests.post(LLM_URL, json=payload, timeout=LLM_TIMEOUT)
    resp.raise_for_status()
    data = resp.json()
    raw = data["choices"][0]["message"]["content"].strip()
except Exception as e:
    print(f"  [PaymentAlert] LLM error: {e}", file=sys.stderr)
    return None

# Parse JSON from LLM response
if not raw:
    return None
raw = raw.strip()
if raw.startswith("```"):
    import re
    raw = re.sub(r'^```(?:json)?\s*\n?', '', raw)
    raw = re.sub(r'\n?```\s*$', '', raw)
    raw = raw.strip()
try:
    parsed = json.loads(raw)
except json.JSONDecodeError:
    print(f"  [PaymentAlert] JSON parse error", file=sys.stderr)
    return None

if not isinstance(parsed, dict) or parsed.get("not_a_payment_alert"):
    return None

# Validate required fields
if not parsed.get("merchant"):
    parsed["merchant"] = "Unknown Service"
if not parsed.get("alert_type"):
    parsed["alert_type"] = "payment_failed"
if not parsed.get("summary"):
    merchant = parsed.get("merchant", "")
    amount = parsed.get("amount", "")
    parsed["summary"] = f"{merchant} payment failed" + (f" ({amount})" if amount else "")

return parsed

---------------------------------------------------------------------------

Auth Failure Circuit Breaker

---------------------------------------------------------------------------

Tracks consecutive auth failures to prevent error spam.

After MAX_AUTH_FAILURES consecutive failures, the IMAP pipeline is paused

and Matt is alerted once. Resets on first success or manual reset.

_AUTH_FAILURE_FILE = os.path.expanduser("~/.family_assistant/auth_circuit_breaker.json")
MAX_AUTH_FAILURES = 3 # Pause after this many consecutive failures

def _read_circuit_breaker():
"""Read circuit breaker state from disk."""
try:
with open(_AUTH_FAILURE_FILE, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}

def _write_circuit_breaker(state):
"""Write circuit breaker state to disk."""
os.makedirs(os.path.dirname(_AUTH_FAILURE_FILE), exist_ok=True)
with open(_AUTH_FAILURE_FILE, "w") as f:
json.dump(state, f, indent=2)

def _record_auth_failure(error_msg: str) -> bool:
"""Record an auth failure. Returns True if circuit breaker just tripped.

After MAX_AUTH_FAILURES consecutive failures, sets paused=True and
alerts Matt once. Subsequent calls return False (already paused).
"""
state = _read_circuit_breaker()
state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1

just_tripped = False
if state["consecutive_failures"] >= MAX_AUTH_FAILURES and not state.get("paused"):
    state["paused"] = True
    just_tripped = True

_write_circuit_breaker(state)

if just_tripped:
    _alert_admin(
        "🔴 IMAP Circuit Breaker Tripped",
        f"Gmail auth failed {state['consecutive_failures']} times in a row. "
        f"IMAP pipeline is now PAUSED to stop error spam.\n\n"
        f"To reset: python -m family_assistant reset-circuit\n"
        f"Or: rm {_AUTH_FAILURE_FILE}",
    )
return just_tripped

def _record_auth_success():
"""Record a successful auth — resets the circuit breaker."""
state = _read_circuit_breaker()
if state.get("consecutive_failures", 0) > 0 or state.get("paused"):
was_paused = state.get("paused", False)
state["consecutive_failures"] = 0
state["paused"] = False
_write_circuit_breaker(state)
if was_paused:
_alert_admin(
"🟢 IMAP Circuit Breaker Reset",
"Gmail auth succeeded. IMAP pipeline is back online.",
)

def is_imap_paused() -> bool:
"""Check if the IMAP pipeline is paused due to auth failures."""
return _read_circuit_breaker().get("paused", False)

def reset_circuit_breaker():
"""Manually reset the circuit breaker."""
state = {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}
_write_circuit_breaker(state)
print("✅ Circuit breaker reset. IMAP pipeline re-enabled.")

def _resolve_location(location_str):
"""Resolve a location string through the cache to get the canonical name.

Prevents LLM hallucinations like 'at Golrusk'  'PetSmart' by preferring
the known-locations cache. Returns the canonical cached name if found,
otherwise returns the original string unchanged.
"""
if not location_str or not location_str.strip():
    return location_str
try:
    from family_assistant.location_cache import resolve
    result = resolve(location_str.strip(), use_home_bias=True)
    if result and result.get("name"):
        return result["name"]
except Exception:
    pass
return location_str

from family_assistant.url_fetcher import enrich_body_with_urls

---------------------------------------------------------------------------

Global error handler — no silent deaths

---------------------------------------------------------------------------

_DEV_TARGET = os.environ.get("TELEGRAM_DEV_ID", "")

def alert_admin(title: str, detail: str):
"""Push an error alert to Matt's DM. Best-effort, never raises."""
if not _DEV_TARGET:
print(f" [ALERT] {title}: {detail}", file=sys.stderr)
return
# Strip markdown special characters from detail to prevent Telegram parse failures.
# Brackets, backticks, asterisks, underscores in error strings break Markdown mode.
import re
safe_detail = re.sub(r'[
[]`]', '', detail[:800])
msg = f"⚠️
{title}*\n\n{safe_detail}"
try:
_send_telegram(msg, target=_DEV_TARGET)
except Exception:
print(f" [ALERT] Failed to send admin alert: {title}", file=sys.stderr)

def _serialize_result(parsed):
"""Convert datetime objects and other non-serializable types to strings."""
result = {}
for key, value in parsed.items():
if isinstance(value, datetime):
result[key] = value.isoformat()
elif isinstance(value, date) and not isinstance(value, datetime):
result[key] = value.isoformat()
elif isinstance(value, list):
result[key] = [
v.isoformat() if isinstance(v, (datetime, date)) else v
for v in value
]
elif isinstance(value, dict):
result[key] = {
k: v.isoformat() if isinstance(v, (datetime, date)) else v
for k, v in value.items()
}
else:
result[key] = value
return result

def _triage_email(subject, body, from_addr):
"""Classify an email as 'appointment', 'newsletter', or 'payment_alert'.

Uses a fast keyword pre-scan for payment alerts, then the LLM classifier.
If payment alert keywords are found, injects a hint into the classification
prompt so the LLM knows to check for payment issues.

Falls back to 'appointment' on errors (simpler parsing path).
"""
# Fast pre-scan: if payment alert keywords are present, add context
is_payment_candidate = _scan_payment_alert(subject, body)

try:
    result = classify_email(subject, body, from_addr, payment_hint=is_payment_candidate)
    return result
except Exception as e:
    print(f"  [Triage] Classification error: {e}, defaulting to appointment", file=sys.stderr)
    return "appointment"

def _route_newsletter_items(items, dry_run=False, notify=True, email_subject=""):
"""Route newsletter items to their appropriate destinations.

Routing based on relevance:
- high relevance events/reminders/actions  Google Calendar
- low relevance items of any type  Telegram Info Digest (no calendar)
- info items  Telegram Info Digest (regardless of relevance)
- rejected items (per family.yaml rules)  skipped with note

Returns a dict with routed items and any errors.
"""
result = {
    "events_created": [],
    "reminders_created": [],
    "actions_created": [],
    "info_items": [],      # high-relevance info + low-relevance items of any type
    "low_relevance_items": [],  # explicit low-relevance tracking
    "rejected_items": [],  # items filtered by rejection rules
    "auto_filtered": [],      # items shadow-filtered to low relevance
    "errors": [],
}

# Shadow-filter rejected items — they stay in the list but get flagged
# as low_relevance with a note about which rule caught them.
# The user still sees what was filtered in the digest and can restore items.
from .rejection_engine import shadow_filter_items, should_reject
items = shadow_filter_items(items, scope="newsletter")

# Collect auto-filtered items for notification
for item in items:
    if item.get("_auto_filtered"):
        who_str = ", ".join(item.get("who", []))
        result["auto_filtered"].append({
            "type": item.get("original_type", "?"),
            "summary": item.get("summary", ""),
            "who": who_str,
            "rule": item.get("reason", ""),
        })
        print(f"    Auto-filtered: {item.get('summary', '?')} → low relevance", file=sys.stderr)

for item in items:
    item_type = item.get("type", "info")
    relevance = item.get("relevance", "high")
    reason = item.get("reason", "No reason provided")

    # LOW RELEVANCE → always goes to Telegram Info Digest, never calendar
    if relevance == "low":
        who_str = ", ".join(item.get("who", []))
        result["low_relevance_items"].append({
            "type": item_type,
            "summary": item.get("summary", ""),
            "who": who_str,
            "description": item.get("description", ""),
            "reason": reason,
        })
        # Also add to info_items for Telegram summary
        result["info_items"].append({
            "summary": f"[{item_type}] {item['summary']}",
            "who": who_str,
            "description": f"{item.get('description', '')} (low relevance: {reason})",
        })
        continue

    # INFO type → Telegram Info Digest regardless of relevance
    if item_type == "info":
        who_str = ", ".join(item.get("who", []))
        result["info_items"].append({
            "summary": item["summary"],
            "who": who_str,
            "description": item.get("description", ""),
        })
        continue

    # HIGH RELEVANCE items → route to calendar
    if item_type == "event":
        # Same path as appointments — create on calendar with dedup
        start_dt = item.get("start")
        if not start_dt:
            result["errors"].append(f"Event missing start: {item.get('summary', '?')}")
            continue

        end_dt = item.get("end")
        duration_minutes = item.get("duration_minutes", 60)
        if not end_dt:
            end_dt = start_dt + timedelta(minutes=duration_minutes)

        who_str = ", ".join(item.get("who", []))
        summary = item["summary"]
        if who_str:
            summary = f"{summary} ({who_str})"

        if dry_run:
            result["events_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "start": start_dt.isoformat(),
                "end": end_dt.isoformat(),
                "location": _resolve_location(item.get("location", "")),
            })
            continue

        existing = event_exists(summary, start_dt)
        if existing:
            result["events_created"].append({
                "status": "DUPLICATE_SKIPPED",
                "summary": existing.get("summary", summary),
                "start": existing["start"].get("dateTime", start_dt.isoformat()),
                "end": existing["end"].get("dateTime", end_dt.isoformat()),
                "existing_id": existing["id"],
            })
            print(f"    Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
            continue

        try:
            # Check if this is a recurring event
            is_recurring = item.get("is_recurring", False)
            recurrence = item.get("recurrence") if is_recurring else None

            if is_recurring and recurrence:
                event = create_recurring_event(
                    summary=summary,
                    start_dt=start_dt,
                    end_dt=end_dt,
                    recurrence=recurrence,
                    description=item.get("description", ""),
                    location=_resolve_location(item.get("location", "")),
                )
            else:
                event = create_event(
                    summary=summary,
                    start_dt=start_dt,
                    end_dt=end_dt,
                    description=item.get("description", ""),
                    location=_resolve_location(item.get("location", "")),
                )
            result["events_created"].append({
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "start": event["start"].get("dateTime", start_dt.isoformat()),
                "end": event["end"].get("dateTime", end_dt.isoformat()),
                "link": event.get("htmlLink", ""),
                "recurring": is_recurring,
                "location": _resolve_location(item.get("location", "")),
                "_claimed_day_of_week": item.get("claimed_day_of_week", ""),
            })
            # Immediate per-event success notification
            if notify and not dry_run:
                hermes_notify(result["events_created"][-1], email_subject=email_subject)
        except Exception as e:
            result["errors"].append(f"Calendar create error: {e}")

    elif item_type == "reminder":
        # All-day event with [REMINDER] prefix
        due = item.get("due")
        who_str = ", ".join(item.get("who", []))
        summary = f"[REMINDER] {item['summary']}"
        if who_str:
            summary += f" ({who_str})"
        if due:
            due_str = due.isoformat() if isinstance(due, date) else str(due)
        else:
            due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")

        description = item.get("description", "")
        if due:
            description = f"Due: {due_str}\n" + description

        if dry_run:
            result["reminders_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "date": due_str,
            })
            continue

        try:
            # Create as all-day event (date only, no time)
            event = create_event(
                summary=summary,
                start_dt=due_str,  # date string for all-day event
                end_dt=due_str,
                description=description,
            )
            result["reminders_created"].append({
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "date": due_str,
            })
        except Exception as e:
            result["errors"].append(f"Reminder create error: {e}")

    elif item_type == "action_item":
        # All-day event with [ACTION] prefix
        due = item.get("due")
        who_str = ", ".join(item.get("who", []))
        summary = f"[ACTION] {item['summary']}"
        if who_str:
            summary += f" ({who_str})"
        if due:
            due_str = due.isoformat() if isinstance(due, date) else str(due)
        else:
            due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")

        description = item.get("description", "")
        if due:
            description = f"Due: {due_str}\n" + description

        if dry_run:
            result["actions_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "date": due_str,
                "description": description,
            })
            continue

        try:
            event = create_event(
                summary=summary,
                start_dt=due_str,
                end_dt=due_str,
                description=description,
            )
            result["actions_created"].append({
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "date": due_str,
                "description": description,
            })
        except Exception as e:
            result["errors"].append(f"Action create error: {e}")


return result

def format_info_summary(info_items, low_relevance_items=None, source_subject=""):
"""Format info items into a Telegram-friendly summary string."""
if not info_items and not low_relevance_items:
return ""
lines = [f"📋 Newsletter Summary{': ' + source_subject if source_subject else ''}"]
for item in info_items:
who = f" ({item['who']})" if item.get('who') else ""
lines.append(f" • {item['summary']}{who}: {item['description']}")
if low_relevance_items:
lines.append("\n 📌 Low relevance (not added to calendar):")
for item in low_relevance_items:
who = f" ({item['who']})" if item.get('who') else ""
lines.append(f" • [{item.get('type', '?')}] {item['summary']}{who} — {item.get('reason', '')}")
return "\n".join(lines)

def process_emails(dry_run=False, notify=True, quiet=False):
"""Fetch unread emails, parse appointments via LLM, create calendar events.

All fetched emails are marked as read after processing, regardless of
whether they contained appointments. This prevents re-processing the same
emails on subsequent heartbeat runs.

Includes auth failure circuit breaker: after 3 consecutive Gmail auth
failures, the IMAP pipeline is paused to prevent error spam.

Args:
    dry_run: If True, don't create calendar events or mark emails read.
    notify: If True, push results to Telegram via Hermes.
    quiet: If True, suppress family group notifications (only DM alerts).
"""
# Check circuit breaker first — skip IMAP if paused
if is_imap_paused():
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": ["IMAP_PAUSED: Gmail auth circuit breaker is tripped. Run 'family-assistant reset-circuit' to re-enable."],
    }

# Skip IMAP entirely if no credentials configured (webhook-only mode)
if not GMAIL_APP_PASSWORD:
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": ["IMAP_SKIPPED: No Gmail credentials (webhook-only mode)"],
    }

try:
    result = _process_emails_inner(dry_run=dry_run, notify=notify, quiet=quiet)
    # Success — reset circuit breaker
    _record_auth_success()
    return result
except Exception as e:
    # Check if this is an auth failure
    error_str = str(e).lower()
    is_auth_failure = any(
        sig in error_str
        for sig in ["authentication", "invalid_grant", "credentials",
                    "auth failed", "login failed", "[auth]",
                    "[alert]", "suspen", "disabled", "access denied"]
    )

    if is_auth_failure:
        just_tripped = _record_auth_failure(str(e)[:200])
        # Whether we just tripped or are still counting — return silently
        return {
            "emails_scanned": 0,
            "appointments_found": [],
            "cancellations": [],
            "events_created": [],
            "newsletter_items": [],
            "info_summary": "",
            "low_relevance_items": [],
            "errors": [
                "IMAP_PAUSED: circuit breaker tripped" if is_imap_paused()
                else "IMAP_AUTH_FAILED: Gmail auth failed (circuit breaker counting)"
            ],
        }

    # Non-auth failure or first-trip alert — send stack trace
    tb = traceback.format_exc()
    _alert_admin(
        "Pipeline Error",
        f"process_emails() crashed:\n\n```\n{tb[:600]}\n```",
    )
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": [f"UNHANDLED: {e}"],
    }

def _process_emails_inner(dry_run=False, notify=True, quiet=False):
"""Internal implementation — wrapped by process_emails() error handler."""
result = {
"emails_scanned": 0,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": [],
"payment_alerts": [],
}

# Fetch and mark as read in one pass — no second connection needed
mail_data = fetch_unread(mark_read=not dry_run)
if "error" in mail_data:
    result["errors"].append(mail_data["error"])
    return result

emails = mail_data.get("emails", [])
result["emails_scanned"] = len(emails)

if not emails:
    return result

for em in emails:
    subject = em.get("subject", "")
    body = em.get("body", "")
    from_addr = em.get("from", "")
    date_str = em.get("date", "")

    print(f"  Processing: {subject[:60]}...", file=sys.stderr)

    # Step 1: Triage — classify email type
    email_class = _triage_email(subject, body, from_addr)
    print(f"    Classified as: {email_class}", file=sys.stderr)

    if email_class == "payment_alert":
        # Extract payment alert details via LLM
        alert_data = parse_payment_alert_with_llm(subject, body, from_addr, date_str)
        if alert_data:
            print(f"    Payment alert: {alert_data.get('merchant', '?')} — {alert_data.get('alert_type', '?')}", file=sys.stderr)
            result["payment_alerts"].append(alert_data)

            # IMMEDIATE notification — don't wait for batch
            if notify and not dry_run:
                try:
                    msg = format_payment_alert(alert_data)
                    _send_telegram(msg)
                except Exception as e:
                    result["errors"].append(f"Payment alert notification error: {e}")
        else:
            print(f"    LLM did not confirm payment alert, skipping", file=sys.stderr)

        # Ingest into Family Brain for RAG retrieval
        if not dry_run:
            try:
                ingest_email(
                    subject=subject, body=body, from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    parsed_items=[{"type": "payment_alert", "merchant": alert_data.get("merchant", "")} if alert_data else {}],
                )
                print(f"    Ingested payment alert into Family Brain", file=sys.stderr)
            except Exception as e:
                print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

        continue

    if email_class == "newsletter":
        # Enrich body with URL content (e.g. Smore links)
        enriched_body = enrich_body_with_urls(body)

        # Parse as newsletter with multi-type extraction
        items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
        if not items:
            print(f"    No items extracted from newsletter", file=sys.stderr)
            continue

        result["newsletter_items"].append({
            "email_from": from_addr,
            "email_subject": subject,
            "items_count": len(items),
            "item_types": [i.get("type", "info") for i in items],
        })

        # Route each item type to its destination
        routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
        result["events_created"].extend(routed["events_created"])
        result["errors"].extend(routed["errors"])

        # Collect reminders/actions/info for reporting
        if routed["reminders_created"]:
            result.setdefault("reminders_created", []).extend(routed["reminders_created"])
        if routed["actions_created"]:
            result.setdefault("actions_created", []).extend(routed["actions_created"])
        if routed.get("low_relevance_items"):
            result["low_relevance_items"].extend(routed["low_relevance_items"])

        # Build info summary for Telegram push
        info_items = routed.get("info_items", [])
        low_items = routed.get("low_relevance_items", [])
        if info_items or low_items:
            info_block = format_info_summary(
                info_items,
                low_relevance_items=low_items,
                source_subject=subject,
            )
            if result["info_summary"]:
                result["info_summary"] += "\n\n"
            result["info_summary"] += info_block

        # Ingest into Family Brain for RAG retrieval
        if not dry_run:
            try:
                ingest_newsletter(
                    subject=subject,
                    body=enriched_body,
                    from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    items=items,
                )
                print(f"    Ingested newsletter into Family Brain", file=sys.stderr)
            except Exception as e:
                print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

        continue

    # Step 2: Parse as appointment (existing path)
    parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)

    for parsed in parsed_list:
        serialized = _serialize_result(parsed)
        if parsed["type"] == "cancellation":
            # Act on cancellation: find and delete the matching event
            cancel_summary = parsed.get("summary", "")
            cancel_start = parsed.get("start")
            cancel_result = None
            if cancel_start:
                if dry_run:
                    cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()}
                    print(f"    [DRY RUN] Would cancel: {cancel_summary} at {cancel_start.isoformat()}", file=sys.stderr)
                else:
                    cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
            if cancel_result is None:
                cancel_result = {
                    "status": "CANCELLATION_NO_MATCH",
                    "summary": cancel_summary,
                    "start": cancel_start.isoformat() if cancel_start else None,
                }
                print(f"    No matching event found for cancellation: {cancel_summary}", file=sys.stderr)
            result["cancellations"].append({
                "email_from": em["from"],
                "email_subject": em["subject"],
                "parsed": serialized,
                "action": cancel_result,
            })
            continue

        # It's an appointment
        result["appointments_found"].append({
            "email_from": em["from"],
            "email_subject": em["subject"],
            "parsed": serialized,
        })

        start_dt = parsed.get("start")
        if not start_dt:
            result["errors"].append(
                f"Could not resolve datetime for: {parsed['summary']}"
            )
            continue

        end_dt = parsed.get("end")
        if not end_dt:
            end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])

        who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
        summary = parsed["summary"]
        if who_str:
            summary = f"{summary} ({who_str})"

        if dry_run:
            is_recurring = parsed.get("is_recurring", False)
            recurrence = parsed.get("recurrence") if is_recurring else None
            result["events_created"].append({
                "status": "DRY_RUN",
                "summary": summary,
                "start": start_dt.isoformat(),
                "end": end_dt.isoformat(),
                "location": _resolve_location(parsed["location"]),
                "recurring": is_recurring,
                "recurrence": recurrence,
            })
        else:
            # Duplicate check: skip if event already exists on calendar
            existing = event_exists(summary, start_dt)
            if existing:
                result["events_created"].append({
                    "status": "DUPLICATE_SKIPPED",
                    "summary": existing.get("summary", summary),
                    "start": existing["start"].get("dateTime", start_dt.isoformat()),
                    "end": existing["end"].get("dateTime", end_dt.isoformat()),
                    "existing_id": existing["id"],
                })
                print(f"    Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
                continue

            try:
                # Check if this is a recurring event
                is_recurring = parsed.get("is_recurring", False)
                recurrence = parsed.get("recurrence") if is_recurring else None

                if is_recurring and recurrence:
                    event = create_recurring_event(
                        summary=summary,
                        start_dt=start_dt,
                        end_dt=end_dt,
                        recurrence=recurrence,
                        description=parsed["description"],
                        location=_resolve_location(parsed["location"]),
                    )
                else:
                    event = create_event(
                        summary=summary,
                        start_dt=start_dt,
                        end_dt=end_dt,
                        description=parsed["description"],
                        location=_resolve_location(parsed["location"]),
                    )
                result["events_created"].append({
                    "status": "CREATED",
                    "id": event["id"],
                    "summary": event.get("summary", summary),
                    "start": event["start"].get("dateTime", start_dt.isoformat()),
                    "end": event["end"].get("dateTime", end_dt.isoformat()),
                    "link": event.get("htmlLink", ""),
                    "recurring": is_recurring,
                })
            except Exception as e:
                result["errors"].append(f"Calendar create error: {e}")

        # Ingest appointment email into Family Brain
        if not dry_run:
            try:
                ingest_email(
                    subject=subject,
                    body=body,
                    from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    parsed_items=parsed_list,
                )
                print(f"    Ingested appointment email into Family Brain", file=sys.stderr)
            except Exception as e:
                print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

# Conflict detection: check newly created events against the calendar
created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")]
if created_events and not dry_run:
    try:
        conflicts = detect_conflicts(events=[{"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")} for e in created_events])
        result["conflicts"] = conflicts
        if conflicts:
            print(f"  ⚠️  {len(conflicts)} scheduling conflict(s) detected", file=sys.stderr)
            for c in conflicts:
                print(f"    {c['event1']['summary']} conflicts with {c['event2']['summary']} ({c['overlap_minutes']}min overlap)", file=sys.stderr)
    except Exception as e:
        result["conflicts"] = []
        result["errors"].append(f"Conflict detection error: {e}")
else:
    result["conflicts"] = []

# Push results to Telegram via Hermes
if notify and not dry_run:
    try:
        hermes_stats = push_pipeline_results(result, quiet=quiet)
        result["hermes"] = hermes_stats
    except Exception as e:
        result["errors"].append(f"Hermes notification error: {e}")

return result

def process_webhook_email(email_data: dict, dry_run=False, notify=True):
"""Process a single email received via the Cloudflare webhook.

This is the push-pipeline path: email arrives via the webhook address,
Cloudflare Worker POSTs to hook.hoffdesk.com/webhook, which calls this.

Reuses the same extraction and routing logic as process_emails(),
but skips the IMAP fetch entirely.

Args:
    email_data: Dict with keys: from, to, subject, date, body_text,
                body_html, has_attachments, attachments, dedup_key,
                message_id, source ('webhook')
    dry_run: If True, don't create calendar events.
    notify: If True, push results to Telegram via Hermes.

Returns:
    Dict with processing results.
"""
try:
    return _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify)
except Exception as e:
    tb = traceback.format_exc()
    _alert_admin(
        "Webhook Pipeline Error",
        f"process_webhook_email() crashed:\n\n```\n{tb[:600]}\n```",
    )
    return {
        "emails_scanned": 1,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "payment_alerts": [],
        "errors": [f"UNHANDLED: {e}"],
    }

def _process_webhook_email_inner(email_data: dict, dry_run=False, notify=True):
"""Internal implementation — wrapped by process_webhook_email() error handler."""
result = {
"emails_scanned": 1,
"appointments_found": [],
"cancellations": [],
"events_created": [],
"newsletter_items": [],
"info_summary": "",
"low_relevance_items": [],
"errors": [],
"payment_alerts": [],
}

subject = email_data.get("subject", "")
body = email_data.get("body_text", "")
from_addr = email_data.get("from", "")
date_str = email_data.get("date", "")

print(f"  [Webhook] Processing: {subject[:60]}...", file=sys.stderr)

# Step 1: Triage
email_class = _triage_email(subject, body, from_addr)
print(f"    Classified as: {email_class}", file=sys.stderr)

if email_class == "payment_alert":
    # Extract payment alert details via LLM
    alert_data = parse_payment_alert_with_llm(subject, body, from_addr, date_str)
    if alert_data:
        print(f"    Payment alert: {alert_data.get('merchant', '?')} — {alert_data.get('alert_type', '?')}", file=sys.stderr)
        result["payment_alerts"].append(alert_data)

        # IMMEDIATE notification
        if notify and not dry_run:
            try:
                msg = format_payment_alert(alert_data)
                _send_telegram(msg)
            except Exception as e:
                result["errors"].append(f"Payment alert notification error: {e}")
    else:
        print(f"    LLM did not confirm payment alert, skipping", file=sys.stderr)

    # Ingest into Family Brain
    if not dry_run:
        try:
            ingest_email(
                subject=subject, body=body, from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                parsed_items=[{"type": "payment_alert", "merchant": alert_data.get("merchant", "")} if alert_data else {}],
            )
            print(f"    Ingested payment alert into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

    return _finalize_result(result, dry_run, notify)

if email_class == "newsletter":
    enriched_body = enrich_body_with_urls(body)
    items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
    if not items:
        print(f"    No items extracted from newsletter", file=sys.stderr)
        return result

    result["newsletter_items"].append({
        "email_from": from_addr,
        "email_subject": subject,
        "items_count": len(items),
        "item_types": [i.get("type", "info") for i in items],
    })

    routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
    result["events_created"].extend(routed["events_created"])
    result["errors"].extend(routed["errors"])

    if routed.get("reminders_created"):
        result.setdefault("reminders_created", []).extend(routed["reminders_created"])
    if routed.get("actions_created"):
        result.setdefault("actions_created", []).extend(routed["actions_created"])
    if routed.get("low_relevance_items"):
        result["low_relevance_items"].extend(routed["low_relevance_items"])

    info_items = routed.get("info_items", [])
    low_items = routed.get("low_relevance_items", [])
    if info_items or low_items:
        result["info_summary"] = format_info_summary(
            info_items, low_relevance_items=low_items, source_subject=subject,
        )

    if not dry_run:
        try:
            ingest_newsletter(
                subject=subject, body=enriched_body, from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                items=items,
            )
            print(f"    Ingested newsletter into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

    return _finalize_result(result, dry_run, notify)

# Step 2: Parse as appointment
parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)

for parsed in parsed_list:
    serialized = _serialize_result(parsed)

    if parsed["type"] == "cancellation":
        cancel_summary = parsed.get("summary", "")
        cancel_start = parsed.get("start")
        cancel_result = None
        if cancel_start:
            if dry_run:
                cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary,
                                 "start": cancel_start.isoformat()}
            else:
                cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
        if cancel_result is None:
            cancel_result = {"status": "CANCELLATION_NO_MATCH", "summary": cancel_summary,
                             "start": cancel_start.isoformat() if cancel_start else None}
        result["cancellations"].append({
            "email_from": from_addr, "email_subject": subject,
            "parsed": serialized, "action": cancel_result,
        })
        continue

    result["appointments_found"].append({
        "email_from": from_addr, "email_subject": subject, "parsed": serialized,
    })

    start_dt = parsed.get("start")
    if not start_dt:
        result["errors"].append(f"Could not resolve datetime for: {parsed['summary']}")
        continue

    end_dt = parsed.get("end")
    if not end_dt:
        end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])

    who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
    summary = parsed["summary"]
    if who_str:
        summary = f"{summary} ({who_str})"

    if dry_run:
        is_recurring = parsed.get("is_recurring", False)
        recurrence = parsed.get("recurrence") if is_recurring else None
        result["events_created"].append({
            "status": "DRY_RUN", "summary": summary,
            "start": start_dt.isoformat(), "end": end_dt.isoformat(),
            "location": _resolve_location(parsed["location"]), "recurring": is_recurring,
            "recurrence": recurrence,
        })
    else:
        existing = event_exists(summary, start_dt)
        if existing:
            result["events_created"].append({
                "status": "DUPLICATE_SKIPPED",
                "summary": existing.get("summary", summary),
                "start": existing["start"].get("dateTime", start_dt.isoformat()),
                "end": existing["end"].get("dateTime", end_dt.isoformat()),
                "existing_id": existing.get("id"),
            })
            print(f"    Skipping duplicate: {summary}", file=sys.stderr)
            continue

        try:
            is_recurring = parsed.get("is_recurring", False)
            recurrence = parsed.get("recurrence") if is_recurring else None

            if is_recurring and recurrence:
                event = create_recurring_event(
                    summary=summary, start_dt=start_dt, end_dt=end_dt,
                    recurrence=recurrence, description=parsed["description"],
                    location=_resolve_location(parsed["location"]),
                )
            else:
                event = create_event(
                    summary=summary, start_dt=start_dt, end_dt=end_dt,
                    description=parsed["description"], location=_resolve_location(parsed["location"]),
                )
            result["events_created"].append({
                "status": "CREATED", "id": event["id"],
                "summary": event.get("summary", summary),
                "start": event["start"].get("dateTime", start_dt.isoformat()),
                "end": event["end"].get("dateTime", end_dt.isoformat()),
                "link": event.get("htmlLink", ""), "recurring": is_recurring,
                "location": _resolve_location(parsed.get("location", "")),
                "_claimed_day_of_week": parsed.get("claimed_day_of_week", ""),
            })
            # Immediate per-event success notification
            if notify and not dry_run:
                hermes_notify(result["events_created"][-1], email_subject=subject)
        except Exception as e:
            result["errors"].append(f"Calendar create error: {e}")

    # Ingest into Family Brain
    if not dry_run:
        try:
            ingest_email(
                subject=subject, body=body, from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                parsed_items=parsed_list,
            )
            print(f"    Ingested appointment email into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f"    Family Brain ingestion error: {e}", file=sys.stderr)

return _finalize_result(result, dry_run, notify)

def _finalize_result(result, dry_run, notify):
"""Shared finalization: conflict detection + Hermes push."""
created_events = [e for e in result["events_created"]
if e.get("status") == "CREATED" and e.get("id")]
if created_events and not dry_run:
try:
conflicts = detect_conflicts(
events=[{"id": e["id"], "summary": e.get("summary", ""),
"start": e.get("start", ""), "end": e.get("end", "")}
for e in created_events]
)
result["conflicts"] = conflicts
except Exception as e:
result["conflicts"] = []
result["errors"].append(f"Conflict detection error: {e}")
else:
result["conflicts"] = []

if notify and not dry_run:
    try:
        hermes_stats = push_pipeline_results(result)
        result["hermes"] = hermes_stats
    except Exception as e:
        result["errors"].append(f"Hermes notification error: {e}")

return result

def backfill_brain(days: int = 30) -> dict:
"""Backfill the Family Brain with emails from the last N days.

Re-processes already-seen emails purely for ChromaDB ingestion.
Does NOT create calendar events or send notifications.

Args:
    days: Number of days to look back

Returns:
    Dict with ingestion stats.
"""
try:
    return _backfill_brain_inner(days=days)
except Exception as e:
    tb = traceback.format_exc()
    _alert_admin(
        "Backfill Error",
        f"backfill_brain() crashed:\n\n```\n{tb[:600]}\n```",
    )
    return {
        "emails_scanned": 0,
        "ingested_emails": 0,
        "ingested_newsletters": 0,
        "errors": [f"UNHANDLED: {e}"],
        "skipped": 0,
    }

def _backfill_brain_inner(days: int = 30) -> dict:
"""Internal implementation — wrapped by backfill_brain() error handler."""
from family_assistant.email_fetcher import fetch_since
from family_assistant.family_brain import ingest_email, ingest_newsletter
from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm
from family_assistant.appointment_parser import parse_email_with_llm

# IMAP SINCE format: DD-Mon-YYYY (e.g. 18-Mar-2026)
since_dt = datetime.now(CHICAGO_TZ) - timedelta(days=days)
imap_date = since_dt.strftime("%d-%b-%Y")

print(f"Backfilling Family Brain: emails since {imap_date} ({days} days)", file=sys.stderr)

result = {
    "emails_scanned": 0,
    "ingested_emails": 0,
    "ingested_newsletters": 0,
    "errors": [],
    "skipped": 0,
}

mail_data = fetch_since(imap_date)
if "error" in mail_data:
    result["errors"].append(mail_data["error"])
    return result

emails = mail_data.get("emails", [])
result["emails_scanned"] = len(emails)

if not emails:
    print("  No emails found in date range", file=sys.stderr)
    return result

for em in emails:
    subject = em.get("subject", "")
    body = em.get("body", "")
    from_addr = em.get("from", "")
    date_str = em.get("date", "")

    print(f"  Processing: {subject[:60]}...", file=sys.stderr)

    # Classify
    email_class = _triage_email(subject, body, from_addr)
    print(f"    Classified as: {email_class}", file=sys.stderr)

    try:
        if email_class == "newsletter":
            enriched_body = enrich_body_with_urls(body)
            items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
            ingest_newsletter(
                subject=subject,
                body=enriched_body,
                from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                items=items or [],
            )
            result["ingested_newsletters"] += 1
            print(f"    ✅ Ingested newsletter", file=sys.stderr)

        else:
            # Appointment email
            parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)
            # Serialize datetimes before passing to ingest
            safe_parsed = []
            for p in (parsed_list or []):
                item = {}
                for k, v in p.items():
                    if isinstance(v, datetime):
                        item[k] = v.isoformat()
                    elif isinstance(v, date) and not isinstance(v, datetime):
                        item[k] = v.isoformat()
                    else:
                        item[k] = v
                safe_parsed.append(item)
            ingest_email(
                subject=subject,
                body=body,
                from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                parsed_items=safe_parsed,
            )
            result["ingested_emails"] += 1
            print(f"    ✅ Ingested email", file=sys.stderr)
    except Exception as e:
        result["errors"].append(f"{subject[:40]}: {e}")
        print(f"    ❌ Error: {e}", file=sys.stderr)

print(
    f"\nBackfill complete: {result['ingested_emails']} emails + "
    f"{result['ingested_newsletters']} newsletters ingested",
    file=sys.stderr,
)
return result