"""Public webhook receiver for Cloudflare Email Worker.
Accepts Cloudflare Worker format, translates to v2 pipeline, returns simple JSON.
No auth required — Cloudflare Worker handles auth via WEBHOOK_SECRET header.
"""
import os
import sys
import logging
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Request, HTTPException, Header
from fastapi.responses import JSONResponse
Add parent to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(file))))
from shared.llm import LLMClient
from shared.notify import TelegramNotifier
from family.calendar import CalendarClient
from family.pipeline import FamilyPipeline
logger = logging.getLogger(name)
Public webhook router (no auth)
webhook_router = APIRouter(tags=["webhook"])
Initialize services (shared with family router)
llm_client = LLMClient()
telegram = TelegramNotifier()
calendar = CalendarClient()
pipeline = FamilyPipeline(llm_client, calendar, telegram)
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "")
def _extract_body_from_raw(raw_email: str) -> str:
"""Extract plain text body from raw RFC 5322 email."""
# Simple extraction: find first blank line, everything after is body
parts = raw_email.split("\n\n", 1)
if len(parts) > 1:
body = parts[1]
# Strip HTML tags if present
if "<html" in body.lower() or "<!doctype" in body.lower():
return _html_to_text(body)
return body[:2000] # Reasonable limit
return raw_email[:2000]
def _html_to_text(html: str) -> str:
"""Strip HTML tags using stdlib html.parser."""
from html.parser import HTMLParser
class _Stripper(HTMLParser):
def __init__(self):
super().__init__()
self._parts = []
def handle_data(self, data):
self._parts.append(data)
def get_text(self):
return " ".join(self._parts)
s = _Stripper()
try:
s.feed(html)
return s.get_text()
except:
return html
@webhook_router.post("/webhook")
async def cloudflare_webhook(
request: Request,
x_hoffdesk_secret: Optional[str] = Header(None, alias="X-Hoffdesk-Secret")
):
"""Receive email from Cloudflare Email Worker.
Expected payload (from Cloudflare Worker):
{
"from": "sender@example.com",
"to": "assistant@hoffdesk.com",
"subject": "Email subject",
"date": "...",
"message_id": "...",
"raw_email": "Full RFC 5322 email..."
}
"""
# Auth check
if not WEBHOOK_SECRET:
raise HTTPException(503, detail="Webhook not configured")
if x_hoffdesk_secret != WEBHOOK_SECRET:
logger.warning("Rejected: invalid X-Hoffdesk-Secret")
raise HTTPException(401, detail="Unauthorized")
# Parse payload
try:
payload = await request.json()
except Exception as e:
raise HTTPException(400, detail=f"Invalid JSON: {e}")
# Validate required fields
for field in ("from", "to", "subject", "raw_email"):
if field not in payload:
raise HTTPException(400, detail=f"Missing: {field}")
# Extract body from raw email
body = _extract_body_from_raw(payload["raw_email"])
logger.info(f"Webhook received: {payload['subject'][:50]} from {payload['from']}")
# Process through v2 pipeline
try:
result = await pipeline.process_email(
subject=payload["subject"],
body=body,
sender=payload["from"],
received_at=payload.get("date", datetime.now().isoformat())
)
return JSONResponse({
"status": "ok",
"classification": result.get("classification"),
"event_created": result.get("event_created", False),
"events_created": result.get("events_created", []),
"notification_sent": result.get("notification_sent", False),
"handler": result.get("handler"),
"summary": result.get("summary", "")[:200],
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.exception("Pipeline failed")
# Return success to prevent Cloudflare retry storm
return JSONResponse({
"status": "error",
"error": str(e)[:200],
"timestamp": datetime.now().isoformat()
}, status_code=200)
@webhook_router.get("/health")
async def webhook_health():
"""Health check for webhook endpoint."""
return {
"status": "ok",
"webhook_ready": bool(WEBHOOK_SECRET),
"timestamp": datetime.now().isoformat()
}
@webhook_router.post("/telegram/callback")
async def telegram_callback(request: Request):
"""Handle Telegram callback queries (inline button clicks).
Processes:
- remove_event:{uid} → Delete calendar event
"""
try:
payload = await request.json()
except Exception as e:
raise HTTPException(400, detail=f"Invalid JSON: {e}")
# Handle callback query
callback_query = payload.get("callback_query")
if not callback_query:
return JSONResponse({"ok": True}) # Not a callback, ignore
callback_data = callback_query.get("data", "")
chat_id = callback_query.get("message", {}).get("chat", {}).get("id")
message_id = callback_query.get("message", {}).get("message_id")
logger.info(f"Telegram callback: {callback_data} from chat {chat_id}")
# Process remove_event callback
if callback_data.startswith("remove_event:"):
event_uid = callback_data.split(":", 1)[1]
try:
from family.calendar import CalendarClient
from family.removal_tracker import RemovalTracker
calendar = CalendarClient()
tracker = RemovalTracker()
# Delete the event
delete_result = await calendar.delete_event(event_uid)
# Log the removal for dashboard
if delete_result.get("deleted"):
# Extract event info from the callback message if possible
message_text = callback_query.get("message", {}).get("text", "")
summary = "Unknown"
start_time = ""
# Try to parse summary from message
for line in message_text.split("\n"):
if line.startswith("•") or (" at " in line and ":" in line):
summary = line.strip("• ").strip()
break
await tracker.log_removal(
uid=event_uid,
summary=summary,
start_time=start_time,
removed_by=f"telegram_user_{chat_id}",
source="newsletter",
reason="User clicked REMOVE button"
)
# Answer the callback (removes loading state)
await _answer_callback_query(callback_query.get("id"),
"Event removed from calendar" if delete_result.get("deleted") else "Failed to remove event")
# Update the message to show it's been removed
if delete_result.get("deleted"):
await _edit_message_text(chat_id, message_id,
callback_query["message"]["text"] + "\n\n❌ <b>REMOVED</b>")
return JSONResponse({"ok": True})
except Exception as e:
logger.error(f"Failed to remove event {event_uid}: {e}")
await _answer_callback_query(callback_query.get("id"), "Error removing event")
return JSONResponse({"ok": False, "error": str(e)})
return JSONResponse({"ok": True})
async def _answer_callback_query(callback_query_id: str, text: str) -> None:
"""Answer a Telegram callback query."""
import httpx
import os
bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
if not bot_token or not callback_query_id:
return
async with httpx.AsyncClient() as client:
await client.post(
f"https://api.telegram.org/bot{bot_token}/answerCallbackQuery",
json={"callback_query_id": callback_query_id, "text": text}
)
async def _edit_message_text(chat_id: int, message_id: int, text: str) -> None:
"""Edit a Telegram message."""
import httpx
import os
bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
if not bot_token or not chat_id or not message_id:
return
async with httpx.AsyncClient() as client:
await client.post(
f"https://api.telegram.org/bot{bot_token}/editMessageText",
json={
"chat_id": chat_id,
"message_id": message_id,
"text": text,
"parse_mode": "HTML"
}
)
@webhook_router.get("/family/events/removed")
async def get_removed_events(
hours: int = 24,
limit: int = 20,
x_hoffdesk_secret: Optional[str] = Header(None, alias="X-Hoffdesk-Secret")
):
"""Get recently removed calendar events.
Query params:
hours: How far back to look (default 24)
limit: Max results (default 20)
Returns:
List of removal records with metadata
"""
# Auth check
if not WEBHOOK_SECRET:
raise HTTPException(503, detail="Not configured")
if x_hoffdesk_secret != WEBHOOK_SECRET:
raise HTTPException(401, detail="Unauthorized")
from family.removal_tracker import RemovalTracker
tracker = RemovalTracker()
removals = tracker.get_recent_removals(hours=hours, limit=limit)
return {
"removals": removals,
"count": len(removals),
"hours": hours,
"timestamp": datetime.now().isoformat()
}