# family_brain.py

"""Family Brain — Local RAG retrieval for family knowledge.

The Family Brain is a ChromaDB-based vector store that remembers
everything parsed from emails and newsletters. It answers questions
like "what do the kids need for the field trip?" by semantically
retrieving the relevant context.

HARDWARE TOPOLOGY (Tri-Node):
- ChromaDB data files: Beelink (DATA_DIR/chroma_db/)
- Embeddings: Gaming PC Ollama (nomic-embed-text) via Tailscale API
- Retrieval + synthesis: Beelink local LLM (qwen2.5-coder:7b via localhost)
"""

import hashlib
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

import requests

from icarus.core.config import CHICAGO_TZ, LLM_MODEL, LLM_URL, LLM_TIMEOUT, CHROMA_DB_PATH

# ChromaDB data path from config (staging/production isolation)
CHROMA_DB_PATH.mkdir(parents=True, exist_ok=True)

# Embedding service runs on Gaming PC via Tailscale
OLLAMA_EMBED_URL = os.environ.get(
    "OLLAMA_EMBED_URL", "http://matt-pc.tail864e81.ts.net:11434/api/embeddings"
)
EMBEDDING_MODEL = "nomic-embed-text"
EMBEDDING_TIMEOUT = 30  # seconds

# Document TTL (auto-purge old newsletters)
DEFAULT_TTL_MONTHS = 12

def _get_chroma_client():
    """Return an embedded (local-file) ChromaDB client."""
    # Deferred import: don't pay chromadb's startup cost unless the brain is used.
    import chromadb

    return chromadb.PersistentClient(path=str(CHROMA_DB_PATH))

def _get_collection(name: str = "family_knowledge"):
    """Fetch the named ChromaDB collection, creating it on first use."""
    # Cosine space so query distances translate cleanly into similarity scores.
    return _get_chroma_client().get_or_create_collection(
        name=name,
        metadata={"hnsw:space": "cosine"},
    )

def _embed_text(text: str) -> list[float]:
    """Embed *text* via the Ollama API on the Gaming PC.

    Returns a 768-dimensional vector from nomic-embed-text.

    Raises:
        ValueError: if the response carries no usable embedding list.
        requests.HTTPError: on a non-2xx response.
    """
    response = requests.post(
        OLLAMA_EMBED_URL,
        json={"model": EMBEDDING_MODEL, "prompt": text},
        timeout=EMBEDDING_TIMEOUT,
    )
    response.raise_for_status()
    payload = response.json()
    vector = payload.get("embedding")
    if not isinstance(vector, list) or not vector:
        raise ValueError(f"Unexpected embedding response: {payload}")
    return vector

def _chunk_text(text: str, chunk_size: int = 512, overlap: int = 128) -> list[str]:
"""Split text into overlapping chunks for better retrieval.

Uses sentence-aware splitting (roughly  splits on periods/newlines).
"""
sentences = [s.strip() for s in text.replace("\n", ". ").split(".") if s.strip()]
chunks = []
current_chunk = []
current_len = 0

for sent in sentences:
    sent_len = len(sent.split())
    if current_len + sent_len > chunk_size and current_chunk:
        chunks.append(". ".join(current_chunk) + ".")
        # Keep overlap sentences
        overlap_sentences = current_chunk[-(overlap // 20):] if len(current_chunk) > 1 else []
        current_chunk = overlap_sentences + [sent]
        current_len = sum(len(s.split()) for s in current_chunk)
    else:
        current_chunk.append(sent)
        current_len += sent_len

if current_chunk:
    chunks.append(". ".join(current_chunk) + ".")

return chunks if chunks else [text[:chunk_size * 5]]  # Fallback

def _doc_id(source: str, date: str, chunk_idx: int = 0) -> str:
"""Generate a deterministic document ID."""
base = f"{source}:{date}:{chunk_idx}"
return hashlib.sha256(base.encode()).hexdigest()[:32]

def ingest_email(
    subject: str,
    body: str,
    from_addr: str,
    email_date: str,
    parsed_items: list[dict],
) -> dict:
    """Ingest an email into the Family Brain.

    Args:
        subject: Email subject line
        body: Raw email body text
        from_addr: Sender email address
        email_date: ISO 8601 date string of the email
        parsed_items: List of items extracted (appointments, reminders, etc.)

    Returns:
        Dict with ids of inserted documents.
    """
    collection = _get_collection()

    # Build rich text for embedding: headers + body + a summary of parsed items.
    items_summary = "\n".join(
        f"- {item.get('type', 'item')}: {item.get('summary', '')} "
        f"(when: {item.get('start', item.get('due', 'TBD'))!s})"
        for item in parsed_items
    )

    full_text = f"Subject: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n{body}\n\nExtracted Items:\n{items_summary}"

    # Serialize parsed_items once — it is loop-invariant (was re-dumped per
    # chunk). default=str handles datetime objects. Sharing one timestamp
    # also keeps all chunks of this email consistent.
    safe_items = json.dumps(parsed_items, default=str)
    ingested_at = datetime.now(CHICAGO_TZ).isoformat()

    chunks = _chunk_text(full_text)
    ids = []
    embeddings = []
    metadatas = []
    documents = []

    for idx, chunk in enumerate(chunks):
        ids.append(_doc_id(subject, email_date, idx))
        embeddings.append(_embed_text(chunk))
        documents.append(chunk)
        metadatas.append({
            "source": "email",
            "subject": subject,
            "from": from_addr,
            "date": email_date,
            "chunk_idx": idx,
            "total_chunks": len(chunks),
            "ingested_at": ingested_at,
            "parsed_items": safe_items,
        })

    # Upsert so re-processing the same email overwrites rather than duplicates.
    collection.upsert(
        ids=ids,
        embeddings=embeddings,
        documents=documents,
        metadatas=metadatas,
    )

    return {"status": "ingested", "ids": ids, "chunks": len(chunks)}

def ingest_newsletter(
    subject: str,
    body: str,
    from_addr: str,
    email_date: str,
    items: list[dict],
) -> dict:
    """Ingest a newsletter into the Family Brain.

    Stores full newsletter text + structured items for retrieval.

    Args:
        subject: Email subject
        body: Raw newsletter body
        from_addr: Sender
        email_date: ISO date
        items: Parsed items (events, reminders, actions, info)

    Returns:
        Dict with ingestion status.
    """
    collection = _get_collection()

    # Group parsed items by type so the embedded text reads as sections.
    items_by_type: dict[str, list[dict]] = {}
    for item in items:
        items_by_type.setdefault(item.get("type", "info"), []).append(item)

    items_summary = ""
    for t, t_items in items_by_type.items():
        items_summary += f"\n{t.upper()}:\n"
        for it in t_items:
            summary = it.get("summary", "")
            who = ", ".join(it.get("who", []))
            when = it.get("start", it.get("due", "TBD"))
            desc = it.get("description", "")
            items_summary += f"  - {summary}"
            if who:
                items_summary += f" ({who})"
            items_summary += f" [{when}]"
            if desc:
                items_summary += f" — {desc[:100]}"
            if it.get("relevance", "high") == "low":
                items_summary += " [low relevance]"
            items_summary += "\n"

    full_text = (
        f"Newsletter: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n"
        f"{body}\n\n---\nParsed Items:{items_summary}"
    )

    # Serialize items once — loop-invariant (was re-dumped per chunk).
    # default=str handles datetime objects.
    safe_items = json.dumps(items, default=str)
    ingested_at = datetime.now(CHICAGO_TZ).isoformat()

    chunks = _chunk_text(full_text)
    ids = []
    embeddings = []
    metadatas = []
    documents = []

    for idx, chunk in enumerate(chunks):
        ids.append(_doc_id(subject, email_date, idx))
        embeddings.append(_embed_text(chunk))
        documents.append(chunk)
        metadatas.append({
            "source": "newsletter",
            "subject": subject,
            "from": from_addr,
            "date": email_date,
            "chunk_idx": idx,
            "total_chunks": len(chunks),
            "ingested_at": ingested_at,
            "items": safe_items,
            "item_count": len(items),
        })

    # Upsert so re-processing the same newsletter overwrites, not duplicates.
    collection.upsert(
        ids=ids,
        embeddings=embeddings,
        documents=documents,
        metadatas=metadatas,
    )

    return {"status": "ingested", "ids": ids, "chunks": len(chunks)}

def query(
    question: str,
    top_k: int = 3,
    where: dict | None = None,
) -> list[dict]:
    """Query the Family Brain with a natural language question.

    Args:
        question: The user's question
        top_k: Number of chunks to retrieve
        where: Optional ChromaDB filter (e.g., {"source": "newsletter"})

    Returns:
        List of retrieved documents with metadata and distance scores.
    """
    hits = _get_collection().query(
        query_embeddings=[_embed_text(question)],
        n_results=top_k,
        where=where,
        include=["documents", "metadatas", "distances"],
    )

    # Chroma nests results per query embedding; we sent exactly one, so
    # everything we need lives in row 0 of each field.
    rows = zip(
        hits.get("documents", [[]])[0],
        hits.get("metadatas", [[]])[0],
        hits.get("distances", [[]])[0],
    )
    return [
        {
            "document": doc,
            "metadata": meta,
            "distance": dist,
            "score": 1 - dist,  # cosine distance → similarity
        }
        for doc, meta, dist in rows
    ]

def _build_synthesis_prompt(question: str, context_chunks: list[dict]) -> str:
"""Build a prompt for the synthesis LLM.

Brief, conversational, no robotic formatting.
"""
context_text = "\n\n".join(
    f"[Source: {c['metadata'].get('subject', 'Unknown')} (relevance: {c.get('score', 0):.2f}), "
    f"{c['metadata'].get('date', 'Unknown')}]\n{c['document'][:800]}"
    for c in context_chunks
)

# Get top score for routing
top_score = context_chunks[0].get("score", 0) if context_chunks else 0

# Force direct answer for high relevance
if top_score >= 0.7:
    return f"""You are a helpful assistant. Based on the following email context, answer the user's question directly and confidently.

IMPORTANT: The top match has a relevance score of {top_score:.3f} ( HIGH confidence). You MUST answer using the context provided. Do NOT say you don't see it.

QUESTION: {question}

CONTEXT:
{context_text}

Provide a brief, conversational answer:"""
elif top_score >= 0.5:
return f"""You are a helpful assistant. Based on the following email context, answer the user's question.

IMPORTANT: The top match has a relevance score of {top_score:.3f} (medium confidence). You SHOULD answer using the context provided. Start with \"Based on the emails...\" if needed.

QUESTION: {question}

CONTEXT:
{context_text}

Provide a brief answer:"""
else:
return f"""You are a helpful assistant.

The retrieved documents have low relevance ({top_score:.3f}).

Say only: "I don't see that in the emails I've processed."

QUESTION: {question}"""

def _build_hybrid_prompt(
question: str,
calendar_events: list[dict] | None = None,
brain_chunks: list[dict] | None = None,
) -> str:
"""Build a hybrid prompt combining Calendar (source of truth) + Brain context.

Calendar is the authoritative source for temporal data (time/date/location).
ChromaDB provides detail context (dress codes, what to bring, signup info).
"""
sections = []
top_score = 0.0

if calendar_events:
    cal_text = "\n".join(
        f"- {e['summary']} | {e['start']} → {e['end']}"
        f"{f' | Location: {e["location"]}' if e.get('location') else ''}"
        f"{f' | {e["description"][:200]}' if e.get('description') else ''}"
        for e in calendar_events
    )
    sections.append(
        f"GOOGLE CALENDAR DATA (source of truth for time/date/location):\n{cal_text}"
    )

if brain_chunks:
    # Filter by relevance score >= 0.5
    filtered_chunks = [c for c in brain_chunks if c.get("score", 0) >= 0.5]
    if filtered_chunks:
        top_score = max(c.get("score", 0) for c in filtered_chunks)
        brain_text = "\n\n".join(
            f"[Source: {c['metadata'].get('subject', 'Unknown')} (relevance: {c.get('score', 0):.2f}), "
            f"{c['metadata'].get('date', 'Unknown')}]\n{c['document'][:800]}"
            for c in filtered_chunks
        )
        sections.append(
            f"EMAIL/NEWSLETTER CONTEXT (details, requirements, dress codes, what to bring):\n{brain_text}"
        )

context_block = "\n\n".join(sections) if sections else "No context found."

# Determine response guidance
if top_score >= 0.7:
    score_guidance = "HIGH RELEVANCE: Answer confidently using the retrieved context."
elif top_score >= 0.5:
    score_guidance = "MEDIUM RELEVANCE: Answer tentatively with 'Based on the emails...'"
elif top_score > 0:
    score_guidance = "LOW RELEVANCE: Only say 'I don't see that in the emails I've processed.'"
else:
    score_guidance = "NO CONTEXT: Say 'I don't see that in the emails I've processed.'"

return f"""You are a retrieval assistant answering a question using both calendar data and email context.

INSTRUCTIONS:
- Answer the user's question using the provided data only
- Be brief and conversational (like texting a spouse)
- DO NOT ask follow-up questions
- DO NOT offer to perform actions (like updating the calendar)
- Just provide the information
- Calendar is the source of truth for time/date/location — if the Calendar and email context conflict on scheduling details, trust the Calendar
- Use email/newsletter context for details like dress codes, what to bring, requirements, signup links
- {score_guidance}
- Avoid bullet points, headers, or robotic formatting

QUESTION: {question}

{context_block}

ANSWER:"""

def answer(
    question: str,
    top_k: int = 3,
    where: dict | None = None,
) -> dict:
    """Answer a question via RAG: retrieve chunks, then synthesize locally.

    Args:
        question: The user's question
        top_k: Number of chunks to retrieve
        where: Optional filter

    Returns:
        Dict with answer, sources, and confidence.
    """
    retrieved = query(question, top_k=top_k, where=where)

    # Drop anything below the 0.5 relevance floor before synthesis.
    usable = [c for c in retrieved if c.get("score", 0) >= 0.5]
    if not usable:
        return {
            "answer": "I don't see that in the emails I've processed.",
            "sources": [],
            "confidence": "low",
        }

    # Synthesize with the local LLM (Beelink → localhost Ollama).
    prompt = _build_synthesis_prompt(question, usable)
    resp = requests.post(
        LLM_URL,
        json={
            "model": LLM_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.3,
            "stream": False,
        },
        timeout=LLM_TIMEOUT,
    )
    resp.raise_for_status()
    data = resp.json()

    # The endpoint may speak OpenAI-style ("choices"), Ollama chat
    # ("message"), or Ollama generate ("response") — accept all three.
    if data.get("choices"):
        raw = data["choices"][0].get("message", {}).get("content", "").strip()
    elif "message" in data:
        raw = data["message"].get("content", "").strip()
    else:
        raw = data.get("response", "").strip()

    # Strip common LLM artifacts echoed back from the prompt.
    answer_text = raw.replace("ANSWER:", "").replace("Answer:", "").strip()

    # Confidence mirrors the top retrieval score.
    top = usable[0]["score"] if usable else 0
    if top >= 0.7:
        confidence = "high"
    elif top >= 0.5:
        confidence = "medium"
    else:
        confidence = "low"

    return {
        "answer": answer_text,
        "sources": [
            {
                "subject": c["metadata"].get("subject"),
                "date": c["metadata"].get("date"),
                "score": round(c["score"], 3),
                "source": c["metadata"].get("source", "unknown"),
            }
            for c in usable
        ],
        "confidence": confidence,
    }

def ingest_telegram(
    extraction_id: int,
    message_id: str,
    sender_name: str,
    sender_username: str,
    chat_title: str,
    message_text: str,
    extraction_type: str,
    extracted_who: list[str],
    extracted_what: str,
    extracted_when: str,
    extracted_where: str,
    confidence: float,
    needs_confirmation: bool,
    calendar_check_status: str = "pending",
    calendar_event_id: str | None = None,
    calendar_event_title: str | None = None,
    fuzzy_match_score: float | None = None,
) -> dict:
    """Ingest a Telegram extraction into the Family Brain.

    Stores the full message context + structured extraction as ChromaDB
    documents for semantic retrieval. Metadata includes source provenance
    and calendar validation status.

    Returns:
        Dict with ingestion status and document IDs.
    """
    collection = _get_collection()

    # Build rich text for embedding: message provenance + body + extraction.
    who_str = ", ".join(extracted_who) if extracted_who else ""
    handle = f" (@{sender_username})" if sender_username else ""
    full_text = (
        f"Telegram message from {sender_name}{handle} in {chat_title}:\n\n"
        f"{message_text}\n\n"
        f"Extracted: {extraction_type}\n"
        f"Who: {who_str}\n"
        f"What: {extracted_what}\n"
        f"When: {extracted_when}\n"
        f"Where: {extracted_where}"
    )

    chunks = _chunk_text(full_text)

    # Metadata shared by every chunk of this extraction. Chroma metadata
    # values must be scalars, hence the str() conversions.
    shared_meta = {
        "source": "telegram",
        "extraction_type": extraction_type,
        "extraction_id": str(extraction_id),
        "message_id": str(message_id),
        "sender": sender_name,
        "sender_username": sender_username or "",
        "chat": chat_title,
        "who": who_str,
        "what": extracted_what,
        "when": extracted_when,
        "where": extracted_where,
        "confidence": str(round(confidence, 3)),
        "needs_confirmation": str(needs_confirmation),
        "calendar_check_status": calendar_check_status,
        "calendar_event_id": calendar_event_id or "",
        "calendar_event_title": calendar_event_title or "",
        "fuzzy_match_score": (
            str(round(fuzzy_match_score, 3)) if fuzzy_match_score is not None else ""
        ),
        "total_chunks": len(chunks),
    }

    doc_ids: list[str] = []
    vectors: list[list[float]] = []
    metas: list[dict] = []
    docs: list[str] = []

    for idx, chunk in enumerate(chunks):
        doc_ids.append(_doc_id(f"telegram:{message_id}", "", idx))
        vectors.append(_embed_text(chunk))
        docs.append(chunk)
        metas.append({
            **shared_meta,
            "chunk_idx": idx,
            "ingested_at": datetime.now(CHICAGO_TZ).isoformat(),
        })

    collection.upsert(
        ids=doc_ids,
        embeddings=vectors,
        documents=docs,
        metadatas=metas,
    )

    return {"status": "ingested", "ids": doc_ids, "chunks": len(chunks)}

def stats() -> dict:
    """Report Family Brain storage statistics (path, counts, collections)."""
    client = _get_chroma_client()
    return {
        "db_path": str(CHROMA_DB_PATH),
        "collection": "family_knowledge",
        "count": _get_collection().count(),
        "collections": client.list_collections(),
    }

def purge_old(months: int = DEFAULT_TTL_MONTHS) -> dict:
    """Delete documents whose ingested_at timestamp is older than the TTL.

    Returns:
        Dict with count of deleted documents.
    """
    # ~30 days per month is close enough for a newsletter TTL.
    cutoff_iso = (datetime.now(CHICAGO_TZ) - timedelta(days=months * 30)).isoformat()

    collection = _get_collection()

    # Fetch every document's metadata and keep the stale IDs.
    # ISO-8601 strings with a consistent tz offset compare correctly as text.
    snapshot = collection.get(include=["metadatas"])
    stale_ids = [
        doc_id
        for doc_id, meta in zip(snapshot.get("ids", []), snapshot.get("metadatas", []))
        if meta.get("ingested_at", "") and meta.get("ingested_at", "") < cutoff_iso
    ]

    if stale_ids:
        collection.delete(ids=stale_ids)

    return {
        "purged": len(stale_ids),
        "cutoff": cutoff_iso,
        "remaining": collection.count(),
    }