#!/usr/bin/env python3

"""
HoffGraft — Phase 1: Runtime Steering Module
─────────────────────────────────────────────

At runtime, this module:
1. Classifies the incoming prompt into a reasoning domain
2. Loads the matching donor fingerprint (bias vector)
3. Calls Ollama with logit_bias to steer the chassis model
4. Returns the chassis-generated response (with big-model reasoning influence)

The chassis model stays in GPU VRAM at all times (7B–14B, ~4–9 GB).
The bias file is <5 MB and loaded once at startup.

Architecture:
User prompt
→ DomainClassifier (tiny model, ~100ms)
→ Load bias vector for domain
→ Ollama generate(..., logit_bias=biases) on chassis model
→ Return response

Usage:
# As a library
from hoffgraft_steer import HoffGraftSteerer
steerer = HoffGraftSteerer(
fingerprints_path="fingerprints/hoffgraft_fingerprints.npz",
chassis_model="qwen2.5-coder:14b",
ollama_host="http://127.0.0.1:11434",
)
response = steerer.steer("What's on my calendar tomorrow?")
print(response)

# As a CLI
python hoffgraft_steer.py \
--fingerprints fingerprints/hoffgraft_fingerprints.npz \
--chassis qwen2.5-coder:7b \
--prompt "Debug why the API is returning 500 errors"
"""

from __future__ import annotations

import argparse
import json
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import numpy as np

# ── Lightweight domain classification ────────────────────────────────────────
#
# Keyword + pattern-based classification (no model needed for this step).
# Fast (<1ms), deterministic, good enough for our 5-domain use case.

# Per-domain keyword lists used for classification. All prompts are matched
# case-insensitively by substring; `score_boost` is a constant prior added to
# a domain's raw hit count before normalisation (currently neutral for all).
DOMAIN_CLASSIFIERS = {
    "scheduling": {
        "keywords": [
            "schedule", "calendar", "appointment", "meeting", "conflict",
            "available", "time slot", "remind", "when is", "what time",
            "plan", "week", "tomorrow", "today", "next monday", "next week",
            "reschedule", "cancel", "postpone", "duration", "deadline",
            "event", "book", "slot", "free", "busy",
        ],
        "score_boost": 0.0,
    },
    "email_triage": {
        "keywords": [
            "email", "inbox", "message from", "classify", "spam", "receipt",
            "newsletter", "unsubscribe", "forwarded", "fwd:", "subject:",
            "reply to", "cc:", "bcc:", "attachment", "sender",
        ],
        "score_boost": 0.0,
    },
    "coordination": {
        "keywords": [
            "can you", "could you", "please", "need to", "remind", "don't forget",
            "pick up", "drop off", "stuck in", "handle", "grab", "get", "buy",
            "shopping", "grocery", "milk", "eggs", "bread",
            "who's", "whose turn", "could someone", "anyone", "help with",
            "did you", "have you", "are we", "what about", "how about",
            "let's", "we should", "do we need", "whose",
            "dinner", "lunch", "breakfast", "pizza",
        ],
        "score_boost": 0.0,
    },
    "content_generation": {
        "keywords": [
            "write", "draft", "summarize", "summarise", "briefing", "digest",
            "rewrite", "edit", "compose", "generate", "create", "translate",
            "explain", "describe", "bullet points", "paragraph", "article",
            "report", "document", "text", "content",
            "reply", "response", "declining", "accepting",
        ],
        "score_boost": 0.0,
    },
    "analysis": {
        "keywords": [
            "debug", "error", "bug", "fix", "why is", "what caused",
            "diagnose", "investigate", "troubleshoot", "broken", "failing",
            "crash", "traceback", "log", "exception", "500", "503", "502",
            "down", "unavailable", "slow", "performance", "bottleneck",
            "regression", "analyse", "analyze", "root cause", "compare",
            "tradeoff", "approach", "architecture",
        ],
        "score_boost": 0.0,
    },
}


def classify_domain(prompt: str) -> tuple[str, float]:
    """Classify a prompt into a reasoning domain.

    Returns (domain_name, confidence).
    Simple keyword-matching approach — deterministic, fast, works offline.
    """
    prompt_lower = prompt.lower()
    scores: dict[str, float] = {}

    for domain, config in DOMAIN_CLASSIFIERS.items():
        score = config["score_boost"]
        for kw in config["keywords"]:
            if kw.lower() in prompt_lower:
                score += 1.0
        # Normalise by keyword count to avoid bias toward large keyword sets
        scores[domain] = score / max(len(config["keywords"]), 1)

    if not scores:
        return ("analysis", 0.3)  # Default fallback

    best_domain = max(scores, key=scores.get)

    # Confidence = relative margin between the best and second-best scores.
    sorted_scores = sorted(scores.values(), reverse=True)
    if len(sorted_scores) > 1 and sorted_scores[0] > 0:
        margin = (sorted_scores[0] - sorted_scores[1]) / max(sorted_scores[0], 0.001)
    else:
        # No contrast available (single domain, or zero hits everywhere).
        margin = 0.5

    confidence = min(1.0, max(0.1, margin))

    return (best_domain, confidence)

# ── Fingerprint loading ──────────────────────────────────────────────────────

@dataclass
class FingerprintSet:
    """Bias-vector fingerprint for one reasoning domain."""

    domain: str
    # Donor-model token ids, shape (top_k,)
    token_ids: np.ndarray
    # Per-token bias values aligned with token_ids, shape (top_k,)
    biases: np.ndarray

class FingerprintStore:
    """Loads and manages domain fingerprint vectors."""

    def __init__(self, path: Path):
        self.path = path
        self._store: dict[str, FingerprintSet] = {}
        self._load()

    def _load(self) -> None:
        """Load every domain fingerprint from the .npz file at self.path.

        Raises:
            FileNotFoundError: if the fingerprint file does not exist.
        """
        if not self.path.exists():
            raise FileNotFoundError(
                f"Fingerprint file not found: {self.path}\n"
                f"Run hoffgraft_extract.py first to generate fingerprints."
            )

        # allow_pickle is needed for the "_domains" object array in the .npz.
        data = np.load(str(self.path), allow_pickle=True)
        domains = data["_domains"].tolist()

        for domain in domains:
            token_ids = data[f"{domain}_token_ids"]
            biases = data[f"{domain}_biases"]
            self._store[domain] = FingerprintSet(
                domain=domain,
                token_ids=token_ids,
                biases=biases,
            )

        print(f"[hoffgraft] Loaded {len(self._store)} domain fingerprint sets", file=sys.stderr)

    def get(self, domain: str) -> Optional[FingerprintSet]:
        """Return the fingerprint set for `domain`, or None if unknown."""
        return self._store.get(domain)

    def available_domains(self) -> list[str]:
        """Return the list of domains with loaded fingerprints."""
        return list(self._store.keys())

    def get_bias_map(self, domain: str, max_biases: int = 128) -> dict[int, float]:
        """Return a {token_id: bias_value} dict suitable for Ollama's logit_bias.

        Clamps to max_biases to respect Ollama's API limits.
        """
        fp = self._store.get(domain)
        if fp is None:
            return {}

        n = min(len(fp.token_ids), max_biases)
        result: dict[int, float] = {}
        for i in range(n):
            tid = int(fp.token_ids[i])
            bias = float(fp.biases[i])
            # Scale by 10, then clamp to Ollama's allowed range (-100 to 100).
            bias_clamped = max(-100.0, min(100.0, bias * 10.0))
            # Drop near-zero biases: they add payload size but no steering.
            if abs(bias_clamped) > 0.01:
                result[tid] = bias_clamped
        return result

# ── Ollama integration ────────────────────────────────────────────────────────

class OllamaClient:
    """Minimal async-free Ollama client using urllib (no extra deps)."""

    def __init__(self, host: str = "http://127.0.0.1:11434"):
        # Normalise away trailing slashes so URL joins are predictable.
        self.host = host.rstrip("/")

    def _post(self, endpoint: str, payload: dict) -> dict:
        """POST `payload` as JSON to /api/<endpoint> and return the decoded reply.

        Raises:
            RuntimeError: on an HTTP error response or when the host is unreachable.
        """
        url = f"{self.host}/api/{endpoint}"
        body = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            # Include the server's error body in the message when available.
            detail = e.read().decode("utf-8", errors="replace") if e.fp else str(e)
            raise RuntimeError(f"Ollama HTTP {e.code}: {detail}") from e
        except urllib.error.URLError as e:
            raise RuntimeError(f"Cannot reach Ollama at {self.host}: {e}") from e

    def generate(
        self,
        model: str,
        prompt: str,
        system: str = "",
        temperature: float = 0.7,
        max_tokens: int = 1024,
        logit_bias: Optional[dict[int, float]] = None,
    ) -> str:
        """Generate a completion from the chassis model with optional logit bias.

        Returns the generated text (empty string if the API returns none).
        """
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": temperature,
                "num_predict": max_tokens,
            },
        }
        if system:
            payload["system"] = system

        if logit_bias:
            # Ollama expects string keys for logit_bias
            payload["options"]["logit_bias"] = {
                str(k): v for k, v in logit_bias.items()
            }

        result = self._post("generate", payload)
        return result.get("response", "")

# ── Steerer ───────────────────────────────────────────────────────────────────

# Domain-specific system prompts for the chassis model. The steerer picks one
# by classified domain and falls back to FALLBACK_SYSTEM for unknown domains.
CHASSIS_SYSTEM_PROMPTS = {
    "scheduling": (
        "You are a precise scheduling assistant. Analyse calendar conflicts, "
        "estimate durations realistically (including buffer time), and provide "
        "actionable schedule recommendations. Always account for travel time, "
        "preparation, and family coordination needs."
    ),
    "email_triage": (
        "You are an email triage specialist. Classify emails by urgency and type, "
        "extract key details (dates, amounts, action items), and recommend "
        "routing decisions. Distinguish between urgent action, informational, "
        "spam, receipts, and school/medical/financial categories."
    ),
    "coordination": (
        "You are a family coordination assistant. Extract tasks, commitments, "
        "and requests from natural language messages. Identify who needs to do "
        "what, by when, and any dependencies. Detect implied calendar events "
        "and shopping needs. Be precise and structured."
    ),
    "content_generation": (
        "You are a concise content writer. Generate summaries, briefings, emails, "
        "and explanations that are clear, well-structured, and audience-appropriate. "
        "Focus on the key information. Avoid fluff."
    ),
    "analysis": (
        "You are a systematic debugger and analyst. Diagnose problems by "
        "eliminating possibilities, identify root causes, and propose concrete fixes. "
        "Surface tradeoffs explicitly. Be engineering-grade and precise."
    ),
}

# Used when the classified domain has no dedicated system prompt above.
FALLBACK_SYSTEM = (
    "You are a helpful, precise assistant. Answer directly and concisely."
)

class HoffGraftSteerer:
    """Main runtime steering engine.

    Usage:
        steerer = HoffGraftSteerer("fingerprints.npz", "qwen2.5-coder:14b")
        response = steerer.steer("What's on my calendar tomorrow?")
    """

    def __init__(
        self,
        fingerprints_path: str | Path,
        chassis_model: str = "qwen2.5-coder:14b",
        ollama_host: str = "http://127.0.0.1:11434",
    ):
        self.fingerprints = FingerprintStore(Path(fingerprints_path))
        self.chassis_model = chassis_model
        self.ollama = OllamaClient(host=ollama_host)
        # Counts how often each domain was selected (for get_stats()).
        self._domain_hits: dict[str, int] = {}

    def steer(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 1024,
        force_domain: Optional[str] = None,
    ) -> dict:
        """Steer the chassis model with donor biases and return the response.

        Args:
            prompt: The user's input
            temperature: Generation temperature (default 0.7)
            max_tokens: Max output tokens
            force_domain: Override domain classification (for testing)

        Returns:
            dict with keys: response, domain, confidence, bias_count, time_ms
        """
        t0 = time.time()

        # Step 1: Classify domain
        if force_domain:
            domain = force_domain
            confidence = 1.0
        else:
            domain, confidence = classify_domain(prompt)

        self._domain_hits[domain] = self._domain_hits.get(domain, 0) + 1

        # Step 2: Load bias vector
        bias_map = self.fingerprints.get_bias_map(domain)

        # Step 3: Select system prompt
        system = CHASSIS_SYSTEM_PROMPTS.get(domain, FALLBACK_SYSTEM)

        # Step 4: Generate with biases
        try:
            response = self.ollama.generate(
                model=self.chassis_model,
                prompt=prompt,
                system=system,
                temperature=temperature,
                max_tokens=max_tokens,
                logit_bias=bias_map if bias_map else None,
            )
        except RuntimeError as e:
            # Fallback: retry without biases if biased generation fails
            # (e.g. the Ollama build rejects the logit_bias option).
            print(f"[hoffgraft] Biased generation failed: {e}. Retrying without biases.", file=sys.stderr)
            response = self.ollama.generate(
                model=self.chassis_model,
                prompt=prompt,
                system=system,
                temperature=temperature,
                max_tokens=max_tokens,
                logit_bias=None,
            )

        elapsed_ms = int((time.time() - t0) * 1000)

        return {
            "response": response,
            "domain": domain,
            "confidence": round(confidence, 3),
            "bias_count": len(bias_map),
            "time_ms": elapsed_ms,
        }

    def get_stats(self) -> dict:
        """Return domain classification statistics."""
        return {
            "chassis_model": self.chassis_model,
            "available_domains": self.fingerprints.available_domains(),
            # Copy so callers can't mutate our internal counter.
            "domain_hits": dict(self._domain_hits),
        }

# ── CLI ──────────────────────────────────────────────────────────────────────

def main():
    """CLI entry point: parse arguments, build the steerer, and dispatch prompts.

    Three modes: --stats (print fingerprint info and exit), batch
    (--prompt / --prompt-file), or an interactive REPL when neither is given.
    """
    parser = argparse.ArgumentParser(
        description="HoffGraft: Steer a chassis model with donor logic biases",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="Example:\n python hoffgraft_steer.py --fingerprints fingerprints/hoffgraft_fingerprints.npz --prompt 'Debug the API outage'",
    )
    parser.add_argument("--fingerprints", required=True, type=Path, help="Path to fingerprint .npz file")
    parser.add_argument("--chassis", default="qwen2.5-coder:14b", help="Chassis model name (default: qwen2.5-coder:14b)")
    parser.add_argument("--ollama-host", default="http://127.0.0.1:11434", help="Ollama host (default: localhost:11434)")
    parser.add_argument("--prompt", help="Single prompt to process")
    parser.add_argument("--prompt-file", type=Path, help="File containing prompts (one per line)")
    parser.add_argument("--temperature", type=float, default=0.7, help="Temperature (default: 0.7)")
    parser.add_argument("--max-tokens", type=int, default=1024, help="Max output tokens (default: 1024)")
    parser.add_argument("--domain", help="Force a specific domain (for testing)")
    parser.add_argument("--verbose", action="store_true", help="Show classification info")
    parser.add_argument("--stats", action="store_true", help="Show fingerprint stats and exit")

    args = parser.parse_args()

    steerer = HoffGraftSteerer(
        fingerprints_path=args.fingerprints,
        chassis_model=args.chassis,
        ollama_host=args.ollama_host,
    )

    if args.stats:
        stats = steerer.get_stats()
        print(json.dumps(stats, indent=2))
        return

    prompts = []
    if args.prompt:
        prompts = [args.prompt]
    elif args.prompt_file:
        prompts = [line.strip() for line in args.prompt_file.read_text().splitlines() if line.strip()]
    else:
        # Interactive mode
        print("HoffGraft interactive mode. Type 'exit' to quit, 'stats' for statistics.")
        print(f"Chassis: {args.chassis}")
        print(f"Domains: {', '.join(steerer.fingerprints.available_domains())}")
        print()
        while True:
            try:
                prompt = input("> ").strip()
            except (EOFError, KeyboardInterrupt):
                print()
                break
            if not prompt:
                continue
            if prompt.lower() == "exit":
                break
            if prompt.lower() == "stats":
                stats = steerer.get_stats()
                print(json.dumps(stats, indent=2))
                continue

            result = steerer.steer(
                prompt,
                temperature=args.temperature,
                max_tokens=args.max_tokens,
                force_domain=args.domain,
            )

            if args.verbose:
                print(f"[domain={result['domain']} confidence={result['confidence']} "
                      f"biases={result['bias_count']} time={result['time_ms']}ms]")

            print(result["response"])
            print()
        # Interactive mode handles its own output; nothing further to run.
        return

    # Batch mode: process every collected prompt in order.
    for prompt in prompts:
        result = steerer.steer(
            prompt,
            temperature=args.temperature,
            max_tokens=args.max_tokens,
            force_domain=args.domain,
        )

        # Always show per-prompt metadata when processing multiple prompts.
        if args.verbose or len(prompts) > 1:
            print(f"[domain={result['domain']} confidence={result['confidence']} "
                  f"biases={result['bias_count']} time={result['time_ms']}ms]")

        print(result["response"])
        if len(prompts) > 1:
            print("---")

# Script entry point (paste had lost the dunder underscores).
if __name__ == "__main__":
    main()