"""Ontology Engine - Clean-room knowledge graph implementation.
Zero dependencies beyond SQLite. Natural language query interface.
Export to Obsidian for visualization.
"""
import json
import sqlite3
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime
# Default on-disk location of the knowledge-graph database: ~/.openclaw/data/ontology.db
DEFAULT_DB_PATH = Path.home() / ".openclaw" / "data" / "ontology.db"
class OntologyEngine:
    """SQLite-based knowledge graph engine.

    Entities, directed relations, and temporal observations are stored in a
    single SQLite database; a pattern-based query layer and an Obsidian
    markdown exporter sit on top.
    """

    def __init__(self, db_path: Path = DEFAULT_DB_PATH):
        """Open (creating if necessary) the database and install the schema.

        Args:
            db_path: Location of the SQLite file. Generalized to also accept
                a plain ``str``, which is coerced to ``Path``; passing a
                ``Path`` behaves exactly as before.
        """
        # Coercion makes str arguments work and is a no-op for Path input.
        self.db_path = Path(db_path)
        # SQLite does not create missing parent directories itself.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()
def _init_schema(self):
"""Initialize database schema from SQL file."""
schema_path = Path(__file__).parent.parent.parent / "shared" / "ontology" / "schema.sql"
with sqlite3.connect(self.db_path) as conn:
with open(schema_path) as f:
conn.executescript(f.read())
# -------------------------------------------------------------------------
# Entity CRUD
# -------------------------------------------------------------------------
def add_entity(
self,
id: str,
type: str,
name: str,
properties: Optional[Dict] = None,
source: str = "manual",
confidence: float = 1.0
) -> Dict[str, Any]:
"""Add or update an entity."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO ontology_entities
(id, type, name, properties, source, confidence, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(id, type, name, json.dumps(properties or {}), source, confidence, datetime.now())
)
return {"id": id, "type": type, "name": name, "status": "added"}
def get_entity(self, id: str) -> Optional[Dict[str, Any]]:
"""Retrieve entity by ID."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT * FROM ontology_entities WHERE id = ?", (id,)
).fetchone()
if row:
return dict(row)
return None
def find_entities(
self,
type: Optional[str] = None,
name_contains: Optional[str] = None,
limit: int = 20
) -> List[Dict[str, Any]]:
"""Search entities by type or name."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
query = "SELECT * FROM ontology_entities WHERE 1=1"
params = []
if type:
query += " AND type = ?"
params.append(type)
if name_contains:
query += " AND name LIKE ?"
params.append(f"%{name_contains}%")
query += " LIMIT ?"
params.append(limit)
rows = conn.execute(query, params).fetchall()
return [dict(row) for row in rows]
# -------------------------------------------------------------------------
# Relation CRUD
# -------------------------------------------------------------------------
def add_relation(
self,
from_id: str,
to_id: str,
relation_type: str,
properties: Optional[Dict] = None,
confidence: float = 1.0
) -> Dict[str, Any]:
"""Add a directed relation between entities."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO ontology_relations
(from_id, to_id, relation_type, properties, confidence, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(from_id, to_id, relation_type, json.dumps(properties or {}), confidence, datetime.now())
)
return {"from": from_id, "to": to_id, "relation": relation_type, "status": "added"}
def get_relations(
self,
entity_id: str,
direction: str = "both" # "from", "to", "both"
) -> List[Dict[str, Any]]:
"""Get all relations for an entity."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
results = []
if direction in ("from", "both"):
rows = conn.execute(
"""SELECT r.*, e.name as to_name
FROM ontology_relations r
JOIN ontology_entities e ON r.to_id = e.id
WHERE r.from_id = ?""", (entity_id,)
).fetchall()
results.extend([{**dict(row), "direction": "outgoing"} for row in rows])
if direction in ("to", "both"):
rows = conn.execute(
"""SELECT r.*, e.name as from_name
FROM ontology_relations r
JOIN ontology_entities e ON r.from_id = e.id
WHERE r.to_id = ?""", (entity_id,)
).fetchall()
results.extend([{**dict(row), "direction": "incoming"} for row in rows])
return results
# -------------------------------------------------------------------------
# Observation tracking
# -------------------------------------------------------------------------
def add_observation(
self,
entity_id: str,
observation: str,
source: str = "inferred",
confidence: float = 0.8
) -> Dict[str, Any]:
"""Add a temporal observation about an entity."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""INSERT INTO ontology_observations
(entity_id, observation, source, confidence, observed_at)
VALUES (?, ?, ?, ?, ?)""",
(entity_id, observation, source, confidence, datetime.now())
)
return {"id": cursor.lastrowid, "entity": entity_id, "observation": observation}
def get_observations(self, entity_id: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent observations about an entity."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""SELECT * FROM ontology_observations
WHERE entity_id = ?
ORDER BY observed_at DESC
LIMIT ?""", (entity_id, limit)
).fetchall()
return [dict(row) for row in rows]
# -------------------------------------------------------------------------
# Query interface
# -------------------------------------------------------------------------
def query(self, pattern: str, **kwargs) -> Dict[str, Any]:
"""Pattern-based query interface."""
# Simple pattern matching - can be extended with NLP
pattern_lower = pattern.lower()
if "projects involving" in pattern_lower:
# Extract entity names from pattern
return self._query_projects_involving(pattern_lower)
elif "who works on" in pattern_lower:
return self._query_who_works_on(pattern_lower)
elif "relations of" in pattern_lower:
entity = kwargs.get("entity")
if entity:
return self.get_relations(entity)
# Default: search by name
results = self.find_entities(name_contains=pattern, limit=kwargs.get("limit", 10))
return {
"query_type": "search_fallback",
"pattern": pattern,
"results": results
}
def _query_projects_involving(self, pattern: str) -> Dict[str, Any]:
"""Query: 'projects involving X and Y'"""
# Extract entity IDs from pattern (simplified)
# TODO: proper NLP entity extraction
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""SELECT DISTINCT e.* FROM ontology_entities e
JOIN ontology_relations r ON e.id = r.to_id OR e.id = r.from_id
WHERE e.type = 'project' AND r.relation_type = 'works_on'
LIMIT 20"""
).fetchall()
return {
"query_type": "projects_involving",
"pattern": pattern,
"results": [dict(row) for row in rows]
}
def _query_who_works_on(self, pattern: str) -> Dict[str, Any]:
"""Query: 'who works on X'"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""SELECT DISTINCT e.* FROM ontology_entities e
JOIN ontology_relations r ON e.id = r.from_id
WHERE e.type IN ('person', 'agent') AND r.relation_type = 'works_on'
LIMIT 20"""
).fetchall()
return {
"query_type": "who_works_on",
"pattern": pattern,
"results": [dict(row) for row in rows]
}
# -------------------------------------------------------------------------
# Export
# -------------------------------------------------------------------------
def export_to_obsidian(self, vault_path: Path) -> Dict[str, Any]:
"""Export entities as Obsidian markdown with graph links."""
vault_path = Path(vault_path)
vault_path.mkdir(parents=True, exist_ok=True)
entities = self.find_entities(limit=1000)
files_created = 0
for entity in entities:
# Create markdown file
filename = f"{entity['id'].replace(':', '_')}.md"
filepath = vault_path / filename
# Build content with Obsidian links
content = f"# {entity['name']}\n\n"
content += f"**Type:** {entity['type']}\n\n"
content += f"**ID:** `{entity['id']}`\n\n"
props = json.loads(entity['properties'])
if props:
content += "## Properties\n\n"
for k, v in props.items():
content += f"- **{k}:** {v}\n"
content += "\n"
# Add relations as links
relations = self.get_relations(entity['id'])
if relations:
content += "## Relations\n\n"
for r in relations:
if r['direction'] == 'outgoing':
content += f"- → [[{r['to_id'].replace(':', '_')}|{r['to_name']}]] ({r['relation_type']})\n"
else:
content += f"- ← [[{r['from_id'].replace(':', '_')}|{r['from_name']}]] ({r['relation_type']})\n"
content += "\n"
# Add observations
observations = self.get_observations(entity['id'], limit=5)
if observations:
content += "## Recent Observations\n\n"
for obs in observations:
content += f"- {obs['observation']} ({obs['observed_at']})\n"
filepath.write_text(content)
files_created += 1
return {"files_created": files_created, "vault_path": str(vault_path)}
def get_stats(self) -> Dict[str, Any]:
"""Get ontology statistics."""
with sqlite3.connect(self.db_path) as conn:
entities = conn.execute("SELECT COUNT(*) FROM ontology_entities").fetchone()[0]
relations = conn.execute("SELECT COUNT(*) FROM ontology_relations").fetchone()[0]
observations = conn.execute("SELECT COUNT(*) FROM ontology_observations").fetchone()[0]
by_type = conn.execute(
"""SELECT type, COUNT(*) as count
FROM ontology_entities GROUP BY type"""
).fetchall()
return {
"entities": entities,
"relations": relations,
"observations": observations,
"by_type": {row[0]: row[1] for row in by_type}
}
# Convenience singleton
_engine: Optional[OntologyEngine] = None


def get_engine() -> OntologyEngine:
    """Return the lazily-created, process-wide OntologyEngine instance."""
    global _engine
    if _engine is not None:
        return _engine
    _engine = OntologyEngine()
    return _engine
# CLI entry point
# Usage: python -m ontology.engine [stats|query <pattern>|export <vault_dir>]
# BUG FIX: the guard read `if name == "main":`, which raises NameError at
# import time; the dunder form is required.
if __name__ == "__main__":
    import sys

    engine = OntologyEngine()
    if len(sys.argv) < 2:
        print("Usage: python -m ontology.engine [stats|query|export]")
        sys.exit(1)
    cmd = sys.argv[1]
    if cmd == "stats":
        print(json.dumps(engine.get_stats(), indent=2))
    elif cmd == "query" and len(sys.argv) > 2:
        results = engine.query(sys.argv[2])
        # default=str serializes any datetime values in result rows.
        print(json.dumps(results, indent=2, default=str))
    elif cmd == "export" and len(sys.argv) > 2:
        result = engine.export_to_obsidian(Path(sys.argv[2]))
        print(json.dumps(result, indent=2))
    else:
        print(f"Unknown command: {cmd}")