"""Maintenance Sentinel — Proactive household maintenance tracker. Checks maintenance.yaml for recurring tasks that are due or upcoming. Integrates into the Daily Brief and heartbeat, with Telegram inline "Done" buttons to mark items complete. The system moves from "Email Reader" to "Family Log" by tracking recurring non-calendar tasks: HVAC filters, pet medications, seasonal prep, vehicle service, etc. """ import hashlib import json import os import re import sys from datetime import datetime, timedelta, date import yaml from family_assistant.config import CHICAGO_TZ def _load_maintenance_config(): """Load maintenance.yaml from the same directory as family.yaml.""" config_path = os.environ.get("FAMILY_CONFIG_PATH", "") if config_path: # Use same directory as family config base_dir = os.path.dirname(config_path) else: # Search CWD + package parent base_dir = os.getcwd() pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if os.path.exists(os.path.join(pkg_dir, "maintenance.yaml")): base_dir = pkg_dir mpath = os.path.join(base_dir, "maintenance.yaml") if not os.path.exists(mpath): return {"items": []} with open(mpath, "r") as f: return yaml.safe_load(f) or {"items": []} def _save_maintenance_config(config): """Save maintenance.yaml back to disk.""" config_path = os.environ.get("FAMILY_CONFIG_PATH", "") if config_path: base_dir = os.path.dirname(config_path) else: base_dir = os.getcwd() pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if os.path.exists(os.path.join(pkg_dir, "maintenance.yaml")): base_dir = pkg_dir mpath = os.path.join(base_dir, "maintenance.yaml") with open(mpath, "w") as f: yaml.dump(config, f, default_flow_style=False, allow_unicode=True, sort_keys=False) def _parse_interval(interval_str): """Parse an interval string like '30 days', '3 months', '12 months'. Returns the interval as a timedelta (approximate for months/years). """ interval_str = interval_str.strip().lower() match = re.match(r"(\d+)\s*(day|week|month|year)s?", interval_str) if not match: return None amount = int(match.group(1)) unit = match.group(2) if unit == "day": return timedelta(days=amount) elif unit == "week": return timedelta(weeks=amount) elif unit == "month": return timedelta(days=amount * 30) # Approximate elif unit == "year": return timedelta(days=amount * 365) # Approximate return None def _parse_date(date_str): """Parse an ISO date string (YYYY-MM-DD) into a date object.""" if not date_str: return None try: return date.fromisoformat(str(date_str)) except (ValueError, TypeError): return None def check_maintenance(as_of=None): """Check for due and upcoming maintenance items. Args: as_of: Date to check against (defaults to today in Chicago time). Returns: Dict with 'due_now', 'upcoming', and 'all_items' lists. Each item has: name, category, who, due_date, days_overdue, notify_days_before, raw (original item dict) """ if as_of is None: as_of = datetime.now(CHICAGO_TZ).date() config = _load_maintenance_config() items = config.get("items", []) due_now = [] upcoming = [] all_items = [] for item in items: name = item.get("name", "Unknown") category = item.get("category", "general") who = item.get("who", "") interval_str = item.get("interval", "30 days") last_done = _parse_date(item.get("last_done")) notify_before = item.get("notify_days_before", 1) due_month = item.get("due_month") # Optional seasonal constraint if not last_done: continue # Skip items without a baseline date interval = _parse_interval(interval_str) if not interval: continue # Calculate next due date next_due = last_done + interval # Seasonal constraint: if due_month is set, snap to the next occurrence if due_month is not None: # Find the next occurrence of due_month after last_done candidate_year = last_done.year candidate = date(candidate_year, due_month, last_done.day if last_done.day <= 28 else 1) if candidate <= last_done: candidate = date(candidate_year + 1, due_month, last_done.day if last_done.day <= 28 else 1) next_due = candidate days_until_due = (next_due - as_of).days days_overdue = -days_until_due if days_until_due < 0 else 0 enriched = { "name": name, "category": category, "who": who, "due_date": next_due.isoformat(), "days_overdue": days_overdue, "days_until": days_until_due, "notify_days_before": notify_before, "interval": interval_str, "last_done": last_done.isoformat(), "raw": item, } all_items.append(enriched) if days_until_due <= 0: # Due now or overdue due_now.append(enriched) elif days_until_due <= notify_before: # Upcoming within notification window upcoming.append(enriched) return { "due_now": due_now, "upcoming": upcoming, "all_items": all_items, } def mark_done(item_name, as_of=None): """Mark a maintenance item as done by updating its last_done date. Args: item_name: Name of the item to mark complete. as_of: Date to set as last_done (defaults to today). Returns: Dict with status and updated item info. """ if as_of is None: as_of = datetime.now(CHICAGO_TZ).date() config = _load_maintenance_config() items = config.get("items", []) found = None for item in items: if item.get("name") == item_name: item["last_done"] = as_of.isoformat() found = item break if not found: return {"status": "NOT_FOUND", "name": item_name} _save_maintenance_config(config) return { "status": "MARKED_DONE", "name": item_name, "last_done": as_of.isoformat(), } def format_maintenance_brief(check_result=None): """Format maintenance items for the Daily Brief. Args: check_result: Output from check_maintenance(). If None, runs check. Returns: String for inclusion in Daily Brief, or empty string if nothing due. """ if check_result is None: check_result = check_maintenance() due_now = check_result.get("due_now", []) upcoming = check_result.get("upcoming", []) if not due_now and not upcoming: return "" lines = [] if due_now: lines.append("📌 **Due Now**") for item in due_now: overdue = f" ({item['days_overdue']}d overdue)" if item["days_overdue"] > 0 else "" lines.append(f" • {item['name']}{overdue}") if upcoming: lines.append("📅 **Coming Up**") for item in upcoming: days = item["days_until"] day_label = "tomorrow" if days == 1 else f"in {days} days" lines.append(f" • {item['name']} — {day_label}") return "\n".join(lines) def build_maintenance_buttons(check_result=None): """Build Telegram inline buttons for due-now items. Each button: [✅ Done] with callback: maint_done| """ if check_result is None: check_result = check_maintenance() due_now = check_result.get("due_now", []) if not due_now: return [] buttons = [] for item in due_now: import hashlib name_hash = hashlib.md5(item["name"].encode()).hexdigest()[:8] callback = f"maint_done|{name_hash}" label = f"✅ {item['name']} done" buttons.append([{"text": label[:40], "callback_data": callback}]) return buttons def handle_maintenance_callback(callback_data, dry_run=False): """Handle a Telegram callback from a maintenance Done button. Args: callback_data: Callback string (format: maint_done|) dry_run: If True, don't actually update the file. Returns: Dict with result status. """ parts = callback_data.split("|") if len(parts) != 2 or parts[0] != "maint_done": return {"status": "ERROR", "message": f"Invalid callback format: {callback_data}"} name_hash = parts[1] # Find the item by hash check_result = check_maintenance() for item in check_result.get("due_now", []): import hashlib if hashlib.md5(item["name"].encode()).hexdigest()[:8] == name_hash: if dry_run: return { "status": "DRY_RUN", "name": item["name"], "message": f"✅ Would mark '{item['name']}' as done", } result = mark_done(item["name"]) if result["status"] == "MARKED_DONE": return { "status": "DONE", "name": item["name"], "message": f"✅ {item['name']} marked done! Next occurrence calculated from today.", } else: return result return {"status": "NOT_FOUND", "message": f"No matching maintenance item found for hash {name_hash}"} # --------------------------------------------------------------------------- # Alert Dedup — only alert once per due-date per item # --------------------------------------------------------------------------- def _alert_state_path(): """Path to the alert state file (same dir as heartbeat-state.json).""" # Use the workspace memory directory workspace = os.environ.get("OPENCLAW_WORKSPACE", os.path.expanduser("~/.openclaw/workspace")) return os.path.join(workspace, "memory", "maintenance-alert-state.json") def _load_alert_state(): """Load the alert state file.""" path = _alert_state_path() if os.path.exists(path): try: with open(path, "r") as f: return json.load(f) except (json.JSONDecodeError, IOError): pass return {"alerted": {}} def _save_alert_state(state): """Save the alert state file.""" path = _alert_state_path() os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: json.dump(state, f, indent=2) def filter_new_alerts(check_result=None): """Filter check_maintenance results to only items not yet alerted about. Alert state is keyed by `:` — so the same item gets alerted once per due cycle. When it's marked done and a new due date is calculated, the key changes and it alerts again. Args: check_result: Output from check_maintenance(). If None, runs check. Returns: Dict with 'due_now' (new only), 'upcoming', and 'all_items'. """ if check_result is None: check_result = check_maintenance() state = _load_alert_state() alerted = state.get("alerted", {}) new_due = [] for item in check_result.get("due_now", []): key = f"{hashlib.md5(item['name'].encode()).hexdigest()[:8]}:{item['due_date']}" if key not in alerted: new_due.append(item) alerted[key] = datetime.now(CHICAGO_TZ).isoformat() # Clean up old entries (items done or past their next cycle) # Keep alerts for the last 90 days cutoff = (datetime.now(CHICAGO_TZ) - timedelta(days=90)).isoformat() alerted = {k: v for k, v in alerted.items() if v > cutoff} state["alerted"] = alerted _save_alert_state(state) return { "due_now": new_due, "upcoming": check_result.get("upcoming", []), "all_items": check_result.get("all_items", []), }