"""CalDAV Calendar CRUD — Radicale backend (replaces Google Calendar API).

All Google Calendar API references have been stripped. Uses caldav + icalendar
libraries to write events to the local Radicale instance on the Beelink.

Phone clients connect via Cloudflare Tunnel (cal.yourdomain.com) or Tailscale IP.
Pipeline writes via localhost (127.0.0.1:5232) as the 'assistant' user.
"""

import os
import sys
import uuid
from datetime import datetime, timedelta, timezone, date
from zoneinfo import ZoneInfo

import caldav
from icalendar import Calendar as iCalendar, Event as iEvent

from family_assistant.config import CHICAGO_TZ

# ---------------------------------------------------------------------------
# CalDAV Connection
# ---------------------------------------------------------------------------

CALDAV_URL = os.environ.get("CALDAV_URL", "http://127.0.0.1:5232")
CALDAV_USER = os.environ.get("CALDAV_USER", "assistant")
CALDAV_PASSWORD = os.environ.get("CALDAV_PASSWORD", "")  # Required, no default
CALDAV_CALENDAR_NAME = os.environ.get("CALDAV_CALENDAR_NAME", "family")

_dav_client = None
_dav_calendar = None


def get_calendar_service():
    """Return the CalDAV calendar object (replaces Google Calendar service).

    Caches the connection for the process lifetime. Radicale is local so
    connection pooling isn't critical, but reuse avoids re-auth overhead.

    The calendar named ``CALDAV_CALENDAR_NAME`` is looked up among the
    principal's calendars and created when no calendar with that name exists.

    Returns:
        The cached caldav calendar object.

    Raises:
        Exception: Any connection/lookup error is logged to stderr and
            re-raised unchanged.
    """
    global _dav_client, _dav_calendar

    if _dav_calendar is not None:
        return _dav_calendar

    try:
        _dav_client = caldav.DAVClient(
            url=CALDAV_URL,
            username=CALDAV_USER,
            password=CALDAV_PASSWORD,
        )
        principal = _dav_client.principal()

        # Find or create the family calendar
        for cal in principal.calendars():
            # caldav library returns calendar name in various ways
            cal_name = getattr(cal, 'name', None) or ''
            if cal_name == CALDAV_CALENDAR_NAME:
                _dav_calendar = cal
                break

        if _dav_calendar is None:
            # Create the calendar if it doesn't exist
            _dav_calendar = principal.make_calendar(
                name=CALDAV_CALENDAR_NAME,
            )
            print(f" Created CalDAV calendar: {CALDAV_CALENDAR_NAME}", file=sys.stderr)

        return _dav_calendar
    except Exception as e:
        # Do not keep a client around when no calendar could be resolved.
        _dav_client = None
        print(f" [CalDAV] Connection error: {e}", file=sys.stderr)
        raise


def _reset_calendar_cache():
    """Force reconnection on next get_calendar_service() call."""
    global _dav_client, _dav_calendar
    _dav_client = None
    _dav_calendar = None


# ---------------------------------------------------------------------------
# Event builders (iCalendar format)
# ---------------------------------------------------------------------------

# RRULE parts whose value is a comma-separated list of integers.
_RRULE_INT_LIST_KEYS = frozenset({
    "BYMONTH", "BYMONTHDAY", "BYSETPOS", "BYYEARDAY",
    "BYWEEKNO", "BYHOUR", "BYMINUTE", "BYSECOND",
})
# RRULE parts whose value is a single integer.
_RRULE_INT_KEYS = frozenset({"COUNT", "INTERVAL"})
# RRULE parts passed through as a single text token.
_RRULE_TEXT_KEYS = frozenset({"FREQ", "WKST"})


def _parse_rrule_until(value):
    """Parse an RRULE UNTIL value into a UTC datetime, or None if unparseable.

    Accepts ``YYYYMMDDTHHMMSSZ`` and date-only ``YYYYMMDD`` (taken as
    midnight UTC).
    """
    for fmt in ("%Y%m%dT%H%M%SZ", "%Y%m%d"):
        try:
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    return None


def _parse_rrule(rrule_str):
    """Convert an RRULE string into the dict form passed to icalendar.

    Args:
        rrule_str: e.g. 'FREQ=WEEKLY;BYDAY=MO,WE'. A leading 'RRULE:' prefix
            is tolerated.

    Returns:
        Dict with lower-case keys. List-valued parts (BYDAY, BYMONTH,
        BYMONTHDAY, ...) are lists so that comma-separated values such as
        'BYMONTHDAY=1,15' are preserved. Parts not recognised here are
        ignored.

    Raises:
        ValueError: If an integer part holds a non-integer value.
    """
    rrule = {}
    for part in rrule_str.removeprefix("RRULE:").split(";"):
        key, _, val = part.partition("=")
        key = key.strip().upper()
        val = val.strip()
        if not key or not val:
            continue

        if key in _RRULE_TEXT_KEYS:
            rrule[key.lower()] = val
        elif key == "UNTIL":
            until_dt = _parse_rrule_until(val)
            if until_dt is None:
                # Surface the problem: without UNTIL the series is unbounded.
                print(f" [CalDAV] Ignoring unparseable RRULE UNTIL: {val}", file=sys.stderr)
            else:
                rrule["until"] = until_dt
        elif key == "BYDAY":
            rrule["byday"] = [day.strip() for day in val.split(",") if day.strip()]
        elif key in _RRULE_INT_LIST_KEYS:
            rrule[key.lower()] = [int(item) for item in val.split(",") if item.strip()]
        elif key in _RRULE_INT_KEYS:
            rrule[key.lower()] = int(val)
    return rrule


def _build_ics(summary, start_dt, end_dt, description="", location="",
               uid=None, recurrence_rule=None):
    """Build an iCalendar VEVENT string.

    Args:
        summary: Event title
        start_dt: datetime or date string 'YYYY-MM-DD' for all-day
        end_dt: datetime or date string 'YYYY-MM-DD' for all-day
        description: Event description
        location: Event location
        uid: Optional UID (auto-generated if not provided)
        recurrence_rule: Optional RRULE, either as a string
            (e.g. 'FREQ=WEEKLY;BYDAY=MO,WE') or as an already parsed dict.

    Returns:
        iCalendar string
    """
    is_all_day = isinstance(start_dt, str)

    cal = iCalendar()
    cal.add("prodid", "-//Family Assistant//EN")
    cal.add("version", "2.0")
    cal.add("calscale", "GREGORIAN")

    event = iEvent()
    event.add("summary", summary)
    if description:
        event.add("description", description)
    if location:
        event.add("location", location)

    # UID — unique per event, stable for dedup
    event.add("uid", uid or str(uuid.uuid4()))

    if is_all_day:
        # All-day events use DATE not DATETIME
        start_date = date.fromisoformat(start_dt)
        event.add("dtstart", start_date)
        # End date for all-day: day AFTER the last day (iCal convention)
        end_date = date.fromisoformat(end_dt) if end_dt else start_date
        if end_date == start_date:
            end_date = end_date + timedelta(days=1)  # Single all-day: next day
        event.add("dtend", end_date)
    else:
        event.add("dtstart", start_dt)
        event.add("dtend", end_dt)

    if recurrence_rule:
        if isinstance(recurrence_rule, str):
            # A raw string is not a valid recurrence mapping; parse it first.
            recurrence_rule = _parse_rrule(recurrence_rule)
        event.add("rrule", recurrence_rule)

    event.add("dtstamp", datetime.now(timezone.utc))

    cal.add_component(event)
    return cal.to_ical().decode("utf-8")


def _google_time(prop):
    """Map a DTSTART/DTEND property to a Google-style time dict.

    Returns ``{"dateTime": iso}`` for datetime values and ``{"date": ...}``
    for date values or a missing property (empty string).
    """
    value = prop.dt if prop is not None else ""
    if isinstance(value, datetime):
        return {"dateTime": value.isoformat()}
    if isinstance(value, date):
        return {"date": value.isoformat()}
    return {"date": str(value)}


def _google_start_end(start_dt, end_dt):
    """Build the Google-style (start, end) dicts returned by the create calls.

    String inputs are treated as all-day dates, anything else as datetimes.
    """
    tz_name = "America/Chicago"
    if isinstance(start_dt, str):
        return (
            {"date": start_dt, "timeZone": tz_name},
            {"date": end_dt or start_dt, "timeZone": tz_name},
        )
    return (
        {"dateTime": start_dt.isoformat(), "timeZone": tz_name},
        {"dateTime": end_dt.isoformat(), "timeZone": tz_name},
    )


def _event_to_dict(event_obj):
    """Convert a caldav Event object to a dict matching the old Google format.

    This keeps the rest of the codebase (pipeline, hermes, etc.) working
    without rewriting all the downstream dict key references.

    Only the first VEVENT component of the resource is converted. A dict is
    always returned: when the data cannot be parsed or holds no VEVENT, a
    placeholder with summary "(parse error)" is returned so callers can keep
    using ``.get()`` safely.
    """
    try:
        cal = iCalendar.from_ical(event_obj.data)
        for component in cal.walk():
            if component.name != "VEVENT":
                continue
            # Format start/end consistently with old Google format
            return {
                "id": str(component.get("uid", "")),
                "summary": str(component.get("summary", "(no title)")),
                "start": _google_time(component.get("dtstart")),
                "end": _google_time(component.get("dtend")),
                "description": str(component.get("description", "")),
                "location": str(component.get("location", "")),
                "htmlLink": "",  # No web UI for Radicale
            }
        print(" [CalDAV] Error parsing event: no VEVENT component", file=sys.stderr)
    except Exception as e:
        print(f" [CalDAV] Error parsing event: {e}", file=sys.stderr)

    return {
        "id": getattr(event_obj, 'url', ''),
        "summary": "(parse error)",
        "start": {"dateTime": ""},
        "end": {"dateTime": ""},
        "description": "",
        "location": "",
        "htmlLink": "",
    }


def _normalize_summary(summary):
    """Lower-case and strip a summary; None becomes the empty string."""
    return (summary or "").lower().strip()


def _summaries_overlap(first, second):
    """Return True when both summaries are non-empty and one contains the other."""
    return bool(first and second and (first in second or second in first))


def _find_event_object_by_uid(calendar, event_id):
    """Scan the calendar's events and return the object whose UID matches, or None."""
    for event_obj in calendar.events():
        if _event_to_dict(event_obj).get("id") == event_id:
            return event_obj
    return None


# ---------------------------------------------------------------------------
# CRUD operations (same interface as the old Google version)
# ---------------------------------------------------------------------------

def list_upcoming_events(hours=48):
    """Return upcoming events in the next N hours.

    Returns a list of Google-format event dicts, or an empty list when the
    search fails (the error is logged to stderr).
    """
    calendar = get_calendar_service()
    now = datetime.now(timezone.utc)
    end = now + timedelta(hours=hours)

    try:
        return [_event_to_dict(e) for e in calendar.date_search(start=now, end=end)]
    except Exception as e:
        print(f" [CalDAV] list_upcoming_events error: {e}", file=sys.stderr)
        return []


def event_exists(summary, start_dt, tolerance_minutes=10):
    """Check if a calendar event with similar summary and start time exists.

    Uses fuzzy matching: same summary text (case-insensitive) and start time
    within tolerance_minutes. Prevents duplicate calendar events.

    Returns:
        The matching event dict, or None when nothing matches or the check
        itself failed (fail open; the failure is logged to stderr).
    """
    try:
        calendar = get_calendar_service()
        window_start = start_dt - timedelta(minutes=tolerance_minutes)
        window_end = start_dt + timedelta(minutes=tolerance_minutes)
        new_summary = _normalize_summary(summary)

        for event_obj in calendar.date_search(start=window_start, end=window_end):
            d = _event_to_dict(event_obj)
            if _normalize_summary(d.get("summary", "")) == new_summary:
                return d
        return None
    except Exception as e:
        # If we can't check, don't block creation — fail open
        print(f" [CalDAV] event_exists check failed: {e}", file=sys.stderr)
        return None


def create_event(summary, start_dt, end_dt, description="", location=""):
    """Create a calendar event. Returns the event dict.

    Supports both timed events (datetime objects) and all-day events
    (date strings 'YYYY-MM-DD').
    """
    calendar = get_calendar_service()
    uid = str(uuid.uuid4())

    ics = _build_ics(
        summary=summary,
        start_dt=start_dt,
        end_dt=end_dt,
        description=description,
        location=location,
        uid=uid,
    )
    calendar.save_event(ics)

    # Build return dict in Google-compatible format
    start, end = _google_start_end(start_dt, end_dt)
    return {
        "id": uid,
        "summary": summary,
        "start": start,
        "end": end,
        "description": description,
        "location": location,
        "htmlLink": "",
    }


def create_recurring_event(summary, start_dt, end_dt, recurrence,
                           description="", location=""):
    """Create a recurring calendar event using RRULE.

    The recurrence dict is translated to a valid RRULE string by the
    deterministic rrule_builder module. The LLM never generates RRULE directly.

    Args:
        summary: Event title.
        start_dt: datetime, or date string 'YYYY-MM-DD' for all-day.
        end_dt: datetime, or date string 'YYYY-MM-DD' for all-day.
        recurrence: Recurrence description understood by rrule_builder.
        description: Event description.
        location: Event location.

    Returns:
        Google-format event dict including a "recurrence" list.

    Raises:
        ValueError: If the recurrence fails validation or no RRULE can be built.
    """
    from family_assistant.rrule_builder import build_rrule, validate_recurrence

    errors = validate_recurrence(recurrence)
    if errors:
        raise ValueError(f"Invalid recurrence: {'; '.join(errors)}")

    rrule_str = build_rrule(recurrence)
    if not rrule_str:
        raise ValueError(f"Could not build RRULE from: {recurrence}")

    calendar = get_calendar_service()
    uid = str(uuid.uuid4())

    # Parse RRULE string into icalendar-compatible dict
    # build_rrule returns "FREQ=WEEKLY;BYDAY=MO,WE" format
    rrule_dict = _parse_rrule(rrule_str)

    ics = _build_ics(
        summary=summary,
        start_dt=start_dt,
        end_dt=end_dt,
        description=description,
        location=location,
        uid=uid,
        recurrence_rule=rrule_dict,
    )
    calendar.save_event(ics)

    # Shared builder so all-day (string) inputs do not crash after the save.
    start, end = _google_start_end(start_dt, end_dt)
    return {
        "id": uid,
        "summary": summary,
        "start": start,
        "end": end,
        "recurrence": [f"RRULE:{rrule_str}"],
        "htmlLink": "",
    }


def find_and_cancel_event(summary, start_dt, tolerance_minutes=30):
    """Find and delete a calendar event matching a cancellation.

    Uses fuzzy matching: case-insensitive partial summary match and start time
    within tolerance_minutes.

    Returns a dict with status and event details, or None if no match found
    (also None when the search itself fails; the error is logged).
    """
    try:
        calendar = get_calendar_service()
        window_start = start_dt - timedelta(minutes=tolerance_minutes)
        window_end = start_dt + timedelta(minutes=tolerance_minutes)
        search_summary = _normalize_summary(summary)

        for event_obj in calendar.date_search(start=window_start, end=window_end):
            d = _event_to_dict(event_obj)
            existing_summary = _normalize_summary(d.get("summary", ""))

            # Partial match: either summary contains the other
            if not _summaries_overlap(search_summary, existing_summary):
                continue

            event_id = d["id"]
            event_summary = d.get("summary", summary)
            # All-day events carry "date" rather than "dateTime".
            event_start = d["start"].get("dateTime", d["start"].get("date", ""))

            try:
                event_obj.delete()
            except Exception as e:
                print(
                    f" Error deleting event {event_id}: {e}",
                    file=sys.stderr,
                )
                return {
                    "status": "CANCEL_FAILED",
                    "id": event_id,
                    "summary": event_summary,
                    "error": str(e),
                }

            print(
                f" Cancelled: {event_summary} (uid={event_id})",
                file=sys.stderr,
            )
            return {
                "status": "CANCELLED",
                "id": event_id,
                "summary": event_summary,
                "start": event_start,
            }

        return None
    except Exception as e:
        print(f" Error searching for event to cancel: {e}", file=sys.stderr)
        return None


def _find_event_by_summary_and_time(summary, start_iso, tolerance_minutes=30):
    """Find a calendar event matching summary (partial, case-insensitive) and start time.

    Args:
        summary: Summary text to look for.
        start_iso: ISO 8601 start time string.
        tolerance_minutes: Half-width of the search window around the start.

    Returns the event dict (Google-compatible format), or None.
    """
    try:
        calendar = get_calendar_service()
        start_dt = datetime.fromisoformat(start_iso)
        window_start = start_dt - timedelta(minutes=tolerance_minutes)
        window_end = start_dt + timedelta(minutes=tolerance_minutes)
        search = _normalize_summary(summary)

        for event_obj in calendar.date_search(start=window_start, end=window_end):
            d = _event_to_dict(event_obj)
            if _summaries_overlap(search, _normalize_summary(d.get("summary", ""))):
                return d
        return None
    except Exception as e:
        print(f" [Handler] Error finding event: {e}", file=sys.stderr)
        return None


def _update_event_description(event_id, extra_text):
    """Append text to an event's description field.

    Does nothing when no event with the given UID exists. Update errors are
    logged to stderr rather than raised.
    """
    calendar = get_calendar_service()

    # Search for the event by UID
    event_obj = _find_event_object_by_uid(calendar, event_id)
    if event_obj is None:
        return

    try:
        cal = iCalendar.from_ical(event_obj.data)
        for component in cal.walk():
            if component.name != "VEVENT":
                continue
            existing_desc = str(component.get("description", "")) or ""
            new_desc = f"{existing_desc}\n{extra_text}".strip()
            # Remove first: add() on an existing property would append a
            # second DESCRIPTION instead of replacing it.
            if "description" in component:
                del component["description"]
            component.add("description", new_desc)
        event_obj.data = cal.to_ical().decode("utf-8")
        event_obj.save()
    except Exception as e:
        print(f" [CalDAV] Error updating event: {e}", file=sys.stderr)


def _delete_event(event_id):
    """Delete a calendar event by UID. Does nothing when the UID is not found."""
    calendar = get_calendar_service()
    event_obj = _find_event_object_by_uid(calendar, event_id)
    if event_obj is not None:
        event_obj.delete()


def _exdate_for_instance(master_start, instance_date):
    """Compute the EXDATE value that excludes one instance of a series.

    An EXDATE only removes an instance when it equals that instance's start,
    so the value is derived from the master DTSTART instead of using midnight.

    Args:
        master_start: The master event's DTSTART value (datetime, date or None).
        instance_date: 'YYYY-MM-DD', interpreted as a calendar day in CHICAGO_TZ.

    Returns:
        A date for all-day series, otherwise a datetime carrying the master's
        time of day and tzinfo.

    Raises:
        ValueError: If instance_date is not in 'YYYY-MM-DD' format.
    """
    target = datetime.strptime(instance_date, "%Y-%m-%d").date()

    if not isinstance(master_start, datetime):
        if isinstance(master_start, date):
            return target  # all-day series: exclude by DATE
        # No usable DTSTART: keep the previous midnight-in-Chicago behaviour.
        return datetime.combine(target, datetime.min.time(), tzinfo=CHICAGO_TZ)

    if master_start.tzinfo is None:
        # Floating time: same wall-clock time on the target day.
        return datetime.combine(target, master_start.time())

    # The master may be stored in another zone (e.g. UTC), where the instance
    # falls on an adjacent date; pick the occurrence whose Chicago-local date
    # is the requested day.
    for offset in (0, -1, 1):
        candidate = datetime.combine(
            target + timedelta(days=offset), master_start.timetz()
        )
        if candidate.astimezone(CHICAGO_TZ).date() == target:
            return candidate
    return datetime.combine(target, master_start.timetz())


def cancel_recurring_instance(event_id, instance_date, dry_run=False):
    """Cancel a single instance of a recurring event.

    Creates an EXDATE exception on the master recurring event that excludes
    the specified instance date.

    Args:
        event_id: UID of the master recurring event.
        instance_date: ISO 8601 date string (YYYY-MM-DD).
        dry_run: If True, return what would happen without executing.

    Returns:
        Dict with status and details.
    """
    calendar = get_calendar_service()

    target_event = _find_event_object_by_uid(calendar, event_id)
    if target_event is None:
        return {"status": "error", "message": f"Could not find event {event_id}"}

    d = _event_to_dict(target_event)
    summary = d.get("summary", "")
    start = d["start"].get("dateTime", d["start"].get("date", ""))

    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "cancel_instance",
            "summary": summary,
            "start": start,
        }

    # Add EXDATE for the instance date
    try:
        cal = iCalendar.from_ical(target_event.data)
        for component in cal.walk():
            if component.name != "VEVENT":
                continue
            # Overrides (RECURRENCE-ID) are single instances, not the master.
            if component.get("recurrence-id") is not None:
                continue

            dtstart = component.get("dtstart")
            ex_value = _exdate_for_instance(
                dtstart.dt if dtstart is not None else None, instance_date
            )
            # add() encodes the value and appends to any existing EXDATE.
            if isinstance(ex_value, datetime):
                component.add("exdate", ex_value)
            else:
                component.add("exdate", ex_value, parameters={"VALUE": "DATE"})

        target_event.data = cal.to_ical().decode("utf-8")
        target_event.save()

        return {
            "status": "INSTANCE_CANCELLED",
            "summary": summary,
            "start": start,
            "event_id": event_id,
        }
    except Exception as e:
        return {"status": "error", "message": f"Failed to cancel instance: {e}"}


def cancel_recurring_series(event_id, dry_run=False):
    """Delete all remaining instances of a recurring event series.

    Deletes the master recurring event, which removes all current and future
    instances.

    Args:
        event_id: UID of the master recurring event.
        dry_run: If True, return what would happen without executing.

    Returns:
        Dict with status and details.
    """
    calendar = get_calendar_service()

    target_event = _find_event_object_by_uid(calendar, event_id)
    if target_event is None:
        return {"status": "error", "message": f"Could not find event {event_id}"}

    d = _event_to_dict(target_event)
    summary = d.get("summary", "")
    start = d["start"].get("dateTime", d["start"].get("date", ""))

    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "cancel_series",
            "summary": summary,
            "master_start": start,
        }

    try:
        target_event.delete()
    except Exception as e:
        return {"status": "error", "message": f"Failed to cancel series: {e}"}

    return {
        "status": "SERIES_CANCELLED",
        "summary": summary,
        "master_id": event_id,
    }


def find_recurring_instance(summary, instance_date):
    """Find a specific instance of a recurring event by summary and date.

    Searches the next 180 days (falling back to all events if the ranged
    search fails) and returns the first event whose summary matches and whose
    start falls on the requested day.

    Args:
        summary: Event summary to search for (fuzzy match).
        instance_date: ISO 8601 date string (YYYY-MM-DD) of the target
            instance, compared against the event's CHICAGO_TZ-local date.

    Returns:
        Event dict (Google-compatible format) for that instance, or None.

    Raises:
        ValueError: If instance_date is not in 'YYYY-MM-DD' format.
    """
    calendar = get_calendar_service()

    now = datetime.now(timezone.utc)
    time_max = now + timedelta(days=180)

    try:
        events = calendar.date_search(start=now, end=time_max)
    except Exception:
        events = calendar.events()

    target_date = datetime.strptime(instance_date, "%Y-%m-%d").date()
    summary_lower = _normalize_summary(summary)
    if not summary_lower:
        # An empty needle is a substring of everything; refuse to match blindly.
        return None

    for event_obj in events:
        d = _event_to_dict(event_obj)
        event_summary = _normalize_summary(d.get("summary", ""))
        if not event_summary:
            continue

        # Check summary match (exact or partial); as a fallback compare the
        # title with any trailing " (...)" suffix removed.
        if not _summaries_overlap(summary_lower, event_summary):
            base = event_summary.split(" (")[0]
            if not _summaries_overlap(summary_lower, base):
                continue

        # Check date match
        start_str = d["start"].get("dateTime", d["start"].get("date", ""))
        if not start_str:
            continue

        try:
            if "T" in start_str:
                parsed = datetime.fromisoformat(start_str)
                if parsed.tzinfo is not None:
                    # Compare in local time: a UTC timestamp for an evening
                    # event already falls on the next calendar day.
                    parsed = parsed.astimezone(CHICAGO_TZ)
                event_date = parsed.date()
            else:
                event_date = datetime.strptime(start_str, "%Y-%m-%d").date()
        except (ValueError, TypeError):
            continue

        if event_date == target_date:
            return d  # Return first match

    return None