# shadow_polling.py (3,525 bytes, May 01, 2026)

"""Shadow Mode Polling — Long-polling for Telegram updates in shadow mode.

Runs the silent observer bot that captures messages without responding.
"""

import asyncio
import logging
from typing import Optional

import httpx

from .shadow_bot import ShadowBot

async def shadow_polling_loop(
    bot: ShadowBot,
    offset: int = 0,
    timeout: int = 60,
) -> None:
    """Long-polling loop for shadow mode.

    Makes a best-effort ``deleteWebhook`` call, then repeatedly calls
    ``getUpdates`` and hands every received update to
    ``bot.process_message``. This function itself never sends a reply.
    The loop only ends when it is cancelled.

    Args:
        bot: Configured ShadowBot instance; ``bot.base_url`` and
            ``bot.process_message`` are used.
        offset: Initial ``offset`` value sent to ``getUpdates``.
        timeout: Long-polling timeout in seconds sent to Telegram. The HTTP
            client timeout is set 5 seconds higher, presumably so the
            server-side wait expires before the client gives up.
    """
    base_url = bot.base_url
    next_offset = offset
    retry_delay = 5  # seconds to wait after a failed or rejected poll

    logging.info("[shadow_polling] Starting shadow mode polling loop")
    # NOTE(review): the label says SPEAK_ENABLED, but the value logged is the
    # module name of the bot's class -- confirm what was meant to be logged.
    logging.info("[shadow_polling] SPEAK_ENABLED = %s", bot.__class__.__module__)

    # Clear any existing webhook first. Best effort: a failure only warns.
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(f"{base_url}/deleteWebhook")
            result = resp.json()
            if result.get("ok"):
                logging.info("[shadow_polling] Webhook cleared")
            else:
                logging.warning("[shadow_polling] deleteWebhook: %s", result)
        except Exception as e:
            # NOTE(review): httpx error text can include the request URL; if
            # base_url embeds a secret it would end up in the logs -- confirm.
            logging.warning("[shadow_polling] Failed to clear webhook: %s", e)

    # One client for the whole loop so the connection pool is reused between
    # polls instead of being rebuilt on every iteration.
    async with httpx.AsyncClient(timeout=timeout + 5.0) as client:
        while True:
            try:
                # Long-polling request
                resp = await client.post(
                    f"{base_url}/getUpdates",
                    json={"offset": next_offset, "limit": 100, "timeout": timeout},
                )
                resp.raise_for_status()
                data = resp.json()

                if not data.get("ok"):
                    # A rejected request returns immediately; back off rather
                    # than silently re-polling in a tight loop.
                    logging.warning("[shadow_polling] getUpdates not ok: %s", data)
                    await asyncio.sleep(retry_delay)
                    continue

                for update in data.get("result") or []:
                    # Advance the offset before processing so an update whose
                    # handler fails is not fetched again on the next poll.
                    next_offset = update["update_id"] + 1
                    await _process_shadow_update(bot, update)

            except asyncio.CancelledError:
                # NOTE(review): cancellation is swallowed here, so the task
                # finishes normally instead of as cancelled; kept as-is
                # because callers may rely on it.
                logging.info("[shadow_polling] Loop cancelled, exiting")
                break
            except Exception as e:
                logging.error("[shadow_polling] Polling error: %s", e)
                await asyncio.sleep(retry_delay)  # Backoff on error


async def _process_shadow_update(bot: ShadowBot, update: dict) -> None:
    """Pass one update to ``bot.process_message`` and log the outcome.

    Any ``Exception`` raised while processing or while logging the result is
    logged with its traceback and suppressed, so one bad update does not stop
    the polling loop.
    """
    update_id = update["update_id"]
    logging.debug("[shadow_polling] Processing update %s", update_id)

    try:
        result = await bot.process_message(update)
        if result:
            tripwire = result.get("tripwire")
            logging.info(
                "[shadow_polling] Update %s: tripwire=%s extraction=%s",
                update_id,
                tripwire.fired if tripwire else False,
                result.get("extraction") is not None,
            )
    except Exception as e:
        logging.exception("[shadow_polling] Error processing update %s: %s", update_id, e)

async def start_shadow_polling(bot: ShadowBot) -> asyncio.Task:
    """Launch ``shadow_polling_loop`` for *bot* as a background asyncio task.

    The loop is scheduled with its default ``offset`` and ``timeout``.

    Returns:
        The created task, which the caller can keep and cancel later.
    """
    logging.info("[shadow_polling] Starting shadow mode background task")
    return asyncio.create_task(shadow_polling_loop(bot))