"""Recipe toggle handler — Telegram inline keyboard for ingredient selection.
Handles callback queries: toggle, commit, cancel.
"""
import logging
import re
from icarus.core.db.grocery_list import (
add_items,
delete_temp_state,
get_temp_state,
update_temp_state,
)
from icarus.core.extractors.recipe import fetch_recipe
# ---------------------------------------------------------------------------
# MarkdownV2 escaping
# ---------------------------------------------------------------------------
# Characters that Telegram MarkdownV2 requires escaped in ordinary text.
# The backslash is listed too, so a literal "\" in the input is escaped
# instead of being read by Telegram as the start of an escape sequence.
_MDV2_SPECIAL = r'\_*[]()~`>#+-=|{}.!'
# Un-prefixed alias kept for any code that refers to the public name.
MDV2_SPECIAL = _MDV2_SPECIAL
_MDV2_ESCAPE_RE = re.compile(r'([%s])' % re.escape(_MDV2_SPECIAL))


def escape_markdown(text: str) -> str:
    """Escape Telegram MarkdownV2 special characters.

    Args:
        text: Free text that must be rendered literally.

    Returns:
        ``text`` with a backslash inserted before every character listed in
        ``_MDV2_SPECIAL``. Text without special characters is returned
        unchanged.
    """
    # In a replacement template "\\" is one literal backslash and "\1" is the
    # matched character, so the pattern below emits "\" + the character.
    return _MDV2_ESCAPE_RE.sub(r'\\\1', text)
# ---------------------------------------------------------------------------
# Caption builder (new)
# ---------------------------------------------------------------------------
def build_caption(title: str, domain: str, ingredients: list[dict]) -> str:
    """Render the recipe message text, one line per ingredient.

    Every ingredient line starts with ✅ (selected) or ❌ (deselected),
    followed by its 1-based number, a dash and the ingredient text.
    Deselected ingredients are wrapped in ``~...~`` (MarkdownV2
    strikethrough). Title, domain, numbers and ingredient text all go through
    ``escape_markdown``; only the strikethrough delimiters are left raw.

    Args:
        title: Recipe title.
        domain: Source site name shown on the "From:" line.
        ingredients: Dicts with at least ``index``, ``text`` and ``selected``.

    Returns:
        The caption as a single newline-joined string.
    """

    def render(entry: dict) -> str:
        # Escape first, then add the strikethrough markers, so the "~"
        # delimiters themselves are never escaped.
        body = escape_markdown(entry["text"])
        if entry["selected"]:
            marker = "✅"
        else:
            marker = "❌"
            body = f"~{body}~"
        number = escape_markdown(f"#{entry['index'] + 1}")
        dash = escape_markdown("-")
        return f"{marker} {number} {dash} {body}"

    header = [
        f"🍝 {escape_markdown(title)}",
        f"From: {escape_markdown(domain)}",
        "",
        "Tap to deselect items you don't need:",
        "",
    ]
    return "\n".join(header + [render(entry) for entry in ingredients])
# ---------------------------------------------------------------------------
# Keyboard builder
# ---------------------------------------------------------------------------
def build_toggle_keyboard(ingredients: list[dict], selected_count: int) -> dict:
    """Assemble the inline keyboard shown under a recipe message.

    Ingredient buttons are laid out four per row, followed by one final row
    holding the Commit and Cancel buttons.

    Args:
        ingredients: List of {index, text, selected, recipe_id}.
        selected_count: Number of selected items, shown on the Commit button.

    Returns:
        A dict of the form ``{"inline_keyboard": [[button, ...], ...]}`` where
        each button has ``text`` and ``callback_data`` keys.
    """
    per_row = 4
    buttons = [
        {
            "text": f"{'✅' if item['selected'] else '❌'} #{item['index'] + 1}",
            "callback_data": f"toggle:{item['recipe_id']}:{item['index']}",
        }
        for item in ingredients
    ]
    # Slice the flat button list into rows; the last row may be shorter.
    grid = [buttons[start:start + per_row] for start in range(0, len(buttons), per_row)]

    # The recipe id for the action buttons comes from the first ingredient;
    # with no ingredients both buttons carry the "noop" payload.
    if ingredients:
        recipe_id = ingredients[0]["recipe_id"]
        commit_data = f"commit:{recipe_id}"
        cancel_data = f"cancel:{recipe_id}"
    else:
        commit_data = cancel_data = "noop"

    if selected_count == 0:
        commit_label = "⬜ Select at least 1"
    else:
        commit_label = f"✅ Commit ({selected_count})"

    grid.append([
        {"text": commit_label, "callback_data": commit_data},
        {"text": "❌ Cancel", "callback_data": cancel_data},
    ])
    return {"inline_keyboard": grid}
# ---------------------------------------------------------------------------
# Callback handlers
# ---------------------------------------------------------------------------
async def handle_toggle(callback_query: dict, bot) -> None:
    """Flip one ingredient's selected flag and redraw the recipe message.

    The callback payload is expected to look like
    ``toggle:<recipe_id>:<index>``. Malformed payloads, a missing temp state
    or an out-of-range index are answered with a short notice and nothing is
    changed. Otherwise the flag is inverted, the state is persisted with
    ``update_temp_state`` and the message text and keyboard are rebuilt.

    Args:
        callback_query: Callback query dict; ``id``, ``data`` and ``message``
            are read from it.
        bot: Object providing ``answer_callback_query`` and
            ``edit_message_text`` coroutines.
    """

    async def answer(**extra) -> None:
        # The query id is read only at the moment an answer is actually sent.
        await bot.answer_callback_query(callback_query["id"], **extra)

    payload = callback_query.get("data", "")
    fields = payload.split(":")
    if len(fields) != 3:
        logging.warning("Invalid toggle callback data: %s", payload)
        await answer(text="Invalid request")
        return

    recipe_id, raw_index = fields[1], fields[2]
    try:
        position = int(raw_index)
    except ValueError:
        await answer(text="Invalid index")
        return

    state = get_temp_state(recipe_id)
    if not state:
        await answer(text="Session expired. Send the recipe again.")
        return

    ingredients = state["ingredients"]
    if not 0 <= position < len(ingredients):
        await answer(text="Invalid ingredient")
        return

    target = ingredients[position]
    target["selected"] = not target["selected"]
    selected_count = len([entry for entry in ingredients if entry["selected"]])

    # Persist before redrawing, so a failed edit does not lose the toggle.
    update_temp_state(recipe_id, ingredients)

    source_url = state.get("source_url")
    if source_url:
        # Third "/"-separated piece of the URL is the host part.
        domain = source_url.split("/")[2].replace("www.", "")
    else:
        domain = "recipe"

    caption = build_caption(state["title"], domain, ingredients)
    keyboard = build_toggle_keyboard(ingredients, selected_count)

    message = callback_query.get("message", {})
    try:
        await bot.edit_message_text(
            chat_id=message["chat"]["id"],
            message_id=message["message_id"],
            text=caption,
            reply_markup=keyboard,
        )
    except Exception as exc:
        # Best effort: a failed redraw is logged and the query is still acked.
        logging.warning("Failed to edit message: %s", exc)

    await answer()
async def handle_commit(callback_query: dict, bot) -> None:
    """Commit selected ingredients to grocery list.

    The callback payload is expected to look like ``commit:<recipe_id>``.
    The selected ingredients of the stored temp state are passed to
    ``add_items``; on success the recipe message is replaced by a
    confirmation, the temp state is deleted and the callback is acknowledged.

    Early exits (each answers the callback with a short notice):
      * malformed payload,
      * no temp state for the recipe id,
      * no ingredient selected,
      * ``add_items`` raised (the temp state is kept in that case).

    Args:
        callback_query: Callback query dict; ``id``, ``data``, ``from`` and
            ``message`` are read from it.
        bot: Object providing ``answer_callback_query`` and
            ``edit_message_text`` coroutines.
    """
    data = callback_query.get("data", "")
    parts = data.split(":")
    if len(parts) != 2:
        await bot.answer_callback_query(callback_query["id"], text="Invalid request")
        return
    _, recipe_id = parts

    state = get_temp_state(recipe_id)
    if not state:
        await bot.answer_callback_query(
            callback_query["id"],
            text="Session expired. Send the recipe again."
        )
        return

    selected = [i for i in state["ingredients"] if i["selected"]]
    if not selected:
        await bot.answer_callback_query(
            callback_query["id"],
            text="No items selected. Tap ingredients to select them."
        )
        return

    # Requester label: lower-cased first name, else username, else "unknown".
    user = callback_query.get("from", {})
    requested_by = user.get("first_name", user.get("username", "unknown")).lower()
    try:
        count = add_items(
            items=[{"text": i["text"]} for i in selected],
            recipe_source=state["title"],
            recipe_url=state.get("source_url"),
            requested_by=requested_by,
        )
    except Exception as e:
        logging.error("Failed to add items: %s", e)
        await bot.answer_callback_query(callback_query["id"], text="Error saving items")
        return

    # Update message (MarkdownV2-safe). This must NOT be a raw string: in a
    # raw literal "\n" is a backslash followed by "n", not a line break, so
    # only the "!" is escaped here (as "\\!") and the newlines are real.
    noun = "item" if count == 1 else "items"
    msg = f"✅ Added {count} {noun} to your grocery list\\!\n\nView: /groceries"
    message = callback_query.get("message", {})
    try:
        await bot.edit_message_text(
            chat_id=message["chat"]["id"],
            message_id=message["message_id"],
            text=msg,
            reply_markup=None
        )
    except Exception as e:
        # Best effort: the items are already saved, so only log the failure.
        logging.warning("Failed to edit message: %s", e)

    # Clean up temp state
    delete_temp_state(recipe_id)

    # Ack callback
    await bot.answer_callback_query(callback_query["id"])
async def handle_cancel(callback_query: dict, bot) -> None:
    """Abort the ingredient selection and discard its temp state.

    The callback payload is expected to look like ``cancel:<recipe_id>``.
    Any other shape is answered with "Invalid request" and nothing is
    deleted. Otherwise the temp state is removed, the recipe message is
    replaced by a cancellation notice and the callback is acknowledged.

    Args:
        callback_query: Callback query dict; ``id``, ``data`` and ``message``
            are read from it.
        bot: Object providing ``answer_callback_query`` and
            ``edit_message_text`` coroutines.
    """
    payload = callback_query.get("data", "")
    try:
        # Two-name unpacking fails unless the payload has exactly two fields.
        _, recipe_id = payload.split(":")
    except ValueError:
        await bot.answer_callback_query(callback_query["id"], text="Invalid request")
        return

    delete_temp_state(recipe_id)

    # Periods are pre-escaped so the notice is valid MarkdownV2.
    notice = r"❌ Cancelled\. No items added\."
    message = callback_query.get("message", {})
    try:
        await bot.edit_message_text(
            chat_id=message["chat"]["id"],
            message_id=message["message_id"],
            text=notice,
            reply_markup=None,
        )
    except Exception as exc:
        # Best effort: the state is already gone, so only log the failure.
        logging.warning("Failed to edit message: %s", exc)

    await bot.answer_callback_query(callback_query["id"])
# ---------------------------------------------------------------------------
# Recipe URL handler
# ---------------------------------------------------------------------------
async def handle_recipe_url(message: dict, bot, url: str | None = None) -> None:
    """Extract recipe from URL and present toggle UI.

    Steps:
      1. Take ``url`` if given, otherwise the stripped message text.
      2. Reject anything that does not start with ``http://`` or ``https://``.
      3. Fetch the recipe with ``fetch_recipe``; report failures to the chat.
      4. Store a temp state with every ingredient selected, send the caption
         and keyboard, then store the state again with the sent message id.

    Plain notices and error texts are sent with ``parse_mode="none"``: they
    contain unescaped "." characters (and, for failures, arbitrary exception
    text), which would not be valid MarkdownV2. The caption is sent without an
    explicit parse mode, as before.
    NOTE(review): this relies on the bot wrapper defaulting to MarkdownV2,
    which the escaped captions in this file imply — confirm against the
    wrapper.

    Args:
        message: Incoming message dict; ``chat``, ``from`` and optionally
            ``text`` are read from it.
        bot: Object providing a ``send_message`` coroutine.
        url: Recipe URL; when ``None`` the message text is used.
    """
    logging.info("handle_recipe_url: called with url=%r, message keys=%s", url, list(message.keys()) if message else None)

    # Use provided URL, or extract from message text
    if url is None:
        url = message.get("text", "").strip()
    chat_id = message["chat"]["id"]

    # Quick validation
    if not url.startswith(("http://", "https://")):
        logging.warning("handle_recipe_url: invalid URL: %r", url)
        await bot.send_message(
            chat_id=chat_id,
            text="Please send a valid recipe URL.",
            parse_mode="none",
        )
        return

    # Fetch recipe
    logging.info("handle_recipe_url: fetching recipe from %s", url)
    try:
        recipe = await fetch_recipe(url)
        logging.info("handle_recipe_url: fetch complete, keys=%s", list(recipe.keys()) if isinstance(recipe, dict) else type(recipe))
    except Exception as e:
        logging.exception("handle_recipe_url: Recipe extraction failed: %s", e)
        await bot.send_message(
            chat_id=chat_id,
            text=f"❌ Could not extract recipe. Error: {e}",
            parse_mode="none",
        )
        return

    if "error" in recipe:
        error_msg = recipe["error"]
        diagnostics = recipe.get("diagnostics", {})
        logging.warning("handle_recipe_url: recipe returned error: %s, diagnostics: %s", error_msg, diagnostics)
        # Only the error text goes to the user; diagnostics stay in the log.
        await bot.send_message(
            chat_id=chat_id,
            text=f"❌ {error_msg}",
            parse_mode="none",
        )
        return

    # Build initial state (all selected)
    recipe_id = recipe["recipe_id"]
    ingredients = [
        {"index": i, "text": ing, "selected": True, "recipe_id": recipe_id}
        for i, ing in enumerate(recipe["ingredients"])
    ]
    logging.info("handle_recipe_url: built %d ingredients for recipe_id=%s", len(ingredients), recipe_id)

    if not ingredients:
        await bot.send_message(
            chat_id=chat_id,
            text="No ingredients found in this recipe.",
            parse_mode="none",
        )
        return

    # Store temp state. Imported here rather than at module top, as in the
    # original code.
    from icarus.core.db.grocery_list import save_temp_state
    save_temp_state(
        recipe_id=recipe_id,
        user_id=message["from"]["id"],
        chat_id=chat_id,
        message_id=0,  # Will be set after sending
        title=recipe["title"],
        source_url=url,
        ingredients=ingredients,
    )

    # Build caption + keyboard. The URL passed the scheme check above, so the
    # third "/"-separated piece exists and is the host part.
    domain = url.split("/")[2].replace("www.", "")
    caption = build_caption(recipe["title"], domain, ingredients)
    keyboard = build_toggle_keyboard(ingredients, len(ingredients))
    logging.info("handle_recipe_url: sending message to chat_id=%s", chat_id)

    # Send message
    sent = await bot.send_message(
        chat_id=chat_id,
        text=caption,
        reply_markup=keyboard
    )
    logging.info("handle_recipe_url: send_message response=%s", {k: sent.get(k) if isinstance(sent, dict) else type(sent) for k in (['ok', 'error', 'message_id'] if isinstance(sent, dict) else [])})

    # Update message_id in temp state.
    # The response may be {"ok": True, "result": {"message_id": N, ...}} or a
    # flat dict that carries "message_id" directly; both are handled.
    msg_id = None
    if sent and isinstance(sent, dict):
        result = sent.get("result", sent)
        if isinstance(result, dict):
            msg_id = result.get("message_id")

    if msg_id:
        save_temp_state(
            recipe_id=recipe_id,
            user_id=message["from"]["id"],
            chat_id=chat_id,
            message_id=msg_id,
            title=recipe["title"],
            source_url=url,
            ingredients=ingredients,
        )
    else:
        logging.warning("handle_recipe_url: no message_id in response, response=%s", str(sent)[:200])
# ---------------------------------------------------------------------------
# Grocery list command
# ---------------------------------------------------------------------------
async def handle_groceries_command(message: dict, bot) -> None:
    """Reply to /groceries with the current list, grouped by recipe.

    An empty list produces a short hint. Otherwise items are grouped under
    their ``recipe_source`` (falsy sources fall under "Other"), numbered
    within each group, prefixed with a status emoji, and sent as plain text
    together with a "Clear List" inline button.

    Args:
        message: Incoming message dict; ``chat.id`` is read from it.
        bot: Object providing a ``send_message`` coroutine.
    """
    from icarus.core.db.grocery_list import list_items

    chat_id = message["chat"]["id"]
    entries = list_items()
    if not entries:
        await bot.send_message(
            chat_id=chat_id,
            text="Your grocery list is empty.\n\nSend a recipe URL to add items!"
        )
        return

    # Group while keeping first-seen recipe order (dicts preserve insertion).
    grouped: dict = {}
    for entry in entries:
        grouped.setdefault(entry["recipe_source"] or "Other", []).append(entry)

    # Unknown statuses fall back to the "pending" box.
    status_emoji = {"pending": "⬜", "in_cart": "🛒", "purchased": "✅", "skipped": "❌"}
    out = ["📋 Grocery List"]
    for source, members in grouped.items():
        out.append(f"\n🍳 {source}")
        out.extend(
            f" {status_emoji.get(member['status'], '⬜')} {number}. {member['item']}"
            for number, member in enumerate(members, 1)
        )

    clear_button = {"text": "🗑 Clear List", "callback_data": "clear:groceries"}
    await bot.send_message(
        chat_id=chat_id,
        text="\n".join(out),
        reply_markup={"inline_keyboard": [[clear_button]]},
        parse_mode="none",  # Plain text — no MarkdownV2
    )
async def handle_clear_groceries(callback_or_message: dict, bot) -> None:
    """Handle /groceries clear command or clear:groceries callback.

    If called from a callback query, edits the original message (falling back
    to a new message when the edit fails) and answers the callback.
    If called from a text command, sends a new message.

    The argument may be a message dict, a callback query dict (recognised by
    its ``data`` key), or a dict that wraps the callback query under a
    ``callback_query`` key; in the last case the wrapped query is used.

    Args:
        callback_or_message: Message dict, callback query dict, or a wrapper
            holding the callback query under ``callback_query``.
        bot: Object providing ``send_message``, ``edit_message_text`` and
            ``answer_callback_query`` coroutines.
    """
    from icarus.core.db.grocery_list import clear_list

    # Resolve the callback query. A wrapper is unwrapped, because the fields
    # read below ("from", "message", "id") live on the query itself.
    callback = callback_or_message.get("callback_query")
    if callback is None and "data" in callback_or_message:
        callback = callback_or_message
    is_callback = callback is not None

    source = callback if is_callback else callback_or_message
    # Requester label: lower-cased first name, else username, else "unknown".
    user = source.get("from", {})
    requested_by = user.get("first_name", user.get("username", "unknown")).lower()

    if is_callback:
        chat_id = callback["message"]["chat"]["id"]
        message_id = callback["message"]["message_id"]
    else:
        chat_id = callback_or_message["chat"]["id"]
        message_id = None

    count = clear_list(requested_by=requested_by)

    # The trailing period is escaped so the text is valid MarkdownV2, like the
    # other confirmation messages this module writes via edit_message_text.
    # NOTE(review): assumes send_message also defaults to MarkdownV2 — confirm.
    confirmation = f"🗑 Cleared {count} item{'s' if count != 1 else ''} from your grocery list\\."

    if not is_callback:
        await bot.send_message(chat_id=chat_id, text=confirmation)
        return

    # Edit the original grocery list message
    try:
        await bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=confirmation,
            reply_markup=None,
        )
    except Exception as e:
        # Fallback: send new message if edit fails
        logging.warning("Failed to edit message after clear: %s", e)
        await bot.send_message(chat_id=chat_id, text=confirmation)
    await bot.answer_callback_query(callback["id"], text=f"Cleared {count} items")