"""
Cases API Endpoints
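GET /api/v1/cases/{case_id} - Returns CaseSchema
GET /api/v1/cases/athlete/{athlete_id} - Returns CaseSchema array
GET /api/v1/cases/{case_id}/timeline - Returns EventSchema array
GET /api/v1/cases/{case_id}/milestones - Returns MilestoneSchema array
POST /api/v1/cases - Creates a case, returns CaseSchema (AT role only)
PATCH /api/v1/cases/{case_id} - Updates a case, returns CaseSchema (AT role only)
DELETE /api/v1/cases/{case_id} - Deletes a case, returns 204 No Content (AT role only)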
"""
from typing import List, Optional
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query, status, Request
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from app.database import get_db
from app.models import Athlete, Case, Event, Milestone, School, User
from app.schemas import (
CaseSchema, EventSchema, MilestoneSchema, EventContent,
CaseCreate, CaseUpdate
)
from app.utils.id_generator import generate_id
from app.auth import get_current_user, require_role
# Router that collects the case endpoints declared in this module. Paths on the
# decorators below are relative to wherever the application mounts this router.
router = APIRouter(tags=["cases"])
# ============== VALIDATION HELPERS ==============
# Allowed values for a case's severity; checked when a case is created.
VALID_SEVERITY = {"mild", "moderate", "severe"}
# Allowed values for a case's status; checked when a case is updated.
VALID_STATUS = {"active", "resolved"}
# Allowed values for a case's attention level; checked on create and update.
VALID_ATTENTION_LEVELS = {"urgent", "warning", "stable"}
def validate_enum(value: str, valid_set: set, field_name: str) -> None:
    """Reject a value that is not a member of an allowed set.

    Args:
        value: The value supplied by the client.
        valid_set: The values accepted for the field.
        field_name: Name of the field, used only in the error message.

    Raises:
        HTTPException: 400 Bad Request when ``value`` is not in ``valid_set``.
    """
    if value in valid_set:
        return
    # A set has no defined iteration order, so interpolating it directly makes
    # the error text vary between processes. Sort the options so clients and
    # logs always see the same message for the same mistake.
    allowed = ", ".join(f"'{option}'" for option in sorted(str(v) for v in valid_set))
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=f"Invalid {field_name}: '{value}'. Must be one of: {allowed}",
    )
# ============== FERPA FILTERING HELPERS ==============
def strip_clinical_notes(content: Optional[dict]) -> dict:
    """Return a copy of event content without the ``clinical_notes`` key.

    The input mapping is never modified. Only the top-level
    ``clinical_notes`` key is removed; nested structures are copied by
    reference and are not inspected.

    Args:
        content: Event content as a dict. ``None`` or an empty dict is treated
            as "no content".

    Returns:
        A new dict with every key of ``content`` except ``clinical_notes``;
        an empty dict when ``content`` is ``None`` or empty.
    """
    # Events may carry no content at all; dict(None) would raise TypeError.
    if not content:
        return {}
    filtered = dict(content)
    filtered.pop("clinical_notes", None)
    return filtered
def filter_events_by_visibility(events: List[Event], role: str) -> List[Event]:
    """Keep only the events that the given role is allowed to see.

    Args:
        events: Events to filter; each is read through its ``visibility``
            attribute.
        role: Role of the requesting user.

    Returns:
        The same list object, untouched, when ``role`` is ``"at"``.
        Otherwise a new list with the events whose ``visibility`` contains
        ``role``, in their original order. A missing or empty ``visibility``
        hides the event from every non-AT role.
    """
    if role != "at":
        return [entry for entry in events if role in (entry.visibility or [])]
    # The AT role is not subject to visibility filtering.
    return events
def strip_clinical_notes_from_events(events: List[Event], role: str) -> List[Event]:
    """Return events with ``clinical_notes`` removed from dict content for non-AT roles.

    Args:
        events: Events to process; each must expose ``__dict__`` and a
            ``content`` attribute.
        role: Role of the requesting user.

    Returns:
        The same list object when ``role`` is ``"at"``. Otherwise a new list
        of stand-in objects, one per event. The stand-ins are plain objects,
        not ``Event`` instances, and carry a shallow copy of the source
        event's ``__dict__``. Content is redacted only when it is a non-empty
        dict; any other content value is carried over unchanged.
    """
    if role == "at":
        return events

    def _redacted_clone(source):
        # Build a throwaway object and hand it a shallow copy of the source's
        # attributes, so that replacing ``content`` never touches the original.
        # NOTE(review): for ORM-mapped events the copied __dict__ presumably
        # also carries ORM bookkeeping entries - confirm this is acceptable.
        clone = type('obj', (object,), {})()
        clone.__dict__ = source.__dict__.copy()
        if source.content and isinstance(source.content, dict):
            clone.content = strip_clinical_notes(source.content)
        return clone

    return [_redacted_clone(item) for item in events]
@router.get("/{case_id}", response_model=CaseSchema)
async def get_case_by_id(
    request: Request,
    case_id: str,
    school_id: str = Query(..., description="School identifier for tenant isolation"),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Fetch a single case by its identifier.

    Access rules:
    - The caller's school must equal ``school_id``.
    - Parent: allowed only when the caller's id is listed in the athlete's
      ``parent_ids``.
    - Coach: ``severity`` and ``attention_level`` are overwritten with the
      placeholders "moderate" and "stable" before the case is returned.
    - Any other role receives the case unchanged.

    Raises:
        HTTPException: 403 when the school does not match the caller's school
            or a parent requests a case of an athlete they are not linked to;
            404 when the school or the case does not exist.
    """
    # Tenant isolation: callers may only read data of their own school.
    if current_user.school_id != school_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied for this school",
        )

    known_school = db.query(School).filter(School.id == school_id).first()
    if not known_school:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"School '{school_id}' not found",
        )

    # The lookup is scoped to the school, so a case of another tenant is
    # reported as "not found".
    case = db.query(Case).filter(
        Case.id == case_id,
        Case.school_id == school_id,
    ).first()
    if not case:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Case '{case_id}' not found",
        )

    caller_role = current_user.role

    if caller_role == "parent":
        child = db.query(Athlete).filter(Athlete.id == case.athlete_id).first()
        is_linked = bool(child) and current_user.id in (child.parent_ids or [])
        if not is_linked:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Parents can only view their own child's cases",
            )

    payload = CaseSchema.model_validate(case)
    if caller_role != "coach":
        return payload

    # Coaches must not learn the real values, so both fields are replaced
    # with fixed placeholders rather than dropped from the response.
    # NOTE(review): coaches can fetch resolved cases here, while the
    # per-athlete listing limits coaches to active cases - confirm intent.
    masked = payload.model_dump()
    masked["severity"] = "moderate"
    masked["attention_level"] = "stable"
    return CaseSchema(**masked)
@router.get("/athlete/{athlete_id}", response_model=List[CaseSchema])
async def get_athlete_cases(
    request: Request,
    athlete_id: str,
    school_id: str = Query(..., description="School identifier for tenant isolation"),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    List an athlete's cases, newest ``opened_at`` first.

    Role-based behavior:
    - Coach: only cases with status "active"; ``severity`` and
      ``attention_level`` are overwritten with the placeholders "moderate"
      and "stable".
    - Parent: allowed only when the caller's id is in the athlete's
      ``parent_ids``.
    - Any other role receives every case unchanged.

    Raises:
        HTTPException: 403 when the school does not match the caller's school
            or a parent requests an athlete they are not linked to; 404 when
            the school or the athlete does not exist.
    """
    # Tenant isolation: callers may only read data of their own school.
    if current_user.school_id != school_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied for this school",
        )

    known_school = db.query(School).filter(School.id == school_id).first()
    if not known_school:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"School '{school_id}' not found",
        )

    athlete = db.query(Athlete).filter(
        Athlete.id == athlete_id,
        Athlete.school_id == school_id,
    ).first()
    if not athlete:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Athlete '{athlete_id}' not found",
        )

    caller_role = current_user.role

    if caller_role == "parent" and current_user.id not in (athlete.parent_ids or []):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Parents can only view their own child's cases",
        )

    is_coach = caller_role == "coach"

    case_query = db.query(Case).filter(
        Case.athlete_id == athlete_id,
        Case.school_id == school_id,
    )
    if is_coach:
        # Coaches never see resolved cases.
        case_query = case_query.filter(Case.status == "active")
    rows = case_query.order_by(Case.opened_at.desc()).all()

    def _present(row):
        """Serialize one case, masking sensitive fields for coaches."""
        schema = CaseSchema.model_validate(row)
        if not is_coach:
            return schema
        # Placeholders stand in for the real values; the fields stay present.
        fields = schema.model_dump()
        fields["severity"] = "moderate"
        fields["attention_level"] = "stable"
        return CaseSchema(**fields)

    return [_present(row) for row in rows]
@router.get("/{case_id}/timeline", response_model=List[EventSchema])
async def get_case_timeline(
    request: Request,
    case_id: str,
    school_id: str = Query(..., description="School identifier for tenant isolation"),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Get the event timeline for a case, newest ``timestamp`` first.

    FERPA filter:
    - Non-AT roles only receive events whose ``visibility`` lists their role.
    - Non-AT roles have the ``clinical_notes`` key stripped from event content.
    - Events that carry no content are returned as-is; there is nothing to
      redact, and no placeholder content object is invented for them.

    Parents may only read the timeline of a case whose athlete lists them in
    ``parent_ids``.

    Raises:
        HTTPException: 403 when the school does not match the caller's school
            or a parent requests a case of an athlete they are not linked to;
            404 when the school or the case does not exist.
    """
    role = current_user.role
    user_id = current_user.id

    # Tenant isolation: callers may only read data of their own school.
    if current_user.school_id != school_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied for this school",
        )

    school = db.query(School).filter(School.id == school_id).first()
    if not school:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"School '{school_id}' not found",
        )

    case = db.query(Case).filter(
        and_(Case.id == case_id, Case.school_id == school_id)
    ).first()
    if not case:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Case '{case_id}' not found",
        )

    if role == "parent":
        athlete = db.query(Athlete).filter(Athlete.id == case.athlete_id).first()
        if not athlete or user_id not in (athlete.parent_ids or []):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Parents can only view their own child's cases",
            )

    events = db.query(Event).filter(
        and_(Event.case_id == case_id, Event.school_id == school_id)
    ).order_by(Event.timestamp.desc()).all()

    # Drop events the caller's role is not allowed to see at all.
    events = filter_events_by_visibility(events, role)

    result = []
    for event in events:
        event_data = EventSchema.model_validate(event)
        # Redact only when there is content. Rebuilding EventContent from an
        # empty dict would fabricate a content object for an event that has
        # none, or fail validation if EventContent has required fields.
        if role != "at" and event_data.content is not None:
            content_dict = strip_clinical_notes(event_data.content.model_dump())
            event_data.content = EventContent(**content_dict)
        result.append(event_data)
    return result
@router.get("/{case_id}/milestones", response_model=List[MilestoneSchema])
async def get_case_milestones(
    request: Request,
    case_id: str,
    school_id: str = Query(..., description="School identifier for tenant isolation"),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    List the milestones of a case, ordered by ``target_date``.

    Milestones are readable by every role and no field is filtered. The only
    role-specific rule is that a parent must be listed in the ``parent_ids``
    of the case's athlete.

    Raises:
        HTTPException: 403 when the school does not match the caller's school
            or a parent requests a case of an athlete they are not linked to;
            404 when the school or the case does not exist.
    """
    # Tenant isolation: callers may only read data of their own school.
    if current_user.school_id != school_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied for this school",
        )

    known_school = db.query(School).filter(School.id == school_id).first()
    if not known_school:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"School '{school_id}' not found",
        )

    case = db.query(Case).filter(
        Case.id == case_id,
        Case.school_id == school_id,
    ).first()
    if not case:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Case '{case_id}' not found",
        )

    if current_user.role == "parent":
        child = db.query(Athlete).filter(Athlete.id == case.athlete_id).first()
        is_linked = bool(child) and current_user.id in (child.parent_ids or [])
        if not is_linked:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Parents can only view their own child's cases",
            )

    rows = (
        db.query(Milestone)
        .filter(Milestone.case_id == case_id, Milestone.school_id == school_id)
        .order_by(Milestone.target_date)
        .all()
    )
    return list(map(MilestoneSchema.model_validate, rows))
@router.post("", response_model=CaseSchema, status_code=status.HTTP_201_CREATED)
async def create_case(
    request: Request,
    case_data: CaseCreate,
    current_user: User = Depends(require_role(["at"])),
    db: Session = Depends(get_db),
):
    """
    Create a new case for an athlete. Restricted to the AT role.

    The new case always starts with status "active". If the athlete's most
    recently resolved case in the same school was resolved within the last
    30 days, and the new title is contained (case-insensitively) in that
    case's title, the new case is linked to it through
    ``reopened_from_case_id`` / ``reopened_at``.

    The case row and the athlete's ``active_case_ids`` are written in a single
    commit so the two cannot drift apart if the request fails midway.

    Raises:
        HTTPException: 403 when ``case_data.school_id`` is not the caller's
            school; 404 when the school or the athlete does not exist; 400
            when ``severity`` or ``attention_level`` is not an allowed value.
    """
    # Tenant isolation: an AT may only create cases in their own school.
    if current_user.school_id != case_data.school_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot create case for different school",
        )

    school = db.query(School).filter(School.id == case_data.school_id).first()
    if not school:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"School '{case_data.school_id}' not found",
        )

    athlete = db.query(Athlete).filter(
        and_(Athlete.id == case_data.athlete_id, Athlete.school_id == case_data.school_id)
    ).first()
    if not athlete:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Athlete '{case_data.athlete_id}' not found",
        )

    validate_enum(case_data.severity, VALID_SEVERITY, "severity")
    validate_enum(case_data.attention_level, VALID_ATTENTION_LEVELS, "attention_level")

    # One timestamp for the whole request, so opened_at / created_at /
    # updated_at / reopened_at agree exactly on the new record.
    now = datetime.utcnow()

    # Most recently resolved case of this athlete within the 30-day reopening
    # window. Scoped to the school like every other query in this module.
    recent_case = db.query(Case).filter(
        and_(
            Case.athlete_id == case_data.athlete_id,
            Case.school_id == case_data.school_id,
            Case.status == "resolved",
            Case.resolved_at >= now - timedelta(days=30),
        )
    ).order_by(Case.resolved_at.desc()).first()

    reopened_from_case_id = None
    reopened_at = None
    # A blank title must not count as a match: "" is a substring of anything.
    new_title = case_data.title.strip().lower()
    if recent_case and new_title and new_title in (recent_case.title or "").lower():
        # Title overlap is the heuristic for "same injury": treat as a reopen.
        reopened_from_case_id = recent_case.id
        reopened_at = now

    case = Case(
        id=generate_id("cas_"),
        school_id=case_data.school_id,
        athlete_id=case_data.athlete_id,
        title=case_data.title,
        severity=case_data.severity,
        attention_level=case_data.attention_level,
        status="active",
        opened_at=now,
        reopened_from_case_id=reopened_from_case_id,
        reopened_at=reopened_at,
        primary_at_id=case_data.primary_at_id,
        created_at=now,
        updated_at=now,
    )
    db.add(case)

    # Register the case on the athlete before committing so both changes land
    # in the same transaction. A new list is assigned rather than appending in
    # place - presumably so the ORM notices the change; keep it that way.
    current_ids = athlete.active_case_ids or []
    if case.id not in current_ids:
        athlete.active_case_ids = current_ids + [case.id]

    db.commit()
    db.refresh(case)
    return CaseSchema.model_validate(case)
@router.patch("/{case_id}", response_model=CaseSchema)
async def update_case(
    request: Request,
    case_id: str,
    case_data: CaseUpdate,
    school_id: str = Query(..., description="School identifier"),
    current_user: User = Depends(require_role(["at"])),
    db: Session = Depends(get_db),
):
    """
    Partially update an existing case. Restricted to the AT role.

    Only fields that are not ``None`` in ``case_data`` are applied: ``title``,
    ``attention_level``, ``status`` and ``primary_at_id``. All enum fields are
    validated before anything is modified.

    Status changes keep the athlete's ``active_case_ids`` in sync:
    - "resolved" removes the case id from the list and stamps ``resolved_at``,
      unless the case was already resolved with a timestamp. Re-sending the
      same status therefore does not overwrite the original resolution time.
    - "active" adds the case id back to the list if it is missing.

    Raises:
        HTTPException: 403 when ``school_id`` is not the caller's school; 404
            when the case does not exist in that school; 400 when
            ``attention_level`` or ``status`` is not an allowed value.
    """
    # Tenant isolation: an AT may only modify cases in their own school.
    if current_user.school_id != school_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot update case in different school",
        )

    case = db.query(Case).filter(
        and_(Case.id == case_id, Case.school_id == school_id)
    ).first()
    if not case:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Case '{case_id}' not found",
        )

    # Validate everything up front so a rejected request never leaves the
    # loaded case half-modified in the session.
    if case_data.attention_level is not None:
        validate_enum(case_data.attention_level, VALID_ATTENTION_LEVELS, "attention_level")
    if case_data.status is not None:
        validate_enum(case_data.status, VALID_STATUS, "status")

    now = datetime.utcnow()

    if case_data.title is not None:
        case.title = case_data.title

    if case_data.attention_level is not None:
        case.attention_level = case_data.attention_level

    if case_data.status is not None:
        already_resolved = case.status == "resolved" and case.resolved_at is not None
        case.status = case_data.status

        if case_data.status == "resolved" and not already_resolved:
            case.resolved_at = now
        # NOTE(review): reactivating a case leaves resolved_at untouched;
        # confirm whether it should be cleared or reopened_at should be set.

        # Keep the athlete's list of active cases consistent with the status.
        athlete = db.query(Athlete).filter(Athlete.id == case.athlete_id).first()
        if athlete:
            active_ids = athlete.active_case_ids or []
            if case_data.status == "resolved":
                if case.id in active_ids:
                    athlete.active_case_ids = [cid for cid in active_ids if cid != case.id]
            elif case.id not in active_ids:
                # Status is "active" here (validated above): re-register it.
                athlete.active_case_ids = active_ids + [case.id]

    if case_data.primary_at_id is not None:
        case.primary_at_id = case_data.primary_at_id

    case.updated_at = now
    db.commit()
    db.refresh(case)
    return CaseSchema.model_validate(case)
@router.delete("/{case_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_case(
    request: Request,
    case_id: str,
    school_id: str = Query(..., description="School identifier"),
    current_user: User = Depends(require_role(["at"])),
    db: Session = Depends(get_db),
):
    """
    Delete a case. Restricted to the AT role.

    The case id is also removed from the owning athlete's ``active_case_ids``
    when present. Both changes are committed together.

    Raises:
        HTTPException: 403 when ``school_id`` is not the caller's school; 404
            when the case does not exist in that school.
    """
    # Tenant isolation: an AT may only delete cases in their own school.
    if current_user.school_id != school_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot delete case in different school",
        )

    case = db.query(Case).filter(
        Case.id == case_id,
        Case.school_id == school_id,
    ).first()
    if not case:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Case '{case_id}' not found",
        )

    # Unregister the case from its athlete so no dangling id is left behind.
    owner = db.query(Athlete).filter(Athlete.id == case.athlete_id).first()
    if owner:
        registered_ids = owner.active_case_ids or []
        if case.id in registered_ids:
            owner.active_case_ids = [cid for cid in registered_ids if cid != case.id]

    db.delete(case)
    db.commit()
    return None