feat: initial ray-serve-apps PyPI package

Implements ADR-0024: Ray Repository Structure

- Ray Serve deployments for GPU-shared AI inference
- Published as PyPI package for dynamic code loading
- Deployments: LLM, embeddings, reranker, whisper, TTS
- CI/CD workflow publishes to Gitea PyPI on push to main

Extracted from kuberay-images repo per ADR-0024
Commit 8ef914ec12 (parent eac8f27f2e), 2026-02-03 07:03:39 -05:00
11 changed files with 887 additions and 1 deletion
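Since the package exists for dynamic code loading, a cluster would typically pull it in through a Ray runtime_env at deploy time rather than baking it into the worker image. A minimal sketch, assuming a hypothetical Gitea index URL (the real registry path is not part of this commit):

import ray

# Illustrative index URL; substitute the actual Gitea package
# registry for your instance.
ray.init(
    runtime_env={
        "pip": [
            "--extra-index-url=https://gitea.example.com/api/packages/homelab/pypi/simple/",
            "ray-serve-apps",
        ]
    }
)

Ray writes the pip list to a requirements file, so index options can ride along with the package name and workers can import ray_serve without a rebuilt container image.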

ray_serve/__init__.py (new file)

@@ -0,0 +1,15 @@
# Ray Serve deployments for GPU-shared AI inference
# Published to Gitea PyPI as ray-serve-apps
from ray_serve.serve_embeddings import app as embeddings_app
from ray_serve.serve_llm import app as llm_app
from ray_serve.serve_reranker import app as reranker_app
from ray_serve.serve_tts import app as tts_app
from ray_serve.serve_whisper import app as whisper_app
__all__ = [
"embeddings_app",
"llm_app",
"reranker_app",
"tts_app",
"whisper_app",
]
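Each exported app can be deployed under its own name and route prefix; a sketch assuming Ray >= 2.6 multi-app support and that ray-serve-apps is installed in the driver environment:

from ray import serve

from ray_serve import embeddings_app, reranker_app

# Distinct names and route prefixes let the apps share one cluster.
serve.run(embeddings_app, name="embeddings", route_prefix="/embeddings")
serve.run(reranker_app, name="reranker", route_prefix="/reranker")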

ray_serve/py.typed (new, empty file)

ray_serve/serve_embeddings.py (new file)
@@ -0,0 +1,84 @@
"""
Ray Serve deployment for sentence-transformers BGE embeddings.
Runs on: drizzt (Radeon 680M iGPU, ROCm)
"""
import os
from typing import Any
from ray import serve
@serve.deployment(name="EmbeddingsDeployment", num_replicas=1)
class EmbeddingsDeployment:
def __init__(self):
import torch
from sentence_transformers import SentenceTransformer
self.model_id = os.environ.get("MODEL_ID", "BAAI/bge-large-en-v1.5")
# Detect device
if torch.cuda.is_available():
self.device = "cuda"
elif hasattr(torch, "xpu") and torch.xpu.is_available():
self.device = "xpu"
else:
self.device = "cpu"
print(f"Loading embeddings model: {self.model_id}")
print(f"Using device: {self.device}")
self.model = SentenceTransformer(self.model_id, device=self.device)
self.embedding_dim = self.model.get_sentence_embedding_dimension()
print(f"Model loaded. Embedding dimension: {self.embedding_dim}")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle OpenAI-compatible embedding requests.
Expected request format:
{
"model": "model-name",
"input": "text to embed" or ["text1", "text2"],
"encoding_format": "float"
}
"""
input_data = request.get("input", "")
# Handle both single string and list of strings
texts = [input_data] if isinstance(input_data, str) else input_data
# Generate embeddings
embeddings = self.model.encode(
texts,
normalize_embeddings=True,
show_progress_bar=False,
)
# Build response data
data = []
total_tokens = 0
for i, (text, embedding) in enumerate(zip(texts, embeddings, strict=False)):
data.append(
{
"object": "embedding",
"index": i,
"embedding": embedding.tolist(),
}
)
total_tokens += len(text.split())  # rough count: whitespace words, not tokenizer tokens
# Return OpenAI-compatible response
return {
"object": "list",
"data": data,
"model": self.model_id,
"usage": {
"prompt_tokens": total_tokens,
"total_tokens": total_tokens,
},
}
app = EmbeddingsDeployment.bind()
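A minimal client-side sketch of the request/response contract documented above, assuming a local Ray instance and Ray >= 2.7 DeploymentHandle semantics:

from ray import serve

handle = serve.run(app)  # DeploymentHandle for the ingress deployment
resp = handle.remote({"input": ["hello world", "ray serve"]}).result()
print(len(resp["data"]), resp["data"][0]["embedding"][:4], resp["usage"])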

ray_serve/serve_llm.py (new file)

@@ -0,0 +1,108 @@
"""
Ray Serve deployment for vLLM with OpenAI-compatible API.
Runs on: khelben (Strix Halo 64GB, ROCm)
"""
import os
import time
import uuid
from typing import Any
from ray import serve
@serve.deployment(name="LLMDeployment", num_replicas=1)
class LLMDeployment:
def __init__(self):
from vllm import LLM, SamplingParams
self.model_id = os.environ.get("MODEL_ID", "meta-llama/Llama-3.1-70B-Instruct")
self.max_model_len = int(os.environ.get("MAX_MODEL_LEN", "8192"))
self.gpu_memory_utilization = float(os.environ.get("GPU_MEMORY_UTILIZATION", "0.9"))
print(f"Loading vLLM model: {self.model_id}")
print(f"Max model length: {self.max_model_len}")
print(f"GPU memory utilization: {self.gpu_memory_utilization}")
self.llm = LLM(
model=self.model_id,
max_model_len=self.max_model_len,
gpu_memory_utilization=self.gpu_memory_utilization,
trust_remote_code=True,
)
self.SamplingParams = SamplingParams
print(f"Model {self.model_id} loaded successfully")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle OpenAI-compatible chat completion requests.
Expected request format:
{
"model": "model-name",
"messages": [{"role": "user", "content": "Hello"}],
"temperature": 0.7,
"max_tokens": 256,
"top_p": 1.0,
"stream": false
}
"""
messages = request.get("messages", [])
temperature = request.get("temperature", 0.7)
max_tokens = request.get("max_tokens", 256)
top_p = request.get("top_p", 1.0)
stop = request.get("stop")
# Convert messages to prompt
prompt = self._format_messages(messages)
sampling_params = self.SamplingParams(
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
stop=stop,
)
outputs = self.llm.generate([prompt], sampling_params)
generated_text = outputs[0].outputs[0].text
# Return OpenAI-compatible response
return {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
"object": "chat.completion",
"created": int(time.time()),
"model": self.model_id,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": generated_text,
},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": len(prompt.split()),
"completion_tokens": len(generated_text.split()),
"total_tokens": len(prompt.split()) + len(generated_text.split()),
},
}
def _format_messages(self, messages: list[dict[str, str]]) -> str:
"""Format chat messages into a prompt string."""
formatted = ""
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
formatted += f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{content}<|eot_id|>"
elif role == "user":
formatted += f"<|start_header_id|>user<|end_header_id|>\n\n{content}<|eot_id|>"
elif role == "assistant":
formatted += f"<|start_header_id|>assistant<|end_header_id|>\n\n{content}<|eot_id|>"
formatted += "<|start_header_id|>assistant<|end_header_id|>\n\n"
return formatted
app = LLMDeployment.bind()
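A usage sketch for the chat completion contract above, via a handle-based call rather than HTTP (local Ray instance assumed):

from ray import serve

handle = serve.run(app)
resp = handle.remote(
    {
        "messages": [{"role": "user", "content": "Say hello in one sentence."}],
        "temperature": 0.2,
        "max_tokens": 64,
    }
).result()
print(resp["choices"][0]["message"]["content"])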

ray_serve/serve_reranker.py (new file)

@@ -0,0 +1,144 @@
"""
Ray Serve deployment for sentence-transformers CrossEncoder reranking.
Runs on: drizzt (Radeon 680M iGPU, ROCm) or danilo (Intel i915 iGPU, OpenVINO/IPEX)
"""
import os
from typing import Any
from ray import serve
@serve.deployment(name="RerankerDeployment", num_replicas=1)
class RerankerDeployment:
def __init__(self):
import torch
from sentence_transformers import CrossEncoder
self.model_id = os.environ.get("MODEL_ID", "BAAI/bge-reranker-v2-m3")
self.use_ipex = False
self.device = "cpu"
# Detect device - check for Intel GPU first via IPEX
try:
import intel_extension_for_pytorch as ipex
self.use_ipex = True
if hasattr(torch, "xpu") and torch.xpu.is_available():
self.device = "xpu"
print("Intel GPU detected via IPEX, using XPU device")
else:
print("IPEX available, will use CPU optimization")
except ImportError:
print("IPEX not available, checking for other GPUs")
# Check for CUDA/ROCm if not using Intel
if not self.use_ipex:
if torch.cuda.is_available():
self.device = "cuda"
print("Using CUDA/ROCm device")
else:
print("No GPU detected, using CPU")
print(f"Loading reranker model: {self.model_id}")
print(f"Using device: {self.device}")
# Load model
self.model = CrossEncoder(self.model_id, device=self.device)
# Apply IPEX optimization if available
if self.use_ipex and self.device == "cpu":
try:
import intel_extension_for_pytorch as ipex
self.model.model = ipex.optimize(self.model.model)
print("IPEX CPU optimization applied")
except Exception as e:
print(f"IPEX optimization failed: {e}")
print("Reranker model loaded successfully")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle reranking requests.
Expected request format:
{
"query": "search query",
"documents": ["doc1", "doc2", "doc3"],
"top_k": 3,
"return_documents": true
}
Alternative format (pairs):
{
"pairs": [["query", "doc1"], ["query", "doc2"]]
}
"""
# Handle pairs format
if "pairs" in request:
pairs = request["pairs"]
scores = self.model.predict(pairs)
results = []
for i, (_pair, score) in enumerate(zip(pairs, scores, strict=False)):
results.append(
{
"index": i,
"score": float(score),
}
)
return {
"object": "list",
"results": results,
"model": self.model_id,
}
# Handle query + documents format
query = request.get("query", "")
documents = request.get("documents", [])
top_k = request.get("top_k", len(documents))
return_documents = request.get("return_documents", True)
if not documents:
return {
"object": "list",
"results": [],
"model": self.model_id,
}
# Create query-document pairs
pairs = [[query, doc] for doc in documents]
# Get scores
scores = self.model.predict(pairs)
# Create results with indices and scores
results = []
for i, (doc, score) in enumerate(zip(documents, scores, strict=False)):
result = {
"index": i,
"score": float(score),
}
if return_documents:
result["document"] = doc
results.append(result)
# Sort by score descending
results.sort(key=lambda x: x["score"], reverse=True)
# Apply top_k
results = results[:top_k]
return {
"object": "list",
"results": results,
"model": self.model_id,
"usage": {
"total_pairs": len(pairs),
},
}
app = RerankerDeployment.bind()
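A sketch of the query + documents format above; the documents here are placeholders:

from ray import serve

handle = serve.run(app)
resp = handle.remote(
    {
        "query": "what does ray serve do?",
        "documents": [
            "Ray Serve is a scalable model-serving library built on Ray.",
            "Bananas are rich in potassium.",
        ],
        "top_k": 1,
    }
).result()
print(resp["results"][0])  # highest-scoring document first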

ray_serve/serve_tts.py (new file)

@@ -0,0 +1,124 @@
"""
Ray Serve deployment for Coqui TTS.
Runs on: elminster (RTX 2070 8GB, CUDA)
"""
import base64
import io
import os
from typing import Any
from ray import serve
@serve.deployment(name="TTSDeployment", num_replicas=1)
class TTSDeployment:
def __init__(self):
import torch
from TTS.api import TTS
self.model_name = os.environ.get("MODEL_NAME", "tts_models/en/ljspeech/tacotron2-DDC")
# Detect device
self.use_gpu = torch.cuda.is_available()
print(f"Loading TTS model: {self.model_name}")
print(f"Using GPU: {self.use_gpu}")
self.tts = TTS(model_name=self.model_name, progress_bar=False)
if self.use_gpu:
self.tts = self.tts.to("cuda")
print("TTS model loaded successfully")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle text-to-speech requests.
Expected request format:
{
"text": "Text to synthesize",
"speaker": "speaker_name",
"language": "en",
"speed": 1.0,
"output_format": "wav",
"return_base64": true
}
"""
import numpy as np
from scipy.io import wavfile
text = request.get("text", "")
speaker = request.get("speaker")
language = request.get("language")
speed = request.get("speed", 1.0)
output_format = request.get("output_format", "wav")
return_base64 = request.get("return_base64", True)
if not text:
return {"error": "No text provided"}
# Generate speech
try:
# TTS.tts returns a numpy array of audio samples
wav = self.tts.tts(
text=text,
speaker=speaker,
language=language,
speed=speed,
)
# Convert to numpy array if needed
if not isinstance(wav, np.ndarray):
wav = np.array(wav)
# Normalize to int16
wav_int16 = (wav * 32767).astype(np.int16)
# Get sample rate from model config
sample_rate = (
self.tts.synthesizer.output_sample_rate
if hasattr(self.tts, "synthesizer")
else 22050
)
# Write to buffer
buffer = io.BytesIO()
wavfile.write(buffer, sample_rate, wav_int16)
audio_bytes = buffer.getvalue()
response = {
"model": self.model_name,
"sample_rate": sample_rate,
"duration": len(wav) / sample_rate,
"format": output_format,
}
if return_base64:
response["audio"] = base64.b64encode(audio_bytes).decode("utf-8")
else:
response["audio_bytes"] = audio_bytes
return response
except Exception as e:
return {
"error": str(e),
"model": self.model_name,
}
def list_speakers(self) -> dict[str, Any]:
"""List available speakers for multi-speaker models."""
speakers = []
if hasattr(self.tts, "speakers") and self.tts.speakers:
speakers = self.tts.speakers
return {
"model": self.model_name,
"speakers": speakers,
"is_multi_speaker": len(speakers) > 0,
}
app = TTSDeployment.bind()
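A minimal synthesis sketch against the contract above, assuming a local Ray instance:

import base64

from ray import serve

handle = serve.run(app)
resp = handle.remote({"text": "Hello from Ray Serve."}).result()
# return_base64 defaults to True, so decode before writing to disk.
with open("out.wav", "wb") as f:
    f.write(base64.b64decode(resp["audio"]))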

ray_serve/serve_whisper.py (new file)

@@ -0,0 +1,142 @@
"""
Ray Serve deployment for faster-whisper STT.
Runs on: elminster (RTX 2070 8GB, CUDA)
"""
import base64
import io
import os
from typing import Any
from ray import serve
@serve.deployment(name="WhisperDeployment", num_replicas=1)
class WhisperDeployment:
def __init__(self):
import torch
from faster_whisper import WhisperModel
self.model_size = os.environ.get("MODEL_SIZE", "large-v3")
# Detect device and compute type
if torch.cuda.is_available():
self.device = "cuda"
self.compute_type = "float16"
else:
self.device = "cpu"
self.compute_type = "int8"
print(f"Loading Whisper model: {self.model_size}")
print(f"Using device: {self.device}, compute_type: {self.compute_type}")
self.model = WhisperModel(
self.model_size,
device=self.device,
compute_type=self.compute_type,
)
print("Whisper model loaded successfully")
async def __call__(self, request: dict[str, Any]) -> dict[str, Any]:
"""
Handle transcription requests.
Expected request format:
{
"audio": "base64_encoded_audio_data",
"audio_format": "wav",
"language": "en",
"task": "transcribe",
"response_format": "json",
"word_timestamps": false
}
Alternative with file path:
{
"file": "/path/to/audio.wav",
...
}
"""
language = request.get("language")
task = request.get("task", "transcribe") # transcribe or translate
response_format = request.get("response_format", "json")
word_timestamps = request.get("word_timestamps", False)
# Get audio data
audio_input = None
if "audio" in request:
# Base64 encoded audio
audio_bytes = base64.b64decode(request["audio"])
audio_input = io.BytesIO(audio_bytes)
elif "file" in request:
# File path
audio_input = request["file"]
elif "audio_bytes" in request:
# Raw bytes
audio_input = io.BytesIO(request["audio_bytes"])
else:
return {
"error": "No audio data provided. Use 'audio' (base64), 'file' (path), or 'audio_bytes'",
}
# Transcribe
segments, info = self.model.transcribe(
audio_input,
language=language,
task=task,
word_timestamps=word_timestamps,
vad_filter=True,
)
# Collect segments
segment_list = []
full_text = ""
for segment in segments:
seg_data = {
"id": segment.id,
"start": segment.start,
"end": segment.end,
"text": segment.text,
}
if word_timestamps and segment.words:
seg_data["words"] = [
{
"word": word.word,
"start": word.start,
"end": word.end,
"probability": word.probability,
}
for word in segment.words
]
segment_list.append(seg_data)
full_text += segment.text
# Build response based on format
if response_format == "text":
return {"text": full_text.strip()}
if response_format == "verbose_json":
return {
"task": task,
"language": info.language,
"duration": info.duration,
"text": full_text.strip(),
"segments": segment_list,
}
# Default JSON format (OpenAI-compatible)
return {
"text": full_text.strip(),
"language": info.language,
"duration": info.duration,
"model": self.model_size,
}
app = WhisperDeployment.bind()
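A transcription sketch using the base64 request path above; sample.wav is a placeholder for any local audio file:

import base64

from ray import serve

handle = serve.run(app)
with open("sample.wav", "rb") as f:
    audio_b64 = base64.b64encode(f.read()).decode("utf-8")
resp = handle.remote({"audio": audio_b64, "response_format": "text"}).result()
print(resp["text"])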