"""Ontology Engine - Clean-room knowledge graph implementation. Zero dependencies beyond SQLite. Natural language query interface. Export to Obsidian for visualization. """ import json import sqlite3 from pathlib import Path from typing import Optional, List, Dict, Any from datetime import datetime DEFAULT_DB_PATH = Path.home() / ".openclaw" / "data" / "ontology.db" class OntologyEngine: """SQLite-based knowledge graph engine.""" def __init__(self, db_path: Path = DEFAULT_DB_PATH): self.db_path = db_path self.db_path.parent.mkdir(parents=True, exist_ok=True) self._init_schema() def _init_schema(self): """Initialize database schema from SQL file.""" schema_path = Path(__file__).parent.parent.parent / "shared" / "ontology" / "schema.sql" with sqlite3.connect(self.db_path) as conn: with open(schema_path) as f: conn.executescript(f.read()) # ------------------------------------------------------------------------- # Entity CRUD # ------------------------------------------------------------------------- def add_entity( self, id: str, type: str, name: str, properties: Optional[Dict] = None, source: str = "manual", confidence: float = 1.0 ) -> Dict[str, Any]: """Add or update an entity.""" with sqlite3.connect(self.db_path) as conn: conn.execute( """INSERT OR REPLACE INTO ontology_entities (id, type, name, properties, source, confidence, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", (id, type, name, json.dumps(properties or {}), source, confidence, datetime.now()) ) return {"id": id, "type": type, "name": name, "status": "added"} def get_entity(self, id: str) -> Optional[Dict[str, Any]]: """Retrieve entity by ID.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row row = conn.execute( "SELECT * FROM ontology_entities WHERE id = ?", (id,) ).fetchone() if row: return dict(row) return None def find_entities( self, type: Optional[str] = None, name_contains: Optional[str] = None, limit: int = 20 ) -> List[Dict[str, Any]]: """Search entities by type or name.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row query = "SELECT * FROM ontology_entities WHERE 1=1" params = [] if type: query += " AND type = ?" params.append(type) if name_contains: query += " AND name LIKE ?" params.append(f"%{name_contains}%") query += " LIMIT ?" params.append(limit) rows = conn.execute(query, params).fetchall() return [dict(row) for row in rows] # ------------------------------------------------------------------------- # Relation CRUD # ------------------------------------------------------------------------- def add_relation( self, from_id: str, to_id: str, relation_type: str, properties: Optional[Dict] = None, confidence: float = 1.0 ) -> Dict[str, Any]: """Add a directed relation between entities.""" with sqlite3.connect(self.db_path) as conn: conn.execute( """INSERT OR REPLACE INTO ontology_relations (from_id, to_id, relation_type, properties, confidence, created_at) VALUES (?, ?, ?, ?, ?, ?)""", (from_id, to_id, relation_type, json.dumps(properties or {}), confidence, datetime.now()) ) return {"from": from_id, "to": to_id, "relation": relation_type, "status": "added"} def get_relations( self, entity_id: str, direction: str = "both" # "from", "to", "both" ) -> List[Dict[str, Any]]: """Get all relations for an entity.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row results = [] if direction in ("from", "both"): rows = conn.execute( """SELECT r.*, e.name as to_name FROM ontology_relations r JOIN ontology_entities e ON r.to_id = e.id WHERE r.from_id = ?""", (entity_id,) ).fetchall() results.extend([{**dict(row), "direction": "outgoing"} for row in rows]) if direction in ("to", "both"): rows = conn.execute( """SELECT r.*, e.name as from_name FROM ontology_relations r JOIN ontology_entities e ON r.from_id = e.id WHERE r.to_id = ?""", (entity_id,) ).fetchall() results.extend([{**dict(row), "direction": "incoming"} for row in rows]) return results # ------------------------------------------------------------------------- # Observation tracking # ------------------------------------------------------------------------- def add_observation( self, entity_id: str, observation: str, source: str = "inferred", confidence: float = 0.8 ) -> Dict[str, Any]: """Add a temporal observation about an entity.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( """INSERT INTO ontology_observations (entity_id, observation, source, confidence, observed_at) VALUES (?, ?, ?, ?, ?)""", (entity_id, observation, source, confidence, datetime.now()) ) return {"id": cursor.lastrowid, "entity": entity_id, "observation": observation} def get_observations(self, entity_id: str, limit: int = 10) -> List[Dict[str, Any]]: """Get recent observations about an entity.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row rows = conn.execute( """SELECT * FROM ontology_observations WHERE entity_id = ? ORDER BY observed_at DESC LIMIT ?""", (entity_id, limit) ).fetchall() return [dict(row) for row in rows] # ------------------------------------------------------------------------- # Query interface # ------------------------------------------------------------------------- def query(self, pattern: str, **kwargs) -> Dict[str, Any]: """Pattern-based query interface.""" # Simple pattern matching - can be extended with NLP pattern_lower = pattern.lower() if "projects involving" in pattern_lower: # Extract entity names from pattern return self._query_projects_involving(pattern_lower) elif "who works on" in pattern_lower: return self._query_who_works_on(pattern_lower) elif "relations of" in pattern_lower: entity = kwargs.get("entity") if entity: return self.get_relations(entity) # Default: search by name results = self.find_entities(name_contains=pattern, limit=kwargs.get("limit", 10)) return { "query_type": "search_fallback", "pattern": pattern, "results": results } def _query_projects_involving(self, pattern: str) -> Dict[str, Any]: """Query: 'projects involving X and Y'""" # Extract entity IDs from pattern (simplified) # TODO: proper NLP entity extraction with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row rows = conn.execute( """SELECT DISTINCT e.* FROM ontology_entities e JOIN ontology_relations r ON e.id = r.to_id OR e.id = r.from_id WHERE e.type = 'project' AND r.relation_type = 'works_on' LIMIT 20""" ).fetchall() return { "query_type": "projects_involving", "pattern": pattern, "results": [dict(row) for row in rows] } def _query_who_works_on(self, pattern: str) -> Dict[str, Any]: """Query: 'who works on X'""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row rows = conn.execute( """SELECT DISTINCT e.* FROM ontology_entities e JOIN ontology_relations r ON e.id = r.from_id WHERE e.type IN ('person', 'agent') AND r.relation_type = 'works_on' LIMIT 20""" ).fetchall() return { "query_type": "who_works_on", "pattern": pattern, "results": [dict(row) for row in rows] } # ------------------------------------------------------------------------- # Export # ------------------------------------------------------------------------- def export_to_obsidian(self, vault_path: Path) -> Dict[str, Any]: """Export entities as Obsidian markdown with graph links.""" vault_path = Path(vault_path) vault_path.mkdir(parents=True, exist_ok=True) entities = self.find_entities(limit=1000) files_created = 0 for entity in entities: # Create markdown file filename = f"{entity['id'].replace(':', '_')}.md" filepath = vault_path / filename # Build content with Obsidian links content = f"# {entity['name']}\n\n" content += f"**Type:** {entity['type']}\n\n" content += f"**ID:** `{entity['id']}`\n\n" props = json.loads(entity['properties']) if props: content += "## Properties\n\n" for k, v in props.items(): content += f"- **{k}:** {v}\n" content += "\n" # Add relations as links relations = self.get_relations(entity['id']) if relations: content += "## Relations\n\n" for r in relations: if r['direction'] == 'outgoing': content += f"- → [[{r['to_id'].replace(':', '_')}|{r['to_name']}]] ({r['relation_type']})\n" else: content += f"- ← [[{r['from_id'].replace(':', '_')}|{r['from_name']}]] ({r['relation_type']})\n" content += "\n" # Add observations observations = self.get_observations(entity['id'], limit=5) if observations: content += "## Recent Observations\n\n" for obs in observations: content += f"- {obs['observation']} ({obs['observed_at']})\n" filepath.write_text(content) files_created += 1 return {"files_created": files_created, "vault_path": str(vault_path)} def get_stats(self) -> Dict[str, Any]: """Get ontology statistics.""" with sqlite3.connect(self.db_path) as conn: entities = conn.execute("SELECT COUNT(*) FROM ontology_entities").fetchone()[0] relations = conn.execute("SELECT COUNT(*) FROM ontology_relations").fetchone()[0] observations = conn.execute("SELECT COUNT(*) FROM ontology_observations").fetchone()[0] by_type = conn.execute( """SELECT type, COUNT(*) as count FROM ontology_entities GROUP BY type""" ).fetchall() return { "entities": entities, "relations": relations, "observations": observations, "by_type": {row[0]: row[1] for row in by_type} } # Convenience singleton _engine: Optional[OntologyEngine] = None def get_engine() -> OntologyEngine: """Get or create singleton engine.""" global _engine if _engine is None: _engine = OntologyEngine() return _engine # CLI entry point if __name__ == "__main__": import sys engine = OntologyEngine() if len(sys.argv) < 2: print("Usage: python -m ontology.engine [stats|query|export]") sys.exit(1) cmd = sys.argv[1] if cmd == "stats": print(json.dumps(engine.get_stats(), indent=2)) elif cmd == "query" and len(sys.argv) > 2: results = engine.query(sys.argv[2]) print(json.dumps(results, indent=2, default=str)) elif cmd == "export" and len(sys.argv) > 2: result = engine.export_to_obsidian(Path(sys.argv[2])) print(json.dumps(result, indent=2)) else: print(f"Unknown command: {cmd}")