#!/usr/bin/env python3
"""Icarus daemon — the bespoke always-on service.
Self-contained scheduler with:
- IMAP polling → tripwire → extraction → event graph
- Telegram long-polling → tripwire → extraction → event graph
- Cron jobs (HBM, TTL purge, brain ingest, metrics)
Runs as a standalone process. No OpenClaw dependency.
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import yaml
# Import Icarus modules
from icarus.config import load_family, get_data_dir
from icarus.tripwire import run_tripwire, TripwireResult
from icarus.extractor import extract
from icarus import event_graph as eg
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
def setup_logging():
    """Configure root logging to icarus.log (in the data dir) and stdout.

    Creates the data directory if it does not exist. Uses
    ``logging.basicConfig``, which is a no-op if handlers were already
    configured, so calling this twice is harmless.
    """
    data_dir = get_data_dir()
    data_dir.mkdir(parents=True, exist_ok=True)
    log_path = data_dir / "icarus.log"
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            # Explicit UTF-8 so non-ASCII summaries don't depend on the
            # process locale (FileHandler otherwise uses locale encoding).
            logging.FileHandler(str(log_path), encoding="utf-8"),
            logging.StreamHandler(sys.stdout),
        ],
    )
# Module-level logger for the daemon; handlers/destinations are attached by
# setup_logging() at startup.
logger = logging.getLogger("icarus.daemon")
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
class DaemonConfig:
    """Runtime configuration assembled from environment variables."""

    def __init__(self):
        env = os.environ.get

        def flag(name: str) -> bool:
            # Only the literal string "true" (case-insensitive) enables a flag.
            return env(name, "false").lower() == "true"

        # --- IMAP ---
        self.imap_user = env("IMAP_USER", "")
        self.imap_enabled = bool(self.imap_user)  # enabled iff a user is set
        self.imap_password = env("IMAP_PASSWORD", "")
        self.imap_provider = env("IMAP_PROVIDER", "icloud")
        self.imap_host = env("IMAP_HOST", "imap.mail.me.com")
        self.imap_port = int(env("IMAP_PORT", "993"))
        self.imap_poll_interval = int(env("IMAP_POLL_INTERVAL", "60"))

        # --- Telegram ---
        self.telegram_token = env("TELEGRAM_BOT_TOKEN", "")
        self.telegram_enabled = bool(self.telegram_token)
        self.telegram_group_id = env("TELEGRAM_GROUP_ID", "")

        # --- Ollama: Gaming PC default, localhost fallback ---
        self.ollama_base_url = env(
            "OLLAMA_BASE_URL", "http://matt-pc.tail864e81.ts.net:11434"
        )
        self.extraction_model = env("ICARUS_EXTRACT_MODEL", "qwen2.5-coder:7b")

        # --- Scheduling ---
        self.ttl_hours = int(env("SHADOW_TTL_HOURS", "24"))
        self.brain_ingest_interval = int(env("BRAIN_INGEST_INTERVAL", "3600"))
        self.verbose = flag("ICARUS_VERBOSE")

        # Telegram admin DM for notifications
        self.admin_chat_id = env("ICARUS_ADMIN_CHAT_ID", "")

        # --- Experiment mode (shadow testing) ---
        self.experiment_enabled = flag("ICARUS_EXPERIMENT")
        self.experiment_name = env("ICARUS_EXPERIMENT_NAME", "")
        self.experiment_log_comparisons = flag("ICARUS_EXPERIMENT_LOG")
        self.experiment_auto_promote = flag("ICARUS_EXPERIMENT_PROMOTE")
# ---------------------------------------------------------------------------
# IMAP Poller
# ---------------------------------------------------------------------------
class IMAPPoller:
"""Poll IMAP inbox, run tripwire + extraction on each new email."""
def __init__(self, config: DaemonConfig, family):
self.config = config
self.family = family
self._processed_ids: set[str] = set()
self._persist_path = Path("/tmp/icarus-imap-processed.json")
self._load_processed()
def _load_processed(self):
if self._persist_path.exists():
try:
data = json.loads(self._persist_path.read_text())
self._processed_ids = set(data) if isinstance(data, list) else set()
except (json.JSONDecodeError, OSError):
self._processed_ids = set()
def _save_processed(self):
try:
self._persist_path.write_text(
json.dumps(list(self._processed_ids)[-1000:])
)
except OSError:
pass
async def poll_once(self):
"""Poll IMAP, process any new emails, return count processed."""
if not self.config.imap_enabled:
return 0
import imaplib
import email as eml
from email.header import decode_header
try:
mail = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
mail.login(self.config.imap_user, self.config.imap_password)
mail.select("INBOX")
# Fetch recent unseen emails
_status, ids = mail.search(None, "UNSEEN")
if not ids[0]:
mail.logout()
return 0
email_ids = ids[0].split()[-10:] # last 10 unseen
processed = 0
for eid in email_ids:
_status, data = mail.fetch(eid, "(RFC822)")
if not data or not data[0]:
continue
raw_msg = data[0][1]
msg = eml.message_from_bytes(raw_msg)
subject = self._decode_str(msg.get("Subject", ""))
sender = self._decode_str(msg.get("From", ""))
body = self._get_body(msg)
source_id = f"imap:{subject}:{sender}"
if source_id in self._processed_ids:
continue
# Run tripwire
tripwire = run_tripwire(f"{subject}\n{body}")
if tripwire.fired:
# Experiment mode: run shadow extraction alongside production
if self.config.experiment_enabled and self.config.experiment_name:
extraction = await self._run_experiment(
f"{subject}\n{body}", tripwire
)
else:
extraction = await extract(
f"{subject}\n{body}", tripwire,
self.config.ollama_base_url, self.config.extraction_model
)
if extraction.confidence >= 0.3:
eg.insert_event(
source="email",
source_text=f"{subject}\n{body}",
event_type=extraction.event_type,
summary=extraction.summary or subject,
dates=extraction.dates,
times=extraction.times,
people=extraction.people,
context=extraction.context,
location=extraction.location,
action_needed=extraction.action_needed,
confidence=extraction.confidence,
needs_confirmation=extraction.confidence < self.family.fallback_threshold,
raw_extraction=extraction.raw,
source_id=source_id,
)
if self.config.verbose:
logger.info(
f"IMAP extracted: {extraction.summary} "
f"({extraction.event_type} @ {extraction.confidence})"
)
self._processed_ids.add(source_id)
processed += 1
mail.logout()
self._save_processed()
return processed
except Exception as e:
logger.error(f"IMAP poll failed: {e}")
return 0
@staticmethod
def _decode_str(s: bytes | str) -> str:
if isinstance(s, bytes):
try:
return s.decode("utf-8", errors="replace")
except (UnicodeDecodeError, AttributeError):
return str(s)
return s
@staticmethod
def _get_body(msg) -> str:
"""Extract plain text body from email message."""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
payload = part.get_payload(decode=True)
if payload:
try:
return payload.decode("utf-8", errors="replace")
except (UnicodeDecodeError, AttributeError):
return str(payload)
else:
payload = msg.get_payload(decode=True)
if payload:
try:
return payload.decode("utf-8", errors="replace")
except (UnicodeDecodeError, AttributeError):
return str(payload)
return "(no body)"
# ---------------------------------------------------------------------------
# Telegram Shadow Poller
# ---------------------------------------------------------------------------
class TelegramPoller:
    """Long-poll Telegram for new group messages. Silent observer mode."""

    def __init__(self, config: "DaemonConfig", family):
        self.config = config
        self.family = family
        # Telegram getUpdates offset; None means "from the oldest pending".
        self._offset: Optional[int] = None
        self._running = False

    async def poll_once(self) -> int:
        """Poll getUpdates once; process new messages. Returns count processed.

        Network failures are logged at WARNING and swallowed (returns 0).
        """
        if not self.config.telegram_enabled:
            return 0
        import httpx
        base = f"https://api.telegram.org/bot{self.config.telegram_token}"
        params = {"timeout": 30, "allowed_updates": json.dumps(["message"])}
        if self._offset:
            params["offset"] = self._offset
        try:
            async with httpx.AsyncClient(timeout=35) as client:
                resp = await client.get(f"{base}/getUpdates", params=params)
                resp.raise_for_status()
                data = resp.json()
        except Exception as e:
            logger.warning(f"Telegram poll failed: {e}")
            return 0
        updates = data.get("result", [])
        if not updates:
            return 0
        # Advance the offset so these updates are acknowledged server-side.
        self._offset = updates[-1]["update_id"] + 1
        processed = 0
        for update in updates:
            msg = update.get("message")
            if not msg:
                continue
            chat_id = str(msg.get("chat", {}).get("id"))
            # Only process the configured family group
            if chat_id != self.config.telegram_group_id:
                continue
            text = msg.get("text", "")
            if not text:
                continue
            source_id = f"telegram:{msg['message_id']}"
            start = time.monotonic()
            # Tier 1: Tripwire
            tripwire = run_tripwire(text)
            if not tripwire.fired:
                continue
            # Tier 2: LLM extraction. _run_experiment is defined on
            # IcarusDaemon, not on this class; guard with getattr so
            # experiment mode falls back to production extraction instead of
            # raising AttributeError.
            run_exp = getattr(self, "_run_experiment", None)
            if (self.config.experiment_enabled
                    and self.config.experiment_name
                    and run_exp is not None):
                extraction = await run_exp(text, tripwire)
            else:
                extraction = await extract(
                    text, tripwire,
                    self.config.ollama_base_url, self.config.extraction_model
                )
            elapsed_ms = int((time.monotonic() - start) * 1000)
            # Log every extraction attempt for later evaluation
            eg.insert_extraction_log(
                source_id=source_id,
                tripwire_score=tripwire.score,
                tripwire_patterns=tripwire.patterns_matched,
                llm_output=json.dumps(extraction.raw) if extraction.raw else "{}",
                extraction_result={
                    "event_type": extraction.event_type,
                    "summary": extraction.summary,
                    "confidence": extraction.confidence,
                },
                duration_ms=elapsed_ms,
            )
            if extraction.confidence >= 0.3:
                eg.insert_event(
                    source="telegram",
                    source_text=text,
                    event_type=extraction.event_type,
                    summary=extraction.summary,
                    dates=extraction.dates,
                    times=extraction.times,
                    people=extraction.people,
                    context=extraction.context,
                    location=extraction.location,
                    action_needed=extraction.action_needed,
                    confidence=extraction.confidence,
                    needs_confirmation=extraction.confidence < self.family.fallback_threshold,
                    raw_extraction=extraction.raw,
                    source_id=source_id,
                )
                if self.config.verbose:
                    logger.info(
                        f"Telegram extracted: {extraction.summary} "
                        f"({extraction.event_type} @ {extraction.confidence})"
                    )
            # Check calendar conflicts for high-confidence calendar events
            if (extraction.event_type == "calendar_event"
                    and extraction.confidence >= 0.5
                    and extraction.dates
                    and extraction.times):
                await self._check_calendar_conflicts(extraction, tripwire)
            processed += 1
        return processed

    async def _check_calendar_conflicts(self, extraction, tripwire):
        """Check the Radicale calendar for same-day events; log conflicts.

        Best effort: any failure is logged at DEBUG and swallowed. The
        original REPORT body was truncated in the source; reconstructed here
        as a standard CalDAV calendar-query (RFC 4791 §7.8).
        """
        import httpx
        cal_url = os.environ.get("CALDAV_URL", "http://127.0.0.1:5232")
        cal_user = os.environ.get("CALDAV_USER", "")
        cal_pass = os.environ.get("CALDAV_PASSWORD", "")
        if not cal_user or not self.config.admin_chat_id:
            return
        try:
            # assumes extraction.dates[0] is ISO "YYYY-MM-DD" — TODO confirm
            date = extraction.dates[0]
            day = date.replace("-", "")
            report_body = f"""<?xml version="1.0" encoding="utf-8"?>
<C:calendar-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
  <D:prop><D:getetag/></D:prop>
  <C:filter>
    <C:comp-filter name="VCALENDAR">
      <C:comp-filter name="VEVENT">
        <C:time-range start="{day}T000000Z" end="{day}T235959Z"/>
      </C:comp-filter>
    </C:comp-filter>
  </C:filter>
</C:calendar-query>"""
            async with httpx.AsyncClient(timeout=10, auth=(cal_user, cal_pass)) as client:
                resp = await client.request(
                    "REPORT", f"{cal_url}/family/",
                    content=report_body,
                    headers={"Depth": "1", "Content-Type": "application/xml"},
                )
            if resp.status_code == 207:
                # 207 Multi-Status: server returned matching events on that day.
                logger.info(
                    f"Potential calendar conflict on {date}: "
                    f"new '{extraction.summary}' vs existing calendar events"
                )
                # TODO: DM admin notification via Telegram
        except Exception as e:
            logger.debug(f"Calendar conflict check failed: {e}")
# ---------------------------------------------------------------------------
# Cron Jobs (APScheduler)
# ---------------------------------------------------------------------------
def run_ttl_purge():
    """Purge extraction-log rows older than 7 days from the SQLite DB.

    Best effort: all failures are logged, never raised, so the scheduler
    loop keeps running.
    """
    import sqlite3
    try:
        db_path = get_data_dir() / "icarus.db"
        if not db_path.exists():
            return
        conn = sqlite3.connect(str(db_path))
        try:
            # datetime('now') is already UTC in SQLite; the previous extra
            # 'utc' modifier treated the value as localtime and re-shifted
            # it, skewing the cutoff by the local UTC offset.
            conn.execute(
                "DELETE FROM extractions WHERE created_at < datetime('now', '-7 days')"
            )
            conn.commit()
        finally:
            conn.close()
        logger.info("TTL purge: extraction logs >7 days removed")
    except Exception as e:
        logger.error(f"TTL purge failed: {e}")
def run_brain_ingest():
    """Index the last day's events into the ChromaDB brain."""
    from icarus.brain import ingest as brain_ingest
    try:
        events = eg.get_recent_events(days=1, limit=50)
        indexed = 0
        for ev in events:
            document_id = f"event:{ev['source_id']}"
            document_text = f"{ev['summary']} — {ev.get('context', '')} — {ev.get('action_needed', '')}"
            meta = {
                "type": ev["type"],
                "source": ev["source"],
                "created": ev["created_at"],
                "people": ev["people"],
            }
            # Count only events the brain actually accepted.
            if brain_ingest(document_id, document_text, meta):
                indexed += 1
        logger.info(f"Brain ingest: {indexed}/{len(events)} events indexed")
    except Exception as e:
        logger.error(f"Brain ingest failed: {e}")
# ---------------------------------------------------------------------------
# Daemon Main
# ---------------------------------------------------------------------------
class IcarusDaemon:
    """The main daemon — owns the polling loops and scheduled jobs."""

    def __init__(self, config: Optional["DaemonConfig"] = None):
        self.config = config or DaemonConfig()
        self.family = load_family()
        self.imap_poller = IMAPPoller(self.config, self.family)
        self.tg_poller = TelegramPoller(self.config, self.family)
        self._running = False
        self._tasks: list[asyncio.Task] = []

    async def _imap_loop(self):
        """Poll IMAP every config.imap_poll_interval seconds while running."""
        while self._running:
            count = await self.imap_poller.poll_once()
            if count and count > 0 and self.config.verbose:
                logger.info(f"IMAP: processed {count} emails")
            await asyncio.sleep(self.config.imap_poll_interval)

    async def _telegram_loop(self):
        """Long-poll Telegram continuously while running."""
        while self._running:
            count = await self.tg_poller.poll_once()
            if count and count > 0 and self.config.verbose:
                logger.info(f"Telegram: processed {count} messages")
            # No sleep — long-poll blocks for up to 30s internally

    async def _scheduler_loop(self):
        """Simple internal scheduler for cron-like jobs (checked every minute)."""
        # Both jobs fire once immediately on startup (last_* start at 0),
        # then settle into their intervals.
        last_ttl = 0.0
        last_brain = 0.0
        while self._running:
            now = time.time()
            if now - last_ttl >= 6 * 3600:   # TTL purge every 6 hours
                run_ttl_purge()
                last_ttl = now
            if now - last_brain >= 3600:     # brain ingest every hour
                run_brain_ingest()
                last_brain = now
            await asyncio.sleep(60)  # check every minute

    async def start(self):
        """Start all enabled loops and block until they are cancelled."""
        self._running = True
        logger.info(f"Icarus daemon starting (family={self.family.family_id})")
        logger.info(f"  IMAP: {'enabled' if self.config.imap_enabled else 'disabled'}")
        logger.info(f"  Telegram: {'enabled' if self.config.telegram_enabled else 'disabled'}")
        logger.info(f"  Ollama: {self.config.ollama_base_url} ({self.config.extraction_model})")
        # Initialize event graph DB before any poller can insert events.
        eg._ensure_db()
        tasks = []
        if self.config.imap_enabled:
            tasks.append(asyncio.create_task(self._imap_loop()))
        if self.config.telegram_enabled:
            tasks.append(asyncio.create_task(self._telegram_loop()))
        tasks.append(asyncio.create_task(self._scheduler_loop()))
        self._tasks = tasks
        # Wait until cancelled via stop()
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            pass
        finally:
            self._running = False
            logger.info("Icarus daemon stopped")

    def stop(self):
        """Signal the daemon to stop and cancel all running loop tasks."""
        self._running = False
        for t in self._tasks:
            t.cancel()

    async def _run_experiment(self, text: str, tripwire):
        """Run the production extractor, optionally shadowed by an experiment.

        Returns the experiment result only when auto-promotion is enabled and
        the shadow extractor produced one; otherwise always returns the
        production result. Experiment adapters are loaded dynamically from
        experiments.<name>.adapter.
        """
        exp_name = self.config.experiment_name
        log_path = Path.home() / "icarus" / "experiments" / exp_name / "eval_results"
        log_path.mkdir(parents=True, exist_ok=True)
        # Run production extractor first — it is always the fallback.
        prod_result = await extract(
            text, tripwire,
            self.config.ollama_base_url, self.config.extraction_model
        )
        # Try to load the experiment extractor (best effort).
        try:
            import importlib
            mod = importlib.import_module(f"experiments.{exp_name}.adapter")
            class_name = f"{exp_name.title()}Extractor"
            exp_class = getattr(mod, class_name, None)
            if exp_class:
                exp_instance = exp_class()
                exp_result = exp_instance.extract(text)
                if exp_result and self.config.experiment_log_comparisons:
                    self._log_experiment_comparison(
                        text, prod_result, exp_result, log_path
                    )
                if self.config.experiment_auto_promote and exp_result:
                    if self.config.verbose:
                        # Use the module logger (was print) for consistency.
                        logger.info(f"[experiment] auto-promoting {exp_name} result")
                    return exp_result
        except (ImportError, AttributeError, NotImplementedError) as e:
            if self.config.verbose:
                logger.info(f"[experiment] {exp_name} not available: {e}")
        return prod_result

    @staticmethod
    def _log_experiment_comparison(text, prod, exp, log_path):
        """Write a production-vs-experiment comparison JSON into eval_results.

        Only the first 500 chars of the source text are stored.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        entry = {
            "timestamp": timestamp,
            "text": text[:500],
            "production": {
                "event_type": prod.event_type if prod else None,
                "summary": prod.summary if prod else None,
                "confidence": prod.confidence if prod else None,
            },
            "experiment": {
                "event_type": exp.event_type if exp else None,
                "summary": exp.summary if exp else None,
                "confidence": exp.confidence if exp else None,
            },
        }
        log_file = log_path / f"comparison-{timestamp}.json"
        with open(log_file, "w") as f:
            json.dump(entry, f, indent=2)
        if prod and exp:
            match = (
                prod.event_type == exp.event_type
                and prod.summary == exp.summary
            )
            logger.info(f"[experiment] comparison logged: match={match}")
# ---------------------------------------------------------------------------
# CLI Entry Point
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: icarus start|stop|status|config|events."""
    import argparse
    parser = argparse.ArgumentParser(description="Icarus — sovereign home intelligence")
    sub = parser.add_subparsers(dest="command")
    start_parser = sub.add_parser("start", help="Start the Icarus daemon")
    start_parser.add_argument("--daemonize", action="store_true", help="Run as background daemon")
    sub.add_parser("stop", help="Stop the Icarus daemon")
    sub.add_parser("status", help="Check daemon status")
    sub.add_parser("config", help="Show current configuration")
    sub.add_parser("events", help="Show recent events")
    args = parser.parse_args()

    if not args.command:
        # No subcommand: show usage instead of silently doing nothing.
        parser.print_help()
        return

    if args.command == "start":
        setup_logging()
        if args.daemonize:
            # Single fork to background. NOTE(review): no setsid()/double
            # fork, so the child keeps its controlling terminal — confirm
            # that is acceptable (or run under a supervisor instead).
            pid = os.fork()
            if pid > 0:
                print(f"Icarus daemon starting (PID {pid})")
                sys.exit(0)
        # Write PID file unconditionally: `icarus stop`/`status` and the
        # cleanup in `finally` below both rely on it existing.
        pid_path = Path("/tmp/icarus.pid")
        pid_path.write_text(str(os.getpid()))
        config = DaemonConfig()
        daemon = IcarusDaemon(config)
        # Handle signals for graceful shutdown
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, lambda: asyncio.ensure_future(_shutdown(daemon, loop)))
        try:
            loop.run_until_complete(daemon.start())
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            loop.close()
            if pid_path.exists():
                pid_path.unlink()
    elif args.command == "stop":
        pid_path = Path("/tmp/icarus.pid")
        if pid_path.exists():
            pid = int(pid_path.read_text().strip())
            try:
                os.kill(pid, signal.SIGTERM)
                print(f"Icarus daemon (PID {pid}) stopped")
                pid_path.unlink()
            except ProcessLookupError:
                # Stale PID file — clean it up.
                print("Icarus daemon not running")
                pid_path.unlink()
        else:
            print("Icarus daemon not running")
    elif args.command == "status":
        pid_path = Path("/tmp/icarus.pid")
        if pid_path.exists():
            pid = int(pid_path.read_text().strip())
            try:
                os.kill(pid, 0)  # signal 0: existence check only
                print(f"Icarus daemon running (PID {pid})")
            except ProcessLookupError:
                print("Icarus daemon: pid file exists but process not running")
                pid_path.unlink()
        else:
            print("Icarus daemon not running")
    elif args.command == "config":
        config = DaemonConfig()
        family = load_family()
        print(f"Family: {family.family_id} ({len(family.members)} members)")
        print(f"  IMAP: {'yes' if config.imap_enabled else 'no'}")
        print(f"  Telegram: {'yes' if config.telegram_enabled else 'no'}")
        print(f"  Ollama: {config.ollama_base_url} ({config.extraction_model})")
        print(f"  Verbose: {config.verbose}")
        print(f"  Data dir: {get_data_dir()}")
    elif args.command == "events":
        events = eg.get_recent_events(days=1, limit=10)
        if not events:
            print("No events in the last 24 hours")
        else:
            print(f"Recent events ({len(events)}):")
            for e in events:
                print(f"  [{e['type']:15s}] {e['summary'][:60]} ({e.get('confidence', 0):.2f})")
async def _shutdown(daemon: "IcarusDaemon", loop: asyncio.AbstractEventLoop) -> None:
    """Graceful shutdown handler: stop the daemon, then halt the event loop.

    Registered by main() for SIGTERM/SIGINT.
    """
    logger.info("Shutting down Icarus daemon...")
    daemon.stop()
    # Give the cancelled loop tasks a moment to finish their cleanup
    await asyncio.sleep(1)
    loop.stop()
# Script entry guard. The original `if name == "main":` raised NameError
# (and never invoked main) — the dunder names were missing.
if __name__ == "__main__":
    main()