"""Drop-Box Document Sorter — OCR + LLM + Drive filing.
Turns photos of physical documents (bills, invoices, receipts) into
properly named files on Google Drive. Zero cloud vision — all OCR
runs locally on the Gaming PC's 3080 Ti via qwen3-vl:8b.
Flow:
1. Image arrives in Telegram DM
2. Save to /tmp/dropbox/ (ephemeral, deleted after upload)
3. Base64 encode → send to Gaming PC Ollama qwen3-vl:8b
4. LLM extracts {vendor, date, category, amount}
5. Build filename: YYYY-MM-DD_Vendor_Category_$Amount.ext
6. Upload to Google Drive: Family Documents/YYYY/
7. Delete local file (finally block, always runs)
8. React 👍 on Telegram message
Security:
- No PII leaves the network except the Drive upload (family-owned)
- /tmp files deleted in finally block regardless of outcome
- Vision model unloads immediately (keep_alive: 0) to free VRAM
"""
import base64
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
import requests
from icarus.core.config import (
CHICAGO_TZ,
LLM_TIMEOUT,
DRIVE_FOLDER_ID,
DRIVE_SA_KEY_PATH,
VISION_LLM_URL,
VISION_MODEL,
)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Google Drive folder ID (set in .env after sharing with service account):
#   DRIVE_FOLDER_ID — imported from icarus.core.config
# Service account key path:
#   DRIVE_SA_KEY_PATH — imported from icarus.core.config
# Vision LLM URL and model:
#   VISION_LLM_URL, VISION_MODEL — imported from icarus.core.config
# Ephemeral temp directory — each file written here is deleted after upload
# Where incoming images are staged before upload. Overridable through the
# DROPBOX_TEMP_DIR environment variable; defaults to /tmp/dropbox.
TEMP_DIR = Path(os.environ.get("DROPBOX_TEMP_DIR", "/tmp/dropbox"))
# Extraction prompt — strict JSON output
# Instruction text sent with the image as the single user message to the
# vision model. The keys it asks for (vendor, date, category, amount) are the
# ones build_filename() reads from the parsed reply.
EXTRACTION_PROMPT = """You are a document OCR assistant. Extract structured data from this document photo.
Return ONLY a JSON object with these fields:
- vendor: The business/organization name (SHORTENED to 2-3 words max, no LLC/Inc suffixes, hyphens instead of spaces, title case)
- date: The document date in YYYY-MM-DD format (use service date, not due date)
- category: One word category from: Medical, Vet, Lawn, Home, Auto, Insurance, Tax, School, Other
- amount: The total amount as a number (no $ symbol, no commas). If no amount, use 0.
Examples of good vendor names:
- Aurora Health Care → "Aurora-Health"
- Lizer Lawn Service → "Lizer-Lawn"
- Golrusk Pet Care Center → "Golrusk-Pet"
- Pick N Save → "Pick-N-Save"
If the document has no clear vendor, date, or amount, use empty string or 0.
Return ONLY the JSON object, no other text."""
# ---------------------------------------------------------------------------
# Vision LLM Call
# ---------------------------------------------------------------------------
def _call_vision_llm(image_bytes: bytes, prompt: str) -> dict | None:
    """Run one image plus prompt through the Ollama vision model.

    The image is base64-encoded and attached to a single user message. The
    request is non-streaming, uses temperature 0, and sets ``keep_alive`` to 0
    so the vision model is unloaded right after inference, freeing VRAM for
    the primary text model.

    Args:
        image_bytes: Raw bytes of the document photo.
        prompt: Instruction text sent alongside the image.

    Returns:
        The dict produced by ``_parse_extraction_json`` from the model's reply,
        or ``None`` when the request fails or the reply cannot be parsed.
    """
    user_message = {
        "role": "user",
        "content": prompt,
        "images": [base64.b64encode(image_bytes).decode("utf-8")],
    }
    request_body = {
        "model": VISION_MODEL,
        "messages": [user_message],
        "stream": False,
        "keep_alive": 0,  # Unload immediately after inference
        "options": {"temperature": 0},
    }

    try:
        response = requests.post(VISION_LLM_URL, json=request_body, timeout=120)
        response.raise_for_status()
        reply = response.json()
        reply_text = reply.get("message", {}).get("content", "").strip()
        return _parse_extraction_json(reply_text)
    except requests.exceptions.Timeout:
        print(" [DropBox] Vision LLM timeout (120s)", file=sys.stderr)
    except requests.exceptions.ConnectionError:
        print(f" [DropBox] Vision LLM connection failed: {VISION_LLM_URL}", file=sys.stderr)
    except Exception as e:
        # Best-effort boundary: HTTP errors, bad JSON and unexpected reply
        # shapes are all reported the same way and turned into None.
        print(f" [DropBox] Vision LLM error: {e}", file=sys.stderr)
    return None
def _parse_extraction_json(text: str) -> dict | None:
"""Parse JSON from LLM response, handling code fences and noise."""
if not text:
return None
# Strip markdown code fences
text = text.strip()
if text.startswith("```"):
text = re.sub(r'^```(?:json)?\s*\n?', '', text)
text = re.sub(r'\n?```\s*$', '', text)
text = text.strip()
try:
result = json.loads(text)
if isinstance(result, dict):
return result
except json.JSONDecodeError:
# Try to find JSON object in response
match = re.search(r'\{.*\}', text, re.DOTALL)
if match:
try:
result = json.loads(match.group(0))
if isinstance(result, dict):
return result
except json.JSONDecodeError:
pass
print(f" [DropBox] Could not parse extraction JSON: {text[:200]}", file=sys.stderr)
return None
# ---------------------------------------------------------------------------
# Filename Builder
# ---------------------------------------------------------------------------
# Characters stripped from filename components: path separators plus
# characters that are reserved on common filesystems.
_UNSAFE_CHARS = re.compile(r'[\\/:*?"<>|]')
# Company-form suffixes dropped from vendor names.
_VENDOR_SUFFIXES = re.compile(r'\b(LLC|Inc|Corp|Co|Ltd)\b\.?', re.IGNORECASE)
# Vendor names are capped at this many words.
_MAX_VENDOR_WORDS = 3


def build_filename(extraction: dict, original_ext: str = ".jpg") -> str:
    """Build a strict taxonomy filename from extraction data.

    Format: YYYY-MM-DD_Vendor_Category_$Amount.ext
    Example: 2026-04-17_Lizer-Lawn_Lawn_$150.jpg

    Args:
        extraction: Dict parsed from the vision model's reply. The keys
            ``date``, ``vendor``, ``category`` and ``amount`` are read; any of
            them may be missing, ``None`` or malformed.
        original_ext: Extension for the result, with or without the leading
            dot. An empty value falls back to ``.jpg``.

    Returns:
        The filename. Fallbacks per field: today's date in ``CHICAGO_TZ`` for
        a missing/invalid date, ``Unknown`` for the vendor, ``Other`` for the
        category, and ``NoAmount`` when the amount is missing, unparseable,
        non-positive or not finite.
    """
    # --- date: validate and normalise to zero-padded ISO form ---------------
    raw_date = extraction.get("date")
    date_str = ""
    if isinstance(raw_date, str) and raw_date.strip():
        try:
            # strptime also accepts "2026-4-7"; isoformat() re-pads it.
            date_str = datetime.strptime(raw_date.strip(), "%Y-%m-%d").date().isoformat()
        except ValueError:
            date_str = ""
    if not date_str:
        date_str = datetime.now(CHICAGO_TZ).strftime("%Y-%m-%d")

    # --- vendor: drop suffixes and unsafe chars, cap words, hyphenate -------
    # `or ""` keeps a JSON null from turning into the literal text "None".
    vendor = str(extraction.get("vendor") or "").strip()
    vendor = _VENDOR_SUFFIXES.sub("", vendor)
    vendor = _UNSAFE_CHARS.sub("", vendor)
    # The prompt asks for hyphen-separated names, so hyphens count as word
    # breaks too; otherwise the word cap would never apply to them.
    words = [word.strip(",") for word in re.split(r"[\s-]+", vendor)]
    words = [word for word in words if word]
    vendor = "-".join(words[:_MAX_VENDOR_WORDS]) or "Unknown"

    # --- category -------------------------------------------------------------
    category = str(extraction.get("category") or "").strip().title()
    category = _UNSAFE_CHARS.sub("", category) or "Other"

    # --- amount ---------------------------------------------------------------
    raw_amount = extraction.get("amount", 0)
    if isinstance(raw_amount, str):
        # Tolerate "$1,234.50" even though the prompt asks for a bare number.
        raw_amount = raw_amount.replace("$", "").replace(",", "").strip()
    try:
        amount = float(raw_amount)
    except (ValueError, TypeError, OverflowError):
        amount = 0.0
    # The chained comparison also rejects NaN and infinity.
    if 0 < amount < float("inf"):
        amount_str = f"${amount:.0f}" if amount.is_integer() else f"${amount:.2f}"
    else:
        amount_str = "NoAmount"

    # --- extension: ensure it is non-empty and starts with a dot ------------
    ext = original_ext or ".jpg"
    if not ext.startswith("."):
        ext = f".{ext}"

    return f"{date_str}_{vendor}_{category}_{amount_str}{ext}"
# ---------------------------------------------------------------------------
# Google Drive Upload
# ---------------------------------------------------------------------------
def _get_drive_service():
    """Build a Drive v3 API client authenticated with the service-account key.

    Returns:
        The client object returned by ``googleapiclient.discovery.build``.

    Raises:
        FileNotFoundError: If ``DRIVE_SA_KEY_PATH`` does not point to a file.
    """
    # Function-scope imports: the Google client libraries are only loaded
    # when a Drive operation is actually requested.
    from google.oauth2 import service_account
    from googleapiclient.discovery import build

    sa_key = Path(DRIVE_SA_KEY_PATH)
    if not sa_key.is_file():
        raise FileNotFoundError(f"Service account key not found: {sa_key}")

    # NOTE(review): drive.file is a narrow scope — confirm it is sufficient
    # for reading the shared root folder this module looks up.
    drive_scopes = ["https://www.googleapis.com/auth/drive.file"]
    creds = service_account.Credentials.from_service_account_file(
        str(sa_key),
        scopes=drive_scopes,
    )
    return build("drive", "v3", credentials=creds, cache_discovery=False)
def _get_or_create_year_folder(service, year: int) -> str:
    """Return the ID of the ``<year>`` subfolder of the root Drive folder.

    Looks for a non-trashed folder named after *year* directly under
    ``DRIVE_FOLDER_ID`` and creates one if the search returns nothing.

    Args:
        service: Drive API client, as returned by ``_get_drive_service``.
        year: Year used as the folder name.

    Returns:
        The Drive ID of the existing (first match) or newly created folder.

    Raises:
        ValueError: If ``DRIVE_FOLDER_ID`` is empty.
    """
    if not DRIVE_FOLDER_ID:
        raise ValueError("DRIVE_FOLDER_ID not set in environment")

    folder_name = str(year)
    folder_mime = "application/vnd.google-apps.folder"

    # Look for an existing year folder first.
    lookup = " and ".join(
        [
            f"'{DRIVE_FOLDER_ID}' in parents",
            f"name='{folder_name}'",
            f"mimeType='{folder_mime}'",
            "trashed=false",
        ]
    )
    listing = service.files().list(q=lookup, spaces="drive", fields="files(id, name)").execute()
    matches = listing.get("files", [])
    if matches:
        return matches[0]["id"]

    # Nothing found: create it under the root folder.
    created = service.files().create(
        body={
            "name": folder_name,
            "mimeType": folder_mime,
            "parents": [DRIVE_FOLDER_ID],
        },
        fields="id",
    ).execute()
    print(f" [DropBox] Created Drive folder: {folder_name} (id={created['id']})", file=sys.stderr)
    return created["id"]
def upload_to_drive(file_path: Path, filename: str) -> dict | None:
    """Upload a local file to Drive, filed under its year folder.

    The year is taken from the first four characters of *filename* when they
    are digits (the ``YYYY-MM-DD`` prefix); otherwise the current year in
    ``CHICAGO_TZ`` is used.

    Args:
        file_path: Path of the local file to upload.
        filename: Name the file gets on Drive.

    Returns:
        The created file's metadata (``id``, ``name``, ``webViewLink``), or
        ``None`` if any step of the upload raised.
    """
    from googleapiclient.http import MediaFileUpload

    try:
        drive = _get_drive_service()

        # Determine year from filename prefix (YYYY-MM-DD)
        prefix = filename[:4]
        if len(filename) >= 4 and prefix.isdigit():
            year = int(prefix)
        else:
            year = datetime.now(CHICAGO_TZ).year
        parent_id = _get_or_create_year_folder(drive, year)

        metadata = {"name": filename, "parents": [parent_id]}
        upload = MediaFileUpload(str(file_path), resumable=True)
        request = drive.files().create(
            body=metadata,
            media_body=upload,
            fields="id, name, webViewLink",
        )
        uploaded = request.execute()
        print(f" [DropBox] Uploaded: {filename} → Drive (id={uploaded['id']})", file=sys.stderr)
        return uploaded
    except Exception as e:
        # Best-effort boundary: the caller only distinguishes success from None.
        print(f" [DropBox] Drive upload failed: {e}", file=sys.stderr)
        return None
# ---------------------------------------------------------------------------
# Telegram React
# ---------------------------------------------------------------------------
def react_thumbs_up(message_id: str | int, target: str = "") -> bool:
"""React with 👍 on the source Telegram message."""
target = target or os.environ.get("TELEGRAM_CHAT_ID", "")
if not target:
print(" [DropBox] No Telegram target for reaction", file=sys.stderr)
return False
cmd = [
"openclaw", "message", "react",
"--channel", "telegram",
"--target", target,
"--message-id", str(message_id),
"--emoji", "👍",
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
return result.returncode == 0
except Exception as e:
print(f" [DropBox] React failed: {e}", file=sys.stderr)
return False
# ---------------------------------------------------------------------------
# Main Pipeline
# ---------------------------------------------------------------------------
def process_document(
    image_bytes: bytes,
    original_filename: str = "document.jpg",
    message_id: str | int = "",
    telegram_target: str = "",
) -> dict:
    """Full pipeline: extract → rename → upload → cleanup.

    Args:
        image_bytes: Raw image bytes from Telegram
        original_filename: Original filename for extension detection
        message_id: Telegram message ID for 👍 reaction
        telegram_target: Telegram chat ID for reaction

    Returns:
        Dict with the keys ``status``, ``filename``, ``drive_link``,
        ``extraction`` and ``error``. ``status`` is one of ``filed``,
        ``extraction_failed``, ``no_drive_config``, ``upload_failed`` or
        ``error``. This function does not raise; unexpected exceptions are
        reported through ``status == "error"``.
    """
    # Function-scope import: tempfile is not among this file's module imports.
    import tempfile

    result = {
        "status": "pending",
        "filename": "",
        "drive_link": "",
        "extraction": {},
        "error": "",
    }
    temp_file = None
    try:
        # 1. Save to ephemeral temp
        TEMP_DIR.mkdir(parents=True, exist_ok=True)
        ext = Path(original_filename).suffix or ".jpg"
        # mkstemp gives every call its own unpredictable, owner-only file, so
        # two documents arriving in the same second cannot overwrite each
        # other or delete each other's file during cleanup.
        fd, temp_name = tempfile.mkstemp(prefix="doc_", suffix=ext, dir=TEMP_DIR)
        temp_file = Path(temp_name)
        with os.fdopen(fd, "wb") as handle:
            handle.write(image_bytes)
        print(f" [DropBox] Saved temp: {temp_file} ({len(image_bytes)} bytes)", file=sys.stderr)

        # 2. OCR extraction via local vision model
        print(f" [DropBox] Sending to {VISION_MODEL} for OCR...", file=sys.stderr)
        extraction = _call_vision_llm(image_bytes, EXTRACTION_PROMPT)
        if not extraction:
            result["status"] = "extraction_failed"
            result["error"] = "Vision LLM returned no usable data"
            return result
        result["extraction"] = extraction
        print(f" [DropBox] Extracted: {extraction}", file=sys.stderr)

        # 3. Build filename
        filename = build_filename(extraction, ext)
        result["filename"] = filename
        print(f" [DropBox] Filename: {filename}", file=sys.stderr)

        # 4. Upload to Drive
        if not DRIVE_FOLDER_ID:
            result["status"] = "no_drive_config"
            result["error"] = "DRIVE_FOLDER_ID not set"
            return result
        drive_result = upload_to_drive(temp_file, filename)
        if not drive_result:
            result["status"] = "upload_failed"
            result["error"] = "Drive upload failed"
            return result
        result["status"] = "filed"
        result["drive_link"] = drive_result.get("webViewLink", "")

        # 5. React 👍 (only after a successful upload)
        if message_id:
            react_thumbs_up(message_id, telegram_target)
        return result
    except Exception as e:
        # Pipeline boundary: report the failure in the result instead of raising.
        result["status"] = "error"
        result["error"] = str(e)
        print(f" [DropBox] Pipeline error: {e}", file=sys.stderr)
        return result
    finally:
        # 6. Ephemeral cleanup — ALWAYS delete local PII
        if temp_file is not None:
            try:
                temp_file.unlink()
                print(f" [DropBox] Deleted temp: {temp_file}", file=sys.stderr)
            except FileNotFoundError:
                # Already gone; nothing left to clean up.
                pass
            except OSError as e:
                print(f" [DropBox] Failed to delete temp file: {e}", file=sys.stderr)
def process_document_from_file(file_path: str, message_id: str = "") -> dict:
"""Convenience wrapper: process a document from a local file path."""
path = Path(file_path)
if not path.is_file():
return {"status": "error", "error": f"File not found: {file_path}"}
return process_document(
image_bytes=path.read_bytes(),
original_filename=path.name,
message_id=message_id,
)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def cli_process(args):
    """Handle: family-assistant dropbox --file photo.jpg

    Modes, selected by attributes on *args*:

    * ``file``: run the full pipeline on that path and print the outcome.
      An optional ``message_id`` attribute is forwarded for the reaction.
    * ``test``: dry-run; extract and print the proposed filename, no upload.
      Exits with status 1 if the file does not exist.
    * neither: print usage and exit with status 1.
    """
    # getattr throughout, so a namespace missing one of the options does not
    # raise AttributeError.
    file_arg = getattr(args, "file", None)
    test_arg = getattr(args, "test", None)

    if file_arg:
        result = process_document_from_file(file_arg, message_id=getattr(args, "message_id", ""))
        # Report the outcome on stdout; the pipeline itself only logs to stderr.
        if result.get("status") == "filed":
            print(f"✅ Filed: {result.get('filename', '')}")
            if result.get("drive_link"):
                print(f" {result['drive_link']}")
        else:
            print(f"❌ {result.get('status', 'error')}: {result.get('error', '')}")
    elif test_arg:
        # Dry-run: test extraction without uploading
        test_path = Path(test_arg)
        if not test_path.is_file():
            print(f"File not found: {test_arg}")
            sys.exit(1)
        extraction = _call_vision_llm(test_path.read_bytes(), EXTRACTION_PROMPT)
        if extraction:
            filename = build_filename(extraction, test_path.suffix or ".jpg")
            print(f"Extraction: {json.dumps(extraction, indent=2)}")
            print(f"Filename: {filename}")
        else:
            print("❌ Extraction failed")
    else:
        print("Usage: family-assistant dropbox --file photo.jpg")
        print(" family-assistant dropbox --test photo.jpg (dry-run, no upload)")
        sys.exit(1)
def cli_drive_setup(args):
    """Handle: family-assistant drive-setup

    Checks the Drive configuration: prints setup steps when
    ``DRIVE_FOLDER_ID`` is empty; otherwise fetches the root folder and makes
    sure the current year's subfolder exists. Any failure is printed rather
    than raised. *args* is accepted for the CLI handler signature and is not
    read.
    """
    if not DRIVE_FOLDER_ID:
        print("❌ DRIVE_FOLDER_ID not set in .env")
        print(" 1. Enable Google Drive API in GCP")
        print(" 2. Create a 'Family Documents' folder in Drive")
        print(" 3. Share the folder with the service account email")
        print(" 4. Add DRIVE_FOLDER_ID=<folder-id> to .env")
        return

    try:
        service = _get_drive_service()

        # Test access by fetching the root folder's metadata
        folder = service.files().get(fileId=DRIVE_FOLDER_ID, fields="id, name").execute()
        print(f"✅ Drive connected: \"{folder['name']}\" (id={folder['id']})")

        # Check/create current year subfolder
        year = datetime.now(CHICAGO_TZ).year
        year_id = _get_or_create_year_folder(service, year)
        print(f"✅ Year folder ready: {year}/ (id={year_id})")
    except Exception as e:
        # CLI boundary: show the problem to the user instead of a traceback.
        print(f"❌ Drive setup failed: {e}")