"""Vision document parser — PDF/image → structured text (hybrid approach).
Strategy:
1. Text PDFs: pdfplumber (fast, ~0.1s)
2. Scanned PDFs: qwen3-vl:8b vision model (1-3s per page)
3. Images: qwen3-vl:8b vision model
"""
import base64
import io
from pathlib import Path
from typing import Optional
import httpx
from icarus.core.config.staging import OLLAMA_BASE_URL
from icarus.core.utils.model_gate import validate_ollama_request
# Vision model priority: qwen3-vl:8b (available), fallback to qwen2.5-vl, then llava
VISION_MODELS = ["qwen3-vl:8b", "qwen2.5-vl", "llava"]
PRIMARY_VISION_MODEL = "qwen3-vl:8b"

# Optional pdf2image for PDF -> image conversion; degrade gracefully when it is
# not installed (scanned-PDF parsing will then fall back or raise ImportError).
try:
    from pdf2image import convert_from_path
    PDF2IMAGE_AVAILABLE = True
except ImportError:
    PDF2IMAGE_AVAILABLE = False
    convert_from_path = None
def _extract_text_with_pdfplumber(file_path: Path) -> Optional[str]:
"""Extract text from PDF using pdfplumber (fast text extraction)."""
try:
import pdfplumber
text_parts = []
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text_parts.append(page_text)
return "\n\n".join(text_parts) if text_parts else None
except Exception as e:
return None
def _convert_pdf_page_to_image(file_path: Path, page_num: int = 0) -> bytes:
    """Render one PDF page to PNG bytes for the vision model.

    Args:
        file_path: Path to the PDF file.
        page_num: Zero-based index of the page to render.

    Returns:
        PNG-encoded image bytes rendered at 150 DPI.

    Raises:
        ImportError: If pdf2image is not installed.
        ValueError: If the requested page could not be rendered.
    """
    if not PDF2IMAGE_AVAILABLE:
        raise ImportError("pdf2image not installed. Install with: pip install pdf2image")
    # pdf2image pages are 1-based; render exactly the one requested page.
    target = page_num + 1
    rendered = convert_from_path(file_path, first_page=target, last_page=target, dpi=150)
    if not rendered:
        raise ValueError(f"Could not convert page {page_num} to image")
    buffer = io.BytesIO()
    rendered[0].save(buffer, format='PNG')
    return buffer.getvalue()
async def _parse_with_vision(file_path: Path, model: str = PRIMARY_VISION_MODEL) -> dict:
    """Parse an image document with a vision model via the Ollama chat API.

    Args:
        file_path: Path to the image file (sent whole, base64-encoded).
        model: Ollama vision model name (default PRIMARY_VISION_MODEL).

    Returns:
        dict with keys "text", "method", "confidence", and "pages".
    """
    validate_ollama_request(model)
    # Ollama accepts images as base64 strings attached to the chat message.
    b64_content = base64.b64encode(file_path.read_bytes()).decode()
    payload = {
        "model": model,
        "messages": [{
            "role": "user",
            "content": "Extract all text from this document. Preserve structure and layout. Return as plain text with clear section headers.",
            "images": [b64_content],
        }],
        "stream": False,
    }
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(f"{OLLAMA_BASE_URL}/api/chat", json=payload)
        response.raise_for_status()
        data = response.json()
    return {
        "text": data.get("message", {}).get("content", ""),
        "method": f"vision-{model}",
        "confidence": 0.85,
        "pages": 1,
    }
async def _parse_pdf_with_vision(file_path: Path, model: str = PRIMARY_VISION_MODEL) -> dict:
    """Parse a (likely scanned) PDF by rendering pages and sending each to a vision model.

    Processes at most the first 5 pages; a per-page failure is recorded inline
    in the output text rather than aborting the whole document.

    Args:
        file_path: Path to the PDF file.
        model: Ollama vision model name (default PRIMARY_VISION_MODEL).

    Returns:
        dict with keys "text", "method", "confidence", "pages" (processed
        count), and "total_pages".

    Raises:
        ImportError: If pdf2image is unavailable and pdfplumber finds no text.
    """
    if not PDF2IMAGE_AVAILABLE:
        # Fallback: try to extract text with pdfplumber, or error
        text = _extract_text_with_pdfplumber(file_path)
        if text:
            return {
                "text": text,
                "method": "pdfplumber-fallback",
                "confidence": 0.70,
                "pages": 1,
            }
        raise ImportError("pdf2image required for scanned PDFs. Install with: pip install pdf2image")
    validate_ollama_request(model)
    try:
        import pdfplumber
        # Context manager closes the PDF handle (original leaked it open).
        with pdfplumber.open(file_path) as pdf:
            num_pages = len(pdf.pages)
    except Exception:
        # Can't count pages — assume one; rendering will surface real errors.
        num_pages = 1
    # Process up to first 5 pages (limit for performance)
    pages_to_process = min(num_pages, 5)
    all_text = []
    # Reuse a single HTTP client for all pages instead of reconnecting per page.
    async with httpx.AsyncClient(timeout=60.0) as client:
        for page_num in range(pages_to_process):
            try:
                img_bytes = _convert_pdf_page_to_image(file_path, page_num)
                b64_content = base64.b64encode(img_bytes).decode()
                response = await client.post(
                    f"{OLLAMA_BASE_URL}/api/chat",
                    json={
                        "model": model,
                        "messages": [{
                            "role": "user",
                            "content": f"Extract all text from page {page_num + 1}. Preserve structure. Return as plain text.",
                            "images": [b64_content],
                        }],
                        "stream": False,
                    },
                )
                response.raise_for_status()
                result = response.json()
                page_text = result.get("message", {}).get("content", "")
                if page_text.strip():
                    all_text.append(f"--- Page {page_num + 1} ---\n{page_text}")
            except Exception as e:
                # Best-effort: note the failure inline and continue with the rest.
                all_text.append(f"--- Page {page_num + 1} ---\n[Error processing page: {e}]")
    full_text = "\n\n".join(all_text)
    if num_pages > pages_to_process:
        full_text += f"\n\n[Note: {num_pages - pages_to_process} additional pages not processed]"
    return {
        "text": full_text,
        "method": f"vision-pdf-{model}",
        "confidence": 0.80,
        "pages": pages_to_process,
        "total_pages": num_pages,
    }
async def parse_document(file_path: Path) -> dict:
    """
    Parse a PDF or image document via hybrid approach.

    Strategy:
    - Text PDFs: pdfplumber (fast)
    - Scanned PDFs: vision model (qwen3-vl:8b)
    - Images: vision model

    Args:
        file_path: Path to a .pdf or image (.png/.jpg/.jpeg/.gif/.webp) file.

    Returns:
        {
            "text": "extracted text content",
            "method": "pdfplumber|vision-qwen3-vl:8b|vision-pdf-...",
            "confidence": 0.85,
            "pages": 1,
            "total_pages": N  # For PDFs
        }

    Raises:
        ValueError: If the file extension is not a supported type.
    """
    suffix = file_path.suffix.lower()
    if suffix == ".pdf":
        # Try text extraction first (faster); >50 chars means a real text layer,
        # not just stray OCR artifacts.
        text = _extract_text_with_pdfplumber(file_path)
        if text and len(text.strip()) > 50:
            try:
                import pdfplumber
                # Context manager closes the PDF handle (original leaked it open).
                with pdfplumber.open(file_path) as pdf:
                    num_pages = len(pdf.pages)
            except Exception:
                num_pages = 1
            return {
                "text": text,
                "method": "pdfplumber",
                "confidence": 0.95,
                "pages": num_pages,
                "total_pages": num_pages,
            }
        # Fall back to vision for scanned PDFs
        return await _parse_pdf_with_vision(file_path)
    elif suffix in [".png", ".jpg", ".jpeg", ".gif", ".webp"]:
        return await _parse_with_vision(file_path)
    else:
        raise ValueError(f"Unsupported file type: {suffix}")
# Convenience function for testing
async def parse_document_bytes(content: bytes, filename: str) -> dict:
    """Parse document from bytes (for API uploads).

    Spills the payload to a temporary file so the path-based parser can be
    reused; the temp file is removed once parsing finishes.

    Args:
        content: Raw file bytes as uploaded.
        filename: Original filename; only its suffix is used for type routing.

    Returns:
        The parse_document() result dict.
    """
    import tempfile
    suffix = Path(filename).suffix
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as handle:
        handle.write(content)
    temp_path = Path(handle.name)
    try:
        return await parse_document(temp_path)
    finally:
        temp_path.unlink(missing_ok=True)