"""
FastAPI router for content generation API.
Endpoints:
- POST /admin/content/generate → Start generation, return job_id
- GET /content/jobs/{job_id} → Poll for status and results
- POST /content/jobs/{job_id}/cancel → Cancel running job
"""
import asyncio
import logging
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Request

from shared.session_auth import require_auth
from .models import (
GenerateRequest, GenerateResponse, JobDetailResponse,
CancelResponse, ContentType, PipelineStage
)
from .pipeline import get_orchestrator, check_gaming_pc_available
# Import v2 generation support; fall back gracefully if the v2 modules
# are not installed.
try:
    from ..blog.generation.prompts import build_struggle_first_prompt
    from ..blog.style.loader import list_style_examples
    V2_AVAILABLE = True
except ImportError:
    V2_AVAILABLE = False
    logging.getLogger(__name__).warning("V2 generation not available")
router = APIRouter(prefix="/content", tags=["content-generation"])

# Hold strong references to fire-and-forget background tasks so they are
# not garbage-collected mid-run (asyncio.create_task keeps only a weak ref).
_background_tasks: set = set()
@router.get("/health")
async def health_check():
"""
Check if Gaming PC Ollama is available.
Returns availability status for all models in the pipeline.
"""
pc_status = await check_gaming_pc_available()
return {
"gaming_pc": pc_status,
"models": {
"strategy": "llama3.1:8b",
"structure": "qwen2.5-coder:7b",
"draft": "phi4:14b",
"seo": "llama3.1:8b",
},
"pipeline_ready": pc_status["available"]
}
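# Example response shape (illustrative; the exact contents of "gaming_pc"
# depend on what check_gaming_pc_available() returns):
#
#     {
#         "gaming_pc": {"available": true, ...},
#         "models": {"strategy": "llama3.1:8b", ...},
#         "pipeline_ready": true
#     }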
@router.post("/generate", response_model=GenerateResponse)
async def start_generation(
req: Request,
request: GenerateRequest,
user: dict = Depends(require_auth),
):
"""
Start the tiered content generation pipeline.
Returns immediately with a job ID. Poll /jobs/{job_id} for progress.
"""
orchestrator = get_orchestrator()
job_id = orchestrator.create_job(
topic=request.topic,
content_type=request.content_type
)
    # Run the pipeline in the background; hold a reference so the task is
    # not garbage-collected before it completes.
    task = asyncio.create_task(
        orchestrator.run_pipeline(
            job_id=job_id,
            topic=request.topic,
            content_type=request.content_type,
            outline=request.outline,
            context=request.context
        )
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
return GenerateResponse(
job_id=job_id,
status=PipelineStage.QUEUED,
estimated_seconds=90
)
@router.get("/jobs/{job_id}", response_model=JobDetailResponse)
async def get_job_status(job_id: str):
"""
Get the current status of a generation job.
Poll this endpoint every 2-3 seconds for progress updates.
"""
orchestrator = get_orchestrator()
job = orchestrator.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
result = orchestrator.results.get(job_id)
return JobDetailResponse(
job_id=job.job_id,
status=job.status,
stage_number=job.stage_number,
total_stages=job.total_stages,
progress_percent=job.progress_percent,
current_stage_name=job.current_stage_name,
estimated_seconds_remaining=job.estimated_seconds_remaining,
started_at=job.started_at,
completed_at=job.completed_at,
error=job.error,
result=result,
partial_outputs=None # Could populate from orchestrator if needed
)
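# Example polling loop (sketch): the terminal status strings below are
# assumptions based on the PipelineStage checks in cancel_generation.
#
#     import time
#     import httpx
#
#     while True:
#         job = httpx.get(f"http://localhost:8000/content/jobs/{job_id}").json()
#         print(job["progress_percent"], job["current_stage_name"])
#         if job["status"] in ("completed", "failed", "cancelled"):
#             break
#         time.sleep(2)  # the docstring suggests polling every 2-3 seconds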
@router.post("/jobs/{job_id}/cancel", response_model=CancelResponse)
async def cancel_generation(
job_id: str,
user: dict = Depends(require_auth)
):
"""
Cancel a running generation job.
Returns whether the job was successfully cancelled.
"""
orchestrator = get_orchestrator()
job = orchestrator.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
if job.status in (PipelineStage.COMPLETED, PipelineStage.FAILED, PipelineStage.CANCELLED):
return CancelResponse(
job_id=job_id,
was_cancelled=False,
message=f"Job already {job.status.value}"
)
was_cancelled = orchestrator.cancel_job(job_id)
return CancelResponse(
job_id=job_id,
was_cancelled=was_cancelled,
message="Job cancelled" if was_cancelled else "Failed to cancel"
)
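# Example (sketch):
#
#     httpx.post(
#         f"http://localhost:8000/content/jobs/{job_id}/cancel",
#         headers={"Authorization": "Bearer <token>"},
#     )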
# --- V2 Struggle-First Endpoints ---
@router.get("/suggested-topics")
async def get_suggested_topics(user: dict = Depends(require_auth)):
"""Get suggested struggle topics based on recent memory and projects."""
try:
from shared.grounding import get_struggle_candidates
candidates = get_struggle_candidates()
return {"topics": candidates}
except Exception as e:
return {"topics": [], "error": str(e)}
@router.get("/style-references")
async def get_style_references(user: dict = Depends(require_auth)):
"""List available style references for few-shot prompting."""
if not V2_AVAILABLE:
raise HTTPException(status_code=503, detail="V2 generation not available")
styles = list_style_examples()
return {"styles": styles}
@router.post("/generate-v2")
async def start_generation_v2(
req: Request,
brief_id: str,
style_reference: Optional[str] = None,
user: dict = Depends(require_auth),
):
"""
Start v2 struggle-first content generation from brief.
This endpoint uses the struggle-first prompt template
with optional style example injection.
"""
if not V2_AVAILABLE:
raise HTTPException(status_code=503, detail="V2 generation not available")
    # Imported here to avoid circular imports at module load time
    from ..blog.brief_service import get_brief, BriefNotFoundError
    from ..blog.generation_status import update_generation_status
    from ..blog.metrics.struggle import calculate_struggle_score
    from ..generation.pipeline import generate_ollama
try:
brief = get_brief(brief_id)
except BriefNotFoundError:
raise HTTPException(status_code=404, detail=f"Brief '{brief_id}' not found")
if brief.status not in ("approved", "completed"):
raise HTTPException(
status_code=400,
detail=f"Brief must be approved, current status: {brief.status}"
)
# Update status to generating
update_generation_status(brief_id, "generating")
# Build v2 prompt
prompt = build_struggle_first_prompt(
brief=brief.__dict__,
style_reference=style_reference or brief.style_reference
)
    # Generate directly with phi4:14b in a single pass (the 5-stage pipeline
    # is skipped for v2). This job_id is not registered with the orchestrator,
    # so v2 progress is tracked via the brief's generation status rather than
    # GET /content/jobs/{job_id}.
    job_id = f"v2-{brief_id[:8]}"
async def run_v2_generation():
try:
response = await generate_ollama(
model="phi4:14b",
prompt=prompt,
temperature=0.7,
max_tokens=4000,
timeout=180.0
)
content = response.strip()
# Calculate struggle score
score_result = calculate_struggle_score(content)
# Save output
update_generation_status(
brief_id,
"completed",
content_output=content,
struggle_score=score_result["score"]
)
return content, score_result["score"]
        except Exception:
            update_generation_status(brief_id, "failed")
            raise
    # Start in the background, keeping a strong reference to the task
    task = asyncio.create_task(run_v2_generation())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
return GenerateResponse(
job_id=job_id,
status=PipelineStage.QUEUED,
estimated_seconds=90,
message="Struggle-first generation started"
)
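# Example (sketch): with this signature, brief_id and style_reference arrive
# as query parameters; the values shown are illustrative.
#
#     httpx.post(
#         "http://localhost:8000/content/generate-v2",
#         params={"brief_id": "abc123", "style_reference": "struggle-example-1"},
#         headers={"Authorization": "Bearer <token>"},
#     )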