"""Family Brain — Local RAG retrieval for family knowledge. The Family Brain is a ChromaDB-based vector store that remembers everything parsed from emails and newsletters. It answers questions like "what do the kids need for the field trip?" by semantically retrieving the relevant context. HARDWARE TOPOLOGY (Tri-Node): - ChromaDB data files: Beelink (~/.family_assistant/chroma_db/) - Embeddings: Gaming PC Ollama (nomic-embed-text) via Tailscale API - Retrieval + synthesis: Beelink local LLM (qwen2.5-coder:7b via localhost) """ import hashlib import json import os from datetime import datetime, timedelta from pathlib import Path from typing import Any import requests from family_assistant.config import CHICAGO_TZ, LLM_MODEL, LLM_URL, LLM_TIMEOUT # ChromaDB data lives on Beelink (where the pipeline runs) CHROMA_DB_PATH = Path.home() / ".family_assistant" / "chroma_db" CHROMA_DB_PATH.mkdir(parents=True, exist_ok=True) # Embedding service runs on Gaming PC via Tailscale OLLAMA_EMBED_URL = os.environ.get( "OLLAMA_EMBED_URL", "http://localhost:11434/api/embeddings" ) EMBEDDING_MODEL = "nomic-embed-text" EMBEDDING_TIMEOUT = 30 # Document TTL (auto-purge old newsletters) DEFAULT_TTL_MONTHS = 12 def _get_chroma_client(): """Lazy-load ChromaDB client (embedded mode, local files).""" import chromadb return chromadb.PersistentClient(path=str(CHROMA_DB_PATH)) def _get_collection(name: str = "family_knowledge"): """Get or create a ChromaDB collection.""" client = _get_chroma_client() return client.get_or_create_collection( name=name, metadata={"hnsw:space": "cosine"}, # Cosine similarity for semantic search ) def _embed_text(text: str) -> list[float]: """Generate embeddings via Ollama API on Gaming PC. Returns a 768-dimensional vector from nomic-embed-text. """ resp = requests.post( OLLAMA_EMBED_URL, json={"model": EMBEDDING_MODEL, "prompt": text}, timeout=EMBEDDING_TIMEOUT, ) resp.raise_for_status() data = resp.json() embedding = data.get("embedding") if not embedding or not isinstance(embedding, list): raise ValueError(f"Unexpected embedding response: {data}") return embedding def _chunk_text(text: str, chunk_size: int = 512, overlap: int = 128) -> list[str]: """Split text into overlapping chunks for better retrieval. Uses sentence-aware splitting (roughly — splits on periods/newlines). """ sentences = [s.strip() for s in text.replace("\n", ". ").split(".") if s.strip()] chunks = [] current_chunk = [] current_len = 0 for sent in sentences: sent_len = len(sent.split()) if current_len + sent_len > chunk_size and current_chunk: chunks.append(". ".join(current_chunk) + ".") # Keep overlap sentences overlap_sentences = current_chunk[-(overlap // 20):] if len(current_chunk) > 1 else [] current_chunk = overlap_sentences + [sent] current_len = sum(len(s.split()) for s in current_chunk) else: current_chunk.append(sent) current_len += sent_len if current_chunk: chunks.append(". ".join(current_chunk) + ".") return chunks if chunks else [text[:chunk_size * 5]] # Fallback def _doc_id(source: str, date: str, chunk_idx: int = 0) -> str: """Generate a deterministic document ID.""" base = f"{source}:{date}:{chunk_idx}" return hashlib.sha256(base.encode()).hexdigest()[:32] def ingest_email( subject: str, body: str, from_addr: str, email_date: str, parsed_items: list[dict], ) -> dict: """Ingest an email into the Family Brain. Args: subject: Email subject line body: Raw email body text from_addr: Sender email address email_date: ISO 8601 date string of the email parsed_items: List of items extracted (appointments, reminders, etc.) Returns: Dict with ids of inserted documents. """ collection = _get_collection() # Build rich text for embedding items_summary = "\n".join( f"- {item.get('type', 'item')}: {item.get('summary', '')} " f"(when: {item.get('start', item.get('due', 'TBD'))!s})" for item in parsed_items ) full_text = f"Subject: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n{body}\n\nExtracted Items:\n{items_summary}" chunks = _chunk_text(full_text) ids = [] embeddings = [] metadatas = [] documents = [] for idx, chunk in enumerate(chunks): doc_id = _doc_id(subject, email_date, idx) embedding = _embed_text(chunk) ids.append(doc_id) embeddings.append(embedding) documents.append(chunk) # Serialize parsed_items for JSON storage (handle datetime objects) safe_items = json.dumps(parsed_items, default=str) metadatas.append({ "source": "email", "subject": subject, "from": from_addr, "date": email_date, "chunk_idx": idx, "total_chunks": len(chunks), "ingested_at": datetime.now(CHICAGO_TZ).isoformat(), "parsed_items": safe_items, }) collection.upsert( ids=ids, embeddings=embeddings, documents=documents, metadatas=metadatas, ) return {"status": "ingested", "ids": ids, "chunks": len(chunks)} def ingest_newsletter( subject: str, body: str, from_addr: str, email_date: str, items: list[dict], ) -> dict: """Ingest a newsletter into the Family Brain. Stores full newsletter text + structured items for retrieval. Args: subject: Email subject body: Raw newsletter body from_addr: Sender email_date: ISO date items: Parsed items (events, reminders, actions, info) Returns: Dict with ingestion status. """ collection = _get_collection() # Build structured summary from parsed items items_by_type = {} for item in items: t = item.get("type", "info") items_by_type.setdefault(t, []).append(item) items_summary = "" for t, t_items in items_by_type.items(): items_summary += f"\n{t.upper()}:\n" for it in t_items: summary = it.get("summary", "") who = ", ".join(it.get("who", [])) when = it.get("start", it.get("due", "TBD")) desc = it.get("description", "") relevance = it.get("relevance", "high") items_summary += f" - {summary}" if who: items_summary += f" ({who})" items_summary += f" [{when}]" if desc: items_summary += f" — {desc[:100]}" if relevance == "low": items_summary += " [low relevance]" items_summary += "\n" full_text = ( f"Newsletter: {subject}\nFrom: {from_addr}\nDate: {email_date}\n\n" f"{body}\n\n---\nParsed Items:{items_summary}" ) chunks = _chunk_text(full_text) ids = [] embeddings = [] metadatas = [] documents = [] for idx, chunk in enumerate(chunks): doc_id = _doc_id(subject, email_date, idx) embedding = _embed_text(chunk) ids.append(doc_id) embeddings.append(embedding) documents.append(chunk) # Serialize items for JSON storage (handle datetime objects) safe_items = json.dumps(items, default=str) metadatas.append({ "source": "newsletter", "subject": subject, "from": from_addr, "date": email_date, "chunk_idx": idx, "total_chunks": len(chunks), "ingested_at": datetime.now(CHICAGO_TZ).isoformat(), "items": safe_items, "item_count": len(items), }) collection.upsert( ids=ids, embeddings=embeddings, documents=documents, metadatas=metadatas, ) return {"status": "ingested", "ids": ids, "chunks": len(chunks)} def query( question: str, top_k: int = 3, where: dict | None = None, ) -> list[dict]: """Query the Family Brain with a natural language question. Args: question: The user's question top_k: Number of chunks to retrieve where: Optional ChromaDB filter (e.g., {"source": "newsletter"}) Returns: List of retrieved documents with metadata and distance scores. """ collection = _get_collection() query_embedding = _embed_text(question) results = collection.query( query_embeddings=[query_embedding], n_results=top_k, where=where, include=["documents", "metadatas", "distances"], ) # Flatten results documents = results.get("documents", [[]])[0] metadatas = results.get("metadatas", [[]])[0] distances = results.get("distances", [[]])[0] return [ { "document": doc, "metadata": meta, "distance": dist, "score": 1 - dist, # Convert distance to similarity score } for doc, meta, dist in zip(documents, metadatas, distances) ] def _build_synthesis_prompt(question: str, context_chunks: list[dict]) -> str: """Build a prompt for the synthesis LLM. Brief, conversational, no robotic formatting. """ context_text = "\n\n".join( f"[Source: {c['metadata'].get('subject', 'Unknown')} (relevance: {c.get('score', 0):.2f}), " f"{c['metadata'].get('date', 'Unknown')}]\n{c['document'][:800]}" for c in context_chunks ) # Get top score for routing top_score = context_chunks[0].get("score", 0) if context_chunks else 0 return f"""You are a retrieval assistant answering a question based on email and newsletter context. INSTRUCTIONS: - Answer the user's question using the provided context only - Be brief and conversational (like texting a spouse) - DO NOT ask follow-up questions - DO NOT offer to perform actions (like updating the calendar) - Just provide the information - HIGH RELEVANCE (>0.7): Directly answer from the context with confidence - MEDIUM RELEVANCE (0.5-0.7): Answer with "Based on the [source], it appears..." - LOW RELEVANCE (<0.5): Only say "I don't see that in the emails I've processed" - Avoid bullet points, headers, or robotic formatting TOP MATCH SCORE: {top_score:.3f} QUESTION: {question} CONTEXT: {context_text} ANSWER:""" def _build_hybrid_prompt( question: str, calendar_events: list[dict] | None = None, brain_chunks: list[dict] | None = None, ) -> str: """Build a hybrid prompt combining Calendar (source of truth) + Brain context. Calendar is the authoritative source for temporal data (time/date/location). ChromaDB provides detail context (dress codes, what to bring, signup info). """ sections = [] top_score = 0.0 if calendar_events: cal_text = "\n".join( f"- {e['summary']} | {e['start']} → {e['end']}" f"{f' | Location: {e["location"]}' if e.get('location') else ''}" f"{f' | {e["description"][:200]}' if e.get('description') else ''}" for e in calendar_events ) sections.append( f"GOOGLE CALENDAR DATA (source of truth for time/date/location):\n{cal_text}" ) if brain_chunks: # Filter by relevance score >= 0.5 for medium confidence filtered_chunks = [c for c in brain_chunks if c.get("score", 0) >= 0.5] if filtered_chunks: top_score = max(c.get("score", 0) for c in filtered_chunks) brain_text = "\n\n".join( f"[Source: {c['metadata'].get('subject', 'Unknown')} (relevance: {c.get('score', 0):.2f}), " f"{c['metadata'].get('date', 'Unknown')}]\n{c['document'][:800]}" for c in filtered_chunks ) sections.append( f"EMAIL/NEWSLETTER CONTEXT (details, requirements, dress codes, what to bring):\n{brain_text}" ) context_block = "\n\n".join(sections) if sections else "No context found." # Determine response guidance based on top score if top_score >= 0.7: score_guidance = "HIGH RELEVANCE: Answer confidently using the retrieved context." elif top_score >= 0.5: score_guidance = "MEDIUM RELEVANCE: Answer tentatively with 'Based on the emails...'" elif top_score > 0: score_guidance = "LOW RELEVANCE: Only say 'I don't see that in the emails I've processed.'" else: score_guidance = "NO CONTEXT: Say 'I don't see that in the emails I've processed.'" return f"""You are a retrieval assistant answering a question using both calendar data and email context. INSTRUCTIONS: - Answer the user's question using the provided data only - Be brief and conversational (like texting a spouse) - DO NOT ask follow-up questions - DO NOT offer to perform actions (like updating the calendar) - Just provide the information - **Calendar is the source of truth for time/date/location** — if the Calendar and email context conflict on scheduling details, trust the Calendar - Use email/newsletter context for details like dress codes, what to bring, requirements, signup links - {score_guidance} - Avoid bullet points, headers, or robotic formatting QUESTION: {question} {context_block} ANSWER:""" def answer( question: str, top_k: int = 3, where: dict | None = None, ) -> dict: """Answer a question using RAG (retrieve + synthesize). Args: question: The user's question top_k: Number of chunks to retrieve where: Optional filter Returns: Dict with answer, sources, and confidence. """ # Retrieve relevant chunks chunks = query(question, top_k=top_k, where=where) # Filter chunks by relevance score (minimum 0.5 threshold) relevant_chunks = [c for c in chunks if c.get("score", 0) >= 0.5] if not relevant_chunks: return { "answer": "I don't see that in the emails I've processed.", "sources": [], "confidence": "low", } # Synthesize answer with local LLM (Beelink → localhost Ollama) prompt = _build_synthesis_prompt(question, relevant_chunks) payload = { "model": LLM_MODEL, "messages": [{"role": "user", "content": prompt}], "temperature": 0.3, "stream": False, } resp = requests.post(LLM_URL, json=payload, timeout=LLM_TIMEOUT) resp.raise_for_status() data = resp.json() # Handle both formats choices = data.get("choices", []) if choices: answer_text = choices[0].get("message", {}).get("content", "").strip() elif "message" in data: answer_text = data["message"].get("content", "").strip() else: answer_text = data.get("response", "").strip() # Clean up common LLM artifacts answer_text = answer_text.replace("ANSWER:", "").replace("Answer:", "").strip() # Determine confidence based on top score top_score = relevant_chunks[0]["score"] if relevant_chunks else 0 if top_score >= 0.7: confidence = "high" elif top_score >= 0.5: confidence = "medium" else: confidence = "low" return { "answer": answer_text, "sources": [ { "subject": c["metadata"].get("subject"), "date": c["metadata"].get("date"), "score": round(c["score"], 3), } for c in relevant_chunks ], "confidence": confidence, } def stats() -> dict: """Get statistics about the Family Brain.""" client = _get_chroma_client() collection = _get_collection() return { "db_path": str(CHROMA_DB_PATH), "collection": "family_knowledge", "count": collection.count(), "collections": client.list_collections(), } def purge_old(months: int = DEFAULT_TTL_MONTHS) -> dict: """Remove documents older than the specified TTL. Returns: Dict with count of deleted documents. """ cutoff = datetime.now(CHICAGO_TZ) - timedelta(days=months * 30) cutoff_iso = cutoff.isoformat() collection = _get_collection() # Get all documents all_docs = collection.get(include=["metadatas"]) ids_to_delete = [] for doc_id, meta in zip(all_docs.get("ids", []), all_docs.get("metadatas", [])): ingested = meta.get("ingested_at", "") if ingested and ingested < cutoff_iso: ids_to_delete.append(doc_id) if ids_to_delete: collection.delete(ids=ids_to_delete) return { "purged": len(ids_to_delete), "cutoff": cutoff_iso, "remaining": collection.count(), }