"""Pipeline orchestration: fetch emails → classify → parse → route to calendar/reminder/info.""" import json import os import sys import traceback from datetime import datetime, timedelta, date from zoneinfo import ZoneInfo from family_assistant.config import CHICAGO_TZ, GMAIL_APP_PASSWORD from family_assistant.email_fetcher import fetch_unread from family_assistant.appointment_parser import parse_email_with_llm from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm from family_assistant.family_brain import ingest_email, ingest_newsletter from family_assistant.calendar_sync import ( event_exists, create_event, create_recurring_event, find_and_cancel_event, ) from family_assistant.conflict_engine import detect_conflicts from family_assistant.hermes import push_pipeline_results, hermes_notify, _send_telegram, format_payment_alert # --------------------------------------------------------------------------- # Payment Alert Detection # --------------------------------------------------------------------------- # Fast keyword pre-scan: if ANY of these appear in subject or body, # the LLM classifier is strongly biased toward payment_alert. _PAYMENT_ALERT_KEYWORDS = ( "payment failed", "card declined", "declined", # catch "payment declined" or "transaction declined" "transaction failed", "card expired", "update your payment", "payment method", "subscription suspended", "account will be canceled", "your payment could not be processed", "payment was unsuccessful", "unpaid invoice", "past due", "overdue payment", ) def _scan_payment_alert(subject, body): """Quick keyword scan for payment alert signals. Returns True if any payment alert keyword is found in subject or body. This runs BEFORE the LLM classification to provide a fast signal. 
""" text = f"{subject} {body}".lower() return any(kw in text for kw in _PAYMENT_ALERT_KEYWORDS) def parse_payment_alert_with_llm(subject, body, from_addr="", date_str=""): """Extract payment alert details from an email using the LLM. Returns a dict with merchant, alert_type, amount, deadline, action_needed, summary — or None if not a real payment alert. """ from family_assistant.config import LLM_URL, LLM_MODEL, LLM_TIMEOUT, MAX_BODY_CHARS, load_prompts import requests as _requests prompts = load_prompts() system_template = prompts.get("payment_alert_extract", "") if not system_template: # Fallback if prompt not found print(" [PaymentAlert] payment_alert_extract prompt not found", file=sys.stderr) return None today = datetime.now(CHICAGO_TZ) today_str = today.strftime("%Y-%m-%d") system_msg = system_template.format(today=today_str) trimmed_body = body[:MAX_BODY_CHARS] if body else "" user_msg = f"Subject: {subject}\nFrom: {from_addr}\nDate: {date_str}\n\n{trimmed_body}" payload = { "model": LLM_MODEL, "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": user_msg}, ], "temperature": 0, } try: resp = _requests.post(LLM_URL, json=payload, timeout=LLM_TIMEOUT) resp.raise_for_status() data = resp.json() raw = data["choices"][0]["message"]["content"].strip() except Exception as e: print(f" [PaymentAlert] LLM error: {e}", file=sys.stderr) return None # Parse JSON from LLM response if not raw: return None raw = raw.strip() if raw.startswith("```"): import re raw = re.sub(r'^```(?:json)?\s*\n?', '', raw) raw = re.sub(r'\n?```\s*$', '', raw) raw = raw.strip() try: parsed = json.loads(raw) except json.JSONDecodeError: print(f" [PaymentAlert] JSON parse error", file=sys.stderr) return None if not isinstance(parsed, dict) or parsed.get("not_a_payment_alert"): return None # Validate required fields if not parsed.get("merchant"): parsed["merchant"] = "Unknown Service" if not parsed.get("alert_type"): parsed["alert_type"] = "payment_failed" if not 
parsed.get("summary"): merchant = parsed.get("merchant", "") amount = parsed.get("amount", "") parsed["summary"] = f"{merchant} payment failed" + (f" ({amount})" if amount else "") return parsed # --------------------------------------------------------------------------- # Auth Failure Circuit Breaker # --------------------------------------------------------------------------- # Tracks consecutive auth failures to prevent error spam. # After MAX_AUTH_FAILURES consecutive failures, the IMAP pipeline is paused # and Matt is alerted once. Resets on first success or manual reset. _AUTH_FAILURE_FILE = os.path.expanduser("~/.family_assistant/auth_circuit_breaker.json") MAX_AUTH_FAILURES = 3 # Pause after this many consecutive failures def _read_circuit_breaker(): """Read circuit breaker state from disk.""" try: with open(_AUTH_FAILURE_FILE, "r") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {"consecutive_failures": 0, "paused": False, "last_alert_ts": None} def _write_circuit_breaker(state): """Write circuit breaker state to disk.""" os.makedirs(os.path.dirname(_AUTH_FAILURE_FILE), exist_ok=True) with open(_AUTH_FAILURE_FILE, "w") as f: json.dump(state, f, indent=2) def _record_auth_failure(error_msg: str) -> bool: """Record an auth failure. Returns True if circuit breaker just tripped. After MAX_AUTH_FAILURES consecutive failures, sets paused=True and alerts Matt once. Subsequent calls return False (already paused). """ state = _read_circuit_breaker() state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1 just_tripped = False if state["consecutive_failures"] >= MAX_AUTH_FAILURES and not state.get("paused"): state["paused"] = True just_tripped = True _write_circuit_breaker(state) if just_tripped: _alert_admin( "🔴 IMAP Circuit Breaker Tripped", f"Gmail auth failed {state['consecutive_failures']} times in a row. 
" f"IMAP pipeline is now PAUSED to stop error spam.\n\n" f"To reset: python -m family_assistant reset-circuit\n" f"Or: rm {_AUTH_FAILURE_FILE}", ) return just_tripped def _record_auth_success(): """Record a successful auth — resets the circuit breaker.""" state = _read_circuit_breaker() if state.get("consecutive_failures", 0) > 0 or state.get("paused"): was_paused = state.get("paused", False) state["consecutive_failures"] = 0 state["paused"] = False _write_circuit_breaker(state) if was_paused: _alert_admin( "🟢 IMAP Circuit Breaker Reset", "Gmail auth succeeded. IMAP pipeline is back online.", ) def is_imap_paused() -> bool: """Check if the IMAP pipeline is paused due to auth failures.""" return _read_circuit_breaker().get("paused", False) def reset_circuit_breaker(): """Manually reset the circuit breaker.""" state = {"consecutive_failures": 0, "paused": False, "last_alert_ts": None} _write_circuit_breaker(state) print("✅ Circuit breaker reset. IMAP pipeline re-enabled.") def _resolve_location(location_str): """Resolve a location string through the cache to get the canonical name. Prevents LLM hallucinations like 'at Golrusk' → 'PetSmart' by preferring the known-locations cache. Returns the canonical cached name if found, otherwise returns the original string unchanged. """ if not location_str or not location_str.strip(): return location_str try: from family_assistant.location_cache import resolve result = resolve(location_str.strip(), use_home_bias=True) if result and result.get("name"): return result["name"] except Exception: pass return location_str from family_assistant.url_fetcher import enrich_body_with_urls # --------------------------------------------------------------------------- # Global error handler — no silent deaths # --------------------------------------------------------------------------- _DEV_TARGET = os.environ.get("TELEGRAM_DEV_ID", "") def _alert_admin(title: str, detail: str): """Push an error alert to Matt's DM. 
def _serialize_result(parsed):
    """Convert datetime objects and other non-serializable types to strings.

    ``datetime`` and ``date`` values become ISO-8601 strings at any nesting
    depth inside lists and dicts; every other value is passed through.

    Args:
        parsed: Mapping of field name to value.

    Returns:
        A new dict; ``parsed`` and its nested containers are not mutated.
    """

    def _convert(value):
        # datetime is a subclass of date, so one isinstance check covers both
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        if isinstance(value, list):
            return [_convert(entry) for entry in value]
        if isinstance(value, dict):
            return {key: _convert(entry) for key, entry in value.items()}
        return value

    return {key: _convert(value) for key, value in parsed.items()}


def _triage_email(subject, body, from_addr):
    """Classify an email as 'appointment', 'newsletter', or 'payment_alert'.

    Uses a fast keyword pre-scan for payment alerts, then the LLM classifier.
    The pre-scan result is passed to the classifier as ``payment_hint``.

    Returns:
        The label returned by ``classify_email``, or 'appointment' when
        classification raises (simpler parsing path).
    """
    # Fast pre-scan: if payment alert keywords are present, add context
    is_payment_candidate = _scan_payment_alert(subject, body)

    try:
        return classify_email(subject, body, from_addr, payment_hint=is_payment_candidate)
    except Exception as e:
        print(f" [Triage] Classification error: {e}, defaulting to appointment", file=sys.stderr)
        return "appointment"
""" # Fast pre-scan: if payment alert keywords are present, add context is_payment_candidate = _scan_payment_alert(subject, body) try: result = classify_email(subject, body, from_addr, payment_hint=is_payment_candidate) return result except Exception as e: print(f" [Triage] Classification error: {e}, defaulting to appointment", file=sys.stderr) return "appointment" def _route_newsletter_items(items, dry_run=False, notify=True, email_subject=""): """Route newsletter items to their appropriate destinations. Routing based on relevance: - high relevance events/reminders/actions → Google Calendar - low relevance items of any type → Telegram Info Digest (no calendar) - info items → Telegram Info Digest (regardless of relevance) - rejected items (per family.yaml rules) → skipped with note Returns a dict with routed items and any errors. """ result = { "events_created": [], "reminders_created": [], "actions_created": [], "info_items": [], # high-relevance info + low-relevance items of any type "low_relevance_items": [], # explicit low-relevance tracking "rejected_items": [], # items filtered by rejection rules "auto_filtered": [], # items shadow-filtered to low relevance "errors": [], } # Shadow-filter rejected items — they stay in the list but get flagged # as low_relevance with a note about which rule caught them. # The user still sees what was filtered in the digest and can restore items. 
from .rejection_engine import shadow_filter_items, should_reject items = shadow_filter_items(items, scope="newsletter") # Collect auto-filtered items for notification for item in items: if item.get("_auto_filtered"): who_str = ", ".join(item.get("who", [])) result["auto_filtered"].append({ "type": item.get("original_type", "?"), "summary": item.get("summary", ""), "who": who_str, "rule": item.get("reason", ""), }) print(f" Auto-filtered: {item.get('summary', '?')} → low relevance", file=sys.stderr) for item in items: item_type = item.get("type", "info") relevance = item.get("relevance", "high") reason = item.get("reason", "No reason provided") # LOW RELEVANCE → always goes to Telegram Info Digest, never calendar if relevance == "low": who_str = ", ".join(item.get("who", [])) result["low_relevance_items"].append({ "type": item_type, "summary": item.get("summary", ""), "who": who_str, "description": item.get("description", ""), "reason": reason, }) # Also add to info_items for Telegram summary result["info_items"].append({ "summary": f"[{item_type}] {item['summary']}", "who": who_str, "description": f"{item.get('description', '')} (low relevance: {reason})", }) continue # INFO type → Telegram Info Digest regardless of relevance if item_type == "info": who_str = ", ".join(item.get("who", [])) result["info_items"].append({ "summary": item["summary"], "who": who_str, "description": item.get("description", ""), }) continue # HIGH RELEVANCE items → route to calendar if item_type == "event": # Same path as appointments — create on calendar with dedup start_dt = item.get("start") if not start_dt: result["errors"].append(f"Event missing start: {item.get('summary', '?')}") continue end_dt = item.get("end") duration_minutes = item.get("duration_minutes", 60) if not end_dt: end_dt = start_dt + timedelta(minutes=duration_minutes) who_str = ", ".join(item.get("who", [])) summary = item["summary"] if who_str: summary = f"{summary} ({who_str})" if dry_run: 
result["events_created"].append({ "status": "DRY_RUN", "summary": summary, "start": start_dt.isoformat(), "end": end_dt.isoformat(), "location": _resolve_location(item.get("location", "")), }) continue existing = event_exists(summary, start_dt) if existing: result["events_created"].append({ "status": "DUPLICATE_SKIPPED", "summary": existing.get("summary", summary), "start": existing["start"].get("dateTime", start_dt.isoformat()), "end": existing["end"].get("dateTime", end_dt.isoformat()), "existing_id": existing["id"], }) print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr) continue try: # Check if this is a recurring event is_recurring = item.get("is_recurring", False) recurrence = item.get("recurrence") if is_recurring else None if is_recurring and recurrence: event = create_recurring_event( summary=summary, start_dt=start_dt, end_dt=end_dt, recurrence=recurrence, description=item.get("description", ""), location=_resolve_location(item.get("location", "")), ) else: event = create_event( summary=summary, start_dt=start_dt, end_dt=end_dt, description=item.get("description", ""), location=_resolve_location(item.get("location", "")), ) result["events_created"].append({ "status": "CREATED", "id": event["id"], "summary": event.get("summary", summary), "start": event["start"].get("dateTime", start_dt.isoformat()), "end": event["end"].get("dateTime", end_dt.isoformat()), "link": event.get("htmlLink", ""), "recurring": is_recurring, "location": _resolve_location(item.get("location", "")), "_claimed_day_of_week": item.get("claimed_day_of_week", ""), }) # Immediate per-event success notification if notify and not dry_run: hermes_notify(result["events_created"][-1], email_subject=email_subject) except Exception as e: result["errors"].append(f"Calendar create error: {e}") elif item_type == "reminder": # All-day event with [REMINDER] prefix due = item.get("due") who_str = ", ".join(item.get("who", [])) summary = f"[REMINDER] {item['summary']}" if 
def format_info_summary(info_items, low_relevance_items=None, source_subject=""):
    """Format info items into a Telegram-friendly summary string.

    Args:
        info_items: Dicts with ``summary`` and optional ``who`` /
            ``description``; may be None or empty.
        low_relevance_items: Optional dicts with ``summary`` and optional
            ``type`` / ``who`` / ``reason``, listed under a separate heading.
        source_subject: Subject of the source email, appended to the title.

    Returns:
        The formatted multi-line summary, or "" when both lists are empty.
    """
    if not info_items and not low_relevance_items:
        return ""

    title_suffix = f": {source_subject}" if source_subject else ""
    lines = [f"📋 **Newsletter Summary{title_suffix}**"]

    for item in info_items or []:
        who = f" ({item['who']})" if item.get("who") else ""
        line = f" • {item.get('summary', '')}{who}"
        description = item.get("description")
        if description:
            # Only add the separator when there is text to follow it
            line += f": {description}"
        lines.append(line)

    if low_relevance_items:
        lines.append("\n 📌 **Low relevance (not added to calendar):**")
        for item in low_relevance_items:
            who = f" ({item['who']})" if item.get("who") else ""
            line = f" • [{item.get('type', '?')}] {item.get('summary', '')}{who}"
            reason = item.get("reason")
            if reason:
                line += f" — {reason}"
            lines.append(line)

    return "\n".join(lines)
# Lower-case substrings that mark an error message as a Gmail auth failure.
_AUTH_ERROR_SIGNATURES = (
    "authentication",
    "invalid_grant",
    "credentials",
    "auth failed",
    "login failed",
    "[auth]",
    "[alert]",
    "suspen",
    "disabled",
    "access denied",
)


def _looks_like_auth_failure(error):
    """Return True when the error's text contains a known auth-failure signature."""
    lowered = str(error).lower()
    return any(sig in lowered for sig in _AUTH_ERROR_SIGNATURES)


def _empty_pipeline_result(error):
    """Build the result returned when no email was processed, carrying one error."""
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "payment_alerts": [],
        "errors": [error],
    }


def process_emails(dry_run=False, notify=True, quiet=False):
    """Fetch unread emails, parse appointments via LLM, create calendar events.

    All fetched emails are marked as read after processing, regardless of
    whether they contained appointments. This prevents re-processing the
    same emails on subsequent heartbeat runs.

    Includes auth failure circuit breaker: after MAX_AUTH_FAILURES consecutive
    Gmail auth failures, the IMAP pipeline is paused to prevent error spam.

    Args:
        dry_run: If True, don't create calendar events or mark emails read.
        notify: If True, push results to Telegram via Hermes.
        quiet: If True, suppress family group notifications (only DM alerts).

    Returns:
        The pipeline result dict. Failures are reported in its ``errors``
        list; exceptions from the inner pipeline are not re-raised.
    """
    # Check circuit breaker first — skip IMAP if paused
    if is_imap_paused():
        return _empty_pipeline_result(
            "IMAP_PAUSED: Gmail auth circuit breaker is tripped. "
            "Run 'family-assistant reset-circuit' to re-enable."
        )

    # Skip IMAP entirely if no credentials configured (webhook-only mode)
    if not GMAIL_APP_PASSWORD:
        return _empty_pipeline_result("IMAP_SKIPPED: No Gmail credentials (webhook-only mode)")

    try:
        result = _process_emails_inner(dry_run=dry_run, notify=notify, quiet=quiet)

        # The inner pipeline reports a failed fetch by returning early with
        # zero emails scanned and the fetch error recorded; that must not be
        # counted as a successful login.
        fetch_errors = [] if result.get("emails_scanned") else (result.get("errors") or [])
        if not fetch_errors:
            # Success — reset circuit breaker
            _record_auth_success()
        elif any(_looks_like_auth_failure(err) for err in fetch_errors):
            _record_auth_failure(str(fetch_errors[0])[:200])
        return result
    except Exception as e:
        if _looks_like_auth_failure(e):
            _record_auth_failure(str(e)[:200])
            # Whether we just tripped or are still counting — return silently
            return _empty_pipeline_result(
                "IMAP_PAUSED: circuit breaker tripped"
                if is_imap_paused()
                else "IMAP_AUTH_FAILED: Gmail auth failed (circuit breaker counting)"
            )

        # Non-auth failure — send stack trace
        tb = traceback.format_exc()
        _alert_admin(
            "Pipeline Error",
            f"process_emails() crashed:\n\n```\n{tb[:600]}\n```",
        )
        return _empty_pipeline_result(f"UNHANDLED: {e}")
def _process_emails_inner(dry_run=False, notify=True, quiet=False):
    """Internal implementation — wrapped by process_emails() error handler.

    Fetches unread mail, triages each email (payment alert / newsletter /
    appointment), routes it, then runs conflict detection and the Hermes push.

    Args:
        dry_run: If True, nothing is written to the calendar or Family Brain
            and emails are not marked read.
        notify: If True, send Telegram notifications.
        quiet: Passed through to ``push_pipeline_results``.

    Returns:
        The pipeline result dict. A failed fetch returns early with the
        fetch error as the only entry in ``errors``.
    """
    result = {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": [],
        "payment_alerts": [],
    }

    # Fetch and mark as read in one pass — no second connection needed
    mail_data = fetch_unread(mark_read=not dry_run)

    if "error" in mail_data:
        result["errors"].append(mail_data["error"])
        return result

    emails = mail_data.get("emails") or []
    result["emails_scanned"] = len(emails)

    if not emails:
        return result

    for em in emails:
        subject = ""
        try:
            subject = em.get("subject") or ""
            body = em.get("body") or ""
            from_addr = em.get("from") or ""
            date_str = em.get("date") or ""

            print(f" Processing: {subject[:60]}...", file=sys.stderr)

            # Step 1: Triage — classify email type
            email_class = _triage_email(subject, body, from_addr)
            print(f" Classified as: {email_class}", file=sys.stderr)

            if email_class == "payment_alert":
                # Extract payment alert details via LLM
                alert_data = parse_payment_alert_with_llm(subject, body, from_addr, date_str)
                if alert_data:
                    print(f" Payment alert: {alert_data.get('merchant', '?')} — {alert_data.get('alert_type', '?')}", file=sys.stderr)
                    result["payment_alerts"].append(alert_data)

                    # IMMEDIATE notification — don't wait for batch
                    if notify and not dry_run:
                        try:
                            msg = format_payment_alert(alert_data)
                            _send_telegram(msg)
                        except Exception as e:
                            result["errors"].append(f"Payment alert notification error: {e}")
                else:
                    print(" LLM did not confirm payment alert, skipping", file=sys.stderr)

                # Ingest into Family Brain for RAG retrieval
                if not dry_run:
                    # No parsed item when the LLM did not confirm the alert
                    alert_items = (
                        [{"type": "payment_alert", "merchant": alert_data.get("merchant", "")}]
                        if alert_data
                        else []
                    )
                    try:
                        ingest_email(
                            subject=subject,
                            body=body,
                            from_addr=from_addr,
                            email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                            parsed_items=alert_items,
                        )
                        print(" Ingested payment alert into Family Brain", file=sys.stderr)
                    except Exception as e:
                        print(f" Family Brain ingestion error: {e}", file=sys.stderr)
                continue

            if email_class == "newsletter":
                # Enrich body with URL content (e.g. Smore links)
                enriched_body = enrich_body_with_urls(body)

                # Parse as newsletter with multi-type extraction
                items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
                if not items:
                    print(" No items extracted from newsletter", file=sys.stderr)
                    continue

                result["newsletter_items"].append({
                    "email_from": from_addr,
                    "email_subject": subject,
                    "items_count": len(items),
                    "item_types": [i.get("type", "info") for i in items],
                })

                # Route each item type to its destination
                routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
                result["events_created"].extend(routed["events_created"])
                result["errors"].extend(routed["errors"])

                # Collect reminders/actions/info for reporting
                if routed["reminders_created"]:
                    result.setdefault("reminders_created", []).extend(routed["reminders_created"])
                if routed["actions_created"]:
                    result.setdefault("actions_created", []).extend(routed["actions_created"])
                if routed.get("low_relevance_items"):
                    result["low_relevance_items"].extend(routed["low_relevance_items"])

                # Build info summary for Telegram push
                info_items = routed.get("info_items", [])
                low_items = routed.get("low_relevance_items", [])
                if info_items or low_items:
                    info_block = format_info_summary(
                        info_items,
                        low_relevance_items=low_items,
                        source_subject=subject,
                    )
                    if result["info_summary"]:
                        result["info_summary"] += "\n\n"
                    result["info_summary"] += info_block

                # Ingest into Family Brain for RAG retrieval
                if not dry_run:
                    try:
                        ingest_newsletter(
                            subject=subject,
                            body=enriched_body,
                            from_addr=from_addr,
                            email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                            items=items,
                        )
                        print(" Ingested newsletter into Family Brain", file=sys.stderr)
                    except Exception as e:
                        print(f" Family Brain ingestion error: {e}", file=sys.stderr)
                continue

            # Step 2: Parse as appointment (existing path)
            parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) or []

            for parsed in parsed_list:
                serialized = _serialize_result(parsed)

                if parsed.get("type") == "cancellation":
                    # Act on cancellation: find and delete the matching event
                    cancel_summary = parsed.get("summary", "")
                    cancel_start = parsed.get("start")
                    cancel_result = None

                    if cancel_start:
                        if dry_run:
                            cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()}
                            print(f" [DRY RUN] Would cancel: {cancel_summary} at {cancel_start.isoformat()}", file=sys.stderr)
                        else:
                            cancel_result = find_and_cancel_event(cancel_summary, cancel_start)

                    if cancel_result is None:
                        cancel_result = {
                            "status": "CANCELLATION_NO_MATCH",
                            "summary": cancel_summary,
                            "start": cancel_start.isoformat() if cancel_start else None,
                        }
                        print(f" No matching event found for cancellation: {cancel_summary}", file=sys.stderr)

                    result["cancellations"].append({
                        "email_from": from_addr,
                        "email_subject": subject,
                        "parsed": serialized,
                        "action": cancel_result,
                    })
                    continue

                # It's an appointment
                result["appointments_found"].append({
                    "email_from": from_addr,
                    "email_subject": subject,
                    "parsed": serialized,
                })

                start_dt = parsed.get("start")
                if not start_dt:
                    result["errors"].append(
                        f"Could not resolve datetime for: {parsed.get('summary', '?')}"
                    )
                    continue

                end_dt = parsed.get("end")
                if not end_dt:
                    end_dt = start_dt + timedelta(minutes=parsed.get("duration_minutes") or 60)

                who = parsed.get("who")
                who_str = ", ".join(who) if who else ""
                summary = parsed.get("summary", "")
                if who_str:
                    summary = f"{summary} ({who_str})"

                if dry_run:
                    is_recurring = parsed.get("is_recurring", False)
                    recurrence = parsed.get("recurrence") if is_recurring else None
                    result["events_created"].append({
                        "status": "DRY_RUN",
                        "summary": summary,
                        "start": start_dt.isoformat(),
                        "end": end_dt.isoformat(),
                        "location": _resolve_location(parsed.get("location", "")),
                        "recurring": is_recurring,
                        "recurrence": recurrence,
                    })
                else:
                    # Duplicate check: skip if event already exists on calendar
                    existing = event_exists(summary, start_dt)
                    if existing:
                        result["events_created"].append({
                            "status": "DUPLICATE_SKIPPED",
                            "summary": existing.get("summary", summary),
                            "start": existing["start"].get("dateTime", start_dt.isoformat()),
                            "end": existing["end"].get("dateTime", end_dt.isoformat()),
                            "existing_id": existing["id"],
                        })
                        print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
                        continue

                    try:
                        # Check if this is a recurring event
                        is_recurring = parsed.get("is_recurring", False)
                        recurrence = parsed.get("recurrence") if is_recurring else None
                        location = _resolve_location(parsed.get("location", ""))

                        if is_recurring and recurrence:
                            event = create_recurring_event(
                                summary=summary,
                                start_dt=start_dt,
                                end_dt=end_dt,
                                recurrence=recurrence,
                                description=parsed.get("description", ""),
                                location=location,
                            )
                        else:
                            event = create_event(
                                summary=summary,
                                start_dt=start_dt,
                                end_dt=end_dt,
                                description=parsed.get("description", ""),
                                location=location,
                            )
                        result["events_created"].append({
                            "status": "CREATED",
                            "id": event["id"],
                            "summary": event.get("summary", summary),
                            "start": event["start"].get("dateTime", start_dt.isoformat()),
                            "end": event["end"].get("dateTime", end_dt.isoformat()),
                            "link": event.get("htmlLink", ""),
                            "recurring": is_recurring,
                        })
                    except Exception as e:
                        result["errors"].append(f"Calendar create error: {e}")

            # Ingest appointment email into Family Brain
            if not dry_run:
                try:
                    ingest_email(
                        subject=subject,
                        body=body,
                        from_addr=from_addr,
                        email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                        parsed_items=parsed_list,
                    )
                    print(" Ingested appointment email into Family Brain", file=sys.stderr)
                except Exception as e:
                    print(f" Family Brain ingestion error: {e}", file=sys.stderr)
        except Exception as e:
            # The batch was already marked read by fetch_unread(), so aborting
            # here would drop every remaining email and discard the results
            # gathered so far. Record the failure and move on.
            result["errors"].append(f"Email processing error ({subject[:60]}): {e}")
            _alert_admin(
                "Pipeline Error",
                f"Email '{subject[:60]}' failed:\n\n{traceback.format_exc()[:600]}",
            )

    # Conflict detection: check newly created events against the calendar
    created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")]
    if created_events and not dry_run:
        try:
            conflicts = detect_conflicts(events=[
                {"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")}
                for e in created_events
            ])
            result["conflicts"] = conflicts
            if conflicts:
                print(f" ⚠️ {len(conflicts)} scheduling conflict(s) detected", file=sys.stderr)
                for c in conflicts:
                    print(f" {c['event1']['summary']} conflicts with {c['event2']['summary']} ({c['overlap_minutes']}min overlap)", file=sys.stderr)
        except Exception as e:
            result["conflicts"] = []
            result["errors"].append(f"Conflict detection error: {e}")
    else:
        result["conflicts"] = []

    # Push results to Telegram via Hermes
    if notify and not dry_run:
        try:
            hermes_stats = push_pipeline_results(result, quiet=quiet)
            result["hermes"] = hermes_stats
        except Exception as e:
            result["errors"].append(f"Hermes notification error: {e}")

    return result
def process_webhook_email(email_data: dict, dry_run=False, notify=True):
    """Process a single email received via the Cloudflare webhook.

    Push-pipeline entry point: the email arrives at the webhook address, the
    Cloudflare Worker POSTs it to hook.hoffdesk.com/webhook, and that handler
    calls this function. Extraction and routing match process_emails(); only
    the IMAP fetch is skipped.

    Args:
        email_data: Dict with keys: from, to, subject, date, body_text, body_html,
            has_attachments, attachments, dedup_key, message_id, source ('webhook')
        dry_run: If True, don't create calendar events.
        notify: If True, push results to Telegram via Hermes.

    Returns:
        Dict with processing results. If the inner pipeline raises, the admin
        is alerted and a result carrying a single UNHANDLED error is returned.
    """
    try:
        outcome = _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify)
    except Exception as exc:
        trace_text = traceback.format_exc()
        _alert_admin(
            "Webhook Pipeline Error",
            f"process_webhook_email() crashed:\n\n```\n{trace_text[:600]}\n```",
        )
        outcome = {
            "emails_scanned": 1,
            "appointments_found": [],
            "cancellations": [],
            "events_created": [],
            "newsletter_items": [],
            "info_summary": "",
            "low_relevance_items": [],
            "payment_alerts": [],
            "errors": [f"UNHANDLED: {exc}"],
        }
    return outcome
""" try: return _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify) except Exception as e: tb = traceback.format_exc() _alert_admin( "Webhook Pipeline Error", f"process_webhook_email() crashed:\n\n```\n{tb[:600]}\n```", ) return { "emails_scanned": 1, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "payment_alerts": [], "errors": [f"UNHANDLED: {e}"], } def _process_webhook_email_inner(email_data: dict, dry_run=False, notify=True): """Internal implementation — wrapped by process_webhook_email() error handler.""" result = { "emails_scanned": 1, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": [], "payment_alerts": [], } subject = email_data.get("subject", "") body = email_data.get("body_text", "") from_addr = email_data.get("from", "") date_str = email_data.get("date", "") print(f" [Webhook] Processing: {subject[:60]}...", file=sys.stderr) # Step 1: Triage email_class = _triage_email(subject, body, from_addr) print(f" Classified as: {email_class}", file=sys.stderr) if email_class == "payment_alert": # Extract payment alert details via LLM alert_data = parse_payment_alert_with_llm(subject, body, from_addr, date_str) if alert_data: print(f" Payment alert: {alert_data.get('merchant', '?')} — {alert_data.get('alert_type', '?')}", file=sys.stderr) result["payment_alerts"].append(alert_data) # IMMEDIATE notification if notify and not dry_run: try: msg = format_payment_alert(alert_data) _send_telegram(msg) except Exception as e: result["errors"].append(f"Payment alert notification error: {e}") else: print(f" LLM did not confirm payment alert, skipping", file=sys.stderr) # Ingest into Family Brain if not dry_run: try: ingest_email( subject=subject, body=body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), parsed_items=[{"type": 
"payment_alert", "merchant": alert_data.get("merchant", "")} if alert_data else {}], ) print(f" Ingested payment alert into Family Brain", file=sys.stderr) except Exception as e: print(f" Family Brain ingestion error: {e}", file=sys.stderr) return _finalize_result(result, dry_run, notify) if email_class == "newsletter": enriched_body = enrich_body_with_urls(body) items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str) if not items: print(f" No items extracted from newsletter", file=sys.stderr) return result result["newsletter_items"].append({ "email_from": from_addr, "email_subject": subject, "items_count": len(items), "item_types": [i.get("type", "info") for i in items], }) routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject) result["events_created"].extend(routed["events_created"]) result["errors"].extend(routed["errors"]) if routed.get("reminders_created"): result.setdefault("reminders_created", []).extend(routed["reminders_created"]) if routed.get("actions_created"): result.setdefault("actions_created", []).extend(routed["actions_created"]) if routed.get("low_relevance_items"): result["low_relevance_items"].extend(routed["low_relevance_items"]) info_items = routed.get("info_items", []) low_items = routed.get("low_relevance_items", []) if info_items or low_items: result["info_summary"] = format_info_summary( info_items, low_relevance_items=low_items, source_subject=subject, ) if not dry_run: try: ingest_newsletter( subject=subject, body=enriched_body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), items=items, ) print(f" Ingested newsletter into Family Brain", file=sys.stderr) except Exception as e: print(f" Family Brain ingestion error: {e}", file=sys.stderr) return _finalize_result(result, dry_run, notify) # Step 2: Parse as appointment parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) for parsed in parsed_list: serialized = 
_serialize_result(parsed) if parsed["type"] == "cancellation": cancel_summary = parsed.get("summary", "") cancel_start = parsed.get("start") cancel_result = None if cancel_start: if dry_run: cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()} else: cancel_result = find_and_cancel_event(cancel_summary, cancel_start) if cancel_result is None: cancel_result = {"status": "CANCELLATION_NO_MATCH", "summary": cancel_summary, "start": cancel_start.isoformat() if cancel_start else None} result["cancellations"].append({ "email_from": from_addr, "email_subject": subject, "parsed": serialized, "action": cancel_result, }) continue result["appointments_found"].append({ "email_from": from_addr, "email_subject": subject, "parsed": serialized, }) start_dt = parsed.get("start") if not start_dt: result["errors"].append(f"Could not resolve datetime for: {parsed['summary']}") continue end_dt = parsed.get("end") if not end_dt: end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"]) who_str = ", ".join(parsed["who"]) if parsed["who"] else "" summary = parsed["summary"] if who_str: summary = f"{summary} ({who_str})" if dry_run: is_recurring = parsed.get("is_recurring", False) recurrence = parsed.get("recurrence") if is_recurring else None result["events_created"].append({ "status": "DRY_RUN", "summary": summary, "start": start_dt.isoformat(), "end": end_dt.isoformat(), "location": _resolve_location(parsed["location"]), "recurring": is_recurring, "recurrence": recurrence, }) else: existing = event_exists(summary, start_dt) if existing: result["events_created"].append({ "status": "DUPLICATE_SKIPPED", "summary": existing.get("summary", summary), "start": existing["start"].get("dateTime", start_dt.isoformat()), "end": existing["end"].get("dateTime", end_dt.isoformat()), "existing_id": existing.get("id"), }) print(f" Skipping duplicate: {summary}", file=sys.stderr) continue try: is_recurring = parsed.get("is_recurring", False) 
recurrence = parsed.get("recurrence") if is_recurring else None if is_recurring and recurrence: event = create_recurring_event( summary=summary, start_dt=start_dt, end_dt=end_dt, recurrence=recurrence, description=parsed["description"], location=_resolve_location(parsed["location"]), ) else: event = create_event( summary=summary, start_dt=start_dt, end_dt=end_dt, description=parsed["description"], location=_resolve_location(parsed["location"]), ) result["events_created"].append({ "status": "CREATED", "id": event["id"], "summary": event.get("summary", summary), "start": event["start"].get("dateTime", start_dt.isoformat()), "end": event["end"].get("dateTime", end_dt.isoformat()), "link": event.get("htmlLink", ""), "recurring": is_recurring, "location": _resolve_location(parsed.get("location", "")), "_claimed_day_of_week": parsed.get("claimed_day_of_week", ""), }) # Immediate per-event success notification if notify and not dry_run: hermes_notify(result["events_created"][-1], email_subject=subject) except Exception as e: result["errors"].append(f"Calendar create error: {e}") # Ingest into Family Brain if not dry_run: try: ingest_email( subject=subject, body=body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), parsed_items=parsed_list, ) print(f" Ingested appointment email into Family Brain", file=sys.stderr) except Exception as e: print(f" Family Brain ingestion error: {e}", file=sys.stderr) return _finalize_result(result, dry_run, notify) def _finalize_result(result, dry_run, notify): """Shared finalization: conflict detection + Hermes push.""" created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")] if created_events and not dry_run: try: conflicts = detect_conflicts( events=[{"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")} for e in created_events] ) result["conflicts"] = conflicts except Exception as e: result["conflicts"] = [] 
result["errors"].append(f"Conflict detection error: {e}") else: result["conflicts"] = [] if notify and not dry_run: try: hermes_stats = push_pipeline_results(result) result["hermes"] = hermes_stats except Exception as e: result["errors"].append(f"Hermes notification error: {e}") return result def backfill_brain(days: int = 30) -> dict: """Backfill the Family Brain with emails from the last N days. Re-processes already-seen emails purely for ChromaDB ingestion. Does NOT create calendar events or send notifications. Args: days: Number of days to look back Returns: Dict with ingestion stats. """ try: return _backfill_brain_inner(days=days) except Exception as e: tb = traceback.format_exc() _alert_admin( "Backfill Error", f"backfill_brain() crashed:\n\n```\n{tb[:600]}\n```", ) return { "emails_scanned": 0, "ingested_emails": 0, "ingested_newsletters": 0, "errors": [f"UNHANDLED: {e}"], "skipped": 0, } def _backfill_brain_inner(days: int = 30) -> dict: """Internal implementation — wrapped by backfill_brain() error handler.""" from family_assistant.email_fetcher import fetch_since from family_assistant.family_brain import ingest_email, ingest_newsletter from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm from family_assistant.appointment_parser import parse_email_with_llm # IMAP SINCE format: DD-Mon-YYYY (e.g. 
18-Mar-2026) since_dt = datetime.now(CHICAGO_TZ) - timedelta(days=days) imap_date = since_dt.strftime("%d-%b-%Y") print(f"Backfilling Family Brain: emails since {imap_date} ({days} days)", file=sys.stderr) result = { "emails_scanned": 0, "ingested_emails": 0, "ingested_newsletters": 0, "errors": [], "skipped": 0, } mail_data = fetch_since(imap_date) if "error" in mail_data: result["errors"].append(mail_data["error"]) return result emails = mail_data.get("emails", []) result["emails_scanned"] = len(emails) if not emails: print(" No emails found in date range", file=sys.stderr) return result for em in emails: subject = em.get("subject", "") body = em.get("body", "") from_addr = em.get("from", "") date_str = em.get("date", "") print(f" Processing: {subject[:60]}...", file=sys.stderr) # Classify email_class = _triage_email(subject, body, from_addr) print(f" Classified as: {email_class}", file=sys.stderr) try: if email_class == "newsletter": enriched_body = enrich_body_with_urls(body) items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str) ingest_newsletter( subject=subject, body=enriched_body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), items=items or [], ) result["ingested_newsletters"] += 1 print(f" ✅ Ingested newsletter", file=sys.stderr) else: # Appointment email parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) # Serialize datetimes before passing to ingest safe_parsed = [] for p in (parsed_list or []): item = {} for k, v in p.items(): if isinstance(v, datetime): item[k] = v.isoformat() elif isinstance(v, date) and not isinstance(v, datetime): item[k] = v.isoformat() else: item[k] = v safe_parsed.append(item) ingest_email( subject=subject, body=body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), parsed_items=safe_parsed, ) result["ingested_emails"] += 1 print(f" ✅ Ingested email", file=sys.stderr) except Exception as e: 
result["errors"].append(f"{subject[:40]}: {e}") print(f" ❌ Error: {e}", file=sys.stderr) print( f"\nBackfill complete: {result['ingested_emails']} emails + " f"{result['ingested_newsletters']} newsletters ingested", file=sys.stderr, ) return result