"""API Router for Brain Query Interface.""" import logging from typing import List, Dict, Optional from datetime import datetime from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field from brain.query import ( QueryPattern, detect_query_pattern, calculate_confidence, format_answer ) from brain.embeddings import get_embedding from brain.store import search_documents, store_document, get_stats, get_collection logger = logging.getLogger(__name__) # Create router brain_router = APIRouter(prefix="/api/brain", tags=["brain"]) # ============================================================================= # Request/Response Models # ============================================================================= class QueryRequest(BaseModel): """Request model for brain query.""" query: str = Field(..., description="Natural language query") pattern: str = Field("auto", description="auto | logistical | entity | historical") n_results: int = Field(5, ge=1, le=20, description="Number of results to return") class SourceInfo(BaseModel): """Source document information.""" doc_id: str type: str date: Optional[str] text: str class QueryResponse(BaseModel): """Response model for brain query.""" answer: str confidence: float confidence_message: str sources: List[SourceInfo] pattern_used: str class StatsResponse(BaseModel): """Response model for brain stats.""" total_chunks: int collection_name: str persist_directory: str class HealthResponse(BaseModel): """Health check response.""" status: str chromadb_available: bool collection_ready: bool # ============================================================================= # Endpoints # ============================================================================= @brain_router.post("/query", response_model=QueryResponse) async def brain_query(request: QueryRequest): """Process natural language query against family documents. Args: request: QueryRequest with query text and optional pattern override Returns: QueryResponse with answer, confidence, and sources """ logger.info(f"Brain query: '{request.query}' (pattern={request.pattern})") # 1. Detect pattern if request.pattern == "auto": pattern = detect_query_pattern(request.query) else: try: pattern = QueryPattern(request.pattern) except ValueError: logger.warning(f"Invalid pattern '{request.pattern}', using ENTITY") pattern = QueryPattern.ENTITY logger.info(f"Using pattern: {pattern.value}") # 2. Embed query query_embedding = get_embedding(request.query) # Check if embedding failed (all zeros) if all(v == 0.0 for v in query_embedding): logger.error("Failed to get embedding for query") raise HTTPException(status_code=503, detail="Embedding service unavailable") # 3. Search documents results = search_documents(query_embedding, pattern, n_results=request.n_results) # 4. Synthesize answer if not results: return QueryResponse( answer="I couldn't find any relevant documents for that question.", confidence=0.0, confidence_message="No relevant documents found", sources=[], pattern_used=pattern.value ) # Get top result for the answer top = results[0] answer = format_answer(top, pattern) # Calculate confidence confidence, confidence_message = calculate_confidence( top["combined_score"], pattern ) # Build sources list sources = [] for r in results: meta = r.get("metadata", {}) sources.append(SourceInfo( doc_id=meta.get("doc_id", "unknown"), type=meta.get("doc_type", "unknown"), date=meta.get("source_date"), text=r.get("text", "")[:500] # Truncate for response )) return QueryResponse( answer=answer, confidence=confidence, confidence_message=confidence_message, sources=sources, pattern_used=pattern.value ) @brain_router.get("/stats", response_model=StatsResponse) async def brain_stats(): """Get Brain statistics. Returns: StatsResponse with collection stats """ stats = get_stats() if "error" in stats: raise HTTPException(status_code=500, detail=stats["error"]) return StatsResponse( total_chunks=stats.get("total_chunks", 0), collection_name=stats.get("collection_name", "family_documents"), persist_directory=stats.get("persist_directory", "./data/chroma_db") ) @brain_router.get("/health") async def brain_health(): """Health check for Brain service. Returns: HealthResponse with service status """ try: coll = get_collection() coll.count() # Test connection return HealthResponse( status="healthy", chromadb_available=True, collection_ready=True ) except Exception as e: logger.error(f"Health check failed: {e}") return HealthResponse( status="unhealthy", chromadb_available=False, collection_ready=False ) @brain_router.get("/patterns") async def list_patterns(): """List available query patterns and their descriptions. Returns: Dict of pattern names to descriptions """ return { "logistical": { "description": "Time-sensitive queries (schedules, current events)", "examples": ["Is Friday a half-day?", "When is spring break?"], "decay": "exponential", "half_life_days": 7 }, "entity": { "description": "Fact lookup (sizes, dates, specific values)", "examples": ["What size is our HVAC filter?", "When was Buster's last vet visit?"], "decay": "linear", "half_life_days": 90 }, "historical": { "description": "Past events and historical records", "examples": ["What was the roofer's name?", "When did we replace the water heater?"], "decay": "none", "half_life_days": None } } # ============================================================================= # Admin endpoints for document management # ============================================================================= class DocumentRequest(BaseModel): """Request model for adding a document.""" doc_id: str text: str metadata: Dict = Field(default_factory=dict) class DocumentResponse(BaseModel): """Response model for document operations.""" success: bool message: str chunks_added: int = 0 @brain_router.post("/admin/documents", response_model=DocumentResponse) async def add_document(request: DocumentRequest): """Add a document to the brain (admin endpoint). Args: request: DocumentRequest with doc_id, text, and metadata Returns: DocumentResponse with success status """ from brain.embeddings import embed_document try: chunks = embed_document(request.doc_id, request.text, request.metadata) store_document(request.doc_id, chunks) return DocumentResponse( success=True, message=f"Document {request.doc_id} added successfully", chunks_added=len(chunks) ) except Exception as e: logger.error(f"Failed to add document: {e}") raise HTTPException(status_code=500, detail=str(e))