📄 router.py 7,777 bytes Apr 30, 2026 📋 Raw

"""API Router for Brain Query Interface."""

import logging
from typing import List, Dict, Optional
from datetime import datetime

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field

from brain.query import (
QueryPattern,
detect_query_pattern,
calculate_confidence,
format_answer
)
from brain.embeddings import get_embedding
from brain.store import search_documents, store_document, get_stats, get_collection

logger = logging.getLogger(name)

Create router

brain_router = APIRouter(prefix="/api/brain", tags=["brain"])

=============================================================================

Request/Response Models

=============================================================================

class QueryRequest(BaseModel):
"""Request model for brain query."""
query: str = Field(..., description="Natural language query")
pattern: str = Field("auto", description="auto | logistical | entity | historical")
n_results: int = Field(5, ge=1, le=20, description="Number of results to return")

class SourceInfo(BaseModel):
"""Source document information."""
doc_id: str
type: str
date: Optional[str]
text: str

class QueryResponse(BaseModel):
"""Response model for brain query."""
answer: str
confidence: float
confidence_message: str
sources: List[SourceInfo]
pattern_used: str

class StatsResponse(BaseModel):
"""Response model for brain stats."""
total_chunks: int
collection_name: str
persist_directory: str

class HealthResponse(BaseModel):
"""Health check response."""
status: str
chromadb_available: bool
collection_ready: bool

=============================================================================

Endpoints

=============================================================================

@brain_router.post("/query", response_model=QueryResponse)
async def brain_query(request: QueryRequest):
"""Process natural language query against family documents.

Args:
    request: QueryRequest with query text and optional pattern override

Returns:
    QueryResponse with answer, confidence, and sources
"""
logger.info(f"Brain query: '{request.query}' (pattern={request.pattern})")

# 1. Detect pattern
if request.pattern == "auto":
    pattern = detect_query_pattern(request.query)
else:
    try:
        pattern = QueryPattern(request.pattern)
    except ValueError:
        logger.warning(f"Invalid pattern '{request.pattern}', using ENTITY")
        pattern = QueryPattern.ENTITY

logger.info(f"Using pattern: {pattern.value}")

# 2. Embed query
query_embedding = get_embedding(request.query)

# Check if embedding failed (all zeros)
if all(v == 0.0 for v in query_embedding):
    logger.error("Failed to get embedding for query")
    raise HTTPException(status_code=503, detail="Embedding service unavailable")

# 3. Search documents
results = search_documents(query_embedding, pattern, n_results=request.n_results)

# 4. Synthesize answer
if not results:
    return QueryResponse(
        answer="I couldn't find any relevant documents for that question.",
        confidence=0.0,
        confidence_message="No relevant documents found",
        sources=[],
        pattern_used=pattern.value
    )

# Get top result for the answer
top = results[0]
answer = format_answer(top, pattern)

# Calculate confidence
confidence, confidence_message = calculate_confidence(
    top["combined_score"], 
    pattern
)

# Build sources list
sources = []
for r in results:
    meta = r.get("metadata", {})
    sources.append(SourceInfo(
        doc_id=meta.get("doc_id", "unknown"),
        type=meta.get("doc_type", "unknown"),
        date=meta.get("source_date"),
        text=r.get("text", "")[:500]  # Truncate for response
    ))

return QueryResponse(
    answer=answer,
    confidence=confidence,
    confidence_message=confidence_message,
    sources=sources,
    pattern_used=pattern.value
)

@brain_router.get("/stats", response_model=StatsResponse)
async def brain_stats():
"""Get Brain statistics.

Returns:
    StatsResponse with collection stats
"""
stats = get_stats()

if "error" in stats:
    raise HTTPException(status_code=500, detail=stats["error"])

return StatsResponse(
    total_chunks=stats.get("total_chunks", 0),
    collection_name=stats.get("collection_name", "family_documents"),
    persist_directory=stats.get("persist_directory", "./data/chroma_db")
)

@brain_router.get("/health")
async def brain_health():
"""Health check for Brain service.

Returns:
    HealthResponse with service status
"""
try:
    coll = get_collection()
    coll.count()  # Test connection
    return HealthResponse(
        status="healthy",
        chromadb_available=True,
        collection_ready=True
    )
except Exception as e:
    logger.error(f"Health check failed: {e}")
    return HealthResponse(
        status="unhealthy",
        chromadb_available=False,
        collection_ready=False
    )

@brain_router.get("/patterns")
async def list_patterns():
"""List available query patterns and their descriptions.

Returns:
    Dict of pattern names to descriptions
"""
return {
    "logistical": {
        "description": "Time-sensitive queries (schedules, current events)",
        "examples": ["Is Friday a half-day?", "When is spring break?"],
        "decay": "exponential",
        "half_life_days": 7
    },
    "entity": {
        "description": "Fact lookup (sizes, dates, specific values)",
        "examples": ["What size is our HVAC filter?", "When was Buster's last vet visit?"],
        "decay": "linear",
        "half_life_days": 90
    },
    "historical": {
        "description": "Past events and historical records",
        "examples": ["What was the roofer's name?", "When did we replace the water heater?"],
        "decay": "none",
        "half_life_days": None
    }
}

=============================================================================

Admin endpoints for document management

=============================================================================

class DocumentRequest(BaseModel):
"""Request model for adding a document."""
doc_id: str
text: str
metadata: Dict = Field(default_factory=dict)

class DocumentResponse(BaseModel):
"""Response model for document operations."""
success: bool
message: str
chunks_added: int = 0

@brain_router.post("/admin/documents", response_model=DocumentResponse)
async def add_document(request: DocumentRequest):
"""Add a document to the brain (admin endpoint).

Args:
    request: DocumentRequest with doc_id, text, and metadata

Returns:
    DocumentResponse with success status
"""
from brain.embeddings import embed_document

try:
    chunks = embed_document(request.doc_id, request.text, request.metadata)
    store_document(request.doc_id, chunks)

    return DocumentResponse(
        success=True,
        message=f"Document {request.doc_id} added successfully",
        chunks_added=len(chunks)
    )
except Exception as e:
    logger.error(f"Failed to add document: {e}")
    raise HTTPException(status_code=500, detail=str(e))