From 15e4b8afa30fb68907e86d33e656f60cc3936af3 Mon Sep 17 00:00:00 2001
From: "Billy D."
Date: Thu, 12 Feb 2026 07:01:17 -0500
Subject: [PATCH] fix: make mlflow_logger import optional with no-op fallback

The strixhalo LLM worker uses py_executable pointing to the Docker image
venv which doesn't have the updated ray-serve-apps package. Wrap all
InferenceLogger imports in try/except and guard usage with None checks so
apps degrade gracefully without MLflow logging.
---
 ray_serve/serve_embeddings.py | 37 ++++++++++++++----------
 ray_serve/serve_llm.py        | 53 ++++++++++++++++++++---------------
 ray_serve/serve_reranker.py   | 48 ++++++++++++++++++-------------
 ray_serve/serve_tts.py        | 35 ++++++++++++++---------
 ray_serve/serve_whisper.py    | 39 +++++++++++++++-----------
 5 files changed, 124 insertions(+), 88 deletions(-)

diff --git a/ray_serve/serve_embeddings.py b/ray_serve/serve_embeddings.py
index 8350505..6d91e4c 100644
--- a/ray_serve/serve_embeddings.py
+++ b/ray_serve/serve_embeddings.py
@@ -9,7 +9,10 @@ from typing import Any
 
 from ray import serve
 
-from ray_serve.mlflow_logger import InferenceLogger
+try:
+    from ray_serve.mlflow_logger import InferenceLogger
+except ImportError:
+    InferenceLogger = None
 
 
 @serve.deployment(name="EmbeddingsDeployment", num_replicas=1)
@@ -37,15 +40,18 @@ class EmbeddingsDeployment:
         print(f"Model loaded. Embedding dimension: {self.embedding_dim}")
 
         # MLflow metrics
-        self._mlflow = InferenceLogger(
-            experiment_name="ray-serve-embeddings",
-            run_name=f"embeddings-{self.model_id.split('/')[-1]}",
-            tags={"model.name": self.model_id, "model.framework": "sentence-transformers", "device": self.device},
-            flush_every=10,
-        )
-        self._mlflow.initialize(
-            params={"model_id": self.model_id, "embedding_dim": str(self.embedding_dim), "device": self.device}
-        )
+        if InferenceLogger is not None:
+            self._mlflow = InferenceLogger(
+                experiment_name="ray-serve-embeddings",
+                run_name=f"embeddings-{self.model_id.split('/')[-1]}",
+                tags={"model.name": self.model_id, "model.framework": "sentence-transformers", "device": self.device},
+                flush_every=10,
+            )
+            self._mlflow.initialize(
+                params={"model_id": self.model_id, "embedding_dim": str(self.embedding_dim), "device": self.device}
+            )
+        else:
+            self._mlflow = None
 
     async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
@@ -86,11 +92,12 @@ class EmbeddingsDeployment:
             total_tokens += len(text.split())
 
         # Log to MLflow
-        self._mlflow.log_request(
-            latency_s=time.time() - _start,
-            batch_size=len(texts),
-            total_tokens=total_tokens,
-        )
+        if self._mlflow:
+            self._mlflow.log_request(
+                latency_s=time.time() - _start,
+                batch_size=len(texts),
+                total_tokens=total_tokens,
+            )
 
         # Return OpenAI-compatible response
         return {
diff --git a/ray_serve/serve_llm.py b/ray_serve/serve_llm.py
index ac191e9..8034217 100644
--- a/ray_serve/serve_llm.py
+++ b/ray_serve/serve_llm.py
@@ -10,7 +10,10 @@ from typing import Any
 
 from ray import serve
 
-from ray_serve.mlflow_logger import InferenceLogger
+try:
+    from ray_serve.mlflow_logger import InferenceLogger
+except ImportError:
+    InferenceLogger = None
 
 
 @serve.deployment(name="LLMDeployment", num_replicas=1)
@@ -40,19 +43,22 @@ class LLMDeployment:
         print(f"Model {self.model_id} async engine created")
 
         # MLflow metrics
-        self._mlflow = InferenceLogger(
-            experiment_name="ray-serve-llm",
-            run_name=f"llm-{self.model_id.split('/')[-1]}",
-            tags={"model.name": self.model_id, "model.framework": "vllm", "gpu": "strixhalo"},
-            flush_every=5,
-        )
-        self._mlflow.initialize(
-            params={
-                "model_id": self.model_id,
-                "max_model_len": str(self.max_model_len),
-                "gpu_memory_utilization": str(self.gpu_memory_utilization),
-            }
-        )
+        if InferenceLogger is not None:
+            self._mlflow = InferenceLogger(
+                experiment_name="ray-serve-llm",
+                run_name=f"llm-{self.model_id.split('/')[-1]}",
+                tags={"model.name": self.model_id, "model.framework": "vllm", "gpu": "strixhalo"},
+                flush_every=5,
+            )
+            self._mlflow.initialize(
+                params={
+                    "model_id": self.model_id,
+                    "max_model_len": str(self.max_model_len),
+                    "gpu_memory_utilization": str(self.gpu_memory_utilization),
+                }
+            )
+        else:
+            self._mlflow = None
 
     async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
@@ -96,15 +102,16 @@ class LLMDeployment:
         completion_tokens = len(generated_text.split())
 
         # Log to MLflow
-        self._mlflow.log_request(
-            latency_s=latency,
-            prompt_tokens=prompt_tokens,
-            completion_tokens=completion_tokens,
-            total_tokens=prompt_tokens + completion_tokens,
-            tokens_per_second=completion_tokens / latency if latency > 0 else 0,
-            temperature=temperature,
-            max_tokens_requested=max_tokens,
-        )
+        if self._mlflow:
+            self._mlflow.log_request(
+                latency_s=latency,
+                prompt_tokens=prompt_tokens,
+                completion_tokens=completion_tokens,
+                total_tokens=prompt_tokens + completion_tokens,
+                tokens_per_second=completion_tokens / latency if latency > 0 else 0,
+                temperature=temperature,
+                max_tokens_requested=max_tokens,
+            )
 
         # Return OpenAI-compatible response
         return {
diff --git a/ray_serve/serve_reranker.py b/ray_serve/serve_reranker.py
index 6baa641..c61bb75 100644
--- a/ray_serve/serve_reranker.py
+++ b/ray_serve/serve_reranker.py
@@ -9,7 +9,10 @@ from typing import Any
 
 from ray import serve
 
-from ray_serve.mlflow_logger import InferenceLogger
+try:
+    from ray_serve.mlflow_logger import InferenceLogger
+except ImportError:
+    InferenceLogger = None
 
 
 @serve.deployment(name="RerankerDeployment", num_replicas=1)
@@ -62,15 +65,18 @@ class RerankerDeployment:
         print("Reranker model loaded successfully")
 
         # MLflow metrics
-        self._mlflow = InferenceLogger(
-            experiment_name="ray-serve-reranker",
-            run_name=f"reranker-{self.model_id.split('/')[-1]}",
-            tags={"model.name": self.model_id, "model.framework": "sentence-transformers", "device": self.device},
-            flush_every=10,
-        )
-        self._mlflow.initialize(
-            params={"model_id": self.model_id, "device": self.device, "use_ipex": str(self.use_ipex)}
-        )
+        if InferenceLogger is not None:
+            self._mlflow = InferenceLogger(
+                experiment_name="ray-serve-reranker",
+                run_name=f"reranker-{self.model_id.split('/')[-1]}",
+                tags={"model.name": self.model_id, "model.framework": "sentence-transformers", "device": self.device},
+                flush_every=10,
+            )
+            self._mlflow.initialize(
+                params={"model_id": self.model_id, "device": self.device, "use_ipex": str(self.use_ipex)}
+            )
+        else:
+            self._mlflow = None
 
     async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
@@ -105,10 +111,11 @@ class RerankerDeployment:
                 }
             )
 
-        self._mlflow.log_request(
-            latency_s=time.time() - _start,
-            num_pairs=len(pairs),
-        )
+        if self._mlflow:
+            self._mlflow.log_request(
+                latency_s=time.time() - _start,
+                num_pairs=len(pairs),
+            )
 
         return {
             "object": "list",
@@ -153,12 +160,13 @@ class RerankerDeployment:
             results = results[:top_k]
 
         # Log to MLflow
-        self._mlflow.log_request(
-            latency_s=time.time() - _start,
-            num_pairs=len(pairs),
-            num_documents=len(documents),
-            top_k=top_k,
-        )
+        if self._mlflow:
+            self._mlflow.log_request(
+                latency_s=time.time() - _start,
+                num_pairs=len(pairs),
+                num_documents=len(documents),
+                top_k=top_k,
+            )
 
         return {
             "object": "list",
diff --git a/ray_serve/serve_tts.py b/ray_serve/serve_tts.py
index 643f1b7..68486e6 100644
--- a/ray_serve/serve_tts.py
+++ b/ray_serve/serve_tts.py
@@ -11,7 +11,10 @@ from typing import Any
 
 from ray import serve
 
-from ray_serve.mlflow_logger import InferenceLogger
+try:
+    from ray_serve.mlflow_logger import InferenceLogger
+except ImportError:
+    InferenceLogger = None
 
 
 @serve.deployment(name="TTSDeployment", num_replicas=1)
@@ -36,13 +39,16 @@ class TTSDeployment:
         print("TTS model loaded successfully")
 
         # MLflow metrics
-        self._mlflow = InferenceLogger(
-            experiment_name="ray-serve-tts",
-            run_name=f"tts-{self.model_name.split('/')[-1]}",
-            tags={"model.name": self.model_name, "model.framework": "coqui-tts", "gpu": str(self.use_gpu)},
-            flush_every=5,
-        )
-        self._mlflow.initialize(params={"model_name": self.model_name, "use_gpu": str(self.use_gpu)})
+        if InferenceLogger is not None:
+            self._mlflow = InferenceLogger(
+                experiment_name="ray-serve-tts",
+                run_name=f"tts-{self.model_name.split('/')[-1]}",
+                tags={"model.name": self.model_name, "model.framework": "coqui-tts", "gpu": str(self.use_gpu)},
+                flush_every=5,
+            )
+            self._mlflow.initialize(params={"model_name": self.model_name, "use_gpu": str(self.use_gpu)})
+        else:
+            self._mlflow = None
 
     async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
@@ -104,12 +110,13 @@ class TTSDeployment:
         duration = len(wav) / sample_rate
 
         # Log to MLflow
-        self._mlflow.log_request(
-            latency_s=time.time() - _start,
-            audio_duration_s=duration,
-            text_chars=len(text),
-            realtime_factor=(time.time() - _start) / duration if duration > 0 else 0,
-        )
+        if self._mlflow:
+            self._mlflow.log_request(
+                latency_s=time.time() - _start,
+                audio_duration_s=duration,
+                text_chars=len(text),
+                realtime_factor=(time.time() - _start) / duration if duration > 0 else 0,
+            )
 
         response = {
             "model": self.model_name,
diff --git a/ray_serve/serve_whisper.py b/ray_serve/serve_whisper.py
index c690a3a..5c9f117 100644
--- a/ray_serve/serve_whisper.py
+++ b/ray_serve/serve_whisper.py
@@ -11,7 +11,10 @@ from typing import Any
 
 from ray import serve
 
-from ray_serve.mlflow_logger import InferenceLogger
+try:
+    from ray_serve.mlflow_logger import InferenceLogger
+except ImportError:
+    InferenceLogger = None
 
 
 @serve.deployment(name="WhisperDeployment", num_replicas=1)
@@ -42,15 +45,18 @@ class WhisperDeployment:
         print("Whisper model loaded successfully")
 
         # MLflow metrics
-        self._mlflow = InferenceLogger(
-            experiment_name="ray-serve-whisper",
-            run_name=f"whisper-{self.model_size}",
-            tags={"model.name": f"whisper-{self.model_size}", "model.framework": "faster-whisper", "device": self.device},
-            flush_every=5,
-        )
-        self._mlflow.initialize(
-            params={"model_size": self.model_size, "device": self.device, "compute_type": self.compute_type}
-        )
+        if InferenceLogger is not None:
+            self._mlflow = InferenceLogger(
+                experiment_name="ray-serve-whisper",
+                run_name=f"whisper-{self.model_size}",
+                tags={"model.name": f"whisper-{self.model_size}", "model.framework": "faster-whisper", "device": self.device},
+                flush_every=5,
+            )
+            self._mlflow.initialize(
+                params={"model_size": self.model_size, "device": self.device, "compute_type": self.compute_type}
+            )
+        else:
+            self._mlflow = None
 
     async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
@@ -146,12 +152,13 @@ class WhisperDeployment:
         }
 
         # Log to MLflow
-        self._mlflow.log_request(
-            latency_s=time.time() - _start,
-            audio_duration_s=info.duration,
-            segments=len(segment_list),
-            realtime_factor=(time.time() - _start) / info.duration if info.duration > 0 else 0,
-        )
+        if self._mlflow:
+            self._mlflow.log_request(
+                latency_s=time.time() - _start,
+                audio_duration_s=info.duration,
+                segments=len(segment_list),
+                realtime_factor=(time.time() - _start) / info.duration if info.duration > 0 else 0,
+            )
 
         # Default JSON format (OpenAI-compatible)
         return {
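
For reference, the guard pattern each app above now follows, as a minimal
self-contained sketch. The ExampleDeployment class and its metric values are
illustrative only (not part of the patch); the InferenceLogger constructor,
initialize(), and log_request() arguments mirror the calls already present in
the diff.

    import time

    try:
        from ray_serve.mlflow_logger import InferenceLogger
    except ImportError:
        # Worker venv without the updated ray-serve-apps package: no MLflow logging.
        InferenceLogger = None


    class ExampleDeployment:
        def __init__(self) -> None:
            if InferenceLogger is not None:
                self._mlflow = InferenceLogger(
                    experiment_name="ray-serve-example",
                    run_name="example",
                    tags={"model.name": "example"},
                    flush_every=10,
                )
                self._mlflow.initialize(params={"model_id": "example"})
            else:
                # No-op fallback: every call site checks this before logging.
                self._mlflow = None

        def handle(self) -> None:
            _start = time.time()
            # ... actual inference work happens here ...
            if self._mlflow:
                self._mlflow.log_request(latency_s=time.time() - _start)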