"""Session-based authentication for admin routes.

Replaces the X-Admin-Token header and ?token= query param with signed session
cookies. Cookies live for SESSION_MAX_AGE_DAYS (default 30), are httponly to
keep them away from JavaScript, and carry the Secure flag when
SESSION_COOKIE_SECURE is enabled (turn it on for HTTPS deployments).

Also supports token auth for API/machine consumers via either header:

    Authorization: Bearer TOKEN
    X-Hoffdesk-Secret: TOKEN
"""

import hmac
import logging
import os
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Dict, Optional, Union
from urllib.parse import quote

from fastapi import Depends, HTTPException, Request, Response
from fastapi.responses import JSONResponse, RedirectResponse
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger(__name__)

# Config from environment
# NOTE(review): the fallback secret is publicly known; anyone can forge a
# session cookie if SESSION_SECRET_KEY is left unset in production.
SECRET_KEY = os.getenv("SESSION_SECRET_KEY", "dev-secret-change-in-production")
MAX_AGE_DAYS = int(os.getenv("SESSION_MAX_AGE_DAYS", "30"))
SESSION_COOKIE_NAME = "hoffdesk_session"
# Opt-in Secure flag for the session cookie. Defaults to off so plain-HTTP
# development setups keep working; enable it wherever the site is served
# over HTTPS.
SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}

# Cookie/session lifetime in seconds, shared by signing validation and the
# cookie's max_age so the two can never drift apart.
_MAX_AGE_SECONDS = MAX_AGE_DAYS * 24 * 60 * 60

# Serializer for signed cookies
serializer = URLSafeTimedSerializer(SECRET_KEY)


def _get_admin_token() -> str:
    """Get admin/API token from environment.

    Checks BLOG_ADMIN_TOKEN, ADMIN_TOKEN and HOFFDESK_SECRET in that order and
    returns the first non-empty value.

    NOTE(review): when none is set, a hard-coded placeholder is returned and
    is accepted as a valid token by get_bearer_auth(); deployments should
    always set one of the variables above.
    """
    return (
        os.getenv("BLOG_ADMIN_TOKEN")
        or os.getenv("ADMIN_TOKEN")
        or os.getenv("HOFFDESK_SECRET")
        or "changeme-please-update"
    )


class SessionAuthMiddleware(BaseHTTPMiddleware):
    """Middleware to validate session cookies on admin routes.

    Requests pass through untouched when the path is listed in EXEMPT_PATHS
    (exact match) or starts with one of EXEMPT_PREFIXES. Everything else needs
    a valid session cookie or a valid Bearer / X-Hoffdesk-Secret token.

    Exempts:
    - Public routes (/, /api/blog/*, /health, etc.)
    - Login page itself (/login, /auth/*)
    - Webhook endpoints (/webhook, /telegram/callback)
    """

    EXEMPT_PATHS = [
        "/",  # Blog index
        "/health",
        "/webhook",
        "/webhook/health",
        "/telegram/callback",
        "/family/events/removed",  # API endpoint uses header auth
    ]

    # NOTE(review): "/blog/" is a prefix of "/blog/admin/static/", so the
    # latter entry is redundant and every path under /blog/admin/ is exempt
    # from this middleware. Confirm those routes enforce auth themselves.
    EXEMPT_PREFIXES = [
        "/api/blog/",  # Public blog API
        "/api/system/",  # System endpoints (brain viz, health)
        "/login",  # Login page (added from /auth/ when routes were normalized)
        "/auth/",  # Auth endpoints (login, logout, etc.)
        "/static/",  # Static assets
        "/blog/",  # Blog static/public assets
        "/blog/admin/static/",  # Admin static files
        "/api/v1/content/briefs/",  # Public content briefs
        "/family/login/",  # Family dashboard login page
        "/api/public/",  # Public API endpoints (IMAP status, metrics, etc.)
        "/api/today",  # Dashboard data API (consumed by HTMX polling)
        "/api/events-dashboard",  # Event Graph data API (consumed by HTMX polling)
        "/ui/",  # UI HTML fragments for HTMX (new namespace per CONTRACT.md)
        "/api/events-list",  # Event list HTML fragment for HTMX
        "/api/events-week",  # Week view HTML fragment for HTMX
        "/api/events/",  # Event state machine API (POST endpoints for confirm/decline/reschedule/counter)
    ]

    async def dispatch(self, request: Request, call_next):
        """Authenticate the request or short-circuit with a 302/401.

        On success, stores the user dict on ``request.state.user`` and the
        mechanism ("session" or "bearer") on ``request.state.auth_method``
        before handing off to the next handler.
        """
        # Check if path is exempt
        path = request.url.path
        if path in self.EXEMPT_PATHS:
            return await call_next(request)
        # str.startswith accepts a tuple, giving one call for all prefixes.
        if path.startswith(tuple(self.EXEMPT_PREFIXES)):
            return await call_next(request)

        # Check for session cookie OR Bearer token
        session_data = get_session(request)
        bearer_data = get_bearer_auth(request)

        if not session_data and not bearer_data:
            # No valid auth - browsers/HTMX get sent to the login page,
            # everything else gets a JSON 401.
            accept = request.headers.get("Accept", "")
            hx_request = request.headers.get("HX-Request") == "true"
            if accept.startswith("text/html") or hx_request:
                # Percent-encode the path so characters such as "&", "#" or
                # "?" cannot inject or truncate query parameters on /login.
                target = quote(path, safe="/")
                return RedirectResponse(
                    url=f"/login?redirect={target}", status_code=302
                )
            return JSONResponse(
                status_code=401,
                content={"detail": "Authentication required"},
            )

        # Valid auth - attach user info to request state (session wins when
        # both are present).
        request.state.user = session_data or bearer_data
        request.state.auth_method = "session" if session_data else "bearer"

        return await call_next(request)


# --- Session Cookie Functions ---


def create_session(
    user_id: str,
    user_email: Optional[str] = None,
    roles: Optional[list] = None,
) -> str:
    """Create a signed session token.

    Args:
        user_id: Unique identifier (e.g., "matt", "aundrea")
        user_email: Optional email for display; falls back to ``user_id``
        roles: Optional list of roles (e.g., ["admin", "family"]); defaults
            to ``["admin"]`` when omitted or empty

    Returns:
        Signed session string (goes in cookie)
    """
    session_data = {
        "user_id": user_id,
        "user_email": user_email or user_id,
        "roles": roles or ["admin"],
        "created_at": datetime.now().isoformat(),
    }
    token = serializer.dumps(session_data)
    logger.info("Created session for %s", user_id)
    return token


def get_session(request: Request) -> Optional[Dict[str, Any]]:
    """Extract and validate session from request cookies.

    Returns:
        Session data dict or None if the cookie is missing, invalid or
        older than the configured maximum age
    """
    session_cookie = request.cookies.get(SESSION_COOKIE_NAME)
    if not session_cookie:
        return None

    try:
        return serializer.loads(session_cookie, max_age=_MAX_AGE_SECONDS)
    except SignatureExpired:
        # Must be handled before BadSignature, which is its base class.
        logger.debug("Session expired")
        return None
    except BadSignature:
        logger.warning("Invalid session signature (tampering?)")
        return None
    except Exception as e:
        # Deliberate best-effort: a malformed cookie must never turn into a
        # 500, it simply means "not authenticated".
        logger.error("Session validation error: %s", e)
        return None


def set_session_cookie(response: Response, token: str) -> None:
    """Set the session cookie on a response.

    Args:
        response: Response that will carry the Set-Cookie header
        token: Signed session string produced by create_session()
    """
    response.set_cookie(
        key=SESSION_COOKIE_NAME,
        value=token,
        max_age=_MAX_AGE_SECONDS,
        httponly=True,  # Prevent JavaScript access
        secure=SESSION_COOKIE_SECURE,  # Enable via env when serving over HTTPS
        samesite="lax",  # CSRF protection
    )


def clear_session(response: Response) -> None:
    """Clear the session cookie (logout)."""
    response.delete_cookie(key=SESSION_COOKIE_NAME)


# --- Bearer Token Functions ---


def get_bearer_auth(request: Request) -> Optional[Dict[str, Any]]:
    """Extract and validate Bearer token from request headers.

    Supports:
        Authorization: Bearer TOKEN
        X-Hoffdesk-Secret: TOKEN

    The X-Hoffdesk-Secret header is only consulted when the Authorization
    header does not start with "Bearer ".

    Returns:
        User data dict or None if no token was sent or it does not match
    """
    # Check Authorization header first
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header[7:]  # Strip "Bearer " prefix
    else:
        # Fall back to X-Hoffdesk-Secret
        token = request.headers.get("X-Hoffdesk-Secret", "")

    if not token:
        return None

    expected = _get_admin_token()
    # Constant-time comparison so response timing does not leak how much of
    # the secret matched. Compare bytes: compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8")):
        return None

    return {
        "user_id": "api",
        "user_email": "api@hoffdesk.local",
        "roles": ["admin", "api"],
        "auth_method": "bearer",
    }


# --- Unified Auth Dependency ---


def require_auth(request: Request) -> Dict[str, Any]:
    """Dependency for FastAPI routes requiring auth (session OR Bearer).

    Usage:
        @router.get("/admin/protected")
        async def protected_page(request: Request, user=Depends(require_auth)):
            return {"message": f"Hello {user['user_id']}"}

    Raises:
        HTTPException: 401 when neither a valid session nor a valid token
            is present
    """
    session = get_session(request)
    if session:
        return session

    bearer = get_bearer_auth(request)
    if bearer:
        return bearer

    raise HTTPException(status_code=401, detail="Authentication required")


def require_session(request: Request) -> Dict[str, Any]:
    """Dependency for routes requiring session cookie specifically (browser UI).

    Usage:
        @router.get("/admin/dashboard")
        async def dashboard(request: Request, user=Depends(require_session)):
            return {"message": f"Hello {user['user_id']}"}

    Raises:
        HTTPException: 401 when there is no valid session cookie
    """
    session = get_session(request)
    if not session:
        raise HTTPException(status_code=401, detail="Session required")
    return session


def is_authenticated(request: Request) -> bool:
    """Check if request has valid session (for templates)."""
    return get_session(request) is not None


def get_current_user(request: Request) -> Optional[Dict[str, Any]]:
    """Get current user from request state or re-check auth.

    Prefers the user stored on ``request.state`` (set by
    SessionAuthMiddleware); otherwise validates the session cookie and then
    the token headers directly.
    """
    user = getattr(request.state, "user", None)
    if user:
        return user
    return get_session(request) or get_bearer_auth(request)