# brief_service.py (15,610 bytes, Apr 22, 2026)

"""Content brief v2 service layer — business logic for struggle-first pipeline."""

import json
import re
import uuid
from datetime import datetime, timezone
from typing import Optional

from .storage import get_db, DB_PATH
from .models import (
BriefAttempt,
VoiceChecklist,
BriefStatus,
CreateBriefRequest,
UpdateBriefRequest,
BriefSummary,
BriefDetail,
)

def _now() -> str:
"""Current UTC timestamp as ISO string."""
return datetime.now(timezone.utc).isoformat()

def _attempts_to_json(attempts: list[BriefAttempt]) -> str:
"""Convert list of BriefAttempt to JSON string."""
return json.dumps([{"attempt": a.attempt, "why_failed": a.why_failed} for a in attempts])

def _json_to_attempts(data: str) -> list[BriefAttempt]:
"""Convert JSON string to list of BriefAttempt."""
items = json.loads(data) if data else []
return [BriefAttempt(attempt=i["attempt"], why_failed=i["why_failed"]) for i in items]

def _voice_to_json(voice: VoiceChecklist) -> str:
"""Convert VoiceChecklist to JSON string."""
return json.dumps({
"i_statements": voice.i_statements,
"temporal_markers": voice.temporal_markers,
"struggle_patterns": voice.struggle_patterns,
"cost_mentions": voice.cost_mentions,
"admission_count": voice.admission_count,
"sounds_like_me": voice.sounds_like_me,
})

def _json_to_voice(data: str) -> VoiceChecklist:
    """Deserialize a JSON object string into a VoiceChecklist.

    A falsy ``data`` is treated as an empty object, and any key missing from
    the decoded object defaults to ``False``.
    """
    stored = json.loads(data) if data else {}
    flag_names = (
        "i_statements",
        "temporal_markers",
        "struggle_patterns",
        "cost_mentions",
        "admission_count",
        "sounds_like_me",
    )
    return VoiceChecklist(**{name: stored.get(name, False) for name in flag_names})

class BriefNotFoundError(Exception):
    """Signals that no brief row exists for the requested ID."""

class BriefValidationError(Exception):
    """Signals that a brief failed a status or voice-checklist check."""

# ─── CRUD Operations ─────────────────────────────────────────────────────

def create_brief(request: CreateBriefRequest) -> BriefDetail:
    """Insert a new brief row in DRAFT status and return it as stored.

    The ID has the form ``brief-<12 hex chars>`` taken from a random UUID,
    and ``created_at`` / ``updated_at`` both receive the same UTC timestamp.

    Args:
        request: Field values for the new brief. Its attempts and voice
            checklist are stored as JSON strings.

    Returns:
        The brief re-read from the database via ``get_brief``.
    """
    new_id = f"brief-{uuid.uuid4().hex[:12]}"
    timestamp = _now()

    insert_sql = """
        INSERT INTO content_briefs_v2 (
            id, title, struggle_angle, origin_story, attempts_json,
            the_moment, the_fix, reflection, voice_checklist_json,
            status, style_reference, target_length, created_by,
            created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    conn = get_db()
    try:
        cursor = conn.cursor()
        # Values are listed in the same order as the column list above.
        values = (
            new_id,
            request.title,
            request.struggle_angle,
            request.origin_story,
            _attempts_to_json(request.attempts),
            request.the_moment,
            request.the_fix,
            request.reflection,
            _voice_to_json(request.voice_checklist),
            BriefStatus.DRAFT,
            request.style_reference,
            request.target_length,
            request.created_by,
            timestamp,
            timestamp,
        )
        cursor.execute(insert_sql, values)
        conn.commit()
        return get_brief(new_id)
    finally:
        conn.close()

def get_brief(brief_id: str) -> BriefDetail:
    """Fetch one brief by its ID.

    Args:
        brief_id: Primary key of the brief in ``content_briefs_v2``.

    Returns:
        The matching row converted with ``_row_to_detail``.

    Raises:
        BriefNotFoundError: If no row has the given ID.
    """
    select_sql = "SELECT * FROM content_briefs_v2 WHERE id = ?"
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(select_sql, (brief_id,))
        row = cursor.fetchone()
        if row:
            return _row_to_detail(row)
        raise BriefNotFoundError(f"Brief '{brief_id}' not found")
    finally:
        conn.close()

def list_briefs(
    status: Optional[str] = None,
    created_by: Optional[str] = None,
    page: int = 1,
    per_page: int = 20,
) -> tuple[list[BriefSummary], int]:
    """List briefs, newest first, with optional filtering and pagination.

    Args:
        status: If truthy, only briefs with this status are returned.
        created_by: If truthy, only briefs by this creator are returned.
        page: 1-based page number. Values below 1 are treated as 1.
        per_page: Maximum rows per page. Negative values are treated as 0.

    Returns:
        A ``(briefs, total)`` tuple: the requested page of summaries and the
        number of rows matching the filters across all pages.
    """
    # SQLite reads a negative LIMIT as "no upper bound" and a negative OFFSET
    # as zero. Clamp both so a bad argument cannot return the whole table.
    page = max(page, 1)
    per_page = max(per_page, 0)

    conn = get_db()
    try:
        cursor = conn.cursor()

        # Only fixed column fragments are interpolated into the SQL text;
        # caller-supplied values are always bound as parameters.
        where_clauses = []
        params = []
        for column, wanted in (("status", status), ("created_by", created_by)):
            if wanted:
                where_clauses.append(f"{column} = ?")
                params.append(wanted)

        where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""

        # Total number of matching rows, independent of pagination.
        cursor.execute(f"SELECT COUNT(*) FROM content_briefs_v2 {where_sql}", params)
        total = cursor.fetchone()[0]

        # Requested page.
        offset = (page - 1) * per_page
        cursor.execute(
            f"""
            SELECT id, title, status, created_by, created_at, updated_at, struggle_score
            FROM content_briefs_v2
            {where_sql}
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?
            """,
            params + [per_page, offset],
        )
        briefs = [_row_to_summary(row) for row in cursor.fetchall()]
        return briefs, total
    finally:
        conn.close()

def update_brief(brief_id: str, request: UpdateBriefRequest) -> BriefDetail:
    """Apply a partial update to a brief that is in DRAFT or REJECTED status.

    Only request attributes that are not ``None`` are written, and
    ``updated_at`` is always refreshed.

    Args:
        brief_id: ID of the brief to modify.
        request: Candidate new values; ``None`` means "leave unchanged".

    Returns:
        The brief re-read from the database after the update.

    Raises:
        BriefNotFoundError: If the brief does not exist (from ``get_brief``).
        BriefValidationError: If the brief is in any other status.
    """
    existing = get_brief(brief_id)
    if existing.status not in (BriefStatus.DRAFT, BriefStatus.REJECTED):
        raise BriefValidationError(f"Cannot edit brief in status '{existing.status}'")

    # (column, request attribute, serializer) in the order they are written.
    editable = (
        ("title", "title", None),
        ("struggle_angle", "struggle_angle", None),
        ("origin_story", "origin_story", None),
        ("attempts_json", "attempts", _attempts_to_json),
        ("the_moment", "the_moment", None),
        ("the_fix", "the_fix", None),
        ("reflection", "reflection", None),
        ("voice_checklist_json", "voice_checklist", _voice_to_json),
        ("style_reference", "style_reference", None),
        ("target_length", "target_length", None),
    )

    timestamp = _now()
    conn = get_db()
    try:
        cursor = conn.cursor()

        assignments = []
        values = []
        for column, attr, serialize in editable:
            supplied = getattr(request, attr)
            if supplied is None:
                continue
            assignments.append(f"{column} = ?")
            values.append(serialize(supplied) if serialize else supplied)

        assignments.append("updated_at = ?")
        values.append(timestamp)
        # The trailing parameter binds the WHERE clause.
        values.append(brief_id)

        cursor.execute(
            f"UPDATE content_briefs_v2 SET {', '.join(assignments)} WHERE id = ?",
            values,
        )
        conn.commit()
        return get_brief(brief_id)
    finally:
        conn.close()

def submit_brief(brief_id: str) -> BriefDetail:
    """Move a DRAFT brief to PENDING once its voice checklist is complete.

    Args:
        brief_id: ID of the brief to submit.

    Returns:
        The brief re-read from the database after the status change.

    Raises:
        BriefNotFoundError: If the brief does not exist (from ``get_brief``).
        BriefValidationError: If the brief is not in DRAFT status, or if any
            voice-checklist item is falsy (the message lists them).
    """
    brief = get_brief(brief_id)

    if brief.status != BriefStatus.DRAFT:
        raise BriefValidationError(f"Brief must be in draft status to submit, got '{brief.status}'")

    # Every checklist item must be truthy before the brief can be submitted.
    required_flags = (
        "i_statements",
        "temporal_markers",
        "struggle_patterns",
        "cost_mentions",
        "admission_count",
        "sounds_like_me",
    )
    checklist = brief.voice_checklist
    unmet = [flag for flag in required_flags if not getattr(checklist, flag)]
    if unmet:
        raise BriefValidationError(f"Voice checklist incomplete: {', '.join(unmet)}")

    timestamp = _now()
    update_sql = "UPDATE content_briefs_v2 SET status = ?, updated_at = ? WHERE id = ?"
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute(update_sql, (BriefStatus.PENDING, timestamp, brief_id))
        conn.commit()
        return get_brief(brief_id)
    finally:
        conn.close()

def approve_brief(brief_id: str, approved_by: str) -> BriefDetail:
    """Mark a PENDING brief as APPROVED and assign it a generation job ID.

    This function only records the job ID (``gen-<8 hex chars>``); it does
    not itself start any generation work.

    Args:
        brief_id: ID of the brief to approve.
        approved_by: Identifier stored in the ``approved_by`` column.

    Returns:
        The brief re-read from the database after approval.

    Raises:
        BriefNotFoundError: If the brief does not exist (from ``get_brief``).
        BriefValidationError: If the brief is not in PENDING status.
    """
    pending = get_brief(brief_id)
    if pending.status != BriefStatus.PENDING:
        raise BriefValidationError(f"Brief must be pending to approve, got '{pending.status}'")

    timestamp = _now()
    new_job_id = f"gen-{uuid.uuid4().hex[:8]}"
    approve_sql = """UPDATE content_briefs_v2 
           SET status = ?, approved_at = ?, approved_by = ?, generation_job_id = ?, updated_at = ?
           WHERE id = ?"""

    conn = get_db()
    try:
        cursor = conn.cursor()
        # approved_at and updated_at share the same timestamp.
        bindings = (
            BriefStatus.APPROVED,
            timestamp,
            approved_by,
            new_job_id,
            timestamp,
            brief_id,
        )
        cursor.execute(approve_sql, bindings)
        conn.commit()
        return get_brief(brief_id)
    finally:
        conn.close()

def reject_brief(brief_id: str, feedback: str = "") -> BriefDetail:
    """Mark a PENDING brief as REJECTED and append editor feedback.

    The feedback is appended to the ``reflection`` column under an
    "Editor Feedback" marker, followed by the rejection timestamp.

    NOTE(review): ``submit_brief`` only accepts DRAFT briefs, and nothing in
    this module moves a REJECTED brief back to DRAFT. Confirm the intended
    resubmission path.

    Args:
        brief_id: ID of the brief to reject.
        feedback: Editor feedback text. ``None`` is treated as empty.

    Returns:
        The brief re-read from the database after rejection.

    Raises:
        BriefNotFoundError: If the brief does not exist (from ``get_brief``).
        BriefValidationError: If the brief is not in PENDING status.
    """
    brief = get_brief(brief_id)
    if brief.status != BriefStatus.PENDING:
        raise BriefValidationError(f"Brief must be pending to reject, got '{brief.status}'")

    now = _now()
    conn = get_db()
    try:
        cursor = conn.cursor()
        # Store feedback in reflection field for now — v2 could add a feedback column.
        # In SQL, `NULL || x` is NULL. COALESCE keeps the feedback when the
        # reflection is NULL, and `feedback or ""` stops a None feedback
        # from wiping an existing reflection.
        cursor.execute(
            "UPDATE content_briefs_v2 SET status = ?, "
            "reflection = COALESCE(reflection, '') || ? || ? || ?, "
            "updated_at = ? WHERE id = ?",
            (
                BriefStatus.REJECTED,
                "\n\n--- Editor Feedback ---\n",
                feedback or "",
                f"\n(rejected at {now})",
                now,
                brief_id,
            ),
        )
        conn.commit()
        return get_brief(brief_id)
    finally:
        conn.close()

def update_generation_status(
    brief_id: str,
    status: str,
    content: Optional[str] = None,
    html: Optional[str] = None,
    struggle_score: Optional[float] = None,
) -> BriefDetail:
    """Record the outcome of a generation run on a brief.

    When ``content`` is given, the status, ``content_output``,
    ``content_html`` and ``struggle_score`` columns are all overwritten.
    When ``content`` is ``None``, only the status is changed and ``html`` and
    ``struggle_score`` are ignored. ``updated_at`` is refreshed either way.

    Args:
        brief_id: ID of the brief to update.
        status: New status value, stored as given.
        content: Generated content, or ``None`` for a status-only update.
        html: HTML rendering stored alongside ``content``.
        struggle_score: Score stored alongside ``content``.

    Returns:
        The brief re-read from the database after the update.

    Raises:
        BriefNotFoundError: If the brief does not exist (from ``get_brief``).
    """
    timestamp = _now()
    conn = get_db()
    try:
        cursor = conn.cursor()
        if content is None:
            statement = "UPDATE content_briefs_v2 SET status = ?, updated_at = ? WHERE id = ?"
            bindings = (status, timestamp, brief_id)
        else:
            statement = "UPDATE content_briefs_v2 SET status = ?, content_output = ?, content_html = ?, struggle_score = ?, updated_at = ? WHERE id = ?"
            bindings = (status, content, html, struggle_score, timestamp, brief_id)
        cursor.execute(statement, bindings)
        conn.commit()
        return get_brief(brief_id)
    finally:
        conn.close()

# ─── Struggle Score ────────────────────────────────────────────────────────

def calculate_struggle_score(content: str) -> dict:
    """Calculate struggle score using heuristics per v2 spec.

    Five regex-based signals are each normalized to the range 0-1 and
    combined with fixed weights that sum to 100:

    * ``i_statement_ratio`` (30): lines starting with "I " per non-blank
      paragraph, capped at 1.0.
    * ``temporal_markers`` (15), ``struggle_patterns`` (25),
      ``cost_mentions`` (20), ``admission_count`` (10): case-insensitive
      match counts, each capped at 5 and divided by 5.

    Args:
        content: Text to score. ``None`` or an empty string scores 0.

    Returns:
        A dict with ``score`` (0-100, rounded to one decimal), ``checks``
        (the normalized value of each signal) and ``interpretation`` (a
        short verdict string).
    """
    text = content or ""

    # Count only paragraphs that contain text. Trailing newlines and runs of
    # blank lines would otherwise inflate the count and dilute the ratio.
    non_blank = sum(1 for chunk in text.split("\n\n") if chunk.strip())
    paragraph_count = max(non_blank, 1)

    counted_patterns = {
        "temporal_markers": r"\d{1,2}:\d{2}|yesterday|last night|this morning|2 AM|3 AM",
        "struggle_patterns": r"I thought.*but|I tried.*failed|didn't work|wasn't working|broke|broken",
        "cost_mentions": r"hour|minute|sleep|wife|kid|Aundrea|frustrated|wasted|lost",
        "admission_count": r"I was wrong|I didn't expect|should have|could have|mistake|oops",
    }

    weights = {
        "i_statement_ratio": 30.0,
        "temporal_markers": 15.0,
        "struggle_patterns": 25.0,
        "cost_mentions": 20.0,
        "admission_count": 10.0,
    }

    # I-statement ratio, capped at 1.0.
    i_lines = len(re.findall(r"^I\s", text, re.MULTILINE))
    checks = {"i_statement_ratio": min(i_lines / paragraph_count, 1.0)}

    # Cap each remaining count at a reasonable maximum and scale to 0-1.
    max_hits = 5
    for name, pattern in counted_patterns.items():
        hits = len(re.findall(pattern, text, re.IGNORECASE))
        checks[name] = min(hits, max_hits) / max_hits

    score = sum(checks[k] * weights[k] for k in weights)
    score = min(score, 100.0)

    # Interpretation
    if score >= 75:
        interpretation = "Strong struggle narrative  publish-ready"
    elif score >= 50:
        interpretation = "Good struggle elements  light editing recommended"
    elif score >= 25:
        interpretation = "Some struggle signals  needs revision"
    else:
        interpretation = "Weak struggle narrative  rewrite recommended"

    return {
        "score": round(score, 1),
        "checks": checks,
        "interpretation": interpretation,
    }

# ─── Row Converters ───────────────────────────────────────────────────────

def _row_to_summary(row) -> BriefSummary:
    """Build a BriefSummary from a row addressable by column name.

    ``created_at`` and ``updated_at`` go through ``_parse_dt``; every other
    column is copied as stored.
    """
    copied = ("id", "title", "status", "created_by", "struggle_score")
    values = {column: row[column] for column in copied}
    values["created_at"] = _parse_dt(row["created_at"])
    values["updated_at"] = _parse_dt(row["updated_at"])
    return BriefSummary(**values)

def _row_to_detail(row) -> BriefDetail:
    """Build a BriefDetail from a row addressable by column name.

    Timestamp columns go through ``_parse_dt``, and the two JSON columns are
    decoded into attempts and a voice checklist. All other columns are
    copied as stored.
    """
    copied = (
        "id",
        "title",
        "status",
        "created_by",
        "struggle_score",
        "struggle_angle",
        "origin_story",
        "the_moment",
        "the_fix",
        "reflection",
        "style_reference",
        "target_length",
        "approved_by",
        "generation_job_id",
        "content_output",
        "content_html",
    )
    values = {column: row[column] for column in copied}

    values["created_at"] = _parse_dt(row["created_at"])
    values["updated_at"] = _parse_dt(row["updated_at"])
    # approved_at is empty until the brief has been approved.
    approved_raw = row["approved_at"]
    values["approved_at"] = _parse_dt(approved_raw) if approved_raw else None

    values["attempts"] = _json_to_attempts(row["attempts_json"])
    values["voice_checklist"] = _json_to_voice(row["voice_checklist_json"])
    return BriefDetail(**values)

def _parse_dt(value) -> Optional[datetime]:
"""Parse ISO datetime string."""
if not value:
return None
if isinstance(value, datetime):
return value
# Handle both 'Z' suffix and '+00:00'
value = value.replace("Z", "+00:00")
try:
return datetime.fromisoformat(value)
except ValueError:
return None