📄 icarus-phase-4-telegram-bot.md 12,746 bytes Apr 26, 2026 📋 Raw

🤖 Icarus Phase 4 — Telegram Bot Handler

To: Socrates 🧠
From: Matt (Director) via Wadsworth 📋
Date: 2026-04-26
Priority: P1 — Execute immediately
Status: Phase 3 complete ✅ → Phase 4 Bot Integration


MISSION

Enable Telegram document ingestion: User sends image/PDF to @IcarusTestBot → bot processes via vision pipeline → returns briefing card.

This is the primary user interface for Icarus.


ARCHITECTURE

User  Telegram  @IcarusTestBot
                        
                POST /telegram/webhook
                        
              Icarus API (icarus/core/telegram/)
                        
              Download file from Telegram
                        
              Vision pipeline (parse  briefing)
                        
              Telegram API (send briefing)
                        
              User receives card in Telegram

IMPLEMENTATION

Component 1: Telegram Config

File: icarus/core/config/staging.py (append)

# Telegram Bot Integration
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
TELEGRAM_WEBHOOK_SECRET = os.environ.get("TELEGRAM_WEBHOOK_SECRET", "icarus-webhook-2026")
TELEGRAM_ALLOWED_USERS = [
    "8386527252",  # Matt
]

Token already in: staging.env (created during Phase 2)


Component 2: Telegram Handler

Create: icarus/core/telegram/__init__.py

"""Telegram bot integration for Icarus."""

Create: icarus/core/telegram/handler.py

"""Telegram webhook handler and bot API client."""
import os
import io
from pathlib import Path
from fastapi import Request, HTTPException
import httpx
from icarus.core.config.staging import (
    TELEGRAM_BOT_TOKEN,
    TELEGRAM_ALLOWED_USERS,
    OLLAMA_BASE_URL
)
from icarus.core.vision.pipeline import process_attachment


async def handle_telegram_update(request: Request) -> dict:
    """Handle incoming Telegram webhook update.

    Returns: {"ok": True} to acknowledge receipt
    """
    data = await request.json()

    # Validate user is allowlisted
    user_id = str(data.get("message", {}).get("from", {}).get("id", ""))
    if user_id not in TELEGRAM_ALLOWED_USERS:
        return {"ok": True}  # Silent ignore

    message = data.get("message", {})
    chat_id = message.get("chat", {}).get("id")

    if not chat_id:
        return {"ok": True}

    # Handle photo (largest size)
    if "photo" in message:
        photo = message["photo"][-1]
        file_id = photo["file_id"]
        await _process_file(file_id, chat_id, "image.jpg", user_id)
        return {"ok": True}

    # Handle document (PDF, etc.)
    if "document" in message:
        doc = message["document"]
        file_id = doc["file_id"]
        file_name = doc.get("file_name", "document.pdf")
        mime_type = doc.get("mime_type", "")

        # Validate supported types
        if not _is_supported_file(file_name, mime_type):
            await _send_message(
                chat_id,
                "📄 Please send PDF or image files only (PNG, JPG)."
            )
            return {"ok": True}

        await _process_file(file_id, chat_id, file_name, user_id)
        return {"ok": True}

    # Handle text commands
    if "text" in message:
        text = message["text"]
        if text == "/start":
            await _send_message(
                chat_id,
                "👋 Send me a document or photo and I'll create a briefing card!"
            )
        elif text == "/status":
            status = await _get_status()
            await _send_message(chat_id, status)
        else:
            await _send_message(
                chat_id,
                "📎 Send a PDF or image to generate a briefing card."
            )
        return {"ok": True}

    return {"ok": True}


def _is_supported_file(file_name: str, mime_type: str) -> bool:
    """Check if file type is supported."""
    supported_ext = [".pdf", ".png", ".jpg", ".jpeg"]
    supported_mime = [
        "application/pdf",
        "image/png",
        "image/jpeg",
        "image/jpg"
    ]

    has_supported_ext = any(file_name.lower().endswith(ext) for ext in supported_ext)
    has_supported_mime = mime_type in supported_mime

    return has_supported_ext or has_supported_mime


async def _process_file(file_id: str, chat_id: int, file_name: str, user_id: str):
    """Download file from Telegram, process, send briefing."""
    # Send "processing" message
    processing_msg = await _send_message(chat_id, "⏳ Processing document...")

    try:
        # Download file from Telegram
        file_bytes = await _download_telegram_file(file_id)

        # Build email_meta for pipeline
        email_meta = {
            "from": f"telegram:{user_id}",
            "subject": file_name,
            "date": "now",
            "to": "family",
            "body": ""
        }

        # Process through vision pipeline
        briefing = await process_attachment(email_meta, file_bytes, file_name)

        # Format and send response
        text = _format_briefing(briefing)
        await _send_message(chat_id, text, markdown=True)

    except Exception as e:
        await _send_message(
            chat_id,
            f"❌ Processing failed: {str(e)[:200]}"
        )


async def _download_telegram_file(file_id: str) -> bytes:
    """Download file from Telegram servers."""
    async with httpx.AsyncClient() as client:
        # Get file path
        resp = await client.get(
            f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile",
            params={"file_id": file_id}
        )
        resp.raise_for_status()
        result = resp.json()

        if not result.get("ok"):
            raise ValueError(f"Failed to get file: {result}")

        file_path = result["result"]["file_path"]

        # Download actual file
        file_resp = await client.get(
            f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
        )
        file_resp.raise_for_status()

        return file_resp.content


async def _send_message(chat_id: int, text: str, markdown: bool = False) -> dict:
    """Send message via Telegram Bot API."""
    payload = {
        "chat_id": chat_id,
        "text": text[:4000]  # Telegram limit
    }

    if markdown:
        payload["parse_mode"] = "Markdown"

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
            json=payload
        )
        resp.raise_for_status()
        return resp.json()


def _format_briefing(briefing: dict) -> str:
    """Format briefing card for Telegram (Markdown)."""
    lines = [
        f"📋 *{briefing.get('title', 'Briefing')}*",
        "",
        briefing.get('summary', 'No summary available.'),
        ""
    ]

    # Key details
    key_details = briefing.get('key_details', {})
    if key_details:
        lines.append("*Key Details:*")
        for key, value in key_details.items():
            lines.append(f"• {key.replace('_', ' ').title()}: {value}")
        lines.append("")

    # Conflicts
    conflicts = briefing.get('conflicts', [])
    if conflicts:
        lines.append("⚠️ *Conflicts:*")
        for conflict in conflicts:
            lines.append(f"• {conflict}")
        lines.append("")

    # Suggested actions
    actions = briefing.get('suggested_actions', [])
    if actions:
        lines.append("✅ *Suggested Actions:*")
        for action in actions:
            lines.append(f"• {action}")

    # Confidence
    confidence = briefing.get('confidence')
    if confidence:
        lines.append(f"")
        lines.append(f"_Confidence: {int(confidence * 100)}%_")

    return "\n".join(lines)


async def _get_status() -> str:
    """Get system status for /status command."""
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=5.0)
            if resp.status_code == 200:
                return "✅ Icarus is online. Send a document to begin."
    except:
        pass
    return "⚠️ Vision service may be unavailable."

Component 3: API Endpoint

File: icarus/core/api.py (append)

from fastapi import Request
from icarus.core.telegram.handler import handle_telegram_update

@app.post("/telegram/webhook")
async def telegram_webhook(request: Request):
    """Receive updates from Telegram Bot.

    Set webhook via:
    https://api.telegram.org/bot<token>/setWebhook?url=https://icarus-test.hoffdesk.com/telegram/webhook
    """
    return await handle_telegram_update(request)


@app.get("/telegram/webhook")
async def telegram_webhook_info():
    """Info about webhook endpoint."""
    return {
        "status": "active",
        "endpoint": "/telegram/webhook",
        "method": "POST",
        "description": "Receive Telegram bot updates"
    }

DEPLOYMENT STEPS

1. Install httpx (if not present)

pip install --break-system-packages httpx
# Or: pip install httpx  # if in venv

2. Restart Service

sudo systemctl restart icarus-staging.service

3. Set Telegram Webhook

# Run once to configure webhook
BOT_TOKEN="8469114191:AAG6w4-uo4VZ8HV3d_wGUOf-KFuvJhSMCbw"
WEBHOOK_URL="https://icarus-test.hoffdesk.com/telegram/webhook"

curl -X POST "https://api.telegram.org/bot${BOT_TOKEN}/setWebhook" \
    -d "url=${WEBHOOK_URL}"

# Verify webhook is set
curl "https://api.telegram.org/bot${BOT_TOKEN}/getWebhookInfo"

Expected response:

{
  "ok": true,
  "result": {
    "url": "https://icarus-test.hoffdesk.com/telegram/webhook",
    "has_custom_certificate": false,
    "pending_update_count": 0
  }
}

4. Test

  1. Open Telegram → Find @IcarusTestBot
  2. Send /start
  3. Expected: "👋 Send me a document or photo..."
  4. Send a test image or PDF
  5. Expected: Briefing card in reply

COMMANDS

Command Description
/start Welcome message
/status Check system status
[any file] Process document through vision pipeline

📋 Dependency Management

Critical: Maintain shared/project-docs/icarus-requirements.txt

When Adding Dependencies:

  1. Update icarus-requirements.txt with package + version
  2. Test install: pip install --break-system-packages -r requirements.txt
  3. Document in commit message why it's needed
  4. Wadsworth validates during Phase reviews

Current Dependencies (as of Phase 3):

  • fastapi — API framework
  • uvicorn — ASGI server
  • httpx — HTTP client for Telegram API
  • pdfplumber — PDF text extraction
  • pdf2image — PDF to image conversion
  • Pillow — Image processing
  • python-multipart — Form data parsing (critical for file uploads)

System Dependencies:

  • poppler-utils — Required by pdf2image (install via apt)

VERIFICATION CHECKLIST

# 1. Webhook set
curl "https://api.telegram.org/bot8469114191:AAG6w4-uo4VZ8HV3d_wGUOf-KFuvJhSMCbw/getWebhookInfo"

# 2. Health endpoint still works
curl https://icarus-test.hoffdesk.com/health

# 3. Webhook info endpoint works
curl https://icarus-test.hoffdesk.com/telegram/webhook

# 4. Manual test via curl (simulate Telegram message)
curl -X POST https://icarus-test.hoffdesk.com/telegram/webhook \
  -H "Content-Type: application/json" \
  -d '{
    "message": {
      "from": {"id": 8386527252},
      "chat": {"id": 8386527252},
      "text": "/start"
    }
  }'

TROUBLESHOOTING

Issue Check
Bot not responding Webhook set? getWebhookInfo
"Processing failed" Vision model available? qwen3-vl:8b
File too large Telegram limit is 20MB
"Not supported" File extension in .pdf, .png, .jpg?
Icarus restart fails python-multipart installed?

SUCCESS CRITERIA

✅ Phase 4 Complete When:

  1. ✅ Telegram webhook configured
  2. /start command responds
  3. ✅ Image upload → briefing card
  4. ✅ PDF upload → briefing card
  5. ✅ Markdown formatting in responses
  6. ✅ Error handling (unsupported files, failures)

FUTURE ENHANCEMENTS (Phase 5+)

  • Inline buttons: "Add to calendar", "Mark done"
  • Multi-user support (Aundrea's ID, kids)
  • Conversation context (follow-up questions)
  • Voice message transcription

Execute. Build the bot handler. Deploy. Test with a real document.


Questions: @mention Wadsworth in The Hoffmann Board