"""HoffDesk Analytics — lightweight request tracking.
SQLite-backed, zero external dependencies.
Logs: timestamp, path, method, status, response_ms, user_agent, referer, host, ip_hash
Dashboard: /admin/analytics (family app only)
"""
import sqlite3
import hashlib
import time
import json
from pathlib import Path
from datetime import datetime, timedelta
from collections import Counter
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
# ── Config ─────────────────────────────────────────────────────────────────
DB_PATH = Path(__file__).parent / "data" / "analytics.db"
# Paths to ignore (static assets, health checks, analytics itself)
IGNORE_PATHS = {
"/health", "/admin/analytics", "/admin/analytics/",
"/favicon.ico", "/robots.txt",
}
IGNORE_PREFIXES = (
"/static/", "/blog/static/", "/api/health",
)
# ── DB Setup ───────────────────────────────────────────────────────────────
def _ensure_db():
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.execute(
"""
CREATE TABLE IF NOT EXISTS requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
host TEXT,
path TEXT NOT NULL,
method TEXT,
status_code INTEGER,
response_ms REAL,
user_agent TEXT,
referer TEXT,
ip_hash TEXT
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_requests_ts ON requests(ts)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_requests_path ON requests(path)
"""
)
conn.commit()
conn.close()
# ── Middleware ─────────────────────────────────────────────────────────────
class AnalyticsMiddleware(BaseHTTPMiddleware):
"""Record every request to local SQLite. Ignores static assets and health checks."""
async def dispatch(self, request: Request, call_next):
path = request.url.path
# Skip ignored paths
if path in IGNORE_PATHS or any(path.startswith(p) for p in IGNORE_PREFIXES):
return await call_next(request)
start = time.time()
response: Response = await call_next(request)
duration = (time.time() - start) * 1000
# Hash IP for privacy (no raw IPs stored)
client_ip = request.client.host if request.client else "unknown"
ip_hash = hashlib.sha256(client_ip.encode()).hexdigest()[:16]
# Async-safe write via thread pool
import asyncio
def _write():
try:
conn = sqlite3.connect(str(DB_PATH))
conn.execute(
"""
INSERT INTO requests (ts, host, path, method, status_code, response_ms, user_agent, referer, ip_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
datetime.utcnow().isoformat(),
request.url.hostname,
path,
request.method,
response.status_code,
round(duration, 2),
request.headers.get("user-agent", "")[:250],
request.headers.get("referer", "")[:250],
ip_hash,
),
)
conn.commit()
conn.close()
except Exception:
pass # Analytics must never break the app
asyncio.get_event_loop().run_in_executor(None, _write)
return response
# ── Dashboard Queries ──────────────────────────────────────────────────────
def _query(sql: str, params=()) -> list:
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
cur = conn.execute(sql, params)
rows = [dict(r) for r in cur.fetchall()]
conn.close()
return rows
def get_stats(days: int = 30) -> dict:
"""Aggregate analytics for the dashboard."""
since = (datetime.utcnow() - timedelta(days=days)).isoformat()
# Total requests + unique visitors (by ip_hash)
total = _query(
"SELECT COUNT(*) as c FROM requests WHERE ts > ?", (since,)
)[0]["c"]
unique = _query(
"SELECT COUNT(DISTINCT ip_hash) as c FROM requests WHERE ts > ?", (since,)
)[0]["c"]
# Top paths
top_paths = _query(
"""
SELECT path, COUNT(*) as hits
FROM requests WHERE ts > ?
GROUP BY path ORDER BY hits DESC LIMIT 20
""",
(since,),
)
# Daily breakdown
daily = _query(
"""
SELECT DATE(ts) as day, COUNT(*) as hits, COUNT(DISTINCT ip_hash) as unique_visitors
FROM requests WHERE ts > ?
GROUP BY day ORDER BY day DESC LIMIT 30
""",
(since,),
)
# Status code distribution
statuses = _query(
"""
SELECT status_code, COUNT(*) as hits
FROM requests WHERE ts > ?
GROUP BY status_code ORDER BY hits DESC
""",
(since,),
)
# Response time percentiles (rough)
pctiles = _query(
"""
SELECT response_ms FROM requests
WHERE ts > ? AND response_ms IS NOT NULL
ORDER BY response_ms
""",
(since,),
)
if pctiles:
times = [r["response_ms"] for r in pctiles]
p50 = times[len(times) // 2]
p95 = times[int(len(times) * 0.95)] if len(times) > 20 else times[-1]
else:
p50 = p95 = 0
return {
"total_requests": total,
"unique_visitors": unique,
"top_paths": top_paths,
"daily": daily,
"statuses": statuses,
"response_ms_p50": p50,
"response_ms_p95": p95,
"since_days": days,
}
# ── Dashboard HTML ───────────────────────────────────────────────────────────
def _render_dashboard(stats: dict) -> str:
rows = "\n".join(
f'
| {r["path"]} | {r["hits"]:,} |
'
for r in stats["top_paths"]
)
daily_rows = "\n".join(
f'| {r["day"]} | {r["hits"]:,} | {r["unique_visitors"]:,} |
'
for r in stats["daily"]
)
status_rows = "\n".join(
f'| {r["status_code"]} | {r["hits"]:,} |
'
for r in stats["statuses"]
)
return f"""
HoffDesk Analytics
📊 HoffDesk Analytics
Last {stats['since_days']} days · Self-hosted · Zero third-party scripts
Total Requests
{stats['total_requests']:,}
Unique Visitors
{stats['unique_visitors']:,}
P50 Response
{stats['response_ms_p50']:.0f}ms
P95 Response
{stats['response_ms_p95']:.0f}ms
Daily Traffic
| Date | Requests | Unique |
{daily_rows}
Status Codes
| Status | Count |
{status_rows}
"""