"""Grocery list SQLite schema and CRUD. Tables: - grocery_list: Committed items from recipes - recipe_temp_state: Ephemeral toggle state (5-min TTL) """ import json import logging import sqlite3 from contextlib import contextmanager from datetime import datetime, timedelta from pathlib import Path from icarus.core.config.staging import DATA_DIR DB_PATH = DATA_DIR / "icarus.db" # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- SCHEMA = """ CREATE TABLE IF NOT EXISTS grocery_list ( id INTEGER PRIMARY KEY AUTOINCREMENT, item TEXT NOT NULL, normalized_item TEXT, quantity TEXT, recipe_source TEXT, recipe_url TEXT, requested_by TEXT, added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, status TEXT DEFAULT 'pending' ); CREATE TABLE IF NOT EXISTS recipe_temp_state ( recipe_id TEXT PRIMARY KEY, user_id INTEGER, chat_id INTEGER, message_id INTEGER, title TEXT, source_url TEXT, ingredients_json TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP NOT NULL ); CREATE INDEX IF NOT EXISTS idx_grocery_status ON grocery_list(status); CREATE INDEX IF NOT EXISTS idx_grocery_added_by ON grocery_list(requested_by); CREATE INDEX IF NOT EXISTS idx_grocery_recipe ON grocery_list(recipe_source); CREATE INDEX IF NOT EXISTS idx_temp_expires ON recipe_temp_state(expires_at); """ def _get_connection() -> sqlite3.Connection: DATA_DIR.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row return conn def init_db(): """Initialize tables. Called on startup.""" with _get_connection() as conn: conn.executescript(SCHEMA) def _clean_expired(): """Remove expired temp state entries.""" with _get_connection() as conn: conn.execute( "DELETE FROM recipe_temp_state WHERE expires_at < datetime('now')" ) conn.commit() # --------------------------------------------------------------------------- # Grocery List CRUD # --------------------------------------------------------------------------- def add_items(items: list[dict], recipe_source: str, recipe_url: str | None, requested_by: str) -> int: """Add items to grocery list. Args: items: List of dicts with 'text', optionally 'quantity' recipe_source: Recipe title recipe_url: Source URL requested_by: User name/ID Returns: Number of items added. """ with _get_connection() as conn: for item in items: conn.execute( """ INSERT INTO grocery_list (item, normalized_item, quantity, recipe_source, recipe_url, requested_by) VALUES (?, ?, ?, ?, ?, ?) """, ( item["text"], _normalize_item(item["text"]), item.get("quantity"), recipe_source, recipe_url, requested_by, ), ) conn.commit() return len(items) def list_items(status: str | None = None, requested_by: str | None = None) -> list[dict]: """List grocery items with optional filters.""" query = "SELECT * FROM grocery_list WHERE 1=1" params = [] if status: query += " AND status = ?" params.append(status) if requested_by: query += " AND requested_by = ?" params.append(requested_by) query += " ORDER BY added_at DESC" with _get_connection() as conn: rows = conn.execute(query, params).fetchall() return [dict(r) for r in rows] def update_status(item_id: int, status: str) -> bool: """Update item status.""" valid = {"pending", "in_cart", "purchased", "skipped"} if status not in valid: return False with _get_connection() as conn: cur = conn.execute( "UPDATE grocery_list SET status = ? WHERE id = ?", (status, item_id), ) conn.commit() return cur.rowcount > 0 def clear_list(requested_by: str | None = None) -> int: """Clear items. If requested_by provided, only their items.""" with _get_connection() as conn: if requested_by: cur = conn.execute( "DELETE FROM grocery_list WHERE requested_by = ?", (requested_by,), ) else: cur = conn.execute("DELETE FROM grocery_list") conn.commit() return cur.rowcount def _normalize_item(text: str) -> str: """Normalize item text for deduplication.""" import re # Remove quantities, units, adjectives text = re.sub(r"^\d+[\d\s./-]*\s*(cups?|tbsp|tsp|lbs?|oz|g|kg|ml|l|large|small|medium)\s*", "", text, flags=re.I) text = re.sub(r"\([^)]*\)", "", text) text = re.sub(r"\s+", " ", text).strip().lower() return text # --------------------------------------------------------------------------- # Temp State CRUD # --------------------------------------------------------------------------- def save_temp_state( recipe_id: str, user_id: int, chat_id: int, message_id: int, title: str, source_url: str | None, ingredients: list[dict], ) -> None: """Save temporary toggle state with 5-minute TTL.""" _clean_expired() expires = datetime.now() + timedelta(minutes=5) with _get_connection() as conn: conn.execute( """ INSERT OR REPLACE INTO recipe_temp_state (recipe_id, user_id, chat_id, message_id, title, source_url, ingredients_json, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( recipe_id, user_id, chat_id, message_id, title, source_url, json.dumps(ingredients), expires.isoformat(), ), ) conn.commit() def get_temp_state(recipe_id: str) -> dict | None: """Retrieve temp state if not expired.""" _clean_expired() with _get_connection() as conn: row = conn.execute( "SELECT * FROM recipe_temp_state WHERE recipe_id = ? AND expires_at > datetime('now')", (recipe_id,), ).fetchone() if not row: return None return { "recipe_id": row["recipe_id"], "user_id": row["user_id"], "chat_id": row["chat_id"], "message_id": row["message_id"], "title": row["title"], "source_url": row["source_url"], "ingredients": json.loads(row["ingredients_json"]), } def update_temp_state(recipe_id: str, ingredients: list[dict]) -> None: """Update ingredients in temp state.""" with _get_connection() as conn: conn.execute( "UPDATE recipe_temp_state SET ingredients_json = ? WHERE recipe_id = ?", (json.dumps(ingredients), recipe_id), ) conn.commit() def delete_temp_state(recipe_id: str) -> None: """Delete temp state.""" with _get_connection() as conn: conn.execute("DELETE FROM recipe_temp_state WHERE recipe_id = ?", (recipe_id,)) conn.commit()