"""
Family Assistant Webhook — FastAPI listener on localhost:5000
Receives push emails from Cloudflare Email Worker via hook.hoffdesk.com tunnel.
Auth: X-Hoffdesk-Secret header must match WEBHOOK_SECRET in .env
"""
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import JSONResponse
import uvicorn
import os
import sys
import json
import hashlib
import time
from typing import Optional
from datetime import datetime, timezone
from email import message_from_string
from email.policy import default as default_policy
# Bootstrap family_assistant imports: put the parent of this file's directory on
# sys.path so `from family_assistant import ...` resolves when run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

app = FastAPI(title="HoffDesk Webhook", version="1.0.0")

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

def _read_env_value(env_path: str, key: str) -> str:
    """Return the value assigned to ``key`` in a dotenv-style file.

    Each line is read as ``NAME=value``. Whitespace around the name and the
    value is ignored. One pair of matching surrounding quotes is removed from
    the value. The first matching line wins.

    Returns "" when the file does not exist or contains no such line.
    """
    try:
        with open(env_path, encoding="utf-8") as f:
            for line in f:
                name, sep, value = line.strip().partition("=")
                if not sep or name.strip() != key:
                    continue
                value = value.strip()
                # dotenv files commonly quote values; the quotes are not part
                # of the secret itself.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                return value
    except FileNotFoundError:
        pass
    return ""


def _load_secret() -> str:
    """Load WEBHOOK_SECRET from the environment, falling back to the .env file.

    A non-empty ``WEBHOOK_SECRET`` environment variable wins. Otherwise the
    value is read from ``.env`` in the parent of this file's directory.

    Returns "" when neither source provides a value.
    """
    secret = os.environ.get("WEBHOOK_SECRET", "")
    if secret:
        return secret
    # Anchor on this file's absolute path rather than the CWD, so the same
    # .env is found regardless of where the process was started.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    return _read_env_value(os.path.join(project_root, ".env"), "WEBHOOK_SECRET")
# Resolved once, at import time. An empty value means the webhook is
# unconfigured, and /webhook answers 503 to every request.
WEBHOOK_SECRET = _load_secret()
if not WEBHOOK_SECRET:
    print(
        "WARN: WEBHOOK_SECRET not set — all requests will be rejected",
        file=sys.stderr,
    )

# ---------------------------------------------------------------------------
# In-memory dedup (24 h TTL, max 2048 keys)
# ---------------------------------------------------------------------------

# dedup key -> time.time() at which the key was marked processed.
_dedup: dict[str, float] = {}
_DEDUP_TTL = 86400  # 24 hours
_DEDUP_MAX = 2048


def _is_duplicate(key: str) -> bool:
    """Return True if ``key`` was marked processed within the last ``_DEDUP_TTL`` seconds.

    An expired entry is removed as a side effect and reported as not a duplicate.
    """
    ts = _dedup.get(key)
    if ts is None:
        return False
    if time.time() - ts > _DEDUP_TTL:
        del _dedup[key]
        return False
    return True


def _mark_processed(key: str) -> None:
    """Record ``key`` as processed now, keeping at most ``_DEDUP_MAX`` entries.

    On overflow, all expired entries are dropped first. If the cache is still
    too large, the oldest remaining entries are evicted until the cap holds.
    """
    now = time.time()
    _dedup[key] = now
    if len(_dedup) <= _DEDUP_MAX:
        return
    cutoff = now - _DEDUP_TTL
    for k in [k for k, ts in _dedup.items() if ts < cutoff]:
        del _dedup[k]
    overflow = len(_dedup) - _DEDUP_MAX
    if overflow > 0:
        # All remaining entries are still fresh. Evict the oldest so the cap
        # actually holds; an expired-only sweep lets the cache grow without
        # bound under sustained traffic. sorted() is stable, so ties keep
        # insertion order.
        for k in sorted(_dedup, key=_dedup.__getitem__)[:overflow]:
            del _dedup[k]

# ---------------------------------------------------------------------------
# Email parsing
# ---------------------------------------------------------------------------

def _decode_text_part(part) -> str:
    """Return the decoded text of one MIME ``text/*`` part without raising on bad charsets."""
    try:
        content = part.get_content()
    except (LookupError, KeyError, ValueError):
        # LookupError: unknown charset label. KeyError: no content-manager
        # handler for this type. ValueError (incl. UnicodeError): malformed payload.
        content = None
    if isinstance(content, str):
        return content
    payload = part.get_payload(decode=True)
    if not isinstance(payload, bytes):
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        return payload.decode("utf-8", errors="replace")


def _parse_raw_email(raw: str) -> dict:
    """Parse an RFC 5322 raw email into a structured dict.

    Returns a dict with these keys:
      - ``body_text``: the first ``text/plain`` part, capped at 50,000 chars.
      - ``body_html``: the first ``text/html`` part, capped at 50,000 chars.
      - ``attachments``: one entry per part whose Content-Disposition is
        ``attachment`` and which has a filename. Each entry is
        ``{"filename", "content_type", "size"}``, where size is the decoded
        byte count.
      - ``has_attachments``: True when ``attachments`` is non-empty.

    If neither body was found, ``body_text`` falls back to everything after
    the first blank line of ``raw``.
    """
    import re

    msg = message_from_string(raw, policy=default_policy)
    body_text = ""
    body_html = ""
    attachments = []
    # walk() yields the message itself first, so single-part messages go
    # through the same content-type dispatch as multipart ones.
    for part in msg.walk():
        ct = part.get_content_type()
        if part.get_content_disposition() == "attachment":
            fn = part.get_filename()
            if fn:
                payload = part.get_payload(decode=True)
                attachments.append({
                    "filename": fn,
                    "content_type": ct,
                    "size": len(payload) if payload else 0,
                })
        elif ct == "text/plain" and not body_text:
            body_text = _decode_text_part(part)
        elif ct == "text/html" and not body_html:
            body_html = _decode_text_part(part)

    # Fallback: take everything after the header/body separator. The
    # separator may be CRLF or LF.
    if not body_text and not body_html:
        parts = re.split(r"\r?\n\r?\n", raw, maxsplit=1)
        if len(parts) == 2:
            body_text = parts[1]

    return {
        "body_text": body_text[:50000],  # 50K char safety limit
        "body_html": body_html[:50000],
        "attachments": attachments,
        "has_attachments": len(attachments) > 0,
    }

# ---------------------------------------------------------------------------
# Pipeline bridge
# ---------------------------------------------------------------------------

def _run_pipeline(email_data: dict) -> dict:
    """
    Hand the parsed email to the family assistant pipeline.

    Never raises. The outcome is reported in the returned dict's ``action``:
      - "processed": ``family_assistant.pipeline.process_webhook_email`` ran,
        and its return value is under ``"result"``.
      - "accepted": the pipeline module imported but does not define
        ``process_webhook_email`` yet.
      - "error": importing or running the pipeline raised; the message is
        under ``"error"``.
    """
    try:
        from family_assistant import pipeline

        # Look up the entry point explicitly instead of catching
        # AttributeError around the call. An AttributeError raised *inside*
        # the pipeline is a real bug and must surface as "error", not "pending".
        process = getattr(pipeline, "process_webhook_email", None)
        if process is None:
            # NOTE(review): nothing is persisted on this path, despite the
            # "queued" wording in the log line.
            print(" → pipeline.process_webhook_email() not yet implemented — email queued", flush=True)
            return {"action": "accepted", "note": "pipeline integration pending"}
        result = process(email_data, notify=True)
    except Exception as e:
        # Deliberately broad: the HTTP handler still answers 200 with this
        # body, which prevents a Cloudflare retry storm.
        print(f" → Pipeline error: {e}", file=sys.stderr, flush=True)
        return {"action": "error", "error": str(e)}
    return {"action": "processed", "result": result}

# ---------------------------------------------------------------------------
# HTML → plain text (stdlib only, per architecture rules)
# ---------------------------------------------------------------------------

def _html_to_text(html: str) -> str:
    """Return the visible text of ``html``, using only the stdlib ``html.parser``.

    Text nodes are joined with single spaces. Character references are
    decoded, per HTMLParser's default ``convert_charrefs=True``. The contents
    of ``<script>`` and ``<style>`` elements are dropped, because they are
    code rather than message text.
    """
    from html.parser import HTMLParser

    class _Stripper(HTMLParser):
        _SKIP_TAGS = ("script", "style")

        def __init__(self):
            super().__init__()
            self._parts: list[str] = []
            self._skip_depth = 0  # > 0 while inside <script>/<style>

        def handle_starttag(self, tag, attrs):
            if tag in self._SKIP_TAGS:
                self._skip_depth += 1

        def handle_endtag(self, tag):
            if tag in self._SKIP_TAGS and self._skip_depth:
                self._skip_depth -= 1

        def handle_data(self, data):
            if not self._skip_depth:
                self._parts.append(data)

        def get_text(self):
            return " ".join(self._parts)

    s = _Stripper()
    s.feed(html)
    # close() flushes text that HTMLParser holds back at end of input (e.g. a
    # trailing "&..." that might be an incomplete character reference).
    # Without it, that tail is silently dropped.
    s.close()
    return s.get_text()

# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.get("/health")
async def health():
    """Liveness probe: report status, the current UTC timestamp, and the dedup cache size."""
    now_iso = datetime.now(timezone.utc).isoformat()
    cache_size = len(_dedup)
    return dict(status="ok", timestamp=now_iso, dedup_cache_size=cache_size)
def _secret_matches(provided: Optional[str]) -> bool:
    """Return True if ``provided`` equals WEBHOOK_SECRET, compared in constant time."""
    import hmac

    if not provided or not WEBHOOK_SECRET:
        return False
    # compare_digest does not leak, via response timing, how much of a guess
    # is correct. Compare bytes, because str inputs must be pure ASCII.
    return hmac.compare_digest(
        provided.encode("utf-8", "surrogateescape"),
        WEBHOOK_SECRET.encode("utf-8", "surrogateescape"),
    )


def _validate_payload(payload: object) -> dict:
    """Return ``payload`` if it is a JSON object with the required fields; otherwise raise a 400."""
    if not isinstance(payload, dict):
        raise HTTPException(400, detail="Invalid JSON: expected an object")
    for field in ("from", "to", "subject", "raw_email"):
        if field not in payload:
            raise HTTPException(400, detail=f"Missing: {field}")
    # subject is sliced and raw_email is parsed as text, so both must be strings.
    for field in ("subject", "raw_email"):
        if not isinstance(payload[field], str):
            raise HTTPException(400, detail=f"Invalid: {field} must be a string")
    return payload


def _compute_dedup_key(payload: dict) -> str:
    """Return the sender-supplied ``dedup_key``, or a 32-hex-char SHA-256 digest of the email's identity."""
    explicit = payload.get("dedup_key")
    if explicit:
        return str(explicit)
    message_id = payload.get("message_id", "")
    material = f"{payload['from']}|{payload['subject']}|{message_id}"
    if not message_id:
        # Without a Message-ID, sender+subject alone would collapse distinct
        # emails (e.g. a recurring "Reminder"), so fold in the raw content.
        # A retry of the same email still produces the same key.
        material += "|" + payload["raw_email"]
    return hashlib.sha256(material.encode("utf-8", "surrogatepass")).hexdigest()[:32]


@app.post("/webhook")
async def email_webhook(
    request: Request,
    x_hoffdesk_secret: Optional[str] = Header(None, alias="X-Hoffdesk-Secret"),
):
    """Receive an email push from the Cloudflare Worker.

    Expects a JSON object with ``from``, ``to``, ``subject`` and
    ``raw_email``. Optional fields are ``message_id``, ``date`` and
    ``dedup_key``.

    Responses:
      - 503: the webhook is not configured.
      - 401: the secret header is wrong.
      - 400: the body is malformed.
      - 200 ``{"status": "dedup", ...}``: the key was already seen.
      - 200 ``{"status": "ok", "action": ...}``: the email was handed to the
        pipeline. Pipeline failures are reported in the body, not as an
        HTTP error.
    """
    # --- Auth ---
    if not WEBHOOK_SECRET:
        raise HTTPException(503, detail="Webhook not configured")
    if not _secret_matches(x_hoffdesk_secret):
        ts = datetime.now(timezone.utc).isoformat()
        print(f"[{ts}] REJECTED: invalid X-Hoffdesk-Secret", file=sys.stderr)
        raise HTTPException(401, detail="Unauthorized")

    # --- Parse payload ---
    try:
        payload = await request.json()
    except Exception as e:
        raise HTTPException(400, detail=f"Invalid JSON: {e}")
    payload = _validate_payload(payload)

    ts = datetime.now(timezone.utc).isoformat()
    subj = payload["subject"][:60]
    print(f"[{ts}] ← {payload['from']} | {subj}", flush=True)

    # --- Dedup ---
    dedup_key = _compute_dedup_key(payload)
    if _is_duplicate(dedup_key):
        print(f" → Dedup skip: {dedup_key[:16]}…", flush=True)
        return JSONResponse({"status": "dedup", "dedup_key": dedup_key})

    # --- Parse email ---
    parsed = _parse_raw_email(payload["raw_email"])
    # Mark the key only after parsing succeeded, so a parse crash (500) does
    # not cause the sender's retry to be skipped as a duplicate. There is no
    # `await` between the duplicate check above and this line, so other
    # requests on the event loop cannot interleave here.
    _mark_processed(dedup_key)

    # HTML → text fallback
    body = parsed["body_text"]
    if not body.strip() and parsed["body_html"]:
        body = _html_to_text(parsed["body_html"])

    # --- Build internal email record ---
    email_data = {
        "message_id": payload.get("message_id", ""),
        "from": payload["from"],
        "to": payload["to"],
        "subject": payload["subject"],
        "date": payload.get("date", ""),
        "body_text": body,
        "body_html": parsed["body_html"],
        "has_attachments": parsed["has_attachments"],
        "attachments": parsed["attachments"],
        "dedup_key": dedup_key,
        "source": "webhook",
    }

    # --- Pipeline ---
    result = _run_pipeline(email_data)
    print(f" → {result.get('action', 'unknown')}", flush=True)
    return JSONResponse({"status": "ok", **result})

# ---------------------------------------------------------------------------
# Entry point (dev/test; production uses systemd + uvicorn)
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Dev/test convenience only. Binds to loopback, so the app is not exposed
    # on external interfaces.
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")