"""Query processing with temporal weighting.""" import logging from datetime import datetime from enum import Enum from typing import Optional logger = logging.getLogger(__name__) class QueryPattern(Enum): """Query pattern types for temporal weighting.""" LOGISTICAL = "logistical" # Time-sensitive (school schedules) ENTITY = "entity" # Fact lookup (filter sizes, dates) HISTORICAL = "historical" # Past events (roofer names) # Pattern configuration PATTERN_CONFIG = { QueryPattern.LOGISTICAL: { "decay": "exponential", "half_life_days": 7, "recency_cutoff": 30, "source_weights": { "calendar_event": 1.0, "email": 0.9, "newsletter": 0.7, "static_pdf": 0.5 } }, QueryPattern.ENTITY: { "decay": "linear", "half_life_days": 90, "recency_cutoff": None, "source_weights": { "invoice": 0.95, "receipt": 0.95, "manual": 0.8, "email": 0.6, "static_pdf": 0.5 } }, QueryPattern.HISTORICAL: { "decay": "none", "half_life_days": None, "recency_cutoff": None, "source_weights": { "invoice": 1.0, "receipt": 1.0, "email": 0.8, "photo": 0.7, "static_pdf": 0.6 } } } def detect_query_pattern(query: str) -> QueryPattern: """Detect query pattern from keywords. Args: query: User's natural language query Returns: Detected QueryPattern """ logistical_keywords = [ "today", "tomorrow", "this week", "this month", "schedule", "time", "now", "current", "upcoming", "next week", "half-day", "early dismissal", "cancelled" ] entity_keywords = [ "what size", "what is", "how much", "where", "password", "filter", "number", "phone", "address", "size", "cost", "price" ] historical_keywords = [ "last year", "ago", "before", "previous", "roofer", "plumber", "contractor", "last time", "did we", "have we" ] # Check for explicit pattern signals first query_lower = query.lower() # Historical patterns (strongest signal) for kw in ["last year", "years ago", "roofer", "plumber", "contractor", "replaced", "used"]: if kw in query_lower: return QueryPattern.HISTORICAL # Entity patterns (strong signal) for kw in entity_keywords: if kw in query_lower: return QueryPattern.ENTITY # Logistical patterns for kw in logistical_keywords: if kw in query_lower: return QueryPattern.LOGISTICAL # Ambiguous "when" queries - check context if "when" in query_lower: if any(w in query_lower for w in ["was", "did", "last"]): return QueryPattern.HISTORICAL return QueryPattern.LOGISTICAL # Default to ENTITY return QueryPattern.ENTITY def calculate_temporal_score( doc_date: datetime, pattern: QueryPattern, doc_type: str, now: Optional[datetime] = None ) -> float: """Calculate temporal relevance score. Args: doc_date: Document source date pattern: Query pattern for weighting doc_type: Document type (invoice, receipt, etc.) now: Optional reference date (defaults to now) Returns: Temporal relevance score [0.0, 1.0] """ config = PATTERN_CONFIG[pattern] if now is None: now = datetime.now() age_days = (now - doc_date).days # Calculate recency multiplier if config["decay"] == "exponential": half_life = config["half_life_days"] recency = 2 ** (-age_days / half_life) if half_life > 0 else 1.0 elif config["decay"] == "linear": half_life = config["half_life_days"] recency = max(0, 1 - (age_days / (half_life * 2))) if half_life else 1.0 else: # no decay recency = 1.0 # Source type weight type_weight = config["source_weights"].get(doc_type, 0.5) # Combined score score = recency * type_weight # Recency cutoff for logistical queries if pattern == QueryPattern.LOGISTICAL: cutoff = config.get("recency_cutoff") if cutoff and age_days > cutoff: score *= 0.1 # Drastically reduce, don't zero return score def calculate_confidence( combined_score: float, pattern: QueryPattern ) -> tuple[float, str]: """Calculate confidence level and message. Args: combined_score: Combined semantic + temporal score pattern: Query pattern Returns: Tuple of (confidence_score, message) """ if pattern == QueryPattern.LOGISTICAL: if combined_score < 0.3: return 0.2, "Stale information — verify with school/organizer" elif combined_score < 0.6: return 0.5, "Possibly outdated — check for newer updates" else: return 0.9, "Recent information found" elif pattern == QueryPattern.ENTITY: if combined_score < 0.4: return 0.4, "Entity found in old document — verify if still current" else: return 0.85, "Entity found with source document" else: # historical return 0.8, "Historical record found" def format_answer( result: dict, pattern: QueryPattern ) -> str: """Format a simple answer from search result. Args: result: Top search result pattern: Query pattern used Returns: Formatted answer string """ metadata = result.get("metadata", {}) doc_type = metadata.get("doc_type", "document") source_date = metadata.get("source_date", "unknown date") text = result.get("text", "")[:300] # Article for doc_type article = "an" if doc_type[0].lower() in "aeiou" else "a" return f"Based on {article} {doc_type} from {source_date}: {text}..."