"""Icarus Staging API — FastAPI entry point.""" import os from pathlib import Path from fastapi import FastAPI, File, UploadFile, HTTPException, Request from fastapi.responses import JSONResponse, HTMLResponse from fastapi.templating import Jinja2Templates from pathlib import Path import os # Verify staging environment assert os.environ.get("ICARUS_ENV") == "staging", \ "ICARUS_ENV must be set to 'staging'" app = FastAPI(title="Icarus", version="0.0.1") # Static files from fastapi.staticfiles import StaticFiles static_dir = Path(__file__).parent.parent / "static" app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") # Templates from jinja2 import Environment, FileSystemLoader templates_dir = Path(__file__).parent.parent / "templates" jinja_env = Environment(loader=FileSystemLoader(str(templates_dir))) # Custom filter for time formatting from datetime import datetime def format_time_filter(value): if isinstance(value, str): dt = datetime.fromisoformat(value.replace('Z', '+00:00')) else: dt = value now = datetime.now(dt.tzinfo) diff = now - dt if diff.days == 0: if diff.seconds < 3600: return f"{diff.seconds // 60}m ago" return f"{diff.seconds // 3600}h ago" if diff.days == 1: return "Yesterday" return dt.strftime("%b %d") jinja_env.filters['format_time'] = format_time_filter @app.get("/health") async def health(): return { "status": "ok", "env": "icarus-staging", "version": "0.0.1" } # ============================================================================= # Vision Pipeline Endpoints # ============================================================================= @app.post("/vision/parse") async def vision_parse(file: UploadFile = File(...)): """Upload a document and get parsed text.""" from icarus.core.vision.parser import parse_document_bytes content = await file.read() filename = file.filename try: result = await parse_document_bytes(content, filename) return JSONResponse(content=result) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail=f"Processing failed: {e}") @app.post("/vision/briefing") async def vision_briefing(file: UploadFile = File(...)): """Upload a document and get a full briefing card.""" from icarus.core.vision.pipeline import process_attachment content = await file.read() filename = file.filename email_meta = { "from": "upload@icarus.local", "subject": filename, "date": "now", "to": "family", "body": "" } try: result = await process_attachment(email_meta, content, filename) return JSONResponse(content=result) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail=f"Briefing generation failed: {e}") @app.get("/vision/status") async def vision_status(): """Check vision model availability.""" import httpx from icarus.core.config.staging import OLLAMA_BASE_URL try: async with httpx.AsyncClient() as client: response = await client.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=10.0) models = response.json().get("models", []) # Check for vision models vision_models = [m for m in models if any(v in m["name"] for v in ["vl", "llava", "vision"])] vision_ready = len(vision_models) > 0 return { "ollama_ready": True, "vision_model_ready": vision_ready, "vision_models": [m["name"] for m in vision_models], "all_models": [m["name"] for m in models], "ollama_url": OLLAMA_BASE_URL } except Exception as e: return { "ollama_ready": False, "error": str(e), "ollama_url": OLLAMA_BASE_URL } # ============================================================================= # Telegram Bot — Long Polling (Sovereign Stack) # ============================================================================= from icarus.core.telegram.polling import start_polling from icarus.core.db.grocery_list import init_db as init_grocery_db from icarus.core.db.documents import init_db as init_documents_db from icarus.core.db.event_graph import init_db as init_event_graph_db @app.on_event("startup") async def startup_event(): """Start long-polling and initialize DBs on FastAPI startup.""" init_grocery_db() init_documents_db() init_event_graph_db() await start_polling() # ============================================================================= # System State View (Phase 5) # ============================================================================= @app.get("/system/state") async def system_state(request: Request, format: str = None): """System state dashboard — read-only visibility into Icarus decisions.""" from icarus.core.family_loader import get_family_config from icarus.core.memory.engine import stats as memory_stats # Load family configuration family_config = get_family_config() # Build response data members = [] for member in family_config.members: members.append({ "id": member.get("id"), "name": member.get("name"), "nickname": member.get("nickname", "no nickname"), "current_grade": member.get("current_grade", "N/A"), "school": member.get("school", ""), "teacher": member.get("teacher", ""), "calendar_color": member.get("calendar_color", "blue"), "stats": { "documents_routed": 0, # TODO: Wire to actual telemetry "avg_confidence": 0.9 } }) # Build rules rules = [] for rule in family_config.inference_rules: rules.append({ "id": rule.get("id"), "description": rule.get("description", ""), "pattern": rule.get("pattern", ""), "assign_to": rule.get("assign_to", []), "confidence": rule.get("confidence", 0.5), "status": "active", "last_triggered": "2026-04-26T14:30:00Z", # TODO: Wire to actual telemetry "trigger_count": 0 }) # Telemetry — powered by Event Graph from icarus.core.db.event_graph import get_stats as get_event_graph_stats try: eg_stats = get_event_graph_stats() except Exception: eg_stats = {"total": 0, "by_type": {}, "coordination_open": 0, "blocking": 0} telemetry = { "total_documents": eg_stats["total"], "auto_routed": eg_stats["by_type"].get("calendar_event", 0), "user_asked": eg_stats["by_type"].get("coordination", 0), "errors": 0, "avg_processing_time_ms": 0, "confidence_distribution": { "high": 0, "medium": 0, "low": 0 }, "event_graph": eg_stats } # Routing decisions — from Event Graph from icarus.core.db.event_graph import get_recent_events try: routing_decisions = get_recent_events(days=30, limit=50) except Exception: routing_decisions = [] data = { "family": { "family_id": family_config.family_id, "member_count": len(family_config.members), "rule_count": len(family_config.inference_rules) }, "members": members, "rules": rules, "routing_decisions": routing_decisions, "telemetry": telemetry } # Return JSON if requested if format == "json": return JSONResponse(content=data) # Return HTML with HTMX polling template = jinja_env.get_template("system-state.html.j2") html_content = template.render( request=request, **data ) return HTMLResponse(content=html_content) # ============================================================================= # Event Graph API Endpoints # ============================================================================= @app.get("/api/event_graph/recent") async def get_recent_events(days: int = 7, limit: int = 50): """Return recent Event Graph entries for dashboard display.""" from icarus.core.db.event_graph import get_recent_events events = get_recent_events(days=days, limit=limit) return JSONResponse(content=events) @app.get("/api/event_graph/coordination") async def get_coordination_items(): """Return coordination items that need confirmation (for HBM).""" from icarus.core.db.event_graph import get_coordination_items items = get_coordination_items() return JSONResponse(content=items) @app.get("/api/event_graph/stats") async def get_event_graph_stats(): """Return aggregate Event Graph statistics.""" from icarus.core.db.event_graph import get_stats stats = get_stats() return JSONResponse(content=stats) # ============================================================================= # Brain Intelligence Endpoints # ============================================================================= @app.get("/brain/query") async def brain_query(q: str, top_k: int = 3, source: str = "all"): """Query the Family Brain with natural language. Args: q: Natural language question top_k: Number of chunks to retrieve (default 3) source: Filter by source type — "email", "newsletter", or "all" Returns: JSON with answer, sources (with attribution), and confidence """ import time from icarus.core.family_brain import answer start_time = time.time() # Build where filter if source specified where_filter = None if source in ["email", "newsletter"]: where_filter = {"source": source} try: result = answer(question=q, top_k=top_k, where=where_filter) # Format sources with attribution formatted_sources = [] for src in result.get("sources", []): subject = src.get("subject", "Unknown") date = src.get("date", "Unknown date") formatted_sources.append({ "attribution": f"Found in: {subject}, {date}", "subject": subject, "date": date, "score": src.get("score", 0), "source_type": src.get("source", "unknown") }) query_time_ms = int((time.time() - start_time) * 1000) return JSONResponse(content={ "answer": result.get("answer", "I don't see that in the emails I've processed."), "sources": formatted_sources, "confidence": result.get("confidence", "low"), "query_time_ms": query_time_ms }) except Exception as e: query_time_ms = int((time.time() - start_time) * 1000) return JSONResponse( status_code=503, content={ "answer": "Brain service temporarily unavailable.", "sources": [], "confidence": "low", "error": str(e), "query_time_ms": query_time_ms } ) @app.get("/brain/stats") async def brain_stats(): """Get Family Brain statistics.""" from icarus.core.family_brain import stats as brain_stats_func try: stats_data = brain_stats_func() return JSONResponse(content={ "total_documents": stats_data.get("count", 0), "collection": stats_data.get("collection", "family_knowledge"), "db_path": str(stats_data.get("db_path", "unknown")) }) except Exception as e: return JSONResponse( status_code=503, content={ "total_documents": 0, "collection": "family_knowledge", "error": str(e) } ) @app.post("/brain/ingest") async def brain_ingest(payload: dict): """Ingest a Telegram extraction into the Family Brain. Accepts an extraction event from the shadow bot (or any consumer) and writes it to the ChromaDB family_knowledge collection with full provenance metadata. Expected payload matches what shadow_bot.py sends after processing. """ import time from icarus.core.family_brain import ingest_telegram start_time = time.time() # Validate required fields required = ["extraction_id", "message_id", "message_text", "extraction_type", "extracted_what"] missing = [k for k in required if k not in payload] if missing: return JSONResponse( status_code=422, content={"error": f"Missing required fields: {', '.join(missing)}"} ) try: result = ingest_telegram( extraction_id=payload["extraction_id"], message_id=str(payload["message_id"]), sender_name=payload.get("sender_name", "Unknown"), sender_username=payload.get("sender_username", ""), chat_title=payload.get("chat_title", "Family Logistics"), message_text=payload["message_text"], extraction_type=payload["extraction_type"], extracted_who=payload.get("extracted_who", []), extracted_what=payload["extracted_what"], extracted_when=payload.get("extracted_when", ""), extracted_where=payload.get("extracted_where", ""), confidence=payload.get("confidence", 0.5), needs_confirmation=payload.get("needs_confirmation", True), calendar_check_status=payload.get("calendar_check_status", "pending"), calendar_event_id=payload.get("calendar_event_id"), calendar_event_title=payload.get("calendar_event_title"), fuzzy_match_score=payload.get("fuzzy_match_score"), ) ingest_time_ms = int((time.time() - start_time) * 1000) return JSONResponse(content={ "status": result["status"], "ids": result["ids"], "chunks": result["chunks"], "ingest_time_ms": ingest_time_ms, }) except Exception as e: return JSONResponse( status_code=500, content={ "status": "error", "error": str(e), } ) @app.get("/api") async def api_root(): """JSON landing page with available endpoints.""" return JSONResponse(content={ "service": "Icarus Brain Intelligence (Staging)", "version": "0.0.1", "endpoints": { "health": "/health", "brain_query": "/brain/query?q=your+question", "brain_stats": "/brain/stats", "brain_ingest": "POST /brain/ingest", "vision_parse": "/vision/parse", "vision_briefing": "/vision/briefing", "system_state": "/system/state", "event_graph": "/api/event_graph/recent" } })