"""Recipe extractor — fetch recipe URLs and extract structured ingredients + instructions.
Uses a two-step approach:
1. Fetch URL → strip HTML to clean text (BeautifulSoup)
2. Route cleaned text through local LLM → structured JSON (ingredients, instructions, metadata)
Then classify ingredients into Costco zones using the existing pipeline
and store recipes in a family dinner Rolodex (JSON file).
"""
import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from costco_route.config import LLM_URL, LLM_MODEL
from costco_route.llm_client import _call_llm
from costco_route.pipeline import optimize
# ---------------------------------------------------------------------------
# Recipe storage
# ---------------------------------------------------------------------------
RECIPES_DIR = Path(os.environ.get("RECIPES_DIR", os.path.expanduser("~/.costco_route/recipes")))
def _ensure_recipes_dir() -> Path:
"""Create recipes directory if it doesn't exist."""
RECIPES_DIR.mkdir(parents=True, exist_ok=True)
return RECIPES_DIR
# ---------------------------------------------------------------------------
# Structured data helpers (ld+json parsing)
# ---------------------------------------------------------------------------
def _parse_servings(yield_str: str | None) -> str | None:
"""Parse recipeYield into clean servings string."""
if not yield_str:
return None
yield_str = str(yield_str).strip()
# Handle "Serves 4-6" or "4-6 servings" or just "4"
if match := re.search(r'(?i)(serves?\s+)?(\d+[\s-]\d)', yield_str):
return match.group(2).strip()
return yield_str if yield_str else None
def _parse_duration(iso_duration: str | None) -> str | None:
"""Parse ISO 8601 duration (PT30M) into readable string."""
if not iso_duration:
return None
# PT30M → 30 min, PT1H30M → 1 hr 30 min
match = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?', str(iso_duration))
if not match:
return str(iso_duration)
hours, minutes = match.groups()
parts = []
if hours:
parts.append(f"{hours} hr")
if minutes:
parts.append(f"{minutes} min")
return " ".join(parts) if parts else None
def _clean_ingredients(ingredients: list) -> list[str]:
"""Clean ingredient strings (remove newlines, extra spaces)."""
cleaned = []
for ing in ingredients:
if ing:
cleaned.append(re.sub(r'\s+', ' ', str(ing).replace('\r', '').replace('\n', ' ')).strip())
return [i for i in cleaned if i]
def _parse_instructions(instructions: list | str) -> list[str]:
"""Parse recipeInstructions from ld+json (handles HowToStep or plain text)."""
if isinstance(instructions, str):
# Split on numbers or newlines
steps = re.split(r'\n+|\d+)\s*', instructions)
return [s.strip() for s in steps if s.strip()]
result = []
for step in instructions:
if isinstance(step, dict):
# HowToStep
text = step.get("text", step.get("name", ""))
if text:
result.append(str(text).strip())
elif isinstance(step, str):
result.append(step.strip())
return result
def _extract_tags(ld_recipe: dict) -> list[str]:
"""Extract tags from recipeCategory and recipeCuisine."""
tags = []
for field in ["recipeCategory", "recipeCuisine", "keywords"]:
val = ld_recipe.get(field)
if val:
if isinstance(val, str):
tags.extend([t.strip() for t in val.split(",") if t.strip()])
elif isinstance(val, list):
tags.extend([str(v).strip() for v in val if v])
# Add some inference
name = ld_recipe.get("name", "").lower()
if any(w in name for w in ["chicken", "beef", "pork", "fish"]):
tags.append("dinner")
return list(set(tags))[:8] # Deduplicate and limit
# ---------------------------------------------------------------------------
# URL fetching + HTML cleaning
# ---------------------------------------------------------------------------
# Prompt template for LLM-based recipe extraction — the fallback used when no
# ld+json structured data is found on the page. Double braces ({{ }}) are
# literal JSON braces; {url} and {text} are filled via str.format() in
# fetch_recipe().
RECIPE_FETCH_PROMPT = """SYSTEM: You are a recipe extraction engine. Extract the recipe from the following text, which was scraped from a recipe website.
The text will contain ads, navigation, stories, and other garbage. Ignore ALL of that. Extract ONLY the recipe.
Return a JSON object with this exact structure:
{{
"title": "Recipe title",
"servings": "Number of servings (e.g., '4' or '6-8')",
"prep_time": "Prep time if stated (e.g., '15 min'), null if not found",
"cook_time": "Cook time if stated (e.g., '30 min'), null if not found",
"total_time": "Total time if stated, null if not found",
"ingredients": ["ingredient 1", "ingredient 2", ...],
"instructions": ["step 1", "step 2", ...],
"tags": ["dinner", "chicken", "easy", ...],
"source_url": "the URL provided"
}}
RULES:
- ingredients: One item per list entry, including quantities (e.g., "2 cups flour", "1 lb chicken thighs")
- instructions: Numbered steps as separate list entries. Combine multiple short steps if they're fragments.
- tags: 3-8 relevant tags for categorization (meal type, protein, cuisine, difficulty)
- If the page contains multiple recipes, extract the MAIN one
- If no recipe is found, return {{"error": "no recipe found", "source_url": "..."}}
- Strip ALL ad copy, nutrition disclaimers, "jump to recipe" buttons, social sharing text
- Return ONLY the JSON object, no markdown fences
USER: Extract the recipe from this text:
URL: {url}
TEXT:
{text}
RESPONSE (JSON only):"""
def _extract_ld_json(soup: BeautifulSoup) -> dict | None:
    """Extract structured recipe data from ld+json script tags.

    Handles @graph wrappers, top-level arrays, and @type values that are
    lists (e.g. ["Recipe", "NewsArticle"]) — a common pattern on real recipe
    sites that the string-equality check missed. Non-dict entries in @graph
    previously raised an uncaught AttributeError; they are now skipped.

    Returns:
        Recipe dict if found, None otherwise.
    """
    def _is_recipe(node) -> bool:
        # schema.org allows @type to be a single string or a list of types.
        if not isinstance(node, dict):
            return False
        node_type = node.get("@type")
        if isinstance(node_type, list):
            return "Recipe" in node_type
        return node_type == "Recipe"

    for script in soup.find_all("script", type="application/ld+json"):
        try:
            raw = script.string if script.string else ""
            # Remove carriage returns (Windows line endings in strings)
            raw = raw.replace("\r", "")
            # Fix unescaped newlines inside JSON strings (common recipe site issue)
            raw = _fix_json_newlines(raw)
            data = json.loads(raw)
            # Handle @graph structure (some sites wrap recipes)
            if isinstance(data, dict) and "@graph" in data:
                for item in data["@graph"]:
                    if _is_recipe(item):
                        return item
            # Direct Recipe type
            if _is_recipe(data):
                return data
            # Array of items
            if isinstance(data, list):
                for item in data:
                    if _is_recipe(item):
                        return item
        except (json.JSONDecodeError, TypeError):
            continue
    return None
def _fix_json_newlines(json_str: str) -> str:
"""Fix unescaped newlines inside JSON string values.
Recipe sites often have multiline strings in their ld+json that aren't
properly escaped. This attempts to fix them by escaping newlines that
appear inside quoted strings.
"""
result = []
in_string = False
i = 0
while i < len(json_str):
c = json_str[i]
if c == '"' and (i == 0 or json_str[i-1] != '\\'):
in_string = not in_string
result.append(c)
elif c == '\n' and in_string:
# Replace unescaped newline with escaped newline
result.append('\\n')
elif c == '\t' and in_string:
# Replace unescaped tab with escaped tab
result.append('\\t')
else:
result.append(c)
i += 1
return ''.join(result)
# Default Browserless endpoint (local headless Chrome container), env-overridable.
BROWSERLESS_URL = os.environ.get("BROWSERLESS_URL", "http://127.0.0.1:3000")
# Pull from config if available (centralized), otherwise use the fallback above
try:
    from costco_route.config import BROWSERLESS_URL as _BL_CFG
    BROWSERLESS_URL = _BL_CFG
except ImportError:
    pass
def _fetch_via_browserless(url: str, timeout: int = 30) -> str:
    """Fetch a URL through the local Browserless container (headless Chrome).

    Renders JavaScript and bypasses most bot detection; best used to obtain
    ld+json from JS-heavy sites where a plain GET fails.

    Returns:
        Raw HTML string (capped at 50K chars), or empty string on failure.
    """
    try:
        response = requests.post(
            f"{BROWSERLESS_URL}/content",
            json={"url": url},
            timeout=timeout,
        )
        response.raise_for_status()
        body = response.text.strip()
    except requests.RequestException:
        return ""
    # Reject trivially small responses (error shells, empty pages).
    if body and len(body) > 200:
        return body[:50000]
    return ""
def _fetch_via_jina(url: str, timeout: int = 30) -> str:
    """Fetch a URL through the Jina Reader API and return clean markdown.

    Jina Reader renders JavaScript server-side and strips ads/nav/cookie
    banners, returning Markdown — far cleaner than raw HTML scraping for
    JS-heavy sites.

    Returns:
        Clean markdown text (capped at 15K chars), or empty string on failure.
    """
    reader_endpoint = f"https://r.jina.ai/{url}"
    try:
        response = requests.get(
            reader_endpoint,
            headers={"Accept": "text/markdown"},
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.RequestException:
        return ""
    markdown = response.text.strip()
    # Anything this short is almost certainly an empty or error page.
    if markdown and len(markdown) > 100:
        return markdown[:15000]
    return ""
def _build_text_from_html(soup: BeautifulSoup) -> str:
    """Build clean text from HTML, preserving recipe content.

    Removes only clearly non-content tags, then prefers recipe-specific
    containers; falls back to full-body text. Output is capped at 15K chars.
    """
    # Re-parse a serialized copy so the caller's soup is never mutated.
    working = BeautifulSoup(str(soup), "html.parser")

    # Strip obvious non-content elements.
    noise = ["script", "style", "nav", "footer", "header", "aside",
             "noscript", "iframe", "form"]
    for element in working.find_all(noise):
        element.decompose()

    # Prefer recipe-specific content areas over the whole page.
    selectors = [
        "[id*='recipe']", "[class*='recipe']",
        "[id*='ingredient']", "[class*='ingredient']",
        "[id*='direction']", "[class*='direction']",
        "[id*='instruction']", "[class*='instruction']",
        "article", "main", ".entry-content", ".post-content"
    ]
    collected = []
    for selector in selectors:
        try:
            for node in working.select(selector):
                collected.append(node.get_text(separator="\n", strip=True) + "\n\n")
        except Exception:
            # Invalid/unsupported selector — try the next one.
            continue

    if collected:
        text = "".join(collected)
    else:
        # Fallback: whole-document text.
        text = working.get_text(separator="\n", strip=True)
    # Collapse runs of 3+ newlines into paragraph breaks.
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text[:15000]
def fetch_recipe(url: str, timeout: int = 30) -> dict:
    """Fetch a recipe URL and extract structured data.

    Extraction is tiered: HTML comes from a direct GET, falling back to
    Browserless (headless Chrome). Structured ld+json data is used when
    present (no LLM cost); otherwise page text from Jina Reader or
    BeautifulSoup is routed through the local LLM. Successful recipes are
    classified into Costco zones and saved to the Rolodex.

    Args:
        url: Recipe URL to fetch
        timeout: HTTP request timeout in seconds

    Returns:
        Dict with title, ingredients, instructions, etc., or a dict with an
        "error" key when every extraction tier fails.
    """
    # Step 1: Fetch HTML — try direct first, then Browserless for JS/blocked sites
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
    }
    soup = None
    soup_source = None  # Track where we got the HTML from
    # Try direct HTTP fetch first (fast, cheap)
    try:
        resp = requests.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        soup_source = "direct"
    except requests.RequestException:
        pass
    # If direct fetch failed (403/blocked), try Browserless (local headless Chrome)
    if soup_source != "direct":
        logging.info("Direct fetch failed, trying Browserless for %s", url)
        html = _fetch_via_browserless(url, timeout=timeout)
        if html:
            soup = BeautifulSoup(html, "html.parser")
            soup_source = "browserless"
    # Step 2: Try ld+json structured data (best quality, zero LLM tokens)
    ld_recipe = _extract_ld_json(soup) if soup else None
    if ld_recipe:
        recipe = {
            "title": ld_recipe.get("name", "Untitled"),
            "servings": _parse_servings(ld_recipe.get("recipeYield", "")),
            "prep_time": _parse_duration(ld_recipe.get("prepTime")),
            "cook_time": _parse_duration(ld_recipe.get("cookTime")),
            "total_time": _parse_duration(ld_recipe.get("totalTime")),
            "ingredients": _clean_ingredients(ld_recipe.get("recipeIngredient", [])),
            "instructions": _parse_instructions(ld_recipe.get("recipeInstructions", [])),
            "tags": _extract_tags(ld_recipe),
            "source_url": url,
            "extracted_from": "ld+json",
        }
    else:
        # Tier 2: Jina Reader — renders JS, bypasses Cloudflare/bot detection
        text = _fetch_via_jina(url)
        extraction_source = "llm_jina" if text else None
        # Tier 3: BeautifulSoup text extraction (last resort)
        if not text and soup:
            text = _build_text_from_html(soup)
            extraction_source = "llm_bs4"
        if not text:
            return {"error": "Could not extract recipe content from page. All fetch methods failed.", "source_url": url}
        prompt = RECIPE_FETCH_PROMPT.format(url=url, text=text)
        try:
            raw = _call_llm(prompt, timeout=120)
        except Exception as e:
            return {"error": f"LLM extraction failed: {e}", "source_url": url}
        # Parse LLM response: strip optional markdown fences around the JSON
        raw = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
        try:
            recipe = json.loads(raw)
        except json.JSONDecodeError:
            return {"error": "LLM returned invalid JSON", "raw_response": raw[:500], "source_url": url}
        # The LLM signals "no recipe on page" with an error object — pass it through.
        if "error" in recipe:
            return recipe
        recipe["extracted_from"] = extraction_source
    # Step 4: Classify ingredients into Costco zones
    ingredients = recipe.get("ingredients", [])
    if ingredients:
        ingredient_text = "\n".join(ingredients)
        route = optimize(ingredient_text, use_memory=True, markdown=False)
        recipe["zone_map"] = route.get("output", "")
        recipe["classified_ingredients"] = route.get("classified", {})
        recipe["route_zones"] = list(route.get("classified", {}).keys())
    # Step 5: Save to Rolodex
    recipe_id = _save_recipe(recipe)
    recipe["recipe_id"] = recipe_id
    return recipe
def _save_recipe(recipe: dict) -> str:
    """Save a recipe to the Rolodex (one JSON file per recipe).

    The ID is a slug of the title, so re-saving a same-titled recipe
    overwrites the previous file.

    Returns:
        Recipe ID (slug-based).
    """
    _ensure_recipes_dir()
    title = recipe.get("title", "untitled")
    slug = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")[:50]
    # Guard against titles with no ASCII alphanumerics, which would produce
    # an empty slug and a file literally named ".json".
    recipe_id = slug or "untitled"
    filepath = RECIPES_DIR / f"{recipe_id}.json"
    # Add metadata
    recipe["saved_at"] = datetime.now().isoformat()
    recipe["recipe_id"] = recipe_id
    # ensure_ascii=False emits raw Unicode, so the file encoding must be
    # pinned to UTF-8 (the platform default can be cp1252 on Windows and
    # crash on emoji/accents).
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(recipe, f, indent=2, ensure_ascii=False)
    return recipe_id
def list_recipes() -> list[dict]:
    """List all saved recipes in the Rolodex.

    Returns:
        Recipe summary dicts (id, title, tags, servings, saved_at,
        ingredient_count), ordered by filename. Corrupt files are skipped.
    """
    _ensure_recipes_dir()
    summaries = []
    for path in sorted(RECIPES_DIR.glob("*.json")):
        try:
            with open(path) as fh:
                data = json.load(fh)
            summaries.append({
                "id": data.get("recipe_id", path.stem),
                "title": data.get("title", "Unknown"),
                "tags": data.get("tags", []),
                "servings": data.get("servings"),
                "saved_at": data.get("saved_at"),
                "ingredient_count": len(data.get("ingredients", [])),
            })
        except (json.JSONDecodeError, KeyError):
            # Unreadable entry — skip rather than fail the whole listing.
            continue
    return summaries
def get_recipe(recipe_id: str) -> dict | None:
    """Load a specific recipe by ID.

    Args:
        recipe_id: Recipe ID (slug from title)

    Returns:
        Full recipe dict, or None when no such file exists.
    """
    _ensure_recipes_dir()
    path = RECIPES_DIR / f"{recipe_id}.json"
    if not path.exists():
        return None
    with open(path) as fh:
        return json.load(fh)
def delete_recipe(recipe_id: str) -> bool:
    """Delete a recipe from the Rolodex.

    Returns:
        True if deleted, False if not found.
    """
    target = RECIPES_DIR / f"{recipe_id}.json"
    if not target.exists():
        return False
    target.unlink()
    return True
def format_recipe(recipe: dict, include_zones: bool = True) -> str:
    """Format a recipe for display (Telegram-friendly Markdown).

    Args:
        recipe: Recipe dict from fetch_recipe() or get_recipe()
        include_zones: Whether to include Costco zone classification

    Returns:
        Formatted multi-line string.
    """
    out = [f"🍳 **{recipe.get('title', 'Untitled Recipe')}**"]

    servings = recipe.get("servings")
    if servings:
        out.append(f"👥 Serves {servings}")

    # One "Label: value" entry per time field that is present.
    time_parts = [
        f"{field.replace('_', ' ').title()}: {recipe[field]}"
        for field in ("prep_time", "cook_time", "total_time")
        if recipe.get(field)
    ]
    if time_parts:
        out.append("⏱ " + " | ".join(time_parts))
        out.append("")

    if recipe.get("ingredients"):
        out.append("**Ingredients:**")
        out.extend(f" • {item}" for item in recipe["ingredients"])

    if recipe.get("instructions"):
        out.append("")
        out.append("**Instructions:**")
        out.extend(f" {n}. {step}" for n, step in enumerate(recipe["instructions"], 1))

    if include_zones and recipe.get("zone_map"):
        out.append("")
        out.append("**Costco Route:**")
        out.append(recipe["zone_map"])

    if recipe.get("tags"):
        out.append("")
        out.append(f"🏷 {' | '.join(recipe['tags'])}")

    if recipe.get("source_url"):
        out.append(f"🔗 {recipe['source_url']}")

    return "\n".join(out)