"""API Router for Brain Query Interface."""
import logging
from typing import List, Dict, Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from brain.query import (
QueryPattern,
detect_query_pattern,
calculate_confidence,
format_answer
)
from brain.embeddings import get_embedding
from brain.store import search_documents, store_document, get_stats, get_collection
# Module-level logger named after this module so log records route correctly.
# BUG FIX: was `logging.getLogger(name)` — `name` is undefined at module scope
# and raises NameError on import; `__name__` is the standard idiom.
logger = logging.getLogger(__name__)

# Create router
brain_router = APIRouter(prefix="/api/brain", tags=["brain"])

# =============================================================================
# Request/Response Models
# =============================================================================
class QueryRequest(BaseModel):
    """Request model for brain query."""
    # Natural-language question to run against the document store.
    query: str = Field(..., description="Natural language query")
    # Retrieval pattern; "auto" lets the server detect one from the query text.
    pattern: str = Field("auto", description="auto | logistical | entity | historical")
    # Number of source chunks to retrieve (validated to the 1-20 range).
    n_results: int = Field(5, ge=1, le=20, description="Number of results to return")
class SourceInfo(BaseModel):
    """Source document information.

    One retrieved chunk attributed in a QueryResponse.
    """
    # Identifier of the source document the chunk came from.
    doc_id: str
    # Document type (taken from the chunk's "doc_type" metadata).
    type: str
    # Source date from metadata; None when the document carries no date.
    date: Optional[str]
    # Chunk text, truncated by the endpoint before being returned.
    text: str
class QueryResponse(BaseModel):
    """Response model for brain query."""
    # Synthesized answer text derived from the top-ranked chunk.
    answer: str
    # Numeric confidence score computed from the top result's combined score.
    confidence: float
    # Human-readable explanation of the confidence level.
    confidence_message: str
    # All retrieved chunks that informed the answer, best match first.
    sources: List[SourceInfo]
    # The query pattern actually applied (after auto-detection/fallback).
    pattern_used: str
class StatsResponse(BaseModel):
    """Response model for brain stats."""
    # Total number of stored chunks in the collection.
    total_chunks: int
    # Name of the backing vector-store collection.
    collection_name: str
    # Filesystem path where the vector store persists its data.
    persist_directory: str
class HealthResponse(BaseModel):
    """Health check response."""
    # Overall service status: "healthy" or "unhealthy".
    status: str
    # True when the ChromaDB backend could be reached.
    chromadb_available: bool
    # True when the collection responded to a count() probe.
    collection_ready: bool
# =============================================================================
# Endpoints
# =============================================================================
@brain_router.post("/query", response_model=QueryResponse)
async def brain_query(request: QueryRequest):
    """Process natural language query against family documents.

    Args:
        request: QueryRequest with query text and optional pattern override

    Returns:
        QueryResponse with answer, confidence, and sources

    Raises:
        HTTPException: 503 when the embedding service returns no signal.
    """
    logger.info(f"Brain query: '{request.query}' (pattern={request.pattern})")

    # Resolve the retrieval pattern: honor an explicit override, else detect it.
    if request.pattern != "auto":
        try:
            pattern = QueryPattern(request.pattern)
        except ValueError:
            # Unknown override — fall back to the general-purpose pattern.
            logger.warning(f"Invalid pattern '{request.pattern}', using ENTITY")
            pattern = QueryPattern.ENTITY
    else:
        pattern = detect_query_pattern(request.query)
    logger.info(f"Using pattern: {pattern.value}")

    # Embed the query; an all-zero vector signals the embedding backend failed.
    query_embedding = get_embedding(request.query)
    if not any(query_embedding):
        logger.error("Failed to get embedding for query")
        raise HTTPException(status_code=503, detail="Embedding service unavailable")

    # Vector search against the stored chunks.
    hits = search_documents(query_embedding, pattern, n_results=request.n_results)
    if not hits:
        # Nothing retrieved: answer honestly with zero confidence.
        return QueryResponse(
            answer="I couldn't find any relevant documents for that question.",
            confidence=0.0,
            confidence_message="No relevant documents found",
            sources=[],
            pattern_used=pattern.value,
        )

    # The answer and confidence both come from the single best-scoring chunk.
    best = hits[0]
    confidence, confidence_message = calculate_confidence(
        best["combined_score"],
        pattern,
    )

    # Attribute every retrieved chunk, capping text length for the payload.
    sources = [
        SourceInfo(
            doc_id=hit.get("metadata", {}).get("doc_id", "unknown"),
            type=hit.get("metadata", {}).get("doc_type", "unknown"),
            date=hit.get("metadata", {}).get("source_date"),
            text=hit.get("text", "")[:500],
        )
        for hit in hits
    ]

    return QueryResponse(
        answer=format_answer(best, pattern),
        confidence=confidence,
        confidence_message=confidence_message,
        sources=sources,
        pattern_used=pattern.value,
    )
@brain_router.get("/stats", response_model=StatsResponse)
async def brain_stats():
    """Get Brain statistics.

    Returns:
        StatsResponse with collection stats

    Raises:
        HTTPException: 500 when the store reports an error.
    """
    stats = get_stats()

    # The store signals failure via an "error" key instead of raising.
    if "error" in stats:
        raise HTTPException(status_code=500, detail=stats["error"])

    return StatsResponse(
        total_chunks=stats.get("total_chunks", 0),
        collection_name=stats.get("collection_name", "family_documents"),
        persist_directory=stats.get("persist_directory", "./data/chroma_db"),
    )
@brain_router.get("/health")
async def brain_health():
    """Health check for Brain service.

    Returns:
        HealthResponse with service status
    """
    try:
        # Probe connectivity: count() round-trips to the backing store.
        get_collection().count()
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return HealthResponse(
            status="unhealthy",
            chromadb_available=False,
            collection_ready=False,
        )
    return HealthResponse(
        status="healthy",
        chromadb_available=True,
        collection_ready=True,
    )
@brain_router.get("/patterns")
async def list_patterns():
    """List available query patterns and their descriptions.

    Returns:
        Dict of pattern names to descriptions
    """
    def pattern_entry(description, examples, decay, half_life_days):
        # Assemble one pattern-description record with a uniform shape.
        return {
            "description": description,
            "examples": examples,
            "decay": decay,
            "half_life_days": half_life_days,
        }

    return {
        "logistical": pattern_entry(
            "Time-sensitive queries (schedules, current events)",
            ["Is Friday a half-day?", "When is spring break?"],
            "exponential",
            7,
        ),
        "entity": pattern_entry(
            "Fact lookup (sizes, dates, specific values)",
            ["What size is our HVAC filter?", "When was Buster's last vet visit?"],
            "linear",
            90,
        ),
        "historical": pattern_entry(
            "Past events and historical records",
            ["What was the roofer's name?", "When did we replace the water heater?"],
            "none",
            None,
        ),
    }
# =============================================================================
# Admin endpoints for document management
# =============================================================================
class DocumentRequest(BaseModel):
    """Request model for adding a document."""
    # Caller-supplied unique identifier for the document.
    doc_id: str
    # Full document text; chunked and embedded server-side.
    text: str
    # Arbitrary metadata attached to the document's chunks.
    metadata: Dict = Field(default_factory=dict)
class DocumentResponse(BaseModel):
    """Response model for document operations."""
    # True when the document was embedded and stored.
    success: bool
    # Human-readable outcome description.
    message: str
    # Number of chunks produced from the document (0 on failure).
    chunks_added: int = 0
@brain_router.post("/admin/documents", response_model=DocumentResponse)
async def add_document(request: DocumentRequest):
    """Add a document to the brain (admin endpoint).

    Args:
        request: DocumentRequest with doc_id, text, and metadata

    Returns:
        DocumentResponse with success status

    Raises:
        HTTPException: 500 when embedding or storage fails.
    """
    # Function-scope import: the embedding helper is loaded on demand here.
    from brain.embeddings import embed_document

    try:
        doc_chunks = embed_document(request.doc_id, request.text, request.metadata)
        store_document(request.doc_id, doc_chunks)
        return DocumentResponse(
            success=True,
            message=f"Document {request.doc_id} added successfully",
            chunks_added=len(doc_chunks),
        )
    except Exception as e:
        # Surface any ingest failure to the admin caller as a 500.
        logger.error(f"Failed to add document: {e}")
        raise HTTPException(status_code=500, detail=str(e))