📄 conflict_notify.py 14,260 bytes Apr 18, 2026 📋 Raw

!/usr/bin/env python3

"""Post conflict resolution options to Telegram with inline buttons.

When a scheduling conflict is detected, this module:
1. Generates resolution options via the LLM
2. Formats a human-readable message
3. Posts it to the family Telegram group with inline buttons
4. Handles callback responses when someone taps a button

Callback format: resolve||
Example: resolve|abc123|0 (first option for conflict abc123)
"""

import hashlib
import json
import os
import subprocess
import sys
from datetime import datetime

from .config import CHICAGO_TZ
from .conflict_engine import detect_conflicts, resolve_conflict, execute_resolution
from .calendar_sync import get_calendar_service, _event_to_dict, _update_event_description

Default: override via environment variable or .env file

These are placeholder values — set TELEGRAM_CHAT_ID in .env

FAMILY_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "-1000000000000")

def conflict_id(conflict):
"""Generate a stable hash ID for a conflict pair."""
# Use event summaries + start times as unique key
e1 = conflict["event1"]
e2 = conflict["event2"]
key = f"{e1['summary']}|{e1['start']}|{e2['summary']}|{e2['start']}"
return hashlib.md5(key.encode()).hexdigest()[:8]

def format_conflict_message(conflict, resolution):
"""Format a conflict + resolution into a Telegram message."""
e1 = conflict["event1"]
e2 = conflict["event2"]

# Parse times for display
def fmt_time(iso_str):
    try:
        dt = datetime.fromisoformat(iso_str)
        if dt.tzinfo:
            dt = dt.astimezone(CHICAGO_TZ)
        return dt.strftime("%a %b %d, %-I:%M %p")
    except (ValueError, TypeError):
        return iso_str

msg = f"⚠️ *Schedule Conflict*\n\n"
msg += f"• {e1['summary']}\n  {fmt_time(e1['start'])}\n"
msg += f"• {e2['summary']}\n  {fmt_time(e2['start'])}\n"
msg += f"Overlap: {conflict['overlap_minutes']}min\n\n"

if resolution.get("message"):
    msg += f"{resolution['message']}\n\n"

options = resolution.get("options", [])
if not options:
    msg += "_No resolution options available._"
    return msg

msg += "How should we handle this?"
return msg

def build_buttons(cid, options):
"""Build Telegram inline keyboard buttons for resolution options.

Each button has callback_data in format: resolve|<conflict_id>|<option_index>
"""
rows = []
for i, option in enumerate(options):
    option_type = option.get("type", "unknown")
    label = option.get("label", option.get("description", f"Option {i+1}"))
    # Short labels for buttons (Telegram max 64 chars for callback_data)
    button_text = f"{label}"[:40]
    callback = f"resolve|{cid}|{i+1}"  # 1-based index to match handle command
    rows.append([{"text": button_text, "callback_data": callback}])

# Add reject buttons for each conflicting event
rows.append([
    {"text": "🚫 Reject event 1", "callback_data": f"reject|{cid}|1"},
    {"text": "🚫 Reject event 2", "callback_data": f"reject|{cid}|2"},
])

# Add dismiss button (acknowledge the conflict, stop future alerts)
rows.append([
    {"text": "👋 We've got this", "callback_data": f"dismiss|{cid}|0"},
])

return rows

def send_conflict_alert(conflict, resolution):
"""Post a conflict alert with inline buttons to the family Telegram group."""
cid = conflict_id(conflict)
msg = format_conflict_message(conflict, resolution)
options = resolution.get("options", [])

cmd = [
    "openclaw", "message", "send",
    "--channel", "telegram",
    "--target", FAMILY_CHAT_ID,
    "--message", msg,
]

if options:
    buttons = build_buttons(cid, options)
    cmd.extend(["--buttons", json.dumps(buttons)])

try:
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        print(f"[Conflict Notify] Failed to send: {result.stderr}", file=sys.stderr)
        return False
    print(f"[Conflict Notify] Sent conflict {cid} to family group", file=sys.stderr)
    return True
except Exception as e:
    print(f"[Conflict Notify] Error: {e}", file=sys.stderr)
    return False

def parse_callback(callback_data):
"""Parse a Telegram callback_data string.

Expected formats:
  resolve|<conflict_id>|<option_index>
  reject|<conflict_id>|<event_num>  (1 or 2)

Returns dict with action, conflict_id, and action-specific fields, or None if invalid.
"""
parts = callback_data.split("|")
if len(parts) != 3:
    return None

action = parts[0]
if action == "resolve":
    try:
        return {
            "action": "resolve",
            "conflict_id": parts[1],
            "option_index": int(parts[2]),
        }
    except (ValueError, IndexError):
        return None
elif action == "reject":
    try:
        return {
            "action": "reject",
            "conflict_id": parts[1],
            "event_num": int(parts[2]),
        }
    except (ValueError, IndexError):
        return None
elif action == "dismiss":
    try:
        return {
            "action": "dismiss",
            "conflict_id": parts[1],
        }
    except (ValueError, IndexError):
        return None
else:
    return None

def handle_callback(callback_data, dry_run=False):
"""Handle a Telegram button callback for conflict resolution or event rejection.

For resolve callbacks: re-detects conflicts, finds matching one by hash,
and executes the chosen resolution option.
For reject callbacks: rejects the specified event from the conflict pair.
"""
parsed = parse_callback(callback_data)
if not parsed:
    return {"error": f"Invalid callback format: {callback_data}"}

cid = parsed["conflict_id"]

# Re-detect conflicts to find the matching one
conflicts = detect_conflicts(hours=168)
if not conflicts:
    return {"error": "No conflicts currently found — may have been resolved already"}

# Find the conflict matching our ID
matched = None
for c in conflicts:
    if conflict_id(c) == cid:
        matched = c
        break

if not matched:
    return {"error": f"Conflict {cid} not found — may have been resolved already"}

if parsed["action"] == "reject":
    # Reject the specified event from the conflict pair
    from .rejection_engine import reject_event
    event_num = parsed["event_num"]
    if event_num == 1:
        event_summary = matched["event1"]["summary"]
    elif event_num == 2:
        event_summary = matched["event2"]["summary"]
    else:
        return {"error": f"Invalid event number: {event_num}"}

    result = reject_event(
        event_summary,
        f"Rejected via Telegram button (event {event_num} of conflict)",
        delete_from_calendar=not dry_run,
    )

    # Send confirmation
    confirm_msg = f"🚫 Rejected: {event_summary}\n"
    confirm_msg += f"Rule: {result['rule']['pattern']} ({result['rule']['scope']})\n"
    confirm_msg += f"Reason: {result['rule']['reason']}"
    if dry_run:
        confirm_msg += "\n_(dry run — no changes made)_"

    cmd = [
        "openclaw", "message", "send",
        "--channel", "telegram",
        "--target", FAMILY_CHAT_ID,
        "--message", confirm_msg,
    ]
    try:
        subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except Exception:
        pass

    return result

elif parsed["action"] == "resolve":
    option_idx = parsed["option_index"]

    # Get resolution options
    resolution = resolve_conflict(matched)
    options = resolution.get("options", [])

    if option_idx > len(options) or option_idx < 1:
        return {"error": f"Option index {option_idx} out of range (have {len(options)} options)"}

    chosen = options[option_idx - 1]  # Convert to 0-based
    print(f"[Conflict Notify] Executing: {chosen.get('type', '?')} — {chosen.get('label', chosen.get('description', '?'))}", file=sys.stderr)

    # Execute the resolution
    result = execute_resolution(matched, resolution, option_idx, dry_run=dry_run)

    # Send confirmation to family group
    action = chosen.get("type", "unknown")
    label = chosen.get("label", chosen.get("description", "resolution"))
    summary1 = matched["event1"]["summary"]
    summary2 = matched["event2"]["summary"]

    confirm_msg = f"✅ Resolved: {summary1} vs {summary2}\n"
    confirm_msg += f"Action: {label}"

    if dry_run:
        confirm_msg += "\n_(dry run — no changes made)_"

    cmd = [
        "openclaw", "message", "send",
        "--channel", "telegram",
        "--target", FAMILY_CHAT_ID,
        "--message", confirm_msg,
    ]

    try:
        subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except Exception:
        pass  # Confirmation is best-effort

    return result

elif parsed["action"] == "dismiss":
    # Acknowledge the conflict (stop future alerts)
    if not dry_run:
        acknowledge_conflict(matched, cid)

    confirm_msg = f"👋 Conflict acknowledged: {matched['event1']['summary']} vs {matched['event2']['summary']}\n"
    confirm_msg += "No more alerts for this conflict."
    if dry_run:
        confirm_msg += "\n_(dry run — no changes made)_"

    cmd = [
        "openclaw", "message", "send",
        "--channel", "telegram",
        "--target", FAMILY_CHAT_ID,
        "--message", confirm_msg,
    ]
    try:
        subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except Exception:
        pass

    return {"status": "dismissed", "conflict_id": cid}

else:
    return {"error": f"Unknown action: {parsed['action']}"}

def scan_and_notify():
"""Scan for conflicts and post alerts for any NEW or UNACKNOWLEDGED ones.

State tracking: conflicts are marked as acknowledged by appending
[Conflict_Ack:<conflict_id>] to the description of event1. If both events
in a conflict have the ack tag, the conflict is suppressed.

A conflict can be acknowledged via:
- Telegram "Dismiss" button (sets ack tag on both events)
- Manual: adding [Conflict_Ack:<id>] to event description

Returns list of conflict IDs that were notified about.
"""
conflicts = detect_conflicts(hours=168)
if not conflicts:
    print("[Conflict Notify] No conflicts found", file=sys.stderr)
    return []

notified = []
for conflict in conflicts:
    cid = conflict_id(conflict)
    if _is_conflict_acknowledged(conflict, cid):
        print(f"[Conflict Notify] Skipping acknowledged conflict {cid}: {conflict['event1']['summary']} vs {conflict['event2']['summary']}", file=sys.stderr)
        continue

    resolution = resolve_conflict(conflict)
    if send_conflict_alert(conflict, resolution):
        notified.append(cid)

return notified

def _is_conflict_acknowledged(conflict, cid):
"""Check if a conflict has been acknowledged via calendar description tags.

A conflict is acknowledged if BOTH events contain the tag
[Conflict_Ack:<cid>] in their description.
"""
calendar = get_calendar_service()
ack_tag = f"[Conflict_Ack:{cid}]"

for event_key in ("event1", "event2"):
    event_data = conflict.get(event_key, {})
    event_id = event_data.get("id")
    if not event_id:
        continue
    try:
        obj = calendar.event_by_uid(event_id)
        evt = _event_to_dict(obj)
        description = evt.get("description", "") or ""
        if ack_tag not in description:
            return False
    except Exception:
        return False

return True

def acknowledge_conflict(conflict, cid):
"""Mark a conflict as acknowledged by adding tags to both events' descriptions."""
ack_tag = f"[Conflict_Ack:{cid}]"

for event_key in ("event1", "event2"):
    event_data = conflict.get(event_key, {})
    event_id = event_data.get("id")
    if not event_id:
        continue
    try:
        _update_event_description(event_id, ack_tag)
        print(f"  [Conflict Notify] Added ack tag to event {event_id}", file=sys.stderr)
    except Exception as e:
        print(f"  [Conflict Notify] Failed to ack {event_id}: {e}", file=sys.stderr)

if name == "main":
import argparse

parser = argparse.ArgumentParser(description="Conflict notification handler")
sub = parser.add_subparsers(dest="command")

scan_cmd = sub.add_parser("scan", help="Scan for conflicts and post alerts")
scan_cmd.add_argument("--dry-run", action="store_true")

handle_cmd = sub.add_parser("handle", help="Handle a callback response")
handle_cmd.add_argument("callback_data", help="Callback data from button tap")
handle_cmd.add_argument("--dry-run", action="store_true")

args = parser.parse_args()

if args.command == "scan":
    conflicts = detect_conflicts(hours=168)
    if not conflicts:
        print("No conflicts found")
    else:
        for c in conflicts:
            resolution = resolve_conflict(c)
            if args.dry_run:
                print(f"Would notify: {conflict_id(c)}")
                print(f"  {c['event1']['summary']} vs {c['event2']['summary']}")
                print(f"  Options: {[o.get('label', o.get('description', '?')) for o in resolution.get('options', [])]}")
            else:
                send_conflict_alert(c, resolution)

elif args.command == "handle":
    result = handle_callback(args.callback_data, dry_run=args.dry_run)
    print(json.dumps(result, indent=2, default=str))

else:
    parser.print_help()