"""Shadow Mode Polling — Long-polling for Telegram updates in shadow mode. Runs the silent observer bot that captures messages without responding. """ import asyncio import logging from typing import Optional import httpx from .shadow_bot import ShadowBot async def shadow_polling_loop( bot: ShadowBot, offset: int = 0, timeout: int = 60, ) -> None: """Long-polling loop for shadow mode. Continuously polls Telegram for updates and processes them silently. Never sends responses. Args: bot: Configured ShadowBot instance offset: Update offset for Telegram API timeout: Long-polling timeout in seconds """ base_url = bot.base_url current_offset = offset logging.info("[shadow_polling] Starting shadow mode polling loop") logging.info("[shadow_polling] SPEAK_ENABLED = %s", bot.__class__.__module__) # Clear any existing webhook first async with httpx.AsyncClient() as client: try: resp = await client.post(f"{base_url}/deleteWebhook") result = resp.json() if result.get("ok"): logging.info("[shadow_polling] Webhook cleared") else: logging.warning("[shadow_polling] deleteWebhook: %s", result) except Exception as e: logging.warning("[shadow_polling] Failed to clear webhook: %s", e) while True: try: async with httpx.AsyncClient(timeout=timeout + 5.0) as client: # Long-polling request resp = await client.post( f"{base_url}/getUpdates", json={"offset": current_offset, "limit": 100, "timeout": timeout} ) resp.raise_for_status() data = resp.json() if data.get("ok") and data.get("result"): for update in data["result"]: current_offset = update["update_id"] + 1 update_id = update["update_id"] logging.debug("[shadow_polling] Processing update %s", update_id) try: result = await bot.process_message(update) if result: logging.info( "[shadow_polling] Update %s: tripwire=%s extraction=%s", update_id, result.get("tripwire", {}).fired if result.get("tripwire") else False, result.get("extraction") is not None, ) except Exception as e: logging.exception("[shadow_polling] Error processing update %s: %s", update_id, e) except asyncio.CancelledError: logging.info("[shadow_polling] Loop cancelled, exiting") break except Exception as e: logging.error("[shadow_polling] Polling error: %s", e) await asyncio.sleep(5) # Backoff on error async def start_shadow_polling(bot: ShadowBot) -> asyncio.Task: """Start the shadow polling loop as a background task. Returns the task handle for cancellation. """ logging.info("[shadow_polling] Starting shadow mode background task") task = asyncio.create_task(shadow_polling_loop(bot)) return task