📄 grocery_list.py 7,399 bytes Apr 26, 2026 📋 Raw

"""Grocery list SQLite schema and CRUD.

Tables:
- grocery_list: Committed items from recipes
- recipe_temp_state: Ephemeral toggle state (5-min TTL)
"""

import json
import logging
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path

from icarus.core.config.staging import DATA_DIR

DB_PATH = DATA_DIR / "icarus.db"

---------------------------------------------------------------------------

Schema

---------------------------------------------------------------------------

SCHEMA = """
CREATE TABLE IF NOT EXISTS grocery_list (
id INTEGER PRIMARY KEY AUTOINCREMENT,
item TEXT NOT NULL,
normalized_item TEXT,
quantity TEXT,
recipe_source TEXT,
recipe_url TEXT,
requested_by TEXT,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'pending'
);

CREATE TABLE IF NOT EXISTS recipe_temp_state (
recipe_id TEXT PRIMARY KEY,
user_id INTEGER,
chat_id INTEGER,
message_id INTEGER,
title TEXT,
source_url TEXT,
ingredients_json TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_grocery_status ON grocery_list(status);
CREATE INDEX IF NOT EXISTS idx_grocery_added_by ON grocery_list(requested_by);
CREATE INDEX IF NOT EXISTS idx_grocery_recipe ON grocery_list(recipe_source);
CREATE INDEX IF NOT EXISTS idx_temp_expires ON recipe_temp_state(expires_at);
"""

def _get_connection() -> sqlite3.Connection:
DATA_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn

def init_db():
"""Initialize tables. Called on startup."""
with _get_connection() as conn:
conn.executescript(SCHEMA)

def _clean_expired():
"""Remove expired temp state entries."""
with _get_connection() as conn:
conn.execute(
"DELETE FROM recipe_temp_state WHERE expires_at < datetime('now')"
)
conn.commit()

---------------------------------------------------------------------------

Grocery List CRUD

---------------------------------------------------------------------------

def add_items(items: list[dict], recipe_source: str, recipe_url: str | None, requested_by: str) -> int:
"""Add items to grocery list.

Args:
    items: List of dicts with 'text', optionally 'quantity'
    recipe_source: Recipe title
    recipe_url: Source URL
    requested_by: User name/ID

Returns:
    Number of items added.
"""
with _get_connection() as conn:
    for item in items:
        conn.execute(
            """
            INSERT INTO grocery_list (item, normalized_item, quantity, recipe_source, recipe_url, requested_by)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                item["text"],
                _normalize_item(item["text"]),
                item.get("quantity"),
                recipe_source,
                recipe_url,
                requested_by,
            ),
        )
    conn.commit()
return len(items)

def list_items(status: str | None = None, requested_by: str | None = None) -> list[dict]:
"""List grocery items with optional filters."""
query = "SELECT * FROM grocery_list WHERE 1=1"
params = []
if status:
query += " AND status = ?"
params.append(status)
if requested_by:
query += " AND requested_by = ?"
params.append(requested_by)
query += " ORDER BY added_at DESC"

with _get_connection() as conn:
    rows = conn.execute(query, params).fetchall()
    return [dict(r) for r in rows]

def update_status(item_id: int, status: str) -> bool:
"""Update item status."""
valid = {"pending", "in_cart", "purchased", "skipped"}
if status not in valid:
return False
with _get_connection() as conn:
cur = conn.execute(
"UPDATE grocery_list SET status = ? WHERE id = ?",
(status, item_id),
)
conn.commit()
return cur.rowcount > 0

def clear_list(requested_by: str | None = None) -> int:
"""Clear items. If requested_by provided, only their items."""
with _get_connection() as conn:
if requested_by:
cur = conn.execute(
"DELETE FROM grocery_list WHERE requested_by = ?",
(requested_by,),
)
else:
cur = conn.execute("DELETE FROM grocery_list")
conn.commit()
return cur.rowcount

def _normalize_item(text: str) -> str:
"""Normalize item text for deduplication."""
import re
# Remove quantities, units, adjectives
text = re.sub(r"^\d+[\d\s./-]\s(cups?|tbsp|tsp|lbs?|oz|g|kg|ml|l|large|small|medium)\s", "", text, flags=re.I)
text = re.sub(r"([^)]
)", "", text)
text = re.sub(r"\s+", " ", text).strip().lower()
return text

---------------------------------------------------------------------------

Temp State CRUD

---------------------------------------------------------------------------

def save_temp_state(
recipe_id: str,
user_id: int,
chat_id: int,
message_id: int,
title: str,
source_url: str | None,
ingredients: list[dict],
) -> None:
"""Save temporary toggle state with 5-minute TTL."""
_clean_expired()
expires = datetime.now() + timedelta(minutes=5)

with _get_connection() as conn:
    conn.execute(
        """
        INSERT OR REPLACE INTO recipe_temp_state
        (recipe_id, user_id, chat_id, message_id, title, source_url, ingredients_json, expires_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            recipe_id,
            user_id,
            chat_id,
            message_id,
            title,
            source_url,
            json.dumps(ingredients),
            expires.isoformat(),
        ),
    )
    conn.commit()

def get_temp_state(recipe_id: str) -> dict | None:
"""Retrieve temp state if not expired."""
_clean_expired()

with _get_connection() as conn:
    row = conn.execute(
        "SELECT * FROM recipe_temp_state WHERE recipe_id = ? AND expires_at > datetime('now')",
        (recipe_id,),
    ).fetchone()

    if not row:
        return None

    return {
        "recipe_id": row["recipe_id"],
        "user_id": row["user_id"],
        "chat_id": row["chat_id"],
        "message_id": row["message_id"],
        "title": row["title"],
        "source_url": row["source_url"],
        "ingredients": json.loads(row["ingredients_json"]),
    }

def update_temp_state(recipe_id: str, ingredients: list[dict]) -> None:
"""Update ingredients in temp state."""
with _get_connection() as conn:
conn.execute(
"UPDATE recipe_temp_state SET ingredients_json = ? WHERE recipe_id = ?",
(json.dumps(ingredients), recipe_id),
)
conn.commit()

def delete_temp_state(recipe_id: str) -> None:
"""Delete temp state."""
with _get_connection() as conn:
conn.execute("DELETE FROM recipe_temp_state WHERE recipe_id = ?", (recipe_id,))
conn.commit()