# adapter.py

"""Thoth Extract Adapter — entity-relation memory extraction.

Tests a novel memory architecture: instead of extracting flat calendar events,
Thoth extracts durable entities and typed relations suitable for a knowledge graph.

This captures implicit connections: "Sully has soccer practice with Coach Mike at
the field" produces entities (Sully, Coach Mike, Soccer Field) and relations
(Sully->attends->school_sport, Coach Mike->coaches->Sully).

Comparisons against production (which produces flat calendar events) log to eval_results/.
"""

import json
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Reuse production's async extraction infrastructure

from icarus.extractor import ExtractionResult as ProdExtractionResult
from icarus.tripwire import TripwireResult

# ---------------------------------------------------------------------------
# Thoth Entity/Relation Data Model
# ---------------------------------------------------------------------------

# Entity kinds Thoth may assign to an extracted entity (mirrored in THOTH_PROMPT).
ENTITY_TYPES = [
    "person", "place", "organization",
    "event", "concept", "object",
]

# Relation labels Thoth may assign to an edge, grouped by theme.
# The order of the list is kept exactly as declared originally.
RELATION_TYPES = [
    # Family / social
    "parent_of", "child_of", "spouse_of", "sibling_of", "knows", "friend_of",
    # Professional
    "works_at", "colleague_of", "coaches", "teaches", "student_of",
    # Location
    "lives_in", "attends", "located_at",
    # Temporal
    "has_event_on", "has_appointment_at",
    # Activity
    "participates_in", "responsible_for",
    # Ownership / affinity
    "has_pet", "interested_in", "member_of",
]

@dataclass
class Entity:
    """A single node of the Thoth graph: something named in the source message."""

    # Display name of the entity as extracted from the message.
    name: str
    # Entity kind; expected to be one of ENTITY_TYPES.
    type: str
    # Alternative names for the same entity (nicknames, roles such as "mom").
    aliases: list[str] = field(default_factory=list)
    # Confidence score for this entity; defaults to 0.0.
    confidence: float = 0.0
    # Excerpt of the message the entity was extracted from.
    source_text: str = ""

@dataclass
class Relation:
    """A directed, typed edge of the Thoth graph: source --[relation_type]--> target."""

    # Name of the entity the edge starts from.
    source: str
    # Name of the entity the edge points to.
    target: str
    # Edge label; expected to be one of RELATION_TYPES.
    relation_type: str
    # Confidence score for this relation; defaults to 0.0.
    confidence: float = 0.0
    # Optional free-text context or reason supplied with the relation.
    context: Optional[str] = None

@dataclass
class ThothExtraction:
    """Thoth's structured output — a set of entities and relations.

    This is the fundamental unit of Thoth's memory model.
    Unlike production's flat calendar events, Thoth produces graph data.
    """

    # Nodes extracted from the message.
    entities: list[Entity] = field(default_factory=list)
    # Directed, typed edges between the extracted entities.
    relations: list[Relation] = field(default_factory=list)
    # Overall confidence for the whole extraction; defaults to 0.0.
    confidence: float = 0.0
    # Excerpt of the message this extraction was produced from.
    source_text: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict view of the extraction built from JSON-friendly fields.

        The per-entity and top-level ``source_text`` values are not included.
        A relation's ``relation_type`` is emitted under the key ``"type"``.
        """
        entity_rows = [
            {
                "name": e.name,
                "type": e.type,
                "aliases": e.aliases,
                "confidence": e.confidence,
            }
            for e in self.entities
        ]
        relation_rows = [
            {
                "source": r.source,
                "target": r.target,
                "type": r.relation_type,
                "confidence": r.confidence,
                "context": r.context,
            }
            for r in self.relations
        ]
        return {
            "entities": entity_rows,
            "relations": relation_rows,
            "confidence": self.confidence,
        }

# ---------------------------------------------------------------------------
# Thoth Extraction Prompt
# ---------------------------------------------------------------------------

# Prompt template sent to the LLM. It is filled in with ``str.format(text=...)``,
# so literal JSON braces are doubled ("{{" / "}}") and ``{text}`` is the only field.
# The schema line spells out "organization" so it agrees with the entity-type list
# in this prompt and with ENTITY_TYPES (it previously advertised "org").
THOTH_PROMPT = """You are Thoth, a knowledge extraction agent. Given a message, extract:
1. ENTITIES — distinct people, places, organizations, events, or objects mentioned.
2. RELATIONS — typed connections between entities.

Entity types: person, place, organization, event, concept, object
Relation types: parent_of, child_of, spouse_of, sibling_of, knows, friend_of,
works_at, colleague_of, coaches, teaches, student_of,
lives_in, attends, located_at,
has_event_on, has_appointment_at,
participates_in, responsible_for,
has_pet, interested_in, member_of

Return JSON with this exact schema (no markdown, no backticks):
{{
"entities": [
{{ "name": "string", "type": "person|place|organization|event|concept|object",
"aliases": ["nickname"], "confidence": 0.0-1.0 }}
],
"relations": [
{{ "source": "entity name", "target": "entity name",
"type": "relation type", "confidence": 0.0-1.0,
"context": "context or reason" }}
],
"overall_confidence": 0.0-1.0
}}

Rules:
- Only extract what is explicitly stated or clearly implied.
- For people, include name variants/aliases (nicknames, roles like "mom").
- For events with times, include a has_event_on relation to each participant.
- If nothing noteworthy (casual chat, generic info), set overall_confidence < 0.2.
- Prefer exact relation types from the list above; use 'knows' as fallback.
- CRITICAL: Relation direction matters. source --[type]--> target means the source
is the actor/owner. For example: if Mary coaches John, the relation is
"Mary" --[coaches]--> "John", not the reverse. If John is coached by Mary,
it is still "Mary" --[coaches]--> "John".
- CRITICAL: 'has_pet' is ONLY for actual pets (dogs, cats, etc.). Never use it for
objects, groceries, or possessions. Use 'responsible_for' for tasks.
- CRITICAL: 'attends' is for events or schools. Use 'located_at' for physical places.
- For family relations, prefer parent_of, child_of, spouse_of over 'knows'.

Message:
{text}
"""

# ---------------------------------------------------------------------------
# Thoth Extractor
# ---------------------------------------------------------------------------

class ThothExtractor:
    """Extracts entities and relations from text using local LLM.

    The LLM is reached over HTTP at ``{ollama_base_url}/api/generate``. Public
    entry points:

    * ``extract`` / ``extract_async`` — return a production-style
      ``ExtractionResult`` so results can be compared with production.
    * ``extract_raw`` — returns Thoth's native ``ThothExtraction``.
    * ``compare_to_production`` / ``compare_graphs`` — run both extractors.
    """

    # Shared by ``__init__`` and ``extract_async`` so the defaults cannot drift apart.
    _DEFAULT_OLLAMA_BASE_URL = "http://matt-pc.tail864e81.ts.net:11434"
    _DEFAULT_MODEL = "qwen2.5-coder:7b"
    # Extractions whose overall confidence is below this count as "nothing found".
    _MIN_CONFIDENCE = 0.3
    # A decoded JSON object must carry at least one of these keys to be accepted.
    _SCHEMA_KEYS = ("entities", "relations", "overall_confidence")
    # Shorthand entity types the model may emit, mapped to the ENTITY_TYPES spelling.
    _ENTITY_TYPE_ALIASES = {"org": "organization"}
    # Ordered (relation types, context label) rules; the first matching rule wins.
    _CONTEXT_RULES = (
        (("coaches", "teaches"), "school"),
        (("has_appointment_at",), "medical"),
        (("lives_in", "located_at"), "transport"),
        (("friend_of",), "social"),
        (("participates_in",), "care_coverage"),
    )

    def __init__(
        self,
        ollama_base_url: str = _DEFAULT_OLLAMA_BASE_URL,
        model: str = _DEFAULT_MODEL,
    ):
        """Store the endpoint base URL and model name used for extraction calls."""
        self.ollama_base_url = ollama_base_url
        self.model = model

    def extract(self, text: str) -> Optional[ProdExtractionResult]:
        """Thoth extraction, wrapped as a Production-style ExtractionResult.

        This is the drop-in replacement method. It calls the LLM with Thoth's
        entity-relation prompt, then maps the result to Production's schema
        for comparison.

        Returns:
            The mapped result, or ``None`` when the LLM call failed or the overall
            confidence is below 0.3.
        """
        thoth_result = self._extract_thoth(text)
        if not thoth_result or thoth_result.confidence < self._MIN_CONFIDENCE:
            return None
        return self._to_prod_result(thoth_result)

    async def extract_async(
        self, text: str, tripwire: TripwireResult,
        ollama_base_url: str = _DEFAULT_OLLAMA_BASE_URL,
        model: str = _DEFAULT_MODEL,
    ) -> ProdExtractionResult:
        """Async extraction — matches production's async extract() signature.

        This is the method called by the daemon's experiment path.

        Args:
            text: Message to extract from.
            tripwire: Accepted for signature compatibility; not used here.
            ollama_base_url: Endpoint base URL; also stored on the instance.
            model: Model name; also stored on the instance.

        Returns:
            The mapped result, or ``ProdExtractionResult(confidence=0.0)`` when the
            LLM call failed or the overall confidence is below 0.3.
        """
        import asyncio

        self.ollama_base_url = ollama_base_url
        self.model = model
        # The HTTP request is blocking, so run it in a worker thread instead of
        # stalling the event loop. URL and model are passed explicitly so the
        # worker does not depend on instance state that another call may change.
        result = await asyncio.to_thread(
            self._extract_thoth, text, ollama_base_url, model
        )
        if not result or result.confidence < self._MIN_CONFIDENCE:
            return ProdExtractionResult(confidence=0.0)
        return self._to_prod_result(result)

    def extract_raw(self, text: str) -> Optional[ThothExtraction]:
        """Extract entities and relations from text. Returns Thoth's native data model."""
        return self._extract_thoth(text)

    def _to_prod_result(self, t: ThothExtraction) -> ProdExtractionResult:
        """Map a Thoth extraction onto production's flat result schema."""
        event_type, summary = self._map_to_event(t)
        return ProdExtractionResult(
            event_type=event_type,
            summary=summary,
            dates=[],
            times=[],
            people=[e.name for e in t.entities if e.type == "person"],
            context=self._infer_context(t),
            location=self._find_location(t),
            confidence=t.confidence,
        )

    def _extract_thoth(
        self,
        text: str,
        ollama_base_url: Optional[str] = None,
        model: Optional[str] = None,
    ) -> Optional[ThothExtraction]:
        """Call LLM with Thoth prompt, parse entity-relation output.

        Args:
            text: Message to extract from; only the first 2000 characters are sent.
            ollama_base_url: Overrides the instance's base URL when given.
            model: Overrides the instance's model name when given.

        Returns:
            The parsed extraction, or ``None`` when the request itself failed.
        """
        prompt = THOTH_PROMPT.format(text=text[:2000])
        base_url = self.ollama_base_url if ollama_base_url is None else ollama_base_url
        model_name = self.model if model is None else model

        import httpx
        try:
            resp = httpx.post(
                f"{base_url}/api/generate",
                json={
                    "model": model_name,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"temperature": 0.1, "num_predict": 1024},
                },
                timeout=20,
            )
            resp.raise_for_status()
            data = resp.json()
            llm_output = data.get("response", "")
        except Exception as e:
            # Deliberate best-effort boundary: any transport, HTTP-status or
            # decoding problem is reported and turned into "no result".
            print(f"[thoth] extraction failed: {e}")
            return None

        return self._parse_thoth_output(text, llm_output)

    def _extract_thoth_sync(self, text: str) -> Optional[ThothExtraction]:
        """Synchronous wrapper — needed for _run_experiment in daemon.py."""
        return self._extract_thoth(text)

    def _parse_thoth_output(self, text: str, llm_output: str) -> Optional[ThothExtraction]:
        """Parse LLM response into ThothExtraction with error recovery.

        Malformed pieces are dropped individually rather than failing the whole
        extraction: entities without a usable name and relations without both
        endpoints are skipped, and unusable confidences fall back to a default.

        Returns:
            The parsed extraction. When no JSON object can be decoded the result
            is an empty extraction with confidence 0.0.
        """
        empty = ThothExtraction(confidence=0.0, source_text=text[:500])
        if not isinstance(llm_output, str) or "{" not in llm_output:
            return empty

        data = self._extract_json_object(llm_output)
        if data is None:
            print("[thoth] parse failed: no valid JSON object in LLM output")
            return empty

        raw_entities = data.get("entities")
        entities = []
        for ed in raw_entities if isinstance(raw_entities, list) else []:
            if not isinstance(ed, dict):
                continue
            name = self._clean(ed.get("name"))
            if not name:
                continue  # a nameless node cannot be referenced by any relation
            entities.append(Entity(
                name=name,
                type=self._normalize_entity_type(ed.get("type")),
                aliases=self._clean_aliases(ed.get("aliases")),
                confidence=self._clamp(ed.get("confidence"), 0.5),
                source_text=text[:200],
            ))

        raw_relations = data.get("relations")
        relations = []
        for rd in raw_relations if isinstance(raw_relations, list) else []:
            if not isinstance(rd, dict):
                continue
            source = self._clean(rd.get("source"))
            target = self._clean(rd.get("target"))
            if not source or not target:
                continue  # an edge needs both endpoints
            rel_type = rd.get("type")
            rel_type = rel_type.strip().lower() if isinstance(rel_type, str) else ""
            context = rd.get("context")
            relations.append(Relation(
                source=source,
                target=target,
                relation_type=rel_type or "knows",
                confidence=self._clamp(rd.get("confidence"), 0.5),
                context=context if isinstance(context, str) else None,
            ))

        return ThothExtraction(
            entities=entities,
            relations=relations,
            confidence=self._clamp(data.get("overall_confidence"), 0.0),
            source_text=text[:500],
        )

    @staticmethod
    def _extract_json_object(llm_output: str) -> Optional[dict]:
        """Return the first decodable JSON object in ``llm_output`` that looks like
        the Thoth schema, or ``None``.

        Decoding starts at each ``{`` in turn and stops at the end of that JSON
        value, so code fences, preambles and trailing chatter are ignored.
        """
        decoder = json.JSONDecoder()
        start = llm_output.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(llm_output, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict) and any(
                key in candidate for key in ThothExtractor._SCHEMA_KEYS
            ):
                return candidate
            start = llm_output.find("{", start + 1)
        return None

    @staticmethod
    def _clamp(value, default: float = 0.0) -> float:
        """Coerce ``value`` to a float in [0.0, 1.0]; return ``default`` if it is
        missing, non-numeric or NaN."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        if number != number:  # NaN is the only value not equal to itself
            return default
        return max(0.0, min(1.0, number))

    @staticmethod
    def _normalize_entity_type(value) -> str:
        """Lower-case and trim an entity type, expand known shorthands, and fall
        back to ``"concept"`` when the value is missing or not a string."""
        if not isinstance(value, str) or not value.strip():
            return "concept"
        kind = value.strip().lower()
        return ThothExtractor._ENTITY_TYPE_ALIASES.get(kind, kind)

    @staticmethod
    def _clean_aliases(value) -> list[str]:
        """Normalize an ``aliases`` field to a list of non-empty cleaned strings."""
        if isinstance(value, str):
            value = [value]
        if not isinstance(value, list):
            return []
        cleaned = (ThothExtractor._clean(alias) for alias in value)
        return [alias for alias in cleaned if alias]

    @staticmethod
    def _clean(s: str) -> str:
        """Trim whitespace and surrounding quote characters; non-strings become ""."""
        if not isinstance(s, str):
            return ""
        return s.strip().strip('"').strip("'")

    def _map_to_event(self, t: ThothExtraction):
        """Map Thoth graph output to a flat event summary for comparison.

        Returns:
            ``(event_type, summary)``. ``event_type`` is ``"calendar_event"`` when
            any relation is temporal, otherwise ``"coordination"``. ``summary``
            lists up to three non-person entities and up to three people.
        """
        people = [e.name for e in t.entities if e.type == "person"]
        things = [e.name for e in t.entities if e.type != "person"]
        has_time = any(
            r.relation_type in ("has_event_on", "has_appointment_at")
            for r in t.relations
        )
        event_type = "calendar_event" if has_time else "coordination"

        summary_parts = []
        if things:
            summary_parts.append(", ".join(things[:3]))
        if people:
            summary_parts.append("involving " + ", ".join(people[:3]))
        summary = " — ".join(summary_parts) if summary_parts else "extracted entities"

        return event_type, summary

    def _infer_context(self, t: ThothExtraction) -> str:
        """Pick a context label from the relation types present; ``"other"`` if
        none of the rules match."""
        relation_types = {r.relation_type for r in t.relations}
        for triggers, label in self._CONTEXT_RULES:
            if any(trigger in relation_types for trigger in triggers):
                return label
        return "other"

    def _find_location(self, t: ThothExtraction) -> Optional[str]:
        """Return the first ``place`` entity's name, else the target of the first
        location-like relation, else ``None``."""
        for e in t.entities:
            if e.type == "place":
                return e.name
        for r in t.relations:
            if r.relation_type in ("located_at", "lives_in", "attends"):
                return r.target
        return None

    def _run_both(self, text: str):
        """Run production's extractor and Thoth on ``text``.

        Returns:
            ``(prod_result, thoth_result)``.
        """
        import asyncio
        from icarus.extractor import extract as prod_extract
        from icarus.tripwire import run_tripwire

        tripwire = run_tripwire(text)
        # asyncio.run() starts its own event loop, so this path is for
        # synchronous callers only.
        prod_result = asyncio.run(prod_extract(text, tripwire))
        thoth_result = self._extract_thoth(text)
        return prod_result, thoth_result

    def compare_to_production(self, text: str) -> "ComparisonResult":
        """Run both extractors and return a ComparisonResult holding both outputs."""
        prod_result, thoth_result = self._run_both(text)
        return ComparisonResult(prod=prod_result, thoth=thoth_result)

    def compare_graphs(self, text: str) -> "GraphComparison":
        """Full graph-level comparison: production entities vs Thoth entities.

        Returns a detailed comparison including:
        - Entities found by both
        - Entities Thoth found that production missed (relations/connections)
        - Relations Thoth inferred that production doesn't model
        """
        prod_result, thoth_result = self._run_both(text)
        return GraphComparison(prod=prod_result, thoth=thoth_result)

# ---------------------------------------------------------------------------
# Comparison Data Classes
# ---------------------------------------------------------------------------

@dataclass
class ComparisonResult:
    """Side-by-side holder for production's result and Thoth's extraction."""

    # Production's flat extraction result; None when production returned nothing.
    prod: Optional[ProdExtractionResult]
    # Thoth's graph extraction; None when Thoth returned nothing.
    thoth: Optional[ThothExtraction]

    def summary(self) -> str:
        """Produce human-readable comparison.

        Lists at most 8 Thoth entities and 5 Thoth relations; the counts shown in
        parentheses are the full totals.
        """
        lines = [
            "=== Thoth vs Production Comparison ===",
            f"  Production: {self._fmt_prod(self.prod)}",
        ]
        if not self.thoth:
            lines.append("  Thoth: None")
            return "\n".join(lines)

        entity_names = ", ".join(e.name for e in self.thoth.entities[:8])
        relation_edges = ", ".join(
            f"{r.source}--[{r.relation_type}]-->{r.target}"
            for r in self.thoth.relations[:5]
        )
        lines.append(f"  Thoth entities ({len(self.thoth.entities)}): {entity_names}")
        lines.append(f"  Thoth relations ({len(self.thoth.relations)}): {relation_edges}")
        lines.append(f"  Thoth confidence: {self.thoth.confidence:.2f}")
        return "\n".join(lines)

    @staticmethod
    def _fmt_prod(r: Optional[ProdExtractionResult]) -> str:
        """Format a production result on one line.

        Returns ``"None"`` when there is no result or its confidence is below 0.01.
        """
        if not r or r.confidence < 0.01:
            return "None"
        people = ", ".join(r.people[:3]) if r.people else "none"
        times = ", ".join(r.times[:2]) if r.times else "no times"
        # A result may carry no summary; slicing None would raise TypeError.
        summary = (r.summary or "")[:60]
        return f"{r.event_type} | {summary} | people=[{people}] | times=[{times}] | conf={r.confidence:.2f}"

@dataclass
class GraphComparison:
    """Deep comparison between production's flat event and Thoth's graph."""

    # Production's flat extraction result; None when production returned nothing.
    prod: Optional[ProdExtractionResult]
    # Thoth's graph extraction; None when Thoth returned nothing.
    thoth: Optional[ThothExtraction]

    def summary(self) -> str:
        """Return a multi-line, human-readable comparison.

        Covers people found by both extractors versus Thoth only (names compared
        case-insensitively), up to 8 of Thoth's relations, and a per-type count
        of Thoth's entities. Returns ``"Both returned nothing."`` when neither
        side produced a result.
        """
        if not self.prod and not self.thoth:
            return "Both returned nothing."

        lines = ["=== Graph Comparison ==="]

        # People — both sides are lower-cased so "Sully" and "sully" match.
        prod_names = self.prod.people if self.prod and self.prod.people else []
        prod_people = {p.lower() for p in prod_names}
        # Thoth may be absent even when production returned something.
        thoth_entities = (self.thoth.entities or []) if self.thoth else []
        thoth_people = {e.name.lower() for e in thoth_entities if e.type == "person"}
        both_people = prod_people & thoth_people
        # Sorted so the rendered text is stable from run to run.
        thoth_only = sorted(thoth_people - prod_people)

        lines.append(f"  People — both: {len(both_people)}, "
                     f"Thoth-only: {len(thoth_only)} {', '.join(thoth_only)}")

        if not self.thoth:
            lines.append("  Relations: none (Thoth returned nothing)")
            return "\n".join(lines)

        # Relation novelty
        lines.append(f"  Relations discovered by Thoth: {len(self.thoth.relations)}")
        for r in self.thoth.relations[:8]:
            lines.append(f"    {r.source} --[{r.relation_type}]--> {r.target} (conf={r.confidence:.2f})")

        # Entity types
        by_type = {}
        for e in self.thoth.entities:
            by_type[e.type] = by_type.get(e.type, 0) + 1
        lines.append(f"  Entity breakdown: {by_type}")

        return "\n".join(lines)

# ---------------------------------------------------------------------------
# Batch evaluation
# ---------------------------------------------------------------------------

def compare_batch(messages: list[str]) -> list[dict]:
    """Run Thoth + Production on a batch of messages. Returns structured results.

    Useful for backtesting against historical data.

    Args:
        messages: Raw message texts to evaluate, one extraction pair per message.

    Returns:
        One dict per message, in input order, with the (truncated) message, the
        tripwire outcome, production's event fields and a summary of Thoth's
        graph. When Thoth returns nothing for a message, its counts are 0 and
        its ``people`` / ``relations`` lists are empty.
    """
    import asyncio
    from icarus.extractor import extract as prod_extract
    from icarus.tripwire import run_tripwire

    extractor = ThothExtractor()
    results = []

    for msg in messages:
        tripwire = run_tripwire(msg)
        prod = asyncio.run(prod_extract(msg, tripwire))
        thoth = extractor.extract_raw(msg)

        # extract_raw() returns None when the LLM call fails, so resolve the
        # lists once instead of dereferencing ``thoth`` field by field.
        entities = (thoth.entities or []) if thoth else []
        relations = (thoth.relations or []) if thoth else []

        results.append({
            "message": msg[:200],
            "tripwire_fired": tripwire.fired,
            "tripwire_confidence": tripwire.confidence,
            "production": {
                "event_type": prod.event_type if prod else None,
                "summary": prod.summary if prod else None,
                "confidence": prod.confidence if prod else None,
            },
            "thoth": {
                "num_entities": len(entities),
                "num_relations": len(relations),
                "confidence": thoth.confidence if thoth else 0.0,
                "people": [e.name for e in entities if e.type == "person"],
                "relations": [
                    f"{r.source}--[{r.relation_type}]-->{r.target}"
                    for r in relations
                ],
            },
        })

    return results