📄 ui_router.py 19,173 bytes Apr 30, 2026 📋 Raw

"""UI router - HTMX fragments returning HTML (not JSON).

Per CONTRACT.md: All /ui/* endpoints return HTMLResponse using Jinja2 templates.
No inline HTML generation allowed.
"""

from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import json

Local templates for hoffdesk-api dashboard

DASHBOARD_TEMPLATES_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/dashboard/templates")

Initialize Jinja2 templates

templates = Jinja2Templates(directory=str(DASHBOARD_TEMPLATES_DIR))

Path to test events file

TEST_EVENTS_FILE = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/data/test_events.json")

router = APIRouter(prefix="/ui", tags=["ui"])

async def _get_weather():
"""Fetch weather data."""
try:
import httpx

    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.get("https://wttr.in/Green+Bay,WI?format=j1")
        if response.status_code == 200:
            data = response.json()
            current = data.get("current_condition", [{}])[0]

            forecast = []
            for hour in data.get("weather", [{}])[0].get("hourly", [])[:12]:
                forecast.append({
                    "hour": f"{hour.get('time', '0000')[:2]}:00",
                    "temperature_f": int(hour.get("tempF", 0)),
                    "condition": hour.get("weatherDesc", [{}])[0].get("value", ""),
                    "precipitation_chance_pct": int(hour.get("chanceofrain", 0))
                })

            return {
                "status": "ok",
                "current": {
                    "temperature_f": int(current.get("temp_F", 0)),
                    "feels_like_f": int(current.get("FeelsLikeF", 0)),
                    "condition": current.get("weatherDesc", [{}])[0].get("value", ""),
                    "humidity_pct": int(current.get("humidity", 0)),
                    "wind_mph": int(current.get("windspeedMiles", 0))
                },
                "forecast": forecast
            }
except Exception:
    pass

return {
    "status": "fallback",
    "current": {
        "temperature_f": 52,
        "feels_like_f": 49,
        "condition": "Data unavailable",
        "humidity_pct": 62,
        "wind_mph": 10
    },
    "forecast": []
}

async def _get_calendar_today():
"""Fetch today's events from Radicale."""
try:
import caldav
import os

    # Support both CALDAV_* and RADICALE_* env vars
    host = os.getenv('RADICALE_HOST') or os.getenv('CALDAV_URL', '127.0.0.1')
    # Extract host from URL if needed
    if host.startswith('http://'):
        host = host[7:]  # strip http://
    if ':' in host:
        host = host.split(':')[0]

    port = os.getenv('RADICALE_PORT') or '5232'
    user = os.getenv('RADICALE_USER') or os.getenv('CALDAV_USER', 'assistant')
    password = os.getenv('RADICALE_PASS') or os.getenv('CALDAV_PASSWORD', 'family-assistant-2026')

    client = caldav.DAVClient(
        url=f"http://{host}:{port}/",
        username=user,
        password=password
    )

    principal = client.principal()
    calendars = principal.calendars()

    # Get family calendar
    family_cal = None
    for cal in calendars:
        if 'family' in cal.name.lower():
            family_cal = cal
            break

    if not family_cal:
        family_cal = calendars[0] if calendars else None

    if family_cal:
        # Get events for today
        from datetime import date, timedelta
        today = date.today()
        tomorrow = today + timedelta(days=1)

        events_data = family_cal.date_search(today, tomorrow)
        events = []

        for event in events_data:
            vevent = event.instance.vevent
            events.append({
                "uid": str(vevent.uid.value) if hasattr(vevent, 'uid') else "",
                "summary": str(vevent.summary.value) if hasattr(vevent, 'summary') else "",
                "start": vevent.dtstart.value.isoformat() if hasattr(vevent, 'dtstart') else "",
                "end": vevent.dtend.value.isoformat() if hasattr(vevent, 'dtend') else "",
                "is_all_day": not isinstance(vevent.dtstart.value, datetime) if hasattr(vevent, 'dtstart') else False,
                "location": str(vevent.location.value) if hasattr(vevent, 'location') else "",
                "attendees": []
            })

        return {"status": "ok", "events": events}

    return {"status": "error", "message": "No calendar found", "events": []}

except Exception as e:
    return {"status": "error", "message": str(e), "events": []}

async def _get_system_health():
"""Check system health."""
import os
import psutil

components = {}

# Check OpenClaw gateway
try:
    import httpx
    async with httpx.AsyncClient() as client:
        start = datetime.now()
        response = await client.get("http://127.0.0.1:18789/health", timeout=2.0)
        latency = (datetime.now() - start).total_seconds() * 1000
        components["openclaw_gateway"] = {
            "status": "ok" if response.status_code == 200 else "degraded",
            "latency_ms": round(latency, 1)
        }
except:
    components["openclaw_gateway"] = {"status": "error", "latency_ms": 0}

# Check Radicale
try:
    start = datetime.now()
    import socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(2)
    result = sock.connect_ex((os.getenv('RADICALE_HOST', '127.0.0.1'), int(os.getenv('RADICALE_PORT', '5232'))))
    latency = (datetime.now() - start).total_seconds() * 1000
    components["radicale_caldav"] = {
        "status": "ok" if result == 0 else "error",
        "latency_ms": round(latency, 1)
    }
    sock.close()
except:
    components["radicale_caldav"] = {"status": "error", "latency_ms": 0}

# Check Ollama
try:
    start = datetime.now()
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get("http://100.104.147.116:11434/api/tags", timeout=3.0)
        latency = (datetime.now() - start).total_seconds() * 1000
        components["ollama_local"] = {
            "status": "ok" if response.status_code == 200 else "error",
            "latency_ms": round(latency, 1)
        }
except:
    components["ollama_local"] = {"status": "error", "latency_ms": 0}

# Check Cloudflare tunnel
try:
    start = datetime.now()
    import subprocess
    result = subprocess.run(['systemctl', 'is-active', 'cloudflared'], capture_output=True, text=True)
    latency = (datetime.now() - start).total_seconds() * 1000
    components["cloudflare_tunnel"] = {
        "status": "ok" if result.returncode == 0 else "error",
        "latency_ms": round(latency, 1)
    }
except:
    components["cloudflare_tunnel"] = {"status": "unknown", "latency_ms": 0}

# Check IMAP Proxy
try:
    import httpx
    async with httpx.AsyncClient() as client:
        start = datetime.now()
        response = await client.get("http://127.0.0.1:8000/imap/status", timeout=3.0)
        latency = (datetime.now() - start).total_seconds() * 1000
        if response.status_code == 200:
            imap_data = response.json()
            imap_status = "ok" if imap_data.get("status") == "connected" else "degraded" if imap_data.get("status") == "not_configured" else "error"
        else:
            imap_status = "error"
        components["imap_proxy"] = {
            "status": imap_status,
            "latency_ms": round(latency, 1)
        }
except:
    components["imap_proxy"] = {"status": "not_configured", "latency_ms": 0}

# Calculate overall status
overall = "healthy"
for comp in components.values():
    if comp["status"] == "error":
        overall = "degraded"
        break

# Disk usage
disk = psutil.disk_usage('/')

return {
    "overall_status": overall,
    "components": components,
    "disk_usage": {
        "total_gb": round(disk.total / (1024**3), 1),
        "used_gb": round(disk.used / (1024**3), 1),
        "usage_pct": round(disk.percent, 1)
    }
}

def _load_test_events():
"""Load test events from JSON file."""
if TEST_EVENTS_FILE.exists():
with open(TEST_EVENTS_FILE, 'r') as f:
return json.load(f)
return []

def _get_day_label(date_str: str) -> str:
"""Get human-friendly day label for a date."""
if not date_str:
return ""
try:
from datetime import date
ev_date = datetime.fromisoformat(date_str.replace("Z", "+00:00")).date()
today = date.today()
if ev_date == today:
return "Today"
elif ev_date == today + timedelta(days=1):
return "Tomorrow"
else:
return ev_date.strftime("%a %b %d")
except Exception:
return date_str[:10] if date_str else ""

def _get_week_start(date_obj=None) -> datetime:
"""Get the Monday of the week containing the given date (or today)."""
if date_obj is None:
date_obj = datetime.now()
days_since_monday = date_obj.weekday()
week_start = date_obj - timedelta(days=days_since_monday)
return week_start.replace(hour=0, minute=0, second=0, microsecond=0)

@router.get("/calendar-card", response_class=HTMLResponse)
async def ui_calendar_card(request: Request):
"""Return calendar card HTML fragment.

Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
"""
calendar_data = await _get_calendar_today()

return templates.TemplateResponse(
    request,
    "dashboard/components/calendar_card.html.j2",
    {"calendar": calendar_data}
)

@router.get("/weather-card", response_class=HTMLResponse)
async def ui_weather_card(request: Request):
"""Return weather card HTML fragment.

Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
"""
weather_data = await _get_weather()

return templates.TemplateResponse(
    request,
    "dashboard/components/weather_card.html.j2",
    {"weather": weather_data}
)

@router.get("/health-card", response_class=HTMLResponse)
async def ui_health_card(request: Request):
"""Return health card HTML fragment.

Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
"""
health_data = await _get_system_health()

return templates.TemplateResponse(
    request,
    "dashboard/components/health_card.html.j2",
    {"health": health_data}
)

@router.get("/system-status-bar", response_class=HTMLResponse)
async def ui_system_status_bar(request: Request):
"""Return compact system status bar HTML fragment.

Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
Shows status dots + labels in a horizontal row, with thin disk usage bar.
"""
health_data = await _get_system_health()

return templates.TemplateResponse(
    request,
    "dashboard/components/system_status_bar.html.j2",
    {"health": health_data}
)

def _format_event_for_list(event):
"""Format event for list view."""
events_data = _load_test_events()

# Calculate conflicts
conflicts = []
start = event.get("start", "")
family_members = event.get("family_members", [])

if event.get("status") in ("needs_confirmation", "pending"):
    for other in events_data:
        if other.get("id") != event.get("id") and other.get("status") != "declined":
            other_date = other.get("start", "")
            other_members = other.get("family_members", [])
            if other_date and other_date == start:
                if family_members and other_members:
                    shared = set(family_members) & set(other_members)
                    if shared:
                        conflicts.append({
                            "id": other.get("id", ""),
                            "title": other.get("title", "Untitled"),
                            "severity": "person",
                            "shared_members": list(shared)
                        })
                    else:
                        conflicts.append({
                            "id": other.get("id", ""),
                            "title": other.get("title", "Untitled"),
                            "severity": "same_day",
                        })
                else:
                    conflicts.append({
                        "id": other.get("id", ""),
                        "title": other.get("title", "Untitled"),
                        "severity": "same_day",
                    })

return {
    "id": event.get("id", ""),
    "title": event.get("title", "Untitled"),
    "start": start,
    "day_label": _get_day_label(start),
    "status": event.get("status", "unknown"),
    "event_type": event.get("event_type", "calendar"),
    "location": event.get("location", ""),
    "family_members": family_members,
    "conflicts": conflicts,
    "time": event.get("time", ""),
    "counter_proposed_date": event.get("counter_proposed_date", ""),
    "counter_proposed_time": event.get("counter_proposed_time", ""),
}

@router.get("/events-list", response_class=HTMLResponse)
async def ui_events_list(request: Request):
"""Return events list HTML fragment.

Per CONTRACT.md: Moved from /api/events-list to /ui/events-list.
Returns HTMLResponse using Jinja2 template.
"""
# Load and filter events
events = _load_test_events()
filtered_events = []

for event in events:
    if event.get("status") != "declined":
        filtered_events.append(_format_event_for_list(event))

# Sort by start date
filtered_events.sort(key=lambda e: e.get("start", ""))

return templates.TemplateResponse(
    request,
    "dashboard/components/events_list.html.j2",
    {"events": filtered_events}
)

@router.get("/events-week", response_class=HTMLResponse)
async def ui_events_week(request: Request, start: Optional[str] = None):
"""Return week view HTML fragment.

Per CONTRACT.md: Moved from /api/events-week to /ui/events-week.
Returns HTMLResponse using Jinja2 template.
"""
# Determine week start date
if start:
    try:
        week_start = datetime.strptime(start, "%Y-%m-%d")
        week_start = _get_week_start(week_start)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid start date format. Use YYYY-MM-DD")
else:
    week_start = _get_week_start()

week_end = week_start + timedelta(days=6)

# Load events
events = _load_test_events()

# Build days array
today = datetime.now().date()
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", 
               "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

days = []
for i in range(7):
    day_date = week_start + timedelta(days=i)
    day_date_str = day_date.strftime("%Y-%m-%d")

    # Get events for this day
    day_events = []
    for event in events:
        if event.get("status") == "declined":
            continue
        event_date = event.get("start", "")
        if event_date == day_date_str:
            family_members = event.get("family_members", [])
            status = event.get("status", "unknown")

            status_class = status
            members_class = ""
            if family_members:
                if len(family_members) > 2:
                    members_class = "members-family"
                else:
                    members_class = f"members-{family_members[0].lower()}"

            # Category detection
            category = "appointment"
            title_lower = event.get("title", "").lower()
            if "practice" in title_lower or "baseball" in title_lower or "dance" in title_lower or "piano" in title_lower:
                category = "practice"
            elif "party" in title_lower or "birthday" in title_lower:
                category = "party"
            elif "trip" in title_lower or "travel" in title_lower:
                category = "travel"

            day_events.append({
                "slug": event.get("id", ""),
                "title": event.get("title", "Untitled"),
                "time": event.get("time", ""),
                "duration_minutes": event.get("duration_minutes", 60),
                "family_members": family_members,
                "status": status,
                "status_class": status_class,
                "members_class": members_class,
                "category": category,
                "location": event.get("location", ""),
            })

    # Sort events by time
    day_events.sort(key=lambda e: e.get("time") or "99:99")

    days.append({
        "date": day_date_str,
        "day_name": day_names[i],
        "day_number": day_date.day,
        "month": month_names[day_date.month - 1],
        "is_today": day_date.date() == today,
        "events": day_events
    })

# Calculate prev/next week dates
prev_week_date = (week_start - timedelta(days=7)).strftime("%Y-%m-%d")
next_week_date = (week_start + timedelta(days=7)).strftime("%Y-%m-%d")

# Format week title
week_title = f"{month_names[week_start.month - 1]} {week_start.day} - {month_names[week_end.month - 1]} {week_end.day}"

return templates.TemplateResponse(
    request,
    "dashboard/components/events_week.html.j2",
    {
        "days": days,
        "week_title": week_title,
        "week_start": week_start.strftime("%Y-%m-%d"),
        "week_end": week_end.strftime("%Y-%m-%d"),
        "prev_week": prev_week_date,
        "next_week": next_week_date
    }
)