"""Unified authentication endpoints. Handles login/logout for all admin routes with session cookies. Browser consumers get session cookies, API consumers get Bearer tokens. """ import os import json import logging from typing import Optional from fastapi import APIRouter, Request, HTTPException from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from pydantic import BaseModel from pathlib import Path from shared.session_auth import ( create_session, set_session_cookie, clear_session, get_session, SESSION_COOKIE_NAME, MAX_AGE_DAYS ) logger = logging.getLogger(__name__) auth_router = APIRouter(tags=["auth"]) # In a real app, use bcrypt + database. For now, env-configured. # Format: comma-separated "username:password:email:role1,role2" _USERS_ENV = os.getenv("HOFFDESK_USERS", "") def _get_users() -> dict: """Parse users from environment.""" users = {} if not _USERS_ENV: # Fallback: single admin from token token = os.getenv("BLOG_ADMIN_TOKEN") or os.getenv("ADMIN_TOKEN") or "changeme" users["matt"] = { "password": token, "email": "matt@hoffdesk.local", "roles": ["admin"], } return users for user_spec in _USERS_ENV.split(","): parts = user_spec.strip().split(":") if len(parts) >= 2: username = parts[0] password = parts[1] email = parts[2] if len(parts) > 2 else f"{username}@hoffdesk.local" roles = parts[3].split(";") if len(parts) > 3 else ["admin"] users[username] = { "password": password, "email": email, "roles": roles, } return users class LoginRequest(BaseModel): """Login request body.""" username: str password: str class LoginResponse(BaseModel): """Login response.""" success: bool user_id: Optional[str] = None message: Optional[str] = None # --- Login Page --- # Path to shared templates LOGIN_TEMPLATE_DIR = Path("/home/hoffmann_admin/.openclaw/shared/project-docs/dashboard/templates") @auth_router.get("/login", response_class=HTMLResponse) async def login_page(request: Request, redirect: Optional[str] = None, error: Optional[str] = None): """Show unified login page.""" # Already logged in? if get_session(request): session = get_session(request) roles = session.get("roles", ["admin"]) if "family" in roles and "admin" not in roles: return RedirectResponse(url=redirect or "/", status_code=302) return RedirectResponse(url=redirect or "/admin/blog/", status_code=302) # Serve our styled template instead of the inline version login_path = LOGIN_TEMPLATE_DIR / "family_login.html" if login_path.exists(): html_content = login_path.read_text() return HTMLResponse(content=html_content) # Fallback: inline login page return HTMLResponse(content=f""" Login | HoffDesk

HoffDesk

Admin Login

{f'\u003cdiv class="error"\u003e{error}\u003c/div\u003e' if error else ''}
Sign In
""") # --- Login Handler --- @auth_router.post("/login") async def login(request: Request): """Handle login submission. Returns JSON for AJAX, sets session cookie.""" import logging logger = logging.getLogger(__name__) logger.info(f"Login POST received: path={request.url.path}, host={request.headers.get('host')}") # Parse request body (JSON or form) try: body = await request.body() content_type = request.headers.get('content-type', '') if 'application/json' in content_type: data = json.loads(body) if body else {} else: # Form data parsing from urllib.parse import parse_qs data = parse_qs(body.decode('utf-8')) if body else {} data = {k: v[0] if len(v) == 1 else v for k, v in data.items()} except Exception: data = {} username = data.get("username", "").strip().lower() password = data.get("password", "") redirect_url = data.get("redirect", "/admin/blog/") # Validate credentials users = _get_users() user = users.get(username) if not user or user["password"] != password: logger.warning(f"Failed login attempt for user: {username}") return JSONResponse( status_code=401, content=LoginResponse( success=False, message="Invalid username or password" ).model_dump() ) # Determine redirect based on role if "family" in user.get("roles", []) and "admin" not in user.get("roles", []): redirect_url = redirect_url or "/" # Create session session_token = create_session( user_id=username, user_email=user["email"], roles=user["roles"] ) logger.info(f"Successful login: {username}") # Build response with cookie response = JSONResponse( content=LoginResponse( success=True, user_id=username, message="Login successful" ).model_dump() ) set_session_cookie(response, session_token) # Also set redirect in response for client-side handling response.headers["X-Redirect"] = redirect_url return response # --- Logout --- @auth_router.post("/logout") async def logout(request: Request): """Clear session cookie (logout).""" response = JSONResponse( content={"success": True, "message": "Logged out"} ) clear_session(response) return response @auth_router.get("/logout") async def logout_get(request: Request): """GET logout (for links/buttons without JS).""" response = RedirectResponse(url="/login", status_code=302) clear_session(response) return response # --- Session Check --- @auth_router.get("/me") async def current_user(request: Request): """Get current authenticated user info.""" session = get_session(request) if session: return { "authenticated": True, "user_id": session["user_id"], "user_email": session.get("user_email"), "roles": session.get("roles", ["admin"]), "expires_in_days": MAX_AGE_DAYS, } # Check Bearer token from shared.session_auth import get_bearer_auth bearer = get_bearer_auth(request) if bearer: return { "authenticated": True, "user_id": bearer["user_id"], "roles": bearer.get("roles", ["api"]), "auth_method": "bearer", } return JSONResponse( status_code=401, content={"authenticated": False} )