"""Conflict detection, resolution, and response handler."""

import json
import re
import sys
from datetime import datetime, timedelta

import requests

from family_assistant.config import (
    LLM_URL,
    LLM_MODEL,
    LLM_RESOLVE_TIMEOUT,
    CHICAGO_TZ,
    MIN_OVERLAP_MINUTES,
    load_prompts,
)
from family_assistant.calendar_sync import (
    get_calendar_service,
    _event_to_dict,
    _find_event_by_summary_and_time,
    _update_event_description,
    _delete_event,
    create_event,
)


# ---------------------------------------------------------------------------
# Conflict Detection
# ---------------------------------------------------------------------------

def detect_conflicts(events=None, hours=168):
    """Detect scheduling conflicts in the calendar.

    If `events` is provided (list of dicts with summary/start/end keys, where
    start/end are ISO strings or datetime objects), each one is checked
    against the calendar events fetched for the next `hours`. If `events` is
    None, the calendar events fetched for the next `hours` are checked
    against each other.

    Two events conflict if their time ranges overlap (start1 < end2 AND
    start2 < end1), ignoring trivial overlaps of less than
    MIN_OVERLAP_MINUTES minutes. All-day calendar events are skipped.

    Returns a list of conflict dicts, each with "event1" and "event2"
    (id, summary, ISO start, ISO end) and "overlap_minutes" (rounded).
    """

    def _parse_gcal_dt(dt_str):
        """Parse a Google Calendar datetime string to a timezone-aware datetime."""
        if not dt_str:
            return None
        try:
            dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            try:
                dt = datetime.strptime(dt_str, "%Y-%m-%d")
            except (ValueError, TypeError):
                return None
        if dt.tzinfo is None:
            # fromisoformat() also accepts date-only / offset-less strings and
            # returns a naive value; astimezone() would read that in the
            # *system* zone. Attach the family zone instead, as the date-only
            # fallback always did.
            return dt.replace(tzinfo=CHICAGO_TZ)
        return dt.astimezone(CHICAGO_TZ)

    def _coerce_dt(value):
        """Normalize a caller-supplied start/end (datetime or string) to aware."""
        if not value:
            return None
        if isinstance(value, datetime):
            # Aware datetimes are kept untouched; naive ones would raise
            # TypeError when compared with the aware calendar times.
            if value.tzinfo is None:
                return value.replace(tzinfo=CHICAGO_TZ)
            return value
        return _parse_gcal_dt(str(value))

    def _overlap_minutes(s1, e1, s2, e2):
        """Calculate overlap in minutes between two time ranges."""
        latest_start = max(s1, s2)
        earliest_end = min(e1, e2)
        delta = (earliest_end - latest_start).total_seconds() / 60
        return max(0, delta)

    def _public_view(ev):
        """Return the serializable form of a normalized event."""
        return {
            "id": ev.get("id", ""),
            "summary": ev["summary"],
            "start": ev["start"].isoformat(),
            "end": ev["end"].isoformat(),
        }

    calendar = get_calendar_service()
    now = datetime.now(CHICAGO_TZ)
    window_end = now + timedelta(hours=hours)

    # Fetch all events in the window via CalDAV
    try:
        event_objs = calendar.date_search(start=now, end=window_end)
    except Exception as exc:
        # Best-effort fallback, but no longer silent. Note that the fallback
        # listing is not restricted to the window.
        print(
            f"  [Calendar] date_search failed ({exc}); falling back to events()",
            file=sys.stderr,
        )
        event_objs = calendar.events()

    existing = []
    for idx, e_obj in enumerate(event_objs):
        e = _event_to_dict(e_obj)
        start_info = e.get("start") or {}
        end_info = e.get("end") or {}
        # Skip all-day events — they're notes/reminders, not time commitments
        if start_info.get("date") and not start_info.get("dateTime"):
            continue
        s = _parse_gcal_dt(start_info.get("dateTime", start_info.get("date", "")))
        en = _parse_gcal_dt(end_info.get("dateTime", end_info.get("date", "")))
        if s and en:
            existing.append({
                "id": e.get("id", ""),
                "summary": e.get("summary", "(no title)"),
                "start": s,
                "end": en,
                # Internal de-dup token: the real id when there is one,
                # otherwise a positional token that is unique per event.
                "_key": e.get("id") or f"existing#{idx}",
            })

    # Build the full event list to check
    if events is not None:
        # Normalize provided events (may have ISO strings or datetime objects)
        new_events = []
        for idx, ev in enumerate(events):
            s = _coerce_dt(ev.get("start"))
            en = _coerce_dt(ev.get("end"))
            if s and en:
                new_events.append({
                    "id": ev.get("id", "new"),
                    "summary": ev.get("summary", "(new event)"),
                    "start": s,
                    "end": en,
                    # Id-less new events all share the placeholder id "new";
                    # give each its own token so they are not de-duplicated
                    # against one another.
                    "_key": ev.get("id") or f"new#{idx}",
                })
        # Check each new event against existing calendar events
        to_check = new_events
        against = existing
    else:
        # Check ALL events against each other
        to_check = existing
        against = existing

    conflicts = []
    seen_pairs = set()

    for i, ev1 in enumerate(to_check):
        for j, ev2 in enumerate(against):
            # Skip self-comparison
            if events is None and i == j:
                continue
            # Skip if same event by ID
            if ev1.get("id") and ev2.get("id") and ev1["id"] == ev2["id"]:
                continue
            # Deduplicate pairs regardless of order (A,B) == (B,A)
            pair_key = frozenset((ev1["_key"], ev2["_key"]))
            if pair_key in seen_pairs:
                continue
            seen_pairs.add(pair_key)

            overlap = _overlap_minutes(
                ev1["start"], ev1["end"], ev2["start"], ev2["end"]
            )
            # `overlap > 0` keeps touching/disjoint events out even if
            # MIN_OVERLAP_MINUTES is configured as 0.
            if overlap > 0 and overlap >= MIN_OVERLAP_MINUTES:
                conflicts.append({
                    "event1": _public_view(ev1),
                    "event2": _public_view(ev2),
                    "overlap_minutes": round(overlap),
                })

    return conflicts


# ---------------------------------------------------------------------------
# Conflict Resolution
# ---------------------------------------------------------------------------

def _call_llm_resolve(system, user, temperature=0.3):
    """Send a chat completion request to the LLM for conflict resolution.

    Posts a system + user message pair to LLM_URL and returns the stripped
    content of the first choice. Returns None (after logging to stderr) on
    timeout or any other failure, so callers can fall back gracefully.
    """
    payload = {
        "model": LLM_MODEL,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        "temperature": temperature,
    }
    try:
        resp = requests.post(LLM_URL, json=payload, timeout=LLM_RESOLVE_TIMEOUT)
        resp.raise_for_status()
        data = resp.json()
        return data["choices"][0]["message"]["content"].strip()
    except requests.exceptions.Timeout:
        print(f"  [LLM] Resolution timeout after {LLM_RESOLVE_TIMEOUT}s", file=sys.stderr)
        return None
    except Exception as e:
        # Deliberate best-effort boundary: any transport or shape error
        # degrades to "no suggestion" rather than crashing the caller.
        print(f"  [LLM] Resolution error: {e}", file=sys.stderr)
        return None


def _fallback_resolution(conflict, error, raw=None):
    """Build the option-less resolution returned when the LLM cannot help."""
    e1 = conflict["event1"]
    e2 = conflict["event2"]
    fallback = {
        "conflict_summary": f"{e1['summary']} conflicts with {e2['summary']}",
        "message": (
            f"⚠️ Schedule conflict: {e1['summary']} ({e1['start']}) overlaps with "
            f"{e2['summary']} ({e2['start']}) by {conflict['overlap_minutes']}min. "
            "Please review."
        ),
        "options": [],
        "error": error,
    }
    if raw is not None:
        fallback["raw"] = raw
    return fallback


def _parse_llm_json(text):
    """Parse `text` as JSON, falling back to the outermost {...} span; None on failure."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    match = re.search(r'\{.*\}', text, re.DOTALL)
    if not match:
        return None
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError:
        return None


def resolve_conflict(conflict):
    """Use the LLM to generate resolution suggestions for a scheduling conflict.

    Takes a conflict dict (from detect_conflicts) and returns resolution
    options with a human-friendly message the family can act on. If the LLM
    call fails or its reply is not a JSON object, a fallback dict with an
    empty "options" list and an "error" key is returned instead.
    """
    prompts = load_prompts()
    resolution_prompt = prompts["conflict_resolve"]

    e1 = conflict["event1"]
    e2 = conflict["event2"]

    user_msg = json.dumps({
        "conflict": {
            "event1": {
                "summary": e1["summary"],
                "start": e1["start"],
                "end": e1["end"],
            },
            "event2": {
                "summary": e2["summary"],
                "start": e2["start"],
                "end": e2["end"],
            },
            "overlap_minutes": conflict["overlap_minutes"],
        }
    }, indent=2)

    raw = _call_llm_resolve(resolution_prompt, user_msg)
    if not raw:
        return _fallback_resolution(conflict, "LLM resolution failed")

    # Parse JSON from LLM response (strip a surrounding markdown code fence)
    text = raw.strip()
    if text.startswith("```"):
        text = re.sub(r'^```(?:json)?\s*\n?', '', text)
        text = re.sub(r'\n?```\s*$', '', text)
    text = text.strip()

    parsed = _parse_llm_json(text)
    if not parsed or not isinstance(parsed, dict):
        return _fallback_resolution(
            conflict, "Could not parse LLM resolution response", raw=text
        )

    return parsed


def resolve_all_conflicts(hours=168):
    """Detect all conflicts in the next N hours and generate resolution suggestions.

    Returns a list of dicts, each with the conflict and its resolution options.
    """
    return [
        {"conflict": conflict, "resolution": resolve_conflict(conflict)}
        for conflict in detect_conflicts(hours=hours)
    ]


# ---------------------------------------------------------------------------
# Response Handler — Execute a chosen resolution option
# ---------------------------------------------------------------------------

def execute_resolution(conflict, resolution, option_index, dry_run=False):
    """Execute a chosen resolution option for a scheduling conflict.

    Args:
        conflict: the conflict dict from detect_conflicts
        resolution: the resolution dict from resolve_conflict
        option_index: 1-based index into resolution["options"]
        dry_run: if True, report what would happen without making changes

    Returns a dict with the action taken and results. "status" is one of
    ERROR, DRY_RUN, SPLIT, REASSIGNED or RESCHEDULED; "message" is a
    human-readable summary.
    """
    options = resolution.get("options") or []
    if not isinstance(options, list):
        options = []
    index_ok = (
        isinstance(option_index, int) and 1 <= option_index <= len(options)
    )
    if not index_ok:
        return {
            "status": "ERROR",
            "message": f"Invalid option {option_index}. Available: 1-{len(options)}",
        }

    chosen = options[option_index - 1]
    if not isinstance(chosen, dict):
        return {
            "status": "ERROR",
            "message": f"Option {option_index} is malformed (expected an object).",
        }

    # `or ""` guards against explicit nulls in the LLM-produced JSON.
    action = str(chosen.get("action") or "").strip().lower()
    affects = str(chosen.get("affects_event") or "").strip().lower()
    affects_who = str(chosen.get("affects_who") or "").strip()
    description = chosen.get("description", "")

    # Identify the events involved
    event1_data = conflict.get("event1", {})
    event2_data = conflict.get("event2", {})
    priority = resolution.get("priority_event", "event1")

    # The event to modify/cancel is the one specified by affects_event,
    # or if not specified, the non-priority event
    if affects == "event2":
        target_data, keep_data = event2_data, event1_data
    elif affects == "event1":
        target_data, keep_data = event1_data, event2_data
    elif priority == "event1":
        target_data, keep_data = event2_data, event1_data
    else:
        target_data, keep_data = event1_data, event2_data

    target_name = target_data.get("summary", "")
    result = {
        "action": action,
        "chosen_option": chosen,
        "target_event": target_name,
        "kept_event": keep_data.get("summary", ""),
    }

    # Validate before any calendar access, so dry runs report these too.
    if action not in ("split", "reassign", "reschedule"):
        result["status"] = "ERROR"
        result["message"] = f"Unknown action: {action}. Valid: split, reassign, reschedule."
        return result

    if action in ("split", "reassign") and not affects_who:
        # Without a name there is nothing meaningful to write; reporting
        # success here would claim a calendar update that never happened.
        result["status"] = "ERROR"
        result["message"] = f"Cannot {action}: the option does not say who is affected."
        return result

    # Find the actual calendar event to act on
    target_event = _find_event_by_summary_and_time(
        target_name, target_data.get("start", "")
    )
    if not target_event:
        result["status"] = "ERROR"
        result["message"] = f"Could not find calendar event: {target_name}"
        return result

    if dry_run:
        result["status"] = "DRY_RUN"
        result["message"] = f"Would {action}: {description}"
        return result

    # ---- SPLIT: Both events happen, parents divide responsibilities ----
    if action == "split":
        _update_event_description(
            target_event["id"],
            f"📋 {affects_who} handling this (conflict resolved via split)",
        )
        # The kept event is only needed here, so it is looked up lazily.
        keep_event = _find_event_by_summary_and_time(
            keep_data.get("summary", ""), keep_data.get("start", "")
        )
        if keep_event:
            # Cross-reference rather than claim the same person covers both
            # events, which would contradict the split.
            _update_event_description(
                keep_event["id"],
                f"📋 Conflict resolved via split: {affects_who} handling {target_name}",
            )
            updated_note = "Updated both calendar events."
        else:
            updated_note = f"Updated {target_name}; the other event was not found."
        result["status"] = "SPLIT"
        result["message"] = (
            f"Both events happening. {affects_who} is handling {target_name}. "
            f"{updated_note}"
        )
        return result

    # ---- REASSIGN: Same event, different adult responsible ----
    if action == "reassign":
        _update_event_description(
            target_event["id"],
            f"📋 Reassigned: {affects_who} now handling this",
        )
        result["status"] = "REASSIGNED"
        result["message"] = (
            f"{affects_who} is now handling {target_name}. "
            f"Calendar updated."
        )
        return result

    # ---- RESCHEDULE: Remove lower-priority event + create rebook reminder ----
    target_summary = target_event.get("summary", "")
    start_info = target_event.get("start") or {}
    target_start = start_info.get("dateTime") or start_info.get("date", "")
    target_location = target_event.get("location", "")
    target_id = target_event["id"]

    reminder_summary = f"📞 Rebook: {target_summary}"
    reminder_desc = (
        f"This event was removed due to a scheduling conflict.\n"
        f"Original: {target_summary} at {target_start}\n"
        f"Location: {target_location}\n"
        f"Action needed: Call to reschedule."
    )

    # Schedule the reminder for today (or next business day) at 9 AM
    now = datetime.now(CHICAGO_TZ)
    reminder_dt = now.replace(hour=9, minute=0, second=0, microsecond=0)
    if reminder_dt <= now:
        reminder_dt += timedelta(days=1)
    while reminder_dt.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        reminder_dt += timedelta(days=1)
    reminder_end = reminder_dt + timedelta(minutes=30)

    # Create the reminder BEFORE deleting: if creation fails, the original
    # event is still on the calendar and nothing is lost.
    reminder_event = create_event(
        summary=reminder_summary,
        start_dt=reminder_dt,
        end_dt=reminder_end,
        description=reminder_desc,
    )
    if not reminder_event:
        result["status"] = "ERROR"
        result["message"] = (
            f"Could not create rebook reminder; {target_summary} was left on the calendar."
        )
        return result
    reminder_id = reminder_event["id"]

    # Delete the conflicting event
    _delete_event(target_id)

    # Build the time by hand: strftime's '%-I' is a platform extension.
    reminder_when = (
        f"{reminder_dt.strftime('%A %b %d')} at "
        f"{reminder_dt.hour % 12 or 12}:{reminder_dt.strftime('%M %p')}"
    )

    result["status"] = "RESCHEDULED"
    result["deleted_event_id"] = target_id
    result["reminder_event_id"] = reminder_id
    result["message"] = (
        f"Removed {target_summary} from calendar. "
        f"Created rebook reminder for {reminder_when}. "
        f"You'll need to call to reschedule."
    )
    return result