"""UI router - HTMX fragments returning HTML (not JSON).
Per CONTRACT.md: All /ui/* endpoints return HTMLResponse using Jinja2 templates.
No inline HTML generation allowed.
"""
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import json
# Local templates for hoffdesk-api dashboard
DASHBOARD_TEMPLATES_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/dashboard/templates")
# Initialize Jinja2 templates
templates = Jinja2Templates(directory=str(DASHBOARD_TEMPLATES_DIR))
# Path to test events file
TEST_EVENTS_FILE = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/data/test_events.json")
router = APIRouter(prefix="/ui", tags=["ui"])
async def _get_weather():
"""Fetch weather data."""
try:
import httpx
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get("https://wttr.in/Green+Bay,WI?format=j1")
if response.status_code == 200:
data = response.json()
current = data.get("current_condition", [{}])[0]
forecast = []
for hour in data.get("weather", [{}])[0].get("hourly", [])[:12]:
forecast.append({
"hour": f"{hour.get('time', '0000')[:2]}:00",
"temperature_f": int(hour.get("tempF", 0)),
"condition": hour.get("weatherDesc", [{}])[0].get("value", ""),
"precipitation_chance_pct": int(hour.get("chanceofrain", 0))
})
return {
"status": "ok",
"current": {
"temperature_f": int(current.get("temp_F", 0)),
"feels_like_f": int(current.get("FeelsLikeF", 0)),
"condition": current.get("weatherDesc", [{}])[0].get("value", ""),
"humidity_pct": int(current.get("humidity", 0)),
"wind_mph": int(current.get("windspeedMiles", 0))
},
"forecast": forecast
}
except Exception:
pass
return {
"status": "fallback",
"current": {
"temperature_f": 52,
"feels_like_f": 49,
"condition": "Data unavailable",
"humidity_pct": 62,
"wind_mph": 10
},
"forecast": []
}
async def _get_calendar_today():
"""Fetch today's events from Radicale."""
try:
import caldav
import os
# Support both CALDAV_* and RADICALE_* env vars
host = os.getenv('RADICALE_HOST') or os.getenv('CALDAV_URL', '127.0.0.1')
# Extract host from URL if needed
if host.startswith('http://'):
host = host[7:] # strip http://
if ':' in host:
host = host.split(':')[0]
port = os.getenv('RADICALE_PORT') or '5232'
user = os.getenv('RADICALE_USER') or os.getenv('CALDAV_USER', 'assistant')
password = os.getenv('RADICALE_PASS') or os.getenv('CALDAV_PASSWORD', 'family-assistant-2026')
client = caldav.DAVClient(
url=f"http://{host}:{port}/",
username=user,
password=password
)
principal = client.principal()
calendars = principal.calendars()
# Get family calendar
family_cal = None
for cal in calendars:
if 'family' in cal.name.lower():
family_cal = cal
break
if not family_cal:
family_cal = calendars[0] if calendars else None
if family_cal:
# Get events for today
from datetime import date, timedelta
today = date.today()
tomorrow = today + timedelta(days=1)
events_data = family_cal.date_search(today, tomorrow)
events = []
for event in events_data:
vevent = event.instance.vevent
events.append({
"uid": str(vevent.uid.value) if hasattr(vevent, 'uid') else "",
"summary": str(vevent.summary.value) if hasattr(vevent, 'summary') else "",
"start": vevent.dtstart.value.isoformat() if hasattr(vevent, 'dtstart') else "",
"end": vevent.dtend.value.isoformat() if hasattr(vevent, 'dtend') else "",
"is_all_day": not isinstance(vevent.dtstart.value, datetime) if hasattr(vevent, 'dtstart') else False,
"location": str(vevent.location.value) if hasattr(vevent, 'location') else "",
"attendees": []
})
return {"status": "ok", "events": events}
return {"status": "error", "message": "No calendar found", "events": []}
except Exception as e:
return {"status": "error", "message": str(e), "events": []}
async def _get_system_health():
"""Check system health."""
import os
import psutil
components = {}
# Check OpenClaw gateway
try:
import httpx
async with httpx.AsyncClient() as client:
start = datetime.now()
response = await client.get("http://127.0.0.1:18789/health", timeout=2.0)
latency = (datetime.now() - start).total_seconds() * 1000
components["openclaw_gateway"] = {
"status": "ok" if response.status_code == 200 else "degraded",
"latency_ms": round(latency, 1)
}
except:
components["openclaw_gateway"] = {"status": "error", "latency_ms": 0}
# Check Radicale
try:
start = datetime.now()
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((os.getenv('RADICALE_HOST', '127.0.0.1'), int(os.getenv('RADICALE_PORT', '5232'))))
latency = (datetime.now() - start).total_seconds() * 1000
components["radicale_caldav"] = {
"status": "ok" if result == 0 else "error",
"latency_ms": round(latency, 1)
}
sock.close()
except:
components["radicale_caldav"] = {"status": "error", "latency_ms": 0}
# Check Ollama
try:
start = datetime.now()
import httpx
async with httpx.AsyncClient() as client:
response = await client.get("http://100.104.147.116:11434/api/tags", timeout=3.0)
latency = (datetime.now() - start).total_seconds() * 1000
components["ollama_local"] = {
"status": "ok" if response.status_code == 200 else "error",
"latency_ms": round(latency, 1)
}
except:
components["ollama_local"] = {"status": "error", "latency_ms": 0}
# Check Cloudflare tunnel
try:
start = datetime.now()
import subprocess
result = subprocess.run(['systemctl', 'is-active', 'cloudflared'], capture_output=True, text=True)
latency = (datetime.now() - start).total_seconds() * 1000
components["cloudflare_tunnel"] = {
"status": "ok" if result.returncode == 0 else "error",
"latency_ms": round(latency, 1)
}
except:
components["cloudflare_tunnel"] = {"status": "unknown", "latency_ms": 0}
# Check IMAP Proxy
try:
import httpx
async with httpx.AsyncClient() as client:
start = datetime.now()
response = await client.get("http://127.0.0.1:8000/imap/status", timeout=3.0)
latency = (datetime.now() - start).total_seconds() * 1000
if response.status_code == 200:
imap_data = response.json()
imap_status = "ok" if imap_data.get("status") == "connected" else "degraded" if imap_data.get("status") == "not_configured" else "error"
else:
imap_status = "error"
components["imap_proxy"] = {
"status": imap_status,
"latency_ms": round(latency, 1)
}
except:
components["imap_proxy"] = {"status": "not_configured", "latency_ms": 0}
# Calculate overall status
overall = "healthy"
for comp in components.values():
if comp["status"] == "error":
overall = "degraded"
break
# Disk usage
disk = psutil.disk_usage('/')
return {
"overall_status": overall,
"components": components,
"disk_usage": {
"total_gb": round(disk.total / (1024**3), 1),
"used_gb": round(disk.used / (1024**3), 1),
"usage_pct": round(disk.percent, 1)
}
}
def _load_test_events():
"""Load test events from JSON file."""
if TEST_EVENTS_FILE.exists():
with open(TEST_EVENTS_FILE, 'r') as f:
return json.load(f)
return []
def _get_day_label(date_str: str) -> str:
"""Get human-friendly day label for a date."""
if not date_str:
return ""
try:
from datetime import date
ev_date = datetime.fromisoformat(date_str.replace("Z", "+00:00")).date()
today = date.today()
if ev_date == today:
return "Today"
elif ev_date == today + timedelta(days=1):
return "Tomorrow"
else:
return ev_date.strftime("%a %b %d")
except Exception:
return date_str[:10] if date_str else ""
def _get_week_start(date_obj=None) -> datetime:
"""Get the Monday of the week containing the given date (or today)."""
if date_obj is None:
date_obj = datetime.now()
days_since_monday = date_obj.weekday()
week_start = date_obj - timedelta(days=days_since_monday)
return week_start.replace(hour=0, minute=0, second=0, microsecond=0)
@router.get("/calendar-card", response_class=HTMLResponse)
async def ui_calendar_card(request: Request):
"""Return calendar card HTML fragment.
Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
"""
calendar_data = await _get_calendar_today()
return templates.TemplateResponse(
request,
"dashboard/components/calendar_card.html.j2",
{"calendar": calendar_data}
)
@router.get("/weather-card", response_class=HTMLResponse)
async def ui_weather_card(request: Request):
"""Return weather card HTML fragment.
Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
"""
weather_data = await _get_weather()
return templates.TemplateResponse(
request,
"dashboard/components/weather_card.html.j2",
{"weather": weather_data}
)
@router.get("/health-card", response_class=HTMLResponse)
async def ui_health_card(request: Request):
"""Return health card HTML fragment.
Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
"""
health_data = await _get_system_health()
return templates.TemplateResponse(
request,
"dashboard/components/health_card.html.j2",
{"health": health_data}
)
@router.get("/system-status-bar", response_class=HTMLResponse)
async def ui_system_status_bar(request: Request):
"""Return compact system status bar HTML fragment.
Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template.
Shows status dots + labels in a horizontal row, with thin disk usage bar.
"""
health_data = await _get_system_health()
return templates.TemplateResponse(
request,
"dashboard/components/system_status_bar.html.j2",
{"health": health_data}
)
def _format_event_for_list(event):
"""Format event for list view."""
events_data = _load_test_events()
# Calculate conflicts
conflicts = []
start = event.get("start", "")
family_members = event.get("family_members", [])
if event.get("status") in ("needs_confirmation", "pending"):
for other in events_data:
if other.get("id") != event.get("id") and other.get("status") != "declined":
other_date = other.get("start", "")
other_members = other.get("family_members", [])
if other_date and other_date == start:
if family_members and other_members:
shared = set(family_members) & set(other_members)
if shared:
conflicts.append({
"id": other.get("id", ""),
"title": other.get("title", "Untitled"),
"severity": "person",
"shared_members": list(shared)
})
else:
conflicts.append({
"id": other.get("id", ""),
"title": other.get("title", "Untitled"),
"severity": "same_day",
})
else:
conflicts.append({
"id": other.get("id", ""),
"title": other.get("title", "Untitled"),
"severity": "same_day",
})
return {
"id": event.get("id", ""),
"title": event.get("title", "Untitled"),
"start": start,
"day_label": _get_day_label(start),
"status": event.get("status", "unknown"),
"event_type": event.get("event_type", "calendar"),
"location": event.get("location", ""),
"family_members": family_members,
"conflicts": conflicts,
"time": event.get("time", ""),
"counter_proposed_date": event.get("counter_proposed_date", ""),
"counter_proposed_time": event.get("counter_proposed_time", ""),
}
@router.get("/events-list", response_class=HTMLResponse)
async def ui_events_list(request: Request):
"""Return events list HTML fragment.
Per CONTRACT.md: Moved from /api/events-list to /ui/events-list.
Returns HTMLResponse using Jinja2 template.
"""
# Load and filter events
events = _load_test_events()
filtered_events = []
for event in events:
if event.get("status") != "declined":
filtered_events.append(_format_event_for_list(event))
# Sort by start date
filtered_events.sort(key=lambda e: e.get("start", ""))
return templates.TemplateResponse(
request,
"dashboard/components/events_list.html.j2",
{"events": filtered_events}
)
@router.get("/events-week", response_class=HTMLResponse)
async def ui_events_week(request: Request, start: Optional[str] = None):
"""Return week view HTML fragment.
Per CONTRACT.md: Moved from /api/events-week to /ui/events-week.
Returns HTMLResponse using Jinja2 template.
"""
# Determine week start date
if start:
try:
week_start = datetime.strptime(start, "%Y-%m-%d")
week_start = _get_week_start(week_start)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid start date format. Use YYYY-MM-DD")
else:
week_start = _get_week_start()
week_end = week_start + timedelta(days=6)
# Load events
events = _load_test_events()
# Build days array
today = datetime.now().date()
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
days = []
for i in range(7):
day_date = week_start + timedelta(days=i)
day_date_str = day_date.strftime("%Y-%m-%d")
# Get events for this day
day_events = []
for event in events:
if event.get("status") == "declined":
continue
event_date = event.get("start", "")
if event_date == day_date_str:
family_members = event.get("family_members", [])
status = event.get("status", "unknown")
status_class = status
members_class = ""
if family_members:
if len(family_members) > 2:
members_class = "members-family"
else:
members_class = f"members-{family_members[0].lower()}"
# Category detection
category = "appointment"
title_lower = event.get("title", "").lower()
if "practice" in title_lower or "baseball" in title_lower or "dance" in title_lower or "piano" in title_lower:
category = "practice"
elif "party" in title_lower or "birthday" in title_lower:
category = "party"
elif "trip" in title_lower or "travel" in title_lower:
category = "travel"
day_events.append({
"slug": event.get("id", ""),
"title": event.get("title", "Untitled"),
"time": event.get("time", ""),
"duration_minutes": event.get("duration_minutes", 60),
"family_members": family_members,
"status": status,
"status_class": status_class,
"members_class": members_class,
"category": category,
"location": event.get("location", ""),
})
# Sort events by time
day_events.sort(key=lambda e: e.get("time") or "99:99")
days.append({
"date": day_date_str,
"day_name": day_names[i],
"day_number": day_date.day,
"month": month_names[day_date.month - 1],
"is_today": day_date.date() == today,
"events": day_events
})
# Calculate prev/next week dates
prev_week_date = (week_start - timedelta(days=7)).strftime("%Y-%m-%d")
next_week_date = (week_start + timedelta(days=7)).strftime("%Y-%m-%d")
# Format week title
week_title = f"{month_names[week_start.month - 1]} {week_start.day} - {month_names[week_end.month - 1]} {week_end.day}"
return templates.TemplateResponse(
request,
"dashboard/components/events_week.html.j2",
{
"days": days,
"week_title": week_title,
"week_start": week_start.strftime("%Y-%m-%d"),
"week_end": week_end.strftime("%Y-%m-%d"),
"prev_week": prev_week_date,
"next_week": next_week_date
}
)