""" Tiered Content Generation Pipeline - 5 Stage Orchestration (V2 Struggle-First). Stage 1: Strategy → Llama 3.1 8B (Gaming PC) → Find the struggle Stage 2: Structure → Qwen 2.5 7B (Gaming PC) → Validate narrative Stage 3: Draft → Phi-4 14B (Gaming PC) → Full struggle-first draft (~60-90s) Stage 4: SEO → Llama 3.1 8B (Gaming PC) → Excerpt, tags, meta Stage 5: Compliance → Python (local) → Strip banned words, flag dates/names Total runtime: ~90 seconds """ import asyncio import json import re import time import uuid from datetime import datetime from typing import Dict, Optional, Any, Callable from .models import ( PipelineStage, JobStatus, StruggleFirstBrief, StructureOutput, DraftOutput, SEOOutput, ComplianceReport, PipelineResult, ContentType ) from .compliance_filter import ComplianceFilter, SOVEREIGN_STACK_CONTEXT # Import v2 struggle-first prompt builder try: from blog.generation.prompts import build_struggle_first_prompt V2_PROMPTS_AVAILABLE = True except ImportError as e: V2_PROMPTS_AVAILABLE = False import logging logging.getLogger(__name__).warning(f"V2 struggle-first prompts not available: {e}") # --- Configuration --- # Gaming PC Ollama endpoint via Tailscale GAMING_PC_HOST = "matt-pc.tail864e81.ts.net" OLLAMA_URL = f"http://{GAMING_PC_HOST}:11434/api/generate" MODELS = { "strategy": "llama3.1:8b", "structure": "qwen2.5-coder:7b", "draft": "phi4:14b", "seo": "llama3.1:8b", } # Stage weights for progress calculation STAGE_WEIGHTS = { PipelineStage.QUEUED: 0, PipelineStage.STRATEGY: 10, # ~10s PipelineStage.STRUCTURE: 10, # ~15s PipelineStage.DRAFT: 60, # ~90s (bottleneck) PipelineStage.SEO: 15, # ~10s PipelineStage.COMPLIANCE: 5, # ~1s PipelineStage.COMPLETED: 0, } # --- System Prompts (V2 Struggle-First) --- STRATEGY_PROMPT_V2 = """You are a content strategist helping a technical blogger find the real story behind a topic. 
{grounding_context} Topic: {topic} Content Type: {content_type} {outline_section} Your job: Identify the human struggle, not the solution. Find the moment where something broke, failed, or surprised. Generate a STRUGGLE-FIRST content brief with these fields: 1. struggle_angle: What specifically broke or went wrong? (Not "how to fix X" but "the night X broke") 2. origin_story: Why were you even trying this? What was the setup? (2-3 sentences) 3. attempts: List 2-3 failed attempts with why each failed: - attempt: What you tried - why_failed: Why it didn't work 4. the_moment: The realization or breaking point - when did you understand the real problem? 5. the_fix: What actually worked, with explicit caveats/tradeoffs 6. reflection: What you'd do differently next time (honest, not preachy) 7. target_length: Word count (800-1500) Return ONLY JSON: {{ "struggle_angle": "...", "origin_story": "...", "attempts": [ {{"attempt": "...", "why_failed": "..."}}, {{"attempt": "...", "why_failed": "..."}} ], "the_moment": "...", "the_fix": "...", "reflection": "...", "target_length": 1200 }} Rules: - Use ONLY tools from the grounding context above (Radicale, Tailscale, Ollama, etc.) - NEVER invent: teams, startups, e-commerce platforms, millions of users - Be specific: titanium-butler, Gaming PC, model names, error messages - Use generic family references ("my spouse", "the kids") - NEVER real names """ STRUCTURE_PROMPT_V2 = """Validate this struggle-first brief before we spend 90 minutes generating. {SOVEREIGN_STACK_CONTEXT} Brief: - Struggle: {struggle_angle} - Origin: {origin_story} - Attempts: {attempts} - Moment: {the_moment} - Fix: {the_fix} Evaluate: 1. Is the struggle specific and grounded? (Not generic) 2. Are the failed attempts concrete? (Not "I tried a few things") 3. Is the moment of realization clear? 4. Does the fix have explicit caveats? 5. Is the reflection honest, not preachy? 
Return ONLY JSON: {{ "validated": true/false, "concerns": ["..."], "suggested_structure": ["section1", "section2", ...] }} """ DRAFT_PROMPT = """You are a technical writer with a sovereign, pragmatic voice. {SOVEREIGN_STACK_CONTEXT} Title: {title} Angle: {angle} Target: {target_audience} Tone: {tone_notes} Structure to follow: {structure} Rules: - Start with the human moment, not the solution - Admit wrong turns before revealing fixes - Use specific versions, error messages, timestamps - End with actionable takeaways - NEVER use corporate buzzwords (delve, tapestry, leverage, holistic, etc.) - NEVER invent specific dates, times, or days of the week (no "Last Tuesday", "March 15th", "7:45 PM") - Write 800-1200 words - Be the competent engineer talking to another engineer Write the full blog post now. """ SEO_PROMPT = """Given this blog post content, generate SEO metadata. Content: {content} Return ONLY JSON: {{ "excerpt": "1-2 sentence summary", "tags": ["tag1", "tag2", "tag3", "tag4", "tag5"], "meta_description": "under 160 characters", "keywords": ["keyword1", "keyword2"] }} Rules: - Excerpt should hook a reader - Tags are lowercase, hyphenated if multi-word - Meta description is search-friendly """ # --- Ollama Client --- async def check_gaming_pc_available() -> dict: """Check if Gaming PC Ollama is reachable via Tailscale.""" import httpx import subprocess try: # First check if host is reachable ping = subprocess.run( ["ping", "-c", "1", "-W", "2", GAMING_PC_HOST], capture_output=True ) if ping.returncode != 0: return {"available": False, "reason": "Host unreachable via Tailscale"} # Then check Ollama async with httpx.AsyncClient() as client: response = await client.get( f"http://{GAMING_PC_HOST}:11434", timeout=5.0 ) if response.status_code == 200: return {"available": True, "reason": "Ollama responding"} return {"available": False, "reason": f"Ollama returned {response.status_code}"} except Exception as e: return {"available": False, "reason": str(e)} async 
def generate_ollama( model: str, prompt: str, temperature: float = 0.7, max_tokens: int = 4000, timeout: float = 300.0, json_mode: bool = False ) -> str: """Generate text using Ollama on Gaming PC.""" import httpx payload = { "model": model, "prompt": prompt, "stream": False, "options": { "temperature": temperature, "num_predict": max_tokens, "top_p": 0.9, } } if json_mode: payload["format"] = "json" async with httpx.AsyncClient() as client: response = await client.post( OLLAMA_URL, json=payload, timeout=timeout ) response.raise_for_status() data = response.json() return data.get("response", "") def extract_json(text: str) -> Optional[dict]: """Extract JSON from model response (handles markdown wrapping).""" # Try to find JSON in code blocks code_block = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL) if code_block: text = code_block.group(1) # Try direct JSON extraction try: return json.loads(text) except json.JSONDecodeError: pass # Find JSON between braces start = text.find('{') end = text.rfind('}') if start >= 0 and end > start: try: return json.loads(text[start:end+1]) except json.JSONDecodeError: pass return None # --- Stage Implementations (V2 Struggle-First) --- async def stage_strategy( topic: str, content_type: str, outline: Optional[list], progress_callback: Optional[Callable] = None ) -> StruggleFirstBrief: """Stage 1: Generate struggle-first brief using Llama 3.1 8B.""" if progress_callback: progress_callback("Finding the struggle...", 10) outline_section = "" if outline: outline_section = "\nOutline points:\n" + "\n".join(f"- {o}" for o in outline) # Get grounding context from real projects try: from shared.grounding import get_grounding_context grounding = get_grounding_context(topic) except Exception: grounding = "No grounding context available." 
prompt = STRATEGY_PROMPT_V2.format( grounding_context=grounding, topic=topic, content_type=content_type, outline_section=outline_section ) response = await generate_ollama( model=MODELS["strategy"], prompt=prompt, max_tokens=1000, json_mode=True, timeout=30.0 ) data = extract_json(response) if not data: raise ValueError(f"Failed to parse strategy output: {response[:200]}") return StruggleFirstBrief( struggle_angle=data.get("struggle_angle", f"The time {topic} broke"), origin_story=data.get("origin_story", ""), attempts=data.get("attempts", []), the_moment=data.get("the_moment", ""), the_fix=data.get("the_fix", ""), reflection=data.get("reflection", ""), target_length=int(data.get("target_length", 1200)) ) async def stage_structure( brief: StruggleFirstBrief, progress_callback: Optional[Callable] = None ) -> StructureOutput: """Stage 2: Validate struggle-first brief using Qwen 2.5 7B.""" if progress_callback: progress_callback("Validating struggle narrative...", 20) attempts_json = json.dumps(brief.attempts) prompt = STRUCTURE_PROMPT_V2.format( SOVEREIGN_STACK_CONTEXT=SOVEREIGN_STACK_CONTEXT, struggle_angle=brief.struggle_angle, origin_story=brief.origin_story, attempts=attempts_json, the_moment=brief.the_moment, the_fix=brief.the_fix ) response = await generate_ollama( model=MODELS["structure"], prompt=prompt, max_tokens=600, json_mode=True, timeout=30.0 ) data = extract_json(response) if not data: raise ValueError(f"Failed to parse structure output: {response[:200]}") return StructureOutput( validated=data.get("validated", True), structure=data.get("suggested_structure", []), estimated_word_count=int(data.get("estimated_word_count") or 1000), concerns=data.get("concerns") ) DRAFT_PROMPT_V2_FALLBACK = """Write a first-person narrative blog post about this incident. 
async def stage_draft(
    brief: "StruggleFirstBrief",
    structure: "StructureOutput",
    progress_callback: Optional[Callable] = None
) -> "DraftOutput":
    """Stage 3: Generate struggle-first draft using Phi-4 14B (~60-90s).

    NOTE(review): `structure` is currently unused here - presumably the v2
    builder derives sections from the brief alone; confirm before removing
    the parameter.
    """
    if progress_callback:
        progress_callback("Drafting struggle narrative with Phi-4... (this takes ~90 seconds)", 30)

    # Get grounding context (best-effort).
    try:
        from shared.grounding import get_grounding_context
        grounding = get_grounding_context()
    except Exception:
        grounding = "No grounding context available."

    # Build prompt using v2 struggle-first builder if available.
    brief_dict = {
        "struggle_angle": brief.struggle_angle,
        "origin_story": brief.origin_story,
        "attempts": brief.attempts,
        "the_moment": brief.the_moment,
        "the_fix": brief.the_fix,
        "reflection": brief.reflection,
        "target_length": brief.target_length
    }

    if V2_PROMPTS_AVAILABLE:
        prompt = build_struggle_first_prompt(brief_dict, style_reference=None)
        # Prepend grounding context
        prompt = grounding + "\n\n" + prompt
    else:
        # Fallback to inline template.
        # BUG FIX: attempts entries are not guaranteed to be dicts (the
        # strategy model sometimes returns plain strings); the old
        # a.get(...) crashed on those.
        attempt_lines = []
        for i, a in enumerate(brief.attempts):
            if isinstance(a, dict):
                attempt_lines.append(
                    f"Attempt {i+1}: {a.get('attempt', '')}\nWhy it failed: {a.get('why_failed', '')}"
                )
            else:
                attempt_lines.append(f"Attempt {i+1}: {a}")
        attempts_text = "\n".join(attempt_lines)

        # Ordered substitutions; grounding goes last, matching the original
        # replace chain.
        substitutions = [
            ("{{struggle_angle}}", brief.struggle_angle),
            ("{{origin_story}}", brief.origin_story),
            ("{{attempts}}", attempts_text),
            ("{{the_moment}}", brief.the_moment),
            ("{{the_fix}}", brief.the_fix),
            ("{{reflection}}", brief.reflection),
            ("{{target_length}}", str(brief.target_length)),
            ("{grounding_context}", grounding),
        ]
        prompt = DRAFT_PROMPT_V2_FALLBACK
        for placeholder, value in substitutions:
            prompt = prompt.replace(placeholder, value)

    response = await generate_ollama(
        model=MODELS["draft"],
        prompt=prompt,
        temperature=0.7,
        max_tokens=4000,
        timeout=180.0  # 3 minutes for long generation
    )

    content = response.strip()
    word_count = len(content.split())
    reading_time = max(1, word_count // 200)  # ~200 WPM

    if progress_callback:
        progress_callback("Draft complete", 90)

    return DraftOutput(
        content=content,
        word_count=word_count,
        reading_time_minutes=reading_time
    )


async def stage_seo(
    content: str,
    progress_callback: Optional[Callable] = None
) -> "SEOOutput":
    """Stage 4: Generate SEO metadata using Llama 3.1 8B.

    Never raises on bad model output - falls back to metadata derived
    directly from the content.
    """
    if progress_callback:
        progress_callback("Generating SEO metadata...", 95)

    # Use first 2000 chars for SEO context - enough signal, cheap to send.
    excerpt_content = content[:2000]
    prompt = SEO_PROMPT.format(content=excerpt_content)

    response = await generate_ollama(
        model=MODELS["seo"],
        prompt=prompt,
        max_tokens=400,
        json_mode=True,
        timeout=30.0
    )

    data = extract_json(response)
    if not data:
        # Fallback: bare-bones metadata from the content itself.
        return SEOOutput(
            excerpt=content[:150] + "...",
            tags=["homelab", "infrastructure"],
            meta_description=content[:160],
            keywords=["self-hosted"]
        )

    return SEOOutput(
        excerpt=data.get("excerpt", content[:150] + "..."),
        tags=data.get("tags", []),
        meta_description=data.get("meta_description", content[:160]),
        keywords=data.get("keywords", [])
    )
def stage_compliance(
    content: str,
    progress_callback: Optional[Callable] = None
) -> "tuple[str, ComplianceReport]":
    """Stage 5: Filter banned words, flag dates/names (local Python, ~1s).

    Returns:
        (cleaned content, ComplianceReport summarizing what was changed/flagged).
    """
    if progress_callback:
        progress_callback("Running compliance checks...", 99)

    filter_instance = ComplianceFilter()
    report = filter_instance.process(content)

    return report.clean_text, ComplianceReport(
        replacements_made=report.replacements_made,
        warnings=[f"Found: '{w}'" for w in report.banned_found] if report.has_warnings else [],
        is_compliant=report.is_compliant,
        banned_words_found=report.banned_found,
        dates_flagged=report.dates_found,
        names_flagged=report.names_found
    )


# --- Pipeline Orchestrator ---

class PipelineOrchestrator:
    """Manages job state and runs the tiered pipeline."""

    def __init__(self):
        # job_id -> live status; results kept separately so status polling is cheap.
        self.jobs: Dict[str, "JobStatus"] = {}
        self.results: Dict[str, "PipelineResult"] = {}
        # Job ids flagged for cancellation; checked between stages.
        self.cancelled: set = set()

    def create_job(self, topic: str, content_type: "ContentType") -> str:
        """Create a new pipeline job and return its job ID.

        NOTE(review): topic/content_type are currently unused here - kept
        for API compatibility with callers; confirm intent.
        """
        job_id = f"gen_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
        self.jobs[job_id] = JobStatus(
            job_id=job_id,
            status=PipelineStage.QUEUED,
            stage_number=0,
            total_stages=5,
            progress_percent=0,
            started_at=datetime.now()
        )
        return job_id

    def get_job(self, job_id: str) -> Optional["JobStatus"]:
        """Get current status of a job (None if unknown)."""
        return self.jobs.get(job_id)

    def cancel_job(self, job_id: str) -> bool:
        """Mark a job for cancellation; returns False for unknown job ids."""
        if job_id in self.jobs:
            self.cancelled.add(job_id)
            return True
        return False

    def _update_progress(self, job_id: str, stage: "PipelineStage", message: str, percent: int):
        """Update job progress and estimated time remaining."""
        if job_id not in self.jobs:
            return
        job = self.jobs[job_id]
        job.status = stage
        # BUG FIX: CANCELLED/FAILED are not keys of STAGE_WEIGHTS, so the
        # unconditional list(...).index(stage) raised ValueError inside the
        # cancel/failure handlers, masking the original error.  Only update
        # stage_number for stages the weight table knows about.
        if stage in STAGE_WEIGHTS:
            job.stage_number = list(STAGE_WEIGHTS.keys()).index(stage)
        job.progress_percent = percent
        job.current_stage_name = message

        # Estimate remaining seconds from elapsed time and percent complete.
        if job.started_at and percent > 0:
            elapsed = (datetime.now() - job.started_at).total_seconds()
            if percent < 100:
                # If X% took `elapsed` seconds, extrapolate the total.
                estimated_total = elapsed / (percent / 100)
                remaining = int(estimated_total - elapsed)
                job.estimated_seconds_remaining = max(5, remaining)  # at least 5s
            else:
                job.estimated_seconds_remaining = 0

    async def run_pipeline(
        self,
        job_id: str,
        topic: str,
        content_type: "ContentType",
        outline: Optional[list] = None,
        context: Optional[str] = None
    ) -> "PipelineResult":
        """Run the full 5-stage v2 struggle-first pipeline.

        Raises:
            asyncio.CancelledError: if the job was cancelled between stages.
            Exception: re-raised from any failing stage after status update.
        """
        # Best-effort record of per-stage outputs (useful when a later stage fails).
        partial_outputs = {}

        def progress_cb(message: str, percent: int):
            # Stage itself is unchanged here; only message/percent move.
            self._update_progress(job_id, self.jobs[job_id].status, message, percent)

        try:
            # Stage 1: Strategy (V2 Struggle-First)
            self._update_progress(job_id, PipelineStage.STRATEGY, "Finding the struggle...", 5)
            brief = await stage_strategy(topic, content_type.value, outline, progress_cb)
            partial_outputs["brief"] = brief.dict()

            if job_id in self.cancelled:
                raise asyncio.CancelledError()

            # Stage 2: Structure (V2)
            self._update_progress(job_id, PipelineStage.STRUCTURE, "Validating struggle narrative...", 15)
            structure = await stage_structure(brief, progress_cb)
            partial_outputs["structure"] = structure.dict()
            if not structure.validated and structure.concerns:
                # Validation is advisory - continue with warnings.
                pass

            if job_id in self.cancelled:
                raise asyncio.CancelledError()

            # Stage 3: Draft (V2 Struggle-First)
            self._update_progress(job_id, PipelineStage.DRAFT, "Drafting struggle narrative...", 30)
            draft = await stage_draft(brief, structure, progress_cb)
            partial_outputs["draft"] = {
                "word_count": draft.word_count,
                "reading_time": draft.reading_time_minutes
            }

            if job_id in self.cancelled:
                raise asyncio.CancelledError()

            # Stage 4: SEO
            self._update_progress(job_id, PipelineStage.SEO, "Generating SEO metadata...", 95)
            seo = await stage_seo(draft.content, progress_cb)
            partial_outputs["seo"] = seo.dict()

            # Stage 5: Compliance
            self._update_progress(job_id, PipelineStage.COMPLIANCE, "Running compliance checks...", 99)
            clean_content, compliance = stage_compliance(draft.content, progress_cb)
            partial_outputs["compliance"] = compliance.dict()

            # Finalize - use struggle_angle as title unless it is the generic fallback.
            title = brief.struggle_angle if not brief.struggle_angle.startswith("The time") else f"The Night {topic} Broke"

            result = PipelineResult(
                title=title,
                content=clean_content,
                excerpt=seo.excerpt,
                tags=seo.tags,
                meta_description=seo.meta_description,
                word_count=draft.word_count,
                reading_time_minutes=draft.reading_time_minutes,
                compliance=compliance
            )

            self.results[job_id] = result
            self._update_progress(job_id, PipelineStage.COMPLETED, "Complete", 100)
            self.jobs[job_id].completed_at = datetime.now()
            return result

        except asyncio.CancelledError:
            self._update_progress(job_id, PipelineStage.CANCELLED, "Cancelled", 0)
            raise
        except Exception as e:
            self.jobs[job_id].error = str(e)
            self._update_progress(job_id, PipelineStage.FAILED, f"Failed: {e}", 0)
            raise


# Global orchestrator instance (lazy singleton).
_orchestrator: Optional[PipelineOrchestrator] = None


def get_orchestrator() -> PipelineOrchestrator:
    """Get or create the global orchestrator."""
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = PipelineOrchestrator()
    return _orchestrator