"""Telegram webhook handler and bot API client."""
import os
import io
import re
import logging
from pathlib import Path
from fastapi import Request, HTTPException
import httpx
from icarus.core.config.staging import (
TELEGRAM_BOT_TOKEN,
TELEGRAM_ALLOWED_USERS,
OLLAMA_BASE_URL
)
from icarus.core.vision.pipeline import process_attachment
from icarus.core.handlers.recipe_toggle import (
handle_toggle,
handle_commit,
handle_cancel,
handle_recipe_url,
handle_groceries_command,
handle_clear_groceries,
)
from icarus.core.handlers.calendar_action import (
handle_calendar_callback,
build_action_buttons,
)
from icarus.core.db.documents import store_briefing_event
# Known recipe domains — URLs from these are always treated as recipe URLs.
# URLs from unknown domains are still attempted if they look recipe-like
# (contain /recipe/ in the path) or if no command/text match is found.
RECIPE_DOMAINS = [
    "allrecipes.com",
    "budgetbytes.com",
    "seriouseats.com",
    "foodnetwork.com",
    "cooking.nytimes.com",
    "bonappetit.com",
    "epicurious.com",
    "smittenkitchen.com",
    "thestayathomechef.com",
    "pinchofyum.com",
    "cookieandkate.com",
    "minimalistbaker.com",
    "thewoksoflife.com",
    "177milkstreet.com",
    "americastestkitchen.com",
    "delish.com",
    "tasty.co",
    "kitchn.com",
    "recipetineats.com",
    "halfbakedharvest.com",
    "damndelicious.net",
    "therecipecritic.com",
    "spendwithpennies.com",
    "joshuaweissman.com",
    "grilledcheesesocial.com",
]

# Pattern for URLs that look like recipe pages even on unknown domains
RECIPE_URL_PATTERN = re.compile(r'https?://[^/]+/recipe/', re.IGNORECASE)

# Any http(s) URL: runs until whitespace or a character that cannot appear
# unescaped in a URL (< > " { } | \ ^ ` [ ]). The brackets and backslash must
# be escaped inside the character class; otherwise the class closes at the
# first "]" and the pattern never matches a real URL.
URL_PATTERN = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
# ---------------------------------------------------------------------------
# Bot API proxy — thin wrapper over Telegram Bot API for recipe_toggle handlers
# ---------------------------------------------------------------------------
class _BotProxy:
    """Provides the bot API surface expected by recipe_toggle handlers.

    Wraps send_message, edit_message_text, and answer_callback_query
    using direct HTTP calls to the Telegram Bot API. Each method returns
    the decoded JSON response on success and ``{"ok": False, "error": ...}``
    on failure; exceptions are logged, never propagated.
    """

    @staticmethod
    def _method_url(method: str) -> str:
        """Return the Bot API endpoint URL for the given API method name."""
        return f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}"

    async def send_message(self, chat_id: int, text: str, reply_markup=None, parse_mode="unset") -> dict:
        """Send a text message, retrying as plain text if formatting is rejected.

        Args:
            chat_id: Target chat.
            text: Message body; truncated to 4000 characters.
            reply_markup: Optional keyboard dict, forwarded as-is when truthy.
            parse_mode: ``"unset"`` (default) uses MarkdownV2 only when a
                reply_markup is present; ``"none"``, ``""`` or ``None`` sends
                plain text; any other value is passed through as parse_mode.
        """
        payload = {
            "chat_id": chat_id,
            "text": text[:4000],
        }
        if reply_markup:
            payload["reply_markup"] = reply_markup
        if parse_mode != "unset":
            # Explicitly set ("none"/empty = plain text, or a real format)
            if parse_mode and parse_mode != "none":
                payload["parse_mode"] = parse_mode
        elif reply_markup:
            # Default: MarkdownV2 when keyboard is present (recipe toggle UI)
            payload["parse_mode"] = "MarkdownV2"

        url = self._method_url("sendMessage")
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(url, json=payload)
                # Only retry when a parse_mode was actually sent; re-posting
                # an identical plain-text payload cannot succeed.
                if resp.status_code == 400 and "parse_mode" in payload:
                    logging.warning(
                        "send_message with parse_mode=%s failed: %s",
                        payload["parse_mode"],
                        resp.text[:300],
                    )
                    # Retry without parse_mode — keep reply_markup if present
                    fallback_payload = {
                        key: value for key, value in payload.items() if key != "parse_mode"
                    }
                    resp2 = await client.post(url, json=fallback_payload)
                    if resp2.status_code == 200:
                        return resp2.json()
                    logging.error("send_message fallback also failed: %s", resp2.text[:300])
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            logging.error("send_message failed: %s", e)
            return {"ok": False, "error": str(e)}

    async def edit_message_text(
        self,
        chat_id: int,
        message_id: int,
        text: str,
        reply_markup=None,
    ) -> dict:
        """Replace the text (and inline keyboard) of an existing message.

        The edit is first attempted with MarkdownV2. A 400 reporting
        "message is not modified" is treated as success; any other 400 is
        retried once as plain text so that unescaped text is not silently
        dropped.

        Args:
            chat_id: Chat containing the message.
            message_id: Message to edit.
            text: New body; truncated to 4000 characters.
            reply_markup: New inline keyboard dict, or None to clear it.
        """
        payload = {
            "chat_id": chat_id,
            "message_id": message_id,
            "text": text[:4000],
            "parse_mode": "MarkdownV2",
        }
        if reply_markup is None:
            # Send an empty inline_keyboard to explicitly remove the buttons.
            payload["reply_markup"] = {"inline_keyboard": []}
        elif reply_markup:
            payload["reply_markup"] = reply_markup

        url = self._method_url("editMessageText")
        not_modified = "message is not modified"
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(url, json=payload)
                if resp.status_code == 400:
                    if not_modified in resp.text:
                        # Content and markup already match — safe to ignore
                        logging.debug("edit_message_text 400: %s", resp.text[:200])
                        return {"ok": True, "description": "not modified"}
                    logging.warning("edit_message_text MarkdownV2 failed: %s", resp.text[:300])
                    fallback_payload = {
                        key: value for key, value in payload.items() if key != "parse_mode"
                    }
                    resp2 = await client.post(url, json=fallback_payload)
                    if resp2.status_code == 200:
                        return resp2.json()
                    if resp2.status_code == 400 and not_modified in resp2.text:
                        return {"ok": True, "description": "not modified"}
                    logging.error("edit_message_text fallback also failed: %s", resp2.text[:300])
                    return {"ok": False, "error": resp2.text[:300]}
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            logging.warning("edit_message_text failed: %s", e)
            return {"ok": False, "error": str(e)}

    async def answer_callback_query(self, callback_query_id: str, text: str = "") -> dict:
        """Acknowledge a callback query, optionally with a short notice.

        Args:
            callback_query_id: ID of the callback query being answered.
            text: Optional notification text; truncated to 200 characters
                and omitted from the request when empty.
        """
        payload = {
            "callback_query_id": callback_query_id,
        }
        if text:
            payload["text"] = text[:200]
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(self._method_url("answerCallbackQuery"), json=payload)
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            logging.warning("answer_callback_query failed: %s", e)
            return {"ok": False, "error": str(e)}
# ---------------------------------------------------------------------------
# Update processing (used by both webhook and polling)
# ---------------------------------------------------------------------------
async def process_update(update: dict) -> None:
    """Route a single Telegram update dict to the matching handler.

    Callback queries (inline keyboard presses) are dispatched on their
    ``data`` value; messages are dispatched as photo, document, recipe URL,
    or text command, in that order. Senders whose ID is not in
    TELEGRAM_ALLOWED_USERS are ignored without a reply.

    Note: despite the ``None`` annotation, a few early exits (unauthorised
    callback sender, missing chat id, photo and document paths) return
    ``{"ok": True}``; every other path returns ``None``.
    """
    logging.info("HANDLER: raw keys=%s", list(update.keys()))

    # ---- Inline keyboard callbacks ----
    if "callback_query" in update:
        callback = update["callback_query"]
        sender_id = str(callback.get("from", {}).get("id", ""))
        if sender_id not in TELEGRAM_ALLOWED_USERS:
            return {"ok": True}

        data = callback.get("data", "")
        bot = _BotProxy()
        prefix_routes = (
            ("toggle:", handle_toggle),
            ("commit:", handle_commit),
            ("cancel:", handle_cancel),
            ("calendar_add|", handle_calendar_callback),
            ("calendar_done|", handle_calendar_callback),
            ("create_event:", _handle_create_event_callback),
            ("skip_event:", _handle_skip_event_callback),
        )
        if data == "clear:groceries":
            handler = handle_clear_groceries
        else:
            handler = next(
                (fn for prefix, fn in prefix_routes if data.startswith(prefix)),
                None,
            )

        if handler is None:
            await bot.answer_callback_query(callback["id"], "Unknown action")
        else:
            await handler(callback, bot)
        return

    # ---- Plain messages ----
    message = update.get("message", {})
    user_id = str(message.get("from", {}).get("id", ""))
    logging.info("HANDLER: message present=%s, user_id=%r, allowed=%s", "message" in update, user_id, user_id in TELEGRAM_ALLOWED_USERS)
    if user_id not in TELEGRAM_ALLOWED_USERS:
        return  # Silent ignore

    chat_id = message.get("chat", {}).get("id")
    if not chat_id:
        return {"ok": True}

    # Photos arrive as a list of sizes; the last entry is used (largest size).
    if "photo" in message:
        largest = message["photo"][-1]
        await _process_file(largest["file_id"], chat_id, "image.jpg", user_id)
        return {"ok": True}

    # Documents (PDF, etc.) are type-checked before processing.
    if "document" in message:
        doc = message["document"]
        file_id = doc["file_id"]
        file_name = doc.get("file_name", "document.pdf")
        mime_type = doc.get("mime_type", "")
        if _is_supported_file(file_name, mime_type):
            await _process_file(file_id, chat_id, file_name, user_id)
        else:
            await _send_message(
                chat_id,
                "📄 Please send PDF or image files only (PNG, JPG)."
            )
        return {"ok": True}

    if "text" not in message:
        return

    text = message["text"]
    logging.info("TEXT handler: text=%r, len=%d", text[:200], len(text))

    # URLs take priority over commands. Preference order: first URL on a known
    # recipe domain or with /recipe/ in it; otherwise simply the first URL —
    # many recipe pages have no /recipe/ segment and the recipe handler is
    # expected to cope with pages that are not recipes.
    urls = URL_PATTERN.findall(text)
    logging.info("TEXT handler: URLs found=%r", urls)
    recipe_url = None
    for candidate in urls:
        if any(domain in candidate for domain in RECIPE_DOMAINS):
            logging.info("TEXT handler: matched known domain for %r", candidate)
        elif RECIPE_URL_PATTERN.search(candidate):
            logging.info("TEXT handler: matched recipe-like URL for %r", candidate)
        else:
            continue
        recipe_url = candidate
        break
    else:
        if urls:
            recipe_url = urls[0]
            logging.info("TEXT handler: trying first URL as recipe candidate: %r", recipe_url)
    logging.info("TEXT handler: recipe_url=%r", recipe_url)

    if recipe_url:
        logging.info("TEXT handler: Calling handle_recipe_url with url=%r", recipe_url)
        try:
            await handle_recipe_url(message, _BotProxy(), url=recipe_url)
        except Exception as e:
            logging.exception("TEXT handler: handle_recipe_url raised: %s", e)
            await _send_message(chat_id, f"❌ Recipe handler error: {e}")
        return

    # ---- Text commands ----
    if text == "/start":
        await _send_message(
            chat_id,
            "👋 Send me a document, photo, or recipe URL and I'll help you out!"
        )
        return

    if text == "/status":
        await _send_message(chat_id, await _get_status())
        return

    if text == "/groceries" or text.startswith("/groceries "):
        args = text.strip().split()[1:]
        if args and args[0].lower() == "clear":
            await handle_clear_groceries(message, _BotProxy())
        else:
            await handle_groceries_command(message, _BotProxy())
        return

    await _send_message(
        chat_id,
        "📎 Send a PDF, image, or recipe URL to get started."
    )
# ---------------------------------------------------------------------------
# Webhook handler (legacy — kept for backward compatibility)
# ---------------------------------------------------------------------------
async def handle_telegram_update(request: Request) -> dict:
    """Handle an incoming Telegram webhook request.

    Parses the JSON body and hands it to process_update(), which routes
    message updates and callback queries (inline keyboard).

    Args:
        request: The incoming webhook request.

    Returns:
        {"ok": True} to acknowledge receipt.

    Raises:
        HTTPException: 400 when the request body is not valid JSON.
    """
    try:
        data = await request.json()
    except Exception as e:
        logging.warning("Failed to parse webhook JSON: %s", e)
        # Chain the cause so the original parse error stays in the traceback.
        raise HTTPException(status_code=400, detail="Invalid JSON body") from e
    await process_update(data)
    return {"ok": True}
def _is_supported_file(file_name: str, mime_type: str) -> bool:
"""Check if file type is supported."""
supported_ext = [".pdf", ".png", ".jpg", ".jpeg"]
supported_mime = [
"application/pdf",
"image/png",
"image/jpeg",
"image/jpg"
]
has_supported_ext = any(file_name.lower().endswith(ext) for ext in supported_ext)
has_supported_mime = mime_type in supported_mime
return has_supported_ext or has_supported_mime
async def _process_file(file_id: str, chat_id: int, file_name: str, user_id: str):
    """Download a Telegram file, run it through the vision pipeline, and reply.

    Sends a "processing" notice first, then the formatted briefing (with
    calendar action buttons when applicable). Any failure is logged with its
    traceback and reported to the chat as a short error message.

    Args:
        file_id: Telegram file identifier to download.
        chat_id: Chat that receives the progress and result messages.
        file_name: Name handed to the pipeline; also used as the subject.
        user_id: Sender ID, recorded as ``telegram:<user_id>`` in the metadata.
    """
    await _send_message(chat_id, "⏳ Processing document...")
    try:
        file_bytes = await _download_telegram_file(file_id)

        # The pipeline takes email-shaped metadata, so describe the upload as one.
        email_meta = {
            "from": f"telegram:{user_id}",
            "subject": file_name,
            "date": "now",
            "to": "family",
            "body": ""
        }
        result = await process_attachment(email_meta, file_bytes, file_name)

        # Diagnostics must not assume a dict: _format_briefing is written to
        # tolerate other result types and reports them to the user itself.
        is_dict = isinstance(result, dict)
        logging.debug(
            "Vision pipeline result type=%s keys=%s",
            type(result).__name__,
            list(result) if is_dict else None,
        )

        text = _format_briefing(result)
        # Calendar buttons only make sense for a structured (dict) result.
        reply_markup = _build_briefing_buttons(result, file_name) if is_dict else None
        await _send_message(chat_id, text, markdown=True, reply_markup=reply_markup)
    except Exception as e:
        logging.exception("Processing error for %r: %s", file_name, e)
        await _send_message(
            chat_id,
            f"❌ Processing failed: {str(e)[:200]}"
        )
async def _download_telegram_file(file_id: str) -> bytes:
    """Fetch the raw bytes of a Telegram file.

    Two requests are made on one client: ``getFile`` to resolve the
    server-side path, then a download of that path.

    Raises:
        ValueError: when the getFile response does not have a truthy ``ok``.
        httpx.HTTPStatusError: when either request returns an error status.
    """
    async with httpx.AsyncClient() as http:
        # Step 1: resolve the file_id to a download path.
        lookup = await http.get(
            f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile",
            params={"file_id": file_id}
        )
        lookup.raise_for_status()
        result = lookup.json()
        if not result.get("ok"):
            raise ValueError(f"Failed to get file: {result}")

        # Step 2: download the content from that path.
        file_path = result["result"]["file_path"]
        download = await http.get(
            f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
        )
        download.raise_for_status()
        return download.content
async def _send_message(chat_id: int, text: str, markdown: bool = False, reply_markup: dict | None = None) -> dict:
    """Send message via Telegram Bot API.

    A 400 response to a Markdown message is retried once as plain text.
    Failures are logged and reported in the return value, never raised.

    Args:
        chat_id: Telegram chat ID
        text: Message text (truncated to 4000 chars)
        markdown: Whether to use Markdown parse mode
        reply_markup: Optional inline keyboard dict

    Returns:
        The decoded API response on success, otherwise
        ``{"ok": False, "error": <details>}``.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": text[:4000]  # Telegram limit
    }
    if markdown:
        payload["parse_mode"] = "Markdown"
    if reply_markup:
        payload["reply_markup"] = reply_markup

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, json=payload)
            if resp.status_code == 400:
                # Bad request - log but don't crash
                logging.warning("Telegram 400 error: %s", resp.text)
                error_text = resp.text
                # Retry without markdown in case it was a parse issue
                if markdown:
                    fallback_payload = {
                        key: value for key, value in payload.items() if key != "parse_mode"
                    }
                    resp2 = await client.post(url, json=fallback_payload)
                    if resp2.status_code == 200:
                        return resp2.json()
                    logging.error("Telegram fallback also failed: %s", resp2.text[:300])
                    # Report the fallback's error, not the first attempt's.
                    error_text = resp2.text
                return {"ok": False, "error": error_text}
            resp.raise_for_status()
            return resp.json()
    except Exception as e:
        logging.error("Failed to send message: %s", e)
        return {"ok": False, "error": str(e)}
def _format_briefing(result: dict) -> str:
"""Format briefing card for Telegram (Markdown)."""
# Handle different result structures
if not isinstance(result, dict):
return "❌ Invalid briefing result (not a dict)"
briefing = result.get("briefing", {})
# If briefing is not a dict, try to use result directly (flat structure)
if not isinstance(briefing, dict):
briefing = result
# Extract fields with fallbacks
title = briefing.get("title") or briefing.get("Title", "Document Briefing")
summary = briefing.get("summary") or briefing.get("Summary", "No summary available.")
key_details = briefing.get("key_details") or briefing.get("keyDetails", {})
if not isinstance(key_details, dict):
key_details = {}
conflicts = briefing.get("conflicts", [])
if not isinstance(conflicts, list):
conflicts = []
actions = briefing.get("suggested_actions") or briefing.get("suggestedActions", [])
if not isinstance(actions, list):
actions = []
confidence = briefing.get("confidence", 0.5)
lines = [
f"📋 *{title}*",
"",
summary,
""
]
# Key details
if key_details:
lines.append("*Key Details:*")
for key, value in key_details.items():
if value:
lines.append(f"• {key.replace('_', ' ').title()}: {value}")
lines.append("")
# Conflicts
if conflicts:
lines.append("⚠️ *Conflicts:*")
for conflict in conflicts:
lines.append(f"• {conflict}")
lines.append("")
# Suggested actions
if actions:
lines.append("✅ *Suggested Actions:*")
for action in actions:
lines.append(f"• {action}")
# Confidence
if confidence:
lines.append(f"")
lines.append(f"_Confidence: {int(float(confidence) * 100)}%_")
return "\n".join(lines)
def _build_briefing_buttons(result: dict, filename: str = "") -> dict | None:
"""Build inline keyboard buttons for a briefing card if calendar-applicable.
Stores event data in the documents DB so it can be retrieved when
the user taps "Add to Calendar".
Returns:
Telegram ReplyMarkupKeyboard dict, or None if no buttons applicable.
"""
briefing = result.get("briefing", {})
if not isinstance(briefing, dict):
briefing = result
action_info = briefing.get("action_buttons")
if not action_info or not isinstance(action_info, dict):
return None
# Only show buttons for calendar-applicable events
if not action_info.get("can_add_to_calendar"):
return None
# Extract event details from the briefing
summary = action_info.get("event_summary", briefing.get("title", "Event"))
start_time = action_info.get("event_start", "")
end_time = action_info.get("event_end", "")
location = action_info.get("event_location", "")
category = action_info.get("category", "event")
# Get additional details from the briefing
key_details = briefing.get("key_details", {})
if not isinstance(key_details, dict):
key_details = {}
description = briefing.get("summary", "")
if key_details.get("deadline"):
description += f"\nDeadline: {key_details['deadline']}"
if key_details.get("contact"):
description += f"\nContact: {key_details['contact']}"
if key_details.get("requirements"):
description += f"\nRequirements: {key_details['requirements']}"
# Try to parse/construct start/end times
# If key_details has individual date/time fields, combine them
date_val = key_details.get("date", "") or key_details.get("Date", "")
time_val = key_details.get("time", "") or key_details.get("Time", "")
if not start_time and date_val:
start_time = f"{date_val} {time_val}" if time_val else date_val
elif not start_time:
# No date/time available — can't add to calendar meaningfully
return None
# Store the event in the documents DB for later retrieval
try:
store_result = store_briefing_event(
summary=summary,
start_time=start_time,
end_time=end_time,
location=location if isinstance(location, str) else str(location),
description=description[:500], # Limit description length
category=category,
who=[], # Populated from family inference if available
source_filename=filename,
full_briefing=briefing,
)
doc_id = store_result["doc_id"]
event_hash = store_result["event_hash"]
except Exception as e:
print(f"Failed to store briefing event: {e}")
return None
# Build and return the action buttons
return build_action_buttons(doc_id, event_hash, has_datetime=True)
# ---------------------------------------------------------------------------
# Shadow Bot admin confirmation callbacks (create_event / skip_event)
# ---------------------------------------------------------------------------
async def _handle_create_event_callback(callback: dict, bot: _BotProxy) -> None:
    """Handle admin confirmation to create a calendar event from a NO_MATCH extraction.

    Callback data format: create_event:{extraction_id}

    Looks the extraction up in the shadow database, creates the event via
    CalendarValidator, records the outcome on the extraction, and edits the
    originating message to show progress and the final result. Malformed
    callback data is answered with "Invalid callback data".

    Args:
        callback: Telegram callback query dict.
        bot: Bot API proxy used to answer the query and edit the message.
    """
    cb_data = callback.get("data", "")
    parts = cb_data.split(":")
    try:
        extraction_id = int(parts[1])
    except (IndexError, ValueError):
        # Missing or non-numeric id: answer the query instead of raising,
        # which would leave the button spinner unanswered.
        await bot.answer_callback_query(callback["id"], "Invalid callback data")
        return

    chat_id = callback["message"]["chat"]["id"]
    msg_id = callback["message"]["message_id"]

    # Fetch extraction from shadow DB
    from icarus.core.observer.shadow_database import ShadowDatabase
    db = ShadowDatabase()
    extraction = db.get_extraction_by_id(extraction_id)
    if not extraction:
        await bot.answer_callback_query(callback["id"], f"Extraction {extraction_id} not found")
        await bot.edit_message_text(
            chat_id, msg_id,
            f"❌ Extraction {extraction_id} not found in shadow database."
        )
        return

    await bot.answer_callback_query(callback["id"], "Creating event...")
    await bot.edit_message_text(
        chat_id, msg_id,
        f"⏳ Creating calendar event for extraction #{extraction_id}..."
    )

    try:
        # Use CalendarValidator to create the event
        from icarus.core.observer.calendar_validator import CalendarValidator
        validator = CalendarValidator()

        extraction_type = extraction.get("extraction_type", "")
        extracted_what = extraction.get("extracted_what", "")
        extracted_when = extraction.get("extracted_when", "")

        result = validator.create_event(
            summary=extracted_what or "Event",
            description=f"From Telegram shadow extraction #{extraction_id}\nType: {extraction_type}",
            extracted_when=extracted_when,
        )

        if result["status"] != "created":
            await bot.edit_message_text(
                chat_id, msg_id,
                f"❌ Failed to create event: {result.get('error', 'Unknown error')}"
            )
            return

        event_url = result.get("event_url", "")
        msg = f"✅ Calendar event created!\n📅 {result.get('event_summary')}\n🕐 {result.get('event_start')}"
        if event_url:
            msg += f"\n🔗 {event_url}"

        # Record the outcome on the extraction before confirming to the admin.
        db.update_calendar_check(
            extraction_id=extraction_id,
            status="created_via_admin",
            event_id=result.get("event_id"),
            event_title=result.get("event_summary"),
            event_time=str(result.get("event_start", "")),
            fuzzy_score=0.0,
        )
        await bot.edit_message_text(chat_id, msg_id, msg)
    except Exception as e:
        logging.exception("Failed to create event from shadow extraction: %s", e)
        await bot.edit_message_text(
            chat_id, msg_id,
            f"❌ Error creating event: {e}"
        )
async def _handle_skip_event_callback(callback: dict, bot: _BotProxy) -> None:
    """Handle admin skipping event creation from a NO_MATCH extraction.

    Callback data format: skip_event:{extraction_id}

    Marks the extraction as ``skipped_by_admin`` in the shadow database,
    answers the callback query, and edits the originating message. Malformed
    callback data is answered with "Invalid callback data".

    Args:
        callback: Telegram callback query dict.
        bot: Bot API proxy used to answer the query and edit the message.
    """
    cb_data = callback.get("data", "")
    parts = cb_data.split(":")
    try:
        extraction_id = int(parts[1])
    except (IndexError, ValueError):
        # Missing or non-numeric id: answer the query instead of raising,
        # which would leave the button spinner unanswered.
        await bot.answer_callback_query(callback["id"], "Invalid callback data")
        return

    chat_id = callback["message"]["chat"]["id"]
    msg_id = callback["message"]["message_id"]

    # Update shadow DB
    from icarus.core.observer.shadow_database import ShadowDatabase
    db = ShadowDatabase()
    db.update_calendar_check(
        extraction_id=extraction_id,
        status="skipped_by_admin",
        event_id=None,
        event_title=None,
        event_time=None,
        fuzzy_score=0.0,
    )

    await bot.answer_callback_query(callback["id"], "Event skipped")
    await bot.edit_message_text(
        chat_id, msg_id,
        f"⏭️ Skipped — no calendar event created for extraction #{extraction_id}."
    )