"""Three-step waterfall recipe extractor. Step 1: JSON-LD Sniper — deterministic, zero LLM, ~95% hit rate Step 2: HTML Chunk → LLM — heuristic extraction with 15s timeout Step 3: Hard timeout guard — never hang beyond 30s total Zero domain checks. Schema.org extraction only. """ import asyncio import json import logging import re from dataclasses import dataclass, field from enum import Enum from typing import Optional import requests from bs4 import BeautifulSoup logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Result types # --------------------------------------------------------------------------- class ExtractionMethod(str, Enum): """How the recipe was extracted.""" JSON_LD = "json_ld" HTML_CHUNK_LLM = "html_chunk_llm" JINA_LLM = "jina_llm" FAILED = "failed" @dataclass class RecipeResult: """Structured recipe extraction result.""" title: str = "" servings: Optional[str] = None prep_time: Optional[str] = None cook_time: Optional[str] = None total_time: Optional[str] = None ingredients: list[str] = field(default_factory=list) instructions: list[str] = field(default_factory=list) tags: list[str] = field(default_factory=list) source_url: str = "" extraction_method: ExtractionMethod = ExtractionMethod.FAILED error: Optional[str] = None def to_dict(self) -> dict: return { "title": self.title, "servings": self.servings, "prep_time": self.prep_time, "cook_time": self.cook_time, "total_time": self.total_time, "ingredients": self.ingredients, "instructions": self.instructions, "tags": self.tags, "source_url": self.source_url, "extracted_from": self.extraction_method.value if isinstance(self.extraction_method, ExtractionMethod) else self.extraction_method, **({"error": self.error} if self.error else {}), } # --------------------------------------------------------------------------- # Step 1: JSON-LD Sniper (deterministic, zero LLM) # --------------------------------------------------------------------------- def extract_json_ld(html: str) -> Optional[dict]: """Extract Recipe schema from ld+json script tags. Handles: - Direct {"@type": "Recipe"} objects - @graph arrays containing Recipe items - Lists of ld+json blocks - Common JSON breakage (unescaped newlines in strings) Returns: Raw Recipe ld+json dict if found, None otherwise. """ soup = BeautifulSoup(html, "html.parser") for script in soup.find_all("script", type="application/ld+json"): if not script.string: continue try: raw = script.string # Fix common recipe site JSON breakage raw = raw.replace("\r", "") raw = _fix_json_newlines(raw) data = json.loads(raw) except (json.JSONDecodeError, TypeError): continue # Walk all possible structures to find a Recipe candidates = _flatten_ld_json(data) for item in candidates: if _is_recipe_type(item): return item return None def _flatten_ld_json(data) -> list[dict]: """Flatten ld+json data into a flat list of candidate dicts. Handles: dict, list, @graph arrays, nested @graph. 
""" items = [] if isinstance(data, list): for entry in data: items.extend(_flatten_ld_json(entry)) elif isinstance(data, dict): items.append(data) if "@graph" in data: for graph_item in data["@graph"]: items.extend(_flatten_ld_json(graph_item)) return items def _is_recipe_type(item: dict) -> bool: """Check if an ld+json item is a Recipe type.""" if not isinstance(item, dict): return False type_val = item.get("@type", "") if isinstance(type_val, str): return type_val == "Recipe" if isinstance(type_val, list): return "Recipe" in type_val return False def _fix_json_newlines(json_str: str) -> str: """Fix unescaped newlines/tabs inside JSON string values. Recipe sites frequently have multiline strings in their ld+json that aren't properly escaped. This escapes them. """ result = [] in_string = False i = 0 while i < len(json_str): c = json_str[i] if c == '"' and (i == 0 or json_str[i - 1] != "\\"): in_string = not in_string result.append(c) elif c == "\n" and in_string: result.append("\\n") elif c == "\t" and in_string: result.append("\\t") else: result.append(c) i += 1 return "".join(result) # --------------------------------------------------------------------------- # Step 1b: Parse ld+json Recipe dict into RecipeResult # --------------------------------------------------------------------------- def _parse_servings(yield_str) -> Optional[str]: """Parse recipeYield into clean servings string.""" if not yield_str: return None yield_str = str(yield_str).strip() if match := re.search(r"(?i)(serves?\s+)?(\d+[\s\-]*\d*)", yield_str): return match.group(2).strip() return yield_str if yield_str else None def _parse_duration(iso_duration) -> Optional[str]: """Parse ISO 8601 duration (PT30M) into readable string.""" if not iso_duration: return None iso_str = str(iso_duration) match = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?", iso_str) if not match: return iso_str hours, minutes = match.groups() parts = [] if hours: parts.append(f"{hours} hr") if minutes: parts.append(f"{minutes} min") return " ".join(parts) if parts else None def _clean_ingredients(ingredients: list) -> list[str]: """Clean ingredient strings — strip newlines, collapse whitespace.""" cleaned = [] for ing in ingredients: if ing: cleaned.append( re.sub(r"\s+", " ", str(ing).replace("\r", "").replace("\n", " ")).strip() ) return [i for i in cleaned if i] def _parse_instructions(instructions) -> list[str]: """Parse recipeInstructions from ld+json (HowToStep, HowToSection, or plain text).""" if isinstance(instructions, str): steps = re.split(r"\n+|\d+\)\s*", instructions) return [s.strip() for s in steps if s.strip()] if not isinstance(instructions, list): return [] result = [] for step in instructions: if isinstance(step, dict): # HowToSection — flatten its steps if step.get("@type") == "HowToSection" and "itemListElement" in step: for sub_step in step["itemListElement"]: text = sub_step.get("text", sub_step.get("name", "")) if text: result.append(str(text).strip()) else: # Regular HowToStep text = step.get("text", step.get("name", "")) if text: result.append(str(text).strip()) elif isinstance(step, str): result.append(step.strip()) return result def _extract_tags(ld_recipe: dict) -> list[str]: """Extract tags from recipeCategory, recipeCuisine, keywords.""" tags = [] for field in ["recipeCategory", "recipeCuisine", "keywords"]: val = ld_recipe.get(field) if val: if isinstance(val, str): tags.extend([t.strip() for t in val.split(",") if t.strip()]) elif isinstance(val, list): tags.extend([str(v).strip() for v in val if v]) # Infer dinner tag 

def recipe_from_json_ld(ld_recipe: dict, source_url: str = "") -> RecipeResult:
    """Convert a parsed ld+json Recipe dict into a RecipeResult."""
    return RecipeResult(
        title=ld_recipe.get("name", "Untitled"),
        servings=_parse_servings(ld_recipe.get("recipeYield", "")),
        prep_time=_parse_duration(ld_recipe.get("prepTime")),
        cook_time=_parse_duration(ld_recipe.get("cookTime")),
        total_time=_parse_duration(ld_recipe.get("totalTime")),
        ingredients=_clean_ingredients(ld_recipe.get("recipeIngredient", [])),
        instructions=_parse_instructions(ld_recipe.get("recipeInstructions", [])),
        tags=_extract_tags(ld_recipe),
        source_url=source_url,
        extraction_method=ExtractionMethod.JSON_LD,
    )


# ---------------------------------------------------------------------------
# Step 2: HTML Chunk Extraction (heuristic + LLM)
# ---------------------------------------------------------------------------

RECIPE_LLM_PROMPT = """SYSTEM: You are a recipe extraction engine.

Extract the recipe from the following text, which was scraped from a recipe
website. The text will contain ads, navigation, stories, and other garbage.
Ignore ALL of that. Extract ONLY the recipe.

Return a JSON object with this exact structure:
{{
  "title": "Recipe title",
  "servings": "Number of servings (e.g., '4' or '6-8')",
  "prep_time": "Prep time if stated (e.g., '15 min'), null if not found",
  "cook_time": "Cook time if stated (e.g., '30 min'), null if not found",
  "total_time": "Total time if stated, null if not found",
  "ingredients": ["ingredient 1", "ingredient 2", ...],
  "instructions": ["step 1", "step 2", ...],
  "tags": ["dinner", "chicken", "easy", ...],
  "source_url": "the URL provided"
}}

RULES:
- ingredients: One item per list entry, including quantities (e.g., "2 cups flour", "1 lb chicken thighs")
- instructions: Numbered steps as separate list entries. Combine multiple short steps if they're fragments.
- tags: 3-8 relevant tags for categorization (meal type, protein, cuisine, difficulty)
- If the page contains multiple recipes, extract the MAIN one
- If no recipe is found, return {{"error": "no recipe found", "source_url": "..."}}
- Strip ALL ad copy, nutrition disclaimers, "jump to recipe" buttons, social sharing text
- Return ONLY the JSON object, no markdown fences

USER: Extract the recipe from this text:

URL: {url}

TEXT:
{text}

RESPONSE (JSON only):"""


def _extract_html_chunk(html: str) -> str:
    """Extract the main content area from HTML using heuristic selectors.

    Targets recipe-specific containers, then falls back to article/main.
    Strips nav, ads, footer, and other noise.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Remove noise
    for tag in soup.find_all(
        ["script", "style", "nav", "footer", "header", "aside", "noscript", "iframe", "form"]
    ):
        tag.decompose()

    # Try recipe-specific selectors first (ordered by specificity)
    recipe_selectors = [
        "[id*='recipe']", "[class*='recipe']",
        "[id*='ingredient']", "[class*='ingredient']",
        "[id*='direction']", "[class*='direction']",
        "[id*='instruction']", "[class*='instruction']",
        "article", "main", ".entry-content", ".post-content",
    ]
    recipe_content = ""
    for selector in recipe_selectors:
        try:
            elements = soup.select(selector)
            if elements:
                for el in elements:
                    recipe_content += el.get_text(separator="\n", strip=True) + "\n\n"
        except Exception:
            continue

    if recipe_content:
        text = recipe_content
    else:
        # Fallback: full body
        text = soup.get_text(separator="\n", strip=True)

    # Collapse excessive whitespace
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text[:15000]
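
# Illustrative sketch: _extract_html_chunk on a made-up page. The nav and
# footer are decomposed; the recipe-classed div survives and feeds the LLM.
def _example_html_chunk() -> str:
    """Demo: heuristic chunking keeps recipe content, drops page chrome."""
    html = (
        "<html><body>"
        "<nav>Home | About | Shop</nav>"
        '<div class="recipe-card"><h2>Pancakes</h2>'
        "<li>2 cups flour</li><li>1 egg</li></div>"
        "<footer>Example Site</footer>"
        "</body></html>"
    )
    chunk = _extract_html_chunk(html)
    assert "Pancakes" in chunk and "Home | About" not in chunk
    return chunk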
""" soup = BeautifulSoup(html, "html.parser") # Remove noise for tag in soup.find_all(["script", "style", "nav", "footer", "header", "aside", "noscript", "iframe", "form"]): tag.decompose() # Try recipe-specific selectors first (ordered by specificity) recipe_selectors = [ "[id*='recipe']", "[class*='recipe']", "[id*='ingredient']", "[class*='ingredient']", "[id*='direction']", "[class*='direction']", "[id*='instruction']", "[class*='instruction']", "article", "main", ".entry-content", ".post-content", ] recipe_content = "" for selector in recipe_selectors: try: elements = soup.select(selector) if elements: for el in elements: recipe_content += el.get_text(separator="\n", strip=True) + "\n\n" except Exception: continue if recipe_content: text = recipe_content else: # Fallback: full body text = soup.get_text(separator="\n", strip=True) # Collapse excessive whitespace text = re.sub(r"\n{3,}", "\n\n", text) return text[:15000] # --------------------------------------------------------------------------- # URL Fetching # --------------------------------------------------------------------------- # Browserless for JS-heavy / blocked sites BROWSERLESS_URL = os.environ.get("BROWSERLESS_URL", "http://127.0.0.1:3000") \ if False else "" # Will be imported from config def _fetch_html_direct(url: str, timeout: int = 15) -> Optional[str]: """Fetch HTML via direct HTTP request. Fast, cheap, no JS rendering.""" headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", } try: resp = requests.get(url, headers=headers, timeout=timeout) resp.raise_for_status() return resp.text except requests.RequestException: return None def _fetch_via_browserless(url: str, timeout: int = 15) -> Optional[str]: """Fetch a URL through Browserless container (headless Chrome). Renders JavaScript, bypasses bot detection. """ try: from costco_route.config import BROWSERLESS_URL as BL_URL bl_url = BL_URL except ImportError: bl_url = "http://127.0.0.1:3000" try: resp = requests.post( f"{bl_url}/content", json={"url": url}, timeout=timeout, ) resp.raise_for_status() content = resp.text.strip() if content and len(content) > 200: return content[:50000] except requests.RequestException: pass return None def _fetch_via_jina(url: str, timeout: int = 15) -> Optional[str]: """Fetch a URL through Jina Reader API — renders JS, strips noise, returns Markdown.""" jina_url = f"https://r.jina.ai/{url}" headers = {"Accept": "text/markdown"} try: resp = requests.get(jina_url, headers=headers, timeout=timeout) resp.raise_for_status() content = resp.text.strip() if content and len(content) > 100: return content[:15000] except requests.RequestException: pass return None # --------------------------------------------------------------------------- # LLM Client (async-capable, with timeout) # --------------------------------------------------------------------------- def _call_llm_sync(prompt: str, timeout: int = 15) -> str: """Call LLM synchronously with a hard timeout. Uses the existing costco_route LLM client. 
""" from costco_route.llm_client import _call_llm try: return _call_llm(prompt, timeout=timeout) except Exception as e: raise RuntimeError(f"LLM extraction failed: {e}") from e async def _call_llm_async(prompt: str, timeout: int = 15) -> str: """Call LLM asynchronously with asyncio timeout guard.""" loop = asyncio.get_event_loop() try: result = await asyncio.wait_for( loop.run_in_executor(None, _call_llm_sync, prompt, timeout + 10), timeout=float(timeout), ) return result except asyncio.TimeoutError: raise TimeoutError(f"LLM extraction timed out after {timeout}s") # --------------------------------------------------------------------------- # The Waterfall # --------------------------------------------------------------------------- def extract_recipe(url: str, total_timeout: int = 30) -> RecipeResult: """Three-step waterfall recipe extraction. Step 1: JSON-LD Sniper — deterministic, zero LLM Step 2: HTML Chunk → LLM — heuristic extraction Step 3: Hard timeout — never hang Args: url: Recipe URL to extract total_timeout: Overall timeout in seconds (default 30) Returns: RecipeResult with extraction details """ import time as _time start = _time.monotonic() # ---- Step 0: Fetch HTML ---- html = _fetch_html_direct(url, timeout=15) fetch_method = "direct" if not html: logger.info("Direct fetch failed for %s, trying Browserless", url) html = _fetch_via_browserless(url, timeout=15) fetch_method = "browserless" if not html: # Total timeout check before expensive fallback elapsed = _time.monotonic() - start remaining = total_timeout - elapsed if remaining < 10: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=f"Could not fetch HTML. All methods failed (elapsed: {elapsed:.1f}s)", ) return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error="Could not fetch HTML. All methods failed.", ) # ---- Step 1: JSON-LD Sniper (zero LLM) ---- ld_recipe = extract_json_ld(html) if ld_recipe: logger.info("JSON-LD extraction succeeded for %s", url) result = recipe_from_json_ld(ld_recipe, source_url=url) elapsed = _time.monotonic() - start logger.info("JSON-LD extraction took %.2fs", elapsed) return result # ---- Step 2: HTML Chunk + LLM (with timeout) ---- logger.info("JSON-LD not found for %s, falling back to HTML+LLM", url) elapsed = _time.monotonic() - start remaining = total_timeout - elapsed if remaining < 5: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=f"Timeout budget exhausted after JSON-LD attempt ({elapsed:.1f}s elapsed)", ) # Try Jina first (better for JS-heavy sites), then BS4 text = None llm_source = None # Jina Reader (renders JS, strips noise) text = _fetch_via_jina(url, timeout=min(10, int(remaining))) if text: llm_source = ExtractionMethod.JINA_LLM # BS4 fallback (from already-fetched HTML) if not text: text = _extract_html_chunk(html) llm_source = ExtractionMethod.HTML_CHUNK_LLM if not text: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error="Could not extract recipe content from page. 
All methods failed.", ) # LLM extraction with remaining timeout (cap at 15s) llm_timeout = min(15, int(remaining - 2)) if llm_timeout < 3: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=f"Insufficient time for LLM extraction ({remaining:.1f}s remaining)", ) prompt = RECIPE_LLM_PROMPT.format(url=url, text=text) try: raw = _call_llm_sync(prompt, timeout=llm_timeout) except (TimeoutError, RuntimeError) as e: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=str(e), ) # Parse LLM response raw = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() try: recipe_data = json.loads(raw) except json.JSONDecodeError: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error="LLM returned invalid JSON", ) if "error" in recipe_data: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=recipe_data.get("error", "Unknown error from LLM"), ) elapsed = _time.monotonic() - start logger.info("LLM extraction took %.2fs total", elapsed) return RecipeResult( title=recipe_data.get("title", "Untitled"), servings=recipe_data.get("servings"), prep_time=recipe_data.get("prep_time"), cook_time=recipe_data.get("cook_time"), total_time=recipe_data.get("total_time"), ingredients=recipe_data.get("ingredients", []), instructions=recipe_data.get("instructions", []), tags=recipe_data.get("tags", []), source_url=url, extraction_method=llm_source, ) async def extract_recipe_async(url: str, total_timeout: int = 30) -> RecipeResult: """Async version of the waterfall with proper asyncio timeout guards.""" import time as _time start = _time.monotonic() # ---- Step 0: Fetch HTML ---- loop = asyncio.get_event_loop() html = await loop.run_in_executor(None, _fetch_html_direct, url, 15) fetch_method = "direct" if not html: logger.info("Direct fetch failed for %s, trying Browserless", url) html = await loop.run_in_executor(None, _fetch_via_browserless, url, 15) fetch_method = "browserless" if not html: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error="Could not fetch HTML. All methods failed.", ) # ---- Step 1: JSON-LD Sniper (zero LLM) ---- ld_recipe = await loop.run_in_executor(None, extract_json_ld, html) if ld_recipe: logger.info("JSON-LD extraction succeeded for %s", url) result = recipe_from_json_ld(ld_recipe, source_url=url) return result # ---- Step 2: HTML Chunk + LLM (with timeout) ---- logger.info("JSON-LD not found for %s, falling back to HTML+LLM", url) elapsed = _time.monotonic() - start remaining = total_timeout - elapsed if remaining < 5: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=f"Timeout budget exhausted ({elapsed:.1f}s elapsed)", ) text = None llm_source = None text = await loop.run_in_executor(None, _fetch_via_jina, url, min(10, int(remaining))) if text: llm_source = ExtractionMethod.JINA_LLM if not text: text = _extract_html_chunk(html) llm_source = ExtractionMethod.HTML_CHUNK_LLM if not text: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error="Could not extract recipe content from page. 
All methods failed.", ) # LLM with hard 15s timeout llm_timeout = min(15.0, remaining - 2) if llm_timeout < 3: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=f"Insufficient time for LLM extraction ({remaining:.1f}s remaining)", ) prompt = RECIPE_LLM_PROMPT.format(url=url, text=text) try: raw = await _call_llm_async(prompt, timeout=int(llm_timeout)) except (asyncio.TimeoutError, TimeoutError) as e: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=f"Extraction timed out after {llm_timeout:.0f}s. Site may be slow or blocking.", ) except Exception as e: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=str(e), ) raw = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() try: recipe_data = json.loads(raw) except json.JSONDecodeError: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error="LLM returned invalid JSON", ) if "error" in recipe_data: return RecipeResult( source_url=url, extraction_method=ExtractionMethod.FAILED, error=recipe_data.get("error", "Unknown error from LLM"), ) return RecipeResult( title=recipe_data.get("title", "Untitled"), servings=recipe_data.get("servings"), prep_time=recipe_data.get("prep_time"), cook_time=recipe_data.get("cook_time"), total_time=recipe_data.get("total_time"), ingredients=recipe_data.get("ingredients", []), instructions=recipe_data.get("instructions", []), tags=recipe_data.get("tags", []), source_url=url, extraction_method=llm_source, ) # Backward-compatible alias def extract_recipe_waterfall(url: str, total_timeout: int = 30) -> RecipeResult: """Alias for extract_recipe() — the waterfall is now the default.""" return extract_recipe(url, total_timeout=total_timeout)