📄 maintenance_sentinel.py 12,058 bytes Apr 19, 2026 📋 Raw

"""Maintenance Sentinel — Proactive household maintenance tracker.

Checks maintenance.yaml for recurring tasks that are due or upcoming.
Integrates into the Daily Brief and heartbeat, with Telegram inline
"Done" buttons to mark items complete.

The system moves from "Email Reader" to "Family Log" by tracking
recurring non-calendar tasks: HVAC filters, pet medications, seasonal
prep, vehicle service, etc.
"""

import hashlib
import json
import os
import re
import sys
from datetime import datetime, timedelta, date

import yaml

from family_assistant.config import CHICAGO_TZ

def _load_maintenance_config():
"""Load maintenance.yaml from the same directory as family.yaml."""
config_path = os.environ.get("FAMILY_CONFIG_PATH", "")
if config_path:
# Use same directory as family config
base_dir = os.path.dirname(config_path)
else:
# Search CWD + package parent
base_dir = os.getcwd()
pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(file)))
if os.path.exists(os.path.join(pkg_dir, "maintenance.yaml")):
base_dir = pkg_dir

mpath = os.path.join(base_dir, "maintenance.yaml")
if not os.path.exists(mpath):
    return {"items": []}

with open(mpath, "r") as f:
    return yaml.safe_load(f) or {"items": []}

def _save_maintenance_config(config):
"""Save maintenance.yaml back to disk."""
config_path = os.environ.get("FAMILY_CONFIG_PATH", "")
if config_path:
base_dir = os.path.dirname(config_path)
else:
base_dir = os.getcwd()
pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(file)))
if os.path.exists(os.path.join(pkg_dir, "maintenance.yaml")):
base_dir = pkg_dir

mpath = os.path.join(base_dir, "maintenance.yaml")
with open(mpath, "w") as f:
    yaml.dump(config, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

def _parse_interval(interval_str):
"""Parse an interval string like '30 days', '3 months', '12 months'.

Returns the interval as a timedelta (approximate for months/years).
"""
interval_str = interval_str.strip().lower()
match = re.match(r"(\d+)\s*(day|week|month|year)s?", interval_str)
if not match:
    return None

amount = int(match.group(1))
unit = match.group(2)

if unit == "day":
    return timedelta(days=amount)
elif unit == "week":
    return timedelta(weeks=amount)
elif unit == "month":
    return timedelta(days=amount * 30)  # Approximate
elif unit == "year":
    return timedelta(days=amount * 365)  # Approximate
return None

def _parse_date(date_str):
"""Parse an ISO date string (YYYY-MM-DD) into a date object."""
if not date_str:
return None
try:
return date.fromisoformat(str(date_str))
except (ValueError, TypeError):
return None

def check_maintenance(as_of=None):
"""Check for due and upcoming maintenance items.

Args:
    as_of: Date to check against (defaults to today in Chicago time).

Returns:
    Dict with 'due_now', 'upcoming', and 'all_items' lists.
    Each item has: name, category, who, due_date, days_overdue,
    notify_days_before, raw (original item dict)
"""
if as_of is None:
    as_of = datetime.now(CHICAGO_TZ).date()

config = _load_maintenance_config()
items = config.get("items", [])

due_now = []
upcoming = []
all_items = []

for item in items:
    name = item.get("name", "Unknown")
    category = item.get("category", "general")
    who = item.get("who", "")
    interval_str = item.get("interval", "30 days")
    last_done = _parse_date(item.get("last_done"))
    notify_before = item.get("notify_days_before", 1)
    due_month = item.get("due_month")  # Optional seasonal constraint

    if not last_done:
        continue  # Skip items without a baseline date

    interval = _parse_interval(interval_str)
    if not interval:
        continue

    # Calculate next due date
    next_due = last_done + interval

    # Seasonal constraint: if due_month is set, snap to the next occurrence
    if due_month is not None:
        # Find the next occurrence of due_month after last_done
        candidate_year = last_done.year
        candidate = date(candidate_year, due_month, last_done.day if last_done.day <= 28 else 1)
        if candidate <= last_done:
            candidate = date(candidate_year + 1, due_month, last_done.day if last_done.day <= 28 else 1)
        next_due = candidate

    days_until_due = (next_due - as_of).days
    days_overdue = -days_until_due if days_until_due < 0 else 0

    enriched = {
        "name": name,
        "category": category,
        "who": who,
        "due_date": next_due.isoformat(),
        "days_overdue": days_overdue,
        "days_until": days_until_due,
        "notify_days_before": notify_before,
        "interval": interval_str,
        "last_done": last_done.isoformat(),
        "raw": item,
    }
    all_items.append(enriched)

    if days_until_due <= 0:
        # Due now or overdue
        due_now.append(enriched)
    elif days_until_due <= notify_before:
        # Upcoming within notification window
        upcoming.append(enriched)

return {
    "due_now": due_now,
    "upcoming": upcoming,
    "all_items": all_items,
}

def mark_done(item_name, as_of=None):
"""Mark a maintenance item as done by updating its last_done date.

Args:
    item_name: Name of the item to mark complete.
    as_of: Date to set as last_done (defaults to today).

Returns:
    Dict with status and updated item info.
"""
if as_of is None:
    as_of = datetime.now(CHICAGO_TZ).date()

config = _load_maintenance_config()
items = config.get("items", [])

found = None
for item in items:
    if item.get("name") == item_name:
        item["last_done"] = as_of.isoformat()
        found = item
        break

if not found:
    return {"status": "NOT_FOUND", "name": item_name}

_save_maintenance_config(config)
return {
    "status": "MARKED_DONE",
    "name": item_name,
    "last_done": as_of.isoformat(),
}

def format_maintenance_brief(check_result=None):
"""Format maintenance items for the Daily Brief.

Args:
    check_result: Output from check_maintenance(). If None, runs check.

Returns:
    String for inclusion in Daily Brief, or empty string if nothing due.
"""
if check_result is None:
    check_result = check_maintenance()

due_now = check_result.get("due_now", [])
upcoming = check_result.get("upcoming", [])

if not due_now and not upcoming:
    return ""

lines = []

if due_now:
    lines.append("📌 **Due Now**")
    for item in due_now:
        overdue = f" ({item['days_overdue']}d overdue)" if item["days_overdue"] > 0 else ""
        lines.append(f"   {item['name']}{overdue}")

if upcoming:
    lines.append("📅 **Coming Up**")
    for item in upcoming:
        days = item["days_until"]
        day_label = "tomorrow" if days == 1 else f"in {days} days"
        lines.append(f"   {item['name']}  {day_label}")

return "\n".join(lines)

def build_maintenance_buttons(check_result=None):
"""Build Telegram inline buttons for due-now items.

Each button: [ Done] with callback: maint_done|<item_name_hash>
"""
if check_result is None:
    check_result = check_maintenance()

due_now = check_result.get("due_now", [])
if not due_now:
    return []

buttons = []
for item in due_now:
    import hashlib
    name_hash = hashlib.md5(item["name"].encode()).hexdigest()[:8]
    callback = f"maint_done|{name_hash}"
    label = f"✅ {item['name']} done"
    buttons.append([{"text": label[:40], "callback_data": callback}])

return buttons

def handle_maintenance_callback(callback_data, dry_run=False):
"""Handle a Telegram callback from a maintenance Done button.

Args:
    callback_data: Callback string (format: maint_done|<name_hash>)
    dry_run: If True, don't actually update the file.

Returns:
    Dict with result status.
"""
parts = callback_data.split("|")
if len(parts) != 2 or parts[0] != "maint_done":
    return {"status": "ERROR", "message": f"Invalid callback format: {callback_data}"}

name_hash = parts[1]

# Find the item by hash
check_result = check_maintenance()
for item in check_result.get("due_now", []):
    import hashlib
    if hashlib.md5(item["name"].encode()).hexdigest()[:8] == name_hash:
        if dry_run:
            return {
                "status": "DRY_RUN",
                "name": item["name"],
                "message": f"✅ Would mark '{item['name']}' as done",
            }

        result = mark_done(item["name"])
        if result["status"] == "MARKED_DONE":
            return {
                "status": "DONE",
                "name": item["name"],
                "message": f"✅ {item['name']} marked done! Next occurrence calculated from today.",
            }
        else:
            return result

return {"status": "NOT_FOUND", "message": f"No matching maintenance item found for hash {name_hash}"}

---------------------------------------------------------------------------

Alert Dedup — only alert once per due-date per item

---------------------------------------------------------------------------

def _alert_state_path():
"""Path to the alert state file (same dir as heartbeat-state.json)."""
# Use the workspace memory directory
workspace = os.environ.get("OPENCLAW_WORKSPACE", os.path.expanduser("~/.openclaw/workspace"))
return os.path.join(workspace, "memory", "maintenance-alert-state.json")

def _load_alert_state():
"""Load the alert state file."""
path = _alert_state_path()
if os.path.exists(path):
try:
with open(path, "r") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
pass
return {"alerted": {}}

def _save_alert_state(state):
"""Save the alert state file."""
path = _alert_state_path()
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
json.dump(state, f, indent=2)

def filter_new_alerts(check_result=None):
"""Filter check_maintenance results to only items not yet alerted about.

Alert state is keyed by `<item_name_hash>:<due_date>`  so the same
item gets alerted once per due cycle. When it's marked done and a new
due date is calculated, the key changes and it alerts again.

Args:
    check_result: Output from check_maintenance(). If None, runs check.

Returns:
    Dict with 'due_now' (new only), 'upcoming', and 'all_items'.
"""
if check_result is None:
    check_result = check_maintenance()

state = _load_alert_state()
alerted = state.get("alerted", {})

new_due = []
for item in check_result.get("due_now", []):
    key = f"{hashlib.md5(item['name'].encode()).hexdigest()[:8]}:{item['due_date']}"
    if key not in alerted:
        new_due.append(item)
        alerted[key] = datetime.now(CHICAGO_TZ).isoformat()

# Clean up old entries (items done or past their next cycle)
# Keep alerts for the last 90 days
cutoff = (datetime.now(CHICAGO_TZ) - timedelta(days=90)).isoformat()
alerted = {k: v for k, v in alerted.items() if v > cutoff}
state["alerted"] = alerted
_save_alert_state(state)

return {
    "due_now": new_due,
    "upcoming": check_result.get("upcoming", []),
    "all_items": check_result.get("all_items", []),
}