"""Conflict detection, resolution, and response handler."""
import json
import re
import sys
from datetime import datetime, timedelta
import requests
from icarus.core.config import (
LLM_URL,
LLM_MODEL,
LLM_RESOLVE_TIMEOUT,
CHICAGO_TZ,
MIN_OVERLAP_MINUTES,
load_prompts,
)
from icarus.core.calendar_sync import (
get_calendar_service,
_event_to_dict,
_find_event_by_summary_and_time,
_update_event_description,
_delete_event,
create_event,
)
# ---------------------------------------------------------------------------
# Conflict Detection
# ---------------------------------------------------------------------------
def detect_conflicts(events=None, hours=168):
    """Detect scheduling conflicts in the calendar.

    If `events` is provided (list of dicts with summary/start/end keys, where
    start/end are ISO strings or datetime objects), each one is checked
    against the timed events fetched from the calendar for the next `hours`.
    If `events` is None, every fetched event is checked against every other.

    Two events conflict if their time ranges overlap (start1 < end2 AND
    start2 < end1) by at least MIN_OVERLAP_MINUTES minutes. Events that merely
    touch (zero overlap) never conflict, whatever the threshold is. All-day
    events are skipped. Datetimes without a UTC offset are interpreted as
    Chicago wall-clock time.

    Returns a list of conflict dicts with keys "event1" and "event2" (each
    holding id, summary, and ISO-formatted start/end) and "overlap_minutes"
    (rounded to the nearest minute).
    """

    def _localize(dt):
        """Attach CHICAGO_TZ to a naive datetime; return aware ones unchanged."""
        if dt.tzinfo is not None:
            return dt
        # pytz zones have to be attached through localize(); other tzinfo
        # implementations (e.g. zoneinfo) are attached with replace().
        localize = getattr(CHICAGO_TZ, "localize", None)
        if callable(localize):
            return localize(dt)
        return dt.replace(tzinfo=CHICAGO_TZ)

    def _parse_gcal_dt(dt_str):
        """Parse a Google Calendar datetime string to an aware Chicago datetime."""
        if not dt_str:
            return None
        try:
            dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
        except (ValueError, TypeError, AttributeError):
            dt = None
        if dt is None:
            # Lenient fallback for date-only strings fromisoformat() rejects,
            # such as non-zero-padded "2024-1-5".
            try:
                dt = datetime.strptime(dt_str, "%Y-%m-%d")
            except (ValueError, TypeError):
                return None
        # Localize first: astimezone() on a naive value would silently assume
        # the *system* timezone rather than Chicago.
        return _localize(dt).astimezone(CHICAGO_TZ)

    def _coerce(value):
        """Normalize a caller-supplied start/end (datetime or string)."""
        if isinstance(value, datetime):
            # Naive datetimes cannot be compared with the aware calendar times.
            return _localize(value)
        if not value:
            return None
        return _parse_gcal_dt(str(value))

    def _overlap_minutes(s1, e1, s2, e2):
        """Calculate overlap in minutes between two time ranges."""
        latest_start = max(s1, s2)
        earliest_end = min(e1, e2)
        delta = (earliest_end - latest_start).total_seconds() / 60
        return max(0, delta)

    def _public(ev):
        """Build the serializable view of an event used in conflict dicts."""
        return {
            "id": ev.get("id", ""),
            "summary": ev["summary"],
            "start": ev["start"].isoformat(),
            "end": ev["end"].isoformat(),
        }

    calendar = get_calendar_service()
    now = datetime.now(CHICAGO_TZ)
    window_end = now + timedelta(hours=hours)

    # Fetch all events in the window via CalDAV
    try:
        event_objs = calendar.date_search(start=now, end=window_end)
    except Exception:
        # Best-effort fallback when the ranged search fails.
        # NOTE(review): events() presumably is not limited to the window —
        # confirm against the calendar service.
        event_objs = calendar.events()

    existing = []
    for idx, e_obj in enumerate(event_objs):
        e = _event_to_dict(e_obj)
        start_info = e.get("start") or {}
        end_info = e.get("end") or {}
        # Skip all-day events — they're notes/reminders, not time commitments
        if start_info.get("date") and not start_info.get("dateTime"):
            continue
        s = _parse_gcal_dt(start_info.get("dateTime", start_info.get("date", "")))
        en = _parse_gcal_dt(end_info.get("dateTime", end_info.get("date", "")))
        if s is None or en is None:
            continue
        event_id = e.get("id", "")
        existing.append({
            "id": event_id,
            "summary": e.get("summary", "(no title)"),
            "start": s,
            "end": en,
            # Identity used only for pair de-duplication. Including the times
            # keeps instances that share an ID distinct; events without an ID
            # fall back to their position so they never collide.
            "_key": (event_id, s, en) if event_id else ("existing", idx),
        })

    # Build the full event list to check
    if events is not None:
        # Normalize provided events (may have ISO strings or datetime objects)
        to_check = []
        for idx, ev in enumerate(events):
            s = _coerce(ev.get("start"))
            en = _coerce(ev.get("end"))
            if s is None or en is None:
                continue
            raw_id = ev.get("id")
            to_check.append({
                "id": ev.get("id", "new"),
                "summary": ev.get("summary", "(new event)"),
                "start": s,
                "end": en,
                # Proposed events usually have no ID; key them by position so
                # two of them are never mistaken for the same event.
                "_key": (raw_id, s, en) if raw_id else ("new", idx),
            })
    else:
        # Check ALL events against each other
        to_check = existing

    conflicts = []
    seen_pairs = set()
    for ev1 in to_check:
        for ev2 in existing:
            # Skip self-comparison
            if ev1 is ev2:
                continue
            # Skip if same event by ID
            if ev1.get("id") and ev1["id"] == ev2.get("id"):
                continue
            # Deduplicate unordered pairs (A,B) == (B,A)
            pair_key = frozenset((ev1["_key"], ev2["_key"]))
            if pair_key in seen_pairs:
                continue
            seen_pairs.add(pair_key)

            overlap = _overlap_minutes(ev1["start"], ev1["end"], ev2["start"], ev2["end"])
            # `overlap > 0` keeps a zero threshold from flagging disjoint events.
            if overlap > 0 and overlap >= MIN_OVERLAP_MINUTES:
                conflicts.append({
                    "event1": _public(ev1),
                    "event2": _public(ev2),
                    "overlap_minutes": round(overlap),
                })
    return conflicts
# ---------------------------------------------------------------------------
# Conflict Resolution
# ---------------------------------------------------------------------------
def _call_llm_resolve(system, user, temperature=0.3):
    """Ask the LLM for a conflict resolution and return its reply text.

    Posts a two-message chat (system + user) to LLM_URL for LLM_MODEL and
    returns the stripped content of the first choice. Any failure — timeout,
    HTTP error, unexpected response shape — is reported on stderr and yields
    None instead of raising.
    """
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    body = {
        "model": LLM_MODEL,
        "messages": messages,
        "temperature": temperature,
    }
    try:
        response = requests.post(LLM_URL, json=body, timeout=LLM_RESOLVE_TIMEOUT)
        response.raise_for_status()
        first_choice = response.json()["choices"][0]
        return first_choice["message"]["content"].strip()
    except requests.exceptions.Timeout:
        # Timeouts get their own message so slow models are easy to spot in logs.
        print(f" [LLM] Resolution timeout after {LLM_RESOLVE_TIMEOUT}s", file=sys.stderr)
    except Exception as err:
        print(f" [LLM] Resolution error: {err}", file=sys.stderr)
    return None
def resolve_conflict(conflict):
    """Ask the LLM for resolution suggestions for one scheduling conflict.

    `conflict` is a dict as produced by detect_conflicts. The LLM reply is
    expected to be a JSON object (optionally wrapped in a ``` fence or
    surrounded by prose); that object is returned as-is. When the LLM call
    fails or the reply cannot be parsed into a non-empty dict, a fallback
    dict is returned with a generic warning message, an empty "options"
    list and an "error" key (plus "raw" for unparseable replies).
    """
    system_prompt = load_prompts()["conflict_resolve"]
    first = conflict["event1"]
    second = conflict["event2"]

    def _fallback(error, **extra):
        # Shared shape for both failure modes; built lazily at call time.
        fallback = {
            "conflict_summary": f"{first['summary']} conflicts with {second['summary']}",
            "message": f"⚠️ Schedule conflict: {first['summary']} ({first['start']}) overlaps with {second['summary']} ({second['start']}) by {conflict['overlap_minutes']}min. Please review.",
            "options": [],
            "error": error,
        }
        fallback.update(extra)
        return fallback

    request_body = {
        "conflict": {
            "event1": {
                "summary": first["summary"],
                "start": first["start"],
                "end": first["end"],
            },
            "event2": {
                "summary": second["summary"],
                "start": second["start"],
                "end": second["end"],
            },
            "overlap_minutes": conflict["overlap_minutes"],
        }
    }
    raw = _call_llm_resolve(system_prompt, json.dumps(request_body, indent=2))
    if not raw:
        return _fallback("LLM resolution failed")

    # Strip an optional Markdown code fence around the JSON payload
    reply = raw.strip()
    if reply.startswith("```"):
        reply = re.sub(r'^```(?:json)?\s*\n?', '', reply)
        reply = re.sub(r'\n?```\s*$', '', reply)
        reply = reply.strip()

    parsed = None
    try:
        parsed = json.loads(reply)
    except json.JSONDecodeError:
        # Second chance: pull the outermost {...} span out of surrounding prose
        embedded = re.search(r'\{.*\}', reply, re.DOTALL)
        if embedded is not None:
            try:
                parsed = json.loads(embedded.group(0))
            except json.JSONDecodeError:
                parsed = None

    if parsed and isinstance(parsed, dict):
        return parsed
    return _fallback("Could not parse LLM resolution response", raw=reply)
def resolve_all_conflicts(hours=168):
    """Find every conflict in the next `hours` and attach LLM suggestions.

    Returns a list of {"conflict": ..., "resolution": ...} dicts, one per
    conflict reported by detect_conflicts, in the same order. The list is
    empty when nothing conflicts.
    """
    found = detect_conflicts(hours=hours)
    if not found:
        return []
    return [
        {"conflict": item, "resolution": resolve_conflict(item)}
        for item in found
    ]
# ---------------------------------------------------------------------------
# Response Handler — Execute a chosen resolution option
# ---------------------------------------------------------------------------
def execute_resolution(conflict, resolution, option_index, dry_run=False):
    """Execute a chosen resolution option for a scheduling conflict.

    Args:
        conflict: the conflict dict from detect_conflicts
        resolution: the resolution dict from resolve_conflict
        option_index: 1-based index into resolution["options"]
        dry_run: if True, report what would happen without making changes

    Returns a dict describing the outcome. "status" is one of "ERROR",
    "DRY_RUN", "SPLIT", "REASSIGNED" or "RESCHEDULED", and "message" is a
    human-readable summary. Except for early validation errors, the dict also
    carries "action", "chosen_option", "target_event" and "kept_event"; a
    successful reschedule adds "deleted_event_id" and "reminder_event_id".
    """
    # The resolution comes from LLM output, so tolerate a missing or null
    # "options" and malformed entries instead of raising.
    options = resolution.get("options")
    if not isinstance(options, list):
        options = []
    if not isinstance(option_index, int) or not 1 <= option_index <= len(options):
        return {
            "status": "ERROR",
            "message": f"Invalid option {option_index}. Available: 1-{len(options)}",
        }

    chosen = options[option_index - 1]
    if not isinstance(chosen, dict):
        return {
            "status": "ERROR",
            "message": (
                f"Option {option_index} is malformed: expected an object, "
                f"got {type(chosen).__name__}."
            ),
        }

    # `or ""` covers explicit JSON nulls, which .get(key, "") would pass through.
    action = str(chosen.get("action") or "").strip().lower()
    affects = str(chosen.get("affects_event") or "").strip().lower()
    affects_who = chosen.get("affects_who") or ""
    description = chosen.get("description") or ""

    # Identify the events involved
    event1_data = conflict.get("event1") or {}
    event2_data = conflict.get("event2") or {}
    priority = str(resolution.get("priority_event") or "event1").strip().lower()

    # The event to modify/cancel is the one specified by affects_event,
    # or if not specified, the non-priority event
    if affects == "event2":
        target_data, keep_data = event2_data, event1_data
    elif affects == "event1":
        target_data, keep_data = event1_data, event2_data
    elif priority == "event1":
        target_data, keep_data = event2_data, event1_data
    else:
        target_data, keep_data = event1_data, event2_data

    target_name = target_data.get("summary", "")
    result = {
        "action": action,
        "chosen_option": chosen,
        "target_event": target_name,
        "kept_event": keep_data.get("summary", ""),
    }

    # Find the actual calendar event that will be changed
    target_event = _find_event_by_summary_and_time(
        target_name, target_data.get("start", "")
    )
    if not target_event:
        result["status"] = "ERROR"
        result["message"] = f"Could not find calendar event: {target_name}"
        return result

    if dry_run:
        result["status"] = "DRY_RUN"
        result["message"] = f"Would {action}: {description}"
        return result

    # ---- SPLIT: Both events happen, parents divide responsibilities ----
    if action == "split":
        if affects_who:
            note = f"📋 {affects_who} handling this (conflict resolved via split)"
            handler_text = f"{affects_who} is handling {target_name}. "
            # Only the split action touches the kept event, so look it up here.
            keep_event = _find_event_by_summary_and_time(
                keep_data.get("summary", ""), keep_data.get("start", "")
            )
        else:
            # Without a name, avoid writing a note with a blank handler.
            note = "📋 Conflict resolved via split"
            handler_text = ""
            keep_event = None
        if keep_event:
            _update_event_description(keep_event["id"], note)
        _update_event_description(target_event["id"], note)
        result["status"] = "SPLIT"
        # Report what was actually written rather than always claiming "both".
        updated_text = (
            "Updated both calendar events." if keep_event
            else "Updated the calendar event."
        )
        result["message"] = f"Both events happening. {handler_text}{updated_text}"
        return result

    # ---- REASSIGN: Same event, different adult responsible ----
    if action == "reassign":
        if not affects_who:
            # Nothing can be written to the calendar without a name, so do not
            # report a successful reassignment.
            result["status"] = "ERROR"
            result["message"] = (
                f"Cannot reassign {target_name}: the option does not say who takes over."
            )
            return result
        _update_event_description(
            target_event["id"],
            f"📋 Reassigned: {affects_who} now handling this",
        )
        result["status"] = "REASSIGNED"
        result["message"] = (
            f"{affects_who} is now handling {target_name}. "
            f"Calendar updated."
        )
        return result

    # ---- RESCHEDULE: Remove lower-priority event + create rebook reminder ----
    if action == "reschedule":
        target_summary = target_event.get("summary", "")
        target_start = (target_event.get("start") or {}).get("dateTime", "")
        target_location = target_event.get("location", "")
        target_id = target_event["id"]

        reminder_summary = f"📞 Rebook: {target_summary}"
        reminder_desc = (
            f"This event was removed due to a scheduling conflict.\n"
            f"Original: {target_summary} at {target_start}\n"
            f"Location: {target_location}\n"
            f"Action needed: Call to reschedule."
        )
        # Reminder goes at 9 AM today if that is still ahead, otherwise 9 AM
        # tomorrow (weekends are not skipped).
        now = datetime.now(CHICAGO_TZ)
        reminder_dt = now.replace(hour=9, minute=0, second=0, microsecond=0)
        if reminder_dt <= now:
            reminder_dt += timedelta(days=1)
        reminder_end = reminder_dt + timedelta(minutes=30)

        # Create the reminder BEFORE deleting: if creation fails, the original
        # appointment is still on the calendar instead of being lost.
        reminder_event = create_event(
            summary=reminder_summary,
            start_dt=reminder_dt,
            end_dt=reminder_end,
            description=reminder_desc,
        )
        if not reminder_event:
            result["status"] = "ERROR"
            result["message"] = (
                f"Could not create a rebook reminder; {target_summary} was left on the calendar."
            )
            return result

        # Delete the conflicting event
        _delete_event(target_id)

        # "%-I" is not portable (it fails on Windows), so derive the
        # unpadded 12-hour value by hand.
        hour_12 = reminder_dt.hour % 12 or 12
        when = (
            f"{reminder_dt.strftime('%A %b %d')} at "
            f"{hour_12}:{reminder_dt.strftime('%M %p')}"
        )
        result["status"] = "RESCHEDULED"
        result["deleted_event_id"] = target_id
        result["reminder_event_id"] = reminder_event["id"]
        result["message"] = (
            f"Removed {target_summary} from calendar. "
            f"Created rebook reminder for {when}. "
            f"You'll need to call to reschedule."
        )
        return result

    # ---- Unknown action ----
    result["status"] = "ERROR"
    result["message"] = f"Unknown action: {action}. Valid: split, reassign, reschedule."
    return result