"""LLM-based appointment parsing (Prompt-as-Code).""" import json import re import sys from datetime import datetime, timedelta from zoneinfo import ZoneInfo import requests from family_assistant.config import ( LLM_URL, LLM_MODEL, LLM_TIMEOUT, CHICAGO_TZ, MAX_BODY_CHARS, get_nickname_map, load_prompts, ) def _call_llm(system, user, temperature=0): """Send a chat completion request to the local LLM endpoint.""" payload = { "model": LLM_MODEL, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user}, ], "temperature": temperature, } try: resp = requests.post(LLM_URL, json=payload, timeout=LLM_TIMEOUT) resp.raise_for_status() data = resp.json() return data["choices"][0]["message"]["content"].strip() except requests.exceptions.Timeout: print(f" [LLM] Timeout after {LLM_TIMEOUT}s", file=sys.stderr) return None except requests.exceptions.ConnectionError: print(f" [LLM] Connection failed to {LLM_URL}", file=sys.stderr) return None except Exception as e: print(f" [LLM] Error: {e}", file=sys.stderr) return None def _parse_json_response(text): """Parse JSON from LLM response, handling markdown code fences and whitespace.""" if not text: return None # Strip markdown code fences if present text = text.strip() if text.startswith("```"): # Remove opening fence (with optional language tag) text = re.sub(r'^```(?:json)?\s*\n?', '', text) # Remove closing fence text = re.sub(r'\n?```\s*$', '', text) text = text.strip() try: return json.loads(text) except json.JSONDecodeError: # Try to find JSON array in the response match = re.search(r'\[.*\]', text, re.DOTALL) if match: try: return json.loads(match.group(0)) except json.JSONDecodeError: pass return None def parse_email_with_llm(subject, body, from_addr="", date_str=""): """ Send email content to the local LLM and parse the JSON response. Returns a list of appointment dicts, or empty list if nothing found / error. """ prompts = load_prompts() system_template = prompts["appointment_extract"] retry_suffix = prompts["appointment_retry"] today = datetime.now(CHICAGO_TZ) today_str = today.strftime("%Y-%m-%d") today_day = today.strftime("%A") system_msg = system_template.format(today=today_str, today_day=today_day) # Trim body to keep token burn low trimmed_body = body[:MAX_BODY_CHARS] if body else "" user_msg = f"Subject: {subject}\nFrom: {from_addr}\n\n{trimmed_body}" # First attempt raw = _call_llm(system_msg, user_msg) parsed = _parse_json_response(raw) # Retry once with stricter prompt if JSON parsing failed if parsed is None and raw is not None: print(" [LLM] Invalid JSON, retrying with stricter prompt...", file=sys.stderr) retry_system = system_template.format(today=today) + "\n" + retry_suffix raw = _call_llm(retry_system, user_msg) parsed = _parse_json_response(raw) if parsed is None: print(" [LLM] Could not parse JSON response", file=sys.stderr) return [] # Validate: must be a list if not isinstance(parsed, list): print(f" [LLM] Expected list, got {type(parsed).__name__}", file=sys.stderr) return [] # Validate and normalize each element results = [] for item in parsed: if not isinstance(item, dict): continue apt = _normalize_appointment(item) if apt: results.append(apt) return results def _correct_day_of_week(start_dt, claimed_day): """If the parsed date doesn't fall on the claimed day of week, find the nearest future date that actually falls on the claimed day. This catches LLM date math errors where e.g. "Monday" was resolved to a Tuesday date. We find the CLOSEST future date matching the claimed day relative to today (not relative to the wrong date), so we don't overshoot. Returns (corrected_dt, was_corrected_bool). """ if not claimed_day or not start_dt: return start_dt, False DAY_MAP = { 'monday': 0, 'mon': 0, 'tuesday': 1, 'tue': 1, 'tu': 1, 'tues': 1, 'wednesday': 2, 'wed': 2, 'thursday': 3, 'thu': 3, 'thur': 3, 'thurs': 3, 'friday': 4, 'fri': 4, 'saturday': 5, 'sat': 5, 'sunday': 6, 'sun': 6, } target_weekday = DAY_MAP.get(claimed_day.lower().strip()) if target_weekday is None: return start_dt, False actual_weekday = start_dt.weekday() # Monday=0 if actual_weekday == target_weekday: return start_dt, False # already correct # Find the closest future date (from today) that falls on the target weekday. # Preserve the time-of-day from the original parsed datetime. now = datetime.now(CHICAGO_TZ) today_weekday = now.weekday() days_from_today = (target_weekday - today_weekday) % 7 if days_from_today == 0: # Target day is today — check if the event time is still in the future candidate = now.replace(hour=start_dt.hour, minute=start_dt.minute, second=start_dt.second, microsecond=0) if candidate > now: days_from_today = 0 else: days_from_today = 7 # today's time already passed, use next week target_date = now.date() + timedelta(days=days_from_today) corrected = start_dt.replace(year=target_date.year, month=target_date.month, day=target_date.day) return corrected, True def _normalize_appointment(item): """Normalize and validate an appointment dict from the LLM.""" apt_type = item.get("type", "appointment") if apt_type not in ("appointment", "cancellation"): apt_type = "appointment" summary = str(item.get("summary", "")).strip() or "Appointment" who = item.get("who", []) if isinstance(who, str): who = [who] who = [str(w).strip() for w in who if w] # Normalize nicknames from family config nicknames = get_nickname_map() who = [nicknames.get(w.lower(), w) for w in who] # Parse start/end datetimes start_str = item.get("start", "") end_str = item.get("end", "") start_dt = _parse_iso_datetime(start_str) end_dt = _parse_iso_datetime(end_str) duration_minutes = int(item.get("duration_minutes", 60) or 60) if apt_type == "cancellation": duration_minutes = 0 # If we have start but no end, compute end from duration if start_dt and not end_dt: end_dt = start_dt + timedelta(minutes=duration_minutes) # If we have both start and end, compute duration if start_dt and end_dt: duration_minutes = int((end_dt - start_dt).total_seconds() / 60) # Day-of-week auto-correction: if the LLM resolved a date wrong # (e.g., said Monday but picked a Tuesday date), shift to the # correct future date matching the claimed day. claimed_day = str(item.get("claimed_day_of_week", "")).strip() if start_dt and claimed_day: corrected_start, was_corrected = _correct_day_of_week(start_dt, claimed_day) if was_corrected: shift = corrected_start - start_dt print(f" [DayFix] {summary}: LLM said {claimed_day} but date was " f"{start_dt.strftime('%A %b %d')} → corrected to {corrected_start.strftime('%A %b %d')}", file=sys.stderr) start_dt = corrected_start if end_dt: end_dt = end_dt + shift # Recompute duration after shift if start_dt and end_dt: duration_minutes = int((end_dt - start_dt).total_seconds() / 60) # Past-date guard: if start is in the past, shift forward. now = datetime.now(CHICAGO_TZ) if start_dt and start_dt < now: old_start = start_dt if claimed_day: # Shift to next occurrence of the claimed day start_dt, _ = _correct_day_of_week(start_dt, claimed_day) # If still in the past (same weekday but earlier today), add a week if start_dt < now: start_dt = start_dt + timedelta(days=7) else: # No day hint — just push forward 7 days start_dt = start_dt + timedelta(days=7) shift = start_dt - old_start print(f" [PastFix] {summary}: start was in the past ({old_start.strftime('%A %b %d')}) " f"→ shifted to {start_dt.strftime('%A %b %d')}", file=sys.stderr) if end_dt: end_dt = end_dt + shift if start_dt and end_dt: duration_minutes = int((end_dt - start_dt).total_seconds() / 60) location = str(item.get("location", "")).strip() is_recurring = bool(item.get("is_recurring", False)) is_multi_day = bool(item.get("is_multi_day", False)) description = str(item.get("description", "")).strip()[:500] # Extract recurrence dict if present and valid recurrence = None if is_recurring: rec_raw = item.get("recurrence") if isinstance(rec_raw, dict): # Validate via rrule_builder before accepting from family_assistant.rrule_builder import validate_recurrence errors = validate_recurrence(rec_raw) if not errors: recurrence = rec_raw else: print(f" [LLM] Invalid recurrence dict, ignoring: {'; '.join(errors)}", file=sys.stderr) elif rec_raw: print(f" [LLM] recurrence is not a dict, ignoring: {type(rec_raw).__name__}", file=sys.stderr) # Sanity check: if it's supposed to be an appointment but has no start, skip it if apt_type == "appointment" and not start_dt: # Try to salvage with just the date string print(f" [LLM] Appointment missing start datetime: {summary}", file=sys.stderr) return None result = { "type": apt_type, "summary": summary, "who": who, "start": start_dt, "end": end_dt, "duration_minutes": duration_minutes, "location": location, "is_recurring": is_recurring, "is_multi_day": is_multi_day, "description": description, "claimed_day_of_week": claimed_day, } if recurrence: result["recurrence"] = recurrence return result def _parse_iso_datetime(s): """Parse an ISO 8601 datetime string into a timezone-aware datetime.""" if not s or not isinstance(s, str): return None s = s.strip() if not s: return None # Try standard ISO format for fmt in ( "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M", ): try: dt = datetime.strptime(s, fmt) if dt.tzinfo is None: dt = dt.replace(tzinfo=CHICAGO_TZ) return dt.astimezone(CHICAGO_TZ) except ValueError: continue # Try handling timezone abbreviations like -05:00 or CST # Python 3.7+ handles %z with colon try: dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=CHICAGO_TZ) return dt.astimezone(CHICAGO_TZ) except (ValueError, TypeError): pass print(f" [Parse] Could not parse datetime: {s}", file=sys.stderr) return None