"""Family dashboard router - serves the dashboard HTML and API endpoints.
Per CONTRACT.md: All HTMX fragments moved to /ui/* namespace in ui_router.py.
This file keeps JSON API endpoints and HTML page serving.
"""
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from datetime import datetime, timedelta
from pathlib import Path
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import json
from fastapi.templating import Jinja2Templates
from shared.session_auth import require_auth, get_session, is_authenticated
# Router for the dashboard pages and JSON endpoints. The prefix is empty, so
# every route below declares its full path.
router = APIRouter(prefix="", tags=["dashboard"])
# Local templates for hoffdesk-api dashboard
# NOTE(review): absolute, host-specific path — presumably only valid on the
# deployment machine; confirm before running elsewhere.
DASHBOARD_TEMPLATES_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/dashboard/templates")
# Jinja2Templates is handed the directory as a plain string.
templates = Jinja2Templates(directory=str(DASHBOARD_TEMPLATES_DIR))
# Path to test events file
# JSON store read by _load_test_events() and rewritten by _save_test_events().
TEST_EVENTS_FILE = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/data/test_events.json")
# Pydantic models for request bodies
# Request body for rescheduling an event.
# NOTE(review): not referenced by any endpoint in the visible part of this
# file — the reschedule endpoint reads the raw request instead; confirm usage.
class RescheduleRequest(BaseModel):
    # Required target date, kept as a plain string.
    new_date: str
    # Optional target time; defaults to None when omitted.
    new_time: Optional[str] = None
# Request body carrying a proposed date/time.
# NOTE(review): presumably for counter-proposals; no endpoint in the visible
# part of this file uses it — confirm.
class CounterRequest(BaseModel):
    # Required proposed date, kept as a plain string.
    proposed_date: str
    # Optional proposed time; defaults to None when omitted.
    proposed_time: Optional[str] = None
# Static files for dashboard
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably mounted or used elsewhere — confirm.
DASHBOARD_STATIC_DIR = Path("/home/hoffmann_admin/.openclaw/workspace-socrates/hoffdesk-api/dashboard/static")
@router.get("/family/login/", response_class=HTMLResponse)
async def family_login_page(request: Request):
    """Render the family login page, or send signed-in users to the dashboard.

    Returns:
        The rendered ``family_login.html.j2`` template for anonymous visitors,
        or a 302 redirect to ``/`` when the request is already authenticated.
    """
    if not is_authenticated(request):
        # Anonymous visitor: show the login form.
        return templates.TemplateResponse(request, "family_login.html.j2", {})

    # A session already exists, so there is nothing to log in to.
    return RedirectResponse(url="/", status_code=302)
@router.get("/", response_class=HTMLResponse)
async def dashboard_root(request: Request, view: str = None):
    """Serve the family dashboard at the root path (for family.hoffdesk.com).

    Parameters:
        view (str): Optional view mode. ``"week"`` selects the week view;
            any other value (or none) selects the default dashboard page.

    Returns:
        A 302 redirect to the login page for unauthenticated requests,
        otherwise whatever the selected page handler returns.
    """
    if not is_authenticated(request):
        return RedirectResponse(url="/family/login/?redirect=/", status_code=302)

    # Pick the page handler once, then delegate to it.
    page_handler = week_view_page if view == "week" else dashboard_page
    return await page_handler(request)
@router.get("/dashboard/", response_class=HTMLResponse)
async def dashboard_page(request: Request):
    """Render the family dashboard page.

    Returns:
        The rendered ``dashboard.html.j2`` template, or a 302 redirect to the
        login page (carrying ``redirect=/dashboard/``) when unauthenticated.
    """
    if is_authenticated(request):
        # The template is rendered with an empty context.
        return templates.TemplateResponse(request, "dashboard.html.j2", {})

    login_url = "/family/login/?redirect=/dashboard/"
    return RedirectResponse(url=login_url, status_code=302)
@router.get("/week", response_class=HTMLResponse)
async def week_view_page(request: Request):
    """Render the week view page.

    Returns:
        The rendered ``week_view.html.j2`` template, or a 302 redirect to the
        login page (carrying ``redirect=/week``) when unauthenticated.
    """
    if is_authenticated(request):
        # The template is rendered with an empty context.
        return templates.TemplateResponse(request, "week_view.html.j2", {})

    login_url = "/family/login/?redirect=/week"
    return RedirectResponse(url=login_url, status_code=302)
@router.get("/api/today")
async def api_today(request: Request):
    """Return today's data: calendar, weather, health and Event Graph events.

    Public endpoint — consumed by HTMX polling on the dashboard.
    No authentication check is performed here.

    Returns:
        A dict with ``timestamp``, ``calendar``, ``weather``, ``health``,
        ``events`` and a ``meta`` section.
    """
    # The sources are queried one after another, in this order.
    sections = {
        "calendar": await _get_calendar_today(),
        "weather": await _get_weather(),
        "health": await _get_system_health(),
        "events": await _get_event_graph(),
    }

    # The freshness values are constants; nothing here measures data age.
    meta = {
        "cache_ttl_seconds": 60,
        "data_freshness": {
            "calendar_seconds_ago": 0,
            "weather_seconds_ago": 0
        }
    }

    # The timestamp is taken after all sources have answered.
    return {"timestamp": datetime.now().isoformat(), **sections, "meta": meta}
@router.get("/api/events-dashboard")
async def api_events_dashboard(request: Request):
    """Return Event Graph HTML for the dashboard — upcoming events with action buttons.

    Public endpoint — consumed by HTMX polling on the dashboard.
    Renders ``events.html`` from the dashboard template directory so buttons
    work immediately; falls back to a JSON payload when that file is missing.

    Returns:
        An ``HTMLResponse`` with the rendered template, or a dict with
        ``timestamp``, ``events`` and ``meta`` when the template is absent.
    """
    events_data = await _get_event_graph()

    events_template_path = DASHBOARD_TEMPLATES_DIR / "events.html"
    if events_template_path.exists():
        # Imported here, like the other event endpoints in this module do;
        # without it the name ``Template`` is undefined in this scope.
        from jinja2 import Template

        # NOTE(review): a bare Template does not autoescape; confirm that
        # event fields rendered by events.html are trusted or escaped there.
        template = Template(events_template_path.read_text())
        html_content = template.render(events=events_data)
        return HTMLResponse(content=html_content)

    # Fallback to JSON if template missing
    return {
        "timestamp": datetime.now().isoformat(),
        "events": events_data,
        "meta": {
            "source": "event-graph-api",
            "cache_ttl_seconds": 15
        }
    }
def _caldav_hostname(raw):
    """Return the bare hostname from a ``host``, ``host:port`` or full URL value.

    Handles any scheme (``http://`` and ``https://`` alike) and drops the
    port and path, so the caller can rebuild the URL from host and port.
    Falls back to the stripped input when no hostname can be parsed.
    """
    from urllib.parse import urlsplit

    candidate = raw.strip()
    # urlsplit only recognises a network location after "//", so bare
    # "host" / "host:port" values get one prepended.
    parsed = urlsplit(candidate if "://" in candidate else f"//{candidate}")
    return parsed.hostname or candidate


async def _get_calendar_today():
    """Fetch today's events from Radicale over CalDAV.

    Connection settings come from ``RADICALE_*`` environment variables, with
    ``CALDAV_*`` variables and hard-coded defaults as fallbacks. The first
    calendar whose name contains "family" is used, otherwise the first
    calendar returned by the server.

    Returns:
        ``{"status": "ok", "events": [...]}`` on success, or
        ``{"status": "error", "message": ..., "events": []}`` when no
        calendar exists or anything raises. This function never raises.
    """
    try:
        import os
        from datetime import date

        import caldav

        # Support both CALDAV_* and RADICALE_* env vars
        raw_host = os.getenv('RADICALE_HOST') or os.getenv('CALDAV_URL', '127.0.0.1')
        host = _caldav_hostname(raw_host)
        port = os.getenv('RADICALE_PORT') or '5232'
        user = os.getenv('RADICALE_USER') or os.getenv('CALDAV_USER', 'assistant')
        # NOTE(review): a default password is embedded in source; prefer
        # requiring it from the environment.
        password = os.getenv('RADICALE_PASS') or os.getenv('CALDAV_PASSWORD', 'family-assistant-2026')

        # The connection is always plain http on RADICALE_PORT, whatever
        # scheme the configured URL carried.
        # NOTE(review): these caldav calls are synchronous and run inside the
        # coroutine — presumably blocking the event loop; confirm.
        client = caldav.DAVClient(
            url=f"http://{host}:{port}/",
            username=user,
            password=password
        )
        calendars = client.principal().calendars()

        # Prefer the family calendar; a calendar without a name is skipped
        # rather than aborting the whole lookup.
        family_cal = next(
            (cal for cal in calendars if 'family' in (cal.name or '').lower()),
            None,
        )
        if not family_cal:
            family_cal = calendars[0] if calendars else None

        if not family_cal:
            return {"status": "error", "message": "No calendar found", "events": []}

        # Search window: today up to tomorrow.
        today = date.today()
        tomorrow = today + timedelta(days=1)

        events = []
        for event in family_cal.date_search(today, tomorrow):
            vevent = event.instance.vevent
            has_start = hasattr(vevent, 'dtstart')
            events.append({
                "uid": str(vevent.uid.value) if hasattr(vevent, 'uid') else "",
                "summary": str(vevent.summary.value) if hasattr(vevent, 'summary') else "",
                "start": vevent.dtstart.value.isoformat() if has_start else "",
                "end": vevent.dtend.value.isoformat() if hasattr(vevent, 'dtend') else "",
                # A start that is not a datetime is treated as an all-day event.
                "is_all_day": not isinstance(vevent.dtstart.value, datetime) if has_start else False,
                "location": str(vevent.location.value) if hasattr(vevent, 'location') else "",
                "attendees": []
            })
        return {"status": "ok", "events": events}
    except Exception as e:
        # Best effort: the dashboard shows the error instead of failing.
        return {"status": "error", "message": str(e), "events": []}
def _forecast_hour_label(raw_time):
    """Turn an hourly ``time`` value such as "0", "300" or "1500" into "HH:00".

    The value is zero-padded to four digits first, so "300" becomes "03:00"
    rather than "30:00".
    """
    return f"{str(raw_time).zfill(4)[:2]}:00"


async def _get_weather():
    """Fetch current conditions and an hourly forecast from wttr.in.

    Returns:
        ``{"status": "ok", "current": {...}, "forecast": [...]}`` when the
        request succeeds with HTTP 200. On any error or other status code a
        hard-coded ``{"status": "fallback", ...}`` payload with an empty
        forecast is returned instead. This function never raises.
    """
    try:
        import httpx

        # Use wttr.in for free weather (no API key needed)
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get("https://wttr.in/Green+Bay,WI?format=j1")
            if response.status_code == 200:
                data = response.json()
                current = data.get("current_condition", [{}])[0]

                # First day's hourly entries, capped at 12.
                hourly = data.get("weather", [{}])[0].get("hourly", [])[:12]
                forecast = [
                    {
                        "hour": _forecast_hour_label(hour.get("time", "0000")),
                        "temperature_f": int(hour.get("tempF", 0)),
                        "condition": hour.get("weatherDesc", [{}])[0].get("value", ""),
                        "precipitation_chance_pct": int(hour.get("chanceofrain", 0))
                    }
                    for hour in hourly
                ]

                return {
                    "status": "ok",
                    "current": {
                        "temperature_f": int(current.get("temp_F", 0)),
                        "feels_like_f": int(current.get("FeelsLikeF", 0)),
                        "condition": current.get("weatherDesc", [{}])[0].get("value", ""),
                        "humidity_pct": int(current.get("humidity", 0)),
                        "wind_mph": int(current.get("windspeedMiles", 0))
                    },
                    "forecast": forecast
                }
    except Exception as exc:
        # Still best effort, but leave a trace instead of failing silently.
        import logging

        logging.getLogger(__name__).warning(
            "Weather lookup failed, serving fallback data: %s", exc
        )

    # Fallback data: placeholder values, flagged by status "fallback".
    return {
        "status": "fallback",
        "current": {
            "temperature_f": 52,
            "feels_like_f": 49,
            "condition": "Data unavailable",
            "humidity_pct": 62,
            "wind_mph": 10
        },
        "forecast": []
    }
async def _get_system_health():
    """Probe the dashboard's backing services and report disk usage.

    Each probe is independent and best effort: a failing probe records a
    status for its component instead of raising. Probes use
    ``except Exception`` so that task cancellation still propagates.

    Returns:
        A dict with ``overall_status`` ("degraded" if any component reports
        "error", otherwise "healthy"), per-component ``components`` entries
        with ``status`` and ``latency_ms``, and ``disk_usage`` for ``/``.
    """
    import os
    import socket
    import subprocess

    import psutil

    components = {}

    # Check OpenClaw gateway
    try:
        import httpx
        async with httpx.AsyncClient() as client:
            start = datetime.now()
            response = await client.get("http://127.0.0.1:18789/health", timeout=2.0)
            latency = (datetime.now() - start).total_seconds() * 1000
            components["openclaw_gateway"] = {
                "status": "ok" if response.status_code == 200 else "degraded",
                "latency_ms": round(latency, 1)
            }
    except Exception:
        components["openclaw_gateway"] = {"status": "error", "latency_ms": 0}

    # Check Radicale with a plain TCP connect.
    # NOTE(review): this is a blocking connect (up to 2 s) inside a coroutine.
    try:
        start = datetime.now()
        radicale_addr = (
            os.getenv('RADICALE_HOST', '127.0.0.1'),
            int(os.getenv('RADICALE_PORT', '5232')),
        )
        # The context manager closes the socket even if the probe raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            result = sock.connect_ex(radicale_addr)
        latency = (datetime.now() - start).total_seconds() * 1000
        components["radicale_caldav"] = {
            "status": "ok" if result == 0 else "error",
            "latency_ms": round(latency, 1)
        }
    except Exception:
        components["radicale_caldav"] = {"status": "error", "latency_ms": 0}

    # Check Ollama
    try:
        # Imported here too, so this probe does not depend on an earlier
        # probe's import having succeeded.
        import httpx
        start = datetime.now()
        async with httpx.AsyncClient() as client:
            response = await client.get("http://100.104.147.116:11434/api/tags", timeout=3.0)
            latency = (datetime.now() - start).total_seconds() * 1000
            components["ollama_local"] = {
                "status": "ok" if response.status_code == 200 else "error",
                "latency_ms": round(latency, 1)
            }
    except Exception:
        components["ollama_local"] = {"status": "error", "latency_ms": 0}

    # Check Cloudflare tunnel
    try:
        start = datetime.now()
        # The timeout keeps a hung systemctl from stalling the request; a
        # timeout or missing binary is reported as "unknown".
        result = subprocess.run(
            ['systemctl', 'is-active', 'cloudflared'],
            capture_output=True,
            text=True,
            timeout=5,
        )
        latency = (datetime.now() - start).total_seconds() * 1000
        components["cloudflare_tunnel"] = {
            "status": "ok" if result.returncode == 0 else "error",
            "latency_ms": round(latency, 1)
        }
    except Exception:
        components["cloudflare_tunnel"] = {"status": "unknown", "latency_ms": 0}

    # Check IMAP Proxy
    try:
        import httpx
        async with httpx.AsyncClient() as client:
            start = datetime.now()
            response = await client.get("http://127.0.0.1:8000/imap/status", timeout=3.0)
            latency = (datetime.now() - start).total_seconds() * 1000

            imap_status = "error"
            if response.status_code == 200:
                reported = response.json().get("status")
                if reported == "connected":
                    imap_status = "ok"
                elif reported == "not_configured":
                    imap_status = "degraded"

            components["imap_proxy"] = {
                "status": imap_status,
                "latency_ms": round(latency, 1)
            }
    except Exception:
        components["imap_proxy"] = {"status": "not_configured", "latency_ms": 0}

    # Only an explicit "error" degrades the overall status; "degraded",
    # "unknown" and "not_configured" components do not.
    has_error = any(comp["status"] == "error" for comp in components.values())
    overall = "degraded" if has_error else "healthy"

    # Disk usage
    disk = psutil.disk_usage('/')

    return {
        "overall_status": overall,
        "components": components,
        "disk_usage": {
            "total_gb": round(disk.total / (1024**3), 1),
            "used_gb": round(disk.used / (1024**3), 1),
            "usage_pct": round(disk.percent, 1)
        }
    }
def _event_day_label(start, today=None):
    """Return "Today", "Tomorrow" or e.g. "Mon Feb 05" for an ISO start value.

    Args:
        start: ISO-8601 date or datetime string; a trailing "Z" is accepted.
        today: Reference date; defaults to ``date.today()``.

    Returns:
        The label, or the first ten characters of ``start`` (or "") when the
        value cannot be parsed.
    """
    from datetime import date

    try:
        ev_date = datetime.fromisoformat(start.replace("Z", "+00:00")).date()
    except (AttributeError, TypeError, ValueError):
        return start[:10] if start else ""

    # NOTE(review): the parsed date keeps the timestamp's own offset while
    # "today" is the server's local date — confirm this is intended.
    today = today or date.today()
    if ev_date == today:
        return "Today"
    # timedelta arithmetic rolls over month and year ends correctly;
    # replacing the day number does not (it raises on the last day of a month).
    if ev_date == today + timedelta(days=1):
        return "Tomorrow"
    return ev_date.strftime("%a %b %d")


def _event_conflicts(event, all_events):
    """List other non-declined events that share ``event``'s exact start value.

    Only events still awaiting a decision ("needs_confirmation" or "pending")
    report conflicts; confirmed events stay silent, and an event rescheduled
    back to pending shows its conflicts again. A conflict is marked "person"
    when both events name at least one common family member, else "same_day".
    """
    if event.get("status") not in ("needs_confirmation", "pending"):
        return []

    start = event.get("start", "")
    members = event.get("family_members", [])
    conflicts = []
    for other in all_events:
        if other.get("id") == event.get("id") or other.get("status") == "declined":
            continue
        other_start = other.get("start", "")
        if not other_start or other_start != start:
            continue

        other_members = other.get("family_members", [])
        shared = set(members) & set(other_members) if members and other_members else set()
        entry = {
            "id": other.get("id", ""),
            "title": other.get("title", "Untitled"),
        }
        if shared:
            entry["severity"] = "person"
            entry["shared_members"] = list(shared)
        else:
            entry["severity"] = "same_day"
        conflicts.append(entry)
    return conflicts


async def _get_event_graph():
    """Fetch upcoming events from the Event Graph API, or from test data.

    The API at ``localhost:8002`` is tried first (at most 8 events are
    returned). If it is unreachable or does not answer 200, events are read
    from ``data/test_events.json`` two directories above this file: the first
    10 entries are considered, declined ones are skipped, and conflict
    information is attached.

    Returns:
        A list of event dicts formatted for the dashboard; ``[]`` when
        neither source is available. This function never raises.
    """
    import logging

    log = logging.getLogger(__name__)

    try:
        # Try Event Graph API first
        import httpx
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "http://localhost:8002/api/v1/events?limit=10",
                timeout=2.0
            )
            if response.status_code == 200:
                events = response.json()
                return [
                    {
                        "id": e.get("id", ""),
                        "title": e.get("title", "Untitled"),
                        "start": e.get("start_date", ""),
                        "day_label": _event_day_label(e.get("start_date", "")),
                        "status": e.get("status", "unknown"),
                        "event_type": e.get("event_type", "calendar"),
                        "confidence": e.get("confidence", 0),
                        "location": e.get("location", ""),
                    }
                    for e in events[:8]
                ]
    except Exception:
        # Best effort: fall through to the test-data file.
        log.debug("Event Graph API unavailable", exc_info=True)

    # Fallback: Load test events from JSON file
    try:
        import json
        from pathlib import Path

        test_file = Path(__file__).parent.parent / "data" / "test_events.json"
        if test_file.exists():
            with open(test_file) as f:
                test_events = json.load(f)

            formatted = []
            for e in test_events[:10]:
                # Skip declined events (they're hidden from dashboard)
                if e.get("status") == "declined":
                    continue
                start = e.get("start", "")
                formatted.append({
                    "id": e.get("id", ""),
                    "title": e.get("title", "Untitled"),
                    "start": start,
                    # Recalculated against the current date on every call.
                    "day_label": _event_day_label(start),
                    "status": e.get("status", "unknown"),
                    "event_type": e.get("event_type", "calendar"),
                    "confidence": e.get("confidence", 0.95),
                    "location": e.get("location", ""),
                    "family_members": e.get("family_members", []),
                    # Conflicts are checked against the whole file, not just
                    # the first 10 entries.
                    "conflicts": _event_conflicts(e, test_events),
                })
            return formatted
    except Exception:
        log.debug("Could not load fallback test events", exc_info=True)

    return []
def _load_test_events():
    """Read the test events JSON file.

    Returns:
        The parsed JSON content of ``TEST_EVENTS_FILE``, or an empty list
        when the file does not exist.
    """
    if not TEST_EVENTS_FILE.exists():
        return []

    with open(TEST_EVENTS_FILE, 'r') as handle:
        return json.load(handle)
def _save_test_events(events):
    """Write ``events`` to the test events JSON file, indented by two spaces.

    The payload is serialised fully in memory before the file is opened, so
    a value that cannot be serialised raises without truncating the
    existing file.

    Args:
        events: JSON-serialisable data to store.

    Raises:
        TypeError / ValueError: if ``events`` cannot be serialised.
        OSError: if the file cannot be written.
    """
    serialized = json.dumps(events, indent=2)
    TEST_EVENTS_FILE.write_text(serialized)
def _find_event_by_id(events, event_id: str):
    """Return the first event whose ``"id"`` equals ``event_id``, else None.

    The matching dict itself is returned (not a copy), so callers can mutate
    it in place.
    """
    matches = (candidate for candidate in events if candidate.get("id") == event_id)
    return next(matches, None)
@router.get("/api/events/{event_id}")
async def api_get_event(event_id: str, request: Request):
    """Get a single event by ID — HTML for HTMX modal requests, JSON otherwise.

    A ``day_label`` ("Today", "Tomorrow", a short date, or the raw date
    prefix when parsing fails) is added to the returned event; it is not
    written back to the events file.

    Returns:
        An ``HTMLResponse`` rendered from ``partials/event_detail.html`` when
        the request carries ``HX-Request: true`` and that template exists,
        otherwise the event dict.

    Raises:
        HTTPException: 404 when no event has the given ID.
    """
    events = _load_test_events()
    event = _find_event_by_id(events, event_id)

    if not event:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    # Calculate day_label
    start = event.get("start", "")
    try:
        from datetime import date

        ev_date = datetime.fromisoformat(start.replace("Z", "+00:00")).date()
        today = date.today()
        if ev_date == today:
            day_label = "Today"
        elif ev_date == today + timedelta(days=1):
            day_label = "Tomorrow"
        else:
            day_label = ev_date.strftime("%a %b %d")
    except Exception:
        # Unparseable start: show its date-looking prefix, if any.
        day_label = start[:10] if start else ""
    event["day_label"] = day_label

    # Check if HTMX request
    if request.headers.get("HX-Request") == "true":
        event_detail_path = DASHBOARD_TEMPLATES_DIR / "partials" / "event_detail.html"
        if event_detail_path.exists():
            # Imported here, like the other event endpoints in this module do;
            # without it the name ``Template`` is undefined in this scope.
            from jinja2 import Template

            # NOTE(review): a bare Template does not autoescape; confirm that
            # event fields are trusted or escaped in the template.
            template = Template(event_detail_path.read_text())
            return HTMLResponse(content=template.render(event=event))

    # JSON for non-HTMX requests, and for HTMX when the template is missing.
    return event
@router.post("/api/events/{event_id}/confirm")
async def api_event_confirm(event_id: str, request: Request):
    """Confirm an event — sets its status to "confirmed" and saves the store.

    Returns:
        The event card HTML for an HTMX swap when the events-list template
        exists, otherwise a JSON success payload.

    Raises:
        HTTPException: 404 when no event has the given ID.
    """
    stored_events = _load_test_events()
    target = _find_event_by_id(stored_events, event_id)
    if not target:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    # target is the dict held in stored_events, so saving the list persists it.
    target.update(status="confirmed", updated_at=datetime.now().isoformat())
    _save_test_events(stored_events)

    card_template_path = DASHBOARD_TEMPLATES_DIR / "dashboard" / "components" / "events_list.html.j2"
    if not card_template_path.exists():
        # Fallback to JSON if template missing
        return {
            "status": "success",
            "event_id": event_id,
            "new_status": "confirmed",
            "message": "Event confirmed"
        }

    # Re-render just this event's card.
    from jinja2 import Template
    rendered = Template(card_template_path.read_text()).render(events=[target])
    return HTMLResponse(content=rendered)
@router.post("/api/events/{event_id}/decline")
async def api_event_decline(event_id: str, request: Request):
    """Decline an event — sets its status to "declined" and saves the store.

    The event is kept in the file with a ``removed_at`` timestamp rather
    than being deleted.

    Returns:
        The event card HTML for an HTMX swap when the events-list template
        exists, otherwise a JSON success payload.

    Raises:
        HTTPException: 404 when no event has the given ID.
    """
    stored_events = _load_test_events()
    target = _find_event_by_id(stored_events, event_id)
    if not target:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    # Soft delete: mark as declined and stamp both timestamps.
    target.update(
        status="declined",
        updated_at=datetime.now().isoformat(),
        removed_at=datetime.now().isoformat(),
    )
    _save_test_events(stored_events)

    card_template_path = DASHBOARD_TEMPLATES_DIR / "dashboard" / "components" / "events_list.html.j2"
    if not card_template_path.exists():
        # Fallback to JSON if template missing
        return {
            "status": "success",
            "event_id": event_id,
            "new_status": "declined",
            "message": "Event declined and hidden from dashboard"
        }

    # Re-render the card with the now-declined event.
    from jinja2 import Template
    rendered = Template(card_template_path.read_text()).render(events=[target])
    return HTMLResponse(content=rendered)
@router.post("/api/events/{event_id}/reschedule")
async def api_event_reschedule(event_id: str, request: Request):
    """Reschedule an event to a new date/time and set its status to "pending".

    Accepts ``new_date`` / ``new_time`` either as a JSON body or as form
    data (the HTMX case). Other non-declined events whose ``start`` equals
    ``new_date`` are reported as conflicts and summarised in the event's
    ``conflict_warning``.

    Returns:
        The event card HTML for an HTMX swap when the events-list template
        exists, otherwise a JSON payload including the conflicts.

    Raises:
        HTTPException: 404 when no event has the given ID.
    """
    stored_events = _load_test_events()
    target = _find_event_by_id(stored_events, event_id)
    if not target:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")

    # Try JSON first; anything that fails there is retried as form data.
    try:
        payload = await request.json()
        new_date, new_time = payload.get("new_date"), payload.get("new_time")
    except Exception:
        payload = await request.form()
        new_date, new_time = payload.get("new_date"), payload.get("new_time")

    # Missing values leave the stored date/time untouched; the status is
    # reset to pending either way.
    if new_date:
        target["start"] = new_date
    if new_time:
        target["time"] = new_time
    target["status"] = "pending"
    target["updated_at"] = datetime.now().isoformat()

    # Simple conflict detection: exact match on the start value.
    conflicts = [
        {
            "title": other.get("title", "Untitled"),
            "id": other.get("id", ""),
            "date": other.get("start", ""),
        }
        for other in stored_events
        if other.get("id") != event_id
        and other.get("status") != "declined"
        and other.get("start", "")
        and other.get("start", "") == new_date
    ]

    if conflicts:
        clashing_titles = ', '.join(c['title'] for c in conflicts)
        target["conflict_warning"] = f"⚠️ Same day as: {clashing_titles}"
    else:
        # Drop any warning left over from an earlier reschedule.
        target.pop("conflict_warning", None)

    _save_test_events(stored_events)

    card_template_path = DASHBOARD_TEMPLATES_DIR / "dashboard" / "components" / "events_list.html.j2"
    if not card_template_path.exists():
        # Fallback to JSON if template missing
        return {
            "status": "success",
            "event_id": event_id,
            "new_status": "pending",
            "new_date": new_date,
            "new_time": new_time,
            "conflicts": conflicts,
            "message": "Event rescheduled and set to pending"
        }

    # Re-render just this event's card.
    from jinja2 import Template
    rendered = Template(card_template_path.read_text()).render(events=[target])
    return HTMLResponse(content=rendered)
# Request body for modifying an event: only new_date is required.
# NOTE(review): no endpoint in the visible part of this file uses this
# model — confirm where it is consumed.
class ModifyRequest(BaseModel):
    # Optional replacement title.
    title: Optional[str] = None
    # Required target date, kept as a plain string.
    new_date: str
    # Optional target time.
    new_time: Optional[str] = None
    # Optional replacement location.
    location: Optional[str] = None
@router.get("/api/events/{event_id}/reschedule-form")
async def api_event_reschedule_form(event_id: str, request: Request):
    """Return reschedule form HTML for HTMX swap.

    Looks the event up in the test-events store and builds an HTML fragment
    whose outer element is ``<div class="event-card" id="event-{event_id}">``.

    Returns:
        An ``HTMLResponse`` carrying the generated fragment.

    Raises:
        HTTPException: 404 when no event has the given ID.
    """
    events = _load_test_events()
    event = _find_event_by_id(events, event_id)
    if not event:
        raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
    # Extract date and time from start field
    start = event.get("start", "")
    # First ten characters of start, or today's date when start is empty.
    current_date = start[:10] if start else datetime.now().strftime("%Y-%m-%d")
    # First five characters of the stored time, or "12:00" when none is stored.
    current_time = event.get("time", "12:00")[:5] if event.get("time") else "12:00"
    # Generate HTML form
    # NOTE(review): the f-string opened below is never closed in the visible
    # source and the form markup appears to be missing (current_date and
    # current_time are computed but unused here) — restore the original
    # template before relying on this endpoint.
    # NOTE(review): the event title is interpolated into HTML as-is;
    # presumably it should be escaped — confirm.
    html_content = f'''\n<div class="event-card" id="event-{event_id}">
Reschedule: {event.get("title", "Untitled")}
    return HTMLResponse(content=html_content)
@router.get("/api/events/{event_id}/modify-form")
async def api_event_modify_form(event_id: str, request: Request):
"""Return modify form HTML for HTMX swap."""
events = _load_test_events()
event = _find_event_by_id(events, event_id)
if not event:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
# Extract date and time from start field
start = event.get("start", "")
current_date = start[:10] if start else datetime.now().strftime("%Y-%m-%d")
current_time = event.get("time", "12:00")[:5] if event.get("time") else "12:00"
current_title = event.get("title", "")
current_location = event.get("location", "")
# Generate HTML form
html_content = f'''\n<div class="event-card" id="event-{event_id}">