"""Telegram webhook handler and bot API client."""

import os
import io
import re
import logging
import traceback
from pathlib import Path
from urllib.parse import urlparse

from fastapi import Request, HTTPException
import httpx

from icarus.core.config.staging import (
    TELEGRAM_BOT_TOKEN, TELEGRAM_ALLOWED_USERS, OLLAMA_BASE_URL
)
from icarus.core.vision.pipeline import process_attachment
from icarus.core.handlers.recipe_toggle import (
    handle_toggle,
    handle_commit,
    handle_cancel,
    handle_recipe_url,
    handle_groceries_command,
    handle_clear_groceries,
)
from icarus.core.handlers.calendar_action import (
    handle_calendar_callback,
    build_action_buttons,
)
from icarus.core.db.documents import store_briefing_event


# Known recipe domains — a URL whose hostname is one of these (or a subdomain
# of one) is preferred as the recipe URL. URLs from unknown domains are still
# attempted if they look recipe-like (contain /recipe/ in the path), and
# otherwise the first URL in the message is tried as a last resort.
RECIPE_DOMAINS = [
    "allrecipes.com",
    "budgetbytes.com",
    "seriouseats.com",
    "foodnetwork.com",
    "cooking.nytimes.com",
    "bonappetit.com",
    "epicurious.com",
    "smittenkitchen.com",
    "thestayathomechef.com",
    "pinchofyum.com",
    "cookieandkate.com",
    "minimalistbaker.com",
    "thewoksoflife.com",
    "177milkstreet.com",
    "americastestkitchen.com",
    "delish.com",
    "tasty.co",
    "kitchn.com",
    "recipetineats.com",
    "halfbakedharvest.com",
    "damndelicious.net",
    "therecipecritic.com",
    "spendwithpennies.com",
    "joshuaweissman.com",
    "grilledcheesesocial.com",
]

# Pattern for URLs that look like recipe pages even on unknown domains
RECIPE_URL_PATTERN = re.compile(r'https?://[^/]+/recipe/', re.IGNORECASE)

URL_PATTERN = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')

# Telegram's 400 description when an edit would leave the message unchanged.
_NOT_MODIFIED_MARKER = "message is not modified"


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _api_url(method: str) -> str:
    """Return the Bot API endpoint URL for *method* (e.g. ``"sendMessage"``)."""
    return f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}"


def _redact(value: object) -> str:
    """Return ``str(value)`` with the bot token masked.

    Every Bot API URL built here contains the token, and httpx exception
    messages (e.g. from ``raise_for_status``) embed the request URL. Anything
    derived from an exception must pass through this helper before it is
    logged or sent to a chat.
    """
    text = str(value)
    token = str(TELEGRAM_BOT_TOKEN or "")
    return text.replace(token, "<redacted>") if token else text


def _without_parse_mode(payload: dict) -> dict:
    """Return a copy of *payload* without ``parse_mode`` (plain-text retry)."""
    return {key: val for key, val in payload.items() if key != "parse_mode"}


def _is_allowed_user(user_id: str) -> bool:
    """Return True when *user_id* is non-empty and in TELEGRAM_ALLOWED_USERS.

    An update without a sender id must never be authorised.
    NOTE(review): if TELEGRAM_ALLOWED_USERS were ever a plain ``str``, ``in``
    would do substring matching — presumably it is a collection of ids; confirm.
    """
    return bool(user_id) and user_id in TELEGRAM_ALLOWED_USERS


# ---------------------------------------------------------------------------
# Bot API proxy — thin wrapper over Telegram Bot API for recipe_toggle handlers
# ---------------------------------------------------------------------------

class _BotProxy:
    """Provides the bot API surface expected by recipe_toggle handlers.

    Wraps send_message, edit_message_text, and answer_callback_query using
    direct HTTP calls to the Telegram Bot API. Each method catches its own
    errors and returns ``{"ok": False, "error": ...}`` instead of raising.
    """

    async def send_message(self, chat_id: int, text: str, reply_markup=None,
                           parse_mode="unset") -> dict:
        """Send *text* (truncated to 4000 chars) to *chat_id*.

        Args:
            chat_id: Target Telegram chat.
            text: Message body.
            reply_markup: Optional inline keyboard dict.
            parse_mode: ``"unset"`` (default) means MarkdownV2 when
                *reply_markup* is given, otherwise plain text. ``"none"``,
                ``""`` or ``None`` mean plain text. Any other value is passed
                to Telegram as-is.

        Returns:
            The Telegram JSON response, or ``{"ok": False, "error": ...}``.
            If Telegram rejects a formatted message (HTTP 400), it is resent
            once as plain text, keeping the keyboard.
        """
        payload: dict = {"chat_id": chat_id, "text": text[:4000]}
        if reply_markup:
            payload["reply_markup"] = reply_markup

        if parse_mode == "unset":
            if reply_markup:
                # Default: MarkdownV2 when a keyboard is present (recipe toggle UI)
                payload["parse_mode"] = "MarkdownV2"
        elif parse_mode and parse_mode != "none":
            payload["parse_mode"] = parse_mode

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(_api_url("sendMessage"), json=payload)
                if resp.status_code == 400:
                    logging.warning("send_message 400: %s", resp.text[:300])
                    # Only a formatted message can be rescued by resending it as
                    # plain text; resending an unformatted payload would fail again.
                    if "parse_mode" in payload:
                        resp2 = await client.post(
                            _api_url("sendMessage"), json=_without_parse_mode(payload)
                        )
                        if resp2.status_code == 200:
                            return resp2.json()
                        logging.error("send_message fallback also failed: %s", resp2.text[:300])
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            logging.error("send_message failed: %s", _redact(e))
            return {"ok": False, "error": _redact(e)}

    async def edit_message_text(
        self,
        chat_id: int,
        message_id: int,
        text: str,
        reply_markup=None,
        parse_mode: str | None = "MarkdownV2",
    ) -> dict:
        """Replace the text (and keyboard) of an existing message.

        Args:
            chat_id: Chat containing the message.
            message_id: Message to edit.
            text: New body (truncated to 4000 chars).
            reply_markup: ``None`` removes the inline keyboard; a non-empty dict
                sets a new keyboard. An empty dict leaves the field out.
            parse_mode: Formatting mode (default MarkdownV2). Pass ``None`` for
                plain text.

        Returns:
            The Telegram JSON response. "Message is not modified" is reported as
            ``{"ok": True, "description": "not modified"}``. Other failures
            return ``{"ok": False, "error": ...}``. A formatted edit rejected with
            HTTP 400 is retried once as plain text.
        """
        payload: dict = {
            "chat_id": chat_id,
            "message_id": message_id,
            "text": text[:4000],
        }
        if parse_mode:
            payload["parse_mode"] = parse_mode
        if reply_markup is None:
            # An empty inline_keyboard explicitly removes the existing keyboard.
            payload["reply_markup"] = {"inline_keyboard": []}
        elif reply_markup:
            payload["reply_markup"] = reply_markup

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(_api_url("editMessageText"), json=payload)
                if resp.status_code == 400:
                    if _NOT_MODIFIED_MARKER in resp.text:
                        # Re-rendering identical content — safe to ignore.
                        logging.debug("edit_message_text 400: %s", resp.text[:200])
                        return {"ok": True, "description": "not modified"}
                    logging.warning("edit_message_text 400: %s", resp.text[:300])
                    if "parse_mode" in payload:
                        resp2 = await client.post(
                            _api_url("editMessageText"), json=_without_parse_mode(payload)
                        )
                        if resp2.status_code == 200:
                            return resp2.json()
                        logging.warning(
                            "edit_message_text fallback also failed: %s", resp2.text[:300]
                        )
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            logging.warning("edit_message_text failed: %s", _redact(e))
            return {"ok": False, "error": _redact(e)}

    async def answer_callback_query(self, callback_query_id: str, text: str = "") -> dict:
        """Acknowledge an inline-keyboard press, optionally with a toast (max 200 chars)."""
        payload: dict = {"callback_query_id": callback_query_id}
        if text:
            payload["text"] = text[:200]

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(_api_url("answerCallbackQuery"), json=payload)
                resp.raise_for_status()
                return resp.json()
        except Exception as e:
            logging.warning("answer_callback_query failed: %s", _redact(e))
            return {"ok": False, "error": _redact(e)}


# ---------------------------------------------------------------------------
# Update processing (used by both webhook and polling)
# ---------------------------------------------------------------------------

async def process_update(update: dict) -> None:
    """Process a Telegram update dict directly.

    Routes message updates and callback queries (inline keyboard).
    Used by both the webhook handler and the long-polling loop.
    Updates from users not in TELEGRAM_ALLOWED_USERS are silently ignored.
    """
    logging.info("HANDLER: raw keys=%s", list(update.keys()))

    # ---- Callback query routing (inline keyboard buttons) ----
    if "callback_query" in update:
        await _route_callback_query(update["callback_query"])
        return

    # ---- Message routing ----
    message = update.get("message", {})
    user_id = str(message.get("from", {}).get("id", ""))
    allowed = _is_allowed_user(user_id)
    logging.info("HANDLER: message present=%s, user_id=%r, allowed=%s",
                 "message" in update, user_id, allowed)
    if not allowed:
        return  # Silent ignore

    chat_id = message.get("chat", {}).get("id")
    if not chat_id:
        return

    # Handle photo (Telegram lists sizes ascending; take the largest)
    if message.get("photo"):
        file_id = message["photo"][-1]["file_id"]
        await _process_file(file_id, chat_id, "image.jpg", user_id)
        return

    # Handle document (PDF, etc.)
    if "document" in message:
        doc = message["document"]
        file_name = doc.get("file_name") or "document.pdf"
        mime_type = doc.get("mime_type") or ""
        if not _is_supported_file(file_name, mime_type):
            await _send_message(
                chat_id,
                "📄 Please send PDF or image files only (PNG, JPG)."
            )
            return
        await _process_file(doc["file_id"], chat_id, file_name, user_id)
        return

    # Handle text commands and URLs
    if "text" not in message:
        return
    text = message["text"]
    logging.info("TEXT handler: text=%r, len=%d", text[:200], len(text))

    # Recipe URLs take priority over command handling. Trailing sentence
    # punctuation ("…/recipe/123.") is matched by URL_PATTERN but is never
    # part of the link the user meant.
    urls = [url.rstrip(".,;:!?") for url in URL_PATTERN.findall(text)]
    logging.info("TEXT handler: URLs found=%r", urls)
    recipe_url = _pick_recipe_url(urls)
    logging.info("TEXT handler: recipe_url=%r", recipe_url)

    if recipe_url:
        logging.info("TEXT handler: Calling handle_recipe_url with url=%r", recipe_url)
        try:
            await handle_recipe_url(message, _BotProxy(), url=recipe_url)
        except Exception as e:
            logging.exception("TEXT handler: handle_recipe_url raised: %s", _redact(e))
            await _send_message(chat_id, f"❌ Recipe handler error: {_redact(e)}")
        return

    if text == "/start":
        await _send_message(
            chat_id,
            "👋 Send me a document, photo, or recipe URL and I'll help you out!"
        )
    elif text == "/status":
        # NOTE(review): _get_status is neither defined nor imported in the
        # visible source — confirm it exists, otherwise /status raises NameError.
        status = await _get_status()
        await _send_message(chat_id, status)
    elif text == "/groceries" or text.startswith("/groceries "):
        parts = text.strip().split()
        if len(parts) > 1 and parts[1].lower() == "clear":
            await handle_clear_groceries(message, _BotProxy())
        else:
            await handle_groceries_command(message, _BotProxy())
    else:
        await _send_message(
            chat_id,
            "📎 Send a PDF, image, or recipe URL to get started."
        )


async def _route_callback_query(callback: dict) -> None:
    """Dispatch an inline-keyboard callback to its handler by ``callback['data']``."""
    cb_user_id = str(callback.get("from", {}).get("id", ""))
    if not _is_allowed_user(cb_user_id):
        return  # Silent ignore

    cb_data = callback.get("data") or ""
    bot = _BotProxy()

    if cb_data.startswith("toggle:"):
        await handle_toggle(callback, bot)
    elif cb_data.startswith("commit:"):
        await handle_commit(callback, bot)
    elif cb_data.startswith("cancel:"):
        await handle_cancel(callback, bot)
    elif cb_data == "clear:groceries":
        await handle_clear_groceries(callback, bot)
    elif cb_data.startswith(("calendar_add|", "calendar_done|")):
        await handle_calendar_callback(callback, bot)
    elif cb_data.startswith("create_event:"):
        await _handle_create_event_callback(callback, bot)
    elif cb_data.startswith("skip_event:"):
        await _handle_skip_event_callback(callback, bot)
    else:
        await bot.answer_callback_query(callback["id"], "Unknown action")


def _is_known_recipe_domain(url: str) -> bool:
    """Return True if *url*'s hostname is a RECIPE_DOMAINS entry or a subdomain of one.

    Matching on the parsed hostname (not a substring of the whole URL) keeps
    e.g. ``https://x.com/?q=allrecipes.com`` or ``tasty.com`` from matching.
    """
    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        return False
    return any(host == domain or host.endswith("." + domain) for domain in RECIPE_DOMAINS)


def _pick_recipe_url(urls: list[str]) -> str | None:
    """Choose which URL (if any) to treat as a recipe.

    Priority: the first URL on a known recipe domain or with ``/recipe/`` in
    its path; otherwise the first URL at all (fetch_recipe handles failures);
    ``None`` when *urls* is empty.
    """
    for url in urls:
        if _is_known_recipe_domain(url):
            logging.info("TEXT handler: matched known domain for %r", url)
            return url
        if RECIPE_URL_PATTERN.search(url):
            logging.info("TEXT handler: matched recipe-like URL for %r", url)
            return url
    if urls:
        # Many recipe pages don't have /recipe/ in the URL.
        logging.info("TEXT handler: trying first URL as recipe candidate: %r", urls[0])
        return urls[0]
    return None


# ---------------------------------------------------------------------------
# Webhook handler (legacy — kept for backward compatibility)
# ---------------------------------------------------------------------------

async def handle_telegram_update(request: Request) -> dict:
    """Handle incoming Telegram webhook update.

    Routes message updates and callback queries (inline keyboard).
    Simply wraps process_update() for webhook compatibility.

    Errors raised while processing are logged and the update is still
    acknowledged. Answering with a 5xx would make Telegram redeliver the same
    update repeatedly.

    Returns:
        {"ok": True} to acknowledge receipt

    Raises:
        HTTPException(400): the body is not a JSON object.
    """
    try:
        data = await request.json()
    except Exception as e:
        logging.warning("Failed to parse webhook JSON: %s", e)
        raise HTTPException(status_code=400, detail="Invalid JSON body") from e

    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    try:
        await process_update(data)
    except Exception:
        logging.error("Unhandled error processing Telegram update: %s",
                      _redact(traceback.format_exc()))
    return {"ok": True}


def _is_supported_file(file_name: str, mime_type: str) -> bool:
    """Return True if the file is a PDF/PNG/JPEG by extension or MIME type."""
    supported_ext = (".pdf", ".png", ".jpg", ".jpeg")
    supported_mime = {"application/pdf", "image/png", "image/jpeg", "image/jpg"}
    has_supported_ext = (file_name or "").lower().endswith(supported_ext)
    has_supported_mime = (mime_type or "").lower() in supported_mime
    return has_supported_ext or has_supported_mime


async def _process_file(file_id: str, chat_id: int, file_name: str, user_id: str):
    """Download file from Telegram, run the vision pipeline, send the briefing.

    Any failure is reported to the chat (with the bot token redacted) rather
    than raised.
    """
    await _send_message(chat_id, "⏳ Processing document...")

    try:
        file_bytes = await _download_telegram_file(file_id)

        # Build email_meta for pipeline
        email_meta = {
            "from": f"telegram:{user_id}",
            "subject": file_name,
            "date": "now",
            "to": "family",
            "body": ""
        }

        result = await process_attachment(email_meta, file_bytes, file_name)
        if isinstance(result, dict):
            logging.debug("Vision pipeline result keys=%s, briefing type=%s",
                          list(result.keys()), type(result.get("briefing")).__name__)
        else:
            logging.debug("Vision pipeline result type=%s", type(result).__name__)

        text = _format_briefing(result)
        reply_markup = _build_briefing_buttons(result, file_name)
        await _send_message(chat_id, text, markdown=True, reply_markup=reply_markup)

    except Exception as e:
        logging.error("Processing error: %s", _redact(traceback.format_exc()))
        await _send_message(
            chat_id,
            f"❌ Processing failed: {_redact(e)[:200]}"
        )


async def _download_telegram_file(file_id: str) -> bytes:
    """Download a file's bytes from Telegram servers.

    Raises:
        httpx.HTTPStatusError: on a non-2xx response.
        ValueError: if getFile reports ``ok: false``.
    """
    async with httpx.AsyncClient() as client:
        # Resolve the server-side file path
        resp = await client.get(_api_url("getFile"), params={"file_id": file_id})
        resp.raise_for_status()
        result = resp.json()

        if not result.get("ok"):
            raise ValueError(f"Failed to get file: {result}")

        file_path = result["result"]["file_path"]

        # Download actual file
        file_resp = await client.get(
            f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
        )
        file_resp.raise_for_status()
        return file_resp.content


async def _send_message(chat_id: int, text: str, markdown: bool = False,
                        reply_markup: dict | None = None) -> dict:
    """Send message via Telegram Bot API.

    Args:
        chat_id: Telegram chat ID
        text: Message text (truncated to 4000 chars)
        markdown: Whether to use (legacy) Markdown parse mode; on a 400 the
            message is resent once as plain text.
        reply_markup: Optional inline keyboard dict

    Returns:
        The Telegram JSON response, or ``{"ok": False, "error": ...}``.
    """
    payload: dict = {
        "chat_id": chat_id,
        "text": text[:4000]  # Telegram limit
    }
    if markdown:
        payload["parse_mode"] = "Markdown"
    if reply_markup:
        payload["reply_markup"] = reply_markup

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(_api_url("sendMessage"), json=payload)
            if resp.status_code == 400:
                # Bad request - log but don't crash
                logging.warning("Telegram 400 error: %s", resp.text[:300])
                # Retry without markdown if it was a parse issue
                if markdown:
                    resp2 = await client.post(
                        _api_url("sendMessage"), json=_without_parse_mode(payload)
                    )
                    if resp2.status_code == 200:
                        return resp2.json()
                    logging.error("Telegram fallback also failed: %s", resp2.text[:300])
                return {"ok": False, "error": resp.text}
            resp.raise_for_status()
            return resp.json()
    except Exception as e:
        logging.error("Failed to send message: %s", _redact(e))
        return {"ok": False, "error": _redact(e)}


def _format_briefing(result: dict) -> str:
    """Format a briefing card for Telegram (legacy Markdown).

    Accepts either ``{"briefing": {...}}`` or a flat briefing dict, and both
    snake_case and a few alternate key spellings. Malformed fields are skipped
    rather than raising.
    """
    if not isinstance(result, dict):
        return "❌ Invalid briefing result (not a dict)"

    briefing = result.get("briefing", {})
    # If briefing is not a dict, try to use result directly (flat structure)
    if not isinstance(briefing, dict):
        briefing = result

    title = briefing.get("title") or briefing.get("Title") or "Document Briefing"
    summary = briefing.get("summary") or briefing.get("Summary") or "No summary available."

    key_details = briefing.get("key_details") or briefing.get("keyDetails", {})
    if not isinstance(key_details, dict):
        key_details = {}

    conflicts = briefing.get("conflicts", [])
    if not isinstance(conflicts, list):
        conflicts = []

    actions = briefing.get("suggested_actions") or briefing.get("suggestedActions", [])
    if not isinstance(actions, list):
        actions = []

    confidence = briefing.get("confidence", 0.5)

    lines = [
        f"📋 *{title}*",
        "",
        str(summary),
        ""
    ]

    if key_details:
        lines.append("*Key Details:*")
        for key, value in key_details.items():
            if value:
                lines.append(f"• {str(key).replace('_', ' ').title()}: {value}")
        lines.append("")

    if conflicts:
        lines.append("⚠️ *Conflicts:*")
        lines.extend(f"• {conflict}" for conflict in conflicts)
        lines.append("")

    if actions:
        lines.append("✅ *Suggested Actions:*")
        lines.extend(f"• {action}" for action in actions)

    if confidence:
        try:
            percent = int(float(confidence) * 100)
        except (TypeError, ValueError):
            percent = None  # non-numeric confidence: omit the line
        if percent is not None:
            lines.append("")
            lines.append(f"_Confidence: {percent}%_")

    return "\n".join(lines)


def _build_briefing_buttons(result: dict, filename: str = "") -> dict | None:
    """Build inline keyboard buttons for a briefing card if calendar-applicable.

    Stores event data in the documents DB so it can be retrieved when the
    user taps "Add to Calendar".

    Returns:
        Telegram inline keyboard dict, or None if no buttons apply (not a
        calendar event, no usable start time, or the DB store failed).
    """
    if not isinstance(result, dict):
        return None

    briefing = result.get("briefing", {})
    if not isinstance(briefing, dict):
        briefing = result

    action_info = briefing.get("action_buttons")
    # Only show buttons for calendar-applicable events
    if not isinstance(action_info, dict) or not action_info.get("can_add_to_calendar"):
        return None

    summary = action_info.get("event_summary") or briefing.get("title") or "Event"
    start_time = action_info.get("event_start", "")
    end_time = action_info.get("event_end", "")
    location = action_info.get("event_location", "")
    category = action_info.get("category", "event")

    key_details = briefing.get("key_details", {})
    if not isinstance(key_details, dict):
        key_details = {}

    description = str(briefing.get("summary") or "")
    for label, field in (("Deadline", "deadline"),
                         ("Contact", "contact"),
                         ("Requirements", "requirements")):
        if key_details.get(field):
            description += f"\n{label}: {key_details[field]}"

    # Fall back to separate date/time fields when no explicit start is given
    if not start_time:
        date_val = key_details.get("date", "") or key_details.get("Date", "")
        time_val = key_details.get("time", "") or key_details.get("Time", "")
        if not date_val:
            # No date/time available — can't add to calendar meaningfully
            return None
        start_time = f"{date_val} {time_val}" if time_val else date_val

    if location is None:
        location = ""
    elif not isinstance(location, str):
        location = str(location)

    # Store the event in the documents DB for later retrieval
    try:
        store_result = store_briefing_event(
            summary=summary,
            start_time=start_time,
            end_time=end_time,
            location=location,
            description=description[:500],  # Limit description length
            category=category,
            who=[],  # Populated from family inference if available
            source_filename=filename,
            full_briefing=briefing,
        )
        doc_id = store_result["doc_id"]
        event_hash = store_result["event_hash"]
    except Exception:
        logging.exception("Failed to store briefing event")
        return None

    return build_action_buttons(doc_id, event_hash, has_datetime=True)


# ---------------------------------------------------------------------------
# Shadow Bot admin confirmation callbacks (create_event / skip_event)
# ---------------------------------------------------------------------------

def _parse_extraction_id(cb_data: str) -> int | None:
    """Return the integer id from ``prefix:{id}`` callback data, or None if malformed."""
    parts = cb_data.split(":")
    if len(parts) < 2:
        return None
    try:
        return int(parts[1])
    except ValueError:
        return None


async def _handle_create_event_callback(callback: dict, bot: _BotProxy) -> None:
    """Handle admin confirmation to create a calendar event from a NO_MATCH extraction.

    Callback data format: create_event:{extraction_id}

    Status messages are plain text (``parse_mode=None``) because they contain
    characters such as ``.``, ``#`` and ``!`` that are reserved in MarkdownV2.
    """
    extraction_id = _parse_extraction_id(callback.get("data") or "")
    if extraction_id is None:
        await bot.answer_callback_query(callback["id"], "Invalid callback data")
        return

    chat_id = callback["message"]["chat"]["id"]
    msg_id = callback["message"]["message_id"]

    # Imported lazily; only this admin path needs the shadow observer stack.
    from icarus.core.observer.shadow_database import ShadowDatabase

    try:
        db = ShadowDatabase()
        extraction = db.get_extraction_by_id(extraction_id)
    except Exception:
        logging.exception("Shadow DB lookup failed for extraction %s", extraction_id)
        await bot.answer_callback_query(callback["id"], "Shadow database error")
        return

    if not extraction:
        await bot.answer_callback_query(callback["id"], f"Extraction {extraction_id} not found")
        await bot.edit_message_text(
            chat_id, msg_id,
            f"❌ Extraction {extraction_id} not found in shadow database.",
            parse_mode=None,
        )
        return

    await bot.answer_callback_query(callback["id"], "Creating event...")
    await bot.edit_message_text(
        chat_id, msg_id,
        f"⏳ Creating calendar event for extraction #{extraction_id}...",
        parse_mode=None,
    )

    try:
        from icarus.core.observer.calendar_validator import CalendarValidator
        validator = CalendarValidator()

        # Parse extraction fields (dict-based access)
        extraction_type = extraction.get("extraction_type", "")
        extracted_what = extraction.get("extracted_what", "")
        extracted_when = extraction.get("extracted_when", "")

        result = validator.create_event(
            summary=extracted_what or "Event",
            description=f"From Telegram shadow extraction #{extraction_id}\nType: {extraction_type}",
            extracted_when=extracted_when,
        )

        if result.get("status") == "created":
            event_url = result.get("event_url", "")
            msg = f"✅ Calendar event created!\n📅 {result.get('event_summary')}\n🕐 {result.get('event_start')}"
            if event_url:
                msg += f"\n🔗 {event_url}"

            # Record the outcome in the shadow DB
            db.update_calendar_check(
                extraction_id=extraction_id,
                status="created_via_admin",
                event_id=result.get("event_id"),
                event_title=result.get("event_summary"),
                event_time=str(result.get("event_start", "")),
                fuzzy_score=0.0,
            )
            await bot.edit_message_text(chat_id, msg_id, msg, parse_mode=None)
        else:
            await bot.edit_message_text(
                chat_id, msg_id,
                f"❌ Failed to create event: {result.get('error', 'Unknown error')}",
                parse_mode=None,
            )

    except Exception as e:
        logging.exception("Failed to create event from shadow extraction: %s", _redact(e))
        await bot.edit_message_text(
            chat_id, msg_id,
            f"❌ Error creating event: {_redact(e)}",
            parse_mode=None,
        )


async def _handle_skip_event_callback(callback: dict, bot: _BotProxy) -> None:
    """Handle admin skipping event creation from a NO_MATCH extraction.

    Callback data format: skip_event:{extraction_id}
    """
    extraction_id = _parse_extraction_id(callback.get("data") or "")
    if extraction_id is None:
        await bot.answer_callback_query(callback["id"], "Invalid callback data")
        return

    chat_id = callback["message"]["chat"]["id"]
    msg_id = callback["message"]["message_id"]

    from icarus.core.observer.shadow_database import ShadowDatabase

    try:
        db = ShadowDatabase()
        db.update_calendar_check(
            extraction_id=extraction_id,
            status="skipped_by_admin",
            event_id=None,
            event_title=None,
            event_time=None,
            fuzzy_score=0.0,
        )
    except Exception:
        logging.exception("Failed to record skip for extraction %s", extraction_id)
        await bot.answer_callback_query(callback["id"], "Failed to record skip")
        return

    await bot.answer_callback_query(callback["id"], "Event skipped")
    await bot.edit_message_text(
        chat_id, msg_id,
        f"⏭️ Skipped — no calendar event created for extraction #{extraction_id}.",
        parse_mode=None,
    )