"""Extractor — Tier 2 LLM extraction when tripwire fires.

Runs a local model (7B-class on M4, or Gaming PC Ollama) to extract
structured data from coordination signals.

Output: ExtractionResult with event type, dates, times, people, context.
"""
import json
import re
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

import httpx

from icarus.tripwire import TripwireResult

logger = logging.getLogger(__name__)

# Prompt template sent to the local Ollama model; {text} is filled with the
# (truncated) message under analysis. The model is asked for a single JSON
# object, which _parse_extraction() then pulls out of the raw completion.
EXTRACTION_PROMPT = """You are Icarus, a family coordination assistant. Extract structured information from this message.

Message: {text}

Return JSON with:
- event_type: "calendar_event" (has specific time), "coordination" (has date but no time), or "info" (just reference)
- summary: brief 1-line summary
- dates: list of ISO date strings (YYYY-MM-DD) mentioned
- times: list of "HH:MM" times mentioned (24h format)
- people: list of people mentioned (family member names or roles)
- context: "transport" | "medical" | "school" | "social" | "care_coverage" | "other"
- location: where it's happening (or null)
- action_needed: what needs to happen (or null)
- confidence: 0.0-1.0 how confident you are in this extraction

If the message is not about coordination or events, set confidence to 0.
If there are multiple events, list them in an "events" array field.
"""


@dataclass
class ExtractionResult:
    """Structured result of one LLM extraction pass.

    A zero-confidence default instance doubles as the "nothing extracted"
    sentinel returned on any failure path.
    """

    event_type: str = "info"  # calendar_event | coordination | info
    summary: str = ""
    dates: list[str] = field(default_factory=list)  # ISO dates (YYYY-MM-DD)
    times: list[str] = field(default_factory=list)  # 24h "HH:MM" strings
    people: list[str] = field(default_factory=list)
    context: str = "other"
    location: Optional[str] = None
    action_needed: Optional[str] = None
    confidence: float = 0.0  # always clamped to [0.0, 1.0]
    multiple_events: list[dict] = field(default_factory=list)  # model's "events" array, if any
    raw: Optional[dict] = None  # full parsed JSON, kept for debugging


def _parse_extraction(text: str, llm_output: str) -> ExtractionResult:
    """Parse LLM output into ExtractionResult with fallback.

    Any parse failure (no JSON block, malformed JSON, non-numeric
    confidence) degrades to a zero-confidence result rather than raising.
    """
    try:
        # Grab the outermost {...} span — models often wrap the JSON in
        # prose or code fences, so we can't feed the whole response to
        # json.loads directly.
        json_match = re.search(r"\{.*\}", llm_output, re.DOTALL)
        if not json_match:
            return ExtractionResult(confidence=0.0)
        data = json.loads(json_match.group())
        # float() guards against null / string confidence values, which
        # would otherwise raise TypeError inside min/max.
        confidence = float(data.get("confidence", 0.0))
        return ExtractionResult(
            event_type=data.get("event_type", "info"),
            summary=data.get("summary", ""),
            dates=data.get("dates", []),
            times=data.get("times", []),
            people=data.get("people", []),
            context=data.get("context", "other"),
            location=data.get("location"),
            action_needed=data.get("action_needed"),
            confidence=max(0.0, min(1.0, confidence)),
            multiple_events=data.get("events", []),
            raw=data,
        )
    except (json.JSONDecodeError, AttributeError, TypeError, ValueError) as e:
        logger.warning("Failed to parse extraction: %s", e)
        return ExtractionResult(confidence=0.0)


async def extract(
    text: str,
    tripwire: TripwireResult,
    ollama_base_url: str = "http://matt-pc.tail864e81.ts.net:11434",
    model: str = "qwen2.5-coder:7b",
    timeout: int = 30,
) -> ExtractionResult:
    """Run LLM extraction on text that tripped the wire.

    Uses the configured Ollama endpoint. Falls back gracefully on failure:
    any transport/HTTP error yields a zero-confidence ExtractionResult
    instead of raising.

    Args:
        text: Raw message text; truncated to 2000 chars before prompting.
        tripwire: Tier-1 result; extraction is skipped unless it fired.
        ollama_base_url: Base URL of the Ollama server.
        model: Ollama model tag to generate with.
        timeout: Per-request timeout in seconds.
    """
    # Skip if tripwire didn't fire — Tier 2 only runs on Tier 1 hits.
    if not tripwire.fired:
        return ExtractionResult(confidence=0.0)

    prompt = EXTRACTION_PROMPT.format(text=text[:2000])

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(
                f"{ollama_base_url}/api/generate",
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False,
                    # Low temperature for deterministic-ish structured output;
                    # 512 tokens is ample for the requested JSON object.
                    "options": {"temperature": 0.1, "num_predict": 512},
                },
            )
            resp.raise_for_status()
            data = resp.json()
            llm_output = data.get("response", "")
    except httpx.HTTPError as e:
        # httpx.TimeoutException is a subclass of HTTPError, so this single
        # clause covers transport errors, timeouts, and bad status codes.
        logger.warning("Ollama extraction failed: %s", e)
        return ExtractionResult(confidence=0.0)

    return _parse_extraction(text, llm_output)


def extract_sync(
    text: str,
    tripwire: TripwireResult,
    ollama_base_url: str = "http://matt-pc.tail864e81.ts.net:11434",
    model: str = "qwen2.5-coder:7b",
    timeout: int = 30,
) -> ExtractionResult:
    """Synchronous wrapper for async extract().

    Fixed: this was previously declared ``async def`` and awaited extract(),
    which made it a coroutine, not a sync wrapper (and left ``import
    asyncio`` unused). It now drives the coroutine to completion with
    asyncio.run(), so plain (non-async) callers can use it directly.
    Must not be called from within a running event loop.
    """
    import asyncio

    return asyncio.run(extract(text, tripwire, ollama_base_url, model, timeout))