""" Ray Serve deployment for sentence-transformers CrossEncoder reranking. Runs on: drizzt (Radeon 680M iGPU, ROCm) or danilo (Intel i915 iGPU, OpenVINO/IPEX) """ import os import time from typing import Any from ray import serve try: from ray_serve.mlflow_logger import InferenceLogger except ImportError: InferenceLogger = None @serve.deployment(name="RerankerDeployment", num_replicas=1) class RerankerDeployment: def __init__(self): import torch from sentence_transformers import CrossEncoder self.model_id = os.environ.get("MODEL_ID", "BAAI/bge-reranker-v2-m3") self.use_ipex = False self.device = "cpu" # Detect device - check for Intel GPU first via IPEX try: import intel_extension_for_pytorch as ipex self.use_ipex = True if hasattr(torch, "xpu") and torch.xpu.is_available(): self.device = "xpu" print("Intel GPU detected via IPEX, using XPU device") else: print("IPEX available, will use CPU optimization") except ImportError: print("IPEX not available, checking for other GPUs") # Check for CUDA/ROCm if not using Intel if not self.use_ipex: if torch.cuda.is_available(): self.device = "cuda" print("Using CUDA/ROCm device") else: print("No GPU detected, using CPU") print(f"Loading reranker model: {self.model_id}") print(f"Using device: {self.device}") # Load model self.model = CrossEncoder(self.model_id, device=self.device) # Apply IPEX optimization if available if self.use_ipex and self.device == "cpu": try: import intel_extension_for_pytorch as ipex self.model.model = ipex.optimize(self.model.model) print("IPEX CPU optimization applied") except Exception as e: print(f"IPEX optimization failed: {e}") print("Reranker model loaded successfully") # MLflow metrics if InferenceLogger is not None: self._mlflow = InferenceLogger( experiment_name="ray-serve-reranker", run_name=f"reranker-{self.model_id.split('/')[-1]}", tags={"model.name": self.model_id, "model.framework": "sentence-transformers", "device": self.device}, flush_every=10, ) self._mlflow.initialize( params={"model_id": self.model_id, "device": self.device, "use_ipex": str(self.use_ipex)} ) else: self._mlflow = None async def __call__(self, request: dict[str, Any]) -> dict[str, Any]: """ Handle reranking requests. Expected request format: { "query": "search query", "documents": ["doc1", "doc2", "doc3"], "top_k": 3, "return_documents": true } Alternative format (pairs): { "pairs": [["query", "doc1"], ["query", "doc2"]] } """ _start = time.time() # Handle pairs format if "pairs" in request: pairs = request["pairs"] scores = self.model.predict(pairs) results = [] for i, (_pair, score) in enumerate(zip(pairs, scores, strict=False)): results.append( { "index": i, "score": float(score), } ) if self._mlflow: self._mlflow.log_request( latency_s=time.time() - _start, num_pairs=len(pairs), ) return { "object": "list", "results": results, "model": self.model_id, } # Handle query + documents format query = request.get("query", "") documents = request.get("documents", []) top_k = request.get("top_k", len(documents)) return_documents = request.get("return_documents", True) if not documents: return { "object": "list", "results": [], "model": self.model_id, } # Create query-document pairs pairs = [[query, doc] for doc in documents] # Get scores scores = self.model.predict(pairs) # Create results with indices and scores results = [] for i, (doc, score) in enumerate(zip(documents, scores, strict=False)): result = { "index": i, "score": float(score), } if return_documents: result["document"] = doc results.append(result) # Sort by score descending results.sort(key=lambda x: x["score"], reverse=True) # Apply top_k results = results[:top_k] # Log to MLflow if self._mlflow: self._mlflow.log_request( latency_s=time.time() - _start, num_pairs=len(pairs), num_documents=len(documents), top_k=top_k, ) return { "object": "list", "results": results, "model": self.model_id, "usage": { "total_pairs": len(pairs), }, } app = RerankerDeployment.bind()