"""Family automation pipeline v2 β Classification + Routing."""
import logging
from typing import Dict, Any
from datetime import datetime
from shared.llm import LLMClient
from shared.notify import TelegramNotifier
from family.calendar import CalendarClient
from family.classifier import EmailClassifier
from family.handlers.appointment import AppointmentHandler
from family.handlers.newsletter import NewsletterHandler
from family.handlers.family import FamilyHandler
from family.handlers.other import OtherHandler
logger = logging.getLogger(name)
class FamilyPipeline:
"""Orchestrate email β classify β route β process β notify."""
def __init__(
self,
llm_client: LLMClient,
calendar_client: CalendarClient,
telegram: TelegramNotifier
):
self.llm = llm_client
self.calendar = calendar_client
self.telegram = telegram
self.classifier = EmailClassifier(llm_client)
# Initialize handlers
self.handlers = {
"appointment": AppointmentHandler(llm_client, calendar_client, telegram),
"newsletter": NewsletterHandler(llm_client, calendar_client, telegram),
"family": FamilyHandler(llm_client, calendar_client, telegram),
"other": OtherHandler(telegram),
}
async def process_email(
self,
subject: str,
body: str,
sender: str,
received_at: str
) -> Dict[str, Any]:
"""Process email through classification + routing pipeline.
Steps:
1. Classify email type
2. Route to appropriate handler
3. Process with handler
4. Log results
Returns:
Dict with classification, handler results
"""
result = {
"classification": None,
"handler": None,
"processed": False,
"errors": []
}
# Step 1: Classify
try:
email_type = await self.classifier.classify(subject, body, sender)
result["classification"] = email_type
logger.info(f"Email classified as '{email_type}': {subject[:50]}...")
except Exception as e:
logger.error(f"Classification failed: {e}")
result["errors"].append(f"Classification failed: {e}")
email_type = "other" # Fallback
result["classification"] = email_type
# Step 2: Route to handler
handler = self.handlers.get(email_type)
if not handler:
logger.warning(f"No handler for '{email_type}', using 'other'")
handler = self.handlers["other"]
result["classification"] = f"{email_type}βother"
result["handler"] = email_type
# Step 3: Process
try:
handler_result = await handler.process(subject, body, sender, received_at)
result.update(handler_result)
result["processed"] = True
logger.info(f"Handler '{email_type}' complete: {subject[:50]}...")
except Exception as e:
logger.error(f"Handler '{email_type}' failed: {e}")
result["errors"].append(f"Handler failed: {e}")
# Fallback: send basic notification
try:
await self.telegram.to_family(
f"π§ <b>Email Processing Error</b>\n\n"
f"Type: {email_type}\n"
f"Subject: {subject[:100]}\n"
f"Error: {str(e)[:200]}"
)
except:
pass
return result
async def health(self) -> Dict[str, Any]:
"""Check all pipeline components."""
return {
"llm": await self.llm.health(),
"calendar": await self.calendar.health(),
"telegram": bool(self.telegram.client),
"classifier": True, # No external deps
"handlers": list(self.handlers.keys())
}