"""Grocery list SQLite schema and CRUD.
Tables:
- grocery_list: Committed items from recipes
- recipe_temp_state: Ephemeral toggle state (5-min TTL)
"""
import json
import logging
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from icarus.core.config.staging import DATA_DIR
# Path of the SQLite database file; _get_connection() opens this path.
DB_PATH = DATA_DIR / "icarus.db"
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
# DDL applied by init_db() through executescript(). Every statement uses
# IF NOT EXISTS, so the script can be re-run against an existing database.
# - grocery_list: one row per committed item; status defaults to 'pending'.
# - recipe_temp_state: one row per recipe_id; expires_at is compared against
#   datetime('now') by the expiry queries in this module.
# Note: the index named idx_grocery_added_by covers the requested_by column.
SCHEMA = """
CREATE TABLE IF NOT EXISTS grocery_list (
id INTEGER PRIMARY KEY AUTOINCREMENT,
item TEXT NOT NULL,
normalized_item TEXT,
quantity TEXT,
recipe_source TEXT,
recipe_url TEXT,
requested_by TEXT,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status TEXT DEFAULT 'pending'
);
CREATE TABLE IF NOT EXISTS recipe_temp_state (
recipe_id TEXT PRIMARY KEY,
user_id INTEGER,
chat_id INTEGER,
message_id INTEGER,
title TEXT,
source_url TEXT,
ingredients_json TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_grocery_status ON grocery_list(status);
CREATE INDEX IF NOT EXISTS idx_grocery_added_by ON grocery_list(requested_by);
CREATE INDEX IF NOT EXISTS idx_grocery_recipe ON grocery_list(recipe_source);
CREATE INDEX IF NOT EXISTS idx_temp_expires ON recipe_temp_state(expires_at);
"""
class _ClosingConnection(sqlite3.Connection):
    """Connection whose ``with`` block also closes it on exit.

    The stock ``sqlite3.Connection`` context manager only commits (on success)
    or rolls back (on error); it leaves the connection open. This module opens
    a fresh connection for every operation, so closing on exit keeps open
    handles from piling up.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Keep the base behaviour: commit on success, roll back on error.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def _get_connection() -> sqlite3.Connection:
    """Open a new connection to the database at ``DB_PATH``.

    Creates ``DATA_DIR`` first if it does not exist. Rows are returned as
    ``sqlite3.Row`` so columns can be read by name.

    Returns:
        A connection intended for one-shot use as
        ``with _get_connection() as conn:``; leaving the ``with`` block
        commits or rolls back and then closes the connection.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(
        str(DB_PATH),
        check_same_thread=False,
        factory=_ClosingConnection,
    )
    conn.row_factory = sqlite3.Row
    return conn
def init_db():
    """Create the tables and indexes described by ``SCHEMA``.

    Meant to be called once at startup.
    """
    db = _get_connection()
    with db:
        db.executescript(SCHEMA)
def _clean_expired():
    """Delete every ``recipe_temp_state`` row whose ``expires_at`` has passed."""
    purge_sql = "DELETE FROM recipe_temp_state WHERE expires_at < datetime('now')"
    with _get_connection() as db:
        db.execute(purge_sql)
        db.commit()
# ---------------------------------------------------------------------------
# Grocery List CRUD
# ---------------------------------------------------------------------------
def add_items(items: list[dict], recipe_source: str, recipe_url: str | None, requested_by: str) -> int:
    """Insert a batch of items into ``grocery_list``.

    Args:
        items: Dicts carrying a required 'text' key and an optional
            'quantity' key.
        recipe_source: Recipe title stored with every row.
        recipe_url: Source URL stored with every row (may be None).
        requested_by: User name/ID stored with every row.

    Returns:
        ``len(items)``, i.e. the number of rows inserted.
    """
    insert_sql = (
        "\n"
        "INSERT INTO grocery_list (item, normalized_item, quantity, recipe_source, recipe_url, requested_by)\n"
        "VALUES (?, ?, ?, ?, ?, ?)\n"
    )
    with _get_connection() as db:
        # One parameter tuple per item; the normalized text is stored
        # alongside the raw text.
        rows = [
            (
                entry["text"],
                _normalize_item(entry["text"]),
                entry.get("quantity"),
                recipe_source,
                recipe_url,
                requested_by,
            )
            for entry in items
        ]
        db.executemany(insert_sql, rows)
        db.commit()
    return len(items)
def list_items(status: str | None = None, requested_by: str | None = None) -> list[dict]:
    """Return grocery items, newest ``added_at`` first.

    Args:
        status: When truthy, keep only rows with this status.
        requested_by: When truthy, keep only rows requested by this user.

    Returns:
        One plain dict per matching row, keyed by column name.
    """
    sql_parts = ["SELECT * FROM grocery_list WHERE 1=1"]
    bind_values = []
    optional_filters = (
        (" AND status = ?", status),
        (" AND requested_by = ?", requested_by),
    )
    for clause, value in optional_filters:
        # Falsy values (None or "") mean "no filter on this column".
        if value:
            sql_parts.append(clause)
            bind_values.append(value)
    sql_parts.append(" ORDER BY added_at DESC")

    with _get_connection() as db:
        records = db.execute("".join(sql_parts), bind_values).fetchall()
    return [dict(record) for record in records]
def update_status(item_id: int, status: str) -> bool:
    """Set the status of a single grocery item.

    Args:
        item_id: ``id`` of the row to update.
        status: One of 'pending', 'in_cart', 'purchased' or 'skipped'.

    Returns:
        False when ``status`` is not one of the accepted values (the database
        is not touched in that case) or when no row has ``item_id``; True when
        a row was updated.
    """
    allowed_statuses = {"pending", "in_cart", "purchased", "skipped"}
    if status not in allowed_statuses:
        return False

    update_sql = "UPDATE grocery_list SET status = ? WHERE id = ?"
    with _get_connection() as db:
        result = db.execute(update_sql, (status, item_id))
        db.commit()
        rows_changed = result.rowcount
    return rows_changed > 0
def clear_list(requested_by: str | None = None) -> int:
    """Delete grocery items and report how many were removed.

    Args:
        requested_by: When given, only rows whose ``requested_by`` column
            equals this value are deleted. When ``None`` (the default), the
            whole list is cleared.

    Returns:
        Number of rows deleted.
    """
    with _get_connection() as conn:
        # Compare against None explicitly: an empty-string requester must not
        # fall through to the unfiltered DELETE and wipe every user's items.
        if requested_by is not None:
            cur = conn.execute(
                "DELETE FROM grocery_list WHERE requested_by = ?",
                (requested_by,),
            )
        else:
            cur = conn.execute("DELETE FROM grocery_list")
        conn.commit()
        deleted = cur.rowcount
    return deleted
def _normalize_item(text: str) -> str:
"""Normalize item text for deduplication."""
import re
# Remove quantities, units, adjectives
text = re.sub(r"^\d+[\d\s./-]\s(cups?|tbsp|tsp|lbs?|oz|g|kg|ml|l|large|small|medium)\s", "", text, flags=re.I)
text = re.sub(r"([^)])", "", text)
text = re.sub(r"\s+", " ", text).strip().lower()
return text
# ---------------------------------------------------------------------------
# Temp State CRUD
# ---------------------------------------------------------------------------
def save_temp_state(
    recipe_id: str,
    user_id: int,
    chat_id: int,
    message_id: int,
    title: str,
    source_url: str | None,
    ingredients: list[dict],
) -> None:
    """Save temporary toggle state with a 5-minute TTL.

    Expired rows are purged first. An existing row with the same
    ``recipe_id`` is replaced, which also restarts its TTL.

    Args:
        recipe_id: Primary key of the temp-state row.
        user_id: Stored as-is in the ``user_id`` column.
        chat_id: Stored as-is in the ``chat_id`` column.
        message_id: Stored as-is in the ``message_id`` column.
        title: Recipe title.
        source_url: Recipe source URL, or None.
        ingredients: JSON-serialisable list of dicts; stored as JSON text.
    """
    _clean_expired()
    with _get_connection() as conn:
        # Let SQLite compute the expiry. The expiry checks in this module
        # compare expires_at as text against datetime('now'), which is UTC in
        # "YYYY-MM-DD HH:MM:SS" form. A Python local-time isoformat() value
        # ("...T..." plus microseconds) sorts after any same-day
        # datetime('now'), so such rows would not expire on time.
        conn.execute(
            """
            INSERT OR REPLACE INTO recipe_temp_state
            (recipe_id, user_id, chat_id, message_id, title, source_url, ingredients_json, expires_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now', '+5 minutes'))
            """,
            (
                recipe_id,
                user_id,
                chat_id,
                message_id,
                title,
                source_url,
                json.dumps(ingredients),
            ),
        )
        conn.commit()
def get_temp_state(recipe_id: str) -> dict | None:
    """Fetch the unexpired temp state stored for ``recipe_id``.

    Expired rows are purged before the lookup, and the lookup itself also
    filters on ``expires_at``.

    Returns:
        A dict with keys 'recipe_id', 'user_id', 'chat_id', 'message_id',
        'title', 'source_url' and 'ingredients' (decoded from the stored
        JSON), or None when no live row exists.
    """
    _clean_expired()
    lookup_sql = "SELECT * FROM recipe_temp_state WHERE recipe_id = ? AND expires_at > datetime('now')"
    with _get_connection() as db:
        record = db.execute(lookup_sql, (recipe_id,)).fetchone()

    if record:
        copied_columns = (
            "recipe_id",
            "user_id",
            "chat_id",
            "message_id",
            "title",
            "source_url",
        )
        state = {column: record[column] for column in copied_columns}
        state["ingredients"] = json.loads(record["ingredients_json"])
        return state
    return None
def update_temp_state(recipe_id: str, ingredients: list[dict]) -> None:
    """Overwrite the stored ingredient list for ``recipe_id``.

    Only ``ingredients_json`` is rewritten; the row's ``expires_at`` is left
    as it was. Nothing changes when no row has that id.
    """
    update_sql = "UPDATE recipe_temp_state SET ingredients_json = ? WHERE recipe_id = ?"
    with _get_connection() as db:
        bind_values = (json.dumps(ingredients), recipe_id)
        db.execute(update_sql, bind_values)
        db.commit()
def delete_temp_state(recipe_id: str) -> None:
    """Remove the temp-state row for ``recipe_id``, if one exists."""
    delete_sql = "DELETE FROM recipe_temp_state WHERE recipe_id = ?"
    with _get_connection() as db:
        db.execute(delete_sql, (recipe_id,))
        db.commit()