"""Family automation router - /admin/family/* endpoints.""" import os import json import logging from datetime import datetime from typing import Optional from fastapi import APIRouter, Request, Depends, HTTPException from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from pydantic import BaseModel from shared.session_auth import require_auth, get_session, is_authenticated, get_bearer_auth from shared.llm import LLMClient from shared.notify import TelegramNotifier from .email import EmailProcessor from .calendar import CalendarClient from .pipeline import FamilyPipeline logger = logging.getLogger(__name__) family_router = APIRouter(prefix="/family", tags=["family-automation"]) # Initialize services llm_client = LLMClient() telegram = TelegramNotifier() calendar = CalendarClient() pipeline = FamilyPipeline(llm_client, calendar, telegram) class EmailWebhookRequest(BaseModel): """Incoming email webhook payload.""" subject: str body: str sender: str received_at: Optional[str] = None class PipelineStatusResponse(BaseModel): """Pipeline health status.""" llm: dict calendar: dict telegram: bool pipeline_ready: bool def _get_user_info(request: Request) -> dict: """Get current user info.""" session = get_session(request) if session: return { "user_id": session.get("user_id", "admin"), "user_email": session.get("user_email", "admin@hoffdesk.local"), } # Fallback for bearer auth bearer = get_bearer_auth(request) if bearer: return { "user_id": bearer.get("user_id", "api"), "user_email": bearer.get("user_email", "api@hoffdesk.local"), } return {"user_id": "guest", "user_email": "guest@hoffdesk.local"} @family_router.get("/login", response_class=HTMLResponse) async def family_login_page(request: Request): """Show login page if not authenticated.""" if is_authenticated(request): return RedirectResponse(url="/admin/family/") # Redirect to unified login return RedirectResponse(url="/login?redirect=/admin/family/", status_code=302) 
@family_router.post("/login")
async def family_login(request: Request):
    """Login handler - deprecated, use /auth/login.

    Always answers with a 302 redirect to the unified login page.
    """
    return RedirectResponse(url="/login?redirect=/admin/family/", status_code=302)


@family_router.get("/", response_class=HTMLResponse)
async def family_dashboard(request: Request):
    """Render the family automation dashboard.

    Unauthenticated requests are redirected (302) to the unified login page.
    Otherwise an HTML page describing the pipeline and its endpoints is
    returned, showing the current user's id.
    """
    # Imported locally because the file-level import block is outside this
    # block.
    from html import escape

    if not is_authenticated(request):
        return RedirectResponse(url="/login?redirect=/admin/family/", status_code=302)

    user_info = _get_user_info(request)
    # The user id originates from session/bearer data; escape it before it is
    # embedded in the HTML response.
    safe_user_id = escape(str(user_info["user_id"]))

    # NOTE(review): the markup of this template appears to have been stripped
    # (only text content is present) - confirm against the deployed page.
    return HTMLResponse(content=f""" Family Automation | HoffDesk

🏠 Family Automation

Logged in as {safe_user_id} · Logout

Pipeline Status

Email → LLM Parse → Calendar → Telegram

View detailed status →

API Endpoints

POST /admin/family/webhook/email

Receive email webhook, process to calendar + notify.

POST /admin/family/pipeline/test

Test the full pipeline with sample data.

GET /admin/family/status

Health check for all services.

Configuration

User: {safe_user_id}

Telegram: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID

Calendar: Radicale at 100.71.120.12:5232

""")


def _require_session_or_bearer(request: Request) -> None:
    """Raise a 401 unless the request has a browser session or bearer auth."""
    if not is_authenticated(request) and not get_bearer_auth(request):
        raise HTTPException(status_code=401, detail="Authentication required")


@family_router.get("/status", response_model=PipelineStatusResponse)
async def pipeline_status(request: Request):
    """Check health of all family pipeline services.

    Returns:
        A ``PipelineStatusResponse`` with the raw LLM and calendar health
        dicts, whether a Telegram bot token is configured, and an overall
        ``pipeline_ready`` flag.

    Raises:
        HTTPException: 401 when the request has neither session nor bearer
            auth.
    """
    # Allow both session (browser) and bearer (API) auth
    _require_session_or_bearer(request)

    llm_health = await llm_client.health()
    calendar_health = await calendar.health()
    telegram_ready = bool(os.getenv("TELEGRAM_BOT_TOKEN"))

    # Use .get so a health dict that lacks a key reads as "not ready" instead
    # of failing the status endpoint with a KeyError.
    llm_available = bool(llm_health.get("local") or llm_health.get("cloud"))
    calendar_connected = bool(calendar_health.get("connected"))

    return PipelineStatusResponse(
        llm=llm_health,
        calendar=calendar_health,
        telegram=telegram_ready,
        pipeline_ready=llm_available and calendar_connected and telegram_ready,
    )


@family_router.post("/webhook/email")
async def email_webhook(
    request: Request,
    data: EmailWebhookRequest
):
    """Process incoming email webhook.

    Flow: Email body → LLM extraction → Calendar event → Telegram notification

    Returns:
        A dict with ``success``, ``event_created``, ``notification_sent``,
        ``parsed_data`` and ``timestamp`` keys.

    Raises:
        HTTPException: 401 when no bearer auth is present; 500 (with the
            error text as detail) when processing fails.
    """
    # Require Bearer token for webhooks (machine-to-machine)
    bearer = get_bearer_auth(request)
    if not bearer:
        raise HTTPException(status_code=401, detail="Bearer token required")

    logger.info("Received email webhook: %s...", data.subject[:50])

    try:
        result = await pipeline.process_email(
            subject=data.subject,
            body=data.body,
            sender=data.sender,
            received_at=data.received_at or datetime.now().isoformat(),
        )
        return {
            "success": True,
            "event_created": result.get("event_created", False),
            "notification_sent": result.get("notification_sent", False),
            "parsed_data": result.get("parsed", {}),
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.exception("Pipeline failed")
        # The alert is best-effort: a failing notifier must not replace the
        # 500 response that reports the actual pipeline error.
        try:
            await telegram.to_matt(f"❌ Pipeline error: {str(e)[:200]}")
        except Exception:
            logger.exception("Failed to send pipeline error notification")
        raise HTTPException(status_code=500, detail=str(e)) from e


@family_router.post("/pipeline/test")
async def test_pipeline(request: Request):
    """Test the full pipeline with sample data.

    Returns:
        A dict with ``test`` (always True), the ``sample`` email that was
        submitted, and the pipeline ``result``.

    Raises:
        HTTPException: 401 when the request has neither session nor bearer
            auth; 500 (with the error text as detail) when the pipeline fails.
    """
    # Allow both session (browser) and bearer (API) auth
    _require_session_or_bearer(request)

    sample_email = {
        "subject": "Dentist appointment tomorrow at 3pm",
        "body": "Hi, this confirms your appointment with Dr. Smith on April 23rd at 3:00 PM.",
        "sender": "reception@dentist.example.com",
        "received_at": datetime.now().isoformat(),
    }

    try:
        result = await pipeline.process_email(**sample_email)
        return {
            "test": True,
            "sample": sample_email,
            "result": result,
        }
    except Exception as e:
        logger.exception("Test pipeline failed")
        raise HTTPException(status_code=500, detail=str(e)) from e


@family_router.on_event("shutdown")
async def shutdown():
    """Clean up resources."""
    # try/finally so the Telegram client is still closed when closing the LLM
    # client raises.
    try:
        await llm_client.close()
    finally:
        await telegram.close()