"""
Family Assistant Webhook — FastAPI listener on localhost:5000

Receives push emails from Cloudflare Email Worker via hook.hoffdesk.com tunnel.
Auth: X-Hoffdesk-Secret header must match WEBHOOK_SECRET in .env
"""

import hashlib
import hmac
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from email import message_from_string
from email.policy import default as default_policy
from html.parser import HTMLParser
from typing import Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import JSONResponse

# Bootstrap family_assistant imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

app = FastAPI(title="HoffDesk Webhook", version="1.0.0")

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------


def _load_secret() -> str:
    """Load WEBHOOK_SECRET from the environment, falling back to the .env file.

    The .env file is looked up one directory above this module's directory.
    Only a line starting with ``WEBHOOK_SECRET=`` is honoured; everything
    after the first ``=`` (stripped of surrounding whitespace) is the secret.

    Returns:
        The secret, or ``""`` when it is not configured or the .env file
        cannot be read.
    """
    secret = os.environ.get("WEBHOOK_SECRET", "")
    if secret:
        return secret

    # abspath keeps this consistent with the sys.path bootstrap above even
    # when __file__ is a relative path.
    env_path = os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".env"
    )
    try:
        with open(env_path, encoding="utf-8", errors="replace") as f:
            for line in f:
                if line.startswith("WEBHOOK_SECRET="):
                    return line.strip().split("=", 1)[1].strip()
    except FileNotFoundError:
        return ""
    except OSError as e:
        # An unreadable .env must not crash the import; the service then
        # starts unconfigured and rejects every request with 503.
        print(f"WARN: could not read {env_path}: {e}", file=sys.stderr)
    return ""


WEBHOOK_SECRET = _load_secret()
if not WEBHOOK_SECRET:
    print("WARN: WEBHOOK_SECRET not set — all requests will be rejected", file=sys.stderr)


def _secret_matches(candidate: Optional[str]) -> bool:
    """Return True when *candidate* equals WEBHOOK_SECRET (constant-time)."""
    # Compare as bytes: compare_digest() rejects non-ASCII str arguments, and
    # header values may contain arbitrary characters.
    return hmac.compare_digest(
        (candidate or "").encode("utf-8", errors="replace"),
        WEBHOOK_SECRET.encode("utf-8", errors="replace"),
    )


# ---------------------------------------------------------------------------
# In-memory dedup (24 h TTL, max 2048 keys)
# ---------------------------------------------------------------------------

_dedup: dict[str, float] = {}
_DEDUP_TTL = 86400  # 24 hours
_DEDUP_MAX = 2048


def _is_duplicate(key: str) -> bool:
    """Return True if *key* was marked processed within the last _DEDUP_TTL seconds.

    An expired entry is removed as a side effect and reported as not duplicate.
    """
    ts = _dedup.get(key)
    if ts is None:
        return False
    if time.time() - ts > _DEDUP_TTL:
        del _dedup[key]
        return False
    return True


def _mark_processed(key: str) -> None:
    """Record *key* as processed now and keep the cache within _DEDUP_MAX keys.

    When the cache overflows, expired entries are dropped first; if it is
    still too large, the oldest remaining entries are evicted so the cache
    can never grow without bound.
    """
    now = time.time()
    _dedup[key] = now
    if len(_dedup) <= _DEDUP_MAX:
        return

    cutoff = now - _DEDUP_TTL
    for stale in [k for k, seen in _dedup.items() if seen < cutoff]:
        del _dedup[stale]

    overflow = len(_dedup) - _DEDUP_MAX
    if overflow > 0:
        # Sort by timestamp rather than relying on insertion order: a key
        # that is re-marked keeps its original position in the dict.
        for oldest in sorted(_dedup, key=_dedup.get)[:overflow]:
            del _dedup[oldest]


# ---------------------------------------------------------------------------
# Email parsing
# ---------------------------------------------------------------------------

_MAX_BODY_CHARS = 50000  # 50K char safety limit
_BLANK_LINE_RE = re.compile(r"\r?\n\r?\n")


def _part_text(part) -> str:
    """Return the content of a message part as ``str``, never raising on bad charsets."""
    try:
        content = part.get_content()
    except (LookupError, ValueError):
        # Unknown/invalid charset or no content handler: decode leniently.
        raw_bytes = part.get_payload(decode=True) or b""
        return raw_bytes.decode("utf-8", errors="replace")
    if isinstance(content, bytes):
        return content.decode("utf-8", errors="replace")
    return content if isinstance(content, str) else ""


def _parse_raw_email(raw: str) -> dict:
    """Parse RFC 5322 raw email into structured dict.

    Args:
        raw: The complete raw message (headers + body) as text.

    Returns:
        A dict with keys ``body_text`` and ``body_html`` (both ``str``,
        truncated to 50,000 characters), ``attachments`` (list of dicts with
        ``filename``, ``content_type``, ``size``) and ``has_attachments``.
    """
    msg = message_from_string(raw, policy=default_policy)
    body_text = ""
    body_html = ""
    attachments = []

    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            # Use the parsed disposition rather than a substring test, so
            # e.g. `inline; filename="attachment.txt"` is not misclassified.
            if part.get_content_disposition() == "attachment":
                fn = part.get_filename()
                if fn:
                    payload = part.get_payload(decode=True)
                    attachments.append({
                        "filename": fn,
                        "content_type": ct,
                        "size": len(payload) if payload else 0,
                    })
            elif ct == "text/plain" and not body_text:
                body_text = _part_text(part)
            elif ct == "text/html" and not body_html:
                body_html = _part_text(part)
    elif msg.get_content_type() == "text/html":
        # HTML-only message: keep markup out of body_text so callers can
        # apply their HTML → text conversion.
        body_html = _part_text(msg)
    else:
        body_text = _part_text(msg)

    # Fallback: extract body from raw if parser returned empty
    if not body_text and not body_html:
        # Simple heuristic: everything after first blank line (LF or CRLF)
        pieces = _BLANK_LINE_RE.split(raw, maxsplit=1)
        if len(pieces) == 2:
            body_text = pieces[1]

    return {
        "body_text": body_text[:_MAX_BODY_CHARS],
        "body_html": body_html[:_MAX_BODY_CHARS],
        "attachments": attachments,
        "has_attachments": len(attachments) > 0,
    }


# ---------------------------------------------------------------------------
# Pipeline bridge
# ---------------------------------------------------------------------------


def _run_pipeline(email_data: dict) -> dict:
    """
    Hand the parsed email to the family assistant pipeline.
    Falls back gracefully if pipeline is unavailable.

    Returns:
        ``{"action": "processed", "result": ...}`` on success,
        ``{"action": "accepted", ...}`` when the pipeline does not yet expose
        ``process_webhook_email``, or ``{"action": "error", "error": ...}``
        when importing or running the pipeline raises. Never raises.
    """
    try:
        from family_assistant import pipeline
    except Exception as e:
        print(f" → Pipeline error: {e}", file=sys.stderr)
        # Return success to prevent Cloudflare retry storm
        return {"action": "error", "error": str(e)}

    # Look the entry point up explicitly so that an AttributeError raised
    # *inside* the pipeline is reported as an error, not as "not implemented".
    handler = getattr(pipeline, "process_webhook_email", None)
    if handler is None:
        # process_webhook_email not yet implemented — accept & queue
        print(" → pipeline.process_webhook_email() not yet implemented — email queued", flush=True)
        return {"action": "accepted", "note": "pipeline integration pending"}

    try:
        result = handler(email_data, notify=True)
    except Exception as e:
        print(f" → Pipeline error: {e}", file=sys.stderr)
        # Return success to prevent Cloudflare retry storm
        return {"action": "error", "error": str(e)}
    return {"action": "processed", "result": result}


# ---------------------------------------------------------------------------
# HTML → plain text (stdlib only, per architecture rules)
# ---------------------------------------------------------------------------


class _Stripper(HTMLParser):
    """Collect the text nodes of an HTML document, skipping script/style content."""

    _SKIPPED_TAGS = ("script", "style")

    def __init__(self):
        super().__init__()
        self._parts: list[str] = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIPPED_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self._SKIPPED_TAGS and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth:
            self._parts.append(data)

    def get_text(self) -> str:
        return " ".join(self._parts)


def _html_to_text(html: str) -> str:
    """Strip HTML tags using stdlib html.parser — no external deps.

    Text nodes are joined with single spaces; the contents of ``<script>``
    and ``<style>`` elements are omitted.
    """
    s = _Stripper()
    s.feed(html)
    # close() flushes text the parser is still buffering (e.g. a trailing
    # unterminated character reference), which feed() alone may hold back.
    s.close()
    return s.get_text()


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------


@app.get("/health")
async def health():
    """Liveness probe: status, current UTC timestamp and dedup cache size."""
    return {
        "status": "ok",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "dedup_cache_size": len(_dedup),
    }


def _compute_dedup_key(payload: dict, subject: str) -> str:
    """Return the caller-supplied dedup key, or derive one from the message."""
    supplied = payload.get("dedup_key")
    if supplied:
        return str(supplied)

    message_id = payload.get("message_id") or ""
    basis = f"{payload['from']}|{subject}|{message_id}"
    if not message_id:
        # Without a Message-ID, sender + subject alone would collapse distinct
        # emails into one key; add a digest of the raw message to tell them
        # apart while still matching byte-identical redeliveries.
        raw_digest = hashlib.sha256(
            payload["raw_email"].encode("utf-8", errors="replace")
        ).hexdigest()
        basis = f"{basis}|{raw_digest}"
    return hashlib.sha256(basis.encode("utf-8", errors="replace")).hexdigest()[:32]


@app.post("/webhook")
async def email_webhook(
    request: Request,
    x_hoffdesk_secret: Optional[str] = Header(None, alias="X-Hoffdesk-Secret"),
):
    """Receive email push from Cloudflare Worker.

    Raises:
        HTTPException: 503 when no secret is configured, 401 on a wrong or
            missing X-Hoffdesk-Secret header, 400 on invalid JSON, a
            non-object payload, a missing required field or a non-string
            ``raw_email``.
    """
    # --- Auth ---
    if not WEBHOOK_SECRET:
        raise HTTPException(503, detail="Webhook not configured")
    if not _secret_matches(x_hoffdesk_secret):
        ts = datetime.now(timezone.utc).isoformat()
        print(f"[{ts}] REJECTED: invalid X-Hoffdesk-Secret", file=sys.stderr)
        raise HTTPException(401, detail="Unauthorized")

    # --- Parse payload ---
    try:
        payload = await request.json()
    except Exception as e:
        raise HTTPException(400, detail=f"Invalid JSON: {e}")

    if not isinstance(payload, dict):
        raise HTTPException(400, detail="Invalid JSON: expected an object")

    for field in ("from", "to", "subject", "raw_email"):
        if field not in payload:
            raise HTTPException(400, detail=f"Missing: {field}")
    if not isinstance(payload["raw_email"], str):
        raise HTTPException(400, detail="Invalid: raw_email must be a string")

    # A null or non-string subject must not crash the slicing/logging below.
    subject = payload["subject"]
    if subject is None:
        subject = ""
    elif not isinstance(subject, str):
        subject = str(subject)

    ts = datetime.now(timezone.utc).isoformat()
    subj = subject[:60]
    print(f"[{ts}] ← {payload['from']} | {subj}", flush=True)

    # --- Dedup ---
    dedup_key = _compute_dedup_key(payload, subject)

    if _is_duplicate(dedup_key):
        print(f" → Dedup skip: {dedup_key[:16]}…", flush=True)
        return JSONResponse({"status": "dedup", "dedup_key": dedup_key})

    # Marked before the pipeline runs, so a redelivery of the same message is
    # skipped even if the pipeline reports an error for it.
    _mark_processed(dedup_key)

    # --- Parse email ---
    parsed = _parse_raw_email(payload["raw_email"])

    # HTML → text fallback
    body = parsed["body_text"]
    if not body.strip() and parsed["body_html"]:
        body = _html_to_text(parsed["body_html"])

    # --- Build internal email record ---
    email_data = {
        "message_id": payload.get("message_id", ""),
        "from": payload["from"],
        "to": payload["to"],
        "subject": subject,
        "date": payload.get("date", ""),
        "body_text": body,
        "body_html": parsed["body_html"],
        "has_attachments": parsed["has_attachments"],
        "attachments": parsed["attachments"],
        "dedup_key": dedup_key,
        "source": "webhook",
    }

    # --- Pipeline ---
    # NOTE: _run_pipeline is a synchronous call made directly from this
    # coroutine.
    result = _run_pipeline(email_data)

    print(f" → {result.get('action', 'unknown')}", flush=True)
    return JSONResponse({"status": "ok", **result})


# ---------------------------------------------------------------------------
# Entry point (dev/test; production uses systemd + uvicorn)
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")