"""Family automation pipeline v2 — Classification + Routing.""" import logging from typing import Dict, Any from datetime import datetime from shared.llm import LLMClient from shared.notify import TelegramNotifier from family.calendar import CalendarClient from family.classifier import EmailClassifier from family.handlers.appointment import AppointmentHandler from family.handlers.newsletter import NewsletterHandler from family.handlers.family import FamilyHandler from family.handlers.other import OtherHandler logger = logging.getLogger(__name__) class FamilyPipeline: """Orchestrate email → classify → route → process → notify.""" def __init__( self, llm_client: LLMClient, calendar_client: CalendarClient, telegram: TelegramNotifier ): self.llm = llm_client self.calendar = calendar_client self.telegram = telegram self.classifier = EmailClassifier(llm_client) # Initialize handlers self.handlers = { "appointment": AppointmentHandler(llm_client, calendar_client, telegram), "newsletter": NewsletterHandler(llm_client, calendar_client, telegram), "family": FamilyHandler(llm_client, calendar_client, telegram), "other": OtherHandler(telegram), } async def process_email( self, subject: str, body: str, sender: str, received_at: str ) -> Dict[str, Any]: """Process email through classification + routing pipeline. Steps: 1. Classify email type 2. Route to appropriate handler 3. Process with handler 4. Log results Returns: Dict with classification, handler results """ result = { "classification": None, "handler": None, "processed": False, "errors": [] } # Step 1: Classify try: email_type = await self.classifier.classify(subject, body, sender) result["classification"] = email_type logger.info(f"Email classified as '{email_type}': {subject[:50]}...") except Exception as e: logger.error(f"Classification failed: {e}") result["errors"].append(f"Classification failed: {e}") email_type = "other" # Fallback result["classification"] = email_type # Step 2: Route to handler handler = self.handlers.get(email_type) if not handler: logger.warning(f"No handler for '{email_type}', using 'other'") handler = self.handlers["other"] result["classification"] = f"{email_type}→other" result["handler"] = email_type # Step 3: Process try: handler_result = await handler.process(subject, body, sender, received_at) result.update(handler_result) result["processed"] = True logger.info(f"Handler '{email_type}' complete: {subject[:50]}...") except Exception as e: logger.error(f"Handler '{email_type}' failed: {e}") result["errors"].append(f"Handler failed: {e}") # Fallback: send basic notification try: await self.telegram.to_family( f"📧 Email Processing Error\n\n" f"Type: {email_type}\n" f"Subject: {subject[:100]}\n" f"Error: {str(e)[:200]}" ) except: pass return result async def health(self) -> Dict[str, Any]: """Check all pipeline components.""" return { "llm": await self.llm.health(), "calendar": await self.calendar.health(), "telegram": bool(self.telegram.client), "classifier": True, # No external deps "handlers": list(self.handlers.keys()) }