"""Unified authentication endpoints.
Handles login/logout for all admin routes with session cookies.
Browser consumers get session cookies, API consumers get Bearer tokens.
"""
import os
import json
import logging
from typing import Optional
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel
from pathlib import Path
from shared.session_auth import (
create_session, set_session_cookie, clear_session,
get_session, SESSION_COOKIE_NAME, MAX_AGE_DAYS
)
# Module-level logger, named after this module so its records can be
# filtered per module. (`name` alone is an undefined identifier.)
logger = logging.getLogger(__name__)
# Router that the login, logout and session-check endpoints in this module
# register themselves on; every route is tagged "auth".
auth_router = APIRouter(tags=["auth"])
# In a real app, use bcrypt + database. For now, env-configured.
# Format: comma-separated "username:password:email:role1;role2"
# (users are separated by commas, so roles are separated by semicolons).
# Raw user list, captured once at import time. An empty value makes
# _get_users() fall back to a single token-based admin account.
_USERS_ENV = os.getenv("HOFFDESK_USERS", "")
def _get_users() -> dict:
    """Parse the configured users from the environment.

    Returns:
        A dict mapping lower-cased username to a record with the keys
        ``"password"``, ``"email"`` and ``"roles"`` (a list of strings).

    When ``_USERS_ENV`` is empty, a single ``"matt"`` admin is returned whose
    password is ``BLOG_ADMIN_TOKEN``, then ``ADMIN_TOKEN``, then the literal
    ``"changeme"``.

    Otherwise ``_USERS_ENV`` is read as comma-separated entries of the form
    ``username:password[:email[:role1;role2]]``. Entries with fewer than two
    fields or an empty username are skipped.
    """
    users = {}

    if not _USERS_ENV:
        # Fallback: single admin from token.
        # SECURITY: with neither token variable set, the password is the
        # well-known default "changeme"; deployments should always set one.
        token = os.getenv("BLOG_ADMIN_TOKEN") or os.getenv("ADMIN_TOKEN") or "changeme"
        users["matt"] = {
            "password": token,
            "email": "matt@hoffdesk.local",
            "roles": ["admin"],
        }
        return users

    for user_spec in _USERS_ENV.split(","):
        parts = user_spec.strip().split(":")
        if len(parts) < 2:
            continue

        # The login handler lower-cases the submitted username before the
        # lookup, so keys must be stored lower-cased to be reachable.
        username = parts[0].strip().lower()
        if not username:
            # An entry such as ":secret" would otherwise create a nameless account.
            continue

        password = parts[1]

        # A present-but-empty email field falls back to the default address.
        email = parts[2].strip() if len(parts) > 2 else ""
        if not email:
            email = f"{username}@hoffdesk.local"

        if len(parts) > 3:
            # Roles are ';'-separated because ',' already separates users.
            roles = [role.strip() for role in parts[3].split(";") if role.strip()]
        else:
            roles = ["admin"]

        users[username] = {
            "password": password,
            "email": email,
            "roles": roles,
        }

    return users
class LoginRequest(BaseModel):
    """Login request body: a username/password credential pair.

    NOTE(review): the POST /login handler in this file parses the raw request
    body itself rather than binding this model -- confirm whether anything
    else still uses it.
    """

    # Account name supplied by the client.
    username: str
    # Password supplied by the client.
    password: str
class LoginResponse(BaseModel):
    """Login response: the JSON payload the login handler returns.

    The handler serialises instances with ``model_dump()`` for both the
    401 (failure) and the success response.
    """

    # True when the credentials were accepted, False otherwise.
    success: bool
    # Username of the authenticated account; stays None on failure.
    user_id: Optional[str] = None
    # Human-readable outcome text (e.g. "Login successful").
    message: Optional[str] = None
# --- Login Page ---
# Path to shared templates
# Absolute, host-specific directory holding the styled login template
# ("family_login.html"). login_page() serves a minimal inline page instead
# when that file does not exist.
LOGIN_TEMPLATE_DIR = Path("/home/hoffmann_admin/.openclaw/shared/project-docs/dashboard/templates")
@auth_router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, redirect: Optional[str] = None, error: Optional[str] = None):
    """Show unified login page.

    Args:
        request: Incoming request; used to look up an existing session.
        redirect: Optional path to send an already-authenticated user to.
            Only same-site absolute paths are honoured; anything else is
            ignored in favour of the role-based default.
        error: Optional message shown on the inline fallback page. It is
            HTML-escaped before being rendered.

    Returns:
        A 302 redirect when a session already exists ("/" for family-only
        users, "/admin/blog/" otherwise), else the styled template if it is
        present on disk, else a minimal inline page.
    """
    from html import escape  # function-scope import, as elsewhere in this file

    # `redirect` comes straight from the query string. Accept it only when it
    # is a plain local path: "//host" and "/\host" are treated by browsers as
    # other origins, and control characters can be used to smuggle those forms.
    redirect_is_local = (
        bool(redirect)
        and redirect.startswith("/")
        and not redirect.startswith("//")
        and "\\" not in redirect
        and redirect.isprintable()
    )
    if not redirect_is_local:
        redirect = None

    # Already logged in? (single session lookup)
    session = get_session(request)
    if session:
        roles = session.get("roles", ["admin"])
        if "family" in roles and "admin" not in roles:
            return RedirectResponse(url=redirect or "/", status_code=302)
        return RedirectResponse(url=redirect or "/admin/blog/", status_code=302)

    # Serve our styled template instead of the inline version
    login_path = LOGIN_TEMPLATE_DIR / "family_login.html"
    if login_path.exists():
        return HTMLResponse(content=login_path.read_text())

    # Fallback: inline login page. The error text is user-controlled, so it is
    # escaped; it is built outside the page f-string so no backslash escapes
    # appear inside an f-string expression (a SyntaxError before Python 3.12).
    error_html = f'<div class="error">{escape(error)}</div>' if error else ""
    return HTMLResponse(content=f"""<!DOCTYPE html>
HoffDesk
Admin Login
{error_html}""")
# --- Login Handler ---
def _parse_login_payload(body: bytes, content_type: str) -> dict:
    """Decode a login request body into a flat ``{field: str}`` mapping.

    JSON bodies must be an object; non-string values are dropped. Any other
    content type is parsed as URL-encoded form data, keeping the first value
    of a repeated field. Empty or undecodable bodies yield ``{}``.
    """
    from urllib.parse import parse_qs

    if not body:
        return {}
    try:
        if "application/json" in content_type:
            parsed = json.loads(body)
            if not isinstance(parsed, dict):
                return {}
            return {key: value for key, value in parsed.items() if isinstance(value, str)}
        # parse_qs never returns empty lists, so values[0] is always present.
        return {key: values[0] for key, values in parse_qs(body.decode("utf-8")).items()}
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both ValueError).
        return {}


@auth_router.post("/login")
async def login(request: Request):
    """Handle login submission. Returns JSON for AJAX, sets session cookie.

    Reads ``username``, ``password`` and an optional ``redirect`` from a JSON
    or form-encoded body.

    Returns:
        401 with a ``LoginResponse`` payload when the credentials do not
        match. Otherwise a success payload with the session cookie set and an
        ``X-Redirect`` header naming where the client should navigate next.
    """
    import hmac  # function-scope import, as elsewhere in this file

    logger = logging.getLogger(__name__)
    logger.info(
        "Login POST received: path=%s, host=%s",
        request.url.path,
        request.headers.get("host"),
    )

    # Parse request body (JSON or form); malformed input becomes an empty dict
    body = await request.body()
    data = _parse_login_payload(body, request.headers.get("content-type", ""))

    username = data.get("username", "").strip().lower()
    password = data.get("password", "")
    redirect_url = data.get("redirect", "")

    # Validate credentials
    users = _get_users()
    user = users.get(username)

    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Bytes are compared because compare_digest rejects
    # non-ASCII str; surrogatepass keeps odd input from raising on encode.
    stored_password = user["password"] if user else ""
    password_ok = hmac.compare_digest(
        password.encode("utf-8", "surrogatepass"),
        stored_password.encode("utf-8", "surrogatepass"),
    )
    if user is None or not password_ok:
        logger.warning("Failed login attempt for user: %s", username)
        return JSONResponse(
            status_code=401,
            content=LoginResponse(
                success=False,
                message="Invalid username or password"
            ).model_dump()
        )

    # Determine redirect based on role. The default is chosen only after the
    # role is known, so family-only users land on "/" rather than the admin
    # area when the client sent no redirect.
    roles = user.get("roles", [])
    family_only = "family" in roles and "admin" not in roles
    default_redirect = "/" if family_only else "/admin/blog/"

    # Honour a client-supplied redirect only when it is a plain same-site
    # path that is also safe to place in a response header.
    redirect_is_local = (
        redirect_url.startswith("/")
        and not redirect_url.startswith("//")
        and "\\" not in redirect_url
        and redirect_url.isascii()
        and redirect_url.isprintable()
    )
    if not redirect_is_local:
        redirect_url = default_redirect

    # Create session
    session_token = create_session(
        user_id=username,
        user_email=user["email"],
        roles=user["roles"]
    )
    logger.info("Successful login: %s", username)

    # Build response with cookie
    response = JSONResponse(
        content=LoginResponse(
            success=True,
            user_id=username,
            message="Login successful"
        ).model_dump()
    )
    set_session_cookie(response, session_token)

    # Also set redirect in response for client-side handling
    response.headers["X-Redirect"] = redirect_url
    return response
# --- Logout ---
@auth_router.post("/logout")
async def logout(request: Request):
    """Log the caller out: return a JSON acknowledgement with the session cleared."""
    payload = {"success": True, "message": "Logged out"}
    reply = JSONResponse(content=payload)
    # clear_session() works on the outgoing response.
    clear_session(reply)
    return reply
@auth_router.get("/logout")
async def logout_get(request: Request):
    """Link-friendly logout: clear the session and bounce the browser to /login."""
    back_to_login = RedirectResponse(url="/login", status_code=302)
    # clear_session() works on the outgoing redirect response.
    clear_session(back_to_login)
    return back_to_login
# --- Session Check ---
@auth_router.get("/me")
async def current_user(request: Request):
    """Report who the caller is authenticated as.

    A cookie session is checked first, then Bearer credentials. If neither
    is present the response is a 401 with ``{"authenticated": False}``.
    """
    cookie_session = get_session(request)
    if cookie_session:
        session_info = {
            "authenticated": True,
            "user_id": cookie_session["user_id"],
            "user_email": cookie_session.get("user_email"),
            "roles": cookie_session.get("roles", ["admin"]),
            "expires_in_days": MAX_AGE_DAYS,
        }
        return session_info

    # Bearer support is imported only once the cookie path has not matched.
    from shared.session_auth import get_bearer_auth

    token_identity = get_bearer_auth(request)
    if not token_identity:
        return JSONResponse(
            status_code=401,
            content={"authenticated": False}
        )

    bearer_info = {
        "authenticated": True,
        "user_id": token_identity["user_id"],
        "roles": token_identity.get("roles", ["api"]),
        "auth_method": "bearer",
    }
    return bearer_info