📄 calendar_action.py 14,010 bytes Apr 28, 2026 📋 Raw

"""Calendar action handler — Telegram inline keyboard for briefing events.

Handles callback queries:
- calendar_add|{doc_id}|{event_hash} → Create Radicale calendar event
- calendar_done|{doc_id}|{event_hash} → Dismiss the button
"""

import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from icarus.core.db.documents import get_event_by_hash
from icarus.core.calendar_sync import create_event, event_exists
from icarus.core.config import CHICAGO_TZ

logger = logging.getLogger(name)

---------------------------------------------------------------------------

MarkdownV2 escaping

---------------------------------------------------------------------------

MDV2_SPECIAL = r'*~`>#+-=|{}.!'
_MDV2_ESCAPE_RE = import('re').compile(r'([%s])' % import('re').escape(_MDV2_SPECIAL))

def escape_mdv2(text: str) -> str:
"""Escape Telegram MarkdownV2 special characters."""
return _MDV2_ESCAPE_RE.sub(r'\\1', text)

---------------------------------------------------------------------------

Parse callback data

---------------------------------------------------------------------------

def parse_callback(callback_data: str) -> tuple[str, str, str] | None:
"""Parse calendar callback data into (action, doc_id, event_hash).

Expected format: calendar_add|{doc_id}|{event_hash}
"""
parts = callback_data.split("|")
if len(parts) != 3:
    return None
action, doc_id, event_hash = parts
if action not in ("calendar_add", "calendar_done"):
    return None
return (action, doc_id, event_hash)

---------------------------------------------------------------------------

Event creation

---------------------------------------------------------------------------

def _parse_datetime(dt_str: str) -> datetime | None:
"""Parse an ISO datetime string into a timezone-aware datetime.

Falls back to date-only parsing if time component is missing.
"""
if not dt_str:
    return None

# Try ISO format with timezone
for fmt in (
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M",
    "%Y-%m-%d",
):
    try:
        dt = datetime.strptime(dt_str, fmt)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=CHICAGO_TZ)
        return dt.astimezone(CHICAGO_TZ)
    except ValueError:
        continue

# Python 3.7+ fromisoformat
try:
    dt = datetime.fromisoformat(dt_str)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CHICAGO_TZ)
    return dt.astimezone(CHICAGO_TZ)
except (ValueError, TypeError):
    pass

logger.warning("Could not parse datetime: %s", dt_str)
return None

def handle_calendar_add(callback_data: str, user_id: str) -> dict:
"""Handle calendar_add callback — create a Radicale calendar event.

Args:
    callback_data: "calendar_add|{doc_id}|{event_hash}"
    user_id: Telegram user ID (for future user mapping)

Returns:
    dict with status, summary, formatted time, and any error.
"""
parsed = parse_callback(callback_data)
if not parsed:
    return {"status": "error", "message": "Invalid callback data"}

_, doc_id, event_hash = parsed

# Retrieve stored event data
event = get_event_by_hash(doc_id, event_hash)
if not event:
    return {
        "status": "expired",
        "message": "This briefing has expired. Send the document again to get a fresh briefing.",
    }

summary = event.get("summary", "Untitled Event")
start_str = event.get("start_time", "")
end_str = event.get("end_time", "")
location = event.get("location", "")
description = event.get("description", "")
category = event.get("category", "event")

# Parse start time
start_dt = _parse_datetime(start_str)

if not start_dt:
    # Try to create an all-day event if we only have a date or nothing
    if start_str and len(start_str) >= 10:
        # Date-only: create all-day event
        date_str = start_str[:10]
        try:
            start_dt = date_str  # Pass as string for all-day event
        except Exception:
            return {"status": "error", "message": f"Could not parse date: {start_str}"}
    else:
        return {
            "status": "error",
            "message": "No date/time found for this event. Cannot add to calendar.",
        }

# Determine end time
if end_str:
    end_dt = _parse_datetime(end_str)
    if not end_dt and isinstance(start_dt, datetime):
        end_dt = start_dt + timedelta(hours=1)
elif isinstance(start_dt, datetime):
    end_dt = start_dt + timedelta(hours=1)
else:
    # All-day event — end = same day (will be incremented by create_event)
    end_dt = start_dt

# Check for duplicates before creating
if isinstance(start_dt, datetime):
    existing = event_exists(summary, start_dt)
    if existing:
        # Format the existing event time for the confirmation
        existing_start = existing.get("start", {}).get("dateTime", "")
        formatted_time = _format_time(existing_start)
        return {
            "status": "duplicate",
            "summary": summary,
            "formatted_time": formatted_time,
            "message": f"Already on calendar: \"{summary}\"\n{formatted_time}",
        }

# Create the event
try:
    # Append source info to description
    source_filename = event.get("source_filename", "")
    if source_filename:
        description = f"{description}\n\nSource: {source_filename}".strip()

    # Add category tag to description
    if category and category != "event":
        description = f"[{category.upper()}] {description}"

    result = create_event(
        summary=summary,
        start_dt=start_dt,
        end_dt=end_dt,
        description=description,
        location=location,
    )

    formatted_time = _format_event_time(start_dt, end_dt)
    return {
        "status": "created",
        "summary": summary,
        "formatted_time": formatted_time,
        "event_id": result.get("id", ""),
        "message": f"✅ Added to calendar:\n   \"{summary}\"\n   {formatted_time}",
    }

except Exception as e:
    logger.error("Failed to create calendar event: %s", e, exc_info=True)
    return {
        "status": "error",
        "message": f"Failed to create event: {str(e)[:200]}",
    }

def _format_time(iso_str: str) -> str:
"""Format an ISO datetime string for display."""
dt = _parse_datetime(iso_str)
if dt:
return dt.strftime("%B %-d, %Y at %-I:%M %p")
return iso_str

def _format_event_time(start_dt, end_dt) -> str:
"""Format start/end time for display.

Handles both datetime objects and date strings (all-day events).
"""
if isinstance(start_dt, str):
    # All-day event
    try:
        from datetime import date as date_type
        d = date_type.fromisoformat(start_dt)
        return d.strftime("%B %-d, %Y (all day)")
    except (ValueError, TypeError):
        return start_dt

if isinstance(start_dt, datetime):
    date_str = start_dt.strftime("%B %-d, %Y")
    time_str = start_dt.strftime("%-I:%M %p")
    if isinstance(end_dt, datetime):
        # Same day?
        if start_dt.date() == end_dt.date():
            end_time_str = end_dt.strftime("%-I:%M %p")
            return f"{date_str}, {time_str}{end_time_str}"
        else:
            end_date_str = end_dt.strftime("%B %-d, %Y at %-I:%M %p")
            return f"{date_str} at {time_str}{end_date_str}"
    return f"{date_str} at {time_str}"

return str(start_dt)

---------------------------------------------------------------------------

Callback dispatchers (called from telegram handler)

---------------------------------------------------------------------------

async def handle_calendar_callback(callback_query: dict, bot) -> None:
"""Dispatch calendar-related callback queries.

Routes calendar_add and calendar_done actions.
"""
cb_data = callback_query.get("data", "")
parsed = parse_callback(cb_data)
if not parsed:
    await bot.answer_callback_query(callback_query["id"], text="Invalid request")
    return

action, doc_id, event_hash = parsed

if action == "calendar_add":
    await _handle_add(callback_query, bot, doc_id, event_hash)
elif action == "calendar_done":
    await _handle_done(callback_query, bot, doc_id, event_hash)
else:
    await bot.answer_callback_query(callback_query["id"], text="Unknown action")

async def _handle_add(callback_query: dict, bot, doc_id: str, event_hash: str) -> None:
"""Handle calendar_add: create event and update the message."""
user_id = str(callback_query.get("from", {}).get("id", ""))
cb_data = f"calendar_add|{doc_id}|{event_hash}"

result = handle_calendar_add(cb_data, user_id)

message = callback_query.get("message", {})
chat_id = message.get("chat", {}).get("id")
message_id = message.get("message_id")

if not chat_id or not message_id:
    await bot.answer_callback_query(callback_query["id"], text=result.get("message", "Done"))
    return

status = result.get("status")

if status == "created":
    # Update the message with confirmation and new buttons
    summary = result["summary"]
    formatted_time = result.get("formatted_time", "")
    event_id = result.get("event_id", "")

    # Build confirmation message (MarkdownV2 safe)
    msg = (
        f"✅ Added to calendar:\n"
        f"   📅 {escape_mdv2(summary)}\n"
        f"   🕐 {escape_mdv2(formatted_time)}"
    )

    # New keyboard: Edit, Delete, Done
    keyboard = {
        "inline_keyboard": [[
            {"text": "✅ Done", "callback_data": f"calendar_done|{doc_id}|{event_hash}"},
        ]]
    }

    try:
        await bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=msg,
            reply_markup=keyboard,
        )
    except Exception as e:
        logger.warning("Failed to edit message after calendar add: %s", e)
        # Fallback: just answer the callback
        await bot.answer_callback_query(
            callback_query["id"],
            text=f"Added: {summary}"
        )
        return

    await bot.answer_callback_query(callback_query["id"], text=f"✅ Added: {summary}")

elif status == "duplicate":
    summary = result["summary"]
    formatted_time = result.get("formatted_time", "")

    msg = (
        f"📋 Already on calendar:\n"
        f"   📅 {escape_mdv2(summary)}\n"
        f"   🕐 {escape_mdv2(formatted_time)}"
    )

    keyboard = {
        "inline_keyboard": [[
            {"text": "✅ Done", "callback_data": f"calendar_done|{doc_id}|{event_hash}"},
        ]]
    }

    try:
        await bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=msg,
            reply_markup=keyboard,
        )
    except Exception as e:
        logger.warning("Failed to edit message for duplicate: %s", e)

    await bot.answer_callback_query(callback_query["id"], text=f"Already exists: {summary}")

elif status == "expired":
    await bot.answer_callback_query(
        callback_query["id"],
        text="⚠️ Briefing expired. Send the document again."
    )

else:
    # Error
    error_msg = result.get("message", "Unknown error")
    await bot.answer_callback_query(
        callback_query["id"],
        text=f"❌ {error_msg[:200]}"
    )

async def _handle_done(callback_query: dict, bot, doc_id: str, event_hash: str) -> None:
"""Handle calendar_done: dismiss the button UI."""
message = callback_query.get("message", {})
chat_id = message.get("chat", {}).get("id")
message_id = message.get("message_id")

if chat_id and message_id:
    # Remove inline keyboard (edit message, remove markup)
    try:
        # Get current text, just remove the keyboard
        current_text = message.get("text", "✅")
        await bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=current_text,
            reply_markup=None,
        )
    except Exception as e:
        logger.warning("Failed to remove keyboard: %s", e)

await bot.answer_callback_query(callback_query["id"], text="👍")

---------------------------------------------------------------------------

Action button builder (used by briefing formatter)

---------------------------------------------------------------------------

def build_action_buttons(doc_id: str, event_hash: str, has_datetime: bool = True) -> dict:
"""Build the inline keyboard for calendar action buttons.

Args:
    doc_id: Document ID from store_briefing_event
    event_hash: Event hash from store_briefing_event
    has_datetime: Whether the event has parseable dates/times

Returns:
    Telegram ReplyMarkupKeyboard dict.
"""
buttons = []

if has_datetime:
    buttons.append({
        "text": "🗓️ Add to Calendar",
        "callback_data": f"calendar_add|{doc_id}|{event_hash}"
    })

# Always show Done button
buttons.append({
    "text": " Done",
    "callback_data": f"calendar_done|{doc_id}|{event_hash}"
})

return {"inline_keyboard": [buttons]}