"""Local-first LLM client with cloud fallback."""
import os
import json
import logging
from typing import Optional, Dict, Any
import httpx
logger = logging.getLogger(name)
GAMING_PC_URL = os.getenv("GAMING_PC_OLLAMA_URL", "http://100.104.147.116:11434")
CLOUD_URL = os.getenv("CLOUD_OLLAMA_URL", "http://127.0.0.1:11434")
DEFAULT_LOCAL_MODEL = "qwen2.5-coder:7b"
DEFAULT_CLOUD_MODEL = "glm-5.1:cloud"
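
# Both endpoints and model names can be overridden via environment variables;
# a hedged example (the hostname here is hypothetical, not from this module):
#   export GAMING_PC_OLLAMA_URL="http://gaming-pc.example:11434"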


class LLMClient:
    """Local-first LLM client with automatic cloud fallback."""

    def __init__(self, timeout: float = 60.0):
        self.timeout = timeout
        self.client = httpx.AsyncClient(timeout=timeout)

    async def generate(
        self,
        prompt: str,
        model: Optional[str] = None,
        system: Optional[str] = None,
        temperature: float = 0.7,
        format: Optional[str] = None,
        prefer_cloud: bool = False,
    ) -> Dict[str, Any]:
        """Generate a completion, trying the local endpoint before the cloud.

        Args:
            prompt: The user prompt.
            model: Model name (falls back to the per-endpoint default if omitted).
            system: Optional system prompt.
            temperature: Sampling temperature.
            format: "json" to request structured output.
            prefer_cloud: Skip the local endpoint and go straight to cloud.

        Returns:
            Dict with "content", "model_used", and "source" keys.

        Raises:
            RuntimeError: If both the local and cloud endpoints are unavailable.
        """
        if prefer_cloud:
            return await self._try_cloud(prompt, model, system, temperature, format)

        # Try the local endpoint first; it returns None on any failure.
        local_result = await self._try_local(prompt, model, system, temperature, format)
        if local_result:
            return local_result

        # Fall back to the cloud endpoint, which raises if it also fails.
        logger.warning("Local LLM unavailable, falling back to cloud")
        return await self._try_cloud(prompt, model, system, temperature, format)

    async def _try_local(
        self,
        prompt: str,
        model: Optional[str],
        system: Optional[str],
        temperature: float,
        format: Optional[str],
    ) -> Optional[Dict[str, Any]]:
        """Attempt inference on the local Gaming PC; return None on failure."""
        model = model or DEFAULT_LOCAL_MODEL
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": temperature},
        }
        if system:
            payload["system"] = system
        if format == "json":
            payload["format"] = "json"
        try:
            response = await self.client.post(
                f"{GAMING_PC_URL}/api/generate",
                json=payload,
            )
            response.raise_for_status()
            data = response.json()
            # Ollama's non-streaming /api/generate returns the completion
            # text under the "response" key.
            return {
                "content": data.get("response", ""),
                "model_used": model,
                "source": "local",
            }
        except Exception as e:
            # Any failure (connection, HTTP status, JSON decode) is treated
            # as "local unavailable" so the caller can fall back to cloud.
            logger.debug(f"Local LLM failed: {e}")
            return None

    async def _try_cloud(
        self,
        prompt: str,
        model: Optional[str],
        system: Optional[str],
        temperature: float,
        format: Optional[str],
    ) -> Dict[str, Any]:
        """Attempt inference on the cloud fallback; raise if it also fails."""
        model = model or DEFAULT_CLOUD_MODEL
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": temperature},
        }
        if system:
            payload["system"] = system
        if format == "json":
            payload["format"] = "json"
        try:
            response = await self.client.post(
                f"{CLOUD_URL}/api/generate",
                json=payload,
            )
            response.raise_for_status()
            data = response.json()
            return {
                "content": data.get("response", ""),
                "model_used": model,
                "source": "cloud",
            }
        except Exception as e:
            logger.error(f"Cloud LLM also failed: {e}")
            raise RuntimeError(f"Both local and cloud LLM unavailable: {e}") from e

    async def health(self) -> Dict[str, bool]:
        """Check whether each LLM endpoint is reachable."""
        result = {"local": False, "cloud": False}
        try:
            response = await self.client.get(f"{GAMING_PC_URL}/api/tags", timeout=5.0)
            result["local"] = response.status_code == 200
        except Exception:
            pass
        try:
            response = await self.client.get(f"{CLOUD_URL}/api/tags", timeout=5.0)
            result["cloud"] = response.status_code == 200
        except Exception:
            pass
        return result

    async def close(self):
        """Release the underlying HTTP client."""
        await self.client.aclose()
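

# A minimal usage sketch, assuming an Ollama-compatible server is reachable at
# one of the URLs above with the default models pulled. The prompt and system
# strings here are illustrative, not part of the client's contract; generate()
# raises RuntimeError if both endpoints are down.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        client = LLMClient(timeout=30.0)
        try:
            status = await client.health()
            print(f"Endpoint health: {status}")
            result = await client.generate(
                prompt="Write a haiku about failover.",
                system="You are a terse assistant.",
            )
            print(f"[{result['source']}:{result['model_used']}] {result['content']}")
        finally:
            await client.close()

    asyncio.run(_demo())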