# seed_instacart.py (11,467 bytes, Apr 19, 2026)

"""Seed Costco Route ChromaDB from Instacart Market Basket dataset.

Maps Instacart's 21 departments + 134 aisles → 10 Costco warehouse zones,
then upserts all ~50K products into ChromaDB with embeddings.

Usage:
    python seed_instacart.py /path/to/instacart_csvs [--dry-run] [--limit N]
"""

import argparse
import csv
import json
import os
import sys
from pathlib import Path

import requests

# ---------------------------------------------------------------------------
# Instacart department → Costco zone mapping
# ---------------------------------------------------------------------------

# Costco zones (from config.py):
#   01  Electronics / Entrance
#   02  Seasonal / Center
#   03  Health & Beauty
#   04  Pantry / Snacks
#   05  Beverages
#   06  Dairy / Cold Room
#   07  Fresh / Bakery / Meat
#   08  Household / Cleaning
#   09  Freezer
#   10  Checkout / Food Court

# Default Costco zone for each Instacart department.
# Key = Instacart department_id (as a string), value = Costco zone_id.
DEPARTMENT_TO_ZONE = {
    "1": "09",   # frozen → Freezer
    "2": "04",   # other → Pantry (catch-all)
    "3": "07",   # bakery → Fresh / Bakery / Meat
    "4": "07",   # produce → Fresh / Bakery / Meat
    "5": "05",   # alcohol → Beverages
    "6": "04",   # international → Pantry (center aisles)
    "7": "05",   # beverages → Beverages
    "8": "04",   # pets → Pantry (pet food in center aisles)
    "9": "04",   # dry goods pasta → Pantry
    "10": "04",  # bulk → Pantry (bulk section is center aisles)
    "11": "03",  # personal care → Health & Beauty
    "12": "07",  # meat seafood → Fresh / Bakery / Meat
    "13": "04",  # pantry → Pantry
    "14": "04",  # breakfast → Pantry (cereal/granola bars in center aisles)
    "15": "04",  # canned goods → Pantry
    "16": "06",  # dairy eggs → Dairy / Cold Room
    "17": "08",  # household → Household / Cleaning
    "18": "02",  # babies → Seasonal (baby section varies, often seasonal area)
    "19": "04",  # snacks → Pantry / Snacks
    "20": "07",  # deli → Fresh / Bakery / Meat
    "21": "04",  # missing → Pantry (fallback)
}

# Aisle-level overrides: used when an aisle's items belong in a different zone
# than its department's default. Key = aisle_id, Value = zone_id.

# Static aisle overrides: Instacart aisle_id (string) -> Costco zone_id.
AISLE_TO_ZONE_OVERRIDE = {
    # Cheese belongs in Dairy / Cold Room rather than with snacks or pantry.
    "2": "06",   # specialty cheeses → Dairy
    # NOTE(review): the original author flagged this aisle ID as unverified;
    # confirm against aisles.csv that 37 really is "packaged cheese".
    "37": "06",  # packaged cheese → Dairy
}

# Aisle overrides are built dynamically after loading the data, because
# some aisles cross department boundaries at Costco.

# ---------------------------------------------------------------------------
# Embedding function
# ---------------------------------------------------------------------------

# Embedding endpoint and model name; each can be overridden through an
# environment variable of the same name.
EMBED_URL = os.getenv("EMBED_URL", "http://localhost:11434/api/embeddings")
EMBED_MODEL = os.getenv("EMBED_MODEL", "nomic-embed-text")

def get_embedding(text: str) -> "list[float] | None":
    """Request an embedding vector for *text* from the embeddings endpoint.

    Posts ``{"model": EMBED_MODEL, "prompt": text}`` as JSON to ``EMBED_URL``
    with a 30-second timeout.

    Args:
        text: The text to embed.

    Returns:
        The value of the ``"embedding"`` field of the JSON response, or
        ``None`` when the request fails, the server answers with an error
        status, the body is not valid JSON, or the field is missing.
        Failures are reported on stderr instead of being raised.
    """
    try:
        resp = requests.post(
            EMBED_URL,
            json={"model": EMBED_MODEL, "prompt": text},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["embedding"]
    except (requests.RequestException, KeyError, TypeError, ValueError) as e:
        # ValueError: body was not JSON (not every requests version derives
        # its JSON decode error from RequestException).
        # TypeError: the JSON payload was not an object, so it cannot be
        # indexed by "embedding".
        print(f" ⚠️ Embedding failed for '{text[:40]}...': {e}", file=sys.stderr)
        return None

# ---------------------------------------------------------------------------
# Main seed logic
# ---------------------------------------------------------------------------

# Display order of the zone IDs used in the distribution and sample listings.
_ZONE_ORDER = ("01", "02", "03", "04", "05", "06", "07", "08", "09", "10")


def _read_csv_rows(path):
    """Return every row of the CSV file at *path* as a list of dicts."""
    # newline="" is what the csv module expects so that quoted fields with
    # embedded line breaks parse correctly. The explicit encoding keeps
    # parsing independent of the platform locale; "utf-8-sig" also tolerates
    # a leading BOM. NOTE(review): assumes the dataset files are UTF-8.
    with open(path, newline="", encoding="utf-8-sig") as f:
        return list(csv.DictReader(f))


def _group_by_zone(products, aisle_overrides, aisle_names, dept_names):
    """Assign each product row to a zone.

    Returns ``(zone_products, unmapped)`` where ``zone_products`` maps
    zone_id to a list of ``(product_name, aisle_name, department_name)``
    tuples and ``unmapped`` lists the names that hit the Pantry fallback.
    """
    zone_products = {}
    unmapped = []

    for prod in products:
        dept_id = prod["department_id"]
        aisle_id = prod["aisle_id"]
        product_name = prod["product_name"]

        # An aisle-level override takes precedence over the department default.
        if aisle_id in aisle_overrides:
            zone_id = aisle_overrides[aisle_id]
        elif dept_id in DEPARTMENT_TO_ZONE:
            zone_id = DEPARTMENT_TO_ZONE[dept_id]
        else:
            zone_id = "04"  # fallback to Pantry
            unmapped.append(product_name)

        zone_products.setdefault(zone_id, []).append(
            (product_name, aisle_names.get(aisle_id, "?"), dept_names.get(dept_id, "?"))
        )

    return zone_products, unmapped


def seed(csv_dir: str, dry_run: bool = False, limit: int = 0):
    """Load the Instacart CSVs, map products to Costco zones, seed ChromaDB.

    Args:
        csv_dir: Directory containing ``departments.csv``, ``aisles.csv`` and
            ``products.csv``.
        dry_run: When true, print the zone distribution and up to five sample
            products per zone, then return without opening ChromaDB.
        limit: Maximum number of products to process; values <= 0 mean all.

    Side effects:
        Prints progress to stdout/stderr. Unless ``dry_run`` is set, upserts
        one document per successfully embedded product into the configured
        ChromaDB collection. Products whose embedding request fails are
        skipped and counted as failed.
    """
    csv_path = Path(csv_dir)

    dept_names = {
        row["department_id"]: row["department"]
        for row in _read_csv_rows(csv_path / "departments.csv")
    }
    aisle_names = {
        row["aisle_id"]: row["aisle"]
        for row in _read_csv_rows(csv_path / "aisles.csv")
    }

    # Aisles whose products belong in a different zone than their department.
    aisle_overrides = _build_aisle_overrides(aisle_names)

    products = _read_csv_rows(csv_path / "products.csv")
    if limit > 0:
        products = products[:limit]

    print(f"📦 Loaded {len(products)} products from Instacart dataset")
    print(f"   {len(dept_names)} departments, {len(aisle_names)} aisles")
    print()

    zone_products, unmapped = _group_by_zone(
        products, aisle_overrides, aisle_names, dept_names
    )

    # Project import kept function-local, as in the original layout.
    from costco_route.config import ZONES

    print("🗺️  Zone Distribution:")
    for zid in _ZONE_ORDER:
        count = len(zone_products.get(zid, []))
        zname = ZONES.get(zid, {}).get("name", "Unknown")
        bar = "█" * (count // 100)  # one block per 100 items
        print(f"  Zone {zid} ({zname}): {count:>5} items  {bar}")

    if unmapped:
        print(f"\n⚠️  {len(unmapped)} products fell to Pantry fallback")

    if dry_run:
        print("\n🧪 DRY RUN — no ChromaDB writes. Use --limit to test a subset.")
        # Show a few samples per zone.
        for zid in _ZONE_ORDER:
            items = zone_products.get(zid, [])[:5]
            if items:
                print(f"\n  Zone {zid} samples:")
                for name, aisle, dept in items:
                    print(f"    • {name} (aisle: {aisle}, dept: {dept})")
        return

    import chromadb
    from costco_route.config import CHROMA_PATH, CHROMA_COLLECTION

    # Flush progress output promptly. reconfigure() switches the existing
    # streams to line buffering without replacing them (and without changing
    # their encoding); streams lacking the method are left untouched.
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, "reconfigure"):
            stream.reconfigure(line_buffering=True)

    client = chromadb.PersistentClient(path=str(CHROMA_PATH))
    collection = client.get_or_create_collection(
        name=CHROMA_COLLECTION,
        metadata={"hnsw:space": "cosine"},
    )

    batch_size = 100
    total_upserted = 0
    total_failed = 0

    print(f"\n🌱 Seeding ChromaDB at {CHROMA_PATH}...")
    print(f"   Collection: {CHROMA_COLLECTION}")
    print()

    all_items = [
        (product_name, zid, ZONES.get(zid, {}).get("name", "Unknown"), aisle, dept)
        for zid, items in zone_products.items()
        for product_name, aisle, dept in items
    ]

    for start in range(0, len(all_items), batch_size):
        batch = all_items[start : start + batch_size]
        ids = []
        docs = []
        metas = []
        embeds = []

        for product_name, zid, zname, aisle, dept in batch:
            # Searchable document text with aisle/department context.
            doc = f"{product_name} (aisle: {aisle}, dept: {dept})"
            emb = get_embedding(doc)
            if not emb:
                # Skip rather than store a placeholder vector: a fixed-size
                # zero vector breaks the whole batch when EMBED_MODEL has a
                # different dimension, and carries no signal under cosine
                # distance.
                total_failed += 1
                continue

            # NOTE(review): IDs derive from the lower-cased name, so names
            # that differ only by case or spacing map to the same ID.
            ids.append(f"instacart_{product_name.lower().replace(' ', '_')}")
            docs.append(doc)
            embeds.append(emb)
            metas.append({
                "zone": zid,
                "zone_name": zname,
                "source": "instacart",
                "aisle": aisle,
                "department": dept,
                "item_name": product_name,
            })

        if ids:  # nothing to send when every embedding in the batch failed
            try:
                collection.upsert(
                    ids=ids,
                    documents=docs,
                    embeddings=embeds,
                    metadatas=metas,
                )
            except Exception as e:
                # Best-effort seeding: report the failed batch and continue.
                print(f"  ❌ Batch {start // batch_size} failed: {e}", file=sys.stderr)
                total_failed += len(ids)
            else:
                total_upserted += len(ids)

        # Progress line, rewritten in place via the trailing carriage return.
        pct = min(start + batch_size, len(all_items)) / len(all_items) * 100
        print(f"  {pct:5.1f}% — {total_upserted} upserted, {total_failed} failed", end="\r")

    print()
    print(f"\n✅ Seeding complete: {total_upserted} products upserted, {total_failed} failed")

    # Verify
    count = collection.count()
    print(f"   Collection now has {count} total documents")

def _build_aisle_overrides(aisle_names: dict[str, str]) -> dict[str, str]:
"""Build aisle-level zone overrides for items that don't match their department.

At Costco, some items are in different zones than their Instacart department
would suggest. For example:
- Specialty cheeses (Instacart: snacks dept) β†’ Costco Zone 06 (Dairy/Cold)
- Packaged cheese (Instacart: dairy) β†’ stays in Zone 06 βœ…
- Bread (Instacart: bakery) β†’ Costco Zone 07 (Fresh/Bakery) βœ…
- Vitamins (Instacart: personal care) β†’ Costco Zone 03 βœ…
"""
overrides = {}

for aid, name in aisle_names.items():
    name_lower = name.lower()

    # Cheese aisles β†’ Dairy (Zone 06) regardless of department
    if "cheese" in name_lower:
        overrides[aid] = "06"

    # Bread/bakery β†’ Zone 07 (already correct from dept mapping, but be explicit)
    elif name_lower in ("bread", "bakery desserts", "breakfast bakery",
                        "buns rolls", "tortillas flat bread"):
        overrides[aid] = "07"

    # Hot dogs/bacon/sausage β†’ could be Zone 07 (meat) not Zone 09 (frozen)
    elif name_lower == "hot dogs bacon sausage":
        overrides[aid] = "07"

    # Baby items β†’ at Costco, baby stuff is often in the center/seasonal area
    # Keep Zone 02 from dept mapping

    # Protein/energy bars β†’ Zone 04 (Pantry/Snacks), not Zone 03 (Health/Beauty)
    elif "energy" in name_lower or "protein" in name_lower:
        overrides[aid] = "04"

return overrides

def main():
    """Parse command-line arguments and run :func:`seed`.

    Arguments:
        csv_dir: Directory containing the Instacart CSV files.
        --dry-run: Show the mapping without writing to ChromaDB.
        --limit N: Only process the first N products (0 = all).

    Exits with status 1, after printing an error to stderr, when ``csv_dir``
    is not an existing directory.
    """
    parser = argparse.ArgumentParser(
        description="Seed Costco Route ChromaDB from Instacart dataset"
    )
    parser.add_argument(
        "csv_dir",
        help="Path to directory containing Instacart CSV files",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show mapping without writing to ChromaDB",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Limit number of products to seed (0 = all)",
    )
    args = parser.parse_args()

    # is_dir() rather than exists(): a path to a regular file would pass an
    # existence check and only fail later when the CSVs are opened.
    if not Path(args.csv_dir).is_dir():
        print(f"❌ Directory not found: {args.csv_dir}", file=sys.stderr)
        sys.exit(1)

    seed(args.csv_dir, dry_run=args.dry_run, limit=args.limit)

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()