#!/usr/bin/env python3
"""
Live test of the tiered content generation pipeline.

Actually calls models on the Gaming PC.

Usage:
    python test_pipeline_live.py "My topic here"
    python test_pipeline_live.py "How I migrated from Google Calendar to CalDAV" --type how_i_solved
"""
import asyncio
import sys
import argparse
sys.path.insert(0, '/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api')
from content import get_orchestrator, check_gaming_pc_available, ContentType
async def test_gaming_pc():
    """Probe the Gaming PC and report the outcome on stdout.

    Awaits ``check_gaming_pc_available()``, prints the ``available`` and
    ``reason`` entries of the mapping it returns, and hands back the
    ``available`` entry unchanged.
    """
    print("Checking Gaming PC availability...")
    probe = await check_gaming_pc_available()
    is_available = probe['available']
    print(f" Available: {is_available}")
    print(f" Reason: {probe['reason']}")
    return is_available
def _print_summary(result, elapsed):
    """Print headline metadata, the compliance report and a draft preview."""
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"PIPELINE COMPLETE in {elapsed:.1f}s")
    print(banner)

    # NOTE(review): the emoji in this file had been mis-decoded. The glyphs for
    # READING TIME, TAGS and COMPLIANCE were recoverable from the surviving
    # bytes; the ones for TITLE, WORD COUNT, EXCERPT and META are best guesses
    # -- confirm against the original script.
    print(f"\n📝 TITLE: {result.title}")
    print(f"📊 WORD COUNT: {result.word_count}")
    print(f"⏱️ READING TIME: {result.reading_time_minutes} min")
    print(f"\n🏷️ TAGS: {', '.join(result.tags)}")
    print(f"📄 EXCERPT: {result.excerpt}")
    print(f"🔍 META: {result.meta_description}")

    compliance = result.compliance
    print("\n✅ COMPLIANCE:")
    print(f" Replacements: {compliance.replacements_made}")
    print(f" Warnings: {len(compliance.warnings)}")
    for warning in compliance.warnings:
        print(f" - {warning}")

    print(f"\n{banner}")
    print("DRAFT PREVIEW (first 1000 chars):")
    print(banner)
    print(result.content[:1000])
    print("...")


def _save_output(job_id, topic, content_type, elapsed, result):
    """Dump the full pipeline result to a per-job JSON file and return its path."""
    import json
    from datetime import datetime

    payload = {
        "job_id": job_id,
        "topic": topic,
        "content_type": content_type,
        "elapsed_seconds": elapsed,
        "generated_at": datetime.now().isoformat(),
        "result": {
            "title": result.title,
            "content": result.content,
            "excerpt": result.excerpt,
            "tags": result.tags,
            "meta_description": result.meta_description,
            "word_count": result.word_count,
            "reading_time_minutes": result.reading_time_minutes,
            # presumably a pydantic-style model exposing .dict() -- verify
            "compliance": result.compliance.dict(),
        },
    }
    output_file = f"/home/hoffmann_admin/.openclaw/workspace-socrates/test_pipeline_{job_id}.json"
    with open(output_file, 'w', encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    return output_file


async def run_pipeline(topic: str, content_type: str) -> int:
    """Run the full pipeline for one topic, print the results and save them.

    Args:
        topic: Subject the pipeline should generate content about.
        content_type: One of "roundup", "how_i_solved", "build_log",
            "essay" or "tutorial"; any other value falls back to
            ``ContentType.HOW_I_SOLVED``.

    Returns:
        0 when the pipeline ran and its output was reported and saved,
        1 when any exception was raised along the way (the error and its
        traceback are printed rather than propagated).
    """
    import time
    import traceback

    orchestrator = get_orchestrator()

    # Map the CLI string to the enum member.
    type_map = {
        "roundup": ContentType.ROUNDUP,
        "how_i_solved": ContentType.HOW_I_SOLVED,
        "build_log": ContentType.BUILD_LOG,
        "essay": ContentType.ESSAY,
        "tutorial": ContentType.TUTORIAL,
    }
    ct = type_map.get(content_type, ContentType.HOW_I_SOLVED)

    job_id = orchestrator.create_job(topic, ct)
    print(f"\nCreated job: {job_id}")
    print(f"Topic: {topic}")
    print(f"Type: {content_type}")
    print("-" * 60)

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments during the run.
    start = time.perf_counter()
    try:
        result = await orchestrator.run_pipeline(
            job_id=job_id,
            topic=topic,
            content_type=ct,
        )
        elapsed = time.perf_counter() - start

        _print_summary(result, elapsed)
        output_file = _save_output(job_id, topic, content_type, elapsed, result)
        print(f"\n💾 Full output saved to: {output_file}")
    except Exception as e:
        # Top-level boundary of a manual test script: report and signal
        # failure through the exit code instead of crashing.
        print(f"\n❌ PIPELINE FAILED: {e}")
        traceback.print_exc()
        return 1
    return 0
async def main():
    """Parse the command line, check the Gaming PC, then run the pipeline.

    Returns:
        The process exit code: 1 when the Gaming PC is reported
        unavailable, 0 after a successful ``--check-only`` run, otherwise
        whatever ``run_pipeline`` returns.
    """
    parser = argparse.ArgumentParser(description="Test the tiered content pipeline")
    parser.add_argument(
        "topic",
        nargs="?",
        default="How I migrated from Google Calendar to a self-hosted CalDAV server",
        help="Topic to generate content about",
    )
    parser.add_argument(
        "--type",
        default="how_i_solved",
        choices=["roundup", "how_i_solved", "build_log", "essay", "tutorial"],
        help="Content type",
    )
    parser.add_argument(
        "--check-only",
        action="store_true",
        help="Only check Gaming PC availability",
    )
    args = parser.parse_args()

    print("=" * 60)
    print("TIERED CONTENT PIPELINE - LIVE TEST")
    print("=" * 60)

    # Check the Gaming PC first; nothing else can run without it.
    if not await test_gaming_pc():
        # NOTE(review): glyph reconstructed from mis-decoded bytes -- confirm.
        print("\n❌ Gaming PC not available. Aborting.")
        return 1

    if args.check_only:
        return 0

    print("\n" + "=" * 60)
    print(f"Starting pipeline for: {args.topic}")
    print("Expected runtime: ~90 seconds")
    print("=" * 60 + "\n")

    return await run_pipeline(args.topic, args.type)
# Script entry point: run the async main() to completion and use its integer
# return value as the process exit status.
if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)