""" JWT Authentication Module Token creation, validation, and FastAPI dependencies """ from datetime import datetime, timedelta from typing import Optional, List from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from app.config import settings from app.database import get_db from app.models import User # Password hashing context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT configuration SECRET_KEY = settings.secret_key ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes # Security scheme security = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: """Hash a plaintext password using bcrypt""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a plaintext password against a hash""" return pwd_context.verify(plain_password, hashed_password) def create_access_token( user_id: str, role: str, school_id: str, assigned_sports: List[str] = None, expires_delta: Optional[timedelta] = None ) -> str: """ Create a JWT access token Payload: { "sub": "usr_001", # user_id "role": "at", # role claim "school_id": "schl_001", # school claim "assigned_sports": ["Football"], # coach's sports "exp": 1719878400 # expiration timestamp } """ if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode = { "sub": user_id, "role": role, "school_id": school_id, "assigned_sports": assigned_sports or [], "exp": expire, "iat": datetime.utcnow(), # issued at "type": "access" } encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str) -> Optional[dict]: """ Verify and decode a JWT token Returns: dict with user_id, role, school_id, assigned_sports, exp claims None if token is invalid or expired """ try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # Extract claims user_id: str = payload.get("sub") role: str = payload.get("role") school_id: str = payload.get("school_id") assigned_sports: List[str] = payload.get("assigned_sports", []) token_type: str = payload.get("type") if user_id is None or role is None or school_id is None: return None if token_type != "access": return None return { "user_id": user_id, "role": role, "school_id": school_id, "assigned_sports": assigned_sports, "exp": payload.get("exp"), "iat": payload.get("iat") } except JWTError: return None async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), db: Session = Depends(get_db) ) -> User: """ FastAPI dependency: Extract and validate JWT, return User model Usage: current_user: User = Depends(get_current_user) """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) if not credentials: raise credentials_exception token = credentials.credentials payload = verify_token(token) if payload is None: raise credentials_exception # Get user from database user = db.query(User).filter(User.id == payload["user_id"]).first() if user is None: raise credentials_exception if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is inactive" ) return user def require_role(allowed_roles: List[str]): """ FastAPI dependency factory: Require specific role(s) Usage: @router.get("/admin-only") async def admin_endpoint( current_user: User = Depends(require_role(["admin"])) ): ... """ async def role_checker( current_user: User = Depends(get_current_user) ) -> User: if current_user.role not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Access denied. Required role: one of {allowed_roles}" ) return current_user return role_checker # Role-based shortcut dependencies require_at = require_role(["at"]) require_at_or_coach = require_role(["at", "coach"]) require_at_or_admin = require_role(["at", "admin"]) require_coach = require_role(["coach"]) require_ad = require_role(["ad"]) require_coach_or_ad = require_role(["coach", "ad"]) require_any_role = require_role(["at", "coach", "parent", "admin", "ad"]) def get_token_from_request(request: Request) -> Optional[str]: """Extract Bearer token from request headers (for manual extraction)""" auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): return auth_header[7:] return None