""" LLM service client (vLLM/OpenAI-compatible). """ import logging from typing import Optional, AsyncIterator import httpx from handler_base.config import LLMSettings from handler_base.telemetry import create_span logger = logging.getLogger(__name__) class LLMClient: """ Client for the LLM service (vLLM with OpenAI-compatible API). Usage: client = LLMClient() response = await client.generate("Hello, how are you?") # With context for RAG response = await client.generate( "What is the capital?", context="France is a country in Europe..." ) # Streaming async for chunk in client.stream("Tell me a story"): print(chunk, end="") """ def __init__(self, settings: Optional[LLMSettings] = None): self.settings = settings or LLMSettings() self._client = httpx.AsyncClient( base_url=self.settings.llm_url, timeout=self.settings.http_timeout, ) async def close(self) -> None: """Close the HTTP client.""" await self._client.aclose() async def generate( self, prompt: str, context: Optional[str] = None, system_prompt: Optional[str] = None, max_tokens: Optional[int] = None, temperature: Optional[float] = None, top_p: Optional[float] = None, stop: Optional[list[str]] = None, ) -> str: """ Generate a response from the LLM. Args: prompt: User prompt/query context: Optional context for RAG system_prompt: Optional system prompt max_tokens: Maximum tokens to generate temperature: Sampling temperature top_p: Top-p sampling stop: Stop sequences Returns: Generated text response """ with create_span("llm.generate") as span: messages = self._build_messages(prompt, context, system_prompt) if span: span.set_attribute("llm.model", self.settings.llm_model) span.set_attribute("llm.prompt_length", len(prompt)) if context: span.set_attribute("llm.context_length", len(context)) payload = { "model": self.settings.llm_model, "messages": messages, "max_tokens": max_tokens or self.settings.llm_max_tokens, "temperature": temperature or self.settings.llm_temperature, "top_p": top_p or self.settings.llm_top_p, } if stop: payload["stop"] = stop response = await self._client.post("/v1/chat/completions", json=payload) response.raise_for_status() result = response.json() content = result["choices"][0]["message"]["content"] if span: span.set_attribute("llm.response_length", len(content)) usage = result.get("usage", {}) span.set_attribute("llm.prompt_tokens", usage.get("prompt_tokens", 0)) span.set_attribute("llm.completion_tokens", usage.get("completion_tokens", 0)) return content async def stream( self, prompt: str, context: Optional[str] = None, system_prompt: Optional[str] = None, max_tokens: Optional[int] = None, temperature: Optional[float] = None, ) -> AsyncIterator[str]: """ Stream a response from the LLM. 
Args: prompt: User prompt/query context: Optional context for RAG system_prompt: Optional system prompt max_tokens: Maximum tokens to generate temperature: Sampling temperature Yields: Text chunks as they're generated """ messages = self._build_messages(prompt, context, system_prompt) payload = { "model": self.settings.llm_model, "messages": messages, "max_tokens": max_tokens or self.settings.llm_max_tokens, "temperature": temperature or self.settings.llm_temperature, "stream": True, } async with self._client.stream( "POST", "/v1/chat/completions", json=payload ) as response: response.raise_for_status() async for line in response.aiter_lines(): if line.startswith("data: "): data = line[6:] if data == "[DONE]": break import json chunk = json.loads(data) delta = chunk["choices"][0].get("delta", {}) content = delta.get("content", "") if content: yield content def _build_messages( self, prompt: str, context: Optional[str] = None, system_prompt: Optional[str] = None, ) -> list[dict]: """Build the messages list for the API call.""" messages = [] # System prompt if system_prompt: messages.append({"role": "system", "content": system_prompt}) elif context: # Default RAG system prompt messages.append({ "role": "system", "content": ( "You are a helpful assistant. Use the provided context to answer " "the user's question. If the context doesn't contain relevant " "information, say so." ), }) # Add context as a separate message if provided if context: messages.append({ "role": "user", "content": f"Context:\n{context}\n\nQuestion: {prompt}", }) else: messages.append({"role": "user", "content": prompt}) return messages async def health(self) -> bool: """Check if the LLM service is healthy.""" try: response = await self._client.get("/health") return response.status_code == 200 except Exception: return False
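

if __name__ == "__main__":  # pragma: no cover
    # Minimal usage sketch for manual testing, not part of the client API.
    # It assumes the default LLMSettings point at a reachable vLLM server;
    # the prompts and the RAG context string below are illustrative only.
    import asyncio

    async def _demo() -> None:
        client = LLMClient()
        try:
            if not await client.health():
                logger.warning("LLM service health check failed")
                return

            # Plain generation
            print(await client.generate("Hello, how are you?"))

            # RAG-style generation: the context is folded into the user message
            print(
                await client.generate(
                    "What is the capital of France?",
                    context="France is a country in Europe. Its capital is Paris.",
                )
            )

            # Streaming generation, printed chunk by chunk as deltas arrive
            async for chunk in client.stream("Tell me a short story."):
                print(chunk, end="", flush=True)
            print()
        finally:
            await client.close()

    asyncio.run(_demo())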