#!/usr/bin/env python3 """ Shadow Database TTL Purge — deletes messages/extractions older than 24 hours. Called by the shadow-ttl-purge cron job every 6 hours. """ import sqlite3 import logging import sys from datetime import datetime, timezone, timedelta from pathlib import Path logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s:%(message)s", handlers=[ logging.StreamHandler(sys.stdout), ], ) logger = logging.getLogger("shadow.purge") SHADOW_DB = Path.home() / ".icarus" / "shadow" / "shadow.db" TTL_HOURS = 24 def purge_old_messages(db_path: Path, ttl_hours: int = 24): """Delete shadow_messages and shadow_extractions older than ttl_hours.""" if not db_path.exists(): logger.info("Shadow DB not found at %s — nothing to purge.", db_path) return 0 cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours) cutoff_iso = cutoff.isoformat() conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA foreign_keys = ON") try: # Delete extractions linked to old messages cur = conn.execute(""" DELETE FROM shadow_extractions WHERE shadow_message_id IN ( SELECT id FROM shadow_messages WHERE timestamp < ? ) """, (cutoff_iso,)) ext_deleted = cur.rowcount # Delete old messages cur = conn.execute( "DELETE FROM shadow_messages WHERE timestamp < ?", (cutoff_iso,) ) msg_deleted = cur.rowcount conn.commit() # VACUUM to reclaim space conn.execute("VACUUM") logger.info( "Purge complete: %d messages, %d extractions redacted (TTL: %dh). " "Cutoff: %s", msg_deleted, ext_deleted, ttl_hours, cutoff_iso ) # Append to purge log log_path = db_path.parent / "purge.log" with open(log_path, "a") as f: f.write( f"INFO:{datetime.now(timezone.utc).isoformat()}:" f"Purge: {msg_deleted} msgs, {ext_deleted} extractions (TTL: {ttl_hours}h)\n" ) return msg_deleted finally: conn.close() if __name__ == "__main__": purge_old_messages(SHADOW_DB, TTL_HOURS)