diff --git a/ray-serve/ray_serve/__init__.py b/ray-serve/ray_serve/__init__.py
index 2bf591d..496c7aa 100644
--- a/ray-serve/ray_serve/__init__.py
+++ b/ray-serve/ray_serve/__init__.py
@@ -12,4 +12,4 @@ __all__ = [
     "reranker_app",
     "tts_app",
     "whisper_app",
-]
\ No newline at end of file
+]
diff --git a/ray-serve/ray_serve/serve_embeddings.py b/ray-serve/ray_serve/serve_embeddings.py
index 3a13985..8ef5258 100644
--- a/ray-serve/ray_serve/serve_embeddings.py
+++ b/ray-serve/ray_serve/serve_embeddings.py
@@ -4,9 +4,7 @@ Runs on: drizzt (Radeon 680M iGPU, ROCm)
 """

 import os
-import time
-import uuid
-from typing import Any, Dict, List, Union
+from typing import Any

 from ray import serve

@@ -14,11 +12,11 @@ from ray import serve
 @serve.deployment(name="EmbeddingsDeployment", num_replicas=1)
 class EmbeddingsDeployment:
     def __init__(self):
-        from sentence_transformers import SentenceTransformer
         import torch
-
+        from sentence_transformers import SentenceTransformer
+
         self.model_id = os.environ.get("MODEL_ID", "BAAI/bge-large-en-v1.5")
-
+
         # Detect device
         if torch.cuda.is_available():
             self.device = "cuda"
@@ -26,19 +24,19 @@ class EmbeddingsDeployment:
             self.device = "xpu"
         else:
             self.device = "cpu"
-
+
         print(f"Loading embeddings model: {self.model_id}")
         print(f"Using device: {self.device}")
-
+
         self.model = SentenceTransformer(self.model_id, device=self.device)
         self.embedding_dim = self.model.get_sentence_embedding_dimension()
-
+
         print(f"Model loaded. Embedding dimension: {self.embedding_dim}")

-    async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
+    async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
         Handle OpenAI-compatible embedding requests.
-
+
         Expected request format:
         {
             "model": "model-name",
@@ -47,31 +45,30 @@ class EmbeddingsDeployment:
         }
         """
         input_data = request.get("input", "")
-
+
         # Handle both single string and list of strings
-        if isinstance(input_data, str):
-            texts = [input_data]
-        else:
-            texts = input_data
-
+        texts = [input_data] if isinstance(input_data, str) else input_data
+
         # Generate embeddings
         embeddings = self.model.encode(
             texts,
             normalize_embeddings=True,
             show_progress_bar=False,
         )
-
+
         # Build response data
         data = []
         total_tokens = 0
-        for i, (text, embedding) in enumerate(zip(texts, embeddings)):
-            data.append({
-                "object": "embedding",
-                "index": i,
-                "embedding": embedding.tolist(),
-            })
+        for i, (text, embedding) in enumerate(zip(texts, embeddings, strict=False)):
+            data.append(
+                {
+                    "object": "embedding",
+                    "index": i,
+                    "embedding": embedding.tolist(),
+                }
+            )
             total_tokens += len(text.split())
-
+
         # Return OpenAI-compatible response
         return {
             "object": "list",
diff --git a/ray-serve/ray_serve/serve_llm.py b/ray-serve/ray_serve/serve_llm.py
index 4e15965..c14c65f 100644
--- a/ray-serve/ray_serve/serve_llm.py
+++ b/ray-serve/ray_serve/serve_llm.py
@@ -6,7 +6,7 @@ Runs on: khelben (Strix Halo 64GB, ROCm)
 import os
 import time
 import uuid
-from typing import Any, Dict, List, Optional
+from typing import Any

 from ray import serve

@@ -15,15 +15,15 @@ from ray import serve
 class LLMDeployment:
     def __init__(self):
         from vllm import LLM, SamplingParams
-
+
         self.model_id = os.environ.get("MODEL_ID", "meta-llama/Llama-3.1-70B-Instruct")
         self.max_model_len = int(os.environ.get("MAX_MODEL_LEN", "8192"))
         self.gpu_memory_utilization = float(os.environ.get("GPU_MEMORY_UTILIZATION", "0.9"))
-
+
         print(f"Loading vLLM model: {self.model_id}")
         print(f"Max model length: {self.max_model_len}")
         print(f"GPU memory utilization: {self.gpu_memory_utilization}")
-
+
         self.llm = LLM(
             model=self.model_id,
             max_model_len=self.max_model_len,
@@ -33,10 +33,10 @@ class LLMDeployment:
         self.SamplingParams = SamplingParams
         print(f"Model {self.model_id} loaded successfully")

-    async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
+    async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
         Handle OpenAI-compatible chat completion requests.
-
+
         Expected request format:
         {
             "model": "model-name",
@@ -51,21 +51,21 @@
         temperature = request.get("temperature", 0.7)
         max_tokens = request.get("max_tokens", 256)
         top_p = request.get("top_p", 1.0)
-        stop = request.get("stop", None)
-
+        stop = request.get("stop")
+
         # Convert messages to prompt
         prompt = self._format_messages(messages)
-
+
         sampling_params = self.SamplingParams(
             temperature=temperature,
             max_tokens=max_tokens,
             top_p=top_p,
             stop=stop,
         )
-
+
         outputs = self.llm.generate([prompt], sampling_params)
         generated_text = outputs[0].outputs[0].text
-
+
         # Return OpenAI-compatible response
         return {
             "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
@@ -89,7 +89,7 @@ class LLMDeployment:
             },
         }

-    def _format_messages(self, messages: List[Dict[str, str]]) -> str:
+    def _format_messages(self, messages: list[dict[str, str]]) -> str:
         """Format chat messages into a prompt string."""
         formatted = ""
         for msg in messages:
diff --git a/ray-serve/ray_serve/serve_reranker.py b/ray-serve/ray_serve/serve_reranker.py
index 13ff876..582e6b2 100644
--- a/ray-serve/ray_serve/serve_reranker.py
+++ b/ray-serve/ray_serve/serve_reranker.py
@@ -4,9 +4,7 @@ Runs on: drizzt (Radeon 680M iGPU, ROCm) or danilo (Intel i915 iGPU, OpenVINO/IP
 """

 import os
-import time
-import uuid
-from typing import Any, Dict, List, Tuple
+from typing import Any

 from ray import serve

@@ -14,16 +12,17 @@ from ray import serve
 @serve.deployment(name="RerankerDeployment", num_replicas=1)
 class RerankerDeployment:
     def __init__(self):
-        from sentence_transformers import CrossEncoder
         import torch
-
+        from sentence_transformers import CrossEncoder
+
         self.model_id = os.environ.get("MODEL_ID", "BAAI/bge-reranker-v2-m3")
         self.use_ipex = False
         self.device = "cpu"
-
+
         # Detect device - check for Intel GPU first via IPEX
         try:
             import intel_extension_for_pytorch as ipex
+
             self.use_ipex = True
             if hasattr(torch, "xpu") and torch.xpu.is_available():
                 self.device = "xpu"
@@ -32,36 +31,37 @@ class RerankerDeployment:
                 print("IPEX available, will use CPU optimization")
         except ImportError:
             print("IPEX not available, checking for other GPUs")
-
+
         # Check for CUDA/ROCm if not using Intel
         if not self.use_ipex:
             if torch.cuda.is_available():
                 self.device = "cuda"
-                print(f"Using CUDA/ROCm device")
+                print("Using CUDA/ROCm device")
             else:
                 print("No GPU detected, using CPU")
-
+
         print(f"Loading reranker model: {self.model_id}")
         print(f"Using device: {self.device}")
-
+
         # Load model
         self.model = CrossEncoder(self.model_id, device=self.device)
-
+
         # Apply IPEX optimization if available
         if self.use_ipex and self.device == "cpu":
             try:
                 import intel_extension_for_pytorch as ipex
+
                 self.model.model = ipex.optimize(self.model.model)
                 print("IPEX CPU optimization applied")
             except Exception as e:
                 print(f"IPEX optimization failed: {e}")
-
-        print(f"Reranker model loaded successfully")

-    async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
+        print("Reranker model loaded successfully")
+
+    async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
         Handle reranking requests.
-
+
         Expected request format:
         {
             "query": "search query",
@@ -69,7 +69,7 @@ class RerankerDeployment:
             "top_k": 3,
             "return_documents": true
         }
-
+
         Alternative format (pairs):
         {
             "pairs": [["query", "doc1"], ["query", "doc2"]]
         }
@@ -79,42 +79,44 @@ class RerankerDeployment:
         """
         # Handle pairs format directly
         if "pairs" in request:
             pairs = request["pairs"]
             scores = self.model.predict(pairs)
-
+
             results = []
-            for i, (pair, score) in enumerate(zip(pairs, scores)):
-                results.append({
-                    "index": i,
-                    "score": float(score),
-                })
-
+            for i, (_pair, score) in enumerate(zip(pairs, scores, strict=False)):
+                results.append(
+                    {
+                        "index": i,
+                        "score": float(score),
+                    }
+                )
+
             return {
                 "object": "list",
                 "results": results,
                 "model": self.model_id,
             }
-
+
         # Handle query + documents format
         query = request.get("query", "")
         documents = request.get("documents", [])
         top_k = request.get("top_k", len(documents))
         return_documents = request.get("return_documents", True)
-
+
         if not documents:
             return {
                 "object": "list",
                 "results": [],
                 "model": self.model_id,
             }
-
+
         # Create query-document pairs
         pairs = [[query, doc] for doc in documents]
-
+
         # Get scores
         scores = self.model.predict(pairs)
-
+
         # Create results with indices and scores
         results = []
-        for i, (doc, score) in enumerate(zip(documents, scores)):
+        for i, (doc, score) in enumerate(zip(documents, scores, strict=False)):
             result = {
                 "index": i,
                 "score": float(score),
@@ -122,13 +124,13 @@ class RerankerDeployment:
             if return_documents:
                 result["document"] = doc
             results.append(result)
-
+
         # Sort by score descending
         results.sort(key=lambda x: x["score"], reverse=True)
-
+
         # Apply top_k
         results = results[:top_k]
-
+
         return {
             "object": "list",
             "results": results,
diff --git a/ray-serve/ray_serve/serve_tts.py b/ray-serve/ray_serve/serve_tts.py
index 636895e..9dee6de 100644
--- a/ray-serve/ray_serve/serve_tts.py
+++ b/ray-serve/ray_serve/serve_tts.py
@@ -3,12 +3,10 @@ Ray Serve deployment for Coqui TTS.
 Runs on: elminster (RTX 2070 8GB, CUDA)
 """

-import os
-import io
-import time
-import uuid
 import base64
-from typing import Any, Dict, Optional
+import io
+import os
+from typing import Any

 from ray import serve

@@ -16,28 +14,28 @@ from ray import serve
 @serve.deployment(name="TTSDeployment", num_replicas=1)
 class TTSDeployment:
     def __init__(self):
-        from TTS.api import TTS
         import torch
-
+        from TTS.api import TTS
+
         self.model_name = os.environ.get("MODEL_NAME", "tts_models/en/ljspeech/tacotron2-DDC")
-
+
         # Detect device
         self.use_gpu = torch.cuda.is_available()
-
+
         print(f"Loading TTS model: {self.model_name}")
         print(f"Using GPU: {self.use_gpu}")
-
+
         self.tts = TTS(model_name=self.model_name, progress_bar=False)
-
+
         if self.use_gpu:
             self.tts = self.tts.to("cuda")
-
-        print(f"TTS model loaded successfully")

-    async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
+        print("TTS model loaded successfully")
+
+    async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
         Handle text-to-speech requests.
-
+
         Expected request format:
         {
             "text": "Text to synthesize",
@@ -50,17 +48,17 @@ class TTSDeployment:
         """
         import numpy as np
         from scipy.io import wavfile
-
+
         text = request.get("text", "")
-        speaker = request.get("speaker", None)
-        language = request.get("language", None)
+        speaker = request.get("speaker")
+        language = request.get("language")
         speed = request.get("speed", 1.0)
         output_format = request.get("output_format", "wav")
         return_base64 = request.get("return_base64", True)
-
+
         if not text:
             return {"error": "No text provided"}
-
+
         # Generate speech
         try:
             # TTS.tts returns a numpy array of audio samples
@@ -70,48 +68,52 @@ class TTSDeployment:
                 language=language,
                 speed=speed,
             )
-
+
             # Convert to numpy array if needed
             if not isinstance(wav, np.ndarray):
                 wav = np.array(wav)
-
+
             # Normalize to int16
             wav_int16 = (wav * 32767).astype(np.int16)
-
+
             # Get sample rate from model config
-            sample_rate = self.tts.synthesizer.output_sample_rate if hasattr(self.tts, 'synthesizer') else 22050
-
+            sample_rate = (
+                self.tts.synthesizer.output_sample_rate
+                if hasattr(self.tts, "synthesizer")
+                else 22050
+            )
+
             # Write to buffer
             buffer = io.BytesIO()
             wavfile.write(buffer, sample_rate, wav_int16)
             audio_bytes = buffer.getvalue()
-
+
             response = {
                 "model": self.model_name,
                 "sample_rate": sample_rate,
                 "duration": len(wav) / sample_rate,
                 "format": output_format,
             }
-
+
             if return_base64:
                 response["audio"] = base64.b64encode(audio_bytes).decode("utf-8")
             else:
                 response["audio_bytes"] = audio_bytes
-
+
             return response
-
+
         except Exception as e:
             return {
                 "error": str(e),
                 "model": self.model_name,
             }

-    def list_speakers(self) -> Dict[str, Any]:
+    def list_speakers(self) -> dict[str, Any]:
         """List available speakers for multi-speaker models."""
         speakers = []
-        if hasattr(self.tts, 'speakers') and self.tts.speakers:
+        if hasattr(self.tts, "speakers") and self.tts.speakers:
             speakers = self.tts.speakers
-
+
         return {
             "model": self.model_name,
             "speakers": speakers,
diff --git a/ray-serve/ray_serve/serve_whisper.py b/ray-serve/ray_serve/serve_whisper.py
index 23861f2..1bb16ab 100644
--- a/ray-serve/ray_serve/serve_whisper.py
+++ b/ray-serve/ray_serve/serve_whisper.py
@@ -3,12 +3,10 @@ Ray Serve deployment for faster-whisper STT.
 Runs on: elminster (RTX 2070 8GB, CUDA)
 """

-import os
-import io
-import time
-import uuid
 import base64
-from typing import Any, Dict, Optional
+import io
+import os
+from typing import Any

 from ray import serve

@@ -16,11 +14,11 @@ from ray import serve
 @serve.deployment(name="WhisperDeployment", num_replicas=1)
 class WhisperDeployment:
     def __init__(self):
-        from faster_whisper import WhisperModel
         import torch
-
+        from faster_whisper import WhisperModel
+
         self.model_size = os.environ.get("MODEL_SIZE", "large-v3")
-
+
         # Detect device and compute type
         if torch.cuda.is_available():
             self.device = "cuda"
@@ -28,22 +26,22 @@ class WhisperDeployment:
         else:
             self.device = "cpu"
             self.compute_type = "int8"
-
+
         print(f"Loading Whisper model: {self.model_size}")
         print(f"Using device: {self.device}, compute_type: {self.compute_type}")
-
+
         self.model = WhisperModel(
             self.model_size,
             device=self.device,
             compute_type=self.compute_type,
         )
-
-        print(f"Whisper model loaded successfully")

-    async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
+        print("Whisper model loaded successfully")
+
+    async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
         """
         Handle transcription requests.
-
+
         Expected request format:
         {
             "audio": "base64_encoded_audio_data",
@@ -53,24 +51,22 @@ class WhisperDeployment:
             "response_format": "json",
             "word_timestamps": false
         }
-
+
         Alternative with file path:
         {
             "file": "/path/to/audio.wav",
             ...
         }
         """
-        import numpy as np
-        from scipy.io import wavfile
-
-        language = request.get("language", None)
+
+        language = request.get("language")
         task = request.get("task", "transcribe")  # transcribe or translate
         response_format = request.get("response_format", "json")
         word_timestamps = request.get("word_timestamps", False)
-
+
         # Get audio data
         audio_input = None
-
+
         if "audio" in request:
             # Base64 encoded audio
             audio_bytes = base64.b64decode(request["audio"])
@@ -85,7 +81,7 @@ class WhisperDeployment:
             return {
                 "error": "No audio data provided. Use 'audio' (base64), 'file' (path), or 'audio_bytes'",
             }
-
+
         # Transcribe
         segments, info = self.model.transcribe(
             audio_input,
@@ -94,11 +90,11 @@ class WhisperDeployment:
             word_timestamps=word_timestamps,
             vad_filter=True,
         )
-
+
         # Collect segments
         segment_list = []
         full_text = ""
-
+
         for segment in segments:
             seg_data = {
                 "id": segment.id,
@@ -106,7 +102,7 @@ class WhisperDeployment:
                 "end": segment.end,
                 "text": segment.text,
             }
-
+
             if word_timestamps and segment.words:
                 seg_data["words"] = [
                     {
@@ -117,14 +113,14 @@ class WhisperDeployment:
                     }
                     for word in segment.words
                 ]
-
+
             segment_list.append(seg_data)
             full_text += segment.text
-
+
         # Build response based on format
         if response_format == "text":
             return {"text": full_text.strip()}
-
+
         if response_format == "verbose_json":
             return {
                 "task": task,
@@ -133,7 +129,7 @@ class WhisperDeployment:
                 "text": full_text.strip(),
                 "segments": segment_list,
             }
-
+
         # Default JSON format (OpenAI-compatible)
         return {
             "text": full_text.strip(),
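A minimal smoke-test sketch for the refactored embeddings deployment (not part of the diff): it drives EmbeddingsDeployment through a Ray Serve deployment handle. It assumes Ray >= 2.7 (the DeploymentHandle API), a running local Ray cluster, and that sentence-transformers can fetch the default BAAI/bge-large-en-v1.5 weights.

```python
# Smoke-test sketch; assumptions as noted above.
from ray import serve

from ray_serve.serve_embeddings import EmbeddingsDeployment

# serve.run deploys the bound app and returns a DeploymentHandle.
handle = serve.run(EmbeddingsDeployment.bind(), name="embeddings")

# "input" may be a single string or a list of strings; the deployment
# normalizes both into a list before encoding.
resp = handle.remote({"input": ["hello world", "ray serve"]}).result()

assert resp["object"] == "list"
print(len(resp["data"]), "embeddings of dim", len(resp["data"][0]["embedding"]))
```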
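The reranker accepts two request shapes, as its docstring notes; a sketch of both, under the same assumptions (Ray >= 2.7, local cluster, downloadable model):

```python
# Exercises both request formats handled by RerankerDeployment.__call__.
from ray import serve

from ray_serve.serve_reranker import RerankerDeployment

handle = serve.run(RerankerDeployment.bind(), name="reranker")

# Format 1: query + documents. Results are sorted by score descending,
# truncated to top_k, and include documents when return_documents is true.
resp = handle.remote(
    {
        "query": "what does ray serve do?",
        "documents": [
            "Ray Serve is a scalable model-serving library.",
            "Bananas are rich in potassium.",
        ],
        "top_k": 1,
    }
).result()
print(resp["results"][0]["document"])

# Format 2: explicit [query, document] pairs. Scores come back in input order.
resp = handle.remote({"pairs": [["q", "doc a"], ["q", "doc b"]]}).result()
print([round(r["score"], 3) for r in resp["results"]])
```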
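The TTS and Whisper deployments exchange audio as base64 strings. A client-side sketch of that convention follows; the file names and helper functions are illustrative, not part of the diff.

```python
# Client-side helpers for the base64 audio convention used by the
# TTS and Whisper deployments above.
import base64


def whisper_request_from_file(path: str) -> dict:
    """Build a WhisperDeployment request with base64 audio under "audio"."""
    with open(path, "rb") as f:
        return {
            "audio": base64.b64encode(f.read()).decode("utf-8"),
            "task": "transcribe",
            "response_format": "verbose_json",
            "word_timestamps": True,
        }


def save_tts_audio(response: dict, path: str = "out.wav") -> None:
    """Decode the base64 WAV that TTSDeployment returns when return_base64 is true."""
    with open(path, "wb") as f:
        f.write(base64.b64decode(response["audio"]))
    print(f"wrote {path}: {response['duration']:.2f}s at {response['sample_rate']} Hz")
```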