# intent_engine.py — 25,592 bytes, Apr 18, 2026

"""Intent Engine — Parse natural-language calendar mutations from chat messages.

Routes user messages through the local LLM to extract structured intents
(move, cancel, add, rename, reject), then executes them against Radicale CalDAV.

This is the chat interface for the family calendar — the "Aundrea correction" flow.
"""

import json
import re
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from family_assistant.config import (
CHICAGO_TZ,
LLM_MODEL,
LLM_URL,
load_family_config,
)
from family_assistant.calendar_sync import (
create_event,
create_recurring_event,
event_exists,
get_calendar_service,
cancel_recurring_instance,
cancel_recurring_series,
find_recurring_instance,
_event_to_dict,
_build_ics,
)

# Load prompt template

import os

# Prompt templates are read from a "prompts" directory located next to this
# module, so resolve it relative to the module file rather than the CWD.
PROMPT_DIR = os.path.join(os.path.dirname(__file__), "prompts")
INTENT_PROMPT_PATH = os.path.join(PROMPT_DIR, "chat_intent.txt")

def _load_prompt():
    """Read and return the intent-extraction prompt template.

    Returns:
        The full text of the file at ``INTENT_PROMPT_PATH``.

    Raises:
        OSError: If the template file cannot be opened or read.
    """
    # Pin the codec: without an explicit encoding, open() decodes with the
    # locale's preferred encoding, which differs between platforms.
    # NOTE(review): assumes the template is stored as UTF-8 — confirm.
    with open(INTENT_PROMPT_PATH, encoding="utf-8") as f:
        return f.read()

def _call_llm(prompt: str) -> str:
    """Send *prompt* as a single user message to the LLM and return the reply.

    Args:
        prompt: Text sent as the only message of the chat request.

    Returns:
        The reply text with surrounding whitespace removed, or an empty
        string when none of the recognised response fields is present.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    import requests

    request_body = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "stream": False,
    }

    response = requests.post(LLM_URL, json=request_body, timeout=60)
    response.raise_for_status()
    body = response.json()

    if body.get("choices", []):
        # OpenAI-style envelope: the reply sits in the first choice.
        text = body["choices"][0].get("message", {}).get("content", "")
    elif "message" in body:
        # Ollama chat envelope: a single top-level message.
        text = body["message"].get("content", "")
    else:
        # Last resort: a flat "response" field.
        text = body.get("response", "")

    return text.strip()

def _build_prompt(message: str) -> str:
    """Fill the intent prompt template and append the user's message.

    The template placeholders ``{today}``, ``{family_members}`` and
    ``{nickname_rules}`` are substituted with today's date in the Chicago
    timezone, the comma-separated member names, and one nickname mapping per
    line respectively.

    Args:
        message: The raw chat message to classify.

    Returns:
        The completed prompt text, ending with ``User message: <message>``.
    """
    date_text = datetime.now(CHICAGO_TZ).strftime("%A, %B %d, %Y")
    config = load_family_config()

    # The "family" entry is either a dict holding a "members" list or the
    # list of members itself.
    family_section = config.get("family", {})
    if isinstance(family_section, dict):
        roster = family_section.get("members", [])
    else:
        roster = family_section

    names = []
    nickname_lines = []
    for entry in roster:
        if isinstance(entry, dict):
            person = entry.get("name", "")
            names.append(person)
            for alias in entry.get("nicknames", []):
                nickname_lines.append(f'"{alias}" → "{person}"')
        else:
            # Non-dict entries are used as the member name directly.
            names.append(str(entry))

    filled = _load_prompt()
    substitutions = (
        ("{today}", date_text),
        ("{family_members}", ", ".join(names)),
        ("{nickname_rules}", "\n".join(nickname_lines)),
    )
    for placeholder, value in substitutions:
        filled = filled.replace(placeholder, value)

    return filled + f"\n\nUser message: {message}"

def parse_intent(message: str) -> dict:
    """Parse a natural-language message into a structured calendar intent.

    The message is wrapped in the intent prompt, sent to the LLM, and the
    reply is decoded as JSON. Markdown code fences around the reply are
    removed first; if the whole reply is not valid JSON, the outermost
    ``{...}`` span inside it is tried instead.

    Args:
        message: The raw chat message.

    Returns:
        A dict with at least a 'type' key when the LLM follows the prompt:
          - move: change date/time of an event
          - cancel: remove an event
          - add: create a new event
          - rename: change an event title
          - reject: add a rejection rule
          - none: not a calendar-related message
        When the reply cannot be decoded into a JSON object, a dict with
        ``type == "error"`` plus ``raw`` (the reply text) and ``error``
        (a description) is returned instead.
    """
    prompt = _build_prompt(message)
    raw = _call_llm(prompt)

    # Strip markdown code fences if present
    raw = re.sub(r"^```(?:json)?\s*", "", raw)
    raw = re.sub(r"\s*```$", "", raw)

    try:
        intent = json.loads(raw)
    except json.JSONDecodeError:
        # The model may have wrapped the JSON in prose; try the outermost
        # brace-delimited span.
        match = re.search(r"\{.*\}", raw, re.DOTALL)
        if not match:
            return {"type": "error", "raw": raw, "error": "No JSON found in LLM response"}
        try:
            intent = json.loads(match.group())
        except json.JSONDecodeError:
            return {"type": "error", "raw": raw, "error": "Could not parse LLM response as JSON"}

    # Valid JSON is not necessarily an object (e.g. a list, string or null).
    # Callers use dict methods on the result, so report anything else as an
    # error instead of handing it on.
    if not isinstance(intent, dict):
        return {"type": "error", "raw": raw, "error": "LLM response JSON is not an object"}

    return intent

def _caldav_search(start=None, end=None):
    """Search CalDAV events in a date range.

    Args:
        start: Beginning of the search window. Defaults to the current time
            in UTC.
        end: End of the search window. Defaults to 90 days after the start.

    Returns:
        A list of event dicts as produced by ``_event_to_dict``, each with
        the raw CalDAV object attached under ``"_caldav_obj"``.
    """
    # Imported here because the default window needs ``timezone.utc`` and
    # only ``datetime``/``timedelta`` are imported at module level.
    from datetime import timezone

    calendar = get_calendar_service()
    window_start = start or datetime.now(timezone.utc)
    window_end = end or (window_start + timedelta(days=90))

    try:
        event_objs = calendar.date_search(start=window_start, end=window_end)
    except Exception:
        # Best-effort fallback: if the ranged search fails for any reason,
        # use every event on the calendar. Note that this result is not
        # limited to the requested window.
        event_objs = calendar.events()

    results = []
    for obj in event_objs:
        d = _event_to_dict(obj)
        # Attach the raw caldav object for mutation operations
        d["_caldav_obj"] = obj
        results.append(d)
    return results

def _find_event(summary: str):
"""Find a calendar event matching the given summary (fuzzy match).

Returns the event dict or None.
"""
events = _caldav_search()

# Exact match first
summary_lower = summary.lower().strip()
for event in events:
    if event.get("summary", "").lower() == summary_lower:
        return event
    # Also match without parenthetical who
    event_summary = event.get("summary", "")
    if event_summary.lower().startswith(summary_lower + " ("):
        return event

# Partial match
for event in events:
    if summary_lower in event.get("summary", "").lower():
        return event

# Base-name match: strip parenthetical who and trailing filler words
def _base_name(s):
    s = s.lower().strip()
    s = re.sub(r'\s*\(.*\)\s*$', '', s)
    s = re.sub(r'\s+(session|appointment|class|lesson|visit|meeting)$', '', s)
    return s.strip()

summary_base = _base_name(summary)
for event in events:
    event_base = _base_name(event.get("summary", ""))
    if summary_base and event_base and (summary_base in event_base or event_base in summary_base):
        return event

return None

def _find_events_by_who(who: list) -> list:
    """Return the searched events whose title mentions any of the given names.

    Args:
        who: Names to look for; matching is a case-insensitive substring
            test against each event's summary.

    Returns:
        The matching event dicts in search order, each included at most once.
    """
    return [
        candidate
        for candidate in _caldav_search()
        if any(
            person.lower() in candidate.get("summary", "").lower()
            for person in who
        )
    ]

def execute_intent(intent: dict, dry_run: bool = False) -> dict:
    """Dispatch a parsed intent to the handler for its ``type``.

    Args:
        intent: Parsed intent dict; a missing ``type`` is treated as "none".
        dry_run: Forwarded to the mutating handlers. For "reject" it turns
            off deletion from the calendar.

    Returns:
        The handler's result dict, or a small status dict for the types
        handled inline ("none", "chatter", "error") and for unknown types.
    """
    kind = intent.get("type", "none")

    # Types answered without consulting dry_run.
    if kind == "none":
        return {"status": "ignored", "message": "Not a calendar intent"}
    elif kind == "chatter":
        return {"status": "chatter", "message": "Acknowledgment — no action needed"}
    elif kind == "error":
        return {
            "status": "error",
            "message": intent.get("error", "Unknown error"),
            "raw": intent.get("raw"),
        }
    elif kind == "question":
        return _execute_question(intent)

    if kind == "reject":
        # Delegate to the rejection engine
        from family_assistant.rejection_engine import reject_event

        target = intent.get("summary", "")
        why = intent.get("reason", "")
        reach = intent.get("scope", "event")
        outcome = reject_event(
            target,
            message=why,
            scope=reach,
            delete_from_calendar=not dry_run,
        )
        return {"status": "rejected", "summary": target, "scope": reach, "details": outcome}

    # Remaining types share the (intent, dry_run) handler signature.
    if kind == "move":
        handler = _execute_move
    elif kind == "cancel":
        handler = _execute_cancel
    elif kind == "add":
        handler = _execute_add
    elif kind == "rename":
        handler = _execute_rename
    elif kind == "remind":
        handler = _execute_remind
    else:
        handler = None

    if handler is None:
        return {"status": "unknown", "type": kind}
    return handler(intent, dry_run)

def _update_caldav_event(event_dict, updates: dict):
"""Update a CalDAV event with new field values.

Args:
    event_dict: Event dict with '_caldav_obj' attached
    updates: Dict of fields to update (summary, start, end, description, location)
"""
from icalendar import Calendar as iCalendar

obj = event_dict.get("_caldav_obj")
if not obj:
    raise ValueError("No CalDAV object attached to event dict")

ics_data = obj.data
cal = iCalendar.from_ical(ics_data)
for component in cal.walk():
    if component.name == "VEVENT":
        if "summary" in updates:
            component["summary"] = updates["summary"]
        if "description" in updates:
            component["description"] = updates["description"]
        if "location" in updates:
            component["location"] = updates["location"]
        if "dtstart" in updates:
            component["dtstart"] = updates["dtstart"]
        if "dtend" in updates:
            component["dtend"] = updates["dtend"]

obj.data = cal.to_ical().decode("utf-8")
obj.save()

def _delete_caldav_event(event_dict):
"""Delete a CalDAV event."""
obj = event_dict.get("_caldav_obj")
if obj:
obj.delete()

def _execute_move(intent: dict, dry_run: bool = False) -> dict:
"""Move an event to a new date/time."""
summary = intent.get("summary", "")
new_start_str = intent.get("new_start")
new_end_str = intent.get("new_end")

if not summary:
    return {"status": "error", "message": "No event summary provided for move"}

if not new_start_str:
    return {"status": "error", "message": "No new start time provided for move"}

# Find the event
event = _find_event(summary)
if not event:
    who = intent.get("who", [])
    if who:
        events = _find_events_by_who(who)
        if len(events) == 1:
            event = events[0]
        elif len(events) > 1:
            return {
                "status": "ambiguous",
                "message": f"Found {len(events)} events matching {who}. Which one?",
                "events": [{"summary": e.get("summary"), "start": e["start"].get("dateTime", "")} for e in events],
            }
    if not event:
        return {"status": "not_found", "message": f"No upcoming event matching '{summary}'"}

# Parse new times
try:
    new_start = datetime.fromisoformat(new_start_str)
    if new_start.tzinfo is None:
        new_start = new_start.replace(tzinfo=CHICAGO_TZ)
except (ValueError, TypeError):
    return {"status": "error", "message": f"Could not parse new start time: {new_start_str}"}

if new_end_str:
    try:
        new_end = datetime.fromisoformat(new_end_str)
        if new_end.tzinfo is None:
            new_end = new_end.replace(tzinfo=CHICAGO_TZ)
    except (ValueError, TypeError):
        # Keep same duration
        old_start_str = event["start"].get("dateTime", "")
        old_end_str = event["end"].get("dateTime", "")
        if old_start_str and old_end_str:
            duration = datetime.fromisoformat(old_end_str) - datetime.fromisoformat(old_start_str)
        else:
            duration = timedelta(hours=1)
        new_end = new_start + duration
else:
    old_start_str = event["start"].get("dateTime", "")
    old_end_str = event["end"].get("dateTime", "")
    if old_start_str and old_end_str:
        duration = datetime.fromisoformat(old_end_str) - datetime.fromisoformat(old_start_str)
    else:
        duration = timedelta(hours=1)
    new_end = new_start + duration

if dry_run:
    return {
        "status": "DRY_RUN",
        "action": "move",
        "summary": event.get("summary"),
        "old_start": event["start"].get("dateTime"),
        "old_end": event["end"].get("dateTime"),
        "new_start": new_start.isoformat(),
        "new_end": new_end.isoformat(),
    }

# Update the event via CalDAV
_update_caldav_event(event, {
    "dtstart": new_start,
    "dtend": new_end,
})

return {
    "status": "MOVED",
    "summary": event.get("summary"),
    "old_start": event["start"].get("dateTime", ""),
    "new_start": new_start.isoformat(),
    "new_end": new_end.isoformat(),
    "id": event.get("id"),
    "link": "",
}

def _execute_cancel(intent: dict, dry_run: bool = False) -> dict:
"""Cancel an event. Supports single-instance and series-wide cancellation."""
summary = intent.get("summary", "")
cancel_scope = intent.get("cancel_scope", "instance")
instance_date = intent.get("instance_date")

if not summary:
    return {"status": "error", "message": "No event summary provided for cancel"}

# --- Recurring event cancellation ---
if cancel_scope == "series":
    event = _find_event(summary)
    if not event:
        return {"status": "not_found", "message": f"No upcoming event matching '{summary}'"}

    return cancel_recurring_series(event["id"], dry_run=dry_run)

if instance_date:
    instance = find_recurring_instance(summary, instance_date)
    if not instance:
        return {
            "status": "not_found",
            "message": f"No '{summary}' instance found on {instance_date}",
        }

    return cancel_recurring_instance(instance["id"], instance_date, dry_run=dry_run)

# --- Standard (non-recurring) cancellation ---
event = _find_event(summary)
if not event:
    who = intent.get("who", [])
    if who:
        events = _find_events_by_who(who)
        if len(events) == 1:
            event = events[0]
        elif len(events) > 1:
            return {
                "status": "ambiguous",
                "message": f"Found {len(events)} events matching {who}. Which one?",
                "events": [{"summary": e.get("summary"), "start": e["start"].get("dateTime", "")} for e in events],
            }
    if not event:
        return {"status": "not_found", "message": f"No upcoming event matching '{summary}'"}

if dry_run:
    return {
        "status": "DRY_RUN",
        "action": "cancel",
        "summary": event.get("summary"),
        "start": event["start"].get("dateTime"),
    }

_delete_caldav_event(event)

return {
    "status": "CANCELLED",
    "summary": event.get("summary"),
    "start": event["start"].get("dateTime"),
}

def _execute_add(intent: dict, dry_run: bool = False) -> dict:
"""Add a new event (single or recurring)."""
summary = intent.get("summary", "")
start_str = intent.get("start")
end_str = intent.get("end")
duration = intent.get("duration_minutes", 60)
who = intent.get("who", [])
location = intent.get("location", "")
description = intent.get("description", "")
is_recurring = intent.get("is_recurring", False)
recurrence = intent.get("recurrence") if is_recurring else None

if not summary:
    return {"status": "error", "message": "No event summary provided"}
if not start_str:
    return {"status": "error", "message": "No start time provided"}

try:
    start_dt = datetime.fromisoformat(start_str)
    if start_dt.tzinfo is None:
        start_dt = start_dt.replace(tzinfo=CHICAGO_TZ)
except (ValueError, TypeError):
    return {"status": "error", "message": f"Could not parse start time: {start_str}"}

if end_str:
    try:
        end_dt = datetime.fromisoformat(end_str)
        if end_dt.tzinfo is None:
            end_dt = end_dt.replace(tzinfo=CHICAGO_TZ)
    except (ValueError, TypeError):
        end_dt = start_dt + timedelta(minutes=duration)
else:
    end_dt = start_dt + timedelta(minutes=duration)

# Append who to summary
if who:
    who_str = ", ".join(who)
    if who_str not in summary:
        summary = f"{summary} ({who_str})"

if dry_run:
    result = {
        "status": "DRY_RUN",
        "action": "add",
        "summary": summary,
        "start": start_dt.isoformat(),
        "end": end_dt.isoformat(),
        "location": location,
    }
    if is_recurring and recurrence:
        from family_assistant.rrule_builder import build_rrule
        result["is_recurring"] = True
        result["recurrence"] = recurrence
        result["rrule"] = build_rrule(recurrence)
    return result

if is_recurring and recurrence:
    from family_assistant.rrule_builder import validate_recurrence
    errors = validate_recurrence(recurrence)
    if errors:
        return {"status": "error", "message": f"Invalid recurrence: {'; '.join(errors)}"}
    event = create_recurring_event(
        summary=summary,
        start_dt=start_dt,
        end_dt=end_dt,
        recurrence=recurrence,
        description=description,
        location=location,
    )
else:
    event = create_event(
        summary=summary,
        start_dt=start_dt,
        end_dt=end_dt,
        description=description,
        location=location,
    )

result = {
    "status": "CREATED",
    "summary": event.get("summary"),
    "start": event["start"].get("dateTime", start_dt.isoformat()),
    "end": event["end"].get("dateTime", end_dt.isoformat()),
    "id": event["id"],
    "link": event.get("htmlLink", ""),
}
if is_recurring:
    result["is_recurring"] = True
    result["recurrence"] = recurrence
return result

def _execute_remind(intent: dict, dry_run: bool = False) -> dict:
"""Create an all-day reminder event, prepended with [REMINDER]."""
summary = intent.get("summary", "")
date_str = intent.get("date", intent.get("start", ""))
who = intent.get("who", [])
description = intent.get("description", "")

if not summary:
    return {"status": "error", "message": "No summary provided for reminder"}
if not date_str:
    return {"status": "error", "message": "No date provided for reminder"}

# Parse the date
try:
    if "T" not in date_str:
        date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
    else:
        dt = datetime.fromisoformat(date_str)
        date_obj = dt.date()
except (ValueError, TypeError):
    return {"status": "error", "message": f"Could not parse date: {date_str}"}

# Prepend [REMINDER] to summary
if not summary.startswith("[REMINDER]"):
    summary = f"[REMINDER] {summary}"

# Append who to summary
if who:
    who_str = ", ".join(who)
    if who_str not in summary:
        summary = f"{summary} ({who_str})"

if dry_run:
    return {
        "status": "DRY_RUN",
        "action": "remind",
        "summary": summary,
        "date": date_obj.isoformat(),
        "description": description,
    }

# Create all-day event via CalDAV (date string, no time)
event = create_event(
    summary=summary,
    start_dt=date_obj.isoformat(),
    end_dt=date_obj.isoformat(),
    description=description,
)

return {
    "status": "CREATED",
    "summary": event.get("summary"),
    "date": date_obj.isoformat(),
    "id": event["id"],
    "link": "",
}

def _execute_rename(intent: dict, dry_run: bool = False) -> dict:
"""Rename an event."""
summary = intent.get("summary", "")
new_summary = intent.get("new_summary", "")

if not summary or not new_summary:
    return {"status": "error", "message": "Need both current and new summary for rename"}

event = _find_event(summary)
if not event:
    return {"status": "not_found", "message": f"No upcoming event matching '{summary}'"}

if dry_run:
    return {
        "status": "DRY_RUN",
        "action": "rename",
        "old_summary": event.get("summary"),
        "new_summary": new_summary,
    }

_update_caldav_event(event, {"summary": new_summary})

return {
    "status": "RENAMED",
    "old_summary": summary,
    "new_summary": new_summary,
    "id": event.get("id"),
}

def _execute_question(intent: dict) -> dict:
"""Answer a question using hybrid Calendar + Family Brain RAG.

Queries both CalDAV (structured data) and ChromaDB (unstructured
newsletter/email context) to synthesize a complete answer.

Calendar is the source of truth for temporal data (time/date/location).
"""
from family_assistant.family_brain import query as brain_query, _build_hybrid_prompt
import requests as _req

question_text = intent.get("question", intent.get("summary", ""))
if not question_text:
    return {"status": "error", "message": "No question provided"}

# --- Source 1: CalDAV (structured) ---
calendar_context = _search_calendar_for_question(question_text)

# --- Source 2: Family Brain / ChromaDB (unstructured) ---
brain_chunks = brain_query(question_text, top_k=3)
brain_has_relevant = brain_chunks and any(c["score"] >= 0.3 for c in brain_chunks)

# If neither source has anything, bail
if not calendar_context and not brain_has_relevant:
    return {
        "status": "ANSWERED",
        "question": question_text,
        "answer": "I don't see that in the emails I've processed.",
        "sources": [],
        "confidence": "low",
    }

# --- Synthesize with LLM ---
prompt = _build_hybrid_prompt(question_text, calendar_context, brain_chunks if brain_has_relevant else [])
payload = {
    "model": LLM_MODEL,
    "messages": [{"role": "user", "content": prompt}],
    "temperature": 0.3,
    "stream": False,
}

try:
    resp = _req.post(LLM_URL, json=payload, timeout=60)
    resp.raise_for_status()
    data = resp.json()

    choices = data.get("choices", [])
    if choices:
        answer_text = choices[0].get("message", {}).get("content", "").strip()
    elif "message" in data:
        answer_text = data["message"].get("content", "").strip()
    else:
        answer_text = data.get("response", "").strip()

    # Clean up artifacts
    answer_text = answer_text.replace("ANSWER:", "").replace("Answer:", "").strip()
except Exception as e:
    answer_text = f"Error synthesizing answer: {e}"

sources = []
if calendar_context:
    sources.append({"source": "calendar", "events": len(calendar_context)})
if brain_has_relevant:
    for c in brain_chunks:
        if c["score"] >= 0.3:
            sources.append({
                "source": "brain",
                "subject": c["metadata"].get("subject"),
                "date": c["metadata"].get("date"),
                "score": round(c["score"], 3),
            })

confidence = "high"
if calendar_context and brain_has_relevant:
    confidence = "high"
elif calendar_context or brain_has_relevant:
    confidence = "medium"
else:
    confidence = "low"

return {
    "status": "ANSWERED",
    "question": question_text,
    "answer": answer_text,
    "sources": sources,
    "confidence": confidence,
}

def _search_calendar_for_question(question: str) -> list[dict]:
    """Search upcoming calendar events relevant to a question.

    Each event returned by ``_caldav_search()`` is scored by how many
    question keywords and family names occur (as substrings) in its
    summary, description and location. Events with a non-zero score are
    kept.

    Args:
        question: The user's question text.

    Returns:
        Up to five dicts with ``summary``, ``start``, ``end``, ``location``,
        ``description`` (truncated to 300 characters) and
        ``relevance_score``, ordered by descending score and then by start.
    """
    events = _caldav_search()

    # Extract keywords from the question (skip common words)
    skip_words = {"what", "where", "when", "is", "the", "a", "an", "do", "does",
                  "need", "for", "to", "at", "in", "on", "about", "how",
                  "who", "which", "can", "will", "be", "are", "was", "were",
                  "has", "have", "had", "it", "that", "this", "from"}
    # Trim surrounding punctuation from each token so that e.g. "practice?"
    # still matches an event titled "Practice".
    edge_punctuation = "?!.,;:\"'()[]{}"
    keywords = []
    for token in question.lower().split():
        word = token.strip(edge_punctuation)
        if len(word) > 2 and word not in skip_words:
            keywords.append(word)

    # Also check family member names. Accept the same config shapes that
    # _build_prompt does: "family" may be a dict with a "members" list or
    # the list itself, and a member may be a dict or a bare name.
    family_section = load_family_config().get("family", {})
    if isinstance(family_section, dict):
        family_members = family_section.get("members", [])
    else:
        family_members = family_section

    family_names = set()
    for member in family_members:
        if isinstance(member, dict):
            candidates = [member.get("name", ""), *member.get("nicknames", [])]
        else:
            candidates = [str(member)]
        # Skip empty names: "" is a substring of every event text and would
        # give every event a non-zero score.
        family_names.update(name.lower() for name in candidates if name)

    matched = []
    for event in events:
        summary = event.get("summary") or ""
        desc = event.get("description") or ""
        location = event.get("location") or ""
        combined = f"{summary} {desc} {location}".lower()

        # Score: keyword matches + family name matches
        score = sum(1 for kw in keywords if kw in combined)
        # NOTE(review): a family name counts whenever it appears in the
        # event, whether or not the question mentions it — confirm intended.
        score += sum(1 for name in family_names if name in combined)

        if score > 0:
            start = event["start"].get("dateTime", event["start"].get("date", ""))
            end = event["end"].get("dateTime", event["end"].get("date", ""))
            matched.append({
                "summary": summary,
                "start": start,
                "end": end,
                "location": location,
                "description": desc[:300],
                "relevance_score": score,
            })

    # Sort by relevance, then by date
    matched.sort(key=lambda e: (-e["relevance_score"], e["start"]))
    return matched[:5]