📄 newsletter_parser.py 24,750 bytes Apr 19, 2026 📋 Raw

"""Newsletter extraction via local LLM (Prompt-as-Code).

Uses Markdown-header chunking to avoid VRAM exhaustion on the local GPU.
Long newsletters are split at ## headings, each chunk is processed
separately, and results are merged/deduplicated at the end.
"""

import json
import os
import re
import sys
from datetime import datetime, timedelta, date as date_type
from zoneinfo import ZoneInfo

import requests

from family_assistant.config import (
LLM_URL,
LLM_MODEL,
LLM_NEWSLETTER_MODEL,
LLM_NEWSLETTER_URL,
LLM_TIMEOUT,
LLM_NEWSLETTER_TIMEOUT,
CHICAGO_TZ,
MAX_BODY_CHARS,
get_nickname_map,
load_prompts,
)

def _chunk_by_markdown_headers(text, max_chunk_chars=2000):
"""Split text at Markdown ##+ headers, respecting a max chunk size.

Yields (header, chunk_text) tuples. The header is the ## line (or '' for
the preamble before any header). Chunks that exceed max_chunk_chars after
splitting at headers are further split at paragraph boundaries (blank lines).

This keeps each LLM call small enough to fit in VRAM on the Gaming PC's
12GB 3080 Ti while preserving the semantic grouping of newsletter sections.
"""
if not text:
    return

# Split at ## (or deeper) headings
header_pattern = re.compile(r'^(#{1,6}\s+.+)$', re.MULTILINE)
splits = header_pattern.split(text)

# splits alternates: [preamble, header1, section1, header2, section2, ...]
chunks = []
if splits and not header_pattern.match(splits[0]):
    # Preamble before first header
    preamble = splits[0].strip()
    if preamble:
        chunks.append(("", preamble))
    splits = splits[1:]

# Pair up headers with their content
for i in range(0, len(splits) - 1, 2):
    header = splits[i].strip() if i < len(splits) else ""
    content = splits[i + 1].strip() if i + 1 < len(splits) else ""
    if content:
        chunks.append((header, content))

# If no headers found at all, treat the whole text as one chunk
if not chunks and text.strip():
    chunks.append(("", text.strip()))

# Further split oversized chunks at paragraph boundaries
final_chunks = []
for header, content in chunks:
    if len(content) <= max_chunk_chars:
        final_chunks.append((header, content))
    else:
        paragraphs = re.split(r'\n\s*\n', content)
        current = ""
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            if len(current) + len(para) + 2 <= max_chunk_chars:
                current = current + "\n\n" + para if current else para
            else:
                if current:
                    final_chunks.append((header, current))
                current = para
        if current:
            final_chunks.append((header, current))

for header, content in final_chunks:
    yield header, content

def classify_email(subject, body, from_addr=""):
"""Classify an email as 'appointment' or 'newsletter'.

Returns one of: "appointment", "newsletter"
Falls back to "appointment" on errors.
"""
prompts = load_prompts()
system_msg = prompts["email_classify"]
trimmed_body = body[:MAX_BODY_CHARS] if body else ""
user_msg = f"Subject: {subject}\nFrom: {from_addr}\n\n{trimmed_body}"

raw = _call_llm(system_msg, user_msg)
parsed = _parse_json_response(raw)

if parsed and isinstance(parsed, dict):
    cls = parsed.get("classification", "appointment")
    if cls == "newsletter":
        return "newsletter"
# Default to appointment — simpler path, existing parser handles it
return "appointment"

def parse_newsletter_with_llm(subject, body, from_addr="", date_str=""):
"""Parse a newsletter email into structured items.

Uses Markdown-header chunking to preserve VRAM on the local GPU.
Each chunk is sent to the LLM separately, and results are merged
and deduplicated.

Returns a list of item dicts, each with a "type" key:
  "event", "reminder", "action_item", "info"
Each item also includes "relevance" ("high"|"low") and "reason" keys.
"""
prompts = load_prompts()
system_template = prompts["newsletter_extract"]
today = datetime.now(CHICAGO_TZ)
today_str = today.strftime("%Y-%m-%d")
today_day = today.strftime("%A")
system_msg = system_template.format(today=today_str, today_day=today_day)

# Use extended body limit for newsletters (URL-enriched bodies can be large)
trimmed_body = body[:MAX_BODY_CHARS * 4] if body else ""

# Chunk the body at Markdown headers to preserve VRAM
chunks = list(_chunk_by_markdown_headers(trimmed_body, max_chunk_chars=2000))

if not chunks:
    return []

all_items = []
newsletter_timeout = LLM_NEWSLETTER_TIMEOUT

# Process each chunk separately
for i, (header, chunk_content) in enumerate(chunks):
    chunk_label = f"chunk {i+1}/{len(chunks)}"
    if header:
        chunk_label += f" ({header})"
    print(f"  [Newsletter LLM] Processing {chunk_label} ({len(chunk_content)} chars)", file=sys.stderr)

    # Build user message with context header
    context = f"Subject: {subject}\nFrom: {from_addr}\nDate: {date_str}"
    if header:
        context += f"\nSection: {header}"
    user_msg = f"{context}\n\n{chunk_content}"

    # First attempt
    raw = _call_llm(
        system_msg, user_msg,
        model=LLM_NEWSLETTER_MODEL,
        url=LLM_NEWSLETTER_URL,
        timeout=newsletter_timeout,
    )
    parsed = _parse_json_response(raw)

    # Retry once if JSON parsing failed
    if parsed is None and raw is not None:
        print(f"  [Newsletter LLM] Invalid JSON in {chunk_label}, retrying...", file=sys.stderr)
        retry_system = system_msg + (
            "\n\nIMPORTANT: Return ONLY a valid JSON array. "
            "No markdown code fences. No explanation. Just the array."
        )
        raw = _call_llm(
            retry_system, user_msg,
            model=LLM_NEWSLETTER_MODEL,
            url=LLM_NEWSLETTER_URL,
            timeout=newsletter_timeout,
        )
        parsed = _parse_json_response(raw)

    if parsed is None:
        print(f"  [Newsletter LLM] Could not parse JSON in {chunk_label}", file=sys.stderr)
        continue

    if not isinstance(parsed, list):
        print(f"  [Newsletter LLM] Expected list in {chunk_label}, got {type(parsed).__name__}", file=sys.stderr)
        continue

    # Normalize each item from this chunk
    for item in parsed:
        if not isinstance(item, dict):
            continue
        normalized = _normalize_newsletter_item(item)
        if normalized:
            all_items.append(normalized)

if not all_items:
    return []

# Reduce step: pass combined items back to LLM to merge semantic duplicates
deduped = _llm_dedup(all_items, subject)
print(f"  [Newsletter LLM] Extracted {len(all_items)} items, {len(deduped)} after dedup (Phase 1+2)", file=sys.stderr)

return deduped

def _llm_dedup(items, subject=""):
"""Two-phase dedup: Python pre-merge for near-duplicates, then LLM for semantic ones.

Phase 1: Python merges items with matching (type, sorted_summary_words, date_key)
           handles word-order and minor wording differences without LLM cost.
Phase 2: LLM merges semantic duplicates (different wording, same event)
           only runs if there are 3+ items after Phase 1.
Falls back to Phase 1 result on any LLM error.
"""
# Phase 1: Python fuzzy dedup (type + sorted word-level summary + date)
_STOP_WORDS = frozenset({'a', 'an', 'the', 'for', 'of', 'at', 'in', 'on', 'to', 'is', 'are', 'and', 'or', 'due', 'due:'})
seen = {}
phase1_result = []
for item in items:
    summary = item.get("summary", "").lower().strip()
    # Normalize: remove stop words, strip plural 's', sort so word-order differences match
    def _norm_word(w):
        w = w.rstrip('s') if w.endswith('s') and len(w) > 2 else w  # plural  singular
        return w
    summary_key = " ".join(sorted(_norm_word(w) for w in summary.split() if w not in _STOP_WORDS))
    item_type = item.get("type", "")
    date_key = ""
    if item.get("start"):
        date_key = item["start"].isoformat()[:10] if hasattr(item["start"], "isoformat") else str(item["start"])[:10]
    elif item.get("due"):
        date_key = str(item["due"])[:10]
    key = (item_type, summary_key, date_key)
    if key in seen:
        # Merge who arrays from both items
        existing = seen[key]
        existing_who = set(w for w in existing.get("who", []))
        new_who = set(w for w in item.get("who", []))
        existing["who"] = sorted(existing_who | new_who)
        # Keep the longer/more detailed summary
        if len(item.get("summary", "")) > len(existing.get("summary", "")):
            existing["summary"] = item["summary"]
        if len(item.get("description", "")) > len(existing.get("description", "")):
            existing["description"] = item["description"]
        continue
    seen[key] = item
    phase1_result.append(item)

# Phase 2: LLM semantic dedup — merges items that refer to the same
# real-world event even if worded differently or typed differently (e.g.,
# event vs reminder). Now enabled by default with qwen2.5-coder:7b at
# ~125 tok/s on Gaming PC.
LLM_DEDUP_ENABLED = os.environ.get("LLM_DEDUP_ENABLED", "0") == "1"

if not LLM_DEDUP_ENABLED or len(phase1_result) <= 2:
    print(f"  [Newsletter LLM] {len(phase1_result)} items after Phase 1 dedup, Phase 2 {'skipped (disabled)' if not LLM_DEDUP_ENABLED else 'not needed'}", file=sys.stderr)
    return phase1_result

# Phase 2: LLM semantic dedup
prompts = load_prompts()
system_msg = prompts["newsletter_dedup"]

# Trim to key fields only — the LLM only needs enough to identify duplicates
_DEDUP_KEYS = ("type", "summary", "who", "start", "end", "due", "relevance", "reason", "location")
serializable = []
for item in phase1_result:
    s_item = {}
    for k in _DEDUP_KEYS:
        if k in item:
            v = item[k]
            if isinstance(v, datetime):
                s_item[k] = v.isoformat()
            elif isinstance(v, date_type):
                s_item[k] = v.isoformat()
            else:
                s_item[k] = v
    serializable.append(s_item)

items_json = json.dumps(serializable, indent=2)
user_msg = f"Newsletter: {subject}\n\n{items_json}"

raw = _call_llm(
    system_msg, user_msg,
    model=LLM_NEWSLETTER_MODEL,  # Use the strong newsletter model for dedup
    url=LLM_NEWSLETTER_URL,
    timeout=180,  # Generous timeout  dedup is important but not time-sensitive
)
parsed = _parse_json_response(raw)

if parsed is None or not isinstance(parsed, list):
    print(f"  [Newsletter LLM] Dedup LLM failed, keeping {len(phase1_result)} Phase 1 items", file=sys.stderr)
    return phase1_result

# The LLM returns trimmed items — match back to originals by summary+type
# to preserve all fields (description, duration_minutes, etc.)
result = []
used_indices = set()
for deduped_item in parsed:
    if not isinstance(deduped_item, dict):
        continue
    deduped_summary = deduped_item.get("summary", "").lower().strip()
    deduped_type = deduped_item.get("type", "")

    # Find the best matching original item (from phase1_result)
    # First try exact match on summary+type, then fall back to summary only
    # (LLM dedup may merge items with different types, e.g., event+reminder)
    best_match = None
    best_idx = None
    for idx, orig in enumerate(phase1_result):
        if idx in used_indices:
            continue
        orig_summary = orig.get("summary", "").lower().strip()
        if orig_summary == deduped_summary and orig.get("type", "") == deduped_type:
            best_match = orig
            best_idx = idx
            break
    # Fall back: match by summary only (type may have been upgraded by LLM)
    if best_match is None:
        for idx, orig in enumerate(phase1_result):
            if idx in used_indices:
                continue
            orig_summary = orig.get("summary", "").lower().strip()
            if orig_summary == deduped_summary:
                best_match = orig
                best_idx = idx
                break
    # Fall back: fuzzy summary match (contains check)
    if best_match is None:
        for idx, orig in enumerate(phase1_result):
            if idx in used_indices:
                continue
            orig_summary = orig.get("summary", "").lower().strip()
            if deduped_summary in orig_summary or orig_summary in deduped_summary:
                best_match = orig
                best_idx = idx
                break

    if best_match is not None:
        # Merge: start with original full item, override with LLM-merged fields
        merged = dict(best_match)
        # LLM may have updated who, relevance, reason, or description
        for k in ("who", "relevance", "reason", "description", "location"):
            if k in deduped_item:
                merged[k] = deduped_item[k]
        result.append(merged)
        used_indices.add(best_idx)
    else:
        # No match — normalize from LLM output directly
        normalized = _normalize_newsletter_item(deduped_item)
        if normalized:
            result.append(normalized)

# Note: we intentionally do NOT re-add unmatched originals. If the LLM
# returned fewer items, the missing ones were intentionally merged/deduped.
# Re-adding them would defeat the purpose of Phase 2.

return result if result else phase1_result

def _call_llm(system, user, temperature=0, timeout=None, model=None, url=None):
"""Send a chat completion request to the local LLM endpoint."""
request_timeout = timeout or LLM_TIMEOUT
request_model = model or LLM_MODEL
request_url = url or LLM_URL
payload = {
"model": request_model,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": user},
],
"temperature": temperature,
}
try:
resp = requests.post(request_url, json=payload, timeout=request_timeout)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"].strip()
except requests.exceptions.Timeout:
print(f" [Newsletter LLM] Timeout after {request_timeout}s", file=sys.stderr)
return None
except requests.exceptions.ConnectionError:
print(f" [Newsletter LLM] Connection failed to {LLM_URL}", file=sys.stderr)
return None
except Exception as e:
print(f" [Newsletter LLM] Error: {e}", file=sys.stderr)
return None

def _parse_json_response(text):
"""Parse JSON from LLM response, handling markdown code fences and whitespace."""
if not text:
return None
text = text.strip()
if text.startswith(""): text = re.sub(r'^(?:json)?\s\n?', '', text)
text = re.sub(r'\n?```\s
$', '', text)
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
match = re.search(r'[.*]', text, re.DOTALL)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
pass
return None

def _normalize_newsletter_item(item):
"""Normalize and validate a newsletter item dict from the LLM."""
item_type = item.get("type", "info")
valid_types = ("event", "reminder", "action_item", "info")
if item_type not in valid_types:
item_type = "info"

summary = str(item.get("summary", "")).strip() or "Newsletter item"
who = item.get("who", [])
if isinstance(who, str):
    who = [who]
who = [str(w).strip() for w in who if w]

# Normalize nicknames
nicknames = get_nickname_map()
who = [nicknames.get(w.lower(), w) for w in who]

description = str(item.get("description", "")).strip()[:500]
location = str(item.get("location", "")).strip()

# Parse dates based on type
start_dt = None
end_dt = None
due_date = None
duration_minutes = int(item.get("duration_minutes", 60) or 60)

if item_type == "event":
    start_str = item.get("start", "")
    end_str = item.get("end", "")
    start_dt = _parse_iso_datetime(start_str)
    end_dt = _parse_iso_datetime(end_str)
    if start_dt and not end_dt:
        end_dt = start_dt + timedelta(minutes=duration_minutes)
    if start_dt and end_dt:
        duration_minutes = int((end_dt - start_dt).total_seconds() / 60)

    # Day-of-week auto-correction for newsletter events
    claimed_day = str(item.get("claimed_day_of_week", "")).strip()
    if start_dt and claimed_day:
        from family_assistant.appointment_parser import _correct_day_of_week
        corrected_start, was_corrected = _correct_day_of_week(start_dt, claimed_day)
        if was_corrected:
            shift = corrected_start - start_dt
            print(f"  [DayFix] {summary}: LLM said {claimed_day} but date was "
                  f"{start_dt.strftime('%A %b %d')} → corrected to {corrected_start.strftime('%A %b %d')}",
                  file=sys.stderr)
            start_dt = corrected_start
            if end_dt:
                end_dt = end_dt + shift
                duration_minutes = int((end_dt - start_dt).total_seconds() / 60)

    # Past-date guard
    now = datetime.now(CHICAGO_TZ)
    if start_dt and start_dt < now:
        old_start = start_dt
        if claimed_day:
            from family_assistant.appointment_parser import _correct_day_of_week
            start_dt, _ = _correct_day_of_week(start_dt, claimed_day)
            if start_dt < now:
                start_dt = start_dt + timedelta(days=7)
        else:
            start_dt = start_dt + timedelta(days=7)
        shift = start_dt - old_start
        print(f"  [PastFix] {summary}: start was in the past ({old_start.strftime('%A %b %d')}) "
              f"→ shifted to {start_dt.strftime('%A %b %d')}", file=sys.stderr)
        if end_dt:
            end_dt = end_dt + shift
            duration_minutes = int((end_dt - start_dt).total_seconds() / 60)

    if not start_dt:
        # Date parse failed  no silent failures. Convert to info for Telegram digest.
        print(f"  [Newsletter] Event date unparseable, converting to info: {summary} (raw: {start_str})", file=sys.stderr)
        item_type = "info"
        description = f"{description} (original type: event, date unparseable: {start_str})".strip()[:500]

elif item_type in ("reminder", "action_item"):
    due_str = item.get("due", "")
    if due_str:
        due_date = _parse_iso_date(due_str)
    if not due_date:
        # Try start as fallback for due date
        start_str = item.get("start", "")
        due_date = _parse_iso_date(start_str) if start_str else None

    # Day-of-week correction for reminder/action due dates
    reminder_claimed_day = str(item.get("claimed_day_of_week", "")).strip()
    if due_date and reminder_claimed_day:
        from family_assistant.appointment_parser import _correct_day_of_week
        due_dt = datetime(due_date.year, due_date.month, due_date.day, tzinfo=CHICAGO_TZ)
        corrected_due, was_fixed = _correct_day_of_week(due_dt, reminder_claimed_day)
        if was_fixed:
            print(f"  [DayFix] {summary}: reminder due said {reminder_claimed_day} but date was "
                  f"{due_date.strftime('%A %b %d')} -> corrected to {corrected_due.strftime('%A %b %d')}",
                  file=sys.stderr)
            due_date = corrected_due.date()

    # Past-date guard for reminders
    if due_date:
        today = datetime.now(CHICAGO_TZ).date()
        if due_date < today:
            old_due = due_date
            if reminder_claimed_day:
                due_dt = datetime(due_date.year, due_date.month, due_date.day, tzinfo=CHICAGO_TZ)
                corrected_due, _ = _correct_day_of_week(due_dt, reminder_claimed_day)
                due_date = corrected_due.date()
                if due_date < today:
                    due_date = due_date + timedelta(days=7)
            else:
                due_date = due_date + timedelta(days=7)
            print(f"  [PastFix] {summary}: reminder due was in the past ({old_due.strftime('%A %b %d')}) "
                  f"-> shifted to {due_date.strftime('%A %b %d')}", file=sys.stderr)

    if not due_date:
        # Date parse failed  convert to info for Telegram digest.
        print(f"  [Newsletter] {item_type} date unparseable, converting to info: {summary} (raw: {due_str})", file=sys.stderr)
        item_type = "info"
        description = f"{description} (original type: {item_type}, date unparseable: {due_str})".strip()[:500]

is_recurring = bool(item.get("is_recurring", False))
is_multi_day = bool(item.get("is_multi_day", False))

# Extract recurrence dict if present and valid
recurrence = None
if is_recurring:
    rec_raw = item.get("recurrence")
    if isinstance(rec_raw, dict):
        from family_assistant.rrule_builder import validate_recurrence
        errors = validate_recurrence(rec_raw)
        if not errors:
            recurrence = rec_raw
        else:
            print(f"  [Newsletter] Invalid recurrence dict, ignoring: {'; '.join(errors)}", file=sys.stderr)
    elif rec_raw:
        print(f"  [Newsletter] recurrence is not a dict, ignoring: {type(rec_raw).__name__}", file=sys.stderr)

result = {
    "type": item_type,
    "summary": summary,
    "who": who,
    "description": description,
    "location": location,
    "is_recurring": is_recurring,
    "is_multi_day": is_multi_day,
    # Relevance fields  always present per prompt contract
    "relevance": item.get("relevance", "high") if item.get("relevance") in ("high", "low") else "high",
    "reason": str(item.get("reason", "")).strip()[:300] or "No reason provided",
}

if item_type == "event":
    result["start"] = start_dt
    result["end"] = end_dt
    result["duration_minutes"] = duration_minutes
    result["claimed_day_of_week"] = claimed_day
elif item_type in ("reminder", "action_item"):
    result["due"] = due_date  # date object or None

return result

def _parse_iso_datetime(s):
"""Parse an ISO 8601 datetime string into a timezone-aware datetime."""
if not s or not isinstance(s, str):
return None
s = s.strip()
if not s:
return None

try:
    dt = datetime.fromisoformat(s)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CHICAGO_TZ)
    return dt.astimezone(CHICAGO_TZ)
except (ValueError, TypeError):
    pass

for fmt in (
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M",
):
    try:
        dt = datetime.strptime(s, fmt)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=CHICAGO_TZ)
        return dt.astimezone(CHICAGO_TZ)
    except ValueError:
        continue

print(f"  [Newsletter Parse] Could not parse datetime: {s}", file=sys.stderr)
return None

def _parse_iso_date(s):
"""Parse an ISO 8601 date string (YYYY-MM-DD) into a date object."""
if not s or not isinstance(s, str):
return None
s = s.strip().split("T")[0] # Strip time component if present
try:
from datetime import date as date_type
return date_type.fromisoformat(s)
except (ValueError, TypeError):
print(f" [Newsletter Parse] Could not parse date: {s}", file=sys.stderr)
return None