"""Hermes ā Telegram push notification agent.
Formats and delivers pipeline results as Telegram messages via the
OpenClaw CLI. No custom bot needed.
Routing (v1.0 ā Trust Over Silence):
Family Group (TELEGRAM_CHAT_ID):
- New appointments (visual confirmation that the system processed the email)
- New reminders and action items
- Conflict alerts with inline buttons
- Newsletter digest (info summary)
Matt's DM (TELEGRAM_DEV_ID):
- Auto-filtered items (shadow filter guardrail ā silent failure prevention)
- Pipeline errors (operational, not household-facing)
--quiet flag available for future use: suppresses family group notifications,
sends only DM alerts. Not the default until household trusts the system.
"""
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from zoneinfo import ZoneInfo
from icarus.core.config import CHICAGO_TZ
Telegram targets ā from env vars, with fallback defaults
DEFAULT_TELEGRAM_TARGET = os.environ.get("TELEGRAM_CHAT_ID", "-1000000000000")
Override for dev notifications (e.g., auto-filter alerts)
DEV_TELEGRAM_TARGET = os.environ.get("TELEGRAM_DEV_ID", "")
Bot token for direct Telegram API calls (faster than openclaw CLI)
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
import requests as _requests
def _send_telegram(message, target=None, silent=False):
"""Send a message via Telegram Bot API directly.
Falls back to openclaw CLI if bot token is not configured.
Uses direct API for speed ā the openclaw CLI was timing out.
"""
target = target or os.environ.get("TELEGRAM_TARGET", DEFAULT_TELEGRAM_TARGET)
# Primary: direct Telegram Bot API
if TELEGRAM_BOT_TOKEN:
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": target,
"text": message,
"parse_mode": "Markdown",
}
if silent:
payload["disable_notification"] = True
resp = _requests.post(url, json=payload, timeout=10)
if resp.status_code == 200 and resp.json().get("ok"):
return True
else:
print(f" [Hermes] Bot API error: {resp.status_code} {resp.text[:200]}", file=sys.stderr)
except Exception as e:
print(f" [Hermes] Bot API exception: {e}", file=sys.stderr)
# Fallback: openclaw CLI
cmd = [
"openclaw", "message", "send",
"--channel", "telegram",
"--target", target,
"--message", message,
]
if silent:
cmd.append("--silent")
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
print(f" [Hermes] CLI failed (exit {result.returncode}): {result.stderr.strip()}", file=sys.stderr)
return False
return True
except subprocess.TimeoutExpired:
print(" [Hermes] CLI timed out (15s)", file=sys.stderr)
return False
except Exception as e:
print(f" [Hermes] CLI error: {e}", file=sys.stderr)
return False
def _extract_urls(text):
"""Extract HTTP(S) URLs from text."""
if not text:
return []
urls = re.findall(r'https?://[^\s<>"\']+', text)
return [u.rstrip('.,;:)>]') for u in urls]
def _send_telegram_with_buttons(message, buttons, target=None):
"""Send a message with inline keyboard buttons via OpenClaw.
Args:
message: Text message to send
buttons: List of rows, each row is a list of dicts
[{"text": "Label", "callback_data": "data"}, ...]
target: Telegram target (defaults to TELEGRAM_CHAT_ID)
"""
target = target or os.environ.get("TELEGRAM_TARGET", DEFAULT_TELEGRAM_TARGET)
cmd = [
"openclaw", "message", "send",
"--channel", "telegram",
"--target", target,
"--message", message,
"--buttons", json.dumps(buttons),
]
# Primary: direct Telegram Bot API with inline keyboard
if TELEGRAM_BOT_TOKEN:
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
inline_keyboard = [[{"text": b.get("text", "?"), "callback_data": b.get("callback_data", "")} for b in row] for row in buttons]
payload = {
"chat_id": target,
"text": message,
"parse_mode": "Markdown",
"reply_markup": {"inline_keyboard": inline_keyboard},
}
resp = _requests.post(url, json=payload, timeout=10)
if resp.status_code == 200 and resp.json().get("ok"):
return True
else:
print(f" [Hermes] Button Bot API error: {resp.status_code} {resp.text[:200]}", file=sys.stderr)
except Exception as e:
print(f" [Hermes] Button Bot API exception: {e}", file=sys.stderr)
# Fallback: openclaw CLI
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
print(f" [Hermes] Button CLI failed (exit {result.returncode}): {result.stderr.strip()}", file=sys.stderr)
return False
return True
except subprocess.TimeoutExpired:
print(" [Hermes] Button CLI timed out (15s)", file=sys.stderr)
return False
except Exception as e:
print(f" [Hermes] Button CLI error: {e}", file=sys.stderr)
return False
---------------------------------------------------------------------------
Message Formatters
---------------------------------------------------------------------------
def _fmt_datetime(dt_str):
"""Format an ISO datetime string for human-readable display.
Includes the year if it differs from the current year.
"""
if not dt_str:
return "TBD"
try:
dt = datetime.fromisoformat(dt_str.replace("+00:00", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=CHICAGO_TZ)
dt = dt.astimezone(CHICAGO_TZ)
fmt = "%a %b %d, %-I:%M %p"
if dt.year != datetime.now(CHICAGO_TZ).year:
fmt = "%a %b %d %Y, %-I:%M %p"
return dt.strftime(fmt)
except (ValueError, TypeError):
return dt_str
def _fmt_date(d_str):
"""Format a date string for display.
Includes the year if it differs from the current year.
"""
if not d_str:
return "TBD"
try:
dt = datetime.fromisoformat(d_str)
fmt = "%a %b %d"
if dt.year != datetime.now(CHICAGO_TZ).year:
fmt = "%a %b %d %Y"
return dt.strftime(fmt)
except (ValueError, TypeError):
return d_str
def _check_day_mismatch(event_dict):
"""Check if the event's day-of-week might not match the actual date.
This catches cases where the LLM parsed a date from an email that
said e.g. "Monday, April 21" but April 21 is actually a Tuesday.
Returns a warning string or empty string.
"""
start_str = event_dict.get("start", {})
if isinstance(start_str, dict):
start_str = start_str.get("dateTime", start_str.get("date", ""))
if not start_str or not isinstance(start_str, str):
return ""
try:
dt = datetime.fromisoformat(start_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=CHICAGO_TZ)
dt = dt.astimezone(CHICAGO_TZ)
actual_day = dt.strftime("%A")
# Check if a different day name appears in the original event context
# We can't re-parse the email here, but we can store the LLM's
# original day-of-week hint in the event dict if provided
claimed_day = event_dict.get("_claimed_day_of_week", "")
if claimed_day and claimed_day.lower() != actual_day.lower():
return f"ā ļø Day mismatch: email said {claimed_day} but {dt.strftime('%b %d')} is actually {actual_day}"
return ""
except (ValueError, TypeError):
return ""
def hermes_notify(event, email_subject="", target=None):
"""Send a Hoffdesk success notification for a calendar event write.
Called every time pipeline successfully creates an event on Radicale.
Includes day-of-week mismatch warning if the email's claimed day
doesn't match the actual calendar date.
Args:
event: Event dict with summary, start, end, location, etc.
May include '_claimed_day_of_week' from LLM extraction.
email_subject: Original email subject for context.
target: Telegram target (defaults to family group).
Returns:
True if sent successfully, False otherwise.
"""
group_target = target or DEFAULT_TELEGRAM_TARGET
summary = event.get("summary", "Unknown Event")
start_raw = event.get("start", "")
end_raw = event.get("end", "")
location = event.get("location", "")
# Format date and time
date_str = _fmt_datetime(start_raw) if start_raw else "TBD"
# Extract just the time portion for the Time line
time_str = ""
try:
start_dt = datetime.fromisoformat(start_raw.replace("+00:00", "+00:00")) if isinstance(start_raw, str) else None
if start_dt:
if start_dt.tzinfo is None:
start_dt = start_dt.replace(tzinfo=CHICAGO_TZ)
start_dt = start_dt.astimezone(CHICAGO_TZ)
time_str = start_dt.strftime("%-I:%M %p")
# Add end time if available
end_dt = None
if isinstance(end_raw, str) and end_raw:
end_dt = datetime.fromisoformat(end_raw.replace("+00:00", "+00:00"))
if end_dt.tzinfo is None:
end_dt = end_dt.replace(tzinfo=CHICAGO_TZ)
end_dt = end_dt.astimezone(CHICAGO_TZ)
if end_dt:
time_str += f" ā {end_dt.strftime('%-I:%M %p')}"
except (ValueError, TypeError):
time_str = date_str # Fallback to full datetime
# Build message
lines = [
"ā
**Hoffdesk Success**",
f"Event: {summary}",
f"Date: {date_str}",
]
if time_str and time_str != date_str:
lines.append(f"Time: {time_str}")
# Location with travel enrichment
if location:
try:
from icarus.core.location_cache import resolve, format_travel_info
loc_result = resolve(location, use_home_bias=True)
if loc_result:
travel = format_travel_info(loc_result)
lines.append(f"š {travel}" if travel else f"š {location}")
else:
lines.append(f"š {location}")
except Exception:
lines.append(f"š {location}")
# Day-of-week mismatch warning
day_warning = _check_day_mismatch(event)
if day_warning:
lines.append(f"\n{day_warning}")
msg = "\n".join(lines)
return _send_telegram(msg, target=group_target)
"""Format a single new event for Telegram notification.
Enriches location strings with travel time from home when possible.
"""
summary = event.get("summary", "Unknown Event")
start = _fmt_datetime(event.get("start", ""))
location = event.get("location", "")
msg = f"š
**{summary}**\n{start}"
if location:
# Enrich location with travel time from cache
try:
from icarus.core.location_cache import resolve, format_travel_info
loc_result = resolve(location, use_home_bias=True)
if loc_result:
travel_info = format_travel_info(loc_result)
if travel_info:
# Use the resolved name + travel time if available
msg += f"\nš {travel_info}"
else:
msg += f"\nš {location}"
else:
msg += f"\nš {location}"
except Exception:
# If location resolution fails, just use the raw location string
msg += f"\nš {location}"
return msg
def format_reminder_notification(reminder):
"""Format a single reminder for Telegram notification."""
summary = reminder.get("summary", "Unknown Reminder")
date = _fmt_date(reminder.get("date", ""))
msg = f"š {summary}\n{date}"
return msg
def format_action_notification(action):
"""Format a single action item for Telegram notification."""
summary = action.get("summary", "Unknown Action")
date = _fmt_date(action.get("date", ""))
msg = f"ā
{summary}"
if date and date != "TBD":
msg += f"\nDue: {date}"
return msg
def format_conflict_alert(conflict, resolution=None):
"""Format a scheduling conflict for Telegram notification with inline buttons."""
e1 = conflict.get("event1", {})
e2 = conflict.get("event2", {})
overlap = conflict.get("overlap_minutes", 0)
msg = "ā ļø Scheduling Conflict\n\n"
msg += f"š
{e1.get('summary', '?')}\n"
msg += f"š
{e2.get('summary', '?')}\n"
msg += f"ā±ļø {overlap}min overlap"
return msg
def format_error_alert(error_msg):
"""Format a pipeline error for Telegram notification."""
return f"ā Pipeline Error\n{error_msg}"
def format_info_summary(info_items, low_relevance_items=None, source_subject="",
events_created=None, reminders_created=None, actions_created=None):
"""Format newsletter digest with full visibility into what was actioned.
Shows events, reminders, and action items that were created, plus info items
and low-relevance items. This gives the family full visibility into what
the assistant extracted and scheduled.
"""
lines = []
if source_subject:
lines.append(f"š¬ **{source_subject}**\n")
# Section 1: Events Created (Calendar)
events_created = events_created or []
# Include both CREATED and DRY_RUN status for visibility
created_events = [e for e in events_created if e.get("status") in ("CREATED", "DRY_RUN")]
if created_events:
dry_run_prefix = "Would be " if any(e.get("status") == "DRY_RUN" for e in created_events) else ""
lines.append(f"**š
{dry_run_prefix}Events Added to Calendar ({len(created_events)}):**")
for evt in created_events:
summary = evt.get("summary", "?")
start = evt.get("start", "")
date_str = _fmt_date(start) if start else "?"
line = f" ⢠{summary} ā {date_str}"
if evt.get("location"):
line += f" @ {evt['location']}"
lines.append(line)
lines.append("")
# Section 2: Reminders Created
reminders_created = reminders_created or []
# Include both CREATED and DRY_RUN status for visibility
created_reminders = [r for r in reminders_created if r.get("status") in ("CREATED", "DRY_RUN")]
if created_reminders:
dry_run_prefix = "Would be " if any(r.get("status") == "DRY_RUN" for r in created_reminders) else ""
lines.append(f"**ā° {dry_run_prefix}Reminders Set ({len(created_reminders)}):**")
for rem in created_reminders:
summary = rem.get("summary", "?")
date_str = _fmt_date(rem.get("date", ""))
lines.append(f" ⢠{summary} ā {date_str}")
lines.append("")
# Section 3: Action Items Created
actions_created = actions_created or []
# Include both CREATED and DRY_RUN status for visibility
created_actions = [a for a in actions_created if a.get("status") in ("CREATED", "DRY_RUN")]
if created_actions:
dry_run_prefix = "Would be " if any(a.get("status") == "DRY_RUN" for a in created_actions) else ""
lines.append(f"**ā
{dry_run_prefix}Action Items ({len(created_actions)}):**")
for act in created_actions:
summary = act.get("summary", "?")
date_str = _fmt_date(act.get("date", ""))
desc = act.get("description", "")
line = f" ⢠{summary}"
if date_str:
line += f" ā due {date_str}"
if desc and len(desc) < 80:
line += f"\n _{desc[:80]}_"
lines.append(line)
lines.append("")
# Section 4: Info Items (High relevance, no calendar action)
if info_items:
lines.append(f"**š Info ({len(info_items)} items):**")
for item in info_items:
who = item.get("who", "")
desc = item.get("description", "")
summary = item.get("summary", "")
line = f" ⢠{summary}"
if who:
line += f" ({who})"
if desc and desc != summary and len(desc) < 60:
line += f" ā {desc}"
lines.append(line)
lines.append("")
# Section 5: Low Relevance Items (Not added to calendar)
if low_relevance_items:
lines.append(f"**š Low Relevance ({len(low_relevance_items)} items ā not added):**")
for item in low_relevance_items:
who = item.get("who", "")
summary = item.get("summary", "")
reason = item.get("reason", "")
line = f" ⢠{summary}"
if who:
line += f" ({who})"
if reason:
short_reason = reason[:60] + "..." if len(reason) > 60 else reason
line += f" ā {short_reason}"
lines.append(line)
lines.append("")
# Footer: Nothing found case
if not any([created_events, created_reminders, created_actions, info_items, low_relevance_items]):
lines.append("_No actionable items found in this newsletter._")
return "\n".join(lines).strip()
---------------------------------------------------------------------------
Push Pipeline Results ā Main Entry Point
---------------------------------------------------------------------------
def push_pipeline_results(result, quiet=False, target=None):
"""Take a process_emails() result dict and push all notifications.
Routing (v1.0 ā Trust Over Silence):
Family Group (TELEGRAM_CHAT_ID):
- New appointments, reminders, action items
- Conflict alerts with inline buttons
- Newsletter digest
Matt's DM (TELEGRAM_DEV_ID):
- Auto-filtered items (shadow filter guardrail)
- Pipeline errors
--quiet: suppress family group notifications, only send DM alerts.
Not the default until household trusts the system.
Returns a dict with counts of messages sent/failed.
"""
stats = {"sent": 0, "failed": 0}
group_target = target or DEFAULT_TELEGRAM_TARGET
dm_target = DEV_TELEGRAM_TARGET
if not quiet:
# āā Family Group Notifications āā
# 1. New events ā SKIPPED here; hermes_notify() handles each event
# individually with the "ā
Hoffdesk Success" format + day-of-week
# mismatch detection. No duplicate batch message.
# 2. New reminders ā batch into one message
created_reminders = [r for r in result.get("reminders_created", []) if r.get("status") == "CREATED"]
if created_reminders:
lines = ["š **New Reminders**"]
for r in created_reminders:
summary = r.get("summary", "?")
date = _fmt_date(r.get("date", ""))
lines.append(f" ⢠{summary} ā {date}")
msg = "\n".join(lines)
if _send_telegram(msg, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
# 3. New action items ā enrich with Clicker if URLs found
created_actions = [a for a in result.get("actions_created", []) if a.get("status") == "CREATED"]
if created_actions:
lines = ["ā
**New Action Items**"]
for a in created_actions:
summary = a.get("summary", "?")
date = _fmt_date(a.get("date", ""))
description = a.get("description", "") or ""
# Check if the action item has a URL ā if so, run the Clicker
urls = _extract_urls(description)
if urls:
from icarus.core.clicker import click_url, build_slot_buttons, hash_url
from icarus.core.slot_handler import cache_slots
for url in urls[:1]: # First URL only
click_result = click_url(url, context_summary=summary)
if click_result["status"] == "SLOTS_FOUND":
# Cache slots so callback handler can look them up on button tap
url_hash = cache_slots(url, click_result["slots"], hash_url(url))
# Send enriched message with slot buttons
slot_msg = click_result["message"]
slot_buttons = build_slot_buttons(click_result["slots"], url_hash)
if _send_telegram_with_buttons(slot_msg, slot_buttons, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
continue # Skip plain-text line for this action
elif click_result["status"] == "FETCH_FAILED":
lines.append(f" ⢠{summary} ā {date}")
lines.append(f" ā ļø Signup page couldn't be loaded")
else:
# NO_SLOTS or other ā just show the plain action
lines.append(f" ⢠{summary} ā {date}")
else:
lines.append(f" ⢠{summary} ā {date}")
# Send any remaining plain-text action items
if len(lines) > 1:
msg = "\n".join(lines)
if _send_telegram(msg, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
# 4. Conflict alerts ā one message per conflict (urgent)
conflicts = result.get("conflicts", [])
for conflict in conflicts:
msg = format_conflict_alert(conflict)
if _send_telegram(msg, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
# 5. Newsletter digest ā one message
info_summary = result.get("info_summary", "")
if info_summary:
if _send_telegram(info_summary, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
# āā Matt's DM Notifications (always sent, even in quiet mode) āā
# 6. Auto-filtered items ā silent failure guardrail
auto_filtered = result.get("auto_filtered", [])
if auto_filtered:
lines = ["š **Auto-Filtered**"]
for af in auto_filtered:
lines.append(f" ⢠{af['summary']} ā {af['rule']}")
lines.append("Reply 'restore <name>' to add it back.")
msg = "\n".join(lines)
if dm_target and _send_telegram(msg, target=dm_target):
stats["sent"] += 1
elif not dm_target:
# No DM target configured ā fall back to group in quiet mode
print(" [Hermes] No TELEGRAM_DEV_ID set, auto-filter alert not sent", file=sys.stderr)
# 7. Errors ā always to DM
errors = result.get("errors", [])
if errors:
lines = ["ā **Pipeline Errors**"]
for e in errors:
lines.append(f" ⢠{e}")
msg = "\n".join(lines)
if dm_target:
if _send_telegram(msg, target=dm_target):
stats["sent"] += 1
else:
stats["failed"] += 1
else:
# Fall back to group if no DM target
if _send_telegram(msg, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
return stats
def push_maintenance_alert(target=None, dedup=True):
"""Check maintenance items and post alerts for due/overdue items.
Called by the heartbeat and Daily Brief. Sends to family group
with inline Done buttons.
Args:
target: Override target chat ID (defaults to family group).
dedup: If True, only alert for items not previously notified
(prevents spam on 30-min heartbeat). Daily Brief should
set dedup=False to always show current state.
Returns:
Dict with sent/failed stats.
"""
from icarus.core.maintenance_sentinel import (
check_maintenance,
format_maintenance_brief,
build_maintenance_buttons,
filter_new_alerts,
)
group_target = target or os.environ.get("TELEGRAM_CHAT_ID", "")
stats = {"sent": 0, "failed": 0}
result = check_maintenance()
# Dedup: only alert for items we haven't notified about yet
if dedup:
result = filter_new_alerts(result)
brief = format_maintenance_brief(result)
buttons = build_maintenance_buttons(result)
if not brief:
return stats # Nothing due (or already alerted), nothing to send
# Send with inline Done buttons if items are due now
if buttons:
if _send_telegram_with_buttons(brief, buttons, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
else:
# Only upcoming items (no Done buttons needed)
if _send_telegram(brief, target=group_target):
stats["sent"] += 1
else:
stats["failed"] += 1
return stats