"""Pipeline orchestration: fetch emails → classify → parse → route to calendar/reminder/info."""
import json
import os
import sys
import traceback
from datetime import datetime, timedelta, date
from zoneinfo import ZoneInfo
from icarus.core.config import CHICAGO_TZ, GMAIL_APP_PASSWORD, AUTH_CIRCUIT_BREAKER_FILE
from icarus.core.email_fetcher import fetch_unread
from icarus.core.appointment_parser import parse_email_with_llm
from icarus.core.newsletter_parser import classify_email, parse_newsletter_with_llm
from icarus.core.family_brain import ingest_email, ingest_newsletter
from icarus.core.calendar_sync import (
event_exists,
create_event,
create_recurring_event,
find_and_cancel_event,
)
from icarus.core.db.event_graph import EventGraphWriter, classify_event_type
from icarus.core.conflict_engine import detect_conflicts
from icarus.core.hermes import push_pipeline_results, hermes_notify, _send_telegram, format_info_summary
# ---------------------------------------------------------------------------
# Event Graph Persistence
# ---------------------------------------------------------------------------
def _persist_to_event_graph(email_data: dict, parsed_items: list, event_type_hint: str = None) -> None:
"""Persist parsed email items to the Event Graph.
Called alongside Family Brain ingestion. Non-fatal — errors are logged
but never block the pipeline.
Args:
email_data: Dict with email metadata (from, subject, date)
parsed_items: List of parsed items (appointments or newsletter items)
event_type_hint: Optional type override ("calendar_event", "coordination", "info")
"""
import hashlib
if not parsed_items:
return
for item in parsed_items:
try:
# Generate document_id from email subject + item summary
doc_id = f"email_{hashlib.sha256((email_data.get('subject', '') + '_' + item.get('summary', '')).encode()).hexdigest()[:16]}"
# Build extraction dict for classification
extraction = {
"key_details": {
"date": item.get("start", "").isoformat() if hasattr(item.get("start"), "isoformat") else str(item.get("start", "")),
"time": item.get("start", "").isoformat() if hasattr(item.get("start"), "isoformat") else str(item.get("start", "")),
"location": item.get("location", ""),
"who": item.get("who", []),
},
"title": item.get("summary", ""),
"summary": item.get("description", ""),
"category": event_type_hint or item.get("type", "info"),
"suggested_actions": [],
"assigned_to": item.get("who", []),
"confidence": item.get("confidence", 0.7),
}
# Classify
if event_type_hint:
event_type = event_type_hint
else:
event_type = classify_event_type(extraction)
# Skip info-only
if event_type == "info" and not item.get("start"):
continue
# Override: items with start times that go to calendar are calendar_events
if item.get("start") and event_type == "info":
event_type = "calendar_event"
extraction["event_type"] = event_type
EventGraphWriter().write(doc_id, extraction, extraction)
print(f" [Event Graph] Persisted: {item.get('summary', '?')} as {event_type}", file=sys.stderr)
except Exception as e:
print(f" [Event Graph] Persist error: {e}", file=sys.stderr)
# ---------------------------------------------------------------------------
# Payment Alert Detection
# ---------------------------------------------------------------------------
_PAYMENT_ALERT_PHRASES = [
"payment failed", "card declined", "transaction failed",
"card expired", "update your payment", "payment method",
"subscription suspended", "account will be canceled",
"billing error", "payment declined", "renewal failed",
"expired on", "your card on file",
]
def _check_payment_alert(subject: str, body: str, from_addr: str) -> dict | None:
"""Check if email is a payment alert and return alert data if found.
Returns dict with alert details if payment-related phrases found,
None otherwise.
"""
text = f"{subject} {body}".lower()
for phrase in _PAYMENT_ALERT_PHRASES:
if phrase in text:
return {
"alert_reason": phrase,
"merchant": from_addr.split("<")[-1].rstrip(">").split("@")[-1].split(".")[-2] if "<" in from_addr else (from_addr.split("@")[-1].split(".")[-2] if "@" in from_addr else "Unknown"),
"subject": subject,
"service": None, # Could be extracted by LLM
"deadline": None, # Could be extracted by LLM
}
return None
def _send_payment_alert_notification(alert: dict, email_data: dict, target: str = None):
    """Push an immediate Telegram message describing a payment alert.

    Delivery goes through ``_send_telegram``. When ``target`` is falsy the
    hermes module's ``DEFAULT_TELEGRAM_TARGET`` is used.

    Returns:
        True if the send call completed, False if it raised.
    """
    # Targets live in the hermes module (not config)
    from icarus.core.hermes import DEFAULT_TELEGRAM_TARGET, DEV_TELEGRAM_TARGET

    if not target:
        target = DEFAULT_TELEGRAM_TARGET

    merchant = alert.get("merchant", "Unknown Service")
    alert_reason = alert.get("alert_reason", "payment issue")
    subject = email_data.get("subject", "Unknown")
    from_addr = email_data.get("from", "Unknown")

    msg = f"""🚨 **[PAYMENT ALERT]** 🚨
{merchant}
Reason: {alert_reason}
Subject: {subject}
From: {from_addr}
⚠️ Please update payment method immediately."""

    try:
        _send_telegram(msg, target=target)
    except Exception as exc:
        print(f" [PAYMENT ALERT] Failed to send notification: {exc}", file=sys.stderr)
        return False
    else:
        print(f" [PAYMENT ALERT] Sent immediate notification: {alert_reason}", file=sys.stderr)
        return True
# ---------------------------------------------------------------------------
# Auth Failure Circuit Breaker
# ---------------------------------------------------------------------------
# Tracks consecutive auth failures to prevent error spam.
# After MAX_AUTH_FAILURES consecutive failures, the IMAP pipeline is paused
# and Matt is alerted once. Resets on first success or manual reset.
# Path of the JSON file that stores the circuit breaker state between runs
# (the value comes from icarus.core.config.AUTH_CIRCUIT_BREAKER_FILE).
_AUTH_FAILURE_FILE = AUTH_CIRCUIT_BREAKER_FILE
MAX_AUTH_FAILURES = 3 # Pause after this many consecutive failures
def _read_circuit_breaker():
    """Read circuit breaker state from disk.

    Returns:
        The state dict stored in ``_AUTH_FAILURE_FILE``. A fresh default state
        (no failures, not paused) is returned when the file is missing, is not
        valid JSON, or holds JSON that is not an object.
    """
    default_state = {"consecutive_failures": 0, "paused": False, "last_alert_ts": None}
    try:
        with open(_AUTH_FAILURE_FILE, "r") as f:
            state = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return default_state
    # Valid JSON that is not an object (e.g. `null` or `[]`) would break every
    # `.get()` caller, so treat it like a corrupt file.
    if not isinstance(state, dict):
        return default_state
    return state
def _write_circuit_breaker(state):
    """Write circuit breaker state to disk as indented JSON.

    The parent directory of ``_AUTH_FAILURE_FILE`` is created if needed.

    Args:
        state: JSON-serializable dict to store.
    """
    parent = os.path.dirname(_AUTH_FAILURE_FILE)
    # A bare filename has no directory part, and os.makedirs("") raises.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(_AUTH_FAILURE_FILE, "w") as f:
        json.dump(state, f, indent=2)
def _record_auth_failure(error_msg: str) -> bool:
    """Count one more auth failure and trip the breaker when the limit is hit.

    The stored failure counter is incremented on every call. Once it reaches
    MAX_AUTH_FAILURES while the breaker is not yet paused, the state is marked
    paused and a single admin alert is sent.

    Args:
        error_msg: Text of the failure; accepted but not used by this function.

    Returns:
        True only on the call that trips the breaker, False otherwise
        (including calls made while already paused).
    """
    state = _read_circuit_breaker()
    failures = state.get("consecutive_failures", 0) + 1
    state["consecutive_failures"] = failures

    just_tripped = failures >= MAX_AUTH_FAILURES and not state.get("paused")
    if just_tripped:
        state["paused"] = True

    # State is saved before alerting.
    _write_circuit_breaker(state)

    if not just_tripped:
        return False

    _alert_admin(
        "🔴 IMAP Circuit Breaker Tripped",
        f"Gmail auth failed {failures} times in a row. "
        f"IMAP pipeline is now PAUSED to stop error spam.\n\n"
        f"To reset: python -m icarus.core reset-circuit\n"
        f"Or: rm {_AUTH_FAILURE_FILE}",
    )
    return True
def _record_auth_success():
    """Clear the circuit breaker after a successful run.

    Does nothing when the stored state already shows zero failures and is not
    paused. Otherwise the counter and paused flag are reset and saved, and an
    admin alert is sent if the breaker had been paused.
    """
    state = _read_circuit_breaker()
    needs_reset = state.get("consecutive_failures", 0) > 0 or state.get("paused")
    if not needs_reset:
        return

    was_paused = state.get("paused", False)
    state.update({"consecutive_failures": 0, "paused": False})
    _write_circuit_breaker(state)

    if not was_paused:
        return
    _alert_admin(
        "🟢 IMAP Circuit Breaker Reset",
        "Gmail auth succeeded. IMAP pipeline is back online.",
    )
def is_imap_paused() -> bool:
    """Return the ``paused`` flag of the stored circuit breaker state (False if unset)."""
    breaker_state = _read_circuit_breaker()
    return breaker_state.get("paused", False)
def reset_circuit_breaker():
    """Overwrite the stored breaker state with a clean one and confirm on stdout."""
    _write_circuit_breaker(
        dict(consecutive_failures=0, paused=False, last_alert_ts=None)
    )
    print("✅ Circuit breaker reset. IMAP pipeline re-enabled.")
def _resolve_location(location_str):
    """Resolve a location string through the cache to get the canonical name.

    Prevents LLM hallucinations like 'at Golrusk' → 'PetSmart' by preferring
    the known-locations cache.

    Args:
        location_str: Free-text location; may be empty or None.

    Returns:
        The cached canonical name if the lookup yields one, otherwise the
        input unchanged. Lookup failures are logged to stderr and never raise.
    """
    # Non-strings (None, numbers, …) and blank strings have nothing to resolve.
    if not isinstance(location_str, str) or not location_str.strip():
        return location_str
    try:
        from icarus.core.location_cache import resolve
        result = resolve(location_str.strip(), use_home_bias=True)
        if result and result.get("name"):
            return result["name"]
    except Exception as e:
        # Best-effort: report the failure instead of hiding it, then fall back
        # to the raw string.
        print(f" [Location] Resolve error: {e}", file=sys.stderr)
    return location_str
from icarus.core.url_fetcher import enrich_body_with_urls
# ---------------------------------------------------------------------------
# Global error handler — no silent deaths
# ---------------------------------------------------------------------------
# Telegram chat that receives admin alerts; empty means "log to stderr only".
_DEV_TARGET = os.environ.get("TELEGRAM_DEV_ID", "")


def alert_admin(title: str, detail: str):
    """Push an error alert to Matt's DM. Best-effort, never raises.

    With no ``TELEGRAM_DEV_ID`` configured the alert is only printed to
    stderr. Otherwise the title (in bold) and the first 800 characters of the
    detail are sent via ``_send_telegram``; a failed send is logged.

    Args:
        title: Short headline for the alert.
        detail: Longer description, e.g. a stack trace.
    """
    if not _DEV_TARGET:
        print(f" [ALERT] {title}: {detail}", file=sys.stderr)
        return
    # Strip markdown special characters to prevent Telegram parse failures.
    # Brackets, backticks, asterisks, underscores in error strings break Markdown mode.
    import re
    markdown_chars = r"[\[\]`*_]"
    safe_title = re.sub(markdown_chars, "", str(title))
    safe_detail = re.sub(markdown_chars, "", str(detail)[:800])
    # Both asterisks are needed: an unbalanced '*' is itself a Markdown error.
    msg = f"⚠️ *{safe_title}*\n\n{safe_detail}"
    try:
        _send_telegram(msg, target=_DEV_TARGET)
    except Exception:
        print(f" [ALERT] Failed to send admin alert: {title}", file=sys.stderr)


# Other functions in this module call the helper by its private name.
_alert_admin = alert_admin
def _to_serializable(value):
    """Recursively replace date/datetime objects with their ISO-8601 strings."""
    # datetime is a subclass of date, so one check covers both.
    if isinstance(value, date):
        return value.isoformat()
    if isinstance(value, list):
        return [_to_serializable(v) for v in value]
    if isinstance(value, dict):
        return {k: _to_serializable(v) for k, v in value.items()}
    return value


def _serialize_result(parsed):
    """Return a copy of *parsed* with date and datetime values as ISO strings.

    Lists and dicts are converted at any nesting depth (lists of dicts, dicts
    of lists, …). All other values are passed through unchanged.

    Args:
        parsed: Mapping produced by a parser.

    Returns:
        A new dict with the same keys; nested lists and dicts are new objects.
    """
    return {key: _to_serializable(value) for key, value in parsed.items()}
def _triage_email(subject, body, from_addr):
    """Return the class label for an email, as decided by ``classify_email``.

    Any exception raised by the classifier is logged to stderr and the email
    is treated as an 'appointment', which sends it down the appointment
    parsing path.
    """
    try:
        label = classify_email(subject, body, from_addr)
    except Exception as exc:
        print(f" [Triage] Classification error: {exc}, defaulting to appointment", file=sys.stderr)
        label = "appointment"
    return label
def _who_label(item):
    """Return the item's ``who`` list joined with ", " ("" when absent or null)."""
    return ", ".join(item.get("who") or [])


def _route_all_day_item(item, prefix, label, include_description, dry_run):
    """Create an all-day calendar entry for a reminder or action item.

    Args:
        item: Parsed newsletter item; reads summary, who, due, description.
        prefix: Tag placed before the summary ("[REMINDER]" or "[ACTION]").
        label: Word used in error messages ("Reminder" or "Action").
        include_description: Whether the returned record carries "description".
        dry_run: If True, build the record without calling ``create_event``.

    Returns:
        ``(record, error)`` — exactly one of the two is None.
    """
    title = item.get("summary")
    if title is None:
        return None, f"{label} missing summary"

    summary = f"{prefix} {title}"
    who_str = _who_label(item)
    if who_str:
        summary += f" ({who_str})"

    due = item.get("due")
    if due:
        due_str = due.isoformat() if isinstance(due, date) else str(due)
    else:
        # No due date given: file the entry under today's date (Chicago time).
        due_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")

    description = item.get("description") or ""
    if due:
        description = f"Due: {due_str}\n" + description

    if dry_run:
        record = {"status": "DRY_RUN", "summary": summary, "date": due_str}
    else:
        try:
            # Create as all-day event (date string only, no time)
            event = create_event(
                summary=summary,
                start_dt=due_str,
                end_dt=due_str,
                description=description,
            )
            record = {
                "status": "CREATED",
                "id": event["id"],
                "summary": event.get("summary", summary),
                "date": due_str,
            }
        except Exception as e:
            return None, f"{label} create error: {e}"

    if include_description:
        record["description"] = description
    return record, None


def _route_newsletter_items(items, dry_run=False, notify=True, email_subject=""):
    """Route newsletter items to their appropriate destinations.

    Routing based on relevance:
    - high relevance events/reminders/actions → Google Calendar
    - low relevance items of any type → Telegram Info Digest (no calendar)
    - info items → Telegram Info Digest (regardless of relevance)
    - items caught by rejection rules are shadow-filtered: they stay in the
      list and are reported under ``auto_filtered``

    A malformed item (for example one without a summary) is recorded under
    ``errors`` and skipped; it does not abort the remaining items.

    Args:
        items: Parsed newsletter items (dicts).
        dry_run: If True, describe what would be created without touching
            the calendar or sending notifications.
        notify: If True, send a per-event notification after each created
            event.
        email_subject: Subject of the source email, passed to the notifier.

    Returns:
        Dict with routed items and any errors.
    """
    result = {
        "events_created": [],
        "reminders_created": [],
        "actions_created": [],
        "info_items": [],  # high-relevance info + low-relevance items of any type
        "low_relevance_items": [],  # explicit low-relevance tracking
        "rejected_items": [],  # items filtered by rejection rules
        "auto_filtered": [],  # items shadow-filtered to low relevance
        "errors": [],
    }

    # Shadow-filter rejected items — they stay in the list but get flagged
    # as low_relevance with a note about which rule caught them.
    # The user still sees what was filtered in the digest and can restore items.
    from .rejection_engine import shadow_filter_items
    items = shadow_filter_items(items, scope="newsletter")

    # Collect auto-filtered items for notification
    for item in items:
        if item.get("_auto_filtered"):
            result["auto_filtered"].append({
                "type": item.get("original_type", "?"),
                "summary": item.get("summary", ""),
                "who": _who_label(item),
                "rule": item.get("reason", ""),
            })
            print(f" Auto-filtered: {item.get('summary', '?')} → low relevance", file=sys.stderr)

    # item type → (summary prefix, error label, result bucket, record has description)
    all_day_routes = {
        "reminder": ("[REMINDER]", "Reminder", "reminders_created", False),
        "action_item": ("[ACTION]", "Action", "actions_created", True),
    }

    for item in items:
        item_type = item.get("type", "info")
        relevance = item.get("relevance", "high")
        reason = item.get("reason", "No reason provided")
        who_str = _who_label(item)

        # LOW RELEVANCE → always goes to Telegram Info Digest, never calendar
        if relevance == "low":
            result["low_relevance_items"].append({
                "type": item_type,
                "summary": item.get("summary", ""),
                "who": who_str,
                "description": item.get("description", ""),
                "reason": reason,
            })
            # Also add to info_items for Telegram summary
            result["info_items"].append({
                "summary": f"[{item_type}] {item.get('summary', '')}",
                "who": who_str,
                "description": f"{item.get('description', '')} (low relevance: {reason})",
            })
            continue

        # INFO type → Telegram Info Digest regardless of relevance
        if item_type == "info":
            result["info_items"].append({
                "summary": item.get("summary", ""),
                "who": who_str,
                "description": item.get("description", ""),
            })
            continue

        # HIGH RELEVANCE items → route to calendar
        if item_type == "event":
            # Same path as appointments — create on calendar with dedup
            start_dt = item.get("start")
            if not start_dt:
                result["errors"].append(f"Event missing start: {item.get('summary', '?')}")
                continue
            summary = item.get("summary")
            if summary is None:
                result["errors"].append(f"Event missing summary (start {start_dt})")
                continue

            end_dt = item.get("end")
            if not end_dt:
                end_dt = start_dt + timedelta(minutes=item.get("duration_minutes", 60))
            if who_str:
                summary = f"{summary} ({who_str})"

            if dry_run:
                result["events_created"].append({
                    "status": "DRY_RUN",
                    "summary": summary,
                    "start": start_dt.isoformat(),
                    "end": end_dt.isoformat(),
                    "location": _resolve_location(item.get("location", "")),
                })
                continue

            existing = event_exists(summary, start_dt)
            if existing:
                result["events_created"].append({
                    "status": "DUPLICATE_SKIPPED",
                    "summary": existing.get("summary", summary),
                    "start": existing["start"].get("dateTime", start_dt.isoformat()),
                    "end": existing["end"].get("dateTime", end_dt.isoformat()),
                    "existing_id": existing["id"],
                })
                print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
                continue

            try:
                # Resolve once; the same value goes to the calendar and the record.
                location = _resolve_location(item.get("location", ""))
                # Check if this is a recurring event
                is_recurring = item.get("is_recurring", False)
                recurrence = item.get("recurrence") if is_recurring else None
                if is_recurring and recurrence:
                    event = create_recurring_event(
                        summary=summary,
                        start_dt=start_dt,
                        end_dt=end_dt,
                        recurrence=recurrence,
                        description=item.get("description", ""),
                        location=location,
                    )
                else:
                    event = create_event(
                        summary=summary,
                        start_dt=start_dt,
                        end_dt=end_dt,
                        description=item.get("description", ""),
                        location=location,
                    )
                created = {
                    "status": "CREATED",
                    "id": event["id"],
                    "summary": event.get("summary", summary),
                    "start": event["start"].get("dateTime", start_dt.isoformat()),
                    "end": event["end"].get("dateTime", end_dt.isoformat()),
                    "link": event.get("htmlLink", ""),
                    "recurring": is_recurring,
                    "location": location,
                    "_claimed_day_of_week": item.get("claimed_day_of_week", ""),
                }
            except Exception as e:
                result["errors"].append(f"Calendar create error: {e}")
                continue
            result["events_created"].append(created)

            # Immediate per-event success notification (dry runs never get here).
            # Kept apart from creation so a notification failure is not
            # reported as a calendar error.
            if notify:
                try:
                    hermes_notify(created, email_subject=email_subject)
                except Exception as e:
                    result["errors"].append(f"Hermes notification error: {e}")

        elif item_type in all_day_routes:
            # All-day event with [REMINDER] / [ACTION] prefix
            prefix, label, bucket, include_description = all_day_routes[item_type]
            record, error = _route_all_day_item(item, prefix, label, include_description, dry_run)
            if error is not None:
                result["errors"].append(error)
            else:
                result[bucket].append(record)

        # Any other high-relevance item type is not routed anywhere.

    return result
# Note: format_info_summary is defined in hermes.py - use that version
def _empty_pipeline_result(errors):
    """Build the result payload for a run that processed nothing, carrying *errors*."""
    return {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": errors,
    }


def process_emails(dry_run=False, notify=True, quiet=False):
    """Run the IMAP pipeline once, guarded by the auth circuit breaker.

    The actual work happens in ``_process_emails_inner``; this wrapper decides
    whether to run at all and turns any exception into a result dict.

    - If the circuit breaker is paused, or no Gmail app password is
      configured (webhook-only mode), nothing is fetched and an explanatory
      entry is returned under ``errors``.
    - A clean run resets the circuit breaker.
    - An exception whose text looks like an auth problem is counted by the
      circuit breaker and reported without a stack-trace alert.
    - Any other exception sends a stack trace to the admin.

    Args:
        dry_run: If True, don't create calendar events or mark emails read.
        notify: If True, push results to Telegram via Hermes.
        quiet: If True, suppress family group notifications (only DM alerts).

    Returns:
        The pipeline result dict; on any failure an empty result whose
        ``errors`` list explains why.
    """
    # Circuit breaker first — no IMAP traffic while paused
    if is_imap_paused():
        return _empty_pipeline_result(
            ["IMAP_PAUSED: Gmail auth circuit breaker is tripped. Run 'family-assistant reset-circuit' to re-enable."]
        )

    # Webhook-only mode: no credentials, so skip IMAP entirely
    if not GMAIL_APP_PASSWORD:
        return _empty_pipeline_result(
            ["IMAP_SKIPPED: No Gmail credentials (webhook-only mode)"]
        )

    try:
        result = _process_emails_inner(dry_run=dry_run, notify=notify, quiet=quiet)
        # Success — reset circuit breaker
        _record_auth_success()
        return result
    except Exception as exc:
        # Substring match on the lower-cased exception text decides whether
        # this counts as an auth failure.
        lowered = str(exc).lower()
        auth_markers = [
            "authentication", "invalid_grant", "credentials",
            "auth failed", "login failed", "[auth]",
            "[alert]", "suspen", "disabled", "access denied",
        ]
        if any(marker in lowered for marker in auth_markers):
            _record_auth_failure(str(exc)[:200])
            # Whether the breaker just tripped or is still counting — return quietly
            if is_imap_paused():
                reason = "IMAP_PAUSED: circuit breaker tripped"
            else:
                reason = "IMAP_AUTH_FAILED: Gmail auth failed (circuit breaker counting)"
            return _empty_pipeline_result([reason])

        # Not an auth failure — send the stack trace to the admin
        trace_text = traceback.format_exc()
        _alert_admin(
            "Pipeline Error",
            f"process_emails() crashed:\n\n```\n{trace_text[:600]}\n```",
        )
        return _empty_pipeline_result([f"UNHANDLED: {exc}"])
def _process_emails_inner(dry_run=False, notify=True, quiet=False):
    """Internal implementation — wrapped by process_emails() error handler.

    Fetches unread mail, then for each email: checks for a payment alert,
    classifies it, and either routes it as a newsletter (multi-item
    extraction) or parses it for appointments and cancellations. After all
    emails are handled, newly created events are checked for conflicts and
    the combined result is pushed via Hermes.

    Args:
        dry_run: If True, emails are not marked read and no calendar events,
            cancellations, ingestion or notifications are performed.
        notify: If True (and not a dry run), push results via Hermes.
        quiet: Passed through to ``push_pipeline_results``.

    Returns:
        Result dict. Always contains the keys initialised below plus
        ``conflicts``; ``payment_alerts``, ``reminders_created``,
        ``actions_created`` and ``hermes`` are added only when applicable.
    """
    result = {
        "emails_scanned": 0,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": [],
    }
    # Fetch and mark as read in one pass — no second connection needed
    mail_data = fetch_unread(mark_read=not dry_run)
    # A fetch error is reported in the result rather than raised.
    # NOTE(review): this return counts as success for process_emails(), which
    # then resets the auth circuit breaker — confirm fetch_unread raises
    # (rather than returns) on auth failures.
    if "error" in mail_data:
        result["errors"].append(mail_data["error"])
        return result
    emails = mail_data.get("emails", [])
    result["emails_scanned"] = len(emails)
    if not emails:
        return result
    for em in emails:
        subject = em.get("subject", "")
        body = em.get("body", "")
        from_addr = em.get("from", "")
        date_str = em.get("date", "")
        print(f" Processing: {subject[:60]}...", file=sys.stderr)
        # Check for payment alerts FIRST — before classification
        payment_alert = _check_payment_alert(subject, body, from_addr)
        if payment_alert:
            print(f" [PAYMENT ALERT] Detected: {payment_alert['alert_reason']}", file=sys.stderr)
            if not dry_run:
                _send_payment_alert_notification(payment_alert, em)
            # NOTE(review): recorded for dry runs too — nesting reconstructed, confirm.
            result.setdefault("payment_alerts", []).append(payment_alert)
        # Step 1: Triage — classify email type
        email_class = _triage_email(subject, body, from_addr)
        print(f" Classified as: {email_class}", file=sys.stderr)
        if email_class == "newsletter":
            # Enrich body with URL content (e.g. Smore links)
            enriched_body = enrich_body_with_urls(body)
            # Parse as newsletter with multi-type extraction
            items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
            if not items:
                print(f" No items extracted from newsletter", file=sys.stderr)
                continue
            result["newsletter_items"].append({
                "email_from": from_addr,
                "email_subject": subject,
                "items_count": len(items),
                "item_types": [i.get("type", "info") for i in items],
            })
            # Route each item type to its destination
            routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
            result["events_created"].extend(routed["events_created"])
            result["errors"].extend(routed["errors"])
            # Collect reminders/actions/info for reporting
            if routed["reminders_created"]:
                result.setdefault("reminders_created", []).extend(routed["reminders_created"])
            if routed["actions_created"]:
                result.setdefault("actions_created", []).extend(routed["actions_created"])
            if routed.get("low_relevance_items"):
                result["low_relevance_items"].extend(routed["low_relevance_items"])
            # Build info summary for Telegram push - include ALL actioned items for visibility
            info_items = routed.get("info_items", [])
            low_items = routed.get("low_relevance_items", [])
            # NOTE(review): events/reminders/actions are read from the cumulative
            # `result`, not from `routed`, so items from earlier emails in this
            # run reappear in each later newsletter's block — confirm intended.
            events = result.get("events_created", [])
            reminders = result.get("reminders_created", [])
            actions = result.get("actions_created", [])
            if info_items or low_items or events or reminders or actions:
                info_block = format_info_summary(
                    info_items,
                    low_relevance_items=low_items,
                    source_subject=subject,
                    events_created=events,
                    reminders_created=reminders,
                    actions_created=actions,
                )
                # Blocks from successive newsletters are separated by a blank line.
                if result["info_summary"]:
                    result["info_summary"] += "\n\n"
                result["info_summary"] += info_block
            # Ingest into Family Brain for RAG retrieval
            if not dry_run:
                try:
                    ingest_newsletter(
                        subject=subject,
                        body=enriched_body,
                        from_addr=from_addr,
                        email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                        items=items,
                    )
                    print(f" Ingested newsletter into Family Brain", file=sys.stderr)
                except Exception as e:
                    print(f" Family Brain ingestion error: {e}", file=sys.stderr)
            # Persist to Event Graph
            if not dry_run:
                try:
                    _persist_to_event_graph(em, items)
                except Exception as e:
                    print(f" [Event Graph] Error: {e}", file=sys.stderr)
            # Newsletter handled; skip the appointment path below.
            continue
        # Step 2: Parse as appointment (existing path)
        parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)
        for parsed in parsed_list:
            serialized = _serialize_result(parsed)
            if parsed["type"] == "cancellation":
                # Act on cancellation: find and delete the matching event
                cancel_summary = parsed.get("summary", "")
                cancel_start = parsed.get("start")
                cancel_result = None
                if cancel_start:
                    if dry_run:
                        cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary, "start": cancel_start.isoformat()}
                        print(f" [DRY RUN] Would cancel: {cancel_summary} at {cancel_start.isoformat()}", file=sys.stderr)
                    else:
                        cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
                # None here means no start time was parsed, or the lookup returned None.
                if cancel_result is None:
                    cancel_result = {
                        "status": "CANCELLATION_NO_MATCH",
                        "summary": cancel_summary,
                        "start": cancel_start.isoformat() if cancel_start else None,
                    }
                    print(f" No matching event found for cancellation: {cancel_summary}", file=sys.stderr)
                result["cancellations"].append({
                    "email_from": em["from"],
                    "email_subject": em["subject"],
                    "parsed": serialized,
                    "action": cancel_result,
                })
                continue
            # It's an appointment
            result["appointments_found"].append({
                "email_from": em["from"],
                "email_subject": em["subject"],
                "parsed": serialized,
            })
            start_dt = parsed.get("start")
            if not start_dt:
                result["errors"].append(
                    f"Could not resolve datetime for: {parsed['summary']}"
                )
                continue
            end_dt = parsed.get("end")
            if not end_dt:
                end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])
            who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
            summary = parsed["summary"]
            if who_str:
                summary = f"{summary} ({who_str})"
            if dry_run:
                is_recurring = parsed.get("is_recurring", False)
                recurrence = parsed.get("recurrence") if is_recurring else None
                result["events_created"].append({
                    "status": "DRY_RUN",
                    "summary": summary,
                    "start": start_dt.isoformat(),
                    "end": end_dt.isoformat(),
                    "location": _resolve_location(parsed["location"]),
                    "recurring": is_recurring,
                    "recurrence": recurrence,
                })
            else:
                # Duplicate check: skip if event already exists on calendar
                existing = event_exists(summary, start_dt)
                if existing:
                    result["events_created"].append({
                        "status": "DUPLICATE_SKIPPED",
                        "summary": existing.get("summary", summary),
                        "start": existing["start"].get("dateTime", start_dt.isoformat()),
                        "end": existing["end"].get("dateTime", end_dt.isoformat()),
                        "existing_id": existing["id"],
                    })
                    print(f" Skipping duplicate: {summary} at {start_dt.isoformat()}", file=sys.stderr)
                    continue
                try:
                    # Check if this is a recurring event
                    is_recurring = parsed.get("is_recurring", False)
                    recurrence = parsed.get("recurrence") if is_recurring else None
                    if is_recurring and recurrence:
                        event = create_recurring_event(
                            summary=summary,
                            start_dt=start_dt,
                            end_dt=end_dt,
                            recurrence=recurrence,
                            description=parsed["description"],
                            location=_resolve_location(parsed["location"]),
                        )
                    else:
                        event = create_event(
                            summary=summary,
                            start_dt=start_dt,
                            end_dt=end_dt,
                            description=parsed["description"],
                            location=_resolve_location(parsed["location"]),
                        )
                    result["events_created"].append({
                        "status": "CREATED",
                        "id": event["id"],
                        "summary": event.get("summary", summary),
                        "start": event["start"].get("dateTime", start_dt.isoformat()),
                        "end": event["end"].get("dateTime", end_dt.isoformat()),
                        "link": event.get("htmlLink", ""),
                        "recurring": is_recurring,
                    })
                except Exception as e:
                    result["errors"].append(f"Calendar create error: {e}")
        # Ingest appointment email into Family Brain
        if not dry_run:
            try:
                ingest_email(
                    subject=subject,
                    body=body,
                    from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    parsed_items=parsed_list,
                )
                print(f" Ingested appointment email into Family Brain", file=sys.stderr)
            except Exception as e:
                print(f" Family Brain ingestion error: {e}", file=sys.stderr)
            # Persist to Event Graph
            # NOTE(review): nesting reconstructed — kept under the dry-run guard
            # to match the newsletter path; confirm against the original layout.
            try:
                _persist_to_event_graph(em, parsed_list, event_type_hint="calendar_event")
            except Exception as e:
                print(f" [Event Graph] Error: {e}", file=sys.stderr)
    # Conflict detection: check newly created events against the calendar
    created_events = [e for e in result["events_created"] if e.get("status") == "CREATED" and e.get("id")]
    if created_events and not dry_run:
        try:
            conflicts = detect_conflicts(events=[{"id": e["id"], "summary": e.get("summary", ""), "start": e.get("start", ""), "end": e.get("end", "")} for e in created_events])
            result["conflicts"] = conflicts
            if conflicts:
                print(f" ⚠️ {len(conflicts)} scheduling conflict(s) detected", file=sys.stderr)
                for c in conflicts:
                    print(f" {c['event1']['summary']} conflicts with {c['event2']['summary']} ({c['overlap_minutes']}min overlap)", file=sys.stderr)
        except Exception as e:
            result["conflicts"] = []
            result["errors"].append(f"Conflict detection error: {e}")
    else:
        # Nothing was created (or dry run): "conflicts" is still always present.
        result["conflicts"] = []
    # Push results to Telegram via Hermes
    if notify and not dry_run:
        try:
            hermes_stats = push_pipeline_results(result, quiet=quiet)
            result["hermes"] = hermes_stats
        except Exception as e:
            result["errors"].append(f"Hermes notification error: {e}")
    return result
def process_webhook_email(email_data: dict, dry_run=False, notify=True):
    """Handle one email delivered by the Cloudflare webhook.

    Push-pipeline entry point: the email arrives at the webhook address, the
    Cloudflare Worker POSTs it to hook.hoffdesk.com/webhook, and that handler
    calls this function. No IMAP fetch is involved; the work is delegated to
    ``_process_webhook_email_inner``.

    Args:
        email_data: Dict with keys: from, to, subject, date, body_text,
            body_html, has_attachments, attachments, dedup_key,
            message_id, source ('webhook')
        dry_run: If True, don't create calendar events.
        notify: If True, push results to Telegram via Hermes.

    Returns:
        Dict with processing results. If the inner call raises, the admin is
        alerted with a stack trace and an empty result carrying an
        ``UNHANDLED`` error entry is returned instead.
    """
    try:
        outcome = _process_webhook_email_inner(email_data, dry_run=dry_run, notify=notify)
    except Exception as exc:
        trace_text = traceback.format_exc()
        _alert_admin(
            "Webhook Pipeline Error",
            f"process_webhook_email() crashed:\n\n```\n{trace_text[:600]}\n```",
        )
        failure = {
            "emails_scanned": 1,
            "appointments_found": [],
            "cancellations": [],
            "events_created": [],
            "newsletter_items": [],
            "info_summary": "",
            "low_relevance_items": [],
        }
        failure["errors"] = [f"UNHANDLED: {exc}"]
        return failure
    return outcome
def _process_webhook_email_inner(email_data: dict, dry_run=False, notify=True):
    """Internal implementation — wrapped by process_webhook_email() error handler.

    Flow for one webhook-delivered email:
      1. Check for a payment alert and send its notification immediately.
      2. Triage the email; newsletters are parsed into items and routed,
         everything else is parsed as appointment(s)/cancellation(s).
      3. Unless dry_run, ingest into Family Brain and persist to the
         Event Graph (both best-effort: failures are printed, not raised).
      4. Finalize via _finalize_result() (conflict detection + Hermes push).

    Args:
        email_data: Webhook payload dict; this function reads the keys
            "subject", "body_text", "from" and "date".
        dry_run: If True, no calendar events are created or cancelled and
            nothing is ingested or persisted; results carry DRY_RUN statuses.
        notify: If True, send a per-event Hermes notification for created
            events and let _finalize_result() push the summary.

    Returns:
        Result dict with keys emails_scanned, appointments_found,
        cancellations, events_created, newsletter_items, info_summary,
        low_relevance_items and errors, plus optional payment_alert,
        reminders_created and actions_created. A newsletter that yields no
        items returns early, before _finalize_result() is called.
    """
    result = {
        "emails_scanned": 1,
        "appointments_found": [],
        "cancellations": [],
        "events_created": [],
        "newsletter_items": [],
        "info_summary": "",
        "low_relevance_items": [],
        "errors": [],
    }
    subject = email_data.get("subject", "")
    body = email_data.get("body_text", "")
    from_addr = email_data.get("from", "")
    date_str = email_data.get("date", "")
    print(f" [Webhook] Processing: {subject[:60]}...", file=sys.stderr)

    # Check for payment alerts FIRST — before classification
    payment_alert = _check_payment_alert(subject, body, from_addr)
    if payment_alert:
        print(f" [PAYMENT ALERT] Detected: {payment_alert['alert_reason']}", file=sys.stderr)
        _send_payment_alert_notification(payment_alert, email_data)
        # The alert is already sent; processing continues normally to extract
        # additional details. The key tells the caller this email was urgent.
        result["payment_alert"] = payment_alert

    # Step 1: Triage
    email_class = _triage_email(subject, body, from_addr)
    print(f" Classified as: {email_class}", file=sys.stderr)

    if email_class == "newsletter":
        enriched_body = enrich_body_with_urls(body)
        items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
        if not items:
            print(" No items extracted from newsletter", file=sys.stderr)
            return result

        result["newsletter_items"].append({
            "email_from": from_addr,
            "email_subject": subject,
            "items_count": len(items),
            "item_types": [i.get("type", "info") for i in items],
        })

        routed = _route_newsletter_items(items, dry_run=dry_run, notify=notify, email_subject=subject)
        result["events_created"].extend(routed["events_created"])
        result["errors"].extend(routed["errors"])
        if routed.get("reminders_created"):
            result.setdefault("reminders_created", []).extend(routed["reminders_created"])
        if routed.get("actions_created"):
            result.setdefault("actions_created", []).extend(routed["actions_created"])
        if routed.get("low_relevance_items"):
            result["low_relevance_items"].extend(routed["low_relevance_items"])

        info_items = routed.get("info_items", [])
        low_items = routed.get("low_relevance_items", [])
        events = result.get("events_created", [])
        reminders = result.get("reminders_created", [])
        actions = result.get("actions_created", [])
        if info_items or low_items or events or reminders or actions:
            result["info_summary"] = format_info_summary(
                info_items,
                low_relevance_items=low_items,
                source_subject=subject,
                events_created=events,
                reminders_created=reminders,
                actions_created=actions,
            )

        if not dry_run:
            try:
                ingest_newsletter(
                    subject=subject, body=enriched_body, from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    items=items,
                )
                print(" Ingested newsletter into Family Brain", file=sys.stderr)
            except Exception as e:
                print(f" Family Brain ingestion error: {e}", file=sys.stderr)

            # Persist to Event Graph. The email metadata for this path is
            # `email_data`; kept under the dry-run guard so a dry run never
            # writes to the graph.
            try:
                _persist_to_event_graph(email_data, items)
            except Exception as e:
                print(f" [Event Graph] Error: {e}", file=sys.stderr)

        return _finalize_result(result, dry_run, notify)

    # Step 2: Parse as appointment
    parsed_list = parse_email_with_llm(subject, body, from_addr, date_str)
    for parsed in parsed_list:
        serialized = _serialize_result(parsed)

        if parsed["type"] == "cancellation":
            cancel_summary = parsed.get("summary", "")
            cancel_start = parsed.get("start")
            cancel_result = None
            if cancel_start:
                if dry_run:
                    cancel_result = {"status": "DRY_RUN_CANCEL", "summary": cancel_summary,
                                     "start": cancel_start.isoformat()}
                else:
                    cancel_result = find_and_cancel_event(cancel_summary, cancel_start)
            # Covers both "no start time resolved" and "no matching event found".
            if cancel_result is None:
                cancel_result = {"status": "CANCELLATION_NO_MATCH", "summary": cancel_summary,
                                 "start": cancel_start.isoformat() if cancel_start else None}
            result["cancellations"].append({
                "email_from": from_addr, "email_subject": subject,
                "parsed": serialized, "action": cancel_result,
            })
            continue

        result["appointments_found"].append({
            "email_from": from_addr, "email_subject": subject, "parsed": serialized,
        })

        start_dt = parsed.get("start")
        if not start_dt:
            result["errors"].append(f"Could not resolve datetime for: {parsed['summary']}")
            continue
        end_dt = parsed.get("end")
        if not end_dt:
            end_dt = start_dt + timedelta(minutes=parsed["duration_minutes"])

        who_str = ", ".join(parsed["who"]) if parsed["who"] else ""
        summary = parsed["summary"]
        if who_str:
            summary = f"{summary} ({who_str})"

        if dry_run:
            is_recurring = parsed.get("is_recurring", False)
            recurrence = parsed.get("recurrence") if is_recurring else None
            result["events_created"].append({
                "status": "DRY_RUN", "summary": summary,
                "start": start_dt.isoformat(), "end": end_dt.isoformat(),
                "location": _resolve_location(parsed["location"]), "recurring": is_recurring,
                "recurrence": recurrence,
            })
        else:
            existing = event_exists(summary, start_dt)
            if existing:
                result["events_created"].append({
                    "status": "DUPLICATE_SKIPPED",
                    "summary": existing.get("summary", summary),
                    "start": existing["start"].get("dateTime", start_dt.isoformat()),
                    "end": existing["end"].get("dateTime", end_dt.isoformat()),
                    "existing_id": existing.get("id"),
                })
                print(f" Skipping duplicate: {summary}", file=sys.stderr)
                continue
            try:
                is_recurring = parsed.get("is_recurring", False)
                recurrence = parsed.get("recurrence") if is_recurring else None
                if is_recurring and recurrence:
                    event = create_recurring_event(
                        summary=summary, start_dt=start_dt, end_dt=end_dt,
                        recurrence=recurrence, description=parsed["description"],
                        location=_resolve_location(parsed["location"]),
                    )
                else:
                    event = create_event(
                        summary=summary, start_dt=start_dt, end_dt=end_dt,
                        description=parsed["description"], location=_resolve_location(parsed["location"]),
                    )
                result["events_created"].append({
                    "status": "CREATED", "id": event["id"],
                    "summary": event.get("summary", summary),
                    "start": event["start"].get("dateTime", start_dt.isoformat()),
                    "end": event["end"].get("dateTime", end_dt.isoformat()),
                    "link": event.get("htmlLink", ""), "recurring": is_recurring,
                    "location": _resolve_location(parsed.get("location", "")),
                    "_claimed_day_of_week": parsed.get("claimed_day_of_week", ""),
                })
                # Immediate per-event success notification (this branch only
                # runs when dry_run is False, so no extra dry-run check).
                if notify:
                    hermes_notify(result["events_created"][-1], email_subject=subject)
            except Exception as e:
                result["errors"].append(f"Calendar create error: {e}")

    # Ingest into Family Brain
    if not dry_run:
        try:
            ingest_email(
                subject=subject, body=body, from_addr=from_addr,
                email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                parsed_items=parsed_list,
            )
            print(" Ingested appointment email into Family Brain", file=sys.stderr)
        except Exception as e:
            print(f" Family Brain ingestion error: {e}", file=sys.stderr)

        # Persist to Event Graph (best-effort; uses this function's
        # `email_data` payload as the email metadata).
        try:
            _persist_to_event_graph(email_data, parsed_list, event_type_hint="calendar_event")
        except Exception as e:
            print(f" [Event Graph] Error: {e}", file=sys.stderr)

    return _finalize_result(result, dry_run, notify)
def _finalize_result(result, dry_run, notify):
"""Shared finalization: conflict detection + Hermes push."""
created_events = [e for e in result["events_created"]
if e.get("status") == "CREATED" and e.get("id")]
if created_events and not dry_run:
try:
conflicts = detect_conflicts(
events=[{"id": e["id"], "summary": e.get("summary", ""),
"start": e.get("start", ""), "end": e.get("end", "")}
for e in created_events]
)
result["conflicts"] = conflicts
except Exception as e:
result["conflicts"] = []
result["errors"].append(f"Conflict detection error: {e}")
else:
result["conflicts"] = []
if notify and not dry_run:
try:
hermes_stats = push_pipeline_results(result)
result["hermes"] = hermes_stats
except Exception as e:
result["errors"].append(f"Hermes notification error: {e}")
return result
def backfill_brain(days: int = 30) -> dict:
    """Backfill the Family Brain with emails from the last N days.

    Re-processes already-seen emails purely for ChromaDB ingestion.
    Does NOT create calendar events or send notifications.

    This wrapper only adds crash protection around the inner implementation:
    an unhandled exception is reported to the admin and turned into an
    error result rather than raised.

    Args:
        days: Number of days to look back

    Returns:
        Dict with ingestion stats. On an unhandled crash all counters are 0
        and "errors" holds a single "UNHANDLED: ..." entry.
    """
    try:
        stats = _backfill_brain_inner(days=days)
    except Exception as e:
        # Only the first 600 characters of the traceback go into the alert.
        tb = traceback.format_exc()
        alert_body = f"backfill_brain() crashed:\n\n```\n{tb[:600]}\n```"
        _alert_admin("Backfill Error", alert_body)

        crash_stats = {
            "emails_scanned": 0,
            "ingested_emails": 0,
            "ingested_newsletters": 0,
            "errors": [f"UNHANDLED: {e}"],
            "skipped": 0,
        }
        return crash_stats
    else:
        return stats
def _backfill_brain_inner(days: int = 30) -> dict:
    """Internal implementation — wrapped by backfill_brain() error handler.

    Fetches emails since (now - days), triages each one, parses it as a
    newsletter or an appointment email, and ingests it into the Family Brain.
    Parse/ingest failures are recorded per email in "errors" and do not stop
    the loop.

    Args:
        days: Number of days to look back.

    Returns:
        Dict with emails_scanned, ingested_emails, ingested_newsletters,
        errors and skipped.
    """
    from icarus.core.email_fetcher import fetch_since
    from icarus.core.family_brain import ingest_email, ingest_newsletter
    from icarus.core.newsletter_parser import classify_email, parse_newsletter_with_llm
    from icarus.core.appointment_parser import parse_email_with_llm

    def _with_iso_dates(record):
        # datetime is a subclass of date, so one check covers both types.
        return {
            key: (value.isoformat() if isinstance(value, date) else value)
            for key, value in record.items()
        }

    # IMAP SINCE format: DD-Mon-YYYY (e.g. 18-Mar-2026)
    imap_date = (datetime.now(CHICAGO_TZ) - timedelta(days=days)).strftime("%d-%b-%Y")
    print(f"Backfilling Family Brain: emails since {imap_date} ({days} days)", file=sys.stderr)

    result = {
        "emails_scanned": 0,
        "ingested_emails": 0,
        "ingested_newsletters": 0,
        "errors": [],
        "skipped": 0,
    }

    mail_data = fetch_since(imap_date)
    if "error" in mail_data:
        result["errors"].append(mail_data["error"])
        return result

    messages = mail_data.get("emails", [])
    result["emails_scanned"] = len(messages)
    if not messages:
        print(" No emails found in date range", file=sys.stderr)
        return result

    for message in messages:
        subject = message.get("subject", "")
        body = message.get("body", "")
        from_addr = message.get("from", "")
        date_str = message.get("date", "")
        print(f" Processing: {subject[:60]}...", file=sys.stderr)

        # Classify
        email_class = _triage_email(subject, body, from_addr)
        print(f" Classified as: {email_class}", file=sys.stderr)

        try:
            if email_class != "newsletter":
                # Appointment email
                appointments = parse_email_with_llm(subject, body, from_addr, date_str)
                # Serialize datetimes before passing to ingest
                serializable = [_with_iso_dates(entry) for entry in (appointments or [])]
                ingest_email(
                    subject=subject,
                    body=body,
                    from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    parsed_items=serializable,
                )
                result["ingested_emails"] += 1
                print(f" ✅ Ingested email", file=sys.stderr)
            else:
                enriched_body = enrich_body_with_urls(body)
                items = parse_newsletter_with_llm(subject, enriched_body, from_addr, date_str)
                ingest_newsletter(
                    subject=subject,
                    body=enriched_body,
                    from_addr=from_addr,
                    email_date=date_str or datetime.now(CHICAGO_TZ).isoformat(),
                    items=items or [],
                )
                result["ingested_newsletters"] += 1
                print(f" ✅ Ingested newsletter", file=sys.stderr)
        except Exception as e:
            result["errors"].append(f"{subject[:40]}: {e}")
            print(f" ❌ Error: {e}", file=sys.stderr)

    print(
        f"\nBackfill complete: {result['ingested_emails']} emails + "
        f"{result['ingested_newsletters']} newsletters ingested",
        file=sys.stderr,
    )
    return result