📄 engine.py 13,256 bytes Apr 27, 2026 📋 Raw

"""Ontology Engine - Clean-room knowledge graph implementation.

Zero dependencies beyond SQLite. Natural language query interface.
Export to Obsidian for visualization.
"""

import json
import sqlite3
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime

DEFAULT_DB_PATH = Path.home() / ".openclaw" / "data" / "ontology.db"

class OntologyEngine:
"""SQLite-based knowledge graph engine."""

def __init__(self, db_path: Path = DEFAULT_DB_PATH):
    self.db_path = db_path
    self.db_path.parent.mkdir(parents=True, exist_ok=True)
    self._init_schema()

def _init_schema(self):
    """Initialize database schema from SQL file."""
    schema_path = Path(__file__).parent.parent.parent / "shared" / "ontology" / "schema.sql"
    with sqlite3.connect(self.db_path) as conn:
        with open(schema_path) as f:
            conn.executescript(f.read())

# -------------------------------------------------------------------------
# Entity CRUD
# -------------------------------------------------------------------------

def add_entity(
    self,
    id: str,
    type: str,
    name: str,
    properties: Optional[Dict] = None,
    source: str = "manual",
    confidence: float = 1.0
) -> Dict[str, Any]:
    """Add or update an entity."""
    with sqlite3.connect(self.db_path) as conn:
        conn.execute(
            """INSERT OR REPLACE INTO ontology_entities 
               (id, type, name, properties, source, confidence, updated_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (id, type, name, json.dumps(properties or {}), source, confidence, datetime.now())
        )
    return {"id": id, "type": type, "name": name, "status": "added"}

def get_entity(self, id: str) -> Optional[Dict[str, Any]]:
    """Retrieve entity by ID."""
    with sqlite3.connect(self.db_path) as conn:
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "SELECT * FROM ontology_entities WHERE id = ?", (id,)
        ).fetchone()
        if row:
            return dict(row)
        return None

def find_entities(
    self,
    type: Optional[str] = None,
    name_contains: Optional[str] = None,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """Search entities by type or name."""
    with sqlite3.connect(self.db_path) as conn:
        conn.row_factory = sqlite3.Row
        query = "SELECT * FROM ontology_entities WHERE 1=1"
        params = []
        if type:
            query += " AND type = ?"
            params.append(type)
        if name_contains:
            query += " AND name LIKE ?"
            params.append(f"%{name_contains}%")
        query += " LIMIT ?"
        params.append(limit)

        rows = conn.execute(query, params).fetchall()
        return [dict(row) for row in rows]

# -------------------------------------------------------------------------
# Relation CRUD
# -------------------------------------------------------------------------

def add_relation(
    self,
    from_id: str,
    to_id: str,
    relation_type: str,
    properties: Optional[Dict] = None,
    confidence: float = 1.0
) -> Dict[str, Any]:
    """Add a directed relation between entities."""
    with sqlite3.connect(self.db_path) as conn:
        conn.execute(
            """INSERT OR REPLACE INTO ontology_relations
               (from_id, to_id, relation_type, properties, confidence, created_at)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (from_id, to_id, relation_type, json.dumps(properties or {}), confidence, datetime.now())
        )
    return {"from": from_id, "to": to_id, "relation": relation_type, "status": "added"}

def get_relations(
    self,
    entity_id: str,
    direction: str = "both"  # "from", "to", "both"
) -> List[Dict[str, Any]]:
    """Get all relations for an entity."""
    with sqlite3.connect(self.db_path) as conn:
        conn.row_factory = sqlite3.Row
        results = []

        if direction in ("from", "both"):
            rows = conn.execute(
                """SELECT r.*, e.name as to_name 
                   FROM ontology_relations r
                   JOIN ontology_entities e ON r.to_id = e.id
                   WHERE r.from_id = ?""", (entity_id,)
            ).fetchall()
            results.extend([{**dict(row), "direction": "outgoing"} for row in rows])

        if direction in ("to", "both"):
            rows = conn.execute(
                """SELECT r.*, e.name as from_name
                   FROM ontology_relations r
                   JOIN ontology_entities e ON r.from_id = e.id
                   WHERE r.to_id = ?""", (entity_id,)
            ).fetchall()
            results.extend([{**dict(row), "direction": "incoming"} for row in rows])

        return results

# -------------------------------------------------------------------------
# Observation tracking
# -------------------------------------------------------------------------

def add_observation(
    self,
    entity_id: str,
    observation: str,
    source: str = "inferred",
    confidence: float = 0.8
) -> Dict[str, Any]:
    """Add a temporal observation about an entity."""
    with sqlite3.connect(self.db_path) as conn:
        cursor = conn.execute(
            """INSERT INTO ontology_observations
               (entity_id, observation, source, confidence, observed_at)
               VALUES (?, ?, ?, ?, ?)""",
            (entity_id, observation, source, confidence, datetime.now())
        )
    return {"id": cursor.lastrowid, "entity": entity_id, "observation": observation}

def get_observations(self, entity_id: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Get recent observations about an entity."""
    with sqlite3.connect(self.db_path) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            """SELECT * FROM ontology_observations
               WHERE entity_id = ?
               ORDER BY observed_at DESC
               LIMIT ?""", (entity_id, limit)
        ).fetchall()
        return [dict(row) for row in rows]

# -------------------------------------------------------------------------
# Query interface
# -------------------------------------------------------------------------

def query(self, pattern: str, **kwargs) -> Dict[str, Any]:
    """Pattern-based query interface."""
    # Simple pattern matching - can be extended with NLP
    pattern_lower = pattern.lower()

    if "projects involving" in pattern_lower:
        # Extract entity names from pattern
        return self._query_projects_involving(pattern_lower)
    elif "who works on" in pattern_lower:
        return self._query_who_works_on(pattern_lower)
    elif "relations of" in pattern_lower:
        entity = kwargs.get("entity")
        if entity:
            return self.get_relations(entity)

    # Default: search by name
    results = self.find_entities(name_contains=pattern, limit=kwargs.get("limit", 10))
    return {
        "query_type": "search_fallback",
        "pattern": pattern,
        "results": results
    }

def _query_projects_involving(self, pattern: str) -> Dict[str, Any]:
    """Query: 'projects involving X and Y'"""
    # Extract entity IDs from pattern (simplified)
    # TODO: proper NLP entity extraction
    with sqlite3.connect(self.db_path) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            """SELECT DISTINCT e.* FROM ontology_entities e
               JOIN ontology_relations r ON e.id = r.to_id OR e.id = r.from_id
               WHERE e.type = 'project' AND r.relation_type = 'works_on'
               LIMIT 20"""
        ).fetchall()
        return {
            "query_type": "projects_involving",
            "pattern": pattern,
            "results": [dict(row) for row in rows]
        }

def _query_who_works_on(self, pattern: str) -> Dict[str, Any]:
    """Query: 'who works on X'"""
    with sqlite3.connect(self.db_path) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            """SELECT DISTINCT e.* FROM ontology_entities e
               JOIN ontology_relations r ON e.id = r.from_id
               WHERE e.type IN ('person', 'agent') AND r.relation_type = 'works_on'
               LIMIT 20"""
        ).fetchall()
        return {
            "query_type": "who_works_on",
            "pattern": pattern,
            "results": [dict(row) for row in rows]
        }

# -------------------------------------------------------------------------
# Export
# -------------------------------------------------------------------------

def export_to_obsidian(self, vault_path: Path) -> Dict[str, Any]:
    """Export entities as Obsidian markdown with graph links."""
    vault_path = Path(vault_path)
    vault_path.mkdir(parents=True, exist_ok=True)

    entities = self.find_entities(limit=1000)
    files_created = 0

    for entity in entities:
        # Create markdown file
        filename = f"{entity['id'].replace(':', '_')}.md"
        filepath = vault_path / filename

        # Build content with Obsidian links
        content = f"# {entity['name']}\n\n"
        content += f"**Type:** {entity['type']}\n\n"
        content += f"**ID:** `{entity['id']}`\n\n"

        props = json.loads(entity['properties'])
        if props:
            content += "## Properties\n\n"
            for k, v in props.items():
                content += f"- **{k}:** {v}\n"
            content += "\n"

        # Add relations as links
        relations = self.get_relations(entity['id'])
        if relations:
            content += "## Relations\n\n"
            for r in relations:
                if r['direction'] == 'outgoing':
                    content += f"- → [[{r['to_id'].replace(':', '_')}|{r['to_name']}]] ({r['relation_type']})\n"
                else:
                    content += f"- ← [[{r['from_id'].replace(':', '_')}|{r['from_name']}]] ({r['relation_type']})\n"
            content += "\n"

        # Add observations
        observations = self.get_observations(entity['id'], limit=5)
        if observations:
            content += "## Recent Observations\n\n"
            for obs in observations:
                content += f"- {obs['observation']} ({obs['observed_at']})\n"

        filepath.write_text(content)
        files_created += 1

    return {"files_created": files_created, "vault_path": str(vault_path)}

def get_stats(self) -> Dict[str, Any]:
    """Get ontology statistics."""
    with sqlite3.connect(self.db_path) as conn:
        entities = conn.execute("SELECT COUNT(*) FROM ontology_entities").fetchone()[0]
        relations = conn.execute("SELECT COUNT(*) FROM ontology_relations").fetchone()[0]
        observations = conn.execute("SELECT COUNT(*) FROM ontology_observations").fetchone()[0]

        by_type = conn.execute(
            """SELECT type, COUNT(*) as count 
               FROM ontology_entities GROUP BY type"""
        ).fetchall()

    return {
        "entities": entities,
        "relations": relations,
        "observations": observations,
        "by_type": {row[0]: row[1] for row in by_type}
    }

Convenience singleton

_engine: Optional[OntologyEngine] = None

def get_engine() -> OntologyEngine:
"""Get or create singleton engine."""
global _engine
if _engine is None:
_engine = OntologyEngine()
return _engine

CLI entry point

if name == "main":
import sys

engine = OntologyEngine()

if len(sys.argv) < 2:
    print("Usage: python -m ontology.engine [stats|query|export]")
    sys.exit(1)

cmd = sys.argv[1]

if cmd == "stats":
    print(json.dumps(engine.get_stats(), indent=2))
elif cmd == "query" and len(sys.argv) > 2:
    results = engine.query(sys.argv[2])
    print(json.dumps(results, indent=2, default=str))
elif cmd == "export" and len(sys.argv) > 2:
    result = engine.export_to_obsidian(Path(sys.argv[2]))
    print(json.dumps(result, indent=2))
else:
    print(f"Unknown command: {cmd}")