#!/usr/bin/env python3 """ Live test of the tiered content generation pipeline. Actually calls models on the Gaming PC. Usage: python test_pipeline_live.py "My topic here" python test_pipeline_live.py "How I migrated from Google Calendar to CalDAV" --type how_i_solved """ import asyncio import sys import argparse sys.path.insert(0, '/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api') from content import get_orchestrator, check_gaming_pc_available, ContentType async def test_gaming_pc(): """Check if Gaming PC is available.""" print("Checking Gaming PC availability...") status = await check_gaming_pc_available() print(f" Available: {status['available']}") print(f" Reason: {status['reason']}") return status['available'] async def run_pipeline(topic: str, content_type: str): """Run the full pipeline and print results.""" orchestrator = get_orchestrator() # Map string to enum type_map = { "roundup": ContentType.ROUNDUP, "how_i_solved": ContentType.HOW_I_SOLVED, "build_log": ContentType.BUILD_LOG, "essay": ContentType.ESSAY, "tutorial": ContentType.TUTORIAL, } ct = type_map.get(content_type, ContentType.HOW_I_SOLVED) job_id = orchestrator.create_job(topic, ct) print(f"\nCreated job: {job_id}") print(f"Topic: {topic}") print(f"Type: {content_type}") print("-" * 60) # Run pipeline import time start = time.time() try: result = await orchestrator.run_pipeline( job_id=job_id, topic=topic, content_type=ct ) elapsed = time.time() - start print(f"\n{'=' * 60}") print(f"PIPELINE COMPLETE in {elapsed:.1f}s") print(f"{'=' * 60}") print(f"\nšŸ“„ TITLE: {result.title}") print(f"šŸ“ WORD COUNT: {result.word_count}") print(f"ā±ļø READING TIME: {result.reading_time_minutes} min") print(f"\nšŸ·ļø TAGS: {', '.join(result.tags)}") print(f"šŸ“ EXCERPT: {result.excerpt}") print(f"šŸ” META: {result.meta_description}") print(f"\nāœ… COMPLIANCE:") print(f" Replacements: {result.compliance.replacements_made}") print(f" Warnings: {len(result.compliance.warnings)}") if result.compliance.warnings: for w in result.compliance.warnings: print(f" - {w}") print(f"\n{'=' * 60}") print("DRAFT PREVIEW (first 1000 chars):") print(f"{'=' * 60}") print(result.content[:1000]) print("...") # Save full output import json from datetime import datetime output = { "job_id": job_id, "topic": topic, "content_type": content_type, "elapsed_seconds": elapsed, "generated_at": datetime.now().isoformat(), "result": { "title": result.title, "content": result.content, "excerpt": result.excerpt, "tags": result.tags, "meta_description": result.meta_description, "word_count": result.word_count, "reading_time_minutes": result.reading_time_minutes, "compliance": result.compliance.dict() } } output_file = f"/home/hoffmann_admin/.openclaw/workspace-socrates/test_pipeline_{job_id}.json" with open(output_file, 'w') as f: json.dump(output, f, indent=2) print(f"\nšŸ’¾ Full output saved to: {output_file}") except Exception as e: print(f"\nāŒ PIPELINE FAILED: {e}") import traceback traceback.print_exc() return 1 return 0 async def main(): parser = argparse.ArgumentParser(description="Test the tiered content pipeline") parser.add_argument("topic", nargs="?", default="How I migrated from Google Calendar to a self-hosted CalDAV server", help="Topic to generate content about") parser.add_argument("--type", default="how_i_solved", choices=["roundup", "how_i_solved", "build_log", "essay", "tutorial"], help="Content type") parser.add_argument("--check-only", action="store_true", help="Only check Gaming PC availability") args = parser.parse_args() print("=" * 60) print("TIERED CONTENT PIPELINE - LIVE TEST") print("=" * 60) # Check Gaming PC first if not await test_gaming_pc(): print("\nāŒ Gaming PC not available. Aborting.") return 1 if args.check_only: return 0 print("\n" + "=" * 60) print(f"Starting pipeline for: {args.topic}") print(f"Expected runtime: ~90 seconds") print("=" * 60 + "\n") return await run_pipeline(args.topic, args.type) if __name__ == "__main__": exit_code = asyncio.run(main()) sys.exit(exit_code)