๐Ÿ“„ family.py 3,283 bytes Apr 23, 2026 ๐Ÿ“‹ Raw

"""Family handler โ€” Calendar + Telegram for family emails."""

import logging
from typing import Dict, Any
from shared.llm import LLMClient
from shared.notify import TelegramNotifier
from family.calendar import CalendarClient
from family.email import EmailProcessor

logger = logging.getLogger(name)

class FamilyHandler:
"""Handle family emails: extract event if present, always notify."""

def __init__(
    self,
    llm_client: LLMClient,
    calendar_client: CalendarClient,
    telegram: TelegramNotifier
):
    self.llm = llm_client
    self.calendar = calendar_client
    self.telegram = telegram
    self.email_processor = EmailProcessor(llm_client)

async def process(
    self,
    subject: str,
    body: str,
    sender: str,
    received_at: str
) -> Dict[str, Any]:
    """Process family email.

    Tries to extract event (school, activities), always notifies.

    Returns:
        Dict with event_created, notification_sent, parsed_event
    """
    result = {
        "type": "family",
        "event_created": False,
        "notification_sent": False,
        "parsed": {},
        "errors": []
    }

    # Try to extract event details
    parsed = await self.email_processor.extract_event(subject, body)

    if parsed and parsed.get("confidence", 0) >= 0.5:
        result["parsed"] = parsed

        # Create calendar event
        calendar_result = await self.calendar.create_event(
            summary=parsed["summary"],
            start_datetime=parsed["start_datetime"],
            end_datetime=parsed["end_datetime"],
            description=parsed.get("description", f"From: {sender}\n\n{subject}"),
            location=parsed.get("location", "")
        )

        result["event_created"] = calendar_result.get("created", False)

        if not result["event_created"]:
            result["errors"].append(f"Calendar failed: {calendar_result.get('error', 'unknown')}")

    # Always notify about family emails
    await self._notify(subject, body, sender, parsed, result["event_created"])
    result["notification_sent"] = True

    return result

async def _notify(
    self,
    subject: str,
    body: str,
    sender: str,
    parsed: Dict[str, Any],
    event_created: bool
) -> None:
    """Send Telegram notification about family email."""
    msg = f"๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ <b>Family Email</b>\n\n"
    msg += f"<b>{subject}</b>\n"
    msg += f"From: {sender}\n"

    if parsed:
        event_time = parsed.get("start_datetime", "unknown")
        msg += f"\n๐Ÿ“… {parsed['summary']}\n"
        msg += f"๐Ÿ• {event_time}\n"
        if parsed.get("location"):
            msg += f"๐Ÿ“ {parsed['location']}\n"
        msg += f"\n{'โœ… Added to family calendar' if event_created else 'โš ๏ธ Calendar failed'}"
    else:
        msg += f"\nโ„น๏ธ No event details found โ€” informational email"

    await self.telegram.to_family(msg)