"""Newsletter extraction via local LLM (Prompt-as-Code).
Uses Markdown-header chunking to avoid VRAM exhaustion on the local GPU.
Long newsletters are split at Markdown headings, each chunk is processed
separately, and results are merged/deduplicated at the end.
"""
import json
import os
import re
import sys
from datetime import datetime, timedelta, date as date_type
from zoneinfo import ZoneInfo
import requests
from icarus.core.config import (
LLM_URL,
LLM_MODEL,
LLM_NEWSLETTER_MODEL,
LLM_NEWSLETTER_URL,
LLM_TIMEOUT,
LLM_NEWSLETTER_TIMEOUT,
CHICAGO_TZ,
MAX_BODY_CHARS,
get_nickname_map,
load_prompts,
)
def _chunk_by_markdown_headers(text, max_chunk_chars=2000):
"""Split text at Markdown ##+ headers, respecting a max chunk size.
Yields (header, chunk_text) tuples. The header is the ## line (or '' for
the preamble before any header). Chunks that exceed max_chunk_chars after
splitting at headers are further split at paragraph boundaries (blank lines).
This keeps each LLM call small enough to fit in VRAM on the Gaming PC's
12GB 3080 Ti while preserving the semantic grouping of newsletter sections.
"""
if not text:
return
    # Split at any Markdown heading (# through ######)
header_pattern = re.compile(r'^(#{1,6}\s+.+)$', re.MULTILINE)
splits = header_pattern.split(text)
# splits alternates: [preamble, header1, section1, header2, section2, ...]
chunks = []
if splits and not header_pattern.match(splits[0]):
# Preamble before first header
preamble = splits[0].strip()
if preamble:
chunks.append(("", preamble))
splits = splits[1:]
# Pair up headers with their content
    for i in range(0, len(splits) - 1, 2):
        header = splits[i].strip()
        content = splits[i + 1].strip()
        if content:
            chunks.append((header, content))
# If no headers found at all, treat the whole text as one chunk
if not chunks and text.strip():
chunks.append(("", text.strip()))
# Further split oversized chunks at paragraph boundaries
final_chunks = []
for header, content in chunks:
if len(content) <= max_chunk_chars:
final_chunks.append((header, content))
else:
paragraphs = re.split(r'\n\s*\n', content)
current = ""
for para in paragraphs:
para = para.strip()
if not para:
continue
if len(current) + len(para) + 2 <= max_chunk_chars:
current = current + "\n\n" + para if current else para
else:
if current:
final_chunks.append((header, current))
current = para
if current:
final_chunks.append((header, current))
for header, content in final_chunks:
yield header, content
def classify_email(subject, body, from_addr=""):
"""Classify an email as 'appointment' or 'newsletter'.
Returns one of: "appointment", "newsletter"
Falls back to "appointment" on errors.
"""
prompts = load_prompts()
system_msg = prompts["email_classify"]
trimmed_body = body[:MAX_BODY_CHARS] if body else ""
user_msg = f"Subject: {subject}\nFrom: {from_addr}\n\n{trimmed_body}"
raw = _call_llm(system_msg, user_msg)
parsed = _parse_json_response(raw)
if parsed and isinstance(parsed, dict):
cls = parsed.get("classification", "appointment")
if cls == "newsletter":
return "newsletter"
# Default to appointment — simpler path, existing parser handles it
return "appointment"
def parse_newsletter_with_llm(subject, body, from_addr="", date_str=""):
"""Parse a newsletter email into structured items.
Uses Markdown-header chunking to preserve VRAM on the local GPU.
Each chunk is sent to the LLM separately, and results are merged
and deduplicated.
Returns a list of item dicts, each with a "type" key:
"event", "reminder", "action_item", "info"
Each item also includes "relevance" ("high"|"low") and "reason" keys.
"""
prompts = load_prompts()
system_template = prompts["newsletter_extract"]
today = datetime.now(CHICAGO_TZ)
today_str = today.strftime("%Y-%m-%d")
today_day = today.strftime("%A")
system_msg = system_template.format(today=today_str, today_day=today_day)
# Use extended body limit for newsletters (URL-enriched bodies can be large)
trimmed_body = body[:MAX_BODY_CHARS * 4] if body else ""
# Chunk the body at Markdown headers to preserve VRAM
chunks = list(_chunk_by_markdown_headers(trimmed_body, max_chunk_chars=2000))
if not chunks:
return []
all_items = []
newsletter_timeout = LLM_NEWSLETTER_TIMEOUT
# Process each chunk separately
for i, (header, chunk_content) in enumerate(chunks):
chunk_label = f"chunk {i+1}/{len(chunks)}"
if header:
chunk_label += f" ({header})"
print(f" [Newsletter LLM] Processing {chunk_label} ({len(chunk_content)} chars)", file=sys.stderr)
# Build user message with context header
context = f"Subject: {subject}\nFrom: {from_addr}\nDate: {date_str}"
if header:
context += f"\nSection: {header}"
user_msg = f"{context}\n\n{chunk_content}"
# First attempt
raw = _call_llm(
system_msg, user_msg,
model=LLM_NEWSLETTER_MODEL,
url=LLM_NEWSLETTER_URL,
timeout=newsletter_timeout,
)
parsed = _parse_json_response(raw)
# Retry once if JSON parsing failed
if parsed is None and raw is not None:
print(f" [Newsletter LLM] Invalid JSON in {chunk_label}, retrying...", file=sys.stderr)
retry_system = system_msg + (
"\n\nIMPORTANT: Return ONLY a valid JSON array. "
"No markdown code fences. No explanation. Just the array."
)
raw = _call_llm(
retry_system, user_msg,
model=LLM_NEWSLETTER_MODEL,
url=LLM_NEWSLETTER_URL,
timeout=newsletter_timeout,
)
parsed = _parse_json_response(raw)
if parsed is None:
print(f" [Newsletter LLM] Could not parse JSON in {chunk_label}", file=sys.stderr)
continue
if not isinstance(parsed, list):
print(f" [Newsletter LLM] Expected list in {chunk_label}, got {type(parsed).__name__}", file=sys.stderr)
continue
# Normalize each item from this chunk
for item in parsed:
if not isinstance(item, dict):
continue
normalized = _normalize_newsletter_item(item)
if normalized:
all_items.append(normalized)
if not all_items:
return []
# Reduce step: pass combined items back to LLM to merge semantic duplicates
deduped = _llm_dedup(all_items, subject)
print(f" [Newsletter LLM] Extracted {len(all_items)} items, {len(deduped)} after dedup (Phase 1+2)", file=sys.stderr)
return deduped
def _llm_dedup(items, subject=""):
"""Two-phase dedup: Python pre-merge for near-duplicates, then LLM for semantic ones.
Phase 1: Python merges items with matching (type, sorted_summary_words, date_key)
— handles word-order and minor wording differences without LLM cost.
Phase 2: LLM merges semantic duplicates (different wording, same event)
— only runs if there are 3+ items after Phase 1.
Falls back to Phase 1 result on any LLM error.
"""
# Phase 1: Python fuzzy dedup (type + sorted word-level summary + date)
_STOP_WORDS = frozenset({'a', 'an', 'the', 'for', 'of', 'at', 'in', 'on', 'to', 'is', 'are', 'and', 'or', 'due', 'due:'})
seen = {}
phase1_result = []
for item in items:
summary = item.get("summary", "").lower().strip()
        # Normalize: remove stop words, strip a single plural 's', sort so word-order differences match
        def _norm_word(w):
            # plural -> singular: drop one trailing 's' (rstrip('s') would strip repeated s's)
            return w[:-1] if w.endswith('s') and len(w) > 2 else w
summary_key = " ".join(sorted(_norm_word(w) for w in summary.split() if w not in _STOP_WORDS))
item_type = item.get("type", "")
date_key = ""
if item.get("start"):
date_key = item["start"].isoformat()[:10] if hasattr(item["start"], "isoformat") else str(item["start"])[:10]
elif item.get("due"):
date_key = str(item["due"])[:10]
key = (item_type, summary_key, date_key)
if key in seen:
# Merge who arrays from both items
existing = seen[key]
existing_who = set(w for w in existing.get("who", []))
new_who = set(w for w in item.get("who", []))
existing["who"] = sorted(existing_who | new_who)
# Keep the longer/more detailed summary
if len(item.get("summary", "")) > len(existing.get("summary", "")):
existing["summary"] = item["summary"]
if len(item.get("description", "")) > len(existing.get("description", "")):
existing["description"] = item["description"]
continue
seen[key] = item
phase1_result.append(item)
    # Phase 2: LLM semantic dedup — merges items that refer to the same
    # real-world event even if worded differently or typed differently (e.g.,
    # event vs reminder). Opt-in via LLM_DEDUP_ENABLED=1; intended to run with
    # qwen2.5-coder:7b (~125 tok/s on the Gaming PC).
LLM_DEDUP_ENABLED = os.environ.get("LLM_DEDUP_ENABLED", "0") == "1"
if not LLM_DEDUP_ENABLED or len(phase1_result) <= 2:
print(f" [Newsletter LLM] {len(phase1_result)} items after Phase 1 dedup, Phase 2 {'skipped (disabled)' if not LLM_DEDUP_ENABLED else 'not needed'}", file=sys.stderr)
return phase1_result
# Phase 2: LLM semantic dedup
prompts = load_prompts()
system_msg = prompts["newsletter_dedup"]
# Trim to key fields only — the LLM only needs enough to identify duplicates
_DEDUP_KEYS = ("type", "summary", "who", "start", "end", "due", "relevance", "reason", "location")
serializable = []
for item in phase1_result:
s_item = {}
for k in _DEDUP_KEYS:
if k in item:
v = item[k]
if isinstance(v, datetime):
s_item[k] = v.isoformat()
elif isinstance(v, date_type):
s_item[k] = v.isoformat()
else:
s_item[k] = v
serializable.append(s_item)
items_json = json.dumps(serializable, indent=2)
user_msg = f"Newsletter: {subject}\n\n{items_json}"
raw = _call_llm(
system_msg, user_msg,
model=LLM_NEWSLETTER_MODEL, # Use the strong newsletter model for dedup
url=LLM_NEWSLETTER_URL,
timeout=180, # Generous timeout — dedup is important but not time-sensitive
)
parsed = _parse_json_response(raw)
if parsed is None or not isinstance(parsed, list):
print(f" [Newsletter LLM] Dedup LLM failed, keeping {len(phase1_result)} Phase 1 items", file=sys.stderr)
return phase1_result
# The LLM returns trimmed items — match back to originals by summary+type
# to preserve all fields (description, duration_minutes, etc.)
result = []
used_indices = set()
for deduped_item in parsed:
if not isinstance(deduped_item, dict):
continue
deduped_summary = deduped_item.get("summary", "").lower().strip()
deduped_type = deduped_item.get("type", "")
# Find the best matching original item (from phase1_result)
# First try exact match on summary+type, then fall back to summary only
# (LLM dedup may merge items with different types, e.g., event+reminder)
best_match = None
best_idx = None
for idx, orig in enumerate(phase1_result):
if idx in used_indices:
continue
orig_summary = orig.get("summary", "").lower().strip()
if orig_summary == deduped_summary and orig.get("type", "") == deduped_type:
best_match = orig
best_idx = idx
break
# Fall back: match by summary only (type may have been upgraded by LLM)
if best_match is None:
for idx, orig in enumerate(phase1_result):
if idx in used_indices:
continue
orig_summary = orig.get("summary", "").lower().strip()
if orig_summary == deduped_summary:
best_match = orig
best_idx = idx
break
# Fall back: fuzzy summary match (contains check)
if best_match is None:
for idx, orig in enumerate(phase1_result):
if idx in used_indices:
continue
orig_summary = orig.get("summary", "").lower().strip()
if deduped_summary in orig_summary or orig_summary in deduped_summary:
best_match = orig
best_idx = idx
break
if best_match is not None:
# Merge: start with original full item, override with LLM-merged fields
merged = dict(best_match)
# LLM may have updated who, relevance, reason, or description
for k in ("who", "relevance", "reason", "description", "location"):
if k in deduped_item:
merged[k] = deduped_item[k]
result.append(merged)
used_indices.add(best_idx)
else:
# No match — normalize from LLM output directly
normalized = _normalize_newsletter_item(deduped_item)
if normalized:
result.append(normalized)
# Note: we intentionally do NOT re-add unmatched originals. If the LLM
# returned fewer items, the missing ones were intentionally merged/deduped.
# Re-adding them would defeat the purpose of Phase 2.
return result if result else phase1_result
def _call_llm(system, user, temperature=0, timeout=None, model=None, url=None):
"""Send a chat completion request to the local LLM endpoint."""
request_timeout = timeout or LLM_TIMEOUT
request_model = model or LLM_MODEL
request_url = url or LLM_URL
payload = {
"model": request_model,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": user},
],
"temperature": temperature,
}
try:
resp = requests.post(request_url, json=payload, timeout=request_timeout)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"].strip()
except requests.exceptions.Timeout:
print(f" [Newsletter LLM] Timeout after {request_timeout}s", file=sys.stderr)
return None
except requests.exceptions.ConnectionError:
print(f" [Newsletter LLM] Connection failed to {LLM_URL}", file=sys.stderr)
return None
except Exception as e:
print(f" [Newsletter LLM] Error: {e}", file=sys.stderr)
return None
def _parse_json_response(text):
"""Parse JSON from LLM response, handling markdown code fences and whitespace."""
if not text:
return None
text = text.strip()
if text.startswith(""):
text = re.sub(r'^(?:json)?\s\n?', '', text)
text = re.sub(r'\n?```\s$', '', text)
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
        match = re.search(r'\[.*\]', text, re.DOTALL)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
pass
return None
def _normalize_newsletter_item(item):
"""Normalize and validate a newsletter item dict from the LLM."""
item_type = item.get("type", "info")
valid_types = ("event", "reminder", "action_item", "info")
if item_type not in valid_types:
item_type = "info"
summary = str(item.get("summary", "")).strip() or "Newsletter item"
who = item.get("who", [])
if isinstance(who, str):
who = [who]
who = [str(w).strip() for w in who if w]
# Normalize nicknames
nicknames = get_nickname_map()
who = [nicknames.get(w.lower(), w) for w in who]
description = str(item.get("description", "")).strip()[:500]
location = str(item.get("location", "")).strip()
# Parse dates based on type
start_dt = None
end_dt = None
due_date = None
    try:
        duration_minutes = int(item.get("duration_minutes", 60) or 60)
    except (TypeError, ValueError):
        duration_minutes = 60  # non-numeric duration from the LLM; fall back to the default
if item_type == "event":
start_str = item.get("start", "")
end_str = item.get("end", "")
start_dt = _parse_iso_datetime(start_str)
end_dt = _parse_iso_datetime(end_str)
if start_dt and not end_dt:
end_dt = start_dt + timedelta(minutes=duration_minutes)
if start_dt and end_dt:
duration_minutes = int((end_dt - start_dt).total_seconds() / 60)
# Day-of-week auto-correction for newsletter events
claimed_day = str(item.get("claimed_day_of_week", "")).strip()
if start_dt and claimed_day:
from icarus.core.appointment_parser import _correct_day_of_week
corrected_start, was_corrected = _correct_day_of_week(start_dt, claimed_day)
if was_corrected:
shift = corrected_start - start_dt
print(f" [DayFix] {summary}: LLM said {claimed_day} but date was "
f"{start_dt.strftime('%A %b %d')} → corrected to {corrected_start.strftime('%A %b %d')}",
file=sys.stderr)
start_dt = corrected_start
if end_dt:
end_dt = end_dt + shift
duration_minutes = int((end_dt - start_dt).total_seconds() / 60)
# Past-date guard
now = datetime.now(CHICAGO_TZ)
if start_dt and start_dt < now:
old_start = start_dt
if claimed_day:
from icarus.core.appointment_parser import _correct_day_of_week
start_dt, _ = _correct_day_of_week(start_dt, claimed_day)
if start_dt < now:
start_dt = start_dt + timedelta(days=7)
else:
start_dt = start_dt + timedelta(days=7)
shift = start_dt - old_start
print(f" [PastFix] {summary}: start was in the past ({old_start.strftime('%A %b %d')}) "
f"→ shifted to {start_dt.strftime('%A %b %d')}", file=sys.stderr)
if end_dt:
end_dt = end_dt + shift
duration_minutes = int((end_dt - start_dt).total_seconds() / 60)
if not start_dt:
# Date parse failed — no silent failures. Convert to info for Telegram digest.
print(f" [Newsletter] Event date unparseable, converting to info: {summary} (raw: {start_str})", file=sys.stderr)
item_type = "info"
description = f"{description} (original type: event, date unparseable: {start_str})".strip()[:500]
elif item_type in ("reminder", "action_item"):
due_str = item.get("due", "")
if due_str:
due_date = _parse_iso_date(due_str)
if not due_date:
# Try start as fallback for due date
start_str = item.get("start", "")
due_date = _parse_iso_date(start_str) if start_str else None
# Day-of-week correction for reminder/action due dates
reminder_claimed_day = str(item.get("claimed_day_of_week", "")).strip()
if due_date and reminder_claimed_day:
from icarus.core.appointment_parser import _correct_day_of_week
due_dt = datetime(due_date.year, due_date.month, due_date.day, tzinfo=CHICAGO_TZ)
corrected_due, was_fixed = _correct_day_of_week(due_dt, reminder_claimed_day)
if was_fixed:
print(f" [DayFix] {summary}: reminder due said {reminder_claimed_day} but date was "
f"{due_date.strftime('%A %b %d')} -> corrected to {corrected_due.strftime('%A %b %d')}",
file=sys.stderr)
due_date = corrected_due.date()
# Past-date guard for reminders
if due_date:
today = datetime.now(CHICAGO_TZ).date()
if due_date < today:
old_due = due_date
if reminder_claimed_day:
due_dt = datetime(due_date.year, due_date.month, due_date.day, tzinfo=CHICAGO_TZ)
corrected_due, _ = _correct_day_of_week(due_dt, reminder_claimed_day)
due_date = corrected_due.date()
if due_date < today:
due_date = due_date + timedelta(days=7)
else:
due_date = due_date + timedelta(days=7)
print(f" [PastFix] {summary}: reminder due was in the past ({old_due.strftime('%A %b %d')}) "
f"-> shifted to {due_date.strftime('%A %b %d')}", file=sys.stderr)
        if not due_date:
            # Date parse failed — convert to info for Telegram digest.
            print(f" [Newsletter] {item_type} date unparseable, converting to info: {summary} (raw: {due_str})", file=sys.stderr)
            description = f"{description} (original type: {item_type}, date unparseable: {due_str})".strip()[:500]
            item_type = "info"
is_recurring = bool(item.get("is_recurring", False))
is_multi_day = bool(item.get("is_multi_day", False))
# Extract recurrence dict if present and valid
recurrence = None
if is_recurring:
rec_raw = item.get("recurrence")
if isinstance(rec_raw, dict):
from icarus.core.rrule_builder import validate_recurrence
errors = validate_recurrence(rec_raw)
if not errors:
recurrence = rec_raw
else:
print(f" [Newsletter] Invalid recurrence dict, ignoring: {'; '.join(errors)}", file=sys.stderr)
elif rec_raw:
print(f" [Newsletter] recurrence is not a dict, ignoring: {type(rec_raw).__name__}", file=sys.stderr)
result = {
"type": item_type,
"summary": summary,
"who": who,
"description": description,
"location": location,
"is_recurring": is_recurring,
"is_multi_day": is_multi_day,
# Relevance fields — always present per prompt contract
"relevance": item.get("relevance", "high") if item.get("relevance") in ("high", "low") else "high",
"reason": str(item.get("reason", "")).strip()[:300] or "No reason provided",
}
# Post-process: validate relevance against actual family grades
result = _validate_relevance_against_family(result)
if item_type == "event":
result["start"] = start_dt
result["end"] = end_dt
result["duration_minutes"] = duration_minutes
result["claimed_day_of_week"] = claimed_day
elif item_type in ("reminder", "action_item"):
result["due"] = due_date # date object or None
return result
def _parse_iso_datetime(s):
"""Parse an ISO 8601 datetime string into a timezone-aware datetime."""
if not s or not isinstance(s, str):
return None
s = s.strip()
if not s:
return None
try:
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=CHICAGO_TZ)
return dt.astimezone(CHICAGO_TZ)
except (ValueError, TypeError):
pass
for fmt in (
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M",
):
try:
dt = datetime.strptime(s, fmt)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=CHICAGO_TZ)
return dt.astimezone(CHICAGO_TZ)
except ValueError:
continue
print(f" [Newsletter Parse] Could not parse datetime: {s}", file=sys.stderr)
return None
def _parse_iso_date(s):
"""Parse an ISO 8601 date string (YYYY-MM-DD) into a date object."""
if not s or not isinstance(s, str):
return None
s = s.strip().split("T")[0] # Strip time component if present
try:
return date_type.fromisoformat(s)
except (ValueError, TypeError):
print(f" [Newsletter Parse] Could not parse date: {s}", file=sys.stderr)
return None
def _validate_relevance_against_family(item):
"""Post-process LLM relevance decisions against actual family data.
The LLM sometimes marks grade-specific events as 'high' relevance even when
no child is in that grade. This function validates and corrects those errors.
Returns the item with potentially corrected relevance and reason.
"""
from icarus.core.config import load_family_config
# Only check items currently marked as high relevance
if item.get("relevance") != "high":
return item
summary = item.get("summary", "")
description = item.get("description", "")
text_to_check = f"{summary} {description}".lower()
# Get children's grades from config
try:
config = load_family_config()
children = [m for m in config.get("family", {}).get("members", [])
if m.get("role") in ("son", "daughter") and "baseline_grade" in m]
child_grades = {}
for child in children:
name = child.get("name", "")
grade = child.get("baseline_grade")
if name and grade is not None:
child_grades[name] = grade
except Exception:
# If config fails, trust the LLM's judgment
return item
if not child_grades:
return item
# Grade patterns to look for - order matters (more specific first)
# Each tuple: (regex pattern, min_grade, max_grade, label)
# For specific grades, min=max. For ranges, specify both.
grade_patterns = [
# Ranges first
        (r'\bk[- ]?2\b|\bk\s*to\s*2\b|\bk[- ]?2nd\b', 0, 2, "K-2"),  # K, 1, 2
        (r'\bk[- ]?3\b|\bk\s*to\s*3\b|\bk[- ]?3rd\b', 0, 3, "K-3"),  # K, 1, 2, 3
        (r'\bk[- ]?4\b|\bk\s*to\s*4\b', 0, 4, "K-4"),  # K through 4
        (r'\bk[- ]?5\b|\bk\s*to\s*5\b', 0, 5, "K-5"),  # K through 5
(r'\b1[- ]?3\b|\b1st[- ]?3rd\b|\b1\s*to\s*3\b', 1, 3, "grades 1-3"),
(r'\b3[- ]?5\b|\b3rd[- ]?5th\b|\b3\s*to\s*5\b', 3, 5, "grades 3-5"),
(r'\b4[- ]?6\b|\b4th[- ]?6th\b|\b4\s*to\s*6\b', 4, 6, "grades 4-6"),
(r'\bmiddle\s+school\b', 6, 8, "middle school"), # Rough: 6-8
        (r'\belementary\s+school\b', 0, 5, "elementary school"),  # K-5
# Specific grades after ranges
(r'\b5th\s+grade\b|\bfifth\s+grade\b', 5, 5, "5th grade"),
(r'\b4th\s+grade\b|\bfourth\s+grade\b', 4, 4, "4th grade"),
(r'\b3rd\s+grade\b|\bthird\s+grade\b', 3, 3, "3rd grade"),
(r'\b2nd\s+grade\b|\bsecond\s+grade\b', 2, 2, "2nd grade"),
(r'\b1st\s+grade\b|\bfirst\s+grade\b', 1, 1, "1st grade"),
(r'\bkindergarten\b', 0, 0, "Kindergarten"),
(r'\bpre[- ]?k\b|\bpreschool\b', -1, -1, "Pre-K"),
]
# Check if text mentions a grade or range
mentioned_min = None
mentioned_max = None
grade_label = None
for pattern, min_grade, max_grade, label in grade_patterns:
if re.search(pattern, text_to_check):
mentioned_min = min_grade
mentioned_max = max_grade
grade_label = label
break
# If a grade/range is mentioned, verify ANY child falls within that range
if mentioned_min is not None and mentioned_max is not None:
matching_children = [name for name, grade in child_grades.items()
if mentioned_min <= grade <= mentioned_max]
if not matching_children:
# No child in this grade range — downgrade to low relevance
child_grade_list = ", ".join([f"{n}: {_grade_num_to_str(g)}" for n, g in sorted(child_grades.items())])
item["relevance"] = "low"
item["reason"] = f"No child in {grade_label}. Current: {child_grade_list}"
print(f" [Relevance Fix] Downgraded '{summary[:50]}...' to low: no child in {grade_label}", file=sys.stderr)
return item
def _grade_num_to_str(grade_num):
    """Convert numeric grade to readable string."""
    if grade_num == -1:
        return "Pre-K"
    if grade_num == 0:
        return "K"
    suffix = {1: "st", 2: "nd", 3: "rd"}.get(grade_num, "th")
    return f"{grade_num}{suffix}"