"""Family Brain — Local RAG retrieval for family knowledge.
The Family Brain is a ChromaDB-based vector store that remembers
everything parsed from emails and newsletters. It answers questions
like "what do the kids need for the field trip?" by semantically
retrieving the relevant context.
HARDWARE TOPOLOGY (Tri-Node):
- ChromaDB data files: Beelink (~/.family_assistant/chroma_db/)
- Embeddings: Gaming PC Ollama (nomic-embed-text) via Tailscale API
- Retrieval + synthesis: Beelink local LLM (qwen2.5-coder:7b via localhost)
"""
import hashlib
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import requests
from family_assistant.config import CHICAGO_TZ, LLM_MODEL, LLM_URL, LLM_TIMEOUT
ChromaDB data lives on Beelink (where the pipeline runs)
CHROMA_DB_PATH = Path.home() / ".family_assistant" / "chroma_db"
CHROMA_DB_PATH.mkdir(parents=True, exist_ok=True)
Embedding service runs on Gaming PC via Tailscale
OLLAMA_EMBED_URL = os.environ.get(
"OLLAMA_EMBED_URL", "http://localhost:11434/api/embeddings"
)
EMBEDDING_MODEL = "nomic-embed-text"
EMBEDDING_TIMEOUT = 30
Document TTL (auto-purge old newsletters)
DEFAULT_TTL_MONTHS = 12
def _get_chroma_client():
"""Lazy-load ChromaDB client (embedded mode, local files)."""
import chromadb
return chromadb.PersistentClient(path=str(CHROMA_DB_PATH))
def _get_collection(name: str = "family_knowledge"):
"""Get or create a ChromaDB collection."""
client = _get_chroma_client()
return client.get_or_create_collection(
name=name,
metadata={"hnsw:space": "cosine"}, # Cosine similarity for semantic search
)
def _embed_text(text: str) -> list[float]:
"""Generate embeddings via Ollama API on Gaming PC.
Returns a 768-dimensional vector from nomic-embed-text.
"""
resp = requests.post(
OLLAMA_EMBED_URL,
json={"model": EMBEDDING_MODEL, "prompt": text},
timeout=EMBEDDING_TIMEOUT,
)
resp.raise_for_status()
data = resp.json()
embedding = data.get("embedding")
if not embedding or not isinstance(embedding, list):
raise ValueError(f"Unexpected embedding response: {data}")
return embedding
def _chunk_text(text: str, chunk_size: int = 512, overlap: int = 128) -> list[str]:
"""Split text into overlapping chunks for better retrieval.
Uses sentence-aware splitting (roughly — splits on periods/newlines).
"""
sentences = [s.strip() for s in text.replace("\n", ". ").split(".") if s.strip()]
chunks = []
current_chunk = []
current_len = 0
for sent in sentences:
sent_len = len(sent.split())
if current_len + sent_len > chunk_size and current_chunk:
chunks.append(". ".join(current_chunk) + ".")
# Keep overlap sentences
overlap_sentences = current_chunk[-(overlap // 20):] if len(current_chunk) > 1 else []
current_chunk = overlap_sentences + [sent]
current_len = sum(len(s.split()) for s in current_chunk)
else:
current_chunk.append(sent)
current_len += sent_len
if current_chunk:
chunks.append(". ".join(current_chunk) + ".")
return chunks if chunks else [text[:chunk_size * 5]] # Fallback
def _doc_id(source: str, date: str, chunk_idx: int = 0) -> str:
"""Generate a deterministic document ID."""
base = f"{source}:{date}:{chunk_idx}"
return hashlib.sha256(base.encode()).hexdigest()[:32]
def ingest_email(
subject: str,
body: str,
from_addr: str,
email_date: str,
parsed_items: list[dict],
) -> dict:
"""Ingest an email into the Family Brain.
Args:
subject: Email subject line
body: Raw email body text
from_addr: Sender email address
email_date: ISO 8601 date string of the email
parsed_items: List of items extracted (appointments, reminders, etc.)
Returns:
Dict with ids of inserted documents.
"""
collection = _get_collection()
# Build rich text for embedding
items_summary = "\n".join(
f"- {item.get('type', 'item')}: {item.get('summary', '')} "
f"(when: {item.get('start', item.get('due', 'TBD'))!s})"
for item in parsed_items
)
full_text = f"Subject: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n{body}\n\nExtracted Items:\n{items_summary}"
chunks = _chunk_text(full_text)
ids = []
embeddings = []
metadatas = []
documents = []
for idx, chunk in enumerate(chunks):
doc_id = _doc_id(subject, email_date, idx)
embedding = _embed_text(chunk)
ids.append(doc_id)
embeddings.append(embedding)
documents.append(chunk)
# Serialize parsed_items for JSON storage (handle datetime objects)
safe_items = json.dumps(parsed_items, default=str)
metadatas.append({
"source": "email",
"subject": subject,
"from": from_addr,
"date": email_date,
"chunk_idx": idx,
"total_chunks": len(chunks),
"ingested_at": datetime.now(CHICAGO_TZ).isoformat(),
"parsed_items": safe_items,
})
collection.upsert(
ids=ids,
embeddings=embeddings,
documents=documents,
metadatas=metadatas,
)
return {"status": "ingested", "ids": ids, "chunks": len(chunks)}
def ingest_newsletter(
subject: str,
body: str,
from_addr: str,
email_date: str,
items: list[dict],
) -> dict:
"""Ingest a newsletter into the Family Brain.
Stores full newsletter text + structured items for retrieval.
Args:
subject: Email subject
body: Raw newsletter body
from_addr: Sender
email_date: ISO date
items: Parsed items (events, reminders, actions, info)
Returns:
Dict with ingestion status.
"""
collection = _get_collection()
# Build structured summary from parsed items
items_by_type = {}
for item in items:
t = item.get("type", "info")
items_by_type.setdefault(t, []).append(item)
items_summary = ""
for t, t_items in items_by_type.items():
items_summary += f"\n{t.upper()}:\n"
for it in t_items:
summary = it.get("summary", "")
who = ", ".join(it.get("who", []))
when = it.get("start", it.get("due", "TBD"))
desc = it.get("description", "")
relevance = it.get("relevance", "high")
items_summary += f" - {summary}"
if who:
items_summary += f" ({who})"
items_summary += f" [{when}]"
if desc:
items_summary += f" — {desc[:100]}"
if relevance == "low":
items_summary += " [low relevance]"
items_summary += "\n"
full_text = (
f"Newsletter: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n"
f"{body}\n\n---\nParsed Items:{items_summary}"
)
chunks = _chunk_text(full_text)
ids = []
embeddings = []
metadatas = []
documents = []
for idx, chunk in enumerate(chunks):
doc_id = _doc_id(subject, email_date, idx)
embedding = _embed_text(chunk)
ids.append(doc_id)
embeddings.append(embedding)
documents.append(chunk)
# Serialize items for JSON storage (handle datetime objects)
safe_items = json.dumps(items, default=str)
metadatas.append({
"source": "newsletter",
"subject": subject,
"from": from_addr,
"date": email_date,
"chunk_idx": idx,
"total_chunks": len(chunks),
"ingested_at": datetime.now(CHICAGO_TZ).isoformat(),
"items": safe_items,
"item_count": len(items),
})
collection.upsert(
ids=ids,
embeddings=embeddings,
documents=documents,
metadatas=metadatas,
)
return {"status": "ingested", "ids": ids, "chunks": len(chunks)}
def query(
question: str,
top_k: int = 3,
where: dict | None = None,
) -> list[dict]:
"""Query the Family Brain with a natural language question.
Args:
question: The user's question
top_k: Number of chunks to retrieve
where: Optional ChromaDB filter (e.g., {"source": "newsletter"})
Returns:
List of retrieved documents with metadata and distance scores.
"""
collection = _get_collection()
query_embedding = _embed_text(question)
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
where=where,
include=["documents", "metadatas", "distances"],
)
# Flatten results
documents = results.get("documents", [[]])[0]
metadatas = results.get("metadatas", [[]])[0]
distances = results.get("distances", [[]])[0]
return [
{
"document": doc,
"metadata": meta,
"distance": dist,
"score": 1 - dist, # Convert distance to similarity score
}
for doc, meta, dist in zip(documents, metadatas, distances)
]
def _build_synthesis_prompt(question: str, context_chunks: list[dict]) -> str:
"""Build a prompt for the synthesis LLM.
Brief, conversational, no robotic formatting.
"""
context_text = "\n\n".join(
f"[Source: {c['metadata'].get('subject', 'Unknown')} (relevance: {c.get('score', 0):.2f}), "
f"{c['metadata'].get('date', 'Unknown')}]\n{c['document'][:800]}"
for c in context_chunks
)
# Get top score for routing
top_score = context_chunks[0].get("score", 0) if context_chunks else 0
return f"""You are a retrieval assistant answering a question based on email and newsletter context.
INSTRUCTIONS:
- Answer the user's question using the provided context only
- Be brief and conversational (like texting a spouse)
- DO NOT ask follow-up questions
- DO NOT offer to perform actions (like updating the calendar)
- Just provide the information
- HIGH RELEVANCE (>0.7): Directly answer from the context with confidence
- MEDIUM RELEVANCE (0.5-0.7): Answer with "Based on the [source], it appears..."
- LOW RELEVANCE (<0.5): Only say "I don't see that in the emails I've processed"
- Avoid bullet points, headers, or robotic formatting
TOP MATCH SCORE: {top_score:.3f}
QUESTION: {question}
CONTEXT:
{context_text}
ANSWER:"""
def _build_hybrid_prompt(
question: str,
calendar_events: list[dict] | None = None,
brain_chunks: list[dict] | None = None,
) -> str:
"""Build a hybrid prompt combining Calendar (source of truth) + Brain context.
Calendar is the authoritative source for temporal data (time/date/location).
ChromaDB provides detail context (dress codes, what to bring, signup info).
"""
sections = []
top_score = 0.0
if calendar_events:
cal_text = "\n".join(
f"- {e['summary']} | {e['start']} → {e['end']}"
f"{f' | Location: {e["location"]}' if e.get('location') else ''}"
f"{f' | {e["description"][:200]}' if e.get('description') else ''}"
for e in calendar_events
)
sections.append(
f"GOOGLE CALENDAR DATA (source of truth for time/date/location):\n{cal_text}"
)
if brain_chunks:
# Filter by relevance score >= 0.5 for medium confidence
filtered_chunks = [c for c in brain_chunks if c.get("score", 0) >= 0.5]
if filtered_chunks:
top_score = max(c.get("score", 0) for c in filtered_chunks)
brain_text = "\n\n".join(
f"[Source: {c['metadata'].get('subject', 'Unknown')} (relevance: {c.get('score', 0):.2f}), "
f"{c['metadata'].get('date', 'Unknown')}]\n{c['document'][:800]}"
for c in filtered_chunks
)
sections.append(
f"EMAIL/NEWSLETTER CONTEXT (details, requirements, dress codes, what to bring):\n{brain_text}"
)
context_block = "\n\n".join(sections) if sections else "No context found."
# Determine response guidance based on top score
if top_score >= 0.7:
score_guidance = "HIGH RELEVANCE: Answer confidently using the retrieved context."
elif top_score >= 0.5:
score_guidance = "MEDIUM RELEVANCE: Answer tentatively with 'Based on the emails...'"
elif top_score > 0:
score_guidance = "LOW RELEVANCE: Only say 'I don't see that in the emails I've processed.'"
else:
score_guidance = "NO CONTEXT: Say 'I don't see that in the emails I've processed.'"
return f"""You are a retrieval assistant answering a question using both calendar data and email context.
INSTRUCTIONS:
- Answer the user's question using the provided data only
- Be brief and conversational (like texting a spouse)
- DO NOT ask follow-up questions
- DO NOT offer to perform actions (like updating the calendar)
- Just provide the information
- Calendar is the source of truth for time/date/location — if the Calendar and email context conflict on scheduling details, trust the Calendar
- Use email/newsletter context for details like dress codes, what to bring, requirements, signup links
- {score_guidance}
- Avoid bullet points, headers, or robotic formatting
QUESTION: {question}
{context_block}
ANSWER:"""
def answer(
question: str,
top_k: int = 3,
where: dict | None = None,
) -> dict:
"""Answer a question using RAG (retrieve + synthesize).
Args:
question: The user's question
top_k: Number of chunks to retrieve
where: Optional filter
Returns:
Dict with answer, sources, and confidence.
"""
# Retrieve relevant chunks
chunks = query(question, top_k=top_k, where=where)
# Filter chunks by relevance score (minimum 0.5 threshold)
relevant_chunks = [c for c in chunks if c.get("score", 0) >= 0.5]
if not relevant_chunks:
return {
"answer": "I don't see that in the emails I've processed.",
"sources": [],
"confidence": "low",
}
# Synthesize answer with local LLM (Beelink → localhost Ollama)
prompt = _build_synthesis_prompt(question, relevant_chunks)
payload = {
"model": LLM_MODEL,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3,
"stream": False,
}
resp = requests.post(LLM_URL, json=payload, timeout=LLM_TIMEOUT)
resp.raise_for_status()
data = resp.json()
# Handle both formats
choices = data.get("choices", [])
if choices:
answer_text = choices[0].get("message", {}).get("content", "").strip()
elif "message" in data:
answer_text = data["message"].get("content", "").strip()
else:
answer_text = data.get("response", "").strip()
# Clean up common LLM artifacts
answer_text = answer_text.replace("ANSWER:", "").replace("Answer:", "").strip()
# Determine confidence based on top score
top_score = relevant_chunks[0]["score"] if relevant_chunks else 0
if top_score >= 0.7:
confidence = "high"
elif top_score >= 0.5:
confidence = "medium"
else:
confidence = "low"
return {
"answer": answer_text,
"sources": [
{
"subject": c["metadata"].get("subject"),
"date": c["metadata"].get("date"),
"score": round(c["score"], 3),
}
for c in relevant_chunks
],
"confidence": confidence,
}
def stats() -> dict:
"""Get statistics about the Family Brain."""
client = _get_chroma_client()
collection = _get_collection()
return {
"db_path": str(CHROMA_DB_PATH),
"collection": "family_knowledge",
"count": collection.count(),
"collections": client.list_collections(),
}
def purge_old(months: int = DEFAULT_TTL_MONTHS) -> dict:
"""Remove documents older than the specified TTL.
Returns:
Dict with count of deleted documents.
"""
cutoff = datetime.now(CHICAGO_TZ) - timedelta(days=months * 30)
cutoff_iso = cutoff.isoformat()
collection = _get_collection()
# Get all documents
all_docs = collection.get(include=["metadatas"])
ids_to_delete = []
for doc_id, meta in zip(all_docs.get("ids", []), all_docs.get("metadatas", [])):
ingested = meta.get("ingested_at", "")
if ingested and ingested < cutoff_iso:
ids_to_delete.append(doc_id)
if ids_to_delete:
collection.delete(ids=ids_to_delete)
return {
"purged": len(ids_to_delete),
"cutoff": cutoff_iso,
"remaining": collection.count(),
}