# auth.py  (5,597 bytes, Sunday 12:25 — raw view)

"""
JWT Authentication Module
Token creation, validation, and FastAPI dependencies
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, List

from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.config import settings
from app.database import get_db
from app.models import User

# Password hashing context

# Shared passlib context used by hash_password() and verify_password();
# bcrypt is the only scheme configured.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT configuration

# Signing key, read from application settings; used for both encoding and
# decoding tokens in this module.
SECRET_KEY = settings.secret_key
# Signing algorithm, fixed here rather than read from settings.
ALGORITHM = "HS256"
# Default token lifetime in minutes, applied by create_access_token() when the
# caller does not pass expires_delta.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes

# Security scheme

# Bearer-token scheme injected into get_current_user() via Depends(security).
# NOTE(review): auto_error=False presumably makes a missing header arrive as
# None so get_current_user() can raise its own 401 — confirm against the
# FastAPI HTTPBearer documentation.
security = HTTPBearer(auto_error=False)

def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``.

    Returns the result of ``pwd_context.verify`` for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

def create_access_token(
    user_id: str,
    role: str,
    school_id: str,
    assigned_sports: Optional[List[str]] = None,
    expires_delta: Optional[timedelta] = None,
) -> str:
    """
    Create a signed JWT access token.

    Args:
        user_id: Stored in the "sub" claim.
        role: Stored in the "role" claim.
        school_id: Stored in the "school_id" claim.
        assigned_sports: Stored in the "assigned_sports" claim; None or an
            empty list is encoded as [].
        expires_delta: Token lifetime. When None, the lifetime is
            ACCESS_TOKEN_EXPIRE_MINUTES minutes. An explicit zero timedelta is
            honoured (the token expires at its issue time) rather than being
            treated as "not supplied".

    Returns:
        The encoded JWT string, signed with SECRET_KEY using ALGORITHM.

    Payload:
    {
        "sub": "usr_001",      # user_id
        "role": "at",          # role claim
        "school_id": "schl_001",  # school claim
        "assigned_sports": ["Football"],  # coach's sports
        "exp": 1719878400,     # expiration timestamp
        "iat": 1719876600,     # issued-at timestamp
        "type": "access"       # checked by verify_token()
    }
    """
    # Read the clock once so "iat" and "exp" are derived from the same instant.
    # A timezone-aware UTC datetime replaces the deprecated, naive utcnow().
    issued_at = datetime.now(timezone.utc)

    # Compare against None explicitly: timedelta(0) is falsy but is still a
    # deliberate caller choice.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = {
        "sub": user_id,
        "role": role,
        "school_id": school_id,
        "assigned_sports": assigned_sports or [],
        "exp": issued_at + expires_delta,
        "iat": issued_at,  # issued at
        "type": "access",
    }

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str) -> Optional[dict]:
    """
    Decode a JWT and return its access-token claims.

    Returns:
        dict with user_id, role, school_id, assigned_sports, exp and iat
        None when decoding raises JWTError, when "sub", "role" or
        "school_id" is missing, or when "type" is not "access"
    """
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    subject = claims.get("sub")
    role_claim = claims.get("role")
    school_claim = claims.get("school_id")

    # Reject tokens lacking a mandatory claim or not marked as access tokens.
    mandatory = (subject, role_claim, school_claim)
    if any(value is None for value in mandatory) or claims.get("type") != "access":
        return None

    return {
        "user_id": subject,
        "role": role_claim,
        "school_id": school_claim,
        "assigned_sports": claims.get("assigned_sports", []),
        "exp": claims.get("exp"),
        "iat": claims.get("iat"),
    }

async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """
    FastAPI dependency: resolve the bearer token to an active User row.

    Raises:
        HTTPException 401 when credentials are absent, the token fails
            verify_token(), or no User has the token's user_id.
        HTTPException 403 when the matched user has a falsy is_active.

    Usage:
        current_user: User = Depends(get_current_user)
    """
    # No credentials and an unverifiable token are handled the same way.
    claims = verify_token(credentials.credentials) if credentials else None

    # Look the user up only when the token yielded claims.
    account = None
    if claims is not None:
        account = db.query(User).filter(User.id == claims["user_id"]).first()

    if account is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is inactive",
        )

    return account

def require_role(allowed_roles: List[str]):
    """
    FastAPI dependency factory: build a dependency that admits only users
    whose ``role`` is in ``allowed_roles``.

    The returned dependency resolves the user through get_current_user and
    raises HTTPException 403 when the role is not allowed; otherwise it
    returns the user.

    Usage:
        @router.get("/admin-only")
        async def admin_endpoint(
            current_user: User = Depends(require_role(["admin"]))
        ):
            ...
    """
    async def role_checker(
        current_user: User = Depends(get_current_user),
    ) -> User:
        # Happy path first: an allowed role passes straight through.
        if current_user.role in allowed_roles:
            return current_user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Access denied. Required role: one of {allowed_roles}",
        )

    return role_checker

# Role-based shortcut dependencies

# Pre-built dependencies created once at import time by require_role(); use
# them as Depends(require_at), Depends(require_coach), etc.
# Role strings appearing below: "at", "coach", "parent", "admin", "ad".
require_at = require_role(["at"])
require_at_or_coach = require_role(["at", "coach"])
require_at_or_admin = require_role(["at", "admin"])
require_coach = require_role(["coach"])
require_ad = require_role(["ad"])
require_coach_or_ad = require_role(["coach", "ad"])
# Admits every role listed in this module; the user must still pass get_current_user.
require_any_role = require_role(["at", "coach", "parent", "admin", "ad"])

def get_token_from_request(request: Request) -> Optional[str]:
    """Extract Bearer token from request headers (for manual extraction).

    Args:
        request: Object whose ``headers`` mapping is read for "Authorization".

    Returns:
        The token text following the "Bearer" scheme, or None when the header
        is absent, uses a different scheme, or carries no token.
    """
    auth_header = request.headers.get("Authorization", "")

    # Split "<scheme> <credentials>" at the first space.
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()

    # HTTP auth scheme names are case-insensitive, so "bearer" and "Bearer"
    # are both accepted; an empty token is reported as None, not "".
    if scheme.lower() != "bearer" or not token:
        return None
    return token