"""Pipeline orchestration: fetch emails → classify → parse → route to calendar/reminder/info."""

import json
import os
import re
import sys
import tempfile
import traceback
from datetime import datetime, timedelta, date
from zoneinfo import ZoneInfo

from family_assistant.config import CHICAGO_TZ, GMAIL_APP_PASSWORD
from family_assistant.email_fetcher import fetch_unread
from family_assistant.appointment_parser import parse_email_with_llm
from family_assistant.newsletter_parser import classify_email, parse_newsletter_with_llm
from family_assistant.family_brain import ingest_email, ingest_newsletter
from family_assistant.calendar_sync import (
    event_exists,
    create_event,
    create_recurring_event,
    find_and_cancel_event,
)
from family_assistant.conflict_engine import detect_conflicts
from family_assistant.hermes import push_pipeline_results, hermes_notify, _send_telegram

# ---------------------------------------------------------------------------
# Auth Failure Circuit Breaker
# ---------------------------------------------------------------------------
# Tracks consecutive auth failures to prevent error spam.
# After MAX_AUTH_FAILURES consecutive failures, the IMAP pipeline is paused
# and Matt is alerted once. Resets on first success or manual reset.

_AUTH_FAILURE_FILE = os.path.expanduser("~/.family_assistant/auth_circuit_breaker.json")
MAX_AUTH_FAILURES = 3  # Pause after this many consecutive failures

# Lower-case substrings that classify an error message as a Gmail auth failure.
_AUTH_ERROR_SIGNATURES = (
    "authentication",
    "invalid_grant",
    "credentials",
    "auth failed",
    "login failed",
    "[auth]",
    "[alert]",
    "suspen",
    "disabled",
    "access denied",
)


def _default_circuit_state():
    """Return a fresh, un-tripped circuit breaker state."""
    return {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}


def _read_circuit_breaker():
    """Read circuit breaker state from disk.

    A missing, unreadable, undecodable, or non-object state file yields a fresh
    default state, so a damaged file can never crash the pipeline.
    """
    try:
        with open(_AUTH_FAILURE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):  # ValueError covers JSONDecodeError / UnicodeDecodeError
        return _default_circuit_state()
    if not isinstance(state, dict):
        return _default_circuit_state()
    return state


def _write_circuit_breaker(state):
    """Atomically write circuit breaker state to disk.

    The JSON is written to a temp file in the same directory and then moved over
    the real file with ``os.replace``. A crash mid-write therefore cannot leave
    truncated JSON behind, which would otherwise read back as an un-tripped
    breaker.
    """
    directory = os.path.dirname(_AUTH_FAILURE_FILE)
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".auth_circuit_", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, _AUTH_FAILURE_FILE)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def _is_auth_error(message) -> bool:
    """Return True if *message* (exception or string) looks like a Gmail auth failure."""
    text = str(message).lower()
    return any(sig in text for sig in _AUTH_ERROR_SIGNATURES)


def _record_auth_failure(error_msg: str) -> bool:
    """Record an auth failure. Returns True if circuit breaker just tripped.

    After MAX_AUTH_FAILURES consecutive failures, sets paused=True and alerts
    Matt once. Subsequent calls return False (already paused). The truncated
    error message is kept as ``last_error``, and the trip time as
    ``last_alert_ts``, for later diagnosis.
    """
    state = _read_circuit_breaker()
    state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1
    state["last_error"] = str(error_msg)[:200]

    just_tripped = state["consecutive_failures"] >= MAX_AUTH_FAILURES and not state.get("paused")
    if just_tripped:
        state["paused"] = True
        state["last_alert_ts"] = datetime.now(CHICAGO_TZ).isoformat()
    _write_circuit_breaker(state)

    if just_tripped:
        _alert_admin(
            "🔴 IMAP Circuit Breaker Tripped",
            f"Gmail auth failed {state['consecutive_failures']} times in a row. "
            f"IMAP pipeline is now PAUSED to stop error spam.\n\n"
            f"Last error: {state['last_error']}\n\n"
            f"To reset: python -m family_assistant reset-circuit\n"
            f"Or: rm {_AUTH_FAILURE_FILE}",
        )
    return just_tripped


def _record_auth_success():
    """Record a successful auth — resets the circuit breaker.

    Touches the state file only when there is something to reset, and alerts
    Matt only if the pipeline had actually been paused.
    """
    state = _read_circuit_breaker()
    was_paused = bool(state.get("paused", False))
    if state.get("consecutive_failures", 0) <= 0 and not was_paused:
        return  # Already healthy; avoid a pointless disk write.

    state["consecutive_failures"] = 0
    state["paused"] = False
    state["last_error"] = None
    _write_circuit_breaker(state)

    if was_paused:
        _alert_admin(
            "🟢 IMAP Circuit Breaker Reset",
            "Gmail auth succeeded. IMAP pipeline is back online.",
        )


def is_imap_paused() -> bool:
    """Check if the IMAP pipeline is paused due to auth failures."""
    return bool(_read_circuit_breaker().get("paused", False))


def reset_circuit_breaker():
    """Manually reset the circuit breaker."""
    _write_circuit_breaker(_default_circuit_state())
    print("✅ Circuit breaker reset. IMAP pipeline re-enabled.")


def _resolve_location(location_str):
    """Resolve a location string through the cache to get the canonical name.

    Prevents LLM hallucinations like 'at Golrusk' → 'PetSmart' by preferring
    the known-locations cache. Returns the canonical cached name if found,
    otherwise returns the original string unchanged. Cache failures are logged
    and never raised: the cache is an optimization, not a hard dependency.
    """
    if not location_str or not location_str.strip():
        return location_str
    try:
        from family_assistant.location_cache import resolve

        result = resolve(location_str.strip(), use_home_bias=True)
        if result and result.get("name"):
            return result["name"]
    except Exception as e:
        print(f" [Location] Cache lookup failed for {location_str!r}: {e}", file=sys.stderr)
    return location_str


# NOTE(review): this import originally sat mid-module, below the definitions
# above. It is kept here in case url_fetcher depends on import order — confirm
# before hoisting it.
from family_assistant.url_fetcher import enrich_body_with_urls  # noqa: E402

# ---------------------------------------------------------------------------
# Global error handler — no silent deaths
# ---------------------------------------------------------------------------

_DEV_TARGET = os.environ.get("TELEGRAM_DEV_ID", "")

# Brackets, backticks, asterisks and underscores in free-form error strings
# break Telegram Markdown parsing, so they are stripped from alert details.
_TELEGRAM_MD_SPECIALS = re.compile(r"[_*\[\]`]")


def _alert_admin(title: str, detail: str):
    """Push an error alert to Matt's DM. Best-effort, never raises.

    Without TELEGRAM_DEV_ID configured, the alert is printed to stderr instead.
    """
    if not _DEV_TARGET:
        print(f" [ALERT] {title}: {detail}", file=sys.stderr)
        return
    # NOTE(review): stripping also mangles identifiers and paths in the detail
    # (e.g. "family_assistant" → "familyassistant"). Backslash-escaping would
    # preserve them if _send_telegram uses legacy Markdown mode — confirm first.
    safe_detail = _TELEGRAM_MD_SPECIALS.sub("", str(detail)[:800])
    msg = f"⚠️ **{title}**\n\n{safe_detail}"
    try:
        _send_telegram(msg, target=_DEV_TARGET)
    except Exception:
        print(f" [ALERT] Failed to send admin alert: {title}", file=sys.stderr)


def _serialize_value(value):
    """ISO-format a date/datetime; return any other value unchanged."""
    return value.isoformat() if isinstance(value, (datetime, date)) else value


def _serialize_result(parsed):
    """Convert datetime objects and other non-serializable types to strings.

    Dates and datetimes at the top level are converted, as are those one level
    deep inside list or dict values. Everything else is copied as-is.
    """
    result = {}
    for key, value in parsed.items():
        if isinstance(value, list):
            result[key] = [_serialize_value(v) for v in value]
        elif isinstance(value, dict):
            result[key] = {k: _serialize_value(v) for k, v in value.items()}
        else:
            result[key] = _serialize_value(value)
    return result


def _triage_email(subject, body, from_addr):
    """Classify an email as 'appointment' or 'newsletter'.

    Uses the local LLM to make the classification. Falls back to 'appointment'
    on errors (simpler parsing path, existing parser handles it).
    """
    try:
        return classify_email(subject, body, from_addr)
    except Exception as e:
        print(f" [Triage] Classification error: {e}, defaulting to appointment", file=sys.stderr)
        return "appointment"


# ---------------------------------------------------------------------------
# Shared routing helpers
# ---------------------------------------------------------------------------


def _join_who(who):
    """Join a list of family-member names into "A, B" (empty string for none)."""
    return ", ".join(who) if who else ""


def _with_who(summary, who_str):
    """Append " (who)" to a calendar summary when anyone is named."""
    return f"{summary} ({who_str})" if who_str else summary


def _compute_end(start_dt, end_dt, duration_minutes):
    """Return *end_dt*, or *start_dt* + duration when no end was parsed.

    The duration defaults to 60 minutes when missing or None.
    """
    if end_dt:
        return end_dt
    if duration_minutes is None:
        duration_minutes = 60
    return start_dt + timedelta(minutes=duration_minutes)


def _schedule_event(
    result,
    *,
    summary,
    start_dt,
    end_dt,
    description,
    raw_location,
    is_recurring,
    recurrence,
    claimed_day_of_week,
    dry_run,
    notify_each_event,
    email_subject,
):
    """Dry-run, de-duplicate, or create one timed calendar event.

    Appends one DRY_RUN / DUPLICATE_SKIPPED / CREATED entry to
    ``result["events_created"]``, or a message to ``result["errors"]`` if a
    calendar call fails. The location is resolved through the cache only when
    it is needed. If *notify_each_event* is set, Hermes is told about each newly
    created event; a notification failure is recorded separately and does not
    mark the creation as failed.
    """
    events = result["events_created"]

    if dry_run:
        events.append({
            "status": "DRY_RUN",
            "summary": summary,
            "start": start_dt.isoformat(),
            "end": end_dt.isoformat(),
            "location": _resolve_location(raw_location),
            "recurring": is_recurring,
            "recurrence": recurrence,
        })
        return

    try:
        existing = event_exists(summary, start_dt)
    except Exception as e:
        # Without a dedup answer, creating could double-book; skip instead.
        result["errors"].append(f"Calendar dedup check error: {e}")
        return
    if existing:
        events.append({
            "status": "DUPLICATE_SKIPPED",
            "summary": existing.get("summary", summary),
            "start": existing.get("start", {}).get("dateTime", start_dt.isoformat()),
            "end": existing.get("end", {}).get("dateTime", end_dt.isoformat()),
            "existing_id": existing.get("id"),
        })
        print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
        return

    location = _resolve_location(raw_location)
    try:
        if is_recurring and recurrence:
            event = create_recurring_event(
                summary=summary,
                start_dt=start_dt,
                end_dt=end_dt,
                recurrence=recurrence,
                description=description,
                location=location,
            )
        else:
            event = create_event(
                summary=summary,
                start_dt=start_dt,
                end_dt=end_dt,
                description=description,
                location=location,
            )
        entry = {
            "status": "CREATED",
            "id": event["id"],
            "summary": event.get("summary", summary),
            "start": event["start"].get("dateTime", start_dt.isoformat()),
            "end": event["end"].get("dateTime", end_dt.isoformat()),
            "link": event.get("htmlLink", ""),
            "recurring": is_recurring,
            "location": location,
            "_claimed_day_of_week": claimed_day_of_week,
        }
    except Exception as e:
        result["errors"].append(f"Calendar create error: {e}")
        return

    events.append(entry)
    if notify_each_event:
        # Immediate per-event success notification.
        try:
            hermes_notify(entry, email_subject=email_subject)
        except Exception as e:
            result["errors"].append(f"Hermes notification error: {e}")


def _route_all_day_item(item, entries, errors, *, prefix, error_label, who_str,
                        include_description, dry_run):
    """Create (or dry-run) an all-day "[PREFIX] summary" entry for a reminder/action.

    Without a due date the entry lands on today (Chicago time). A due datetime
    is reduced to its date, since all-day events take a date string.
    """
    summary = _with_who(f"{prefix} {item.get('summary', '')}", who_str)
    due = item.get("due")
    if isinstance(due, datetime):
        due = due.date()
    if due:
        due_str = due.isoformat() if isinstance(due, date) else str(due)
    else:
        due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")

    description = item.get("description") or ""
    if due:
        description = f"Due: {due_str}\n" + description

    if dry_run:
        entry = {"status": "DRY_RUN", "summary": summary, "date": due_str}
    else:
        try:
            # Date strings (no time) make this an all-day event.
            event = create_event(
                summary=summary,
                start_dt=due_str,
                end_dt=due_str,
                description=description,
            )
            entry = {
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "date": due_str,
            }
        except Exception as e:
            errors.append(f"{error_label} create error: {e}")
            return

    if include_description:
        entry["description"] = description
    entries.append(entry)


def _route_newsletter_items(items, dry_run=False, notify=True, email_subject=""):
    """Route newsletter items to their appropriate destinations.

    Routing based on relevance:
    - high relevance events/reminders/actions → Google Calendar
    - low relevance items of any type → Telegram Info Digest (no calendar)
    - info items → Telegram Info Digest (regardless of relevance)
    - rejected items (per family.yaml rules) → shadow-filtered to low relevance

    Low-relevance entries copied into ``info_items`` carry ``_low_relevance: True``.
    ``format_info_summary`` uses that flag to avoid listing them twice when the
    ``low_relevance_items`` list is also passed.

    Returns a dict with routed items and any errors.
    """
    result = {
        "events_created": [],
        "reminders_created": [],
        "actions_created": [],
        "info_items": [],            # high-relevance info + low-relevance items of any type
        "low_relevance_items": [],   # explicit low-relevance tracking
        "rejected_items": [],        # items filtered by rejection rules
        "auto_filtered": [],         # items shadow-filtered to low relevance
        "errors": [],
    }

    # Shadow-filter rejected items — they stay in the list but get flagged as
    # low_relevance with a note about which rule caught them. The user still
    # sees what was filtered in the digest and can restore items.
    from .rejection_engine import shadow_filter_items

    items = shadow_filter_items(items, scope="newsletter")

    # Collect auto-filtered items for notification.
    for item in items:
        if item.get("_auto_filtered"):
            result["auto_filtered"].append({
                "type": item.get("original_type", "?"),
                "summary": item.get("summary", ""),
                "who": _join_who(item.get("who")),
                "rule": item.get("reason", ""),
            })
            print(f" Auto-filtered: {item.get('summary', '?')} → low relevance", file=sys.stderr)

    for item in items:
        item_type = item.get("type", "info")
        relevance = item.get("relevance", "high")
        reason = item.get("reason", "No reason provided")
        item_summary = item.get("summary", "")
        who_str = _join_who(item.get("who"))

        # LOW RELEVANCE → always goes to Telegram Info Digest, never calendar.
        if relevance == "low":
            result["low_relevance_items"].append({
                "type": item_type,
                "summary": item_summary,
                "who": who_str,
                "description": item.get("description", ""),
                "reason": reason,
            })
            result["info_items"].append({
                "summary": f"[{item_type}] {item_summary}",
                "who": who_str,
                "description": f"{item.get('description', '')} (low relevance: {reason})",
                "_low_relevance": True,
            })
            continue

        # INFO type → Telegram Info Digest regardless of relevance.
        if item_type == "info":
            result["info_items"].append({
                "summary": item_summary,
                "who": who_str,
                "description": item.get("description", ""),
            })
            continue

        # HIGH RELEVANCE items → route to calendar.
        if item_type == "event":
            start_dt = item.get("start")
            if not start_dt:
                result["errors"].append(f"Event missing start: {item.get('summary', '?')}")
                continue
            is_recurring = item.get("is_recurring", False)
            _schedule_event(
                result,
                summary=_with_who(item_summary, who_str),
                start_dt=start_dt,
                end_dt=_compute_end(start_dt, item.get("end"), item.get("duration_minutes")),
                description=item.get("description", ""),
                raw_location=item.get("location", ""),
                is_recurring=is_recurring,
                recurrence=item.get("recurrence") if is_recurring else None,
                claimed_day_of_week=item.get("claimed_day_of_week", ""),
                dry_run=dry_run,
                notify_each_event=notify,
                email_subject=email_subject,
            )
        elif item_type == "reminder":
            _route_all_day_item(
                item, result["reminders_created"], result["errors"],
                prefix="[REMINDER]", error_label="Reminder", who_str=who_str,
                include_description=False, dry_run=dry_run,
            )
        elif item_type == "action_item":
            _route_all_day_item(
                item, result["actions_created"], result["errors"],
                prefix="[ACTION]", error_label="Action", who_str=who_str,
                include_description=True, dry_run=dry_run,
            )
        else:
            print(f" Unrouted newsletter item type {item_type!r}: {item_summary}", file=sys.stderr)

    return result


def format_info_summary(info_items, low_relevance_items=None, source_subject=""):
    """Format info items into a Telegram-friendly summary string.

    When *low_relevance_items* is given, info entries flagged ``_low_relevance``
    are skipped from the main list, because they already appear in the dedicated
    low-relevance section. Returns "" when there is nothing to show.
    """
    info_items = info_items or []
    low_relevance_items = low_relevance_items or []
    if low_relevance_items:
        info_items = [i for i in info_items if not i.get("_low_relevance")]
    if not info_items and not low_relevance_items:
        return ""

    subject_suffix = f": {source_subject}" if source_subject else ""
    lines = [f"📋 **Newsletter Summary{subject_suffix}**"]
    for item in info_items:
        who = f" ({item['who']})" if item.get("who") else ""
        lines.append(f" • {item.get('summary', '')}{who}: {item.get('description', '')}")

    if low_relevance_items:
        lines.append("\n 📌 **Low relevance (not added to calendar):**")
        for item in low_relevance_items:
            who = f" ({item['who']})" if item.get("who") else ""
            lines.append(
                f" • [{item.get('type', '?')}] {item.get('summary', '')}{who} — {item.get('reason', '')}"
            )
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Per-email processing (shared by the IMAP and webhook paths)
# ---------------------------------------------------------------------------


def _new_result(errors=None, emails_scanned=0):
    """Return a fresh pipeline result dict with every standard key present."""
    return {
        "emails_scanned": emails_scanned,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": list(errors or []),
    }


def _handle_newsletter_email(result, *, subject, body, from_addr, date_str, dry_run, notify):
    """Extract newsletter items, route them, and ingest into the Family Brain.

    Returns False when no items were extracted (nothing was recorded), else True.
    """
    # Enrich body with URL content (e.g. Smore links).
    enriched_body = enrich_body_with_urls(body)
    items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
    if not items:
        print(" No items extracted from newsletter", file=sys.stderr)
        return False

    result["newsletter_items"].append({
        "email_from": from_addr,
        "email_subject": subject,
        "items_count": len(items),
        "item_types": [i.get("type", "info") for i in items],
    })

    routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
    result["events_created"].extend(routed["events_created"])
    result["errors"].extend(routed["errors"])
    for key in ("reminders_created", "actions_created"):
        if routed[key]:
            result.setdefault(key, []).extend(routed[key])
    low_items = routed["low_relevance_items"]
    result["low_relevance_items"].extend(low_items)

    # Build info summary for the Telegram push; summaries from several
    # newsletters in one run are separated by a blank line.
    info_items = routed["info_items"]
    if info_items or low_items:
        block = format_info_summary(info_items, low_relevance_items=low_items, source_subject=subject)
        if result["info_summary"]:
            result["info_summary"] += "\n\n"
        result["info_summary"] += block

    # Ingest into Family Brain for RAG retrieval (best-effort).
    if not dry_run:
        try:
            ingest_newsletter(
                subject=subject,
                body=enriched_body,
                from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                items=items,
            )
            print(" Ingested newsletter into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f" Family Brain ingestion error: {e}", file=sys.stderr)
    return True


def _apply_cancellation(parsed, dry_run):
    """Cancel the calendar event matching a parsed cancellation.

    Returns the action record. CANCELLATION_NO_MATCH is returned when no start
    time was parsed or no matching event was found.
    """
    cancel_summary = parsed.get("summary", "")
    cancel_start = parsed.get("start")
    action = None
    if cancel_start:
        if dry_run:
            action = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary,
                      "start": cancel_start.isoformat()}
            print(f" [DRY RUN] Would cancel: {cancel_summary} at {cancel_start.isoformat()}",
                  file=sys.stderr)
        else:
            action = find_and_cancel_event(cancel_summary, cancel_start)
    if action is None:
        action = {
            "status": "CANCELLATION_NO_MATCH",
            "summary": cancel_summary,
            "start": cancel_start.isoformat() if cancel_start else None,
        }
        print(f" No matching event found for cancellation: {cancel_summary}", file=sys.stderr)
    return action


def _handle_appointment_email(result, *, subject, body, from_addr, date_str, dry_run,
                              notify_each_event):
    """Parse appointments/cancellations, apply them to the calendar, then ingest."""
    parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) or []
    serialized_items = []

    for parsed in parsed_list:
        serialized = _serialize_result(parsed)
        serialized_items.append(serialized)

        if parsed.get("type") == "cancellation":
            result["cancellations"].append({
                "email_from": from_addr,
                "email_subject": subject,
                "parsed": serialized,
                "action": _apply_cancellation(parsed, dry_run),
            })
            continue

        # It's an appointment.
        result["appointments_found"].append({
            "email_from": from_addr,
            "email_subject": subject,
            "parsed": serialized,
        })
        start_dt = parsed.get("start")
        if not start_dt:
            result["errors"].append(f"Could not resolve datetime for: {parsed.get('summary', '')}")
            continue

        is_recurring = parsed.get("is_recurring", False)
        _schedule_event(
            result,
            summary=_with_who(parsed.get("summary", ""), _join_who(parsed.get("who"))),
            start_dt=start_dt,
            end_dt=_compute_end(start_dt, parsed.get("end"), parsed.get("duration_minutes")),
            description=parsed.get("description", ""),
            raw_location=parsed.get("location", ""),
            is_recurring=is_recurring,
            recurrence=parsed.get("recurrence") if is_recurring else None,
            claimed_day_of_week=parsed.get("claimed_day_of_week", ""),
            dry_run=dry_run,
            notify_each_event=notify_each_event,
            email_subject=subject,
        )

    # Ingest appointment email into Family Brain (best-effort). Items are passed
    # serialized, matching what backfill_brain() sends.
    if not dry_run:
        try:
            ingest_email(
                subject=subject,
                body=body,
                from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                parsed_items=serialized_items,
            )
            print(" Ingested appointment email into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f" Family Brain ingestion error: {e}", file=sys.stderr)


def _process_single_email(result, *, subject, body, from_addr, date_str, dry_run, notify,
                          notify_each_appointment):
    """Triage one email and route it down the newsletter or appointment path.

    Returns False only when a newsletter yielded no items, True otherwise.
    """
    email_class = _triage_email(subject, body, from_addr)
    print(f" Classified as: {email_class}", file=sys.stderr)

    if email_class == "newsletter":
        return _handle_newsletter_email(
            result, subject=subject, body=body, from_addr=from_addr, date_str=date_str,
            dry_run=dry_run, notify=notify,
        )

    _handle_appointment_email(
        result, subject=subject, body=body, from_addr=from_addr, date_str=date_str,
        dry_run=dry_run, notify_each_event=notify_each_appointment,
    )
    return True


# ---------------------------------------------------------------------------
# IMAP pull pipeline
# ---------------------------------------------------------------------------


def process_emails(dry_run=False, notify=True, quiet=False):
    """Fetch unread emails, parse appointments via LLM, create calendar events.

    All fetched emails are marked as read after processing, regardless of
    whether they contained appointments. This prevents re-processing the same
    emails on subsequent heartbeat runs.

    Includes auth failure circuit breaker: after MAX_AUTH_FAILURES (3)
    consecutive Gmail auth failures, the IMAP pipeline is paused to prevent
    error spam. The failures can be raised exceptions or auth-like fetch
    errors. Only a successful fetch resets the breaker.

    Args:
        dry_run: If True, don't create calendar events or mark emails read.
        notify: If True, push results to Telegram via Hermes.
        quiet: If True, suppress family group notifications (only DM alerts).
    """
    # Check circuit breaker first — skip IMAP if paused.
    if is_imap_paused():
        return _new_result([
            "IMAP_PAUSED: Gmail auth circuit breaker is tripped. "
            "Run 'family-assistant reset-circuit' to re-enable."
        ])

    # Skip IMAP entirely if no credentials configured (webhook-only mode).
    if not GMAIL_APP_PASSWORD:
        return _new_result(["IMAP_SKIPPED: No Gmail credentials (webhook-only mode)"])

    try:
        result = _process_emails_inner(dry_run=dry_run, notify=notify, quiet=quiet)
    except Exception as e:
        if _is_auth_error(e):
            # The breaker alerts once when it trips; otherwise stay silent.
            _record_auth_failure(str(e)[:200])
            return _new_result([
                "IMAP_PAUSED: circuit breaker tripped"
                if is_imap_paused()
                else "IMAP_AUTH_FAILED: Gmail auth failed (circuit breaker counting)"
            ])

        # Non-auth failure — send the stack trace.
        tb = traceback.format_exc()
        _alert_admin(
            "Pipeline Error",
            f"process_emails() crashed:\n\n```\n{tb[:600]}\n```",
        )
        return _new_result([f"UNHANDLED: {e}"])

    fetch_error = result.pop("_fetch_error", None)
    try:
        if fetch_error is None:
            _record_auth_success()  # IMAP login + fetch succeeded.
        elif _is_auth_error(fetch_error):
            _record_auth_failure(fetch_error[:200])
        # Any other fetch error says nothing about auth: leave the breaker alone.
    except Exception as e:
        result["errors"].append(f"Circuit breaker update error: {e}")
    return result


def _process_emails_inner(dry_run=False, notify=True, quiet=False):
    """Internal implementation — wrapped by process_emails() error handler.

    On a fetch error the result carries a private ``_fetch_error`` marker.
    process_emails() consumes it so a failed fetch is never counted as a
    successful Gmail login.
    """
    result = _new_result()

    # Fetch and mark as read in one pass — no second connection needed.
    mail_data = fetch_unread(mark_read=not dry_run)
    if "error" in mail_data:
        result["errors"].append(mail_data["error"])
        result["_fetch_error"] = str(mail_data["error"])
        return result

    emails = mail_data.get("emails", [])
    result["emails_scanned"] = len(emails)
    if not emails:
        return result

    failures = []
    for em in emails:
        subject = em.get("subject", "")
        print(f" Processing: {subject[:60]}...", file=sys.stderr)
        try:
            # NOTE(review): the IMAP path historically sends no per-event Hermes
            # notification for appointments (the webhook path does) — confirm intent.
            _process_single_email(
                result,
                subject=subject,
                body=em.get("body", ""),
                from_addr=em.get("from", ""),
                date_str=em.get("date", ""),
                dry_run=dry_run,
                notify=notify,
                notify_each_appointment=False,
            )
        except Exception as e:
            # Emails were already marked read by fetch_unread(), so one bad email
            # must not abort the batch; the rest would never be seen again.
            print(f" Error processing {subject[:60]!r}:\n{traceback.format_exc()}", file=sys.stderr)
            result["errors"].append(f"Email processing error ({subject[:60]}): {e}")
            failures.append(f"{subject[:60]}: {e}")

    if failures:
        _alert_admin(
            "Pipeline Email Errors",
            f"{len(failures)} email(s) failed to process:\n\n" + "\n".join(failures),
        )

    return _finalize_result(result, dry_run, notify, quiet=quiet)


# ---------------------------------------------------------------------------
# Webhook push pipeline
# ---------------------------------------------------------------------------


def process_webhook_email(email_data: dict, dry_run=False, notify=True):
    """Process a single email received via the Cloudflare webhook.

    This is the push-pipeline path: email arrives via the webhook address,
    Cloudflare Worker POSTs to hook.hoffdesk.com/webhook, which calls this.
    Reuses the same extraction and routing logic as process_emails(), but
    skips the IMAP fetch entirely.

    Args:
        email_data: Dict with keys: from, to, subject, date, body_text, body_html,
            has_attachments, attachments, dedup_key, message_id, source ('webhook')
        dry_run: If True, don't create calendar events.
        notify: If True, push results to Telegram via Hermes.

    Returns:
        Dict with processing results.
    """
    try:
        return _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify)
    except Exception as e:
        tb = traceback.format_exc()
        _alert_admin(
            "Webhook Pipeline Error",
            f"process_webhook_email() crashed:\n\n```\n{tb[:600]}\n```",
        )
        return _new_result([f"UNHANDLED: {e}"], emails_scanned=1)


def _process_webhook_email_inner(email_data: dict, dry_run=False, notify=True):
    """Internal implementation — wrapped by process_webhook_email() error handler."""
    result = _new_result(emails_scanned=1)

    subject = email_data.get("subject", "")
    print(f" [Webhook] Processing: {subject[:60]}...", file=sys.stderr)

    handled = _process_single_email(
        result,
        subject=subject,
        body=email_data.get("body_text", ""),
        from_addr=email_data.get("from", ""),
        date_str=email_data.get("date", ""),
        dry_run=dry_run,
        notify=notify,
        notify_each_appointment=notify,
    )
    if not handled:
        # Newsletter with nothing extracted: return without conflict check or push.
        return result
    return _finalize_result(result, dry_run, notify)


def _log_conflicts(conflicts):
    """Print detected scheduling conflicts to stderr (logging only, never raises)."""
    if not conflicts:
        return
    print(f" ⚠️ {len(conflicts)} scheduling conflict(s) detected", file=sys.stderr)
    for c in conflicts:
        try:
            first = c["event1"]["summary"]
            second = c["event2"]["summary"]
            overlap = c["overlap_minutes"]
        except (KeyError, TypeError):
            print(f" {c!r}", file=sys.stderr)
            continue
        print(f" {first} conflicts with {second} ({overlap}min overlap)", file=sys.stderr)


def _finalize_result(result, dry_run, notify, quiet=None):
    """Shared finalization: conflict detection + Hermes push.

    Args:
        result: Pipeline result dict to finalize in place (also returned).
        dry_run: Skip conflict detection and notifications when True.
        notify: Push results to Telegram via Hermes when True.
        quiet: Forwarded to push_pipeline_results() when given. None omits the
            argument, leaving Hermes' own default in effect.
    """
    created_events = [
        e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")
    ]
    result["conflicts"] = []
    if created_events and not dry_run:
        try:
            conflicts = detect_conflicts(
                events=[
                    {"id": e["id"], "summary": e.get("summary", ""),
                     "start": e.get("start", ""), "end": e.get("end", "")}
                    for e in created_events
                ]
            )
        except Exception as e:
            result["errors"].append(f"Conflict detection error: {e}")
        else:
            result["conflicts"] = conflicts
            _log_conflicts(conflicts)

    if notify and not dry_run:
        try:
            if quiet is None:
                hermes_stats = push_pipeline_results(result)
            else:
                hermes_stats = push_pipeline_results(result, quiet=quiet)
            result["hermes"] = hermes_stats
        except Exception as e:
            result["errors"].append(f"Hermes notification error: {e}")

    return result


# ---------------------------------------------------------------------------
# Family Brain backfill
# ---------------------------------------------------------------------------


def backfill_brain(days: int = 30) -> dict:
    """Backfill the Family Brain with emails from the last N days.

    Re-processes already-seen emails purely for ChromaDB ingestion.
    Does NOT create calendar events or send notifications.

    Args:
        days: Number of days to look back

    Returns:
        Dict with ingestion stats.
    """
    try:
        return _backfill_brain_inner(days=days)
    except Exception as e:
        tb = traceback.format_exc()
        _alert_admin(
            "Backfill Error",
            f"backfill_brain() crashed:\n\n```\n{tb[:600]}\n```",
        )
        return {
            "emails_scanned": 0,
            "ingested_emails": 0,
            "ingested_newsletters": 0,
            "errors": [f"UNHANDLED: {e}"],
            "skipped": 0,
        }


def _backfill_brain_inner(days: int = 30) -> dict:
    """Internal implementation — wrapped by backfill_brain() error handler."""
    # Imported lazily: only the backfill path needs the date-range fetcher.
    from family_assistant.email_fetcher import fetch_since

    # IMAP SINCE format: DD-Mon-YYYY (e.g. 18-Mar-2026). Month names must be
    # English regardless of process locale, so strftime("%b") is avoided.
    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    since_dt = datetime.now(CHICAGO_TZ) - timedelta(days=days)
    imap_date = f"{since_dt.day:02d}-{months[since_dt.month - 1]}-{since_dt.year}"
    print(f"Backfilling Family Brain: emails since {imap_date} ({days} days)", file=sys.stderr)

    result = {
        "emails_scanned": 0,
        "ingested_emails": 0,
        "ingested_newsletters": 0,
        "errors": [],
        "skipped": 0,
    }

    mail_data = fetch_since(imap_date)
    if "error" in mail_data:
        result["errors"].append(mail_data["error"])
        return result

    emails = mail_data.get("emails", [])
    result["emails_scanned"] = len(emails)
    if not emails:
        print(" No emails found in date range", file=sys.stderr)
        return result

    for em in emails:
        subject = em.get("subject", "")
        body = em.get("body", "")
        from_addr = em.get("from", "")
        date_str = em.get("date", "")
        email_date = date_str or datetime.now(CHICAGO_TZ).isoformat()
        print(f" Processing: {subject[:60]}...", file=sys.stderr)

        email_class = _triage_email(subject, body, from_addr)
        print(f" Classified as: {email_class}", file=sys.stderr)

        try:
            if email_class == "newsletter":
                enriched_body = enrich_body_with_urls(body)
                items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
                ingest_newsletter(
                    subject=subject,
                    body=enriched_body,
                    from_addr=from_addr,
                    email_date=email_date,
                    items=items or [],
                )
                result["ingested_newsletters"] += 1
                print(" ✅ Ingested newsletter", file=sys.stderr)
            else:
                # Appointment email: serialize datetimes before passing to ingest.
                parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) or []
                ingest_email(
                    subject=subject,
                    body=body,
                    from_addr=from_addr,
                    email_date=email_date,
                    parsed_items=[_serialize_result(p) for p in parsed_list],
                )
                result["ingested_emails"] += 1
                print(" ✅ Ingested email", file=sys.stderr)
        except Exception as e:
            result["errors"].append(f"{subject[:40]}: {e}")
            print(f" ❌ Error: {e}", file=sys.stderr)

    print(
        f"\nBackfill complete: {result['ingested_emails']} emails + "
        f"{result['ingested_newsletters']} newsletters ingested",
        file=sys.stderr,
    )
    return result