"""Family Brain — Local RAG retrieval for family knowledge.

The Family Brain is a ChromaDB-based vector store that remembers everything
parsed from emails and newsletters. It answers questions like "what do the
kids need for the field trip?" by semantically retrieving the relevant context.

HARDWARE TOPOLOGY (Tri-Node):
- ChromaDB data files: Beelink (~/.family_assistant/chroma_db/)
- Embeddings: Gaming PC Ollama (nomic-embed-text) via Tailscale API.
  Set OLLAMA_EMBED_URL to the Gaming PC's endpoint; it defaults to localhost.
- Retrieval + synthesis: Beelink local LLM (qwen2.5-coder:7b via localhost),
  configured through LLM_MODEL / LLM_URL in family_assistant.config.
"""

import hashlib
import json
import os
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

import requests

from family_assistant.config import CHICAGO_TZ, LLM_MODEL, LLM_URL, LLM_TIMEOUT

# ChromaDB data lives on Beelink (where the pipeline runs)
CHROMA_DB_PATH = Path.home() / ".family_assistant" / "chroma_db"
CHROMA_DB_PATH.mkdir(parents=True, exist_ok=True)

# Embedding service runs on the Gaming PC via Tailscale; point OLLAMA_EMBED_URL
# at it. The fallback targets an Ollama instance on this machine.
OLLAMA_EMBED_URL = os.environ.get(
    "OLLAMA_EMBED_URL", "http://localhost:11434/api/embeddings"
)
EMBEDDING_MODEL = "nomic-embed-text"
EMBEDDING_TIMEOUT = 30  # seconds, passed to requests.post

# Document TTL (auto-purge old newsletters)
DEFAULT_TTL_MONTHS = 12

# Collection used by every ingest/query helper unless told otherwise.
DEFAULT_COLLECTION = "family_knowledge"

# Chunks whose similarity score (1 - cosine distance) is below this are treated
# as unrelated to the question and are never shown to the LLM.
MIN_RELEVANCE_SCORE = 0.3
# Best-chunk score above which answer() reports "high" confidence.
HIGH_CONFIDENCE_SCORE = 0.7

# Sentence boundary: whitespace following terminal punctuation, or a line break.
# A bare "." is deliberately NOT a boundary so email addresses, URLs/signup
# links, times ("3.30pm") and prices ("$12.50") survive chunking intact.
_SENTENCE_BOUNDARY = re.compile(r"(?<=[.!?])\s+|\n+")


def _get_chroma_client():
    """Lazy-load ChromaDB client (embedded mode, local files under CHROMA_DB_PATH)."""
    import chromadb

    return chromadb.PersistentClient(path=str(CHROMA_DB_PATH))


def _get_collection(name: str = DEFAULT_COLLECTION, client: Any = None):
    """Get or create a ChromaDB collection that uses cosine distance.

    Args:
        name: Collection name.
        client: Optional ChromaDB client to reuse; a new one is created when
            omitted.
    """
    if client is None:
        client = _get_chroma_client()
    return client.get_or_create_collection(
        name=name,
        metadata={"hnsw:space": "cosine"},  # Cosine similarity for semantic search
    )


def _embed_text(text: str) -> list[float]:
    """Generate an embedding for ``text`` via the Ollama embeddings API.

    Posts to OLLAMA_EMBED_URL with EMBEDDING_MODEL (nomic-embed-text, which
    yields 768-dimensional vectors).

    Raises:
        requests.HTTPError: if the embedding service returns an error status.
        ValueError: if the response does not contain a non-empty
            ``embedding`` list.
    """
    resp = requests.post(
        OLLAMA_EMBED_URL,
        json={"model": EMBEDDING_MODEL, "prompt": text},
        timeout=EMBEDDING_TIMEOUT,
    )
    resp.raise_for_status()
    data = resp.json()
    embedding = data.get("embedding")
    if not embedding or not isinstance(embedding, list):
        raise ValueError(f"Unexpected embedding response: {data}")
    return embedding


def _overlap_tail(sentences: list[str], overlap: int) -> list[str]:
    """Pick the trailing sentences of a finished chunk to repeat in the next one.

    Sentences are taken from the end while their combined word count stays
    within ``overlap``. The first sentence is never included, so the next chunk
    always makes progress instead of re-carrying the whole previous chunk.
    """
    tail: list[str] = []
    words = 0
    for sent in reversed(sentences[1:]):
        sent_words = len(sent.split())
        if words + sent_words > overlap:
            break
        tail.append(sent)
        words += sent_words
    tail.reverse()
    return tail


def _chunk_text(text: str, chunk_size: int = 512, overlap: int = 128) -> list[str]:
    """Split text into overlapping, sentence-aligned chunks for retrieval.

    Sizes are counted in whitespace-separated words. Sentences are packed into
    a chunk until the next one would push it past ``chunk_size``; the following
    chunk then starts with up to ``overlap`` words of trailing sentences from
    the previous one. A single sentence longer than ``chunk_size`` becomes its
    own oversized chunk rather than being cut mid-sentence.

    Sentences inside a chunk are joined with newlines, which keeps header lines
    ("Subject: ...", "From: ...") and item lists readable.

    Args:
        text: Text to split.
        chunk_size: Target maximum words per chunk.
        overlap: Maximum words repeated from the end of the previous chunk.

    Returns:
        A non-empty list of chunks. If ``text`` contains no sentences (empty or
        whitespace only) the fallback ``[text[:chunk_size * 5]]`` is returned.
    """
    sentences = [s.strip() for s in _SENTENCE_BOUNDARY.split(text) if s.strip()]
    chunks: list[str] = []
    current: list[str] = []
    current_len = 0

    for sent in sentences:
        sent_len = len(sent.split())
        if current and current_len + sent_len > chunk_size:
            chunks.append("\n".join(current))
            carry = _overlap_tail(current, overlap)
            carry_len = sum(len(s.split()) for s in carry)
            # Skip the overlap when it alone would push the new chunk over size
            if carry_len + sent_len > chunk_size:
                carry, carry_len = [], 0
            current, current_len = carry, carry_len
        current.append(sent)
        current_len += sent_len

    if current:
        chunks.append("\n".join(current))
    return chunks if chunks else [text[: chunk_size * 5]]  # Fallback


def _doc_id(source: str, date: str, chunk_idx: int = 0) -> str:
    """Generate a deterministic 32-hex-char document ID from source, date and chunk index."""
    base = f"{source}:{date}:{chunk_idx}"
    return hashlib.sha256(base.encode()).hexdigest()[:32]


def _item_when(item: dict[str, Any]) -> Any:
    """Return an item's start time, else its due date, else "TBD".

    Uses ``or`` instead of nested ``.get`` defaults so a key that is present
    but empty (e.g. ``"start": None``) still falls through to the next one.
    """
    return item.get("start") or item.get("due") or "TBD"


def _store_document(
    source: str,
    subject: str,
    from_addr: str,
    email_date: str,
    full_text: str,
    extra_metadata: dict[str, Any],
) -> dict[str, Any]:
    """Chunk, embed and upsert one message into the default collection.

    Shared by ingest_email and ingest_newsletter. Every chunk gets the same
    base metadata (source, subject, sender, date, chunk position, one shared
    ``ingested_at`` timestamp) plus ``extra_metadata``.

    NOTE: chunk IDs derive from (subject, email_date, chunk index) only, so two
    different messages (or an email and a newsletter) with the same subject and
    date overwrite each other, and re-ingesting a message that now yields fewer
    chunks leaves its old higher-index chunks in place.

    Returns:
        ``{"status": "ingested", "ids": [...], "chunks": <count>}``
    """
    chunks = _chunk_text(full_text)
    # Embed before touching the DB so an unreachable embedding service fails fast
    embeddings = [_embed_text(chunk) for chunk in chunks]
    ingested_at = datetime.now(CHICAGO_TZ).isoformat()
    ids = [_doc_id(subject, email_date, idx) for idx in range(len(chunks))]
    metadatas = [
        {
            "source": source,
            "subject": subject,
            "from": from_addr,
            "date": email_date,
            "chunk_idx": idx,
            "total_chunks": len(chunks),
            "ingested_at": ingested_at,
            **extra_metadata,
        }
        for idx in range(len(chunks))
    ]

    _get_collection().upsert(
        ids=ids,
        embeddings=embeddings,
        documents=chunks,
        metadatas=metadatas,
    )
    return {"status": "ingested", "ids": ids, "chunks": len(chunks)}


def ingest_email(
    subject: str,
    body: str,
    from_addr: str,
    email_date: str,
    parsed_items: list[dict],
) -> dict:
    """Ingest an email into the Family Brain.

    Args:
        subject: Email subject line
        body: Raw email body text
        from_addr: Sender email address
        email_date: ISO 8601 date string of the email
        parsed_items: List of items extracted (appointments, reminders, etc.)

    Returns:
        Dict with the status, the ids of the upserted chunks, and chunk count.
    """
    # Build rich text for embedding
    items_summary = "\n".join(
        f"- {item.get('type', 'item')}: {item.get('summary', '')} "
        f"(when: {_item_when(item)!s})"
        for item in parsed_items
    )
    full_text = (
        f"Subject: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n"
        f"{body}\n\nExtracted Items:\n{items_summary}"
    )
    # ChromaDB metadata values must be scalars, so items are stored as JSON;
    # default=str turns datetime (and any other non-JSON) values into strings.
    return _store_document(
        "email",
        subject,
        from_addr,
        email_date,
        full_text,
        {"parsed_items": json.dumps(parsed_items, default=str)},
    )


def _format_newsletter_items(items: list[dict]) -> str:
    """Render parsed newsletter items grouped by type, one item per line."""
    items_by_type: dict[str, list[dict]] = {}
    for item in items:
        items_by_type.setdefault(str(item.get("type") or "info"), []).append(item)

    parts: list[str] = []
    for item_type, typed_items in items_by_type.items():
        parts.append(f"\n{item_type.upper()}:\n")
        for it in typed_items:
            who = it.get("who") or []
            if isinstance(who, str):  # a single name instead of a list of names
                who = [who]
            who_text = ", ".join(str(w) for w in who)
            desc = it.get("description") or ""

            line = f" - {it.get('summary', '')}"
            if who_text:
                line += f" ({who_text})"
            line += f" [{_item_when(it)}]"
            if desc:
                line += f" — {str(desc)[:100]}"
            if it.get("relevance", "high") == "low":
                line += " [low relevance]"
            parts.append(line + "\n")
    return "".join(parts)


def ingest_newsletter(
    subject: str,
    body: str,
    from_addr: str,
    email_date: str,
    items: list[dict],
) -> dict:
    """Ingest a newsletter into the Family Brain.

    Stores full newsletter text + structured items for retrieval.

    Args:
        subject: Email subject
        body: Raw newsletter body
        from_addr: Sender
        email_date: ISO date
        items: Parsed items (events, reminders, actions, info)

    Returns:
        Dict with the status, the ids of the upserted chunks, and chunk count.
    """
    full_text = (
        f"Newsletter: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n"
        f"{body}\n\n---\nParsed Items:{_format_newsletter_items(items)}"
    )
    # Items serialized as JSON (datetime values become strings via default=str)
    return _store_document(
        "newsletter",
        subject,
        from_addr,
        email_date,
        full_text,
        {"items": json.dumps(items, default=str), "item_count": len(items)},
    )


def query(
    question: str,
    top_k: int = 3,
    where: dict | None = None,
) -> list[dict]:
    """Query the Family Brain with a natural language question.

    Args:
        question: The user's question
        top_k: Number of chunks to retrieve
        where: Optional ChromaDB filter (e.g., {"source": "newsletter"})

    Returns:
        List of ``{"document", "metadata", "distance", "score"}`` dicts in the
        order ChromaDB returns them, where ``score = 1 - cosine distance``.
        Empty when ``top_k < 1`` or the collection holds no documents.
    """
    if top_k < 1:
        return []
    collection = _get_collection()
    available = collection.count()
    if available == 0:
        # Nothing stored yet: skip the embedding round trip entirely
        return []

    query_embedding = _embed_text(question)
    results = collection.query(
        query_embeddings=[query_embedding],
        # Some ChromaDB versions reject n_results larger than the collection
        n_results=min(top_k, available),
        where=where or None,
        include=["documents", "metadatas", "distances"],
    )

    # Flatten results (one query embedding -> take the first result list)
    documents = (results.get("documents") or [[]])[0]
    metadatas = (results.get("metadatas") or [[]])[0]
    distances = (results.get("distances") or [[]])[0]

    return [
        {
            "document": doc,
            "metadata": meta or {},
            "distance": dist,
            "score": 1 - dist,  # Convert distance to similarity score
        }
        for doc, meta, dist in zip(documents, metadatas, distances)
    ]


def _format_context_chunk(chunk: dict) -> str:
    """Render a retrieved chunk as "[Source: subject, date]" + first 800 chars."""
    meta = chunk.get("metadata") or {}
    return (
        f"[Source: {meta.get('subject', 'Unknown')}, "
        f"{meta.get('date', 'Unknown')}]\n{chunk['document'][:800]}"
    )


def _build_synthesis_prompt(question: str, context_chunks: list[dict]) -> str:
    """Build a prompt for the synthesis LLM.

    Brief, conversational, no robotic formatting.
    """
    context_text = "\n\n".join(_format_context_chunk(c) for c in context_chunks)

    return f"""You are a retrieval assistant answering a question based on email and newsletter context.

INSTRUCTIONS:
- Answer the user's question using the provided context only
- Be brief and conversational (like texting a spouse)
- DO NOT ask follow-up questions
- DO NOT offer to perform actions (like updating the calendar)
- Just provide the information
- If the answer isn't in the context, say "I don't see that in the emails I've processed"
- Avoid bullet points, headers, or robotic formatting

QUESTION: {question}

CONTEXT:
{context_text}

ANSWER:"""


def _format_calendar_event(event: dict[str, Any]) -> str:
    """Render one calendar event as "- summary | start → end" plus extras.

    Location and the first 200 characters of the description are appended
    when present. Missing summary/start/end are tolerated instead of raising
    KeyError.
    """
    line = (
        f"- {event.get('summary') or '(untitled event)'} | "
        f"{event.get('start', 'unknown')} → {event.get('end', 'unknown')}"
    )
    if event.get("location"):
        line += f" | Location: {event['location']}"
    if event.get("description"):
        line += f" | {str(event['description'])[:200]}"
    return line


def _build_hybrid_prompt(
    question: str,
    calendar_events: list[dict] | None = None,
    brain_chunks: list[dict] | None = None,
) -> str:
    """Build a hybrid prompt combining Calendar (source of truth) + Brain context.

    Calendar is the authoritative source for temporal data (time/date/location).
    ChromaDB provides detail context (dress codes, what to bring, signup info).
    Brain chunks scoring below MIN_RELEVANCE_SCORE are left out.
    """
    sections: list[str] = []

    if calendar_events:
        cal_text = "\n".join(_format_calendar_event(e) for e in calendar_events)
        sections.append(
            f"GOOGLE CALENDAR DATA (source of truth for time/date/location):\n{cal_text}"
        )

    if brain_chunks:
        relevant = [
            c for c in brain_chunks if c.get("score", 0) >= MIN_RELEVANCE_SCORE
        ]
        if relevant:
            brain_text = "\n\n".join(_format_context_chunk(c) for c in relevant)
            sections.append(
                "EMAIL/NEWSLETTER CONTEXT (details, requirements, dress codes, "
                f"what to bring):\n{brain_text}"
            )

    context_block = "\n\n".join(sections) if sections else "No context found."

    return f"""You are a retrieval assistant answering a question using both calendar data and email context.

INSTRUCTIONS:
- Answer the user's question using the provided data only
- Be brief and conversational (like texting a spouse)
- DO NOT ask follow-up questions
- DO NOT offer to perform actions (like updating the calendar)
- Just provide the information
- **Calendar is the source of truth for time/date/location** — if the Calendar and email context conflict on scheduling details, trust the Calendar
- Use email/newsletter context for details like dress codes, what to bring, requirements, signup links
- If neither source answers the question, say "I don't see that in the emails I've processed"
- Avoid bullet points, headers, or robotic formatting

QUESTION: {question}

{context_block}

ANSWER:"""


def _extract_llm_text(data: dict) -> str:
    """Pull the reply text out of an LLM response.

    Handles OpenAI-style ``choices[0].message.content``, Ollama chat-style
    ``message.content`` and generate-style ``response``. A ``None`` content
    is treated as empty.
    """
    choices = data.get("choices") or []
    if choices:
        text = (choices[0].get("message") or {}).get("content") or ""
    elif isinstance(data.get("message"), dict):
        text = data["message"].get("content") or ""
    else:
        text = data.get("response") or ""
    return text.strip()


def answer(
    question: str,
    top_k: int = 3,
    where: dict | None = None,
) -> dict:
    """Answer a question using RAG (retrieve + synthesize).

    Args:
        question: The user's question
        top_k: Number of chunks to retrieve
        where: Optional filter

    Returns:
        Dict with ``answer`` text, ``sources`` (subject/date/score of the
        chunks used) and ``confidence``: "low" when no chunk reaches
        MIN_RELEVANCE_SCORE, "high" when the best chunk exceeds
        HIGH_CONFIDENCE_SCORE, otherwise "medium".
    """
    # Retrieve relevant chunks; only those clearing the relevance bar are sent
    # to the LLM or cited, so unrelated emails can't leak into the answer.
    chunks = query(question, top_k=top_k, where=where)
    relevant = [c for c in chunks if c["score"] >= MIN_RELEVANCE_SCORE]
    if not relevant:
        return {
            "answer": "I don't see that in the emails I've processed.",
            "sources": [],
            "confidence": "low",
        }

    # Synthesize answer with local LLM (Beelink → localhost Ollama)
    prompt = _build_synthesis_prompt(question, relevant)
    payload = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        # NOTE(review): OpenAI-compatible endpoints read a top-level
        # "temperature"; Ollama's native /api/chat expects it under "options".
        # Confirm which API LLM_URL targets.
        "temperature": 0.3,
        "stream": False,
    }
    resp = requests.post(LLM_URL, json=payload, timeout=LLM_TIMEOUT)
    resp.raise_for_status()
    answer_text = _extract_llm_text(resp.json())

    # Clean up common LLM artifacts
    answer_text = answer_text.replace("ANSWER:", "").replace("Answer:", "").strip()

    best_score = max(c["score"] for c in relevant)
    return {
        "answer": answer_text,
        "sources": [
            {
                "subject": c["metadata"].get("subject"),
                "date": c["metadata"].get("date"),
                "score": round(c["score"], 3),
            }
            for c in relevant
        ],
        "confidence": "high" if best_score > HIGH_CONFIDENCE_SCORE else "medium",
    }


def stats() -> dict:
    """Get statistics about the Family Brain.

    ``collections`` is a list of collection names (older ChromaDB versions
    return Collection objects from ``list_collections``; they are reduced to
    their ``name`` so the result stays serializable).
    """
    client = _get_chroma_client()
    collection = _get_collection(client=client)
    return {
        "db_path": str(CHROMA_DB_PATH),
        "collection": DEFAULT_COLLECTION,
        "count": collection.count(),
        "collections": [getattr(c, "name", c) for c in client.list_collections()],
    }


def _parse_timestamp(value: Any) -> datetime | None:
    """Parse an ISO 8601 timestamp string into an aware datetime.

    Naive timestamps are interpreted in CHICAGO_TZ. Returns None for missing,
    non-string or unparseable values.
    """
    if not isinstance(value, str) or not value:
        return None
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=CHICAGO_TZ)
    return parsed


def purge_old(months: int = DEFAULT_TTL_MONTHS) -> dict:
    """Remove documents whose ``ingested_at`` is older than the TTL.

    A month is approximated as 30 days. Documents without a parseable
    ``ingested_at`` are kept. Timestamps are compared as datetimes rather than
    strings, because Chicago offsets alternate between -06:00 and -05:00 and
    string order does not match time order across them.

    Returns:
        Dict with count of deleted documents, the cutoff (ISO string) and the
        number of documents remaining.
    """
    cutoff = datetime.now(CHICAGO_TZ) - timedelta(days=months * 30)
    collection = _get_collection()

    # Get all documents
    all_docs = collection.get(include=["metadatas"])
    ids_to_delete = []
    for doc_id, meta in zip(all_docs.get("ids") or [], all_docs.get("metadatas") or []):
        ingested = _parse_timestamp((meta or {}).get("ingested_at"))
        if ingested is not None and ingested < cutoff:
            ids_to_delete.append(doc_id)

    if ids_to_delete:
        collection.delete(ids=ids_to_delete)

    return {
        "purged": len(ids_to_delete),
        "cutoff": cutoff.isoformat(),
        "remaining": collection.count(),
    }