"""Drop-Box Document Sorter — OCR + LLM + Drive filing.

Turns photos of physical documents (bills, invoices, receipts) into properly
named files on Google Drive. Zero cloud vision — all OCR runs locally on the
Gaming PC's 3080 Ti via qwen3-vl:8b.

Flow:
    1. Image arrives in Telegram DM
    2. Save to /tmp/dropbox/ (ephemeral, deleted after upload)
    3. Base64 encode → send to Gaming PC Ollama qwen3-vl:8b
    4. LLM extracts {vendor, date, category, amount}
    5. Build filename: YYYY-MM-DD_Vendor_Category_$Amount.ext
    6. Upload to Google Drive: Family Documents/YYYY/
    7. Delete local file (finally block, always runs)
    8. React 👍 on Telegram message

Security:
    - No PII leaves the network except the Drive upload (family-owned)
    - /tmp files get unique, owner-only (0600) names and are deleted in a
      finally block regardless of outcome
    - Vision model unloads immediately (keep_alive: 0) to free VRAM
"""

import base64
import json
import math
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

import requests

from family_assistant.config import (
    CHICAGO_TZ,
    LLM_TIMEOUT,
    DRIVE_FOLDER_ID,
    DRIVE_SA_KEY_PATH,
    VISION_LLM_URL,
    VISION_MODEL,
)

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

# Imported from family_assistant.config:
#   DRIVE_FOLDER_ID   — root Drive folder ID (set in .env after sharing the
#                       folder with the service account)
#   DRIVE_SA_KEY_PATH — service account key path
#   VISION_LLM_URL, VISION_MODEL — vision LLM endpoint and model name

# Ephemeral temp directory — files here are deleted after upload.
TEMP_DIR = Path(os.environ.get("DROPBOX_TEMP_DIR", "/tmp/dropbox"))

# Seconds to wait for the vision model. Generous because keep_alive=0 means
# the model has to be loaded again for every request.
VISION_TIMEOUT_S = 120

# Extraction prompt — strict JSON output
EXTRACTION_PROMPT = """You are a document OCR assistant. Extract structured data from this document photo.

Return ONLY a JSON object with these fields:
- vendor: The business/organization name (SHORTENED to 2-3 words max, no LLC/Inc suffixes, hyphens instead of spaces, title case)
- date: The document date in YYYY-MM-DD format (use service date, not due date)
- category: One word category from: Medical, Vet, Lawn, Home, Auto, Insurance, Tax, School, Other
- amount: The total amount as a number (no $ symbol, no commas). If no amount, use 0.

Examples of good vendor names:
- Aurora Health Care → "Aurora-Health"
- Lizer Lawn Service → "Lizer-Lawn"
- Golrusk Pet Care Center → "Golrusk-Pet"
- Pick N Save → "Pick-N-Save"

If the document has no clear vendor, date, or amount, use empty string or 0.
Return ONLY the JSON object, no other text."""


# ---------------------------------------------------------------------------
# Vision LLM Call
# ---------------------------------------------------------------------------

def _call_vision_llm(image_bytes: bytes, prompt: str) -> dict | None:
    """Send an image to the local Ollama vision model and parse its JSON reply.

    Uses keep_alive: 0 to immediately unload the vision model after
    inference, freeing VRAM for the primary text model on the 3080 Ti.

    Args:
        image_bytes: Raw image bytes (sent base64-encoded).
        prompt: Instruction text for the model.

    Returns:
        The parsed extraction dict, or None on timeout, connection failure,
        HTTP/JSON error, or an unparseable reply (the reason goes to stderr).
    """
    img_b64 = base64.b64encode(image_bytes).decode("utf-8")
    payload = {
        "model": VISION_MODEL,
        "messages": [
            {
                "role": "user",
                "content": prompt,
                "images": [img_b64],
            }
        ],
        "stream": False,
        "keep_alive": 0,  # Unload immediately after inference
        "options": {
            "temperature": 0,
        },
    }

    try:
        resp = requests.post(VISION_LLM_URL, json=payload, timeout=VISION_TIMEOUT_S)
        resp.raise_for_status()
        data = resp.json()
    except requests.exceptions.Timeout:
        print(f"  [DropBox] Vision LLM timeout ({VISION_TIMEOUT_S}s)", file=sys.stderr)
        return None
    except requests.exceptions.ConnectionError:
        print(f"  [DropBox] Vision LLM connection failed: {VISION_LLM_URL}", file=sys.stderr)
        return None
    except Exception as e:  # HTTP status errors, non-JSON bodies, etc.
        print(f"  [DropBox] Vision LLM error: {e}", file=sys.stderr)
        return None

    # Expected reply shape: {"message": {"content": "..."}}. Tolerate missing
    # or null fields instead of raising.
    message = data.get("message") if isinstance(data, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    return _parse_extraction_json(str(content or "").strip())


def _parse_extraction_json(text: str) -> dict | None:
    """Parse a JSON object from an LLM reply, tolerating code fences and noise.

    Tries the (fence-stripped) text as-is first, then the outermost
    ``{...}`` span inside it. Only dict results are accepted.

    Returns:
        The parsed dict, or None if no candidate parses to a dict.
    """
    if not text:
        return None

    # Strip markdown code fences
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = re.sub(r'^```(?:json)?\s*\n?', '', cleaned)
        cleaned = re.sub(r'\n?```\s*$', '', cleaned)
        cleaned = cleaned.strip()

    candidates = [cleaned]
    # Fallback: the object embedded in surrounding prose (or inside a list).
    match = re.search(r'\{.*\}', cleaned, re.DOTALL)
    if match and match.group(0) != cleaned:
        candidates.append(match.group(0))

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    print(f"  [DropBox] Could not parse extraction JSON: {text[:200]}", file=sys.stderr)
    return None


# ---------------------------------------------------------------------------
# Filename Builder
# ---------------------------------------------------------------------------

# Characters not allowed in Drive filenames
_UNSAFE_CHARS = re.compile(r'[/:*?"<>|\\]')

# Corporate suffix at the END of a vendor name ("Acme, Inc." → "Acme").
# Anchored to the end so names such as "Co-op Grocery" are left intact.
_VENDOR_SUFFIX = re.compile(r'(?:^|[\s,\-]+)(?:LLC|Inc|Corp|Co|Ltd)\.?$', re.IGNORECASE)

# Maximum number of vendor words kept (the prompt asks for 2-3 words).
_MAX_VENDOR_WORDS = 3


def _normalize_date(raw) -> str:
    """Return *raw* as a zero-padded YYYY-MM-DD string, or today (Chicago) if invalid."""
    text = "" if raw is None else str(raw).strip()
    if text:
        try:
            # Re-emit so loose input like "2026-4-7" becomes "2026-04-07".
            return datetime.strptime(text, "%Y-%m-%d").date().isoformat()
        except ValueError:
            pass
    return datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")


def _clean_vendor(raw) -> str:
    """Shorten a vendor name to at most 3 hyphen-joined words; "Unknown" if empty."""
    vendor = "" if raw is None else str(raw).strip()
    # Strip trailing suffixes repeatedly ("Acme Co., Ltd." → "Acme").
    previous = None
    while vendor != previous:
        previous = vendor
        vendor = _VENDOR_SUFFIX.sub('', vendor).strip()
    # Split on spaces AND hyphens so already-hyphenated names are capped too.
    words = [w.strip(",") for w in re.split(r'[\s\-]+', vendor)]
    words = [w for w in words if w]
    vendor = '-'.join(words[:_MAX_VENDOR_WORDS])
    vendor = _UNSAFE_CHARS.sub('', vendor).strip("-")
    return vendor or "Unknown"


def _clean_category(raw) -> str:
    """Title-case a category and make it filename-safe; "Other" if empty."""
    category = "" if raw is None else str(raw).strip().title()
    category = re.sub(r'\s+', '-', category)  # never leave spaces in the name
    category = _UNSAFE_CHARS.sub('', category)
    return category or "Other"


def _format_amount(raw) -> str:
    """Format an amount as "$150" / "$149.50", or "NoAmount" if missing/invalid/<= 0."""
    if isinstance(raw, str):
        # Tolerate "$1,234.56" even though the prompt asks for a bare number.
        raw = raw.replace("$", "").replace(",", "").strip()
    try:
        amount = float(raw)
    except (ValueError, TypeError):
        return "NoAmount"
    # Reject inf/nan (int(inf) would raise) and round to cents before
    # deciding whether the value is a whole-dollar amount.
    if not math.isfinite(amount):
        return "NoAmount"
    amount = round(amount, 2)
    if amount <= 0:
        return "NoAmount"
    if amount == int(amount):
        return f"${amount:.0f}"
    return f"${amount:.2f}"


def build_filename(extraction: dict, original_ext: str = ".jpg") -> str:
    """Build a strict taxonomy filename from extraction data.

    Format: YYYY-MM-DD_Vendor_Category_$Amount.ext
    Example: 2026-04-17_Lizer-Lawn_Lawn_$150.jpg

    Missing or invalid fields fall back to today's date (America/Chicago via
    CHICAGO_TZ), "Unknown", "Other" and "NoAmount" respectively.

    Args:
        extraction: Dict with optional "vendor", "date", "category", "amount".
        original_ext: File extension, with or without the leading dot.
            An empty string produces a name with no extension.
    """
    date_str = _normalize_date(extraction.get("date"))
    vendor = _clean_vendor(extraction.get("vendor"))
    category = _clean_category(extraction.get("category"))
    amount_str = _format_amount(extraction.get("amount"))

    # Ensure a non-empty extension starts with a dot
    ext = original_ext or ""
    if ext and not ext.startswith("."):
        ext = f".{ext}"

    return f"{date_str}_{vendor}_{category}_{amount_str}{ext}"


# ---------------------------------------------------------------------------
# Google Drive Upload
# ---------------------------------------------------------------------------

def _get_drive_service():
    """Return an authenticated Drive v3 service built from the service-account key.

    Raises:
        FileNotFoundError: If DRIVE_SA_KEY_PATH does not point to a file.
    """
    from google.oauth2 import service_account
    from googleapiclient.discovery import build

    key_path = Path(DRIVE_SA_KEY_PATH)
    if not key_path.is_file():
        raise FileNotFoundError(f"Service account key not found: {key_path}")

    credentials = service_account.Credentials.from_service_account_file(
        str(key_path),
        scopes=["https://www.googleapis.com/auth/drive.file"],
    )
    return build("drive", "v3", credentials=credentials, cache_discovery=False)


def _get_or_create_year_folder(service, year: int) -> str:
    """Get or create the ``<year>`` subfolder under DRIVE_FOLDER_ID.

    Returns:
        The folder ID.

    Raises:
        ValueError: If DRIVE_FOLDER_ID is not configured.
    """
    if not DRIVE_FOLDER_ID:
        raise ValueError("DRIVE_FOLDER_ID not set in environment")

    year_name = str(year)

    # Check if year folder already exists
    query = (
        f"'{DRIVE_FOLDER_ID}' in parents and "
        f"name='{year_name}' and "
        f"mimeType='application/vnd.google-apps.folder' and "
        f"trashed=false"
    )
    results = service.files().list(q=query, spaces="drive", fields="files(id, name)").execute()
    folders = results.get("files", [])
    if folders:
        return folders[0]["id"]

    # Create year folder
    folder_metadata = {
        "name": year_name,
        "mimeType": "application/vnd.google-apps.folder",
        "parents": [DRIVE_FOLDER_ID],
    }
    folder = service.files().create(body=folder_metadata, fields="id").execute()
    print(f"  [DropBox] Created Drive folder: {year_name} (id={folder['id']})", file=sys.stderr)
    return folder["id"]


def upload_to_drive(file_path: Path, filename: str) -> dict | None:
    """Upload a file to Google Drive under the appropriate year folder.

    The year comes from the YYYY prefix of *filename*; it falls back to the
    current Chicago year.

    NOTE(review): Drive may refuse files owned by a service account inside a
    regular My Drive folder (storage-quota error); if so, a Shared Drive may
    be required — confirm.

    Returns:
        The Drive file metadata dict (id, name, webViewLink), or None on failure.
    """
    from googleapiclient.http import MediaFileUpload

    try:
        service = _get_drive_service()

        # Determine year from filename prefix (YYYY-MM-DD)
        prefix = filename[:4]
        year = int(prefix) if len(prefix) == 4 and prefix.isdigit() else datetime.now(CHICAGO_TZ).year
        folder_id = _get_or_create_year_folder(service, year)

        # Upload
        file_metadata = {
            "name": filename,
            "parents": [folder_id],
        }
        media = MediaFileUpload(str(file_path), resumable=True)
        drive_file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields="id, name, webViewLink",
        ).execute()

        print(f"  [DropBox] Uploaded: {filename} → Drive (id={drive_file['id']})", file=sys.stderr)
        return drive_file

    except Exception as e:
        print(f"  [DropBox] Drive upload failed: {e}", file=sys.stderr)
        return None


# ---------------------------------------------------------------------------
# Telegram React
# ---------------------------------------------------------------------------

def react_thumbs_up(message_id: str | int, target: str = "") -> bool:
    """React with 👍 on the source Telegram message via the ``openclaw`` CLI.

    Args:
        message_id: Telegram message to react to.
        target: Chat ID; defaults to the TELEGRAM_CHAT_ID environment variable.

    Returns:
        True if the CLI exited 0, False otherwise (the reason is logged).
    """
    target = target or os.environ.get("TELEGRAM_CHAT_ID", "")
    if not target:
        print("  [DropBox] No Telegram target for reaction", file=sys.stderr)
        return False

    cmd = [
        "openclaw", "message", "react",
        "--channel", "telegram",
        "--target", target,
        "--message-id", str(message_id),
        "--emoji", "👍",
    ]
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except Exception as e:
        print(f"  [DropBox] React failed: {e}", file=sys.stderr)
        return False

    if completed.returncode != 0:
        detail = (completed.stderr or "").strip()[:200]
        print(f"  [DropBox] React failed (exit {completed.returncode}): {detail}", file=sys.stderr)
        return False
    return True


# ---------------------------------------------------------------------------
# Main Pipeline
# ---------------------------------------------------------------------------

def process_document(
    image_bytes: bytes,
    original_filename: str = "document.jpg",
    message_id: str | int = "",
    telegram_target: str = "",
) -> dict:
    """Full pipeline: extract → rename → upload → cleanup.

    Args:
        image_bytes: Raw image bytes from Telegram
        original_filename: Original filename for extension detection
        message_id: Telegram message ID for 👍 reaction
        telegram_target: Telegram chat ID for reaction

    Returns:
        Dict with keys "status" ("filed", "extraction_failed",
        "no_drive_config", "upload_failed" or "error"), "filename",
        "drive_link", "extraction" and "error".
    """
    result = {
        "status": "pending",
        "filename": "",
        "drive_link": "",
        "extraction": {},
        "error": "",
    }
    temp_file: Path | None = None

    try:
        # 1. Save to ephemeral temp. mkstemp yields a unique name (two photos
        #    in the same second cannot overwrite each other) with owner-only
        #    permissions, so other local users cannot read the document.
        TEMP_DIR.mkdir(mode=0o700, parents=True, exist_ok=True)
        ext = Path(original_filename or "").suffix or ".jpg"
        stamp = datetime.now(CHICAGO_TZ).strftime("%Y%m%d_%H%M%S")
        fd, tmp_name = tempfile.mkstemp(prefix=f"doc_{stamp}_", suffix=ext, dir=str(TEMP_DIR))
        temp_file = Path(tmp_name)  # assigned before writing so `finally` cleans up
        with os.fdopen(fd, "wb") as fh:
            fh.write(image_bytes)
        print(f"  [DropBox] Saved temp: {temp_file} ({len(image_bytes)} bytes)", file=sys.stderr)

        # 2. OCR extraction via local vision model
        print(f"  [DropBox] Sending to {VISION_MODEL} for OCR...", file=sys.stderr)
        extraction = _call_vision_llm(image_bytes, EXTRACTION_PROMPT)
        if not extraction:
            result["status"] = "extraction_failed"
            result["error"] = "Vision LLM returned no usable data"
            return result

        result["extraction"] = extraction
        print(f"  [DropBox] Extracted: {extraction}", file=sys.stderr)

        # 3. Build filename
        filename = build_filename(extraction, ext)
        result["filename"] = filename
        print(f"  [DropBox] Filename: {filename}", file=sys.stderr)

        # 4. Upload to Drive
        if not DRIVE_FOLDER_ID:
            result["status"] = "no_drive_config"
            result["error"] = "DRIVE_FOLDER_ID not set"
            return result

        drive_result = upload_to_drive(temp_file, filename)
        if not drive_result:
            result["status"] = "upload_failed"
            result["error"] = "Drive upload failed"
            return result

        result["status"] = "filed"
        result["drive_link"] = drive_result.get("webViewLink", "")

        # 5. React 👍
        if message_id:
            react_thumbs_up(message_id, telegram_target)

        return result

    except Exception as e:
        result["status"] = "error"
        result["error"] = str(e)
        print(f"  [DropBox] Pipeline error: {e}", file=sys.stderr)
        return result

    finally:
        # 6. Ephemeral cleanup — ALWAYS delete local PII
        if temp_file and temp_file.exists():
            try:
                os.remove(temp_file)
                print(f"  [DropBox] Deleted temp: {temp_file}", file=sys.stderr)
            except OSError as e:
                print(f"  [DropBox] Failed to delete temp file: {e}", file=sys.stderr)


def process_document_from_file(file_path: str, message_id: str = "", telegram_target: str = "") -> dict:
    """Convenience wrapper: run process_document on a local file.

    Returns:
        The process_document result, or an "error" status dict if the path
        is not a regular file.
    """
    path = Path(file_path)
    if not path.is_file():
        return {"status": "error", "error": f"File not found: {file_path}"}

    return process_document(
        image_bytes=path.read_bytes(),
        original_filename=path.name,
        message_id=message_id,
        telegram_target=telegram_target,
    )


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def cli_process(args):
    """Handle: family-assistant dropbox --file photo.jpg

    ``--file`` runs the full pipeline, prints the result dict as JSON and
    exits 1 unless the document was filed. ``--test`` runs extraction only
    (no upload) and exits 1 if extraction fails.
    """
    if args.file:
        result = process_document_from_file(args.file, message_id=getattr(args, "message_id", ""))
        print(json.dumps(result, indent=2, ensure_ascii=False))
        if result.get("status") != "filed":
            sys.exit(1)
    elif args.test:
        # Dry-run: test extraction without uploading
        if not Path(args.test).is_file():
            print(f"File not found: {args.test}")
            sys.exit(1)
        image_bytes = Path(args.test).read_bytes()
        extraction = _call_vision_llm(image_bytes, EXTRACTION_PROMPT)
        if not extraction:
            print("❌ Extraction failed")
            sys.exit(1)
        filename = build_filename(extraction, Path(args.test).suffix or ".jpg")
        print(f"Extraction: {json.dumps(extraction, indent=2)}")
        print(f"Filename: {filename}")
    else:
        print("Usage: family-assistant dropbox --file photo.jpg")
        print("       family-assistant dropbox --test photo.jpg (dry-run, no upload)")
        sys.exit(1)


def cli_drive_setup(args):
    """Handle: family-assistant drive-setup

    Prints setup instructions if DRIVE_FOLDER_ID is unset. Otherwise it
    checks access to the root folder and ensures the current year's
    subfolder exists.
    """
    if not DRIVE_FOLDER_ID:
        print("❌ DRIVE_FOLDER_ID not set in .env")
        print("   1. Enable Google Drive API in GCP")
        print("   2. Create a 'Family Documents' folder in Drive")
        print("   3. Share the folder with the service account email")
        print("   4. Add DRIVE_FOLDER_ID= to scripts/.env")
        return

    try:
        service = _get_drive_service()
        # Test access by reading the folder's metadata
        folder = service.files().get(fileId=DRIVE_FOLDER_ID, fields="id, name").execute()
        print(f"✅ Drive connected: \"{folder['name']}\" (id={folder['id']})")

        # Check/create current year subfolder
        year = datetime.now(CHICAGO_TZ).year
        year_id = _get_or_create_year_folder(service, year)
        print(f"✅ Year folder ready: {year}/ (id={year_id})")
    except Exception as e:
        print(f"❌ Drive setup failed: {e}")