"""
JWT Authentication Module
Token creation, validation, and FastAPI dependencies
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, List

from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.config import settings
from app.database import get_db
from app.models import User
# Password hashing context
# Shared by hash_password() and verify_password(); bcrypt is the only enabled scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT configuration
# SECRET_KEY and ALGORITHM are used for both signing (jwt.encode) and
# verification (jwt.decode) in this module.
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
# Default token lifetime, applied when create_access_token() gets no expires_delta.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
# Security scheme
# auto_error=False leaves the "no credentials" case to get_current_user(),
# which raises its own 401 with a WWW-Authenticate header.
security = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
    """Return the hash of a plaintext password.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by the module-level ``pwd_context``
        (configured with the bcrypt scheme).
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check whether a plaintext password matches a stored hash.

    Args:
        plain_password: The candidate plaintext password.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(
    user_id: str,
    role: str,
    school_id: str,
    assigned_sports: Optional[List[str]] = None,
    expires_delta: Optional[timedelta] = None,
) -> str:
    """Create a signed JWT access token.

    Payload:
        {
            "sub": "usr_001",                  # user_id
            "role": "at",                      # role claim
            "school_id": "schl_001",           # school claim
            "assigned_sports": ["Football"],   # coach's sports ([] when not given)
            "exp": ...,                        # expiration time
            "iat": ...,                        # issued-at time
            "type": "access"                   # token type checked by verify_token()
        }

    Args:
        user_id: Identifier stored in the "sub" claim.
        role: Value of the "role" claim.
        school_id: Value of the "school_id" claim.
        assigned_sports: Sports stored in the "assigned_sports" claim;
            ``None`` or an empty list is encoded as ``[]``.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Read the clock once so "iat" and "exp" are derived from the same instant,
    # and use an aware UTC datetime instead of the deprecated datetime.utcnow().
    issued_at = datetime.now(timezone.utc)

    # Compare against None explicitly: timedelta(0) is falsy, but it is still
    # an explicit lifetime and must not silently fall back to the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = {
        "sub": user_id,
        "role": role,
        "school_id": school_id,
        "assigned_sports": assigned_sports or [],
        "exp": issued_at + expires_delta,
        "iat": issued_at,  # issued at
        "type": "access",
    }
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> Optional[dict]:
    """Decode a JWT and return its access-token claims.

    Args:
        token: The encoded JWT string.

    Returns:
        A dict with the keys ``user_id``, ``role``, ``school_id``,
        ``assigned_sports``, ``exp`` and ``iat`` when the token decodes
        successfully, carries ``sub``/``role``/``school_id`` claims and has
        ``type == "access"``. ``None`` when decoding raises ``JWTError`` or
        any of those checks fails.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    subject = claims.get("sub")
    role_claim = claims.get("role")
    school_claim = claims.get("school_id")

    # All three identity claims are mandatory.
    required = (subject, role_claim, school_claim)
    if any(value is None for value in required):
        return None

    # Only tokens explicitly marked as access tokens are accepted.
    if claims.get("type") != "access":
        return None

    return {
        "user_id": subject,
        "role": role_claim,
        "school_id": school_claim,
        "assigned_sports": claims.get("assigned_sports", []),
        "exp": claims.get("exp"),
        "iat": claims.get("iat"),
    }
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """FastAPI dependency: resolve the bearer token to an active ``User``.

    Usage:
        current_user: User = Depends(get_current_user)

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme,
            or ``None`` when the request carried none.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` row whose ``id`` equals the token's ``user_id`` claim.

    Raises:
        HTTPException: 401 when credentials are missing, the token fails
            ``verify_token``, or no matching user exists; 403 when the user
            is not active.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Missing credentials and an invalid token are reported identically.
    claims = verify_token(credentials.credentials) if credentials else None
    if claims is None:
        raise unauthorized

    # NOTE(review): this is a synchronous Session query inside an async
    # function — confirm whether blocking the event loop here is acceptable.
    matching_users = db.query(User).filter(User.id == claims["user_id"])
    user = matching_users.first()
    if user is None:
        raise unauthorized

    if user.is_active:
        return user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="User account is inactive",
    )
def require_role(allowed_roles: List[str]):
    """FastAPI dependency factory: restrict an endpoint to given roles.

    Usage:
        @router.get("/admin-only")
        async def admin_endpoint(
            current_user: User = Depends(require_role(["admin"]))
        ):
            ...

    Args:
        allowed_roles: Role values that are permitted to proceed.

    Returns:
        An async dependency that yields the authenticated ``User`` when
        ``user.role`` is in ``allowed_roles`` and raises a 403
        ``HTTPException`` otherwise.
    """

    async def role_checker(
        current_user: User = Depends(get_current_user),
    ) -> User:
        # Happy path first: the user's role is on the allow-list.
        if current_user.role in allowed_roles:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Access denied. Required role: one of {allowed_roles}",
        )

    return role_checker
# Role-based shortcut dependencies
# Each name is a ready-made dependency built by require_role(); use it as
# `Depends(require_at)` etc. instead of repeating the role list at every endpoint.
require_at = require_role(["at"])
require_at_or_coach = require_role(["at", "coach"])
require_at_or_admin = require_role(["at", "admin"])
require_coach = require_role(["coach"])
require_ad = require_role(["ad"])
require_coach_or_ad = require_role(["coach", "ad"])
require_any_role = require_role(["at", "coach", "parent", "admin", "ad"])
def get_token_from_request(request: Request) -> Optional[str]:
    """Extract the Bearer token from a request's Authorization header.

    Intended for manual extraction where the ``security`` dependency is not
    used.

    Args:
        request: Incoming request; only ``request.headers`` is read.

    Returns:
        The token with surrounding whitespace removed, or ``None`` when the
        header is absent, uses a different scheme, or carries no token.
    """
    auth_header = request.headers.get("Authorization", "")
    scheme, _, token = auth_header.partition(" ")

    # HTTP auth scheme names are case-insensitive, so "bearer" and "BEARER"
    # must be accepted just like "Bearer".
    if scheme.lower() != "bearer":
        return None

    # An empty credential ("Bearer " with nothing after it) is not a token.
    token = token.strip()
    return token or None