# pipeline.py — snapshot of Apr 25, 2026 (5,270 bytes)

"""End-to-end vision pipeline: Email attachment → briefing card."""

import json
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from icarus.core.vision.parser import parse_document, parse_document_bytes
from icarus.core.briefing.generator import generate_briefing
from icarus.core.config.staging import DATA_DIR

# Cache directory for processed attachments

# Sub-directory of DATA_DIR reserved for vision-pipeline attachment caching.
# Created (along with any missing parents) as a side effect of importing this
# module; an already-existing directory is not an error.
ATTACHMENT_CACHE_DIR = DATA_DIR / "vision_cache"
ATTACHMENT_CACHE_DIR.mkdir(exist_ok=True, parents=True)

def _infer_family_members(email_meta: dict) -> list:
"""Infer which family members this email concerns from subject/to fields."""
recipients = []
text = f"{email_meta.get('subject', '')} {email_meta.get('to', '')} {email_meta.get('body', '')}".lower()

if any(name in text for name in ['sully', 'sullivan', "sullivan's"]):
    recipients.append('Sullivan')
if 'harper' in text:
    recipients.append('Harper')
if 'aundrea' in text or 'mom' in text:
    recipients.append('Aundrea')
if 'matt' in text or 'dad' in text or 'hoffmann' in text:
    recipients.append('Matt')

return recipients or ['Family']

def _infer_urgency(email_meta: dict, text: str) -> str:
"""Infer urgency from keywords in email and document."""
urgent_keywords = ['urgent', 'asap', 'deadline', 'tomorrow', 'due', 'required']
text_lower = f"{email_meta.get('subject', '')} {text}".lower()

if any(kw in text_lower for kw in urgent_keywords):
    return "high"

# Check for dates within 3 days
# (Simplified  could use actual date parsing)
return "medium"

# File extensions the vision pipeline accepts (compared lower-cased).
_SUPPORTED_SUFFIXES = frozenset({".pdf", ".png", ".jpg", ".jpeg", ".gif", ".webp"})


async def process_attachment(
    email_meta: dict,
    attachment: bytes,
    filename: str
) -> dict:
    """
    Process an email attachment through the vision pipeline.

    Args:
        email_meta: {"from", "subject", "date", "to", "body"}
        attachment: Raw bytes of the attachment.
        filename: Original filename; its extension is validated against the
            supported PDF/image types.

    Returns:
        Dict with the generated ``briefing``, a ``source`` section (email
        provenance plus a UTC ISO-8601 ``parsed_at`` timestamp) and a
        ``processing`` section (parser method, confidence, page counts).

    Raises:
        ValueError: If *filename* is missing, its extension is unsupported,
            or *attachment* is empty.
    """
    if not filename:
        raise ValueError("Attachment filename is required")

    suffix = Path(filename).suffix.lower()

    # Validate before doing any (potentially expensive) parsing.
    if suffix not in _SUPPORTED_SUFFIXES:
        raise ValueError(f"Unsupported file type: {suffix}")
    if not attachment:
        raise ValueError(f"Empty attachment: {filename}")

    # Step 1: Parse document (pdfplumber or vision, per parse_document_bytes)
    parsed = await parse_document_bytes(attachment, filename)

    # Step 2: Gather calendar context
    # TODO: Query calendar for conflicts around parsed date.
    # For now, use empty context (to be populated once calendar integration is ready).
    calendar_events = []

    # Step 3: Infer metadata. ``or ""`` guards against a parser returning
    # text=None, which would otherwise be searched as the literal "None".
    family_members = _infer_family_members(email_meta)
    urgency = _infer_urgency(email_meta, parsed.get("text") or "")

    # Step 4: Generate briefing
    briefing = await generate_briefing(
        parsed_doc=parsed,
        calendar_events=calendar_events,
        family_members=family_members,
        urgency=urgency,
    )

    # Step 5: Add pipeline metadata
    result = {
        "briefing": briefing,
        "source": {
            "filename": filename,
            "email_from": email_meta.get("from"),
            "email_subject": email_meta.get("subject"),
            "email_date": email_meta.get("date"),
            # Time (UTC) at which parsing and briefing generation completed.
            "parsed_at": datetime.now(timezone.utc).isoformat(),
        },
        "processing": {
            "parser_method": parsed.get("method"),
            "parser_confidence": parsed.get("confidence"),
            "pages_processed": parsed.get("pages", 1),
            "total_pages": parsed.get("total_pages", 1),
        },
    }

    return result

async def process_standalone(file_path: Path) -> dict:
    """
    Process a standalone file (for API uploads/testing).

    The file is wrapped in synthetic email metadata so it flows through
    ``process_attachment`` just like an emailed attachment.

    Args:
        file_path: Path to a PDF or image file. A ``str`` or other
            ``os.PathLike`` is also accepted and converted to ``Path``.

    Returns:
        The ``process_attachment`` result (briefing card plus metadata).

    Raises:
        OSError: If the file cannot be read.
        ValueError: Propagated from ``process_attachment`` (e.g. for an
            unsupported file type).
    """
    path = Path(file_path)
    content = path.read_bytes()
    filename = path.name

    # Synthetic metadata: direct uploads have no real email behind them.
    email_meta = {
        "from": "upload@icarus.local",
        "subject": filename,
        "date": "now",
        "to": "family",
        "body": "",
    }

    return await process_attachment(email_meta, content, filename)

# Integration hook for the email worker

async def handle_email_attachment(email_data: dict, attachment_data: dict) -> dict:
    """
    Hook for email worker to process attachments.

    Args:
        email_data: Full email metadata; ``from``, ``subject``, ``date``,
            ``to`` and ``body`` are read (missing keys become ``None``,
            except ``body``, which becomes ``""``).
        attachment_data: {"filename", "content", "content_type"}
            (``content_type`` is currently not used).

    Returns:
        The ``process_attachment`` result plus a ``notification`` dict with
        ``should_notify``, ``priority`` and ``channels``.
    """
    email_meta = {
        "from": email_data.get("from"),
        "subject": email_data.get("subject"),
        "date": email_data.get("date"),
        "to": email_data.get("to"),
        # Preview only. ``or ""`` also covers an explicit body=None, which
        # would otherwise raise TypeError when sliced.
        "body": (email_data.get("body") or "")[:500],
    }

    result = await process_attachment(
        email_meta=email_meta,
        attachment=attachment_data.get("content"),
        filename=attachment_data.get("filename"),
    )

    briefing = result["briefing"]
    # Notify only for reasonably confident briefings; a missing or None
    # confidence counts as 0 instead of crashing the comparison.
    confidence = briefing.get("confidence") or 0
    should_notify = confidence > 0.5

    return {
        **result,
        "notification": {
            "should_notify": should_notify,
            # NOTE(review): priority is taken from the briefing's category,
            # not from the inferred urgency; confirm this is intended.
            "priority": briefing.get("category", "info"),
            "channels": ["telegram"] if should_notify else [],
        },
    }