"""End-to-end vision pipeline: Email attachment → briefing card.""" import json import tempfile from pathlib import Path from typing import Optional from icarus.core.vision.parser import parse_document, parse_document_bytes from icarus.core.briefing.generator import generate_briefing from icarus.core.config.staging import DATA_DIR # Cache directory for processed attachments ATTACHMENT_CACHE_DIR = DATA_DIR / "vision_cache" ATTACHMENT_CACHE_DIR.mkdir(parents=True, exist_ok=True) def _infer_family_members(email_meta: dict) -> list: """Infer which family members this email concerns from subject/to fields.""" recipients = [] text = f"{email_meta.get('subject', '')} {email_meta.get('to', '')} {email_meta.get('body', '')}".lower() if any(name in text for name in ['sully', 'sullivan', "sullivan's"]): recipients.append('Sullivan') if 'harper' in text: recipients.append('Harper') if 'aundrea' in text or 'mom' in text: recipients.append('Aundrea') if 'matt' in text or 'dad' in text or 'hoffmann' in text: recipients.append('Matt') return recipients or ['Family'] def _infer_urgency(email_meta: dict, text: str) -> str: """Infer urgency from keywords in email and document.""" urgent_keywords = ['urgent', 'asap', 'deadline', 'tomorrow', 'due', 'required'] text_lower = f"{email_meta.get('subject', '')} {text}".lower() if any(kw in text_lower for kw in urgent_keywords): return "high" # Check for dates within 3 days # (Simplified — could use actual date parsing) return "medium" async def process_attachment( email_meta: dict, attachment: bytes, filename: str ) -> dict: """ Process an email attachment through the vision pipeline. Args: email_meta: {"from", "subject", "date", "to", "body"} attachment: Raw bytes filename: Original filename Returns: Complete briefing card with metadata """ suffix = Path(filename).suffix.lower() # Validate file type if suffix not in ['.pdf', '.png', '.jpg', '.jpeg', '.gif', '.webp']: raise ValueError(f"Unsupported file type: {suffix}") # Step 1: Parse document (pdfplumber or vision) parsed = await parse_document_bytes(attachment, filename) # Step 2: Gather calendar context # TODO: Query calendar for conflicts around parsed date # For now, use empty context (will be populated when calendar integration ready) calendar_events = [] # Step 3: Infer metadata family_members = _infer_family_members(email_meta) urgency = _infer_urgency(email_meta, parsed.get("text", "")) # Step 4: Generate briefing briefing = await generate_briefing( parsed_doc=parsed, calendar_events=calendar_events, family_members=family_members, urgency=urgency ) # Step 5: Add pipeline metadata result = { "briefing": briefing, "source": { "filename": filename, "email_from": email_meta.get("from"), "email_subject": email_meta.get("subject"), "email_date": email_meta.get("date"), "parsed_at": str(Path.cwd() / ATTACHMENT_CACHE_DIR) }, "processing": { "parser_method": parsed.get("method"), "parser_confidence": parsed.get("confidence"), "pages_processed": parsed.get("pages", 1), "total_pages": parsed.get("total_pages", 1) } } return result async def process_standalone(file_path: Path) -> dict: """ Process a standalone file (for API uploads/testing). Args: file_path: Path to PDF or image file Returns: Briefing card """ content = file_path.read_bytes() filename = file_path.name email_meta = { "from": "upload@icarus.local", "subject": filename, "date": "now", "to": "family", "body": "" } return await process_attachment(email_meta, content, filename) # Integration hook for email worker async def handle_email_attachment(email_data: dict, attachment_data: dict) -> dict: """ Hook for email worker to process attachments. Args: email_data: Full email metadata attachment_data: {"filename", "content", "content_type"} Returns: Briefing card + notification info """ email_meta = { "from": email_data.get("from"), "subject": email_data.get("subject"), "date": email_data.get("date"), "to": email_data.get("to"), "body": email_data.get("body", "")[:500] # Preview only } result = await process_attachment( email_meta=email_meta, attachment=attachment_data.get("content"), filename=attachment_data.get("filename") ) # Determine if notification should be sent should_notify = result["briefing"].get("confidence", 0) > 0.5 return { **result, "notification": { "should_notify": should_notify, "priority": result["briefing"].get("category", "info"), "channels": ["telegram"] if should_notify else [] } }