"""Calendar action handler — Telegram inline keyboard for briefing events.

Handles callback queries:
- calendar_add|{doc_id}|{event_hash} → Create Radicale calendar event
- calendar_done|{doc_id}|{event_hash} → Dismiss the button
"""

import logging
import re
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo

from icarus.core.db.documents import get_event_by_hash
from icarus.core.calendar_sync import create_event, event_exists
from icarus.core.config import CHICAGO_TZ

logger = logging.getLogger(__name__)

# Actions accepted as the first field of the callback data.
_CALLBACK_ACTIONS = ("calendar_add", "calendar_done")

# Telegram Bot API limits: answerCallbackQuery text is 0-200 characters and
# InlineKeyboardButton.callback_data is 1-64 bytes.
_ALERT_TEXT_LIMIT = 200
_CALLBACK_DATA_LIMIT = 64

# ---------------------------------------------------------------------------
# MarkdownV2 escaping
# ---------------------------------------------------------------------------

# The backslash is included so a literal "\" in the input cannot swallow the
# escape that is added for the character following it.
_MDV2_SPECIAL = '\\_*[]()~`>#+-=|{}.!'
_MDV2_ESCAPE_RE = re.compile(r'([%s])' % re.escape(_MDV2_SPECIAL))
_MDV2_TRANSLATION = str.maketrans({ch: "\\" + ch for ch in _MDV2_SPECIAL})


def escape_mdv2(text: str) -> str:
    """Escape Telegram MarkdownV2 special characters.

    Every character listed in ``_MDV2_SPECIAL`` (including the backslash
    itself) is prefixed with a backslash; all other characters are unchanged.
    """
    return text.translate(_MDV2_TRANSLATION)


def _alert_text(text: str, limit: int = _ALERT_TEXT_LIMIT) -> str:
    """Clamp *text* to at most *limit* characters for a callback answer.

    Over-long text is cut and terminated with an ellipsis so the result never
    exceeds the limit.
    """
    text = text or ""
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


# ---------------------------------------------------------------------------
# Parse callback data
# ---------------------------------------------------------------------------

def parse_callback(callback_data: str) -> tuple[str, str, str] | None:
    """Parse calendar callback data into (action, doc_id, event_hash).

    Expected format: calendar_add|{doc_id}|{event_hash}

    Returns:
        The 3-tuple, or None when the input is not a string, does not have
        exactly three "|"-separated fields, or names an unknown action.
    """
    if not isinstance(callback_data, str):
        return None
    parts = callback_data.split("|")
    if len(parts) != 3 or parts[0] not in _CALLBACK_ACTIONS:
        return None
    action, doc_id, event_hash = parts
    return (action, doc_id, event_hash)


# ---------------------------------------------------------------------------
# Event creation
# ---------------------------------------------------------------------------

# Explicit formats tried before datetime.fromisoformat.
_DATETIME_FORMATS = (
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M",
    "%Y-%m-%d",
)


def _to_local(dt: datetime) -> datetime:
    """Return *dt* in CHICAGO_TZ, treating naive values as already local."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CHICAGO_TZ)
    return dt.astimezone(CHICAGO_TZ)


def _parse_datetime(dt_str: str) -> datetime | None:
    """Parse an ISO datetime string into a timezone-aware datetime.

    Naive inputs are interpreted as CHICAGO_TZ; aware inputs are converted to
    it. A date-only string ("YYYY-MM-DD") parses to midnight of that day.

    Returns:
        The localized datetime, or None when the input is empty, is not a
        string/datetime, or matches none of the supported formats.
    """
    if isinstance(dt_str, datetime):
        return _to_local(dt_str)
    if not dt_str or not isinstance(dt_str, str):
        # strptime raises TypeError (not ValueError) on non-strings, so
        # reject them up front instead of letting that escape the loop.
        return None

    candidate = dt_str.strip()
    for fmt in _DATETIME_FORMATS:
        try:
            return _to_local(datetime.strptime(candidate, fmt))
        except ValueError:
            continue

    # Last resort: fromisoformat accepts variants the explicit formats miss
    # (e.g. a space separator or a "+HH:MM" offset with fractional seconds).
    try:
        return _to_local(datetime.fromisoformat(candidate))
    except (ValueError, TypeError):
        pass

    logger.warning("Could not parse datetime: %s", dt_str)
    return None


def handle_calendar_add(callback_data: str, user_id: str) -> dict:
    """Handle calendar_add callback — create a Radicale calendar event.

    Args:
        callback_data: "calendar_add|{doc_id}|{event_hash}"
        user_id: Telegram user ID (for future user mapping; currently unused)

    Returns:
        dict with "status" ("created", "duplicate", "expired" or "error") and
        "message"; "created"/"duplicate" results also carry "summary" and
        "formatted_time", and "created" carries "event_id".
    """
    parsed = parse_callback(callback_data)
    if not parsed:
        return {"status": "error", "message": "Invalid callback data"}
    _, doc_id, event_hash = parsed

    # Retrieve stored event data
    event = get_event_by_hash(doc_id, event_hash)
    if not event:
        return {
            "status": "expired",
            "message": "This briefing has expired. Send the document again to get a fresh briefing.",
        }

    # `or` (rather than a .get default) also covers keys stored as None/"".
    summary = event.get("summary") or "Untitled Event"
    start_str = event.get("start_time") or ""
    end_str = event.get("end_time") or ""
    location = event.get("location") or ""
    description = event.get("description") or ""
    category = event.get("category") or "event"
    source_filename = event.get("source_filename") or ""

    # NOTE(review): a date-only start ("YYYY-MM-DD") parses successfully to
    # midnight and therefore becomes a one-hour timed event, not an all-day
    # one — confirm that this is the intended behavior.
    start_dt = _parse_datetime(start_str)
    if start_dt is None:
        # Fall back to an all-day event when the value at least starts with
        # a valid calendar date.
        if not (isinstance(start_str, str) and len(start_str) >= 10):
            return {
                "status": "error",
                "message": "No date/time found for this event. Cannot add to calendar.",
            }
        try:
            # Passed on as a string to mark the event as all-day.
            start_dt = date.fromisoformat(start_str[:10]).isoformat()
        except ValueError:
            return {"status": "error", "message": f"Could not parse date: {start_str}"}

    # Determine end time
    if isinstance(start_dt, datetime):
        end_dt = _parse_datetime(end_str) if end_str else None
        # Missing, unparseable or non-increasing end → default to one hour.
        if end_dt is None or end_dt <= start_dt:
            end_dt = start_dt + timedelta(hours=1)
    else:
        # All-day event — end = same day (will be incremented by create_event)
        end_dt = start_dt

    # Check for duplicates before creating
    if isinstance(start_dt, datetime):
        try:
            existing = event_exists(summary, start_dt)
        except Exception as exc:
            logger.exception("Failed to check calendar for duplicates: %s", exc)
            return {
                "status": "error",
                "message": f"Failed to check calendar: {str(exc)[:200]}",
            }
        if existing:
            existing_start = ""
            if isinstance(existing, dict):
                start_info = existing.get("start") or {}
                if isinstance(start_info, dict):
                    existing_start = start_info.get("dateTime", "") or ""
            # Prefer the stored event's time; fall back to the requested one
            # so the confirmation never shows an empty time.
            if existing_start:
                formatted_time = _format_time(existing_start)
            else:
                formatted_time = _format_event_time(start_dt, end_dt)
            return {
                "status": "duplicate",
                "summary": summary,
                "formatted_time": formatted_time,
                "message": f"Already on calendar: \"{summary}\"\n{formatted_time}",
            }

    # Append source info to description
    if source_filename:
        description = f"{description}\n\nSource: {source_filename}".strip()

    # Add category tag to description
    if category != "event":
        description = f"[{str(category).upper()}] {description}".strip()

    # Create the event
    try:
        result = create_event(
            summary=summary,
            start_dt=start_dt,
            end_dt=end_dt,
            description=description,
            location=location,
        )
    except Exception as e:
        logger.exception("Failed to create calendar event: %s", e)
        return {
            "status": "error",
            "message": f"Failed to create event: {str(e)[:200]}",
        }

    # Outside the try: once create_event has returned, the event exists, so
    # an odd return value must not be reported as a creation failure.
    event_id = result.get("id", "") if isinstance(result, dict) else ""
    formatted_time = _format_event_time(start_dt, end_dt)
    return {
        "status": "created",
        "summary": summary,
        "formatted_time": formatted_time,
        "event_id": event_id,
        "message": f"✅ Added to calendar:\n \"{summary}\"\n {formatted_time}",
    }


def _fmt_date(d) -> str:
    """Format a date/datetime as e.g. "March 5, 2025" (no zero padding).

    Uses the ``day`` attribute instead of the glibc-only "%-d" directive.
    """
    return f"{d:%B} {d.day}, {d:%Y}"


def _fmt_clock(dt: datetime) -> str:
    """Format the time of *dt* as e.g. "9:05 AM" (12-hour, no zero padding).

    Computes the hour by hand instead of the glibc-only "%-I" directive.
    """
    return f"{dt.hour % 12 or 12}:{dt:%M %p}"


def _format_time(iso_str: str) -> str:
    """Format an ISO datetime string for display.

    Returns the input unchanged when it cannot be parsed.
    """
    dt = _parse_datetime(iso_str)
    if dt:
        return f"{_fmt_date(dt)} at {_fmt_clock(dt)}"
    return iso_str


def _format_event_time(start_dt, end_dt) -> str:
    """Format start/end time for display.

    Handles both datetime objects and date strings (all-day events).
    """
    if isinstance(start_dt, str):
        # All-day event
        try:
            return f"{_fmt_date(date.fromisoformat(start_dt))} (all day)"
        except (ValueError, TypeError):
            return start_dt

    if not isinstance(start_dt, datetime):
        return str(start_dt)

    date_str = _fmt_date(start_dt)
    time_str = _fmt_clock(start_dt)
    if not isinstance(end_dt, datetime):
        return f"{date_str} at {time_str}"

    if start_dt.date() == end_dt.date():
        # Same day: show the date once.
        return f"{date_str}, {time_str} – {_fmt_clock(end_dt)}"
    return f"{date_str} at {time_str} – {_fmt_date(end_dt)} at {_fmt_clock(end_dt)}"


# ---------------------------------------------------------------------------
# Callback dispatchers (called from telegram handler)
# ---------------------------------------------------------------------------

async def handle_calendar_callback(callback_query: dict, bot) -> None:
    """Dispatch calendar-related callback queries.

    Routes calendar_add and calendar_done actions; anything else is answered
    with an "Invalid request" notification.
    """
    cb_data = callback_query.get("data", "")
    parsed = parse_callback(cb_data)
    if not parsed:
        await bot.answer_callback_query(callback_query["id"], text="Invalid request")
        return

    action, doc_id, event_hash = parsed
    if action == "calendar_add":
        await _handle_add(callback_query, bot, doc_id, event_hash)
    elif action == "calendar_done":
        await _handle_done(callback_query, bot, doc_id, event_hash)
    else:
        # Defensive only: parse_callback already rejects unknown actions.
        await bot.answer_callback_query(callback_query["id"], text="Unknown action")


async def _edit_with_done_button(
    bot, chat_id, message_id, text: str, doc_id: str, event_hash: str, failure_log: str
) -> bool:
    """Replace the message text and leave a single "Done" button.

    Returns True when the edit succeeded; on failure logs *failure_log* (a
    %-style format taking the exception) as a warning and returns False.
    """
    keyboard = {
        "inline_keyboard": [[
            {"text": "✅ Done", "callback_data": f"calendar_done|{doc_id}|{event_hash}"},
        ]]
    }
    try:
        # NOTE(review): *text* is MarkdownV2-escaped by the callers but no
        # parse_mode is passed here — presumably the bot wrapper defaults to
        # MarkdownV2; confirm, otherwise the backslashes are shown literally.
        await bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=text,
            reply_markup=keyboard,
        )
    except Exception as e:  # best-effort UI update; the caller still answers
        logger.warning(failure_log, e)
        return False
    return True


async def _handle_add(callback_query: dict, bot, doc_id: str, event_hash: str) -> None:
    """Handle calendar_add: create event and update the message."""
    # NOTE(review): handle_calendar_add is synchronous and is awaited-around
    # here, so any blocking work it does runs on the event loop.
    user_id = str((callback_query.get("from") or {}).get("id", ""))
    cb_data = f"calendar_add|{doc_id}|{event_hash}"
    result = handle_calendar_add(cb_data, user_id)

    query_id = callback_query["id"]
    message = callback_query.get("message") or {}
    chat_id = (message.get("chat") or {}).get("id")
    message_id = message.get("message_id")

    if not chat_id or not message_id:
        # No message to edit — the callback answer is the only feedback.
        await bot.answer_callback_query(
            query_id, text=_alert_text(result.get("message", "Done"))
        )
        return

    status = result.get("status")

    if status == "created":
        summary = result["summary"]
        formatted_time = result.get("formatted_time", "")

        # Build confirmation message (MarkdownV2 safe)
        msg = (
            f"✅ Added to calendar:\n"
            f" 📅 {escape_mdv2(summary)}\n"
            f" 🕐 {escape_mdv2(formatted_time)}"
        )
        edited = await _edit_with_done_button(
            bot, chat_id, message_id, msg, doc_id, event_hash,
            "Failed to edit message after calendar add: %s",
        )
        # Fallback when the edit failed: just answer the callback.
        answer = f"✅ Added: {summary}" if edited else f"Added: {summary}"
        await bot.answer_callback_query(query_id, text=_alert_text(answer))

    elif status == "duplicate":
        summary = result["summary"]
        formatted_time = result.get("formatted_time", "")

        msg = (
            f"📋 Already on calendar:\n"
            f" 📅 {escape_mdv2(summary)}\n"
            f" 🕐 {escape_mdv2(formatted_time)}"
        )
        await _edit_with_done_button(
            bot, chat_id, message_id, msg, doc_id, event_hash,
            "Failed to edit message for duplicate: %s",
        )
        await bot.answer_callback_query(
            query_id, text=_alert_text(f"Already exists: {summary}")
        )

    elif status == "expired":
        await bot.answer_callback_query(
            query_id, text="⚠️ Briefing expired. Send the document again."
        )

    else:
        # Error
        error_msg = result.get("message", "Unknown error")
        await bot.answer_callback_query(query_id, text=_alert_text(f"❌ {error_msg}"))


async def _handle_done(callback_query: dict, bot, doc_id: str, event_hash: str) -> None:
    """Handle calendar_done: dismiss the button UI.

    Re-sends the message's current text without a reply markup, then answers
    the callback. A failed edit is logged and otherwise ignored.
    """
    message = callback_query.get("message") or {}
    chat_id = (message.get("chat") or {}).get("id")
    message_id = message.get("message_id")

    if chat_id and message_id:
        # Remove inline keyboard (edit message, remove markup)
        try:
            # Get current text, just remove the keyboard
            current_text = message.get("text", "✅")
            await bot.edit_message_text(
                chat_id=chat_id,
                message_id=message_id,
                text=current_text,
                reply_markup=None,
            )
        except Exception as e:  # best-effort; still acknowledge the tap below
            logger.warning("Failed to remove keyboard: %s", e)

    await bot.answer_callback_query(callback_query["id"], text="👍")


# ---------------------------------------------------------------------------
# Action button builder (used by briefing formatter)
# ---------------------------------------------------------------------------

def build_action_buttons(doc_id: str, event_hash: str, has_datetime: bool = True) -> dict:
    """Build the inline keyboard for calendar action buttons.

    Args:
        doc_id: Document ID from store_briefing_event
        event_hash: Event hash from store_briefing_event
        has_datetime: Whether the event has parseable dates/times

    Returns:
        Telegram inline keyboard markup dict: one row holding an optional
        "Add to Calendar" button followed by a "Done" button.
    """
    buttons = []

    if has_datetime:
        buttons.append({
            "text": "🗓️ Add to Calendar",
            "callback_data": f"calendar_add|{doc_id}|{event_hash}",
        })

    # Always show Done button
    buttons.append({
        "text": "✅ Done",
        "callback_data": f"calendar_done|{doc_id}|{event_hash}",
    })

    # Surface over-long payloads early; the keyboard is returned unchanged.
    for button in buttons:
        size = len(button["callback_data"].encode("utf-8"))
        if size > _CALLBACK_DATA_LIMIT:
            logger.warning(
                "callback_data is %d bytes (limit %d): %s",
                size, _CALLBACK_DATA_LIMIT, button["callback_data"],
            )

    return {"inline_keyboard": [buttons]}