"""End-to-end vision pipeline: Email attachment → briefing card."""
import json
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from icarus.core.briefing.generator import generate_briefing
from icarus.core.config.staging import DATA_DIR
from icarus.core.vision.parser import parse_document, parse_document_bytes
# Cache directory for processed attachments
# The "vision_cache" subdirectory of the configured DATA_DIR.
ATTACHMENT_CACHE_DIR = DATA_DIR / "vision_cache"
# Import-time side effect: create the directory (and any missing parents);
# exist_ok=True makes this a no-op when it is already there.
ATTACHMENT_CACHE_DIR.mkdir(parents=True, exist_ok=True)
def _infer_family_members(email_meta: dict) -> list:
"""Infer which family members this email concerns from subject/to fields."""
recipients = []
text = f"{email_meta.get('subject', '')} {email_meta.get('to', '')} {email_meta.get('body', '')}".lower()
if any(name in text for name in ['sully', 'sullivan', "sullivan's"]):
recipients.append('Sullivan')
if 'harper' in text:
recipients.append('Harper')
if 'aundrea' in text or 'mom' in text:
recipients.append('Aundrea')
if 'matt' in text or 'dad' in text or 'hoffmann' in text:
recipients.append('Matt')
return recipients or ['Family']
def _infer_urgency(email_meta: dict, text: str) -> str:
"""Infer urgency from keywords in email and document."""
urgent_keywords = ['urgent', 'asap', 'deadline', 'tomorrow', 'due', 'required']
text_lower = f"{email_meta.get('subject', '')} {text}".lower()
if any(kw in text_lower for kw in urgent_keywords):
return "high"
# Check for dates within 3 days
# (Simplified — could use actual date parsing)
return "medium"
async def process_attachment(
    email_meta: dict,
    attachment: bytes,
    filename: str
) -> dict:
    """
    Process an email attachment through the vision pipeline.

    Args:
        email_meta: {"from", "subject", "date", "to", "body"}
        attachment: Raw bytes
        filename: Original filename; its extension selects/validates the type

    Returns:
        Dict with three keys:
          "briefing":   whatever generate_briefing() returned
          "source":     filename, email from/subject/date and "parsed_at"
                        (UTC ISO-8601 timestamp of this run)
          "processing": parser method, confidence and page counts

    Raises:
        ValueError: If the filename extension is not a supported PDF/image type.
    """
    suffix = Path(filename).suffix.lower()

    # Validate file type
    if suffix not in ('.pdf', '.png', '.jpg', '.jpeg', '.gif', '.webp'):
        raise ValueError(f"Unsupported file type: {suffix}")

    # Step 1: Parse document (pdfplumber or vision)
    parsed = await parse_document_bytes(attachment, filename)

    # Step 2: Gather calendar context
    # TODO: Query calendar for conflicts around parsed date
    # For now, use empty context (will be populated when calendar integration ready)
    calendar_events = []

    # Step 3: Infer metadata
    family_members = _infer_family_members(email_meta)
    urgency = _infer_urgency(email_meta, parsed.get("text", ""))

    # Step 4: Generate briefing
    briefing = await generate_briefing(
        parsed_doc=parsed,
        calendar_events=calendar_events,
        family_members=family_members,
        urgency=urgency
    )

    # Step 5: Add pipeline metadata
    return {
        "briefing": briefing,
        "source": {
            "filename": filename,
            "email_from": email_meta.get("from"),
            "email_subject": email_meta.get("subject"),
            "email_date": email_meta.get("date"),
            # When the document was parsed. This used to hold the cache
            # directory path, which is not a time at all.
            "parsed_at": datetime.now(timezone.utc).isoformat()
        },
        "processing": {
            "parser_method": parsed.get("method"),
            "parser_confidence": parsed.get("confidence"),
            # Page counts default to 1 when the parser does not report them.
            "pages_processed": parsed.get("pages", 1),
            "total_pages": parsed.get("total_pages", 1)
        }
    }
async def process_standalone(file_path: Path) -> dict:
    """
    Run a file from disk through the attachment pipeline (API uploads/testing).

    The file is wrapped in placeholder email metadata, with its own name as
    the subject, and handed to process_attachment().

    Args:
        file_path: Path to PDF or image file

    Returns:
        The dict returned by process_attachment() for this file
    """
    name = file_path.name

    # Stand-in for the metadata a real email would carry.
    placeholder_meta = {
        "from": "upload@icarus.local",
        "subject": name,
        "date": "now",
        "to": "family",
        "body": "",
    }

    return await process_attachment(
        email_meta=placeholder_meta,
        attachment=file_path.read_bytes(),
        filename=name,
    )
# Integration hook for email worker
async def handle_email_attachment(email_data: dict, attachment_data: dict) -> dict:
    """
    Hook for email worker to process attachments.

    Args:
        email_data: Full email metadata; "from", "subject", "date", "to" and
            "body" are read. A missing or None body is treated as empty.
        attachment_data: {"filename", "content", "content_type"}

    Returns:
        The process_attachment() result plus a "notification" entry with
        "should_notify", "priority" and "channels".

    Raises:
        ValueError: Propagated from process_attachment() for unsupported
            file types.
    """
    email_meta = {
        "from": email_data.get("from"),
        "subject": email_data.get("subject"),
        "date": email_data.get("date"),
        "to": email_data.get("to"),
        # Preview only. `or ""` also covers an explicit None body, which
        # would otherwise fail on slicing.
        "body": (email_data.get("body") or "")[:500]
    }

    result = await process_attachment(
        email_meta=email_meta,
        attachment=attachment_data.get("content"),
        filename=attachment_data.get("filename")
    )

    # Determine if notification should be sent. A missing or None confidence
    # counts as 0, i.e. no notification.
    confidence = result["briefing"].get("confidence") or 0
    should_notify = confidence > 0.5

    return {
        **result,
        "notification": {
            "should_notify": should_notify,
            # NOTE(review): priority is taken from the briefing's "category",
            # not from the inferred urgency -- confirm this is intended.
            "priority": result["briefing"].get("category", "info"),
            "channels": ["telegram"] if should_notify else []
        }
    }