"""CLI entry point for the Family Assistant package."""

import argparse
import json
import sys
from datetime import datetime, timedelta

from family_assistant.config import CHICAGO_TZ
from family_assistant.email_fetcher import fetch_unread
from family_assistant.calendar_sync import (
    get_calendar_service,
    list_upcoming_events,
    create_event,
    _delete_event,
)
from family_assistant.conflict_engine import (
    detect_conflicts,
    resolve_all_conflicts,
    execute_resolution,
)
from family_assistant.pipeline import process_emails
from family_assistant.setup import run_setup
from family_assistant.tests.test_qa import run_qa_tests


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser: one positional ``command`` plus shared options."""
    parser = argparse.ArgumentParser(description="Family Calendar Manager")
    parser.add_argument(
        "command",
        choices=[
            "check-email",
            "upcoming",
            "process",
            "test",
            "qa",
            "conflicts",
            "resolve",
            "handle",
            "setup",
            "conflict-notify",
            "conflict-callback",
            "reject",
            "rejections",
            "intent",
            "chat-intent",
            "click",
            "maintenance",
            "maint-done",
            "remind",
            "inbound-hook",
            "slot-callback",
            "slot-cache-cleanup",
            "add-recurring",
            "brain-query",
            "brain-stats",
            "brain-purge",
            "brain-backfill",
            "resolve-location",
            "location-cache",
            "seed-locations",
            "dropbox",
            "drive-setup",
            "reset-circuit",
        ],
        help="Command to run",
    )
    parser.add_argument("--hours", type=int, default=48, help="Hours ahead for upcoming events")
    parser.add_argument("--dry-run", action="store_true", help="Process emails without creating events")
    parser.add_argument("--no-notify", action="store_true", help="Skip Telegram notifications")
    parser.add_argument("--quiet", action="store_true", help="Only send DM alerts, no family group messages")
    parser.add_argument("--json", action="store_true", help="Output raw JSON")
    parser.add_argument("--option", type=int, default=0, help="Option number for handle command (1-based)")
    parser.add_argument("--callback-data", type=str, default=None, help="Callback data from Telegram button (format: resolve|id|index)")
    parser.add_argument("--event-summary", type=str, default=None, help="Event summary for reject command")
    parser.add_argument("--reject-message", type=str, default=None, help="Natural language rejection reason")
    parser.add_argument("--message", type=str, default=None, help="Natural language message for intent command")
    parser.add_argument("--reply", action="store_true", help="Indicate the message is a reply to the bot (for chat-intent)")
    parser.add_argument("--quoted", type=str, default=None, help="Text of the message being replied to (for context)")
    parser.add_argument("--non-interactive", action="store_true", help="Non-interactive setup (create templates)")
    parser.add_argument("--url", type=str, default=None, help="URL to click/fetch (for click command)")
    parser.add_argument("--context", type=str, default="", help="Context summary for the URL (for click command)")
    parser.add_argument("--question", type=str, default=None, help="Question for brain-query command")
    parser.add_argument("--top-k", type=int, default=3, help="Number of results to retrieve for brain-query")
    parser.add_argument("--days", type=int, default=30, help="Days to look back for brain-backfill (default: 30)")
    parser.add_argument("--file", type=str, default=None, help="File path for dropbox command")
    parser.add_argument("--test", type=str, default=None, help="Dry-run test: extract without uploading (dropbox)")
    parser.add_argument("--message-id", type=str, default="", help="Telegram message ID for dropbox reaction")
    return parser


def _print_resolution_options(options) -> None:
    """Print a 1-based numbered list of resolution options (action + description)."""
    for number, opt in enumerate(options, 1):
        print(f" {number}. [{opt.get('action','?').upper()}] {opt.get('description', '')}")


def main() -> None:
    """Parse the command line and run the selected sub-command.

    The positional ``command`` argument selects one branch below. Most
    branches call into a ``family_assistant`` module and print the result,
    either as JSON or as a short human-readable report. Modules used by a
    single command are imported inside that command's branch.

    Usage errors (a missing required option for the chosen command) print a
    usage line and exit with status 1. The ``maintenance`` and ``maint-done``
    commands are disabled and exit with status 0.
    """
    args = _build_parser().parse_args()

    if args.command == "check-email":
        result = fetch_unread()
        print(json.dumps(result, indent=2, default=str))

    elif args.command == "upcoming":
        events = list_upcoming_events(hours=args.hours)
        print(json.dumps(events, indent=2, default=str))

    elif args.command == "process":
        result = process_emails(dry_run=args.dry_run, notify=not args.no_notify, quiet=args.quiet)
        print(json.dumps(result, indent=2, default=str))

    elif args.command == "test":
        # Round-trip check: create a one-hour event, then delete it again.
        start = datetime(2026, 4, 16, 10, 0, tzinfo=CHICAGO_TZ)
        end = start + timedelta(hours=1)
        created = create_event(
            "Test: Socrates Calendar Integration",
            start,
            end,
            description="Automated test event from OpenClaw",
        )
        try:
            print(f"Created: {created['id']} — {created['summary']}")
        finally:
            # Delete even if reporting failed, so the test event is not left behind.
            _delete_event(created["id"])
        print("Deleted test event ✅")

    elif args.command == "conflicts":
        conflicts = detect_conflicts(hours=args.hours)
        if args.json:
            print(json.dumps(conflicts, indent=2, default=str))
        elif conflicts:
            print(f"⚠️ {len(conflicts)} scheduling conflict(s) in the next {args.hours}h:")
            for c in conflicts:
                e1 = c["event1"]
                e2 = c["event2"]
                print(
                    f" • {e1['summary']} ({e1['start']}) conflicts with "
                    f"{e2['summary']} ({e2['start']}) — {c['overlap_minutes']}min overlap"
                )
        else:
            print(f"✅ No scheduling conflicts in the next {args.hours}h")

    elif args.command == "resolve":
        resolutions = resolve_all_conflicts(hours=args.hours)
        if args.json:
            print(json.dumps(resolutions, indent=2, default=str))
        elif not resolutions:
            print(f"✅ No scheduling conflicts to resolve in the next {args.hours}h")
        else:
            print(f"⚠️ {len(resolutions)} conflict(s) found — resolving...\n")
            for r in resolutions:
                c = r["conflict"]
                res = r["resolution"]
                print(f" Conflict: {c['event1']['summary']} vs {c['event2']['summary']}")
                print(f" {res.get('message', 'No suggestion available')}")
                options = res.get("options", [])
                if options:
                    _print_resolution_options(options)
                print()

    elif args.command == "handle":
        if args.option < 1:
            print("Usage: family_calendar.py handle --option N [--hours 168] [--dry-run]")
            print("First run 'resolve' to see options, then 'handle --option N' to execute one.")
            sys.exit(1)
        resolutions = resolve_all_conflicts(hours=args.hours)
        if not resolutions:
            print(f"✅ No scheduling conflicts to handle in the next {args.hours}h")
            sys.exit(0)
        # Handle the first conflict with the chosen option
        # (future: support multiple conflicts by index)
        r = resolutions[0]
        conflict = r["conflict"]
        resolution = r["resolution"]
        options = resolution.get("options", [])
        if args.option > len(options):
            print(f"❌ Invalid option {args.option}. Only {len(options)} available:")
            _print_resolution_options(options)
            sys.exit(1)
        result = execute_resolution(conflict, resolution, args.option, dry_run=args.dry_run)
        if args.json:
            print(json.dumps(result, indent=2, default=str))
        else:
            status = result.get("status", "?")
            msg = result.get("message", "No details")
            if status == "ERROR":
                print(f"❌ {msg}")
            elif status == "DRY_RUN":
                print(f"🔍 [DRY RUN] {msg}")
            else:
                print(f"✅ {msg}")

    elif args.command == "conflict-notify":
        from family_assistant.conflict_notify import scan_and_notify

        notified = scan_and_notify()
        if not notified:
            print("No conflicts found")
        else:
            print(f"Notified about {len(notified)} conflict(s): {notified}")

    elif args.command == "conflict-callback":
        if not args.callback_data:
            print("Usage: family_assistant conflict-callback --callback-data 'resolve|abc123|0'")
            sys.exit(1)
        # Route to the correct handler based on callback prefix
        if args.callback_data.startswith("slot|"):
            from family_assistant.slot_handler import handle_slot_callback

            result = handle_slot_callback(args.callback_data, dry_run=args.dry_run)
        else:
            from family_assistant.conflict_notify import handle_callback

            result = handle_callback(args.callback_data, dry_run=args.dry_run)
        print(json.dumps(result, indent=2, default=str))

    elif args.command == "reject":
        from family_assistant.rejection_engine import reject_event

        # argparse always defines these attributes; they are None when omitted.
        event_summary = args.event_summary or ""
        message = args.reject_message or "User rejected"
        if not event_summary:
            # The parser only accepts these values as options, so the usage
            # text shows the option form rather than positional arguments.
            print("Usage: family_assistant reject --event-summary SUMMARY [--reject-message REASON] [--dry-run]")
            print('Example: family_assistant reject --event-summary "First Communion Mass" --reject-message "we don\'t need that, ignore future"')
            sys.exit(1)
        result = reject_event(event_summary, message, delete_from_calendar=not args.dry_run)
        print(json.dumps(result, indent=2, default=str))

    elif args.command == "rejections":
        from family_assistant.rejection_engine import get_rejection_rules

        rules = get_rejection_rules()
        if not rules:
            print("No rejection rules configured")
        else:
            print(f"{len(rules)} rejection rule(s):")
            for r in rules:
                print(f" [{r.get('scope','?')}] {r['pattern']} — {r.get('reason','')}")

    elif args.command == "intent":
        from family_assistant.intent_engine import parse_intent, execute_intent

        if not args.message:
            print("Usage: family_calendar.py intent --message 'change OT Session to April 20'")
            sys.exit(1)
        intent = parse_intent(args.message)
        print(f"Parsed intent: {json.dumps(intent, indent=2, default=str)}")
        result = execute_intent(intent, dry_run=args.dry_run)
        print(f"Result: {json.dumps(result, indent=2, default=str)}")

    elif args.command == "chat-intent":
        from family_assistant.intent_router import process_group_message

        if not args.message:
            print("Usage: family_calendar.py chat-intent --message 'move it to the 21st' [--reply] [--quoted 'bot msg']")
            sys.exit(1)
        result = process_group_message(
            args.message,
            is_reply_to_bot=args.reply,
            quoted_message=args.quoted or "",
            dry_run=args.dry_run,
        )
        print(json.dumps(result, indent=2, default=str))
        if result.get("notification"):
            print(f"\nNotification: {result['notification']}")

    elif args.command == "maintenance":
        # DISABLED — maintenance tracking purged. Module preserved for Phase 6.
        print("⚠️ Maintenance tracking is currently disabled (pending Phase 6 redesign).")
        print("The maintenance_sentinel module is preserved for reference.")
        sys.exit(0)

    elif args.command == "maint-done":
        # DISABLED — maintenance tracking purged.
        print("⚠️ Maintenance 'Done' callback is currently disabled (pending Phase 6 redesign).")
        sys.exit(0)

    elif args.command == "click":
        from family_assistant.clicker import click_url, build_slot_buttons, hash_url

        url = args.url
        if not url:
            print("Usage: family_calendar.py click --url 'https://www.signupgenius.com/...' [--context 'Parent-Teacher Conferences']")
            sys.exit(1)
        result = click_url(url, context_summary=args.context, dry_run=args.dry_run)
        print(json.dumps(result, indent=2, default=str, ensure_ascii=False))
        if result.get("slots"):
            print("\nSlot buttons:")
            buttons = build_slot_buttons(result["slots"], hash_url(url))
            for row in buttons:
                for btn in row:
                    print(f" [{btn['text']}] → {btn['callback_data']}")

    elif args.command == "remind":
        from family_assistant.intent_engine import execute_intent, parse_intent

        if not args.message:
            print("Usage: family_calendar.py remind --message 'kids wear red shirts tomorrow'")
            sys.exit(1)
        # Parse through intent engine so the LLM extracts date/who/description
        intent = parse_intent(f"remind me: {args.message}")
        if intent.get("type") != "remind":
            # If LLM didn't classify as remind, force it but use its date extraction
            if intent.get("type") == "none":
                print("Couldn't extract a date from that message. Try: remind --message 'kids wear red shirts tomorrow'")
                sys.exit(1)
            # If it classified as add, convert to remind
            if intent.get("type") == "add":
                intent["type"] = "remind"
                # Keep the first 10 characters of "start" as the date portion;
                # `or ""` covers a "start" key that is present but None.
                intent["date"] = (intent.get("start") or "")[:10]
            # NOTE(review): any other intent type falls through and is executed
            # unchanged — confirm that is intended for the remind command.
        print(f"Parsed intent: {json.dumps(intent, indent=2, default=str)}")
        result = execute_intent(intent, dry_run=args.dry_run)
        print(f"Result: {json.dumps(result, indent=2, default=str)}")

    elif args.command == "inbound-hook":
        from family_assistant.inbound_hook import process_inbound, send_notification_to_group

        if not args.message:
            print("Usage: family_calendar.py inbound-hook --message 'Socrates, remind us...' [--reply] [--quoted 'bot msg']")
            sys.exit(1)
        result = process_inbound(
            message=args.message,
            is_reply_to_bot=args.reply,
            quoted_text=args.quoted or "",
            dry_run=args.dry_run,
        )
        if result is None:
            print(json.dumps({"status": "not_triggered", "reason": "no_wake_word_or_reply"}))
        else:
            print(json.dumps(result, indent=2, default=str))
            if result.get("notification") and not args.dry_run:
                sent = send_notification_to_group(result["notification"])
                print(f"Notification sent: {sent}")

    elif args.command == "slot-callback":
        from family_assistant.slot_handler import handle_slot_callback

        if not args.callback_data:
            print("Usage: family_calendar.py slot-callback --callback-data 'slot||'")
            sys.exit(1)
        result = handle_slot_callback(args.callback_data, dry_run=args.dry_run)
        print(json.dumps(result, indent=2, default=str, ensure_ascii=False))
        if result.get("confirmation") and not args.dry_run:
            # The confirmation is only printed here; nothing in this branch
            # sends it anywhere.
            print(f"\nConfirmation: {result['confirmation']}")

    elif args.command == "slot-cache-cleanup":
        from family_assistant.slot_handler import cleanup_old_cache

        removed = cleanup_old_cache()
        print(f"Removed {removed} expired cache entries")

    elif args.command == "add-recurring":
        from family_assistant.calendar_sync import create_recurring_event
        from family_assistant.intent_engine import parse_intent
        from family_assistant.rrule_builder import build_rrule, validate_recurrence

        if not args.message:
            print("Usage: family_calendar.py add-recurring --message 'OT Session every Tuesday 4pm for 10 weeks'")
            sys.exit(1)
        # Parse the message through the intent engine to extract recurrence + datetime
        intent = parse_intent(f"add recurring: {args.message}")
        if intent.get("type") not in ("add", "remind"):
            print(f"Couldn't parse as recurring event: {intent}")
            sys.exit(1)
        # Accept both 'recurrence' (correct) and 'recurring' (legacy)
        recurrence = intent.get("recurrence") or intent.get("recurring")
        # If the intent engine returned a non-standard 'recurring' key, normalize it
        if recurrence and intent.get("is_recurring") is None:
            intent["is_recurring"] = True
        if not recurrence:
            print(f"No recurrence pattern found in message. Intent: {intent}")
            sys.exit(1)
        errors = validate_recurrence(recurrence)
        if errors:
            print(f"Invalid recurrence: {'; '.join(errors)}")
            sys.exit(1)
        rrule = build_rrule(recurrence)
        print(f"RRULE: {rrule}")
        print(f"Intent: {json.dumps(intent, indent=2, default=str)}")
        if not args.dry_run:
            start_dt = intent.get("start")
            end_dt = intent.get("end")
            if not start_dt or not end_dt:
                print("Missing start/end datetime")
                sys.exit(1)
            try:
                event = create_recurring_event(
                    summary=intent.get("summary", "Recurring Event"),
                    start_dt=start_dt,
                    end_dt=end_dt,
                    recurrence=recurrence,
                    description=intent.get("description", ""),
                    location=intent.get("location", ""),
                )
                print(f"Created: {event.get('summary')} (ID: {event['id']})")
                print(f"Link: {event.get('htmlLink', '')}")
            except Exception as exc:
                # Report the failure and exit non-zero, like the other error
                # paths, so callers can tell the event was not created.
                print(f"Error: {exc}")
                sys.exit(1)
        else:
            print("(dry run — no calendar event created)")

    elif args.command == "brain-query":
        from family_assistant.family_brain import answer

        if not args.question:
            print("Usage: family-assistant brain-query --question 'What do the kids need for the field trip?'")
            sys.exit(1)
        result = answer(args.question, top_k=args.top_k)
        print(result.get("answer", "No answer"))
        if args.json:
            print("\n--- Sources ---")
            for src in result.get("sources", []):
                print(f" [{src.get('score', 0)}] {src.get('subject', '?')} ({src.get('date', '?')})")

    elif args.command == "brain-stats":
        from family_assistant.family_brain import stats

        s = stats()
        print("Family Brain Stats:")
        print(f" DB Path: {s.get('db_path')}")
        print(f" Collection: {s.get('collection')}")
        print(f" Documents: {s.get('count', 0)}")

    elif args.command == "brain-purge":
        from family_assistant.family_brain import purge_old

        result = purge_old(months=12)
        print(f"Purged {result.get('purged', 0)} old documents")
        print(f"Remaining: {result.get('remaining', 0)}")

    elif args.command == "brain-backfill":
        from family_assistant.pipeline import backfill_brain

        result = backfill_brain(days=args.days)
        print("\nBackfill Results:")
        print(f" Emails scanned: {result.get('emails_scanned', 0)}")
        print(f" Appointment emails ingested: {result.get('ingested_emails', 0)}")
        print(f" Newsletters ingested: {result.get('ingested_newsletters', 0)}")
        if result.get('errors'):
            print(f" Errors: {len(result['errors'])}")
            # Only the first five errors are listed.
            for err in result['errors'][:5]:
                print(f" - {err}")

    elif args.command == "resolve-location":
        from family_assistant.location_cache import resolve, format_location_enrichment

        if not args.message:
            print("Usage: family-assistant resolve-location --message 'Aurora BayCare Green Bay'")
            sys.exit(1)
        # NOTE(review): force_refresh is True unless --dry-run is given —
        # confirm this coupling is intended.
        result = resolve(args.message, force_refresh=not args.dry_run)
        if result:
            print(format_location_enrichment(result))
            if args.json:
                print(json.dumps(result, indent=2, default=str, ensure_ascii=False))
        else:
            print(f"❌ Could not resolve: {args.message}")

    elif args.command == "location-cache":
        from family_assistant.location_cache import stats, purge_expired

        s = stats()
        print("Location Cache:")
        print(f" Entries: {s['total_entries']}")
        print(f" With travel time: {s['with_travel_time']}")
        print(f" Expired: {s['expired_entries']}")
        if s.get('cached_locations'):
            print(" Known locations:")
            for name in s['cached_locations']:
                print(f" • {name}")
        if args.json:
            print(json.dumps(s, indent=2, default=str))

    elif args.command == "seed-locations":
        from family_assistant.location_cache import seed_known_locations

        results = seed_known_locations()
        if not results:
            print("No locations to seed. Add 'known_locations' to family.yaml.")
        else:
            # Each result is a pair; count those whose second element is truthy.
            seeded = sum(1 for _, r in results if r)
            print(f"Seeded {seeded} / {len(results)} locations")

    elif args.command == "dropbox":
        from family_assistant.document_sorter import cli_process

        cli_process(args)

    elif args.command == "drive-setup":
        from family_assistant.document_sorter import cli_drive_setup

        cli_drive_setup(args)

    elif args.command == "reset-circuit":
        from family_assistant.pipeline import reset_circuit_breaker

        reset_circuit_breaker()

    elif args.command == "setup":
        run_setup(non_interactive=args.non_interactive)

    elif args.command == "qa":
        run_qa_tests()


if __name__ == '__main__':
    main()