📄 hoffgraft_extract.py 44,617 bytes Wednesday 03:32 📋 Raw

!/usr/bin/env python3

"""
HoffGraft — Phase 0: Donor Logic Fingerprint Extraction
─────────────────────────────────────────────────────────

Runs a 32B-class donor model through domain-specific reasoning prompts,
captures top-K token probability distributions at reasoning boundaries,
and saves compact bias vectors for runtime steering of a smaller chassis model.

The donor runs CPU-only via mmap (needs ~16 GB system RAM for Q3_K_M).
Once extraction is complete, the donor GGUF file can be deleted —
the fingerprints are all you need at runtime.

Architecture:
32B Donor (CPU mmap, one-time)
→ 500 prompts × N domains
→ Capture top-K final-token logits per prompt
→ Aggregate per-domain bias vectors (logit-bias)
→ Save as .npz (~5-20 MB, fits in memory)
→ At runtime, 7B/8B chassis + bias vector → big-model reasoning

Usage:
# Step 1: Install deps
pip install llama-cpp-python numpy scipy

# Step 2: Download donor model (if not already present)
# Recommended: Qwen2.5-32B-Instruct-Q3_K_M (~14 GB)
# From: https://huggingface.co/bartowski/Qwen2.5-32B-Instruct-GGUF

# Step 3: Run extraction
python hoffgraft_extract.py \
--model models/donor.gguf \
--output fingerprints/hoffgraft_fingerprints.npz \
--n-ctx 2048 \
--prompts-per-domain 200

# Step 4 (optional): Delete donor model to free disk space
rm models/donor.gguf

Runtime is ~30-120 min depending on GPU, n-ctx, and prompt count.
"""

from future import annotations

import argparse
import gc
import hashlib
import json
import os
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import numpy as np

── Core config ─────────────────────────────────────────────────────────────

We extract at these reasoning boundary markers (the last token before these

delimiter pairs). This captures the model's "conclusion state" — what it

thinks is the most likely next thing after finishing a reasoning step.

BOUNDARY_MARKERS: list[tuple[str, str]] = [
("Therefore", ","),
("In summary", ","),
("The answer is", ":"),
("Based on the above", ","),
("After analysis", ","),
("Conclusion", ":"),
("The key insight", "is that"),
("To summarize", ","),
("This means that", ""),
("Given this", ","),
]

── Domain definitions ──────────────────────────────────────────────────────

@dataclass
class ReasoningDomain:
"""A domain defines what kind of reasoning we want to capture."""
name: str
description: str
prompt_templates: list[str]
# Tokens that indicate reasoning completion for this domain
boundary_phrases: list[str] = field(default_factory=list)

SCHEDULING_DOMAIN = ReasoningDomain(
name="scheduling",
description="Calendar coordination, conflict resolution, time management",
prompt_templates=[
# ── Conflict resolution ──
"Analyze these calendar conflicts and determine the best resolution. "
"Event A: {event_a}. Event B: {event_b}. Constraints: {constraints}. "
"Reason step by step, then conclude with the recommended action.",

    "You are coordinating a family schedule. {person_a} needs {need_a} "
    "and {person_b} needs {need_b}, but they conflict at {time_window}. "
    "Walk through the tradeoffs and conclude with a decision.",

    "Given these time constraints: {constraints}, and these priorities: "
    "{priorities}, what is the optimal schedule for {timeframe}? "
    "Think through dependencies, then state the final schedule.",

    "Two meetings are requested for the same slot: "
    "Meeting 1: {meeting_1} (priority {priority_1}/5). "
    "Meeting 2: {meeting_2} (priority {priority_2}/5). "
    "Which to accept? Justify your reasoning and conclude.",

    "The family calendar shows {num_conflicts} conflicts next week. "
    "The most constrained person is {person}. "
    "Propose a resolution plan. Think through each conflict, conclude with the plan.",

    # ── Time estimation ──
    "Estimate how long {task} will realistically take, considering "
    "{factors}. Break down each component, then give your final estimate.",

    "You have {num_tasks} tasks with these durations: {durations}. "
    "Available time: {available_time}. Which tasks fit? "
    "Reason about dependencies and priority, conclude with the selection.",

    "A {duration}-minute appointment at {location} requires travel time. "
    "Assuming {transport_mode}, what time should the person leave? "
    "Calculate the full timeline and state the departure time.",

    # ── Recurring scheduling ──
    "Design a recurring weekly schedule for {activity} that works around "
    "these fixed commitments: {commitments}. "
    "Think about consistency, fatigue, and buffer time. Conclude with the schedule.",

    "Optimise this weekly routine: {routine_summary}. "
    "Consider energy levels, commute, and family coordination. "
    "Reason about each day, conclude with the optimised plan.",
],
boundary_phrases=[
    "the recommended schedule is",
    "the optimal time is",
    "departure time:",
    "the final plan:",
    "conclusion:",
],

)

EMAIL_TRIAGE_DOMAIN = ReasoningDomain(
name="email_triage",
description="Email classification, priority assessment, routing decisions",
prompt_templates=[
"Classify this email: '{subject}' / '{body_preview}'. "
"Categories: urgent-action, needs-reply, informational, spam, receipt, school, medical, financial. "
"Analyse sender, content, and urgency signals. Conclude with the classification.",

    "An email from {sender} has subject '{subject}' and contains "
    "'{key_snippet}'. What is the appropriate action? "
    "Consider urgency, required response time, and who should handle it. "
    "Conclude with the recommended action.",

    "Rate the priority of this email on a 1-5 scale: "
    "Subject: '{subject}'. Sender: {sender}. Contains: '{body_preview}'. "
    "Reason about urgency, sender importance, and consequences of delay. Conclude with the score.",

    "This email appears to be a {email_type}: '{subject}'. "
    "Determine if it needs: (a) immediate reply, (b) delegated, "
    "(c) filed, (d) actioned within 24h, or (e) archived. "
    "Reason from the content. Conclude with the routing decision.",

    "Multiple emails arrived from {sender_domain}. Subjects: {subjects}. "
    "Summarise what action is needed, if any. "
    "Think about whether these are related. Conclude with the batch action.",

    "An email thread about {topic} has {num_messages} messages. "
    "The latest asks: '{latest_ask}'. "
    "Determine if this requires your attention or was already resolved. "
    "Read the signals, conclude with the needed response.",

    "Detect if this email is a receipt or order confirmation: "
    "'{subject}' / '{body_preview}'. "
    "Check for order numbers, amounts, merchants. Conclude with: "
    "receipt/not-receipt + key details extracted.",

    "Identify the sender family relationship from this email: "
    "From: {sender}. Subject: '{subject}'. Content: '{snippet}'. "
    "Consider name patterns, domain, signature, and writing style. "
    "Conclude with: family-member/{name} or unknown.",
],
boundary_phrases=[
    "classification:",
    "priority:",
    "recommended action:",
    "routing:",
    "the email should be",
    "this is a",
],

)

COORDINATION_DOMAIN = ReasoningDomain(
name="coordination",
description="Extracting coordination intent from family messages, chat, natural language",
prompt_templates=[
"Extract any coordination intent from this message: '{message}'. "
"Look for: time commitments, location mentions, requests for items, "
"pickup/dropoff needs, scheduling proposals, task assignments. "
"Conclude with a structured extraction.",

    "Does this conversation contain a task assignment or request? "
    "'{conversation}'. "
    "Identify: who asked, who is expected to do what, by when, "
    "and whether it was acknowledged. Conclude with the task details.",

    "Extract shopping list items from: '{message}'. "
    "Separate into: groceries, household, pharmacy, other. "
    "Note quantities and any brand preferences. Conclude with the list.",

    "Someone said: '{message}'. "
    "Does this imply a calendar event should be created? "
    "Extract: title, date/time, location, duration, attendees. "
    "If no event is implied, state that. Conclude with the event or null.",

    "Parse this pickup/dropoff coordination: '{message}'. "
    "Extract: who, where, when, which child, from what activity. "
    "Conclude with the structured schedule entry.",

    "This family chat message mentions scheduling: '{message}'. "
    "Identify if it's: proposing a time, confirming, cancelling, or asking. "
    "Extract the specific details. Conclude with the parsed intent.",

    "From this group chat excerpt, identify any decisions that were made: "
    "'{chat_excerpt}'. "
    "Look for: explicit agreements, implied consensus, vote results, "
    "or assignments. Conclude with the decision log.",

    "Analyse this message for urgency signals: '{message}'. "
    "Rate urgency (low/medium/high/emergency) and explain the signals. "
    "Consider: deadline language, emotional tone, dependency chains. "
    "Conclude with urgency + rationale.",
],
boundary_phrases=[
    "extraction:",
    "task details:",
    "event:",
    "parsed intent:",
    "structured entry:",
    "conclusion:",
],

)

CONTENT_DOMAIN = ReasoningDomain(
name="content_generation",
description="Writing, summarisation, briefing generation",
prompt_templates=[
"Summarise this content in 3-4 sentences: '{content}'. "
"Focus on the key points, decisions, and action items. "
"Conclude with the summary.",

    "Generate a morning briefing from these items: {items}. "
    "Structure it as: (1) Today's events, (2) Needs attention, "
    "(3) Upcoming. Keep it concise and actionable. "
    "Conclude with the full briefing.",

    "Write a {tone} reply to this message: '{message}'. "
    "The reply should be {length} and address {key_points}. "
    "Think about tone and completeness, then write the reply.",

    "Given these bullet points: {bullets}, write a coherent {style} "
    "that connects them into a narrative. "
    "Consider the audience ({audience}). Conclude with the text.",

    "Extract the main argument from: '{text}'. "
    "Identify: thesis, supporting points, counterarguments addressed, "
    "and conclusion. Be precise and cite the text.",

    "You need to explain {concept} to {audience} at a {level} level. "
    "Use analogies appropriate to their background. "
    "Think about the clearest explanation path, then write it.",

    "Create a weekly digest from these events: {events}. "
    "Group by category, highlight what was resolved, what's pending. "
    "Keep it under {max_words} words. Conclude with the digest.",

    "Rewrite this for clarity: '{text}'. "
    "Target: {target_reading_level} reading level, {target_tone} tone. "
    "Analyse what's unclear, then produce the rewrite.",
],
boundary_phrases=[
    "summary:",
    "briefing:",
    "the reply:",
    "digest:",
    "rewrite:",
    "conclusion:",
],

)

ANALYSIS_DOMAIN = ReasoningDomain(
name="analysis",
description="General analysis, debugging, root cause investigation",
prompt_templates=[
"Debug this issue: {problem_description}. "
"Environment: {environment}. Recent changes: {changes}. "
"Systematically eliminate possibilities and identify the root cause. "
"Conclude with the diagnosis and fix.",

    "Analyse this error log: '{log_snippet}'. "
    "Identify: the error type, likely cause, affected components, "
    "and recommended fix. Conclude with the analysis.",

    "Given these symptoms: {symptoms}, and this context: {context}, "
    "what is the most likely root cause? "
    "Use differential diagnosis. Conclude with the finding.",

    "Compare these two approaches to {problem}: "
    "Option A: {option_a}. Option B: {option_b}. "
    "Evaluate tradeoffs, risks, and fit for {constraints}. "
    "Conclude with a recommendation.",

    "Something broke after deploying {change}. "
    "The previous version ({prev_version}) worked. "
    "The new version ({new_version}) doesn't. "
    "Bisect the likely causes and conclude with the regression point.",

    "Performance degraded from {baseline}ms to {current}ms for {endpoint}. "
    "The stack: {stack}. Recent changes: {changes}. "
    "Profile the bottleneck and conclude with the fix.",

    "A {system} health check shows these anomalies: {anomalies}. "
    "Cross-reference with recent changes and environment state. "
    "Conclude with: healthy/degraded/critical + action plan.",

    "Evaluate whether this architecture decision is sound: {decision}. "
    "Consider: scalability, cost, complexity, operational burden, "
    "and alternatives. Conclude with your assessment.",
],
boundary_phrases=[
    "root cause:",
    "diagnosis:",
    "recommendation:",
    "the fix is",
    "assessment:",
    "conclusion:",
],

)

All domains for extraction

ALL_DOMAINS = [
SCHEDULING_DOMAIN,
EMAIL_TRIAGE_DOMAIN,
COORDINATION_DOMAIN,
CONTENT_DOMAIN,
ANALYSIS_DOMAIN,
]

── Synthetic prompt expansion ──────────────────────────────────────────────

class PromptExpander:
"""Expands template prompts into diverse concrete prompts using variable substitution."""

def __init__(self, seed: int = 42):
    self.rng = np.random.RandomState(seed)

# ── Scheduling variable pools ──
_events = [
    "doctor appointment with Dr. Smith at 2pm",
    "parent-teacher conference at 3:30pm",
    "team practice 4-6pm at the field",
    "piano lesson 10-11am",
    "grocery delivery window 9am-11am",
    "work deadline for Q3 report",
    "dentist cleaning at 9:15am",
    "school pickup at 3pm",
    "swim meet Saturday 8am-2pm",
    "birthday party Sunday 1-4pm",
]
_people = ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"]
_needs = [
    "a ride to practice at 4pm",
    "to be picked up from school at 3pm",
    "quiet time for a 2-hour exam",
    "help with homework between 6-7pm",
    "to drop off paperwork at the office by noon",
    "to attend a 30-minute video call at 11am",
]
_priorities = ["work deadline > school event > optional social", "medical > work > social", "child needs > work > personal", "urgent client > team meeting > admin"]
_constraints = [
    "single car available, 2 parents, 3 kids",
    "working parent until 5pm, one stay-at-home parent",
    "both parents working remotely, flexible hours",
    "school hours 8am-3pm, no childcare after 5pm",
    "shared car, bus available within 1 mile",
]
_times = ["30 minutes", "1 hour", "2 hours", "45 minutes", "90 minutes", "15 minutes", "3 hours"]
_durations = ["20 min, 45 min, 90 min, 30 min, 15 min, 60 min", "1 hr, 2 hr, 30 min, 15 min, 45 min", "10 min, 25 min, 50 min, 80 min, 5 min", "2 hr, 1.5 hr, 30 min, 3 hr, 20 min"]

def expand_scheduling(self, count: int) -> list[str]:
    prompts = []
    for i in range(count):
        t_idx = i % len(SCHEDULING_DOMAIN.prompt_templates)
        tmpl = SCHEDULING_DOMAIN.prompt_templates[t_idx]
        fmt = {
            "event_a": self.rng.choice(self._events),
            "event_b": self.rng.choice(self._events),
            "constraints": self.rng.choice(self._constraints),
            "person_a": self.rng.choice(self._people),
            "person_b": self.rng.choice(self._people),
            "need_a": self.rng.choice(self._needs),
            "need_b": self.rng.choice(self._needs),
            "time_window": f"{self.rng.randint(8,18)}:00-{self.rng.randint(14,21)}:00",
            "priorities": self.rng.choice(self._priorities),
            "timeframe": self.rng.choice(["next Monday", "Tuesday afternoon", "this weekend", "next week"]),
            "meeting_1": self.rng.choice(self._events),
            "meeting_2": self.rng.choice(self._events),
            "priority_1": self.rng.randint(1, 5),
            "priority_2": self.rng.randint(1, 5),
            "num_conflicts": self.rng.randint(2, 6),
            "person": self.rng.choice(self._people),
            "task": self.rng.choice(["completing tax returns", "grocery shopping with kids", "writing a 10-page report", "deep-cleaning the garage", "meal prepping for the week"]),
            "factors": self.rng.choice(["kids interrupting, phone calls, fatigue", "traffic, weather, equipment setup", "learning curve, tool setup, distractions"]),
            "num_tasks": self.rng.randint(3, 8),
            "durations": self.rng.choice(self._durations),
            "available_time": self.rng.choice(["2 hours", "90 minutes", "3 hours", "1 hour"]),
            "duration": self.rng.randint(15, 120),
            "location": self.rng.choice(["downtown clinic", "school gymnasium", "music academy", "soccer complex", "office park"]),
            "transport_mode": self.rng.choice(["driving in light traffic", "walking", "taking the bus", "driving during rush hour"]),
            "activity": self.rng.choice(["gym sessions", "piano practice", "homework blocks", "family dinner", "reading time"]),
            "commitments": self.rng.choice(["Mon 9-5 work, Wed 3pm school pickup, Thu 6pm practice", "Mon-Fri 8am-3pm school, Tue/Thu 4pm lessons", "daily 9-6 work, Sat 10am soccer"]),
            "routine_summary": self.rng.choice(["wake 6:30, school dropoff 7:45, work 8:30-5, dinner 6, bedtime routine 7:30-8:30", "wake 7, gym 7:30-8:30, work 9-6, pickup 6:15, dinner 7, relax 8-10"]),
        }
        prompts.append(tmpl.format(**{k: str(v) for k, v in fmt.items()}))
    return prompts

# ── Email triage variable pools ──
_subjects = [
    "Your invoice #INV-2024-{num} is ready",
    "Reminder: Parent-Teacher Conference Tomorrow",
    "Your Amazon order has shipped",
    "Update: Soccer practice location changed",
    "Urgent: Insurance claim requires action",
    "Weekly newsletter: What's happening at school",
    "Password reset requested for your account",
    "Meeting invitation: Q2 Planning Session",
    "Your lab results are available",
    "Payment confirmation: $42.50 at Target",
    "Please review: contract amendment v3",
    "School closure notice — weather advisory",
    "Car service appointment confirmation",
    "RE: Question about the Johnson account",
    "Fwd: Family reunion planning thread",
]
_senders = [
    "doctor.office@healthclinic.com",
    "noreply@amazon.com",
    "school@district.k12.wi.us",
    "coach@youthsoccer.org",
    "claims@insuranceco.com",
    "newsletter@pta.school.org",
    "billing@utilitycompany.com",
    "boss@company.com",
    "aundrea@family.email",
    "receipts@target.com",
]
_snippets = [
    "Please confirm your appointment for Thursday at 2pm.",
    "Your package containing 3 items will arrive by Friday.",
    "Attached is the updated practice schedule for May.",
    "Action required: Your policy renewal is due in 7 days.",
    "Join us for the spring concert next Tuesday at 6pm.",
    "We've processed your payment of $127.34. Thank you!",
    "Could you please review the attached proposal by EOD?",
    "Don't forget: school photos are tomorrow. Bring order form.",
    "Your prescription is ready for pickup at any location.",
    "The meeting has been moved to Thursday 10am. Please confirm.",
    "Here's your weekly spending summary for April 28 - May 4.",
    "Can you pick up milk, bread, and eggs on the way home?",
]

def expand_email_triage(self, count: int) -> list[str]:
    prompts = []
    for i in range(count):
        t_idx = i % len(EMAIL_TRIAGE_DOMAIN.prompt_templates)
        tmpl = EMAIL_TRIAGE_DOMAIN.prompt_templates[t_idx]
        subject = self.rng.choice(self._subjects).format(num=self.rng.randint(1000,9999))
        fmt = {
            "subject": subject,
            "body_preview": self.rng.choice(self._snippets),
            "sender": self.rng.choice(self._senders),
            "key_snippet": self.rng.choice(self._snippets),
            "email_type": self.rng.choice(["receipt", "newsletter", "school notice", "medical update", "work request", "family message", "spam offer"]),
            "sender_domain": self.rng.choice(["amazon.com", "school.k12.wi.us", "healthclinic.com", "insuranceco.com", "company.com"]),
            "subjects": ", ".join(self.rng.choice(self._subjects, size=3, replace=False)).format(num="0000"),
            "topic": self.rng.choice(["the Johnson account", "soccer carpool", "insurance paperwork", "school fundraiser", "family vacation"]),
            "num_messages": self.rng.randint(3, 15),
            "latest_ask": self.rng.choice(self._snippets),
            "name": self.rng.choice(["Sarah", "Mike", "Emily", "Tom", "Jessica"]),
        }
        prompts.append(tmpl.format(**{k: str(v) for k, v in fmt.items()}))
    return prompts

# ── Coordination variable pools ──
_messages = [
    "Hey, can someone pick up Harper from school today? I'm stuck in a meeting until 4.",
    "Don't forget we need milk, eggs, and bread. And maybe some bananas.",
    "Sullivan has a dentist appointment Thursday at 3pm. Who can take him?",
    "Practice is cancelled tonight — coach just emailed. Tell the kids.",
    "I'm working late, can you handle dinner? Maybe order pizza?",
    "Just got the school email — early release Wednesday at 1pm. Need pickup plan.",
    "Maggie needs her flea meds — can you grab them from the vet?",
    "The garage door is making that noise again. Can someone look at it?",
    "Aundrea wants to know if we're free for dinner with her parents Saturday.",
    "Harper's birthday party is in 2 weeks. We need to plan. Where are we with that?",
    "Can you grab my prescription from Walgreens? It should be ready.",
    "Soccer tournament this weekend — who's driving Saturday vs Sunday?",
]

def expand_coordination(self, count: int) -> list[str]:
    prompts = []
    for i in range(count):
        t_idx = i % len(COORDINATION_DOMAIN.prompt_templates)
        tmpl = COORDINATION_DOMAIN.prompt_templates[t_idx]
        fmt = {
            "message": self.rng.choice(self._messages),
            "conversation": "\n".join(self.rng.choice(self._messages, size=3, replace=False)),
            "chat_excerpt": "\n".join(self.rng.choice(self._messages, size=4, replace=False)),
        }
        prompts.append(tmpl.format(**{k: str(v) for k, v in fmt.items()}))
    return prompts

# ── Content generation variable pools ──
_content_snippets = [
    "The quarterly review showed a 12% increase in customer engagement. "
    "Key drivers were the new onboarding flow and improved search relevance. "
    "However, churn increased 3% among enterprise customers due to missing SSO support.",
]
_items_pool = [
    "Dentist appointment at 10am / Sullivan / Dr. Chen / arrive 15 min early",
    "Grocery order needs finalising by 8pm tonight — 23 items in cart",
    "Email from school: Harper's permission slip due Friday — not yet signed",
    "Weather alert: thunderstorms expected 4-6pm — may affect practice",
    "Amazon returns: 2 items need dropping at UPS by Saturday",
]
_concepts = [
    "how SSL/TLS certificates work",
    "the basics of machine learning",
    "why the sky is blue",
    "compound interest and retirement savings",
    "how mRNA vaccines work",
]
_events_pool = [
    "Monday: team standup, dentist at 10am, soccer practice 4-6pm",
    "Tuesday: client call 11am, school pickup 3pm, dinner with neighbours 7pm",
    "Wednesday: early release 1pm, piano lesson 3pm, work deadline 5pm",
    "Thursday: grocery run 9am, PTA meeting 6pm, game night 7:30pm",
    "Friday: staff meeting 10am, pizza night, movie with kids 7pm",
]

def expand_content(self, count: int) -> list[str]:
    prompts = []
    for i in range(count):
        t_idx = i % len(CONTENT_DOMAIN.prompt_templates)
        tmpl = CONTENT_DOMAIN.prompt_templates[t_idx]
        fmt = {
            "content": self.rng.choice(self._content_snippets),
            "items": ", ".join(self.rng.choice(self._items_pool, size=4, replace=False)),
            "tone": self.rng.choice(["professional", "friendly", "concise", "empathetic", "direct"]),
            "message": self.rng.choice(self._messages),
            "length": self.rng.choice(["1-2 sentences", "a short paragraph", "2-3 paragraphs", "under 100 words"]),
            "key_points": self.rng.choice(["acknowledge receipt and ask for time", "decline politely with alternative", "confirm and add details"]),
            "bullets": ", ".join(self.rng.choice(self._items_pool, size=3, replace=False)),
            "style": self.rng.choice(["email update", "briefing note", "status report", "summary"]),
            "audience": self.rng.choice(["team members", "family", "executives", "teachers", "general public"]),
            "text": self.rng.choice(self._content_snippets),
            "concept": self.rng.choice(self._concepts),
            "level": self.rng.choice(["5th grade", "high school", "college freshman", "professional", "executive summary"]),
            "events": ", ".join(self.rng.choice(self._events_pool, size=3, replace=False)),
            "max_words": self.rng.choice(["100", "200", "300", "500"]),
            "target_reading_level": self.rng.choice(["8th grade", "10th grade", "professional", "general audience"]),
            "target_tone": self.rng.choice(["neutral", "assertive", "friendly", "formal"]),
        }
        prompts.append(tmpl.format(**{k: str(v) for k, v in fmt.items()}))
    return prompts

# ── Analysis variable pools ──
_problems = [
    "API returning 500 errors after deployment at 14:22 UTC",
    "Database connection pool exhausted, pods restarting",
    "Intermittent 503 from the /api/dashboard endpoint under load",
    "Cron job stopped firing after systemd update",
    "Memory leak suspected — RSS growing 50MB/hour",
    "SSL certificate not renewing despite valid ACME challenge",
]
_logs = [
    "ERROR 2026-05-05 14:22:15 ConnectionError: Unable to connect to database at 10.0.1.5:5432",
    "FATAL: out of memory — alloc of 1048576 bytes failed (OOM killer invoked)",
    "WARN: request timeout after 30s on /api/v1/reports — upstream 10.0.2.3:8000",
]

def expand_analysis(self, count: int) -> list[str]:
    prompts = []
    for i in range(count):
        t_idx = i % len(ANALYSIS_DOMAIN.prompt_templates)
        tmpl = ANALYSIS_DOMAIN.prompt_templates[t_idx]
        fmt = {
            "problem_description": self.rng.choice(self._problems),
            "environment": "Ubuntu 24.04, Python 3.12, FastAPI 0.115, PostgreSQL 16, Redis 7",
            "changes": "Updated uvicorn from 0.29 to 0.30, added connection pooling config",
            "log_snippet": self.rng.choice(self._logs),
            "symptoms": "high latency (p99 2.3s vs baseline 450ms), elevated CPU on worker processes, connection pool at 98%",
            "context": "production, 4 workers, 8 CPU cores, 32 GB RAM, behind nginx reverse proxy",
            "problem": "rate-limiting API access without degrading user experience",
            "option_a": "token bucket per user with Redis backend",
            "option_b": "sliding window at the nginx level",
            "constraints": "must work across 3 replicas, <5ms overhead, no new infrastructure",
            "change": "PR #342 — refactored auth middleware to use async session lookups",
            "prev_version": "v2.14.1",
            "new_version": "v2.15.0",
            "baseline": "120",
            "current": "890",
            "endpoint": "/api/dashboard",
            "stack": "FastAPI → SQLAlchemy async → PostgreSQL 16",
            "system": "production cluster",
            "anomalies": "disk I/O 8x normal, swap usage started on worker-3, 504 errors at 12/min",
            "decision": "Use asyncpg directly instead of SQLAlchemy for the hot path, trading ORM convenience for 3x throughput",
        }
        prompts.append(tmpl.format(**{k: str(v) for k, v in fmt.items()}))
    return prompts

── Extractor ────────────────────────────────────────────────────────────────

@dataclass
class ExtractionResult:
domain: str
num_prompts: int
top_k: int
vocab_size: int
# Per-domain aggregated bias: (top_k,) arrays — token_ids and mean_bias
token_ids: np.ndarray # shape (top_k,) — most biased token IDs
biases: np.ndarray # shape (top_k,) — mean logit bias value
# Per-prompt records (for debugging / inspection)
per_prompt_biases: list[dict] = field(default_factory=list)

class DonorExtractor:
"""Loads donor model, runs prompts, extracts logit fingerprints."""

def __init__(
    self,
    model_path: Path,
    n_ctx: int = 2048,
    top_k: int = 128,
    seed: int = 42,
    verbose: bool = True,
):
    self.model_path = model_path
    self.n_ctx = n_ctx
    self.top_k = top_k
    self.verbose = verbose
    self.expander = PromptExpander(seed=seed)

    self._log("Loading donor model (this may take a minute for 32B via mmap)...")
    t0 = time.time()

    from llama_cpp import Llama

    self.model = Llama(
        model_path=str(model_path),
        n_ctx=n_ctx,
        n_threads=os.cpu_count() or 4,
        n_batch=512,
        verbose=False,
        # CPU-only for extraction — chassis will run on GPU later
        n_gpu_layers=0,
    )
    self.vocab_size = self.model.n_vocab()
    self._log(f"Donor loaded in {time.time() - t0:.1f}s. Vocab size: {self.vocab_size}")

def _log(self, msg: str) -> None:
    if self.verbose:
        print(f"[hoffgraft] {msg}", file=sys.stderr, flush=True)

def extract_domain(
    self,
    domain: ReasoningDomain,
    prompts_per_domain: int,
) -> ExtractionResult:
    """Run all prompts for a domain and extract aggregated bias vectors."""
    prompts = self._get_prompts(domain, prompts_per_domain)
    self._log(f"Domain '{domain.name}': {len(prompts)} prompts, {prompts_per_domain // len(domain.prompt_templates)} per template")

    # Accumulate biases across prompts
    bias_accum: dict[int, list[float]] = defaultdict(list)

    for idx, prompt in enumerate(prompts):
        if idx > 0 and idx % 50 == 0:
            self._log(f"  {domain.name}: {idx}/{len(prompts)}")

        biases = self._extract_single(prompt, domain)
        for token_id, bias in biases.items():
            bias_accum[token_id].append(bias)

        # Free memory periodically
        if idx % 100 == 0:
            gc.collect()

    # Aggregate: mean bias per token, take top-K
    if not bias_accum:
        self._log(f"  WARNING: No biases extracted for domain '{domain.name}'")
        return ExtractionResult(
            domain=domain.name,
            num_prompts=len(prompts),
            top_k=self.top_k,
            vocab_size=self.vocab_size,
            token_ids=np.zeros(0, dtype=np.int32),
            biases=np.zeros(0, dtype=np.float32),
        )

    mean_biases = {tid: float(np.mean(vals)) for tid, vals in bias_accum.items()}
    # Sort by absolute bias magnitude
    sorted_biases = sorted(mean_biases.items(), key=lambda x: abs(x[1]), reverse=True)
    top = sorted_biases[:self.top_k]

    token_ids = np.array([t[0] for t in top], dtype=np.int32)
    biases = np.array([t[1] for t in top], dtype=np.float32)

    self._log(f"  Done. {len(bias_accum)} unique tokens biased, top-{self.top_k} saved.")
    self._log(f"  Max bias: {biases[0]:.4f}, Min bias: {biases[-1]:.4f}")

    return ExtractionResult(
        domain=domain.name,
        num_prompts=len(prompts),
        top_k=self.top_k,
        vocab_size=self.vocab_size,
        token_ids=token_ids,
        biases=biases,
    )

def _get_prompts(self, domain: ReasoningDomain, count: int) -> list[str]:
    """Generate synthetic prompts for a domain."""
    method_map = {
        "scheduling": self.expander.expand_scheduling,
        "email_triage": self.expander.expand_email_triage,
        "coordination": self.expander.expand_coordination,
        "content_generation": self.expander.expand_content,
        "analysis": self.expander.expand_analysis,
    }
    expander = method_map.get(domain.name)
    if expander:
        return expander(count)
    # Fallback: cycle through templates with empty substitution
    return [t.format(**{}) for i, t in enumerate(domain.prompt_templates) for _ in range(max(1, count // len(domain.prompt_templates)))]

def _extract_single(
    self,
    prompt: str,
    domain: ReasoningDomain,
) -> dict[int, float]:
    """Run a single prompt through the donor and extract token bias values."""
    # Build a prompt that instructs the model to reason and then complete
    # We capture at the boundary — right after the reasoning, before the conclusion.
    full_prompt = (
        f"{prompt}\n\n"
        f"Think step by step, then complete the response."
    )

    try:
        # Generate with logprobs-like capture
        result = self.model(
            full_prompt,
            max_tokens=64,
            temperature=0.1,  # Low temp for consistent reasoning
            top_p=0.9,
            echo=False,
        )
    except Exception as e:
        self._log(f"  ERROR generating for prompt: {e}")
        return {}

    # The model's output text
    output = result["choices"][0]["text"] if result.get("choices") else ""

    # Extract the "conclusion" part by finding boundary phrases
    conclusion_text = self._extract_conclusion(output, domain)

    # Tokenise the conclusion to get token IDs the model chose
    if not conclusion_text.strip():
        return {}

    tokens = self.model.tokenize(conclusion_text.encode("utf-8"))

    # Score: tokens chosen more confidently get higher bias.
    # We generate multiple completions with varying temp to measure stability.
    biases: dict[int, float] = {}

    for token_id in tokens[:32]:  # Only the first 32 conclusion tokens
        # Measure how "sticky" this token is across temperatures
        confidence = self._measure_token_confidence(full_prompt, token_id, conclusion_text)
        if abs(confidence) > 1e-6:
            biases[token_id] = confidence

    return biases

def _extract_conclusion(self, output: str, domain: ReasoningDomain) -> str:
    """Extract the concluding portion of the model's output."""
    # Look for boundary phrases from the domain + generic ones
    markers = domain.boundary_phrases + [m[0] for m in BOUNDARY_MARKERS]

    output_lower = output.lower()
    best_pos = -1

    for marker in markers:
        pos = output_lower.find(marker.lower())
        if pos >= 0 and (best_pos < 0 or pos < best_pos):
            best_pos = pos

    if best_pos >= 0:
        return output[best_pos:]
    # Fallback: last N characters (usually contains the conclusion)
    return output[-200:] if len(output) > 200 else output

def _measure_token_confidence(
    self,
    prompt: str,
    token_id: int,
    context: str,
) -> float:
    """Measure how strongly the donor commits to a specific token.

    Approach: Check if token_id appears in completions at varying temperatures.
    High confidence = token appears even at higher temperatures.
    """
    # Quick approximation: tokenise the context that led to this token,
    # get the logit for this token at the boundary position
    try:
        tokens_before = self.model.tokenize(
            context[:context.rfind(context.split()[-1]) if len(context.split()) > 1 else 0].encode("utf-8")
            or prompt.encode("utf-8")[-256:]
        )

        if not tokens_before:
            return 0.0

        # Get logits at the position just before the target token
        logits_result = self.model.eval(tokens_before[-128:])
        # llama-cpp-python doesn't expose logits easily via the high-level API,
        # so we approximate using a different approach:
        # Run completion with logprobs-like sampling
        # Measure whether this token gets selected at temp=0.7 vs temp=0.1
        result_low = self.model(
            prompt, max_tokens=1, temperature=0.1, top_p=0.9, echo=False,
        )
        result_high = self.model(
            prompt, max_tokens=1, temperature=0.7, top_p=0.9, echo=False,
        )

        tok_low = self.model.tokenize(
            result_low["choices"][0]["text"].encode("utf-8")
        ) if result_low.get("choices") else []
        tok_high = self.model.tokenize(
            result_high["choices"][0]["text"].encode("utf-8")
        ) if result_high.get("choices") else []

        # Score: 1.0 if same token at both temps, 0.5 if only at low temp, 0.0 if neither
        if tok_low and tok_high and tok_low[0] == tok_high[0] == token_id:
            return 1.0
        elif tok_low and tok_low[0] == token_id:
            return 0.5
        else:
            return 0.0
    except Exception:
        return 0.0

def close(self) -> None:
    """Release donor model memory."""
    del self.model
    gc.collect()
    self._log("Donor model unloaded.")

── Save/Load ────────────────────────────────────────────────────────────────

def save_fingerprints(
results: list[ExtractionResult],
output_path: Path,
metadata: Optional[dict] = None,
) -> None:
"""Save extracted fingerprints as a compressed .npz file."""
output_path.parent.mkdir(parents=True, exist_ok=True)

save_data = {}
for result in results:
    save_data[f"{result.domain}_token_ids"] = result.token_ids
    save_data[f"{result.domain}_biases"] = result.biases

# Metadata
save_data["_domains"] = np.array([r.domain for r in results])
save_data["_vocab_size"] = np.array([results[0].vocab_size] if results else [0])
save_data["_top_k"] = np.array([results[0].top_k] if results else [0])
if metadata:
    save_data["_metadata"] = np.array([json.dumps(metadata)])

np.savez_compressed(str(output_path), **save_data)
print(f"\nFingerprints saved to: {output_path}")
print(f"  Domains: {[r.domain for r in results]}")
print(f"  File size: {output_path.stat().st_size // 1024} KB")

def load_fingerprints(path: Path) -> dict[str, dict]:
"""Load fingerprints. Returns {domain_name: {token_ids: array, biases: array}}."""
data = np.load(str(path), allow_pickle=True)
domains = data["_domains"].tolist()
result = {}
for domain in domains:
result[domain] = {
"token_ids": data[f"{domain}_token_ids"],
"biases": data[f"{domain}_biases"],
}
return result

── CLI ──────────────────────────────────────────────────────────────────────

def main():
parser = argparse.ArgumentParser(
description="HoffGraft: Extract donor logic fingerprints from a 32B model",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="Example:\n python hoffgraft_extract.py --model models/donor.gguf --output fingerprints/hoffgraft_fingerprints.npz",
)
parser.add_argument("--model", required=True, type=Path, help="Path to donor .gguf model file")
parser.add_argument("--output", required=True, type=Path, help="Output .npz path for fingerprints")
parser.add_argument("--n-ctx", type=int, default=2048, help="Context window size (default: 2048)")
parser.add_argument("--top-k", type=int, default=128, help="Top-K biases to store per domain (default: 128)")
parser.add_argument("--prompts-per-domain", type=int, default=200, help="Prompts per domain (default: 200)")
parser.add_argument("--domains", nargs="*", help="Specific domains to extract (default: all)")
parser.add_argument("--seed", type=int, default=42, help="Random seed for prompt generation")
parser.add_argument("--quiet", action="store_true", help="Suppress progress output")

args = parser.parse_args()

if not args.model.exists():
    sys.exit(f"Model not found: {args.model}")

# Filter domains if specified
if args.domains:
    domain_map = {d.name: d for d in ALL_DOMAINS}
    domains = [domain_map[name] for name in args.domains if name in domain_map]
    if not domains:
        sys.exit(f"No matching domains found. Available: {list(domain_map.keys())}")
else:
    domains = ALL_DOMAINS

extractor = DonorExtractor(
    model_path=args.model,
    n_ctx=args.n_ctx,
    top_k=args.top_k,
    seed=args.seed,
    verbose=not args.quiet,
)

results = []
t_start = time.time()

for domain in domains:
    t_domain = time.time()
    result = extractor.extract_domain(domain, args.prompts_per_domain)
    results.append(result)
    extractor._log(f"  Domain complete in {time.time() - t_domain:.0f}s")

extractor.close()

metadata = {
    "model": str(args.model.name),
    "n_ctx": args.n_ctx,
    "top_k": args.top_k,
    "prompts_per_domain": args.prompts_per_domain,
    "total_time_s": round(time.time() - t_start),
}

save_fingerprints(results, args.output, metadata)

print(f"\nTotal extraction time: {time.time() - t_start:.0f}s "
      f"({(time.time() - t_start) / 60:.1f} min)")

if name == "main":
main()