"""Shadow Mode Polling — Long-polling for Telegram updates in shadow mode.
Runs the silent observer bot that captures messages without responding.
"""
import asyncio
import logging
from typing import Optional
import httpx
from .shadow_bot import ShadowBot
# Seconds to wait before polling again after a failed or rejected request.
_ERROR_BACKOFF_SECONDS = 5


async def _clear_webhook(base_url: str) -> None:
    """Best-effort call to ``deleteWebhook``; failures are logged, never raised."""
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(f"{base_url}/deleteWebhook")
            result = resp.json()
            if result.get("ok"):
                logging.info("[shadow_polling] Webhook cleared")
            else:
                logging.warning("[shadow_polling] deleteWebhook: %s", result)
        except Exception as e:
            # Deliberately broad: a failed cleanup must not stop polling.
            logging.warning("[shadow_polling] Failed to clear webhook: %s", e)


async def _process_update(bot: ShadowBot, update: dict) -> None:
    """Pass one update to ``bot.process_message`` and log a summary of the result."""
    update_id = update["update_id"]
    logging.debug("[shadow_polling] Processing update %s", update_id)
    try:
        result = await bot.process_message(update)
        if result:
            tripwire = result.get("tripwire")
            logging.info(
                "[shadow_polling] Update %s: tripwire=%s extraction=%s",
                update_id,
                tripwire.fired if tripwire else False,
                result.get("extraction") is not None,
            )
    except Exception as e:
        # One bad update must not abort the rest of the batch.
        logging.exception("[shadow_polling] Error processing update %s: %s", update_id, e)


async def shadow_polling_loop(
    bot: ShadowBot,
    offset: int = 0,
    timeout: int = 60,
) -> None:
    """Long-polling loop for shadow mode.

    Clears any existing webhook, then repeatedly calls ``getUpdates`` and
    passes each returned update to ``bot.process_message``. The loop itself
    only issues ``deleteWebhook`` and ``getUpdates`` requests.

    Request errors, non-2xx responses and ``ok: false`` payloads are logged
    and followed by a short backoff before the next poll. An exception
    raised while processing a single update is logged and the batch
    continues.

    Args:
        bot: Configured ShadowBot instance; ``bot.base_url`` is used as the
            API prefix and ``bot.process_message`` receives each update.
        offset: Initial ``offset`` sent to ``getUpdates``.
        timeout: Long-polling timeout in seconds, sent to ``getUpdates``.
            The HTTP client timeout is set 5 seconds higher so the server
            can answer first.
    """
    base_url = bot.base_url
    current_offset = offset
    logging.info("[shadow_polling] Starting shadow mode polling loop")
    # NOTE(review): this logs the bot class's module name under the label
    # "SPEAK_ENABLED" -- confirm whether an actual flag was meant here.
    logging.info("[shadow_polling] SPEAK_ENABLED = %s", bot.__class__.__module__)

    # Clear any existing webhook first
    await _clear_webhook(base_url)

    # One client for the whole loop so connections are pooled and reused
    # across polls instead of being re-established on every iteration.
    async with httpx.AsyncClient(timeout=timeout + 5.0) as client:
        while True:
            try:
                # Long-polling request
                resp = await client.post(
                    f"{base_url}/getUpdates",
                    json={"offset": current_offset, "limit": 100, "timeout": timeout},
                )
                resp.raise_for_status()
                data = resp.json()

                if not data.get("ok"):
                    # Back off rather than re-polling immediately on a
                    # rejected request.
                    logging.warning(
                        "[shadow_polling] getUpdates returned ok=false: %s", data
                    )
                    await asyncio.sleep(_ERROR_BACKOFF_SECONDS)
                    continue

                for update in data.get("result") or []:
                    # Advance the offset before processing so an update
                    # whose processing fails is not fetched again.
                    current_offset = update["update_id"] + 1
                    await _process_update(bot, update)
            except asyncio.CancelledError:
                # NOTE(review): cancellation is swallowed here, so the task
                # finishes normally instead of as cancelled. Kept for
                # backward compatibility -- confirm callers before re-raising.
                logging.info("[shadow_polling] Loop cancelled, exiting")
                break
            except Exception as e:
                logging.error("[shadow_polling] Polling error: %s", e)
                await asyncio.sleep(_ERROR_BACKOFF_SECONDS)  # Backoff on error
async def start_shadow_polling(bot: ShadowBot) -> asyncio.Task:
    """Schedule ``shadow_polling_loop(bot)`` on the running event loop.

    The loop is started with its default ``offset`` and ``timeout``.

    Args:
        bot: ShadowBot instance handed to the polling loop.

    Returns:
        The ``asyncio.Task`` wrapping the polling loop, which the caller
        can keep to cancel it later.
    """
    logging.info("[shadow_polling] Starting shadow mode background task")
    polling_coro = shadow_polling_loop(bot)
    return asyncio.create_task(polling_coro)