# newsletter.py (13,651 bytes, Apr 25, 2026)

"""Newsletter handler — Extract events → Calendar + Telegram with remove buttons."""

import html
import json
import logging
import os
import re
from datetime import datetime
from typing import Dict, Any, List, Optional

from family.calendar import CalendarClient
from family.email import EmailProcessor
from shared.llm import LLMClient
from shared.notify import TelegramNotifier
from shared.url_fetcher import enrich_body_with_urls, extract_urls

# Module-level logger named after this module (``__name__``), so records from
# the newsletter handler can be filtered separately from other modules.
logger = logging.getLogger(__name__)

# Prompt template for the newsletter summary sent to Telegram.
# Filled with str.format() using a single ``content`` field (see
# NewsletterHandler._summarize). The summarizer checks whether the model's
# reply starts with the 📰 marker shown under "Format:" and prepends a title
# line when it does not.
NEWSLETTER_SUMMARY_PROMPT = """Summarize this school newsletter in 3-4 bullets.

Be brief. No introductions. Just the highlights.

Newsletter:
{content}

Format:
📰

• Key point 1
• Key point 2
• Key point 3 (if relevant)

Rules:
- No filler like "This newsletter covers..."
- No "In this edition..." intros
- Just bullet the actual content"""

# Prompt template for calendar-event extraction.
# Filled with str.format() using a single ``content`` field, which is why the
# literal braces of the JSON example are doubled ({{ and }}).
# The keys shown in the example (summary, description, start_datetime,
# end_datetime, location, confidence) are the ones NewsletterHandler.process
# reads from each extracted event.
# NOTE(review): the rules hard-code "current year 2026"; this will go stale.
# Consider injecting the current date when the prompt is formatted.
NEWSLETTER_EVENT_EXTRACTION_PROMPT = """Extract calendar events from this newsletter.

Newsletters often contain event dates (spirit days, fundraisers, meetings, etc.).
Extract any events mentioned with specific dates/times.

Newsletter:
{content}

Respond with a JSON ARRAY of events like this example:
[
{{
"summary": "Event title here",
"description": "Full context including source newsletter",
"start_datetime": "2026-04-25T09:00:00",
"end_datetime": "2026-04-25T10:00:00",
"location": "Location or TBD",
"confidence": 0.85
}}
]

If no events found, return: []

Rules:
- ALWAYS return an ARRAY (square brackets), even for single event
- Use nearest logical future date (assume current year 2026, not past years)
- If time not specified, assume 9:00 AM or time implied by context
- Multi-day events (like Spirit Week): set start/end covering full range
- Specific meetings with times = high confidence (0.8+)
- Vague mentions = low confidence (0.5-0.7)

Response: JSON array only"""

# Helper function outside class

def _grade_to_str(grade_num: int) -> str:
"""Convert numeric grade to readable string."""
if grade_num == -1:
return "Pre-K"
elif grade_num == 0:
return "K"
else:
return f"{grade_num}th"

class NewsletterHandler:
    """Handle newsletter emails: extract events, create calendar, summarize, notify.

    Messages built here contain ``<b>``/``<i>`` tags, so any text that comes
    from the email or from the LLM is HTML-escaped before being embedded.
    NOTE(review): this assumes TelegramNotifier sends with HTML parse mode,
    as the tags in these messages imply — confirm against the notifier.
    """

    # Model name passed to the LLM client for extraction and summarization.
    _MODEL = "qwen2.5-coder:7b"

    # Extracted events with a lower confidence are ignored.
    _MIN_CONFIDENCE = 0.5

    # Grades used when FAMILY_GRADES is not set (-1 = Pre-K, 0 = Kindergarten).
    _DEFAULT_GRADES = {"Sullivan": 1, "Harper": -1}

    # (compiled regex, min_grade, max_grade, label), matched against lowercased
    # event text. Bare digit ranges require an explicit dash so that ordinary
    # numbers such as "13", "$35" or "3:35" are not read as grade ranges.
    # NOTE(review): a dashed date or time range ("May 3-5", "3-5 pm") still
    # looks like a grade range to these patterns.
    # NOTE(review): the K-* ranges start at -1, so a Pre-K child satisfies
    # them too — presumably deliberate; confirm.
    _GRADE_PATTERNS = tuple(
        (re.compile(pattern), low, high, label)
        for pattern, low, high, label in (
            # Ranges
            (r'\bk[- ]?2\b|\bk\s*to\s*2\b', -1, 2, "K-2"),
            (r'\bk[- ]?3\b|\bk\s*to\s*3\b', -1, 3, "K-3"),
            (r'\bk[- ]?5\b|\bk\s*to\s*5\b|\belementary\b', -1, 5, "K-5/Elementary"),
            (r'\b1\s*[-–]\s*3\b|\b1st[- ]?3rd\b', 1, 3, "grades 1-3"),
            (r'\b3\s*[-–]\s*5\b|\b3rd[- ]?5th\b', 3, 5, "grades 3-5"),
            (r'\bmiddle\s+school\b', 6, 8, "middle school"),
            # Specific grades
            (r'\b5th\s+grade\b|\bfifth\s+grade\b', 5, 5, "5th grade"),
            (r'\b4th\s+grade\b|\bfourth\s+grade\b', 4, 4, "4th grade"),
            (r'\b3rd\s+grade\b|\bthird\s+grade\b', 3, 3, "3rd grade"),
            (r'\b2nd\s+grade\b|\bsecond\s+grade\b', 2, 2, "2nd grade"),
            (r'\b1st\s+grade\b|\bfirst\s+grade\b', 1, 1, "1st grade"),
            (r'\bkindergarten\b', 0, 0, "Kindergarten"),
            (r'\bpre[- ]?k\b|\bpreschool\b', -1, -1, "Pre-K"),
        )
    )

    def __init__(
        self,
        llm_client: LLMClient,
        calendar_client: CalendarClient,
        telegram: TelegramNotifier
    ):
        """Store the collaborating clients and build an EmailProcessor on the LLM client."""
        self.llm = llm_client
        self.calendar = calendar_client
        self.telegram = telegram
        self.email_processor = EmailProcessor(llm_client)

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _esc(value: Any) -> str:
        """HTML-escape *value* (coerced to str) for use inside a Telegram HTML message."""
        # quote=False: the text is placed in element content, never in attributes.
        return html.escape(str(value), quote=False)

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Return *text* cut to *limit* characters, adding "..." only if it was cut."""
        return text if len(text) <= limit else text[:limit] + "..."

    @staticmethod
    def _event_confidence(event: Dict) -> float:
        """Return the event's confidence as a float; missing or unparsable values count as 0."""
        try:
            return float(event.get("confidence", 0))
        except (TypeError, ValueError):
            # The LLM may return null or free text here; treat it as "no confidence".
            return 0.0

    @staticmethod
    def _normalize_events(parsed: Any) -> List[Dict]:
        """Coerce the decoded LLM response into a list of event dicts.

        Accepts a JSON array, a single event object, or an object with an
        ``events`` array. Anything else, and any non-dict item, is dropped.
        """
        if isinstance(parsed, dict):
            if "summary" in parsed and "start_datetime" in parsed:
                # Single event returned as a bare object.
                return [parsed]
            parsed = parsed.get("events")
        if not isinstance(parsed, list):
            return []
        return [item for item in parsed if isinstance(item, dict)]

    @classmethod
    def _unmatched_grade_label(cls, text: str, child_grades: Dict[str, int]) -> Optional[str]:
        """Return the label of a grade mention that fits no child, or None.

        None is returned when *text* mentions no grade at all, or when at
        least one mentioned grade/range includes a child. Otherwise the label
        of the first mentioned grade/range (in table order) is returned.
        """
        first_miss = None
        for regex, low, high, label in cls._GRADE_PATTERNS:
            if not regex.search(text):
                continue
            if any(low <= grade <= high for grade in child_grades.values()):
                return None  # One relevant mention is enough to keep the event.
            if first_miss is None:
                first_miss = label
        return first_miss

    @classmethod
    def _build_summary_message(
        cls,
        summary: str,
        events: List[Dict],
        skipped_events: List[Dict]
    ) -> str:
        """Compose the summary message: summary, up to 5 created and 3 skipped events.

        *summary* is inserted as-is (it is already HTML); event titles are escaped.
        """
        parts = [summary, "\n\n"]

        if events:
            parts.append("<b>📅 Events Added to Calendar:</b>\n")
            for i, event in enumerate(events[:5], 1):  # Max 5 events
                parts.append(f"{i}. {cls._esc(event['summary'])}\n")
            if len(events) > 5:
                parts.append(f"...and {len(events) - 5} more\n")
            parts.append("\n❌ Tap REMOVE on individual events to delete them.")
        else:
            parts.append("<i>No calendar events detected in this newsletter.</i>")

        # Skipped events section (grade filtered)
        if skipped_events:
            parts.append("\n\n<b>🔍 Skipped (not relevant):</b>\n")
            for i, skipped in enumerate(skipped_events[:3], 1):
                parts.append(f"{i}. {cls._esc(skipped['summary'])}\n")
            if len(skipped_events) > 3:
                parts.append(f"...and {len(skipped_events) - 3} more\n")
            parts.append("\n<i>These didn't match your children's grades.</i>")

        return "".join(parts)

    # ------------------------------------------------------------------
    # Grade filtering
    # ------------------------------------------------------------------

    def _load_family_grades(self) -> Dict[str, int]:
        """Load children's grades from the FAMILY_GRADES environment variable.

        Expected format: ``"Sullivan:1,Harper:-1"``. Entries without a colon
        are ignored; entries whose grade is not an integer are logged and
        ignored. When the variable is unset or empty, the built-in default
        mapping is returned.
        """
        grades_str = os.getenv("FAMILY_GRADES", "")
        if not grades_str:
            return dict(self._DEFAULT_GRADES)

        grades = {}
        for part in grades_str.split(","):
            name, sep, grade = part.partition(":")
            if not sep:
                continue
            try:
                grades[name.strip()] = int(grade.strip())
            except ValueError:
                logger.warning("Ignoring malformed FAMILY_GRADES entry: %r", part)
        return grades

    def _validate_grade_relevance(self, event: Dict) -> Optional[str]:
        """Check if event matches family grades. Returns reason if should skip.

        The event's summary and description are searched for grade mentions.
        The event is kept (None) when no grade is mentioned, when no family
        grades are configured, or when any mentioned grade/range includes a
        child. Otherwise a human-readable skip reason is returned.
        """
        summary = event.get("summary", "")
        description = event.get("description", "")
        text = f"{summary} {description}".lower()

        child_grades = self._load_family_grades()
        if not child_grades:
            return None  # Can't validate, allow through

        label = self._unmatched_grade_label(text, child_grades)
        if label is None:
            return None

        grade_str = ", ".join(
            f"{name}: {_grade_to_str(grade)}" for name, grade in sorted(child_grades.items())
        )
        return f"No child in {label}. Current: {grade_str}"

    # ------------------------------------------------------------------
    # Main flow
    # ------------------------------------------------------------------

    async def process(
        self,
        subject: str,
        body: str,
        sender: str,
        received_at: str
    ) -> Dict[str, Any]:
        """Process newsletter email.

        Flow:
        0. If the body is short (< 200 chars) and contains URLs, enrich it
           with fetched URL content
        1. Extract events from newsletter
        2. Drop low-confidence and grade-irrelevant events, create calendar
           events for the rest and send a notification with a REMOVE button
           for each
        3. Generate newsletter summary (first 3000 chars of the body)
        4. Send summary with the created / skipped event lists

        Failures in steps 2-4 are recorded in ``errors`` rather than raised.
        ``received_at`` is accepted for interface compatibility and is not
        used here.

        Returns:
            Dict with type, summary, events_created, notification_sent, errors
        """
        result: Dict[str, Any] = {
            "type": "newsletter",
            "summary": "",
            "events_created": [],
            "notification_sent": False,
            "errors": []
        }

        # Step 0: Enrich body with URL content (for Smore links, etc.)
        enriched_body = body
        urls = extract_urls(body)
        if urls and len(body) < 200:  # Short body with URL = fetch the content
            logger.info("Fetching %d URL(s) for enrichment", len(urls))
            enriched_body = enrich_body_with_urls(body, max_urls=1)
            if len(enriched_body) > len(body):
                logger.info("Body enriched: %d → %d chars", len(body), len(enriched_body))

        # Step 1: Extract events from enriched newsletter
        events = await self._extract_events(subject, enriched_body, sender)

        # Step 2: Filter by confidence and grade relevance, create calendar events
        created_events = []
        skipped_events = []  # Grade-filtered events, reported in the summary
        for event in events:
            if self._event_confidence(event) < self._MIN_CONFIDENCE:
                continue

            skip_reason = self._validate_grade_relevance(event)
            if skip_reason:
                skipped_events.append({
                    "summary": event.get("summary", "?"),
                    "reason": skip_reason
                })
                logger.info("Grade filtered: %s - %s", event.get("summary", "?"), skip_reason)
                continue

            try:
                # Missing required keys raise KeyError here and are reported
                # like any other creation failure.
                calendar_result = await self.calendar.create_event(
                    summary=event["summary"],
                    start_datetime=event["start_datetime"],
                    end_datetime=event["end_datetime"],
                    description=event.get("description", f"From newsletter: {subject}"),
                    location=event.get("location", "")
                )
                created = bool(calendar_result.get("created"))
                uid = calendar_result.get("uid")
            except Exception as e:
                logger.error("Failed to create event %s: %s", event.get("summary"), e)
                result["errors"].append(f"Event creation failed: {e}")
                continue

            if not created:
                result["errors"].append(f"Calendar failed for {event['summary']}")
                continue

            event_data = {
                "uid": uid,
                "summary": event["summary"],
                "start": event["start_datetime"]
            }
            created_events.append(event_data)

            # The event exists at this point; a failed notification must not
            # be reported as a failed creation.
            try:
                await self._notify_event_created(event_data, subject)
            except Exception as e:
                logger.error("Failed to notify about event %s: %s", event_data["summary"], e)
                result["errors"].append(f"Event notify failed: {e}")

        result["events_created"] = created_events

        # Step 3: Generate summary
        content = enriched_body[:3000]
        try:
            summary = await self._summarize(subject, content, sender)
            result["summary"] = summary
        except Exception as e:
            logger.error("Newsletter summarization failed: %s", e)
            result["errors"].append(f"Summary failed: {e}")
            # Senders often look like "Name <addr>", which would break HTML parsing.
            summary = (
                f"📰 <b>{self._esc(subject)}</b>\n\n"
                f"From: {self._esc(sender)}\n\n[Summary unavailable]"
            )

        # Step 4: Send summary with event list
        try:
            await self._notify_summary(summary, created_events, skipped_events, subject)
            result["notification_sent"] = True
            logger.info(
                "Newsletter processed: %d events, %d skipped, summary sent: %s...",
                len(created_events), len(skipped_events), subject[:50]
            )
        except Exception as e:
            logger.error("Failed to send newsletter notification: %s", e)
            result["errors"].append(f"Notify failed: {e}")

        return result

    async def _extract_events(self, subject: str, body: str, sender: str) -> List[Dict]:
        """Extract calendar events from newsletter using LLM.

        Only the first 4000 characters of *body* are sent. Any failure
        (LLM call, missing content, invalid JSON) is logged and yields an
        empty list. The result contains only dict items.
        """
        try:
            prompt = NEWSLETTER_EVENT_EXTRACTION_PROMPT.format(
                content=f"Subject: {subject}\n\nFrom: {sender}\n\n{body[:4000]}"
            )
            result = await self.llm.generate(
                prompt=prompt,
                model=self._MODEL,
                format="json",
                temperature=0.2,
                prefer_cloud=False
            )
            parsed = json.loads(result["content"])
        except Exception as e:
            logger.error("Event extraction failed: %s", e)
            return []

        events = self._normalize_events(parsed)
        logger.info("Extracted %d events from newsletter", len(events))
        return events

    async def _notify_event_created(self, event: Dict, newsletter_subject: str) -> None:
        """Send Telegram notification for created event with REMOVE button.

        *event* must carry ``uid``, ``summary`` and ``start``. The button's
        callback data has the form ``remove_event:{uid}``.
        """
        msg = "".join([
            "🗓️ <b>Event Added from Newsletter</b>\n\n",
            f"<b>{self._esc(event['summary'])}</b>\n",
            f"🕐 {self._esc(event['start'])}\n",
            # Truncate before escaping so an HTML entity is never cut in half.
            f"📰 Source: {self._esc(self._truncate(newsletter_subject, 50))}\n\n",
            "❌ Don't want this event? Tap REMOVE below.",
        ])

        # NOTE(review): Telegram limits callback_data to 64 bytes; a long
        # calendar uid could exceed that — confirm the uid format.
        reply_markup = {
            "inline_keyboard": [[
                {
                    "text": "❌ REMOVE",
                    "callback_data": f"remove_event:{event['uid']}"
                }
            ]]
        }

        await self.telegram.send_with_buttons(msg, reply_markup)

    async def _notify_summary(
        self,
        summary: str,
        events: List[Dict],
        skipped_events: List[Dict],
        subject: str
    ) -> None:
        """Send newsletter summary with event list and skipped items.

        ``subject`` is accepted for interface compatibility and is not used.
        """
        msg = self._build_summary_message(summary, events, skipped_events)
        await self.telegram.to_family(msg)

    async def _summarize(self, subject: str, body: str, sender: str) -> str:
        """Generate newsletter summary using LLM.

        The model's text is HTML-escaped. If it does not already start with
        the 📰 marker, a bold title line with the subject is prepended.
        ``sender`` is accepted but not used. Errors from the LLM call
        propagate to the caller.
        """
        prompt = NEWSLETTER_SUMMARY_PROMPT.format(content=f"Subject: {subject}\n\n{body}")

        result = await self.llm.generate(
            prompt=prompt,
            model=self._MODEL,
            temperature=0.3,
            prefer_cloud=False
        )

        summary = self._esc(result["content"].strip())

        # Add source attribution if not present
        if not summary.startswith("📰"):
            summary = f"📰 <b>{self._esc(subject)}</b>\n\n{summary}"

        return summary