"""LLM-based appointment parsing (Prompt-as-Code)."""
import json
import re
import sys
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
from family_assistant.config import (
LLM_URL,
LLM_MODEL,
LLM_TIMEOUT,
CHICAGO_TZ,
MAX_BODY_CHARS,
get_nickname_map,
load_prompts,
)
def _call_llm(system, user, temperature=0):
    """POST a two-message chat completion to the LLM endpoint and return the reply.

    Args:
        system: Text sent as the ``system`` role message.
        user: Text sent as the ``user`` role message.
        temperature: Sampling temperature forwarded in the request payload.

    Returns:
        The whitespace-stripped ``content`` of the first choice in the
        response, or ``None`` if the request timed out, could not connect,
        or failed in any other way. Each failure writes one diagnostic line
        to stderr.
    """
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    request_body = {
        "model": LLM_MODEL,
        "messages": messages,
        "temperature": temperature,
    }

    try:
        response = requests.post(LLM_URL, json=request_body, timeout=LLM_TIMEOUT)
        response.raise_for_status()
        reply = response.json()["choices"][0]["message"]["content"]
        return reply.strip()
    except requests.exceptions.Timeout:
        failure = f" [LLM] Timeout after {LLM_TIMEOUT}s"
    except requests.exceptions.ConnectionError:
        failure = f" [LLM] Connection failed to {LLM_URL}"
    except Exception as e:
        # Catch-all on purpose: HTTP errors, malformed JSON and unexpected
        # response shapes are all reported the same way and yield None.
        failure = f" [LLM] Error: {e}"

    print(failure, file=sys.stderr)
    return None
def _parse_json_response(text):
    """Decode JSON from an LLM reply, tolerating code fences and stray prose.

    The reply is tried in two ways:

    1. After stripping surrounding whitespace and, if present, a Markdown
       code fence (```` ``` ```` or ```` ```json ````), the remainder is
       decoded as JSON.
    2. If that fails, the widest ``[...]`` span in the text is decoded, so
       an array embedded in explanatory prose is still recovered.

    Args:
        text: Raw reply text, or ``None``/empty when there was no reply.

    Returns:
        The decoded JSON value, or ``None`` when ``text`` is empty or no
        valid JSON could be extracted.
    """
    if not text:
        return None

    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Opening fence, optional "json" language tag, rest of that line.
        cleaned = re.sub(r'^```(?:json)?[ \t]*\n?', '', cleaned, flags=re.IGNORECASE)
        # Closing fence plus any trailing whitespace.
        cleaned = re.sub(r'\n?```\s*$', '', cleaned)
        cleaned = cleaned.strip()

    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        pass

    # Fallback: greedy match from the first '[' to the last ']' so nested
    # arrays/objects inside the outer array are kept intact.
    match = re.search(r'\[.*\]', cleaned, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            pass
    return None
def parse_email_with_llm(subject, body, from_addr="", date_str=""):
    """Extract appointments from an email by prompting the local LLM.

    The system prompt is the ``appointment_extract`` template from
    ``load_prompts()``, formatted with today's date and weekday name in
    ``CHICAGO_TZ``. If the first reply is not parseable JSON, the request is
    retried once with the same formatted prompt plus the
    ``appointment_retry`` suffix.

    Args:
        subject: Email subject line.
        body: Email body; truncated to ``MAX_BODY_CHARS`` before sending.
            ``None`` or empty is sent as an empty body.
        from_addr: Sender address included in the user message.
        date_str: Accepted for interface compatibility; not used here.

    Returns:
        A list of normalized appointment dicts (see
        ``_normalize_appointment``). Empty when the LLM call failed, the
        reply was not a JSON list, or no element survived normalization.
    """
    prompts = load_prompts()
    system_template = prompts["appointment_extract"]
    retry_suffix = prompts["appointment_retry"]

    today = datetime.now(CHICAGO_TZ)
    today_str = today.strftime("%Y-%m-%d")
    today_day = today.strftime("%A")
    system_msg = system_template.format(today=today_str, today_day=today_day)

    # Trim body to keep token burn low
    trimmed_body = body[:MAX_BODY_CHARS] if body else ""
    user_msg = f"Subject: {subject}\nFrom: {from_addr}\n\n{trimmed_body}"

    # First attempt
    raw = _call_llm(system_msg, user_msg)
    parsed = _parse_json_response(raw)

    # Retry once with a stricter prompt, but only when the LLM actually
    # answered (raw is not None) and the answer was not valid JSON.
    if parsed is None and raw is not None:
        print(" [LLM] Invalid JSON, retrying with stricter prompt...", file=sys.stderr)
        # Reuse the already formatted prompt so the retry sees the same
        # date string and weekday as the first attempt.
        retry_system = system_msg + "\n" + retry_suffix
        raw = _call_llm(retry_system, user_msg)
        parsed = _parse_json_response(raw)

    if parsed is None:
        print(" [LLM] Could not parse JSON response", file=sys.stderr)
        return []

    # Validate: must be a list
    if not isinstance(parsed, list):
        print(f" [LLM] Expected list, got {type(parsed).__name__}", file=sys.stderr)
        return []

    # Validate and normalize each element; non-dict entries and entries
    # rejected by the normalizer are dropped.
    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        apt = _normalize_appointment(item)
        if apt:
            results.append(apt)
    return results
def _correct_day_of_week(start_dt, claimed_day):
    """Snap ``start_dt`` onto the weekday the LLM claimed, if they disagree.

    When the weekday of ``start_dt`` differs from ``claimed_day``, the date
    is replaced by the next date, counted from today in ``CHICAGO_TZ``, that
    falls on the claimed weekday. Anchoring on today rather than on the
    wrong date avoids overshooting. The time of day and tzinfo of
    ``start_dt`` are kept.

    Args:
        start_dt: Parsed start datetime, or a falsy value.
        claimed_day: Weekday name or abbreviation (case-insensitive,
            surrounding whitespace ignored), or a falsy value.

    Returns:
        ``(datetime, corrected)``. ``corrected`` is ``False`` and the input
        is returned unchanged when either argument is falsy, the day name
        is unrecognized, or the weekday already matches.
    """
    if not (claimed_day and start_dt):
        return start_dt, False

    # Index in this tuple == datetime.weekday() value (Monday=0).
    spellings = (
        ('monday', 'mon'),
        ('tuesday', 'tue', 'tu', 'tues'),
        ('wednesday', 'wed'),
        ('thursday', 'thu', 'thur', 'thurs'),
        ('friday', 'fri'),
        ('saturday', 'sat'),
        ('sunday', 'sun'),
    )
    weekday_by_name = {
        name: index
        for index, names in enumerate(spellings)
        for name in names
    }

    wanted = weekday_by_name.get(claimed_day.lower().strip())
    if wanted is None or wanted == start_dt.weekday():
        return start_dt, False

    now = datetime.now(CHICAGO_TZ)
    offset = (wanted - now.weekday()) % 7
    if offset == 0:
        # The claimed day is today: keep today only if the event's time of
        # day has not passed yet, otherwise use the same weekday next week.
        same_day_slot = now.replace(
            hour=start_dt.hour,
            minute=start_dt.minute,
            second=start_dt.second,
            microsecond=0,
        )
        if same_day_slot <= now:
            offset = 7

    new_date = now.date() + timedelta(days=offset)
    moved = start_dt.replace(
        year=new_date.year,
        month=new_date.month,
        day=new_date.day,
    )
    return moved, True
def _appointment_text(item, key):
    """Return ``item[key]`` as a stripped string; missing or null becomes ""."""
    value = item.get(key)
    if value is None:
        return ""
    return str(value).strip()


def _normalize_appointment(item):
    """Normalize and validate one appointment dict produced by the LLM.

    The input is untrusted model output, so every field is coerced
    defensively: JSON ``null`` is treated as "absent", and a malformed
    ``duration_minutes`` falls back to 60 instead of raising.

    Args:
        item: A dict decoded from the LLM's JSON reply.

    Returns:
        A dict with keys ``type``, ``summary``, ``who``, ``start``, ``end``,
        ``duration_minutes``, ``location``, ``is_recurring``,
        ``is_multi_day``, ``description`` and ``claimed_day_of_week``, plus
        ``recurrence`` when a valid recurrence dict was supplied. Returns
        ``None`` for an ``appointment`` that has no parseable start.
    """
    apt_type = item.get("type", "appointment")
    if apt_type not in ("appointment", "cancellation"):
        apt_type = "appointment"

    summary = _appointment_text(item, "summary") or "Appointment"

    # "who" may arrive as null, a single string/scalar, or a list.
    who = item.get("who") or []
    if not isinstance(who, (list, tuple)):
        who = [who]
    who = [str(w).strip() for w in who if w]

    # Normalize nicknames from family config
    nicknames = get_nickname_map()
    who = [nicknames.get(w.lower(), w) for w in who]

    # Parse start/end datetimes (non-string or unparseable values -> None)
    start_dt = _parse_iso_datetime(item.get("start", ""))
    end_dt = _parse_iso_datetime(item.get("end", ""))

    try:
        duration_minutes = int(item.get("duration_minutes") or 60)
    except (TypeError, ValueError, OverflowError):
        # e.g. "an hour", a nested object, or infinity: use the default
        # rather than aborting the whole email.
        duration_minutes = 60
    if apt_type == "cancellation":
        duration_minutes = 0

    # If we have start but no end, compute end from duration
    if start_dt and not end_dt:
        end_dt = start_dt + timedelta(minutes=duration_minutes)

    # If we have both start and end, compute duration
    if start_dt and end_dt:
        duration_minutes = int((end_dt - start_dt).total_seconds() / 60)

    # Day-of-week auto-correction: if the LLM resolved a date wrong
    # (e.g., said Monday but picked a Tuesday date), shift to the
    # correct future date matching the claimed day.
    claimed_day = _appointment_text(item, "claimed_day_of_week")
    if start_dt and claimed_day:
        corrected_start, was_corrected = _correct_day_of_week(start_dt, claimed_day)
        if was_corrected:
            shift = corrected_start - start_dt
            print(f" [DayFix] {summary}: LLM said {claimed_day} but date was "
                  f"{start_dt.strftime('%A %b %d')} → corrected to {corrected_start.strftime('%A %b %d')}",
                  file=sys.stderr)
            start_dt = corrected_start
            if end_dt:
                end_dt = end_dt + shift
            # Recompute duration after shift
            if start_dt and end_dt:
                duration_minutes = int((end_dt - start_dt).total_seconds() / 60)

    # Past-date guard: if start is in the past, shift forward.
    # NOTE(review): only a single 7-day step is applied, so a start more
    # than one week old is still in the past afterwards.
    now = datetime.now(CHICAGO_TZ)
    if start_dt and start_dt < now:
        old_start = start_dt
        if claimed_day:
            # Shift to next occurrence of the claimed day
            start_dt, _ = _correct_day_of_week(start_dt, claimed_day)
            # If still in the past (same weekday but earlier today), add a week
            if start_dt < now:
                start_dt = start_dt + timedelta(days=7)
        else:
            # No day hint — just push forward 7 days
            start_dt = start_dt + timedelta(days=7)
        shift = start_dt - old_start
        print(f" [PastFix] {summary}: start was in the past ({old_start.strftime('%A %b %d')}) "
              f"→ shifted to {start_dt.strftime('%A %b %d')}", file=sys.stderr)
        if end_dt:
            end_dt = end_dt + shift
        if start_dt and end_dt:
            duration_minutes = int((end_dt - start_dt).total_seconds() / 60)

    location = _appointment_text(item, "location")
    is_recurring = bool(item.get("is_recurring", False))
    is_multi_day = bool(item.get("is_multi_day", False))
    description = _appointment_text(item, "description")[:500]

    # Extract recurrence dict if present and valid
    recurrence = None
    if is_recurring:
        rec_raw = item.get("recurrence")
        if isinstance(rec_raw, dict):
            # Validate via rrule_builder before accepting
            from family_assistant.rrule_builder import validate_recurrence
            errors = validate_recurrence(rec_raw)
            if not errors:
                recurrence = rec_raw
            else:
                print(f" [LLM] Invalid recurrence dict, ignoring: {'; '.join(errors)}", file=sys.stderr)
        elif rec_raw:
            print(f" [LLM] recurrence is not a dict, ignoring: {type(rec_raw).__name__}", file=sys.stderr)

    # Sanity check: an appointment without a usable start cannot be
    # scheduled, so it is reported and dropped. Cancellations are kept.
    if apt_type == "appointment" and not start_dt:
        print(f" [LLM] Appointment missing start datetime: {summary}", file=sys.stderr)
        return None

    result = {
        "type": apt_type,
        "summary": summary,
        "who": who,
        "start": start_dt,
        "end": end_dt,
        "duration_minutes": duration_minutes,
        "location": location,
        "is_recurring": is_recurring,
        "is_multi_day": is_multi_day,
        "description": description,
        "claimed_day_of_week": claimed_day,
    }
    if recurrence:
        result["recurrence"] = recurrence
    return result
def _parse_iso_datetime(s):
    """Turn an ISO 8601 style string into an aware datetime in ``CHICAGO_TZ``.

    A fixed list of ``strptime`` layouts is tried first, then
    ``datetime.fromisoformat``. A result without tzinfo is stamped with
    ``CHICAGO_TZ``; a result with an offset is converted to ``CHICAGO_TZ``.

    Args:
        s: Candidate datetime string. Non-strings and blank strings are
            rejected without logging.

    Returns:
        A timezone-aware ``datetime``, or ``None`` when the value is not a
        usable string or no parser accepts it (the latter is reported on
        stderr).
    """
    if not isinstance(s, str):
        return None
    text = s.strip()
    if not text:
        return None

    def to_chicago(moment):
        # Naive values are taken to be Chicago wall-clock time.
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=CHICAGO_TZ)
        return moment.astimezone(CHICAGO_TZ)

    layouts = (
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M",
    )
    for layout in layouts:
        try:
            return to_chicago(datetime.strptime(text, layout))
        except ValueError:
            pass

    # Last resort: whatever other ISO variants fromisoformat understands.
    try:
        return to_chicago(datetime.fromisoformat(text))
    except (ValueError, TypeError):
        print(f" [Parse] Could not parse datetime: {text}", file=sys.stderr)
        return None