# query.py

"""Query processing with temporal weighting."""

import logging
import re
from datetime import datetime
from enum import Enum
from typing import Optional

# Module-level logger named after this module, so records can be filtered by
# module path. (`name` alone is undefined and raised NameError at import.)
logger = logging.getLogger(__name__)

class QueryPattern(Enum):
    """Query pattern types that select a temporal weighting profile.

    Each member is a key of ``PATTERN_CONFIG``; its value is the lowercase
    string form of the pattern name.
    """

    LOGISTICAL = "logistical"  # Time-sensitive (school schedules)
    ENTITY = "entity"  # Fact lookup (filter sizes, dates)
    HISTORICAL = "historical"  # Past events (roofer names)

# Pattern configuration

# Per-pattern temporal weighting profile, read by calculate_temporal_score.
#
#   decay           "exponential", "linear", or "none"
#   half_life_days  document age (days) at which the recency multiplier is 0.5;
#                   None when there is no decay
#   recency_cutoff  age (days) beyond which a document's score is cut to a
#                   tenth; None disables the cutoff
#   source_weights  multiplier per document type; types not listed here fall
#                   back to 0.5 in calculate_temporal_score
PATTERN_CONFIG = {
    # Time-sensitive questions: fast decay, stale documents heavily penalised.
    QueryPattern.LOGISTICAL: {
        "decay": "exponential",
        "half_life_days": 7,
        "recency_cutoff": 30,
        "source_weights": {
            "calendar_event": 1.0,
            "email": 0.9,
            "newsletter": 0.7,
            "static_pdf": 0.5,
        },
    },
    # Fact lookups: slow linear decay, transactional documents preferred.
    QueryPattern.ENTITY: {
        "decay": "linear",
        "half_life_days": 90,
        "recency_cutoff": None,
        "source_weights": {
            "invoice": 0.95,
            "receipt": 0.95,
            "manual": 0.8,
            "email": 0.6,
            "static_pdf": 0.5,
        },
    },
    # Past events: age is irrelevant, only the source type matters.
    QueryPattern.HISTORICAL: {
        "decay": "none",
        "half_life_days": None,
        "recency_cutoff": None,
        "source_weights": {
            "invoice": 1.0,
            "receipt": 1.0,
            "email": 0.8,
            "photo": 0.7,
            "static_pdf": 0.6,
        },
    },
}

# Keyword tables for detect_query_pattern, listed in the order they are checked.
# Historical combines the explicit strong signals with the broader phrases
# ("last time", "did we", ...) that were previously declared but never checked.
_HISTORICAL_KEYWORDS = (
    "last year", "years ago", "roofer", "plumber", "contractor",
    "replaced", "used",
    "ago", "before", "previous", "last time", "did we", "have we",
)
_ENTITY_KEYWORDS = (
    "what size", "what is", "how much", "where",
    "password", "filter", "number", "phone", "address",
    "size", "cost", "price",
)
_LOGISTICAL_KEYWORDS = (
    "today", "tomorrow", "this week", "this month",
    "schedule", "time", "now", "current", "upcoming",
    "next week", "half-day", "early dismissal", "cancelled",
)


def _mentions_any(text: str, keywords) -> bool:
    """Return True if any keyword occurs in *text* starting at a word boundary.

    Only the start is anchored, so inflected forms still match ("filter" in
    "filters", "schedule" in "scheduled") while embedded hits do not ("now"
    in "know", "used" in "focused", "time" in "sometimes").
    """
    return any(re.search(r"\b" + re.escape(kw), text) for kw in keywords)


def detect_query_pattern(query: str) -> QueryPattern:
    """Detect query pattern from keywords.

    Matching is case-insensitive. Categories are tried in priority order and
    the first one with a matching keyword wins:

    1. historical keywords
    2. entity keywords
    3. logistical keywords
    4. a bare "when": HISTORICAL if accompanied by "was", "did" or "last",
       otherwise LOGISTICAL
    5. no match: ENTITY

    Args:
        query: User's natural language query

    Returns:
        Detected QueryPattern
    """
    query_lower = query.lower()

    # Historical patterns (strongest signal)
    if _mentions_any(query_lower, _HISTORICAL_KEYWORDS):
        return QueryPattern.HISTORICAL

    # Entity patterns (strong signal)
    if _mentions_any(query_lower, _ENTITY_KEYWORDS):
        return QueryPattern.ENTITY

    # Logistical patterns
    if _mentions_any(query_lower, _LOGISTICAL_KEYWORDS):
        return QueryPattern.LOGISTICAL

    # Ambiguous "when" queries - past-tense context means a historical lookup
    if _mentions_any(query_lower, ("when",)):
        if _mentions_any(query_lower, ("was", "did", "last")):
            return QueryPattern.HISTORICAL
        return QueryPattern.LOGISTICAL

    # Default to ENTITY
    return QueryPattern.ENTITY

def calculate_temporal_score(
    doc_date: datetime,
    pattern: QueryPattern,
    doc_type: str,
    now: Optional[datetime] = None
) -> float:
    """Calculate temporal relevance score.

    The score is ``recency * source_weight``, where both factors come from
    ``PATTERN_CONFIG[pattern]``:

    * exponential decay: ``2 ** (-age_days / half_life_days)``
    * linear decay: falls from 1.0 to 0.0 over ``2 * half_life_days``
    * anything else (or no positive half-life): no decay, recency is 1.0

    If the pattern defines a ``recency_cutoff`` and the document is older than
    that many days, the score is multiplied by 0.1.

    Args:
        doc_date: Document source date
        pattern: Query pattern for weighting
        doc_type: Document type (invoice, receipt, etc.); unknown types are
            weighted 0.5
        now: Optional reference date (defaults to the current time, using
            doc_date's tzinfo so naive and aware datetimes are never mixed)

    Returns:
        Temporal relevance score; within [0.0, 1.0] as long as the configured
        source weights are themselves within [0.0, 1.0]

    Raises:
        KeyError: If pattern has no entry in PATTERN_CONFIG
        TypeError: If an explicit now and doc_date differ in timezone-awareness
    """
    config = PATTERN_CONFIG[pattern]

    if now is None:
        # datetime.now(None) is the naive local time (the previous behaviour);
        # for an aware doc_date this yields an aware "now" so the subtraction
        # below does not raise.
        now = datetime.now(doc_date.tzinfo)

    # Future-dated documents (e.g. upcoming calendar events) would give a
    # negative age and a recency above 1.0; treat them as brand new instead.
    age_days = max(0, (now - doc_date).days)

    # Calculate recency multiplier
    decay = config["decay"]
    half_life = config["half_life_days"]
    has_half_life = half_life is not None and half_life > 0
    if decay == "exponential" and has_half_life:
        recency = 2 ** (-age_days / half_life)
    elif decay == "linear" and has_half_life:
        recency = max(0.0, 1 - (age_days / (half_life * 2)))
    else:  # no decay
        recency = 1.0

    # Source type weight
    type_weight = config["source_weights"].get(doc_type, 0.5)

    # Combined score
    score = recency * type_weight

    # Recency cutoff (currently only configured for logistical queries)
    cutoff = config.get("recency_cutoff")
    if cutoff is not None and age_days > cutoff:
        score *= 0.1  # Drastically reduce, don't zero

    return score

def calculate_confidence(
    combined_score: float,
    pattern: QueryPattern
) -> tuple[float, str]:
    """Calculate confidence level and message.

    Thresholds by pattern:

    * LOGISTICAL: below 0.3 -> 0.2, below 0.6 -> 0.5, otherwise 0.9
    * ENTITY: below 0.4 -> 0.4, otherwise 0.85
    * any other pattern (historical): always 0.8, regardless of score

    Args:
        combined_score: Combined semantic + temporal score
        pattern: Query pattern

    Returns:
        Tuple of (confidence_score, message)
    """
    if pattern == QueryPattern.LOGISTICAL:
        if combined_score < 0.3:
            return 0.2, "Stale information - verify with school/organizer"
        if combined_score < 0.6:
            return 0.5, "Possibly outdated - check for newer updates"
        return 0.9, "Recent information found"

    if pattern == QueryPattern.ENTITY:
        if combined_score < 0.4:
            return 0.4, "Entity found in old document - verify if still current"
        return 0.85, "Entity found with source document"

    # Historical: the age of the record does not reduce confidence
    return 0.8, "Historical record found"

def format_answer(
    result: dict,
    pattern: QueryPattern
) -> str:
    """Format a simple answer from search result.

    Reads ``result["metadata"]["doc_type"]``,
    ``result["metadata"]["source_date"]`` and ``result["text"]``. Missing,
    ``None`` or empty values fall back to "document", "unknown date" and ""
    respectively. The text is cut to 300 characters, and "..." is appended
    only when something was actually cut off.

    Args:
        result: Top search result
        pattern: Query pattern used (currently not consulted; kept for
            interface compatibility)

    Returns:
        Formatted answer string
    """
    max_chars = 300

    # `or` rather than a .get() default: the key may be present but None/empty.
    metadata = result.get("metadata") or {}
    doc_type = str(metadata.get("doc_type") or "document")
    source_date = metadata.get("source_date") or "unknown date"
    full_text = result.get("text") or ""

    text = full_text[:max_chars]
    ellipsis = "..." if len(full_text) > max_chars else ""

    # Article for doc_type (doc_type is guaranteed non-empty here)
    article = "an" if doc_type[0].lower() in "aeiou" else "a"

    return f"Based on {article} {doc_type} from {source_date}: {text}{ellipsis}"