# email_webhook.py — 9,439 bytes, Apr 26, 2026

"""
Family Assistant Webhook — FastAPI listener on localhost:5000
Receives push emails from Cloudflare Email Worker via hook.hoffdesk.com tunnel.

Auth: X-Hoffdesk-Secret header must match WEBHOOK_SECRET in .env
"""

from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.responses import JSONResponse
import uvicorn
import os
import sys
import json
import hashlib
import time
from typing import Optional
from datetime import datetime, timezone
from email import message_from_string
from email.policy import default as default_policy

# Bootstrap icarus.core imports

# Put the parent of this file's directory at the front of sys.path so that the
# `icarus.core` imports used further down can be resolved.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# ASGI application object; the /health and /webhook routes below attach to it.
app = FastAPI(title="HoffDesk Webhook", version="1.0.0")

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

def _load_secret() -> str:
    """Load WEBHOOK_SECRET from the environment or a project-level ``.env`` file.

    Lookup order:
      1. The ``WEBHOOK_SECRET`` environment variable, if it is non-empty.
      2. The first line starting with ``WEBHOOK_SECRET=`` in the ``.env`` file
         located in the parent of this file's directory.

    Returns:
        The secret, or ``""`` when neither source provides one. A value read
        from ``.env`` has surrounding whitespace stripped; quotes, if any,
        are kept verbatim.
    """
    secret = os.environ.get("WEBHOOK_SECRET", "")
    if secret:
        return secret

    # abspath() first: a relative __file__ would otherwise collapse to "" after
    # two dirname() calls and make the lookup depend on the working directory.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    env_path = os.path.join(project_root, ".env")
    if os.path.isfile(env_path):
        with open(env_path, encoding="utf-8") as env_file:
            for line in env_file:
                if line.startswith("WEBHOOK_SECRET="):
                    return line.partition("=")[2].strip()
    return ""

# Resolved once at import time; the /webhook handler compares the request
# header against this value and answers 503 while it is empty.
WEBHOOK_SECRET = _load_secret()
if not WEBHOOK_SECRET:
    print(
        "WARN: WEBHOOK_SECRET not set — all requests will be rejected",
        file=sys.stderr,
    )

# ---------------------------------------------------------------------------
# In-memory dedup (24 h TTL, max 2048 keys)
# ---------------------------------------------------------------------------

# Maps dedup key -> time.time() at which the key was last marked processed.
_dedup: dict[str, float] = {}
_DEDUP_TTL = 86400  # 24 hours
_DEDUP_MAX = 2048  # hard cap on the number of cached keys


def _is_duplicate(key: str) -> bool:
    """Return True if *key* was marked processed within the last ``_DEDUP_TTL`` seconds.

    An entry that turns out to be older than the TTL is deleted as a side
    effect and reported as not duplicate.
    """
    seen_at = _dedup.get(key)
    if seen_at is None:
        return False
    if time.time() - seen_at > _DEDUP_TTL:
        del _dedup[key]
        return False
    return True


def _mark_processed(key: str) -> None:
    """Record *key* as processed now and keep the cache within ``_DEDUP_MAX`` keys.

    When the insert pushes the cache over the cap, entries older than the TTL
    are dropped first. If that is not enough (nothing has expired yet), the
    entries with the oldest timestamps are evicted until the cap holds, so the
    cache can never grow without bound inside the TTL window.
    """
    now = time.time()
    _dedup[key] = now
    if len(_dedup) <= _DEDUP_MAX:
        return

    # Pass 1: everything past its TTL is useless for dedup anyway.
    cutoff = now - _DEDUP_TTL
    for stale_key in [k for k, ts in _dedup.items() if ts < cutoff]:
        del _dedup[stale_key]

    # Pass 2: still over the cap -> evict by age. sorted() is stable, so on
    # equal timestamps the earlier-inserted key goes first and the key that
    # was just added is the last candidate.
    overflow = len(_dedup) - _DEDUP_MAX
    if overflow > 0:
        for oldest_key in sorted(_dedup, key=_dedup.__getitem__)[:overflow]:
            del _dedup[oldest_key]

# ---------------------------------------------------------------------------
# Email parsing
# ---------------------------------------------------------------------------

def _part_text(part) -> str:
    """Decode one MIME part to ``str`` without raising on a bad charset."""
    try:
        content = part.get_content()
    except (LookupError, UnicodeError, KeyError):
        # LookupError: unknown charset label. KeyError: no content handler for
        # this type. Fall back to a lossy UTF-8 decode of the decoded bytes.
        payload = part.get_payload(decode=True) or b""
        return payload.decode("utf-8", errors="replace")
    # Non-text parts yield bytes (or other objects); they are not body text.
    return content if isinstance(content, str) else ""


def _parse_raw_email(raw: str) -> dict:
    """Parse an RFC 5322 raw email into a structured dict.

    Args:
        raw: The complete message source (headers, blank line, body).

    Returns:
        A dict with the keys:
          * ``body_text`` – first ``text/plain`` body, capped at 50,000 chars.
          * ``body_html`` – first ``text/html`` body, capped at 50,000 chars.
          * ``attachments`` – list of ``{"filename", "content_type", "size"}``
            for every part whose Content-Disposition is ``attachment`` and
            that carries a filename; ``size`` is the decoded byte length.
          * ``has_attachments`` – ``True`` when that list is non-empty.
    """
    limit = 50000  # safety cap on each returned body, in characters
    msg = message_from_string(raw, policy=default_policy)

    body_text = ""
    body_html = ""
    attachments = []

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            # get_content_disposition() returns the lower-cased disposition
            # token only, so "Attachment" matches and an inline part whose
            # filename merely contains "attachment" does not.
            if part.get_content_disposition() == "attachment":
                filename = part.get_filename()
                if filename:
                    payload = part.get_payload(decode=True)
                    attachments.append({
                        "filename": filename,
                        "content_type": content_type,
                        "size": len(payload) if payload else 0,
                    })
            elif content_type == "text/plain" and not body_text:
                body_text = _part_text(part)
            elif content_type == "text/html" and not body_html:
                body_html = _part_text(part)
    elif msg.get_content_type() == "text/html":
        # HTML-only single-part message: keep it out of body_text so callers
        # can still run their HTML -> text conversion.
        body_html = _part_text(msg)
    else:
        body_text = _part_text(msg)

    # Fallback: if the parser produced no body at all, take everything after
    # the first blank line of the raw source (CRLF or LF line endings).
    if not body_text and not body_html:
        _, separator, remainder = raw.replace("\r\n", "\n").partition("\n\n")
        if separator:
            body_text = remainder

    return {
        "body_text": body_text[:limit],
        "body_html": body_html[:limit],
        "attachments": attachments,
        "has_attachments": len(attachments) > 0,
    }

# ---------------------------------------------------------------------------
# Pipeline bridge with ingress spooling
# ---------------------------------------------------------------------------

def _run_pipeline(email_data: dict, log_id: Optional[int] = None) -> dict:
    """Hand the parsed email to the family assistant pipeline.

    Args:
        email_data: The internal email record built by the webhook route.
        log_id: Ingress-spool row to update with processing status. A falsy
            value (``None`` or ``0``) skips all spool updates.

    Returns:
        One of:
          * ``{"action": "processed", "result": ...}`` on success,
          * ``{"action": "accepted", "note": ...}`` when the pipeline module
            has no ``process_webhook_email`` entry point yet,
          * ``{"action": "error", "error": str}`` when importing or running
            the pipeline raised.
    """
    from icarus.core.ingress_spooler import mark_processing_started, mark_processing_completed

    if log_id:
        mark_processing_started(log_id)

    try:
        from icarus.core import pipeline

        # Look the entry point up explicitly instead of catching
        # AttributeError around the call: an AttributeError raised *inside*
        # the pipeline is a real failure and must be reported as an error,
        # not as "not yet implemented".
        handler = getattr(pipeline, "process_webhook_email", None)
        if handler is not None:
            result = handler(email_data, notify=True)

            if log_id:
                mark_processing_completed(log_id, result="success")

            return {"action": "processed", "result": result}
    except Exception as e:
        # Boundary handler: the webhook must answer even if the pipeline breaks.
        print(f"  → Pipeline error: {e}", file=sys.stderr)
        if log_id:
            mark_processing_completed(log_id, result="error", error_message=str(e))
        return {"action": "error", "error": str(e)}

    # Reached only when the pipeline module imported fine but lacks the hook.
    print("  → pipeline.process_webhook_email() not yet implemented — email queued", flush=True)
    if log_id:
        mark_processing_completed(log_id, result="pending", error_message="pipeline not implemented")
    return {"action": "accepted", "note": "pipeline integration pending"}

# ---------------------------------------------------------------------------
# HTML → plain text (stdlib only, per architecture rules)
# ---------------------------------------------------------------------------

def _html_to_text(html: str) -> str:
    """Strip HTML tags using stdlib html.parser — no external deps.

    Text inside ``<script>`` and ``<style>`` elements is dropped, character
    references are decoded by the parser, and every run of whitespace is
    collapsed to a single space.

    Args:
        html: HTML markup (may be a fragment or malformed).

    Returns:
        The visible text as one whitespace-normalized string; ``""`` if the
        markup contains no text.
    """
    from html.parser import HTMLParser

    class _TextExtractor(HTMLParser):
        # Elements whose character data is code/CSS rather than readable text.
        _SKIPPED = ("script", "style")

        def __init__(self):
            super().__init__()
            self._chunks: list[str] = []
            self._skip_depth = 0

        def handle_starttag(self, tag, attrs):
            if tag in self._SKIPPED:
                self._skip_depth += 1

        def handle_endtag(self, tag):
            if tag in self._SKIPPED and self._skip_depth:
                self._skip_depth -= 1

        def handle_data(self, data):
            if not self._skip_depth:
                self._chunks.append(data)

    extractor = _TextExtractor()
    extractor.feed(html)
    # close() flushes text the parser is still holding back, e.g. a trailing
    # "R&D" that it buffers in case more of a character reference follows.
    extractor.close()
    return " ".join(" ".join(extractor._chunks).split())

# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.get("/health")
async def health():
    """Report liveness, the current UTC time and the dedup cache size."""
    now_utc = datetime.now(timezone.utc)
    report = {
        "status": "ok",
        "timestamp": now_utc.isoformat(),
        "dedup_cache_size": len(_dedup),
    }
    return report

def _compute_dedup_key(payload: dict) -> str:
    """Return the dedup key for a validated webhook payload.

    A truthy ``dedup_key`` supplied by the sender wins. Otherwise the key is
    the first 32 hex chars of SHA-256 over ``from|subject|message_id``. When
    ``message_id`` is empty, a digest of ``raw_email`` is mixed in so that two
    different emails with the same sender and subject are not collapsed.
    """
    explicit = payload.get("dedup_key")
    if explicit:
        return str(explicit)

    message_id = payload.get("message_id", "")
    basis = f"{payload['from']}|{payload['subject']}|{message_id}"
    if not message_id:
        raw_digest = hashlib.sha256(
            payload["raw_email"].encode("utf-8", errors="replace")
        ).hexdigest()
        basis = f"{basis}|{raw_digest}"
    return hashlib.sha256(basis.encode("utf-8", errors="replace")).hexdigest()[:32]


@app.post("/webhook")
async def email_webhook(
    request: Request,
    x_hoffdesk_secret: Optional[str] = Header(None, alias="X-Hoffdesk-Secret"),
):
    """Receive email push from Cloudflare Worker.

    Flow: authenticate → validate JSON → log to the ingress spool → dedup →
    parse the raw email → run the pipeline.

    Args:
        request: Incoming request; its body must be a JSON object containing
            ``from``, ``to``, ``subject`` and ``raw_email``.
        x_hoffdesk_secret: Value of the ``X-Hoffdesk-Secret`` header.

    Returns:
        ``JSONResponse`` with ``{"status": "dedup", ...}`` for a repeated key,
        otherwise ``{"status": "ok", **pipeline_result}``.

    Raises:
        HTTPException: 503 if no secret is configured, 401 on a wrong or
            missing secret, 400 on malformed JSON or invalid/missing fields.

    Notes:
        * The dedup key is marked *before* the pipeline runs, so a redelivery
          after a pipeline error is answered with ``dedup``.
        * ``_run_pipeline`` is a plain function called without ``await``; it
          runs synchronously inside this coroutine.
    """
    import hmac

    # --- Auth ---
    if not WEBHOOK_SECRET:
        raise HTTPException(503, detail="Webhook not configured")

    # Constant-time comparison on bytes: avoids leaking the secret through
    # response timing and tolerates a missing or non-ASCII header value.
    supplied = (x_hoffdesk_secret or "").encode("utf-8")
    if not hmac.compare_digest(supplied, WEBHOOK_SECRET.encode("utf-8")):
        ts = datetime.now(timezone.utc).isoformat()
        print(f"[{ts}] REJECTED: invalid X-Hoffdesk-Secret", file=sys.stderr)
        raise HTTPException(401, detail="Unauthorized")

    # --- Parse payload ---
    try:
        payload = await request.json()
    except Exception as e:
        raise HTTPException(400, detail=f"Invalid JSON: {e}") from e

    # Valid JSON may still be a list/number/string; everything below needs a dict.
    if not isinstance(payload, dict):
        raise HTTPException(400, detail="Invalid JSON: expected an object")

    for field in ("from", "to", "subject", "raw_email"):
        if field not in payload:
            raise HTTPException(400, detail=f"Missing: {field}")

    # These two are sliced / parsed as text below; reject other types with a
    # 400 instead of failing later with an unhandled TypeError.
    for field in ("subject", "raw_email"):
        if not isinstance(payload[field], str):
            raise HTTPException(400, detail=f"Invalid type: {field} must be a string")

    ts = datetime.now(timezone.utc).isoformat()
    subj = payload["subject"][:60]
    print(f"[{ts}] ← {payload['from']} | {subj}", flush=True)

    # --- Ingress Spooling: Log immediately on arrival ---
    from icarus.core.ingress_spooler import log_ingress
    log_id = log_ingress(
        raw_payload=payload,
        source="webhook"
    )
    print(f"  → Logged to ingress spool: ID {log_id}", flush=True)

    # --- Dedup ---
    dedup_key = _compute_dedup_key(payload)

    if _is_duplicate(dedup_key):
        print(f"  → Dedup skip: {dedup_key[:16]}…", flush=True)
        return JSONResponse({"status": "dedup", "dedup_key": dedup_key})

    _mark_processed(dedup_key)

    # --- Parse email ---
    parsed = _parse_raw_email(payload["raw_email"])

    # HTML → text fallback
    body = parsed["body_text"]
    if not body.strip() and parsed["body_html"]:
        body = _html_to_text(parsed["body_html"])

    # --- Build internal email record ---
    email_data = {
        "message_id": payload.get("message_id", ""),
        "from": payload["from"],
        "to": payload["to"],
        "subject": payload["subject"],
        "date": payload.get("date", ""),
        "body_text": body,
        "body_html": parsed["body_html"],
        "has_attachments": parsed["has_attachments"],
        "attachments": parsed["attachments"],
        "dedup_key": dedup_key,
        "source": "webhook",
        "ingress_log_id": log_id,
    }

    # --- Pipeline ---
    result = _run_pipeline(email_data, log_id=log_id)

    print(f"  → {result.get('action', 'unknown')}", flush=True)
    return JSONResponse({"status": "ok", **result})

# ---------------------------------------------------------------------------
# Entry point (dev/test; production uses systemd + uvicorn)
# ---------------------------------------------------------------------------

# Start a local server only when this file is executed directly; importing
# the module (e.g. by an external uvicorn process) must not start one.
if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")