#!/usr/bin/env python3 """Rejection engine for calendar events. Handles two flows: 1. LLM-based rejection: parse natural language ("we don't need that mass on the calendar, ignore it for future newsletters") into a structured rule 2. Rule-based filtering: during newsletter processing, skip events that match known rejection rules Rejection rules are stored in family.yaml under the 'rejections' key: rejections: - pattern: "First Communion Mass" reason: "Kids not at communion age yet" scope: "all" # all|event|newsletter created: "2026-04-16" - pattern: "STAR Testing" reason: "Not applicable to our grades" scope: "newsletter" created: "2026-04-16" Scope meanings: - "all": reject from both calendar AND future newsletter extraction - "event": only reject calendar creation (still appears in newsletter digest) - "newsletter": only skip in newsletter extraction (still shows as info) """ import json import os import re import sys from datetime import date, datetime import yaml from .config import ( FAMILY_CONFIG_PATH, LLM_MODEL, LLM_URL, LLM_TIMEOUT, ) def _load_family_config(): """Load family.yaml, returning the full dict.""" config_path = FAMILY_CONFIG_PATH if not config_path or not os.path.exists(config_path): # Search common locations for candidate in [ os.path.join(os.getcwd(), "family.yaml"), os.path.join(os.getcwd(), "scripts", "family.yaml"), os.path.expanduser("~/.config/family-assistant/family.yaml"), ]: if os.path.exists(candidate): config_path = candidate break if not config_path or not os.path.exists(config_path): return {"family": {"members": []}, "rejections": []} with open(config_path, "r") as f: return yaml.safe_load(f) or {"family": {"members": []}, "rejections": []} def _save_family_config(config): """Save updated family.yaml.""" config_path = FAMILY_CONFIG_PATH if not config_path or not os.path.exists(config_path): for candidate in [ os.path.join(os.getcwd(), "family.yaml"), os.path.join(os.getcwd(), "scripts", "family.yaml"), os.path.expanduser("~/.config/family-assistant/family.yaml"), ]: if os.path.exists(candidate): config_path = candidate break if not config_path: raise FileNotFoundError("Cannot find family.yaml to save rejection rules") with open(config_path, "w") as f: yaml.dump(config, f, default_flow_style=False, sort_keys=False) def get_rejection_rules(): """Get current rejection rules from family.yaml.""" config = _load_family_config() return config.get("rejections", []) def add_rejection_rule(pattern, reason, scope="all"): """Add a rejection rule to family.yaml. Args: pattern: Event summary pattern to reject (substring match) reason: Human-readable reason for the rejection scope: "all", "event", or "newsletter" Returns: The new rule dict """ config = _load_family_config() if "rejections" not in config: config["rejections"] = [] # Check for duplicate for existing in config["rejections"]: if existing.get("pattern", "").lower() == pattern.lower(): # Update existing rule existing["reason"] = reason existing["scope"] = scope existing["updated"] = date.today().isoformat() _save_family_config(config) return existing rule = { "pattern": pattern, "reason": reason, "scope": scope, "created": date.today().isoformat(), } config["rejections"].append(rule) _save_family_config(config) return rule def remove_rejection_rule(pattern): """Remove a rejection rule by pattern.""" config = _load_family_config() rules = config.get("rejections", []) before = len(rules) config["rejections"] = [r for r in rules if r.get("pattern", "").lower() != pattern.lower()] _save_family_config(config) return len(config["rejections"]) < before def should_reject(item, scope="event"): """Check if a newsletter/calendar item should be rejected. Args: item: Dict with at least 'summary' key scope: Check against this scope ("event" or "newsletter") Returns: Matching rule dict if rejected, None if not rejected """ rules = get_rejection_rules() summary = item.get("summary", "").lower() for rule in rules: pattern = rule.get("pattern", "").lower() rule_scope = rule.get("scope", "all") # Check if pattern matches (substring match) if pattern and pattern in summary: # Check scope: "all" matches any scope, otherwise must match exactly if rule_scope == "all" or rule_scope == scope: return rule return None def shadow_filter_items(items, scope="event"): """Apply rejection rules as a shadow filter — items are NOT removed. Instead, rejected items are flagged as low_relevance with a note about which rule caught them. This ensures the user still sees what was filtered and can restore items if needed. Args: items: List of newsletter item dicts scope: "event" or "newsletter" Returns: List of items with rejected ones flagged as low_relevance """ rules = get_rejection_rules() result = [] for item in items: rule = should_reject(item, scope=scope) if rule: # Downgrade to low_relevance info instead of removing item = dict(item) # shallow copy item["original_type"] = item.get("type", "info") item["original_relevance"] = item.get("relevance", "high") item["type"] = "info" item["relevance"] = "low" item["reason"] = f"Auto-filtered: {rule.get('pattern', '')} ({rule.get('reason', '')})" item["_auto_filtered"] = True result.append(item) return result def filter_items(items, scope="event"): """Filter a list of newsletter items, removing rejected ones. Args: items: List of item dicts with 'summary' keys scope: "event" or "newsletter" Returns: Tuple of (kept_items, rejected_items_with_rules) """ kept = [] rejected = [] for item in items: rule = should_reject(item, scope=scope) if rule: rejected.append({"item": item, "rule": rule}) else: kept.append(item) return kept, rejected def parse_rejection_intent(text, event_summary=""): """Use the LLM to parse a natural language rejection into a structured rule. Args: text: User's rejection message (e.g., "we don't need that mass, ignore it") event_summary: The event being rejected (for context) Returns: Dict with pattern, reason, scope keys, or None on failure """ import requests system_msg = ( "You are a calendar rejection parser. Given a user's message about an event " "they want to reject from their family calendar, extract a structured rejection rule.\n\n" "Rules:\n" "- 'pattern' MUST be a short, specific substring extracted directly from the event summary — " "do NOT generalize or infer. Use the exact wording from the event name.\n" " Good: 'First Communion Mass', 'STAR Testing', 'WI Tornado Drill'\n" " Bad: 'communion', 'testing', 'drill' (too broad, will over-match)\n" "- If the user says 'just this one', 'skip this time', or 'remove this event', set scope to 'event' " "(only skip calendar creation for this specific occurrence, still show in newsletter).\n" "- If the user explicitly says 'always skip', 'ignore future', 'we don't do that', or 'never again', " "set scope to 'all' (skip from both calendar and newsletter extraction).\n" "- DEFAULT to scope 'event' — do NOT set scope 'all' unless the user explicitly says so.\n" "- If the user says 'not relevant to newsletter', set scope to 'newsletter'.\n" "- 'reason' should be a brief human-readable explanation.\n" "- Return ONLY a JSON object with keys: pattern, reason, scope. No markdown, no explanation." ) user_msg = f"Event: {event_summary}\nUser message: {text}" try: # LLM_URL already includes the full path (e.g., http://host:11434/v1/chat/completions) resp = requests.post( LLM_URL, json={ "model": LLM_MODEL, "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": user_msg}, ], "temperature": 0.1, "max_tokens": 256, }, timeout=LLM_TIMEOUT, ) if resp.status_code != 200: print(f"[Rejection] LLM error: HTTP {resp.status_code}", file=sys.stderr) return None content = resp.json()["choices"][0]["message"]["content"].strip() # Strip code fences if present if content.startswith("```"): content = re.sub(r"^```(?:json)?\s*\n?", "", content) content = re.sub(r"\n?```\s*$", "", content) content = content.strip() parsed = json.loads(content) if "pattern" in parsed and "reason" in parsed: parsed.setdefault("scope", "all") return parsed except json.JSONDecodeError: # Try to extract JSON from the response match = re.search(r"\{.*\}", content, re.DOTALL) if match: try: parsed = json.loads(match.group(0)) if "pattern" in parsed and "reason" in parsed: parsed.setdefault("scope", "all") return parsed except json.JSONDecodeError: pass except Exception as e: print(f"[Rejection] Error: {e}", file=sys.stderr) return None def reject_event(event_summary, user_message, delete_from_calendar=True): """Full rejection flow: parse intent → create rule → optionally delete event. Args: event_summary: The calendar event summary to reject user_message: Natural language rejection reason delete_from_calendar: If True, also delete the event from Google Calendar Returns: Dict with status, rule, and any calendar deletion results """ # 1. Parse the user's intent with LLM rule = parse_rejection_intent(user_message, event_summary) if not rule: # Fallback: use the event summary as pattern, user message as reason rule = { "pattern": event_summary, "reason": user_message, "scope": "event", # Default to one-time skip, not permanent } # 2. Save the rule saved_rule = add_rejection_rule( pattern=rule["pattern"], reason=rule["reason"], scope=rule.get("scope", "all"), ) result = { "status": "rejected", "rule": saved_rule, "calendar_deleted": False, } # 3. Optionally delete from calendar if delete_from_calendar: from .calendar_sync import get_calendar_service, _event_to_dict, _delete_event from .config import CHICAGO_TZ from datetime import timedelta calendar = get_calendar_service() now = datetime.now(CHICAGO_TZ) # Search for matching events in the next 90 days try: event_objs = calendar.date_search(start=now, end=now + timedelta(days=90)) except Exception: event_objs = calendar.events() pattern = rule["pattern"].lower() deleted_ids = [] for obj in event_objs: evt = _event_to_dict(obj) if pattern in evt.get("summary", "").lower(): try: _delete_event(evt["id"]) deleted_ids.append(evt["id"]) except Exception as e: print(f"[Rejection] Failed to delete {evt['id']}: {e}", file=sys.stderr) result["calendar_deleted"] = len(deleted_ids) > 0 result["deleted_count"] = len(deleted_ids) result["deleted_ids"] = deleted_ids return result if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Rejection engine") sub = parser.add_subparsers(dest="command") list_cmd = sub.add_parser("list", help="List current rejection rules") add_cmd = sub.add_parser("add", help="Add a rejection rule") add_cmd.add_argument("pattern", help="Event pattern to reject") add_cmd.add_argument("--reason", default="User rejected", help="Reason") add_cmd.add_argument("--scope", choices=["all", "event", "newsletter"], default="all") reject_cmd = sub.add_parser("reject", help="Reject an event with natural language") reject_cmd.add_argument("event_summary", help="Event summary to reject") reject_cmd.add_argument("message", help="Natural language rejection reason") reject_cmd.add_argument("--no-delete", action="store_true", help="Don't delete from calendar") test_cmd = sub.add_parser("test", help="Test if an event would be rejected") test_cmd.add_argument("summary", help="Event summary to test") args = parser.parse_args() if args.command == "list": rules = get_rejection_rules() if not rules: print("No rejection rules") else: for r in rules: print(f" [{r.get('scope','?')}] {r['pattern']} — {r.get('reason','')}") elif args.command == "add": rule = add_rejection_rule(args.pattern, args.reason, args.scope) print(f"Added: {rule}") elif args.command == "reject": result = reject_event(args.event_summary, args.message, delete_from_calendar=not args.no_delete) print(json.dumps(result, indent=2, default=str)) elif args.command == "test": rule = should_reject({"summary": args.summary}) if rule: print(f"REJECTED by rule: [{rule.get('scope','?')}] {rule['pattern']} — {rule.get('reason','')}") else: print("Not rejected") else: parser.print_help()