"""
Tiered Content Generation Pipeline - 5 Stage Orchestration (V2 Struggle-First).
Stage 1: Strategy → Llama 3.1 8B (Gaming PC) → Find the struggle
Stage 2: Structure → Qwen 2.5 7B (Gaming PC) → Validate narrative
Stage 3: Draft → Phi-4 14B (Gaming PC) → Full struggle-first draft (~60-90s)
Stage 4: SEO → Llama 3.1 8B (Gaming PC) → Excerpt, tags, meta
Stage 5: Compliance → Python (local) → Strip banned words, flag dates/names
Total runtime: ~90 seconds
"""
import asyncio
import json
import re
import time
import uuid
from datetime import datetime
from typing import Dict, Optional, Any, Callable
from .models import (
PipelineStage, JobStatus, StruggleFirstBrief, StructureOutput,
DraftOutput, SEOOutput, ComplianceReport, PipelineResult,
ContentType
)
from .compliance_filter import ComplianceFilter, SOVEREIGN_STACK_CONTEXT
# Import v2 struggle-first prompt builder
# Prefer the packaged v2 struggle-first prompt builder; fall back gracefully
# (with a logged warning) when blog.generation is not installed so the
# pipeline can still run with the inline fallback template.
try:
    from blog.generation.prompts import build_struggle_first_prompt
    V2_PROMPTS_AVAILABLE = True
except ImportError as e:
    V2_PROMPTS_AVAILABLE = False
    import logging
    # FIX: was `logging.getLogger(name)` -- `name` is undefined here and
    # raised NameError at import time; the module logger needs __name__.
    logging.getLogger(__name__).warning(f"V2 struggle-first prompts not available: {e}")
# --- Configuration ---
# Gaming PC Ollama endpoint via Tailscale
GAMING_PC_HOST = "matt-pc.tail864e81.ts.net"  # Tailscale MagicDNS name of the Gaming PC
OLLAMA_URL = f"http://{GAMING_PC_HOST}:11434/api/generate"  # non-streaming generate endpoint

# Model assigned to each pipeline stage (all served by Ollama on the Gaming PC).
MODELS = {
    "strategy": "llama3.1:8b",
    "structure": "qwen2.5-coder:7b",
    "draft": "phi4:14b",
    "seo": "llama3.1:8b",
}
# Stage weights for progress calculation
STAGE_WEIGHTS = {
    PipelineStage.QUEUED: 0,
    PipelineStage.STRATEGY: 10,   # ~10s
    PipelineStage.STRUCTURE: 10,  # ~15s
    PipelineStage.DRAFT: 60,      # ~90s (bottleneck)
    PipelineStage.SEO: 15,        # ~10s
    PipelineStage.COMPLIANCE: 5,  # ~1s
    PipelineStage.COMPLETED: 0,
    # NOTE: FAILED/CANCELLED deliberately have no entries; weights cover
    # only the normal forward path and sum to 100.
}
# --- System Prompts (V2 Struggle-First) ---
# Stage 1 prompt (Llama 3.1 8B): asks for a struggle-first content brief.
# Filled with .format(); placeholders: grounding_context, topic,
# content_type, outline_section.  Doubled braces ({{ }}) survive .format()
# as literal braces in the JSON example shown to the model.
# NOTE(review): the "why_failed" bullet below appears to have other bullet
# lines merged into it -- confirm against the original prompt text.
STRATEGY_PROMPT_V2 = """You are a content strategist helping a technical blogger find the real story behind a topic.
{grounding_context}
Topic: {topic}
Content Type: {content_type}
{outline_section}
Your job: Identify the human struggle, not the solution. Find the moment where something broke, failed, or surprised.
Generate a STRUGGLE-FIRST content brief with these fields:
- struggle_angle: What specifically broke or went wrong? (Not "how to fix X" but "the night X broke")
- origin_story: Why were you even trying this? What was the setup? (2-3 sentences)
- attempts: List 2-3 failed attempts with why each failed:
- attempt: What you tried
- why_failed: Why it didn't work - the_moment: The realization or breaking point - when did you understand the real problem?
- the_fix: What actually worked, with explicit caveats/tradeoffs
- reflection: What you'd do differently next time (honest, not preachy)
- target_length: Word count (800-1500)
Return ONLY JSON:
{{
"struggle_angle": "...",
"origin_story": "...",
"attempts": [
{{"attempt": "...", "why_failed": "..."}},
{{"attempt": "...", "why_failed": "..."}}
],
"the_moment": "...",
"the_fix": "...",
"reflection": "...",
"target_length": 1200
}}
Rules:
- Use ONLY tools from the grounding context above (Radicale, Tailscale, Ollama, etc.)
- NEVER invent: teams, startups, e-commerce platforms, millions of users
- Be specific: titanium-butler, Gaming PC, model names, error messages
- Use generic family references ("my spouse", "the kids") - NEVER real names
"""
# Stage 2 prompt (Qwen 2.5 7B): sanity-checks the brief before the costly
# draft stage.  Filled with .format(); placeholders: SOVEREIGN_STACK_CONTEXT,
# struggle_angle, origin_story, attempts, the_moment, the_fix.
# NOTE(review): the text says "90 minutes" but the pipeline header targets
# ~90 seconds total -- confirm the wording is intentional.
STRUCTURE_PROMPT_V2 = """Validate this struggle-first brief before we spend 90 minutes generating.
{SOVEREIGN_STACK_CONTEXT}
Brief:
- Struggle: {struggle_angle}
- Origin: {origin_story}
- Attempts: {attempts}
- Moment: {the_moment}
- Fix: {the_fix}
Evaluate:
1. Is the struggle specific and grounded? (Not generic)
2. Are the failed attempts concrete? (Not "I tried a few things")
3. Is the moment of realization clear?
4. Does the fix have explicit caveats?
5. Is the reflection honest, not preachy?
Return ONLY JSON:
{{
"validated": true/false,
"concerns": ["..."],
"suggested_structure": ["section1", "section2", ...]
}}
"""
# Legacy (v1) draft prompt.  Placeholders: SOVEREIGN_STACK_CONTEXT, title,
# angle, target_audience, tone_notes, structure.
# NOTE(review): not referenced anywhere in this module (stage_draft uses
# build_struggle_first_prompt or DRAFT_PROMPT_V2_FALLBACK) -- confirm no
# external importers before removing.
DRAFT_PROMPT = """You are a technical writer with a sovereign, pragmatic voice.
{SOVEREIGN_STACK_CONTEXT}
Title: {title}
Angle: {angle}
Target: {target_audience}
Tone: {tone_notes}
Structure to follow:
{structure}
Rules:
- Start with the human moment, not the solution
- Admit wrong turns before revealing fixes
- Use specific versions, error messages, timestamps
- End with actionable takeaways
- NEVER use corporate buzzwords (delve, tapestry, leverage, holistic, etc.)
- NEVER invent specific dates, times, or days of the week (no "Last Tuesday", "March 15th", "7:45 PM")
- Write 800-1200 words
- Be the competent engineer talking to another engineer
Write the full blog post now.
"""
# Stage 4 prompt (Llama 3.1 8B): derives SEO metadata from the draft.
# Filled with .format(); single placeholder: content (first ~2000 chars of
# the draft, see stage_seo).
SEO_PROMPT = """Given this blog post content, generate SEO metadata.
Content:
{content}
Return ONLY JSON:
{{
"excerpt": "1-2 sentence summary",
"tags": ["tag1", "tag2", "tag3", "tag4", "tag5"],
"meta_description": "under 160 characters",
"keywords": ["keyword1", "keyword2"]
}}
Rules:
- Excerpt should hook a reader
- Tags are lowercase, hyphenated if multi-word
- Meta description is search-friendly
"""
# --- Ollama Client ---
async def check_gaming_pc_available() -> dict:
    """Check if Gaming PC Ollama is reachable via Tailscale.

    Returns:
        dict with keys ``available`` (bool) and ``reason`` (str).  Never
        raises: any failure is folded into the reason string.
    """
    import httpx
    try:
        # Cheap host probe first, so "host down" and "Ollama down" are
        # distinguishable.  FIX: the original used blocking subprocess.run
        # inside this coroutine, stalling the event loop for up to 2s;
        # run ping through asyncio instead.
        proc = await asyncio.create_subprocess_exec(
            "ping", "-c", "1", "-W", "2", GAMING_PC_HOST,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        await proc.wait()
        if proc.returncode != 0:
            return {"available": False, "reason": "Host unreachable via Tailscale"}

        # Host answers -- now confirm the Ollama HTTP server responds.
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"http://{GAMING_PC_HOST}:11434",
                timeout=5.0
            )
        if response.status_code == 200:
            return {"available": True, "reason": "Ollama responding"}
        return {"available": False, "reason": f"Ollama returned {response.status_code}"}
    except Exception as e:
        # Deliberate best-effort: report, never propagate.
        return {"available": False, "reason": str(e)}
async def generate_ollama(
    model: str,
    prompt: str,
    temperature: float = 0.7,
    max_tokens: int = 4000,
    timeout: float = 300.0,
    json_mode: bool = False
) -> str:
    """Run one non-streaming generation against Ollama on the Gaming PC.

    Args:
        model: Ollama model tag (e.g. "llama3.1:8b").
        prompt: Full prompt text.
        temperature: Sampling temperature.
        max_tokens: Maps to Ollama's ``num_predict``.
        timeout: Per-request HTTP timeout in seconds.
        json_mode: When True, ask Ollama to constrain output to valid JSON.

    Returns:
        The model's ``response`` text ("" if the field is missing).

    Raises:
        httpx.HTTPStatusError: on a non-2xx response.
    """
    import httpx

    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {
            "temperature": temperature,
            "num_predict": max_tokens,
            "top_p": 0.9,
        },
    }
    if json_mode:
        request_body["format"] = "json"

    async with httpx.AsyncClient() as client:
        reply = await client.post(OLLAMA_URL, json=request_body, timeout=timeout)
    reply.raise_for_status()
    return reply.json().get("response", "")
def extract_json(text: str) -> Optional[dict]:
    """Extract a JSON object from a model response.

    Tries three shapes, in order:
    1. JSON inside a markdown code fence (```json ... ```).
    2. The whole text being valid JSON.
    3. JSON embedded in prose (outermost ``{ ... }`` span).

    Returns:
        The parsed dict, or None when nothing parseable is found.
    """
    # FIX 1: the fence regex had lost its ``` delimiters, so the lazy
    # \{.*?\} matched only up to the FIRST closing brace and truncated any
    # nested object.  Anchoring on the closing fence makes the lazy match
    # span the whole fenced object.
    code_block = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
    if code_block:
        try:
            return json.loads(code_block.group(1))
        except json.JSONDecodeError:
            pass  # FIX 2: don't clobber `text` -- let the fallbacks see the full response

    # The response may already be bare JSON.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Last resort: the outermost brace-delimited span.
    start = text.find('{')
    end = text.rfind('}')
    if start >= 0 and end > start:
        try:
            return json.loads(text[start:end + 1])
        except json.JSONDecodeError:
            pass
    return None
# --- Stage Implementations (V2 Struggle-First) ---
async def stage_strategy(
    topic: str,
    content_type: str,
    outline: Optional[list],
    progress_callback: Optional[Callable] = None
) -> StruggleFirstBrief:
    """Stage 1: Generate a struggle-first brief using Llama 3.1 8B.

    Raises:
        ValueError: when the model reply cannot be parsed as JSON.
    """
    if progress_callback:
        progress_callback("Finding the struggle...", 10)

    outline_section = ""
    if outline:
        bullets = "\n".join(f"- {point}" for point in outline)
        outline_section = "\nOutline points:\n" + bullets

    # Ground the brief in real project facts; degrade gracefully when the
    # grounding helper is missing or fails.
    try:
        from shared.grounding import get_grounding_context
        grounding = get_grounding_context(topic)
    except Exception:
        grounding = "No grounding context available."

    raw = await generate_ollama(
        model=MODELS["strategy"],
        prompt=STRATEGY_PROMPT_V2.format(
            grounding_context=grounding,
            topic=topic,
            content_type=content_type,
            outline_section=outline_section,
        ),
        max_tokens=1000,
        json_mode=True,
        timeout=30.0,
    )

    parsed = extract_json(raw)
    if not parsed:
        raise ValueError(f"Failed to parse strategy output: {raw[:200]}")

    return StruggleFirstBrief(
        struggle_angle=parsed.get("struggle_angle", f"The time {topic} broke"),
        origin_story=parsed.get("origin_story", ""),
        attempts=parsed.get("attempts", []),
        the_moment=parsed.get("the_moment", ""),
        the_fix=parsed.get("the_fix", ""),
        reflection=parsed.get("reflection", ""),
        target_length=int(parsed.get("target_length", 1200)),
    )
async def stage_structure(
    brief: StruggleFirstBrief,
    progress_callback: Optional[Callable] = None
) -> StructureOutput:
    """Stage 2: Validate the struggle-first brief using Qwen 2.5 7B.

    Raises:
        ValueError: when the model reply cannot be parsed as JSON.
    """
    if progress_callback:
        progress_callback("Validating struggle narrative...", 20)

    validation_prompt = STRUCTURE_PROMPT_V2.format(
        SOVEREIGN_STACK_CONTEXT=SOVEREIGN_STACK_CONTEXT,
        struggle_angle=brief.struggle_angle,
        origin_story=brief.origin_story,
        attempts=json.dumps(brief.attempts),
        the_moment=brief.the_moment,
        the_fix=brief.the_fix,
    )
    raw = await generate_ollama(
        model=MODELS["structure"],
        prompt=validation_prompt,
        max_tokens=600,
        json_mode=True,
        timeout=30.0,
    )

    parsed = extract_json(raw)
    if not parsed:
        raise ValueError(f"Failed to parse structure output: {raw[:200]}")

    # The prompt does not request estimated_word_count, so the `or 1000`
    # default is the usual path here.
    return StructureOutput(
        validated=parsed.get("validated", True),
        structure=parsed.get("suggested_structure", []),
        estimated_word_count=int(parsed.get("estimated_word_count") or 1000),
        concerns=parsed.get("concerns"),
    )
# Fallback draft prompt used by stage_draft when blog.generation.prompts is
# unavailable.  Substitution happens via str.replace() (NOT .format), which
# is why the field slots use doubled braces ({{...}}) while
# {grounding_context} uses single braces -- both are literal text here.
DRAFT_PROMPT_V2_FALLBACK = """Write a first-person narrative blog post about this incident.
{grounding_context}
What broke: {{struggle_angle}}
Origin story: {{origin_story}}
Failed attempts:
{{attempts}}
The moment of realization: {{the_moment}}
What worked: {{the_fix}}
Reflection: {{reflection}}
Writing Rules (MANDATORY):
- [ ] First person only ("I", "my", "we") - NEVER "you should"
- [ ] Specific timestamp or location in first paragraph (titanium-butler, Gaming PC, etc.)
- [ ] Use ONLY tools from grounding context above - NEVER hallucinate AWS/GCP/Azure services
- [ ] At least one internal monologue quote
- [ ] "I thought... but..." pattern somewhere
- [ ] Admit one thing you got wrong
- [ ] Mention the cost: time, sleep, relationship friction
- [ ] NO corporate buzzwords (delve, tapestry, leverage, holistic, etc.)
- [ ] NEVER invent specific dates (no "Last Tuesday", "March 15th")
- [ ] NEVER use real names (use "my spouse", "the kids" - NOT actual names)
Target: {{target_length}} words
Write the complete blog post now:"""
async def stage_draft(
    brief: StruggleFirstBrief,
    structure: StructureOutput,
    progress_callback: Optional[Callable] = None
) -> DraftOutput:
    """Stage 3: Generate struggle-first draft using Phi-4 14B (~60-90s).

    Args:
        brief: Validated struggle-first brief from stage 1.
        structure: Stage-2 validation output.  NOTE(review): not used in
            this function -- confirm whether suggested_structure should be
            folded into the prompt.
        progress_callback: Optional (message, percent) reporter.

    Returns:
        DraftOutput with the raw draft, word count, and reading time.
    """
    if progress_callback:
        progress_callback("Drafting struggle narrative with Phi-4... (this takes ~90 seconds)", 30)
    # Get grounding context (best effort; the draft proceeds without it).
    try:
        from shared.grounding import get_grounding_context
        grounding = get_grounding_context()
    except Exception:
        grounding = "No grounding context available."
    # Build prompt using v2 struggle-first builder if available.
    brief_dict = {
        "struggle_angle": brief.struggle_angle,
        "origin_story": brief.origin_story,
        "attempts": brief.attempts,
        "the_moment": brief.the_moment,
        "the_fix": brief.the_fix,
        "reflection": brief.reflection,
        "target_length": brief.target_length
    }
    if V2_PROMPTS_AVAILABLE:
        prompt = build_struggle_first_prompt(brief_dict, style_reference=None)
        # Prepend grounding context
        prompt = grounding + "\n\n" + prompt
    else:
        # Fallback to inline template.  Substitution is str.replace-based,
        # so the {{...}} slots in the template are plain text tokens.
        attempts_text = "\n".join(
            f"Attempt {i+1}: {a.get('attempt', '')}\nWhy it failed: {a.get('why_failed', '')}"
            for i, a in enumerate(brief.attempts)
        )
        prompt = DRAFT_PROMPT_V2_FALLBACK.replace("{{struggle_angle}}", brief.struggle_angle) \
            .replace("{{origin_story}}", brief.origin_story) \
            .replace("{{attempts}}", attempts_text) \
            .replace("{{the_moment}}", brief.the_moment) \
            .replace("{{the_fix}}", brief.the_fix) \
            .replace("{{reflection}}", brief.reflection) \
            .replace("{{target_length}}", str(brief.target_length)) \
            .replace("{grounding_context}", grounding)
    response = await generate_ollama(
        model=MODELS["draft"],
        prompt=prompt,
        temperature=0.7,
        max_tokens=4000,
        timeout=180.0  # 3 minutes for long generation
    )
    content = response.strip()
    word_count = len(content.split())
    reading_time = max(1, word_count // 200)  # ~200 WPM
    if progress_callback:
        progress_callback("Draft complete", 90)
    return DraftOutput(
        content=content,
        word_count=word_count,
        reading_time_minutes=reading_time
    )
async def stage_seo(
    content: str,
    progress_callback: Optional[Callable] = None
) -> SEOOutput:
    """Stage 4: Generate SEO metadata using Llama 3.1 8B.

    Never raises on a bad model reply: falls back to metadata derived
    locally from the post's opening text.
    """
    if progress_callback:
        progress_callback("Generating SEO metadata...", 95)

    # The opening of the post is enough context for metadata; keep the
    # prompt small (first 2000 chars) so this stage stays fast.
    raw = await generate_ollama(
        model=MODELS["seo"],
        prompt=SEO_PROMPT.format(content=content[:2000]),
        max_tokens=400,
        json_mode=True,
        timeout=30.0,
    )

    parsed = extract_json(raw)
    if not parsed:
        # Model reply unusable -- derive minimal metadata locally.
        return SEOOutput(
            excerpt=content[:150] + "...",
            tags=["homelab", "infrastructure"],
            meta_description=content[:160],
            keywords=["self-hosted"],
        )

    return SEOOutput(
        excerpt=parsed.get("excerpt", content[:150] + "..."),
        tags=parsed.get("tags", []),
        meta_description=parsed.get("meta_description", content[:160]),
        keywords=parsed.get("keywords", []),
    )
def stage_compliance(
    content: str,
    progress_callback: Optional[Callable] = None
) -> tuple[str, ComplianceReport]:
    """Stage 5: Strip banned words and flag dates/names (local, ~1s).

    Returns:
        (clean_text, ComplianceReport) -- the filtered content plus a
        summary of replacements and flagged items.
    """
    if progress_callback:
        progress_callback("Running compliance checks...", 99)

    report = ComplianceFilter().process(content)

    warnings = (
        [f"Found: '{w}'" for w in report.banned_found]
        if report.has_warnings
        else []
    )
    summary = ComplianceReport(
        replacements_made=report.replacements_made,
        warnings=warnings,
        is_compliant=report.is_compliant,
        banned_words_found=report.banned_found,
        dates_flagged=report.dates_found,
        names_flagged=report.names_found,
    )
    return report.clean_text, summary
# --- Pipeline Orchestrator ---
class PipelineOrchestrator:
    """Manages job state and runs the tiered 5-stage pipeline.

    Holds per-job status, final results, and a cancellation flag set in
    memory.  A single instance is shared process-wide via get_orchestrator().
    """

    def __init__(self):
        self.jobs: Dict[str, JobStatus] = {}          # job_id -> live status
        self.results: Dict[str, PipelineResult] = {}  # job_id -> final output
        self.cancelled: set = set()                   # job_ids flagged to cancel

    def create_job(self, topic: str, content_type: ContentType) -> str:
        """Create a new pipeline job and return its ID.

        topic/content_type are accepted for interface symmetry with
        run_pipeline; only the generated ID and timestamps are stored here.
        """
        job_id = f"gen_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
        self.jobs[job_id] = JobStatus(
            job_id=job_id,
            status=PipelineStage.QUEUED,
            stage_number=0,
            total_stages=5,
            progress_percent=0,
            started_at=datetime.now()
        )
        return job_id

    def get_job(self, job_id: str) -> Optional[JobStatus]:
        """Get current status of a job (None for unknown IDs)."""
        return self.jobs.get(job_id)

    def cancel_job(self, job_id: str) -> bool:
        """Mark a job for cancellation; returns False for unknown IDs.

        The running pipeline checks the flag between stages, so
        cancellation takes effect at the next stage boundary.
        """
        if job_id in self.jobs:
            self.cancelled.add(job_id)
            return True
        return False

    def _update_progress(self, job_id: str, stage: PipelineStage, message: str, percent: int):
        """Update job progress and estimate remaining time.

        Safe to call with terminal/error stages and unknown job IDs.
        """
        job = self.jobs.get(job_id)
        if job is None:
            return
        job.status = stage
        # FIX: FAILED/CANCELLED are not keys of STAGE_WEIGHTS, so an
        # unconditional list(...).index(stage) raised ValueError inside the
        # error handlers of run_pipeline and masked the original exception.
        # Keep the last known stage_number for stages without a weight.
        if stage in STAGE_WEIGHTS:
            job.stage_number = list(STAGE_WEIGHTS.keys()).index(stage)
        job.progress_percent = percent
        job.current_stage_name = message
        # Linear ETA: if X% took `elapsed` seconds, extrapolate the rest.
        if job.started_at and percent > 0:
            elapsed = (datetime.now() - job.started_at).total_seconds()
            if percent < 100:
                estimated_total = elapsed / (percent / 100)
                remaining = int(estimated_total - elapsed)
                job.estimated_seconds_remaining = max(5, remaining)  # floor at 5s
            else:
                job.estimated_seconds_remaining = 0

    async def run_pipeline(
        self,
        job_id: str,
        topic: str,
        content_type: ContentType,
        outline: Optional[list] = None,
        context: Optional[str] = None
    ) -> PipelineResult:
        """Run the full 5-stage v2 struggle-first pipeline.

        Raises:
            asyncio.CancelledError: when cancel_job() was called for job_id.
            Exception: any stage failure is recorded on the job, then re-raised.
        """
        partial_outputs = {}

        def progress_cb(message: str, percent: int):
            # Stages report only message/percent; keep the current stage enum
            # (the redundant PipelineStage(...) re-wrap is removed).
            self._update_progress(job_id, self.jobs[job_id].status, message, percent)

        try:
            # Stage 1: Strategy (V2 Struggle-First)
            self._update_progress(job_id, PipelineStage.STRATEGY, "Finding the struggle...", 5)
            brief = await stage_strategy(topic, content_type.value, outline, progress_cb)
            partial_outputs["brief"] = brief.dict()
            if job_id in self.cancelled:
                raise asyncio.CancelledError()

            # Stage 2: Structure (V2) -- a failed validation is advisory
            # only; we continue and let the concerns ride along.
            self._update_progress(job_id, PipelineStage.STRUCTURE, "Validating struggle narrative...", 15)
            structure = await stage_structure(brief, progress_cb)
            partial_outputs["structure"] = structure.dict()
            if job_id in self.cancelled:
                raise asyncio.CancelledError()

            # Stage 3: Draft (V2 Struggle-First) -- the ~90s bottleneck.
            self._update_progress(job_id, PipelineStage.DRAFT, "Drafting struggle narrative...", 30)
            draft = await stage_draft(brief, structure, progress_cb)
            partial_outputs["draft"] = {"word_count": draft.word_count, "reading_time": draft.reading_time_minutes}
            if job_id in self.cancelled:
                raise asyncio.CancelledError()

            # Stage 4: SEO
            self._update_progress(job_id, PipelineStage.SEO, "Generating SEO metadata...", 95)
            seo = await stage_seo(draft.content, progress_cb)
            partial_outputs["seo"] = seo.dict()

            # Stage 5: Compliance (local, fast)
            self._update_progress(job_id, PipelineStage.COMPLIANCE, "Running compliance checks...", 99)
            clean_content, compliance = stage_compliance(draft.content, progress_cb)
            partial_outputs["compliance"] = compliance.dict()

            # Finalize -- use struggle_angle as the title unless it is the
            # stage-1 fallback ("The time ... broke"), meaning the model
            # supplied no angle of its own.
            title = brief.struggle_angle if not brief.struggle_angle.startswith("The time") else f"The Night {topic} Broke"

            result = PipelineResult(
                title=title,
                content=clean_content,
                excerpt=seo.excerpt,
                tags=seo.tags,
                meta_description=seo.meta_description,
                word_count=draft.word_count,
                reading_time_minutes=draft.reading_time_minutes,
                compliance=compliance
            )
            self.results[job_id] = result
            self._update_progress(job_id, PipelineStage.COMPLETED, "Complete", 100)
            self.jobs[job_id].completed_at = datetime.now()
            return result

        except asyncio.CancelledError:
            # Clear the flag so the ID does not accumulate in the set.
            self.cancelled.discard(job_id)
            self._update_progress(job_id, PipelineStage.CANCELLED, "Cancelled", 0)
            raise
        except Exception as e:
            self.jobs[job_id].error = str(e)
            self._update_progress(job_id, PipelineStage.FAILED, f"Failed: {e}", 0)
            raise
# Global orchestrator instance
_orchestrator: Optional[PipelineOrchestrator] = None  # module-level singleton


def get_orchestrator() -> PipelineOrchestrator:
    """Get or create the global orchestrator (lazy singleton).

    All callers share one instance so job state survives across requests
    within the process.
    """
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = PipelineOrchestrator()
    return _orchestrator