📄 handler.py 27,115 bytes Sunday 22:12 📋 Raw

"""Telegram webhook handler and bot API client."""
import os
import io
import re
import logging
from pathlib import Path
from fastapi import Request, HTTPException
import httpx
from icarus.core.config.staging import (
TELEGRAM_BOT_TOKEN,
TELEGRAM_ALLOWED_USERS,
OLLAMA_BASE_URL
)
from icarus.core.vision.pipeline import process_attachment
from icarus.core.handlers.recipe_toggle import (
handle_toggle,
handle_commit,
handle_cancel,
handle_recipe_url,
handle_groceries_command,
handle_clear_groceries,
)
from icarus.core.handlers.calendar_action import (
handle_calendar_callback,
build_action_buttons,
)
from icarus.core.db.documents import store_briefing_event

# Known recipe domains — URLs from these are always treated as recipe URLs.
# URLs from unknown domains are still attempted if they look recipe-like
# (contain /recipe/ in the path) or if no command/text match is found.
RECIPE_DOMAINS = [
    "allrecipes.com",
    "budgetbytes.com",
    "seriouseats.com",
    "foodnetwork.com",
    "cooking.nytimes.com",
    "bonappetit.com",
    "epicurious.com",
    "smittenkitchen.com",
    "thestayathomechef.com",
    "pinchofyum.com",
    "cookieandkate.com",
    "minimalistbaker.com",
    "thewoksoflife.com",
    "177milkstreet.com",
    "americastestkitchen.com",
    "delish.com",
    "tasty.co",
    "kitchn.com",
    "recipetineats.com",
    "halfbakedharvest.com",
    "damndelicious.net",
    "therecipecritic.com",
    "spendwithpennies.com",
    "joshuaweissman.com",
    "grilledcheesesocial.com",
]

# Pattern for URLs that look like recipe pages even on unknown domains:
# a "/recipe/" segment anywhere in the path (not only as the first segment),
# but not inside the query string or fragment.
RECIPE_URL_PATTERN = re.compile(
    r'https?://[^/\s]+/(?:[^\s?#]*/)?recipe/', re.IGNORECASE
)

# An http(s) URL, up to the first whitespace or URL-unsafe character
# (< > " { } | \ ^ ` [ ]). The square brackets MUST be escaped inside the
# character class: an unescaped "[]" closes the class early and leaves the
# remainder of the pattern matching only a run of literal "]" characters.
URL_PATTERN = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')

# ---------------------------------------------------------------------------
# Bot API proxy — thin wrapper over the Telegram Bot API for recipe_toggle handlers
# ---------------------------------------------------------------------------

class _BotProxy:
    """Bot API surface expected by the recipe_toggle handlers.

    Wraps ``send_message``, ``edit_message_text`` and ``answer_callback_query``
    in direct HTTP calls to the Telegram Bot API (a fresh ``httpx.AsyncClient``
    per call). HTTP and network failures are logged and returned as
    ``{"ok": False, "error": ...}`` rather than raised.
    """

    @staticmethod
    def _api_url(method: str) -> str:
        """Return the Bot API endpoint URL for *method* (e.g. ``"sendMessage"``)."""
        return f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}"

    @staticmethod
    def _redact(message: str) -> str:
        """Strip the bot token from *message*.

        httpx status errors embed the request URL, which contains the token,
        so error text is scrubbed before it is logged or returned to callers.
        """
        if TELEGRAM_BOT_TOKEN:
            return message.replace(TELEGRAM_BOT_TOKEN, "<token>")
        return message

    async def send_message(self, chat_id: int, text: str, reply_markup=None, parse_mode="unset") -> dict:
        """Send *text* (truncated to 4000 chars) to *chat_id*.

        Args:
            chat_id: Target chat.
            text: Message text.
            reply_markup: Optional inline keyboard dict.
            parse_mode: ``"unset"`` (default) means MarkdownV2 when
                *reply_markup* is given and plain text otherwise; ``"none"`` or
                an empty value means plain text; any other value is sent as
                Telegram's ``parse_mode``.

        Returns:
            Telegram's JSON response, or ``{"ok": False, "error": ...}``.

        If Telegram answers HTTP 400 while a parse mode was requested (usually
        a formatting error), the message is retried once as plain text with the
        same keyboard.
        """
        payload = {"chat_id": chat_id, "text": text[:4000]}
        if reply_markup:
            payload["reply_markup"] = reply_markup
        if parse_mode != "unset":
            # Explicitly set: "none"/empty = plain text, anything else is passed on.
            if parse_mode and parse_mode != "none":
                payload["parse_mode"] = parse_mode
        elif reply_markup:
            # Default: MarkdownV2 when a keyboard is present (recipe toggle UI).
            payload["parse_mode"] = "MarkdownV2"

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(self._api_url("sendMessage"), json=payload)
                # Retrying without parse_mode only helps if one was sent;
                # otherwise the retry would be an identical request.
                if resp.status_code == 400 and "parse_mode" in payload:
                    logging.warning(
                        "send_message with parse_mode=%s failed: %s",
                        payload["parse_mode"], resp.text[:300],
                    )
                    fallback_payload = {k: v for k, v in payload.items() if k != "parse_mode"}
                    resp2 = await client.post(self._api_url("sendMessage"), json=fallback_payload)
                    if resp2.status_code == 200:
                        return resp2.json()
                    logging.error("send_message fallback also failed: %s", resp2.text[:300])
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            error = self._redact(str(e))
            logging.error("send_message failed: %s", error)
            return {"ok": False, "error": error}

    async def edit_message_text(
        self,
        chat_id: int,
        message_id: int,
        text: str,
        reply_markup=None,
    ) -> dict:
        """Replace the text (MarkdownV2, truncated to 4000 chars) of a message.

        Args:
            chat_id: Chat containing the message.
            message_id: Message to edit.
            text: New text, expected to be valid MarkdownV2.
            reply_markup: New inline keyboard dict. ``None`` sends an empty
                inline keyboard so existing buttons are removed; any other
                falsy value leaves ``reply_markup`` out of the request.

        Returns:
            Telegram's JSON response; ``{"ok": True, "description": "not modified"}``
            when Telegram reports the message is unchanged; otherwise
            ``{"ok": False, "error": ...}``.

        Any other HTTP 400 (typically unescaped MarkdownV2 characters) is
        retried once as plain text instead of being reported as success.
        """
        payload = {
            "chat_id": chat_id,
            "message_id": message_id,
            "text": text[:4000],
            "parse_mode": "MarkdownV2",
        }
        if reply_markup is None:
            # Explicit empty keyboard removes the existing buttons.
            payload["reply_markup"] = {"inline_keyboard": []}
        elif reply_markup:
            payload["reply_markup"] = reply_markup

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(self._api_url("editMessageText"), json=payload)
                if resp.status_code == 400:
                    if "message is not modified" in resp.text:
                        # Same content/markup as before — safe to ignore.
                        logging.debug("edit_message_text 400: %s", resp.text[:200])
                        return {"ok": True, "description": "not modified"}
                    logging.warning("edit_message_text MarkdownV2 failed: %s", resp.text[:300])
                    fallback_payload = {k: v for k, v in payload.items() if k != "parse_mode"}
                    resp2 = await client.post(self._api_url("editMessageText"), json=fallback_payload)
                    if resp2.status_code == 200:
                        return resp2.json()
                    if "message is not modified" in resp2.text:
                        return {"ok": True, "description": "not modified"}
                    logging.warning("edit_message_text fallback also failed: %s", resp2.text[:300])
                    return {"ok": False, "error": resp2.text[:300]}
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            error = self._redact(str(e))
            logging.warning("edit_message_text failed: %s", error)
            return {"ok": False, "error": error}

    async def answer_callback_query(self, callback_query_id: str, text: str = "") -> dict:
        """Acknowledge an inline-keyboard callback, optionally with a short notice.

        Args:
            callback_query_id: ID of the callback query being answered.
            text: Optional notification text (truncated to 200 chars).

        Returns:
            Telegram's JSON response, or ``{"ok": False, "error": ...}``.
        """
        payload = {"callback_query_id": callback_query_id}
        if text:
            payload["text"] = text[:200]
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(self._api_url("answerCallbackQuery"), json=payload)
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            error = self._redact(str(e))
            logging.warning("answer_callback_query failed: %s", error)
            return {"ok": False, "error": error}

# ---------------------------------------------------------------------------
# Update processing (used by both the webhook handler and long polling)
# ---------------------------------------------------------------------------

async def process_update(update: dict) -> dict | None:
    """Process a single Telegram update dict.

    Routes inline-keyboard callback queries and ordinary messages (photos,
    documents, recipe URLs, slash commands). Used by both the webhook handler
    and the long-polling loop. Updates from users not listed in
    TELEGRAM_ALLOWED_USERS are ignored.

    Returns:
        ``{"ok": True}`` on some early-exit paths and None otherwise.
        handle_telegram_update ignores the value.
    """
    logging.info("HANDLER: raw keys=%s", list(update.keys()))

    # ---- Callback query routing (inline keyboard buttons) ----
    if "callback_query" in update:
        callback = update["callback_query"]
        cb_user_id = str(callback.get("from", {}).get("id", ""))
        if cb_user_id not in TELEGRAM_ALLOWED_USERS:
            return {"ok": True}

        cb_data = callback.get("data", "")
        bot = _BotProxy()

        if cb_data.startswith("toggle:"):
            await handle_toggle(callback, bot)
        elif cb_data.startswith("commit:"):
            await handle_commit(callback, bot)
        elif cb_data.startswith("cancel:"):
            await handle_cancel(callback, bot)
        elif cb_data == "clear:groceries":
            await handle_clear_groceries(callback, bot)
        elif cb_data.startswith("calendar_add|") or cb_data.startswith("calendar_done|"):
            await handle_calendar_callback(callback, bot)
        elif cb_data.startswith("create_event:"):
            await _handle_create_event_callback(callback, bot)
        elif cb_data.startswith("skip_event:"):
            await _handle_skip_event_callback(callback, bot)
        else:
            await bot.answer_callback_query(callback["id"], "Unknown action")
        return

    # ---- Message routing ----
    message = update.get("message", {})
    user_id = str(message.get("from", {}).get("id", ""))
    logging.info(
        "HANDLER: message present=%s, user_id=%r, allowed=%s",
        "message" in update, user_id, user_id in TELEGRAM_ALLOWED_USERS,
    )
    if user_id not in TELEGRAM_ALLOWED_USERS:
        return  # Silent ignore

    chat_id = message.get("chat", {}).get("id")
    if not chat_id:
        return {"ok": True}

    # Photo: process the last entry of the size list (largest size).
    if "photo" in message:
        photos = message["photo"]
        if not photos:
            return {"ok": True}
        await _process_file(photos[-1]["file_id"], chat_id, "image.jpg", user_id)
        return {"ok": True}

    # Document (PDF, image sent as file, ...)
    if "document" in message:
        doc = message["document"]
        file_name = doc.get("file_name", "document.pdf")
        mime_type = doc.get("mime_type", "")
        if not _is_supported_file(file_name, mime_type):
            await _send_message(
                chat_id,
                "📄 Please send PDF or image files only (PNG, JPG)."
            )
            return {"ok": True}
        await _process_file(doc["file_id"], chat_id, file_name, user_id)
        return {"ok": True}

    # Text: recipe URLs take precedence over commands.
    if "text" not in message:
        return

    text = message["text"]
    logging.info("TEXT handler: text=%r, len=%d", text[:200], len(text))

    urls = URL_PATTERN.findall(text)
    logging.info("TEXT handler: URLs found=%r", urls)
    recipe_url = _pick_recipe_url(urls)
    logging.info("TEXT handler: recipe_url=%r", recipe_url)

    if recipe_url:
        logging.info("TEXT handler: Calling handle_recipe_url with url=%r", recipe_url)
        try:
            await handle_recipe_url(message, _BotProxy(), url=recipe_url)
        except Exception as e:
            logging.exception("TEXT handler: handle_recipe_url raised: %s", e)
            await _send_message(chat_id, f" Recipe handler error: {e}")
        return

    if text == "/start":
        await _send_message(
            chat_id,
            "👋 Send me a document, photo, or recipe URL and I'll help you out!"
        )
    elif text == "/status":
        status = await _get_status()
        await _send_message(chat_id, status)
    elif text == "/groceries" or text.startswith("/groceries "):
        parts = text.strip().split()
        if len(parts) > 1 and parts[1].lower() == "clear":
            await handle_clear_groceries(message, _BotProxy())
        else:
            await handle_groceries_command(message, _BotProxy())
    else:
        await _send_message(
            chat_id,
            "📎 Send a PDF, image, or recipe URL to get started."
        )


def _pick_recipe_url(urls: list[str]) -> str | None:
    """Choose which URL from a message to hand to the recipe handler.

    Scans *urls* in order and returns the first whose host is a known recipe
    domain (or a subdomain of one) or whose path matches RECIPE_URL_PATTERN.
    If none match, falls back to the first URL: many recipe pages have no
    recognisable path, and fetch_recipe is relied on to handle failures.
    Returns None when *urls* is empty.
    """
    from urllib.parse import urlsplit

    for url in urls:
        try:
            host = (urlsplit(url).hostname or "").lower()
        except ValueError:  # e.g. a malformed IPv6 literal
            host = ""
        # Compare hostnames, not substrings: "tasty.co" must not match
        # "tasty.com", and a domain inside a query string must not count.
        if any(host == domain or host.endswith("." + domain) for domain in RECIPE_DOMAINS):
            logging.info("TEXT handler: matched known domain for %r", url)
            return url
        if RECIPE_URL_PATTERN.search(url):
            logging.info("TEXT handler: matched recipe-like URL for %r", url)
            return url

    if urls:
        logging.info("TEXT handler: trying first URL as recipe candidate: %r", urls[0])
        return urls[0]
    return None

# ---------------------------------------------------------------------------
# Webhook handler (legacy — kept for backward compatibility)
# ---------------------------------------------------------------------------

async def handle_telegram_update(request: Request) -> dict:
    """Webhook entry point: decode the update body and delegate to process_update().

    Raises:
        HTTPException: 400 when the request body is not valid JSON.

    Returns:
        ``{"ok": True}`` acknowledging receipt once the update is processed.
    """
    try:
        update = await request.json()
    except Exception as exc:
        logging.warning("Failed to parse webhook JSON: %s", exc)
        raise HTTPException(status_code=400, detail="Invalid JSON body")
    else:
        # Errors raised while processing are deliberately not caught here.
        await process_update(update)
    return {"ok": True}

def _is_supported_file(file_name: str, mime_type: str) -> bool:
"""Check if file type is supported."""
supported_ext = [".pdf", ".png", ".jpg", ".jpeg"]
supported_mime = [
"application/pdf",
"image/png",
"image/jpeg",
"image/jpg"
]

has_supported_ext = any(file_name.lower().endswith(ext) for ext in supported_ext)
has_supported_mime = mime_type in supported_mime

return has_supported_ext or has_supported_mime

async def _process_file(file_id: str, chat_id: int, file_name: str, user_id: str):
    """Download a Telegram file, run it through the vision pipeline, reply with a briefing.

    Sends a "processing" notice first, then either the formatted briefing
    (with calendar buttons when applicable) or a short error message. Errors
    raised while downloading or processing are logged and reported to the
    chat instead of propagating.
    """
    await _send_message(chat_id, "⏳ Processing document...")

    try:
        file_bytes = await _download_telegram_file(file_id)

        # The vision pipeline expects email-style metadata.
        email_meta = {
            "from": f"telegram:{user_id}",
            "subject": file_name,
            "date": "now",
            "to": "family",
            "body": ""
        }

        result = await process_attachment(email_meta, file_bytes, file_name)

        # Log the result's shape without assuming it is a dict: _format_briefing
        # handles non-dict results, so this debug line must not crash first.
        if isinstance(result, dict):
            logging.debug("Vision pipeline result keys: %s", list(result.keys()))
            logging.debug("Briefing type: %s", type(result.get("briefing")))
        else:
            logging.debug("Vision pipeline returned non-dict result: %s", type(result))

        text = _format_briefing(result)
        # Calendar buttons need dict access into the result.
        reply_markup = (
            _build_briefing_buttons(result, file_name) if isinstance(result, dict) else None
        )
        await _send_message(chat_id, text, markdown=True, reply_markup=reply_markup)

    except Exception as e:
        logging.exception("Processing failed for %s", file_name)
        error_text = str(e)
        # httpx status errors embed the request URL, which contains the bot
        # token; never echo it into the chat.
        if TELEGRAM_BOT_TOKEN:
            error_text = error_text.replace(TELEGRAM_BOT_TOKEN, "<token>")
        await _send_message(
            chat_id,
            f"❌ Processing failed: {error_text[:200]}"
        )

async def _download_telegram_file(file_id: str) -> bytes:
    """Download a file from Telegram's servers and return its raw bytes.

    Two-step Bot API flow: ``getFile`` resolves *file_id* to a server-side
    ``file_path``, then the content is fetched from the file endpoint.

    Raises:
        httpx.HTTPStatusError: if either request returns an error status.
        ValueError: if ``getFile`` reports failure or returns no ``file_path``.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile",
            params={"file_id": file_id},
        )
        resp.raise_for_status()
        result = resp.json()

        if not result.get("ok"):
            raise ValueError(f"Failed to get file: {result}")

        # file_path is optional in the Bot API's File object; without it there
        # is nothing to download, so fail with a readable message, not KeyError.
        file_path = (result.get("result") or {}).get("file_path")
        if not file_path:
            raise ValueError(f"Telegram returned no file_path for file_id {file_id!r}")

        file_resp = await client.get(
            f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
        )
        file_resp.raise_for_status()
        return file_resp.content

async def _send_message(chat_id: int, text: str, markdown: bool = False, reply_markup: dict | None = None) -> dict:
    """Send a message via the Telegram Bot API.

    Args:
        chat_id: Telegram chat ID.
        text: Message text; truncated to 4000 characters.
        markdown: Use Telegram's legacy ``Markdown`` parse mode. If Telegram
            rejects the request with HTTP 400, it is re-sent once as plain text.
        reply_markup: Optional inline keyboard dict.

    Returns:
        Telegram's JSON response on success, otherwise
        ``{"ok": False, "error": ...}``. HTTP and network failures are
        reported through the return value, not raised.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {"chat_id": chat_id, "text": text[:4000]}
    if markdown:
        payload["parse_mode"] = "Markdown"
    if reply_markup:
        payload["reply_markup"] = reply_markup

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, json=payload)
            if resp.status_code == 400:
                logging.warning("Telegram 400 error: %s", resp.text)
                # Most likely a Markdown parse issue: retry as plain text.
                if markdown:
                    fallback_payload = {k: v for k, v in payload.items() if k != "parse_mode"}
                    resp2 = await client.post(url, json=fallback_payload)
                    if resp2.status_code == 200:
                        return resp2.json()
                    logging.error("Telegram fallback also failed: %s", resp2.text[:300])
                return {"ok": False, "error": resp.text}
            if not resp.is_success:
                # Not raise_for_status(): its message embeds the request URL,
                # which contains the bot token.
                logging.error(
                    "Telegram sendMessage failed (HTTP %s): %s",
                    resp.status_code, resp.text[:300],
                )
                return {"ok": False, "error": resp.text}
            return resp.json()
    except Exception as e:
        logging.error("Failed to send message: %s", e)
        return {"ok": False, "error": str(e)}

def _format_briefing(result: dict) -> str:
"""Format briefing card for Telegram (Markdown)."""
# Handle different result structures
if not isinstance(result, dict):
return "❌ Invalid briefing result (not a dict)"

briefing = result.get("briefing", {})

# If briefing is not a dict, try to use result directly (flat structure)
if not isinstance(briefing, dict):
    briefing = result

# Extract fields with fallbacks
title = briefing.get("title") or briefing.get("Title", "Document Briefing")
summary = briefing.get("summary") or briefing.get("Summary", "No summary available.")
key_details = briefing.get("key_details") or briefing.get("keyDetails", {})
if not isinstance(key_details, dict):
    key_details = {}

conflicts = briefing.get("conflicts", [])
if not isinstance(conflicts, list):
    conflicts = []

actions = briefing.get("suggested_actions") or briefing.get("suggestedActions", [])
if not isinstance(actions, list):
    actions = []

confidence = briefing.get("confidence", 0.5)

lines = [
    f"📋 *{title}*",
    "",
    summary,
    ""
]

# Key details
if key_details:
    lines.append("*Key Details:*")
    for key, value in key_details.items():
        if value:
            lines.append(f"• {key.replace('_', ' ').title()}: {value}")
    lines.append("")

# Conflicts
if conflicts:
    lines.append("⚠️ *Conflicts:*")
    for conflict in conflicts:
        lines.append(f"• {conflict}")
    lines.append("")

# Suggested actions
if actions:
    lines.append("✅ *Suggested Actions:*")
    for action in actions:
        lines.append(f"• {action}")

# Confidence
if confidence:
    lines.append(f"")
    lines.append(f"_Confidence: {int(float(confidence) * 100)}%_")

return "\n".join(lines)

def _build_briefing_buttons(result: dict, filename: str = "") -> dict | None:
"""Build inline keyboard buttons for a briefing card if calendar-applicable.

Stores event data in the documents DB so it can be retrieved when
the user taps "Add to Calendar".

Returns:
    Telegram ReplyMarkupKeyboard dict, or None if no buttons applicable.
"""
briefing = result.get("briefing", {})
if not isinstance(briefing, dict):
    briefing = result

action_info = briefing.get("action_buttons")
if not action_info or not isinstance(action_info, dict):
    return None

# Only show buttons for calendar-applicable events
if not action_info.get("can_add_to_calendar"):
    return None

# Extract event details from the briefing
summary = action_info.get("event_summary", briefing.get("title", "Event"))
start_time = action_info.get("event_start", "")
end_time = action_info.get("event_end", "")
location = action_info.get("event_location", "")
category = action_info.get("category", "event")

# Get additional details from the briefing
key_details = briefing.get("key_details", {})
if not isinstance(key_details, dict):
    key_details = {}

description = briefing.get("summary", "")
if key_details.get("deadline"):
    description += f"\nDeadline: {key_details['deadline']}"
if key_details.get("contact"):
    description += f"\nContact: {key_details['contact']}"
if key_details.get("requirements"):
    description += f"\nRequirements: {key_details['requirements']}"

# Try to parse/construct start/end times
# If key_details has individual date/time fields, combine them
date_val = key_details.get("date", "") or key_details.get("Date", "")
time_val = key_details.get("time", "") or key_details.get("Time", "")

if not start_time and date_val:
    start_time = f"{date_val} {time_val}" if time_val else date_val
elif not start_time:
    # No date/time available — can't add to calendar meaningfully
    return None

# Store the event in the documents DB for later retrieval
try:
    store_result = store_briefing_event(
        summary=summary,
        start_time=start_time,
        end_time=end_time,
        location=location if isinstance(location, str) else str(location),
        description=description[:500],  # Limit description length
        category=category,
        who=[],  # Populated from family inference if available
        source_filename=filename,
        full_briefing=briefing,
    )
    doc_id = store_result["doc_id"]
    event_hash = store_result["event_hash"]
except Exception as e:
    print(f"Failed to store briefing event: {e}")
    return None

# Build and return the action buttons
return build_action_buttons(doc_id, event_hash, has_datetime=True)

# ---------------------------------------------------------------------------
# Shadow Bot admin confirmation callbacks (create_event / skip_event)
# ---------------------------------------------------------------------------

async def _handle_create_event_callback(callback: dict, bot: _BotProxy) -> None:
    """Handle the admin's "create event" tap for a NO_MATCH shadow extraction.

    Callback data format: ``create_event:{extraction_id}``

    Looks the extraction up in the shadow database, asks CalendarValidator to
    create the event, records a successful creation via
    ``update_calendar_check``, and edits the admin's message with the outcome.
    """
    def _md(text: str) -> str:
        # bot.edit_message_text sends MarkdownV2, which rejects unescaped
        # reserved characters such as '.', '!' and '#'. Escape status texts so
        # the edit is accepted.
        return re.sub(r"([_*\[\]()~`>#+\-=|{}.!\\])", r"\\\1", text)

    parts = callback.get("data", "").split(":")
    try:
        extraction_id = int(parts[1])
    except (IndexError, ValueError):
        # Always answer the query, even for malformed data.
        await bot.answer_callback_query(callback["id"], "Invalid callback data")
        return

    chat_id = callback["message"]["chat"]["id"]
    msg_id = callback["message"]["message_id"]

    from icarus.core.observer.shadow_database import ShadowDatabase
    db = ShadowDatabase()

    extraction = db.get_extraction_by_id(extraction_id)
    if not extraction:
        await bot.answer_callback_query(callback["id"], f"Extraction {extraction_id} not found")
        await bot.edit_message_text(
            chat_id, msg_id,
            _md(f"❌ Extraction {extraction_id} not found in shadow database."),
        )
        return

    await bot.answer_callback_query(callback["id"], "Creating event...")
    await bot.edit_message_text(
        chat_id, msg_id,
        _md(f"⏳ Creating calendar event for extraction #{extraction_id}..."),
    )

    try:
        from icarus.core.observer.calendar_validator import CalendarValidator
        validator = CalendarValidator()

        # Extraction rows use dict access. extracted_who is not passed to
        # create_event, so it is not parsed here.
        result = validator.create_event(
            summary=extraction.get("extracted_what", "") or "Event",
            description=(
                f"From Telegram shadow extraction #{extraction_id}\n"
                f"Type: {extraction.get('extraction_type', '')}"
            ),
            extracted_when=extraction.get("extracted_when", ""),
        )

        if result.get("status") != "created":
            await bot.edit_message_text(
                chat_id, msg_id,
                _md(f"❌ Failed to create event: {result.get('error', 'Unknown error')}"),
            )
            return

        msg = (
            f"✅ Calendar event created!\n📅 {result.get('event_summary')}"
            f"\n🕐 {result.get('event_start')}"
        )
        event_url = result.get("event_url", "")
        if event_url:
            msg += f"\n🔗 {event_url}"

        # The event already exists at this point; a bookkeeping failure must
        # not be reported to the admin as a failed creation.
        try:
            db.update_calendar_check(
                extraction_id=extraction_id,
                status="created_via_admin",
                event_id=result.get("event_id"),
                event_title=result.get("event_summary"),
                event_time=str(result.get("event_start", "")),
                fuzzy_score=0.0,
            )
        except Exception:
            logging.exception(
                "Event created but shadow DB update failed for extraction %s", extraction_id
            )

        await bot.edit_message_text(chat_id, msg_id, _md(msg))

    except Exception as e:
        logging.exception("Failed to create event from shadow extraction: %s", e)
        await bot.edit_message_text(
            chat_id, msg_id,
            _md(f"❌ Error creating event: {e}"),
        )

async def _handle_skip_event_callback(callback: dict, bot: _BotProxy) -> None:
    """Handle the admin skipping event creation for a NO_MATCH shadow extraction.

    Callback data format: ``skip_event:{extraction_id}``

    Marks the extraction as ``skipped_by_admin`` in the shadow database,
    answers the callback, and edits the admin's message to confirm.
    """
    parts = callback.get("data", "").split(":")
    try:
        extraction_id = int(parts[1])
    except (IndexError, ValueError):
        # Always answer the query, even for malformed data.
        await bot.answer_callback_query(callback["id"], "Invalid callback data")
        return

    chat_id = callback["message"]["chat"]["id"]
    msg_id = callback["message"]["message_id"]

    try:
        from icarus.core.observer.shadow_database import ShadowDatabase
        ShadowDatabase().update_calendar_check(
            extraction_id=extraction_id,
            status="skipped_by_admin",
            event_id=None,
            event_title=None,
            event_time=None,
            fuzzy_score=0.0,
        )
    except Exception:
        logging.exception("Failed to mark extraction %s as skipped", extraction_id)
        await bot.answer_callback_query(callback["id"], "Failed to skip event")
        return

    await bot.answer_callback_query(callback["id"], "Event skipped")
    # bot.edit_message_text sends MarkdownV2; escape its reserved characters
    # ('#', '.', ...) so the edit is accepted.
    confirmation = f"⏭️ Skipped — no calendar event created for extraction #{extraction_id}."
    await bot.edit_message_text(
        chat_id, msg_id,
        re.sub(r"([_*\[\]()~`>#+\-=|{}.!\\])", r"\\\1", confirmation),
    )