# pipeline.py

"""
Tiered Content Generation Pipeline - 5 Stage Orchestration (V2 Struggle-First).

Stage 1: Strategy → Llama 3.1 8B (Gaming PC) → Find the struggle
Stage 2: Structure → Qwen 2.5 7B (Gaming PC) → Validate narrative
Stage 3: Draft → Phi-4 14B (Gaming PC) → Full struggle-first draft (~60-90s)
Stage 4: SEO → Llama 3.1 8B (Gaming PC) → Excerpt, tags, meta
Stage 5: Compliance → Python (local) → Strip banned words, flag dates/names

Total runtime: ~90 seconds
"""

import asyncio
import json
import re
import time
import uuid
from datetime import datetime
from typing import Dict, Optional, Any, Callable

from .models import (
PipelineStage, JobStatus, StruggleFirstBrief, StructureOutput,
DraftOutput, SEOOutput, ComplianceReport, PipelineResult,
ContentType
)
from .compliance_filter import ComplianceFilter, SOVEREIGN_STACK_CONTEXT

# Import v2 struggle-first prompt builder

# The v2 prompt builder is an optional dependency: when it cannot be imported,
# V2_PROMPTS_AVAILABLE is False and stage_draft uses DRAFT_PROMPT_V2_FALLBACK.
try:
    from blog.generation.prompts import build_struggle_first_prompt
    V2_PROMPTS_AVAILABLE = True
except ImportError as e:
    V2_PROMPTS_AVAILABLE = False
    import logging
    # Log under this module's own logger name (__name__); a bare `name` is
    # undefined here and would raise NameError inside the handler.
    logging.getLogger(__name__).warning(f"V2 struggle-first prompts not available: {e}")

# --- Configuration ---

# Gaming PC Ollama endpoint via Tailscale

# Hostname of the machine running Ollama; also pinged by
# check_gaming_pc_available before the HTTP probe.
GAMING_PC_HOST = "matt-pc.tail864e81.ts.net"
# Generation endpoint that generate_ollama POSTs its payload to.
OLLAMA_URL = f"http://{GAMING_PC_HOST}:11434/api/generate"

# Model tag used for each LLM-backed stage, looked up by the stage functions
# as MODELS["strategy"], MODELS["structure"], MODELS["draft"], MODELS["seo"].
MODELS = {
    "strategy": "llama3.1:8b",
    "structure": "qwen2.5-coder:7b",
    "draft": "phi4:14b",
    "seo": "llama3.1:8b",
}

# Stage weights for progress calculation

# The values sum to 100. Within this file only the key ORDER is read:
# PipelineOrchestrator._update_progress uses a stage's position among these
# keys as the job's stage number.
# NOTE(review): PipelineStage.CANCELLED and PipelineStage.FAILED are passed to
# _update_progress but have no entry here - confirm whether that is intended.
STAGE_WEIGHTS = {
    PipelineStage.QUEUED: 0,
    PipelineStage.STRATEGY: 10,  # ~10s
    PipelineStage.STRUCTURE: 10,  # ~15s
    PipelineStage.DRAFT: 60,  # ~90s (bottleneck)
    PipelineStage.SEO: 15,  # ~10s
    PipelineStage.COMPLIANCE: 5,  # ~1s
    PipelineStage.COMPLETED: 0,
}

# --- System Prompts (V2 Struggle-First) ---

# Stage 1 prompt template, filled by stage_strategy via str.format with
# grounding_context, topic, content_type and outline_section. The doubled
# braces ({{ }}) are str.format escapes that render as literal JSON braces.
STRATEGY_PROMPT_V2 = """You are a content strategist helping a technical blogger find the real story behind a topic.

{grounding_context}

Topic: {topic}
Content Type: {content_type}
{outline_section}

Your job: Identify the human struggle, not the solution. Find the moment where something broke, failed, or surprised.

Generate a STRUGGLE-FIRST content brief with these fields:

  1. struggle_angle: What specifically broke or went wrong? (Not "how to fix X" but "the night X broke")
  2. origin_story: Why were you even trying this? What was the setup? (2-3 sentences)
  3. attempts: List 2-3 failed attempts with why each failed:
    - attempt: What you tried
    - why_failed: Why it didn't work
  4. the_moment: The realization or breaking point - when did you understand the real problem?
  5. the_fix: What actually worked, with explicit caveats/tradeoffs
  6. reflection: What you'd do differently next time (honest, not preachy)
  7. target_length: Word count (800-1500)

Return ONLY JSON:
{{
"struggle_angle": "...",
"origin_story": "...",
"attempts": [
{{"attempt": "...", "why_failed": "..."}},
{{"attempt": "...", "why_failed": "..."}}
],
"the_moment": "...",
"the_fix": "...",
"reflection": "...",
"target_length": 1200
}}

Rules:
- Use ONLY tools from the grounding context above (Radicale, Tailscale, Ollama, etc.)
- NEVER invent: teams, startups, e-commerce platforms, millions of users
- Be specific: titanium-butler, Gaming PC, model names, error messages
- Use generic family references ("my spouse", "the kids") - NEVER real names
"""

# Stage 2 prompt template, filled by stage_structure via str.format with
# SOVEREIGN_STACK_CONTEXT, struggle_angle, origin_story, attempts (a JSON
# string), the_moment and the_fix. Doubled braces are str.format escapes.
# NOTE(review): the opening line says "90 minutes", while the module docstring
# quotes ~60-90 seconds for the draft stage - confirm which is intended.
STRUCTURE_PROMPT_V2 = """Validate this struggle-first brief before we spend 90 minutes generating.

{SOVEREIGN_STACK_CONTEXT}

Brief:
- Struggle: {struggle_angle}
- Origin: {origin_story}
- Attempts: {attempts}
- Moment: {the_moment}
- Fix: {the_fix}

Evaluate:
1. Is the struggle specific and grounded? (Not generic)
2. Are the failed attempts concrete? (Not "I tried a few things")
3. Is the moment of realization clear?
4. Does the fix have explicit caveats?
5. Is the reflection honest, not preachy?

Return ONLY JSON:
{{
"validated": true/false,
"concerns": ["..."],
"suggested_structure": ["section1", "section2", ...]
}}
"""

# Draft prompt template with str.format-style placeholders:
# SOVEREIGN_STACK_CONTEXT, title, angle, target_audience, tone_notes, structure.
# NOTE(review): no stage function in this file references DRAFT_PROMPT
# (stage_draft uses the v2 builder or DRAFT_PROMPT_V2_FALLBACK) - presumably a
# leftover from the pre-v2 pipeline; confirm before removing.
DRAFT_PROMPT = """You are a technical writer with a sovereign, pragmatic voice.

{SOVEREIGN_STACK_CONTEXT}

Title: {title}
Angle: {angle}
Target: {target_audience}
Tone: {tone_notes}

Structure to follow:
{structure}

Rules:
- Start with the human moment, not the solution
- Admit wrong turns before revealing fixes
- Use specific versions, error messages, timestamps
- End with actionable takeaways
- NEVER use corporate buzzwords (delve, tapestry, leverage, holistic, etc.)
- NEVER invent specific dates, times, or days of the week (no "Last Tuesday", "March 15th", "7:45 PM")
- Write 800-1200 words
- Be the competent engineer talking to another engineer

Write the full blog post now.
"""

# Stage 4 prompt template, filled by stage_seo via str.format with `content`
# (the first 2000 characters of the draft). Doubled braces are str.format
# escapes that render as literal JSON braces.
SEO_PROMPT = """Given this blog post content, generate SEO metadata.

Content:
{content}

Return ONLY JSON:
{{
"excerpt": "1-2 sentence summary",
"tags": ["tag1", "tag2", "tag3", "tag4", "tag5"],
"meta_description": "under 160 characters",
"keywords": ["keyword1", "keyword2"]
}}

Rules:
- Excerpt should hook a reader
- Tags are lowercase, hyphenated if multi-word
- Meta description is search-friendly
"""

# --- Ollama Client ---

async def check_gaming_pc_available() -> dict:
    """Check if Gaming PC Ollama is reachable via Tailscale.

    Pings GAMING_PC_HOST once, then issues an HTTP GET against port 11434.

    Returns:
        A dict with ``available`` (bool) and ``reason`` (str). Any exception
        raised while probing is reported as ``available: False`` with the
        exception text as the reason instead of being propagated.
    """
    import httpx
    import subprocess

    try:
        # First check if host is reachable. subprocess.run blocks until ping
        # exits, so run it in a worker thread to keep the event loop free.
        ping = await asyncio.to_thread(
            subprocess.run,
            ["ping", "-c", "1", "-W", "2", GAMING_PC_HOST],
            capture_output=True,
        )
        if ping.returncode != 0:
            return {"available": False, "reason": "Host unreachable via Tailscale"}

        # Then check Ollama
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"http://{GAMING_PC_HOST}:11434",
                timeout=5.0
            )
            if response.status_code == 200:
                return {"available": True, "reason": "Ollama responding"}
            return {"available": False, "reason": f"Ollama returned {response.status_code}"}
    except Exception as e:
        # Health check is best-effort by design: report, never raise.
        return {"available": False, "reason": str(e)}

async def generate_ollama(
    model: str,
    prompt: str,
    temperature: float = 0.7,
    max_tokens: int = 4000,
    timeout: float = 300.0,
    json_mode: bool = False
) -> str:
    """Generate text using Ollama on Gaming PC.

    Args:
        model: Ollama model tag (see MODELS).
        prompt: Full prompt text sent as-is.
        temperature: Sampling temperature placed in the request options.
        max_tokens: Sent as the ``num_predict`` option.
        timeout: Timeout in seconds passed to the HTTP POST.
        json_mode: When True, adds ``"format": "json"`` to the payload.

    Returns:
        The ``response`` field of the JSON reply, or "" when it is absent.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status,
        plus any transport or timeout error from the HTTP client.
    """
    import httpx

    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,  # one complete reply rather than a chunk stream
        "options": {
            "temperature": temperature,
            "num_predict": max_tokens,
            "top_p": 0.9,
        },
    }

    if json_mode:
        payload["format"] = "json"

    async with httpx.AsyncClient() as client:
        response = await client.post(
            OLLAMA_URL,
            json=payload,
            timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
        return data.get("response", "")

# Matches a JSON object wrapped in a markdown code fence, with or without a
# "json" language tag. The closing fence anchors the lazy `.*?`, so nested
# objects are captured whole instead of being cut at the first "}".
_FENCED_JSON_RE = re.compile(r"```(?:json)?\s*(\{.*?\})\s*```", re.DOTALL)


def extract_json(text: str) -> Optional[dict]:
    """Extract a JSON object from a model response (handles markdown wrapping).

    Candidates are tried in order: the whole text, the contents of a fenced
    code block, and finally the span from the first "{" to the last "}".

    Args:
        text: Raw model output.

    Returns:
        The first candidate that parses to a dict, or None when there is no
        such candidate (including empty input or JSON that is not an object).
    """
    if not text:
        return None

    candidates = [text]

    fenced = _FENCED_JSON_RE.search(text)
    if fenced:
        candidates.append(fenced.group(1))

    start = text.find('{')
    end = text.rfind('}')
    if start >= 0 and end > start:
        candidates.append(text[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # Callers use .get() on the result, so only a JSON object is usable.
        if isinstance(parsed, dict):
            return parsed

    return None

# --- Stage Implementations (V2 Struggle-First) ---

async def stage_strategy(
    topic: str,
    content_type: str,
    outline: Optional[list],
    progress_callback: Optional[Callable] = None
) -> StruggleFirstBrief:
    """Stage 1: Generate struggle-first brief using Llama 3.1 8B.

    Args:
        topic: Subject of the post; also used for the default struggle angle.
        content_type: Content type label interpolated into the prompt.
        outline: Optional outline points, rendered as a bullet list.
        progress_callback: Optional ``callback(message, percent)``.

    Returns:
        A StruggleFirstBrief built from the model's JSON reply, with defaults
        for missing, null or malformed fields.

    Raises:
        ValueError: If no JSON object can be extracted from the reply.
    """
    if progress_callback:
        progress_callback("Finding the struggle...", 10)

    outline_section = ""
    if outline:
        outline_section = "\nOutline points:\n" + "\n".join(f"- {o}" for o in outline)

    # Grounding is best-effort: a missing module or lookup failure must not
    # stop the pipeline, so any error falls back to a placeholder string.
    try:
        from shared.grounding import get_grounding_context
        grounding = get_grounding_context(topic)
    except Exception:
        grounding = "No grounding context available."

    prompt = STRATEGY_PROMPT_V2.format(
        grounding_context=grounding,
        topic=topic,
        content_type=content_type,
        outline_section=outline_section
    )

    response = await generate_ollama(
        model=MODELS["strategy"],
        prompt=prompt,
        max_tokens=1000,
        json_mode=True,
        timeout=30.0
    )

    data = extract_json(response)
    if not data:
        raise ValueError(f"Failed to parse strategy output: {response[:200]}")

    # Guard against a null or non-numeric target_length in the model's JSON.
    try:
        target_length = int(data.get("target_length") or 1200)
    except (TypeError, ValueError):
        target_length = 1200

    attempts = data.get("attempts")
    if not isinstance(attempts, list):
        attempts = []

    # `or` rather than a .get() default so explicit nulls also fall back.
    return StruggleFirstBrief(
        struggle_angle=data.get("struggle_angle") or f"The time {topic} broke",
        origin_story=data.get("origin_story") or "",
        attempts=attempts,
        the_moment=data.get("the_moment") or "",
        the_fix=data.get("the_fix") or "",
        reflection=data.get("reflection") or "",
        target_length=target_length
    )

async def stage_structure(
    brief: StruggleFirstBrief,
    progress_callback: Optional[Callable] = None
) -> StructureOutput:
    """Stage 2: Validate struggle-first brief using Qwen 2.5 7B.

    Args:
        brief: Output of stage_strategy.
        progress_callback: Optional ``callback(message, percent)``.

    Returns:
        A StructureOutput carrying the model's verdict, suggested section
        list and concerns. ``validated`` defaults to True when absent.

    Raises:
        ValueError: If no JSON object can be extracted from the reply.
    """
    if progress_callback:
        progress_callback("Validating struggle narrative...", 20)

    attempts_json = json.dumps(brief.attempts)

    prompt = STRUCTURE_PROMPT_V2.format(
        SOVEREIGN_STACK_CONTEXT=SOVEREIGN_STACK_CONTEXT,
        struggle_angle=brief.struggle_angle,
        origin_story=brief.origin_story,
        attempts=attempts_json,
        the_moment=brief.the_moment,
        the_fix=brief.the_fix
    )

    response = await generate_ollama(
        model=MODELS["structure"],
        prompt=prompt,
        max_tokens=600,
        json_mode=True,
        timeout=30.0
    )

    data = extract_json(response)
    if not data:
        raise ValueError(f"Failed to parse structure output: {response[:200]}")

    # The prompt does not request estimated_word_count, so this is normally
    # the 1000 default; still guard against a non-numeric value if present.
    try:
        estimated_word_count = int(data.get("estimated_word_count") or 1000)
    except (TypeError, ValueError):
        estimated_word_count = 1000

    return StructureOutput(
        validated=data.get("validated", True),
        structure=data.get("suggested_structure", []),
        estimated_word_count=estimated_word_count,
        concerns=data.get("concerns")
    )

# Inline draft template used by stage_draft when the v2 prompt builder is not
# importable. It is filled with literal str.replace calls, NOT str.format:
# the double-brace tokens (e.g. {{struggle_angle}}) and the single-brace
# {grounding_context} are each replaced verbatim.
DRAFT_PROMPT_V2_FALLBACK = """Write a first-person narrative blog post about this incident.

{grounding_context}

What broke: {{struggle_angle}}

Origin story: {{origin_story}}

Failed attempts:
{{attempts}}

The moment of realization: {{the_moment}}

What worked: {{the_fix}}

Reflection: {{reflection}}

Writing Rules (MANDATORY):
- [ ] First person only ("I", "my", "we") - NEVER "you should"
- [ ] Specific timestamp or location in first paragraph (titanium-butler, Gaming PC, etc.)
- [ ] Use ONLY tools from grounding context above - NEVER hallucinate AWS/GCP/Azure services
- [ ] At least one internal monologue quote
- [ ] "I thought... but..." pattern somewhere
- [ ] Admit one thing you got wrong
- [ ] Mention the cost: time, sleep, relationship friction
- [ ] NO corporate buzzwords (delve, tapestry, leverage, holistic, etc.)
- [ ] NEVER invent specific dates (no "Last Tuesday", "March 15th")
- [ ] NEVER use real names (use "my spouse", "the kids" - NOT actual names)

Target: {{target_length}} words

Write the complete blog post now:"""

async def stage_draft(
    brief: StruggleFirstBrief,
    structure: StructureOutput,
    progress_callback: Optional[Callable] = None
) -> DraftOutput:
    """Stage 3: Generate struggle-first draft using Phi-4 14B (~60-90s).

    Args:
        brief: Output of stage_strategy.
        structure: Output of stage_structure. Accepted for interface
            compatibility; this function does not read it.
        progress_callback: Optional ``callback(message, percent)``.

    Returns:
        A DraftOutput with the stripped text, its whitespace-split word count
        and a reading time of at least one minute at ~200 words per minute.

    Raises:
        ValueError: If the model returns an empty draft.
    """
    if progress_callback:
        progress_callback("Drafting struggle narrative with Phi-4... (this takes ~90 seconds)", 30)

    # Grounding is best-effort; any failure falls back to a placeholder.
    try:
        from shared.grounding import get_grounding_context
        grounding = get_grounding_context()
    except Exception:
        grounding = "No grounding context available."

    # Build prompt using v2 struggle-first builder if available
    brief_dict = {
        "struggle_angle": brief.struggle_angle,
        "origin_story": brief.origin_story,
        "attempts": brief.attempts,
        "the_moment": brief.the_moment,
        "the_fix": brief.the_fix,
        "reflection": brief.reflection,
        "target_length": brief.target_length
    }

    if V2_PROMPTS_AVAILABLE:
        prompt = build_struggle_first_prompt(brief_dict, style_reference=None)
        # Prepend grounding context
        prompt = grounding + "\n\n" + prompt
    else:
        # Fallback to inline template. Attempts come from model JSON, so an
        # entry may be a plain string rather than the expected dict.
        attempt_lines = []
        for i, a in enumerate(brief.attempts):
            if isinstance(a, dict):
                what, why = a.get('attempt', ''), a.get('why_failed', '')
            else:
                what, why = str(a), ''
            attempt_lines.append(f"Attempt {i+1}: {what}\nWhy it failed: {why}")
        attempts_text = "\n".join(attempt_lines)

        # Literal replacements, applied in this order (dicts keep insertion
        # order); the grounding placeholder is filled last.
        substitutions = {
            "{{struggle_angle}}": brief.struggle_angle,
            "{{origin_story}}": brief.origin_story,
            "{{attempts}}": attempts_text,
            "{{the_moment}}": brief.the_moment,
            "{{the_fix}}": brief.the_fix,
            "{{reflection}}": brief.reflection,
            "{{target_length}}": str(brief.target_length),
            "{grounding_context}": grounding,
        }
        prompt = DRAFT_PROMPT_V2_FALLBACK
        for placeholder, value in substitutions.items():
            prompt = prompt.replace(placeholder, value)

    response = await generate_ollama(
        model=MODELS["draft"],
        prompt=prompt,
        temperature=0.7,
        max_tokens=4000,
        timeout=180.0  # 3 minutes for long generation
    )

    content = response.strip()
    if not content:
        # Fail here, like the earlier stages do on unusable output, instead
        # of passing an empty post on to SEO and compliance.
        raise ValueError("Draft stage returned empty content")

    word_count = len(content.split())
    reading_time = max(1, word_count // 200)  # ~200 WPM

    if progress_callback:
        progress_callback("Draft complete", 90)

    return DraftOutput(
        content=content,
        word_count=word_count,
        reading_time_minutes=reading_time
    )

async def stage_seo(
    content: str,
    progress_callback: Optional[Callable] = None
) -> SEOOutput:
    """Stage 4: Generate SEO metadata using Llama 3.1 8B.

    Args:
        content: Draft text; only the first 2000 characters are sent.
        progress_callback: Optional ``callback(message, percent)``.

    Returns:
        An SEOOutput. When the reply contains no JSON object, a fixed
        fallback built from the start of ``content`` is returned; individual
        missing, null or empty fields fall back the same way.
    """
    if progress_callback:
        progress_callback("Generating SEO metadata...", 95)

    # Use first 2000 chars for SEO context
    excerpt_content = content[:2000]

    prompt = SEO_PROMPT.format(content=excerpt_content)

    response = await generate_ollama(
        model=MODELS["seo"],
        prompt=prompt,
        max_tokens=400,
        json_mode=True,
        timeout=30.0
    )

    fallback_excerpt = content[:150] + "..."
    fallback_meta = content[:160]

    data = extract_json(response)
    if not data:
        # Fallback
        return SEOOutput(
            excerpt=fallback_excerpt,
            tags=["homelab", "infrastructure"],
            meta_description=fallback_meta,
            keywords=["self-hosted"]
        )

    # `or` rather than a .get() default so explicit nulls / empty strings in
    # the model's JSON also fall back.
    return SEOOutput(
        excerpt=data.get("excerpt") or fallback_excerpt,
        tags=data.get("tags") or [],
        meta_description=data.get("meta_description") or fallback_meta,
        keywords=data.get("keywords") or []
    )

def stage_compliance(
    content: str,
    progress_callback: Optional[Callable] = None
) -> tuple[str, ComplianceReport]:
    """Stage 5: Filter banned words, flag dates/names.

    Runs synchronously through ComplianceFilter; no model call is made.

    Args:
        content: Draft text to check.
        progress_callback: Optional ``callback(message, percent)``.

    Returns:
        A ``(clean_text, report)`` tuple: the filter's cleaned text and a
        ComplianceReport copied from the filter's result.
    """
    if progress_callback:
        progress_callback("Running compliance checks...", 99)

    filter_instance = ComplianceFilter()
    report = filter_instance.process(content)

    # Warnings are only populated when the filter itself reports warnings.
    if report.has_warnings:
        warnings = [f"Found: '{w}'" for w in report.banned_found]
    else:
        warnings = []

    return report.clean_text, ComplianceReport(
        replacements_made=report.replacements_made,
        warnings=warnings,
        is_compliant=report.is_compliant,
        banned_words_found=report.banned_found,
        dates_flagged=report.dates_found,
        names_flagged=report.names_found
    )

# --- Pipeline Orchestrator ---

class PipelineOrchestrator:
    """Manages job state and runs the tiered pipeline.

    All state is held in memory on the instance: ``jobs`` maps job IDs to
    JobStatus, ``results`` maps finished job IDs to PipelineResult, and
    ``cancelled`` holds the IDs for which cancellation was requested.
    """

    def __init__(self):
        self.jobs: Dict[str, JobStatus] = {}
        self.results: Dict[str, PipelineResult] = {}
        self.cancelled: set = set()

    def create_job(self, topic: str, content_type: ContentType) -> str:
        """Create a new pipeline job and return job ID.

        The ID combines a local timestamp with six random hex characters.
        ``topic`` and ``content_type`` are accepted but not stored here.
        """
        job_id = f"gen_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"

        self.jobs[job_id] = JobStatus(
            job_id=job_id,
            status=PipelineStage.QUEUED,
            stage_number=0,
            total_stages=5,
            progress_percent=0,
            started_at=datetime.now()
        )

        return job_id

    def get_job(self, job_id: str) -> Optional[JobStatus]:
        """Get current status of a job, or None if the ID is unknown."""
        return self.jobs.get(job_id)

    def cancel_job(self, job_id: str) -> bool:
        """Mark a job for cancellation.

        Cancellation is cooperative: run_pipeline checks the flag between
        stages. Returns False when the job ID is unknown.
        """
        if job_id in self.jobs:
            self.cancelled.add(job_id)
            return True
        return False

    def _raise_if_cancelled(self, job_id: str) -> None:
        """Raise asyncio.CancelledError if cancellation was requested."""
        if job_id in self.cancelled:
            raise asyncio.CancelledError()

    def _update_progress(self, job_id: str, stage: PipelineStage, message: str, percent: int):
        """Update job progress and calculate estimated time remaining.

        Does nothing when ``job_id`` is unknown.
        """
        job = self.jobs.get(job_id)
        if job is None:
            return

        job.status = stage
        # Stages without a STAGE_WEIGHTS entry (CANCELLED, FAILED) keep the
        # last known stage number. Calling .index() on them raised ValueError,
        # which replaced the real error inside run_pipeline's handlers.
        stage_order = list(STAGE_WEIGHTS)
        if stage in stage_order:
            job.stage_number = stage_order.index(stage)
        job.progress_percent = percent
        job.current_stage_name = message

        # Calculate estimated seconds remaining based on elapsed time and progress
        if job.started_at and percent > 0:
            elapsed = (datetime.now() - job.started_at).total_seconds()
            if percent < 100:
                # Estimate: if X% took elapsed seconds, (100-X)% will take...
                estimated_total = elapsed / (percent / 100)
                remaining = int(estimated_total - elapsed)
                job.estimated_seconds_remaining = max(5, remaining)  # At least 5 seconds
            else:
                job.estimated_seconds_remaining = 0

    async def run_pipeline(
        self,
        job_id: str,
        topic: str,
        content_type: ContentType,
        outline: Optional[list] = None,
        context: Optional[str] = None
    ) -> PipelineResult:
        """Run the full 5-stage v2 struggle-first pipeline.

        Args:
            job_id: ID returned by create_job.
            topic: Subject of the post.
            content_type: Content type; its ``.value`` is sent to stage 1.
            outline: Optional outline points for stage 1.
            context: Accepted for interface compatibility; not read here.

        Returns:
            The PipelineResult, also stored in ``self.results[job_id]``.

        Raises:
            KeyError: If ``job_id`` was never created.
            asyncio.CancelledError: If the job was cancelled between stages;
                the job is marked CANCELLED first.
            Exception: Any stage failure is re-raised after the job is
                marked FAILED and its ``error`` is set.
        """
        # Fail before any model call; the progress callback would otherwise
        # hit the same KeyError from inside the first stage.
        if job_id not in self.jobs:
            raise KeyError(job_id)

        # NOTE(review): collected per stage but never read or stored in this
        # method - presumably meant for failure diagnostics; confirm.
        partial_outputs = {}

        def progress_cb(message: str, percent: int):
            # Stage callbacks only know message/percent; reuse current stage.
            self._update_progress(job_id, PipelineStage(self.jobs[job_id].status), message, percent)

        try:
            # Stage 1: Strategy (V2 Struggle-First)
            self._update_progress(job_id, PipelineStage.STRATEGY, "Finding the struggle...", 5)
            brief = await stage_strategy(topic, content_type.value, outline, progress_cb)
            partial_outputs["brief"] = brief.dict()

            self._raise_if_cancelled(job_id)

            # Stage 2: Structure (V2)
            self._update_progress(job_id, PipelineStage.STRUCTURE, "Validating struggle narrative...", 15)
            structure = await stage_structure(brief, progress_cb)
            partial_outputs["structure"] = structure.dict()

            # A failed validation is advisory only: drafting continues.

            self._raise_if_cancelled(job_id)

            # Stage 3: Draft (V2 Struggle-First)
            self._update_progress(job_id, PipelineStage.DRAFT, "Drafting struggle narrative...", 30)
            draft = await stage_draft(brief, structure, progress_cb)
            partial_outputs["draft"] = {"word_count": draft.word_count, "reading_time": draft.reading_time_minutes}

            self._raise_if_cancelled(job_id)

            # Stage 4: SEO
            self._update_progress(job_id, PipelineStage.SEO, "Generating SEO metadata...", 95)
            seo = await stage_seo(draft.content, progress_cb)
            partial_outputs["seo"] = seo.dict()

            # Stage 5: Compliance
            self._update_progress(job_id, PipelineStage.COMPLIANCE, "Running compliance checks...", 99)
            clean_content, compliance = stage_compliance(draft.content, progress_cb)
            partial_outputs["compliance"] = compliance.dict()

            # Finalize - use struggle_angle as title if no better option.
            # An angle starting with "The time" is stage 1's default text,
            # so it is swapped for a generated title.
            if brief.struggle_angle.startswith("The time"):
                title = f"The Night {topic} Broke"
            else:
                title = brief.struggle_angle

            result = PipelineResult(
                title=title,
                content=clean_content,
                excerpt=seo.excerpt,
                tags=seo.tags,
                meta_description=seo.meta_description,
                word_count=draft.word_count,
                reading_time_minutes=draft.reading_time_minutes,
                compliance=compliance
            )

            self.results[job_id] = result
            self._update_progress(job_id, PipelineStage.COMPLETED, "Complete", 100)
            self.jobs[job_id].completed_at = datetime.now()

            return result

        except asyncio.CancelledError:
            self._update_progress(job_id, PipelineStage.CANCELLED, "Cancelled", 0)
            raise
        except Exception as e:
            self.jobs[job_id].error = str(e)
            self._update_progress(job_id, PipelineStage.FAILED, f"Failed: {e}", 0)
            raise

# Global orchestrator instance

# Process-wide singleton, created on first call to get_orchestrator().
_orchestrator: Optional[PipelineOrchestrator] = None


def get_orchestrator() -> PipelineOrchestrator:
    """Get or create the global orchestrator.

    Returns:
        The module-level PipelineOrchestrator, constructed on first use and
        reused on every later call.
    """
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = PipelineOrchestrator()
    return _orchestrator