""" Incident Capture System — Real struggle logging for content generation. Usage: from incidents import capture # Log an incident as it happens capture.incident( title="Cloudflare Error 1033 on notes.hoffdesk.com", systems=["cloudflared", "uvicorn", "systemd"], error="Error 1033: Cloudflare can't reach origin", attempts=[ {"what": "Checked uvicorn health", "result": "Running but bound to 127.0.0.1"}, {"what": "Added systemd drop-in for 0.0.0.0", "result": "Local health passed, still Error 1033"}, {"what": "Checked cloudflared service file", "result": "Found override.conf with old syntax"}, ], fix="Removed stale override.conf, restarted with local config", reflection="Should have run `systemctl cat` first. Always check for drop-ins.", cost={"time_hours": 2.5, "sleep_lost": True} ) Stored in: ~/.openclaw/workspace-socrates/incidents/ Format: YAML frontmatter + Markdown body """ import json import os import re import yaml from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional INCIDENTS_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/incidents") INCIDENTS_DIR.mkdir(parents=True, exist_ok=True) def _slugify(text: str) -> str: """Create URL-safe slug from title.""" slug = re.sub(r'[^\w\s-]', '', text.lower()) slug = re.sub(r'[-\s]+', '-', slug) return slug.strip('-')[:50] def _now_iso() -> str: """Current UTC timestamp.""" return datetime.now(timezone.utc).isoformat() def incident( title: str, systems: List[str], error: str, attempts: List[Dict[str, str]], fix: str, reflection: str, cost: Optional[Dict] = None, tags: Optional[List[str]] = None, related_urls: Optional[List[str]] = None, resolved: bool = True, ) -> Path: """ Capture a real incident for later content generation. Args: title: What broke (short, specific) systems: Which systems were involved (e.g., ["cloudflared", "uvicorn"]) error: The specific error message or symptom attempts: List of {what: "...", result: "..."} for each attempt fix: What actually worked reflection: Honest admission of what you'd do differently cost: Optional {time_hours: float, sleep_lost: bool, relationship_friction: bool} tags: Optional tags for filtering related_urls: Links to logs, commits, etc. resolved: Whether the incident is fully resolved Returns: Path to saved incident file """ slug = _slugify(title) timestamp = _now_iso() filename = f"{timestamp[:10]}-{slug}.md" filepath = INCIDENTS_DIR / filename # Build frontmatter frontmatter = { "title": title, "date": timestamp, "systems": systems, "error": error, "resolved": resolved, "tags": tags or ["incident"], "cost": cost or {}, } if related_urls: frontmatter["related"] = related_urls # Build Markdown body body_lines = [ f"# {title}", "", f"**Systems:** {', '.join(systems)}", f"**Error:** `{error}`", "", "## Attempts", "", ] for i, attempt in enumerate(attempts, 1): body_lines.extend([ f"### Attempt {i}: {attempt['what']}", "", f"Result: {attempt['result']}", "", ]) body_lines.extend([ "## The Fix", "", fix, "", "## Reflection", "", reflection, "", ]) if cost: body_lines.extend([ "## Cost", "", ]) if cost.get("time_hours"): body_lines.append(f"- Time: {cost['time_hours']} hours") if cost.get("sleep_lost"): body_lines.append("- Sleep: Lost") if cost.get("relationship_friction"): body_lines.append("- Relationship friction: Yes") body_lines.append("") # Write file with open(filepath, "w") as f: f.write("---\n") f.write(yaml.dump(frontmatter, default_flow_style=False, sort_keys=False)) f.write("---\n\n") f.write("\n".join(body_lines)) return filepath def list_incidents(tag: Optional[str] = None, resolved_only: bool = True) -> List[Dict]: """ List captured incidents, optionally filtered. Returns list of dicts with title, date, systems, slug, path. """ incidents = [] for filepath in sorted(INCIDENTS_DIR.glob("*.md"), reverse=True): try: with open(filepath) as f: content = f.read() # Parse frontmatter if content.startswith("---"): _, fm, body = content.split("---", 2) frontmatter = yaml.safe_load(fm) else: continue # Filter by tag if tag and tag not in frontmatter.get("tags", []): continue # Filter by resolution if resolved_only and not frontmatter.get("resolved", True): continue incidents.append({ "title": frontmatter.get("title", "Untitled"), "date": frontmatter.get("date", ""), "systems": frontmatter.get("systems", []), "error": frontmatter.get("error", ""), "resolved": frontmatter.get("resolved", True), "tags": frontmatter.get("tags", []), "slug": filepath.stem, "path": str(filepath), }) except Exception as e: print(f"Warning: Could not parse {filepath}: {e}") continue return incidents def get_incident(slug: str) -> Optional[Dict]: """Get full incident data by slug.""" filepath = INCIDENTS_DIR / f"{slug}.md" if not filepath.exists(): return None with open(filepath) as f: content = f.read() # Parse frontmatter and body if content.startswith("---"): _, fm, body = content.split("---", 2) frontmatter = yaml.safe_load(fm) else: frontmatter = {} body = content # Extract attempts from body attempts = [] for match in re.finditer(r'### Attempt (\d+): (.+?)\n\nResult: (.+?)(?=\n\n|$)', body, re.DOTALL): attempts.append({ "attempt": match.group(2).strip(), "why_failed": match.group(3).strip() }) # Extract fix and reflection from body sections fix_match = re.search(r'## The Fix\n\n(.+?)(?=\n## |$)', body, re.DOTALL) reflection_match = re.search(r'## Reflection\n\n(.+?)(?=\n## |$)', body, re.DOTALL) return { "title": frontmatter.get("title", ""), "date": frontmatter.get("date", ""), "systems": frontmatter.get("systems", []), "error": frontmatter.get("error", ""), "attempts": attempts, "fix": fix_match.group(1).strip() if fix_match else frontmatter.get("fix", ""), "reflection": reflection_match.group(1).strip() if reflection_match else frontmatter.get("reflection", ""), "cost": frontmatter.get("cost", {}), "tags": frontmatter.get("tags", []), "resolved": frontmatter.get("resolved", True), "body": body, } def to_brief(slug: str) -> Dict: """ Convert incident to content brief format for pipeline. Returns dict matching the brief schema used by build_struggle_first_prompt(). """ incident_data = get_incident(slug) if not incident_data: raise ValueError(f"Incident not found: {slug}") return { "struggle_angle": incident_data["title"], "origin_story": f"I was working on {', '.join(incident_data['systems'])} when {incident_data['error']}.", "attempts": incident_data["attempts"], "the_moment": incident_data.get("reflection", "").split(".")[0] + ".", # First sentence "the_fix": incident_data.get("fix", ""), "reflection": incident_data.get("reflection", ""), "target_length": 1200, "systems": incident_data["systems"], "error": incident_data["error"], "date": incident_data["date"], } # CLI entry point if __name__ == "__main__": import sys if len(sys.argv) < 2: print("Usage: python incidents.py [list|get |to-brief ]") sys.exit(1) cmd = sys.argv[1] if cmd == "list": incidents = list_incidents() for inc in incidents[:10]: status = "✓" if inc["resolved"] else "○" print(f"{status} {inc['date'][:10]} | {inc['title']} ({', '.join(inc['systems'])})") elif cmd == "get" and len(sys.argv) > 2: data = get_incident(sys.argv[2]) if data: print(yaml.dump(data, default_flow_style=False)) else: print(f"Not found: {sys.argv[2]}") elif cmd == "to-brief" and len(sys.argv) > 2: brief = to_brief(sys.argv[2]) print(yaml.dump(brief, default_flow_style=False)) else: print("Unknown command") sys.exit(1)