
"""Local-first LLM client with cloud fallback."""

import os
import logging
from typing import Optional, Dict, Any

import httpx

logger = logging.getLogger(__name__)

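# Ollama endpoints; both can be overridden via environment variables. The
# 100.x default sits in the CGNAT range commonly assigned by Tailscale.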
GAMING_PC_URL = os.getenv("GAMING_PC_OLLAMA_URL", "http://100.104.147.116:11434")
CLOUD_URL = os.getenv("CLOUD_OLLAMA_URL", "http://127.0.0.1:11434")

DEFAULT_LOCAL_MODEL = "qwen2.5-coder:7b"
DEFAULT_CLOUD_MODEL = "glm-5.1:cloud"

class LLMClient:
"""Local-first LLM client with automatic cloud fallback."""

def __init__(self, timeout: float = 60.0):
    self.timeout = timeout
    self.client = httpx.AsyncClient(timeout=timeout)

    async def generate(
        self,
        prompt: str,
        model: Optional[str] = None,
        system: Optional[str] = None,
        temperature: float = 0.7,
        format: Optional[str] = None,
        prefer_cloud: bool = False
    ) -> Dict[str, Any]:
        """Generate a completion with local-first fallback.

        Args:
            prompt: The user prompt
            model: Model name (falls back to the per-endpoint default)
            system: Optional system prompt
            temperature: Sampling temperature
            format: "json" for structured output
            prefer_cloud: Skip local, go straight to cloud

        Returns:
            Dict with "content", "model_used", "source" keys
        """
        if prefer_cloud:
            return await self._try_cloud(prompt, model, system, temperature, format)

        # Try local first
        local_result = await self._try_local(prompt, model, system, temperature, format)
        if local_result:
            return local_result

        # Fall back to cloud
        logger.warning("Local LLM unavailable, falling back to cloud")
        return await self._try_cloud(prompt, model, system, temperature, format)

    async def _try_local(
        self,
        prompt: str,
        model: Optional[str],
        system: Optional[str],
        temperature: float,
        format: Optional[str]
    ) -> Optional[Dict[str, Any]]:
        """Attempt local Gaming PC inference."""
        model = model or DEFAULT_LOCAL_MODEL

        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": temperature}
        }
        if system:
            payload["system"] = system
        if format == "json":
            payload["format"] = "json"

        try:
            response = await self.client.post(
                f"{GAMING_PC_URL}/api/generate",
                json=payload
            )
            response.raise_for_status()
            data = response.json()

            return {
                "content": data.get("response", ""),
                "model_used": model,
                "source": "local"
            }
        except Exception as e:
            logger.debug(f"Local LLM failed: {e}")
            return None

    async def _try_cloud(
        self,
        prompt: str,
        model: Optional[str],
        system: Optional[str],
        temperature: float,
        format: Optional[str]
    ) -> Dict[str, Any]:
        """Attempt cloud fallback inference."""
        model = model or DEFAULT_CLOUD_MODEL

        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": temperature}
        }
        if system:
            payload["system"] = system
        if format == "json":
            payload["format"] = "json"

        try:
            response = await self.client.post(
                f"{CLOUD_URL}/api/generate",
                json=payload
            )
            response.raise_for_status()
            data = response.json()

            return {
                "content": data.get("response", ""),
                "model_used": model,
                "source": "cloud"
            }
        except Exception as e:
            logger.error(f"Cloud LLM also failed: {e}")
            raise RuntimeError(f"Both local and cloud LLM unavailable: {e}") from e

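    # Health probes hit Ollama's /api/tags model-listing endpoint, a cheap
    # liveness check that does not load a model into memory.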
    async def health(self) -> Dict[str, bool]:
        """Check LLM endpoint health."""
        result = {"local": False, "cloud": False}

        try:
            response = await self.client.get(f"{GAMING_PC_URL}/api/tags", timeout=5.0)
            result["local"] = response.status_code == 200
        except Exception:
            pass

        try:
            response = await self.client.get(f"{CLOUD_URL}/api/tags", timeout=5.0)
            result["cloud"] = response.status_code == 200
        except Exception:
            pass

        return result

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()
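

# --- Usage sketch ------------------------------------------------------------
# A minimal example of driving the client; it assumes an Ollama-compatible
# server is reachable at one of the URLs above and that the default models
# have already been pulled. The `_demo` coroutine is illustrative only.

async def _demo() -> None:
    client = LLMClient(timeout=30.0)
    try:
        # Probe both endpoints before issuing a request.
        status = await client.health()
        print(f"endpoint health: {status}")

        # Raises RuntimeError if neither the local nor the cloud endpoint is up.
        result = await client.generate(
            prompt="Reply with the single word: pong",
            system="You are a terse assistant.",
            temperature=0.0,
        )
        print(f"[{result['source']}/{result['model_used']}] {result['content']}")
    finally:
        await client.close()


if __name__ == "__main__":
    import asyncio

    asyncio.run(_demo())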