"""CalDAV Calendar CRUD — Radicale backend (replaces Google Calendar API).
All Google Calendar API references have been stripped.
Uses caldav + icalendar libraries to write events to the local
Radicale instance on the Beelink.
Phone clients connect via Cloudflare Tunnel (cal.yourdomain.com) or Tailscale IP.
Pipeline writes via localhost (127.0.0.1:5232) as the 'assistant' user.
"""
import os
import sys
import uuid
from datetime import datetime, timedelta, timezone, date
from zoneinfo import ZoneInfo
import caldav
from icalendar import Calendar as iCalendar, Event as iEvent
from family_assistant.config import CHICAGO_TZ
# ---------------------------------------------------------------------------
# CalDAV Connection
# ---------------------------------------------------------------------------
# Connection settings. Every value can be overridden through the environment;
# the defaults point at the local Radicale instance.
CALDAV_URL = os.getenv("CALDAV_URL", "http://127.0.0.1:5232")
CALDAV_USER = os.getenv("CALDAV_USER", "assistant")
# Required in practice: there is no usable default, only an empty string.
CALDAV_PASSWORD = os.getenv("CALDAV_PASSWORD", "")
CALDAV_CALENDAR_NAME = os.getenv("CALDAV_CALENDAR_NAME", "family")

# Process-wide connection cache, filled lazily by get_calendar_service().
_dav_client = _dav_calendar = None
def get_calendar_service():
    """Return the CalDAV calendar object, connecting on first use.

    The calendar is cached in module globals, so only the first call (or the
    first call after _reset_calendar_cache()) talks to the server. If the
    principal has no calendar whose ``name`` equals CALDAV_CALENDAR_NAME,
    one is created.

    Returns:
        The caldav calendar object for the configured calendar.

    Raises:
        Exception: Any error raised while connecting, listing or creating the
            calendar is logged to stderr and re-raised unchanged.
    """
    global _dav_client, _dav_calendar

    if _dav_calendar is None:
        try:
            _dav_client = caldav.DAVClient(
                url=CALDAV_URL,
                username=CALDAV_USER,
                password=CALDAV_PASSWORD,
            )
            principal = _dav_client.principal()

            # First calendar whose name matches; a missing/None name is
            # treated as the empty string.
            _dav_calendar = next(
                (
                    candidate
                    for candidate in principal.calendars()
                    if (getattr(candidate, 'name', None) or '') == CALDAV_CALENDAR_NAME
                ),
                None,
            )

            if _dav_calendar is None:
                # Nothing matched: create the calendar on the server.
                _dav_calendar = principal.make_calendar(
                    name=CALDAV_CALENDAR_NAME,
                )
                print(f" Created CalDAV calendar: {CALDAV_CALENDAR_NAME}", file=sys.stderr)
        except Exception as exc:
            print(f" [CalDAV] Connection error: {exc}", file=sys.stderr)
            raise

    return _dav_calendar
def _reset_calendar_cache():
    """Drop the cached client and calendar so the next lookup reconnects."""
    global _dav_client, _dav_calendar
    _dav_client = _dav_calendar = None
# ---------------------------------------------------------------------------
# Event builders (iCalendar format)
# ---------------------------------------------------------------------------
def _build_ics(summary, start_dt, end_dt, description="", location="",
               uid=None, recurrence_rule=None):
    """Build an iCalendar document containing a single VEVENT.

    Args:
        summary: Event title.
        start_dt: datetime for a timed event, or a 'YYYY-MM-DD' string for an
            all-day event. The type of this argument selects the mode.
        end_dt: datetime for a timed event, or a 'YYYY-MM-DD' string (or a
            falsy value) for an all-day event.
        description: Event description; omitted from the VEVENT when empty.
        location: Event location; omitted from the VEVENT when empty.
        uid: Optional UID (a random UUID4 is generated if not provided).
        recurrence_rule: Optional recurrence rule. Either an RRULE string
            (e.g. 'FREQ=WEEKLY;BYDAY=MO,WE', with or without a leading
            'RRULE:') or a dict/vRecur as accepted by icalendar.

    Returns:
        The serialized iCalendar document as a str.
    """
    cal = iCalendar()
    cal.add("prodid", "-//Family Assistant//EN")
    cal.add("version", "2.0")
    cal.add("calscale", "GREGORIAN")

    event = iEvent()
    event.add("summary", summary)
    if description:
        event.add("description", description)
    if location:
        event.add("location", location)
    # UID — unique per event, stable for dedup
    event.add("uid", uid or str(uuid.uuid4()))

    if isinstance(start_dt, str):
        # All-day events carry DATE values, not DATE-TIME.
        first_day = date.fromisoformat(start_dt)
        end_day = date.fromisoformat(end_dt) if end_dt else first_day
        if end_day == first_day:
            # iCalendar DTEND is exclusive, so a one-day event ends on the
            # following day. Any other end date is written as given.
            end_day = first_day + timedelta(days=1)
        event.add("dtstart", first_day)
        event.add("dtend", end_day)
    else:
        event.add("dtstart", start_dt)
        event.add("dtend", end_dt)

    if recurrence_rule:
        if isinstance(recurrence_rule, str):
            # Not every icalendar release accepts a raw string for RRULE, so
            # parse it into a vRecur explicitly.
            from icalendar import vRecur

            rule_text = recurrence_rule.strip()
            if rule_text.upper().startswith("RRULE:"):
                rule_text = rule_text[len("RRULE:"):]
            recurrence_rule = vRecur.from_ical(rule_text)
        event.add("rrule", recurrence_rule)

    event.add("dtstamp", datetime.now(timezone.utc))
    cal.add_component(event)
    return cal.to_ical().decode("utf-8")
def _event_to_dict(event_obj):
    """Convert a caldav Event object to a dict matching the old Google format.

    This keeps the rest of the codebase working without rewriting the
    downstream dict key references. Only the first VEVENT found in the
    object's iCalendar data is converted.

    Args:
        event_obj: Object exposing the raw iCalendar text as ``.data`` (and,
            for the fallback id, optionally ``.url``).

    Returns:
        A dict with keys id, summary, start, end, description, location and
        htmlLink. ``start``/``end`` are ``{"dateTime": iso}`` for datetimes
        and ``{"date": iso}`` otherwise. If the data cannot be parsed or
        contains no VEVENT, a placeholder dict with summary "(parse error)"
        is returned instead; this function never returns None.
    """

    def _google_time(value):
        # datetime must be tested first: it is a subclass of date.
        if isinstance(value, datetime):
            return {"dateTime": value.isoformat()}
        if isinstance(value, date):
            return {"date": value.isoformat()}
        return {"date": str(value)}

    try:
        cal = iCalendar.from_ical(event_obj.data)
        for component in cal.walk():
            if component.name != "VEVENT":
                continue
            dtstart = component.get("dtstart")
            dtend = component.get("dtend")
            return {
                "id": str(component.get("uid", "")),
                "summary": str(component.get("summary", "(no title)")),
                "start": _google_time(dtstart.dt if dtstart else ""),
                "end": _google_time(dtend.dt if dtend else ""),
                "description": str(component.get("description", "")),
                "location": str(component.get("location", "")),
                "htmlLink": "",  # No web UI for Radicale
            }
        # Falling out of the loop used to return None and crash callers that
        # immediately call .get() on the result; use the fallback instead.
        raise ValueError("no VEVENT component found")
    except Exception as e:
        print(f" [CalDAV] Error parsing event: {e}", file=sys.stderr)
        return {
            "id": str(getattr(event_obj, 'url', '')),
            "summary": "(parse error)",
            "start": {"dateTime": ""},
            "end": {"dateTime": ""},
            "description": "",
            "location": "",
            "htmlLink": "",
        }
# ---------------------------------------------------------------------------
# CRUD operations (same interface as the old Google version)
# ---------------------------------------------------------------------------
def list_upcoming_events(hours=48):
    """List events that fall between now and ``hours`` hours from now.

    Args:
        hours: Size of the look-ahead window, in hours.

    Returns:
        A list of Google-format event dicts; an empty list if the search or
        conversion fails (the error is logged to stderr). Failure to obtain
        the calendar itself is not caught here.
    """
    calendar = get_calendar_service()
    window_start = datetime.now(timezone.utc)
    window_end = window_start + timedelta(hours=hours)
    try:
        return [
            _event_to_dict(item)
            for item in calendar.date_search(start=window_start, end=window_end)
        ]
    except Exception as exc:
        print(f" [CalDAV] list_upcoming_events error: {exc}", file=sys.stderr)
        return []
def event_exists(summary, start_dt, tolerance_minutes=10):
    """Check whether an event with the same summary and start already exists.

    The summary comparison is exact after lower-casing and stripping.

    Args:
        summary: Title of the event about to be created.
        start_dt: datetime for a timed event, or a 'YYYY-MM-DD' string for an
            all-day event (the same convention create_event() uses).
        tolerance_minutes: For timed events, how far either side of
            ``start_dt`` to search. Ignored for all-day events, where the
            whole calendar day (America/Chicago) is searched and the
            existing event must be an all-day event starting on that date.

    Returns:
        The matching event dict, or None if nothing matches. Fails open:
        any error during the lookup is logged to stderr and reported as
        "not found" so that creation is never blocked.
    """
    wanted = (summary or "").lower().strip()
    try:
        calendar = get_calendar_service()
        all_day = isinstance(start_dt, str)
        if all_day:
            # A date string cannot take part in timedelta arithmetic; search
            # the local calendar day it names instead.
            day = date.fromisoformat(start_dt)
            window_start = datetime(day.year, day.month, day.day, tzinfo=CHICAGO_TZ)
            window_end = window_start + timedelta(days=1)
        else:
            tolerance = timedelta(minutes=tolerance_minutes)
            window_start = start_dt - tolerance
            window_end = start_dt + tolerance

        for event_obj in calendar.date_search(start=window_start, end=window_end):
            d = _event_to_dict(event_obj)
            if not d:
                continue
            if (d.get("summary", "") or "").lower().strip() != wanted:
                continue
            if all_day and (d.get("start") or {}).get("date") != start_dt:
                continue
            return d
        return None
    except Exception as e:
        # If we can't check, don't block creation — fail open, but say so.
        print(f" [CalDAV] event_exists error (treating as not found): {e}", file=sys.stderr)
        return None
def create_event(summary, start_dt, end_dt, description="", location=""):
    """Save a new single event to the calendar and describe it.

    Handles timed events (datetime arguments) and all-day events (a
    'YYYY-MM-DD' string as ``start_dt``).

    Args:
        summary: Event title.
        start_dt: datetime, or 'YYYY-MM-DD' string for an all-day event.
        end_dt: datetime, or 'YYYY-MM-DD' string / falsy for an all-day event.
        description: Event description.
        location: Event location.

    Returns:
        A Google-format dict for the created event, carrying the generated
        UID as "id".
    """
    calendar = get_calendar_service()
    new_uid = str(uuid.uuid4())
    calendar.save_event(
        _build_ics(
            summary=summary,
            start_dt=start_dt,
            end_dt=end_dt,
            description=description,
            location=location,
            uid=new_uid,
        )
    )

    # Google-compatible shape: all-day events use "date", timed ones "dateTime".
    if isinstance(start_dt, str):
        time_key = "date"
        start_value = start_dt
        end_value = end_dt or start_dt
    else:
        time_key = "dateTime"
        start_value = start_dt.isoformat()
        end_value = end_dt.isoformat()

    return {
        "id": new_uid,
        "summary": summary,
        "start": {time_key: start_value, "timeZone": "America/Chicago"},
        "end": {time_key: end_value, "timeZone": "America/Chicago"},
        "description": description,
        "location": location,
        "htmlLink": "",
    }
def _rrule_to_dict(rrule_str):
    """Translate 'FREQ=WEEKLY;BYDAY=MO,WE' into the dict form icalendar accepts."""
    int_list_keys = ("BYMONTH", "BYMONTHDAY", "BYSETPOS")
    int_keys = ("COUNT", "INTERVAL")
    rule = {}
    for part in rrule_str.split(";"):
        key, _, val = part.partition("=")
        key = key.strip()
        val = val.strip()
        if key == "FREQ":
            rule["freq"] = val
        elif key == "BYDAY":
            rule["byday"] = val.split(",")
        elif key in int_list_keys:
            # These rule parts may hold several values (e.g. BYMONTHDAY=1,15).
            rule[key.lower()] = [int(item) for item in val.split(",")]
        elif key in int_keys:
            rule[key.lower()] = int(val)
        elif key == "UNTIL":
            try:
                until = datetime.strptime(val, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc)
            except ValueError:
                try:
                    last_day = datetime.strptime(val, "%Y%m%d")
                except ValueError:
                    # Keep the event creatable, but do not hide that the
                    # series is now unbounded.
                    print(
                        f" [CalDAV] Ignoring unparseable RRULE UNTIL: {val!r}",
                        file=sys.stderr,
                    )
                    continue
                # A date-only UNTIL means "through that day". Midnight UTC
                # would cut off the whole local day, so use the end of the
                # day in the family's timezone and express it in UTC.
                until = last_day.replace(
                    hour=23, minute=59, second=59, tzinfo=CHICAGO_TZ
                ).astimezone(timezone.utc)
            rule["until"] = until
        # Any other rule part is ignored.
    return rule


def create_recurring_event(summary, start_dt, end_dt, recurrence, description="", location=""):
    """Create a recurring calendar event using RRULE.

    The recurrence dict is translated to an RRULE string by the
    deterministic rrule_builder module; the LLM never generates RRULE
    directly. That string is then converted to the dict form icalendar
    expects.

    Args:
        summary: Event title.
        start_dt: datetime of the first occurrence's start.
        end_dt: datetime of the first occurrence's end.
        recurrence: Recurrence description understood by rrule_builder.
        description: Event description.
        location: Event location.

    Returns:
        A Google-format dict for the created series, including the RRULE
        under "recurrence".

    Raises:
        ValueError: If the recurrence fails validation, no RRULE could be
            built from it, or a numeric rule part is not an integer.
    """
    from family_assistant.rrule_builder import build_rrule, validate_recurrence

    errors = validate_recurrence(recurrence)
    if errors:
        raise ValueError(f"Invalid recurrence: {'; '.join(errors)}")
    rrule_str = build_rrule(recurrence)
    if not rrule_str:
        raise ValueError(f"Could not build RRULE from: {recurrence}")

    calendar = get_calendar_service()
    uid = str(uuid.uuid4())
    ics = _build_ics(
        summary=summary,
        start_dt=start_dt,
        end_dt=end_dt,
        description=description,
        location=location,
        uid=uid,
        recurrence_rule=_rrule_to_dict(rrule_str),
    )
    calendar.save_event(ics)
    return {
        "id": uid,
        "summary": summary,
        "start": {"dateTime": start_dt.isoformat(), "timeZone": "America/Chicago"},
        "end": {"dateTime": end_dt.isoformat(), "timeZone": "America/Chicago"},
        "recurrence": [f"RRULE:{rrule_str}"],
        "description": description,
        "location": location,
        "htmlLink": "",
    }
def find_and_cancel_event(summary, start_dt, tolerance_minutes=30):
    """Find and delete the first calendar event matching a cancellation.

    Matching is fuzzy: the summaries are compared case-insensitively and
    match when either contains the other, and the event must fall within
    ``tolerance_minutes`` of ``start_dt``.

    Args:
        summary: Title (or part of it) of the event to cancel.
        start_dt: datetime the event is expected to start at.
        tolerance_minutes: Half-width of the search window, in minutes.

    Returns:
        A dict with status "CANCELLED" (id, summary, start) or
        "CANCEL_FAILED" (id, summary, error) for the first match, or None
        if nothing matched or the search itself failed.
    """
    search_summary = (summary or "").lower().strip()
    if not search_summary:
        # An empty search can never match; skip the server round-trip.
        return None
    try:
        calendar = get_calendar_service()
        tolerance = timedelta(minutes=tolerance_minutes)
        events = calendar.date_search(
            start=start_dt - tolerance,
            end=start_dt + tolerance,
        )
        for event_obj in events:
            d = _event_to_dict(event_obj)
            if not d:
                continue
            existing_summary = (d.get("summary", "") or "").lower().strip()
            if not existing_summary:
                continue
            # Partial match: either summary contains the other
            if not (search_summary in existing_summary
                    or existing_summary in search_summary):
                continue

            event_id = d["id"]
            event_summary = d.get("summary", summary)
            # All-day events carry "date" rather than "dateTime"; report
            # whichever is present instead of an empty string.
            start_info = d.get("start") or {}
            event_start = start_info.get("dateTime", start_info.get("date", ""))
            try:
                event_obj.delete()
            except Exception as e:
                print(
                    f" Error deleting event {event_id}: {e}",
                    file=sys.stderr,
                )
                return {
                    "status": "CANCEL_FAILED",
                    "id": event_id,
                    "summary": event_summary,
                    "error": str(e),
                }
            print(
                f" Cancelled: {event_summary} (uid={event_id})",
                file=sys.stderr,
            )
            return {
                "status": "CANCELLED",
                "id": event_id,
                "summary": event_summary,
                "start": event_start,
            }
        return None
    except Exception as e:
        print(f" Error searching for event to cancel: {e}", file=sys.stderr)
        return None
def _find_event_by_summary_and_time(summary, start_iso, tolerance_minutes=30):
    """Look up one event by fuzzy title and approximate start time.

    Titles are compared case-insensitively and match when either one
    contains the other.

    Args:
        summary: Title (or part of it) to look for.
        start_iso: Expected start as an ISO 8601 string.
        tolerance_minutes: Half-width of the search window, in minutes.

    Returns:
        The first matching event dict (Google-compatible format), or None
        when nothing matches or any error occurs (logged to stderr).
    """
    try:
        calendar = get_calendar_service()
        anchor = datetime.fromisoformat(start_iso)
        earliest = anchor - timedelta(minutes=tolerance_minutes)
        latest = anchor + timedelta(minutes=tolerance_minutes)
        found = calendar.date_search(start=earliest, end=latest)
        needle = (summary or "").lower().strip()
        for item in found:
            candidate = _event_to_dict(item)
            title = (candidate.get("summary", "") or "").lower().strip()
            if not needle or not title:
                continue
            if needle in title or title in needle:
                return candidate
        return None
    except Exception as exc:
        print(f" [Handler] Error finding event: {exc}", file=sys.stderr)
        return None
def _update_event_description(event_id, extra_text):
    """Append ``extra_text`` on a new line to an event's description.

    Scans every event in the calendar for the first one whose UID equals
    ``event_id``, rewrites the description of each VEVENT it contains and
    saves it. Errors while rewriting or saving are logged to stderr, not
    raised. Does nothing if no event matches.

    Args:
        event_id: UID of the event to update.
        extra_text: Text to append to the existing description.
    """
    calendar = get_calendar_service()
    for candidate in calendar.events():
        if _event_to_dict(candidate).get("id") != event_id:
            continue
        # First UID match: update it and stop, whether or not saving works.
        try:
            parsed = iCalendar.from_ical(candidate.data)
            for comp in parsed.walk():
                if comp.name != "VEVENT":
                    continue
                current = str(comp.get("description", "")) or ""
                comp["description"] = f"{current}\n{extra_text}".strip()
            candidate.data = parsed.to_ical().decode("utf-8")
            candidate.save()
        except Exception as exc:
            print(f" [CalDAV] Error updating event: {exc}", file=sys.stderr)
        return
def _delete_event(event_id):
    """Delete the first calendar event whose UID equals ``event_id``.

    Does nothing when no event carries that UID.
    """
    calendar = get_calendar_service()
    doomed = next(
        (item for item in calendar.events()
         if _event_to_dict(item).get("id") == event_id),
        None,
    )
    if doomed is not None:
        doomed.delete()
def cancel_recurring_instance(event_id, instance_date, dry_run=False):
    """Cancel a single instance of a recurring event.

    Adds an EXDATE to the master recurring event. An EXDATE only excludes
    an occurrence whose start it equals, so it is built from
    ``instance_date`` combined with the master DTSTART's time of day and
    timezone (or as a plain DATE for all-day series). ``instance_date`` is
    therefore interpreted in the DTSTART's own timezone.

    Args:
        event_id: UID of the master recurring event.
        instance_date: ISO 8601 date string (YYYY-MM-DD).
        dry_run: If True, return what would happen without executing.

    Returns:
        Dict with status and details: "DRY_RUN", "INSTANCE_CANCELLED", or
        "error" with a message.
    """
    calendar = get_calendar_service()
    target_event = None
    details = None
    for event_obj in calendar.events():
        d = _event_to_dict(event_obj)
        if d and d.get("id") == event_id:
            target_event, details = event_obj, d
            break
    if target_event is None:
        return {"status": "error", "message": f"Could not find event {event_id}"}

    summary = details.get("summary", "")
    start_info = details.get("start") or {}
    start = start_info.get("dateTime", start_info.get("date", ""))
    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "cancel_instance",
            "summary": summary,
            "start": start,
        }

    try:
        target_day = datetime.strptime(instance_date, "%Y-%m-%d").date()
        cal = iCalendar.from_ical(target_event.data)
        patched = False
        for component in cal.walk():
            if component.name != "VEVENT":
                continue
            if component.get("recurrence-id") is not None:
                # Override of one occurrence, not the master: leave it alone.
                continue
            dtstart = component.get("dtstart")
            master_start = dtstart.dt if dtstart is not None else None
            if isinstance(master_start, datetime):
                # Same wall-clock time and tzinfo as the series start.
                ex_value = datetime.combine(target_day, master_start.timetz())
            elif isinstance(master_start, date):
                # All-day series: EXDATE must be a DATE as well.
                ex_value = target_day
            else:
                # No usable DTSTART; fall back to local midnight.
                ex_value = datetime(
                    target_day.year, target_day.month, target_day.day,
                    tzinfo=CHICAGO_TZ,
                )
            # add() encodes the value and appends to any existing EXDATE.
            component.add("exdate", ex_value)
            patched = True

        if not patched:
            return {
                "status": "error",
                "message": f"Failed to cancel instance: no master VEVENT in event {event_id}",
            }

        target_event.data = cal.to_ical().decode("utf-8")
        target_event.save()
        return {
            "status": "INSTANCE_CANCELLED",
            "summary": summary,
            "start": start,
            "event_id": event_id,
        }
    except Exception as e:
        return {"status": "error", "message": f"Failed to cancel instance: {e}"}
def cancel_recurring_series(event_id, dry_run=False):
    """Remove an entire recurring series by deleting its master event.

    Args:
        event_id: UID of the master recurring event.
        dry_run: If True, only report what would be deleted.

    Returns:
        Dict with status "DRY_RUN", "SERIES_CANCELLED" or "error" plus
        details.
    """
    calendar = get_calendar_service()
    master = next(
        (item for item in calendar.events()
         if _event_to_dict(item).get("id") == event_id),
        None,
    )
    if master is None:
        return {"status": "error", "message": f"Could not find event {event_id}"}

    info = _event_to_dict(master)
    title = info.get("summary", "")
    first_start = info["start"].get("dateTime", info["start"].get("date", ""))

    if dry_run:
        return {
            "status": "DRY_RUN",
            "action": "cancel_series",
            "summary": title,
            "master_start": first_start,
        }

    try:
        master.delete()
    except Exception as exc:
        return {"status": "error", "message": f"Failed to cancel series: {exc}"}
    return {
        "status": "SERIES_CANCELLED",
        "summary": title,
        "master_id": event_id,
    }
def find_recurring_instance(summary, instance_date):
    """Find a specific instance of a recurring event by summary and date.

    Only the target calendar day (America/Chicago) is searched. If the
    ranged search fails, every event in the calendar is scanned instead.

    Args:
        summary: Event summary to search for (fuzzy, case-insensitive: a
            match when either title contains the other, also trying the
            event title with a trailing " (...)" suffix removed).
        instance_date: ISO 8601 date string (YYYY-MM-DD) of the target
            instance.

    Returns:
        Event dict (Google-compatible format) for the first matching event
        on that date, or None. An empty or None summary never matches.

    Raises:
        ValueError: If ``instance_date`` is not in YYYY-MM-DD form.
    """
    target_date = datetime.strptime(instance_date, "%Y-%m-%d").date()
    summary_lower = (summary or "").lower().strip()
    if not summary_lower:
        # "" is a substring of every title; refuse to match everything.
        return None

    calendar = get_calendar_service()
    day_start = datetime(
        target_date.year, target_date.month, target_date.day, tzinfo=CHICAGO_TZ
    )
    try:
        # Search the day itself so past, earlier-today and far-future
        # instances are all reachable.
        events = calendar.date_search(start=day_start, end=day_start + timedelta(days=1))
    except Exception:
        events = calendar.events()

    for event_obj in events:
        d = _event_to_dict(event_obj)
        if not d:
            continue
        event_summary = (d.get("summary", "") or "").lower().strip()
        if not event_summary:
            continue

        # Check summary match (exact or partial)
        if summary_lower not in event_summary and event_summary not in summary_lower:
            base = event_summary.split(" (")[0]
            if not base or (summary_lower not in base and base not in summary_lower):
                continue

        # Check date match
        start_info = d.get("start") or {}
        start_str = start_info.get("dateTime", start_info.get("date", ""))
        if not start_str:
            continue
        try:
            if "T" in start_str:
                starts_at = datetime.fromisoformat(start_str)
                if starts_at.tzinfo is not None:
                    # Compare on the local calendar date, not the date in
                    # whatever zone the event happens to be stored in.
                    starts_at = starts_at.astimezone(CHICAGO_TZ)
                event_date = starts_at.date()
            else:
                event_date = datetime.strptime(start_str, "%Y-%m-%d").date()
        except (ValueError, TypeError):
            continue

        if event_date == target_date:
            return d  # First match wins
    return None