šŸ“„ test_pipeline_live.py 4,998 bytes Apr 21, 2026 šŸ“‹ Raw

!/usr/bin/env python3

"""
Live test of the tiered content generation pipeline.
Actually calls models on the Gaming PC.

Usage:
python test_pipeline_live.py "My topic here"
python test_pipeline_live.py "How I migrated from Google Calendar to CalDAV" --type how_i_solved
"""

import asyncio
import sys
import argparse

sys.path.insert(0, '/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api')

from content import get_orchestrator, check_gaming_pc_available, ContentType

async def test_gaming_pc():
"""Check if Gaming PC is available."""
print("Checking Gaming PC availability...")
status = await check_gaming_pc_available()
print(f" Available: {status['available']}")
print(f" Reason: {status['reason']}")
return status['available']

async def run_pipeline(topic: str, content_type: str):
"""Run the full pipeline and print results."""
orchestrator = get_orchestrator()

# Map string to enum
type_map = {
    "roundup": ContentType.ROUNDUP,
    "how_i_solved": ContentType.HOW_I_SOLVED,
    "build_log": ContentType.BUILD_LOG,
    "essay": ContentType.ESSAY,
    "tutorial": ContentType.TUTORIAL,
}
ct = type_map.get(content_type, ContentType.HOW_I_SOLVED)

job_id = orchestrator.create_job(topic, ct)
print(f"\nCreated job: {job_id}")
print(f"Topic: {topic}")
print(f"Type: {content_type}")
print("-" * 60)

# Run pipeline
import time
start = time.time()

try:
    result = await orchestrator.run_pipeline(
        job_id=job_id,
        topic=topic,
        content_type=ct
    )

    elapsed = time.time() - start

    print(f"\n{'=' * 60}")
    print(f"PIPELINE COMPLETE in {elapsed:.1f}s")
    print(f"{'=' * 60}")

    print(f"\nšŸ“„ TITLE: {result.title}")
    print(f"šŸ“ WORD COUNT: {result.word_count}")
    print(f"ā±ļø READING TIME: {result.reading_time_minutes} min")

    print(f"\nšŸ·ļø TAGS: {', '.join(result.tags)}")
    print(f"šŸ“ EXCERPT: {result.excerpt}")
    print(f"šŸ” META: {result.meta_description}")

    print(f"\nāœ… COMPLIANCE:")
    print(f"   Replacements: {result.compliance.replacements_made}")
    print(f"   Warnings: {len(result.compliance.warnings)}")
    if result.compliance.warnings:
        for w in result.compliance.warnings:
            print(f"      - {w}")

    print(f"\n{'=' * 60}")
    print("DRAFT PREVIEW (first 1000 chars):")
    print(f"{'=' * 60}")
    print(result.content[:1000])
    print("...")

    # Save full output
    import json
    from datetime import datetime

    output = {
        "job_id": job_id,
        "topic": topic,
        "content_type": content_type,
        "elapsed_seconds": elapsed,
        "generated_at": datetime.now().isoformat(),
        "result": {
            "title": result.title,
            "content": result.content,
            "excerpt": result.excerpt,
            "tags": result.tags,
            "meta_description": result.meta_description,
            "word_count": result.word_count,
            "reading_time_minutes": result.reading_time_minutes,
            "compliance": result.compliance.dict()
        }
    }

    output_file = f"/home/hoffmann_admin/.openclaw/workspace-socrates/test_pipeline_{job_id}.json"
    with open(output_file, 'w') as f:
        json.dump(output, f, indent=2)

    print(f"\nšŸ’¾ Full output saved to: {output_file}")

except Exception as e:
    print(f"\nāŒ PIPELINE FAILED: {e}")
    import traceback
    traceback.print_exc()
    return 1

return 0

async def main():
parser = argparse.ArgumentParser(description="Test the tiered content pipeline")
parser.add_argument("topic", nargs="?", default="How I migrated from Google Calendar to a self-hosted CalDAV server", help="Topic to generate content about")
parser.add_argument("--type", default="how_i_solved", choices=["roundup", "how_i_solved", "build_log", "essay", "tutorial"], help="Content type")
parser.add_argument("--check-only", action="store_true", help="Only check Gaming PC availability")

args = parser.parse_args()

print("=" * 60)
print("TIERED CONTENT PIPELINE - LIVE TEST")
print("=" * 60)

# Check Gaming PC first
if not await test_gaming_pc():
    print("\nāŒ Gaming PC not available. Aborting.")
    return 1

if args.check_only:
    return 0

print("\n" + "=" * 60)
print(f"Starting pipeline for: {args.topic}")
print(f"Expected runtime: ~90 seconds")
print("=" * 60 + "\n")

return await run_pipeline(args.topic, args.type)

if name == "main":
exit_code = asyncio.run(main())
sys.exit(exit_code)