fix: apply ruff fixes to ray_serve package
Some checks failed
Build and Publish ray-serve-apps / lint (push) Successful in 1m30s
Build and Publish ray-serve-apps / publish (push) Failing after 2m44s

[ray-serve only]

- Fix whitespace in docstrings
- Add strict=False to zip() calls (ruff's B905 autofix default; see the sketch below)
- Use ternary operators where appropriate
- Rename unused loop variables
2026-02-02 11:09:35 -05:00
parent 16f6199534
commit 12987c6adc
6 changed files with 128 additions and 131 deletions
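
For reference, a before/after sketch of the two behavior-adjacent fixes, using the lines from the embeddings deployment below:

# B905: zip() without an explicit strict= argument.
# Ruff's autofix adds strict=False, which keeps the old truncating behavior.
for i, (text, embedding) in enumerate(zip(texts, embeddings, strict=False)):
    ...

# SIM108: an if/else assignment becomes a ternary.
texts = [input_data] if isinstance(input_data, str) else input_data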

View File

@@ -12,4 +12,4 @@ __all__ = [
"reranker_app",
"tts_app",
"whisper_app",
-]
+]

View File

@@ -4,9 +4,7 @@ Runs on: drizzt (Radeon 680M iGPU, ROCm)
"""
import os
-import time
-import uuid
-from typing import Any, Dict, List, Union
+from typing import Any
from ray import serve
@@ -14,11 +12,11 @@ from ray import serve
@serve.deployment(name="EmbeddingsDeployment", num_replicas=1)
class EmbeddingsDeployment:
def __init__(self):
-        from sentence_transformers import SentenceTransformer
        import torch
+        from sentence_transformers import SentenceTransformer
self.model_id = os.environ.get("MODEL_ID", "BAAI/bge-large-en-v1.5")
# Detect device
if torch.cuda.is_available():
self.device = "cuda"
@@ -26,19 +24,19 @@ class EmbeddingsDeployment:
self.device = "xpu"
else:
self.device = "cpu"
print(f"Loading embeddings model: {self.model_id}")
print(f"Using device: {self.device}")
self.model = SentenceTransformer(self.model_id, device=self.device)
self.embedding_dim = self.model.get_sentence_embedding_dimension()
print(f"Model loaded. Embedding dimension: {self.embedding_dim}")
-    async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
+    async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle OpenAI-compatible embedding requests.
Expected request format:
{
"model": "model-name",
@@ -47,31 +45,30 @@ class EmbeddingsDeployment:
}
"""
input_data = request.get("input", "")
# Handle both single string and list of strings
-        if isinstance(input_data, str):
-            texts = [input_data]
-        else:
-            texts = input_data
+        texts = [input_data] if isinstance(input_data, str) else input_data
# Generate embeddings
embeddings = self.model.encode(
texts,
normalize_embeddings=True,
show_progress_bar=False,
)
# Build response data
data = []
total_tokens = 0
-        for i, (text, embedding) in enumerate(zip(texts, embeddings)):
-            data.append({
-                "object": "embedding",
-                "index": i,
-                "embedding": embedding.tolist(),
-            })
+        for i, (text, embedding) in enumerate(zip(texts, embeddings, strict=False)):
+            data.append(
+                {
+                    "object": "embedding",
+                    "index": i,
+                    "embedding": embedding.tolist(),
+                }
+            )
total_tokens += len(text.split())
# Return OpenAI-compatible response
return {
"object": "list",

View File

@@ -6,7 +6,7 @@ Runs on: khelben (Strix Halo 64GB, ROCm)
import os
import time
import uuid
-from typing import Any, Dict, List, Optional
+from typing import Any
from ray import serve
@@ -15,15 +15,15 @@ from ray import serve
class LLMDeployment:
def __init__(self):
from vllm import LLM, SamplingParams
self.model_id = os.environ.get("MODEL_ID", "meta-llama/Llama-3.1-70B-Instruct")
self.max_model_len = int(os.environ.get("MAX_MODEL_LEN", "8192"))
self.gpu_memory_utilization = float(os.environ.get("GPU_MEMORY_UTILIZATION", "0.9"))
print(f"Loading vLLM model: {self.model_id}")
print(f"Max model length: {self.max_model_len}")
print(f"GPU memory utilization: {self.gpu_memory_utilization}")
self.llm = LLM(
model=self.model_id,
max_model_len=self.max_model_len,
@@ -33,10 +33,10 @@ class LLMDeployment:
self.SamplingParams = SamplingParams
print(f"Model {self.model_id} loaded successfully")
-    async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
+    async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle OpenAI-compatible chat completion requests.
Expected request format:
{
"model": "model-name",
@@ -51,21 +51,21 @@ class LLMDeployment:
temperature = request.get("temperature", 0.7)
max_tokens = request.get("max_tokens", 256)
top_p = request.get("top_p", 1.0)
-        stop = request.get("stop", None)
+        stop = request.get("stop")
# Convert messages to prompt
prompt = self._format_messages(messages)
sampling_params = self.SamplingParams(
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
stop=stop,
)
outputs = self.llm.generate([prompt], sampling_params)
generated_text = outputs[0].outputs[0].text
# Return OpenAI-compatible response
return {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
@@ -89,7 +89,7 @@ class LLMDeployment:
},
}
-    def _format_messages(self, messages: List[Dict[str, str]]) -> str:
+    def _format_messages(self, messages: list[dict[str, str]]) -> str:
"""Format chat messages into a prompt string."""
formatted = ""
for msg in messages:
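
A minimal sketch of a chat call against the LLM deployment; the serve.run()/handle usage (Ray 2.7+) is an assumption, not part of this diff.

from ray import serve

handle = serve.run(LLMDeployment.bind())  # assumed wiring
resp = handle.remote({
    "messages": [{"role": "user", "content": "Say hello."}],
    "temperature": 0.2,
    "max_tokens": 64,
}).result()
print(resp["id"])  # e.g. chatcmpl-1a2b3c4d, as built above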

View File

@@ -4,9 +4,7 @@ Runs on: drizzt (Radeon 680M iGPU, ROCm) or danilo (Intel i915 iGPU, OpenVINO/IPEX)
"""
import os
-import time
-import uuid
-from typing import Any, Dict, List, Tuple
+from typing import Any
from ray import serve
@@ -14,16 +12,17 @@ from ray import serve
@serve.deployment(name="RerankerDeployment", num_replicas=1)
class RerankerDeployment:
def __init__(self):
-        from sentence_transformers import CrossEncoder
        import torch
+        from sentence_transformers import CrossEncoder
self.model_id = os.environ.get("MODEL_ID", "BAAI/bge-reranker-v2-m3")
self.use_ipex = False
self.device = "cpu"
# Detect device - check for Intel GPU first via IPEX
try:
import intel_extension_for_pytorch as ipex
self.use_ipex = True
if hasattr(torch, "xpu") and torch.xpu.is_available():
self.device = "xpu"
@@ -32,36 +31,37 @@ class RerankerDeployment:
print("IPEX available, will use CPU optimization")
except ImportError:
print("IPEX not available, checking for other GPUs")
# Check for CUDA/ROCm if not using Intel
if not self.use_ipex:
if torch.cuda.is_available():
self.device = "cuda"
print(f"Using CUDA/ROCm device")
print("Using CUDA/ROCm device")
else:
print("No GPU detected, using CPU")
print(f"Loading reranker model: {self.model_id}")
print(f"Using device: {self.device}")
# Load model
self.model = CrossEncoder(self.model_id, device=self.device)
# Apply IPEX optimization if available
if self.use_ipex and self.device == "cpu":
try:
import intel_extension_for_pytorch as ipex
self.model.model = ipex.optimize(self.model.model)
print("IPEX CPU optimization applied")
except Exception as e:
print(f"IPEX optimization failed: {e}")
print(f"Reranker model loaded successfully")
async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
print("Reranker model loaded successfully")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle reranking requests.
Expected request format:
{
"query": "search query",
@@ -69,7 +69,7 @@ class RerankerDeployment:
"top_k": 3,
"return_documents": true
}
Alternative format (pairs):
{
"pairs": [["query", "doc1"], ["query", "doc2"]]
@@ -79,42 +79,44 @@ class RerankerDeployment:
if "pairs" in request:
pairs = request["pairs"]
scores = self.model.predict(pairs)
results = []
-            for i, (pair, score) in enumerate(zip(pairs, scores)):
-                results.append({
-                    "index": i,
-                    "score": float(score),
-                })
+            for i, (_pair, score) in enumerate(zip(pairs, scores, strict=False)):
+                results.append(
+                    {
+                        "index": i,
+                        "score": float(score),
+                    }
+                )
return {
"object": "list",
"results": results,
"model": self.model_id,
}
# Handle query + documents format
query = request.get("query", "")
documents = request.get("documents", [])
top_k = request.get("top_k", len(documents))
return_documents = request.get("return_documents", True)
if not documents:
return {
"object": "list",
"results": [],
"model": self.model_id,
}
# Create query-document pairs
pairs = [[query, doc] for doc in documents]
# Get scores
scores = self.model.predict(pairs)
# Create results with indices and scores
results = []
-        for i, (doc, score) in enumerate(zip(documents, scores)):
+        for i, (doc, score) in enumerate(zip(documents, scores, strict=False)):
result = {
"index": i,
"score": float(score),
@@ -122,13 +124,13 @@ class RerankerDeployment:
if return_documents:
result["document"] = doc
results.append(result)
# Sort by score descending
results.sort(key=lambda x: x["score"], reverse=True)
# Apply top_k
results = results[:top_k]
return {
"object": "list",
"results": results,

View File

@@ -3,12 +3,10 @@ Ray Serve deployment for Coqui TTS.
Runs on: elminster (RTX 2070 8GB, CUDA)
"""
-import os
-import io
-import time
-import uuid
import base64
-from typing import Any, Dict, Optional
+import io
+import os
+from typing import Any
from ray import serve
@@ -16,28 +14,28 @@ from ray import serve
@serve.deployment(name="TTSDeployment", num_replicas=1)
class TTSDeployment:
def __init__(self):
-        from TTS.api import TTS
        import torch
+        from TTS.api import TTS
self.model_name = os.environ.get("MODEL_NAME", "tts_models/en/ljspeech/tacotron2-DDC")
# Detect device
self.use_gpu = torch.cuda.is_available()
print(f"Loading TTS model: {self.model_name}")
print(f"Using GPU: {self.use_gpu}")
self.tts = TTS(model_name=self.model_name, progress_bar=False)
if self.use_gpu:
self.tts = self.tts.to("cuda")
print(f"TTS model loaded successfully")
async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
print("TTS model loaded successfully")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle text-to-speech requests.
Expected request format:
{
"text": "Text to synthesize",
@@ -50,17 +48,17 @@ class TTSDeployment:
"""
import numpy as np
from scipy.io import wavfile
text = request.get("text", "")
-        speaker = request.get("speaker", None)
-        language = request.get("language", None)
+        speaker = request.get("speaker")
+        language = request.get("language")
speed = request.get("speed", 1.0)
output_format = request.get("output_format", "wav")
return_base64 = request.get("return_base64", True)
if not text:
return {"error": "No text provided"}
# Generate speech
try:
# TTS.tts returns a numpy array of audio samples
@@ -70,48 +68,52 @@ class TTSDeployment:
language=language,
speed=speed,
)
# Convert to numpy array if needed
if not isinstance(wav, np.ndarray):
wav = np.array(wav)
# Normalize to int16
wav_int16 = (wav * 32767).astype(np.int16)
# Get sample rate from model config
-            sample_rate = self.tts.synthesizer.output_sample_rate if hasattr(self.tts, 'synthesizer') else 22050
+            sample_rate = (
+                self.tts.synthesizer.output_sample_rate
+                if hasattr(self.tts, "synthesizer")
+                else 22050
+            )
# Write to buffer
buffer = io.BytesIO()
wavfile.write(buffer, sample_rate, wav_int16)
audio_bytes = buffer.getvalue()
response = {
"model": self.model_name,
"sample_rate": sample_rate,
"duration": len(wav) / sample_rate,
"format": output_format,
}
if return_base64:
response["audio"] = base64.b64encode(audio_bytes).decode("utf-8")
else:
response["audio_bytes"] = audio_bytes
return response
except Exception as e:
return {
"error": str(e),
"model": self.model_name,
}
-    def list_speakers(self) -> Dict[str, Any]:
+    def list_speakers(self) -> dict[str, Any]:
"""List available speakers for multi-speaker models."""
speakers = []
-        if hasattr(self.tts, 'speakers') and self.tts.speakers:
+        if hasattr(self.tts, "speakers") and self.tts.speakers:
speakers = self.tts.speakers
return {
"model": self.model_name,
"speakers": speakers,

View File

@@ -3,12 +3,10 @@ Ray Serve deployment for faster-whisper STT.
Runs on: elminster (RTX 2070 8GB, CUDA)
"""
-import os
-import io
-import time
-import uuid
import base64
-from typing import Any, Dict, Optional
+import io
+import os
+from typing import Any
from ray import serve
@@ -16,11 +14,11 @@ from ray import serve
@serve.deployment(name="WhisperDeployment", num_replicas=1)
class WhisperDeployment:
def __init__(self):
-        from faster_whisper import WhisperModel
        import torch
+        from faster_whisper import WhisperModel
self.model_size = os.environ.get("MODEL_SIZE", "large-v3")
# Detect device and compute type
if torch.cuda.is_available():
self.device = "cuda"
@@ -28,22 +26,22 @@ class WhisperDeployment:
else:
self.device = "cpu"
self.compute_type = "int8"
print(f"Loading Whisper model: {self.model_size}")
print(f"Using device: {self.device}, compute_type: {self.compute_type}")
self.model = WhisperModel(
self.model_size,
device=self.device,
compute_type=self.compute_type,
)
print(f"Whisper model loaded successfully")
async def __call__(self, request: Dict[str, Any]) -> Dict[str, Any]:
print("Whisper model loaded successfully")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle transcription requests.
Expected request format:
{
"audio": "base64_encoded_audio_data",
@@ -53,24 +51,22 @@ class WhisperDeployment:
"response_format": "json",
"word_timestamps": false
}
Alternative with file path:
{
"file": "/path/to/audio.wav",
...
}
"""
-        import numpy as np
-        from scipy.io import wavfile
-        language = request.get("language", None)
+        language = request.get("language")
task = request.get("task", "transcribe") # transcribe or translate
response_format = request.get("response_format", "json")
word_timestamps = request.get("word_timestamps", False)
# Get audio data
audio_input = None
if "audio" in request:
# Base64 encoded audio
audio_bytes = base64.b64decode(request["audio"])
@@ -85,7 +81,7 @@ class WhisperDeployment:
return {
"error": "No audio data provided. Use 'audio' (base64), 'file' (path), or 'audio_bytes'",
}
# Transcribe
segments, info = self.model.transcribe(
audio_input,
@@ -94,11 +90,11 @@ class WhisperDeployment:
word_timestamps=word_timestamps,
vad_filter=True,
)
# Collect segments
segment_list = []
full_text = ""
for segment in segments:
seg_data = {
"id": segment.id,
@@ -106,7 +102,7 @@ class WhisperDeployment:
"end": segment.end,
"text": segment.text,
}
if word_timestamps and segment.words:
seg_data["words"] = [
{
@@ -117,14 +113,14 @@ class WhisperDeployment:
}
for word in segment.words
]
segment_list.append(seg_data)
full_text += segment.text
# Build response based on format
if response_format == "text":
return {"text": full_text.strip()}
if response_format == "verbose_json":
return {
"task": task,
@@ -133,7 +129,7 @@ class WhisperDeployment:
"text": full_text.strip(),
"segments": segment_list,
}
# Default JSON format (OpenAI-compatible)
return {
"text": full_text.strip(),