"""Brain — ChromaDB-based knowledge retrieval.

Single source of truth for family knowledge. Stores documents (emails,
extracted events, briefings) as embeddings for natural language query.
Uses nomic-embed-text for embeddings (runs on Gaming PC or local Ollama).
"""

import json
import logging
import os
from pathlib import Path
from typing import Optional

import chromadb
from chromadb.config import Settings

from icarus.config import get_data_dir

logger = logging.getLogger(__name__)

COLLECTION_NAME = "icarus_brain"

# Default embedding model; override via ICARUS_EMBED_MODEL.
EMBEDDING_MODEL = os.environ.get("ICARUS_EMBED_MODEL", "nomic-embed-text")


def _get_client() -> chromadb.ClientAPI:
    """Get a ChromaDB client persisting under the data directory."""
    data_dir = get_data_dir()
    data_dir.mkdir(parents=True, exist_ok=True)
    chroma_dir = data_dir / "chroma_db"
    chroma_dir.mkdir(exist_ok=True)
    return chromadb.PersistentClient(
        path=str(chroma_dir),
        settings=Settings(anonymized_telemetry=False),
    )


def _get_collection(name: Optional[str] = None):
    """Get or create a collection.

    Args:
        name: Collection name; defaults to COLLECTION_NAME.

    Bug fix: ``ingest``/``query`` pass a collection name, but the
    original signature took no arguments, so that call path raised
    TypeError. The parameter is now accepted with a backward-compatible
    default. ``get_or_create_collection`` replaces the get/except dance:
    recent chromadb versions raise chromadb-specific exceptions (not
    ValueError) for a missing collection, so the old handler never fired.
    """
    client = _get_client()
    return client.get_or_create_collection(name or COLLECTION_NAME)


def _embed_fn(texts: list[str]) -> list[list[float]]:
    """Embed texts via Ollama's /api/embed endpoint.

    Returns an empty list on any failure, which signals callers to fall
    back to ChromaDB's built-in embedding (all-MiniLM-L6-v2).
    """
    import httpx

    ollama_base = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
    try:
        resp = httpx.post(
            f"{ollama_base}/api/embed",
            json={"model": EMBEDDING_MODEL, "input": texts},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        return data.get("embeddings", [])
    except Exception as e:
        # Best-effort: a down/missing Ollama must not break ingestion.
        logger.warning(f"Embedding failed ({e}), using chroma default")
        return []


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def ingest(
    doc_id: str,
    text: str,
    metadata: Optional[dict] = None,
    collection: Optional[str] = None,
) -> bool:
    """Ingest a document into the brain for later query.

    Args:
        doc_id: Unique document identifier.
        text: Document body to embed and store.
        metadata: Optional metadata dict attached to the document.
        collection: Optional collection name; defaults to the brain
            collection.

    Returns:
        True on success, False on any failure (logged, never raised).
    """
    try:
        col = _get_collection(collection)

        # Prefer Ollama embeddings; empty result means Ollama failed.
        embeddings = _embed_fn([text])
        if embeddings:
            col.add(
                ids=[doc_id],
                embeddings=embeddings,
                documents=[text],
                metadatas=[metadata or {}],
            )
        else:
            # Fallback: let ChromaDB embed with its built-in model.
            col.add(
                ids=[doc_id],
                documents=[text],
                metadatas=[metadata or {}],
            )
        return True
    except Exception as e:
        logger.error(f"Brain ingest failed: {e}")
        return False


def query(
    question: str,
    n_results: int = 5,
    collection: Optional[str] = None,
) -> list[dict]:
    """Query the brain for relevant documents.

    Args:
        question: Natural-language query.
        n_results: Maximum number of results to return.
        collection: Optional collection name; defaults to the brain
            collection.

    Returns:
        List of dicts with 'id', 'document', 'metadata', 'distance'.
        Empty list on failure (logged, never raised).

    Bug fix: the original always used ``query_texts``, which makes
    ChromaDB embed the question with its built-in model even when the
    documents were ingested with Ollama (nomic-embed-text) vectors —
    a dimension/space mismatch. The question is now embedded through
    the same ``_embed_fn`` as ingest, with the same fallback.
    """
    try:
        col = _get_collection(collection)

        # Use the same embedding path as ingest so query vectors live in
        # the same space as the stored document vectors.
        embeddings = _embed_fn([question])
        if embeddings:
            results = col.query(
                query_embeddings=embeddings,
                n_results=n_results,
            )
        else:
            results = col.query(
                query_texts=[question],
                n_results=n_results,
            )

        docs = []
        if results.get("ids") and results["ids"][0]:
            for i, doc_id in enumerate(results["ids"][0]):
                docs.append({
                    "id": doc_id,
                    "document": results["documents"][0][i] if results.get("documents") else "",
                    "metadata": results["metadatas"][0][i] if results.get("metadatas") else {},
                    "distance": results["distances"][0][i] if results.get("distances") else 0.0,
                })
        return docs
    except Exception as e:
        logger.error(f"Brain query failed: {e}")
        return []


def get_stats() -> dict:
    """Get collection stats: document count and collection name."""
    try:
        col = _get_collection()
        count = col.count()
        return {"document_count": count, "collection": COLLECTION_NAME}
    except Exception as e:
        return {"document_count": 0, "error": str(e)}