πŸ“„ pipeline.py 4,050 bytes Apr 24, 2026 πŸ“‹ Raw

"""Family automation pipeline v2 β€” Classification + Routing."""

import logging
from typing import Dict, Any
from datetime import datetime

from shared.llm import LLMClient
from shared.notify import TelegramNotifier
from family.calendar import CalendarClient
from family.classifier import EmailClassifier
from family.handlers.appointment import AppointmentHandler
from family.handlers.newsletter import NewsletterHandler
from family.handlers.family import FamilyHandler
from family.handlers.other import OtherHandler

logger = logging.getLogger(name)

class FamilyPipeline:
"""Orchestrate email β†’ classify β†’ route β†’ process β†’ notify."""

def __init__(
    self,
    llm_client: LLMClient,
    calendar_client: CalendarClient,
    telegram: TelegramNotifier
):
    self.llm = llm_client
    self.calendar = calendar_client
    self.telegram = telegram
    self.classifier = EmailClassifier(llm_client)

    # Initialize handlers
    self.handlers = {
        "appointment": AppointmentHandler(llm_client, calendar_client, telegram),
        "newsletter": NewsletterHandler(llm_client, calendar_client, telegram),
        "family": FamilyHandler(llm_client, calendar_client, telegram),
        "other": OtherHandler(telegram),
    }

async def process_email(
    self,
    subject: str,
    body: str,
    sender: str,
    received_at: str
) -> Dict[str, Any]:
    """Process email through classification + routing pipeline.

    Steps:
    1. Classify email type
    2. Route to appropriate handler
    3. Process with handler
    4. Log results

    Returns:
        Dict with classification, handler results
    """
    result = {
        "classification": None,
        "handler": None,
        "processed": False,
        "errors": []
    }

    # Step 1: Classify
    try:
        email_type = await self.classifier.classify(subject, body, sender)
        result["classification"] = email_type
        logger.info(f"Email classified as '{email_type}': {subject[:50]}...")
    except Exception as e:
        logger.error(f"Classification failed: {e}")
        result["errors"].append(f"Classification failed: {e}")
        email_type = "other"  # Fallback
        result["classification"] = email_type

    # Step 2: Route to handler
    handler = self.handlers.get(email_type)
    if not handler:
        logger.warning(f"No handler for '{email_type}', using 'other'")
        handler = self.handlers["other"]
        result["classification"] = f"{email_type}β†’other"

    result["handler"] = email_type

    # Step 3: Process
    try:
        handler_result = await handler.process(subject, body, sender, received_at)
        result.update(handler_result)
        result["processed"] = True
        logger.info(f"Handler '{email_type}' complete: {subject[:50]}...")
    except Exception as e:
        logger.error(f"Handler '{email_type}' failed: {e}")
        result["errors"].append(f"Handler failed: {e}")

        # Fallback: send basic notification
        try:
            await self.telegram.to_family(
                f"πŸ“§ <b>Email Processing Error</b>\n\n"
                f"Type: {email_type}\n"
                f"Subject: {subject[:100]}\n"
                f"Error: {str(e)[:200]}"
            )
        except:
            pass

    return result

async def health(self) -> Dict[str, Any]:
    """Check all pipeline components."""
    return {
        "llm": await self.llm.health(),
        "calendar": await self.calendar.health(),
        "telegram": bool(self.telegram.client),
        "classifier": True,  # No external deps
        "handlers": list(self.handlers.keys())
    }