"""Hermes — Telegram push notification agent. Formats and delivers pipeline results as Telegram messages via the OpenClaw CLI. No custom bot needed. Routing (v1.0 — Trust Over Silence): Family Group (TELEGRAM_CHAT_ID): - New appointments (visual confirmation that the system processed the email) - New reminders and action items - Conflict alerts with inline buttons - Newsletter digest (info summary) Matt's DM (TELEGRAM_DEV_ID): - Auto-filtered items (shadow filter guardrail — silent failure prevention) - Pipeline errors (operational, not household-facing) --quiet flag available for future use: suppresses family group notifications, sends only DM alerts. Not the default until household trusts the system. """ import json import os import re import subprocess import sys from datetime import datetime from zoneinfo import ZoneInfo from family_assistant.config import CHICAGO_TZ # Telegram targets — from env vars, with fallback defaults DEFAULT_TELEGRAM_TARGET = os.environ.get("TELEGRAM_CHAT_ID", "-1000000000000") # Override for dev notifications (e.g., auto-filter alerts) DEV_TELEGRAM_TARGET = os.environ.get("TELEGRAM_DEV_ID", "") # Bot token for direct Telegram API calls (faster than openclaw CLI) TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") import requests as _requests def _send_telegram(message, target=None, silent=False): """Send a message via Telegram Bot API directly. Falls back to openclaw CLI if bot token is not configured. Uses direct API for speed — the openclaw CLI was timing out. """ target = target or os.environ.get("TELEGRAM_TARGET", DEFAULT_TELEGRAM_TARGET) # Primary: direct Telegram Bot API if TELEGRAM_BOT_TOKEN: try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": target, "text": message, "parse_mode": "Markdown", } if silent: payload["disable_notification"] = True resp = _requests.post(url, json=payload, timeout=10) if resp.status_code == 200 and resp.json().get("ok"): return True else: print(f" [Hermes] Bot API error: {resp.status_code} {resp.text[:200]}", file=sys.stderr) except Exception as e: print(f" [Hermes] Bot API exception: {e}", file=sys.stderr) # Fallback: openclaw CLI cmd = [ "openclaw", "message", "send", "--channel", "telegram", "--target", target, "--message", message, ] if silent: cmd.append("--silent") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=15, ) if result.returncode != 0: print(f" [Hermes] CLI failed (exit {result.returncode}): {result.stderr.strip()}", file=sys.stderr) return False return True except subprocess.TimeoutExpired: print(" [Hermes] CLI timed out (15s)", file=sys.stderr) return False except Exception as e: print(f" [Hermes] CLI error: {e}", file=sys.stderr) return False def _extract_urls(text): """Extract HTTP(S) URLs from text.""" if not text: return [] urls = re.findall(r'https?://[^\s<>"\']+', text) return [u.rstrip('.,;:)>]') for u in urls] def _send_telegram_with_buttons(message, buttons, target=None): """Send a message with inline keyboard buttons via OpenClaw. Args: message: Text message to send buttons: List of rows, each row is a list of dicts [{"text": "Label", "callback_data": "data"}, ...] target: Telegram target (defaults to TELEGRAM_CHAT_ID) """ target = target or os.environ.get("TELEGRAM_TARGET", DEFAULT_TELEGRAM_TARGET) cmd = [ "openclaw", "message", "send", "--channel", "telegram", "--target", target, "--message", message, "--buttons", json.dumps(buttons), ] # Primary: direct Telegram Bot API with inline keyboard if TELEGRAM_BOT_TOKEN: try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" inline_keyboard = [[{"text": b.get("text", "?"), "callback_data": b.get("callback_data", "")} for b in row] for row in buttons] payload = { "chat_id": target, "text": message, "parse_mode": "Markdown", "reply_markup": {"inline_keyboard": inline_keyboard}, } resp = _requests.post(url, json=payload, timeout=10) if resp.status_code == 200 and resp.json().get("ok"): return True else: print(f" [Hermes] Button Bot API error: {resp.status_code} {resp.text[:200]}", file=sys.stderr) except Exception as e: print(f" [Hermes] Button Bot API exception: {e}", file=sys.stderr) # Fallback: openclaw CLI try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=15, ) if result.returncode != 0: print(f" [Hermes] Button CLI failed (exit {result.returncode}): {result.stderr.strip()}", file=sys.stderr) return False return True except subprocess.TimeoutExpired: print(" [Hermes] Button CLI timed out (15s)", file=sys.stderr) return False except Exception as e: print(f" [Hermes] Button CLI error: {e}", file=sys.stderr) return False # --------------------------------------------------------------------------- # Message Formatters # --------------------------------------------------------------------------- def _fmt_datetime(dt_str): """Format an ISO datetime string for human-readable display. Includes the year if it differs from the current year. """ if not dt_str: return "TBD" try: dt = datetime.fromisoformat(dt_str.replace("+00:00", "+00:00")) if dt.tzinfo is None: dt = dt.replace(tzinfo=CHICAGO_TZ) dt = dt.astimezone(CHICAGO_TZ) fmt = "%a %b %d, %-I:%M %p" if dt.year != datetime.now(CHICAGO_TZ).year: fmt = "%a %b %d %Y, %-I:%M %p" return dt.strftime(fmt) except (ValueError, TypeError): return dt_str def _fmt_date(d_str): """Format a date string for display. Includes the year if it differs from the current year. """ if not d_str: return "TBD" try: dt = datetime.fromisoformat(d_str) fmt = "%a %b %d" if dt.year != datetime.now(CHICAGO_TZ).year: fmt = "%a %b %d %Y" return dt.strftime(fmt) except (ValueError, TypeError): return d_str def _check_day_mismatch(event_dict): """Check if the event's day-of-week might not match the actual date. This catches cases where the LLM parsed a date from an email that said e.g. "Monday, April 21" but April 21 is actually a Tuesday. Returns a warning string or empty string. """ start_str = event_dict.get("start", {}) if isinstance(start_str, dict): start_str = start_str.get("dateTime", start_str.get("date", "")) if not start_str or not isinstance(start_str, str): return "" try: dt = datetime.fromisoformat(start_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=CHICAGO_TZ) dt = dt.astimezone(CHICAGO_TZ) actual_day = dt.strftime("%A") # Check if a different day name appears in the original event context # We can't re-parse the email here, but we can store the LLM's # original day-of-week hint in the event dict if provided claimed_day = event_dict.get("_claimed_day_of_week", "") if claimed_day and claimed_day.lower() != actual_day.lower(): return f"⚠️ Day mismatch: email said {claimed_day} but {dt.strftime('%b %d')} is actually {actual_day}" return "" except (ValueError, TypeError): return "" def hermes_notify(event, email_subject="", target=None): """Send a Hoffdesk success notification for a calendar event write. Called every time pipeline successfully creates an event on Radicale. Includes day-of-week mismatch warning if the email's claimed day doesn't match the actual calendar date. Args: event: Event dict with summary, start, end, location, etc. May include '_claimed_day_of_week' from LLM extraction. email_subject: Original email subject for context. target: Telegram target (defaults to family group). Returns: True if sent successfully, False otherwise. """ group_target = target or DEFAULT_TELEGRAM_TARGET summary = event.get("summary", "Unknown Event") start_raw = event.get("start", "") end_raw = event.get("end", "") location = event.get("location", "") # Format date and time date_str = _fmt_datetime(start_raw) if start_raw else "TBD" # Extract just the time portion for the Time line time_str = "" try: start_dt = datetime.fromisoformat(start_raw.replace("+00:00", "+00:00")) if isinstance(start_raw, str) else None if start_dt: if start_dt.tzinfo is None: start_dt = start_dt.replace(tzinfo=CHICAGO_TZ) start_dt = start_dt.astimezone(CHICAGO_TZ) time_str = start_dt.strftime("%-I:%M %p") # Add end time if available end_dt = None if isinstance(end_raw, str) and end_raw: end_dt = datetime.fromisoformat(end_raw.replace("+00:00", "+00:00")) if end_dt.tzinfo is None: end_dt = end_dt.replace(tzinfo=CHICAGO_TZ) end_dt = end_dt.astimezone(CHICAGO_TZ) if end_dt: time_str += f" – {end_dt.strftime('%-I:%M %p')}" except (ValueError, TypeError): time_str = date_str # Fallback to full datetime # Build message lines = [ "✅ **Hoffdesk Success**", f"Event: {summary}", f"Date: {date_str}", ] if time_str and time_str != date_str: lines.append(f"Time: {time_str}") # Location with travel enrichment if location: try: from family_assistant.location_cache import resolve, format_travel_info loc_result = resolve(location, use_home_bias=True) if loc_result: travel = format_travel_info(loc_result) lines.append(f"📍 {travel}" if travel else f"📍 {location}") else: lines.append(f"📍 {location}") except Exception: lines.append(f"📍 {location}") # Day-of-week mismatch warning day_warning = _check_day_mismatch(event) if day_warning: lines.append(f"\n{day_warning}") msg = "\n".join(lines) return _send_telegram(msg, target=group_target) """Format a single new event for Telegram notification. Enriches location strings with travel time from home when possible. """ summary = event.get("summary", "Unknown Event") start = _fmt_datetime(event.get("start", "")) location = event.get("location", "") msg = f"📅 **{summary}**\n{start}" if location: # Enrich location with travel time from cache try: from family_assistant.location_cache import resolve, format_travel_info loc_result = resolve(location, use_home_bias=True) if loc_result: travel_info = format_travel_info(loc_result) if travel_info: # Use the resolved name + travel time if available msg += f"\n📍 {travel_info}" else: msg += f"\n📍 {location}" else: msg += f"\n📍 {location}" except Exception: # If location resolution fails, just use the raw location string msg += f"\n📍 {location}" return msg def format_reminder_notification(reminder): """Format a single reminder for Telegram notification.""" summary = reminder.get("summary", "Unknown Reminder") date = _fmt_date(reminder.get("date", "")) msg = f"📋 **{summary}**\n{date}" return msg def format_action_notification(action): """Format a single action item for Telegram notification.""" summary = action.get("summary", "Unknown Action") date = _fmt_date(action.get("date", "")) msg = f"✅ **{summary}**" if date and date != "TBD": msg += f"\nDue: {date}" return msg def format_conflict_alert(conflict, resolution=None): """Format a scheduling conflict for Telegram notification with inline buttons.""" e1 = conflict.get("event1", {}) e2 = conflict.get("event2", {}) overlap = conflict.get("overlap_minutes", 0) msg = "⚠️ **Scheduling Conflict**\n\n" msg += f"📅 {e1.get('summary', '?')}\n" msg += f"📅 {e2.get('summary', '?')}\n" msg += f"⏱️ {overlap}min overlap" return msg def format_error_alert(error_msg): """Format a pipeline error for Telegram notification.""" return f"❌ **Pipeline Error**\n{error_msg}" def format_info_summary(info_items, low_relevance_items=None, source_subject=""): """Format info items and low-relevance items into a digest message.""" lines = [] if source_subject: lines.append(f"📬 **{source_subject}**\n") if info_items: lines.append("**Info:**") for item in info_items: who = item.get("who", "") desc = item.get("description", "") summary = item.get("summary", "") line = f" • {summary}" if who: line += f" ({who})" if desc and desc != summary: line += f" — {desc}" lines.append(line) if low_relevance_items: lines.append("\n**Low Relevance:**") for item in low_relevance_items: who = item.get("who", "") summary = item.get("summary", "") reason = item.get("reason", "") line = f" • {summary}" if who: line += f" ({who})" if reason: line += f" — {reason}" lines.append(line) return "\n".join(lines) # --------------------------------------------------------------------------- # Push Pipeline Results — Main Entry Point # --------------------------------------------------------------------------- def push_pipeline_results(result, quiet=False, target=None): """Take a process_emails() result dict and push all notifications. Routing (v1.0 — Trust Over Silence): Family Group (TELEGRAM_CHAT_ID): - New appointments, reminders, action items - Conflict alerts with inline buttons - Newsletter digest Matt's DM (TELEGRAM_DEV_ID): - Auto-filtered items (shadow filter guardrail) - Pipeline errors --quiet: suppress family group notifications, only send DM alerts. Not the default until household trusts the system. Returns a dict with counts of messages sent/failed. """ stats = {"sent": 0, "failed": 0} group_target = target or DEFAULT_TELEGRAM_TARGET dm_target = DEV_TELEGRAM_TARGET if not quiet: # ── Family Group Notifications ── # 1. New events — SKIPPED here; hermes_notify() handles each event # individually with the "✅ Hoffdesk Success" format + day-of-week # mismatch detection. No duplicate batch message. # 2. New reminders — batch into one message created_reminders = [r for r in result.get("reminders_created", []) if r.get("status") == "CREATED"] if created_reminders: lines = ["📋 **New Reminders**"] for r in created_reminders: summary = r.get("summary", "?") date = _fmt_date(r.get("date", "")) lines.append(f" • {summary} — {date}") msg = "\n".join(lines) if _send_telegram(msg, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 # 3. New action items — enrich with Clicker if URLs found created_actions = [a for a in result.get("actions_created", []) if a.get("status") == "CREATED"] if created_actions: lines = ["✅ **New Action Items**"] for a in created_actions: summary = a.get("summary", "?") date = _fmt_date(a.get("date", "")) description = a.get("description", "") or "" # Check if the action item has a URL — if so, run the Clicker urls = _extract_urls(description) if urls: from family_assistant.clicker import click_url, build_slot_buttons, hash_url from family_assistant.slot_handler import cache_slots for url in urls[:1]: # First URL only click_result = click_url(url, context_summary=summary) if click_result["status"] == "SLOTS_FOUND": # Cache slots so callback handler can look them up on button tap url_hash = cache_slots(url, click_result["slots"], hash_url(url)) # Send enriched message with slot buttons slot_msg = click_result["message"] slot_buttons = build_slot_buttons(click_result["slots"], url_hash) if _send_telegram_with_buttons(slot_msg, slot_buttons, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 continue # Skip plain-text line for this action elif click_result["status"] == "FETCH_FAILED": lines.append(f" • {summary} — {date}") lines.append(f" ⚠️ Signup page couldn't be loaded") else: # NO_SLOTS or other — just show the plain action lines.append(f" • {summary} — {date}") else: lines.append(f" • {summary} — {date}") # Send any remaining plain-text action items if len(lines) > 1: msg = "\n".join(lines) if _send_telegram(msg, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 # 4. Conflict alerts — one message per conflict (urgent) conflicts = result.get("conflicts", []) for conflict in conflicts: msg = format_conflict_alert(conflict) if _send_telegram(msg, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 # 5. Newsletter digest — one message info_summary = result.get("info_summary", "") if info_summary: if _send_telegram(info_summary, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 # ── Matt's DM Notifications (always sent, even in quiet mode) ── # 6. Auto-filtered items — silent failure guardrail auto_filtered = result.get("auto_filtered", []) if auto_filtered: lines = ["🔇 **Auto-Filtered**"] for af in auto_filtered: lines.append(f" • {af['summary']} — {af['rule']}") lines.append("Reply 'restore ' to add it back.") msg = "\n".join(lines) if dm_target and _send_telegram(msg, target=dm_target): stats["sent"] += 1 elif not dm_target: # No DM target configured — fall back to group in quiet mode print(" [Hermes] No TELEGRAM_DEV_ID set, auto-filter alert not sent", file=sys.stderr) # 7. Errors — always to DM errors = result.get("errors", []) if errors: lines = ["❌ **Pipeline Errors**"] for e in errors: lines.append(f" • {e}") msg = "\n".join(lines) if dm_target: if _send_telegram(msg, target=dm_target): stats["sent"] += 1 else: stats["failed"] += 1 else: # Fall back to group if no DM target if _send_telegram(msg, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 return stats def push_maintenance_alert(target=None, dedup=True): """Check maintenance items and post alerts for due/overdue items. Called by the heartbeat and Daily Brief. Sends to family group with inline Done buttons. Args: target: Override target chat ID (defaults to family group). dedup: If True, only alert for items not previously notified (prevents spam on 30-min heartbeat). Daily Brief should set dedup=False to always show current state. Returns: Dict with sent/failed stats. """ from family_assistant.maintenance_sentinel import ( check_maintenance, format_maintenance_brief, build_maintenance_buttons, filter_new_alerts, ) group_target = target or os.environ.get("TELEGRAM_CHAT_ID", "") stats = {"sent": 0, "failed": 0} result = check_maintenance() # Dedup: only alert for items we haven't notified about yet if dedup: result = filter_new_alerts(result) brief = format_maintenance_brief(result) buttons = build_maintenance_buttons(result) if not brief: return stats # Nothing due (or already alerted), nothing to send # Send with inline Done buttons if items are due now if buttons: if _send_telegram_with_buttons(brief, buttons, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 else: # Only upcoming items (no Done buttons needed) if _send_telegram(brief, target=group_target): stats["sent"] += 1 else: stats["failed"] += 1 return stats