#!/usr/bin/env python3
"""Icarus daemon — the bespoke always-on service.

Self-contained scheduler with:
- IMAP polling → tripwire → extraction → event graph
- Telegram long-polling → tripwire → extraction → event graph
- Cron jobs (HBM, TTL purge, brain ingest, metrics)

Runs as a standalone process. No OpenClaw dependency.
"""

import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import yaml

# Import Icarus modules
from icarus.config import load_family, get_data_dir
from icarus.tripwire import run_tripwire, TripwireResult
from icarus.extractor import extract
from icarus import event_graph as eg


# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

def setup_logging():
    """Configure root logging to both a file in the data dir and stdout."""
    data_dir = get_data_dir()
    data_dir.mkdir(parents=True, exist_ok=True)
    log_path = data_dir / "icarus.log"
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            logging.FileHandler(str(log_path)),
            logging.StreamHandler(sys.stdout),
        ],
    )


logger = logging.getLogger("icarus.daemon")


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

class DaemonConfig:
    """Runtime configuration loaded from env + config file.

    All values come from environment variables; a channel is considered
    enabled simply by the presence of its credential variable.
    """

    def __init__(self):
        # IMAP — enabled iff IMAP_USER is set
        self.imap_enabled = bool(os.environ.get("IMAP_USER"))
        self.imap_user = os.environ.get("IMAP_USER", "")
        self.imap_password = os.environ.get("IMAP_PASSWORD", "")
        self.imap_provider = os.environ.get("IMAP_PROVIDER", "icloud")
        self.imap_host = os.environ.get("IMAP_HOST", "imap.mail.me.com")
        self.imap_port = int(os.environ.get("IMAP_PORT", "993"))
        self.imap_poll_interval = int(os.environ.get("IMAP_POLL_INTERVAL", "60"))

        # Telegram — enabled iff TELEGRAM_BOT_TOKEN is set
        self.telegram_enabled = bool(os.environ.get("TELEGRAM_BOT_TOKEN"))
        self.telegram_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
        self.telegram_group_id = os.environ.get("TELEGRAM_GROUP_ID", "")

        # Ollama — Gaming PC default, localhost fallback
        self.ollama_base_url = os.environ.get(
            "OLLAMA_BASE_URL", "http://matt-pc.tail864e81.ts.net:11434"
        )
        self.extraction_model = os.environ.get(
            "ICARUS_EXTRACT_MODEL", "qwen2.5-coder:7b"
        )

        # Scheduling
        self.ttl_hours = int(os.environ.get("SHADOW_TTL_HOURS", "24"))
        self.brain_ingest_interval = int(
            os.environ.get("BRAIN_INGEST_INTERVAL", "3600")
        )
        self.verbose = os.environ.get("ICARUS_VERBOSE", "false").lower() == "true"

        # Telegram admin DM for notifications
        self.admin_chat_id = os.environ.get("ICARUS_ADMIN_CHAT_ID", "")

        # Experiment mode (shadow testing)
        self.experiment_enabled = (
            os.environ.get("ICARUS_EXPERIMENT", "false").lower() == "true"
        )
        self.experiment_name = os.environ.get("ICARUS_EXPERIMENT_NAME", "")
        self.experiment_log_comparisons = (
            os.environ.get("ICARUS_EXPERIMENT_LOG", "false").lower() == "true"
        )
        self.experiment_auto_promote = (
            os.environ.get("ICARUS_EXPERIMENT_PROMOTE", "false").lower() == "true"
        )


# ---------------------------------------------------------------------------
# Experiment (shadow extraction) support
# ---------------------------------------------------------------------------

class ExperimentMixin:
    """Shadow-extraction logic shared by the pollers.

    BUGFIX: these methods were previously defined on IcarusDaemon, but both
    pollers call ``self._run_experiment(...)`` — which raised AttributeError
    whenever experiment mode was enabled. Hosting them in a mixin that the
    pollers inherit fixes that. Requires the host class to provide
    ``self.config`` (a DaemonConfig).
    """

    async def _run_experiment(self, text: str, tripwire):
        """Run shadow extraction if a named experiment is active.

        Always runs the production extractor; additionally tries to load
        ``experiments.<name>.adapter.<Name>Extractor`` and, if configured,
        logs a comparison and/or auto-promotes the experiment's result.
        Returns the production result unless auto-promotion fires.
        """
        exp_name = self.config.experiment_name
        log_path = Path.home() / "icarus" / "experiments" / exp_name / "eval_results"
        log_path.mkdir(parents=True, exist_ok=True)

        # Run production extractor first — it is the fallback in all cases.
        prod_result = await extract(
            text, tripwire, self.config.ollama_base_url, self.config.extraction_model
        )

        # Try to load the experiment extractor dynamically.
        try:
            import importlib

            mod = importlib.import_module(f"experiments.{exp_name}.adapter")
            class_name = f"{exp_name.title()}Extractor"
            exp_class = getattr(mod, class_name, None)
            if exp_class:
                exp_instance = exp_class()
                exp_result = exp_instance.extract(text)
                if exp_result and self.config.experiment_log_comparisons:
                    self._log_experiment_comparison(
                        text, prod_result, exp_result, log_path
                    )
                if self.config.experiment_auto_promote and exp_result:
                    if self.config.verbose:
                        print(f"[experiment] auto-promoting {exp_name} result")
                    return exp_result
        except (ImportError, AttributeError, NotImplementedError) as e:
            if self.config.verbose:
                print(f"[experiment] {exp_name} not available: {e}")

        return prod_result

    @staticmethod
    def _log_experiment_comparison(text, prod, exp, log_path):
        """Log production-vs-experiment extraction comparison to eval_results."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        entry = {
            "timestamp": timestamp,
            "text": text[:500],  # cap stored text to keep logs small
            "production": {
                "event_type": prod.event_type if prod else None,
                "summary": prod.summary if prod else None,
                "confidence": prod.confidence if prod else None,
            },
            "experiment": {
                "event_type": exp.event_type if exp else None,
                "summary": exp.summary if exp else None,
                "confidence": exp.confidence if exp else None,
            },
        }
        log_file = log_path / f"comparison-{timestamp}.json"
        with open(log_file, "w") as f:
            json.dump(entry, f, indent=2)
        if prod and exp:
            match = (
                prod.event_type == exp.event_type and prod.summary == exp.summary
            )
            print(f"[experiment] comparison logged: match={match}")


# ---------------------------------------------------------------------------
# IMAP Poller
# ---------------------------------------------------------------------------

class IMAPPoller(ExperimentMixin):
    """Poll IMAP inbox, run tripwire + extraction on each new email."""

    def __init__(self, config: DaemonConfig, family):
        self.config = config
        self.family = family
        self._processed_ids: set[str] = set()
        self._persist_path = Path("/tmp/icarus-imap-processed.json")
        self._load_processed()

    def _load_processed(self):
        """Load the persisted set of already-processed email ids (best-effort)."""
        if self._persist_path.exists():
            try:
                data = json.loads(self._persist_path.read_text())
                self._processed_ids = set(data) if isinstance(data, list) else set()
            except (json.JSONDecodeError, OSError):
                self._processed_ids = set()

    def _save_processed(self):
        """Persist at most the last 1000 processed ids (best-effort)."""
        try:
            self._persist_path.write_text(
                json.dumps(list(self._processed_ids)[-1000:])
            )
        except OSError:
            pass

    async def poll_once(self):
        """Poll IMAP, process any new emails, return count processed.

        NOTE(review): imaplib calls are blocking and run on the event loop
        thread; acceptable for a single-family daemon, but consider
        ``asyncio.to_thread`` if other loops become latency-sensitive.
        """
        if not self.config.imap_enabled:
            return 0

        import imaplib
        import email as eml
        from email.header import decode_header

        try:
            mail = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
            try:
                mail.login(self.config.imap_user, self.config.imap_password)
                mail.select("INBOX")

                # Fetch recent unseen emails
                _status, ids = mail.search(None, "UNSEEN")
                if not ids[0]:
                    return 0

                email_ids = ids[0].split()[-10:]  # last 10 unseen
                processed = 0
                for eid in email_ids:
                    _status, data = mail.fetch(eid, "(RFC822)")
                    if not data or not data[0]:
                        continue
                    raw_msg = data[0][1]
                    msg = eml.message_from_bytes(raw_msg)
                    subject = self._decode_str(msg.get("Subject", ""))
                    sender = self._decode_str(msg.get("From", ""))
                    body = self._get_body(msg)
                    # Dedup key is subject+sender (no Message-ID available here).
                    source_id = f"imap:{subject}:{sender}"
                    if source_id in self._processed_ids:
                        continue

                    # Tier 1: cheap tripwire gate before paying for the LLM.
                    tripwire = run_tripwire(f"{subject}\n{body}")
                    if tripwire.fired:
                        # Experiment mode: run shadow extraction alongside production
                        if self.config.experiment_enabled and self.config.experiment_name:
                            extraction = await self._run_experiment(
                                f"{subject}\n{body}", tripwire
                            )
                        else:
                            extraction = await extract(
                                f"{subject}\n{body}", tripwire,
                                self.config.ollama_base_url,
                                self.config.extraction_model
                            )
                        if extraction.confidence >= 0.3:
                            eg.insert_event(
                                source="email",
                                source_text=f"{subject}\n{body}",
                                event_type=extraction.event_type,
                                summary=extraction.summary or subject,
                                dates=extraction.dates,
                                times=extraction.times,
                                people=extraction.people,
                                context=extraction.context,
                                location=extraction.location,
                                action_needed=extraction.action_needed,
                                confidence=extraction.confidence,
                                needs_confirmation=extraction.confidence < self.family.fallback_threshold,
                                raw_extraction=extraction.raw,
                                source_id=source_id,
                            )
                            if self.config.verbose:
                                logger.info(
                                    f"IMAP extracted: {extraction.summary} "
                                    f"({extraction.event_type} @ {extraction.confidence})"
                                )

                    self._processed_ids.add(source_id)
                    processed += 1

                self._save_processed()
                return processed
            finally:
                # BUGFIX: always log out, even when processing raises —
                # previously the connection leaked on any exception.
                mail.logout()
        except Exception as e:
            logger.error(f"IMAP poll failed: {e}")
            return 0

    @staticmethod
    def _decode_str(s: bytes | str) -> str:
        """Best-effort decode of a header value to str."""
        if isinstance(s, bytes):
            try:
                return s.decode("utf-8", errors="replace")
            except (UnicodeDecodeError, AttributeError):
                return str(s)
        return s

    @staticmethod
    def _get_body(msg) -> str:
        """Extract plain text body from email message."""
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() == "text/plain":
                    payload = part.get_payload(decode=True)
                    if payload:
                        try:
                            return payload.decode("utf-8", errors="replace")
                        except (UnicodeDecodeError, AttributeError):
                            return str(payload)
        else:
            payload = msg.get_payload(decode=True)
            if payload:
                try:
                    return payload.decode("utf-8", errors="replace")
                except (UnicodeDecodeError, AttributeError):
                    return str(payload)
        return "(no body)"


# ---------------------------------------------------------------------------
# Telegram Shadow Poller
# ---------------------------------------------------------------------------

class TelegramPoller(ExperimentMixin):
    """Long-poll Telegram for new group messages. Silent observer mode."""

    def __init__(self, config: DaemonConfig, family):
        self.config = config
        self.family = family
        self._offset: Optional[int] = None
        self._running = False

    async def poll_once(self) -> int:
        """Poll Telegram, process any new messages. Returns count processed."""
        if not self.config.telegram_enabled:
            return 0

        import httpx

        base = f"https://api.telegram.org/bot{self.config.telegram_token}"
        # Server-side long-poll: getUpdates blocks for up to 30s.
        params = {"timeout": 30, "allowed_updates": json.dumps(["message"])}
        if self._offset:
            params["offset"] = self._offset

        try:
            async with httpx.AsyncClient(timeout=35) as client:
                resp = await client.get(f"{base}/getUpdates", params=params)
                resp.raise_for_status()
                data = resp.json()
        except Exception as e:
            logger.warning(f"Telegram poll failed: {e}")
            # BUGFIX: brief backoff — the caller's loop has no sleep of its
            # own, so a persistent network failure would otherwise busy-spin.
            await asyncio.sleep(2)
            return 0

        updates = data.get("result", [])
        if not updates:
            return 0

        # Update offset to ack the batch with Telegram.
        self._offset = updates[-1]["update_id"] + 1

        processed = 0
        for update in updates:
            msg = update.get("message")
            if not msg:
                continue
            chat_id = str(msg.get("chat", {}).get("id"))
            # Only process the configured family group
            if chat_id != self.config.telegram_group_id:
                continue
            text = msg.get("text", "")
            if not text:
                continue

            source_id = f"telegram:{msg['message_id']}"
            start = time.monotonic()

            # Tier 1: Tripwire
            tripwire = run_tripwire(text)
            if not tripwire.fired:
                continue

            # Tier 2: LLM extraction
            if self.config.experiment_enabled and self.config.experiment_name:
                extraction = await self._run_experiment(text, tripwire)
            else:
                extraction = await extract(
                    text, tripwire,
                    self.config.ollama_base_url, self.config.extraction_model
                )
            elapsed_ms = int((time.monotonic() - start) * 1000)

            # Log extraction (even below the confidence floor, for telemetry).
            eg.insert_extraction_log(
                source_id=source_id,
                tripwire_score=tripwire.score,
                tripwire_patterns=tripwire.patterns_matched,
                llm_output=json.dumps(extraction.raw) if extraction.raw else "{}",
                extraction_result={
                    "event_type": extraction.event_type,
                    "summary": extraction.summary,
                    "confidence": extraction.confidence,
                },
                duration_ms=elapsed_ms,
            )

            if extraction.confidence >= 0.3:
                eg.insert_event(
                    source="telegram",
                    source_text=text,
                    event_type=extraction.event_type,
                    summary=extraction.summary,
                    dates=extraction.dates,
                    times=extraction.times,
                    people=extraction.people,
                    context=extraction.context,
                    location=extraction.location,
                    action_needed=extraction.action_needed,
                    confidence=extraction.confidence,
                    needs_confirmation=extraction.confidence < self.family.fallback_threshold,
                    raw_extraction=extraction.raw,
                    source_id=source_id,
                )
                if self.config.verbose:
                    logger.info(
                        f"Telegram extracted: {extraction.summary} "
                        f"({extraction.event_type} @ {extraction.confidence})"
                    )

                # Check calendar conflicts for high-confidence calendar events
                if (extraction.event_type == "calendar_event"
                        and extraction.confidence >= 0.5
                        and extraction.dates and extraction.times):
                    await self._check_calendar_conflicts(extraction, tripwire)

            processed += 1

        return processed

    async def _check_calendar_conflicts(self, extraction, tripwire):
        """Check Radicale calendar for conflicts and DM admin if found."""
        import httpx

        cal_url = os.environ.get("CALDAV_URL", "http://127.0.0.1:5232")
        cal_user = os.environ.get("CALDAV_USER", "")
        cal_pass = os.environ.get("CALDAV_PASSWORD", "")
        if not cal_user or not self.config.admin_chat_id:
            return
        try:
            # Basic CalDAV query for the event date
            date = extraction.dates[0]
            async with httpx.AsyncClient(timeout=10, auth=(cal_user, cal_pass)) as client:
                # Use CalDAV REPORT to find events on this date.
                # NOTE(review): the REPORT body below is an empty placeholder —
                # a real calendar-query XML filter is still needed; most
                # servers will reject or ignore this request as-is.
                report_body = f"""
"""
                resp = await client.request(
                    "REPORT",
                    f"{cal_url}/family/",
                    content=report_body,
                    headers={"Depth": "1"},
                )
                if resp.status_code == 207:
                    # Found existing events on same day — log potential conflict
                    logger.info(
                        f"Potential calendar conflict on {date}: "
                        f"new '{extraction.summary}' vs existing calendar events"
                    )
                    # TODO: DM admin notification via Telegram
        except Exception as e:
            logger.debug(f"Calendar conflict check failed: {e}")


# ---------------------------------------------------------------------------
# Cron Jobs (APScheduler)
# ---------------------------------------------------------------------------

def run_ttl_purge():
    """Purge extraction-log rows older than 7 days."""
    import sqlite3

    try:
        db_path = get_data_dir() / "icarus.db"
        if not db_path.exists():
            return
        conn = sqlite3.connect(str(db_path))
        try:
            # BUGFIX: SQLite's datetime('now') is already UTC; the previous
            # trailing 'utc' modifier re-shifted the cutoff by the local
            # offset, purging too early or too late depending on timezone.
            conn.execute(
                "DELETE FROM extractions WHERE created_at < datetime('now', '-7 days')"
            )
            conn.commit()
        finally:
            conn.close()
        logger.info("TTL purge: extraction logs >7 days removed")
    except Exception as e:
        logger.error(f"TTL purge failed: {e}")


def run_brain_ingest():
    """Ingest recent events into the ChromaDB brain."""
    from icarus.brain import ingest as brain_ingest

    try:
        events = eg.get_recent_events(days=1, limit=50)
        ingested = 0
        for event in events:
            doc_id = f"event:{event['source_id']}"
            text = f"{event['summary']} — {event.get('context', '')} — {event.get('action_needed', '')}"
            metadata = {
                "type": event["type"],
                "source": event["source"],
                "created": event["created_at"],
                "people": event["people"],
            }
            if brain_ingest(doc_id, text, metadata):
                ingested += 1
        logger.info(f"Brain ingest: {ingested}/{len(events)} events indexed")
    except Exception as e:
        logger.error(f"Brain ingest failed: {e}")


# ---------------------------------------------------------------------------
# Daemon Main
# ---------------------------------------------------------------------------

class IcarusDaemon:
    """The main daemon — runs all polling loops and scheduled jobs."""

    def __init__(self, config: Optional[DaemonConfig] = None):
        self.config = config or DaemonConfig()
        self.family = load_family()
        self.imap_poller = IMAPPoller(self.config, self.family)
        self.tg_poller = TelegramPoller(self.config, self.family)
        self._running = False
        self._tasks: list[asyncio.Task] = []

    async def _imap_loop(self):
        """Poll IMAP every N seconds."""
        while self._running:
            count = await self.imap_poller.poll_once()
            if count and count > 0 and self.config.verbose:
                logger.info(f"IMAP: processed {count} emails")
            await asyncio.sleep(self.config.imap_poll_interval)

    async def _telegram_loop(self):
        """Long-poll Telegram continuously."""
        while self._running:
            count = await self.tg_poller.poll_once()
            if count and count > 0 and self.config.verbose:
                logger.info(f"Telegram: processed {count} messages")
            # No sleep — long-poll blocks for up to 30s internally
            # (and poll_once backs off briefly on errors).

    async def _scheduler_loop(self):
        """Simple internal scheduler for cron-like jobs."""
        # Both timers start at 0 so each job also runs once shortly after boot.
        last_ttl = 0
        last_brain = 0
        while self._running:
            now = time.time()

            # TTL purge every 6 hours
            if now - last_ttl >= 6 * 3600:
                run_ttl_purge()
                last_ttl = now

            # Brain ingest every hour
            if now - last_brain >= 3600:
                run_brain_ingest()
                last_brain = now

            await asyncio.sleep(60)  # check every minute

    async def start(self):
        """Start the daemon and block until all loops finish or are cancelled."""
        self._running = True
        logger.info(f"Icarus daemon starting (family={self.family.family_id})")
        logger.info(f"  IMAP: {'enabled' if self.config.imap_enabled else 'disabled'}")
        logger.info(f"  Telegram: {'enabled' if self.config.telegram_enabled else 'disabled'}")
        logger.info(f"  Ollama: {self.config.ollama_base_url} ({self.config.extraction_model})")

        # Initialize event graph DB
        eg._ensure_db()

        tasks = []
        if self.config.imap_enabled:
            tasks.append(asyncio.create_task(self._imap_loop()))
        if self.config.telegram_enabled:
            tasks.append(asyncio.create_task(self._telegram_loop()))
        tasks.append(asyncio.create_task(self._scheduler_loop()))
        self._tasks = tasks

        # Wait until cancelled
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            pass
        finally:
            self._running = False
            logger.info("Icarus daemon stopped")

    def stop(self):
        """Signal the daemon to stop: flip the flag and cancel all loops."""
        self._running = False
        for t in self._tasks:
            t.cancel()


# ---------------------------------------------------------------------------
# CLI Entry Point
# ---------------------------------------------------------------------------

def main():
    """CLI entry point: icarus start, icarus stop, icarus status."""
    import argparse

    parser = argparse.ArgumentParser(description="Icarus — sovereign home intelligence")
    sub = parser.add_subparsers(dest="command")

    start_parser = sub.add_parser("start", help="Start the Icarus daemon")
    start_parser.add_argument("--daemonize", action="store_true", help="Run as background daemon")
    sub.add_parser("stop", help="Stop the Icarus daemon")
    sub.add_parser("status", help="Check daemon status")
    sub.add_parser("config", help="Show current configuration")
    sub.add_parser("events", help="Show recent events")

    args = parser.parse_args()

    if args.command == "start":
        setup_logging()
        if args.daemonize:
            # Fork to background; the parent reports the child PID and exits.
            pid = os.fork()
            if pid > 0:
                print(f"Icarus daemon starting (PID {pid})")
                sys.exit(0)

        # Write PID file unconditionally (in the child when daemonized) so
        # `stop`/`status` work in both modes — and so the `finally` cleanup
        # below always has pid_path bound.
        pid_path = Path("/tmp/icarus.pid")
        pid_path.write_text(str(os.getpid()))

        config = DaemonConfig()
        daemon = IcarusDaemon(config)

        # Handle signals for graceful shutdown.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, lambda: asyncio.ensure_future(_shutdown(daemon, loop)))

        try:
            loop.run_until_complete(daemon.start())
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            loop.close()
            if pid_path.exists():
                pid_path.unlink()

    elif args.command == "stop":
        pid_path = Path("/tmp/icarus.pid")
        if pid_path.exists():
            pid = int(pid_path.read_text().strip())
            try:
                os.kill(pid, signal.SIGTERM)
                print(f"Icarus daemon (PID {pid}) stopped")
                pid_path.unlink()
            except ProcessLookupError:
                print("Icarus daemon not running")
                pid_path.unlink()
        else:
            print("Icarus daemon not running")

    elif args.command == "status":
        pid_path = Path("/tmp/icarus.pid")
        if pid_path.exists():
            pid = int(pid_path.read_text().strip())
            try:
                os.kill(pid, 0)  # check if running
                print(f"Icarus daemon running (PID {pid})")
            except ProcessLookupError:
                print("Icarus daemon: pid file exists but process not running")
                pid_path.unlink()
        else:
            print("Icarus daemon not running")

    elif args.command == "config":
        config = DaemonConfig()
        family = load_family()
        print(f"Family: {family.family_id} ({len(family.members)} members)")
        print(f"  IMAP: {'yes' if config.imap_enabled else 'no'}")
        print(f"  Telegram: {'yes' if config.telegram_enabled else 'no'}")
        print(f"  Ollama: {config.ollama_base_url} ({config.extraction_model})")
        print(f"  Verbose: {config.verbose}")
        print(f"  Data dir: {get_data_dir()}")

    elif args.command == "events":
        events = eg.get_recent_events(days=1, limit=10)
        if not events:
            print("No events in the last 24 hours")
        else:
            print(f"Recent events ({len(events)}):")
            for e in events:
                print(f"  [{e['type']:15s}] {e['summary'][:60]} ({e.get('confidence', 0):.2f})")


async def _shutdown(daemon: IcarusDaemon, loop: asyncio.AbstractEventLoop):
    """Graceful shutdown handler."""
    logger.info("Shutting down Icarus daemon...")
    daemon.stop()
    # Give tasks time to finish
    await asyncio.sleep(1)
    loop.stop()


if __name__ == "__main__":
    main()