"""
Incident Capture System — Real struggle logging for content generation.

Usage:
    from incidents import capture

    # Log an incident as it happens
    capture.incident(
        title="Cloudflare Error 1033 on notes.hoffdesk.com",
        systems=["cloudflared", "uvicorn", "systemd"],
        error="Error 1033: Cloudflare can't reach origin",
        attempts=[
            {"what": "Checked uvicorn health", "result": "Running but bound to 127.0.0.1"},
            {"what": "Added systemd drop-in for 0.0.0.0", "result": "Local health passed, still Error 1033"},
            {"what": "Checked cloudflared service file", "result": "Found override.conf with old syntax"},
        ],
        fix="Removed stale override.conf, restarted with local config",
        reflection="Should have run `systemctl cat` first. Always check for drop-ins.",
        cost={"time_hours": 2.5, "sleep_lost": True}
    )

Stored in: ~/.openclaw/workspace-socrates/incidents/
Format: YAML frontmatter + Markdown body
"""
import json
import os
import re
import yaml
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
# Directory where incident files are written and read. The path is absolute
# and hard-coded.
# NOTE(review): the module docstring advertises
# ~/.openclaw/workspace-socrates/incidents/, which only equals this path when
# the home directory is /home/hoffmann_admin — confirm this is intended.
INCIDENTS_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/incidents")
# Runs at import time: creates the directory and any missing parents;
# exist_ok=True makes it a no-op when the directory already exists.
INCIDENTS_DIR.mkdir(parents=True, exist_ok=True)
def _slugify(text: str) -> str:
"""Create URL-safe slug from title."""
slug = re.sub(r'[^\w\s-]', '', text.lower())
slug = re.sub(r'[-\s]+', '-', slug)
return slug.strip('-')[:50]
def _now_iso() -> str:
"""Current UTC timestamp."""
return datetime.now(timezone.utc).isoformat()
def incident(
    title: str,
    systems: List[str],
    error: str,
    attempts: List[Dict[str, str]],
    fix: str,
    reflection: str,
    cost: Optional[Dict] = None,
    tags: Optional[List[str]] = None,
    related_urls: Optional[List[str]] = None,
    resolved: bool = True,
) -> Path:
    """
    Capture a real incident for later content generation.

    The incident is written to INCIDENTS_DIR as ``<YYYY-MM-DD>-<slug>.md``
    (UTC date, slug derived from the title): YAML frontmatter followed by a
    Markdown body. The file is opened in write mode, so a second incident
    with the same date and slug replaces the earlier file.

    Args:
        title: What broke (short, specific)
        systems: Which systems were involved (e.g., ["cloudflared", "uvicorn"])
        error: The specific error message or symptom
        attempts: List of {what: "...", result: "..."} for each attempt
        fix: What actually worked
        reflection: Honest admission of what you'd do differently
        cost: Optional {time_hours: float, sleep_lost: bool, relationship_friction: bool}
        tags: Optional tags for filtering (defaults to ["incident"])
        related_urls: Links to logs, commits, etc.
        resolved: Whether the incident is fully resolved

    Returns:
        Path to saved incident file

    Raises:
        KeyError: If an attempt lacks the "what" or "result" key.
    """
    # A title made only of punctuation slugifies to ""; fall back to a fixed
    # stem so the filename never ends in a bare "<date>-.md".
    slug = _slugify(title) or "incident"
    timestamp = _now_iso()
    filename = f"{timestamp[:10]}-{slug}.md"
    filepath = INCIDENTS_DIR / filename

    # Build frontmatter
    frontmatter = {
        "title": title,
        "date": timestamp,
        "systems": systems,
        "error": error,
        "resolved": resolved,
        "tags": tags or ["incident"],
        "cost": cost or {},
    }
    if related_urls:
        frontmatter["related"] = related_urls

    # Build Markdown body. get_incident() parses these exact headings back
    # out of the file, so the section titles must stay in sync with it.
    body_lines = [
        f"# {title}",
        "",
        f"**Systems:** {', '.join(systems)}",
        f"**Error:** `{error}`",
        "",
        "## Attempts",
        "",
    ]
    for i, attempt in enumerate(attempts, 1):
        body_lines.extend([
            f"### Attempt {i}: {attempt['what']}",
            "",
            f"Result: {attempt['result']}",
            "",
        ])
    body_lines.extend([
        "## The Fix",
        "",
        fix,
        "",
        "## Reflection",
        "",
        reflection,
        "",
    ])
    if cost:
        body_lines.extend([
            "## Cost",
            "",
        ])
        if cost.get("time_hours"):
            body_lines.append(f"- Time: {cost['time_hours']} hours")
        if cost.get("sleep_lost"):
            body_lines.append("- Sleep: Lost")
        if cost.get("relationship_friction"):
            body_lines.append("- Relationship friction: Yes")
        body_lines.append("")

    # Write file. The encoding is explicit so non-ASCII text in the body is
    # stored the same way regardless of the process locale.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write("---\n")
        f.write(yaml.dump(frontmatter, default_flow_style=False, sort_keys=False))
        f.write("---\n\n")
        f.write("\n".join(body_lines))
    return filepath
def list_incidents(tag: Optional[str] = None, resolved_only: bool = True) -> List[Dict]:
    """
    List captured incidents, optionally filtered.

    Scans INCIDENTS_DIR for ``*.md`` files in reverse filename order and
    reads only their YAML frontmatter. Files that do not start with ``---``
    are skipped silently; files that cannot be read or parsed are skipped
    with a printed warning.

    Args:
        tag: When given, keep only incidents whose tags contain it.
        resolved_only: When true (the default), drop incidents whose
            frontmatter has ``resolved`` set to a false value.

    Returns:
        List of dicts with title, date, systems, error, resolved, tags,
        slug (the filename without extension) and path.
    """
    incidents = []
    for filepath in sorted(INCIDENTS_DIR.glob("*.md"), reverse=True):
        try:
            content = filepath.read_text(encoding="utf-8")

            # Parse frontmatter
            if not content.startswith("---"):
                continue
            # Split only on lines consisting of '---'. A plain
            # str.split("---") would also cut at a '---' inside a value
            # (e.g. an error message) and silently drop later fields.
            parts = re.split(r'^---[ \t\r]*$', content, maxsplit=2, flags=re.MULTILINE)
            if len(parts) != 3 or parts[0]:
                raise ValueError("frontmatter is not delimited by '---' lines")
            frontmatter = yaml.safe_load(parts[1])
            if not isinstance(frontmatter, dict):
                raise ValueError("frontmatter is not a mapping")

            # Filter by tag (a null 'tags' entry counts as no tags)
            if tag and tag not in (frontmatter.get("tags") or []):
                continue

            # Filter by resolution
            if resolved_only and not frontmatter.get("resolved", True):
                continue

            incidents.append({
                "title": frontmatter.get("title", "Untitled"),
                "date": frontmatter.get("date", ""),
                "systems": frontmatter.get("systems", []),
                "error": frontmatter.get("error", ""),
                "resolved": frontmatter.get("resolved", True),
                "tags": frontmatter.get("tags", []),
                "slug": filepath.stem,
                "path": str(filepath),
            })
        except Exception as e:
            # Deliberately best-effort: one unreadable file must not prevent
            # the remaining incidents from being listed.
            print(f"Warning: Could not parse {filepath}: {e}")
            continue
    return incidents
def get_incident(slug: str) -> Optional[Dict]:
    """Get full incident data by slug.

    Reads ``<slug>.md`` from INCIDENTS_DIR, loads its YAML frontmatter and
    extracts the attempts, fix and reflection from the Markdown body using
    the headings that incident() writes.

    Args:
        slug: Filename of the incident without the ``.md`` extension.

    Returns:
        None when the file does not exist; otherwise a dict with title,
        date, systems, error, attempts (each ``{"attempt", "why_failed"}``),
        fix, reflection, cost, tags, resolved and the raw Markdown body.
        A file without well-formed frontmatter is returned with the whole
        content as body and default values for the frontmatter fields.

    Raises:
        yaml.YAMLError: If the frontmatter is not valid YAML.
    """
    filepath = INCIDENTS_DIR / f"{slug}.md"
    if not filepath.exists():
        return None
    content = filepath.read_text(encoding="utf-8")

    # Parse frontmatter and body. Defaults cover files with no (or
    # unterminated) frontmatter instead of failing while unpacking.
    frontmatter: Dict = {}
    body = content
    if content.startswith("---"):
        # Split only on lines consisting of '---' so that a '---' inside a
        # frontmatter value does not end the frontmatter early.
        parts = re.split(r'^---[ \t\r]*$', content, maxsplit=2, flags=re.MULTILINE)
        if len(parts) == 3 and not parts[0]:
            loaded = yaml.safe_load(parts[1])
            # Empty frontmatter loads as None; keep the empty dict then.
            if isinstance(loaded, dict):
                frontmatter = loaded
            body = parts[2]

    # Extract attempts from body
    attempts = []
    for match in re.finditer(r'### Attempt (\d+): (.+?)\n\nResult: (.+?)(?=\n\n|$)', body, re.DOTALL):
        attempts.append({
            "attempt": match.group(2).strip(),
            "why_failed": match.group(3).strip()
        })

    # Extract fix and reflection from body sections; each runs up to the
    # next '## ' heading or the end of the body.
    fix_match = re.search(r'## The Fix\n\n(.+?)(?=\n## |$)', body, re.DOTALL)
    reflection_match = re.search(r'## Reflection\n\n(.+?)(?=\n## |$)', body, re.DOTALL)
    return {
        "title": frontmatter.get("title", ""),
        "date": frontmatter.get("date", ""),
        "systems": frontmatter.get("systems", []),
        "error": frontmatter.get("error", ""),
        "attempts": attempts,
        "fix": fix_match.group(1).strip() if fix_match else frontmatter.get("fix", ""),
        "reflection": reflection_match.group(1).strip() if reflection_match else frontmatter.get("reflection", ""),
        "cost": frontmatter.get("cost", {}),
        "tags": frontmatter.get("tags", []),
        "resolved": frontmatter.get("resolved", True),
        "body": body,
    }
def to_brief(slug: str) -> Dict:
"""
Convert incident to content brief format for pipeline.
Returns dict matching the brief schema used by build_struggle_first_prompt().
"""
incident_data = get_incident(slug)
if not incident_data:
raise ValueError(f"Incident not found: {slug}")
return {
"struggle_angle": incident_data["title"],
"origin_story": f"I was working on {', '.join(incident_data['systems'])} when {incident_data['error']}.",
"attempts": incident_data["attempts"],
"the_moment": incident_data.get("reflection", "").split(".")[0] + ".", # First sentence
"the_fix": incident_data.get("fix", ""),
"reflection": incident_data.get("reflection", ""),
"target_length": 1200,
"systems": incident_data["systems"],
"error": incident_data["error"],
"date": incident_data["date"],
}
# CLI entry point
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python incidents.py [list|get <slug>|to-brief <slug>]")
        sys.exit(1)

    cmd = sys.argv[1]
    if cmd == "list":
        # Show at most the ten most recent incidents.
        incidents = list_incidents()
        for inc in incidents[:10]:
            status = "✓" if inc["resolved"] else "○"
            # str() keeps the slice valid when YAML parsed the date into a
            # datetime object instead of leaving it a string.
            print(f"{status} {str(inc['date'])[:10]} | {inc['title']} ({', '.join(inc['systems'])})")
    elif cmd == "get" and len(sys.argv) > 2:
        data = get_incident(sys.argv[2])
        if data:
            print(yaml.dump(data, default_flow_style=False))
        else:
            print(f"Not found: {sys.argv[2]}")
    elif cmd == "to-brief" and len(sys.argv) > 2:
        brief = to_brief(sys.argv[2])
        print(yaml.dump(brief, default_flow_style=False))
    else:
        print("Unknown command")
        sys.exit(1)