"""Query processing with temporal weighting."""
import logging
from datetime import datetime
from enum import Enum
from typing import Optional
# Module-level logger named after this module so log records can be filtered
# per module. (`name` was an undefined identifier and raised NameError on import.)
logger = logging.getLogger(__name__)
class QueryPattern(Enum):
    """Query pattern types for temporal weighting.

    Each member names one class of user query; the member is used as the key
    into ``PATTERN_CONFIG`` to pick decay and source-weight settings.
    """

    # Time-sensitive questions (e.g. school schedules).
    LOGISTICAL = "logistical"
    # Fact lookups (e.g. filter sizes, dates).
    ENTITY = "entity"
    # Questions about past events (e.g. roofer names).
    HISTORICAL = "historical"
# Pattern configuration
#
# One entry per QueryPattern, read by calculate_temporal_score():
#   decay           - "exponential", "linear", or anything else for no decay
#   half_life_days  - document age (in days) at which recency drops to 0.5;
#                     None when the pattern does not decay
#   recency_cutoff  - age in days beyond which the score is multiplied by 0.1;
#                     only consulted for LOGISTICAL queries
#   source_weights  - multiplier per document type; types not listed here
#                     fall back to 0.5 in calculate_temporal_score()
PATTERN_CONFIG = {
    QueryPattern.LOGISTICAL: {
        "decay": "exponential",
        "half_life_days": 7,
        "recency_cutoff": 30,
        "source_weights": {
            "calendar_event": 1.0,
            "email": 0.9,
            "newsletter": 0.7,
            "static_pdf": 0.5,
        },
    },
    QueryPattern.ENTITY: {
        "decay": "linear",
        "half_life_days": 90,
        "recency_cutoff": None,
        "source_weights": {
            "invoice": 0.95,
            "receipt": 0.95,
            "manual": 0.8,
            "email": 0.6,
            "static_pdf": 0.5,
        },
    },
    QueryPattern.HISTORICAL: {
        "decay": "none",
        "half_life_days": None,
        "recency_cutoff": None,
        "source_weights": {
            "invoice": 1.0,
            "receipt": 1.0,
            "email": 0.8,
            "photo": 0.7,
            "static_pdf": 0.6,
        },
    },
}
def _has_term(text: str, term: str) -> bool:
    """Return True if *term* occurs in *text* starting at a word boundary."""
    import re  # function-scope import keeps this block self-contained

    # Anchor only the start of the term: "filters" still matches "filter",
    # but "know" no longer matches "now" and "confused" no longer matches
    # "used".
    return re.search(r"\b" + re.escape(term), text) is not None


def detect_query_pattern(query: str) -> QueryPattern:
    """Detect query pattern from keywords.

    Keyword groups are checked in priority order: historical signals first,
    then entity keywords, then logistical keywords. A query that matches
    nothing but contains "when" is classified from its tense words; anything
    else defaults to ENTITY.

    Matching is case-insensitive and anchored at the start of a word, so a
    keyword does not fire from inside an unrelated word.

    Args:
        query: User's natural language query

    Returns:
        Detected QueryPattern
    """
    # Historical patterns (strongest signal)
    historical_signals = (
        "last year", "years ago", "roofer", "plumber",
        "contractor", "replaced", "used",
    )
    # Entity patterns (strong signal)
    entity_keywords = (
        "what size", "what is", "how much", "where",
        "password", "filter", "number", "phone", "address",
        "size", "cost", "price",
    )
    # Logistical patterns
    logistical_keywords = (
        "today", "tomorrow", "this week", "this month",
        "schedule", "time", "now", "current", "upcoming",
        "next week", "half-day", "early dismissal", "cancelled",
    )

    query_lower = query.lower()

    def mentions_any(terms) -> bool:
        return any(_has_term(query_lower, term) for term in terms)

    # The first matching group wins, so the order here is the priority order.
    if mentions_any(historical_signals):
        return QueryPattern.HISTORICAL
    if mentions_any(entity_keywords):
        return QueryPattern.ENTITY
    if mentions_any(logistical_keywords):
        return QueryPattern.LOGISTICAL

    # Ambiguous "when" queries - past-tense context means a historical
    # question, otherwise assume the user is asking about something upcoming.
    if _has_term(query_lower, "when"):
        if mentions_any(("was", "did", "last")):
            return QueryPattern.HISTORICAL
        return QueryPattern.LOGISTICAL

    # Default to ENTITY
    return QueryPattern.ENTITY
def calculate_temporal_score(
    doc_date: datetime,
    pattern: QueryPattern,
    doc_type: str,
    now: Optional[datetime] = None
) -> float:
    """Calculate temporal relevance score.

    The score is ``recency * source_weight``, where recency comes from the
    decay settings of the pattern's PATTERN_CONFIG entry and source_weight
    from its ``source_weights`` table (0.5 for unlisted document types).
    LOGISTICAL results older than the configured cutoff are further
    multiplied by 0.1.

    Args:
        doc_date: Document source date
        pattern: Query pattern for weighting
        doc_type: Document type (invoice, receipt, etc.)
        now: Optional reference date (defaults to the current time, using
            the same timezone awareness as doc_date)

    Returns:
        Temporal relevance score; within [0.0, 1.0] as long as the
        configured source weights are.

    Raises:
        KeyError: If pattern has no entry in PATTERN_CONFIG.
        TypeError: If an explicit ``now`` and ``doc_date`` mix naive and
            timezone-aware datetimes.
    """
    config = PATTERN_CONFIG[pattern]

    if now is None:
        # Use doc_date's tzinfo so naive and aware inputs both subtract
        # cleanly; tzinfo=None yields the same naive local time as before.
        now = datetime.now(doc_date.tzinfo)

    # Future-dated documents (e.g. upcoming calendar events) would give a
    # negative age and a recency above 1.0; treat them as brand new instead.
    age_days = max(0, (now - doc_date).days)

    # Calculate recency multiplier
    decay = config["decay"]
    half_life = config.get("half_life_days")
    decays = half_life is not None and half_life > 0
    if decay == "exponential" and decays:
        # Halves every half_life days.
        recency = 2 ** (-age_days / half_life)
    elif decay == "linear" and decays:
        # Reaches 0.5 at half_life days and bottoms out at 0 at twice that.
        recency = max(0.0, 1 - age_days / (half_life * 2))
    else:  # no decay, or no usable half-life configured
        recency = 1.0

    # Source type weight
    type_weight = config["source_weights"].get(doc_type, 0.5)

    # Combined score
    score = recency * type_weight

    # Recency cutoff for logistical queries
    if pattern == QueryPattern.LOGISTICAL:
        cutoff = config.get("recency_cutoff")
        if cutoff and age_days > cutoff:
            score *= 0.1  # Drastically reduce, don't zero

    return score
def calculate_confidence(
    combined_score: float,
    pattern: QueryPattern
) -> tuple[float, str]:
    """Map a combined score to a confidence value and user-facing message.

    Each pattern has its own score bands: the first band whose upper bound
    exceeds the score decides the result, otherwise the pattern's top-tier
    result is used. Historical queries always get the same answer.

    Args:
        combined_score: Combined semantic + temporal score
        pattern: Query pattern

    Returns:
        Tuple of (confidence_score, message)
    """
    if pattern == QueryPattern.LOGISTICAL:
        bands = (
            (0.3, (0.2, "Stale information — verify with school/organizer")),
            (0.6, (0.5, "Possibly outdated — check for newer updates")),
        )
        top_tier = (0.9, "Recent information found")
    elif pattern == QueryPattern.ENTITY:
        bands = (
            (0.4, (0.4, "Entity found in old document — verify if still current")),
        )
        top_tier = (0.85, "Entity found with source document")
    else:
        # Historical records are not judged on freshness.
        return 0.8, "Historical record found"

    for upper_bound, verdict in bands:
        if combined_score < upper_bound:
            return verdict
    return top_tier
def format_answer(
    result: dict,
    pattern: QueryPattern
) -> str:
    """Format a simple answer from search result.

    Reads ``metadata.doc_type``, ``metadata.source_date`` and ``text`` from
    the result. Missing, None, or empty values fall back to "document",
    "unknown date" and an empty string, so a sparse result never raises.

    Args:
        result: Top search result
        pattern: Query pattern used (currently not used for formatting)

    Returns:
        Formatted answer string; the text is limited to 300 characters and
        always followed by "..."
    """
    metadata = result.get("metadata") or {}
    # `or` also covers None and "" - an empty doc_type used to raise
    # IndexError at doc_type[0] below.
    doc_type = str(metadata.get("doc_type") or "document")
    source_date = metadata.get("source_date") or "unknown date"
    text = (result.get("text") or "")[:300]

    # Article for doc_type
    article = "an" if doc_type[0].lower() in "aeiou" else "a"

    return f"Based on {article} {doc_type} from {source_date}: {text}..."