πŸ“„ hermes.py 23,410 bytes Apr 27, 2026 πŸ“‹ Raw

"""Hermes β€” Telegram push notification agent.

Formats and delivers pipeline results as Telegram messages via the
OpenClaw CLI. No custom bot needed.

Routing (v1.0 β€” Trust Over Silence):
Family Group (TELEGRAM_CHAT_ID):
- New appointments (visual confirmation that the system processed the email)
- New reminders and action items
- Conflict alerts with inline buttons
- Newsletter digest (info summary)

Matt's DM (TELEGRAM_DEV_ID):
- Auto-filtered items (shadow filter guardrail β€” silent failure prevention)
- Pipeline errors (operational, not household-facing)

--quiet flag available for future use: suppresses family group notifications,
sends only DM alerts. Not the default until household trusts the system.
"""

import json
import os
import re
import subprocess
import sys
from datetime import datetime
from zoneinfo import ZoneInfo

from family_assistant.config import CHICAGO_TZ

Telegram targets β€” from env vars, with fallback defaults

DEFAULT_TELEGRAM_TARGET = os.environ.get("TELEGRAM_CHAT_ID", "-1000000000000")

Override for dev notifications (e.g., auto-filter alerts)

DEV_TELEGRAM_TARGET = os.environ.get("TELEGRAM_DEV_ID", "")

Bot token for direct Telegram API calls (faster than openclaw CLI)

TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")

import requests as _requests

def _send_telegram(message, target=None, silent=False):
"""Send a message via Telegram Bot API directly.

Falls back to openclaw CLI if bot token is not configured.
Uses direct API for speed β€” the openclaw CLI was timing out.
"""
target = target or os.environ.get("TELEGRAM_TARGET", DEFAULT_TELEGRAM_TARGET)

# Primary: direct Telegram Bot API
if TELEGRAM_BOT_TOKEN:
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        payload = {
            "chat_id": target,
            "text": message,
            "parse_mode": "Markdown",
        }
        if silent:
            payload["disable_notification"] = True
        resp = _requests.post(url, json=payload, timeout=10)
        if resp.status_code == 200 and resp.json().get("ok"):
            return True
        else:
            print(f"  [Hermes] Bot API error: {resp.status_code} {resp.text[:200]}", file=sys.stderr)
    except Exception as e:
        print(f"  [Hermes] Bot API exception: {e}", file=sys.stderr)

# Fallback: openclaw CLI
cmd = [
    "openclaw", "message", "send",
    "--channel", "telegram",
    "--target", target,
    "--message", message,
]
if silent:
    cmd.append("--silent")
try:
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=15,
    )
    if result.returncode != 0:
        print(f"  [Hermes] CLI failed (exit {result.returncode}): {result.stderr.strip()}", file=sys.stderr)
        return False
    return True
except subprocess.TimeoutExpired:
    print("  [Hermes] CLI timed out (15s)", file=sys.stderr)
    return False
except Exception as e:
    print(f"  [Hermes] CLI error: {e}", file=sys.stderr)
    return False

def _extract_urls(text):
"""Extract HTTP(S) URLs from text."""
if not text:
return []
urls = re.findall(r'https?://[^\s<>"\']+', text)
return [u.rstrip('.,;:)>]') for u in urls]

def _send_telegram_with_buttons(message, buttons, target=None):
"""Send a message with inline keyboard buttons via OpenClaw.

Args:
    message: Text message to send
    buttons: List of rows, each row is a list of dicts
              [{"text": "Label", "callback_data": "data"}, ...]
    target: Telegram target (defaults to TELEGRAM_CHAT_ID)
"""
target = target or os.environ.get("TELEGRAM_TARGET", DEFAULT_TELEGRAM_TARGET)
cmd = [
    "openclaw", "message", "send",
    "--channel", "telegram",
    "--target", target,
    "--message", message,
    "--buttons", json.dumps(buttons),
]

# Primary: direct Telegram Bot API with inline keyboard
if TELEGRAM_BOT_TOKEN:
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        inline_keyboard = [[{"text": b.get("text", "?"), "callback_data": b.get("callback_data", "")} for b in row] for row in buttons]
        payload = {
            "chat_id": target,
            "text": message,
            "parse_mode": "Markdown",
            "reply_markup": {"inline_keyboard": inline_keyboard},
        }
        resp = _requests.post(url, json=payload, timeout=10)
        if resp.status_code == 200 and resp.json().get("ok"):
            return True
        else:
            print(f"  [Hermes] Button Bot API error: {resp.status_code} {resp.text[:200]}", file=sys.stderr)
    except Exception as e:
        print(f"  [Hermes] Button Bot API exception: {e}", file=sys.stderr)

# Fallback: openclaw CLI
try:
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=15,
    )
    if result.returncode != 0:
        print(f"  [Hermes] Button CLI failed (exit {result.returncode}): {result.stderr.strip()}", file=sys.stderr)
        return False
    return True
except subprocess.TimeoutExpired:
    print("  [Hermes] Button CLI timed out (15s)", file=sys.stderr)
    return False
except Exception as e:
    print(f"  [Hermes] Button CLI error: {e}", file=sys.stderr)
    return False

---------------------------------------------------------------------------

Message Formatters

---------------------------------------------------------------------------

def _fmt_datetime(dt_str):
"""Format an ISO datetime string for human-readable display.

Includes the year if it differs from the current year.
"""
if not dt_str:
    return "TBD"
try:
    dt = datetime.fromisoformat(dt_str.replace("+00:00", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CHICAGO_TZ)
    dt = dt.astimezone(CHICAGO_TZ)
    fmt = "%a %b %d, %-I:%M %p"
    if dt.year != datetime.now(CHICAGO_TZ).year:
        fmt = "%a %b %d %Y, %-I:%M %p"
    return dt.strftime(fmt)
except (ValueError, TypeError):
    return dt_str

def _fmt_date(d_str):
"""Format a date string for display.

Includes the year if it differs from the current year.
"""
if not d_str:
    return "TBD"
try:
    dt = datetime.fromisoformat(d_str)
    fmt = "%a %b %d"
    if dt.year != datetime.now(CHICAGO_TZ).year:
        fmt = "%a %b %d %Y"
    return dt.strftime(fmt)
except (ValueError, TypeError):
    return d_str

def _check_day_mismatch(event_dict):
"""Check if the event's day-of-week might not match the actual date.

This catches cases where the LLM parsed a date from an email that
said e.g. "Monday, April 21" but April 21 is actually a Tuesday.
Returns a warning string or empty string.
"""
start_str = event_dict.get("start", {})
if isinstance(start_str, dict):
    start_str = start_str.get("dateTime", start_str.get("date", ""))
if not start_str or not isinstance(start_str, str):
    return ""
try:
    dt = datetime.fromisoformat(start_str)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CHICAGO_TZ)
    dt = dt.astimezone(CHICAGO_TZ)
    actual_day = dt.strftime("%A")
    # Check if a different day name appears in the original event context
    # We can't re-parse the email here, but we can store the LLM's
    # original day-of-week hint in the event dict if provided
    claimed_day = event_dict.get("_claimed_day_of_week", "")
    if claimed_day and claimed_day.lower() != actual_day.lower():
        return f"⚠️ Day mismatch: email said {claimed_day} but {dt.strftime('%b %d')} is actually {actual_day}"
    return ""
except (ValueError, TypeError):
    return ""

def hermes_notify(event, email_subject="", target=None):
"""Send a Hoffdesk success notification for a calendar event write.

Called every time pipeline successfully creates an event on Radicale.
Includes day-of-week mismatch warning if the email's claimed day
doesn't match the actual calendar date.

Args:
    event: Event dict with summary, start, end, location, etc.
           May include '_claimed_day_of_week' from LLM extraction.
    email_subject: Original email subject for context.
    target: Telegram target (defaults to family group).

Returns:
    True if sent successfully, False otherwise.
"""
group_target = target or DEFAULT_TELEGRAM_TARGET

summary = event.get("summary", "Unknown Event")
start_raw = event.get("start", "")
end_raw = event.get("end", "")
location = event.get("location", "")

# Format date and time
date_str = _fmt_datetime(start_raw) if start_raw else "TBD"

# Extract just the time portion for the Time line
time_str = ""
try:
    start_dt = datetime.fromisoformat(start_raw.replace("+00:00", "+00:00")) if isinstance(start_raw, str) else None
    if start_dt:
        if start_dt.tzinfo is None:
            start_dt = start_dt.replace(tzinfo=CHICAGO_TZ)
        start_dt = start_dt.astimezone(CHICAGO_TZ)
        time_str = start_dt.strftime("%-I:%M %p")

        # Add end time if available
        end_dt = None
        if isinstance(end_raw, str) and end_raw:
            end_dt = datetime.fromisoformat(end_raw.replace("+00:00", "+00:00"))
            if end_dt.tzinfo is None:
                end_dt = end_dt.replace(tzinfo=CHICAGO_TZ)
            end_dt = end_dt.astimezone(CHICAGO_TZ)
        if end_dt:
            time_str += f" – {end_dt.strftime('%-I:%M %p')}"
except (ValueError, TypeError):
    time_str = date_str  # Fallback to full datetime

# Build message
lines = [
    "βœ… **Hoffdesk Success**",
    f"Event: {summary}",
    f"Date: {date_str}",
]
if time_str and time_str != date_str:
    lines.append(f"Time: {time_str}")

# Location with travel enrichment
if location:
    try:
        from family_assistant.location_cache import resolve, format_travel_info
        loc_result = resolve(location, use_home_bias=True)
        if loc_result:
            travel = format_travel_info(loc_result)
            lines.append(f"πŸ“ {travel}" if travel else f"πŸ“ {location}")
        else:
            lines.append(f"πŸ“ {location}")
    except Exception:
        lines.append(f"πŸ“ {location}")

# Day-of-week mismatch warning
day_warning = _check_day_mismatch(event)
if day_warning:
    lines.append(f"\n{day_warning}")

msg = "\n".join(lines)
return _send_telegram(msg, target=group_target)
"""Format a single new event for Telegram notification.

Enriches location strings with travel time from home when possible.
"""
summary = event.get("summary", "Unknown Event")
start = _fmt_datetime(event.get("start", ""))
location = event.get("location", "")
msg = f"πŸ“… **{summary}**\n{start}"
if location:
    # Enrich location with travel time from cache
    try:
        from family_assistant.location_cache import resolve, format_travel_info
        loc_result = resolve(location, use_home_bias=True)
        if loc_result:
            travel_info = format_travel_info(loc_result)
            if travel_info:
                # Use the resolved name + travel time if available
                msg += f"\nπŸ“ {travel_info}"
            else:
                msg += f"\nπŸ“ {location}"
        else:
            msg += f"\nπŸ“ {location}"
    except Exception:
        # If location resolution fails, just use the raw location string
        msg += f"\nπŸ“ {location}"
return msg

def format_reminder_notification(reminder):
"""Format a single reminder for Telegram notification."""
summary = reminder.get("summary", "Unknown Reminder")
date = _fmt_date(reminder.get("date", ""))
msg = f"πŸ“‹ {summary}\n{date}"
return msg

def format_action_notification(action):
"""Format a single action item for Telegram notification."""
summary = action.get("summary", "Unknown Action")
date = _fmt_date(action.get("date", ""))
msg = f"βœ… {summary}"
if date and date != "TBD":
msg += f"\nDue: {date}"
return msg

def format_conflict_alert(conflict, resolution=None):
"""Format a scheduling conflict for Telegram notification with inline buttons."""
e1 = conflict.get("event1", {})
e2 = conflict.get("event2", {})
overlap = conflict.get("overlap_minutes", 0)
msg = "⚠️ Scheduling Conflict\n\n"
msg += f"πŸ“… {e1.get('summary', '?')}\n"
msg += f"πŸ“… {e2.get('summary', '?')}\n"
msg += f"⏱️ {overlap}min overlap"
return msg

def format_error_alert(error_msg):
"""Format a pipeline error for Telegram notification."""
return f"❌ Pipeline Error\n{error_msg}"

def format_payment_alert(alert_data):
"""Format a payment alert for immediate Telegram notification.

Args:
    alert_data: Dict with merchant, alert_type, amount, deadline,
                 action_needed, summary keys.

Returns:
    Formatted string with [PAYMENT ALERT] prefix.
"""
merchant = alert_data.get("merchant", "Unknown Service")
alert_type = alert_data.get("alert_type", "payment_failed")
amount = alert_data.get("amount", "")
deadline = alert_data.get("deadline", "")
action_needed = alert_data.get("action_needed", "")
summary = alert_data.get("summary", "")

# Type emoji mapping
type_emoji = {
    "payment_failed": "πŸ’³",
    "card_expired": "πŸͺͺ",
    "subscription_suspended": "⏸️",
    "payment_reminder": "⏰",
}
emoji = type_emoji.get(alert_type, "⚠️")

# Format deadline for display
deadline_str = ""
if deadline:
    try:
        from datetime import datetime as _dt
        dl_dt = _dt.fromisoformat(deadline)
        deadline_str = dl_dt.strftime("%a %b %d")
    except (ValueError, TypeError):
        deadline_str = str(deadline)

lines = [f"🚨 [PAYMENT ALERT] {emoji} {merchant}"]
if amount:
    # Escape $ for Telegram Markdown
    safe_amount = amount.replace('$', '\\$')
    lines.append(f"Amount: {safe_amount}")
if deadline_str:
    lines.append(f"Deadline: {deadline_str}")
if action_needed:
    lines.append(f"Action: {action_needed}")
if summary:
    # Escape $ for Telegram Markdown
    safe_summary = summary.replace('$', '\\$')
    lines.append(f"_{safe_summary}_")

return "\n".join(lines)

def format_info_summary(info_items, low_relevance_items=None, source_subject=""):
"""Format info items and low-relevance items into a digest message."""
lines = []
if source_subject:
lines.append(f"πŸ“¬ {source_subject}\n")

if info_items:
    lines.append("**Info:**")
    for item in info_items:
        who = item.get("who", "")
        desc = item.get("description", "")
        summary = item.get("summary", "")
        line = f"  β€’ {summary}"
        if who:
            line += f" ({who})"
        if desc and desc != summary:
            line += f" β€” {desc}"
        lines.append(line)

if low_relevance_items:
    lines.append("\n**Low Relevance:**")
    for item in low_relevance_items:
        who = item.get("who", "")
        summary = item.get("summary", "")
        reason = item.get("reason", "")
        line = f"  β€’ {summary}"
        if who:
            line += f" ({who})"
        if reason:
            line += f" β€” {reason}"
        lines.append(line)

return "\n".join(lines)

---------------------------------------------------------------------------

Push Pipeline Results β€” Main Entry Point

---------------------------------------------------------------------------

def push_pipeline_results(result, quiet=False, target=None):
"""Take a process_emails() result dict and push all notifications.

Routing (v1.0 β€” Trust Over Silence):
  Family Group (TELEGRAM_CHAT_ID):
    - New appointments, reminders, action items
    - Conflict alerts with inline buttons
    - Newsletter digest

  Matt's DM (TELEGRAM_DEV_ID):
    - Auto-filtered items (shadow filter guardrail)
    - Pipeline errors

  --quiet: suppress family group notifications, only send DM alerts.
  Not the default until household trusts the system.

Returns a dict with counts of messages sent/failed.
"""
stats = {"sent": 0, "failed": 0}
group_target = target or DEFAULT_TELEGRAM_TARGET
dm_target = DEV_TELEGRAM_TARGET

if not quiet:
    # ── Family Group Notifications ──

    # 0. Payment alerts β€” sent IMMEDIATELY, never batched
    payment_alerts = result.get("payment_alerts", [])
    for pa in payment_alerts:
        msg = format_payment_alert(pa)
        if _send_telegram(msg, target=group_target):
            stats["sent"] += 1
        else:
            stats["failed"] += 1

    # 1. New events β€” SKIPPED here; hermes_notify() handles each event
    #    individually with the "βœ… Hoffdesk Success" format + day-of-week
    #    mismatch detection. No duplicate batch message.

    # 2. New reminders β€” batch into one message
    created_reminders = [r for r in result.get("reminders_created", []) if r.get("status") == "CREATED"]
    if created_reminders:
        lines = ["πŸ“‹ **New Reminders**"]
        for r in created_reminders:
            summary = r.get("summary", "?")
            date = _fmt_date(r.get("date", ""))
            lines.append(f"  β€’ {summary} β€” {date}")
        msg = "\n".join(lines)
        if _send_telegram(msg, target=group_target):
            stats["sent"] += 1
        else:
            stats["failed"] += 1

    # 3. New action items β€” enrich with Clicker if URLs found
    created_actions = [a for a in result.get("actions_created", []) if a.get("status") == "CREATED"]
    if created_actions:
        lines = ["βœ… **New Action Items**"]
        for a in created_actions:
            summary = a.get("summary", "?")
            date = _fmt_date(a.get("date", ""))
            description = a.get("description", "") or ""

            # Check if the action item has a URL β€” if so, run the Clicker
            urls = _extract_urls(description)
            if urls:
                from family_assistant.clicker import click_url, build_slot_buttons, hash_url
                from family_assistant.slot_handler import cache_slots
                for url in urls[:1]:  # First URL only
                    click_result = click_url(url, context_summary=summary)
                    if click_result["status"] == "SLOTS_FOUND":
                        # Cache slots so callback handler can look them up on button tap
                        url_hash = cache_slots(url, click_result["slots"], hash_url(url))
                        # Send enriched message with slot buttons
                        slot_msg = click_result["message"]
                        slot_buttons = build_slot_buttons(click_result["slots"], url_hash)
                        if _send_telegram_with_buttons(slot_msg, slot_buttons, target=group_target):
                            stats["sent"] += 1
                        else:
                            stats["failed"] += 1
                        continue  # Skip plain-text line for this action
                    elif click_result["status"] == "FETCH_FAILED":
                        lines.append(f"  β€’ {summary} β€” {date}")
                        lines.append(f"    ⚠️ Signup page couldn't be loaded")
                    else:
                        # NO_SLOTS or other β€” just show the plain action
                        lines.append(f"  β€’ {summary} β€” {date}")
            else:
                lines.append(f"  β€’ {summary} β€” {date}")

        # Send any remaining plain-text action items
        if len(lines) > 1:
            msg = "\n".join(lines)
            if _send_telegram(msg, target=group_target):
                stats["sent"] += 1
            else:
                stats["failed"] += 1

    # 4. Conflict alerts β€” one message per conflict (urgent)
    conflicts = result.get("conflicts", [])
    for conflict in conflicts:
        msg = format_conflict_alert(conflict)
        if _send_telegram(msg, target=group_target):
            stats["sent"] += 1
        else:
            stats["failed"] += 1

    # 5. Newsletter digest β€” one message
    info_summary = result.get("info_summary", "")
    if info_summary:
        if _send_telegram(info_summary, target=group_target):
            stats["sent"] += 1
        else:
            stats["failed"] += 1

# ── Matt's DM Notifications (always sent, even in quiet mode) ──

# 6. Auto-filtered items β€” silent failure guardrail
auto_filtered = result.get("auto_filtered", [])
if auto_filtered:
    lines = ["πŸ”‡ **Auto-Filtered**"]
    for af in auto_filtered:
        lines.append(f"  β€’ {af['summary']} β€” {af['rule']}")
    lines.append("Reply 'restore <name>' to add it back.")
    msg = "\n".join(lines)
    if dm_target and _send_telegram(msg, target=dm_target):
        stats["sent"] += 1
    elif not dm_target:
        # No DM target configured β€” fall back to group in quiet mode
        print("  [Hermes] No TELEGRAM_DEV_ID set, auto-filter alert not sent", file=sys.stderr)

# 7. Errors β€” always to DM
errors = result.get("errors", [])
if errors:
    lines = ["❌ **Pipeline Errors**"]
    for e in errors:
        lines.append(f"  β€’ {e}")
    msg = "\n".join(lines)
    if dm_target:
        if _send_telegram(msg, target=dm_target):
            stats["sent"] += 1
        else:
            stats["failed"] += 1
    else:
        # Fall back to group if no DM target
        if _send_telegram(msg, target=group_target):
            stats["sent"] += 1
        else:
            stats["failed"] += 1

return stats

push_maintenance_alert β€” DISABLED during maintenance tracking purge (Phase 5β†’6).

Kept for reference; maintenance_sentinel.py module preserved for Phase 6.

See: /home/hoffmann_admin/.openclaw/workspace/scripts/family_assistant/maintenance_sentinel.py

def push_maintenance_alert(target=None, dedup=True):
"""DISABLED β€” maintenance tracking purged. Returns empty stats."""
return {"sent": 0, "failed": 0}