"""Content brief v2 service layer — business logic for struggle-first pipeline."""
import json
import re
import uuid
from datetime import datetime, timezone
from typing import Optional
from .storage import get_db, DB_PATH
from .models import (
BriefAttempt,
VoiceChecklist,
BriefStatus,
CreateBriefRequest,
UpdateBriefRequest,
BriefSummary,
BriefDetail,
)
def _now() -> str:
    """Return the current moment in UTC, formatted as an ISO-8601 string."""
    utc_moment = datetime.now(tz=timezone.utc)
    return utc_moment.isoformat()
def _attempts_to_json(attempts: list[BriefAttempt]) -> str:
    """Serialize attempts to a JSON array of ``attempt`` / ``why_failed`` objects."""
    payload = []
    for entry in attempts:
        # Only these two attributes are persisted for each attempt.
        payload.append({"attempt": entry.attempt, "why_failed": entry.why_failed})
    return json.dumps(payload)
def _json_to_attempts(data: str) -> list[BriefAttempt]:
    """Rebuild the list of BriefAttempt objects from its stored JSON text.

    A falsy ``data`` (empty string or None) yields an empty list.
    """
    if not data:
        return []
    restored = []
    for entry in json.loads(data):
        restored.append(
            BriefAttempt(attempt=entry["attempt"], why_failed=entry["why_failed"])
        )
    return restored
def _voice_to_json(voice: VoiceChecklist) -> str:
    """Serialize the six voice-checklist flags to a JSON object string."""
    # Listed in the order the keys appear in the stored JSON.
    flag_names = (
        "i_statements",
        "temporal_markers",
        "struggle_patterns",
        "cost_mentions",
        "admission_count",
        "sounds_like_me",
    )
    snapshot = {flag: getattr(voice, flag) for flag in flag_names}
    return json.dumps(snapshot)
def _json_to_voice(data: str) -> VoiceChecklist:
    """Rebuild a VoiceChecklist from its stored JSON text.

    Falsy ``data`` is treated as an empty object; any flag absent from the
    JSON defaults to False.
    """
    stored = json.loads(data) if data else {}
    flag_names = (
        "i_statements",
        "temporal_markers",
        "struggle_patterns",
        "cost_mentions",
        "admission_count",
        "sounds_like_me",
    )
    flags = {flag: stored.get(flag, False) for flag in flag_names}
    return VoiceChecklist(**flags)
class BriefNotFoundError(Exception):
    """Signals that no brief exists for the requested ID."""
class BriefValidationError(Exception):
    """Signals that a brief failed a status or voice-checklist validation."""
# ─── CRUD Operations ─────────────────────────────────────────────────────
def create_brief(request: CreateBriefRequest) -> BriefDetail:
    """Insert a new brief in DRAFT status and return it as stored.

    The ID is ``brief-`` followed by 12 hex characters of a random UUID.
    ``created_at`` and ``updated_at`` are set to the same timestamp.
    """
    new_id = f"brief-{uuid.uuid4().hex[:12]}"
    timestamp = _now()
    insert_sql = """
        INSERT INTO content_briefs_v2 (
            id, title, struggle_angle, origin_story, attempts_json,
            the_moment, the_fix, reflection, voice_checklist_json,
            status, style_reference, target_length, created_by,
            created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    db = get_db()
    try:
        cur = db.cursor()
        # Positional values, in the same order as the column list above.
        row_values = (
            new_id,
            request.title,
            request.struggle_angle,
            request.origin_story,
            _attempts_to_json(request.attempts),
            request.the_moment,
            request.the_fix,
            request.reflection,
            _voice_to_json(request.voice_checklist),
            BriefStatus.DRAFT,
            request.style_reference,
            request.target_length,
            request.created_by,
            timestamp,
            timestamp,
        )
        cur.execute(insert_sql, row_values)
        db.commit()
        # Re-read so the caller gets the row exactly as persisted.
        return get_brief(new_id)
    finally:
        db.close()
def get_brief(brief_id: str) -> BriefDetail:
    """Fetch a single brief by its ID.

    Raises:
        BriefNotFoundError: if no row matches ``brief_id``.
    """
    lookup_sql = "SELECT * FROM content_briefs_v2 WHERE id = ?"
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(lookup_sql, (brief_id,))
        record = cur.fetchone()
        if record:
            return _row_to_detail(record)
        raise BriefNotFoundError(f"Brief '{brief_id}' not found")
    finally:
        db.close()
def list_briefs(
    status: Optional[str] = None,
    created_by: Optional[str] = None,
    page: int = 1,
    per_page: int = 20,
) -> tuple[list[BriefSummary], int]:
    """Return one page of brief summaries plus the total matching count.

    ``status`` and ``created_by`` are applied as equality filters only when
    truthy. Results are ordered newest first by ``created_at``; ``page`` is
    1-based.
    """
    db = get_db()
    try:
        cur = db.cursor()

        # Keep only the filters the caller actually supplied.
        possible_filters = (
            ("status = ?", status),
            ("created_by = ?", created_by),
        )
        active_filters = [pair for pair in possible_filters if pair[1]]
        conditions = [clause for clause, _ in active_filters]
        bind_values = [value for _, value in active_filters]

        if conditions:
            where_sql = "WHERE " + " AND ".join(conditions)
        else:
            where_sql = ""

        # Total number of rows matching the filters, ignoring pagination.
        cur.execute(f"SELECT COUNT(*) FROM content_briefs_v2 {where_sql}", bind_values)
        total = cur.fetchone()[0]

        # The requested page itself.
        skip = (page - 1) * per_page
        cur.execute(
            f"""
            SELECT id, title, status, created_by, created_at, updated_at, struggle_score
            FROM content_briefs_v2
            {where_sql}
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?
            """,
            bind_values + [per_page, skip],
        )
        summaries = [_row_to_summary(record) for record in cur.fetchall()]
        return summaries, total
    finally:
        db.close()
def update_brief(brief_id: str, request: UpdateBriefRequest) -> BriefDetail:
    """Apply a partial update to a brief that is in DRAFT or REJECTED status.

    Only request fields that are not None are written; ``updated_at`` is
    always refreshed. The brief's status is left unchanged.

    Raises:
        BriefNotFoundError: if the brief does not exist (from ``get_brief``).
        BriefValidationError: if the brief is in any other status.
    """
    current = get_brief(brief_id)
    if current.status not in (BriefStatus.DRAFT, BriefStatus.REJECTED):
        raise BriefValidationError(f"Cannot edit brief in status '{current.status}'")

    timestamp = _now()
    # (column, request attribute, serializer or None), in SET-clause order.
    editable = (
        ("title", "title", None),
        ("struggle_angle", "struggle_angle", None),
        ("origin_story", "origin_story", None),
        ("attempts_json", "attempts", _attempts_to_json),
        ("the_moment", "the_moment", None),
        ("the_fix", "the_fix", None),
        ("reflection", "reflection", None),
        ("voice_checklist_json", "voice_checklist", _voice_to_json),
        ("style_reference", "style_reference", None),
        ("target_length", "target_length", None),
    )

    db = get_db()
    try:
        cur = db.cursor()
        assignments = []
        bind_values = []
        for column, attribute, serialize in editable:
            supplied = getattr(request, attribute)
            if supplied is None:
                continue
            assignments.append(f"{column} = ?")
            bind_values.append(serialize(supplied) if serialize else supplied)

        assignments.append("updated_at = ?")
        bind_values.append(timestamp)
        # The final placeholder belongs to the WHERE clause.
        bind_values.append(brief_id)

        cur.execute(
            f"UPDATE content_briefs_v2 SET {', '.join(assignments)} WHERE id = ?",
            bind_values,
        )
        db.commit()
        return get_brief(brief_id)
    finally:
        db.close()
def submit_brief(brief_id: str) -> BriefDetail:
    """Move a DRAFT brief to PENDING once every voice-checklist flag is set.

    Raises:
        BriefNotFoundError: if the brief does not exist (from ``get_brief``).
        BriefValidationError: if the brief is not in DRAFT status, or if any
            checklist flag is falsy (the message lists the missing flags).
    """
    # NOTE(review): only DRAFT is accepted here, so a brief in REJECTED status
    # cannot be resubmitted through this function; confirm intended workflow.
    current = get_brief(brief_id)
    if current.status != BriefStatus.DRAFT:
        raise BriefValidationError(f"Brief must be in draft status to submit, got '{current.status}'")

    # Every flag must be truthy; collect the ones that are not, in this order.
    required_flags = (
        "i_statements",
        "temporal_markers",
        "struggle_patterns",
        "cost_mentions",
        "admission_count",
        "sounds_like_me",
    )
    checklist = current.voice_checklist
    missing = [flag for flag in required_flags if not getattr(checklist, flag)]
    if missing:
        raise BriefValidationError(f"Voice checklist incomplete: {', '.join(missing)}")

    timestamp = _now()
    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(
            "UPDATE content_briefs_v2 SET status = ?, updated_at = ? WHERE id = ?",
            (BriefStatus.PENDING, timestamp, brief_id),
        )
        db.commit()
        return get_brief(brief_id)
    finally:
        db.close()
def approve_brief(brief_id: str, approved_by: str) -> BriefDetail:
    """Mark a PENDING brief as APPROVED and assign it a generation job ID.

    Stores the approver, the approval time, and a fresh ``gen-<8 hex chars>``
    job ID. This function only records the job ID; it does not start any
    generation work itself.

    Raises:
        BriefNotFoundError: if the brief does not exist (from ``get_brief``).
        BriefValidationError: if the brief is not in PENDING status.
    """
    current = get_brief(brief_id)
    if current.status != BriefStatus.PENDING:
        raise BriefValidationError(f"Brief must be pending to approve, got '{current.status}'")

    timestamp = _now()
    generation_job = f"gen-{uuid.uuid4().hex[:8]}"
    approve_sql = """UPDATE content_briefs_v2
        SET status = ?, approved_at = ?, approved_by = ?, generation_job_id = ?, updated_at = ?
        WHERE id = ?"""
    # approved_at and updated_at share the same timestamp.
    bind_values = (
        BriefStatus.APPROVED,
        timestamp,
        approved_by,
        generation_job,
        timestamp,
        brief_id,
    )

    db = get_db()
    try:
        cur = db.cursor()
        cur.execute(approve_sql, bind_values)
        db.commit()
        return get_brief(brief_id)
    finally:
        db.close()
def reject_brief(brief_id: str, feedback: str = "") -> BriefDetail:
    """Reject a pending brief and append the editor's feedback to its reflection.

    The brief's status is set to ``BriefStatus.REJECTED`` (not DRAFT). The
    feedback is appended to the ``reflection`` column under an
    "Editor Feedback" banner together with the rejection timestamp.

    NOTE(review): ``submit_brief`` only accepts DRAFT, so a rejected brief
    cannot be resubmitted through it; confirm the intended workflow.

    Args:
        brief_id: ID of the brief to reject.
        feedback: Editor feedback text; None is treated as an empty string.

    Returns:
        The brief as stored after the update.

    Raises:
        BriefNotFoundError: if the brief does not exist (from ``get_brief``).
        BriefValidationError: if the brief is not in PENDING status.
    """
    brief = get_brief(brief_id)
    if brief.status != BriefStatus.PENDING:
        raise BriefValidationError(f"Brief must be pending to reject, got '{brief.status}'")

    now = _now()
    # A None bound into the SQL concatenation would turn the whole expression
    # into NULL and erase the existing reflection.
    feedback_text = feedback or ""

    conn = get_db()
    try:
        cursor = conn.cursor()
        # Store feedback in reflection field for now — v2 could add a feedback column.
        # COALESCE guards against a NULL reflection: in SQL, NULL || 'text' is
        # NULL, which would silently discard the feedback.
        cursor.execute(
            "UPDATE content_briefs_v2 SET status = ?, reflection = COALESCE(reflection, '') || ? || ? || ?, updated_at = ? WHERE id = ?",
            (
                BriefStatus.REJECTED,
                "\n\n--- Editor Feedback ---\n",
                feedback_text,
                f"\n(rejected at {now})",
                now,
                brief_id,
            ),
        )
        conn.commit()
        return get_brief(brief_id)
    finally:
        conn.close()
def update_generation_status(
    brief_id: str,
    status: str,
    content: Optional[str] = None,
    html: Optional[str] = None,
    struggle_score: Optional[float] = None,
) -> BriefDetail:
    """Record the outcome of a generation run on a brief.

    When ``content`` is given, the status, content, HTML and struggle score
    are written together. When ``content`` is None only the status is
    updated, and ``html`` and ``struggle_score`` are ignored. ``updated_at``
    is refreshed in both cases.
    """
    timestamp = _now()
    db = get_db()
    try:
        cur = db.cursor()
        if content is None:
            statement = "UPDATE content_briefs_v2 SET status = ?, updated_at = ? WHERE id = ?"
            bind_values = (status, timestamp, brief_id)
        else:
            statement = "UPDATE content_briefs_v2 SET status = ?, content_output = ?, content_html = ?, struggle_score = ?, updated_at = ? WHERE id = ?"
            bind_values = (status, content, html, struggle_score, timestamp, brief_id)
        cur.execute(statement, bind_values)
        db.commit()
        return get_brief(brief_id)
    finally:
        db.close()
# ─── Struggle Score ────────────────────────────────────────────────────────
def calculate_struggle_score(content: str) -> dict:
    """Calculate struggle score using heuristics per v2 spec.

    Five signals are measured with regular expressions, each normalised to
    the range 0..1 and combined with fixed weights that sum to 100.

    Args:
        content: The text to score. Paragraphs are separated by blank lines.

    Returns:
        A dict with:
            ``score``: weighted total, 0-100, rounded to one decimal place.
            ``checks``: the normalised (0..1) value of each signal.
            ``interpretation``: a human-readable verdict for the score band.
    """
    # Count only paragraphs with visible text. Runs of three or more newlines
    # and trailing blank lines produce empty chunks that would otherwise
    # inflate the denominator and deflate the I-statement ratio.
    paragraphs = [chunk for chunk in content.split("\n\n") if chunk.strip()]
    paragraph_count = max(len(paragraphs), 1)  # avoid dividing by zero on blank text

    # Lines that open with a standalone "I", relative to the paragraph count.
    i_statement_hits = len(re.findall(r"^I\s", content, re.MULTILINE))

    # Signals that are plain occurrence counts (all matched case-insensitively).
    count_patterns = {
        "temporal_markers": r"\d{1,2}:\d{2}|yesterday|last night|this morning|2 AM|3 AM",
        "struggle_patterns": r"I thought.*but|I tried.*failed|didn't work|wasn't working|broke|broken",
        "cost_mentions": r"hour|minute|sleep|wife|kid|Aundrea|frustrated|wasted|lost",
        "admission_count": r"I was wrong|I didn't expect|should have|could have|mistake|oops",
    }

    weights = {
        "i_statement_ratio": 30.0,
        "temporal_markers": 15.0,
        "struggle_patterns": 25.0,
        "cost_mentions": 20.0,
        "admission_count": 10.0,
    }

    # Cap i-statement ratio at 1.0
    checks = {"i_statement_ratio": min(i_statement_hits / paragraph_count, 1.0)}
    # Normalize every count-based signal: five or more hits earn full credit.
    for name, pattern in count_patterns.items():
        hits = len(re.findall(pattern, content, re.IGNORECASE))
        checks[name] = min(hits, 5) / 5

    score = sum(checks[k] * weights[k] for k in weights)
    score = min(score, 100.0)

    # Interpretation
    if score >= 75:
        interpretation = "Strong struggle narrative — publish-ready"
    elif score >= 50:
        interpretation = "Good struggle elements — light editing recommended"
    elif score >= 25:
        interpretation = "Some struggle signals — needs revision"
    else:
        interpretation = "Weak struggle narrative — rewrite recommended"

    return {
        "score": round(score, 1),
        "checks": checks,
        "interpretation": interpretation,
    }
# ─── Row Converters ───────────────────────────────────────────────────────
def _row_to_summary(row) -> BriefSummary:
    """Build a BriefSummary from a row indexable by column name."""
    # Columns copied through unchanged.
    passthrough = ("id", "title", "status", "created_by", "struggle_score")
    fields = {column: row[column] for column in passthrough}
    # Timestamp columns are parsed into datetime objects.
    fields["created_at"] = _parse_dt(row["created_at"])
    fields["updated_at"] = _parse_dt(row["updated_at"])
    return BriefSummary(**fields)
def _row_to_detail(row) -> BriefDetail:
    """Build a BriefDetail from a full row indexable by column name.

    Timestamp columns are parsed into datetimes and the two JSON columns are
    decoded into their model objects; every other column is copied as-is.
    """
    # Columns copied through unchanged.
    passthrough = (
        "id",
        "title",
        "status",
        "created_by",
        "struggle_score",
        "struggle_angle",
        "origin_story",
        "the_moment",
        "the_fix",
        "reflection",
        "style_reference",
        "target_length",
        "approved_by",
        "generation_job_id",
        "content_output",
        "content_html",
    )
    fields = {column: row[column] for column in passthrough}

    fields["created_at"] = _parse_dt(row["created_at"])
    fields["updated_at"] = _parse_dt(row["updated_at"])
    # approved_at stays None until the brief has been approved.
    approved_raw = row["approved_at"]
    fields["approved_at"] = _parse_dt(approved_raw) if approved_raw else None

    fields["attempts"] = _json_to_attempts(row["attempts_json"])
    fields["voice_checklist"] = _json_to_voice(row["voice_checklist_json"])
    return BriefDetail(**fields)
def _parse_dt(value) -> Optional[datetime]:
    """Convert a stored timestamp into a datetime, or None if not possible.

    Datetime instances are returned unchanged. Falsy values and strings that
    are not valid ISO format both yield None.
    """
    if isinstance(value, datetime):
        return value
    if not value:
        return None
    # Accept the 'Z' UTC designator as well as an explicit '+00:00' offset.
    normalized = value.replace("Z", "+00:00")
    try:
        parsed = datetime.fromisoformat(normalized)
    except ValueError:
        parsed = None
    return parsed