"""Family dashboard router - serves the dashboard HTML and API endpoints. Per CONTRACT.md: All HTMX fragments moved to /ui/* namespace in ui_router.py. This file keeps JSON API endpoints and HTML page serving. """ from fastapi import APIRouter, Request, HTTPException from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from datetime import datetime, timedelta from pathlib import Path from pydantic import BaseModel from typing import Optional, List, Dict, Any import json from fastapi.templating import Jinja2Templates from shared.session_auth import require_auth, get_session, is_authenticated router = APIRouter(prefix="", tags=["dashboard"]) # Local templates for hoffdesk-api dashboard DASHBOARD_TEMPLATES_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/dashboard/templates") templates = Jinja2Templates(directory=str(DASHBOARD_TEMPLATES_DIR)) # Path to test events file TEST_EVENTS_FILE = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/data/test_events.json") # Pydantic models for request bodies class RescheduleRequest(BaseModel): new_date: str new_time: Optional[str] = None class CounterRequest(BaseModel): proposed_date: str proposed_time: Optional[str] = None # Static files for dashboard DASHBOARD_STATIC_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/dashboard/static") @router.get("/family/login/", response_class=HTMLResponse) async def family_login_page(request: Request): """Serve the family login page.""" # Already authenticated? Redirect to dashboard. if is_authenticated(request): return RedirectResponse(url="/", status_code=302) return templates.TemplateResponse( request, "family_login.html.j2", {} ) @router.get("/", response_class=HTMLResponse) async def dashboard_root(request: Request, view: str = None): """Serve family dashboard at root path (for family.hoffdesk.com). Parameters: view (str): Optional view mode - 'list' or 'week' """ if not is_authenticated(request): return RedirectResponse(url="/family/login/?redirect=/", status_code=302) # Route to appropriate view if view == "week": return await week_view_page(request) return await dashboard_page(request) @router.get("/dashboard/", response_class=HTMLResponse) async def dashboard_page(request: Request): """Serve the family dashboard HTML.""" if not is_authenticated(request): return RedirectResponse(url="/family/login/?redirect=/dashboard/", status_code=302) # Use Jinja2 template for the dashboard return templates.TemplateResponse( request, "dashboard.html.j2", {} ) @router.get("/week", response_class=HTMLResponse) async def week_view_page(request: Request): """Serve the week view HTML.""" if not is_authenticated(request): return RedirectResponse(url="/family/login/?redirect=/week", status_code=302) # Use Jinja2 template for week view return templates.TemplateResponse( request, "week_view.html.j2", {} ) @router.get("/api/today") async def api_today(request: Request): """Return today's data: calendar, weather, health. Public endpoint — consumed by HTMX polling on the dashboard. No auth required since data is non-sensitive (calendar, weather, health). """ # Get real calendar data from Radicale calendar_data = await _get_calendar_today() # Get weather data weather_data = await _get_weather() # Get system health health_data = await _get_system_health() # Get Event Graph data events_data = await _get_event_graph() return { "timestamp": datetime.now().isoformat(), "calendar": calendar_data, "weather": weather_data, "health": health_data, "events": events_data, "meta": { "cache_ttl_seconds": 60, "data_freshness": { "calendar_seconds_ago": 0, "weather_seconds_ago": 0 } } } @router.get("/api/events-dashboard") async def api_events_dashboard(request: Request): """Return Event Graph HTML for dashboard — upcoming events with action buttons. Public endpoint — consumed by HTMX polling on the dashboard. Returns rendered HTML instead of JSON so buttons work immediately. """ events_data = await _get_event_graph() # Load events.html template events_template_path = DASHBOARD_TEMPLATES_DIR / "events.html" if events_template_path.exists(): template = Template(events_template_path.read_text()) html_content = template.render(events=events_data) return HTMLResponse(content=html_content) # Fallback to JSON if template missing return { "timestamp": datetime.now().isoformat(), "events": events_data, "meta": { "source": "event-graph-api", "cache_ttl_seconds": 15 } } async def _get_calendar_today(): """Fetch today's events from Radicale.""" try: import caldav import os # Support both CALDAV_* and RADICALE_* env vars host = os.getenv('RADICALE_HOST') or os.getenv('CALDAV_URL', '127.0.0.1') # Extract host from URL if needed if host.startswith('http://'): host = host[7:] # strip http:// if ':' in host: host = host.split(':')[0] port = os.getenv('RADICALE_PORT') or '5232' user = os.getenv('RADICALE_USER') or os.getenv('CALDAV_USER', 'assistant') password = os.getenv('RADICALE_PASS') or os.getenv('CALDAV_PASSWORD', 'family-assistant-2026') client = caldav.DAVClient( url=f"http://{host}:{port}/", username=user, password=password ) principal = client.principal() calendars = principal.calendars() # Get family calendar family_cal = None for cal in calendars: if 'family' in cal.name.lower(): family_cal = cal break if not family_cal: family_cal = calendars[0] if calendars else None if family_cal: # Get events for today from datetime import date, timedelta today = date.today() tomorrow = today + timedelta(days=1) events_data = family_cal.date_search(today, tomorrow) events = [] for event in events_data: vevent = event.instance.vevent events.append({ "uid": str(vevent.uid.value) if hasattr(vevent, 'uid') else "", "summary": str(vevent.summary.value) if hasattr(vevent, 'summary') else "", "start": vevent.dtstart.value.isoformat() if hasattr(vevent, 'dtstart') else "", "end": vevent.dtend.value.isoformat() if hasattr(vevent, 'dtend') else "", "is_all_day": not isinstance(vevent.dtstart.value, datetime) if hasattr(vevent, 'dtstart') else False, "location": str(vevent.location.value) if hasattr(vevent, 'location') else "", "attendees": [] }) return {"status": "ok", "events": events} return {"status": "error", "message": "No calendar found", "events": []} except Exception as e: return {"status": "error", "message": str(e), "events": []} async def _get_weather(): """Fetch weather data.""" try: import httpx # Use wttr.in for free weather (no API key needed) async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get("https://wttr.in/Green+Bay,WI?format=j1") if response.status_code == 200: data = response.json() current = data.get("current_condition", [{}])[0] # Build forecast forecast = [] for hour in data.get("weather", [{}])[0].get("hourly", [])[:12]: forecast.append({ "hour": f"{hour.get('time', '0000')[:2]}:00", "temperature_f": int(hour.get("tempF", 0)), "condition": hour.get("weatherDesc", [{}])[0].get("value", ""), "precipitation_chance_pct": int(hour.get("chanceofrain", 0)) }) return { "status": "ok", "current": { "temperature_f": int(current.get("temp_F", 0)), "feels_like_f": int(current.get("FeelsLikeF", 0)), "condition": current.get("weatherDesc", [{}])[0].get("value", ""), "humidity_pct": int(current.get("humidity", 0)), "wind_mph": int(current.get("windspeedMiles", 0)) }, "forecast": forecast } except Exception as e: pass # Fallback data return { "status": "fallback", "current": { "temperature_f": 52, "feels_like_f": 49, "condition": "Data unavailable", "humidity_pct": 62, "wind_mph": 10 }, "forecast": [] } async def _get_system_health(): """Check system health.""" import os import psutil components = {} # Check OpenClaw gateway try: import httpx async with httpx.AsyncClient() as client: start = datetime.now() response = await client.get("http://127.0.0.1:18789/health", timeout=2.0) latency = (datetime.now() - start).total_seconds() * 1000 components["openclaw_gateway"] = { "status": "ok" if response.status_code == 200 else "degraded", "latency_ms": round(latency, 1) } except: components["openclaw_gateway"] = {"status": "error", "latency_ms": 0} # Check Radicale try: start = datetime.now() import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2) result = sock.connect_ex((os.getenv('RADICALE_HOST', '127.0.0.1'), int(os.getenv('RADICALE_PORT', '5232')))) latency = (datetime.now() - start).total_seconds() * 1000 components["radicale_caldav"] = { "status": "ok" if result == 0 else "error", "latency_ms": round(latency, 1) } sock.close() except: components["radicale_caldav"] = {"status": "error", "latency_ms": 0} # Check Ollama try: start = datetime.now() async with httpx.AsyncClient() as client: response = await client.get("http://100.104.147.116:11434/api/tags", timeout=3.0) latency = (datetime.now() - start).total_seconds() * 1000 components["ollama_local"] = { "status": "ok" if response.status_code == 200 else "error", "latency_ms": round(latency, 1) } except: components["ollama_local"] = {"status": "error", "latency_ms": 0} # Check Cloudflare tunnel try: start = datetime.now() import subprocess result = subprocess.run(['systemctl', 'is-active', 'cloudflared'], capture_output=True, text=True) latency = (datetime.now() - start).total_seconds() * 1000 components["cloudflare_tunnel"] = { "status": "ok" if result.returncode == 0 else "error", "latency_ms": round(latency, 1) } except: components["cloudflare_tunnel"] = {"status": "unknown", "latency_ms": 0} # Check IMAP Proxy try: import httpx async with httpx.AsyncClient() as client: start = datetime.now() response = await client.get("http://127.0.0.1:8000/imap/status", timeout=3.0) latency = (datetime.now() - start).total_seconds() * 1000 if response.status_code == 200: imap_data = response.json() imap_status = "ok" if imap_data.get("status") == "connected" else "degraded" if imap_data.get("status") == "not_configured" else "error" else: imap_status = "error" components["imap_proxy"] = { "status": imap_status, "latency_ms": round(latency, 1) } except: components["imap_proxy"] = {"status": "not_configured", "latency_ms": 0} # Calculate overall status overall = "healthy" for comp in components.values(): if comp["status"] == "error": overall = "degraded" break # Disk usage disk = psutil.disk_usage('/') return { "overall_status": overall, "components": components, "disk_usage": { "total_gb": round(disk.total / (1024**3), 1), "used_gb": round(disk.used / (1024**3), 1), "usage_pct": round(disk.percent, 1) } } async def _get_event_graph(): """Fetch upcoming events from Event Graph API or test data.""" try: # Try Event Graph API first import httpx async with httpx.AsyncClient() as client: response = await client.get( "http://localhost:8002/api/v1/events?limit=10", timeout=2.0 ) if response.status_code == 200: events = response.json() # Format for dashboard now = datetime.now() formatted = [] for e in events[:8]: start = e.get("start_date", "") # Determine if today, tomorrow, or future day_label = "" try: from datetime import date ev_date = datetime.fromisoformat(start.replace("Z", "+00:00")).date() today = date.today() if ev_date == today: day_label = "Today" elif ev_date == today.replace(day=today.day + 1): day_label = "Tomorrow" else: day_label = ev_date.strftime("%a %b %d") except Exception: day_label = start[:10] if start else "" formatted.append({ "id": e.get("id", ""), "title": e.get("title", "Untitled"), "start": start, "day_label": day_label, "status": e.get("status", "unknown"), "event_type": e.get("event_type", "calendar"), "confidence": e.get("confidence", 0), "location": e.get("location", ""), }) return formatted except Exception: pass # Fallback: Load test events from JSON file try: import json from pathlib import Path test_file = Path(__file__).parent.parent / "data" / "test_events.json" if test_file.exists(): with open(test_file) as f: test_events = json.load(f) # Format for dashboard - filter out declined events formatted = [] for e in test_events[:10]: # Skip declined events (they're hidden from dashboard) if e.get("status") == "declined": continue # Recalculate day_label based on current date start = e.get("start", "") day_label = "" try: from datetime import date ev_date = datetime.fromisoformat(start.replace("Z", "+00:00")).date() today = date.today() if ev_date == today: day_label = "Today" elif ev_date == today + timedelta(days=1): day_label = "Tomorrow" else: day_label = ev_date.strftime("%a %b %d") except Exception: day_label = start[:10] if start else "" # Check for conflicts with other events # Only show conflicts for non-confirmed events (needs_confirmation, pending) # Confirmed events: user accepted the conflict, stay silent # Rescheduled back to pending: conflicts re-appear conflicts = [] family_members = e.get("family_members", []) if e.get("status") in ("needs_confirmation", "pending"): for other in test_events: if other.get("id") != e.get("id") and other.get("status") != "declined": other_date = other.get("start", "") other_members = other.get("family_members", []) if other_date and other_date == start: # Determine conflict severity if family_members and other_members: shared = set(family_members) & set(other_members) if shared: conflicts.append({ "id": other.get("id", ""), "title": other.get("title", "Untitled"), "severity": "person", "shared_members": list(shared) }) else: conflicts.append({ "id": other.get("id", ""), "title": other.get("title", "Untitled"), "severity": "same_day" }) else: conflicts.append({ "id": other.get("id", ""), "title": other.get("title", "Untitled"), "severity": "same_day" }) formatted.append({ "id": e.get("id", ""), "title": e.get("title", "Untitled"), "start": start, "day_label": day_label, "status": e.get("status", "unknown"), "event_type": e.get("event_type", "calendar"), "confidence": e.get("confidence", 0.95), "location": e.get("location", ""), "family_members": family_members, "conflicts": conflicts, }) return formatted except Exception: pass return [] def _load_test_events(): """Load test events from JSON file.""" if TEST_EVENTS_FILE.exists(): with open(TEST_EVENTS_FILE, 'r') as f: return json.load(f) return [] def _save_test_events(events): """Save test events to JSON file.""" with open(TEST_EVENTS_FILE, 'w') as f: json.dump(events, f, indent=2) def _find_event_by_id(events, event_id: str): """Find an event by ID.""" for event in events: if event.get("id") == event_id: return event return None @router.get("/api/events/{event_id}") async def api_get_event(event_id: str, request: Request): """Get a single event by ID — returns HTML for HTMX modal requests.""" events = _load_test_events() event = _find_event_by_id(events, event_id) if not event: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") # Calculate day_label start = event.get("start", "") day_label = "" try: from datetime import date ev_date = datetime.fromisoformat(start.replace("Z", "+00:00")).date() today = date.today() if ev_date == today: day_label = "Today" elif ev_date == today + timedelta(days=1): day_label = "Tomorrow" else: day_label = ev_date.strftime("%a %b %d") except Exception: day_label = start[:10] if start else "" event["day_label"] = day_label # Check if HTMX request is_htmx = request.headers.get("HX-Request") == "true" if is_htmx: # Load and render the event detail template event_detail_path = DASHBOARD_TEMPLATES_DIR / "partials" / "event_detail.html" if event_detail_path.exists(): template = Template(event_detail_path.read_text()) html_content = template.render(event=event) return HTMLResponse(content=html_content) # Return JSON for non-HTMX requests return event @router.post("/api/events/{event_id}/confirm") async def api_event_confirm(event_id: str, request: Request): """Confirm an event — transitions status to confirmed.""" events = _load_test_events() event = _find_event_by_id(events, event_id) if not event: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") # Update status event["status"] = "confirmed" event["updated_at"] = datetime.now().isoformat() _save_test_events(events) # Re-render the event card HTML for HTMX swap events_template_path = DASHBOARD_TEMPLATES_DIR / "dashboard" / "components" / "events_list.html.j2" if events_template_path.exists(): from jinja2 import Template template = Template(events_template_path.read_text()) html_content = template.render(events=[event]) return HTMLResponse(content=html_content) # Fallback to JSON if template missing return { "status": "success", "event_id": event_id, "new_status": "confirmed", "message": "Event confirmed" } @router.post("/api/events/{event_id}/decline") async def api_event_decline(event_id: str, request: Request): """Decline an event — transitions status to declined (hidden from dashboard).""" events = _load_test_events() event = _find_event_by_id(events, event_id) if not event: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") # Update status with soft-delete timestamp event["status"] = "declined" event["updated_at"] = datetime.now().isoformat() event["removed_at"] = datetime.now().isoformat() _save_test_events(events) # Re-render the event card HTML for HTMX swap (will be empty since declined) events_template_path = DASHBOARD_TEMPLATES_DIR / "dashboard" / "components" / "events_list.html.j2" if events_template_path.exists(): from jinja2 import Template template = Template(events_template_path.read_text()) html_content = template.render(events=[event]) return HTMLResponse(content=html_content) # Fallback to JSON if template missing return { "status": "success", "event_id": event_id, "new_status": "declined", "message": "Event declined and hidden from dashboard" } @router.post("/api/events/{event_id}/reschedule") async def api_event_reschedule(event_id: str, request: Request): """Reschedule an event to a new date/time — transitions status to pending. Supports both HTMX (form data) and JSON requests. """ events = _load_test_events() event = _find_event_by_id(events, event_id) if not event: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") # Parse request body (JSON or form data) try: body = await request.json() new_date = body.get("new_date") new_time = body.get("new_time") except Exception: # Try form data form = await request.form() new_date = form.get("new_date") new_time = form.get("new_time") # Update date/time and status if new_date: event["start"] = new_date if new_time: event["time"] = new_time event["status"] = "pending" event["updated_at"] = datetime.now().isoformat() # Check for conflicts with other events on same date conflicts = [] for other in events: if other.get("id") != event_id and other.get("status") != "declined": # Simple date-based conflict detection other_date = other.get("start", "") if other_date and other_date == new_date: conflicts.append({ "title": other.get("title", "Untitled"), "id": other.get("id", ""), "date": other_date }) # Add conflict warning if any found if conflicts: event["conflict_warning"] = f"⚠️ Same day as: {', '.join(c['title'] for c in conflicts)}" else: event.pop("conflict_warning", None) _save_test_events(events) # Re-render the event card HTML for HTMX swap events_template_path = DASHBOARD_TEMPLATES_DIR / "dashboard" / "components" / "events_list.html.j2" if events_template_path.exists(): from jinja2 import Template template = Template(events_template_path.read_text()) html_content = template.render(events=[event]) return HTMLResponse(content=html_content) # Fallback to JSON if template missing return { "status": "success", "event_id": event_id, "new_status": "pending", "new_date": new_date, "new_time": new_time, "conflicts": conflicts, "message": "Event rescheduled and set to pending" } class ModifyRequest(BaseModel): title: Optional[str] = None new_date: str new_time: Optional[str] = None location: Optional[str] = None @router.get("/api/events/{event_id}/reschedule-form") async def api_event_reschedule_form(event_id: str, request: Request): """Return reschedule form HTML for HTMX swap.""" events = _load_test_events() event = _find_event_by_id(events, event_id) if not event: raise HTTPException(status_code=404, detail=f"Event {event_id} not found") # Extract date and time from start field start = event.get("start", "") current_date = start[:10] if start else datetime.now().strftime("%Y-%m-%d") current_time = event.get("time", "12:00")[:5] if event.get("time") else "12:00" # Generate HTML form html_content = f'''\n