"""Vision document parser — PDF/image → structured text (hybrid approach). Strategy: 1. Text PDFs: pdfplumber (fast, ~0.1s) 2. Scanned PDFs: qwen3-vl:8b vision model (1-3s per page) 3. Images: qwen3-vl:8b vision model """ import base64 import io from pathlib import Path from typing import Optional import httpx from icarus.core.config.staging import OLLAMA_BASE_URL from icarus.core.utils.model_gate import validate_ollama_request # Vision model priority: qwen3-vl:8b (available), fallback to qwen2.5-vl, then llava VISION_MODELS = ["qwen3-vl:8b", "qwen2.5-vl", "llava"] PRIMARY_VISION_MODEL = "qwen3-vl:8b" # Optional pdf2image for PDF → image conversion try: from pdf2image import convert_from_path PDF2IMAGE_AVAILABLE = True except ImportError: PDF2IMAGE_AVAILABLE = False convert_from_path = None def _extract_text_with_pdfplumber(file_path: Path) -> Optional[str]: """Extract text from PDF using pdfplumber (fast text extraction).""" try: import pdfplumber text_parts = [] with pdfplumber.open(file_path) as pdf: for page in pdf.pages: page_text = page.extract_text() if page_text: text_parts.append(page_text) return "\n\n".join(text_parts) if text_parts else None except Exception as e: return None def _convert_pdf_page_to_image(file_path: Path, page_num: int = 0) -> bytes: """Convert PDF page to image bytes for vision model.""" if not PDF2IMAGE_AVAILABLE: raise ImportError("pdf2image not installed. Install with: pip install pdf2image") images = convert_from_path(file_path, first_page=page_num + 1, last_page=page_num + 1, dpi=150) if not images: raise ValueError(f"Could not convert page {page_num} to image") img = images[0] img_bytes = io.BytesIO() img.save(img_bytes, format='PNG') return img_bytes.getvalue() async def _parse_with_vision(file_path: Path, model: str = PRIMARY_VISION_MODEL) -> dict: """Parse document using vision model (qwen3-vl:8b or fallback).""" validate_ollama_request(model) # Convert file to base64 content = file_path.read_bytes() b64_content = base64.b64encode(content).decode() async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( f"{OLLAMA_BASE_URL}/api/chat", json={ "model": model, "messages": [{ "role": "user", "content": "Extract all text from this document. Preserve structure and layout. Return as plain text with clear section headers.", "images": [b64_content] }], "stream": False } ) response.raise_for_status() result = response.json() extracted_text = result.get("message", {}).get("content", "") return { "text": extracted_text, "method": f"vision-{model}", "confidence": 0.85, "pages": 1 } async def _parse_pdf_with_vision(file_path: Path, model: str = PRIMARY_VISION_MODEL) -> dict: """Parse PDF using vision model (convert pages to images).""" if not PDF2IMAGE_AVAILABLE: # Fallback: try to extract text with pdfplumber, or error text = _extract_text_with_pdfplumber(file_path) if text: return { "text": text, "method": "pdfplumber-fallback", "confidence": 0.70, "pages": 1 } raise ImportError("pdf2image required for scanned PDFs. Install with: pip install pdf2image") validate_ollama_request(model) try: import pdfplumber num_pages = len(pdfplumber.open(file_path).pages) except: num_pages = 1 # Process up to first 5 pages (limit for performance) pages_to_process = min(num_pages, 5) all_text = [] for page_num in range(pages_to_process): try: img_bytes = _convert_pdf_page_to_image(file_path, page_num) b64_content = base64.b64encode(img_bytes).decode() async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( f"{OLLAMA_BASE_URL}/api/chat", json={ "model": model, "messages": [{ "role": "user", "content": f"Extract all text from page {page_num + 1}. Preserve structure. Return as plain text.", "images": [b64_content] }], "stream": False } ) response.raise_for_status() result = response.json() page_text = result.get("message", {}).get("content", "") if page_text.strip(): all_text.append(f"--- Page {page_num + 1} ---\n{page_text}") except Exception as e: all_text.append(f"--- Page {page_num + 1} ---\n[Error processing page: {e}]") full_text = "\n\n".join(all_text) if num_pages > pages_to_process: full_text += f"\n\n[Note: {num_pages - pages_to_process} additional pages not processed]" return { "text": full_text, "method": f"vision-pdf-{model}", "confidence": 0.80, "pages": pages_to_process, "total_pages": num_pages } async def parse_document(file_path: Path) -> dict: """ Parse a PDF or image document via hybrid approach. Strategy: - Text PDFs: pdfplumber (fast) - Scanned PDFs: vision model (qwen3-vl:8b) - Images: vision model Returns: { "text": "extracted text content", "method": "pdfplumber|vision-qwen3-vl:8b|vision-pdf-...", "confidence": 0.85, "pages": 1, "total_pages": N # For PDFs } """ suffix = file_path.suffix.lower() if suffix == ".pdf": # Try text extraction first (faster) text = _extract_text_with_pdfplumber(file_path) if text and len(text.strip()) > 50: try: import pdfplumber num_pages = len(pdfplumber.open(file_path).pages) except: num_pages = 1 return { "text": text, "method": "pdfplumber", "confidence": 0.95, "pages": num_pages, "total_pages": num_pages } # Fall back to vision for scanned PDFs return await _parse_pdf_with_vision(file_path) elif suffix in [".png", ".jpg", ".jpeg", ".gif", ".webp"]: return await _parse_with_vision(file_path) else: raise ValueError(f"Unsupported file type: {suffix}") # Convenience function for testing async def parse_document_bytes(content: bytes, filename: str) -> dict: """Parse document from bytes (for API uploads).""" import tempfile suffix = Path(filename).suffix with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp: tmp.write(content) tmp_path = Path(tmp.name) try: return await parse_document(tmp_path) finally: tmp_path.unlink(missing_ok=True)