# parser.py

"""Vision document parser — PDF/image → structured text (hybrid approach).

Strategy:
1. Text PDFs: pdfplumber (fast, ~0.1s)
2. Scanned PDFs: qwen3-vl:8b vision model (1-3s per page)
3. Images: qwen3-vl:8b vision model
"""

import base64
import io
from pathlib import Path
from typing import Optional

import httpx

from icarus.core.config.staging import OLLAMA_BASE_URL
from icarus.core.utils.model_gate import validate_ollama_request

# Vision model priority: qwen3-vl:8b (available), fallback to qwen2.5-vl, then llava

# The default model is always the head of the priority list, so the two
# constants cannot drift apart.
VISION_MODELS = ["qwen3-vl:8b", "qwen2.5-vl", "llava"]
PRIMARY_VISION_MODEL = VISION_MODELS[0]

# Optional pdf2image for PDF → image conversion

# pdf2image is optional: record whether the import succeeded so the functions
# below can test the flag before calling convert_from_path.
try:
    from pdf2image import convert_from_path
except ImportError:
    convert_from_path = None
    PDF2IMAGE_AVAILABLE = False
else:
    PDF2IMAGE_AVAILABLE = True

def _extract_text_with_pdfplumber(file_path: Path) -> Optional[str]:
    """Extract the text layer of a PDF with pdfplumber.

    Extraction is best-effort: any failure (pdfplumber not installed, file
    missing or unreadable, ...) is logged and reported as ``None`` so the
    caller can fall back to another strategy instead of crashing.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        The text of every page that yielded text, joined by blank lines, or
        ``None`` when no page yielded text or extraction failed.
    """
    import logging

    try:
        # Imported lazily so a missing pdfplumber is handled like any other
        # extraction failure.
        import pdfplumber

        with pdfplumber.open(file_path) as pdf:
            page_texts = [page.extract_text() for page in pdf.pages]
    except Exception as exc:
        # Deliberately broad: this is a fast path with a fallback, but the
        # reason for the failure should not vanish silently.
        logging.getLogger(__name__).warning(
            "pdfplumber text extraction failed for %s: %s", file_path, exc
        )
        return None

    # Pages without a text layer return None / "" and are skipped.
    text_parts = [text for text in page_texts if text]
    return "\n\n".join(text_parts) if text_parts else None

def _convert_pdf_page_to_image(file_path: Path, page_num: int = 0) -> bytes:
    """Render a single PDF page as PNG bytes.

    Args:
        file_path: Path of the PDF to render.
        page_num: Zero-based page index; it is passed to pdf2image as the
            one-based ``page_num + 1``.

    Returns:
        The rendered page, encoded as PNG.

    Raises:
        ImportError: If pdf2image is not installed.
        ValueError: If the conversion produced no image.
    """
    if not PDF2IMAGE_AVAILABLE:
        raise ImportError("pdf2image not installed. Install with: pip install pdf2image")

    one_based = page_num + 1
    rendered = convert_from_path(file_path, first_page=one_based, last_page=one_based, dpi=150)
    if not rendered:
        raise ValueError(f"Could not convert page {page_num} to image")

    # Serialise the single rendered page into an in-memory PNG.
    buffer = io.BytesIO()
    rendered[0].save(buffer, format="PNG")
    return buffer.getvalue()

async def _parse_with_vision(file_path: Path, model: str = PRIMARY_VISION_MODEL) -> dict:
    """Send a file to an Ollama vision model and return the text it extracts.

    The file's raw bytes are base64-encoded and posted as a single image to
    ``{OLLAMA_BASE_URL}/api/chat`` in a non-streaming request.
    ``validate_ollama_request(model)`` is called before any work is done;
    what that check enforces is defined in ``icarus.core.utils.model_gate``.

    Args:
        file_path: Path of the file to send.
        model: Name of the vision model to query.

    Returns:
        A dict with ``text`` (the model's reply, ``""`` if the response has
        no message content), ``method`` (``"vision-<model>"``), a fixed
        ``confidence`` of 0.85 and ``pages`` set to 1.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (via ``raise_for_status``).
    """
    validate_ollama_request(model)

    encoded_image = base64.b64encode(file_path.read_bytes()).decode()
    prompt = (
        "Extract all text from this document. Preserve structure and layout. "
        "Return as plain text with clear section headers."
    )
    payload = {
        "model": model,
        "messages": [
            {"role": "user", "content": prompt, "images": [encoded_image]},
        ],
        "stream": False,
    }

    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(f"{OLLAMA_BASE_URL}/api/chat", json=payload)
        response.raise_for_status()
        body = response.json()

    message = body.get("message", {})
    return {
        "text": message.get("content", ""),
        "method": f"vision-{model}",
        "confidence": 0.85,
        "pages": 1,
    }

async def _parse_pdf_with_vision(file_path: Path, model: str = PRIMARY_VISION_MODEL) -> dict:
    """Parse a PDF by rendering its pages to images for a vision model.

    When pdf2image is unavailable the function falls back to pdfplumber text
    extraction and raises only if that yields nothing. Otherwise up to the
    first five pages are rendered one at a time and posted to
    ``{OLLAMA_BASE_URL}/api/chat``. A page that fails is not fatal: its
    error message is embedded in the output text in place of its content.

    Args:
        file_path: Path of the PDF to parse.
        model: Name of the vision model to query.

    Returns:
        A dict with ``text``, ``method``, ``confidence`` and ``pages``. On
        the vision path it also carries ``total_pages``; on the pdfplumber
        fallback path ``method`` is ``"pdfplumber-fallback"`` and ``pages``
        is reported as 1.

    Raises:
        ImportError: If pdf2image is missing and pdfplumber extracted no
            text.
    """
    if not PDF2IMAGE_AVAILABLE:
        # Fallback: try to extract text with pdfplumber, or error
        text = _extract_text_with_pdfplumber(file_path)
        if text:
            return {
                "text": text,
                "method": "pdfplumber-fallback",
                "confidence": 0.70,
                "pages": 1,
            }
        raise ImportError("pdf2image required for scanned PDFs. Install with: pip install pdf2image")

    validate_ollama_request(model)

    try:
        import pdfplumber

        # The context manager closes the PDF; a bare open() leaked the handle.
        with pdfplumber.open(file_path) as pdf:
            num_pages = len(pdf.pages)
    except Exception:
        # Page count is only advisory; assume a single page if it cannot be
        # determined (pdfplumber missing, unreadable file, ...).
        num_pages = 1

    # Process up to first 5 pages (limit for performance)
    pages_to_process = min(num_pages, 5)
    all_text = []

    # One client for all pages so connections are reused across requests.
    async with httpx.AsyncClient(timeout=60.0) as client:
        for page_num in range(pages_to_process):
            try:
                img_bytes = _convert_pdf_page_to_image(file_path, page_num)
                b64_content = base64.b64encode(img_bytes).decode()

                response = await client.post(
                    f"{OLLAMA_BASE_URL}/api/chat",
                    json={
                        "model": model,
                        "messages": [{
                            "role": "user",
                            "content": f"Extract all text from page {page_num + 1}. Preserve structure. Return as plain text.",
                            "images": [b64_content],
                        }],
                        "stream": False,
                    },
                )
                response.raise_for_status()
                result = response.json()
                page_text = result.get("message", {}).get("content", "")
                if page_text.strip():
                    all_text.append(f"--- Page {page_num + 1} ---\n{page_text}")

            except Exception as e:
                # Keep going: one bad page should not discard the others.
                all_text.append(f"--- Page {page_num + 1} ---\n[Error processing page: {e}]")

    full_text = "\n\n".join(all_text)
    if num_pages > pages_to_process:
        full_text += f"\n\n[Note: {num_pages - pages_to_process} additional pages not processed]"

    return {
        "text": full_text,
        "method": f"vision-pdf-{model}",
        "confidence": 0.80,
        "pages": pages_to_process,
        "total_pages": num_pages,
    }

async def parse_document(file_path: Path) -> dict:
    """
    Parse a PDF or image document via hybrid approach.

    Strategy:
    - Text PDFs: pdfplumber (fast)
    - Scanned PDFs: vision model (qwen3-vl:8b)
    - Images: vision model

    The file type is decided from the lower-cased suffix of ``file_path``.
    A PDF counts as a text PDF when pdfplumber extracts more than 50
    non-whitespace-trimmed characters; otherwise it is sent to the vision
    model.

    Returns:
        {
            "text": "extracted text content",
            "method": "pdfplumber|vision-qwen3-vl:8b|vision-pdf-...",
            "confidence": 0.85,
            "pages": 1,
            "total_pages": N  # For PDFs
        }

    Raises:
        ValueError: If the suffix is not .pdf, .png, .jpg, .jpeg, .gif
            or .webp.
    """
    suffix = file_path.suffix.lower()

    if suffix == ".pdf":
        # Try text extraction first (faster)
        text = _extract_text_with_pdfplumber(file_path)
        if text and len(text.strip()) > 50:
            try:
                import pdfplumber

                # The context manager closes the PDF; a bare open() leaked
                # the file handle.
                with pdfplumber.open(file_path) as pdf:
                    num_pages = len(pdf.pages)
            except Exception:
                # Page count is only metadata here; default to one page.
                num_pages = 1

            return {
                "text": text,
                "method": "pdfplumber",
                "confidence": 0.95,
                "pages": num_pages,
                "total_pages": num_pages,
            }

        # Fall back to vision for scanned PDFs
        return await _parse_pdf_with_vision(file_path)

    if suffix in (".png", ".jpg", ".jpeg", ".gif", ".webp"):
        return await _parse_with_vision(file_path)

    raise ValueError(f"Unsupported file type: {suffix}")

# Convenience function for testing

async def parse_document_bytes(content: bytes, filename: str) -> dict:
    """Parse document from bytes (for API uploads).

    The bytes are written to a temporary file that keeps the suffix of
    ``filename`` (the suffix is what ``parse_document`` dispatches on), the
    file is parsed, and it is removed afterwards whether or not parsing or
    writing succeeded.

    Args:
        content: Raw bytes of the document.
        filename: Original file name; only its suffix is used.

    Returns:
        Whatever ``parse_document`` returns for the temporary file.
    """
    import tempfile

    suffix = Path(filename).suffix
    # delete=False so the file can be reopened by path after it is closed.
    tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    tmp_path = Path(tmp.name)

    try:
        # Writing happens inside the try so a failed write (e.g. disk full)
        # does not leave the temporary file behind.
        with tmp:
            tmp.write(content)
        return await parse_document(tmp_path)
    finally:
        tmp_path.unlink(missing_ok=True)