"""CalendarValidator — Read-only calendar validation for extracted events.
Queries Google Calendar using gog CLI, performs fuzzy matching on event titles,
and returns MATCH | NO_MATCH | CONFLICT status.
"""
import json
import logging
import re
import subprocess
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class CalendarCheckResult:
    """Outcome of comparing one extracted event against the calendar.

    Only ``status`` is required; every other field keeps its default when no
    calendar event was selected.
    """

    # One of "MATCH", "NO_MATCH" or "CONFLICT".
    status: str
    # Identifier, title, start and end of the selected calendar event, if any.
    event_id: Optional[str] = None
    event_title: Optional[str] = None
    event_start: Optional[str] = None
    event_end: Optional[str] = None
    # Similarity score of the best candidate, in the range 0.0-1.0.
    fuzzy_score: float = 0.0
    # Human-readable reason, filled in for CONFLICT results.
    conflict_description: Optional[str] = None
    # The calendar event dict the other event_* fields were taken from.
    matched_event: Optional[Dict[str, Any]] = None
class CalendarValidator:
    """Validate extracted events against Google Calendar and create events.

    Uses the gog CLI to list the calendar events on the day an extracted
    event refers to, fuzzy-matches titles (with location and keyword hints)
    to decide whether the event is already on the calendar, and can create a
    new event through the same CLI.
    """

    # Fuzzy matching thresholds
    STRONG_MATCH_THRESHOLD = 0.7  # >= 0.7: treated as the same event
    POSSIBLE_MATCH_THRESHOLD = 0.4  # 0.4-0.7: run the conflict checks first

    _ISO_UTC_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    _MONTH_NAMES = (
        "january", "february", "march", "april", "may", "june",
        "july", "august", "september", "october", "november", "december",
    )
    _WEEKDAY_NAMES = (
        "monday", "tuesday", "wednesday", "thursday",
        "friday", "saturday", "sunday",
    )

    # "May 3"
    _MONTH_DAY_RE = re.compile(
        r"(january|february|march|april|may|june|july|august|september"
        r"|october|november|december)\s+(\d{1,2})"
    )
    # "5/3", "05-03". The lookarounds keep clock ranges such as "10-11am" or
    # "8:00-9:30" from being read as a month/day pair.
    _NUMERIC_DATE_RE = re.compile(
        r"(?<![\d:])(\d{1,2})[/-](\d{1,2})(?![\d:]|\s*(?:am|pm))"
    )
    # "8:00", "8:00am"
    _CLOCK_TIME_RE = re.compile(r"(\d{1,2}):(\d{2})\s*(am|pm)?")
    # "8am", "8 pm"
    _MERIDIEM_TIME_RE = re.compile(r"(\d{1,2})\s*(am|pm)")
    # "0800", "1430" as a standalone four-digit run
    _MILITARY_TIME_RE = re.compile(r"(?<!\d)(0\d{3}|1\d{3}|2[0-3]\d{2})(?!\d)")
    # "at 0800", "at 1430"
    _AT_MILITARY_TIME_RE = re.compile(r"at\s+(0\d{3}|1\d{3}|2[0-3]\d{2})(?!\d)")
    # Trailing "Z" or "+HH:MM" / "-HHMM" offset of an ISO 8601 timestamp
    _UTC_OFFSET_RE = re.compile(r"(?:z|[+-]\d{2}:?\d{2})$", re.IGNORECASE)

    # Coarse location heuristics: first word (optionally followed by a
    # state/zip) and the first one to three words of the location string.
    _CITY_RE = re.compile(r"([a-z]+),?\s*[a-z]{0,2}\s*\d{0,5}")
    _VENUE_RE = re.compile(r"([a-z]+(?:\s+[a-z]+){0,2})")

    _MATCH_KEYWORDS = (
        "appointment", "practice", "game", "school",
        "doctor", "dentist", "haircut", "oil change",
    )
    _EVENT_TYPES = ("soccer", "dentist", "doctor", "practice", "game", "school")

    def __init__(
        self,
        calendar_id: str = "hoffmann.family.manager@gmail.com",
        account: str = "family-calendar-sync@hoffmann-family-manager.iam.gserviceaccount.com",
    ):
        """Store the calendar and account passed to every gog invocation.

        Args:
            calendar_id: Calendar identifier given to gog as a positional
                argument.
            account: Value passed to gog's ``--account`` option.
        """
        self.calendar_id = calendar_id
        self.account = account
        self.logger = logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def check_event(self, extracted_event: Dict[str, Any]) -> "CalendarCheckResult":
        """Check if an extracted event exists in the calendar.

        Args:
            extracted_event: Dict with 'what' (title), 'when' (time),
                optionally 'where' (location).

        Returns:
            CalendarCheckResult with status MATCH | NO_MATCH | CONFLICT.
            A missing/blank title, an unparseable date, a failed calendar
            query and an empty day all yield NO_MATCH.
        """
        extracted_title = extracted_event.get("what", "")
        extracted_when = extracted_event.get("when", "")

        # A whitespace-only title would otherwise count as a substring of
        # every calendar title, so treat it like a missing title.
        if not isinstance(extracted_title, str) or not extracted_title.strip():
            self.logger.warning("[CalendarValidator] No title in extracted event")
            return CalendarCheckResult(status="NO_MATCH")

        date_start, date_end = self._parse_date_range(extracted_when)
        if not date_start:
            self.logger.warning("[CalendarValidator] Could not parse date from: %s", extracted_when)
            return CalendarCheckResult(status="NO_MATCH")

        try:
            calendar_events = self._query_calendar(date_start, date_end)
        except Exception as e:
            # Deliberate best-effort boundary: any CLI or parsing failure is
            # logged and reported as "nothing found" rather than raised.
            self.logger.error("[CalendarValidator] Calendar query failed: %s", e)
            return CalendarCheckResult(status="NO_MATCH")

        if not calendar_events:
            self.logger.info("[CalendarValidator] No events found in range %s to %s", date_start, date_end)
            return CalendarCheckResult(status="NO_MATCH")

        # Keep the highest-scoring calendar event as the candidate.
        best_match = None
        best_score = 0.0
        for event in calendar_events:
            event_title = event.get("summary", "")
            if not event_title:
                continue
            score = self._fuzzy_match_score(extracted_event, event_title, event)
            self.logger.debug("[CalendarValidator] Fuzzy match: '%s' vs '%s' = %.2f",
                              extracted_title, event_title, score)
            if score > best_score:
                best_score = score
                best_match = event

        if best_match is None or best_score < self.POSSIBLE_MATCH_THRESHOLD:
            self.logger.info("[CalendarValidator] NO_MATCH: '%s' not found in calendar (best score: %.2f)",
                             extracted_title, best_score)
            return CalendarCheckResult(status="NO_MATCH", fuzzy_score=best_score)

        if best_score >= self.STRONG_MATCH_THRESHOLD:
            # Strong match — likely same event
            self.logger.info("[CalendarValidator] MATCH: '%s' matches '%s' (score: %.2f)",
                             extracted_title, best_match.get("summary"), best_score)
            return self._build_result("MATCH", best_match, best_score)

        # Possible match — only here are the conflict heuristics consulted.
        conflict_desc = self._check_conflict(extracted_event, best_match, best_score)
        if conflict_desc:
            self.logger.info("[CalendarValidator] CONFLICT: '%s' vs '%s' (score: %.2f) — %s",
                             extracted_title, best_match.get("summary"), best_score, conflict_desc)
            return self._build_result("CONFLICT", best_match, best_score, conflict_desc)

        # No conflict detected — treat as match
        self.logger.info("[CalendarValidator] MATCH (possible): '%s' matches '%s' (score: %.2f)",
                         extracted_title, best_match.get("summary"), best_score)
        return self._build_result("MATCH", best_match, best_score)

    def _build_result(
        self,
        status: str,
        event: Dict[str, Any],
        score: float,
        conflict_description: Optional[str] = None,
    ) -> "CalendarCheckResult":
        """Build a result that carries the selected calendar event's details."""
        return CalendarCheckResult(
            status=status,
            event_id=event.get("id"),
            event_title=event.get("summary"),
            event_start=self._extract_datetime(event.get("start")),
            event_end=self._extract_datetime(event.get("end")),
            fuzzy_score=score,
            conflict_description=conflict_description,
            matched_event=event,
        )

    def _fuzzy_match_score(
        self,
        extracted_event: Dict[str, Any],
        calendar_title: str,
        calendar_event: Optional[Dict[str, Any]] = None,
    ) -> float:
        """Calculate a similarity score (0.0-1.0) for one calendar event.

        The base score is ``difflib.SequenceMatcher``'s ratio of the two
        lowercased titles. It is then raised to fixed floors when the titles
        contain each other (0.6), when the locations look alike (0.55) or
        when both titles share a common event keyword (0.5).

        Args:
            extracted_event: Dict with 'what' and optionally 'where'.
            calendar_title: Title (summary) of the calendar event.
            calendar_event: The calendar event dict; its 'location' enables
                the location bonus. When omitted the bonus is skipped.

        Returns:
            Score rounded to three decimals; 0.0 when either title is blank.
        """
        if not extracted_event or not calendar_title:
            return 0.0

        extracted = (extracted_event.get("what") or "").lower().strip()
        calendar = calendar_title.lower().strip()
        # An empty string is a substring of everything, so bail out before
        # the substring bonus can hand a blank title a 0.6.
        if not extracted or not calendar:
            return 0.0

        score = SequenceMatcher(None, extracted, calendar).ratio()

        # Bonus: substring match (e.g., "soccer practice" matches "Sullivan Soccer Practice")
        if extracted in calendar or calendar in extracted:
            score = max(score, 0.6)

        # Bonus: location/venue overlap
        extracted_where = (extracted_event.get("where") or "").lower()
        calendar_location = ""
        if calendar_event:
            calendar_location = (calendar_event.get("location") or "").lower()
        if extracted_where and calendar_location:
            extracted_city = self._CITY_RE.search(extracted_where)
            calendar_city = self._CITY_RE.search(calendar_location)
            if extracted_city and calendar_city:
                if extracted_city.group(1) == calendar_city.group(1):
                    score = max(score, 0.55)  # Boost for location match

            extracted_venue = self._VENUE_RE.search(extracted_where)
            calendar_venue = self._VENUE_RE.search(calendar_location)
            if extracted_venue and calendar_venue:
                venue_score = SequenceMatcher(
                    None, extracted_venue.group(1), calendar_venue.group(1)
                ).ratio()
                if venue_score > 0.7:
                    score = max(score, 0.55)  # Boost for venue match

        # Bonus: keyword overlap for common events
        extracted_keywords = {k for k in self._MATCH_KEYWORDS if k in extracted}
        calendar_keywords = {k for k in self._MATCH_KEYWORDS if k in calendar}
        if extracted_keywords & calendar_keywords:
            score = max(score, 0.5)  # Boost for keyword match

        return round(score, 3)

    def _query_calendar(self, date_start: str, date_end: str) -> List[Dict[str, Any]]:
        """Execute gog calendar events query.

        Args:
            date_start: ISO 8601 start time
            date_end: ISO 8601 end time

        Returns:
            List of calendar events (the "events" entry of gog's JSON
            output; empty when that entry is missing or null).

        Raises:
            subprocess.CalledProcessError: If gog CLI fails
            json.JSONDecodeError: If JSON parsing fails
            ValueError: If the JSON output is not an object
        """
        cmd = [
            "gog", "calendar", "events", self.calendar_id,
            "--from", date_start,
            "--to", date_end,
            "--account", self.account,
            "--json",
        ]
        self.logger.debug("[CalendarValidator] Query: %s", " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
        )
        data = json.loads(result.stdout)
        if not isinstance(data, dict):
            raise ValueError("Unexpected gog output: expected a JSON object")
        events = data.get("events") or []
        self.logger.debug("[CalendarValidator] Found %d events", len(events))
        return events

    # ------------------------------------------------------------------
    # Date / time parsing
    # ------------------------------------------------------------------

    def _parse_date_range(self, when_str: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse date/time string into an ISO 8601 range covering that day.

        Handles various formats:
        - "today", "tomorrow"
        - "Monday", "Tuesday", etc.
        - "5/3", "05/03"
        - "May 3"

        Returns:
            (date_start, date_end) as ISO 8601 UTC strings spanning the
            local calendar day, or (None, None) if parsing fails.
        """
        target_date = self._resolve_target_date(when_str)
        if target_date is None:
            return None, None

        # The "Z" suffix declares UTC, so convert the local day's bounds
        # instead of stamping local wall-clock values as UTC.
        # NOTE(review): assumes gog treats "Z" timestamps as UTC — confirm.
        day_start = datetime(target_date.year, target_date.month, target_date.day)
        day_end = datetime(target_date.year, target_date.month, target_date.day, 23, 59, 59)
        return self._to_utc_iso(day_start), self._to_utc_iso(day_end)

    def _resolve_target_date(self, when_str: Optional[str]) -> Optional[date]:
        """Resolve a friendly "when" string to a date, or None if unparseable.

        "today"/"tomorrow" prefixes are handled here; everything else is
        delegated to ``_parse_date_string``.
        """
        when_lower = (when_str or "").lower().strip()
        if not when_lower:
            return None
        today = datetime.now().date()
        if when_lower.startswith("today"):
            return today
        if when_lower.startswith("tomorrow"):
            return today + timedelta(days=1)
        return self._parse_date_string(when_lower)

    def _parse_date_string(self, date_str: str) -> Optional[date]:
        """Parse various date string formats into a date object.

        Tries, in order: a month name followed by a day ("May 3"), a numeric
        month/day pair ("5/3"), and a weekday name (next occurrence, never
        today). Dates are placed in the current year. An impossible date such
        as "February 30" is skipped rather than raising.

        Returns:
            The resolved date, or None when nothing valid was found.
        """
        text = (date_str or "").lower()
        today = datetime.now().date()

        match = self._MONTH_DAY_RE.search(text)
        if match:
            month = self._MONTH_NAMES.index(match.group(1)) + 1
            resolved = self._safe_date(today.year, month, int(match.group(2)))
            if resolved is not None:
                return resolved

        match = self._NUMERIC_DATE_RE.search(text)
        if match:
            resolved = self._safe_date(today.year, int(match.group(1)), int(match.group(2)))
            if resolved is not None:
                return resolved

        for index, day_name in enumerate(self._WEEKDAY_NAMES):
            if day_name in text:
                # Monday = 0; a zero offset means "today", which is pushed
                # to the same weekday next week.
                days_ahead = (index - today.weekday()) % 7 or 7
                return today + timedelta(days=days_ahead)
        return None

    @staticmethod
    def _safe_date(year: int, month: int, day: int) -> Optional[date]:
        """Return date(year, month, day), or None for an impossible date."""
        try:
            return date(year, month, day)
        except ValueError:
            return None

    @classmethod
    def _parse_time_of_day(
        cls,
        text: Optional[str],
        require_at_for_military: bool = False,
    ) -> Optional[Tuple[int, int]]:
        """Extract a 24-hour (hour, minute) pair from free text.

        Recognises "8:00" / "8:00am", "8am" / "8 pm" and four-digit military
        times ("0800"), in that order; the first pattern that matches wins.

        Args:
            text: Text to search.
            require_at_for_military: When True a military time is only
                accepted after "at " (e.g. "at 1430").

        Returns:
            (hour, minute), or None if no time was found or the matched
            value is out of range (e.g. "25:00", "13pm").
        """
        lowered = (text or "").lower()

        match = cls._CLOCK_TIME_RE.search(lowered)
        if match:
            hour, minute, meridiem = int(match.group(1)), int(match.group(2)), match.group(3)
        else:
            match = cls._MERIDIEM_TIME_RE.search(lowered)
            if match:
                # Hour-only form: the second group is the am/pm marker.
                hour, minute, meridiem = int(match.group(1)), 0, match.group(2)
            else:
                pattern = cls._AT_MILITARY_TIME_RE if require_at_for_military else cls._MILITARY_TIME_RE
                match = pattern.search(lowered)
                if not match:
                    return None
                digits = match.group(1)
                hour, minute, meridiem = int(digits[:2]), int(digits[2:]), None

        if meridiem == "pm" and hour != 12:
            hour += 12
        elif meridiem == "am" and hour == 12:
            hour = 0

        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            return None
        return hour, minute

    @classmethod
    def _to_utc_iso(cls, local_dt: datetime) -> str:
        """Format a naive local datetime as a UTC ISO 8601 string ("...Z")."""
        # astimezone() on a naive datetime interprets it as system local time.
        return local_dt.astimezone(timezone.utc).strftime(cls._ISO_UTC_FORMAT)

    def _extract_datetime(self, time_obj: Optional[Dict[str, Any]]) -> Optional[str]:
        """Extract datetime string from calendar event time object.

        Returns the "dateTime" value when present, otherwise the "date"
        value, or None when ``time_obj`` is empty.
        """
        if not time_obj:
            return None
        # Prefer dateTime, fall back to date
        return time_obj.get("dateTime") or time_obj.get("date")

    # ------------------------------------------------------------------
    # Conflict detection
    # ------------------------------------------------------------------

    def _check_conflict(
        self,
        extracted_event: Dict[str, Any],
        calendar_event: Dict[str, Any],
        fuzzy_score: float,
    ) -> Optional[str]:
        """Check for conflicts between extracted and calendar event.

        Two checks run in order: for scores below 0.6, whether the titles
        mention different known event types; then whether the extracted time
        falls within an hour of the calendar event's start.

        Returns conflict description if conflict detected, None otherwise.
        """
        extracted_title = (extracted_event.get("what") or "").lower()
        calendar_title = (calendar_event.get("summary") or "").lower()

        # Check for significant title differences despite fuzzy match
        # (e.g., "Soccer Practice" vs "Dentist Appointment")
        if fuzzy_score < 0.6:
            extracted_types = [t for t in self._EVENT_TYPES if t in extracted_title]
            calendar_types = [t for t in self._EVENT_TYPES if t in calendar_title]
            if extracted_types and calendar_types and extracted_types != calendar_types:
                # Name the types that actually differ; fall back to the first
                # type when one side's types are a subset of the other's.
                only_extracted = [t for t in extracted_types if t not in calendar_types]
                only_calendar = [t for t in calendar_types if t not in extracted_types]
                mentioned = (only_extracted or extracted_types)[0]
                scheduled = (only_calendar or calendar_types)[0]
                return f"Different event types: message mentions '{mentioned}' but calendar has '{scheduled}'"

        extracted_when = extracted_event.get("when") or ""
        if self._extracted_time_overlaps(extracted_when, calendar_event):
            return f"Time conflict: You have '{calendar_title}' scheduled during this time"
        return None

    def _extracted_time_overlaps(self, extracted_when: str, calendar_event: Dict[str, Any]) -> bool:
        """Check if extracted time overlaps with calendar event time.

        Simple check: if extracted mentions a time and the calendar event
        starts within one hour of it, it's a conflict. Only the clock time is
        compared; the event's wall-clock start is taken as written in its
        "dateTime" string, ignoring the UTC offset.

        Returns:
            True when both times are known and at most 60 minutes apart.
        """
        parsed = self._parse_time_of_day(extracted_when)
        if parsed is None:
            return False
        extracted_hour, extracted_minute = parsed

        start_info = calendar_event.get("start") or {}
        raw_start = start_info.get("dateTime")
        if not isinstance(raw_start, str) or not raw_start:
            return False

        # Drop the trailing "Z" / "+HH:MM" so the wall-clock time is parsed.
        wall_clock = self._UTC_OFFSET_RE.sub("", raw_start)
        try:
            event_dt = datetime.fromisoformat(wall_clock)
        except ValueError:
            return False

        extracted_total_minutes = extracted_hour * 60 + extracted_minute
        event_total_minutes = event_dt.hour * 60 + event_dt.minute
        # Within 1 hour = overlap
        return abs(extracted_total_minutes - event_total_minutes) <= 60

    # ------------------------------------------------------------------
    # Event creation
    # ------------------------------------------------------------------

    def create_event(
        self,
        summary: str,
        description: str = "",
        location: str = "",
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        extracted_when: str = "",
    ) -> Dict[str, Any]:
        """Create a calendar event using the gog CLI.

        Args:
            summary: Event title/summary
            description: Optional description
            location: Optional location
            start_time: ISO 8601 start time. If None, computed from extracted_when.
            end_time: ISO 8601 end time. If None, defaults to start_time + 1 hour.
            extracted_when: Raw extracted "when" string for parsing start time.

        Returns:
            Dict with status, event_id, event_url, or error details. Failures
            (no resolvable start time, gog missing or failing, unparseable
            output) are reported as ``{"status": "error", ...}``.
        """
        if not start_time:
            start_time = self._resolve_start_time(extracted_when, summary)
        if not start_time:
            return {
                "status": "error",
                "error": "Could not determine event start time. Provide a specific date/time."
            }

        if not end_time:
            end_time = self._default_end_time(start_time)

        cmd = [
            "gog", "calendar", "create", self.calendar_id,
            "--summary", summary,
            "--from", start_time,
            "--to", end_time,
            "--account", self.account,
            "--json",
        ]
        if description:
            cmd.extend(["--description", description])
        if location:
            cmd.extend(["--location", location])

        self.logger.info("[CalendarValidator] Creating event: %s at %s", summary, start_time)
        self.logger.debug("[CalendarValidator] Command: %s", " ".join(cmd))

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
            )
            data = json.loads(result.stdout)
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr or str(e)
            self.logger.error("[CalendarValidator] Failed to create event: %s", error_msg)
            return {
                "status": "error",
                "error": error_msg,
                "stdout": e.stdout,
            }
        except OSError as e:
            # Raised when the gog executable cannot be started at all.
            error_msg = str(e)
            self.logger.error("[CalendarValidator] Failed to create event: %s", error_msg)
            return {
                "status": "error",
                "error": error_msg,
            }
        except json.JSONDecodeError as e:
            self.logger.error("[CalendarValidator] Failed to parse gog output: %s", e)
            return {
                "status": "error",
                "error": f"Failed to parse gog output: {e}",
            }

        event = (data.get("event") if isinstance(data, dict) else None) or {}
        self.logger.info(
            "[CalendarValidator] Event created: id=%s title=%s",
            event.get("id"), event.get("summary")
        )
        return {
            "status": "created",
            "event_id": event.get("id"),
            "event_summary": event.get("summary"),
            "event_start": event.get("start"),
            "event_end": event.get("end"),
            "event_url": event.get("htmlLink", ""),
        }

    def _default_end_time(self, start_time: str) -> str:
        """Return start_time + 1 hour, or start_time itself if unparseable.

        A timezone-aware start yields a UTC "...Z" string; a naive start
        yields a naive string so both ends are interpreted the same way.
        """
        try:
            start_dt = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
            end_dt = start_dt + timedelta(hours=1)
        except (ValueError, OverflowError):
            return start_time
        if end_dt.tzinfo is None:
            return end_dt.strftime("%Y-%m-%dT%H:%M:%S")
        return end_dt.astimezone(timezone.utc).strftime(self._ISO_UTC_FORMAT)

    def _resolve_start_time(self, extracted_when: str, summary: str) -> Optional[str]:
        """Resolve a friendly "when" string into an ISO 8601 start time.

        Handles:
        - "today", "tomorrow"
        - "May 3" or "may 3"
        - "5/3" or "05/03"
        - "Monday", "Tuesday", etc. (next occurrence)
        - time suffixes: "at 3pm", "at 0800"

        Dates are resolved with the same precedence as ``_parse_date_range``.
        When no valid time of day is found the event starts at 09:00 local
        time.

        Args:
            extracted_when: Raw "when" text.
            summary: Event title; currently unused, kept for callers.

        Returns:
            UTC ISO 8601 string ("...Z"), or None if no date could be found.
        """
        target_date = self._resolve_target_date(extracted_when)
        if target_date is None:
            return None

        parsed_time = self._parse_time_of_day(extracted_when, require_at_for_military=True)
        hour, minute = parsed_time if parsed_time else (9, 0)  # Default: 9 AM

        local_start = datetime(
            target_date.year, target_date.month, target_date.day, hour, minute
        )
        return self._to_utc_iso(local_start)
def create_default_validator() -> CalendarValidator:
    """Build a CalendarValidator that uses the constructor's default arguments."""
    validator = CalendarValidator()
    return validator