# bot.py (23,195 bytes, Apr 19, 2026)

"""Telegram bot for Costco Route Optimizer.

Run: python -m costco_route.bot

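Commands:
/costco — Generate a route-optimized shopping list
/learn — Teach the bot where an item lives
/zones — Show the zone map
/stats — Show learned item count
/recipe — Extract recipe from URL
/recipes — Browse saved recipes
/add — Add items or a saved recipe's ingredients to the shopping list
/list — Show the shopping list
/check — Check off an item on the shopping list
/clearlist — Clear the shopping list
/help — Show help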
"""

import json
import logging
import os
import re
import sys

from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
ContextTypes,
MessageHandler,
filters,
)

from costco_route.config import ZONES, ZONE_ORDER
from costco_route.pipeline import optimize, learn_correction
from costco_route.item_memory import stats as memory_stats
from costco_route.recipe_extractor import fetch_recipe, list_recipes, get_recipe, format_recipe
from costco_route.shopping_list import (
add_items, add_recipe_ingredients, remove_item, check_item,
clear_list, get_list, format_list,
)

# Configure logging once at import time; everything in this module logs
# through the module-level ``logger`` defined below.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Use the module's own ``__name__`` (the bare identifier ``name`` is undefined
# and raised NameError on import) so log records carry this module's name.
logger = logging.getLogger(__name__)

# Bot token from env
BOT_TOKEN = os.environ.get("COSTCO_BOT_TOKEN", "")

# Allowlisted user IDs (comma-separated in env).
# An empty/unset variable leaves the set empty, which _check_auth treats as
# "no allowlist". A non-numeric entry raises ValueError at import time; that
# is kept on purpose, because silently dropping bad entries could empty the
# set and open the bot to everyone.
ALLOWED_USERS_STR = os.environ.get("COSTCO_ALLOWED_USERS", "")
ALLOWED_USERS: set[int] = set()
if ALLOWED_USERS_STR:
    ALLOWED_USERS = {
        int(uid.strip())
        for uid in ALLOWED_USERS_STR.split(",")
        if uid.strip()  # tolerate stray commas / blank entries
    }

def _check_auth(user_id: int) -> bool:
    """Return True when *user_id* is permitted to use the bot.

    An empty ``ALLOWED_USERS`` set means no allowlist was configured, in
    which case every user is accepted.
    """
    # No allowlist = open access; otherwise the ID must be on the list.
    return not ALLOWED_USERS or user_id in ALLOWED_USERS

# ---------------------------------------------------------------------------
# /start and /help
# ---------------------------------------------------------------------------

async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/start``: reply with the welcome text and command overview.

    Unauthorized users get no reply at all.
    """
    # CommandHandler also fires for *edited* commands, where ``update.message``
    # is None; ``effective_message`` covers both the new and the edited case.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    await message.reply_text(
        "๐Ÿ›’ Costco Route Optimizer\n\n"
        "Send me your grocery list and I'll sort it into a Costco route plan.\n\n"
        "Just type your items naturally:\n"
        '\"milk eggs chicken paper towels dog food\"\n\n'
        "Or paste a recipe URL and I'll extract ingredients + route:\n"
        '\"https://...recipe...\"\n\n'
        "Commands:\n"
        "/costco โ€” Generate route\n"
        "/recipe โ€” Extract recipe from URL\n"
        "/recipes โ€” Browse saved recipes\n"
        "/add โ€” Add to shopping list\n"
        "/list โ€” Show shopping list\n"
        "/learn โ€” Teach a location\n"
        "/zones โ€” Show zone map\n"
        "/stats โ€” Learned items count\n"
        "/help โ€” This message"
    )

async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/help`` by showing the same text as ``/start``."""
    # Only authorized users get the help text; everyone else is ignored.
    if _check_auth(update.effective_user.id):
        await cmd_start(update, context)

# ---------------------------------------------------------------------------
# /zones — show zone map
# ---------------------------------------------------------------------------

async def cmd_zones(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/zones``: list every zone ID and name, in ``ZONE_ORDER`` order."""
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    lines = ["๐Ÿ“ Costco Zone Map\n"]
    lines.extend(f" {zid} โ€” {ZONES[zid]['name']}" for zid in ZONE_ORDER)
    lines.append("\nCounter-clockwise: entrance โ†’ exit")
    await message.reply_text("\n".join(lines))

# ---------------------------------------------------------------------------
# /stats — show learned item count
# ---------------------------------------------------------------------------

async def cmd_stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/stats``: report the ``total_items`` and ``path`` values
    returned by ``memory_stats()``."""
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    s = memory_stats()
    await message.reply_text(
        f"๐Ÿง  Learned Items: {s['total_items']}\n"
        f"Path: {s['path']}"
    )

# ---------------------------------------------------------------------------
# /learn — teach the bot a location
# ---------------------------------------------------------------------------

async def cmd_learn(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/learn <item> <zone>``: record which zone an item lives in.

    The last argument is the zone ID; everything before it is the item name
    (surrounding quotes are stripped). Replies with usage help when arguments
    are missing, with an error for an unknown zone, and otherwise with the
    outcome of ``learn_correction``.
    """
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    args = context.args or []
    # Item is everything before the zone ID
    item = " ".join(args[:-1]).strip('"').strip("'").strip()

    # Also show usage when the item is empty after quote stripping
    # (e.g. ``/learn "" 07``) instead of teaching an empty name.
    if len(args) < 2 or not item:
        await message.reply_text(
            "Usage: /learn <item> <zone>\n"
            'Example: /learn "almond milk" 07\n\n'
            "Zones: /zones"
        )
        return

    zone_id = args[-1]
    # Accept "7" as shorthand for "07": only applied when the literal ID is
    # unknown and the zero-padded one exists, so valid IDs are never altered.
    if zone_id not in ZONES and zone_id.zfill(2) in ZONES:
        zone_id = zone_id.zfill(2)

    if zone_id not in ZONES:
        await message.reply_text(
            f"โŒ Invalid zone: {zone_id}. Must be 01-10.\n/zones"
        )
        return

    result = learn_correction(item, zone_id)
    if "error" in result:
        await message.reply_text(f"โŒ {result['error']}")
    else:
        zname = ZONES[zone_id]["name"]
        await message.reply_text(
            f"โœ… Learned: {item} โ†’ Zone {zone_id} ({zname})"
        )

# ---------------------------------------------------------------------------
# /recipe — extract recipe from URL
# ---------------------------------------------------------------------------

async def cmd_recipe(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/recipe <url>``: hand the first argument to ``_process_recipe``.

    Replies with usage help when no URL is given.
    """
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    if not context.args:
        await message.reply_text(
            "Usage: /recipe <url>\n\n"
            "Paste a recipe URL and I'll extract ingredients + instructions.\n"
            "The ingredients will be automatically sorted into Costco zones.\n\n"
            "Example: /recipe https://www.allrecipes.com/recipe/158968/..."
        )
        return

    # Only the first argument is used as the URL; extra tokens are ignored.
    await _process_recipe(update, context.args[0])

async def _process_recipe(update: Update, url: str):
    """Fetch the recipe at *url*, send it to the chat and offer an "add" button.

    Steps:
      1. ``fetch_recipe(url)``; an exception or an ``"error"`` key in the
         result is reported to the user and ends processing.
      2. The formatted recipe is sent; if it is longer than 4000 characters
         the recipe and its zone map go out as two separate messages.
      3. A final message names the saved recipe ID and carries an inline
         button whose callback data is ``add_recipe|<recipe_id>``.
    """
    # Imported locally so this function does not depend on a module-level import.
    from telegram.error import BadRequest

    # ``update.message`` is None for edited messages; use effective_message.
    message = update.effective_message
    if message is None:
        return

    async def reply_markdown(text: str, **kwargs):
        """Send *text* as Markdown; resend as plain text if Telegram rejects it."""
        try:
            await message.reply_text(text, parse_mode="Markdown", **kwargs)
        except BadRequest:
            # Typically "can't parse entities": recipe text from the web may
            # contain unbalanced ``_``/``*``. A plain reply beats no reply.
            logger.warning("Markdown rejected by Telegram; resending as plain text", exc_info=True)
            await message.reply_text(text, **kwargs)

    await message.chat.send_action("typing")

    # NOTE(review): fetch_recipe is a plain synchronous call; if it performs
    # network I/O it blocks the event loop while it runs. Consider
    # asyncio.to_thread once its thread-safety is confirmed.
    try:
        recipe = fetch_recipe(url)
    except Exception as e:
        logger.exception("Recipe fetch failed")
        await message.reply_text(f"โŒ Failed to fetch recipe: {e}")
        return

    if "error" in recipe:
        await message.reply_text(f"โŒ {recipe['error']}")
        return

    # Format for Telegram
    formatted = format_recipe(recipe, include_zones=True)

    # Split long messages: recipe without zones first, then the zones alone.
    if len(formatted) > 4000:
        await reply_markdown(format_recipe(recipe, include_zones=False))
        if recipe.get("zone_map"):
            # Telegram's legacy Markdown uses single asterisks for bold;
            # the previous ``**...**`` did not produce bold text.
            await reply_markdown(f"*Costco Route:*\n{recipe['zone_map']}")
    else:
        await reply_markdown(formatted)

    recipe_id = recipe["recipe_id"]
    saved_note = (
        f"๐Ÿ’พ Saved to Rolodex as: `{recipe_id}`\n"
        f"Type `/recipes` to browse all saved recipes."
    )

    # The Bot API limits callback_data to 64 bytes. For longer recipe IDs the
    # button is left out (it would make the whole message fail) and the
    # equivalent /add command is suggested instead.
    payload = f"add_recipe|{recipe_id}"
    if len(payload.encode("utf-8")) <= 64:
        keyboard = InlineKeyboardMarkup([
            [InlineKeyboardButton("๐Ÿ›’ Add ingredients to list", callback_data=payload)]
        ])
    else:
        keyboard = None
        saved_note += f"\nUse `/add {recipe_id}` to add its ingredients to the list."

    await reply_markdown(saved_note, reply_markup=keyboard)

# ---------------------------------------------------------------------------
# /recipes — browse saved recipes
# ---------------------------------------------------------------------------

async def cmd_recipes(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/recipes``: list every saved recipe with its ID and title.

    Each entry shows the ID (as inline code), the title and, when present,
    servings, ingredient count and tags. Replies with a hint on how to save
    a first recipe when ``list_recipes()`` returns nothing.
    """
    # Imported locally so this function does not depend on a module-level import.
    from telegram.helpers import escape_markdown

    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    recipes = list_recipes()
    if not recipes:
        await message.reply_text(
            "๐Ÿณ Recipe Rolodex is empty.\n\n"
            "Send me a recipe URL to save your first recipe:\n"
            "/recipe <url>"
        )
        return

    lines = [f"๐Ÿณ Recipe Rolodex ({len(recipes)} recipes):\n"]

    for r in recipes:
        # Free-text fields are escaped: an unbalanced ``_``, ``*``, backtick
        # or ``[`` makes Telegram reject the whole Markdown message.
        title = escape_markdown(str(r["title"]))
        # ``or []`` also covers an explicit ``"tags": None``.
        tags = " | ".join(escape_markdown(str(t)) for t in (r.get("tags") or []))
        servings = f" ยท Serves {escape_markdown(str(r['servings']))}" if r.get("servings") else ""
        ingredients = f" ยท {r['ingredient_count']} ingredients" if r.get("ingredient_count") else ""
        lines.append(f"โ€ข `{r['id']}` โ€” {title}{servings}{ingredients}")
        if tags:
            lines.append(f"  ๐Ÿท {tags}")

    # The underscore is escaped because a lone ``_`` opens an italic entity in
    # legacy Markdown and made every non-empty listing fail to send.
    # NOTE(review): no handler for /recipe_show is registered in main() in
    # this file, so this hint points at a command the bot does not answer.
    lines.append("\nTo view a recipe: /recipe\\_show <id>")

    await message.reply_text("\n".join(lines), parse_mode="Markdown")

# ---------------------------------------------------------------------------
# /list — show shopping list
# ---------------------------------------------------------------------------

async def cmd_list(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/list``: show the current shopping list without checked items."""
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    # Telegram markdown doesn't support strikethrough well, skip checked items
    formatted = format_list(include_checked=False)
    await message.reply_text(formatted, parse_mode="Markdown")

# ---------------------------------------------------------------------------
# /add — add items to shopping list
# ---------------------------------------------------------------------------

async def cmd_add(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/add``: add raw items or a saved recipe's ingredients to the list.

    A single-token argument that ``get_recipe`` recognises is treated as a
    recipe ID and goes through ``add_recipe_ingredients``; anything else is
    passed to ``add_items`` as free text.
    """
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    if not context.args:
        await message.reply_text(
            "Usage: /add <items> or /add <recipe_id>\n\n"
            "Examples:\n"
            '/add milk eggs chicken\n'
            '/add bolognese-recipe\n\n'
            "Use /recipes to browse recipe IDs."
        )
        return

    raw_input = " ".join(context.args)

    # Check if it's a recipe ID (no spaces, matches a saved recipe)
    recipe = get_recipe(raw_input) if " " not in raw_input else None
    if recipe:
        await message.chat.send_action("typing")
        result = add_recipe_ingredients(raw_input)
        if "error" in result:
            await message.reply_text(f"โŒ {result['error']}")
            return

        title = result.get("recipe_title", raw_input)
        await message.reply_text(
            f"โœ… Added {result['added']} ingredients from *{title}* to the list\n"
            f"Zones: {', '.join(result['zones'])}\n\n"
            "/list to view",
            parse_mode="Markdown"
        )
        return

    # Treat as raw items
    result = add_items(raw_input)
    if result['added'] == 0:
        await message.reply_text("All items already on the list!")
    else:
        await message.reply_text(
            f"โœ… Added {result['added']} items to the list\n"
            f"Zones: {', '.join(result['zones'])}\n\n"
            "/list to view"
        )

# ---------------------------------------------------------------------------
# /check — check off an item
# ---------------------------------------------------------------------------

async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/check <item name>``: check an item off the shopping list.

    Replies with the error reported by ``check_item`` or with the name of
    the item that was checked.
    """
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    if not context.args:
        await message.reply_text("Usage: /check <item name>")
        return

    item_name = " ".join(context.args)
    result = check_item(item_name)
    if "error" in result:
        await message.reply_text(f"โŒ {result['error']}")
    else:
        await message.reply_text(f"โœ… Checked off: {result['checked']}")

# ---------------------------------------------------------------------------
# /clearlist — clear the shopping list
# ---------------------------------------------------------------------------

async def cmd_clearlist(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/clearlist``: clear the shopping list and report how many
    items ``clear_list`` removed."""
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    result = clear_list(keep_unchecked=False)
    await message.reply_text(
        f"๐Ÿ—‘ Cleared {result['removed']} items from the list."
    )

# ---------------------------------------------------------------------------
# /costco — the main event
# ---------------------------------------------------------------------------

async def cmd_costco(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/costco <items>``: pass the arguments to ``_process_list``.

    Replies with usage help when no items follow the command.
    """
    # ``update.message`` is None for edited commands; use effective_message.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    if not context.args:
        await message.reply_text(
            "Send your grocery list after /costco\n\n"
            'Example: /costco milk eggs chicken paper towels\n\n'
            'Or just type your list directly (no command needed)!'
        )
        return

    await _process_list(update, " ".join(context.args))

# ---------------------------------------------------------------------------
# Free-text handler — any non-command message is treated as a grocery list
# ---------------------------------------------------------------------------

async def handle_free_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Treat any non-command text as a recipe URL or a grocery list.

    Messages shorter than 3 characters and messages that open with a
    question word are ignored. A message containing an ``http(s)://`` URL is
    routed to ``_process_recipe``; everything else goes to ``_process_list``.
    """
    # A text MessageHandler also receives channel posts (no effective_user)
    # and edited messages (``update.message`` is None); guard both.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None or not _check_auth(user.id):
        return

    text = (message.text or "").strip()
    if len(text) < 3:
        return

    # Ignore if it looks like a question. The question word must be a whole
    # word (optionally followed by 's): a plain prefix test dropped grocery
    # lists such as "whole milk, eggs" because "whole" starts with "who".
    if re.match(r"(?:what|how|why|when|where|who)(?:'?s)?\b", text, re.IGNORECASE):
        return

    # Detect URLs: if the message is or contains a URL, treat as recipe
    url_match = re.search(r'https?://\S+', text)
    if url_match:
        # Drop sentence punctuation that was typed right after the link.
        url = url_match.group(0).rstrip('.,;:!?)')
        await _process_recipe(update, url)
        return

    # Otherwise treat as grocery list
    await _process_list(update, text)

# ---------------------------------------------------------------------------
# Core list processing (shared by /costco and free-text)
# ---------------------------------------------------------------------------

async def _process_list(update: Update, raw_input: str):
    """Run the pipeline on *raw_input* and reply with the route plus buttons.

    The reply is ``result["output"]`` from ``optimize``, followed by a note
    for every entry in ``result["learned_overrides"]``, with one inline
    calibration button per zone in ``result["route"]``.
    """
    # ``update.message`` is None for edited messages; use effective_message.
    message = update.effective_message
    if message is None:
        return

    # Send "thinking" indicator
    await message.chat.send_action("typing")

    try:
        result = optimize(raw_input, use_memory=True, markdown=False)
    except Exception as e:
        # Log the traceback (as _process_recipe does); the user only sees a
        # truncated message.
        logger.exception("Pipeline failed")
        await message.reply_text(f"โŒ Pipeline error: {str(e)[:200]}")
        return

    output = result["output"]

    # Add learned override notes
    overrides = result["learned_overrides"]
    if overrides:
        notes = []
        for item, info in overrides.items():
            note = f"  โ€ข {item} โ†’ Zone {info['zone']}"
            if info.get("notes"):
                note += f" ({info['notes']})"
            notes.append(note)
        output += f"\n\n๐Ÿง  {len(overrides)} from memory:\n" + "\n".join(notes)

    # Build inline keyboard for calibration; no keyboard for an empty route.
    buttons = _build_calibration_buttons(result["route"])
    await message.reply_text(
        output,
        reply_markup=InlineKeyboardMarkup(buttons) if buttons else None,
    )

def _build_calibration_buttons(route: list[dict]) -> list[list[InlineKeyboardButton]]:
    """Build one inline button per zone for correcting zone assignments.

    Args:
        route: Zone dicts; each must provide ``"zone_id"`` and ``"items"``
            (a list of item-name strings).

    Returns:
        One single-button row per zone. The label previews up to three items
        (plus a "+N more" suffix); the callback data is ``reassign|<zone_id>``.
    """
    buttons = []

    for zone in route:
        zid = zone["zone_id"]
        items = zone["items"]
        # The zone name is not shown on the button, so the former
        # ``zone["zone_name"]`` lookup (an unused local) was dropped.
        preview = ", ".join(items[:3])
        if len(items) > 3:
            preview += f" +{len(items) - 3} more"

        # Button to reassign items in this zone
        buttons.append([
            InlineKeyboardButton(
                f"๐Ÿ”„ Zone {zid}: {preview}",
                callback_data=f"reassign|{zid}",
            )
        ])

    return buttons

# ---------------------------------------------------------------------------
# Callback handler for inline buttons
# ---------------------------------------------------------------------------

def _zone_picker_rows(current_zone: str) -> list[list[InlineKeyboardButton]]:
    """Build the destination picker: every other zone, two per row, then Back/Done."""
    targets = [
        InlineKeyboardButton(
            f"{zid}: {ZONES[zid]['name']}",
            callback_data=f"move|{current_zone}|{zid}",
        )
        for zid in ZONE_ORDER
        if zid != current_zone  # skip the zone the items are already in
    ]
    rows = [targets[i:i + 2] for i in range(0, len(targets), 2)]
    rows.append([
        InlineKeyboardButton("โฌ…๏ธ Back", callback_data="back"),
        InlineKeyboardButton("โŒ Done", callback_data="done"),
    ])
    return rows


async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle inline button presses (zone calibration and recipe buttons).

    Callback payloads are pipe-delimited strings:

    * ``reassign|<zone>``: swap the keyboard for a destination-zone picker.
    * ``move|<from>|<to>``: teach every item listed under ``<from>`` in the
      message text that it belongs in ``<to>``, then remove the keyboard.
    * ``back``: restore the zone buttons saved by ``reassign``.
    * ``done``: remove the keyboard.
    * ``add_recipe|<recipe_id>``: add a saved recipe's ingredients to the list.
    * ``noop``: inert confirmation button.

    Every query is answered exactly once, with the branch-specific text. The
    unconditional empty answer that used to run first was removed: it
    acknowledged the query before any branch ran, so the toasts and alerts
    below (including "Not authorized") were second answers to the same query.
    """
    query = update.callback_query
    if query is None:
        return

    if not _check_auth(query.from_user.id):
        await query.answer("Not authorized", show_alert=True)
        return

    message = query.message
    if message is None:
        # Nothing to edit or read items from; just dismiss the spinner.
        await query.answer()
        return

    data = query.data or ""

    if data.startswith("reassign|"):
        zone_id = data.split("|", 1)[1]
        zname = ZONES.get(zone_id, {}).get("name", zone_id)

        # Store the original route message ID and zone buttons in user_data
        # so we can restore them on "Back"
        context.user_data["last_route_msg_id"] = message.message_id
        context.user_data["last_route_buttons"] = message.reply_markup

        await query.answer(f"Zone {zone_id} ({zname}) โ€” pick destination")
        await query.edit_message_reply_markup(
            reply_markup=InlineKeyboardMarkup(_zone_picker_rows(zone_id)),
        )

    elif data.startswith("move|"):
        parts = data.split("|")
        # Reject malformed payloads and unknown destinations instead of
        # raising on unpacking or teaching an invalid zone.
        if len(parts) != 3 or parts[2] not in ZONES:
            await query.answer("Invalid move request. Try /learn instead.", show_alert=True)
            return
        _, from_zone, to_zone = parts

        original_text = message.text or message.caption or ""
        items_in_zone = _extract_items_from_zone_text(original_text, from_zone)
        if not items_in_zone:
            await query.answer("Couldn't find items. Try /learn instead.", show_alert=True)
            return

        zname_from = ZONES.get(from_zone, {}).get("name", from_zone)
        zname_to = ZONES.get(to_zone, {}).get("name", to_zone)
        # Count only the corrections that learn_correction accepted.
        moved = sum(
            1 for item in items_in_zone
            if "error" not in learn_correction(item, to_zone)
        )

        await query.answer()
        # Remove the inline keyboard (auto-close the picker)
        await query.edit_message_reply_markup(reply_markup=None)
        await message.reply_text(
            f"โœ… Moved {moved} item(s): Zone {from_zone} ({zname_from}) โ†’ Zone {to_zone} ({zname_to})\n\n"
            "Learned! Next run will use the correct zone."
        )

    elif data == "back":
        # Restore the original zone list buttons, but only onto the message
        # they were saved from. user_data holds a single snapshot per user,
        # so a picker on an older route message must not receive the buttons
        # of a newer one.
        saved_buttons = context.user_data.get("last_route_buttons")
        same_message = context.user_data.get("last_route_msg_id") == message.message_id
        if saved_buttons and same_message:
            await query.answer("Back to zone list")
            await query.edit_message_reply_markup(reply_markup=saved_buttons)
        else:
            # No usable snapshot; just close the keyboard.
            await query.answer()
            await query.edit_message_reply_markup(reply_markup=None)

    elif data == "done":
        # Close the inline keyboard
        await query.answer("Done!")
        await query.edit_message_reply_markup(reply_markup=None)

    elif data.startswith("add_recipe|"):
        # Add recipe ingredients to shopping list
        recipe_id = data.split("|", 1)[1]
        result = add_recipe_ingredients(recipe_id)

        if "error" in result:
            await query.answer(f"โŒ {result['error']}", show_alert=True)
            return

        title = result.get("recipe_title", recipe_id)
        await query.answer(f"Added {result['added']} ingredients from {title}", show_alert=False)
        # Remove the button (replace with confirmation)
        await query.edit_message_reply_markup(
            reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("โœ… Added to list", callback_data="noop")]])
        )
        # Send list preview
        await message.reply_text(
            f"โœ… Added {result['added']} ingredients from *{title}*\n"
            f"Zones: {', '.join(result['zones'])}\n\n"
            "/list to view full list",
            parse_mode="Markdown"
        )

    else:
        # "noop" and any unrecognised payload: just dismiss the spinner.
        await query.answer()

def _extract_items_from_zone_text(text: str, zone_id: str) -> list[str]:
    """Pull the item names listed under one zone out of a route message.

    Scanning starts after the first line containing ``Zone <zone_id>`` and
    stops at the next line that begins with the zone-header marker. Within
    that span, every line whose stripped form starts with the checkbox
    marker contributes its remaining text as an item.

    Args:
        text: Full text of the formatted route message.
        zone_id: Zone identifier to look for.

    Returns:
        Item names in message order; empty when the zone is not found.
    """
    zone_marker = f"Zone {zone_id}"
    collected: list[str] = []
    inside = False

    for raw_line in text.split("\n"):
        if zone_marker in raw_line:
            # Header (or any line) naming the wanted zone: start collecting.
            inside = True
        elif inside:
            if raw_line.startswith("๐Ÿ“ฆ Zone"):
                break  # Next zone starts
            stripped = raw_line.strip()
            if stripped.startswith("โ˜"):
                name = stripped.lstrip("โ˜").strip()
                if name:
                    collected.append(name)

    return collected

# ---------------------------------------------------------------------------
# Application startup
# ---------------------------------------------------------------------------

def main():
    """Build the Telegram application, register all handlers and start polling.

    Prints setup instructions to stderr and exits with status 1 when
    ``BOT_TOKEN`` is empty.
    """
    if not BOT_TOKEN:
        for hint in (
            "โŒ COSTCO_BOT_TOKEN env var not set",
            "Get a token from @BotFather, then:",
            " export COSTCO_BOT_TOKEN=your-token-here",
        ):
            print(hint, file=sys.stderr)
        sys.exit(1)

    application = Application.builder().token(BOT_TOKEN).build()

    # Command handlers, registered in this order.
    command_table = (
        ("start", cmd_start),
        ("help", cmd_help),
        ("costco", cmd_costco),
        ("learn", cmd_learn),
        ("zones", cmd_zones),
        ("stats", cmd_stats),
        ("recipe", cmd_recipe),
        ("recipes", cmd_recipes),
        ("list", cmd_list),
        ("add", cmd_add),
        ("check", cmd_check),
        ("clearlist", cmd_clearlist),
    )
    for command, callback in command_table:
        application.add_handler(CommandHandler(command, callback))

    # Callback handler for inline buttons
    application.add_handler(CallbackQueryHandler(handle_callback))

    # Free-text handler: any non-command message is a grocery list or URL
    plain_text = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(plain_text, handle_free_text))

    # Run
    logger.info("๐Ÿ›’ Costco Route Bot starting...")
    application.run_polling(drop_pending_updates=True)

# Script entry point: start the bot only when this file is executed directly
# (``python -m costco_route.bot``), not when it is imported. The guard must
# compare ``__name__`` with "__main__"; the bare ``name == "main"`` referenced
# an undefined variable.
if __name__ == "__main__":
    main()