#!/usr/bin/env python3
"""
Shadow Database TTL Purge — deletes messages/extractions older than 24 hours.
Called by the shadow-ttl-purge cron job every 6 hours.
"""
import sqlite3
import logging
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Configure root logging: INFO and above, emitted on stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format="%(asctime)s %(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
)

# Named logger used by the purge routine.
logger = logging.getLogger("shadow.purge")

# Default database location: <home>/.icarus/shadow/shadow.db
SHADOW_DB = Path.home().joinpath(".icarus", "shadow", "shadow.db")

# Retention window, in hours, that the script entry point passes to
# purge_old_messages.
TTL_HOURS = 24
def purge_old_messages(db_path: Path, ttl_hours: int = 24) -> int:
    """Delete shadow_messages and shadow_extractions older than ttl_hours.

    Rows in ``shadow_messages`` whose ``timestamp`` compares below the cutoff
    (current UTC time minus ``ttl_hours``) are removed, together with the
    ``shadow_extractions`` rows that reference them. The database is then
    vacuumed and a one-line summary is appended to ``purge.log`` in the
    database's directory.

    Args:
        db_path: Location of the SQLite database file. A ``str`` or other
            path-like value is accepted as well as a ``Path``.
        ttl_hours: Age threshold in hours.

    Returns:
        The number of rows deleted from ``shadow_messages``, or 0 when the
        database file does not exist.

    Raises:
        sqlite3.Error: If the database cannot be opened, read or modified.
        OSError: If ``purge.log`` cannot be written.
    """
    db_path = Path(db_path)

    # sqlite3.connect() would create an empty database for a missing file,
    # so bail out before connecting.
    if not db_path.exists():
        logger.info("Shadow DB not found at %s — nothing to purge.", db_path)
        return 0

    cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
    cutoff_iso = cutoff.isoformat()

    conn = sqlite3.connect(str(db_path))
    try:
        # Kept inside the try so the connection is closed even when this
        # first statement fails (e.g. the file is not a SQLite database).
        conn.execute("PRAGMA foreign_keys = ON")

        # Remove dependent extractions before their parent messages.
        # NOTE(review): the cutoff is bound as ISO-8601 text with a UTC
        # offset; this presumably relies on the timestamp column holding
        # text in the same format — confirm against the code that writes it.
        cur = conn.execute(
            """
            DELETE FROM shadow_extractions
            WHERE shadow_message_id IN (
                SELECT id FROM shadow_messages WHERE timestamp < ?
            )
            """,
            (cutoff_iso,),
        )
        ext_deleted = cur.rowcount

        cur = conn.execute(
            "DELETE FROM shadow_messages WHERE timestamp < ?",
            (cutoff_iso,),
        )
        msg_deleted = cur.rowcount

        # Both deletes become durable together; an exception before this
        # point leaves them uncommitted and close() discards them.
        conn.commit()

        # VACUUM cannot run inside a transaction, so it follows the commit.
        # It rebuilds the database file to reclaim the freed space.
        conn.execute("VACUUM")

        logger.info(
            "Purge complete: %d messages, %d extractions redacted (TTL: %dh). "
            "Cutoff: %s",
            msg_deleted, ext_deleted, ttl_hours, cutoff_iso,
        )

        # Append a summary line to purge.log in the database's directory.
        log_path = db_path.parent / "purge.log"
        with open(log_path, "a", encoding="utf-8") as f:
            f.write(
                f"INFO:{datetime.now(timezone.utc).isoformat()}:"
                f"Purge: {msg_deleted} msgs, {ext_deleted} extractions (TTL: {ttl_hours}h)\n"
            )

        return msg_deleted
    finally:
        conn.close()
# Script entry point: purge the default database with the default TTL when
# this file is executed directly, and do nothing when it is imported.
if __name__ == "__main__":
    purge_old_messages(SHADOW_DB, TTL_HOURS)