"""Content brief v2 service layer — business logic for struggle-first pipeline.""" import json import re import uuid from datetime import datetime, timezone from typing import Optional from .storage import get_db, DB_PATH from .models import ( BriefAttempt, VoiceChecklist, BriefStatus, CreateBriefRequest, UpdateBriefRequest, BriefSummary, BriefDetail, ) def _now() -> str: """Current UTC timestamp as ISO string.""" return datetime.now(timezone.utc).isoformat() def _attempts_to_json(attempts: list[BriefAttempt]) -> str: """Convert list of BriefAttempt to JSON string.""" return json.dumps([{"attempt": a.attempt, "why_failed": a.why_failed} for a in attempts]) def _json_to_attempts(data: str) -> list[BriefAttempt]: """Convert JSON string to list of BriefAttempt.""" items = json.loads(data) if data else [] return [BriefAttempt(attempt=i["attempt"], why_failed=i["why_failed"]) for i in items] def _voice_to_json(voice: VoiceChecklist) -> str: """Convert VoiceChecklist to JSON string.""" return json.dumps({ "i_statements": voice.i_statements, "temporal_markers": voice.temporal_markers, "struggle_patterns": voice.struggle_patterns, "cost_mentions": voice.cost_mentions, "admission_count": voice.admission_count, "sounds_like_me": voice.sounds_like_me, }) def _json_to_voice(data: str) -> VoiceChecklist: """Convert JSON string to VoiceChecklist.""" d = json.loads(data) if data else {} return VoiceChecklist( i_statements=d.get("i_statements", False), temporal_markers=d.get("temporal_markers", False), struggle_patterns=d.get("struggle_patterns", False), cost_mentions=d.get("cost_mentions", False), admission_count=d.get("admission_count", False), sounds_like_me=d.get("sounds_like_me", False), ) class BriefNotFoundError(Exception): """Raised when a brief is not found.""" pass class BriefValidationError(Exception): """Raised when brief validation fails.""" pass # ─── CRUD Operations ───────────────────────────────────────────────────── def create_brief(request: CreateBriefRequest) -> BriefDetail: """Create a new content brief.""" brief_id = f"brief-{uuid.uuid4().hex[:12]}" now = _now() conn = get_db() try: cursor = conn.cursor() cursor.execute(""" INSERT INTO content_briefs_v2 ( id, title, struggle_angle, origin_story, attempts_json, the_moment, the_fix, reflection, voice_checklist_json, status, style_reference, target_length, created_by, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( brief_id, request.title, request.struggle_angle, request.origin_story, _attempts_to_json(request.attempts), request.the_moment, request.the_fix, request.reflection, _voice_to_json(request.voice_checklist), BriefStatus.DRAFT, request.style_reference, request.target_length, request.created_by, now, now, )) conn.commit() return get_brief(brief_id) finally: conn.close() def get_brief(brief_id: str) -> BriefDetail: """Get a brief by ID.""" conn = get_db() try: cursor = conn.cursor() cursor.execute( "SELECT * FROM content_briefs_v2 WHERE id = ?", (brief_id,) ) row = cursor.fetchone() if not row: raise BriefNotFoundError(f"Brief '{brief_id}' not found") return _row_to_detail(row) finally: conn.close() def list_briefs( status: Optional[str] = None, created_by: Optional[str] = None, page: int = 1, per_page: int = 20, ) -> tuple[list[BriefSummary], int]: """List briefs with optional filtering.""" conn = get_db() try: cursor = conn.cursor() where_clauses = [] params = [] if status: where_clauses.append("status = ?") params.append(status) if created_by: where_clauses.append("created_by = ?") params.append(created_by) where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Count total cursor.execute(f"SELECT COUNT(*) FROM content_briefs_v2 {where_sql}", params) total = cursor.fetchone()[0] # Fetch page offset = (page - 1) * per_page cursor.execute( f""" SELECT id, title, status, created_by, created_at, updated_at, struggle_score FROM content_briefs_v2 {where_sql} ORDER BY created_at DESC LIMIT ? OFFSET ? """, params + [per_page, offset], ) rows = cursor.fetchall() briefs = [_row_to_summary(r) for r in rows] return briefs, total finally: conn.close() def update_brief(brief_id: str, request: UpdateBriefRequest) -> BriefDetail: """Update an existing brief. Only works in draft or rejected status.""" brief = get_brief(brief_id) if brief.status not in (BriefStatus.DRAFT, BriefStatus.REJECTED): raise BriefValidationError(f"Cannot edit brief in status '{brief.status}'") now = _now() conn = get_db() try: cursor = conn.cursor() fields = [] params = [] if request.title is not None: fields.append("title = ?") params.append(request.title) if request.struggle_angle is not None: fields.append("struggle_angle = ?") params.append(request.struggle_angle) if request.origin_story is not None: fields.append("origin_story = ?") params.append(request.origin_story) if request.attempts is not None: fields.append("attempts_json = ?") params.append(_attempts_to_json(request.attempts)) if request.the_moment is not None: fields.append("the_moment = ?") params.append(request.the_moment) if request.the_fix is not None: fields.append("the_fix = ?") params.append(request.the_fix) if request.reflection is not None: fields.append("reflection = ?") params.append(request.reflection) if request.voice_checklist is not None: fields.append("voice_checklist_json = ?") params.append(_voice_to_json(request.voice_checklist)) if request.style_reference is not None: fields.append("style_reference = ?") params.append(request.style_reference) if request.target_length is not None: fields.append("target_length = ?") params.append(request.target_length) fields.append("updated_at = ?") params.append(now) params.append(brief_id) cursor.execute( f"UPDATE content_briefs_v2 SET {', '.join(fields)} WHERE id = ?", params, ) conn.commit() return get_brief(brief_id) finally: conn.close() def submit_brief(brief_id: str) -> BriefDetail: """Submit a brief for approval. Validates voice checklist first.""" brief = get_brief(brief_id) if brief.status != BriefStatus.DRAFT: raise BriefValidationError(f"Brief must be in draft status to submit, got '{brief.status}'") # Validate voice checklist — all must be true voice = brief.voice_checklist missing = [] if not voice.i_statements: missing.append("i_statements") if not voice.temporal_markers: missing.append("temporal_markers") if not voice.struggle_patterns: missing.append("struggle_patterns") if not voice.cost_mentions: missing.append("cost_mentions") if not voice.admission_count: missing.append("admission_count") if not voice.sounds_like_me: missing.append("sounds_like_me") if missing: raise BriefValidationError(f"Voice checklist incomplete: {', '.join(missing)}") now = _now() conn = get_db() try: cursor = conn.cursor() cursor.execute( "UPDATE content_briefs_v2 SET status = ?, updated_at = ? WHERE id = ?", (BriefStatus.PENDING, now, brief_id), ) conn.commit() return get_brief(brief_id) finally: conn.close() def approve_brief(brief_id: str, approved_by: str) -> BriefDetail: """Approve a brief and trigger generation.""" brief = get_brief(brief_id) if brief.status != BriefStatus.PENDING: raise BriefValidationError(f"Brief must be pending to approve, got '{brief.status}'") now = _now() job_id = f"gen-{uuid.uuid4().hex[:8]}" conn = get_db() try: cursor = conn.cursor() cursor.execute( """UPDATE content_briefs_v2 SET status = ?, approved_at = ?, approved_by = ?, generation_job_id = ?, updated_at = ? WHERE id = ?""", (BriefStatus.APPROVED, now, approved_by, job_id, now, brief_id), ) conn.commit() return get_brief(brief_id) finally: conn.close() def reject_brief(brief_id: str, feedback: str = "") -> BriefDetail: """Reject a brief and return it to draft with feedback.""" brief = get_brief(brief_id) if brief.status != BriefStatus.PENDING: raise BriefValidationError(f"Brief must be pending to reject, got '{brief.status}'") now = _now() conn = get_db() try: cursor = conn.cursor() # Store feedback in reflection field for now — v2 could add a feedback column cursor.execute( "UPDATE content_briefs_v2 SET status = ?, reflection = reflection || ? || ? || ?, updated_at = ? WHERE id = ?", (BriefStatus.REJECTED, "\n\n--- Editor Feedback ---\n", feedback, f"\n(rejected at {now})", now, brief_id), ) conn.commit() return get_brief(brief_id) finally: conn.close() def update_generation_status(brief_id: str, status: str, content: Optional[str] = None, html: Optional[str] = None, struggle_score: Optional[float] = None) -> BriefDetail: """Update brief after generation completes or fails.""" now = _now() conn = get_db() try: cursor = conn.cursor() if content is not None: cursor.execute( "UPDATE content_briefs_v2 SET status = ?, content_output = ?, content_html = ?, struggle_score = ?, updated_at = ? WHERE id = ?", (status, content, html, struggle_score, now, brief_id), ) else: cursor.execute( "UPDATE content_briefs_v2 SET status = ?, updated_at = ? WHERE id = ?", (status, now, brief_id), ) conn.commit() return get_brief(brief_id) finally: conn.close() # ─── Struggle Score ──────────────────────────────────────────────────────── def calculate_struggle_score(content: str) -> dict: """Calculate struggle score using heuristics per v2 spec. Returns dict with score (0-100) and component breakdown. """ paragraphs = content.split("\n\n") paragraph_count = max(len(paragraphs), 1) checks = { "i_statement_ratio": len(re.findall(r"^I\s", content, re.MULTILINE)) / paragraph_count, "temporal_markers": len(re.findall(r"\d{1,2}:\d{2}|yesterday|last night|this morning|2 AM|3 AM", content, re.IGNORECASE)), "struggle_patterns": len(re.findall(r"I thought.*but|I tried.*failed|didn't work|wasn't working|broke|broken", content, re.IGNORECASE)), "cost_mentions": len(re.findall(r"hour|minute|sleep|wife|kid|Aundrea|frustrated|wasted|lost", content, re.IGNORECASE)), "admission_count": len(re.findall(r"I was wrong|I didn't expect|should have|could have|mistake|oops", content, re.IGNORECASE)), } weights = { "i_statement_ratio": 30.0, "temporal_markers": 15.0, "struggle_patterns": 25.0, "cost_mentions": 20.0, "admission_count": 10.0, } # Normalize temporal markers and cost mentions (cap at reasonable max) checks["temporal_markers"] = min(checks["temporal_markers"], 5) / 5 checks["struggle_patterns"] = min(checks["struggle_patterns"], 5) / 5 checks["cost_mentions"] = min(checks["cost_mentions"], 5) / 5 checks["admission_count"] = min(checks["admission_count"], 5) / 5 # Cap i-statement ratio at 1.0 checks["i_statement_ratio"] = min(checks["i_statement_ratio"], 1.0) score = sum(checks[k] * weights[k] for k in weights) score = min(score, 100.0) # Interpretation if score >= 75: interpretation = "Strong struggle narrative — publish-ready" elif score >= 50: interpretation = "Good struggle elements — light editing recommended" elif score >= 25: interpretation = "Some struggle signals — needs revision" else: interpretation = "Weak struggle narrative — rewrite recommended" return { "score": round(score, 1), "checks": checks, "interpretation": interpretation, } # ─── Row Converters ─────────────────────────────────────────────────────── def _row_to_summary(row) -> BriefSummary: """Convert a DB row to BriefSummary.""" return BriefSummary( id=row["id"], title=row["title"], status=row["status"], created_by=row["created_by"], created_at=_parse_dt(row["created_at"]), updated_at=_parse_dt(row["updated_at"]), struggle_score=row["struggle_score"], ) def _row_to_detail(row) -> BriefDetail: """Convert a DB row to BriefDetail.""" return BriefDetail( id=row["id"], title=row["title"], status=row["status"], created_by=row["created_by"], created_at=_parse_dt(row["created_at"]), updated_at=_parse_dt(row["updated_at"]), struggle_score=row["struggle_score"], struggle_angle=row["struggle_angle"], origin_story=row["origin_story"], attempts=_json_to_attempts(row["attempts_json"]), the_moment=row["the_moment"], the_fix=row["the_fix"], reflection=row["reflection"], voice_checklist=_json_to_voice(row["voice_checklist_json"]), style_reference=row["style_reference"], target_length=row["target_length"], approved_at=_parse_dt(row["approved_at"]) if row["approved_at"] else None, approved_by=row["approved_by"], generation_job_id=row["generation_job_id"], content_output=row["content_output"], content_html=row["content_html"], ) def _parse_dt(value) -> Optional[datetime]: """Parse ISO datetime string.""" if not value: return None if isinstance(value, datetime): return value # Handle both 'Z' suffix and '+00:00' value = value.replace("Z", "+00:00") try: return datetime.fromisoformat(value) except ValueError: return None