"""Public webhook receiver for the Cloudflare Email Worker.

Accepts the Cloudflare Worker payload format, translates it into a call to
the v2 family pipeline, and returns a small JSON summary.

The router has no router-level auth dependency; each endpoint that needs
protection checks a shared-secret header itself:

* ``/webhook`` and ``/family/events/removed`` require ``X-Hoffdesk-Secret``
  to match ``WEBHOOK_SECRET`` (503 if that env var is unset).
* ``/telegram/callback`` checks Telegram's ``X-Telegram-Bot-Api-Secret-Token``
  header only when ``TELEGRAM_WEBHOOK_SECRET`` is set; otherwise it accepts
  any caller.
"""
import email
import hmac
import logging
import os
import sys
from datetime import datetime
from email import policy as email_policy
from html import escape as html_escape
from typing import Optional

from fastapi import APIRouter, Request, HTTPException, Header
from fastapi.responses import JSONResponse

# Add parent to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from shared.llm import LLMClient  # noqa: E402
from shared.notify import TelegramNotifier  # noqa: E402
from family.calendar import CalendarClient  # noqa: E402
from family.pipeline import FamilyPipeline  # noqa: E402

logger = logging.getLogger(__name__)

# Public webhook router (no router-level auth; endpoints check secrets themselves)
webhook_router = APIRouter(tags=["webhook"])

# Initialize services (shared with family router)
llm_client = LLMClient()
telegram = TelegramNotifier()
calendar = CalendarClient()
pipeline = FamilyPipeline(llm_client, calendar, telegram)

WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "")
# Optional: the `secret_token` registered with Telegram's setWebhook. When
# empty, /telegram/callback performs no authentication at all.
TELEGRAM_WEBHOOK_SECRET = os.getenv("TELEGRAM_WEBHOOK_SECRET", "")
# Seconds before an outbound Telegram Bot API call is abandoned.
TELEGRAM_API_TIMEOUT = 10.0


def _secret_matches(provided: Optional[str], expected: str) -> bool:
    """Return True if *provided* equals *expected*, compared in constant time.

    Values are encoded to UTF-8 because ``hmac.compare_digest`` raises
    ``TypeError`` for non-ASCII ``str`` arguments.
    """
    if provided is None:
        return False
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


def _extract_body_from_raw(raw_email: str) -> str:
    """Extract a plain-text body from a raw RFC 5322 email.

    The stdlib ``email`` parser is tried first: it handles CRLF line endings,
    multipart messages and transfer encodings (base64, quoted-printable).
    A ``text/plain`` part is preferred; a ``text/html`` part is used with its
    tags stripped. If that yields nothing (or parsing fails), everything after
    the first blank line is used instead, HTML-stripped if it looks like markup.
    """
    if not raw_email:
        return ""

    try:
        msg = email.message_from_string(raw_email, policy=email_policy.default)
        part = msg.get_body(preferencelist=("plain", "html"))
        if part is not None:
            content = part.get_content()
            if isinstance(content, str):
                if part.get_content_subtype() == "html":
                    content = _strip_html(content)
                return content.strip()
    except Exception:
        # Malformed MIME / unknown charset: fall back to the naive split below.
        logger.debug("MIME parsing failed; using header/body split", exc_info=True)

    # Normalise CRLF so the RFC 5322 header/body separator ("\r\n\r\n") is found.
    normalized = raw_email.replace("\r\n", "\n")
    _, separator, body = normalized.partition("\n\n")
    if not separator:
        # No header/body boundary: treat the whole input as the body.
        body = normalized
    # Strip HTML tags if present
    if "<" in body and ">" in body:
        body = _strip_html(body)
    return body.strip()


def _strip_html(html: str) -> str:
    """Strip HTML tags using stdlib html.parser.

    Character references (``&amp;`` etc.) are decoded by the parser; the
    contents of ``<script>`` and ``<style>`` elements are dropped. Text chunks
    are joined with single spaces. If the parser raises, *html* is returned
    unchanged.
    """
    from html.parser import HTMLParser

    class _Stripper(HTMLParser):
        _SKIPPED_TAGS = frozenset({"script", "style"})

        def __init__(self):
            super().__init__()
            self._parts = []
            self._skip_depth = 0  # >0 while inside <script>/<style>

        def handle_starttag(self, tag, attrs):
            if tag in self._SKIPPED_TAGS:
                self._skip_depth += 1

        def handle_endtag(self, tag):
            if tag in self._SKIPPED_TAGS and self._skip_depth:
                self._skip_depth -= 1

        def handle_data(self, data):
            if not self._skip_depth:
                self._parts.append(data)

        def get_text(self):
            return " ".join(self._parts)

    stripper = _Stripper()
    try:
        stripper.feed(html)
        stripper.close()  # flush text the parser may still be buffering
    except Exception:
        logger.debug("HTML stripping failed; returning input unchanged", exc_info=True)
        return html
    return stripper.get_text()


@webhook_router.post("/webhook")
async def cloudflare_webhook(
    request: Request,
    x_hoffdesk_secret: Optional[str] = Header(None, alias="X-Hoffdesk-Secret"),
):
    """Receive email from Cloudflare Email Worker.

    Expected payload (from Cloudflare Worker):
    {
        "from": "sender@example.com",
        "to": "assistant@hoffdesk.com",
        "subject": "Email subject",
        "date": "...",
        "message_id": "...",
        "raw_email": "Full RFC 5322 email..."
    }

    Returns:
        JSON with ``status: "ok"`` and a summary of the pipeline result.
        Pipeline failures are reported as ``status: "error"`` with HTTP 200
        so the Worker does not retry.

    Raises:
        HTTPException: 503 if WEBHOOK_SECRET is unset; 401 on a wrong secret;
            400 on invalid JSON or missing/invalid fields.
    """
    # Auth check
    if not WEBHOOK_SECRET:
        raise HTTPException(503, detail="Webhook not configured")
    if not _secret_matches(x_hoffdesk_secret, WEBHOOK_SECRET):
        logger.warning("Rejected: invalid X-Hoffdesk-Secret")
        raise HTTPException(401, detail="Unauthorized")

    # Parse payload
    try:
        payload = await request.json()
    except Exception as e:
        raise HTTPException(400, detail=f"Invalid JSON: {e}") from e
    if not isinstance(payload, dict):
        raise HTTPException(400, detail="Invalid JSON: expected an object")

    # Validate required fields
    for field in ("from", "to", "subject", "raw_email"):
        if field not in payload:
            raise HTTPException(400, detail=f"Missing: {field}")
    # These two are sliced / parsed below, so they must be strings.
    for field in ("subject", "raw_email"):
        if not isinstance(payload[field], str):
            raise HTTPException(400, detail=f"Invalid: {field} must be a string")

    subject = payload["subject"]
    sender = payload["from"]
    body = _extract_body_from_raw(payload["raw_email"])

    logger.info("Webhook received: %s from %s", subject[:50], sender)

    # Process through v2 pipeline
    try:
        result = await pipeline.process_email(
            subject=subject,
            body=body,
            sender=sender,
            # `or` rather than a .get() default so an explicit null also falls back.
            received_at=payload.get("date") or datetime.now().isoformat(),
        )
        return JSONResponse({
            "status": "ok",
            "classification": result.get("classification"),
            "event_created": result.get("event_created", False),
            "events_created": result.get("events_created", []),
            "notification_sent": result.get("notification_sent", False),
            "handler": result.get("handler"),
            # Guard against summary=None so a successful run isn't reported as an error.
            "summary": (result.get("summary") or "")[:200],
            "timestamp": datetime.now().isoformat(),
        })
    except Exception as e:
        logger.exception("Pipeline failed")
        # Return success to prevent Cloudflare retry storm
        return JSONResponse({
            "status": "error",
            "error": str(e)[:200],
            "timestamp": datetime.now().isoformat(),
        }, status_code=200)


@webhook_router.get("/health")
async def webhook_health():
    """Health check for webhook endpoint; reports whether WEBHOOK_SECRET is set."""
    return {
        "status": "ok",
        "webhook_ready": bool(WEBHOOK_SECRET),
        "timestamp": datetime.now().isoformat(),
    }


def _summary_from_message(message_text: str) -> str:
    """Best-effort extraction of an event summary from a notification's text.

    Returns the first line that starts with "•" or contains both " at " and
    ":", with surrounding "•"/space characters removed; "Unknown" otherwise.
    """
    for line in message_text.split("\n"):
        if line.startswith("•") or (" at " in line and ":" in line):
            return line.strip("• ").strip()
    return "Unknown"


@webhook_router.post("/telegram/callback")
async def telegram_callback(
    request: Request,
    x_telegram_secret: Optional[str] = Header(
        None, alias="X-Telegram-Bot-Api-Secret-Token"
    ),
):
    """Handle Telegram callback queries (inline button clicks).

    Processes:
    - remove_event:{uid} → Delete calendar event

    When TELEGRAM_WEBHOOK_SECRET is set, the request must carry the matching
    ``X-Telegram-Bot-Api-Secret-Token`` header (401 otherwise). Updates that
    are not callback queries, or carry other callback data, are acknowledged
    with ``{"ok": true}`` and ignored.
    """
    if TELEGRAM_WEBHOOK_SECRET and not _secret_matches(
        x_telegram_secret, TELEGRAM_WEBHOOK_SECRET
    ):
        logger.warning("Rejected Telegram callback: invalid secret token")
        raise HTTPException(401, detail="Unauthorized")

    try:
        payload = await request.json()
    except Exception as e:
        raise HTTPException(400, detail=f"Invalid JSON: {e}") from e

    # Handle callback query
    callback_query = payload.get("callback_query") if isinstance(payload, dict) else None
    if not callback_query:
        return JSONResponse({"ok": True})  # Not a callback, ignore

    callback_id = callback_query.get("id")
    callback_data = callback_query.get("data") or ""
    message = callback_query.get("message") or {}
    message_text = message.get("text") or ""
    chat_id = (message.get("chat") or {}).get("id")
    message_id = message.get("message_id")

    logger.info("Telegram callback: %s from chat %s", callback_data, chat_id)

    if not callback_data.startswith("remove_event:"):
        return JSONResponse({"ok": True})

    event_uid = callback_data.split(":", 1)[1]

    # Only the deletion itself decides success; UI updates and tracking below
    # are best-effort so a failure there never misreports a completed delete.
    try:
        delete_result = await calendar.delete_event(event_uid)
    except Exception as e:
        logger.exception("Failed to remove event %s", event_uid)
        await _answer_callback_query(callback_id, "Error removing event")
        return JSONResponse({"ok": False, "error": str(e)})

    deleted = bool(delete_result.get("deleted"))

    # Log the removal for dashboard
    if deleted:
        try:
            from family.removal_tracker import RemovalTracker

            await RemovalTracker().log_removal(
                uid=event_uid,
                summary=_summary_from_message(message_text),
                start_time="",
                removed_by=f"telegram_user_{chat_id}",
                source="newsletter",
                reason="User clicked REMOVE button",
            )
        except Exception:
            logger.exception("Event %s removed but removal tracking failed", event_uid)

    # Answer the callback (removes loading state)
    await _answer_callback_query(
        callback_id,
        "Event removed from calendar" if deleted else "Failed to remove event",
    )

    # Update the message to show it's been removed. The original text is
    # plain text, so escape it before sending with parse_mode=HTML.
    if deleted:
        await _edit_message_text(
            chat_id, message_id, html_escape(message_text, quote=False) + "\n\n❌ REMOVED"
        )

    return JSONResponse({"ok": True})


async def _telegram_api_call(method: str, body: dict) -> None:
    """POST *body* to Telegram Bot API *method*; best-effort, never raises on HTTP errors.

    Does nothing when TELEGRAM_BOT_TOKEN is unset. Failures are logged by
    exception type / status code only, so the bot token (part of the URL)
    never reaches the logs.
    """
    import httpx

    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        return
    try:
        async with httpx.AsyncClient(timeout=TELEGRAM_API_TIMEOUT) as client:
            response = await client.post(
                f"https://api.telegram.org/bot{bot_token}/{method}", json=body
            )
    except httpx.HTTPError as exc:
        logger.warning("Telegram %s request failed: %s", method, type(exc).__name__)
        return
    if response.status_code >= 400:
        logger.warning(
            "Telegram %s failed: HTTP %s %s",
            method, response.status_code, response.text[:200],
        )


async def _answer_callback_query(callback_query_id: Optional[str], text: str) -> None:
    """Answer a Telegram callback query (no-op without an id or bot token)."""
    if not callback_query_id:
        return
    await _telegram_api_call(
        "answerCallbackQuery",
        {"callback_query_id": callback_query_id, "text": text},
    )


async def _edit_message_text(chat_id: Optional[int], message_id: Optional[int], text: str) -> None:
    """Edit a Telegram message; *text* is sent with parse_mode=HTML and must be escaped."""
    if not chat_id or not message_id:
        return
    await _telegram_api_call(
        "editMessageText",
        {
            "chat_id": chat_id,
            "message_id": message_id,
            "text": text,
            "parse_mode": "HTML",
        },
    )


@webhook_router.get("/family/events/removed")
async def get_removed_events(
    hours: int = 24,
    limit: int = 20,
    x_hoffdesk_secret: Optional[str] = Header(None, alias="X-Hoffdesk-Secret"),
):
    """Get recently removed calendar events.

    Query params:
        hours: How far back to look (default 24)
        limit: Max results (default 20)

    Returns:
        List of removal records with metadata

    Raises:
        HTTPException: 503 if WEBHOOK_SECRET is unset; 401 on a wrong secret.
    """
    # Auth check
    if not WEBHOOK_SECRET:
        raise HTTPException(503, detail="Not configured")
    if not _secret_matches(x_hoffdesk_secret, WEBHOOK_SECRET):
        raise HTTPException(401, detail="Unauthorized")

    from family.removal_tracker import RemovalTracker

    removals = RemovalTracker().get_recent_removals(hours=hours, limit=limit)

    return {
        "removals": removals,
        "count": len(removals),
        "hours": hours,
        "timestamp": datetime.now().isoformat(),
    }