"""Pipeline orchestration: fetch emails → classify → parse → route to calendar/reminder/info.""" import json import os import sys import traceback from datetime import datetime, timedelta, date from zoneinfo import ZoneInfo from icarus.core.config import CHICAGO_TZ, GMAIL_APP_PASSWORD, AUTH_CIRCUIT_BREAKER_FILE from icarus.core.email_fetcher import fetch_unread from icarus.core.appointment_parser import parse_email_with_llm from icarus.core.newsletter_parser import classify_email, parse_newsletter_with_llm from icarus.core.family_brain import ingest_email, ingest_newsletter from icarus.core.calendar_sync import ( event_exists, create_event, create_recurring_event, find_and_cancel_event, ) from icarus.core.db.event_graph import EventGraphWriter, classify_event_type from icarus.core.conflict_engine import detect_conflicts from icarus.core.hermes import push_pipeline_results, hermes_notify, _send_telegram, format_info_summary # --------------------------------------------------------------------------- # Event Graph Persistence # --------------------------------------------------------------------------- def _persist_to_event_graph(email_data: dict, parsed_items: list, event_type_hint: str = None) -> None: """Persist parsed email items to the Event Graph. Called alongside Family Brain ingestion. Non-fatal — errors are logged but never block the pipeline. Args: email_data: Dict with email metadata (from, subject, date) parsed_items: List of parsed items (appointments or newsletter items) event_type_hint: Optional type override ("calendar_event", "coordination", "info") """ import hashlib if not parsed_items: return for item in parsed_items: try: # Generate document_id from email subject + item summary doc_id = f"email_{hashlib.sha256((email_data.get('subject', '') + '_' + item.get('summary', '')).encode()).hexdigest()[:16]}" # Build extraction dict for classification extraction = { "key_details": { "date": item.get("start", "").isoformat() if hasattr(item.get("start"), "isoformat") else str(item.get("start", "")), "time": item.get("start", "").isoformat() if hasattr(item.get("start"), "isoformat") else str(item.get("start", "")), "location": item.get("location", ""), "who": item.get("who", []), }, "title": item.get("summary", ""), "summary": item.get("description", ""), "category": event_type_hint or item.get("type", "info"), "suggested_actions": [], "assigned_to": item.get("who", []), "confidence": item.get("confidence", 0.7), } # Classify if event_type_hint: event_type = event_type_hint else: event_type = classify_event_type(extraction) # Skip info-only if event_type == "info" and not item.get("start"): continue # Override: items with start times that go to calendar are calendar_events if item.get("start") and event_type == "info": event_type = "calendar_event" extraction["event_type"] = event_type EventGraphWriter().write(doc_id, extraction, extraction) print(f" [Event Graph] Persisted: {item.get('summary', '?')} as {event_type}", file=sys.stderr) except Exception as e: print(f" [Event Graph] Persist error: {e}", file=sys.stderr) # --------------------------------------------------------------------------- # Payment Alert Detection # --------------------------------------------------------------------------- _PAYMENT_ALERT_PHRASES = [ "payment failed", "card declined", "transaction failed", "card expired", "update your payment", "payment method", "subscription suspended", "account will be canceled", "billing error", "payment declined", "renewal failed", "expired on", "your card on file", ] def _check_payment_alert(subject: str, body: str, from_addr: str) -> dict | None: """Check if email is a payment alert and return alert data if found. Returns dict with alert details if payment-related phrases found, None otherwise. """ text = f"{subject} {body}".lower() for phrase in _PAYMENT_ALERT_PHRASES: if phrase in text: return { "alert_reason": phrase, "merchant": from_addr.split("<")[-1].rstrip(">").split("@")[-1].split(".")[-2] if "<" in from_addr else (from_addr.split("@")[-1].split(".")[-2] if "@" in from_addr else "Unknown"), "subject": subject, "service": None, # Could be extracted by LLM "deadline": None, # Could be extracted by LLM } return None def _send_payment_alert_notification(alert: dict, email_data: dict, target: str = None): """Send immediate Telegram notification for payment alerts. Uses the existing _send_telegram path for reliability. """ # Import targets from hermes module (not config) from icarus.core.hermes import DEFAULT_TELEGRAM_TARGET, DEV_TELEGRAM_TARGET target = target or DEFAULT_TELEGRAM_TARGET subject = email_data.get("subject", "Unknown") from_addr = email_data.get("from", "Unknown") alert_reason = alert.get("alert_reason", "payment issue") merchant = alert.get("merchant", "Unknown Service") msg = f"""🚨 **[PAYMENT ALERT]** 🚨 **{merchant}** Reason: {alert_reason} Subject: {subject} From: {from_addr} ⚠️ Please update payment method immediately.""" try: _send_telegram(msg, target=target) print(f" [PAYMENT ALERT] Sent immediate notification: {alert_reason}", file=sys.stderr) return True except Exception as e: print(f" [PAYMENT ALERT] Failed to send notification: {e}", file=sys.stderr) return False # --------------------------------------------------------------------------- # Auth Failure Circuit Breaker # --------------------------------------------------------------------------- # Tracks consecutive auth failures to prevent error spam. # After MAX_AUTH_FAILURES consecutive failures, the IMAP pipeline is paused # and Matt is alerted once. Resets on first success or manual reset. _AUTH_FAILURE_FILE = AUTH_CIRCUIT_BREAKER_FILE MAX_AUTH_FAILURES = 3 # Pause after this many consecutive failures def _read_circuit_breaker(): """Read circuit breaker state from disk.""" try: with open(_AUTH_FAILURE_FILE, "r") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {"consecutive_failures": 0, "paused": False, "last_alert_ts": None} def _write_circuit_breaker(state): """Write circuit breaker state to disk.""" os.makedirs(os.path.dirname(_AUTH_FAILURE_FILE), exist_ok=True) with open(_AUTH_FAILURE_FILE, "w") as f: json.dump(state, f, indent=2) def _record_auth_failure(error_msg: str) -> bool: """Record an auth failure. Returns True if circuit breaker just tripped. After MAX_AUTH_FAILURES consecutive failures, sets paused=True and alerts Matt once. Subsequent calls return False (already paused). """ state = _read_circuit_breaker() state["consecutive_failures"] = state.get("consecutive_failures", 0) + 1 just_tripped = False if state["consecutive_failures"] >= MAX_AUTH_FAILURES and not state.get("paused"): state["paused"] = True just_tripped = True _write_circuit_breaker(state) if just_tripped: _alert_admin( "🔴 IMAP Circuit Breaker Tripped", f"Gmail auth failed {state['consecutive_failures']} times in a row. " f"IMAP pipeline is now PAUSED to stop error spam.\n\n" f"To reset: python -m icarus.core reset-circuit\n" f"Or: rm {_AUTH_FAILURE_FILE}", ) return just_tripped def _record_auth_success(): """Record a successful auth — resets the circuit breaker.""" state = _read_circuit_breaker() if state.get("consecutive_failures", 0) > 0 or state.get("paused"): was_paused = state.get("paused", False) state["consecutive_failures"] = 0 state["paused"] = False _write_circuit_breaker(state) if was_paused: _alert_admin( "🟢 IMAP Circuit Breaker Reset", "Gmail auth succeeded. IMAP pipeline is back online.", ) def is_imap_paused() -> bool: """Check if the IMAP pipeline is paused due to auth failures.""" return _read_circuit_breaker().get("paused", False) def reset_circuit_breaker(): """Manually reset the circuit breaker.""" state = {"consecutive_failures": 0, "paused": False, "last_alert_ts": None} _write_circuit_breaker(state) print("✅ Circuit breaker reset. IMAP pipeline re-enabled.") def _resolve_location(location_str): """Resolve a location string through the cache to get the canonical name. Prevents LLM hallucinations like 'at Golrusk' → 'PetSmart' by preferring the known-locations cache. Returns the canonical cached name if found, otherwise returns the original string unchanged. """ if not location_str or not location_str.strip(): return location_str try: from icarus.core.location_cache import resolve result = resolve(location_str.strip(), use_home_bias=True) if result and result.get("name"): return result["name"] except Exception: pass return location_str from icarus.core.url_fetcher import enrich_body_with_urls # --------------------------------------------------------------------------- # Global error handler — no silent deaths # --------------------------------------------------------------------------- _DEV_TARGET = os.environ.get("TELEGRAM_DEV_ID", "") def _alert_admin(title: str, detail: str): """Push an error alert to Matt's DM. Best-effort, never raises.""" if not _DEV_TARGET: print(f" [ALERT] {title}: {detail}", file=sys.stderr) return # Strip markdown special characters from detail to prevent Telegram parse failures. # Brackets, backticks, asterisks, underscores in error strings break Markdown mode. import re safe_detail = re.sub(r'[_*\[\]`]', '', detail[:800]) msg = f"⚠️ **{title}**\n\n{safe_detail}" try: _send_telegram(msg, target=_DEV_TARGET) except Exception: print(f" [ALERT] Failed to send admin alert: {title}", file=sys.stderr) def _serialize_result(parsed): """Convert datetime objects and other non-serializable types to strings.""" result = {} for key, value in parsed.items(): if isinstance(value, datetime): result[key] = value.isoformat() elif isinstance(value, date) and not isinstance(value, datetime): result[key] = value.isoformat() elif isinstance(value, list): result[key] = [ v.isoformat() if isinstance(v, (datetime, date)) else v for v in value ] elif isinstance(value, dict): result[key] = { k: v.isoformat() if isinstance(v, (datetime, date)) else v for k, v in value.items() } else: result[key] = value return result def _triage_email(subject, body, from_addr): """Classify an email as 'appointment' or 'newsletter'. Uses the local LLM to make the classification. Falls back to 'appointment' on errors (simpler parsing path, existing parser handles it). """ try: return classify_email(subject, body, from_addr) except Exception as e: print(f" [Triage] Classification error: {e}, defaulting to appointment", file=sys.stderr) return "appointment" def _route_newsletter_items(items, dry_run=False, notify=True, email_subject=""): """Route newsletter items to their appropriate destinations. Routing based on relevance: - high relevance events/reminders/actions → Google Calendar - low relevance items of any type → Telegram Info Digest (no calendar) - info items → Telegram Info Digest (regardless of relevance) - rejected items (per family.yaml rules) → skipped with note Returns a dict with routed items and any errors. """ result = { "events_created": [], "reminders_created": [], "actions_created": [], "info_items": [], # high-relevance info + low-relevance items of any type "low_relevance_items": [], # explicit low-relevance tracking "rejected_items": [], # items filtered by rejection rules "auto_filtered": [], # items shadow-filtered to low relevance "errors": [], } # Shadow-filter rejected items — they stay in the list but get flagged # as low_relevance with a note about which rule caught them. # The user still sees what was filtered in the digest and can restore items. from .rejection_engine import shadow_filter_items, should_reject items = shadow_filter_items(items, scope="newsletter") # Collect auto-filtered items for notification for item in items: if item.get("_auto_filtered"): who_str = ", ".join(item.get("who", [])) result["auto_filtered"].append({ "type": item.get("original_type", "?"), "summary": item.get("summary", ""), "who": who_str, "rule": item.get("reason", ""), }) print(f" Auto-filtered: {item.get('summary', '?')} → low relevance", file=sys.stderr) for item in items: item_type = item.get("type", "info") relevance = item.get("relevance", "high") reason = item.get("reason", "No reason provided") # LOW RELEVANCE → always goes to Telegram Info Digest, never calendar if relevance == "low": who_str = ", ".join(item.get("who", [])) result["low_relevance_items"].append({ "type": item_type, "summary": item.get("summary", ""), "who": who_str, "description": item.get("description", ""), "reason": reason, }) # Also add to info_items for Telegram summary result["info_items"].append({ "summary": f"[{item_type}] {item['summary']}", "who": who_str, "description": f"{item.get('description', '')} (low relevance: {reason})", }) continue # INFO type → Telegram Info Digest regardless of relevance if item_type == "info": who_str = ", ".join(item.get("who", [])) result["info_items"].append({ "summary": item["summary"], "who": who_str, "description": item.get("description", ""), }) continue # HIGH RELEVANCE items → route to calendar if item_type == "event": # Same path as appointments — create on calendar with dedup start_dt = item.get("start") if not start_dt: result["errors"].append(f"Event missing start: {item.get('summary', '?')}") continue end_dt = item.get("end") duration_minutes = item.get("duration_minutes", 60) if not end_dt: end_dt = start_dt + timedelta(minutes=duration_minutes) who_str = ", ".join(item.get("who", [])) summary = item["summary"] if who_str: summary = f"{summary} ({who_str})" if dry_run: result["events_created"].append({ "status": "DRY_RUN", "summary": summary, "start": start_dt.isoformat(), "end": end_dt.isoformat(), "location": _resolve_location(item.get("location", "")), }) continue existing = event_exists(summary, start_dt) if existing: result["events_created"].append({ "status": "DUPLICATE_SKIPPED", "summary": existing.get("summary", summary), "start": existing["start"].get("dateTime", start_dt.isoformat()), "end": existing["end"].get("dateTime", end_dt.isoformat()), "existing_id": existing["id"], }) print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr) continue try: # Check if this is a recurring event is_recurring = item.get("is_recurring", False) recurrence = item.get("recurrence") if is_recurring else None if is_recurring and recurrence: event = create_recurring_event( summary=summary, start_dt=start_dt, end_dt=end_dt, recurrence=recurrence, description=item.get("description", ""), location=_resolve_location(item.get("location", "")), ) else: event = create_event( summary=summary, start_dt=start_dt, end_dt=end_dt, description=item.get("description", ""), location=_resolve_location(item.get("location", "")), ) result["events_created"].append({ "status": "CREATED", "id": event["id"], "summary": event.get("summary", summary), "start": event["start"].get("dateTime", start_dt.isoformat()), "end": event["end"].get("dateTime", end_dt.isoformat()), "link": event.get("htmlLink", ""), "recurring": is_recurring, "location": _resolve_location(item.get("location", "")), "_claimed_day_of_week": item.get("claimed_day_of_week", ""), }) # Immediate per-event success notification if notify and not dry_run: hermes_notify(result["events_created"][-1], email_subject=email_subject) except Exception as e: result["errors"].append(f"Calendar create error: {e}") elif item_type == "reminder": # All-day event with [REMINDER] prefix due = item.get("due") who_str = ", ".join(item.get("who", [])) summary = f"[REMINDER] {item['summary']}" if who_str: summary += f" ({who_str})" if due: due_str = due.isoformat() if isinstance(due, date) else str(due) else: due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d") description = item.get("description", "") if due: description = f"Due: {due_str}\n" + description if dry_run: result["reminders_created"].append({ "status": "DRY_RUN", "summary": summary, "date": due_str, }) continue try: # Create as all-day event (date only, no time) event = create_event( summary=summary, start_dt=due_str, # date string for all-day event end_dt=due_str, description=description, ) result["reminders_created"].append({ "status": "CREATED", "id": event["id"], "summary": event.get("summary", summary), "date": due_str, }) except Exception as e: result["errors"].append(f"Reminder create error: {e}") elif item_type == "action_item": # All-day event with [ACTION] prefix due = item.get("due") who_str = ", ".join(item.get("who", [])) summary = f"[ACTION] {item['summary']}" if who_str: summary += f" ({who_str})" if due: due_str = due.isoformat() if isinstance(due, date) else str(due) else: due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d") description = item.get("description", "") if due: description = f"Due: {due_str}\n" + description if dry_run: result["actions_created"].append({ "status": "DRY_RUN", "summary": summary, "date": due_str, "description": description, }) continue try: event = create_event( summary=summary, start_dt=due_str, end_dt=due_str, description=description, ) result["actions_created"].append({ "status": "CREATED", "id": event["id"], "summary": event.get("summary", summary), "date": due_str, "description": description, }) except Exception as e: result["errors"].append(f"Action create error: {e}") return result # Note: format_info_summary is defined in hermes.py - use that version def process_emails(dry_run=False, notify=True, quiet=False): """Fetch unread emails, parse appointments via LLM, create calendar events. All fetched emails are marked as read after processing, regardless of whether they contained appointments. This prevents re-processing the same emails on subsequent heartbeat runs. Includes auth failure circuit breaker: after 3 consecutive Gmail auth failures, the IMAP pipeline is paused to prevent error spam. Args: dry_run: If True, don't create calendar events or mark emails read. notify: If True, push results to Telegram via Hermes. quiet: If True, suppress family group notifications (only DM alerts). """ # Check circuit breaker first — skip IMAP if paused if is_imap_paused(): return { "emails_scanned": 0, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": ["IMAP_PAUSED: Gmail auth circuit breaker is tripped. Run 'family-assistant reset-circuit' to re-enable."], } # Skip IMAP entirely if no credentials configured (webhook-only mode) if not GMAIL_APP_PASSWORD: return { "emails_scanned": 0, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": ["IMAP_SKIPPED: No Gmail credentials (webhook-only mode)"], } try: result = _process_emails_inner(dry_run=dry_run, notify=notify, quiet=quiet) # Success — reset circuit breaker _record_auth_success() return result except Exception as e: # Check if this is an auth failure error_str = str(e).lower() is_auth_failure = any( sig in error_str for sig in ["authentication", "invalid_grant", "credentials", "auth failed", "login failed", "[auth]", "[alert]", "suspen", "disabled", "access denied"] ) if is_auth_failure: just_tripped = _record_auth_failure(str(e)[:200]) # Whether we just tripped or are still counting — return silently return { "emails_scanned": 0, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": [ "IMAP_PAUSED: circuit breaker tripped" if is_imap_paused() else "IMAP_AUTH_FAILED: Gmail auth failed (circuit breaker counting)" ], } # Non-auth failure or first-trip alert — send stack trace tb = traceback.format_exc() _alert_admin( "Pipeline Error", f"process_emails() crashed:\n\n```\n{tb[:600]}\n```", ) return { "emails_scanned": 0, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": [f"UNHANDLED: {e}"], } def _process_emails_inner(dry_run=False, notify=True, quiet=False): """Internal implementation — wrapped by process_emails() error handler.""" result = { "emails_scanned": 0, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": [], } # Fetch and mark as read in one pass — no second connection needed mail_data = fetch_unread(mark_read=not dry_run) if "error" in mail_data: result["errors"].append(mail_data["error"]) return result emails = mail_data.get("emails", []) result["emails_scanned"] = len(emails) if not emails: return result for em in emails: subject = em.get("subject", "") body = em.get("body", "") from_addr = em.get("from", "") date_str = em.get("date", "") print(f" Processing: {subject[:60]}...", file=sys.stderr) # Check for payment alerts FIRST — before classification payment_alert = _check_payment_alert(subject, body, from_addr) if payment_alert: print(f" [PAYMENT ALERT] Detected: {payment_alert['alert_reason']}", file=sys.stderr) if not dry_run: _send_payment_alert_notification(payment_alert, em) result.setdefault("payment_alerts", []).append(payment_alert) # Step 1: Triage — classify email type email_class = _triage_email(subject, body, from_addr) print(f" Classified as: {email_class}", file=sys.stderr) if email_class == "newsletter": # Enrich body with URL content (e.g. Smore links) enriched_body = enrich_body_with_urls(body) # Parse as newsletter with multi-type extraction items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str) if not items: print(f" No items extracted from newsletter", file=sys.stderr) continue result["newsletter_items"].append({ "email_from": from_addr, "email_subject": subject, "items_count": len(items), "item_types": [i.get("type", "info") for i in items], }) # Route each item type to its destination routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject) result["events_created"].extend(routed["events_created"]) result["errors"].extend(routed["errors"]) # Collect reminders/actions/info for reporting if routed["reminders_created"]: result.setdefault("reminders_created", []).extend(routed["reminders_created"]) if routed["actions_created"]: result.setdefault("actions_created", []).extend(routed["actions_created"]) if routed.get("low_relevance_items"): result["low_relevance_items"].extend(routed["low_relevance_items"]) # Build info summary for Telegram push - include ALL actioned items for visibility info_items = routed.get("info_items", []) low_items = routed.get("low_relevance_items", []) events = result.get("events_created", []) reminders = result.get("reminders_created", []) actions = result.get("actions_created", []) if info_items or low_items or events or reminders or actions: info_block = format_info_summary( info_items, low_relevance_items=low_items, source_subject=subject, events_created=events, reminders_created=reminders, actions_created=actions, ) if result["info_summary"]: result["info_summary"] += "\n\n" result["info_summary"] += info_block # Ingest into Family Brain for RAG retrieval if not dry_run: try: ingest_newsletter( subject=subject, body=enriched_body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), items=items, ) print(f" Ingested newsletter into Family Brain", file=sys.stderr) except Exception as e: print(f" Family Brain ingestion error: {e}", file=sys.stderr) # Persist to Event Graph if not dry_run: try: _persist_to_event_graph(em, items) except Exception as e: print(f" [Event Graph] Error: {e}", file=sys.stderr) continue # Step 2: Parse as appointment (existing path) parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) for parsed in parsed_list: serialized = _serialize_result(parsed) if parsed["type"] == "cancellation": # Act on cancellation: find and delete the matching event cancel_summary = parsed.get("summary", "") cancel_start = parsed.get("start") cancel_result = None if cancel_start: if dry_run: cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()} print(f" [DRY RUN] Would cancel: {cancel_summary} at {cancel_start.isoformat()}", file=sys.stderr) else: cancel_result = find_and_cancel_event(cancel_summary, cancel_start) if cancel_result is None: cancel_result = { "status": "CANCELLATION_NO_MATCH", "summary": cancel_summary, "start": cancel_start.isoformat() if cancel_start else None, } print(f" No matching event found for cancellation: {cancel_summary}", file=sys.stderr) result["cancellations"].append({ "email_from": em["from"], "email_subject": em["subject"], "parsed": serialized, "action": cancel_result, }) continue # It's an appointment result["appointments_found"].append({ "email_from": em["from"], "email_subject": em["subject"], "parsed": serialized, }) start_dt = parsed.get("start") if not start_dt: result["errors"].append( f"Could not resolve datetime for: {parsed['summary']}" ) continue end_dt = parsed.get("end") if not end_dt: end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"]) who_str = ", ".join(parsed["who"]) if parsed["who"] else "" summary = parsed["summary"] if who_str: summary = f"{summary} ({who_str})" if dry_run: is_recurring = parsed.get("is_recurring", False) recurrence = parsed.get("recurrence") if is_recurring else None result["events_created"].append({ "status": "DRY_RUN", "summary": summary, "start": start_dt.isoformat(), "end": end_dt.isoformat(), "location": _resolve_location(parsed["location"]), "recurring": is_recurring, "recurrence": recurrence, }) else: # Duplicate check: skip if event already exists on calendar existing = event_exists(summary, start_dt) if existing: result["events_created"].append({ "status": "DUPLICATE_SKIPPED", "summary": existing.get("summary", summary), "start": existing["start"].get("dateTime", start_dt.isoformat()), "end": existing["end"].get("dateTime", end_dt.isoformat()), "existing_id": existing["id"], }) print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr) continue try: # Check if this is a recurring event is_recurring = parsed.get("is_recurring", False) recurrence = parsed.get("recurrence") if is_recurring else None if is_recurring and recurrence: event = create_recurring_event( summary=summary, start_dt=start_dt, end_dt=end_dt, recurrence=recurrence, description=parsed["description"], location=_resolve_location(parsed["location"]), ) else: event = create_event( summary=summary, start_dt=start_dt, end_dt=end_dt, description=parsed["description"], location=_resolve_location(parsed["location"]), ) result["events_created"].append({ "status": "CREATED", "id": event["id"], "summary": event.get("summary", summary), "start": event["start"].get("dateTime", start_dt.isoformat()), "end": event["end"].get("dateTime", end_dt.isoformat()), "link": event.get("htmlLink", ""), "recurring": is_recurring, }) except Exception as e: result["errors"].append(f"Calendar create error: {e}") # Ingest appointment email into Family Brain if not dry_run: try: ingest_email( subject=subject, body=body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), parsed_items=parsed_list, ) print(f" Ingested appointment email into Family Brain", file=sys.stderr) except Exception as e: print(f" Family Brain ingestion error: {e}", file=sys.stderr) # Persist to Event Graph try: _persist_to_event_graph(em, parsed_list, event_type_hint="calendar_event") except Exception as e: print(f" [Event Graph] Error: {e}", file=sys.stderr) # Conflict detection: check newly created events against the calendar created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")] if created_events and not dry_run: try: conflicts = detect_conflicts(events=[{"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")} for e in created_events]) result["conflicts"] = conflicts if conflicts: print(f" ⚠️ {len(conflicts)} scheduling conflict(s) detected", file=sys.stderr) for c in conflicts: print(f" {c['event1']['summary']} conflicts with {c['event2']['summary']} ({c['overlap_minutes']}min overlap)", file=sys.stderr) except Exception as e: result["conflicts"] = [] result["errors"].append(f"Conflict detection error: {e}") else: result["conflicts"] = [] # Push results to Telegram via Hermes if notify and not dry_run: try: hermes_stats = push_pipeline_results(result, quiet=quiet) result["hermes"] = hermes_stats except Exception as e: result["errors"].append(f"Hermes notification error: {e}") return result def process_webhook_email(email_data: dict, dry_run=False, notify=True): """Process a single email received via the Cloudflare webhook. This is the push-pipeline path: email arrives via the webhook address, Cloudflare Worker POSTs to hook.hoffdesk.com/webhook, which calls this. Reuses the same extraction and routing logic as process_emails(), but skips the IMAP fetch entirely. Args: email_data: Dict with keys: from, to, subject, date, body_text, body_html, has_attachments, attachments, dedup_key, message_id, source ('webhook') dry_run: If True, don't create calendar events. notify: If True, push results to Telegram via Hermes. Returns: Dict with processing results. """ try: return _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify) except Exception as e: tb = traceback.format_exc() _alert_admin( "Webhook Pipeline Error", f"process_webhook_email() crashed:\n\n```\n{tb[:600]}\n```", ) return { "emails_scanned": 1, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": [f"UNHANDLED: {e}"], } def _process_webhook_email_inner(email_data: dict, dry_run=False, notify=True): """Internal implementation — wrapped by process_webhook_email() error handler.""" result = { "emails_scanned": 1, "appointments_found": [], "cancellations": [], "events_created": [], "newsletter_items": [], "info_summary": "", "low_relevance_items": [], "errors": [], } subject = email_data.get("subject", "") body = email_data.get("body_text", "") from_addr = email_data.get("from", "") date_str = email_data.get("date", "") print(f" [Webhook] Processing: {subject[:60]}...", file=sys.stderr) # Check for payment alerts FIRST — before classification payment_alert = _check_payment_alert(subject, body, from_addr) if payment_alert: print(f" [PAYMENT ALERT] Detected: {payment_alert['alert_reason']}", file=sys.stderr) _send_payment_alert_notification(payment_alert, email_data) result["payment_alert"] = payment_alert # Still process normally to extract additional details, but alert is sent # Mark this in result so caller knows it was urgent # Step 1: Triage email_class = _triage_email(subject, body, from_addr) print(f" Classified as: {email_class}", file=sys.stderr) if email_class == "newsletter": enriched_body = enrich_body_with_urls(body) items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str) if not items: print(f" No items extracted from newsletter", file=sys.stderr) return result result["newsletter_items"].append({ "email_from": from_addr, "email_subject": subject, "items_count": len(items), "item_types": [i.get("type", "info") for i in items], }) routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject) result["events_created"].extend(routed["events_created"]) result["errors"].extend(routed["errors"]) if routed.get("reminders_created"): result.setdefault("reminders_created", []).extend(routed["reminders_created"]) if routed.get("actions_created"): result.setdefault("actions_created", []).extend(routed["actions_created"]) if routed.get("low_relevance_items"): result["low_relevance_items"].extend(routed["low_relevance_items"]) info_items = routed.get("info_items", []) low_items = routed.get("low_relevance_items", []) events = result.get("events_created", []) reminders = result.get("reminders_created", []) actions = result.get("actions_created", []) if info_items or low_items or events or reminders or actions: result["info_summary"] = format_info_summary( info_items, low_relevance_items=low_items, source_subject=subject, events_created=events, reminders_created=reminders, actions_created=actions, ) if not dry_run: try: ingest_newsletter( subject=subject, body=enriched_body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), items=items, ) print(f" Ingested newsletter into Family Brain", file=sys.stderr) except Exception as e: print(f" Family Brain ingestion error: {e}", file=sys.stderr) # Persist to Event Graph try: _persist_to_event_graph(em, items) except Exception as e: print(f" [Event Graph] Error: {e}", file=sys.stderr) return _finalize_result(result, dry_run, notify) # Step 2: Parse as appointment parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) for parsed in parsed_list: serialized = _serialize_result(parsed) if parsed["type"] == "cancellation": cancel_summary = parsed.get("summary", "") cancel_start = parsed.get("start") cancel_result = None if cancel_start: if dry_run: cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()} else: cancel_result = find_and_cancel_event(cancel_summary, cancel_start) if cancel_result is None: cancel_result = {"status": "CANCELLATION_NO_MATCH", "summary": cancel_summary, "start": cancel_start.isoformat() if cancel_start else None} result["cancellations"].append({ "email_from": from_addr, "email_subject": subject, "parsed": serialized, "action": cancel_result, }) continue result["appointments_found"].append({ "email_from": from_addr, "email_subject": subject, "parsed": serialized, }) start_dt = parsed.get("start") if not start_dt: result["errors"].append(f"Could not resolve datetime for: {parsed['summary']}") continue end_dt = parsed.get("end") if not end_dt: end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"]) who_str = ", ".join(parsed["who"]) if parsed["who"] else "" summary = parsed["summary"] if who_str: summary = f"{summary} ({who_str})" if dry_run: is_recurring = parsed.get("is_recurring", False) recurrence = parsed.get("recurrence") if is_recurring else None result["events_created"].append({ "status": "DRY_RUN", "summary": summary, "start": start_dt.isoformat(), "end": end_dt.isoformat(), "location": _resolve_location(parsed["location"]), "recurring": is_recurring, "recurrence": recurrence, }) else: existing = event_exists(summary, start_dt) if existing: result["events_created"].append({ "status": "DUPLICATE_SKIPPED", "summary": existing.get("summary", summary), "start": existing["start"].get("dateTime", start_dt.isoformat()), "end": existing["end"].get("dateTime", end_dt.isoformat()), "existing_id": existing.get("id"), }) print(f" Skipping duplicate: {summary}", file=sys.stderr) continue try: is_recurring = parsed.get("is_recurring", False) recurrence = parsed.get("recurrence") if is_recurring else None if is_recurring and recurrence: event = create_recurring_event( summary=summary, start_dt=start_dt, end_dt=end_dt, recurrence=recurrence, description=parsed["description"], location=_resolve_location(parsed["location"]), ) else: event = create_event( summary=summary, start_dt=start_dt, end_dt=end_dt, description=parsed["description"], location=_resolve_location(parsed["location"]), ) result["events_created"].append({ "status": "CREATED", "id": event["id"], "summary": event.get("summary", summary), "start": event["start"].get("dateTime", start_dt.isoformat()), "end": event["end"].get("dateTime", end_dt.isoformat()), "link": event.get("htmlLink", ""), "recurring": is_recurring, "location": _resolve_location(parsed.get("location", "")), "_claimed_day_of_week": parsed.get("claimed_day_of_week", ""), }) # Immediate per-event success notification if notify and not dry_run: hermes_notify(result["events_created"][-1], email_subject=subject) except Exception as e: result["errors"].append(f"Calendar create error: {e}") # Ingest into Family Brain if not dry_run: try: ingest_email( subject=subject, body=body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), parsed_items=parsed_list, ) print(f" Ingested appointment email into Family Brain", file=sys.stderr) except Exception as e: print(f" Family Brain ingestion error: {e}", file=sys.stderr) # Persist to Event Graph try: _persist_to_event_graph(em, parsed_list, event_type_hint="calendar_event") except Exception as e: print(f" [Event Graph] Error: {e}", file=sys.stderr) return _finalize_result(result, dry_run, notify) def _finalize_result(result, dry_run, notify): """Shared finalization: conflict detection + Hermes push.""" created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")] if created_events and not dry_run: try: conflicts = detect_conflicts( events=[{"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")} for e in created_events] ) result["conflicts"] = conflicts except Exception as e: result["conflicts"] = [] result["errors"].append(f"Conflict detection error: {e}") else: result["conflicts"] = [] if notify and not dry_run: try: hermes_stats = push_pipeline_results(result) result["hermes"] = hermes_stats except Exception as e: result["errors"].append(f"Hermes notification error: {e}") return result def backfill_brain(days: int = 30) -> dict: """Backfill the Family Brain with emails from the last N days. Re-processes already-seen emails purely for ChromaDB ingestion. Does NOT create calendar events or send notifications. Args: days: Number of days to look back Returns: Dict with ingestion stats. """ try: return _backfill_brain_inner(days=days) except Exception as e: tb = traceback.format_exc() _alert_admin( "Backfill Error", f"backfill_brain() crashed:\n\n```\n{tb[:600]}\n```", ) return { "emails_scanned": 0, "ingested_emails": 0, "ingested_newsletters": 0, "errors": [f"UNHANDLED: {e}"], "skipped": 0, } def _backfill_brain_inner(days: int = 30) -> dict: """Internal implementation — wrapped by backfill_brain() error handler.""" from icarus.core.email_fetcher import fetch_since from icarus.core.family_brain import ingest_email, ingest_newsletter from icarus.core.newsletter_parser import classify_email, parse_newsletter_with_llm from icarus.core.appointment_parser import parse_email_with_llm # IMAP SINCE format: DD-Mon-YYYY (e.g. 18-Mar-2026) since_dt = datetime.now(CHICAGO_TZ) - timedelta(days=days) imap_date = since_dt.strftime("%d-%b-%Y") print(f"Backfilling Family Brain: emails since {imap_date} ({days} days)", file=sys.stderr) result = { "emails_scanned": 0, "ingested_emails": 0, "ingested_newsletters": 0, "errors": [], "skipped": 0, } mail_data = fetch_since(imap_date) if "error" in mail_data: result["errors"].append(mail_data["error"]) return result emails = mail_data.get("emails", []) result["emails_scanned"] = len(emails) if not emails: print(" No emails found in date range", file=sys.stderr) return result for em in emails: subject = em.get("subject", "") body = em.get("body", "") from_addr = em.get("from", "") date_str = em.get("date", "") print(f" Processing: {subject[:60]}...", file=sys.stderr) # Classify email_class = _triage_email(subject, body, from_addr) print(f" Classified as: {email_class}", file=sys.stderr) try: if email_class == "newsletter": enriched_body = enrich_body_with_urls(body) items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str) ingest_newsletter( subject=subject, body=enriched_body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), items=items or [], ) result["ingested_newsletters"] += 1 print(f" ✅ Ingested newsletter", file=sys.stderr) else: # Appointment email parsed_list = parse_email_with_llm(subject, body, from_addr, date_str) # Serialize datetimes before passing to ingest safe_parsed = [] for p in (parsed_list or []): item = {} for k, v in p.items(): if isinstance(v, datetime): item[k] = v.isoformat() elif isinstance(v, date) and not isinstance(v, datetime): item[k] = v.isoformat() else: item[k] = v safe_parsed.append(item) ingest_email( subject=subject, body=body, from_addr=from_addr, email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(), parsed_items=safe_parsed, ) result["ingested_emails"] += 1 print(f" ✅ Ingested email", file=sys.stderr) except Exception as e: result["errors"].append(f"{subject[:40]}: {e}") print(f" ❌ Error: {e}", file=sys.stderr) print( f"\nBackfill complete: {result['ingested_emails']} emails + " f"{result['ingested_newsletters']} newsletters ingested", file=sys.stderr, ) return result