# appointment_parser.py (11,654 bytes, Apr 19, 2026)

"""LLM-based appointment parsing (Prompt-as-Code)."""

import json
import re
import sys
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import requests

from family_assistant.config import (
LLM_URL,
LLM_MODEL,
LLM_TIMEOUT,
CHICAGO_TZ,
MAX_BODY_CHARS,
get_nickname_map,
load_prompts,
)

def _call_llm(system, user, temperature=0):
    """POST a two-message chat completion to the LLM endpoint and return its text.

    Args:
        system: Content of the ``system`` message.
        user: Content of the ``user`` message.
        temperature: Sampling temperature forwarded in the request payload.

    Returns:
        The stripped ``content`` of the first choice's message, or ``None``
        when the request times out, the connection fails, or anything else
        goes wrong (HTTP error status, unexpected response shape, ...).
        Every failure is reported on stderr.
    """
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    request_body = {
        "model": LLM_MODEL,
        "messages": messages,
        "temperature": temperature,
    }

    try:
        response = requests.post(LLM_URL, json=request_body, timeout=LLM_TIMEOUT)
        response.raise_for_status()
        first_choice = response.json()["choices"][0]
        return first_choice["message"]["content"].strip()
    except requests.exceptions.Timeout:
        failure = f" [LLM] Timeout after {LLM_TIMEOUT}s"
    except requests.exceptions.ConnectionError:
        failure = f" [LLM] Connection failed to {LLM_URL}"
    except Exception as e:
        # Catch-all boundary: callers treat None as "no usable answer".
        failure = f" [LLM] Error: {e}"

    print(failure, file=sys.stderr)
    return None

def _parse_json_response(text):
"""Parse JSON from LLM response, handling markdown code fences and whitespace."""
if not text:
return None
# Strip markdown code fences if present
text = text.strip()
if text.startswith(""): # Remove opening fence (with optional language tag) text = re.sub(r'^(?:json)?\s\n?', '', text)
# Remove closing fence
text = re.sub(r'\n?```\s
$', '', text)
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
# Try to find JSON array in the response
match = re.search(r'[.*]', text, re.DOTALL)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
pass
return None

def parse_email_with_llm(subject, body, from_addr="", date_str=""):
    """Extract appointments from an email by asking the local LLM for JSON.

    The system prompt is the ``appointment_extract`` template rendered with
    today's date and weekday in CHICAGO_TZ. If the first reply is not valid
    JSON, one retry is made with the ``appointment_retry`` suffix appended
    to that same rendered prompt.

    Args:
        subject: Email subject line.
        body: Email body; only the first MAX_BODY_CHARS characters are sent.
        from_addr: Sender address included in the user message.
        date_str: Accepted for interface compatibility; not currently used.

    Returns:
        A list of normalized appointment dicts. Empty when the LLM call
        fails, the reply is not a JSON list, or no element survives
        normalization.
    """
    prompts = load_prompts()
    system_template = prompts["appointment_extract"]
    retry_suffix = prompts["appointment_retry"]

    today = datetime.now(CHICAGO_TZ)
    system_msg = system_template.format(
        today=today.strftime("%Y-%m-%d"),
        today_day=today.strftime("%A"),
    )

    # Trim body to keep token burn low
    trimmed_body = body[:MAX_BODY_CHARS] if body else ""
    user_msg = f"Subject: {subject}\nFrom: {from_addr}\n\n{trimmed_body}"

    # First attempt
    raw = _call_llm(system_msg, user_msg)
    parsed = _parse_json_response(raw)

    # Retry once with stricter prompt if we got a reply but it wasn't JSON.
    # (raw is None means the call itself failed; retrying would not help.)
    if parsed is None and raw is not None:
        print("  [LLM] Invalid JSON, retrying with stricter prompt...", file=sys.stderr)
        # Reuse the already-rendered prompt so the retry carries the same
        # date/weekday placeholders as the first attempt.
        retry_system = system_msg + "\n" + retry_suffix
        raw = _call_llm(retry_system, user_msg)
        parsed = _parse_json_response(raw)

    if parsed is None:
        print("  [LLM] Could not parse JSON response", file=sys.stderr)
        return []

    # Validate: must be a list
    if not isinstance(parsed, list):
        print(f"  [LLM] Expected list, got {type(parsed).__name__}", file=sys.stderr)
        return []

    # Validate and normalize each element; non-dict entries and entries
    # rejected by _normalize_appointment are dropped.
    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        apt = _normalize_appointment(item)
        if apt:
            results.append(apt)

    return results

def _correct_day_of_week(start_dt, claimed_day):
"""If the parsed date doesn't fall on the claimed day of week, find the
nearest future date that actually falls on the claimed day.

This catches LLM date math errors where e.g. "Monday" was resolved to
a Tuesday date. We find the CLOSEST future date matching the claimed day
relative to today (not relative to the wrong date), so we don't overshoot.

Returns (corrected_dt, was_corrected_bool).
"""
if not claimed_day or not start_dt:
    return start_dt, False

DAY_MAP = {
    'monday': 0, 'mon': 0,
    'tuesday': 1, 'tue': 1, 'tu': 1, 'tues': 1,
    'wednesday': 2, 'wed': 2,
    'thursday': 3, 'thu': 3, 'thur': 3, 'thurs': 3,
    'friday': 4, 'fri': 4,
    'saturday': 5, 'sat': 5,
    'sunday': 6, 'sun': 6,
}

target_weekday = DAY_MAP.get(claimed_day.lower().strip())
if target_weekday is None:
    return start_dt, False

actual_weekday = start_dt.weekday()  # Monday=0
if actual_weekday == target_weekday:
    return start_dt, False  # already correct

# Find the closest future date (from today) that falls on the target weekday.
# Preserve the time-of-day from the original parsed datetime.
now = datetime.now(CHICAGO_TZ)
today_weekday = now.weekday()
days_from_today = (target_weekday - today_weekday) % 7
if days_from_today == 0:
    # Target day is today — check if the event time is still in the future
    candidate = now.replace(hour=start_dt.hour, minute=start_dt.minute,
                            second=start_dt.second, microsecond=0)
    if candidate > now:
        days_from_today = 0
    else:
        days_from_today = 7  # today's time already passed, use next week

target_date = now.date() + timedelta(days=days_from_today)
corrected = start_dt.replace(year=target_date.year, month=target_date.month,
                              day=target_date.day)
return corrected, True

def _normalize_appointment(item):
    """Normalize and validate one appointment dict produced by the LLM.

    Keys read from ``item``: ``type``, ``summary``, ``who``, ``start``,
    ``end``, ``duration_minutes``, ``claimed_day_of_week``, ``location``,
    ``is_recurring``, ``is_multi_day``, ``description``, ``recurrence``.
    Malformed values (JSON ``null``, a scalar ``who``, a non-numeric
    duration) are coerced to defaults rather than raising, so one bad field
    does not abort parsing of the whole email.

    Args:
        item: Dict decoded from the LLM's JSON reply.

    Returns:
        A dict with keys ``type``, ``summary``, ``who``, ``start``, ``end``,
        ``duration_minutes``, ``location``, ``is_recurring``,
        ``is_multi_day``, ``description``, ``claimed_day_of_week`` and,
        when a valid recurrence was supplied, ``recurrence``. Returns
        ``None`` for an ``appointment`` whose start could not be parsed.
    """

    def text_field(key):
        # JSON null must become "", not the literal string "None".
        value = item.get(key)
        return "" if value is None else str(value).strip()

    def minutes_between(begin, finish):
        return int((finish - begin).total_seconds() / 60)

    apt_type = item.get("type", "appointment")
    if apt_type not in ("appointment", "cancellation"):
        apt_type = "appointment"

    summary = text_field("summary") or "Appointment"

    # "who" may arrive as a list, a single string, null, or some other scalar.
    who = item.get("who") or []
    if isinstance(who, str) or not hasattr(who, "__iter__"):
        who = [who]
    who = [str(w).strip() for w in who if w]
    who = [w for w in who if w]  # drop names that were only whitespace

    # Normalize nicknames from family config
    nicknames = get_nickname_map()
    who = [nicknames.get(w.lower(), w) for w in who]

    # Parse start/end datetimes
    start_dt = _parse_iso_datetime(item.get("start", ""))
    end_dt = _parse_iso_datetime(item.get("end", ""))

    # Missing, zero, or unparseable durations fall back to 60 minutes.
    try:
        duration_minutes = int(item.get("duration_minutes") or 60)
    except (TypeError, ValueError, OverflowError):
        duration_minutes = 60
    if apt_type == "cancellation":
        duration_minutes = 0

    # If we have start but no end, compute end from duration
    if start_dt and not end_dt:
        end_dt = start_dt + timedelta(minutes=duration_minutes)

    # If we have both start and end, the explicit span wins over the
    # LLM-reported duration.
    if start_dt and end_dt:
        duration_minutes = minutes_between(start_dt, end_dt)

    # Day-of-week auto-correction: if the LLM resolved a date wrong
    # (e.g., said Monday but picked a Tuesday date), shift to the
    # correct future date matching the claimed day.
    claimed_day = text_field("claimed_day_of_week")
    if start_dt and claimed_day:
        corrected_start, was_corrected = _correct_day_of_week(start_dt, claimed_day)
        if was_corrected:
            shift = corrected_start - start_dt
            print(f"  [DayFix] {summary}: LLM said {claimed_day} but date was "
                  f"{start_dt.strftime('%A %b %d')} → corrected to {corrected_start.strftime('%A %b %d')}",
                  file=sys.stderr)
            start_dt = corrected_start
            if end_dt:
                end_dt = end_dt + shift
            # Recompute duration after shift
            if start_dt and end_dt:
                duration_minutes = minutes_between(start_dt, end_dt)

    # Past-date guard: if start is in the past, shift forward.
    # NOTE(review): a single 7-day push can still leave a start that was
    # more than a week old in the past; confirm whether that is intended.
    now = datetime.now(CHICAGO_TZ)
    if start_dt and start_dt < now:
        old_start = start_dt
        if claimed_day:
            # Shift to next occurrence of the claimed day
            start_dt, _ = _correct_day_of_week(start_dt, claimed_day)
            # If still in the past (same weekday but earlier today), add a week
            if start_dt < now:
                start_dt = start_dt + timedelta(days=7)
        else:
            # No day hint - just push forward 7 days
            start_dt = start_dt + timedelta(days=7)
        shift = start_dt - old_start
        print(f"  [PastFix] {summary}: start was in the past ({old_start.strftime('%A %b %d')}) "
              f"→ shifted to {start_dt.strftime('%A %b %d')}", file=sys.stderr)
        if end_dt:
            end_dt = end_dt + shift
        if start_dt and end_dt:
            duration_minutes = minutes_between(start_dt, end_dt)

    location = text_field("location")
    is_recurring = bool(item.get("is_recurring", False))
    is_multi_day = bool(item.get("is_multi_day", False))
    description = text_field("description")[:500]

    # Extract recurrence dict if present and valid
    recurrence = None
    if is_recurring:
        rec_raw = item.get("recurrence")
        if isinstance(rec_raw, dict):
            # Validate via rrule_builder before accepting
            from family_assistant.rrule_builder import validate_recurrence
            errors = validate_recurrence(rec_raw)
            if not errors:
                recurrence = rec_raw
            else:
                print(f"  [LLM] Invalid recurrence dict, ignoring: {'; '.join(errors)}", file=sys.stderr)
        elif rec_raw:
            print(f"  [LLM] recurrence is not a dict, ignoring: {type(rec_raw).__name__}", file=sys.stderr)

    # Sanity check: an appointment without a start cannot be scheduled, so
    # it is reported and dropped. Cancellations may pass without a start.
    if apt_type == "appointment" and not start_dt:
        print(f"  [LLM] Appointment missing start datetime: {summary}", file=sys.stderr)
        return None

    result = {
        "type": apt_type,
        "summary": summary,
        "who": who,
        "start": start_dt,
        "end": end_dt,
        "duration_minutes": duration_minutes,
        "location": location,
        "is_recurring": is_recurring,
        "is_multi_day": is_multi_day,
        "description": description,
        "claimed_day_of_week": claimed_day,
    }
    if recurrence:
        result["recurrence"] = recurrence
    return result

def _parse_iso_datetime(s):
"""Parse an ISO 8601 datetime string into a timezone-aware datetime."""
if not s or not isinstance(s, str):
return None
s = s.strip()
if not s:
return None

# Try standard ISO format
for fmt in (
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M",
):
    try:
        dt = datetime.strptime(s, fmt)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=CHICAGO_TZ)
        return dt.astimezone(CHICAGO_TZ)
    except ValueError:
        continue

# Try handling timezone abbreviations like -05:00 or CST
# Python 3.7+ handles %z with colon
try:
    dt = datetime.fromisoformat(s)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CHICAGO_TZ)
    return dt.astimezone(CHICAGO_TZ)
except (ValueError, TypeError):
    pass

print(f"  [Parse] Could not parse datetime: {s}", file=sys.stderr)
return None