"""Extractor — Tier 2 LLM extraction when tripwire fires.
Runs a local model (7B-class on M4, or Gaming PC Ollama) to extract
structured data from coordination signals.
Output: ExtractionResult with event type, dates, times, people, context.
"""
import json
import re
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import httpx
from icarus.tripwire import TripwireResult
# Module-level logger keyed on this module's import path, so log records can
# be filtered and configured per module.
logger = logging.getLogger(__name__)
# Prompt template for the extraction model. `{text}` is the only placeholder
# and is filled in with str.format(); any literal brace added to this template
# later must be doubled ("{{" / "}}") or format() will raise.
EXTRACTION_PROMPT = """You are Icarus, a family coordination assistant. Extract structured information from this message.
Message: {text}
Return JSON with:
- event_type: "calendar_event" (has specific time), "coordination" (has date but no time), or "info" (just reference)
- summary: brief 1-line summary
- dates: list of ISO date strings (YYYY-MM-DD) mentioned
- times: list of "HH:MM" times mentioned (24h format)
- people: list of people mentioned (family member names or roles)
- context: "transport" | "medical" | "school" | "social" | "care_coverage" | "other"
- location: where it's happening (or null)
- action_needed: what needs to happen (or null)
- confidence: 0.0-1.0 how confident you are in this extraction
If the message is not about coordination or events, set confidence to 0.
If there are multiple events, list them in an "events" array field.
"""
@dataclass
class ExtractionResult:
    """Structured data pulled out of a single message by the LLM.

    Every field has a default, so ``ExtractionResult()`` is a valid
    "nothing extracted" value whose ``confidence`` is 0.0.
    """

    # Kind of result: "calendar_event", "coordination" or "info".
    event_type: str = "info"
    # Brief one-line description of the message.
    summary: str = ""
    # Dates mentioned, as the model reported them (requested as YYYY-MM-DD).
    dates: list[str] = field(default_factory=list)
    # Times mentioned, as the model reported them (requested as 24h HH:MM).
    times: list[str] = field(default_factory=list)
    # People mentioned (names or roles).
    people: list[str] = field(default_factory=list)
    # Topic bucket, e.g. "transport", "medical", "school"; "other" by default.
    context: str = "other"
    # Where it is happening, when known.
    location: Optional[str] = None
    # What needs to happen, when known.
    action_needed: Optional[str] = None
    # Model's self-reported certainty, 0.0 to 1.0.
    confidence: float = 0.0
    # Per-event dicts when the message describes more than one event.
    multiple_events: list[dict] = field(default_factory=list)
    # The decoded JSON object the result was built from, if any.
    raw: Optional[dict] = None
def _coerce_confidence(value) -> float:
    """Turn the model's confidence into a float in [0.0, 1.0]; 0.0 if unusable."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        # null, a non-numeric string, a list, ... -> treat as "no confidence".
        return 0.0
    if score != score:
        # NaN (json.loads accepts the bare literal) would otherwise slip
        # through min()/max() and come out as 1.0.
        return 0.0
    return max(0.0, min(1.0, score))


def _coerce_str(value, default: str) -> str:
    """Return value when it is a string, otherwise the given default."""
    return value if isinstance(value, str) else default


def _coerce_optional_str(value) -> Optional[str]:
    """Keep None as None; stringify anything else that is not already a str."""
    if value is None or isinstance(value, str):
        return value
    return str(value)


def _coerce_str_list(value) -> list[str]:
    """Normalise a JSON value into a list of strings (null/other -> [])."""
    if isinstance(value, str):
        # The model sometimes returns a bare string instead of a 1-item list.
        return [value]
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item is not None]
    return []


def _parse_extraction(text: str, llm_output: str) -> ExtractionResult:
    """Parse raw LLM output into an ExtractionResult without raising.

    The first JSON object found in ``llm_output`` is decoded; any prose or
    code fences around it are ignored. Fields of the wrong type or set to
    null are replaced by the dataclass defaults, and ``confidence`` is clamped
    to [0.0, 1.0].

    Args:
        text: The original message. Currently unused; kept so the signature
            stays stable for existing callers.
        llm_output: The raw text returned by the model.

    Returns:
        The populated result, or ``ExtractionResult(confidence=0.0)`` when no
        JSON object can be decoded.
    """
    start = llm_output.find("{")
    if start == -1:
        return ExtractionResult(confidence=0.0)

    try:
        # raw_decode stops at the end of the first complete JSON value, so
        # trailing commentary (even if it contains braces) does not break
        # parsing the way a greedy "{.*}" match would.
        data, _ = json.JSONDecoder().raw_decode(llm_output, start)
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse extraction: %s", e)
        return ExtractionResult(confidence=0.0)

    if not isinstance(data, dict):
        # Defensive: decoding started at "{", so this should always be a dict.
        return ExtractionResult(confidence=0.0)

    events = data.get("events")
    if not isinstance(events, list):
        events = []

    return ExtractionResult(
        event_type=_coerce_str(data.get("event_type"), "info"),
        summary=_coerce_str(data.get("summary"), ""),
        dates=_coerce_str_list(data.get("dates")),
        times=_coerce_str_list(data.get("times")),
        people=_coerce_str_list(data.get("people")),
        context=_coerce_str(data.get("context"), "other"),
        location=_coerce_optional_str(data.get("location")),
        action_needed=_coerce_optional_str(data.get("action_needed")),
        confidence=_coerce_confidence(data.get("confidence", 0.0)),
        # Only dict entries are meaningful per-event records.
        multiple_events=[item for item in events if isinstance(item, dict)],
        raw=data,
    )
async def extract(
    text: str,
    tripwire: TripwireResult,
    ollama_base_url: str = "http://matt-pc.tail864e81.ts.net:11434",
    model: str = "qwen2.5-coder:7b",
    timeout: int = 30,
) -> ExtractionResult:
    """Run LLM extraction on text that tripped the wire.

    Posts a non-streaming request to the Ollama ``/api/generate`` endpoint and
    parses the model's reply. Any transport error, HTTP error status, or
    malformed response body is logged and turned into an empty result rather
    than an exception.

    Args:
        text: The message to analyse; only the first 2000 characters are sent.
        tripwire: Tripwire outcome; extraction is skipped unless ``fired``.
        ollama_base_url: Base URL of the Ollama server (trailing slash is ok).
        model: Name of the Ollama model to query.
        timeout: Timeout in seconds passed to the HTTP client.

    Returns:
        The parsed extraction, or ``ExtractionResult(confidence=0.0)`` when
        the tripwire did not fire or the request/response was unusable.
    """
    # Skip if tripwire didn't fire
    if not tripwire.fired:
        return ExtractionResult(confidence=0.0)

    prompt = EXTRACTION_PROMPT.format(text=text[:2000])
    # Tolerate a configured base URL that ends with "/" (avoids "//api/...").
    endpoint = f"{ollama_base_url.rstrip('/')}/api/generate"
    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.1, "num_predict": 512},
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(endpoint, json=request_body)
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPError as e:
        # httpx.TimeoutException is a subclass of HTTPError, so timeouts,
        # connection errors and non-2xx statuses all land here.
        logger.warning("Ollama extraction failed: %s", e)
        return ExtractionResult(confidence=0.0)
    except ValueError as e:
        # resp.json() raises a ValueError subclass when the body is not JSON
        # (e.g. an HTML error page from a proxy).
        logger.warning("Ollama returned a non-JSON response: %s", e)
        return ExtractionResult(confidence=0.0)

    llm_output = data.get("response", "") if isinstance(data, dict) else None
    if not isinstance(llm_output, str):
        logger.warning("Ollama response had no usable 'response' text")
        return ExtractionResult(confidence=0.0)

    return _parse_extraction(text, llm_output)
def extract_sync(
    text: str,
    tripwire: TripwireResult,
    ollama_base_url: str = "http://matt-pc.tail864e81.ts.net:11434",
    model: str = "qwen2.5-coder:7b",
    timeout: int = 30,
) -> ExtractionResult:
    """Synchronous wrapper for async extract.

    Runs ``extract`` to completion on a fresh event loop and returns its
    result, so callers that are not async can use it as a plain function.

    Args:
        text: The message to analyse.
        tripwire: Tripwire outcome forwarded to ``extract``.
        ollama_base_url: Base URL of the Ollama server.
        model: Name of the Ollama model to query.
        timeout: Timeout in seconds for the HTTP request.

    Returns:
        Whatever ``extract`` returns for the same arguments.

    Raises:
        RuntimeError: If called from a thread that already has a running
            event loop (``asyncio.run`` cannot be nested); await ``extract``
            directly in that case.
    """
    import asyncio

    return asyncio.run(extract(text, tripwire, ollama_base_url, model, timeout))