""" Ray integration utilities for handler-base clients. When running inside a Ray cluster, clients can use Ray Serve handles for faster internal communication (gRPC instead of HTTP). """ import logging from typing import Any, Optional logger = logging.getLogger(__name__) # Ray handle cache to avoid repeated lookups _ray_handles: dict[str, Any] = {} _ray_available: Optional[bool] = None def is_ray_available() -> bool: """Check if we're running inside a Ray cluster.""" global _ray_available if _ray_available is not None: return _ray_available try: import ray _ray_available = ray.is_initialized() if _ray_available: logger.info("Ray detected - will use Ray handles for internal calls") return _ray_available except ImportError: _ray_available = False return False def get_ray_handle(deployment_name: str, app_name: str) -> Optional[Any]: """ Get a Ray Serve deployment handle for internal calls. Args: deployment_name: Name of the Ray Serve deployment app_name: Name of the Ray Serve application Returns: DeploymentHandle if available, None otherwise """ if not is_ray_available(): return None cache_key = f"{app_name}/{deployment_name}" if cache_key in _ray_handles: return _ray_handles[cache_key] try: from ray import serve handle = serve.get_deployment_handle(deployment_name, app_name=app_name) _ray_handles[cache_key] = handle logger.debug(f"Got Ray handle for {cache_key}") return handle except Exception as e: logger.debug(f"Could not get Ray handle for {cache_key}: {e}") return None def clear_ray_handles() -> None: """Clear cached Ray handles (useful for testing).""" global _ray_handles, _ray_available _ray_handles.clear() _ray_available = None