# daemon.py

#!/usr/bin/env python3

"""Icarus daemon — the bespoke always-on service.

Self-contained scheduler with:
- IMAP polling → tripwire → extraction → event graph
- Telegram long-polling → tripwire → extraction → event graph
- Cron jobs (HBM, TTL purge, brain ingest, metrics)

Runs as a standalone process. No OpenClaw dependency.
"""

import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import yaml

# Import Icarus modules

from icarus.config import load_family, get_data_dir
from icarus.tripwire import run_tripwire, TripwireResult
from icarus.extractor import extract
from icarus import event_graph as eg

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

def setup_logging():
data_dir = get_data_dir()
data_dir.mkdir(parents=True, exist_ok=True)
log_path = data_dir / "icarus.log"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.FileHandler(str(log_path)),
        logging.StreamHandler(sys.stdout),
    ],
)

logger = logging.getLogger("icarus.daemon")

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

class DaemonConfig:
"""Runtime configuration loaded from env + config file."""

def __init__(self):
    # IMAP
    self.imap_enabled = bool(os.environ.get("IMAP_USER"))
    self.imap_user = os.environ.get("IMAP_USER", "")
    self.imap_password = os.environ.get("IMAP_PASSWORD", "")
    self.imap_provider = os.environ.get("IMAP_PROVIDER", "icloud")
    self.imap_host = os.environ.get("IMAP_HOST", "imap.mail.me.com")
    self.imap_port = int(os.environ.get("IMAP_PORT", "993"))
    self.imap_poll_interval = int(os.environ.get("IMAP_POLL_INTERVAL", "60"))

    # Telegram
    self.telegram_enabled = bool(os.environ.get("TELEGRAM_BOT_TOKEN"))
    self.telegram_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
    self.telegram_group_id = os.environ.get("TELEGRAM_GROUP_ID", "")

    # Ollama — Gaming PC default, localhost fallback
    self.ollama_base_url = os.environ.get(
        "OLLAMA_BASE_URL", "http://matt-pc.tail864e81.ts.net:11434"
    )
    self.extraction_model = os.environ.get(
        "ICARUS_EXTRACT_MODEL", "qwen2.5-coder:7b"
    )

    # Scheduling
    self.ttl_hours = int(os.environ.get("SHADOW_TTL_HOURS", "24"))
    self.brain_ingest_interval = int(
        os.environ.get("BRAIN_INGEST_INTERVAL", "3600")
    )
    self.verbose = os.environ.get("ICARUS_VERBOSE", "false").lower() == "true"

    # Telegram admin DM for notifications
    self.admin_chat_id = os.environ.get("ICARUS_ADMIN_CHAT_ID", "")

    # Experiment mode (shadow testing)
    self.experiment_enabled = (
        os.environ.get("ICARUS_EXPERIMENT", "false").lower() == "true"
    )
    self.experiment_name = os.environ.get("ICARUS_EXPERIMENT_NAME", "")
    self.experiment_log_comparisons = (
        os.environ.get("ICARUS_EXPERIMENT_LOG", "false").lower() == "true"
    )
    self.experiment_auto_promote = (
        os.environ.get("ICARUS_EXPERIMENT_PROMOTE", "false").lower() == "true"
    )

# ---------------------------------------------------------------------------
# IMAP Poller
# ---------------------------------------------------------------------------

class IMAPPoller:
"""Poll IMAP inbox, run tripwire + extraction on each new email."""

def __init__(self, config: DaemonConfig, family):
    self.config = config
    self.family = family
    self._processed_ids: set[str] = set()
    self._persist_path = Path("/tmp/icarus-imap-processed.json")
    self._load_processed()

def _load_processed(self):
    if self._persist_path.exists():
        try:
            data = json.loads(self._persist_path.read_text())
            self._processed_ids = set(data) if isinstance(data, list) else set()
        except (json.JSONDecodeError, OSError):
            self._processed_ids = set()

def _save_processed(self):
    try:
        self._persist_path.write_text(
            json.dumps(list(self._processed_ids)[-1000:])
        )
    except OSError:
        pass

async def poll_once(self):
    """Poll IMAP, process any new emails, return count processed."""
    if not self.config.imap_enabled:
        return 0

    import imaplib
    import email as eml
    from email.header import decode_header

    try:
        mail = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
        mail.login(self.config.imap_user, self.config.imap_password)
        mail.select("INBOX")

        # Fetch recent unseen emails
        _status, ids = mail.search(None, "UNSEEN")
        if not ids[0]:
            mail.logout()
            return 0

        email_ids = ids[0].split()[-10:]  # last 10 unseen
        processed = 0

        for eid in email_ids:
            _status, data = mail.fetch(eid, "(RFC822)")
            if not data or not data[0]:
                continue

            raw_msg = data[0][1]
            msg = eml.message_from_bytes(raw_msg)

            subject = self._decode_str(msg.get("Subject", ""))
            sender = self._decode_str(msg.get("From", ""))
            body = self._get_body(msg)

            source_id = f"imap:{subject}:{sender}"
            if source_id in self._processed_ids:
                continue

            # Run tripwire
            tripwire = run_tripwire(f"{subject}\n{body}")

            if tripwire.fired:
                # Experiment mode: run shadow extraction alongside production
                if self.config.experiment_enabled and self.config.experiment_name:
                    extraction = await self._run_experiment(
                        f"{subject}\n{body}", tripwire
                    )
                else:
                    extraction = await extract(
                        f"{subject}\n{body}", tripwire,
                        self.config.ollama_base_url, self.config.extraction_model
                    )

                if extraction.confidence >= 0.3:
                    eg.insert_event(
                        source="email",
                        source_text=f"{subject}\n{body}",
                        event_type=extraction.event_type,
                        summary=extraction.summary or subject,
                        dates=extraction.dates,
                        times=extraction.times,
                        people=extraction.people,
                        context=extraction.context,
                        location=extraction.location,
                        action_needed=extraction.action_needed,
                        confidence=extraction.confidence,
                        needs_confirmation=extraction.confidence < self.family.fallback_threshold,
                        raw_extraction=extraction.raw,
                        source_id=source_id,
                    )

                    if self.config.verbose:
                        logger.info(
                            f"IMAP extracted: {extraction.summary} "
                            f"({extraction.event_type} @ {extraction.confidence})"
                        )

            self._processed_ids.add(source_id)
            processed += 1

        mail.logout()
        self._save_processed()
        return processed

    except Exception as e:
        logger.error(f"IMAP poll failed: {e}")
        return 0

@staticmethod
def _decode_str(s: bytes | str) -> str:
    if isinstance(s, bytes):
        try:
            return s.decode("utf-8", errors="replace")
        except (UnicodeDecodeError, AttributeError):
            return str(s)
    return s

@staticmethod
def _get_body(msg) -> str:
    """Extract plain text body from email message."""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                payload = part.get_payload(decode=True)
                if payload:
                    try:
                        return payload.decode("utf-8", errors="replace")
                    except (UnicodeDecodeError, AttributeError):
                        return str(payload)
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            try:
                return payload.decode("utf-8", errors="replace")
            except (UnicodeDecodeError, AttributeError):
                return str(payload)
    return "(no body)"

# ---------------------------------------------------------------------------
# Telegram Shadow Poller
# ---------------------------------------------------------------------------

class TelegramPoller:
"""Long-poll Telegram for new group messages. Silent observer mode."""

def __init__(self, config: DaemonConfig, family):
    self.config = config
    self.family = family
    self._offset: Optional[int] = None
    self._running = False

async def poll_once(self) -> int:
    """Poll Telegram, process any new messages. Returns count processed."""
    if not self.config.telegram_enabled:
        return 0

    import httpx

    base = f"https://api.telegram.org/bot{self.config.telegram_token}"
    params = {"timeout": 30, "allowed_updates": json.dumps(["message"])}
    if self._offset:
        params["offset"] = self._offset

    try:
        async with httpx.AsyncClient(timeout=35) as client:
            resp = await client.get(f"{base}/getUpdates", params=params)
            resp.raise_for_status()
            data = resp.json()
    except Exception as e:
        logger.warning(f"Telegram poll failed: {e}")
        return 0

    updates = data.get("result", [])
    if not updates:
        return 0

    # Update offset to ack
    self._offset = updates[-1]["update_id"] + 1

    processed = 0
    for update in updates:
        msg = update.get("message")
        if not msg:
            continue

        chat_id = str(msg.get("chat", {}).get("id"))
        # Only process the configured family group
        if chat_id != self.config.telegram_group_id:
            continue

        text = msg.get("text", "")
        if not text:
            continue

        source_id = f"telegram:{msg['message_id']}"
        start = time.monotonic()

        # Tier 1: Tripwire
        tripwire = run_tripwire(text)

        if not tripwire.fired:
            continue

        # Tier 2: LLM extraction
        if self.config.experiment_enabled and self.config.experiment_name:
            extraction = await self._run_experiment(text, tripwire)
        else:
            extraction = await extract(
                text, tripwire,
                self.config.ollama_base_url, self.config.extraction_model
            )

        elapsed_ms = int((time.monotonic() - start) * 1000)

        # Log extraction
        eg.insert_extraction_log(
            source_id=source_id,
            tripwire_score=tripwire.score,
            tripwire_patterns=tripwire.patterns_matched,
            llm_output=json.dumps(extraction.raw) if extraction.raw else "{}",
            extraction_result={
                "event_type": extraction.event_type,
                "summary": extraction.summary,
                "confidence": extraction.confidence,
            },
            duration_ms=elapsed_ms,
        )

        if extraction.confidence >= 0.3:
            eg.insert_event(
                source="telegram",
                source_text=text,
                event_type=extraction.event_type,
                summary=extraction.summary,
                dates=extraction.dates,
                times=extraction.times,
                people=extraction.people,
                context=extraction.context,
                location=extraction.location,
                action_needed=extraction.action_needed,
                confidence=extraction.confidence,
                needs_confirmation=extraction.confidence < self.family.fallback_threshold,
                raw_extraction=extraction.raw,
                source_id=source_id,
            )

            if self.config.verbose:
                logger.info(
                    f"Telegram extracted: {extraction.summary} "
                    f"({extraction.event_type} @ {extraction.confidence})"
                )

        # Check calendar conflicts for high-confidence calendar events
        if (extraction.event_type == "calendar_event"
                and extraction.confidence >= 0.5
                and extraction.dates
                and extraction.times):
            await self._check_calendar_conflicts(extraction, tripwire)

        processed += 1

    return processed

async def _check_calendar_conflicts(self, extraction, tripwire):
    """Check Radicale calendar for conflicts and DM admin if found."""
    import httpx

    cal_url = os.environ.get("CALDAV_URL", "http://127.0.0.1:5232")
    cal_user = os.environ.get("CALDAV_USER", "")
    cal_pass = os.environ.get("CALDAV_PASSWORD", "")

    if not cal_user or not self.config.admin_chat_id:
        return

    try:
        # Basic CalDAV query for the event date
        date = extraction.dates[0]
        async with httpx.AsyncClient(timeout=10, auth=(cal_user, cal_pass)) as client:
            # Use CalDAV REPORT to find events on this date
            report_body = f"""<?xml version="1.0" encoding="utf-8"?>













"""

            resp = await client.request(
                "REPORT", f"{cal_url}/family/",
                content=report_body,
                headers={"Depth": "1"},
            )

            if resp.status_code == 207:
                # Found existing events on same day — log potential conflict
                logger.info(
                    f"Potential calendar conflict on {date}: "
                    f"new '{extraction.summary}' vs existing calendar events"
                )
                # TODO: DM admin notification via Telegram
    except Exception as e:
        logger.debug(f"Calendar conflict check failed: {e}")

# ---------------------------------------------------------------------------
# Cron Jobs
# ---------------------------------------------------------------------------

def run_ttl_purge():
"""Purge old extraction logs."""
import sqlite3
try:
db_path = get_data_dir() / "icarus.db"
if not db_path.exists():
return
conn = sqlite3.connect(str(db_path))
conn.execute(
"DELETE FROM extractions WHERE created_at < datetime('now', '-7 days', 'utc')"
)
conn.commit()
conn.close()
logger.info("TTL purge: extraction logs >7 days removed")
except Exception as e:
logger.error(f"TTL purge failed: {e}")

def run_brain_ingest():
"""Ingest recent events into the ChromaDB brain."""
from icarus.brain import ingest as brain_ingest

try:
    events = eg.get_recent_events(days=1, limit=50)
    ingested = 0
    for event in events:
        doc_id = f"event:{event['source_id']}"
        text = f"{event['summary']} — {event.get('context', '')} — {event.get('action_needed', '')}"
        metadata = {
            "type": event["type"],
            "source": event["source"],
            "created": event["created_at"],
            "people": event["people"],
        }
        if brain_ingest(doc_id, text, metadata):
            ingested += 1
    logger.info(f"Brain ingest: {ingested}/{len(events)} events indexed")
except Exception as e:
    logger.error(f"Brain ingest failed: {e}")

# ---------------------------------------------------------------------------
# Daemon Main
# ---------------------------------------------------------------------------

class IcarusDaemon:
"""The main daemon — runs all polling loops and scheduled jobs."""

def __init__(self, config: Optional[DaemonConfig] = None):
    self.config = config or DaemonConfig()
    self.family = load_family()
    self.imap_poller = IMAPPoller(self.config, self.family)
    self.tg_poller = TelegramPoller(self.config, self.family)
    self._running = False
    self._tasks: list[asyncio.Task] = []

async def _imap_loop(self):
    """Poll IMAP every N seconds."""
    while self._running:
        count = await self.imap_poller.poll_once()
        if count and count > 0 and self.config.verbose:
            logger.info(f"IMAP: processed {count} emails")
        await asyncio.sleep(self.config.imap_poll_interval)

async def _telegram_loop(self):
    """Long-poll Telegram continuously."""
    while self._running:
        count = await self.tg_poller.poll_once()
        if count and count > 0 and self.config.verbose:
            logger.info(f"Telegram: processed {count} messages")
        # No sleep  long-poll blocks for up to 30s internally

async def _scheduler_loop(self):
    """Simple internal scheduler for cron-like jobs."""
    import math

    last_ttl = 0
    last_brain = 0

    while self._running:
        now = time.time()

        # TTL purge every 6 hours
        if now - last_ttl >= 6 * 3600:
            run_ttl_purge()
            last_ttl = now

        # Brain ingest every hour
        if now - last_brain >= 3600:
            run_brain_ingest()
            last_brain = now

        await asyncio.sleep(60)  # check every minute

async def start(self):
    """Start the daemon."""
    self._running = True
    logger.info(f"Icarus daemon starting (family={self.family.family_id})")
    logger.info(f"  IMAP: {'enabled' if self.config.imap_enabled else 'disabled'}")
    logger.info(f"  Telegram: {'enabled' if self.config.telegram_enabled else 'disabled'}")
    logger.info(f"  Ollama: {self.config.ollama_base_url} ({self.config.extraction_model})")

    # Initialize event graph DB
    eg._ensure_db()

    tasks = []
    if self.config.imap_enabled:
        tasks.append(asyncio.create_task(self._imap_loop()))
    if self.config.telegram_enabled:
        tasks.append(asyncio.create_task(self._telegram_loop()))
    tasks.append(asyncio.create_task(self._scheduler_loop()))

    self._tasks = tasks

    # Wait until cancelled
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        pass
    finally:
        self._running = False
        logger.info("Icarus daemon stopped")

def stop(self):
    """Signal the daemon to stop."""
    self._running = False
    for t in self._tasks:
        t.cancel()

async def _run_experiment(self, text: str, tripwire):
    """Run shadow extraction if a named experiment is active."""
    exp_name = self.config.experiment_name
    log_path = Path.home() / "icarus" / "experiments" / exp_name / "eval_results"
    log_path.mkdir(parents=True, exist_ok=True)

    # Run production extractor first
    prod_result = await extract(
        text, tripwire,
        self.config.ollama_base_url, self.config.extraction_model
    )

    # Try to load experiment extractor
    try:
        import importlib
        mod = importlib.import_module(f"experiments.{exp_name}.adapter")
        class_name = f"{exp_name.title()}Extractor"
        exp_class = getattr(mod, class_name, None)
        if exp_class:
            exp_instance = exp_class()
            exp_result = exp_instance.extract(text)
            if exp_result and self.config.experiment_log_comparisons:
                self._log_experiment_comparison(
                    text, prod_result, exp_result, log_path
                )
            if self.config.experiment_auto_promote and exp_result:
                if self.config.verbose:
                    print(f"[experiment] auto-promoting {exp_name} result")
                return exp_result
    except (ImportError, AttributeError, NotImplementedError) as e:
        if self.config.verbose:
            print(f"[experiment] {exp_name} not available: {e}")

    return prod_result

@staticmethod
def _log_experiment_comparison(text, prod, exp, log_path):
    """Log extraction comparison to experiment eval_results."""
    import json
    from datetime import datetime

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    entry = {
        "timestamp": timestamp,
        "text": text[:500],
        "production": {
            "event_type": prod.event_type if prod else None,
            "summary": prod.summary if prod else None,
            "confidence": prod.confidence if prod else None,
        },
        "experiment": {
            "event_type": exp.event_type if exp else None,
            "summary": exp.summary if exp else None,
            "confidence": exp.confidence if exp else None,
        },
    }
    log_file = log_path / f"comparison-{timestamp}.json"
    with open(log_file, "w") as f:
        json.dump(entry, f, indent=2)
    if prod and exp:
        match = (
            prod.event_type == exp.event_type
            and prod.summary == exp.summary
        )
        print(f"[experiment] comparison logged: match={match}")

# ---------------------------------------------------------------------------
# CLI Entry Point
# ---------------------------------------------------------------------------

def main():
"""CLI entry point: icarus start, icarus stop, icarus status."""
import argparse

parser = argparse.ArgumentParser(description="Icarus — sovereign home intelligence")
sub = parser.add_subparsers(dest="command")

start_parser = sub.add_parser("start", help="Start the Icarus daemon")
start_parser.add_argument("--daemonize", action="store_true", help="Run as background daemon")

sub.add_parser("stop", help="Stop the Icarus daemon")
sub.add_parser("status", help="Check daemon status")
sub.add_parser("config", help="Show current configuration")
sub.add_parser("events", help="Show recent events")

args = parser.parse_args()

if args.command == "start":
    setup_logging()

    if args.daemonize:
        # Fork to background
        pid = os.fork()
        if pid > 0:
            print(f"Icarus daemon starting (PID {pid})")
            sys.exit(0)

    # Write PID file
    pid_path = Path("/tmp/icarus.pid")
    pid_path.write_text(str(os.getpid()))

    config = DaemonConfig()
    daemon = IcarusDaemon(config)

    # Handle signals
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda: asyncio.ensure_future(_shutdown(daemon, loop)))

    try:
        loop.run_until_complete(daemon.start())
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        loop.close()
        if pid_path.exists():
            pid_path.unlink()

elif args.command == "stop":
    pid_path = Path("/tmp/icarus.pid")
    if pid_path.exists():
        pid = int(pid_path.read_text().strip())
        try:
            os.kill(pid, signal.SIGTERM)
            print(f"Icarus daemon (PID {pid}) stopped")
            pid_path.unlink()
        except ProcessLookupError:
            print("Icarus daemon not running")
            pid_path.unlink()
    else:
        print("Icarus daemon not running")

elif args.command == "status":
    pid_path = Path("/tmp/icarus.pid")
    if pid_path.exists():
        pid = int(pid_path.read_text().strip())
        try:
            os.kill(pid, 0)  # check if running
            print(f"Icarus daemon running (PID {pid})")
        except ProcessLookupError:
            print("Icarus daemon: pid file exists but process not running")
            pid_path.unlink()
    else:
        print("Icarus daemon not running")

elif args.command == "config":
    config = DaemonConfig()
    family = load_family()
    print(f"Family: {family.family_id} ({len(family.members)} members)")
    print(f"  IMAP: {'yes' if config.imap_enabled else 'no'}")
    print(f"  Telegram: {'yes' if config.telegram_enabled else 'no'}")
    print(f"  Ollama: {config.ollama_base_url} ({config.extraction_model})")
    print(f"  Verbose: {config.verbose}")
    print(f"  Data dir: {get_data_dir()}")

elif args.command == "events":
    events = eg.get_recent_events(days=1, limit=10)
    if not events:
        print("No events in the last 24 hours")
    else:
        print(f"Recent events ({len(events)}):")
        for e in events:
            print(f"  [{e['type']:15s}] {e['summary'][:60]} ({e.get('confidence', 0):.2f})")

async def _shutdown(daemon: IcarusDaemon, loop: asyncio.AbstractEventLoop):
"""Graceful shutdown handler."""
logger.info("Shutting down Icarus daemon...")
daemon.stop()
# Give tasks time to finish
await asyncio.sleep(1)
loop.stop()

if name == "main":
main()