#!/usr/bin/env python3
"""Rejection engine for calendar events.
Handles two flows:
1. LLM-based rejection: parse natural language ("we don't need that mass on
the calendar, ignore it for future newsletters") into a structured rule
2. Rule-based filtering: during newsletter processing, skip events that match
known rejection rules
Rejection rules are stored in family.yaml under the 'rejections' key:

    rejections:
      - pattern: "First Communion Mass"
        reason: "Kids not at communion age yet"
        scope: "all"  # all|event|newsletter
        created: "2026-04-16"
      - pattern: "STAR Testing"
        reason: "Not applicable to our grades"
        scope: "newsletter"
        created: "2026-04-16"

Scope meanings:
- "all": reject from both calendar AND future newsletter extraction
- "event": only reject calendar creation (still appears in newsletter digest)
- "newsletter": only skip in newsletter extraction (still shows as info)
"""
import json
import os
import re
import sys
from datetime import date, datetime
import yaml
from .config import (
FAMILY_CONFIG_PATH,
LLM_MODEL,
LLM_URL,
LLM_TIMEOUT,
)
def _load_family_config():
    """Load family.yaml and return its contents as a dict.

    The path comes from FAMILY_CONFIG_PATH. When that is unset or does not
    exist on disk, the first existing file among ./family.yaml,
    ./scripts/family.yaml and ~/.config/family-assistant/family.yaml is used.

    Returns:
        The parsed mapping, or a fresh default
        ``{"family": {"members": []}, "rejections": []}`` when no file is
        found or the file is empty.

    Raises:
        ValueError: If the file parses to something other than a mapping.
    """
    config_path = FAMILY_CONFIG_PATH
    if not config_path or not os.path.exists(config_path):
        # Search common locations
        for candidate in (
            os.path.join(os.getcwd(), "family.yaml"),
            os.path.join(os.getcwd(), "scripts", "family.yaml"),
            os.path.expanduser("~/.config/family-assistant/family.yaml"),
        ):
            if os.path.exists(candidate):
                config_path = candidate
                break
    if not config_path or not os.path.exists(config_path):
        return {"family": {"members": []}, "rejections": []}
    # Decode explicitly as UTF-8 so non-ASCII names and reasons do not depend
    # on the platform's locale encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        loaded = yaml.safe_load(f)
    if not loaded:
        # Empty file (or an empty document): behave as if nothing is configured.
        return {"family": {"members": []}, "rejections": []}
    if not isinstance(loaded, dict):
        # Every caller uses dict access; fail here with a clear message rather
        # than with an AttributeError/TypeError somewhere downstream.
        raise ValueError(
            f"{config_path} must contain a YAML mapping at the top level, "
            f"got {type(loaded).__name__}"
        )
    return loaded
def _save_family_config(config):
    """Write ``config`` back to family.yaml.

    The target is FAMILY_CONFIG_PATH; when that is unset or does not exist,
    the first existing file among ./family.yaml, ./scripts/family.yaml and
    ~/.config/family-assistant/family.yaml is used instead. If none of those
    exist but FAMILY_CONFIG_PATH is set, the file is created at that path.

    Args:
        config: The full configuration dict to serialize.

    Raises:
        FileNotFoundError: If no target path could be determined.
    """
    config_path = FAMILY_CONFIG_PATH
    if not config_path or not os.path.exists(config_path):
        for candidate in (
            os.path.join(os.getcwd(), "family.yaml"),
            os.path.join(os.getcwd(), "scripts", "family.yaml"),
            os.path.expanduser("~/.config/family-assistant/family.yaml"),
        ):
            if os.path.exists(candidate):
                config_path = candidate
                break
    if not config_path:
        raise FileNotFoundError("Cannot find family.yaml to save rejection rules")
    # Serialize before opening: mode "w" truncates the file immediately, so an
    # error raised inside yaml.dump would otherwise leave family.yaml empty or
    # half-written.
    serialized = yaml.dump(config, default_flow_style=False, sort_keys=False)
    with open(config_path, "w", encoding="utf-8") as f:
        f.write(serialized)
def get_rejection_rules():
    """Return the current rejection rules from family.yaml.

    Returns:
        The list stored under the 'rejections' key, or an empty list when the
        key is missing or has no value.
    """
    config = _load_family_config()
    # A bare `rejections:` line parses as None; `.get(key, [])` would hand that
    # None to callers that iterate the result.
    return config.get("rejections") or []
def add_rejection_rule(pattern, reason, scope="all"):
    """Add a rejection rule to family.yaml, or update the rule with the same pattern.

    Patterns are compared case-insensitively. When a rule with the same
    pattern already exists, its reason and scope are overwritten and an
    'updated' date is stamped; otherwise a new rule with a 'created' date is
    appended. The config file is saved in both cases.

    Args:
        pattern: Event summary pattern to reject (substring match)
        reason: Human-readable reason for the rejection
        scope: "all", "event", or "newsletter"

    Returns:
        The new or updated rule dict
    """
    config = _load_family_config()
    # Covers both a missing key and `rejections:` with a null value.
    if not config.get("rejections"):
        config["rejections"] = []
    rules = config["rejections"]
    wanted = pattern.lower()
    today = date.today().isoformat()

    # Check for duplicate
    for existing in rules:
        if not isinstance(existing, dict):
            continue  # tolerate malformed hand-edited entries
        # Stored patterns may be null or non-string after hand edits.
        if str(existing.get("pattern") or "").lower() == wanted:
            # Update existing rule
            existing["reason"] = reason
            existing["scope"] = scope
            existing["updated"] = today
            _save_family_config(config)
            return existing

    rule = {
        "pattern": pattern,
        "reason": reason,
        "scope": scope,
        "created": today,
    }
    rules.append(rule)
    _save_family_config(config)
    return rule
def remove_rejection_rule(pattern):
    """Remove every rejection rule whose pattern equals ``pattern`` (case-insensitive).

    Args:
        pattern: The rule pattern to remove.

    Returns:
        True if at least one rule was removed, False otherwise.
    """
    config = _load_family_config()
    # `rejections:` with no value parses as None.
    rules = config.get("rejections") or []
    target = pattern.lower()
    remaining = [
        r for r in rules
        if not isinstance(r, dict) or str(r.get("pattern") or "").lower() != target
    ]
    if len(remaining) == len(rules):
        # Nothing matched: skip the write so an untouched family.yaml is not
        # re-serialized (yaml.dump would drop its comments and formatting).
        return False
    config["rejections"] = remaining
    _save_family_config(config)
    return True
def should_reject(item, scope="event", rules=None):
    """Check if a newsletter/calendar item should be rejected.

    A rule matches when its pattern is a case-insensitive substring of the
    item's summary and its scope is "all" or equal to ``scope``. Rules with an
    empty or missing pattern never match.

    Args:
        item: Dict with at least 'summary' key
        scope: Check against this scope ("event" or "newsletter")
        rules: Optional pre-loaded list of rule dicts. When None (the default)
            the rules are read via get_rejection_rules(); pass a list to avoid
            re-reading the config for every item in a batch.

    Returns:
        Matching rule dict if rejected, None if not rejected
    """
    if rules is None:
        rules = get_rejection_rules() or []
    # The summary may be missing, None, or a non-string value.
    summary = str(item.get("summary") or "").lower()
    if not summary:
        return None
    for rule in rules:
        if not isinstance(rule, dict):
            continue  # tolerate malformed hand-edited entries
        pattern = str(rule.get("pattern") or "").lower()
        # An empty pattern would be a substring of everything; never match it.
        if not pattern or pattern not in summary:
            continue
        # A missing or null scope is treated as "all".
        rule_scope = rule.get("scope") or "all"
        # "all" matches any scope, otherwise the scopes must match exactly.
        if rule_scope in ("all", scope):
            return rule
    return None
def shadow_filter_items(items, scope="event"):
    """Apply rejection rules as a shadow filter — items are NOT removed.

    Instead, rejected items are flagged as low_relevance with a note about
    which rule caught them. This ensures the user still sees what was filtered
    and can restore items if needed.

    Flagged items are shallow copies, so the caller's dicts are not mutated.
    Items that match no rule are passed through as the same objects.

    Args:
        items: List of newsletter item dicts
        scope: "event" or "newsletter"

    Returns:
        List of items, in input order, with rejected ones flagged as
        low_relevance. A flagged item carries 'original_type',
        'original_relevance', and '_auto_filtered', and its 'type',
        'relevance', and 'reason' are overwritten.
    """
    # should_reject() loads the rules itself, so nothing is pre-loaded here.
    result = []
    for item in items:
        rule = should_reject(item, scope=scope)
        if rule:
            # Downgrade to low_relevance info instead of removing
            item = dict(item)  # shallow copy
            item["original_type"] = item.get("type", "info")
            item["original_relevance"] = item.get("relevance", "high")
            item["type"] = "info"
            item["relevance"] = "low"
            item["reason"] = f"Auto-filtered: {rule.get('pattern', '')} ({rule.get('reason', '')})"
            item["_auto_filtered"] = True
        result.append(item)
    return result
def filter_items(items, scope="event"):
    """Split newsletter items into those kept and those caught by a rejection rule.

    Args:
        items: List of item dicts with 'summary' keys
        scope: "event" or "newsletter"

    Returns:
        Tuple of (kept_items, rejected_items_with_rules), where each rejected
        entry is a dict ``{"item": <item>, "rule": <matching rule>}``. Input
        order is preserved within both lists.
    """
    # Evaluate every item exactly once, in order, then partition on the verdict.
    verdicts = [(entry, should_reject(entry, scope=scope)) for entry in items]
    kept = [entry for entry, hit in verdicts if not hit]
    rejected = [{"item": entry, "rule": hit} for entry, hit in verdicts if hit]
    return kept, rejected
def _normalize_parsed_rule(parsed):
    """Validate an LLM-produced rule; return it cleaned up, or None if unusable."""
    if not isinstance(parsed, dict):
        return None
    if "pattern" not in parsed or "reason" not in parsed:
        return None
    pattern = parsed["pattern"]
    # An empty or non-string pattern cannot be substring-matched safely.
    if not isinstance(pattern, str) or not pattern.strip():
        return None
    parsed["pattern"] = pattern.strip()
    scope = parsed.get("scope")
    if isinstance(scope, str):
        scope = scope.strip().lower()
    # The prompt tells the model that "event" is the default, so a missing or
    # unrecognized scope must not silently become a permanent "all" rule.
    if scope not in ("all", "event", "newsletter"):
        scope = "event"
    parsed["scope"] = scope
    return parsed


def parse_rejection_intent(text, event_summary=""):
    """Use the LLM to parse a natural language rejection into a structured rule.

    Posts a chat-completion request to LLM_URL and reads the JSON object in
    the reply. Code fences around the reply are stripped, and if the reply is
    not pure JSON the first ``{...}`` span in it is tried instead.

    Args:
        text: User's rejection message (e.g., "we don't need that mass, ignore it")
        event_summary: The event being rejected (for context)

    Returns:
        Dict with pattern, reason, scope keys, or None on failure (HTTP error,
        transport error, unparseable reply, or a reply without a usable
        pattern and reason). Scope is always one of "all", "event",
        "newsletter"; it falls back to "event" when missing or unrecognized.
    """
    import requests

    system_msg = (
        "You are a calendar rejection parser. Given a user's message about an event "
        "they want to reject from their family calendar, extract a structured rejection rule.\n\n"
        "Rules:\n"
        "- 'pattern' MUST be a short, specific substring extracted directly from the event summary — "
        "do NOT generalize or infer. Use the exact wording from the event name.\n"
        " Good: 'First Communion Mass', 'STAR Testing', 'WI Tornado Drill'\n"
        " Bad: 'communion', 'testing', 'drill' (too broad, will over-match)\n"
        "- If the user says 'just this one', 'skip this time', or 'remove this event', set scope to 'event' "
        "(only skip calendar creation for this specific occurrence, still show in newsletter).\n"
        "- If the user explicitly says 'always skip', 'ignore future', 'we don't do that', or 'never again', "
        "set scope to 'all' (skip from both calendar and newsletter extraction).\n"
        "- DEFAULT to scope 'event' — do NOT set scope 'all' unless the user explicitly says so.\n"
        "- If the user says 'not relevant to newsletter', set scope to 'newsletter'.\n"
        "- 'reason' should be a brief human-readable explanation.\n"
        "- Return ONLY a JSON object with keys: pattern, reason, scope. No markdown, no explanation."
    )
    user_msg = f"Event: {event_summary}\nUser message: {text}"

    # Step 1: fetch the reply text. Everything that can fail here (transport
    # errors, a non-JSON HTTP body, an unexpected response shape) is handled in
    # one place, so the JSON-recovery step below never runs without `content`.
    try:
        # LLM_URL already includes the full path (e.g., http://host:11434/v1/chat/completions)
        resp = requests.post(
            LLM_URL,
            json={
                "model": LLM_MODEL,
                "messages": [
                    {"role": "system", "content": system_msg},
                    {"role": "user", "content": user_msg},
                ],
                "temperature": 0.1,
                "max_tokens": 256,
            },
            timeout=LLM_TIMEOUT,
        )
        if resp.status_code != 200:
            print(f"[Rejection] LLM error: HTTP {resp.status_code}", file=sys.stderr)
            return None
        content = resp.json()["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[Rejection] Error: {e}", file=sys.stderr)
        return None

    # Step 2: extract the JSON object from the reply text.
    # Strip code fences if present
    if content.startswith("```"):
        content = re.sub(r"^```(?:json)?\s*\n?", "", content)
        content = re.sub(r"\n?```\s*$", "", content)
        content = content.strip()
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        # Try to extract JSON from the response
        match = re.search(r"\{.*\}", content, re.DOTALL)
        if not match:
            return None
        try:
            parsed = json.loads(match.group(0))
        except json.JSONDecodeError:
            return None
    return _normalize_parsed_rule(parsed)
def reject_event(event_summary, user_message, delete_from_calendar=True):
    """Full rejection flow: parse intent → create rule → optionally delete event.

    Args:
        event_summary: The calendar event summary to reject
        user_message: Natural language rejection reason
        delete_from_calendar: If True, also delete matching events from the calendar

    Returns:
        On success, a dict with "status" ("rejected"), "rule" (the saved rule)
        and "calendar_deleted"; when delete_from_calendar is true it also has
        "deleted_count" and "deleted_ids".
        When no non-empty pattern can be determined, nothing is saved or
        deleted and the dict is ``{"status": "error", "error": <message>,
        "rule": None, "calendar_deleted": False}``.
    """
    # 1. Parse the user's intent with LLM
    rule = parse_rejection_intent(user_message, event_summary)
    if not rule:
        # Fallback: use the event summary as pattern, user message as reason
        rule = {
            "pattern": event_summary,
            "reason": user_message,
            "scope": "event",  # Default to one-time skip, not permanent
        }

    # An empty pattern is a substring of every summary, so letting it through
    # would delete every event returned by the calendar search below.
    pattern = str(rule.get("pattern") or event_summary or "").strip()
    if not pattern:
        message = "Cannot reject an event without a non-empty pattern"
        print(f"[Rejection] {message}", file=sys.stderr)
        return {
            "status": "error",
            "error": message,
            "rule": None,
            "calendar_deleted": False,
        }

    # 2. Save the rule
    saved_rule = add_rejection_rule(
        pattern=pattern,
        reason=rule.get("reason") or user_message,
        # Same default as the fallback rule above: one-time skip.
        scope=rule.get("scope") or "event",
    )
    result = {
        "status": "rejected",
        "rule": saved_rule,
        "calendar_deleted": False,
    }

    # 3. Optionally delete from calendar
    # NOTE(review): deletion runs regardless of the rule's scope, including
    # "newsletter" rules — confirm that is intended.
    if delete_from_calendar:
        from datetime import timedelta

        from .calendar_sync import get_calendar_service, _event_to_dict, _delete_event
        from .config import CHICAGO_TZ

        calendar = get_calendar_service()
        now = datetime.now(CHICAGO_TZ)
        # Search for matching events in the next 90 days
        try:
            event_objs = calendar.date_search(start=now, end=now + timedelta(days=90))
        except Exception as e:
            # Best-effort fallback; made visible because it is not date-bounded.
            print(
                f"[Rejection] date_search failed ({e}); falling back to all events",
                file=sys.stderr,
            )
            event_objs = calendar.events()

        needle = pattern.lower()
        deleted_ids = []
        for obj in event_objs:
            evt = _event_to_dict(obj)
            # The summary may be missing or None.
            if needle not in str(evt.get("summary") or "").lower():
                continue
            try:
                _delete_event(evt["id"])
                deleted_ids.append(evt["id"])
            except Exception as e:
                print(f"[Rejection] Failed to delete {evt['id']}: {e}", file=sys.stderr)

        result["calendar_deleted"] = len(deleted_ids) > 0
        result["deleted_count"] = len(deleted_ids)
        result["deleted_ids"] = deleted_ids
    return result
def main(argv=None):
    """Command-line interface for the rejection engine.

    Sub-commands:
        list    Print the current rejection rules.
        add     Add (or update) a rule from a pattern, --reason and --scope.
        reject  Run the natural-language rejection flow for an event summary.
        test    Report whether a summary would be rejected (default "event" scope).

    With no sub-command the help text is printed.

    Args:
        argv: Argument list to parse; None means use sys.argv[1:].
    """
    import argparse

    parser = argparse.ArgumentParser(description="Rejection engine")
    sub = parser.add_subparsers(dest="command")

    sub.add_parser("list", help="List current rejection rules")

    add_cmd = sub.add_parser("add", help="Add a rejection rule")
    add_cmd.add_argument("pattern", help="Event pattern to reject")
    add_cmd.add_argument("--reason", default="User rejected", help="Reason")
    add_cmd.add_argument("--scope", choices=["all", "event", "newsletter"], default="all")

    reject_cmd = sub.add_parser("reject", help="Reject an event with natural language")
    reject_cmd.add_argument("event_summary", help="Event summary to reject")
    reject_cmd.add_argument("message", help="Natural language rejection reason")
    reject_cmd.add_argument("--no-delete", action="store_true", help="Don't delete from calendar")

    test_cmd = sub.add_parser("test", help="Test if an event would be rejected")
    test_cmd.add_argument("summary", help="Event summary to test")

    args = parser.parse_args(argv)

    if args.command == "list":
        rules = get_rejection_rules()
        if not rules:
            print("No rejection rules")
        else:
            for r in rules:
                # .get() so one malformed rule does not abort the listing.
                print(f" [{r.get('scope','?')}] {r.get('pattern','')} — {r.get('reason','')}")
    elif args.command == "add":
        rule = add_rejection_rule(args.pattern, args.reason, args.scope)
        print(f"Added: {rule}")
    elif args.command == "reject":
        result = reject_event(args.event_summary, args.message, delete_from_calendar=not args.no_delete)
        print(json.dumps(result, indent=2, default=str))
    elif args.command == "test":
        rule = should_reject({"summary": args.summary})
        if rule:
            print(f"REJECTED by rule: [{rule.get('scope','?')}] {rule.get('pattern','')} — {rule.get('reason','')}")
        else:
            print("Not rejected")
    else:
        parser.print_help()


# Only run the CLI when executed as a script/module, never on import.
if __name__ == "__main__":
    main()