"""Telegram bot for Costco Route Optimizer. Run: python -m costco_route.bot Commands: /costco — Generate a route-optimized shopping list /learn — Teach the bot where an item lives /zones — Show the zone map /stats — Show learned item count /recipe — Extract recipe from URL /recipes — Browse saved recipes /help — Show help """ import json import logging import os import re import sys from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import ( Application, CommandHandler, CallbackQueryHandler, ContextTypes, MessageHandler, filters, ) from costco_route.config import ZONES, ZONE_ORDER from costco_route.pipeline import optimize, learn_correction from costco_route.item_memory import stats as memory_stats from costco_route.recipe_extractor import fetch_recipe, list_recipes, get_recipe, format_recipe from costco_route.shopping_list import ( add_items, add_recipe_ingredients, remove_item, check_item, clear_list, get_list, format_list, ) logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) # Bot token from env BOT_TOKEN = os.environ.get("COSTCO_BOT_TOKEN", "") # Allowlisted user IDs (comma-separated in env) ALLOWED_USERS_STR = os.environ.get("COSTCO_ALLOWED_USERS", "") ALLOWED_USERS = set() if ALLOWED_USERS_STR: ALLOWED_USERS = {int(uid.strip()) for uid in ALLOWED_USERS_STR.split(",") if uid.strip()} def _check_auth(user_id: int) -> bool: """Check if user is allowed to use the bot.""" if not ALLOWED_USERS: return True # No allowlist = open access return user_id in ALLOWED_USERS # --------------------------------------------------------------------------- # /start and /help # --------------------------------------------------------------------------- async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE): if not _check_auth(update.effective_user.id): return await update.message.reply_text( "🛒 Costco Route Optimizer\n\n" "Send me your grocery list and I'll sort it into a Costco route plan.\n\n" "Just type your items naturally:\n" '\"milk eggs chicken paper towels dog food\"\n\n' "Or paste a recipe URL and I'll extract ingredients + route:\n" '\"https://...recipe...\"\n\n' "Commands:\n" "/costco — Generate route\n" "/recipe — Extract recipe from URL\n" "/recipes — Browse saved recipes\n" "/add — Add to shopping list\n" "/list — Show shopping list\n" "/learn — Teach a location\n" "/zones — Show zone map\n" "/stats — Learned items count\n" "/help — This message" ) async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE): if not _check_auth(update.effective_user.id): return await cmd_start(update, context) # --------------------------------------------------------------------------- # /zones — show zone map # --------------------------------------------------------------------------- async def cmd_zones(update: Update, context: ContextTypes.DEFAULT_TYPE): if not _check_auth(update.effective_user.id): return lines = ["📍 Costco Zone Map\n"] for zid in ZONE_ORDER: zinfo = ZONES[zid] lines.append(f" {zid} — {zinfo['name']}") lines.append("\nCounter-clockwise: entrance → exit") await update.message.reply_text("\n".join(lines)) # --------------------------------------------------------------------------- # /stats — show learned item count # --------------------------------------------------------------------------- async def cmd_stats(update: Update, context: ContextTypes.DEFAULT_TYPE): if not _check_auth(update.effective_user.id): return s = memory_stats() await update.message.reply_text( f"🧠 Learned Items: {s['total_items']}\n" f"Path: {s['path']}" ) # --------------------------------------------------------------------------- # /learn — teach the bot a location # --------------------------------------------------------------------------- async def cmd_learn(update: Update, context: ContextTypes.DEFAULT_TYPE): if not _check_auth(update.effective_user.id): return if not context.args or len(context.args) < 2: await update.message.reply_text( "Usage: /learn \n" 'Example: /learn "almond milk" 07\n\n' "Zones: /zones" ) return zone_id = context.args[-1] # Item is everything before the zone ID item = " ".join(context.args[:-1]).strip('"').strip("'") if zone_id not in ZONES: await update.message.reply_text( f"❌ Invalid zone: {zone_id}. Must be 01-10.\n/zones" ) return result = learn_correction(item, zone_id) if "error" in result: await update.message.reply_text(f"❌ {result['error']}") else: zname = ZONES[zone_id]["name"] await update.message.reply_text( f"✅ Learned: {item} → Zone {zone_id} ({zname})" ) # --------------------------------------------------------------------------- # /recipe — extract recipe from URL # --------------------------------------------------------------------------- async def cmd_recipe(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /recipe command.""" if not _check_auth(update.effective_user.id): return if not context.args: await update.message.reply_text( "Usage: /recipe \n\n" "Paste a recipe URL and I'll extract ingredients + instructions.\n" "The ingredients will be automatically sorted into Costco zones.\n\n" "Example: /recipe https://www.allrecipes.com/recipe/158968/..." ) return url = context.args[0] await _process_recipe(update, url) async def _process_recipe(update: Update, url: str): """Fetch and process a recipe URL.""" await update.message.chat.send_action("typing") try: recipe = fetch_recipe(url) except Exception as e: logger.exception("Recipe fetch failed") await update.message.reply_text(f"❌ Failed to fetch recipe: {e}") return if "error" in recipe: await update.message.reply_text(f"❌ {recipe['error']}") return # Format for Telegram formatted = format_recipe(recipe, include_zones=True) # Split long messages if len(formatted) > 4000: # Send recipe without zones first, then zones separately formatted = format_recipe(recipe, include_zones=False) await update.message.reply_text(formatted, parse_mode="Markdown") if recipe.get("zone_map"): await update.message.reply_text( f"**Costco Route:**\n{recipe['zone_map']}", parse_mode="Markdown" ) else: await update.message.reply_text(formatted, parse_mode="Markdown") # Add "Add to list" button keyboard = InlineKeyboardMarkup([ [InlineKeyboardButton("🛒 Add ingredients to list", callback_data=f"add_recipe|{recipe['recipe_id']}")] ]) await update.message.reply_text( f"💾 Saved to Rolodex as: `{recipe['recipe_id']}`\n" f"Type `/recipes` to browse all saved recipes.", parse_mode="Markdown", reply_markup=keyboard, ) # --------------------------------------------------------------------------- # /recipes — browse saved recipes # --------------------------------------------------------------------------- async def cmd_recipes(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show list of saved recipes.""" if not _check_auth(update.effective_user.id): return recipes = list_recipes() if not recipes: await update.message.reply_text( "🍳 Recipe Rolodex is empty.\n\n" "Send me a recipe URL to save your first recipe:\n" "/recipe " ) return lines = [f"🍳 Recipe Rolodex ({len(recipes)} recipes):\n"] for r in recipes: tags = " | ".join(r.get("tags", [])) servings = f" · Serves {r['servings']}" if r.get("servings") else "" ingredients = f" · {r['ingredient_count']} ingredients" if r.get("ingredient_count") else "" lines.append(f"• `{r['id']}` — {r['title']}{servings}{ingredients}") if tags: lines.append(f" 🏷 {tags}") lines.append("\nTo view a recipe: /recipe_show ") await update.message.reply_text("\n".join(lines), parse_mode="Markdown") # --------------------------------------------------------------------------- # /list — show shopping list # --------------------------------------------------------------------------- async def cmd_list(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show the current shopping list.""" if not _check_auth(update.effective_user.id): return formatted = format_list(include_checked=False) # Telegram markdown doesn't support strikethrough well, skip checked items await update.message.reply_text(formatted, parse_mode="Markdown") # --------------------------------------------------------------------------- # /add — add items to shopping list # --------------------------------------------------------------------------- async def cmd_add(update: Update, context: ContextTypes.DEFAULT_TYPE): """Add items or recipe ingredients to the shopping list.""" if not _check_auth(update.effective_user.id): return if not context.args: await update.message.reply_text( "Usage: /add or /add \n\n" "Examples:\n" '/add milk eggs chicken\n' '/add bolognese-recipe\n\n' "Use /recipes to browse recipe IDs." ) return raw_input = " ".join(context.args) # Check if it's a recipe ID (no spaces, matches a saved recipe) recipe = get_recipe(raw_input) if " " not in raw_input else None if recipe: await update.message.chat.send_action("typing") result = add_recipe_ingredients(raw_input) if "error" in result: await update.message.reply_text(f"❌ {result['error']}") else: title = result.get("recipe_title", raw_input) await update.message.reply_text( f"✅ Added {result['added']} ingredients from *{title}* to the list\n" f"Zones: {', '.join(result['zones'])}\n\n" "/list to view", parse_mode="Markdown" ) else: # Treat as raw items result = add_items(raw_input) if result['added'] == 0: await update.message.reply_text("All items already on the list!") else: await update.message.reply_text( f"✅ Added {result['added']} items to the list\n" f"Zones: {', '.join(result['zones'])}\n\n" "/list to view" ) # --------------------------------------------------------------------------- # /check — check off an item # --------------------------------------------------------------------------- async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE): """Check off an item on the shopping list.""" if not _check_auth(update.effective_user.id): return if not context.args: await update.message.reply_text("Usage: /check ") return item_name = " ".join(context.args) result = check_item(item_name) if "error" in result: await update.message.reply_text(f"❌ {result['error']}") else: await update.message.reply_text(f"✅ Checked off: {result['checked']}") # --------------------------------------------------------------------------- # /clearlist — clear the shopping list # --------------------------------------------------------------------------- async def cmd_clearlist(update: Update, context: ContextTypes.DEFAULT_TYPE): """Clear the shopping list.""" if not _check_auth(update.effective_user.id): return result = clear_list(keep_unchecked=False) await update.message.reply_text( f"🗑 Cleared {result['removed']} items from the list." ) # --------------------------------------------------------------------------- # /costco — the main event # --------------------------------------------------------------------------- async def cmd_costco(update: Update, context: ContextTypes.DEFAULT_TYPE): if not _check_auth(update.effective_user.id): return if not context.args: await update.message.reply_text( "Send your grocery list after /costco\n\n" 'Example: /costco milk eggs chicken paper towels\n\n' 'Or just type your list directly (no command needed)!' ) return raw_input = " ".join(context.args) await _process_list(update, raw_input) # --------------------------------------------------------------------------- # Free-text handler — any non-command message is treated as a grocery list # --------------------------------------------------------------------------- async def handle_free_text(update: Update, context: ContextTypes.DEFAULT_TYPE): if not _check_auth(update.effective_user.id): return text = update.message.text.strip() if len(text) < 3: return # Ignore if it looks like a question if text.lower().startswith(("what", "how", "why", "when", "where", "who")): return # Detect URLs — if the message is or contains a URL, treat as recipe url_match = re.search(r'https?://\S+', text) if url_match: url = url_match.group(0).rstrip('.,;:!?)') await _process_recipe(update, url) return # Otherwise treat as grocery list await _process_list(update, text) # --------------------------------------------------------------------------- # Core list processing (shared by /costco and free-text) # --------------------------------------------------------------------------- async def _process_list(update: Update, raw_input: str): """Run the pipeline and send the result with inline calibration buttons.""" # Send "thinking" indicator await update.message.chat.send_action("typing") try: result = optimize(raw_input, use_memory=True, markdown=False) except Exception as e: await update.message.reply_text(f"❌ Pipeline error: {str(e)[:200]}") return output = result["output"] # Add learned override notes if result["learned_overrides"]: notes = [] for item, info in result["learned_overrides"].items(): note = f" • {item} → Zone {info['zone']}" if info.get("notes"): note += f" ({info['notes']})" notes.append(note) output += f"\n\n🧠 {len(result['learned_overrides'])} from memory:\n" + "\n".join(notes) # Build inline keyboard for calibration buttons = _build_calibration_buttons(result["route"]) if buttons: await update.message.reply_text( output, reply_markup=InlineKeyboardMarkup(buttons), ) else: await update.message.reply_text(output) def _build_calibration_buttons(route: list[dict]) -> list[list[InlineKeyboardButton]]: """Build inline buttons for correcting item zone assignments. Each zone row gets a button that shows zone options for reassignment. """ buttons = [] for zone in route: zid = zone["zone_id"] zname = zone["zone_name"] items_str = ", ".join(zone["items"][:3]) if len(zone["items"]) > 3: items_str += f" +{len(zone['items'])-3} more" # Button to reassign items in this zone buttons.append([ InlineKeyboardButton( f"🔄 Zone {zid}: {items_str}", callback_data=f"reassign|{zid}", ) ]) return buttons # --------------------------------------------------------------------------- # Callback handler for inline buttons # --------------------------------------------------------------------------- async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle inline button presses for calibration.""" query = update.callback_query await query.answer() if not _check_auth(query.from_user.id): await query.answer("Not authorized", show_alert=True) return data = query.data if data.startswith("reassign|"): zone_id = data.split("|")[1] # Show zone picker for reassignment buttons = [] row = [] for zid in ZONE_ORDER: if zid == zone_id: continue # Skip the zone items are already in zname = ZONES[zid]["name"] row.append( InlineKeyboardButton( f"{zid}: {zname}", callback_data=f"move|{zone_id}|{zid}", ) ) if len(row) == 2: buttons.append(row) row = [] if row: buttons.append(row) buttons.append([ InlineKeyboardButton("⬅️ Back", callback_data="back"), InlineKeyboardButton("❌ Done", callback_data="done"), ]) zname = ZONES.get(zone_id, {}).get("name", zone_id) # Store the original route message ID and zone buttons in user_data # so we can restore them on "Back" context.user_data["last_route_msg_id"] = query.message.message_id context.user_data["last_route_buttons"] = query.message.reply_markup await query.edit_message_reply_markup( reply_markup=InlineKeyboardMarkup(buttons), ) await query.answer(f"Zone {zone_id} ({zname}) — pick destination") elif data.startswith("move|"): _, from_zone, to_zone = data.split("|") original_text = query.message.text or query.message.caption or "" items_in_zone = _extract_items_from_zone_text(original_text, from_zone) if not items_in_zone: await query.answer("Couldn't find items. Try /learn instead.", show_alert=True) return zname_from = ZONES.get(from_zone, {}).get("name", from_zone) zname_to = ZONES.get(to_zone, {}).get("name", to_zone) moved = 0 for item in items_in_zone: result = learn_correction(item, to_zone) if "error" not in result: moved += 1 # Remove the inline keyboard (auto-close the picker) await query.edit_message_reply_markup(reply_markup=None) await query.message.reply_text( f"✅ Moved {moved} item(s): Zone {from_zone} ({zname_from}) → Zone {to_zone} ({zname_to})\n\n" "Learned! Next run will use the correct zone." ) elif data == "back": # Restore the original zone list buttons last_buttons = context.user_data.get("last_route_buttons") if last_buttons: await query.edit_message_reply_markup(reply_markup=last_buttons) await query.answer("Back to zone list") else: # No saved buttons — just close the keyboard await query.edit_message_reply_markup(reply_markup=None) elif data == "done": # Close the inline keyboard await query.edit_message_reply_markup(reply_markup=None) await query.answer("Done!") elif data.startswith("add_recipe|"): # Add recipe ingredients to shopping list recipe_id = data.split("|", 1)[1] result = add_recipe_ingredients(recipe_id) if "error" in result: await query.answer(f"❌ {result['error']}", show_alert=True) else: title = result.get("recipe_title", recipe_id) await query.answer(f"Added {result['added']} ingredients from {title}", show_alert=False) # Remove the button (replace with confirmation) await query.edit_message_reply_markup( reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("✅ Added to list", callback_data="noop")]]) ) # Send list preview await query.message.reply_text( f"✅ Added {result['added']} ingredients from *{title}*\n" f"Zones: {', '.join(result['zones'])}\n\n" "/list to view full list", parse_mode="Markdown" ) elif data == "noop": await query.answer() def _extract_items_from_zone_text(text: str, zone_id: str) -> list[str]: """Extract item names from a formatted route message for a given zone.""" items = [] in_zone = False for line in text.split("\n"): if f"Zone {zone_id}" in line: in_zone = True continue if line.startswith("📦 Zone") and in_zone: break # Next zone starts if in_zone and line.strip().startswith("☐"): item = line.strip().lstrip("☐").strip() if item: items.append(item) return items # --------------------------------------------------------------------------- # Application startup # --------------------------------------------------------------------------- def main(): if not BOT_TOKEN: print("❌ COSTCO_BOT_TOKEN env var not set", file=sys.stderr) print("Get a token from @BotFather, then:", file=sys.stderr) print(" export COSTCO_BOT_TOKEN=your-token-here", file=sys.stderr) sys.exit(1) app = Application.builder().token(BOT_TOKEN).build() # Command handlers app.add_handler(CommandHandler("start", cmd_start)) app.add_handler(CommandHandler("help", cmd_help)) app.add_handler(CommandHandler("costco", cmd_costco)) app.add_handler(CommandHandler("learn", cmd_learn)) app.add_handler(CommandHandler("zones", cmd_zones)) app.add_handler(CommandHandler("stats", cmd_stats)) app.add_handler(CommandHandler("recipe", cmd_recipe)) app.add_handler(CommandHandler("recipes", cmd_recipes)) app.add_handler(CommandHandler("list", cmd_list)) app.add_handler(CommandHandler("add", cmd_add)) app.add_handler(CommandHandler("check", cmd_check)) app.add_handler(CommandHandler("clearlist", cmd_clearlist)) # Callback handler for inline buttons app.add_handler(CallbackQueryHandler(handle_callback)) # Free-text handler — any non-command message is a grocery list or URL app.add_handler( MessageHandler(filters.TEXT & ~filters.COMMAND, handle_free_text) ) # Run logger.info("🛒 Costco Route Bot starting...") app.run_polling(drop_pending_updates=True) if __name__ == "__main__": main()