"""Newsletter extraction via local LLM (Prompt-as-Code). Uses Markdown-header chunking to avoid VRAM exhaustion on the local GPU. Long newsletters are split at ## headings, each chunk is processed separately, and results are merged/deduplicated at the end. """ import json import os import re import sys from datetime import datetime, timedelta, date as date_type from zoneinfo import ZoneInfo import requests from family_assistant.config import ( LLM_URL, LLM_MODEL, LLM_NEWSLETTER_MODEL, LLM_NEWSLETTER_URL, LLM_TIMEOUT, LLM_NEWSLETTER_TIMEOUT, CHICAGO_TZ, MAX_BODY_CHARS, get_nickname_map, load_prompts, ) def _chunk_by_markdown_headers(text, max_chunk_chars=2000): """Split text at Markdown ##+ headers, respecting a max chunk size. Yields (header, chunk_text) tuples. The header is the ## line (or '' for the preamble before any header). Chunks that exceed max_chunk_chars after splitting at headers are further split at paragraph boundaries (blank lines). This keeps each LLM call small enough to fit in VRAM on the Gaming PC's 12GB 3080 Ti while preserving the semantic grouping of newsletter sections. """ if not text: return # Split at ## (or deeper) headings header_pattern = re.compile(r'^(#{1,6}\s+.+)$', re.MULTILINE) splits = header_pattern.split(text) # splits alternates: [preamble, header1, section1, header2, section2, ...] chunks = [] if splits and not header_pattern.match(splits[0]): # Preamble before first header preamble = splits[0].strip() if preamble: chunks.append(("", preamble)) splits = splits[1:] # Pair up headers with their content for i in range(0, len(splits) - 1, 2): header = splits[i].strip() if i < len(splits) else "" content = splits[i + 1].strip() if i + 1 < len(splits) else "" if content: chunks.append((header, content)) # If no headers found at all, treat the whole text as one chunk if not chunks and text.strip(): chunks.append(("", text.strip())) # Further split oversized chunks at paragraph boundaries final_chunks = [] for header, content in chunks: if len(content) <= max_chunk_chars: final_chunks.append((header, content)) else: paragraphs = re.split(r'\n\s*\n', content) current = "" for para in paragraphs: para = para.strip() if not para: continue if len(current) + len(para) + 2 <= max_chunk_chars: current = current + "\n\n" + para if current else para else: if current: final_chunks.append((header, current)) current = para if current: final_chunks.append((header, current)) for header, content in final_chunks: yield header, content def classify_email(subject, body, from_addr=""): """Classify an email as 'appointment' or 'newsletter'. Returns one of: "appointment", "newsletter" Falls back to "appointment" on errors. """ prompts = load_prompts() system_msg = prompts["email_classify"] trimmed_body = body[:MAX_BODY_CHARS] if body else "" user_msg = f"Subject: {subject}\nFrom: {from_addr}\n\n{trimmed_body}" raw = _call_llm(system_msg, user_msg) parsed = _parse_json_response(raw) if parsed and isinstance(parsed, dict): cls = parsed.get("classification", "appointment") if cls == "newsletter": return "newsletter" # Default to appointment — simpler path, existing parser handles it return "appointment" def parse_newsletter_with_llm(subject, body, from_addr="", date_str=""): """Parse a newsletter email into structured items. Uses Markdown-header chunking to preserve VRAM on the local GPU. Each chunk is sent to the LLM separately, and results are merged and deduplicated. Returns a list of item dicts, each with a "type" key: "event", "reminder", "action_item", "info" Each item also includes "relevance" ("high"|"low") and "reason" keys. """ prompts = load_prompts() system_template = prompts["newsletter_extract"] today = datetime.now(CHICAGO_TZ) today_str = today.strftime("%Y-%m-%d") today_day = today.strftime("%A") system_msg = system_template.format(today=today_str, today_day=today_day) # Use extended body limit for newsletters (URL-enriched bodies can be large) trimmed_body = body[:MAX_BODY_CHARS * 4] if body else "" # Chunk the body at Markdown headers to preserve VRAM chunks = list(_chunk_by_markdown_headers(trimmed_body, max_chunk_chars=2000)) if not chunks: return [] all_items = [] newsletter_timeout = LLM_NEWSLETTER_TIMEOUT # Process each chunk separately for i, (header, chunk_content) in enumerate(chunks): chunk_label = f"chunk {i+1}/{len(chunks)}" if header: chunk_label += f" ({header})" print(f" [Newsletter LLM] Processing {chunk_label} ({len(chunk_content)} chars)", file=sys.stderr) # Build user message with context header context = f"Subject: {subject}\nFrom: {from_addr}\nDate: {date_str}" if header: context += f"\nSection: {header}" user_msg = f"{context}\n\n{chunk_content}" # First attempt raw = _call_llm( system_msg, user_msg, model=LLM_NEWSLETTER_MODEL, url=LLM_NEWSLETTER_URL, timeout=newsletter_timeout, ) parsed = _parse_json_response(raw) # Retry once if JSON parsing failed if parsed is None and raw is not None: print(f" [Newsletter LLM] Invalid JSON in {chunk_label}, retrying...", file=sys.stderr) retry_system = system_msg + ( "\n\nIMPORTANT: Return ONLY a valid JSON array. " "No markdown code fences. No explanation. Just the array." ) raw = _call_llm( retry_system, user_msg, model=LLM_NEWSLETTER_MODEL, url=LLM_NEWSLETTER_URL, timeout=newsletter_timeout, ) parsed = _parse_json_response(raw) if parsed is None: print(f" [Newsletter LLM] Could not parse JSON in {chunk_label}", file=sys.stderr) continue if not isinstance(parsed, list): print(f" [Newsletter LLM] Expected list in {chunk_label}, got {type(parsed).__name__}", file=sys.stderr) continue # Normalize each item from this chunk for item in parsed: if not isinstance(item, dict): continue normalized = _normalize_newsletter_item(item) if normalized: all_items.append(normalized) if not all_items: return [] # Reduce step: pass combined items back to LLM to merge semantic duplicates deduped = _llm_dedup(all_items, subject) print(f" [Newsletter LLM] Extracted {len(all_items)} items, {len(deduped)} after dedup (Phase 1+2)", file=sys.stderr) return deduped def _llm_dedup(items, subject=""): """Two-phase dedup: Python pre-merge for near-duplicates, then LLM for semantic ones. Phase 1: Python merges items with matching (type, sorted_summary_words, date_key) — handles word-order and minor wording differences without LLM cost. Phase 2: LLM merges semantic duplicates (different wording, same event) — only runs if there are 3+ items after Phase 1. Falls back to Phase 1 result on any LLM error. """ # Phase 1: Python fuzzy dedup (type + sorted word-level summary + date) _STOP_WORDS = frozenset({'a', 'an', 'the', 'for', 'of', 'at', 'in', 'on', 'to', 'is', 'are', 'and', 'or', 'due', 'due:'}) seen = {} phase1_result = [] for item in items: summary = item.get("summary", "").lower().strip() # Normalize: remove stop words, strip plural 's', sort so word-order differences match def _norm_word(w): w = w.rstrip('s') if w.endswith('s') and len(w) > 2 else w # plural → singular return w summary_key = " ".join(sorted(_norm_word(w) for w in summary.split() if w not in _STOP_WORDS)) item_type = item.get("type", "") date_key = "" if item.get("start"): date_key = item["start"].isoformat()[:10] if hasattr(item["start"], "isoformat") else str(item["start"])[:10] elif item.get("due"): date_key = str(item["due"])[:10] key = (item_type, summary_key, date_key) if key in seen: # Merge who arrays from both items existing = seen[key] existing_who = set(w for w in existing.get("who", [])) new_who = set(w for w in item.get("who", [])) existing["who"] = sorted(existing_who | new_who) # Keep the longer/more detailed summary if len(item.get("summary", "")) > len(existing.get("summary", "")): existing["summary"] = item["summary"] if len(item.get("description", "")) > len(existing.get("description", "")): existing["description"] = item["description"] continue seen[key] = item phase1_result.append(item) # Phase 2: LLM semantic dedup — merges items that refer to the same # real-world event even if worded differently or typed differently (e.g., # event vs reminder). Now enabled by default with qwen2.5-coder:7b at # ~125 tok/s on Gaming PC. LLM_DEDUP_ENABLED = os.environ.get("LLM_DEDUP_ENABLED", "0") == "1" if not LLM_DEDUP_ENABLED or len(phase1_result) <= 2: print(f" [Newsletter LLM] {len(phase1_result)} items after Phase 1 dedup, Phase 2 {'skipped (disabled)' if not LLM_DEDUP_ENABLED else 'not needed'}", file=sys.stderr) return phase1_result # Phase 2: LLM semantic dedup prompts = load_prompts() system_msg = prompts["newsletter_dedup"] # Trim to key fields only — the LLM only needs enough to identify duplicates _DEDUP_KEYS = ("type", "summary", "who", "start", "end", "due", "relevance", "reason", "location") serializable = [] for item in phase1_result: s_item = {} for k in _DEDUP_KEYS: if k in item: v = item[k] if isinstance(v, datetime): s_item[k] = v.isoformat() elif isinstance(v, date_type): s_item[k] = v.isoformat() else: s_item[k] = v serializable.append(s_item) items_json = json.dumps(serializable, indent=2) user_msg = f"Newsletter: {subject}\n\n{items_json}" raw = _call_llm( system_msg, user_msg, model=LLM_NEWSLETTER_MODEL, # Use the strong newsletter model for dedup url=LLM_NEWSLETTER_URL, timeout=180, # Generous timeout — dedup is important but not time-sensitive ) parsed = _parse_json_response(raw) if parsed is None or not isinstance(parsed, list): print(f" [Newsletter LLM] Dedup LLM failed, keeping {len(phase1_result)} Phase 1 items", file=sys.stderr) return phase1_result # The LLM returns trimmed items — match back to originals by summary+type # to preserve all fields (description, duration_minutes, etc.) result = [] used_indices = set() for deduped_item in parsed: if not isinstance(deduped_item, dict): continue deduped_summary = deduped_item.get("summary", "").lower().strip() deduped_type = deduped_item.get("type", "") # Find the best matching original item (from phase1_result) # First try exact match on summary+type, then fall back to summary only # (LLM dedup may merge items with different types, e.g., event+reminder) best_match = None best_idx = None for idx, orig in enumerate(phase1_result): if idx in used_indices: continue orig_summary = orig.get("summary", "").lower().strip() if orig_summary == deduped_summary and orig.get("type", "") == deduped_type: best_match = orig best_idx = idx break # Fall back: match by summary only (type may have been upgraded by LLM) if best_match is None: for idx, orig in enumerate(phase1_result): if idx in used_indices: continue orig_summary = orig.get("summary", "").lower().strip() if orig_summary == deduped_summary: best_match = orig best_idx = idx break # Fall back: fuzzy summary match (contains check) if best_match is None: for idx, orig in enumerate(phase1_result): if idx in used_indices: continue orig_summary = orig.get("summary", "").lower().strip() if deduped_summary in orig_summary or orig_summary in deduped_summary: best_match = orig best_idx = idx break if best_match is not None: # Merge: start with original full item, override with LLM-merged fields merged = dict(best_match) # LLM may have updated who, relevance, reason, or description for k in ("who", "relevance", "reason", "description", "location"): if k in deduped_item: merged[k] = deduped_item[k] result.append(merged) used_indices.add(best_idx) else: # No match — normalize from LLM output directly normalized = _normalize_newsletter_item(deduped_item) if normalized: result.append(normalized) # Note: we intentionally do NOT re-add unmatched originals. If the LLM # returned fewer items, the missing ones were intentionally merged/deduped. # Re-adding them would defeat the purpose of Phase 2. return result if result else phase1_result def _call_llm(system, user, temperature=0, timeout=None, model=None, url=None): """Send a chat completion request to the local LLM endpoint.""" request_timeout = timeout or LLM_TIMEOUT request_model = model or LLM_MODEL request_url = url or LLM_URL payload = { "model": request_model, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user}, ], "temperature": temperature, } try: resp = requests.post(request_url, json=payload, timeout=request_timeout) resp.raise_for_status() data = resp.json() return data["choices"][0]["message"]["content"].strip() except requests.exceptions.Timeout: print(f" [Newsletter LLM] Timeout after {request_timeout}s", file=sys.stderr) return None except requests.exceptions.ConnectionError: print(f" [Newsletter LLM] Connection failed to {LLM_URL}", file=sys.stderr) return None except Exception as e: print(f" [Newsletter LLM] Error: {e}", file=sys.stderr) return None def _parse_json_response(text): """Parse JSON from LLM response, handling markdown code fences and whitespace.""" if not text: return None text = text.strip() if text.startswith("```"): text = re.sub(r'^```(?:json)?\s*\n?', '', text) text = re.sub(r'\n?```\s*$', '', text) text = text.strip() try: return json.loads(text) except json.JSONDecodeError: match = re.search(r'\[.*\]', text, re.DOTALL) if match: try: return json.loads(match.group(0)) except json.JSONDecodeError: pass return None def _normalize_newsletter_item(item): """Normalize and validate a newsletter item dict from the LLM.""" item_type = item.get("type", "info") valid_types = ("event", "reminder", "action_item", "info") if item_type not in valid_types: item_type = "info" summary = str(item.get("summary", "")).strip() or "Newsletter item" who = item.get("who", []) if isinstance(who, str): who = [who] who = [str(w).strip() for w in who if w] # Normalize nicknames nicknames = get_nickname_map() who = [nicknames.get(w.lower(), w) for w in who] description = str(item.get("description", "")).strip()[:500] location = str(item.get("location", "")).strip() # Parse dates based on type start_dt = None end_dt = None due_date = None duration_minutes = int(item.get("duration_minutes", 60) or 60) if item_type == "event": start_str = item.get("start", "") end_str = item.get("end", "") start_dt = _parse_iso_datetime(start_str) end_dt = _parse_iso_datetime(end_str) if start_dt and not end_dt: end_dt = start_dt + timedelta(minutes=duration_minutes) if start_dt and end_dt: duration_minutes = int((end_dt - start_dt).total_seconds() / 60) # Day-of-week auto-correction for newsletter events claimed_day = str(item.get("claimed_day_of_week", "")).strip() if start_dt and claimed_day: from family_assistant.appointment_parser import _correct_day_of_week corrected_start, was_corrected = _correct_day_of_week(start_dt, claimed_day) if was_corrected: shift = corrected_start - start_dt print(f" [DayFix] {summary}: LLM said {claimed_day} but date was " f"{start_dt.strftime('%A %b %d')} → corrected to {corrected_start.strftime('%A %b %d')}", file=sys.stderr) start_dt = corrected_start if end_dt: end_dt = end_dt + shift duration_minutes = int((end_dt - start_dt).total_seconds() / 60) # Past-date guard now = datetime.now(CHICAGO_TZ) if start_dt and start_dt < now: old_start = start_dt if claimed_day: from family_assistant.appointment_parser import _correct_day_of_week start_dt, _ = _correct_day_of_week(start_dt, claimed_day) if start_dt < now: start_dt = start_dt + timedelta(days=7) else: start_dt = start_dt + timedelta(days=7) shift = start_dt - old_start print(f" [PastFix] {summary}: start was in the past ({old_start.strftime('%A %b %d')}) " f"→ shifted to {start_dt.strftime('%A %b %d')}", file=sys.stderr) if end_dt: end_dt = end_dt + shift duration_minutes = int((end_dt - start_dt).total_seconds() / 60) if not start_dt: # Date parse failed — no silent failures. Convert to info for Telegram digest. print(f" [Newsletter] Event date unparseable, converting to info: {summary} (raw: {start_str})", file=sys.stderr) item_type = "info" description = f"{description} (original type: event, date unparseable: {start_str})".strip()[:500] elif item_type in ("reminder", "action_item"): due_str = item.get("due", "") if due_str: due_date = _parse_iso_date(due_str) if not due_date: # Try start as fallback for due date start_str = item.get("start", "") due_date = _parse_iso_date(start_str) if start_str else None # Day-of-week correction for reminder/action due dates reminder_claimed_day = str(item.get("claimed_day_of_week", "")).strip() if due_date and reminder_claimed_day: from family_assistant.appointment_parser import _correct_day_of_week due_dt = datetime(due_date.year, due_date.month, due_date.day, tzinfo=CHICAGO_TZ) corrected_due, was_fixed = _correct_day_of_week(due_dt, reminder_claimed_day) if was_fixed: print(f" [DayFix] {summary}: reminder due said {reminder_claimed_day} but date was " f"{due_date.strftime('%A %b %d')} -> corrected to {corrected_due.strftime('%A %b %d')}", file=sys.stderr) due_date = corrected_due.date() # Past-date guard for reminders if due_date: today = datetime.now(CHICAGO_TZ).date() if due_date < today: old_due = due_date if reminder_claimed_day: due_dt = datetime(due_date.year, due_date.month, due_date.day, tzinfo=CHICAGO_TZ) corrected_due, _ = _correct_day_of_week(due_dt, reminder_claimed_day) due_date = corrected_due.date() if due_date < today: due_date = due_date + timedelta(days=7) else: due_date = due_date + timedelta(days=7) print(f" [PastFix] {summary}: reminder due was in the past ({old_due.strftime('%A %b %d')}) " f"-> shifted to {due_date.strftime('%A %b %d')}", file=sys.stderr) if not due_date: # Date parse failed — convert to info for Telegram digest. print(f" [Newsletter] {item_type} date unparseable, converting to info: {summary} (raw: {due_str})", file=sys.stderr) item_type = "info" description = f"{description} (original type: {item_type}, date unparseable: {due_str})".strip()[:500] is_recurring = bool(item.get("is_recurring", False)) is_multi_day = bool(item.get("is_multi_day", False)) # Extract recurrence dict if present and valid recurrence = None if is_recurring: rec_raw = item.get("recurrence") if isinstance(rec_raw, dict): from family_assistant.rrule_builder import validate_recurrence errors = validate_recurrence(rec_raw) if not errors: recurrence = rec_raw else: print(f" [Newsletter] Invalid recurrence dict, ignoring: {'; '.join(errors)}", file=sys.stderr) elif rec_raw: print(f" [Newsletter] recurrence is not a dict, ignoring: {type(rec_raw).__name__}", file=sys.stderr) result = { "type": item_type, "summary": summary, "who": who, "description": description, "location": location, "is_recurring": is_recurring, "is_multi_day": is_multi_day, # Relevance fields — always present per prompt contract "relevance": item.get("relevance", "high") if item.get("relevance") in ("high", "low") else "high", "reason": str(item.get("reason", "")).strip()[:300] or "No reason provided", } if item_type == "event": result["start"] = start_dt result["end"] = end_dt result["duration_minutes"] = duration_minutes result["claimed_day_of_week"] = claimed_day elif item_type in ("reminder", "action_item"): result["due"] = due_date # date object or None return result def _parse_iso_datetime(s): """Parse an ISO 8601 datetime string into a timezone-aware datetime.""" if not s or not isinstance(s, str): return None s = s.strip() if not s: return None try: dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=CHICAGO_TZ) return dt.astimezone(CHICAGO_TZ) except (ValueError, TypeError): pass for fmt in ( "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M", ): try: dt = datetime.strptime(s, fmt) if dt.tzinfo is None: dt = dt.replace(tzinfo=CHICAGO_TZ) return dt.astimezone(CHICAGO_TZ) except ValueError: continue print(f" [Newsletter Parse] Could not parse datetime: {s}", file=sys.stderr) return None def _parse_iso_date(s): """Parse an ISO 8601 date string (YYYY-MM-DD) into a date object.""" if not s or not isinstance(s, str): return None s = s.strip().split("T")[0] # Strip time component if present try: from datetime import date as date_type return date_type.fromisoformat(s) except (ValueError, TypeError): print(f" [Newsletter Parse] Could not parse date: {s}", file=sys.stderr) return None