"""Icarus Staging API — FastAPI entry point."""
import os
from pathlib import Path
from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.templating import Jinja2Templates
from pathlib import Path
import os
Verify staging environment
assert os.environ.get("ICARUS_ENV") == "staging", \
"ICARUS_ENV must be set to 'staging'"
app = FastAPI(title="Icarus", version="0.0.1")
Static files
from fastapi.staticfiles import StaticFiles
static_dir = Path(file).parent.parent / "static"
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
Templates
from jinja2 import Environment, FileSystemLoader
templates_dir = Path(file).parent.parent / "templates"
jinja_env = Environment(loader=FileSystemLoader(str(templates_dir)))
Custom filter for time formatting
from datetime import datetime
def format_time_filter(value):
if isinstance(value, str):
dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
else:
dt = value
now = datetime.now(dt.tzinfo)
diff = now - dt
if diff.days == 0:
if diff.seconds < 3600:
return f"{diff.seconds // 60}m ago"
return f"{diff.seconds // 3600}h ago"
if diff.days == 1:
return "Yesterday"
return dt.strftime("%b %d")
jinja_env.filters['format_time'] = format_time_filter
@app.get("/health")
async def health():
return {
"status": "ok",
"env": "icarus-staging",
"version": "0.0.1"
}
=============================================================================
Vision Pipeline Endpoints
=============================================================================
@app.post("/vision/parse")
async def vision_parse(file: UploadFile = File(...)):
"""Upload a document and get parsed text."""
from icarus.core.vision.parser import parse_document_bytes
content = await file.read()
filename = file.filename
try:
result = await parse_document_bytes(content, filename)
return JSONResponse(content=result)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Processing failed: {e}")
@app.post("/vision/briefing")
async def vision_briefing(file: UploadFile = File(...)):
"""Upload a document and get a full briefing card."""
from icarus.core.vision.pipeline import process_attachment
content = await file.read()
filename = file.filename
email_meta = {
"from": "upload@icarus.local",
"subject": filename,
"date": "now",
"to": "family",
"body": ""
}
try:
result = await process_attachment(email_meta, content, filename)
return JSONResponse(content=result)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Briefing generation failed: {e}")
@app.get("/vision/status")
async def vision_status():
"""Check vision model availability."""
import httpx
from icarus.core.config.staging import OLLAMA_BASE_URL
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=10.0)
models = response.json().get("models", [])
# Check for vision models
vision_models = [m for m in models if any(v in m["name"] for v in ["vl", "llava", "vision"])]
vision_ready = len(vision_models) > 0
return {
"ollama_ready": True,
"vision_model_ready": vision_ready,
"vision_models": [m["name"] for m in vision_models],
"all_models": [m["name"] for m in models],
"ollama_url": OLLAMA_BASE_URL
}
except Exception as e:
return {
"ollama_ready": False,
"error": str(e),
"ollama_url": OLLAMA_BASE_URL
}
=============================================================================
Telegram Bot — Long Polling (Sovereign Stack)
=============================================================================
from icarus.core.telegram.polling import start_polling
from icarus.core.db.grocery_list import init_db as init_grocery_db
from icarus.core.db.documents import init_db as init_documents_db
from icarus.core.db.event_graph import init_db as init_event_graph_db
@app.on_event("startup")
async def startup_event():
"""Start long-polling and initialize DBs on FastAPI startup."""
init_grocery_db()
init_documents_db()
init_event_graph_db()
await start_polling()
=============================================================================
System State View (Phase 5)
=============================================================================
@app.get("/system/state")
async def system_state(request: Request, format: str = None):
"""System state dashboard — read-only visibility into Icarus decisions."""
from icarus.core.family_loader import get_family_config
from icarus.core.memory.engine import stats as memory_stats
# Load family configuration
family_config = get_family_config()
# Build response data
members = []
for member in family_config.members:
members.append({
"id": member.get("id"),
"name": member.get("name"),
"nickname": member.get("nickname", "no nickname"),
"current_grade": member.get("current_grade", "N/A"),
"school": member.get("school", ""),
"teacher": member.get("teacher", ""),
"calendar_color": member.get("calendar_color", "blue"),
"stats": {
"documents_routed": 0, # TODO: Wire to actual telemetry
"avg_confidence": 0.9
}
})
# Build rules
rules = []
for rule in family_config.inference_rules:
rules.append({
"id": rule.get("id"),
"description": rule.get("description", ""),
"pattern": rule.get("pattern", ""),
"assign_to": rule.get("assign_to", []),
"confidence": rule.get("confidence", 0.5),
"status": "active",
"last_triggered": "2026-04-26T14:30:00Z", # TODO: Wire to actual telemetry
"trigger_count": 0
})
# Telemetry — powered by Event Graph
from icarus.core.db.event_graph import get_stats as get_event_graph_stats
try:
eg_stats = get_event_graph_stats()
except Exception:
eg_stats = {"total": 0, "by_type": {}, "coordination_open": 0, "blocking": 0}
telemetry = {
"total_documents": eg_stats["total"],
"auto_routed": eg_stats["by_type"].get("calendar_event", 0),
"user_asked": eg_stats["by_type"].get("coordination", 0),
"errors": 0,
"avg_processing_time_ms": 0,
"confidence_distribution": {
"high": 0,
"medium": 0,
"low": 0
},
"event_graph": eg_stats
}
# Routing decisions — from Event Graph
from icarus.core.db.event_graph import get_recent_events
try:
routing_decisions = get_recent_events(days=30, limit=50)
except Exception:
routing_decisions = []
data = {
"family": {
"family_id": family_config.family_id,
"member_count": len(family_config.members),
"rule_count": len(family_config.inference_rules)
},
"members": members,
"rules": rules,
"routing_decisions": routing_decisions,
"telemetry": telemetry
}
# Return JSON if requested
if format == "json":
return JSONResponse(content=data)
# Return HTML with HTMX polling
template = jinja_env.get_template("system-state.html.j2")
html_content = template.render(
request=request,
**data
)
return HTMLResponse(content=html_content)
=============================================================================
Event Graph API Endpoints
=============================================================================
@app.get("/api/event_graph/recent")
async def get_recent_events(days: int = 7, limit: int = 50):
"""Return recent Event Graph entries for dashboard display."""
from icarus.core.db.event_graph import get_recent_events
events = get_recent_events(days=days, limit=limit)
return JSONResponse(content=events)
@app.get("/api/event_graph/coordination")
async def get_coordination_items():
"""Return coordination items that need confirmation (for HBM)."""
from icarus.core.db.event_graph import get_coordination_items
items = get_coordination_items()
return JSONResponse(content=items)
@app.get("/api/event_graph/stats")
async def get_event_graph_stats():
"""Return aggregate Event Graph statistics."""
from icarus.core.db.event_graph import get_stats
stats = get_stats()
return JSONResponse(content=stats)
=============================================================================
Brain Intelligence Endpoints
=============================================================================
@app.get("/brain/query")
async def brain_query(q: str, top_k: int = 3, source: str = "all"):
"""Query the Family Brain with natural language.
Args:
q: Natural language question
top_k: Number of chunks to retrieve (default 3)
source: Filter by source type — "email", "newsletter", or "all"
Returns:
JSON with answer, sources (with attribution), and confidence
"""
import time
from icarus.core.family_brain import answer
start_time = time.time()
# Build where filter if source specified
where_filter = None
if source in ["email", "newsletter"]:
where_filter = {"source": source}
try:
result = answer(question=q, top_k=top_k, where=where_filter)
# Format sources with attribution
formatted_sources = []
for src in result.get("sources", []):
subject = src.get("subject", "Unknown")
date = src.get("date", "Unknown date")
formatted_sources.append({
"attribution": f"Found in: {subject}, {date}",
"subject": subject,
"date": date,
"score": src.get("score", 0),
"source_type": src.get("source", "unknown")
})
query_time_ms = int((time.time() - start_time) * 1000)
return JSONResponse(content={
"answer": result.get("answer", "I don't see that in the emails I've processed."),
"sources": formatted_sources,
"confidence": result.get("confidence", "low"),
"query_time_ms": query_time_ms
})
except Exception as e:
query_time_ms = int((time.time() - start_time) * 1000)
return JSONResponse(
status_code=503,
content={
"answer": "Brain service temporarily unavailable.",
"sources": [],
"confidence": "low",
"error": str(e),
"query_time_ms": query_time_ms
}
)
@app.get("/brain/stats")
async def brain_stats():
"""Get Family Brain statistics."""
from icarus.core.family_brain import stats as brain_stats_func
try:
stats_data = brain_stats_func()
return JSONResponse(content={
"total_documents": stats_data.get("count", 0),
"collection": stats_data.get("collection", "family_knowledge"),
"db_path": str(stats_data.get("db_path", "unknown"))
})
except Exception as e:
return JSONResponse(
status_code=503,
content={
"total_documents": 0,
"collection": "family_knowledge",
"error": str(e)
}
)
@app.post("/brain/ingest")
async def brain_ingest(payload: dict):
"""Ingest a Telegram extraction into the Family Brain.
Accepts an extraction event from the shadow bot (or any consumer)
and writes it to the ChromaDB family_knowledge collection with
full provenance metadata.
Expected payload matches what shadow_bot.py sends after processing.
"""
import time
from icarus.core.family_brain import ingest_telegram
start_time = time.time()
# Validate required fields
required = ["extraction_id", "message_id", "message_text", "extraction_type", "extracted_what"]
missing = [k for k in required if k not in payload]
if missing:
return JSONResponse(
status_code=422,
content={"error": f"Missing required fields: {', '.join(missing)}"}
)
try:
result = ingest_telegram(
extraction_id=payload["extraction_id"],
message_id=str(payload["message_id"]),
sender_name=payload.get("sender_name", "Unknown"),
sender_username=payload.get("sender_username", ""),
chat_title=payload.get("chat_title", "Family Logistics"),
message_text=payload["message_text"],
extraction_type=payload["extraction_type"],
extracted_who=payload.get("extracted_who", []),
extracted_what=payload["extracted_what"],
extracted_when=payload.get("extracted_when", ""),
extracted_where=payload.get("extracted_where", ""),
confidence=payload.get("confidence", 0.5),
needs_confirmation=payload.get("needs_confirmation", True),
calendar_check_status=payload.get("calendar_check_status", "pending"),
calendar_event_id=payload.get("calendar_event_id"),
calendar_event_title=payload.get("calendar_event_title"),
fuzzy_match_score=payload.get("fuzzy_match_score"),
)
ingest_time_ms = int((time.time() - start_time) * 1000)
return JSONResponse(content={
"status": result["status"],
"ids": result["ids"],
"chunks": result["chunks"],
"ingest_time_ms": ingest_time_ms,
})
except Exception as e:
return JSONResponse(
status_code=500,
content={
"status": "error",
"error": str(e),
}
)
@app.get("/api")
async def api_root():
"""JSON landing page with available endpoints."""
return JSONResponse(content={
"service": "Icarus Brain Intelligence (Staging)",
"version": "0.0.1",
"endpoints": {
"health": "/health",
"brain_query": "/brain/query?q=your+question",
"brain_stats": "/brain/stats",
"brain_ingest": "POST /brain/ingest",
"vision_parse": "/vision/parse",
"vision_briefing": "/vision/briefing",
"system_state": "/system/state",
"event_graph": "/api/event_graph/recent"
}
})