📄 conflict_engine.py 14,969 bytes Apr 19, 2026 📋 Raw

"""Conflict detection, resolution, and response handler."""

import json
import re
import sys
from datetime import datetime, timedelta

import requests

from family_assistant.config import (
LLM_URL,
LLM_MODEL,
LLM_RESOLVE_TIMEOUT,
CHICAGO_TZ,
MIN_OVERLAP_MINUTES,
load_prompts,
)
from family_assistant.calendar_sync import (
get_calendar_service,
_event_to_dict,
_find_event_by_summary_and_time,
_update_event_description,
_delete_event,
create_event,
)

---------------------------------------------------------------------------

Conflict Detection

---------------------------------------------------------------------------

def detect_conflicts(events=None, hours=168):
"""Detect scheduling conflicts in the calendar.

If `events` is provided (list of dicts with summary/start/end keys), check each
against existing calendar events.
If `events` is None, scan the next `hours` for ALL conflicts.

Two events conflict if their time ranges overlap (start1 < end2 AND start2 < end1),
ignoring trivial overlaps of less than MIN_OVERLAP_MINUTES minutes.

Returns a list of conflict pairs.
"""

def _parse_gcal_dt(dt_str):
    """Parse a Google Calendar datetime string to a timezone-aware datetime."""
    if not dt_str:
        return None
    try:
        dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
        return dt.astimezone(CHICAGO_TZ)
    except (ValueError, TypeError):
        try:
            dt = datetime.strptime(dt_str, "%Y-%m-%d")
            return dt.replace(tzinfo=CHICAGO_TZ)
        except (ValueError, TypeError):
            return None

def _overlap_minutes(s1, e1, s2, e2):
    """Calculate overlap in minutes between two time ranges."""
    latest_start = max(s1, s2)
    earliest_end = min(e1, e2)
    delta = (earliest_end - latest_start).total_seconds() / 60
    return max(0, delta)

calendar = get_calendar_service()
now = datetime.now(CHICAGO_TZ)
window_end = now + timedelta(hours=hours)

# Fetch all events in the window via CalDAV
try:
    event_objs = calendar.date_search(start=now, end=window_end)
except Exception:
    event_objs = calendar.events()

existing = []
for e_obj in event_objs:
    e = _event_to_dict(e_obj)
    # Skip all-day events — they're notes/reminders, not time commitments
    if e.get("start", {}).get("date") and not e.get("start", {}).get("dateTime"):
        continue
    s = _parse_gcal_dt(e["start"].get("dateTime", e["start"].get("date", "")))
    en = _parse_gcal_dt(e["end"].get("dateTime", e["end"].get("date", "")))
    if s and en:
        existing.append({
            "id": e.get("id", ""),
            "summary": e.get("summary", "(no title)"),
            "start": s,
            "end": en,
        })

# Build the full event list to check
if events is not None:
    # Normalize provided events (may have ISO strings or datetime objects)
    new_events = []
    for ev in events:
        s_raw = ev.get("start")
        e_raw = ev.get("end")
        s = s_raw if isinstance(s_raw, datetime) else _parse_gcal_dt(str(s_raw)) if s_raw else None
        en = e_raw if isinstance(e_raw, datetime) else _parse_gcal_dt(str(e_raw)) if e_raw else None
        if s and en:
            new_events.append({
                "id": ev.get("id", "new"),
                "summary": ev.get("summary", "(new event)"),
                "start": s,
                "end": en,
            })
    # Check each new event against existing calendar events
    to_check = new_events
    against = existing
else:
    # Check ALL events against each other
    to_check = existing
    against = existing

conflicts = []
seen_pairs = set()
for i, ev1 in enumerate(to_check):
    for j, ev2 in enumerate(against):
        # Skip self-comparison
        if events is None and i == j:
            continue
        # Skip if same event by ID
        if ev1.get("id") and ev2.get("id") and ev1["id"] == ev2["id"]:
            continue
        # Deduplicate pairs (always compare in sorted order)
        pair_key = tuple(sorted([ev1.get("id", str(i)), ev2.get("id", str(j))]))
        if pair_key in seen_pairs:
            continue
        seen_pairs.add(pair_key)

        overlap = _overlap_minutes(ev1["start"], ev1["end"], ev2["start"], ev2["end"])
        if overlap >= MIN_OVERLAP_MINUTES:
            conflicts.append({
                "event1": {
                    "id": ev1.get("id", ""),
                    "summary": ev1["summary"],
                    "start": ev1["start"].isoformat(),
                    "end": ev1["end"].isoformat(),
                },
                "event2": {
                    "id": ev2.get("id", ""),
                    "summary": ev2["summary"],
                    "start": ev2["start"].isoformat(),
                    "end": ev2["end"].isoformat(),
                },
                "overlap_minutes": round(overlap),
            })

return conflicts

---------------------------------------------------------------------------

Conflict Resolution

---------------------------------------------------------------------------

def _call_llm_resolve(system, user, temperature=0.3):
"""Send a chat completion request to the LLM for conflict resolution."""
payload = {
"model": LLM_MODEL,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": user},
],
"temperature": temperature,
}
try:
resp = requests.post(LLM_URL, json=payload, timeout=LLM_RESOLVE_TIMEOUT)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"].strip()
except requests.exceptions.Timeout:
print(f" [LLM] Resolution timeout after {LLM_RESOLVE_TIMEOUT}s", file=sys.stderr)
return None
except Exception as e:
print(f" [LLM] Resolution error: {e}", file=sys.stderr)
return None

def resolve_conflict(conflict):
"""Use the LLM to generate resolution suggestions for a scheduling conflict.

Takes a conflict dict (from detect_conflicts) and returns resolution options
with a human-friendly message the family can act on.
"""
prompts = load_prompts()
resolution_prompt = prompts["conflict_resolve"]

e1 = conflict["event1"]
e2 = conflict["event2"]

user_msg = json.dumps({
    "conflict": {
        "event1": {
            "summary": e1["summary"],
            "start": e1["start"],
            "end": e1["end"],
        },
        "event2": {
            "summary": e2["summary"],
            "start": e2["start"],
            "end": e2["end"],
        },
        "overlap_minutes": conflict["overlap_minutes"],
    }
}, indent=2)

raw = _call_llm_resolve(resolution_prompt, user_msg)
if not raw:
    return {
        "conflict_summary": f"{e1['summary']} conflicts with {e2['summary']}",
        "message": f"⚠️ Schedule conflict: {e1['summary']} ({e1['start']}) overlaps with {e2['summary']} ({e2['start']}) by {conflict['overlap_minutes']}min. Please review.",
        "options": [],
        "error": "LLM resolution failed",
    }

# Parse JSON from LLM response
text = raw.strip()
if text.startswith("```"):
    text = re.sub(r'^```(?:json)?\s*\n?', '', text)
    text = re.sub(r'\n?```\s*$', '', text)
    text = text.strip()
try:
    parsed = json.loads(text)
except json.JSONDecodeError:
    match = re.search(r'\{.*\}', text, re.DOTALL)
    if match:
        try:
            parsed = json.loads(match.group(0))
        except json.JSONDecodeError:
            parsed = None
    else:
        parsed = None

if not parsed or not isinstance(parsed, dict):
    return {
        "conflict_summary": f"{e1['summary']} conflicts with {e2['summary']}",
        "message": f"⚠️ Schedule conflict: {e1['summary']} ({e1['start']}) overlaps with {e2['summary']} ({e2['start']}) by {conflict['overlap_minutes']}min. Please review.",
        "options": [],
        "error": "Could not parse LLM resolution response",
        "raw": text,
    }

return parsed

def resolve_all_conflicts(hours=168):
"""Detect all conflicts in the next N hours and generate resolution suggestions.

Returns a list of dicts, each with the conflict and its resolution options.
"""
conflicts = detect_conflicts(hours=hours)
if not conflicts:
    return []

resolutions = []
for conflict in conflicts:
    resolution = resolve_conflict(conflict)
    resolutions.append({
        "conflict": conflict,
        "resolution": resolution,
    })

return resolutions

---------------------------------------------------------------------------

Response Handler — Execute a chosen resolution option

---------------------------------------------------------------------------

def execute_resolution(conflict, resolution, option_index, dry_run=False):
"""Execute a chosen resolution option for a scheduling conflict.

Args:
    conflict: the conflict dict from detect_conflicts
    resolution: the resolution dict from resolve_conflict
    option_index: 1-based index into resolution["options"]
    dry_run: if True, report what would happen without making changes

Returns a dict with the action taken and results.
"""
options = resolution.get("options", [])
if not options or option_index < 1 or option_index > len(options):
    return {
        "status": "ERROR",
        "message": f"Invalid option {option_index}. Available: 1-{len(options)}",
    }

chosen = options[option_index - 1]
action = chosen.get("action", "").lower()
affects = chosen.get("affects_event", "")
affects_who = chosen.get("affects_who", "")
description = chosen.get("description", "")

# Identify the events involved
event1_data = conflict.get("event1", {})
event2_data = conflict.get("event2", {})
priority = resolution.get("priority_event", "event1")

# The event to modify/cancel is the one specified by affects_event,
# or if not specified, the non-priority event
if affects == "event2":
    target_data, keep_data = event2_data, event1_data
elif affects == "event1":
    target_data, keep_data = event1_data, event2_data
elif priority == "event1":
    target_data, keep_data = event2_data, event1_data
else:
    target_data, keep_data = event1_data, event2_data

result = {
    "action": action,
    "chosen_option": chosen,
    "target_event": target_data.get("summary", ""),
    "kept_event": keep_data.get("summary", ""),
}

# Find the actual calendar events
target_event = _find_event_by_summary_and_time(
    target_data.get("summary", ""), target_data.get("start", "")
)
keep_event = _find_event_by_summary_and_time(
    keep_data.get("summary", ""), keep_data.get("start", "")
)

if not target_event:
    result["status"] = "ERROR"
    result["message"] = f"Could not find calendar event: {target_data.get('summary', '')}"
    return result

if dry_run:
    result["status"] = "DRY_RUN"
    result["message"] = f"Would {action}: {description}"
    return result

# ---- SPLIT: Both events happen, parents divide responsibilities ----
if action == "split":
    # Update both events with who's handling them
    if keep_event and affects_who:
        _update_event_description(
            keep_event["id"],
            f"📋 {affects_who} handling this (conflict resolved via split)",
        )
    _update_event_description(
        target_event["id"],
        f"📋 {affects_who} handling this (conflict resolved via split)",
    )
    result["status"] = "SPLIT"
    result["message"] = (
        f"Both events happening. {affects_who} is handling {target_data.get('summary', '')}. "
        f"Updated both calendar events."
    )
    return result

# ---- REASSIGN: Same event, different adult responsible ----
elif action == "reassign":
    if affects_who:
        _update_event_description(
            target_event["id"],
            f"📋 Reassigned: {affects_who} now handling this",
        )
    result["status"] = "REASSIGNED"
    result["message"] = (
        f"{affects_who} is now handling {target_data.get('summary', '')}. "
        f"Calendar updated."
    )
    return result

# ---- RESCHEDULE: Remove lower-priority event + create rebook reminder ----
elif action == "reschedule":
    target_summary = target_event.get("summary", "")
    target_start = target_event["start"].get("dateTime", "")
    target_location = target_event.get("location", "")
    target_id = target_event["id"]

    # Delete the conflicting event
    _delete_event(target_id)

    # Create a reminder to rebook
    reminder_summary = f"📞 Rebook: {target_summary}"
    reminder_desc = (
        f"This event was removed due to a scheduling conflict.\n"
        f"Original: {target_summary} at {target_start}\n"
        f"Location: {target_location}\n"
        f"Action needed: Call to reschedule."
    )
    # Schedule the reminder for today (or next business day) at 9 AM
    reminder_dt = datetime.now(CHICAGO_TZ).replace(
        hour=9, minute=0, second=0, microsecond=0
    )
    if reminder_dt <= datetime.now(CHICAGO_TZ):
        reminder_dt += timedelta(days=1)
    reminder_end = reminder_dt + timedelta(minutes=30)

    reminder_event = create_event(
        summary=reminder_summary,
        start_dt=reminder_dt,
        end_dt=reminder_end,
        description=reminder_desc,
    )

    result["status"] = "RESCHEDULED"
    result["deleted_event_id"] = target_id
    result["reminder_event_id"] = reminder_event["id"]
    result["message"] = (
        f"Removed {target_summary} from calendar. "
        f"Created rebook reminder for {reminder_dt.strftime('%A %b %d at %-I:%M %p')}. "
        f"You'll need to call to reschedule."
    )
    return result

# ---- Unknown action ----
else:
    result["status"] = "ERROR"
    result["message"] = f"Unknown action: {action}. Valid: split, reassign, reschedule."
    return result