Files
handler-base/handler_base/health.py
Billy D. dbf1a93141
All checks were successful
CI / Lint (push) Successful in 52s
CI / Test (push) Successful in 1m1s
CI / Release (push) Successful in 5s
CI / Notify (push) Successful in 1s
fix: auto-fix ruff linting errors and remove unsupported upload-artifact
2026-02-02 08:34:00 -05:00

126 lines
3.8 KiB
Python

"""
HTTP health check server.
Provides /health and /ready endpoints for Kubernetes probes.
"""
import asyncio
import json
import logging
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Awaitable, Callable, Optional
from handler_base.config import Settings
logger = logging.getLogger(__name__)
class HealthHandler(BaseHTTPRequestHandler):
"""HTTP request handler for health checks."""
# Class-level state
ready_check: Optional[Callable[[], Awaitable[bool]]] = None
health_path: str = "/health"
ready_path: str = "/ready"
def log_message(self, format, *args):
"""Suppress default logging."""
pass
def do_GET(self):
"""Handle GET requests for health/ready endpoints."""
if self.path == self.health_path:
self._respond_ok({"status": "healthy"})
elif self.path == self.ready_path:
self._handle_ready()
else:
self._respond_not_found()
def _handle_ready(self):
"""Check readiness and respond."""
# Access via class to avoid method binding issues
ready_check = HealthHandler.ready_check
if ready_check is None:
self._respond_ok({"status": "ready"})
return
try:
# Run the async check in a new event loop
loop = asyncio.new_event_loop()
try:
is_ready = loop.run_until_complete(ready_check())
finally:
loop.close()
if is_ready:
self._respond_ok({"status": "ready"})
else:
self._respond_unavailable({"status": "not ready"})
except Exception as e:
logger.exception("Readiness check failed")
self._respond_unavailable({"status": "error", "message": str(e)})
def _respond_ok(self, data: dict):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def _respond_unavailable(self, data: dict):
self.send_response(503)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def _respond_not_found(self):
self.send_response(404)
self.end_headers()
class HealthServer:
"""
Background HTTP server for health checks.
Usage:
server = HealthServer(settings)
server.start()
# ... run your service ...
server.stop()
"""
def __init__(
self,
settings: Optional[Settings] = None,
ready_check: Optional[Callable[[], Awaitable[bool]]] = None,
):
self.settings = settings or Settings()
self.ready_check = ready_check
self._server: Optional[HTTPServer] = None
self._thread: Optional[threading.Thread] = None
def start(self) -> None:
"""Start the health check server in a background thread."""
# Configure handler class
HealthHandler.ready_check = self.ready_check
HealthHandler.health_path = self.settings.health_path
HealthHandler.ready_path = self.settings.ready_path
# Create and start server
self._server = HTTPServer(("0.0.0.0", self.settings.health_port), HealthHandler)
self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
self._thread.start()
logger.info(
f"Health server started on port {self.settings.health_port} "
f"(health: {self.settings.health_path}, ready: {self.settings.ready_path})"
)
def stop(self) -> None:
"""Stop the health check server."""
if self._server:
self._server.shutdown()
self._server = None
self._thread = None
logger.info("Health server stopped")