#!/usr/bin/env python3
# test_v2_pipeline.py

"""
Content Pipeline v2 — Live Integration Test
Tests the new struggle-first pipeline with dns-night style injection

Usage:
python test_v2_pipeline.py --topic "Your test topic here"
python test_v2_pipeline.py # Uses default topic
python test_v2_pipeline.py --health-only # Only run the health checks
"""

import sys
import json
import requests
from datetime import datetime

# Configuration

# Base URL of the content-pipeline API; every brief/health request in this
# script is built from it.
API_BASE = "http://127.0.0.1:8000"
# Value sent in the X-Admin-Token header on the brief endpoints.
# NOTE(review): hard-coded credential — consider loading it from the
# environment instead of committing it.
ADMIN_TOKEN = "hoffdesk-admin-2025"
# Host queried at /api/tags for its model list (presumably an Ollama
# server — TODO confirm).
GAMING_PC_URL = "http://matt-pc.tail864e81.ts.net:11434"

def check_health():
    """Check if API and Gaming PC are available.

    Prints one status line per check. Only the API check is fatal: an
    unreachable API or a non-200 ``/health`` response returns ``False``.
    Problems with the Gaming PC model list or the style references are
    printed as warnings and do not affect the return value.

    Returns:
        bool: ``True`` when the API health endpoint answered 200,
        ``False`` otherwise.
    """
    print("=" * 60)
    print("CONTENT PIPELINE v2 — INTEGRATION TEST")
    print("=" * 60)

    # Check API (the only check that can abort the run)
    try:
        r = requests.get(f"{API_BASE}/health", timeout=5)
        if r.status_code == 200:
            print("✅ API: Running")
        else:
            print(f"❌ API: Status {r.status_code}")
            return False
    except Exception as e:
        print(f"❌ API: Not reachable ({e})")
        return False

    # Check Gaming PC (warning only)
    try:
        r = requests.get(f"{GAMING_PC_URL}/api/tags", timeout=5)
        if r.status_code == 200:
            # `or []` keeps an explicit `"models": null` from being reported
            # as "Not reachable" via a TypeError in the loop below.
            models = r.json().get("models") or []
            phi4 = any(m.get("name") == "phi4:14b" for m in models)
            if phi4:
                print("✅ Gaming PC: phi4:14b available")
            else:
                print("⚠️  Gaming PC: Running but phi4:14b not found")
        else:
            print(f"⚠️  Gaming PC: Status {r.status_code}")
    except Exception as e:
        print(f"⚠️  Gaming PC: Not reachable ({e})")

    # Check style references (warning only)
    try:
        r = requests.get(
            f"{API_BASE}/api/v1/content/briefs/style-references",
            headers={"X-Admin-Token": ADMIN_TOKEN},
            timeout=5,
        )
        if r.status_code == 200:
            styles = r.json()
            dns_night = any(s.get("slug") == "dns-night" for s in styles)
            if dns_night:
                print("✅ Style: dns-night.json loaded")
            else:
                print("⚠️  Style: dns-night.json not found in styles")
        else:
            print(f"⚠️  Style endpoint: {r.status_code}")
    except Exception as e:
        # Also reached when the response is not a list of dicts.
        print(f"⚠️  Style check failed: {e}")

    return True

def create_brief(topic="Test: The Morning I Broke Coffee"):
    """Create a test brief.

    Posts a fixed struggle-first payload (only the title varies) to the
    briefs endpoint with the ``dns-night`` style reference.

    Args:
        topic: Title to use for the brief.

    Returns:
        The ``brief_id`` value from the JSON response on HTTP 200/201, or
        ``None`` when the request fails or returns another status.
    """
    print("\n" + "-" * 60)
    print("STEP 1: Creating brief")
    print("-" * 60)

    payload = {
        "title": topic,
        "struggle_angle": "I broke the coffee maker at 6 AM when Aundrea needed caffeine",
        "origin_story": "It was Monday morning. Aundrea was already running late for work. I was 'just cleaning' the coffee maker before she woke up...",
        "attempts": [
            {"attempt": "Took it apart to clean the filter", "why_failed": "Couldn't remember how the plastic pieces fit together"},
            {"attempt": "Watched a YouTube disassembly video", "why_failed": "Our model was 5 years older than the video"},
        ],
        "the_moment": "At 6:17 AM, I found the broken plastic piece under the sink. It had been cracked for months. I was the problem all along.",
        "the_fix": "Ordered a new coffee maker on Amazon Same-Day. It arrived by 9 AM. Aundrea worked from home that morning.",
        "reflection": "Never disassemble critical appliances before morning coffee. Also: always have backup caffeine in the house.",
        "style_reference": "dns-night",
        "target_length": 800,
    }

    try:
        r = requests.post(
            f"{API_BASE}/api/v1/content/briefs/",
            headers={
                "Content-Type": "application/json",
                "X-Admin-Token": ADMIN_TOKEN,
            },
            json=payload,
            timeout=10,
        )

        if r.status_code in (200, 201):
            data = r.json()
            brief_id = data.get("brief_id")
            print(f"✅ Brief created: {brief_id}")
            print(f"   Status: {data.get('status')}")
            return brief_id

        print(f"❌ Failed to create brief: {r.status_code}")
        # Truncate the body so an HTML error page does not flood the console.
        print(f"   Response: {r.text[:200]}")
        return None
    except Exception as e:
        print(f"❌ Error creating brief: {e}")
        return None

def submit_for_approval(brief_id):
    """Submit brief for approval.

    Args:
        brief_id: Identifier of the brief, interpolated into the submit URL.

    Returns:
        bool: ``True`` when the submit endpoint answered 200, ``False`` on
        any other status or when the request raised.
    """
    print("\n" + "-" * 60)
    print("STEP 2: Submitting for approval")
    print("-" * 60)

    try:
        r = requests.post(
            f"{API_BASE}/api/v1/content/briefs/{brief_id}/submit",
            headers={"X-Admin-Token": ADMIN_TOKEN},
            timeout=10,
        )

        if r.status_code == 200:
            data = r.json()
            print(f"✅ Submitted: {data.get('status')}")
            print("   Check Telegram for approval DM")
            return True

        print(f"❌ Submit failed: {r.status_code}")
        print(f"   {r.text[:200]}")
        return False
    except Exception as e:
        print(f"❌ Error submitting: {e}")
        return False

def approve_and_generate(brief_id):
    """Approve brief and trigger generation.

    The approve request is made with a 300-second timeout because the call
    is expected to block while content is generated.

    Args:
        brief_id: Identifier of the brief, interpolated into the approve URL.

    Returns:
        The parsed JSON response on HTTP 200, or ``None`` on any other
        status or when the request raised.
    """
    print("\n" + "-" * 60)
    print("STEP 3: Approving and triggering generation")
    print("-" * 60)
    print("⏱️ This will take 60-120 seconds...")
    print("-" * 60)

    try:
        r = requests.post(
            f"{API_BASE}/api/v1/content/briefs/{brief_id}/approve",
            headers={"X-Admin-Token": ADMIN_TOKEN},
            timeout=300,  # 5 minute timeout for generation
        )

        if r.status_code == 200:
            data = r.json()
            print("✅ Generation complete")
            return data

        print(f"❌ Generation failed: {r.status_code}")
        print(f"   {r.text[:500]}")
        return None
    except Exception as e:
        print(f"❌ Error during generation: {e}")
        return None

def check_output(brief_id):
    """Check generated output.

    Fetches the output for the brief, prints its struggle score, content
    length, a score verdict (>= 75 strong, >= 50 moderate, otherwise weak)
    and a preview of the first 500 characters.

    Args:
        brief_id: Identifier of the brief, interpolated into the output URL.

    Returns:
        The parsed JSON response on HTTP 200, or ``None`` on any other
        status or when the request raised.
    """
    print("\n" + "-" * 60)
    print("STEP 4: Checking output")
    print("-" * 60)

    try:
        r = requests.get(
            f"{API_BASE}/api/v1/content/briefs/{brief_id}/output",
            headers={"X-Admin-Token": ADMIN_TOKEN},
            timeout=10,
        )

        if r.status_code != 200:
            print(f"❌ Failed to get output: {r.status_code}")
            return None

        data = r.json()
        # `or` also covers an explicit JSON null, which `.get(key, default)`
        # would pass through and then break the comparisons / len() below.
        score = data.get("struggle_score") or 0
        content = data.get("content") or ""

        print("✅ Output retrieved")
        print(f"   Struggle score: {score}")
        print(f"   Content length: {len(content)} chars")

        if score >= 75:
            print("   ✅ Score: Strong struggle narrative")
        elif score >= 50:
            print("   ⚠️  Score: Moderate — consider revision")
        else:
            print("   ❌ Score: Needs more personal stakes")

        # Show excerpt
        preview = content[:500] + "..." if len(content) > 500 else content
        print("\n📝 Preview (first 500 chars):")
        print("-" * 60)
        print(preview)
        print("-" * 60)

        return data
    except Exception as e:
        print(f"❌ Error getting output: {e}")
        return None

def save_results(brief_id, output_data, output_dir="/home/hoffmann_admin/.openclaw/workspace"):
    """Save results to file.

    Writes a JSON document containing the run timestamp, the brief id, a
    success flag and the output payload to
    ``<output_dir>/v2-test-result-<first 8 chars of brief_id>.json``.
    Saving is best-effort: any failure is printed as a warning and the
    function still returns normally.

    Args:
        brief_id: Identifier of the brief; its string form is truncated to
            eight characters for the file name.
        output_data: Output payload to store; ``None`` marks the run as
            unsuccessful.
        output_dir: Directory the result file is written to. It is not
            created if missing.
    """
    import os

    # str() so a non-string id (e.g. an integer) cannot raise on slicing.
    filename = os.path.join(output_dir, f"v2-test-result-{str(brief_id)[:8]}.json")

    result = {
        "test_run": datetime.now().isoformat(),
        "brief_id": brief_id,
        "success": output_data is not None,
        "output": output_data,
    }

    try:
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)
        print(f"\n💾 Results saved: {filename}")
    except Exception as e:
        # Deliberately broad: failing to persist results must not fail the run.
        print(f"⚠️  Could not save results: {e}")

def main():
    """Run the integration test end to end and return a process exit code.

    Steps: health check, create brief, submit, approve/generate, fetch
    output, save results, print summary. The topic may be given either as
    ``--topic "..."`` or as a single positional argument; ``--topic`` wins
    when both are present.

    Returns:
        int: ``1`` when any step fails, ``0`` otherwise (including a
        health-only run).
    """
    import argparse

    default_topic = "Test: The Morning I Broke Coffee"

    parser = argparse.ArgumentParser(description="Test Content Pipeline v2")
    parser.add_argument(
        "topic_positional",
        nargs="?",
        default=None,
        metavar="TOPIC",
        help="Test topic (positional alternative to --topic)",
    )
    parser.add_argument("--topic", default=None, help=f"Test topic (default: {default_topic!r})")
    parser.add_argument("--health-only", action="store_true", help="Only check health, skip generation")
    args = parser.parse_args()
    topic = args.topic or args.topic_positional or default_topic

    # Check health
    if not check_health():
        print("\n❌ Health check failed. Aborting.")
        return 1

    if args.health_only:
        print("\n✅ Health check complete.")
        return 0

    # Create brief
    brief_id = create_brief(topic)
    if not brief_id:
        print("\n❌ Failed to create brief. Aborting.")
        return 1

    # Submit
    if not submit_for_approval(brief_id):
        print("\n❌ Failed to submit. Aborting.")
        return 1

    # Approve and generate. The helpers signal failure with None, so an
    # empty-but-valid JSON body is not mistaken for a failure.
    print("\n⏳ Waiting for generation (60-120s)...")
    result = approve_and_generate(brief_id)
    if result is None:
        print("\n❌ Generation failed.")
        return 1

    # Check output
    output = check_output(brief_id)
    if output is None:
        print("\n❌ Failed to retrieve output.")
        return 1

    # Save results
    save_results(brief_id, output)

    # Summary
    print("\n" + "=" * 60)
    print("✅ INTEGRATION TEST COMPLETE")
    print("=" * 60)
    # `or 0` also covers an explicit JSON null for the score.
    score = output.get("struggle_score") or 0
    print(f"Brief ID: {brief_id}")
    print(f"Struggle Score: {score}/100")
    print(f"Status: {'✅ PASS' if score >= 50 else '❌ FAIL'}")
    print("Next: Check Telegram for approval workflow")
    print(f"   or view brief at: /admin/pipeline/brief/{brief_id}")

    # NOTE(review): the exit code is 0 even when the status above is FAIL —
    # confirm whether a low score should make the run exit non-zero.
    return 0

# Run the test only when executed as a script, and propagate main()'s
# return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())