"""CalendarValidator — calendar validation for extracted events.

Queries Google Calendar using the gog CLI, performs fuzzy matching on event
titles, and returns MATCH | NO_MATCH | CONFLICT status.  The validator can
also create events through the same CLI (see ``create_event``).
"""

import json
import logging
import re
import subprocess
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple

# Timestamp layout handed to the gog CLI (always expressed in UTC).
_ISO_UTC_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

_MONTH_NAMES = (
    "january", "february", "march", "april", "may", "june",
    "july", "august", "september", "october", "november", "december",
)
# Index in this tuple equals date.weekday() (Monday = 0).
_WEEKDAY_NAMES = (
    "monday", "tuesday", "wednesday", "thursday",
    "friday", "saturday", "sunday",
)

# "May 3"
_MONTH_DAY_RE = re.compile(r"\b(" + "|".join(_MONTH_NAMES) + r")\s+(\d{1,2})")
# "5/3", "05-03".  The look-arounds keep time ranges such as "3-4pm" or
# "10-11:30" from being read as a month/day pair.
_NUMERIC_DATE_RE = re.compile(
    r"(?<!\d)(\d{1,2})[/-](\d{1,2})(?!\d|:|\s*(?:am|pm)\b)"
)

# "8:00", "8:00am"
_CLOCK_TIME_RE = re.compile(r"(\d{1,2}):(\d{2})(?:\s*(am|pm)\b)?")
# "8am", "8 pm"
_HOUR_MERIDIEM_RE = re.compile(r"(\d{1,2})\s*(am|pm)\b")
# "0800", "1430"
_MILITARY_RE = re.compile(r"\b([01]\d{3}|2[0-3]\d{2})\b")
# "at 0800", "at 1430"
_AT_MILITARY_RE = re.compile(r"\bat\s+([01]\d{3}|2[0-3]\d{2})\b")

# Location heuristics: first alphabetic word, and first one-to-three words.
_CITY_RE = re.compile(r"([a-z]+),?\s*[a-z]{0,2}\s*\d{0,5}")
_VENUE_RE = re.compile(r"([a-z]+(?:\s+[a-z]+){0,2})")

_TITLE_KEYWORDS = (
    "appointment", "practice", "game", "school",
    "doctor", "dentist", "haircut", "oil change",
)
_EVENT_TYPES = ("soccer", "dentist", "doctor", "practice", "game", "school")


@dataclass
class CalendarCheckResult:
    """Result of calendar validation check."""

    status: str  # "MATCH" | "NO_MATCH" | "CONFLICT"
    event_id: Optional[str] = None
    event_title: Optional[str] = None
    event_start: Optional[str] = None
    event_end: Optional[str] = None
    fuzzy_score: float = 0.0
    conflict_description: Optional[str] = None
    matched_event: Optional[Dict[str, Any]] = None


class CalendarValidator:
    """Calendar validation for extracted events.

    Uses the gog CLI to query Google Calendar events and performs fuzzy
    matching to determine if an extracted event already exists in the
    calendar.
    """

    # Fuzzy matching thresholds
    STRONG_MATCH_THRESHOLD = 0.7  # >= 0.7: same event
    POSSIBLE_MATCH_THRESHOLD = 0.4  # 0.4-0.7: check type/time for conflicts

    # Below this score, differing event-type words are reported as a conflict.
    _TYPE_CONFLICT_SCORE_CEILING = 0.6
    # Start times at most this many minutes apart count as overlapping.
    _OVERLAP_WINDOW_MINUTES = 60

    def __init__(
        self,
        calendar_id: str = "hoffmann.family.manager@gmail.com",
        account: str = "family-calendar-sync@hoffmann-family-manager.iam.gserviceaccount.com",
    ):
        self.calendar_id = calendar_id
        self.account = account
        self.logger = logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def check_event(self, extracted_event: Dict[str, Any]) -> CalendarCheckResult:
        """Check if an extracted event exists in the calendar.

        Args:
            extracted_event: Dict with 'what' (title), 'when' (time),
                optionally 'where' (location).

        Returns:
            CalendarCheckResult with status MATCH | NO_MATCH | CONFLICT.
            Any failure to parse the date or query the calendar is logged
            and reported as NO_MATCH.
        """
        extracted_title = extracted_event.get("what") or ""
        extracted_when = extracted_event.get("when") or ""

        if not extracted_title.strip():
            self.logger.warning("[CalendarValidator] No title in extracted event")
            return CalendarCheckResult(status="NO_MATCH")

        date_start, date_end = self._parse_date_range(extracted_when)
        if not date_start:
            self.logger.warning(
                "[CalendarValidator] Could not parse date from: %s", extracted_when
            )
            return CalendarCheckResult(status="NO_MATCH")

        try:
            calendar_events = self._query_calendar(date_start, date_end)
        except Exception as e:
            # Deliberate best-effort boundary: a broken CLI or malformed
            # output must not crash the caller.
            self.logger.error("[CalendarValidator] Calendar query failed: %s", e)
            return CalendarCheckResult(status="NO_MATCH")

        if not calendar_events:
            self.logger.info(
                "[CalendarValidator] No events found in range %s to %s",
                date_start, date_end,
            )
            return CalendarCheckResult(status="NO_MATCH")

        # Find best fuzzy match
        best_match = None
        best_score = 0.0
        for event in calendar_events:
            event_title = event.get("summary", "")
            if not event_title:
                continue
            score = self._fuzzy_match_score(extracted_event, event_title, event)
            self.logger.debug(
                "[CalendarValidator] Fuzzy match: '%s' vs '%s' = %.2f",
                extracted_title, event_title, score,
            )
            if score > best_score:
                best_score = score
                best_match = event

        if best_score >= self.STRONG_MATCH_THRESHOLD:
            self.logger.info(
                "[CalendarValidator] MATCH: '%s' matches '%s' (score: %.2f)",
                extracted_title, best_match.get("summary"), best_score,
            )
            return self._build_result("MATCH", best_match, best_score)

        if best_score >= self.POSSIBLE_MATCH_THRESHOLD:
            conflict_desc = self._check_conflict(extracted_event, best_match, best_score)
            if conflict_desc:
                self.logger.info(
                    "[CalendarValidator] CONFLICT: '%s' vs '%s' (score: %.2f) — %s",
                    extracted_title, best_match.get("summary"), best_score,
                    conflict_desc,
                )
                return self._build_result(
                    "CONFLICT", best_match, best_score, conflict_desc
                )
            # No conflict detected — treat as match
            self.logger.info(
                "[CalendarValidator] MATCH (possible): '%s' matches '%s' (score: %.2f)",
                extracted_title, best_match.get("summary"), best_score,
            )
            return self._build_result("MATCH", best_match, best_score)

        self.logger.info(
            "[CalendarValidator] NO_MATCH: '%s' not found in calendar (best score: %.2f)",
            extracted_title, best_score,
        )
        return CalendarCheckResult(status="NO_MATCH", fuzzy_score=best_score)

    def _build_result(
        self,
        status: str,
        event: Dict[str, Any],
        score: float,
        conflict_description: Optional[str] = None,
    ) -> CalendarCheckResult:
        """Package a matched calendar event into a CalendarCheckResult."""
        return CalendarCheckResult(
            status=status,
            event_id=event.get("id"),
            event_title=event.get("summary"),
            event_start=self._extract_datetime(event.get("start")),
            event_end=self._extract_datetime(event.get("end")),
            fuzzy_score=score,
            conflict_description=conflict_description,
            matched_event=event,
        )

    def _fuzzy_match_score(
        self,
        extracted_event: Dict[str, Any],
        calendar_title: str,
        calendar_event: Optional[Dict[str, Any]] = None,
    ) -> float:
        """Calculate fuzzy string matching score (0.0-1.0).

        The base score is difflib.SequenceMatcher's ratio on the normalized
        titles; substring, location and keyword overlaps can raise it to a
        fixed floor.

        Args:
            extracted_event: Dict with 'what' and optionally 'where'.
            calendar_title: Summary of the calendar event.
            calendar_event: Full calendar event; its 'location' enables the
                location bonus.  Optional so existing two-argument callers
                keep working (they simply get no location bonus).

        Returns:
            Score rounded to three decimals; 0.0 when either title is empty.
        """
        if not extracted_event or not calendar_title:
            return 0.0

        extracted = (extracted_event.get("what") or "").lower().strip()
        calendar = calendar_title.lower().strip()
        # "" is a substring of everything; without this guard a blank title
        # would earn the substring bonus against every calendar event.
        if not extracted or not calendar:
            return 0.0

        score = SequenceMatcher(None, extracted, calendar).ratio()

        # Bonus: substring match
        # (e.g. "soccer practice" matches "Sullivan Soccer Practice")
        if extracted in calendar or calendar in extracted:
            score = max(score, 0.6)

        # Bonus: location/venue overlap
        extracted_where = (extracted_event.get("where") or "").lower()
        calendar_location = ((calendar_event or {}).get("location") or "").lower()
        if extracted_where and calendar_location:
            extracted_city = _CITY_RE.search(extracted_where)
            calendar_city = _CITY_RE.search(calendar_location)
            if (
                extracted_city
                and calendar_city
                and extracted_city.group(1) == calendar_city.group(1)
            ):
                score = max(score, 0.55)

            extracted_venue = _VENUE_RE.search(extracted_where)
            calendar_venue = _VENUE_RE.search(calendar_location)
            if extracted_venue and calendar_venue:
                venue_score = SequenceMatcher(
                    None, extracted_venue.group(1), calendar_venue.group(1)
                ).ratio()
                if venue_score > 0.7:
                    score = max(score, 0.55)

        # Bonus: keyword overlap for common events
        extracted_keywords = {k for k in _TITLE_KEYWORDS if k in extracted}
        calendar_keywords = {k for k in _TITLE_KEYWORDS if k in calendar}
        if extracted_keywords & calendar_keywords:
            score = max(score, 0.5)

        return round(score, 3)

    def _query_calendar(self, date_start: str, date_end: str) -> List[Dict[str, Any]]:
        """Execute gog calendar events query.

        Args:
            date_start: ISO 8601 start time
            date_end: ISO 8601 end time

        Returns:
            List of calendar events (the "events" entry of the CLI's JSON
            output; empty when that entry is missing).

        Raises:
            subprocess.CalledProcessError: If gog CLI fails
            json.JSONDecodeError: If JSON parsing fails
            OSError: If the gog executable cannot be started
        """
        cmd = [
            "gog", "calendar", "events", self.calendar_id,
            "--from", date_start,
            "--to", date_end,
            "--account", self.account,
            "--json",
        ]
        self.logger.debug("[CalendarValidator] Query: %s", " ".join(cmd))

        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
        events = data.get("events") or []
        self.logger.debug("[CalendarValidator] Found %d events", len(events))
        return events

    # ------------------------------------------------------------------
    # Date / time parsing
    # ------------------------------------------------------------------

    def _parse_date_range(self, when_str: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse date/time string into an ISO 8601 range covering one day.

        Handles "today", "tomorrow", weekday names, "5/3" / "05/03" and
        "May 3".

        Returns:
            (date_start, date_end) as UTC ISO 8601 strings spanning the
            *local* calendar day 00:00:00-23:59:59, or (None, None) if
            parsing fails.
        """
        target_date = self._resolve_target_date(when_str)
        if target_date is None:
            return None, None

        year, month, day = target_date.year, target_date.month, target_date.day
        # The day boundaries are local wall-clock times; convert them to real
        # UTC instants rather than just appending "Z" to the local date.
        return (
            self._to_utc_iso(datetime(year, month, day, 0, 0, 0)),
            self._to_utc_iso(datetime(year, month, day, 23, 59, 59)),
        )

    def _resolve_target_date(self, when_str: Optional[str]) -> Optional[date]:
        """Resolve a "when" string to a calendar date (shared by check/create)."""
        if not when_str:
            return None
        text = when_str.lower().strip()
        today = datetime.now().date()
        if text.startswith("today"):
            return today
        if text.startswith("tomorrow"):
            return today + timedelta(days=1)
        return self._parse_date_string(when_str)

    def _parse_date_string(self, date_str: str) -> Optional[date]:
        """Parse various date string formats into a date object.

        Tried in order: "May 3", "5/3" (month/day), weekday name (next
        occurrence, never today).  The current year is assumed.  Impossible
        dates such as "February 30" are skipped instead of raising.
        """
        if not date_str:
            return None
        text = date_str.lower()
        today = datetime.now().date()

        match = _MONTH_DAY_RE.search(text)
        if match:
            month = _MONTH_NAMES.index(match.group(1)) + 1
            parsed = self._safe_date(today.year, month, int(match.group(2)))
            if parsed is not None:
                return parsed

        match = _NUMERIC_DATE_RE.search(text)
        if match:
            parsed = self._safe_date(
                today.year, int(match.group(1)), int(match.group(2))
            )
            if parsed is not None:
                return parsed

        for index, day_name in enumerate(_WEEKDAY_NAMES):
            if day_name in text:
                # A zero offset means "today"; roll to next week's occurrence.
                days_ahead = (index - today.weekday()) % 7 or 7
                return today + timedelta(days=days_ahead)

        return None

    @staticmethod
    def _safe_date(year: int, month: int, day: int) -> Optional[date]:
        """Return date(year, month, day), or None if it is not a real date."""
        try:
            return date(year, month, day)
        except ValueError:
            return None

    @staticmethod
    def _to_24h(hour: int, minute: int, meridiem: Optional[str]) -> Optional[Tuple[int, int]]:
        """Apply am/pm and validate; None if the result is not a clock time."""
        if meridiem == "pm" and hour != 12:
            hour += 12
        elif meridiem == "am" and hour == 12:
            hour = 0
        if 0 <= hour <= 23 and 0 <= minute <= 59:
            return hour, minute
        return None

    @staticmethod
    def _parse_time_of_day(
        text: str, military_requires_at: bool = False
    ) -> Optional[Tuple[int, int]]:
        """Extract (hour, minute) in 24h form from free text.

        Recognizes "8:00", "8:00am", "8am", "8 pm" and four-digit military
        time ("0800"); with military_requires_at the latter must be written
        as "at 0800".  Returns None when no valid time is found.
        """
        lowered = (text or "").lower()

        match = _CLOCK_TIME_RE.search(lowered)
        if match:
            parsed = CalendarValidator._to_24h(
                int(match.group(1)), int(match.group(2)), match.group(3)
            )
            if parsed is not None:
                return parsed

        match = _HOUR_MERIDIEM_RE.search(lowered)
        if match:
            # This pattern has no minute group; group 2 is the meridiem.
            parsed = CalendarValidator._to_24h(int(match.group(1)), 0, match.group(2))
            if parsed is not None:
                return parsed

        military_re = _AT_MILITARY_RE if military_requires_at else _MILITARY_RE
        match = military_re.search(lowered)
        if match:
            digits = match.group(1)
            return CalendarValidator._to_24h(int(digits[:2]), int(digits[2:]), None)

        return None

    @staticmethod
    def _to_utc_iso(local_naive: datetime) -> str:
        """Format a naive local-time datetime as a UTC "…Z" timestamp."""
        # astimezone() on a naive datetime interprets it as system local
        # time, using the UTC offset in effect on that date.
        return local_naive.astimezone(timezone.utc).strftime(_ISO_UTC_FORMAT)

    def _extract_datetime(self, time_obj: Optional[Dict[str, Any]]) -> Optional[str]:
        """Extract datetime string from calendar event time object."""
        if not time_obj:
            return None
        # Prefer dateTime, fall back to date
        return time_obj.get("dateTime") or time_obj.get("date")

    # ------------------------------------------------------------------
    # Conflict detection
    # ------------------------------------------------------------------

    def _check_conflict(
        self,
        extracted_event: Dict[str, Any],
        calendar_event: Dict[str, Any],
        fuzzy_score: float,
    ) -> Optional[str]:
        """Check for conflicts between extracted and calendar event.

        Returns conflict description if conflict detected, None otherwise.
        """
        extracted_title = (extracted_event.get("what") or "").lower()
        calendar_title = (calendar_event.get("summary") or "").lower()

        # Weak title match: see whether the two titles name different kinds
        # of event (e.g. "Soccer Practice" vs "Dentist Appointment").
        if fuzzy_score < self._TYPE_CONFLICT_SCORE_CEILING:
            extracted_types = [t for t in _EVENT_TYPES if t in extracted_title]
            calendar_types = [t for t in _EVENT_TYPES if t in calendar_title]
            if extracted_types and calendar_types and extracted_types != calendar_types:
                # Prefer naming a type the other side lacks, so the message
                # never reads "mentions 'soccer' but calendar has 'soccer'".
                mentioned = next(
                    (t for t in extracted_types if t not in calendar_types),
                    extracted_types[0],
                )
                scheduled = next(
                    (t for t in calendar_types if t not in extracted_types),
                    calendar_types[0],
                )
                return (
                    f"Different event types: message mentions '{mentioned}' "
                    f"but calendar has '{scheduled}'"
                )

        extracted_when = extracted_event.get("when") or ""
        if self._extracted_time_overlaps(extracted_when, calendar_event):
            return f"Time conflict: You have '{calendar_title}' scheduled during this time"

        return None

    def _extracted_time_overlaps(
        self, extracted_when: str, calendar_event: Dict[str, Any]
    ) -> bool:
        """Check if extracted time overlaps with calendar event time.

        Simple check: if the text mentions a time of day and the calendar
        event's start (wall-clock time as written in its dateTime, offset
        ignored) is within an hour of it, report an overlap.  All-day events
        and unparseable timestamps never overlap.
        """
        extracted = self._parse_time_of_day(extracted_when)
        if extracted is None:
            return False

        start_str = (calendar_event.get("start") or {}).get("dateTime")
        if not isinstance(start_str, str) or not start_str:
            return False

        try:
            # "Z" is rewritten because fromisoformat rejects it before 3.11.
            event_dt = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
        except ValueError:
            return False

        extracted_minutes = extracted[0] * 60 + extracted[1]
        event_minutes = event_dt.hour * 60 + event_dt.minute
        return abs(extracted_minutes - event_minutes) <= self._OVERLAP_WINDOW_MINUTES

    # ------------------------------------------------------------------
    # Event creation
    # ------------------------------------------------------------------

    def create_event(
        self,
        summary: str,
        description: str = "",
        location: str = "",
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        extracted_when: str = "",
    ) -> Dict[str, Any]:
        """Create a calendar event using the gog CLI.

        Args:
            summary: Event title/summary
            description: Optional description
            location: Optional location
            start_time: ISO 8601 start time. If None, computed from extracted_when.
            end_time: ISO 8601 end time. If None, defaults to start_time + 1 hour.
            extracted_when: Raw extracted "when" string for parsing start time.

        Returns:
            Dict with status "created" plus event fields, or status "error"
            plus error details.  CLI failures are reported in the dict, not
            raised.
        """
        if not start_time:
            start_time = self._resolve_start_time(extracted_when, summary)
            if not start_time:
                return {
                    "status": "error",
                    "error": "Could not determine event start time. Provide a specific date/time.",
                }

        if not end_time:
            end_time = self._default_end_time(start_time)

        cmd = [
            "gog", "calendar", "create", self.calendar_id,
            "--summary", summary,
            "--from", start_time,
            "--to", end_time,
            "--account", self.account,
            "--json",
        ]
        if description:
            cmd.extend(["--description", description])
        if location:
            cmd.extend(["--location", location])

        self.logger.info("[CalendarValidator] Creating event: %s at %s", summary, start_time)
        self.logger.debug("[CalendarValidator] Command: %s", " ".join(cmd))

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            data = json.loads(result.stdout)
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr or str(e)
            self.logger.error("[CalendarValidator] Failed to create event: %s", error_msg)
            return {
                "status": "error",
                "error": error_msg,
                "stdout": e.stdout,
            }
        except json.JSONDecodeError as e:
            self.logger.error("[CalendarValidator] Failed to parse gog output: %s", e)
            return {
                "status": "error",
                "error": f"Failed to parse gog output: {e}",
            }
        except OSError as e:
            # e.g. the gog executable is not installed / not on PATH.
            self.logger.error("[CalendarValidator] Failed to run gog: %s", e)
            return {
                "status": "error",
                "error": str(e),
            }

        event = data.get("event", {})
        self.logger.info(
            "[CalendarValidator] Event created: id=%s title=%s",
            event.get("id"), event.get("summary"),
        )
        return {
            "status": "created",
            "event_id": event.get("id"),
            "event_summary": event.get("summary"),
            "event_start": event.get("start"),
            "event_end": event.get("end"),
            "event_url": event.get("htmlLink", ""),
        }

    @staticmethod
    def _default_end_time(start_time: str) -> str:
        """Return start_time + 1 hour, or start_time itself if unparseable.

        An offset-aware start is converted to UTC before the "Z" suffix is
        applied; a naive start yields a naive end so both ends of the event
        use the same convention.
        """
        try:
            start_dt = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
        except ValueError:
            return start_time
        end_dt = start_dt + timedelta(hours=1)
        if end_dt.tzinfo is None:
            return end_dt.isoformat(timespec="seconds")
        return end_dt.astimezone(timezone.utc).strftime(_ISO_UTC_FORMAT)

    def _resolve_start_time(self, extracted_when: str, summary: str) -> Optional[str]:
        """Resolve a friendly "when" string into an ISO 8601 start time.

        Handles:
        - "today", "tomorrow"
        - "Monday", "Tuesday", etc. (next occurrence)
        - "May 3" or "may 3"
        - "5/3" or "05/03"
        - time suffixes: "at 3pm", "3:30pm", "at 0800" (default 9:00 AM)

        The date and time are interpreted as local time and returned as a
        UTC "…Z" timestamp.  ``summary`` is accepted for interface
        compatibility and is not used.

        Returns:
            The timestamp, or None when no date can be determined.
        """
        target_date = self._resolve_target_date(extracted_when)
        if target_date is None:
            return None

        parsed_time = self._parse_time_of_day(extracted_when, military_requires_at=True)
        hour, minute = parsed_time if parsed_time is not None else (9, 0)

        local_start = datetime(
            target_date.year, target_date.month, target_date.day, hour, minute
        )
        return self._to_utc_iso(local_start)


def create_default_validator() -> CalendarValidator:
    """Create a CalendarValidator with default settings."""
    return CalendarValidator()