"""Brain — ChromaDB-based knowledge retrieval.
Single source of truth for family knowledge. Stores documents
(emails, extracted events, briefings) as embeddings for
natural language query.
Uses nomic-embed-text for embeddings (runs on Gaming PC or local Ollama).
"""
import json
import logging
import os
from pathlib import Path
from typing import Optional
import chromadb
from chromadb.config import Settings
from icarus.config import get_data_dir
# Module-level logger, named after this module so log records are attributable.
logger = logging.getLogger(__name__)

# Name of the collection used when a caller does not specify one.
COLLECTION_NAME = "icarus_brain"

# Default embedding model; override with the ICARUS_EMBED_MODEL env variable.
EMBEDDING_MODEL = os.environ.get("ICARUS_EMBED_MODEL", "nomic-embed-text")
def _get_client() -> chromadb.ClientAPI:
    """Open the persistent ChromaDB client kept under the data directory.

    The data directory (as returned by ``get_data_dir``) and its ``chroma_db``
    subdirectory are created on demand. Anonymized telemetry is switched off
    through the client settings.

    Returns:
        A ``chromadb.PersistentClient`` rooted at ``<data_dir>/chroma_db``.
    """
    base_dir = get_data_dir()
    base_dir.mkdir(parents=True, exist_ok=True)

    store_dir = base_dir / "chroma_db"
    store_dir.mkdir(exist_ok=True)

    client_settings = Settings(anonymized_telemetry=False)
    return chromadb.PersistentClient(path=str(store_dir), settings=client_settings)
def _get_collection(name: Optional[str] = None):
    """Return a brain collection, creating it if it does not exist yet.

    Args:
        name: Collection to open. When omitted (or empty) the default
            ``COLLECTION_NAME`` is used, so existing zero-argument callers
            keep their behavior.

    Returns:
        The ChromaDB collection handle.
    """
    client = _get_client()
    # get_or_create_collection does the lookup-or-create in one call, so we do
    # not depend on which exception type get_collection raises for a missing
    # collection.
    return client.get_or_create_collection(name or COLLECTION_NAME)
def _embed_fn(texts: list[str]) -> list[list[float]]:
"""Embed texts using Ollama. Falls back to chroma default."""
import httpx
ollama_base = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
try:
resp = httpx.post(
f"{ollama_base}/api/embed",
json={"model": EMBEDDING_MODEL, "input": texts},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
return data.get("embeddings", [])
except Exception as e:
logger.warning(f"Embedding failed ({e}), using chroma default")
# ChromaDB will use its built-in all-MiniLM-L6-v2
return []
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def ingest(
    doc_id: str,
    text: str,
    metadata: Optional[dict] = None,
    collection: Optional[str] = None,
) -> bool:
    """Ingest a document into the brain for later query.

    Args:
        doc_id: Identifier under which the document is stored.
        text: Document body; it is embedded and stored verbatim.
        metadata: Optional metadata stored alongside the document. ``None`` or
            an empty dict means no metadata is attached.
        collection: Collection name; the default collection is used when
            ``None``.

    Returns:
        ``True`` when the document was added, ``False`` when any step failed
        (the failure is logged, never raised).
    """
    try:
        col = _get_collection() if collection is None else _get_collection(collection)

        record = {"ids": [doc_id], "documents": [text]}

        # Only attach metadata when there is some: ChromaDB's metadata
        # validation rejects an empty dict, which would make every
        # metadata-less ingest fail.
        if metadata:
            record["metadatas"] = [metadata]

        # Prefer Ollama embeddings; an empty result means "let ChromaDB embed
        # the document itself", so the key is simply left out.
        embeddings = _embed_fn([text])
        if embeddings:
            record["embeddings"] = embeddings

        col.add(**record)
        return True
    except Exception as e:
        logger.error(f"Brain ingest failed: {e}")
        return False
def query(
    question: str,
    n_results: int = 5,
    collection: Optional[str] = None,
) -> list[dict]:
    """Query the brain for relevant documents.

    The question is embedded with ``_embed_fn`` so that it is compared in the
    same embedding space as the vectors ``ingest`` stores. If that embedding is
    unavailable, the raw text is handed to ChromaDB to embed instead.

    Args:
        question: Natural-language query text.
        n_results: Maximum number of matches to return.
        collection: Collection name; the default collection is used when
            ``None``.

    Returns:
        A list of dicts with ``'id'``, ``'document'``, ``'metadata'`` and
        ``'distance'`` keys, in the order ChromaDB returned them. An empty
        list is returned when nothing matches or the query fails (failures
        are logged, never raised).
    """
    try:
        col = _get_collection() if collection is None else _get_collection(collection)

        # Mirror ingest(): use Ollama embeddings when available, otherwise
        # fall back to ChromaDB's own embedding of the query text.
        embeddings = _embed_fn([question])
        if embeddings:
            results = col.query(query_embeddings=embeddings, n_results=n_results)
        else:
            results = col.query(query_texts=[question], n_results=n_results)

        hit_ids = results.get("ids")
        if not hit_ids or not hit_ids[0]:
            return []

        documents = results.get("documents")
        metadatas = results.get("metadatas")
        distances = results.get("distances")

        # Results are nested per query; index 0 is our single question.
        return [
            {
                "id": doc_id,
                "document": documents[0][i] if documents else "",
                "metadata": metadatas[0][i] if metadatas else {},
                "distance": distances[0][i] if distances else 0.0,
            }
            for i, doc_id in enumerate(hit_ids[0])
        ]
    except Exception as e:
        logger.error(f"Brain query failed: {e}")
        return []
def get_stats() -> dict:
    """Report basic statistics for the default brain collection.

    Returns:
        ``{"document_count": <int>, "collection": COLLECTION_NAME}`` on
        success. If the collection cannot be opened or counted, returns
        ``{"document_count": 0, "error": <message>}`` instead of raising.
    """
    try:
        stats = {
            "document_count": _get_collection().count(),
            "collection": COLLECTION_NAME,
        }
    except Exception as exc:
        # Surface the failure to the caller through the result dict.
        stats = {"document_count": 0, "error": str(exc)}
    return stats