📄 rejection_engine.py 14,432 bytes Apr 19, 2026 📋 Raw

!/usr/bin/env python3

"""Rejection engine for calendar events.

Handles two flows:
1. LLM-based rejection: parse natural language ("we don't need that mass on
the calendar, ignore it for future newsletters") into a structured rule
2. Rule-based filtering: during newsletter processing, skip events that match
known rejection rules

Rejection rules are stored in family.yaml under the 'rejections' key:
rejections:
- pattern: "First Communion Mass"
reason: "Kids not at communion age yet"
scope: "all" # all|event|newsletter
created: "2026-04-16"
- pattern: "STAR Testing"
reason: "Not applicable to our grades"
scope: "newsletter"
created: "2026-04-16"

Scope meanings:
- "all": reject from both calendar AND future newsletter extraction
- "event": only reject calendar creation (still appears in newsletter digest)
- "newsletter": only skip in newsletter extraction (still shows as info)
"""

import json
import os
import re
import sys
from datetime import date, datetime

import yaml

from .config import (
FAMILY_CONFIG_PATH,
LLM_MODEL,
LLM_URL,
LLM_TIMEOUT,
)

def _load_family_config():
"""Load family.yaml, returning the full dict."""
config_path = FAMILY_CONFIG_PATH
if not config_path or not os.path.exists(config_path):
# Search common locations
for candidate in [
os.path.join(os.getcwd(), "family.yaml"),
os.path.join(os.getcwd(), "scripts", "family.yaml"),
os.path.expanduser("~/.config/family-assistant/family.yaml"),
]:
if os.path.exists(candidate):
config_path = candidate
break

if not config_path or not os.path.exists(config_path):
    return {"family": {"members": []}, "rejections": []}

with open(config_path, "r") as f:
    return yaml.safe_load(f) or {"family": {"members": []}, "rejections": []}

def _save_family_config(config):
"""Save updated family.yaml."""
config_path = FAMILY_CONFIG_PATH
if not config_path or not os.path.exists(config_path):
for candidate in [
os.path.join(os.getcwd(), "family.yaml"),
os.path.join(os.getcwd(), "scripts", "family.yaml"),
os.path.expanduser("~/.config/family-assistant/family.yaml"),
]:
if os.path.exists(candidate):
config_path = candidate
break

if not config_path:
    raise FileNotFoundError("Cannot find family.yaml to save rejection rules")

with open(config_path, "w") as f:
    yaml.dump(config, f, default_flow_style=False, sort_keys=False)

def get_rejection_rules():
"""Get current rejection rules from family.yaml."""
config = _load_family_config()
return config.get("rejections", [])

def add_rejection_rule(pattern, reason, scope="all"):
"""Add a rejection rule to family.yaml.

Args:
    pattern: Event summary pattern to reject (substring match)
    reason: Human-readable reason for the rejection
    scope: "all", "event", or "newsletter"

Returns:
    The new rule dict
"""
config = _load_family_config()
if "rejections" not in config:
    config["rejections"] = []

# Check for duplicate
for existing in config["rejections"]:
    if existing.get("pattern", "").lower() == pattern.lower():
        # Update existing rule
        existing["reason"] = reason
        existing["scope"] = scope
        existing["updated"] = date.today().isoformat()
        _save_family_config(config)
        return existing

rule = {
    "pattern": pattern,
    "reason": reason,
    "scope": scope,
    "created": date.today().isoformat(),
}
config["rejections"].append(rule)
_save_family_config(config)
return rule

def remove_rejection_rule(pattern):
"""Remove a rejection rule by pattern."""
config = _load_family_config()
rules = config.get("rejections", [])
before = len(rules)
config["rejections"] = [r for r in rules if r.get("pattern", "").lower() != pattern.lower()]
_save_family_config(config)
return len(config["rejections"]) < before

def should_reject(item, scope="event"):
"""Check if a newsletter/calendar item should be rejected.

Args:
    item: Dict with at least 'summary' key
    scope: Check against this scope ("event" or "newsletter")

Returns:
    Matching rule dict if rejected, None if not rejected
"""
rules = get_rejection_rules()
summary = item.get("summary", "").lower()

for rule in rules:
    pattern = rule.get("pattern", "").lower()
    rule_scope = rule.get("scope", "all")

    # Check if pattern matches (substring match)
    if pattern and pattern in summary:
        # Check scope: "all" matches any scope, otherwise must match exactly
        if rule_scope == "all" or rule_scope == scope:
            return rule

return None

def shadow_filter_items(items, scope="event"):
"""Apply rejection rules as a shadow filter — items are NOT removed.

Instead, rejected items are flagged as low_relevance with a note about
which rule caught them. This ensures the user still sees what was filtered
and can restore items if needed.

Args:
    items: List of newsletter item dicts
    scope: "event" or "newsletter"

Returns:
    List of items with rejected ones flagged as low_relevance
"""
rules = get_rejection_rules()
result = []
for item in items:
    rule = should_reject(item, scope=scope)
    if rule:
        # Downgrade to low_relevance info instead of removing
        item = dict(item)  # shallow copy
        item["original_type"] = item.get("type", "info")
        item["original_relevance"] = item.get("relevance", "high")
        item["type"] = "info"
        item["relevance"] = "low"
        item["reason"] = f"Auto-filtered: {rule.get('pattern', '')} ({rule.get('reason', '')})"
        item["_auto_filtered"] = True
    result.append(item)
return result

def filter_items(items, scope="event"):
"""Filter a list of newsletter items, removing rejected ones.

Args:
    items: List of item dicts with 'summary' keys
    scope: "event" or "newsletter"

Returns:
    Tuple of (kept_items, rejected_items_with_rules)
"""
kept = []
rejected = []

for item in items:
    rule = should_reject(item, scope=scope)
    if rule:
        rejected.append({"item": item, "rule": rule})
    else:
        kept.append(item)

return kept, rejected

def parse_rejection_intent(text, event_summary=""):
"""Use the LLM to parse a natural language rejection into a structured rule.

Args:
    text: User's rejection message (e.g., "we don't need that mass, ignore it")
    event_summary: The event being rejected (for context)

Returns:
    Dict with pattern, reason, scope keys, or None on failure
"""
import requests

system_msg = (
    "You are a calendar rejection parser. Given a user's message about an event "
    "they want to reject from their family calendar, extract a structured rejection rule.\n\n"
    "Rules:\n"
    "- 'pattern' MUST be a short, specific substring extracted directly from the event summary — "
    "do NOT generalize or infer. Use the exact wording from the event name.\n"
    "  Good: 'First Communion Mass', 'STAR Testing', 'WI Tornado Drill'\n"
    "  Bad: 'communion', 'testing', 'drill' (too broad, will over-match)\n"
    "- If the user says 'just this one', 'skip this time', or 'remove this event', set scope to 'event' "
    "(only skip calendar creation for this specific occurrence, still show in newsletter).\n"
    "- If the user explicitly says 'always skip', 'ignore future', 'we don't do that', or 'never again', "
    "set scope to 'all' (skip from both calendar and newsletter extraction).\n"
    "- DEFAULT to scope 'event' — do NOT set scope 'all' unless the user explicitly says so.\n"
    "- If the user says 'not relevant to newsletter', set scope to 'newsletter'.\n"
    "- 'reason' should be a brief human-readable explanation.\n"
    "- Return ONLY a JSON object with keys: pattern, reason, scope. No markdown, no explanation."
)

user_msg = f"Event: {event_summary}\nUser message: {text}"

try:
    # LLM_URL already includes the full path (e.g., http://host:11434/v1/chat/completions)
    resp = requests.post(
        LLM_URL,
        json={
            "model": LLM_MODEL,
            "messages": [
                {"role": "system", "content": system_msg},
                {"role": "user", "content": user_msg},
            ],
            "temperature": 0.1,
            "max_tokens": 256,
        },
        timeout=LLM_TIMEOUT,
    )

    if resp.status_code != 200:
        print(f"[Rejection] LLM error: HTTP {resp.status_code}", file=sys.stderr)
        return None

    content = resp.json()["choices"][0]["message"]["content"].strip()

    # Strip code fences if present
    if content.startswith("```"):
        content = re.sub(r"^```(?:json)?\s*\n?", "", content)
        content = re.sub(r"\n?```\s*$", "", content)
        content = content.strip()

    parsed = json.loads(content)
    if "pattern" in parsed and "reason" in parsed:
        parsed.setdefault("scope", "all")
        return parsed

except json.JSONDecodeError:
    # Try to extract JSON from the response
    match = re.search(r"\{.*\}", content, re.DOTALL)
    if match:
        try:
            parsed = json.loads(match.group(0))
            if "pattern" in parsed and "reason" in parsed:
                parsed.setdefault("scope", "all")
                return parsed
        except json.JSONDecodeError:
            pass
except Exception as e:
    print(f"[Rejection] Error: {e}", file=sys.stderr)

return None

def reject_event(event_summary, user_message, delete_from_calendar=True):
"""Full rejection flow: parse intent → create rule → optionally delete event.

Args:
    event_summary: The calendar event summary to reject
    user_message: Natural language rejection reason
    delete_from_calendar: If True, also delete the event from Google Calendar

Returns:
    Dict with status, rule, and any calendar deletion results
"""
# 1. Parse the user's intent with LLM
rule = parse_rejection_intent(user_message, event_summary)

if not rule:
    # Fallback: use the event summary as pattern, user message as reason
    rule = {
        "pattern": event_summary,
        "reason": user_message,
        "scope": "event",  # Default to one-time skip, not permanent
    }

# 2. Save the rule
saved_rule = add_rejection_rule(
    pattern=rule["pattern"],
    reason=rule["reason"],
    scope=rule.get("scope", "all"),
)

result = {
    "status": "rejected",
    "rule": saved_rule,
    "calendar_deleted": False,
}

# 3. Optionally delete from calendar
if delete_from_calendar:
    from .calendar_sync import get_calendar_service, _event_to_dict, _delete_event
    from .config import CHICAGO_TZ
    from datetime import timedelta

    calendar = get_calendar_service()
    now = datetime.now(CHICAGO_TZ)

    # Search for matching events in the next 90 days
    try:
        event_objs = calendar.date_search(start=now, end=now + timedelta(days=90))
    except Exception:
        event_objs = calendar.events()

    pattern = rule["pattern"].lower()
    deleted_ids = []

    for obj in event_objs:
        evt = _event_to_dict(obj)
        if pattern in evt.get("summary", "").lower():
            try:
                _delete_event(evt["id"])
                deleted_ids.append(evt["id"])
            except Exception as e:
                print(f"[Rejection] Failed to delete {evt['id']}: {e}", file=sys.stderr)

    result["calendar_deleted"] = len(deleted_ids) > 0
    result["deleted_count"] = len(deleted_ids)
    result["deleted_ids"] = deleted_ids

return result

if name == "main":
import argparse

parser = argparse.ArgumentParser(description="Rejection engine")
sub = parser.add_subparsers(dest="command")

list_cmd = sub.add_parser("list", help="List current rejection rules")
add_cmd = sub.add_parser("add", help="Add a rejection rule")
add_cmd.add_argument("pattern", help="Event pattern to reject")
add_cmd.add_argument("--reason", default="User rejected", help="Reason")
add_cmd.add_argument("--scope", choices=["all", "event", "newsletter"], default="all")

reject_cmd = sub.add_parser("reject", help="Reject an event with natural language")
reject_cmd.add_argument("event_summary", help="Event summary to reject")
reject_cmd.add_argument("message", help="Natural language rejection reason")
reject_cmd.add_argument("--no-delete", action="store_true", help="Don't delete from calendar")

test_cmd = sub.add_parser("test", help="Test if an event would be rejected")
test_cmd.add_argument("summary", help="Event summary to test")

args = parser.parse_args()

if args.command == "list":
    rules = get_rejection_rules()
    if not rules:
        print("No rejection rules")
    else:
        for r in rules:
            print(f"  [{r.get('scope','?')}] {r['pattern']} — {r.get('reason','')}")

elif args.command == "add":
    rule = add_rejection_rule(args.pattern, args.reason, args.scope)
    print(f"Added: {rule}")

elif args.command == "reject":
    result = reject_event(args.event_summary, args.message, delete_from_calendar=not args.no_delete)
    print(json.dumps(result, indent=2, default=str))

elif args.command == "test":
    rule = should_reject({"summary": args.summary})
    if rule:
        print(f"REJECTED by rule: [{rule.get('scope','?')}] {rule['pattern']} — {rule.get('reason','')}")
    else:
        print("Not rejected")

else:
    parser.print_help()