"""Intent Engine — Parse natural-language calendar mutations from chat messages.

Routes user messages through the local LLM to extract structured intents
(move, cancel, add, rename, reject, remind, question), then executes them
against Radicale CalDAV.

This is the chat interface for the family calendar — the "Aundrea correction" flow.
"""

import json
import os
import re
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

from family_assistant.config import (
    CHICAGO_TZ,
    LLM_MODEL,
    LLM_URL,
    load_family_config,
)
from family_assistant.calendar_sync import (
    create_event,
    create_recurring_event,
    event_exists,
    get_calendar_service,
    cancel_recurring_instance,
    cancel_recurring_series,
    find_recurring_instance,
    _event_to_dict,
    _build_ics,
)

# Prompt template location
PROMPT_DIR = os.path.join(os.path.dirname(__file__), "prompts")
INTENT_PROMPT_PATH = os.path.join(PROMPT_DIR, "chat_intent.txt")

# Duration used when an event's existing start/end cannot be used to infer one.
_DEFAULT_EVENT_DURATION = timedelta(hours=1)
# Default length of a newly added event when the intent gives no usable value.
_DEFAULT_ADD_MINUTES = 60
# Minimum Family Brain score for a chunk to count as relevant.
_BRAIN_MIN_SCORE = 0.3

# Used by _base_name to reduce an event title to its core words.
_PAREN_SUFFIX_RE = re.compile(r'\s*\(.*\)\s*$')
_FILLER_SUFFIX_RE = re.compile(r'\s+(session|appointment|class|lesson|visit|meeting)$')

# Words ignored when extracting search keywords from a question.
_QUESTION_SKIP_WORDS = frozenset({
    "what", "where", "when", "is", "the", "a", "an", "do", "does",
    "need", "for", "to", "at", "in", "on", "about", "how", "who",
    "which", "can", "will", "be", "are", "was", "were", "has", "have",
    "had", "it", "that", "this", "from",
})
# Punctuation trimmed from the edges of question words before matching.
_TOKEN_EDGE_PUNCT = ".,!?;:\"'()[]{}"


def _load_prompt():
    """Read and return the intent-extraction prompt template text."""
    # Explicit encoding so the result does not depend on the platform locale.
    with open(INTENT_PROMPT_PATH, encoding="utf-8") as f:
        return f.read()


def _call_llm(prompt: str, temperature: float = 0.1) -> str:
    """Call the local LLM and return the response text.

    Args:
        prompt: Full prompt, sent as a single user message.
        temperature: Sampling temperature placed in the request payload.

    Returns:
        The stripped response text, or "" when the response carries none.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (via ``raise_for_status``).
    """
    import requests

    payload = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
        "stream": False,
    }
    resp = requests.post(LLM_URL, json=payload, timeout=60)
    resp.raise_for_status()
    data = resp.json()

    # Handle both OpenAI and Ollama response formats
    choices = data.get("choices", [])
    if choices:
        content = choices[0].get("message", {}).get("content", "")
    elif "message" in data:
        # Ollama chat format
        content = data["message"].get("content", "")
    else:
        content = data.get("response", "")
    # A present-but-null content field must not crash .strip().
    return (content or "").strip()


def _family_members() -> list:
    """Return family members from config as ``(name, nicknames)`` tuples.

    Accepts both layouts the config may use: ``family`` as a dict holding a
    ``members`` list, or ``family`` as the list itself; and members given
    either as dicts (``name`` / ``nicknames``) or as plain values.
    """
    family_data = load_family_config().get("family", {})
    if isinstance(family_data, dict):
        raw_members = family_data.get("members", [])
    else:
        raw_members = family_data

    members = []
    for member in raw_members or []:
        if isinstance(member, dict):
            nicknames = list(member.get("nicknames", []) or [])
            members.append((member.get("name", ""), nicknames))
        else:
            members.append((str(member), []))
    return members


def _as_name_list(who) -> list:
    """Normalise an intent's ``who`` value to a list of non-empty names.

    The value comes from LLM output, so a bare string is wrapped (instead of
    being iterated character by character) and blank entries are dropped.
    """
    if not who:
        return []
    if isinstance(who, str):
        who = [who]
    names = []
    for entry in who:
        text = str(entry).strip() if entry is not None else ""
        if text:
            names.append(text)
    return names


def _build_prompt(message: str) -> str:
    """Build the intent extraction prompt with today's date and family config.

    Fills the ``{today}``, ``{family_members}`` and ``{nickname_rules}``
    placeholders of the template and appends the user's message.
    """
    today = datetime.now(CHICAGO_TZ).strftime("%A, %B %d, %Y")
    members = _family_members()

    names = [name for name, _ in members]
    nickname_rules = [
        f'"{nick}" → "{name}"'
        for name, nicknames in members
        for nick in nicknames
    ]

    prompt = _load_prompt()
    prompt = prompt.replace("{today}", today)
    prompt = prompt.replace("{family_members}", ", ".join(names))
    prompt = prompt.replace("{nickname_rules}", "\n".join(nickname_rules))
    prompt += f"\n\nUser message: {message}"
    return prompt


def parse_intent(message: str) -> dict:
    """Parse a natural-language message into a structured calendar intent.

    Returns a dict with at least a 'type' key:
    - move: change date/time of an event
    - cancel: remove an event
    - add: create a new event
    - rename: change an event title
    - reject: add a rejection rule
    - none: not a calendar-related message
    - error: the LLM response could not be turned into an intent
      (carries 'raw' and 'error')
    """
    raw = _call_llm(_build_prompt(message))

    # Strip markdown code fences if present
    raw = re.sub(r"^```(?:json)?\s*", "", raw)
    raw = re.sub(r"\s*```$", "", raw)

    try:
        intent = json.loads(raw)
    except json.JSONDecodeError:
        # Try to extract JSON from the response
        match = re.search(r"\{.*\}", raw, re.DOTALL)
        if not match:
            return {"type": "error", "raw": raw, "error": "No JSON found in LLM response"}
        try:
            intent = json.loads(match.group())
        except json.JSONDecodeError:
            return {"type": "error", "raw": raw, "error": "Could not parse LLM response as JSON"}

    # Valid JSON that is not an object (a list, string, number) cannot be an
    # intent; callers rely on dict access.
    if not isinstance(intent, dict):
        return {"type": "error", "raw": raw, "error": "LLM response was not a JSON object"}
    return intent


def _caldav_search(start=None, end=None):
    """Search CalDAV events in a date range.

    Args:
        start: Range start; defaults to the current UTC time.
        end: Range end; defaults to 90 days after the start.

    Returns:
        List of event dicts (from ``_event_to_dict``), each with the raw
        CalDAV object attached under ``_caldav_obj`` for mutation operations.
    """
    calendar = get_calendar_service()
    range_start = start or datetime.now(timezone.utc)
    range_end = end or (range_start + timedelta(days=90))

    try:
        event_objs = calendar.date_search(start=range_start, end=range_end)
    except Exception:
        # Best-effort: if the ranged search fails, fall back to every event.
        event_objs = calendar.events()

    results = []
    for obj in event_objs:
        event = _event_to_dict(obj)
        # Attach the raw caldav object for mutation operations
        event["_caldav_obj"] = obj
        results.append(event)
    return results


def _base_name(text: str) -> str:
    """Lower-case *text*, drop a trailing "(who)" and a trailing filler word."""
    base = text.lower().strip()
    base = _PAREN_SUFFIX_RE.sub('', base)
    base = _FILLER_SUFFIX_RE.sub('', base)
    return base.strip()


def _find_event(summary: str):
    """Find a calendar event matching the given summary (fuzzy match).

    Matching passes, first hit wins:
    1. exact title, or title equal to the query followed by " (who)";
    2. query contained in the title;
    3. base-name containment in either direction (see ``_base_name``).

    Returns the event dict or None. A blank summary never matches.
    """
    query = (summary or "").lower().strip()
    if not query:
        # An empty string is a substring of every title; refuse to guess.
        return None

    events = _caldav_search()
    titled = [(event, (event.get("summary") or "").lower()) for event in events]

    # Exact match first (also match without parenthetical who)
    for event, title in titled:
        if title == query or title.startswith(query + " ("):
            return event

    # Partial match
    for event, title in titled:
        if query in title:
            return event

    # Base-name match: strip parenthetical who and trailing filler words
    query_base = _base_name(query)
    if query_base:
        for event, title in titled:
            event_base = _base_name(title)
            if event_base and (query_base in event_base or event_base in query_base):
                return event

    return None


def _find_events_by_who(who: list) -> list:
    """Find upcoming events whose summary mentions any of the given names."""
    names = [name.lower() for name in _as_name_list(who)]
    if not names:
        return []

    matched = []
    for event in _caldav_search():
        title = (event.get("summary") or "").lower()
        if any(name in title for name in names):
            matched.append(event)
    return matched


def _resolve_event(summary: str, intent: dict):
    """Locate the event an intent refers to.

    Tries the summary first, then falls back to the intent's ``who`` list.

    Returns:
        ``(event, None)`` on success, or ``(None, result)`` where *result* is
        the ready-to-return ``ambiguous`` / ``not_found`` response.
    """
    event = _find_event(summary)
    if event:
        return event, None

    who = _as_name_list(intent.get("who"))
    if who:
        candidates = _find_events_by_who(who)
        if len(candidates) == 1:
            return candidates[0], None
        if len(candidates) > 1:
            return None, {
                "status": "ambiguous",
                "message": f"Found {len(candidates)} events matching {who}. Which one?",
                "events": [
                    {"summary": e.get("summary"), "start": e["start"].get("dateTime", "")}
                    for e in candidates
                ],
            }

    return None, {"status": "not_found", "message": f"No upcoming event matching '{summary}'"}


def execute_intent(intent: dict, dry_run: bool = False) -> dict:
    """Execute a parsed intent against the calendar or Family Brain.

    Args:
        intent: Intent dict as produced by ``parse_intent``.
        dry_run: When true, mutation handlers describe the change without
            applying it.

    Returns a result dict with status and details.
    """
    intent_type = intent.get("type", "none")

    if intent_type == "none":
        return {"status": "ignored", "message": "Not a calendar intent"}

    if intent_type == "chatter":
        return {"status": "chatter", "message": "Acknowledgment — no action needed"}

    if intent_type == "question":
        return _execute_question(intent)

    if intent_type == "error":
        return {"status": "error", "message": intent.get("error", "Unknown error"), "raw": intent.get("raw")}

    if intent_type == "reject":
        # Delegate to the rejection engine
        from family_assistant.rejection_engine import reject_event

        summary = intent.get("summary", "")
        reason = intent.get("reason", "")
        scope = intent.get("scope", "event")
        result = reject_event(summary, message=reason, scope=scope, delete_from_calendar=not dry_run)
        return {"status": "rejected", "summary": summary, "scope": scope, "details": result}

    # Handlers that share the (intent, dry_run) signature.
    handlers = {
        "move": _execute_move,
        "cancel": _execute_cancel,
        "add": _execute_add,
        "rename": _execute_rename,
        "remind": _execute_remind,
    }
    # isinstance guard: an unhashable type value from the LLM must not raise.
    handler = handlers.get(intent_type) if isinstance(intent_type, str) else None
    if handler is not None:
        return handler(intent, dry_run)

    return {"status": "unknown", "type": intent_type}


def _update_caldav_event(event_dict, updates: dict):
    """Update a CalDAV event with new field values and save it.

    Args:
        event_dict: Event dict with '_caldav_obj' attached
        updates: Dict of fields to update; recognised keys are
            ``summary``, ``description``, ``location``, ``dtstart``, ``dtend``.

    Raises:
        ValueError: If no CalDAV object is attached to *event_dict*.
    """
    from icalendar import Calendar as iCalendar

    obj = event_dict.get("_caldav_obj")
    if not obj:
        raise ValueError("No CalDAV object attached to event dict")

    cal = iCalendar.from_ical(obj.data)
    updatable = ("summary", "description", "location", "dtstart", "dtend")

    # Every VEVENT in the object is updated.
    for component in cal.walk():
        if component.name != "VEVENT":
            continue
        for field in updatable:
            if field not in updates:
                continue
            # Replace via add() so icalendar encodes the value with the
            # property's type (datetimes become proper DATE-TIME values);
            # plain item assignment would store the raw Python object.
            if field in component:
                del component[field]
            component.add(field, updates[field])

    obj.data = cal.to_ical().decode("utf-8")
    obj.save()


def _delete_caldav_event(event_dict):
    """Delete a CalDAV event; does nothing if no CalDAV object is attached."""
    obj = event_dict.get("_caldav_obj")
    if obj:
        obj.delete()


def _parse_local_datetime(value) -> datetime:
    """Parse an ISO-8601 string; naive results are placed in CHICAGO_TZ.

    Raises:
        ValueError, TypeError: If *value* is not a parseable ISO string.
    """
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=CHICAGO_TZ)
    return parsed


def _event_duration(event: dict) -> timedelta:
    """Return the event's current duration, or one hour if it cannot be derived."""
    old_start = event["start"].get("dateTime", "")
    old_end = event["end"].get("dateTime", "")
    if old_start and old_end:
        try:
            return datetime.fromisoformat(old_end) - datetime.fromisoformat(old_start)
        except (ValueError, TypeError):
            # Unparseable timestamps or a naive/aware mix: use the default.
            pass
    return _DEFAULT_EVENT_DURATION


def _execute_move(intent: dict, dry_run: bool = False) -> dict:
    """Move an event to a new date/time.

    Reads ``summary``, ``new_start``, optional ``new_end`` and optional
    ``who`` from the intent. When no usable ``new_end`` is given the event
    keeps its current duration.
    """
    summary = intent.get("summary", "")
    new_start_str = intent.get("new_start")
    new_end_str = intent.get("new_end")

    if not summary:
        return {"status": "error", "message": "No event summary provided for move"}
    if not new_start_str:
        return {"status": "error", "message": "No new start time provided for move"}

    # Find the event
    event, failure = _resolve_event(summary, intent)
    if failure:
        return failure

    # Parse new times
    try:
        new_start = _parse_local_datetime(new_start_str)
    except (ValueError, TypeError):
        return {"status": "error", "message": f"Could not parse new start time: {new_start_str}"}

    new_end = None
    if new_end_str:
        try:
            new_end = _parse_local_datetime(new_end_str)
        except (ValueError, TypeError):
            new_end = None
    if new_end is None:
        # Keep same duration
        new_end = new_start + _event_duration(event)

    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "move",
            "summary": event.get("summary"),
            "old_start": event["start"].get("dateTime"),
            "old_end": event["end"].get("dateTime"),
            "new_start": new_start.isoformat(),
            "new_end": new_end.isoformat(),
        }

    # Update the event via CalDAV
    _update_caldav_event(event, {"dtstart": new_start, "dtend": new_end})

    return {
        "status": "MOVED",
        "summary": event.get("summary"),
        "old_start": event["start"].get("dateTime", ""),
        "new_start": new_start.isoformat(),
        "new_end": new_end.isoformat(),
        "id": event.get("id"),
        "link": "",
    }


def _execute_cancel(intent: dict, dry_run: bool = False) -> dict:
    """Cancel an event. Supports single-instance and series-wide cancellation.

    ``cancel_scope == "series"`` cancels the whole recurring series; an
    ``instance_date`` cancels one occurrence; otherwise the matched event is
    deleted.
    """
    summary = intent.get("summary", "")
    cancel_scope = intent.get("cancel_scope", "instance")
    instance_date = intent.get("instance_date")

    if not summary:
        return {"status": "error", "message": "No event summary provided for cancel"}

    # --- Recurring event cancellation ---
    if cancel_scope == "series":
        event = _find_event(summary)
        if not event:
            return {"status": "not_found", "message": f"No upcoming event matching '{summary}'"}
        return cancel_recurring_series(event["id"], dry_run=dry_run)

    if instance_date:
        instance = find_recurring_instance(summary, instance_date)
        if not instance:
            return {
                "status": "not_found",
                "message": f"No '{summary}' instance found on {instance_date}",
            }
        return cancel_recurring_instance(instance["id"], instance_date, dry_run=dry_run)

    # --- Standard (non-recurring) cancellation ---
    event, failure = _resolve_event(summary, intent)
    if failure:
        return failure

    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "cancel",
            "summary": event.get("summary"),
            "start": event["start"].get("dateTime"),
        }

    _delete_caldav_event(event)

    return {
        "status": "CANCELLED",
        "summary": event.get("summary"),
        "start": event["start"].get("dateTime"),
    }


def _coerce_minutes(value, default: int = _DEFAULT_ADD_MINUTES) -> float:
    """Return *value* as a positive number of minutes, else *default*."""
    try:
        minutes = float(value)
    except (TypeError, ValueError):
        return default
    return minutes if minutes > 0 else default


def _execute_add(intent: dict, dry_run: bool = False) -> dict:
    """Add a new event (single or recurring).

    Reads ``summary``, ``start``, optional ``end`` / ``duration_minutes``,
    ``who``, ``location``, ``description``, ``is_recurring`` and
    ``recurrence`` from the intent. Naive times are taken as CHICAGO_TZ.
    """
    summary = intent.get("summary", "")
    start_str = intent.get("start")
    end_str = intent.get("end")
    duration = _coerce_minutes(intent.get("duration_minutes", _DEFAULT_ADD_MINUTES))
    who = _as_name_list(intent.get("who"))
    location = intent.get("location", "")
    description = intent.get("description", "")
    is_recurring = intent.get("is_recurring", False)
    recurrence = intent.get("recurrence") if is_recurring else None
    # A recurring event is only created when a recurrence spec is present.
    recurring = bool(is_recurring and recurrence)

    if not summary:
        return {"status": "error", "message": "No event summary provided"}
    if not start_str:
        return {"status": "error", "message": "No start time provided"}

    try:
        start_dt = _parse_local_datetime(start_str)
    except (ValueError, TypeError):
        return {"status": "error", "message": f"Could not parse start time: {start_str}"}

    end_dt = None
    if end_str:
        try:
            end_dt = _parse_local_datetime(end_str)
        except (ValueError, TypeError):
            end_dt = None
    if end_dt is None:
        end_dt = start_dt + timedelta(minutes=duration)

    # Append who to summary
    if who:
        who_str = ", ".join(who)
        if who_str not in summary:
            summary = f"{summary} ({who_str})"

    if recurring:
        # Validate before the dry-run branch so a preview reports the same
        # problem the real run would.
        from family_assistant.rrule_builder import validate_recurrence

        errors = validate_recurrence(recurrence)
        if errors:
            return {"status": "error", "message": f"Invalid recurrence: {'; '.join(errors)}"}

    if dry_run:
        result = {
            "status": "DRY_RUN",
            "action": "add",
            "summary": summary,
            "start": start_dt.isoformat(),
            "end": end_dt.isoformat(),
            "location": location,
        }
        if recurring:
            from family_assistant.rrule_builder import build_rrule

            result["is_recurring"] = True
            result["recurrence"] = recurrence
            result["rrule"] = build_rrule(recurrence)
        return result

    if recurring:
        event = create_recurring_event(
            summary=summary,
            start_dt=start_dt,
            end_dt=end_dt,
            recurrence=recurrence,
            description=description,
            location=location,
        )
    else:
        event = create_event(
            summary=summary,
            start_dt=start_dt,
            end_dt=end_dt,
            description=description,
            location=location,
        )

    result = {
        "status": "CREATED",
        "summary": event.get("summary"),
        "start": event["start"].get("dateTime", start_dt.isoformat()),
        "end": event["end"].get("dateTime", end_dt.isoformat()),
        "id": event["id"],
        "link": event.get("htmlLink", ""),
    }
    if recurring:
        result["is_recurring"] = True
        result["recurrence"] = recurrence
    return result


def _execute_remind(intent: dict, dry_run: bool = False) -> dict:
    """Create an all-day reminder event, prepended with [REMINDER].

    The date is read from ``date`` (falling back to ``start``) and may be a
    plain ``YYYY-MM-DD`` date or a full ISO datetime, of which only the date
    part is used.
    """
    summary = intent.get("summary", "")
    date_str = intent.get("date", intent.get("start", ""))
    who = _as_name_list(intent.get("who"))
    description = intent.get("description", "")

    if not summary:
        return {"status": "error", "message": "No summary provided for reminder"}
    if not date_str:
        return {"status": "error", "message": "No date provided for reminder"}

    # Parse the date
    try:
        if "T" not in date_str:
            date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
        else:
            date_obj = datetime.fromisoformat(date_str).date()
    except (ValueError, TypeError):
        return {"status": "error", "message": f"Could not parse date: {date_str}"}

    # Prepend [REMINDER] to summary
    if not summary.startswith("[REMINDER]"):
        summary = f"[REMINDER] {summary}"

    # Append who to summary
    if who:
        who_str = ", ".join(who)
        if who_str not in summary:
            summary = f"{summary} ({who_str})"

    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "remind",
            "summary": summary,
            "date": date_obj.isoformat(),
            "description": description,
        }

    # Create all-day event via CalDAV (date string, no time)
    # NOTE(review): start and end are the same date; confirm create_event
    # turns that into a one-day all-day event.
    event = create_event(
        summary=summary,
        start_dt=date_obj.isoformat(),
        end_dt=date_obj.isoformat(),
        description=description,
    )

    return {
        "status": "CREATED",
        "summary": event.get("summary"),
        "date": date_obj.isoformat(),
        "id": event["id"],
        "link": "",
    }


def _execute_rename(intent: dict, dry_run: bool = False) -> dict:
    """Rename an event.

    Reads ``summary`` (current title, fuzzy-matched) and ``new_summary``.
    ``old_summary`` in the result is the matched event's actual title, in
    both the dry-run and the real response.
    """
    summary = intent.get("summary", "")
    new_summary = intent.get("new_summary", "")

    if not summary or not new_summary:
        return {"status": "error", "message": "Need both current and new summary for rename"}

    event = _find_event(summary)
    if not event:
        return {"status": "not_found", "message": f"No upcoming event matching '{summary}'"}

    # Capture the title before the update so both responses agree.
    old_summary = event.get("summary")

    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "rename",
            "old_summary": old_summary,
            "new_summary": new_summary,
        }

    _update_caldav_event(event, {"summary": new_summary})

    return {
        "status": "RENAMED",
        "old_summary": old_summary,
        "new_summary": new_summary,
        "id": event.get("id"),
    }


def _execute_question(intent: dict) -> dict:
    """Answer a question using hybrid Calendar + Family Brain RAG.

    Queries both CalDAV (structured data) and ChromaDB (unstructured
    newsletter/email context) to synthesize a complete answer.
    Calendar is the source of truth for temporal data (time/date/location).

    Returns a dict with ``status`` "ANSWERED" (or "error" when no question
    text is present), the ``answer``, its ``sources`` and a ``confidence``
    of "high" (both sources), "medium" (one source) or "low" (neither).
    """
    from family_assistant.family_brain import query as brain_query, _build_hybrid_prompt

    question_text = intent.get("question", intent.get("summary", ""))
    if not question_text:
        return {"status": "error", "message": "No question provided"}

    # --- Source 1: CalDAV (structured) ---
    calendar_context = _search_calendar_for_question(question_text)

    # --- Source 2: Family Brain / ChromaDB (unstructured) ---
    brain_chunks = brain_query(question_text, top_k=3) or []
    relevant_chunks = [c for c in brain_chunks if c["score"] >= _BRAIN_MIN_SCORE]
    brain_has_relevant = bool(relevant_chunks)

    # If neither source has anything, bail
    if not calendar_context and not brain_has_relevant:
        return {
            "status": "ANSWERED",
            "question": question_text,
            "answer": "I don't see that in the emails I've processed.",
            "sources": [],
            "confidence": "low",
        }

    # --- Synthesize with LLM ---
    # All returned chunks go into the prompt once any one is relevant;
    # only the relevant ones are reported as sources below.
    prompt = _build_hybrid_prompt(
        question_text,
        calendar_context,
        brain_chunks if brain_has_relevant else [],
    )

    try:
        answer_text = _call_llm(prompt, temperature=0.3)
        # Clean up artifacts
        answer_text = answer_text.replace("ANSWER:", "").replace("Answer:", "").strip()
    except Exception as e:
        # Surface the failure as the answer rather than failing the chat turn.
        answer_text = f"Error synthesizing answer: {e}"

    sources = []
    if calendar_context:
        sources.append({"source": "calendar", "events": len(calendar_context)})
    for chunk in relevant_chunks:
        sources.append({
            "source": "brain",
            "subject": chunk["metadata"].get("subject"),
            "date": chunk["metadata"].get("date"),
            "score": round(chunk["score"], 3),
        })

    # At least one source is present here (the empty case returned above).
    confidence = "high" if calendar_context and brain_has_relevant else "medium"

    return {
        "status": "ANSWERED",
        "question": question_text,
        "answer": answer_text,
        "sources": sources,
        "confidence": confidence,
    }


def _search_calendar_for_question(question: str) -> list[dict]:
    """Search upcoming calendar events relevant to a question.

    Uses keyword matching from the question to find relevant events: an
    event scores one point per question keyword and per family name or
    nickname found in its summary, description or location.

    Returns up to five structured event dicts (summary, start, end,
    location, description truncated to 300 characters, relevance_score),
    sorted by relevance, then by start.
    """
    events = _caldav_search()

    # Extract keywords from the question (skip common words). Punctuation is
    # trimmed so "practice?" still matches "practice".
    keywords = []
    for word in question.split():
        token = word.lower().strip(_TOKEN_EDGE_PUNCT)
        if len(token) > 2 and token not in _QUESTION_SKIP_WORDS:
            keywords.append(token)

    # Also check family member names. Empty names are skipped: "" is a
    # substring of everything and would match every event.
    family_names = set()
    for name, nicknames in _family_members():
        for label in (name, *nicknames):
            label = str(label or "").strip().lower()
            if label:
                family_names.add(label)

    matched = []
    for event in events:
        summary = event.get("summary") or ""
        desc = event.get("description") or ""
        location = event.get("location") or ""
        combined = f"{summary} {desc} {location}".lower()

        # Score: keyword matches + family name matches
        score = sum(1 for kw in keywords if kw in combined)
        score += sum(1 for name in family_names if name in combined)

        if score > 0:
            start = event["start"].get("dateTime", event["start"].get("date", ""))
            end = event["end"].get("dateTime", event["end"].get("date", ""))
            matched.append({
                "summary": summary,
                "start": start,
                "end": end,
                "location": location,
                "description": desc[:300] if desc else "",
                "relevance_score": score,
            })

    # Sort by relevance, then by date
    matched.sort(key=lambda e: (-e["relevance_score"], e["start"]))
    return matched[:5]