"""UI router - HTMX fragments returning HTML (not JSON). Per CONTRACT.md: All /ui/* endpoints return HTMLResponse using Jinja2 templates. No inline HTML generation allowed. """ from fastapi import APIRouter, Request, HTTPException from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from datetime import datetime, timedelta from pathlib import Path from typing import Optional import json # Local templates for hoffdesk-api dashboard DASHBOARD_TEMPLATES_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/dashboard/templates") # Initialize Jinja2 templates templates = Jinja2Templates(directory=str(DASHBOARD_TEMPLATES_DIR)) # Path to test events file TEST_EVENTS_FILE = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/data/test_events.json") router = APIRouter(prefix="/ui", tags=["ui"]) async def _get_weather(): """Fetch weather data.""" try: import httpx async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get("https://wttr.in/Green+Bay,WI?format=j1") if response.status_code == 200: data = response.json() current = data.get("current_condition", [{}])[0] forecast = [] for hour in data.get("weather", [{}])[0].get("hourly", [])[:12]: forecast.append({ "hour": f"{hour.get('time', '0000')[:2]}:00", "temperature_f": int(hour.get("tempF", 0)), "condition": hour.get("weatherDesc", [{}])[0].get("value", ""), "precipitation_chance_pct": int(hour.get("chanceofrain", 0)) }) return { "status": "ok", "current": { "temperature_f": int(current.get("temp_F", 0)), "feels_like_f": int(current.get("FeelsLikeF", 0)), "condition": current.get("weatherDesc", [{}])[0].get("value", ""), "humidity_pct": int(current.get("humidity", 0)), "wind_mph": int(current.get("windspeedMiles", 0)) }, "forecast": forecast } except Exception: pass return { "status": "fallback", "current": { "temperature_f": 52, "feels_like_f": 49, "condition": "Data unavailable", "humidity_pct": 62, "wind_mph": 10 }, "forecast": [] } async def _get_calendar_today(): """Fetch today's events from Radicale.""" try: import caldav import os # Support both CALDAV_* and RADICALE_* env vars host = os.getenv('RADICALE_HOST') or os.getenv('CALDAV_URL', '127.0.0.1') # Extract host from URL if needed if host.startswith('http://'): host = host[7:] # strip http:// if ':' in host: host = host.split(':')[0] port = os.getenv('RADICALE_PORT') or '5232' user = os.getenv('RADICALE_USER') or os.getenv('CALDAV_USER', 'assistant') password = os.getenv('RADICALE_PASS') or os.getenv('CALDAV_PASSWORD', 'family-assistant-2026') client = caldav.DAVClient( url=f"http://{host}:{port}/", username=user, password=password ) principal = client.principal() calendars = principal.calendars() # Get family calendar family_cal = None for cal in calendars: if 'family' in cal.name.lower(): family_cal = cal break if not family_cal: family_cal = calendars[0] if calendars else None if family_cal: # Get events for today from datetime import date, timedelta today = date.today() tomorrow = today + timedelta(days=1) events_data = family_cal.date_search(today, tomorrow) events = [] for event in events_data: vevent = event.instance.vevent events.append({ "uid": str(vevent.uid.value) if hasattr(vevent, 'uid') else "", "summary": str(vevent.summary.value) if hasattr(vevent, 'summary') else "", "start": vevent.dtstart.value.isoformat() if hasattr(vevent, 'dtstart') else "", "end": vevent.dtend.value.isoformat() if hasattr(vevent, 'dtend') else "", "is_all_day": not isinstance(vevent.dtstart.value, datetime) if hasattr(vevent, 'dtstart') else False, "location": str(vevent.location.value) if hasattr(vevent, 'location') else "", "attendees": [] }) return {"status": "ok", "events": events} return {"status": "error", "message": "No calendar found", "events": []} except Exception as e: return {"status": "error", "message": str(e), "events": []} async def _get_system_health(): """Check system health.""" import os import psutil components = {} # Check OpenClaw gateway try: import httpx async with httpx.AsyncClient() as client: start = datetime.now() response = await client.get("http://127.0.0.1:18789/health", timeout=2.0) latency = (datetime.now() - start).total_seconds() * 1000 components["openclaw_gateway"] = { "status": "ok" if response.status_code == 200 else "degraded", "latency_ms": round(latency, 1) } except: components["openclaw_gateway"] = {"status": "error", "latency_ms": 0} # Check Radicale try: start = datetime.now() import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2) result = sock.connect_ex((os.getenv('RADICALE_HOST', '127.0.0.1'), int(os.getenv('RADICALE_PORT', '5232')))) latency = (datetime.now() - start).total_seconds() * 1000 components["radicale_caldav"] = { "status": "ok" if result == 0 else "error", "latency_ms": round(latency, 1) } sock.close() except: components["radicale_caldav"] = {"status": "error", "latency_ms": 0} # Check Ollama try: start = datetime.now() import httpx async with httpx.AsyncClient() as client: response = await client.get("http://100.104.147.116:11434/api/tags", timeout=3.0) latency = (datetime.now() - start).total_seconds() * 1000 components["ollama_local"] = { "status": "ok" if response.status_code == 200 else "error", "latency_ms": round(latency, 1) } except: components["ollama_local"] = {"status": "error", "latency_ms": 0} # Check Cloudflare tunnel try: start = datetime.now() import subprocess result = subprocess.run(['systemctl', 'is-active', 'cloudflared'], capture_output=True, text=True) latency = (datetime.now() - start).total_seconds() * 1000 components["cloudflare_tunnel"] = { "status": "ok" if result.returncode == 0 else "error", "latency_ms": round(latency, 1) } except: components["cloudflare_tunnel"] = {"status": "unknown", "latency_ms": 0} # Check IMAP Proxy try: import httpx async with httpx.AsyncClient() as client: start = datetime.now() response = await client.get("http://127.0.0.1:8000/imap/status", timeout=3.0) latency = (datetime.now() - start).total_seconds() * 1000 if response.status_code == 200: imap_data = response.json() imap_status = "ok" if imap_data.get("status") == "connected" else "degraded" if imap_data.get("status") == "not_configured" else "error" else: imap_status = "error" components["imap_proxy"] = { "status": imap_status, "latency_ms": round(latency, 1) } except: components["imap_proxy"] = {"status": "not_configured", "latency_ms": 0} # Calculate overall status overall = "healthy" for comp in components.values(): if comp["status"] == "error": overall = "degraded" break # Disk usage disk = psutil.disk_usage('/') return { "overall_status": overall, "components": components, "disk_usage": { "total_gb": round(disk.total / (1024**3), 1), "used_gb": round(disk.used / (1024**3), 1), "usage_pct": round(disk.percent, 1) } } def _load_test_events(): """Load test events from JSON file.""" if TEST_EVENTS_FILE.exists(): with open(TEST_EVENTS_FILE, 'r') as f: return json.load(f) return [] def _get_day_label(date_str: str) -> str: """Get human-friendly day label for a date.""" if not date_str: return "" try: from datetime import date ev_date = datetime.fromisoformat(date_str.replace("Z", "+00:00")).date() today = date.today() if ev_date == today: return "Today" elif ev_date == today + timedelta(days=1): return "Tomorrow" else: return ev_date.strftime("%a %b %d") except Exception: return date_str[:10] if date_str else "" def _get_week_start(date_obj=None) -> datetime: """Get the Monday of the week containing the given date (or today).""" if date_obj is None: date_obj = datetime.now() days_since_monday = date_obj.weekday() week_start = date_obj - timedelta(days=days_since_monday) return week_start.replace(hour=0, minute=0, second=0, microsecond=0) @router.get("/calendar-card", response_class=HTMLResponse) async def ui_calendar_card(request: Request): """Return calendar card HTML fragment. Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template. """ calendar_data = await _get_calendar_today() return templates.TemplateResponse( request, "dashboard/components/calendar_card.html.j2", {"calendar": calendar_data} ) @router.get("/weather-card", response_class=HTMLResponse) async def ui_weather_card(request: Request): """Return weather card HTML fragment. Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template. """ weather_data = await _get_weather() return templates.TemplateResponse( request, "dashboard/components/weather_card.html.j2", {"weather": weather_data} ) @router.get("/health-card", response_class=HTMLResponse) async def ui_health_card(request: Request): """Return health card HTML fragment. Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template. """ health_data = await _get_system_health() return templates.TemplateResponse( request, "dashboard/components/health_card.html.j2", {"health": health_data} ) @router.get("/system-status-bar", response_class=HTMLResponse) async def ui_system_status_bar(request: Request): """Return compact system status bar HTML fragment. Per CONTRACT.md: HTMX calls this endpoint, returns HTML using template. Shows status dots + labels in a horizontal row, with thin disk usage bar. """ health_data = await _get_system_health() return templates.TemplateResponse( request, "dashboard/components/system_status_bar.html.j2", {"health": health_data} ) def _format_event_for_list(event): """Format event for list view.""" events_data = _load_test_events() # Calculate conflicts conflicts = [] start = event.get("start", "") family_members = event.get("family_members", []) if event.get("status") in ("needs_confirmation", "pending"): for other in events_data: if other.get("id") != event.get("id") and other.get("status") != "declined": other_date = other.get("start", "") other_members = other.get("family_members", []) if other_date and other_date == start: if family_members and other_members: shared = set(family_members) & set(other_members) if shared: conflicts.append({ "id": other.get("id", ""), "title": other.get("title", "Untitled"), "severity": "person", "shared_members": list(shared) }) else: conflicts.append({ "id": other.get("id", ""), "title": other.get("title", "Untitled"), "severity": "same_day", }) else: conflicts.append({ "id": other.get("id", ""), "title": other.get("title", "Untitled"), "severity": "same_day", }) return { "id": event.get("id", ""), "title": event.get("title", "Untitled"), "start": start, "day_label": _get_day_label(start), "status": event.get("status", "unknown"), "event_type": event.get("event_type", "calendar"), "location": event.get("location", ""), "family_members": family_members, "conflicts": conflicts, "time": event.get("time", ""), "counter_proposed_date": event.get("counter_proposed_date", ""), "counter_proposed_time": event.get("counter_proposed_time", ""), } @router.get("/events-list", response_class=HTMLResponse) async def ui_events_list(request: Request): """Return events list HTML fragment. Per CONTRACT.md: Moved from /api/events-list to /ui/events-list. Returns HTMLResponse using Jinja2 template. """ # Load and filter events events = _load_test_events() filtered_events = [] for event in events: if event.get("status") != "declined": filtered_events.append(_format_event_for_list(event)) # Sort by start date filtered_events.sort(key=lambda e: e.get("start", "")) return templates.TemplateResponse( request, "dashboard/components/events_list.html.j2", {"events": filtered_events} ) @router.get("/events-week", response_class=HTMLResponse) async def ui_events_week(request: Request, start: Optional[str] = None): """Return week view HTML fragment. Per CONTRACT.md: Moved from /api/events-week to /ui/events-week. Returns HTMLResponse using Jinja2 template. """ # Determine week start date if start: try: week_start = datetime.strptime(start, "%Y-%m-%d") week_start = _get_week_start(week_start) except ValueError: raise HTTPException(status_code=400, detail="Invalid start date format. Use YYYY-MM-DD") else: week_start = _get_week_start() week_end = week_start + timedelta(days=6) # Load events events = _load_test_events() # Build days array today = datetime.now().date() day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] days = [] for i in range(7): day_date = week_start + timedelta(days=i) day_date_str = day_date.strftime("%Y-%m-%d") # Get events for this day day_events = [] for event in events: if event.get("status") == "declined": continue event_date = event.get("start", "") if event_date == day_date_str: family_members = event.get("family_members", []) status = event.get("status", "unknown") status_class = status members_class = "" if family_members: if len(family_members) > 2: members_class = "members-family" else: members_class = f"members-{family_members[0].lower()}" # Category detection category = "appointment" title_lower = event.get("title", "").lower() if "practice" in title_lower or "baseball" in title_lower or "dance" in title_lower or "piano" in title_lower: category = "practice" elif "party" in title_lower or "birthday" in title_lower: category = "party" elif "trip" in title_lower or "travel" in title_lower: category = "travel" day_events.append({ "slug": event.get("id", ""), "title": event.get("title", "Untitled"), "time": event.get("time", ""), "duration_minutes": event.get("duration_minutes", 60), "family_members": family_members, "status": status, "status_class": status_class, "members_class": members_class, "category": category, "location": event.get("location", ""), }) # Sort events by time day_events.sort(key=lambda e: e.get("time") or "99:99") days.append({ "date": day_date_str, "day_name": day_names[i], "day_number": day_date.day, "month": month_names[day_date.month - 1], "is_today": day_date.date() == today, "events": day_events }) # Calculate prev/next week dates prev_week_date = (week_start - timedelta(days=7)).strftime("%Y-%m-%d") next_week_date = (week_start + timedelta(days=7)).strftime("%Y-%m-%d") # Format week title week_title = f"{month_names[week_start.month - 1]} {week_start.day} - {month_names[week_end.month - 1]} {week_end.day}" return templates.TemplateResponse( request, "dashboard/components/events_week.html.j2", { "days": days, "week_title": week_title, "week_start": week_start.strftime("%Y-%m-%d"), "week_end": week_end.strftime("%Y-%m-%d"), "prev_week": prev_week_date, "next_week": next_week_date } )