# calendar_validator.py — 25,892 bytes (file-viewer header, captured Sunday 21:52)

"""CalendarValidator — Read-only calendar validation for extracted events.

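Queries Google Calendar through the gog CLI, fuzzy-matches event titles,
and returns a MATCH | NO_MATCH | CONFLICT status. Validation itself is
read-only; CalendarValidator.create_event is the one write path and creates
events through the same CLI.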
"""

import json
import logging
import subprocess
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional

@dataclass
class CalendarCheckResult:
    """Outcome of checking one extracted event against the calendar.

    Only ``status`` is required. The remaining fields describe the
    best-matching calendar event and keep their defaults when no event
    was matched.
    """

    # One of "MATCH" | "NO_MATCH" | "CONFLICT".
    status: str
    # "id" of the matched calendar event.
    event_id: Optional[str] = None
    # "summary" (title) of the matched calendar event.
    event_title: Optional[str] = None
    # Start of the matched event: its "dateTime" value, else its "date".
    event_start: Optional[str] = None
    # End of the matched event, same representation as event_start.
    event_end: Optional[str] = None
    # Best similarity score found, in the range 0.0-1.0.
    fuzzy_score: float = 0.0
    # Human-readable reason; only set for CONFLICT results.
    conflict_description: Optional[str] = None
    # The raw calendar event dict that was matched.
    matched_event: Optional[Dict[str, Any]] = None

class CalendarValidator:
    """Validate extracted events against a Google Calendar.

    Uses the gog CLI to query calendar events and fuzzy-matches their titles
    to decide whether an extracted event already exists in the calendar.
    Validation is read-only; ``create_event`` is the only operation that
    writes to the calendar.
    """

    # Fuzzy-matching thresholds applied to the best title score.
    STRONG_MATCH_THRESHOLD = 0.7  # ≥ 0.7: treated as the same event
    POSSIBLE_MATCH_THRESHOLD = 0.4  # 0.4-0.7: run the conflict check first

def __init__(
    self,
    calendar_id: str = "hoffmann.family.manager@gmail.com",
    account: str = "family-calendar-sync@hoffmann-family-manager.iam.gserviceaccount.com",
):
    self.calendar_id = calendar_id
    self.account = account
    self.logger = logging.getLogger(__name__)

def check_event(self, extracted_event: Dict[str, Any]) -> CalendarCheckResult:
    """Look for an extracted event among the calendar's events for that day.

    The 'when' text is turned into a one-day range, the calendar is queried
    for that range, and every returned event title is scored against the
    extracted title. The best score decides the outcome.

    Args:
        extracted_event: Dict with 'what' (title), 'when' (time) and
            optionally 'where' (location).

    Returns:
        CalendarCheckResult with status MATCH | NO_MATCH | CONFLICT.
        NO_MATCH is also returned when the title is missing, the date cannot
        be parsed, or the calendar query raises.
    """
    log = self.logger
    title = extracted_event.get("what", "")
    when = extracted_event.get("when", "")

    if not title:
        log.warning("[CalendarValidator] No title in extracted event")
        return CalendarCheckResult(status="NO_MATCH")

    # Translate the free-text "when" into the day range to query.
    date_start, date_end = self._parse_date_range(when)
    if not date_start:
        log.warning("[CalendarValidator] Could not parse date from: %s", when)
        return CalendarCheckResult(status="NO_MATCH")

    # NOTE: a failed query is reported exactly like "nothing found".
    try:
        calendar_events = self._query_calendar(date_start, date_end)
    except Exception as e:
        log.error("[CalendarValidator] Calendar query failed: %s", e)
        return CalendarCheckResult(status="NO_MATCH")

    if not calendar_events:
        log.info("[CalendarValidator] No events found in range %s to %s", date_start, date_end)
        return CalendarCheckResult(status="NO_MATCH")

    # Keep the first event with the strictly highest score.
    top_event, top_score = None, 0.0
    for candidate in calendar_events:
        candidate_title = candidate.get("summary", "")
        if not candidate_title:
            continue

        candidate_score = self._fuzzy_match_score(extracted_event, candidate_title)
        log.debug("[CalendarValidator] Fuzzy match: '%s' vs '%s' = %.2f",
                  title, candidate_title, candidate_score)

        if candidate_score > top_score:
            top_event, top_score = candidate, candidate_score

    conflict_desc = None
    if top_score >= self.STRONG_MATCH_THRESHOLD:
        # Strong match: accepted without a conflict check.
        log.info("[CalendarValidator] MATCH: '%s' matches '%s' (score: %.2f)",
                 title, top_event.get("summary"), top_score)
    elif top_score >= self.POSSIBLE_MATCH_THRESHOLD:
        # Possible match: the conflict check decides between CONFLICT and MATCH.
        conflict_desc = self._check_conflict(extracted_event, top_event, top_score)
        if conflict_desc:
            log.info("[CalendarValidator] CONFLICT: '%s' vs '%s' (score: %.2f) — %s",
                     title, top_event.get("summary"), top_score, conflict_desc)
        else:
            log.info("[CalendarValidator] MATCH (possible): '%s' matches '%s' (score: %.2f)",
                     title, top_event.get("summary"), top_score)
    else:
        log.info("[CalendarValidator] NO_MATCH: '%s' not found in calendar (best score: %.2f)",
                 title, top_score)
        return CalendarCheckResult(status="NO_MATCH", fuzzy_score=top_score)

    # MATCH and CONFLICT carry the same event details; only the status and
    # the conflict description differ.
    return CalendarCheckResult(
        status="CONFLICT" if conflict_desc else "MATCH",
        event_id=top_event.get("id"),
        event_title=top_event.get("summary"),
        event_start=self._extract_datetime(top_event.get("start")),
        event_end=self._extract_datetime(top_event.get("end")),
        fuzzy_score=top_score,
        conflict_description=conflict_desc or None,
        matched_event=top_event,
    )

def _fuzzy_match_score(self, extracted_event: Dict[str, Any], calendar_title: str) -> float:
    """Calculate fuzzy string matching score (0.0-1.0).

    Uses difflib.SequenceMatcher for fuzzy matching.
    """
    if not extracted_event or not calendar_title:
        return 0.0

    extracted_title = extracted_event.get("what", "").lower().strip()

    # Normalize: lowercase, strip whitespace
    extracted = extracted_title.lower().strip()
    calendar = calendar_title.lower().strip()

    # Use SequenceMatcher for fuzzy matching
    score = SequenceMatcher(None, extracted, calendar).ratio()

    # Bonus: substring match (e.g., "soccer practice" matches "Sullivan Soccer Practice")
    if extracted in calendar or calendar in extracted:
        score = max(score, 0.6)

    # Bonus: location/venue overlap
    extracted_where = extracted_event.get("where", "").lower()
    calendar_location = (matched_event.get("location", "") if 'matched_event' in dir() else "").lower()
    if extracted_where and calendar_location:
        # Extract city/venue from location
        import re
        # Simple city extraction - look for city names
        city_pattern = r"([a-z]+),?\s*[a-z]{0,2}\s*\d{0,5}"
        extracted_city = re.search(city_pattern, extracted_where)
        calendar_city = re.search(city_pattern, calendar_location)

        if extracted_city and calendar_city:
            if extracted_city.group(1) == calendar_city.group(1):
                score = max(score, 0.55)  # Boost for location match

        # Venue name match
        venue_pattern = r"([a-z]+(?:\s+[a-z]+){0,2})"
        extracted_venue = re.search(venue_pattern, extracted_where)
        calendar_venue = re.search(venue_pattern, calendar_location)

        if extracted_venue and calendar_venue:
            venue_score = SequenceMatcher(None, extracted_venue.group(1), calendar_venue.group(1)).ratio()
            if venue_score > 0.7:
                score = max(score, 0.55)  # Boost for venue match

    # Bonus: keyword overlap for common events
    keywords = ["appointment", "practice", "game", "school", "doctor", "dentist", "haircut", "oil change"]
    extracted_keywords = [k for k in keywords if k in extracted]
    calendar_keywords = [k for k in keywords if k in calendar]
    if extracted_keywords and calendar_keywords:
        if set(extracted_keywords) & set(calendar_keywords):  # Any overlap
            score = max(score, 0.5)  # Boost for keyword match

    return round(score, 3)

def _query_calendar(self, date_start: str, date_end: str) -> List[Dict[str, Any]]:
    """Run ``gog calendar events`` for a time range and return its events.

    Args:
        date_start: ISO 8601 start time, passed as ``--from``.
        date_end: ISO 8601 end time, passed as ``--to``.

    Returns:
        The value under the "events" key of the CLI's JSON output, or an
        empty list when that key is absent.

    Raises:
        subprocess.CalledProcessError: If gog exits with a non-zero status.
        json.JSONDecodeError: If the CLI output is not valid JSON.
    """
    cmd = ["gog", "calendar", "events", self.calendar_id]
    cmd += ["--from", date_start, "--to", date_end]
    cmd += ["--account", self.account, "--json"]

    self.logger.debug("[CalendarValidator] Query: %s", " ".join(cmd))

    # check=True turns a non-zero exit status into CalledProcessError.
    completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
    events = json.loads(completed.stdout).get("events", [])

    self.logger.debug("[CalendarValidator] Found %d events", len(events))
    return events

def _parse_date_range(self, when_str: str) -> tuple:
    """Parse date/time string into ISO 8601 date range.

    Handles various formats:
    - "today", "tomorrow"
    - "Monday", "Tuesday", etc.
    - "5/3", "05/03"
    - "May 3"

    Returns:
        (date_start, date_end) as ISO 8601 strings, or (None, None) if parsing fails
    """
    if not when_str:
        return None, None

    when_lower = when_str.lower().strip()
    now = datetime.now()

    # Handle relative dates
    if when_lower == "today" or when_lower.startswith("today"):
        target_date = now.date()
    elif when_lower == "tomorrow" or when_lower.startswith("tomorrow"):
        target_date = now.date() + timedelta(days=1)
    else:
        # Try to parse various date formats
        target_date = self._parse_date_string(when_str)

    if not target_date:
        return None, None

    # Create full day range in ISO 8601 format
    # Use UTC for consistency, gog CLI handles timezone conversion
    date_start = f"{target_date.isoformat()}T00:00:00Z"
    date_end = f"{target_date.isoformat()}T23:59:59Z"

    return date_start, date_end

def _parse_date_string(self, date_str: str) -> Optional[datetime.date]:
    """Parse various date string formats into a date object."""
    import re

    now = datetime.now()

    # Try to parse "May 3" or "may 3"
    month_pattern = r"(january|february|march|april|may|june|july|august|september|october|november|december)\s+(\d{1,2})"
    match = re.search(month_pattern, date_str.lower())
    if match:
        month_names = [
            "january", "february", "march", "april", "may", "june",
            "july", "august", "september", "october", "november", "december"
        ]
        month = month_names.index(match.group(1)) + 1
        day = int(match.group(2))
        return datetime(now.year, month, day).date()

    # Try to parse "5/3" or "05/03"
    date_pattern = r"(\d{1,2})[/-](\d{1,2})"
    match = re.search(date_pattern, date_str)
    if match:
        month = int(match.group(1))
        day = int(match.group(2))
        if 1 <= month <= 12 and 1 <= day <= 31:
            return datetime(now.year, month, day).date()

    # Try to parse day of week
    days = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
    for i, day_name in enumerate(days):
        if day_name in date_str.lower():
            # Calculate days until that day
            current_weekday = now.weekday()  # Monday = 0
            target_weekday = i
            days_ahead = (target_weekday - current_weekday) % 7
            if days_ahead == 0:
                days_ahead = 7  # Next week if today
            return (now + timedelta(days=days_ahead)).date()

    return None

def _extract_datetime(self, time_obj: Optional[Dict[str, Any]]) -> Optional[str]:
    """Extract datetime string from calendar event time object."""
    if not time_obj:
        return None
    # Prefer dateTime, fall back to date
    return time_obj.get("dateTime") or time_obj.get("date")

def _check_conflict(
    self,
    extracted_event: Dict[str, Any],
    calendar_event: Dict[str, Any],
    fuzzy_score: float,
) -> Optional[str]:
    """Check for conflicts between extracted and calendar event.

    Returns conflict description if conflict detected, None otherwise.
    """
    extracted_title = extracted_event.get("what", "").lower()
    calendar_title = calendar_event.get("summary", "").lower()

    # Check for significant title differences despite fuzzy match
    # (e.g., "Soccer Practice" vs "Dentist Appointment")
    if fuzzy_score < 0.6:
        # Check if they're completely different event types
        event_types = ["soccer", "dentist", "doctor", "practice", "game", "school"]
        extracted_types = [t for t in event_types if t in extracted_title]
        calendar_types = [t for t in event_types if t in calendar_title]

        if extracted_types and calendar_types and extracted_types != calendar_types:
            return f"Different event types: message mentions '{extracted_types[0]}' but calendar has '{calendar_types[0]}'"

    # NEW: Check for time overlap conflicts
    # If extracted event has a time and calendar event has a time,
    # check if they overlap
    extracted_when = extracted_event.get("when", "")
    if self._extracted_time_overlaps(extracted_when, calendar_event):
        return f"Time conflict: You have '{calendar_title}' scheduled during this time"

    return None

def _extracted_time_overlaps(self, extracted_when: str, calendar_event: Dict[str, Any]) -> bool:
    """Check if extracted time overlaps with calendar event time.

    Simple check: if extracted mentions a time and calendar event
    starts at or around that time, it's a conflict.
    """
    import re

    # Extract time from when string (e.g., "0800", "8:00am", "8 AM")
    time_patterns = [
        r"(\d{1,2}):(\d{2})\s*(am|pm)?",  # 8:00, 8:00am
        r"(\d{1,2})\s*(am|pm)",  # 8am, 8 pm
        r"(0\d{3}|1\d{3}|2[0-3]\d{2})",  # 0800, 1430 (military)
    ]

    extracted_hour = None
    extracted_minute = 0

    for pattern in time_patterns:
        match = re.search(pattern, extracted_when.lower())
        if match:
            groups = match.groups()
            if len(groups) == 1:
                # Military time: 0800
                military = groups[0]
                extracted_hour = int(military[:2])
                extracted_minute = int(military[2:])
            elif len(groups) >= 2:
                hour = int(groups[0])
                minute = int(groups[1]) if groups[1].isdigit() else 0
                ampm = groups[2].lower() if len(groups) > 2 and groups[2] else None

                if ampm == "pm" and hour != 12:
                    hour += 12
                elif ampm == "am" and hour == 12:
                    hour = 0

                extracted_hour = hour
                extracted_minute = minute
            break

    if extracted_hour is None:
        return False

    # Get calendar event start time
    event_start = calendar_event.get("start", {})
    if "dateTime" in event_start:
        # Parse ISO datetime
        from datetime import datetime
        dt_str = event_start["dateTime"]
        # Handle timezone offset
        if "-" in dt_str[10:]:
            dt_str = dt_str[:dt_str.rfind("-")]
        elif "+" in dt_str[10:]:
            dt_str = dt_str[:dt_str.rfind("+")]

        try:
            event_dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
            event_hour = event_dt.hour
            event_minute = event_dt.minute

            # Check if within 1 hour of each other
            extracted_total_minutes = extracted_hour * 60 + extracted_minute
            event_total_minutes = event_hour * 60 + event_minute
            time_diff = abs(extracted_total_minutes - event_total_minutes)

            return time_diff <= 60  # Within 1 hour = overlap
        except:
            return False

    return False

def create_event(
    self,
    summary: str,
    description: str = "",
    location: str = "",
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    extracted_when: str = "",
) -> Dict[str, Any]:
    """Create a calendar event using the gog CLI.

    Args:
        summary: Event title/summary.
        description: Optional description.
        location: Optional location.
        start_time: ISO 8601 start time. If None, computed from extracted_when.
        end_time: ISO 8601 end time. If None, defaults to start_time + 1 hour,
            written in the same style as start_time ("Z" suffix, explicit
            offset, or naive).
        extracted_when: Raw extracted "when" string for parsing start time.

    Returns:
        On success, a dict with status "created" plus event_id,
        event_summary, event_start, event_end and event_url. On failure, a
        dict with status "error" and an "error" message; failures include an
        unresolvable start time, a gog CLI that cannot be launched or exits
        non-zero, and unparseable CLI output.
    """
    from datetime import datetime, timedelta

    if not start_time:
        start_time = self._resolve_start_time(extracted_when, summary)

    if not start_time:
        return {
            "status": "error",
            "error": "Could not determine event start time. Provide a specific date/time."
        }

    # Default end = start + 1 hour
    if not end_time:
        try:
            start_dt = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # Unparseable start: fall back to a zero-length event.
            end_time = start_time
        else:
            end_dt = start_dt + timedelta(hours=1)
            if start_time.endswith("Z"):
                end_time = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            else:
                # isoformat() keeps the start's own UTC offset; stamping "Z"
                # on a non-UTC wall time would move the end to another instant.
                end_time = end_dt.isoformat()

    cmd = [
        "gog", "calendar", "create", self.calendar_id,
        "--summary", summary,
        "--from", start_time,
        "--to", end_time,
        "--account", self.account,
        "--json",
    ]

    if description:
        cmd.extend(["--description", description])
    if location:
        cmd.extend(["--location", location])

    self.logger.info("[CalendarValidator] Creating event: %s at %s", summary, start_time)
    self.logger.debug("[CalendarValidator] Command: %s", " ".join(cmd))

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
        )
        data = json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr or str(e)
        self.logger.error("[CalendarValidator] Failed to create event: %s", error_msg)
        return {
            "status": "error",
            "error": error_msg,
            "stdout": e.stdout,
        }
    except OSError as e:
        # Raised when the gog executable is missing or cannot be started.
        self.logger.error("[CalendarValidator] Could not run gog CLI: %s", e)
        return {
            "status": "error",
            "error": f"Could not run gog CLI: {e}",
        }
    except json.JSONDecodeError as e:
        self.logger.error("[CalendarValidator] Failed to parse gog output: %s", e)
        return {
            "status": "error",
            "error": f"Failed to parse gog output: {e}",
        }

    # Tolerate a missing or null "event" payload instead of crashing.
    event = (data.get("event") if isinstance(data, dict) else None) or {}

    self.logger.info(
        "[CalendarValidator] Event created: id=%s title=%s",
        event.get("id"), event.get("summary")
    )

    return {
        "status": "created",
        "event_id": event.get("id"),
        "event_summary": event.get("summary"),
        "event_start": event.get("start"),
        "event_end": event.get("end"),
        "event_url": event.get("htmlLink", ""),
    }

def _resolve_start_time(self, extracted_when: str, summary: str) -> Optional[str]:
    """Resolve a friendly "when" string into an ISO 8601 start time.

    Handles:
    - "today", "tomorrow"
    - "Monday", "Tuesday", etc. (next occurrence)
    - "May 3" or "may 3"
    - "5/3" or "05/03"
    - time suffixes: "at 3pm", "at 0800"
    """
    import re
    from datetime import datetime, timezone

    now = datetime.now()
    when_lower = extracted_when.lower().strip()

    # Determine target date
    target_date = None

    if when_lower == "today" or when_lower.startswith("today"):
        target_date = now.date()
    elif when_lower == "tomorrow" or when_lower.startswith("tomorrow"):
        target_date = now.date() + timedelta(days=1)
    else:
        # Try day of week
        days = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
        for i, day_name in enumerate(days):
            if day_name in when_lower:
                current_weekday = now.weekday()
                days_ahead = (i - current_weekday) % 7
                if days_ahead == 0:
                    days_ahead = 7
                target_date = (now + timedelta(days=days_ahead)).date()
                break

    if target_date is None:
        # Try "May 3" format
        month_pattern = r"(january|february|march|april|may|june|july|august|september|october|november|december)\s+(\d{1,2})"
        match = re.search(month_pattern, when_lower)
        if match:
            month_names = [
                "january", "february", "march", "april", "may", "june",
                "july", "august", "september", "october", "november", "december"
            ]
            month = month_names.index(match.group(1)) + 1
            day = int(match.group(2))
            target_date = datetime(now.year, month, day).date()

    if target_date is None:
        # Try "5/3" format
        date_pattern = r"(\d{1,2})[/-](\d{1,2})"
        match = re.search(date_pattern, when_lower)
        if match:
            month = int(match.group(1))
            day = int(match.group(2))
            if 1 <= month <= 12 and 1 <= day <= 31:
                target_date = datetime(now.year, month, day).date()

    if target_date is None:
        return None

    # Extract time if present
    hour = 9  # Default: 9 AM
    minute = 0

    time_patterns = [
        r"(\d{1,2}):(\d{2})\s*(am|pm)?",  # 8:00, 8:00am
        r"(\d{1,2})\s*(am|pm)",  # 8am, 8 pm
        r"at\s+(0\d{3}|1\d{3}|2[0-3]\d{2})",  # at 0800, at 1430
    ]

    for pattern in time_patterns:
        match = re.search(pattern, when_lower)
        if match:
            groups = match.groups()
            if len(groups) == 1:
                # Military time: 0800
                military = groups[0]
                hour = int(military[:2])
                minute = int(military[2:])
            elif len(groups) >= 2:
                h = int(groups[0])
                m = int(groups[1]) if groups[1].isdigit() else 0
                ampm = groups[2].lower() if len(groups) > 2 and groups[2] else None

                if ampm == "pm" and h != 12:
                    h += 12
                elif ampm == "am" and h == 12:
                    h = 0

                hour = h
                minute = m
            break

    # Build ISO 8601 string
    from datetime import timezone as dt_tz
    dt = datetime(
        target_date.year, target_date.month, target_date.day,
        hour, minute, tzinfo=now.astimezone().tzinfo or dt_tz.utc
    )
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

def create_default_validator() -> CalendarValidator:
    """Create a CalendarValidator with its default calendar ID and account."""
    return CalendarValidator()