"""Thoth Extract Adapter — entity-relation memory extraction.

Tests a novel memory architecture: instead of extracting flat calendar
events, Thoth extracts durable entities and typed relations suitable for a
knowledge graph. This captures implicit connections:

    "Sully has soccer practice with Coach Mike at the field"

produces entities (Sully, Coach Mike, Soccer Field) and relations
(Sully->attends->school_sport, Coach Mike->coaches->Sully).

Comparisons against production (which produces flat calendar events) log to
eval_results/.

NOTE(review): nothing in this module writes to eval_results/ itself;
presumably the caller persists the comparison objects — confirm.
"""

import asyncio
import json
import math
import re
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Reuse production's async extraction infrastructure
from icarus.extractor import ExtractionResult as ProdExtractionResult
from icarus.tripwire import TripwireResult

# ---------------------------------------------------------------------------
# Configuration constants
# ---------------------------------------------------------------------------

# Default Ollama endpoint and model used when the caller does not override them.
DEFAULT_OLLAMA_BASE_URL = "http://matt-pc.tail864e81.ts.net:11434"
DEFAULT_MODEL = "qwen2.5-coder:7b"

# Extractions whose overall confidence is below this are treated as "nothing
# found" by the production-compatible entry points.
MIN_CONFIDENCE = 0.3

# ---------------------------------------------------------------------------
# Thoth Entity/Relation Data Model
# ---------------------------------------------------------------------------

ENTITY_TYPES = [
    "person",
    "place",
    "organization",
    "event",
    "concept",
    "object",
]

RELATION_TYPES = [
    # Family / social
    "parent_of", "child_of", "spouse_of", "sibling_of", "knows", "friend_of",
    # Professional
    "works_at", "colleague_of", "coaches", "teaches", "student_of",
    # Location
    "lives_in", "attends", "located_at",
    # Temporal
    "has_event_on", "has_appointment_at",
    # Activity
    "participates_in", "responsible_for",
    # Ownership / affinity
    "has_pet", "interested_in", "member_of",
]

# Spellings the model may emit that should be folded into a canonical entity
# type. The prompt used to advertise "org" while ENTITY_TYPES says
# "organization"; older/cached outputs may still contain the short form.
_ENTITY_TYPE_ALIASES = {"org": "organization"}


@dataclass
class Entity:
    """A named thing mentioned in a message."""

    name: str
    type: str  # one of ENTITY_TYPES
    aliases: list[str] = field(default_factory=list)
    confidence: float = 0.0
    source_text: str = ""


@dataclass
class Relation:
    """A directed, typed edge between two entities (referenced by name)."""

    source: str  # entity name
    target: str  # entity name
    relation_type: str  # one of RELATION_TYPES
    confidence: float = 0.0
    context: Optional[str] = None


@dataclass
class ThothExtraction:
    """Thoth's structured output — a set of entities and relations.

    This is the fundamental unit of Thoth's memory model. Unlike production's
    flat calendar events, Thoth produces graph data.
    """

    entities: list[Entity] = field(default_factory=list)
    relations: list[Relation] = field(default_factory=list)
    confidence: float = 0.0
    source_text: str = ""

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of entities, relations and confidence.

        ``source_text`` fields are intentionally not part of the output.
        """
        return {
            "entities": [
                {
                    "name": e.name,
                    "type": e.type,
                    "aliases": e.aliases,
                    "confidence": e.confidence,
                }
                for e in self.entities
            ],
            "relations": [
                {
                    "source": r.source,
                    "target": r.target,
                    "type": r.relation_type,
                    "confidence": r.confidence,
                    "context": r.context,
                }
                for r in self.relations
            ],
            "confidence": self.confidence,
        }


# ---------------------------------------------------------------------------
# Thoth Extraction Prompt
# ---------------------------------------------------------------------------

# Rendered with ``str.format(text=...)``; literal braces are doubled.
THOTH_PROMPT = """You are Thoth, a knowledge extraction agent. Given a message, extract:

1. ENTITIES — distinct people, places, organizations, events, or objects mentioned.
2. RELATIONS — typed connections between entities.

Entity types: person, place, organization, event, concept, object

Relation types: parent_of, child_of, spouse_of, sibling_of, knows, friend_of,
works_at, colleague_of, coaches, teaches, student_of, lives_in, attends,
located_at, has_event_on, has_appointment_at, participates_in,
responsible_for, has_pet, interested_in, member_of

Return JSON with this exact schema (no markdown, no backticks):
{{
  "entities": [
    {{
      "name": "string",
      "type": "person|place|organization|event|concept|object",
      "aliases": ["nickname"],
      "confidence": 0.0-1.0
    }}
  ],
  "relations": [
    {{
      "source": "entity name",
      "target": "entity name",
      "type": "relation type",
      "confidence": 0.0-1.0,
      "context": "context or reason"
    }}
  ],
  "overall_confidence": 0.0-1.0
}}

Rules:
- Only extract what is explicitly stated or clearly implied.
- For people, include name variants/aliases (nicknames, roles like "mom").
- For events with times, include a has_event_on relation to each participant.
- If nothing noteworthy (casual chat, generic info), set overall_confidence < 0.2.
- Prefer exact relation types from the list above; use 'knows' as fallback.
- CRITICAL: Relation direction matters. source --[type]--> target means the source is the
  actor/owner. For example: if Mary coaches John, the relation is "Mary" --[coaches]--> "John",
  not the reverse. If John is coached by Mary, it is still "Mary" --[coaches]--> "John".
- CRITICAL: 'has_pet' is ONLY for actual pets (dogs, cats, etc.). Never use it for objects,
  groceries, or possessions. Use 'responsible_for' for tasks.
- CRITICAL: 'attends' is for events or schools. Use 'located_at' for physical places.
- For family relations, prefer parent_of, child_of, spouse_of over 'knows'.

Message: {text}
"""


# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------

def _clamp_confidence(value, default: float) -> float:
    """Coerce *value* to a float in [0.0, 1.0], or return *default*.

    LLM output is untrusted: the confidence may be missing, ``null``, a
    numeric string, or NaN. Anything that cannot be read as a finite number
    falls back to *default* instead of raising.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    if math.isnan(number):
        return default
    return max(0.0, min(1.0, number))


def _as_str_list(value) -> list[str]:
    """Return *value* as a list of non-empty stripped strings.

    Accepts a list/tuple (non-string items are skipped) or a single string;
    anything else yields an empty list.
    """
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, (list, tuple)):
        return []
    return [item.strip() for item in value if isinstance(item, str) and item.strip()]


def _dict_items(value) -> list[dict]:
    """Return only the dict members of *value* when it is a list, else ``[]``."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


# ---------------------------------------------------------------------------
# Thoth Extractor
# ---------------------------------------------------------------------------

class ThothExtractor:
    """Extracts entities and relations from text using local LLM."""

    # Ordered (trigger relation types, context label) rules for
    # ``_infer_context``; the first rule with any trigger present wins.
    _CONTEXT_RULES = (
        (frozenset({"coaches", "teaches"}), "school"),
        (frozenset({"has_appointment_at"}), "medical"),
        (frozenset({"lives_in", "located_at"}), "transport"),
        (frozenset({"friend_of"}), "social"),
        (frozenset({"participates_in"}), "care_coverage"),
    )

    def __init__(
        self,
        ollama_base_url: str = DEFAULT_OLLAMA_BASE_URL,
        model: str = DEFAULT_MODEL,
    ):
        self.ollama_base_url = ollama_base_url
        self.model = model

    # -- public entry points -------------------------------------------------

    def extract(self, text: str) -> Optional[ProdExtractionResult]:
        """Thoth extraction, wrapped as a Production-style ExtractionResult.

        This is the drop-in replacement method. It calls the LLM with Thoth's
        entity-relation prompt, then maps the result to Production's schema
        for comparison.

        Returns ``None`` when the LLM call fails or the overall confidence is
        below ``MIN_CONFIDENCE``.
        """
        thoth_result = self._extract_thoth(text)
        if thoth_result is None or thoth_result.confidence < MIN_CONFIDENCE:
            return None
        return self._to_prod_result(thoth_result)

    async def extract_async(
        self,
        text: str,
        tripwire: TripwireResult,
        ollama_base_url: str = DEFAULT_OLLAMA_BASE_URL,
        model: str = DEFAULT_MODEL,
    ) -> ProdExtractionResult:
        """Async extraction — matches production's async extract() signature.

        This is the method called by the daemon's experiment path.

        ``tripwire`` is accepted for signature compatibility and is not used.
        The given endpoint/model are also stored on the instance, as before.
        Unlike ``extract``, a failed or low-confidence extraction yields a
        ``ProdExtractionResult(confidence=0.0)`` rather than ``None``.
        """
        self.ollama_base_url = ollama_base_url
        self.model = model
        # The HTTP call is blocking (up to the request timeout); run it in a
        # worker thread so the caller's event loop stays responsive. The
        # endpoint/model are passed explicitly so that a concurrent call
        # overwriting the instance attributes cannot change this request.
        result = await asyncio.to_thread(
            self._extract_thoth_sync, text, ollama_base_url, model
        )
        if result is None or result.confidence < MIN_CONFIDENCE:
            return ProdExtractionResult(confidence=0.0)
        return self._to_prod_result(result)

    def extract_raw(self, text: str) -> Optional[ThothExtraction]:
        """Extract entities and relations from text.

        Returns Thoth's native data model, or ``None`` if the LLM call failed.
        """
        return self._extract_thoth(text)

    # -- LLM call and parsing ------------------------------------------------

    def _extract_thoth(
        self,
        text: str,
        ollama_base_url: Optional[str] = None,
        model: Optional[str] = None,
    ) -> Optional[ThothExtraction]:
        """Call LLM with Thoth prompt, parse entity-relation output.

        Only the first 2000 characters of *text* are sent. ``ollama_base_url``
        and ``model`` override the instance configuration for this call only.
        Any failure of the HTTP round-trip is reported on stdout and results
        in ``None`` (deliberate best-effort: extraction must never crash the
        caller).
        """
        base_url = ollama_base_url or self.ollama_base_url
        model_name = model or self.model
        prompt = THOTH_PROMPT.format(text=text[:2000])

        import httpx

        try:
            resp = httpx.post(
                f"{base_url}/api/generate",
                json={
                    "model": model_name,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"temperature": 0.1, "num_predict": 1024},
                },
                timeout=20,
            )
            resp.raise_for_status()
            data = resp.json()
            llm_output = data.get("response", "")
        except Exception as e:
            print(f"[thoth] extraction failed: {e}")
            return None

        return self._parse_thoth_output(text, llm_output)

    def _extract_thoth_sync(
        self,
        text: str,
        ollama_base_url: Optional[str] = None,
        model: Optional[str] = None,
    ) -> Optional[ThothExtraction]:
        """Synchronous wrapper — needed for _run_experiment in daemon.py."""
        return self._extract_thoth(text, ollama_base_url, model)

    def _parse_thoth_output(self, text: str, llm_output: str) -> Optional[ThothExtraction]:
        """Parse LLM response into ThothExtraction with error recovery.

        The first JSON object in *llm_output* is decoded; surrounding prose or
        markdown fences are ignored. Malformed fields are coerced to safe
        defaults rather than aborting the whole extraction. When no JSON
        object can be decoded, an empty extraction with confidence 0.0 is
        returned.
        """
        empty = ThothExtraction(confidence=0.0, source_text=text[:500])

        if not isinstance(llm_output, str):
            return empty
        start = llm_output.find("{")
        if start == -1:
            return empty
        try:
            # raw_decode stops at the end of the first complete object, so
            # trailing commentary (even if it contains braces) is harmless.
            data, _ = json.JSONDecoder().raw_decode(llm_output, start)
        except json.JSONDecodeError as e:
            print(f"[thoth] parse failed: {e}")
            return empty

        entities = [
            Entity(
                name=self._clean(ed.get("name")),
                type=self._normalize_entity_type(ed.get("type")),
                aliases=_as_str_list(ed.get("aliases")),
                confidence=_clamp_confidence(ed.get("confidence"), 0.5),
                source_text=text[:200],
            )
            for ed in _dict_items(data.get("entities"))
        ]

        relations = []
        for rd in _dict_items(data.get("relations")):
            raw_type = rd.get("type")
            relation_type = raw_type.strip().lower() if isinstance(raw_type, str) else ""
            context = rd.get("context")
            if context is not None and not isinstance(context, str):
                context = str(context)
            relations.append(Relation(
                source=self._clean(rd.get("source")),
                target=self._clean(rd.get("target")),
                relation_type=relation_type or "knows",
                confidence=_clamp_confidence(rd.get("confidence"), 0.5),
                context=context,
            ))

        return ThothExtraction(
            entities=entities,
            relations=relations,
            confidence=_clamp_confidence(data.get("overall_confidence"), 0.0),
            source_text=text[:500],
        )

    @staticmethod
    def _clean(s) -> str:
        """Strip whitespace and surrounding quote characters; ``None`` -> ``""``."""
        if s is None:
            return ""
        return str(s).strip().strip('"').strip("'")

    @staticmethod
    def _normalize_entity_type(value) -> str:
        """Lower-case/strip an entity type, fold known aliases, default to ``concept``."""
        if not isinstance(value, str) or not value.strip():
            return "concept"
        normalized = value.strip().lower()
        return _ENTITY_TYPE_ALIASES.get(normalized, normalized)

    # -- mapping to production's flat schema ----------------------------------

    def _to_prod_result(self, t: ThothExtraction) -> ProdExtractionResult:
        """Build the production-style result for an accepted Thoth extraction.

        Dates and times are always empty: Thoth's model does not carry them.
        """
        event_type, summary = self._map_to_event(t)
        return ProdExtractionResult(
            event_type=event_type,
            summary=summary,
            dates=[],
            times=[],
            people=[e.name for e in t.entities if e.type == "person"],
            context=self._infer_context(t),
            location=self._find_location(t),
            confidence=t.confidence,
        )

    def _map_to_event(self, t: ThothExtraction):
        """Map Thoth graph output to a flat event summary for comparison.

        Returns ``(event_type, summary)``. The event type is
        ``"calendar_event"`` when any temporal relation is present, otherwise
        ``"coordination"``. The summary lists up to three non-person entities
        and up to three people.
        """
        people = [e.name for e in t.entities if e.type == "person"]
        things = [e.name for e in t.entities if e.type != "person"]
        has_time = any(
            r.relation_type in ("has_event_on", "has_appointment_at")
            for r in t.relations
        )
        event_type = "calendar_event" if has_time else "coordination"

        summary_parts = []
        if things:
            summary_parts.append(", ".join(things[:3]))
        if people:
            summary_parts.append("involving " + ", ".join(people[:3]))
        summary = " — ".join(summary_parts) if summary_parts else "extracted entities"
        return event_type, summary

    def _infer_context(self, t: ThothExtraction) -> str:
        """Pick a context label from the relation types present.

        Rules are checked in ``_CONTEXT_RULES`` order; ``"other"`` when none match.
        """
        relation_types = {r.relation_type for r in t.relations}
        for triggers, label in self._CONTEXT_RULES:
            if relation_types & triggers:
                return label
        return "other"

    def _find_location(self, t: ThothExtraction) -> Optional[str]:
        """Return the first ``place`` entity, else the target of the first
        location-like relation, else ``None``."""
        for e in t.entities:
            if e.type == "place":
                return e.name
        for r in t.relations:
            if r.relation_type in ("located_at", "lives_in", "attends"):
                return r.target
        return None

    # -- comparison against production ---------------------------------------

    def _run_both(self, text: str):
        """Run production's extractor and Thoth on *text*.

        Returns ``(prod_result, thoth_result)``. Uses ``asyncio.run`` for the
        production coroutine, so it must not be called from a running event
        loop.
        """
        # Imported lazily, as in the original comparison methods.
        from icarus.extractor import extract as prod_extract
        from icarus.tripwire import run_tripwire

        tripwire = run_tripwire(text)
        prod_result = asyncio.run(prod_extract(text, tripwire))
        thoth_result = self._extract_thoth(text)
        return prod_result, thoth_result

    def compare_to_production(self, text: str) -> "ComparisonResult":
        """Run both extractors and return the paired results."""
        prod_result, thoth_result = self._run_both(text)
        return ComparisonResult(prod=prod_result, thoth=thoth_result)

    def compare_graphs(self, text: str) -> "GraphComparison":
        """Full graph-level comparison: production entities vs Thoth entities.

        Returns a detailed comparison including:
        - Entities found by both
        - Entities Thoth found that production missed (relations/connections)
        - Relations Thoth inferred that production doesn't model
        """
        prod_result, thoth_result = self._run_both(text)
        return GraphComparison(prod=prod_result, thoth=thoth_result)


# ---------------------------------------------------------------------------
# Comparison Data Classes
# ---------------------------------------------------------------------------

@dataclass
class ComparisonResult:
    """Production's result and Thoth's raw extraction for one message."""

    prod: Optional[ProdExtractionResult]
    thoth: Optional[ThothExtraction]

    def summary(self) -> str:
        """Produce human-readable comparison."""
        lines = []
        lines.append("=== Thoth vs Production Comparison ===")
        lines.append(f" Production: {self._fmt_prod(self.prod)}")
        if self.thoth:
            lines.append(
                f" Thoth entities ({len(self.thoth.entities)}): "
                f"{', '.join(e.name for e in self.thoth.entities[:8])}"
            )
            lines.append(
                f" Thoth relations ({len(self.thoth.relations)}): "
                f"{', '.join(f'{r.source}--[{r.relation_type}]-->{r.target}' for r in self.thoth.relations[:5])}"
            )
            lines.append(f" Thoth confidence: {self.thoth.confidence:.2f}")
        else:
            lines.append(" Thoth: None")
        return "\n".join(lines)

    @staticmethod
    def _fmt_prod(r: Optional[ProdExtractionResult]) -> str:
        """One-line rendering of a production result; ``"None"`` when absent
        or effectively zero-confidence."""
        if not r or r.confidence < 0.01:
            return "None"
        people = ", ".join(r.people[:3]) if r.people else "none"
        times = ", ".join(r.times[:2]) if r.times else "no times"
        return f"{r.event_type} | {r.summary[:60]} | people=[{people}] | times=[{times}] | conf={r.confidence:.2f}"


@dataclass
class GraphComparison:
    """Deep comparison between production's flat event and Thoth's graph."""

    prod: Optional[ProdExtractionResult]
    thoth: Optional[ThothExtraction]

    def summary(self) -> str:
        """Produce a human-readable people/relations/entity-type comparison.

        Either side may be ``None``. People are compared case-insensitively.
        """
        lines = []
        lines.append("=== Graph Comparison ===")
        if not self.prod and not self.thoth:
            return "Both returned nothing."

        # People — both sides are lower-cased so "Sully" and "sully" match.
        prod_people = (
            {p.lower() for p in self.prod.people}
            if self.prod and self.prod.people
            else set()
        )
        thoth_people = (
            {e.name.lower() for e in self.thoth.entities if e.type == "person"}
            if self.thoth
            else set()
        )
        both_people = prod_people & thoth_people
        thoth_only = thoth_people - prod_people
        # Sorted so the report is stable from run to run.
        lines.append(
            f" People — both: {len(both_people)}, "
            f"Thoth-only: {len(thoth_only)} {', '.join(sorted(thoth_only))}"
        )

        # Relation novelty
        if self.thoth:
            lines.append(f" Relations discovered by Thoth: {len(self.thoth.relations)}")
            for r in self.thoth.relations[:8]:
                lines.append(
                    f" {r.source} --[{r.relation_type}]--> {r.target} (conf={r.confidence:.2f})"
                )
        else:
            lines.append(" Relations: none (Thoth returned nothing)")

        # Entity types
        if self.thoth:
            # dict(...) keeps the plain-dict rendering in first-seen order.
            by_type = dict(Counter(e.type for e in self.thoth.entities))
            lines.append(f" Entity breakdown: {by_type}")

        return "\n".join(lines)


# ---------------------------------------------------------------------------
# Batch evaluation
# ---------------------------------------------------------------------------

def compare_batch(
    messages: list[str],
    extractor: Optional[ThothExtractor] = None,
) -> list[dict]:
    """Run Thoth + Production on a batch of messages. Returns structured results.

    Useful for backtesting against historical data.

    Args:
        messages: Raw message texts to evaluate.
        extractor: Thoth extractor to use; a default-configured one is
            created when omitted.

    Returns:
        One dict per message with the (truncated) message, tripwire outcome,
        production's event summary and Thoth's entity/relation counts. A
        failed Thoth extraction is reported as zero entities/relations.
    """
    from icarus.extractor import extract as prod_extract
    from icarus.tripwire import run_tripwire

    if extractor is None:
        extractor = ThothExtractor()

    results = []
    for msg in messages:
        tripwire = run_tripwire(msg)
        prod = asyncio.run(prod_extract(msg, tripwire))
        thoth = extractor.extract_raw(msg)
        # extract_raw returns None when the LLM call fails.
        thoth_entities = thoth.entities if thoth else []
        thoth_relations = thoth.relations if thoth else []
        results.append({
            "message": msg[:200],
            "tripwire_fired": tripwire.fired,
            "tripwire_confidence": tripwire.confidence,
            "production": {
                "event_type": prod.event_type if prod else None,
                "summary": prod.summary if prod else None,
                "confidence": prod.confidence if prod else None,
            },
            "thoth": {
                "num_entities": len(thoth_entities),
                "num_relations": len(thoth_relations),
                "confidence": thoth.confidence if thoth else 0.0,
                "people": [e.name for e in thoth_entities if e.type == "person"],
                "relations": [
                    f"{r.source}--[{r.relation_type}]-->{r.target}"
                    for r in thoth_relations
                ],
            },
        })
    return results