"""Three-step waterfall recipe extractor.
Step 1: JSON-LD Sniper — deterministic, zero LLM, ~95% hit rate
Step 2: HTML Chunk → LLM — heuristic extraction with 15s timeout
Step 3: Hard timeout guard — never hang beyond 30s total
No per-site special casing; Step 1 relies only on generic schema.org (ld+json) markup.
"""
import asyncio
import json
import logging
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Result types
# ---------------------------------------------------------------------------
class ExtractionMethod(str, Enum):
"""How the recipe was extracted."""
JSON_LD = "json_ld"
HTML_CHUNK_LLM = "html_chunk_llm"
JINA_LLM = "jina_llm"
FAILED = "failed"
@dataclass
class RecipeResult:
"""Structured recipe extraction result."""
title: str = ""
servings: Optional[str] = None
prep_time: Optional[str] = None
cook_time: Optional[str] = None
total_time: Optional[str] = None
ingredients: list[str] = field(default_factory=list)
instructions: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
source_url: str = ""
extraction_method: ExtractionMethod = ExtractionMethod.FAILED
error: Optional[str] = None
def to_dict(self) -> dict:
return {
"title": self.title,
"servings": self.servings,
"prep_time": self.prep_time,
"cook_time": self.cook_time,
"total_time": self.total_time,
"ingredients": self.ingredients,
"instructions": self.instructions,
"tags": self.tags,
"source_url": self.source_url,
"extracted_from": self.extraction_method.value if isinstance(self.extraction_method, ExtractionMethod) else self.extraction_method,
**({"error": self.error} if self.error else {}),
}
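# Illustrative to_dict() output sketch (values hypothetical): the extraction
# method is serialized under the key "extracted_from", and "error" appears
# only when set, e.g.
#   {"title": "Weeknight Chili", "servings": "6", "ingredients": ["..."],
#    "extracted_from": "json_ld", "source_url": "https://example.com/chili"}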
# ---------------------------------------------------------------------------
# Step 1: JSON-LD Sniper (deterministic, zero LLM)
# ---------------------------------------------------------------------------
def extract_json_ld(html: str) -> Optional[dict]:
"""Extract Recipe schema from ld+json script tags.
Handles:
- Direct {"@type": "Recipe"} objects
- @graph arrays containing Recipe items
- Lists of ld+json blocks
- Common JSON breakage (unescaped newlines in strings)
Returns:
Raw Recipe ld+json dict if found, None otherwise.
"""
soup = BeautifulSoup(html, "html.parser")
for script in soup.find_all("script", type="application/ld+json"):
if not script.string:
continue
try:
raw = script.string
# Fix common recipe site JSON breakage
raw = raw.replace("\r", "")
raw = _fix_json_newlines(raw)
data = json.loads(raw)
except (json.JSONDecodeError, TypeError):
continue
# Walk all possible structures to find a Recipe
candidates = _flatten_ld_json(data)
for item in candidates:
if _is_recipe_type(item):
return item
return None
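# Illustrative happy-path sketch (markup hypothetical): a page containing
#   <script type="application/ld+json">
#     {"@graph": [{"@type": "WebSite"}, {"@type": "Recipe", "name": "Chili"}]}
#   </script>
# should yield {"@type": "Recipe", "name": "Chili"} from extract_json_ld(html):
# _flatten_ld_json walks the @graph array and _is_recipe_type matches the item.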
def _flatten_ld_json(data) -> list[dict]:
"""Flatten ld+json data into a flat list of candidate dicts.
Handles: dict, list, @graph arrays, nested @graph.
"""
items = []
if isinstance(data, list):
for entry in data:
items.extend(_flatten_ld_json(entry))
elif isinstance(data, dict):
items.append(data)
if "@graph" in data:
for graph_item in data["@graph"]:
items.extend(_flatten_ld_json(graph_item))
return items
def _is_recipe_type(item: dict) -> bool:
"""Check if an ld+json item is a Recipe type."""
if not isinstance(item, dict):
return False
type_val = item.get("@type", "")
if isinstance(type_val, str):
return type_val == "Recipe"
if isinstance(type_val, list):
return "Recipe" in type_val
return False
def _fix_json_newlines(json_str: str) -> str:
"""Fix unescaped newlines/tabs inside JSON string values.
Recipe sites frequently have multiline strings in their ld+json
that aren't properly escaped. This escapes them.
"""
result = []
in_string = False
i = 0
while i < len(json_str):
c = json_str[i]
if c == '"' and (i == 0 or json_str[i - 1] != "\\"):
in_string = not in_string
result.append(c)
elif c == "\n" and in_string:
result.append("\\n")
elif c == "\t" and in_string:
result.append("\\t")
else:
result.append(c)
i += 1
return "".join(result)
# ---------------------------------------------------------------------------
# Step 1b: Parse ld+json Recipe dict into RecipeResult
# ---------------------------------------------------------------------------
def _parse_servings(yield_str) -> Optional[str]:
"""Parse recipeYield into clean servings string."""
if not yield_str:
return None
yield_str = str(yield_str).strip()
    # Match "Serves 4", "4 servings", or a range like "6-8" / "10-12"
    if match := re.search(r"(?i)(?:serves?\s+)?(\d+(?:\s*-\s*\d+)?)", yield_str):
        return match.group(1).strip()
return yield_str if yield_str else None
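# Illustrative examples (assuming the adjusted regex above, which also accepts
# a lone number and full ranges):
#   _parse_servings("Serves 4")       -> "4"
#   _parse_servings("10-12 servings") -> "10-12"
#   _parse_servings(6)                -> "6"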
def _parse_duration(iso_duration) -> Optional[str]:
"""Parse ISO 8601 duration (PT30M) into readable string."""
if not iso_duration:
return None
iso_str = str(iso_duration)
match = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?", iso_str)
if not match:
return iso_str
hours, minutes = match.groups()
parts = []
if hours:
parts.append(f"{hours} hr")
if minutes:
parts.append(f"{minutes} min")
return " ".join(parts) if parts else None
def _clean_ingredients(ingredients: list) -> list[str]:
"""Clean ingredient strings — strip newlines, collapse whitespace."""
cleaned = []
for ing in ingredients:
if ing:
cleaned.append(
re.sub(r"\s+", " ", str(ing).replace("\r", "").replace("\n", " ")).strip()
)
return [i for i in cleaned if i]
def _parse_instructions(instructions) -> list[str]:
"""Parse recipeInstructions from ld+json (HowToStep, HowToSection, or plain text)."""
if isinstance(instructions, str):
        # Split plain-text instructions on blank lines or numbered markers like "1." / "1)"
        steps = re.split(r"\n+|\d+[.)]\s*", instructions)
return [s.strip() for s in steps if s.strip()]
if not isinstance(instructions, list):
return []
result = []
for step in instructions:
if isinstance(step, dict):
# HowToSection — flatten its steps
if step.get("@type") == "HowToSection" and "itemListElement" in step:
for sub_step in step["itemListElement"]:
text = sub_step.get("text", sub_step.get("name", ""))
if text:
result.append(str(text).strip())
else:
# Regular HowToStep
text = step.get("text", step.get("name", ""))
if text:
result.append(str(text).strip())
elif isinstance(step, str):
result.append(step.strip())
return result
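# Illustrative example (structure per schema.org): HowToSection steps are
# flattened alongside plain HowToStep entries, e.g.
#   _parse_instructions([
#       {"@type": "HowToSection", "name": "Sauce", "itemListElement": [
#           {"@type": "HowToStep", "text": "Simmer tomatoes 20 minutes."},
#       ]},
#       {"@type": "HowToStep", "text": "Toss with pasta and serve."},
#   ])
#   -> ["Simmer tomatoes 20 minutes.", "Toss with pasta and serve."]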
def _extract_tags(ld_recipe: dict) -> list[str]:
"""Extract tags from recipeCategory, recipeCuisine, keywords."""
tags = []
    for key in ["recipeCategory", "recipeCuisine", "keywords"]:
        val = ld_recipe.get(key)
if val:
if isinstance(val, str):
tags.extend([t.strip() for t in val.split(",") if t.strip()])
elif isinstance(val, list):
tags.extend([str(v).strip() for v in val if v])
# Infer dinner tag from title
    name = str(ld_recipe.get("name", "")).lower()
if any(w in name for w in ["chicken", "beef", "pork", "fish"]):
tags.append("dinner")
    # De-duplicate while preserving order so output is deterministic
    return list(dict.fromkeys(tags))[:8]
def recipe_from_json_ld(ld_recipe: dict, source_url: str = "") -> RecipeResult:
"""Convert a parsed ld+json Recipe dict into a RecipeResult."""
return RecipeResult(
title=ld_recipe.get("name", "Untitled"),
servings=_parse_servings(ld_recipe.get("recipeYield", "")),
prep_time=_parse_duration(ld_recipe.get("prepTime")),
cook_time=_parse_duration(ld_recipe.get("cookTime")),
total_time=_parse_duration(ld_recipe.get("totalTime")),
ingredients=_clean_ingredients(ld_recipe.get("recipeIngredient", [])),
instructions=_parse_instructions(ld_recipe.get("recipeInstructions", [])),
tags=_extract_tags(ld_recipe),
source_url=source_url,
extraction_method=ExtractionMethod.JSON_LD,
)
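# Illustrative sketch of Step 1b end to end (field values hypothetical):
#   recipe_from_json_ld({
#       "@type": "Recipe",
#       "name": "Weeknight Chili",
#       "recipeYield": "Serves 6",
#       "totalTime": "PT45M",
#       "recipeIngredient": ["1 lb ground beef", "1 can kidney beans"],
#       "recipeInstructions": [{"@type": "HowToStep", "text": "Brown the beef."}],
#   }, source_url="https://example.com/chili")
# produces a RecipeResult with title="Weeknight Chili", servings="6",
# total_time="45 min", and extraction_method=ExtractionMethod.JSON_LD.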
# ---------------------------------------------------------------------------
# Step 2: HTML Chunk Extraction (heuristic + LLM)
# ---------------------------------------------------------------------------
RECIPE_LLM_PROMPT = """SYSTEM: You are a recipe extraction engine. Extract the recipe from the following text, which was scraped from a recipe website.
The text will contain ads, navigation, stories, and other garbage. Ignore ALL of that. Extract ONLY the recipe.
Return a JSON object with this exact structure:
{{
"title": "Recipe title",
"servings": "Number of servings (e.g., '4' or '6-8')",
"prep_time": "Prep time if stated (e.g., '15 min'), null if not found",
"cook_time": "Cook time if stated (e.g., '30 min'), null if not found",
"total_time": "Total time if stated, null if not found",
"ingredients": ["ingredient 1", "ingredient 2", ...],
"instructions": ["step 1", "step 2", ...],
"tags": ["dinner", "chicken", "easy", ...],
"source_url": "the URL provided"
}}
RULES:
- ingredients: One item per list entry, including quantities (e.g., "2 cups flour", "1 lb chicken thighs")
- instructions: Numbered steps as separate list entries. Combine multiple short steps if they're fragments.
- tags: 3-8 relevant tags for categorization (meal type, protein, cuisine, difficulty)
- If the page contains multiple recipes, extract the MAIN one
- If no recipe is found, return {{"error": "no recipe found", "source_url": "..."}}
- Strip ALL ad copy, nutrition disclaimers, "jump to recipe" buttons, social sharing text
- Return ONLY the JSON object, no markdown fences
USER: Extract the recipe from this text:
URL: {url}
TEXT:
{text}
RESPONSE (JSON only):"""
def _extract_html_chunk(html: str) -> str:
"""Extract the main content area from HTML using heuristic selectors.
Targets recipe-specific containers, then falls back to article/main.
Strips nav, ads, footer, and other noise.
"""
soup = BeautifulSoup(html, "html.parser")
# Remove noise
for tag in soup.find_all(["script", "style", "nav", "footer", "header", "aside",
"noscript", "iframe", "form"]):
tag.decompose()
# Try recipe-specific selectors first (ordered by specificity)
recipe_selectors = [
"[id*='recipe']", "[class*='recipe']",
"[id*='ingredient']", "[class*='ingredient']",
"[id*='direction']", "[class*='direction']",
"[id*='instruction']", "[class*='instruction']",
"article", "main", ".entry-content", ".post-content",
]
recipe_content = ""
for selector in recipe_selectors:
try:
elements = soup.select(selector)
if elements:
for el in elements:
recipe_content += el.get_text(separator="\n", strip=True) + "\n\n"
except Exception:
continue
if recipe_content:
text = recipe_content
else:
# Fallback: full body
text = soup.get_text(separator="\n", strip=True)
# Collapse excessive whitespace
text = re.sub(r"\n{3,}", "\n\n", text)
return text[:15000]
# ---------------------------------------------------------------------------
# URL Fetching
# ---------------------------------------------------------------------------
# Browserless (headless Chrome) is used as a fallback for JS-heavy / blocked
# sites; its URL is resolved from costco_route.config inside
# _fetch_via_browserless, with a localhost default.
def _fetch_html_direct(url: str, timeout: int = 15) -> Optional[str]:
"""Fetch HTML via direct HTTP request. Fast, cheap, no JS rendering."""
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,/;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
try:
resp = requests.get(url, headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.text
except requests.RequestException:
return None
def _fetch_via_browserless(url: str, timeout: int = 15) -> Optional[str]:
"""Fetch a URL through Browserless container (headless Chrome).
Renders JavaScript, bypasses bot detection.
"""
try:
from costco_route.config import BROWSERLESS_URL as BL_URL
bl_url = BL_URL
except ImportError:
bl_url = "http://127.0.0.1:3000"
try:
resp = requests.post(
f"{bl_url}/content",
json={"url": url},
timeout=timeout,
)
resp.raise_for_status()
content = resp.text.strip()
if content and len(content) > 200:
return content[:50000]
except requests.RequestException:
pass
return None
def _fetch_via_jina(url: str, timeout: int = 15) -> Optional[str]:
"""Fetch a URL through Jina Reader API — renders JS, strips noise, returns Markdown."""
jina_url = f"https://r.jina.ai/{url}"
headers = {"Accept": "text/markdown"}
try:
resp = requests.get(jina_url, headers=headers, timeout=timeout)
resp.raise_for_status()
content = resp.text.strip()
if content and len(content) > 100:
return content[:15000]
except requests.RequestException:
pass
return None
# ---------------------------------------------------------------------------
# LLM Client (async-capable, with timeout)
# ---------------------------------------------------------------------------
def _call_llm_sync(prompt: str, timeout: int = 15) -> str:
"""Call LLM synchronously with a hard timeout.
Uses the existing costco_route LLM client.
"""
from costco_route.llm_client import _call_llm
try:
return _call_llm(prompt, timeout=timeout)
except Exception as e:
raise RuntimeError(f"LLM extraction failed: {e}") from e
async def _call_llm_async(prompt: str, timeout: int = 15) -> str:
"""Call LLM asynchronously with asyncio timeout guard."""
    loop = asyncio.get_running_loop()
try:
result = await asyncio.wait_for(
loop.run_in_executor(None, _call_llm_sync, prompt, timeout + 10),
timeout=float(timeout),
)
return result
except asyncio.TimeoutError:
raise TimeoutError(f"LLM extraction timed out after {timeout}s")
# ---------------------------------------------------------------------------
# The Waterfall
# ---------------------------------------------------------------------------
def extract_recipe(url: str, total_timeout: int = 30) -> RecipeResult:
"""Three-step waterfall recipe extraction.
Step 1: JSON-LD Sniper — deterministic, zero LLM
Step 2: HTML Chunk → LLM — heuristic extraction
Step 3: Hard timeout — never hang
Args:
url: Recipe URL to extract
total_timeout: Overall timeout in seconds (default 30)
Returns:
RecipeResult with extraction details
"""
import time as _time
start = _time.monotonic()
# ---- Step 0: Fetch HTML ----
html = _fetch_html_direct(url, timeout=15)
fetch_method = "direct"
if not html:
logger.info("Direct fetch failed for %s, trying Browserless", url)
html = _fetch_via_browserless(url, timeout=15)
fetch_method = "browserless"
    if not html:
        elapsed = _time.monotonic() - start
        return RecipeResult(
            source_url=url,
            extraction_method=ExtractionMethod.FAILED,
            error=f"Could not fetch HTML. All methods failed ({elapsed:.1f}s elapsed).",
        )
# ---- Step 1: JSON-LD Sniper (zero LLM) ----
ld_recipe = extract_json_ld(html)
if ld_recipe:
logger.info("JSON-LD extraction succeeded for %s", url)
result = recipe_from_json_ld(ld_recipe, source_url=url)
elapsed = _time.monotonic() - start
logger.info("JSON-LD extraction took %.2fs", elapsed)
return result
# ---- Step 2: HTML Chunk + LLM (with timeout) ----
logger.info("JSON-LD not found for %s, falling back to HTML+LLM", url)
elapsed = _time.monotonic() - start
remaining = total_timeout - elapsed
if remaining < 5:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=f"Timeout budget exhausted after JSON-LD attempt ({elapsed:.1f}s elapsed)",
)
# Try Jina first (better for JS-heavy sites), then BS4
text = None
llm_source = None
# Jina Reader (renders JS, strips noise)
text = _fetch_via_jina(url, timeout=min(10, int(remaining)))
if text:
llm_source = ExtractionMethod.JINA_LLM
# BS4 fallback (from already-fetched HTML)
if not text:
text = _extract_html_chunk(html)
llm_source = ExtractionMethod.HTML_CHUNK_LLM
if not text:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error="Could not extract recipe content from page. All methods failed.",
)
# LLM extraction with remaining timeout (cap at 15s)
llm_timeout = min(15, int(remaining - 2))
if llm_timeout < 3:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=f"Insufficient time for LLM extraction ({remaining:.1f}s remaining)",
)
prompt = RECIPE_LLM_PROMPT.format(url=url, text=text)
try:
raw = _call_llm_sync(prompt, timeout=llm_timeout)
except (TimeoutError, RuntimeError) as e:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=str(e),
)
# Parse LLM response
raw = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
    try:
        recipe_data = json.loads(raw)
    except json.JSONDecodeError:
        recipe_data = None
    if not isinstance(recipe_data, dict):
        return RecipeResult(
            source_url=url,
            extraction_method=ExtractionMethod.FAILED,
            error="LLM did not return a valid JSON object",
        )
if "error" in recipe_data:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=recipe_data.get("error", "Unknown error from LLM"),
)
elapsed = _time.monotonic() - start
logger.info("LLM extraction took %.2fs total", elapsed)
return RecipeResult(
title=recipe_data.get("title", "Untitled"),
servings=recipe_data.get("servings"),
prep_time=recipe_data.get("prep_time"),
cook_time=recipe_data.get("cook_time"),
total_time=recipe_data.get("total_time"),
ingredients=recipe_data.get("ingredients", []),
instructions=recipe_data.get("instructions", []),
tags=recipe_data.get("tags", []),
source_url=url,
extraction_method=llm_source,
)
async def extract_recipe_async(url: str, total_timeout: int = 30) -> RecipeResult:
"""Async version of the waterfall with proper asyncio timeout guards."""
import time as _time
start = _time.monotonic()
# ---- Step 0: Fetch HTML ----
    loop = asyncio.get_running_loop()
html = await loop.run_in_executor(None, _fetch_html_direct, url, 15)
fetch_method = "direct"
if not html:
logger.info("Direct fetch failed for %s, trying Browserless", url)
html = await loop.run_in_executor(None, _fetch_via_browserless, url, 15)
fetch_method = "browserless"
if not html:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error="Could not fetch HTML. All methods failed.",
)
# ---- Step 1: JSON-LD Sniper (zero LLM) ----
ld_recipe = await loop.run_in_executor(None, extract_json_ld, html)
if ld_recipe:
logger.info("JSON-LD extraction succeeded for %s", url)
result = recipe_from_json_ld(ld_recipe, source_url=url)
return result
# ---- Step 2: HTML Chunk + LLM (with timeout) ----
logger.info("JSON-LD not found for %s, falling back to HTML+LLM", url)
elapsed = _time.monotonic() - start
remaining = total_timeout - elapsed
if remaining < 5:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=f"Timeout budget exhausted ({elapsed:.1f}s elapsed)",
)
text = None
llm_source = None
text = await loop.run_in_executor(None, _fetch_via_jina, url, min(10, int(remaining)))
if text:
llm_source = ExtractionMethod.JINA_LLM
if not text:
text = _extract_html_chunk(html)
llm_source = ExtractionMethod.HTML_CHUNK_LLM
if not text:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error="Could not extract recipe content from page. All methods failed.",
)
# LLM with hard 15s timeout
llm_timeout = min(15.0, remaining - 2)
if llm_timeout < 3:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=f"Insufficient time for LLM extraction ({remaining:.1f}s remaining)",
)
prompt = RECIPE_LLM_PROMPT.format(url=url, text=text)
try:
raw = await _call_llm_async(prompt, timeout=int(llm_timeout))
except (asyncio.TimeoutError, TimeoutError) as e:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=f"Extraction timed out after {llm_timeout:.0f}s. Site may be slow or blocking.",
)
except Exception as e:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=str(e),
)
raw = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
    try:
        recipe_data = json.loads(raw)
    except json.JSONDecodeError:
        recipe_data = None
    if not isinstance(recipe_data, dict):
        return RecipeResult(
            source_url=url,
            extraction_method=ExtractionMethod.FAILED,
            error="LLM did not return a valid JSON object",
        )
if "error" in recipe_data:
return RecipeResult(
source_url=url,
extraction_method=ExtractionMethod.FAILED,
error=recipe_data.get("error", "Unknown error from LLM"),
)
return RecipeResult(
title=recipe_data.get("title", "Untitled"),
servings=recipe_data.get("servings"),
prep_time=recipe_data.get("prep_time"),
cook_time=recipe_data.get("cook_time"),
total_time=recipe_data.get("total_time"),
ingredients=recipe_data.get("ingredients", []),
instructions=recipe_data.get("instructions", []),
tags=recipe_data.get("tags", []),
source_url=url,
extraction_method=llm_source,
)
# Backward-compatible alias
def extract_recipe_waterfall(url: str, total_timeout: int = 30) -> RecipeResult:
"""Alias for extract_recipe() — the waterfall is now the default."""
return extract_recipe(url, total_timeout=total_timeout)
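# Minimal manual smoke test (a hedged sketch: the URL is a placeholder, and the
# costco_route LLM client must be importable for the non-JSON-LD fallback path):
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    target_url = sys.argv[1] if len(sys.argv) > 1 else "https://example.com/some-recipe"
    print(json.dumps(extract_recipe(target_url).to_dict(), indent=2))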