"""Local-first LLM client with cloud fallback.""" import os import json import logging from typing import Optional, Dict, Any import httpx logger = logging.getLogger(__name__) GAMING_PC_URL = os.getenv("GAMING_PC_OLLAMA_URL", "http://100.104.147.116:11434") CLOUD_URL = os.getenv("CLOUD_OLLAMA_URL", "http://127.0.0.1:11434") DEFAULT_LOCAL_MODEL = "qwen2.5-coder:7b" DEFAULT_CLOUD_MODEL = "glm-5.1:cloud" class LLMClient: """Local-first LLM client with automatic cloud fallback.""" def __init__(self, timeout: float = 60.0): self.timeout = timeout self.client = httpx.AsyncClient(timeout=timeout) async def generate( self, prompt: str, model: Optional[str] = None, system: Optional[str] = None, temperature: float = 0.7, format: Optional[str] = None, prefer_cloud: bool = False ) -> Dict[str, Any]: """Generate completion with local-first fallback. Args: prompt: The user prompt model: Model name (uses defaults if not specified) system: Optional system prompt temperature: Sampling temperature format: "json" for structured output prefer_cloud: Skip local, go straight to cloud Returns: Dict with "content", "model_used", "source" keys """ if prefer_cloud: return await self._try_cloud(prompt, model, system, temperature, format) # Try local first local_result = await self._try_local(prompt, model, system, temperature, format) if local_result: return local_result # Fallback to cloud logger.warning("Local LLM unavailable, falling back to cloud") return await self._try_cloud(prompt, model, system, temperature, format) async def _try_local( self, prompt: str, model: Optional[str], system: Optional[str], temperature: float, format: Optional[str] ) -> Optional[Dict[str, Any]]: """Attempt local Gaming PC inference.""" model = model or DEFAULT_LOCAL_MODEL payload = { "model": model, "prompt": prompt, "stream": False, "options": {"temperature": temperature} } if system: payload["system"] = system if format == "json": payload["format"] = "json" try: response = await self.client.post( f"{GAMING_PC_URL}/api/generate", json=payload ) response.raise_for_status() data = response.json() return { "content": data.get("response", ""), "model_used": model, "source": "local" } except Exception as e: logger.debug(f"Local LLM failed: {e}") return None async def _try_cloud( self, prompt: str, model: Optional[str], system: Optional[str], temperature: float, format: Optional[str] ) -> Dict[str, Any]: """Attempt cloud fallback inference.""" model = model or DEFAULT_CLOUD_MODEL payload = { "model": model, "prompt": prompt, "stream": False, "options": {"temperature": temperature} } if system: payload["system"] = system if format == "json": payload["format"] = "json" try: response = await self.client.post( f"{CLOUD_URL}/api/generate", json=payload ) response.raise_for_status() data = response.json() return { "content": data.get("response", ""), "model_used": model, "source": "cloud" } except Exception as e: logger.error(f"Cloud LLM also failed: {e}") raise RuntimeError(f"Both local and cloud LLM unavailable: {e}") async def health(self) -> Dict[str, bool]: """Check LLM endpoint health.""" result = {"local": False, "cloud": False} try: response = await self.client.get(f"{GAMING_PC_URL}/api/tags", timeout=5.0) result["local"] = response.status_code == 200 except: pass try: response = await self.client.get(f"{CLOUD_URL}/api/tags", timeout=5.0) result["cloud"] = response.status_code == 200 except: pass return result async def close(self): await self.client.aclose()