"""Seed Costco Route ChromaDB from Instacart Market Basket dataset. Maps Instacart's 21 departments + 134 aisles → 10 Costco warehouse zones, then upserts all ~50K products into ChromaDB with embeddings. Usage: python seed_instacart.py /path/to/instacart_csvs [--dry-run] [--limit N] """ import argparse import csv import json import os import sys from pathlib import Path import requests # --------------------------------------------------------------------------- # Instacart department → Costco zone mapping # --------------------------------------------------------------------------- # Costco zones (from config.py): # 01 Electronics / Entrance # 02 Seasonal / Center # 03 Health & Beauty # 04 Pantry / Snacks # 05 Beverages # 06 Dairy / Cold Room # 07 Fresh / Bakery / Meat # 08 Household / Cleaning # 09 Freezer # 10 Checkout / Food Court DEPARTMENT_TO_ZONE = { "1": "09", # frozen → Freezer "2": "04", # other → Pantry (catch-all) "3": "07", # bakery → Fresh / Bakery / Meat "4": "07", # produce → Fresh / Bakery / Meat "5": "05", # alcohol → Beverages "6": "04", # international → Pantry (center aisles) "7": "05", # beverages → Beverages "8": "04", # pets → Pantry (pet food in center aisles) "9": "04", # dry goods pasta → Pantry "10": "04", # bulk → Pantry (bulk section is center aisles) "11": "03", # personal care → Health & Beauty "12": "07", # meat seafood → Fresh / Bakery / Meat "13": "04", # pantry → Pantry "14": "04", # breakfast → Pantry (cereal/granola bars in center aisles) "15": "04", # canned goods → Pantry "16": "06", # dairy eggs → Dairy / Cold Room "17": "08", # household → Household / Cleaning "18": "02", # babies → Seasonal (baby section varies, often seasonal area) "19": "04", # snacks → Pantry / Snacks "20": "07", # deli → Fresh / Bakery / Meat "21": "04", # missing → Pantry (fallback) } # Aisle-level overrides: when an aisle's items belong in a different zone # than its department's default. Key = aisle_id, Value = zone_id. AISLE_TO_ZONE_OVERRIDE = { # Specialty cheeses are in Dairy/Cold, not with snacks/pantry "2": "06", # specialty cheeses → Dairy # Packaged cheese → Dairy "37": "06", # packaged cheese → Dairy (wait, let me check the aisle IDs) } # We'll build aisle overrides dynamically after loading the data # Some aisles cross department boundaries at Costco # --------------------------------------------------------------------------- # Embedding function # --------------------------------------------------------------------------- EMBED_URL = os.environ.get( "EMBED_URL", "http://localhost:11434/api/embeddings", ) EMBED_MODEL = os.environ.get("EMBED_MODEL", "nomic-embed-text") def get_embedding(text: str) -> list[float]: """Get embedding from Ollama API.""" try: resp = requests.post( EMBED_URL, json={"model": EMBED_MODEL, "prompt": text}, timeout=30, ) resp.raise_for_status() return resp.json()["embedding"] except (requests.RequestException, KeyError) as e: print(f" ⚠️ Embedding failed for '{text[:40]}...': {e}", file=sys.stderr) return None # --------------------------------------------------------------------------- # Main seed logic # --------------------------------------------------------------------------- def seed(csv_dir: str, dry_run: bool = False, limit: int = 0): """Load Instacart CSVs and seed ChromaDB.""" csv_path = Path(csv_dir) # Load departments dept_names = {} with open(csv_path / "departments.csv") as f: for row in csv.DictReader(f): dept_names[row["department_id"]] = row["department"] # Load aisles aisle_names = {} with open(csv_path / "aisles.csv") as f: for row in csv.DictReader(f): aisle_names[row["aisle_id"]] = row["aisle"] # Build aisle-level zone overrides # These aisles have products that belong in a different Costco zone # than their department default aisle_overrides = _build_aisle_overrides(aisle_names) # Load products products = [] with open(csv_path / "products.csv") as f: for row in csv.DictReader(f): products.append(row) if limit > 0: products = products[:limit] print(f"📦 Loaded {len(products)} products from Instacart dataset") print(f" {len(dept_names)} departments, {len(aisle_names)} aisles") print() # Map each product to a Costco zone zone_products = {} # zone_id → [(product_name, aisle, department)] unmapped = [] for prod in products: dept_id = prod["department_id"] aisle_id = prod["aisle_id"] product_name = prod["product_name"] # Check aisle-level override first if aisle_id in aisle_overrides: zone_id = aisle_overrides[aisle_id] elif dept_id in DEPARTMENT_TO_ZONE: zone_id = DEPARTMENT_TO_ZONE[dept_id] else: zone_id = "04" # fallback to Pantry unmapped.append(product_name) zone_products.setdefault(zone_id, []).append( (product_name, aisle_names.get(aisle_id, "?"), dept_names.get(dept_id, "?")) ) # Print zone distribution from costco_route.config import ZONES print("🗺️ Zone Distribution:") for zid in ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10"]: count = len(zone_products.get(zid, [])) zname = ZONES.get(zid, {}).get("name", "Unknown") bar = "█" * (count // 100) print(f" Zone {zid} ({zname}): {count:>5} items {bar}") if unmapped: print(f"\n⚠️ {len(unmapped)} products fell to Pantry fallback") if dry_run: print(f"\n🧪 DRY RUN — no ChromaDB writes. Use --limit to test a subset.") # Show some samples per zone for zid in ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10"]: items = zone_products.get(zid, [])[:5] if items: print(f"\n Zone {zid} samples:") for name, aisle, dept in items: print(f" • {name} (aisle: {aisle}, dept: {dept})") return import chromadb from costco_route.config import CHROMA_PATH, CHROMA_COLLECTION # Flush prints immediately for progress tracking import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, line_buffering=True) sys.stderr = io.TextIOWrapper(sys.stderr.buffer, line_buffering=True) client = chromadb.PersistentClient(path=str(CHROMA_PATH)) collection = client.get_or_create_collection( name=CHROMA_COLLECTION, metadata={"hnsw:space": "cosine"}, ) # Seed in batches BATCH_SIZE = 100 total_upserted = 0 total_failed = 0 print(f"\n🌱 Seeding ChromaDB at {CHROMA_PATH}...") print(f" Collection: {CHROMA_COLLECTION}") print() all_items = [] for zid, items in zone_products.items(): zname = ZONES.get(zid, {}).get("name", "Unknown") for product_name, aisle, dept in items: all_items.append((product_name, zid, zname, aisle, dept)) for i in range(0, len(all_items), BATCH_SIZE): batch = all_items[i : i + BATCH_SIZE] ids = [] docs = [] metas = [] embeds = [] for product_name, zid, zname, aisle, dept in batch: # Create a searchable document with context doc = f"{product_name} (aisle: {aisle}, dept: {dept})" ids.append(f"instacart_{product_name.lower().replace(' ', '_')}") docs.append(doc) metas.append({ "zone": zid, "zone_name": zname, "source": "instacart", "aisle": aisle, "department": dept, "item_name": product_name, }) # Get embeddings for batch for doc_text in docs: emb = get_embedding(doc_text) if emb: embeds.append(emb) else: embeds.append([0.0] * 768) # zero vector fallback total_failed += 1 # Upsert batch try: collection.upsert( ids=ids, documents=docs, embeddings=embeds, metadatas=metas, ) total_upserted += len(batch) except Exception as e: print(f" ❌ Batch {i//BATCH_SIZE} failed: {e}", file=sys.stderr) total_failed += len(batch) # Progress pct = min(i + BATCH_SIZE, len(all_items)) / len(all_items) * 100 print(f" {pct:5.1f}% — {total_upserted} upserted, {total_failed} failed", end="\r") print() print(f"\n✅ Seeding complete: {total_upserted} products upserted, {total_failed} failed") # Verify count = collection.count() print(f" Collection now has {count} total documents") def _build_aisle_overrides(aisle_names: dict[str, str]) -> dict[str, str]: """Build aisle-level zone overrides for items that don't match their department. At Costco, some items are in different zones than their Instacart department would suggest. For example: - Specialty cheeses (Instacart: snacks dept) → Costco Zone 06 (Dairy/Cold) - Packaged cheese (Instacart: dairy) → stays in Zone 06 ✅ - Bread (Instacart: bakery) → Costco Zone 07 (Fresh/Bakery) ✅ - Vitamins (Instacart: personal care) → Costco Zone 03 ✅ """ overrides = {} for aid, name in aisle_names.items(): name_lower = name.lower() # Cheese aisles → Dairy (Zone 06) regardless of department if "cheese" in name_lower: overrides[aid] = "06" # Bread/bakery → Zone 07 (already correct from dept mapping, but be explicit) elif name_lower in ("bread", "bakery desserts", "breakfast bakery", "buns rolls", "tortillas flat bread"): overrides[aid] = "07" # Hot dogs/bacon/sausage → could be Zone 07 (meat) not Zone 09 (frozen) elif name_lower == "hot dogs bacon sausage": overrides[aid] = "07" # Baby items → at Costco, baby stuff is often in the center/seasonal area # Keep Zone 02 from dept mapping # Protein/energy bars → Zone 04 (Pantry/Snacks), not Zone 03 (Health/Beauty) elif "energy" in name_lower or "protein" in name_lower: overrides[aid] = "04" return overrides def main(): parser = argparse.ArgumentParser( description="Seed Costco Route ChromaDB from Instacart dataset" ) parser.add_argument( "csv_dir", help="Path to directory containing Instacart CSV files" ) parser.add_argument( "--dry-run", action="store_true", help="Show mapping without writing to ChromaDB" ) parser.add_argument( "--limit", type=int, default=0, help="Limit number of products to seed (0 = all)" ) args = parser.parse_args() if not Path(args.csv_dir).exists(): print(f"❌ Directory not found: {args.csv_dir}", file=sys.stderr) sys.exit(1) seed(args.csv_dir, dry_run=args.dry_run, limit=args.limit) if __name__ == "__main__": main()