Files
handler-base/tests/unit/test_health.py
Billy D. dbf1a93141
All checks were successful
CI / Lint (push) Successful in 52s
CI / Test (push) Successful in 1m1s
CI / Release (push) Successful in 5s
CI / Notify (push) Successful in 1s
fix: auto-fix ruff linting errors and remove unsupported upload-artifact
2026-02-02 08:34:00 -05:00

123 lines
3.6 KiB
Python

"""
Unit tests for handler_base.health module.
"""
import json
import time
from http.client import HTTPConnection
import pytest
class TestHealthServer:
    """Tests for HealthServer."""

    # Fixed local ports the servers under test are configured with. Several
    # tests reuse the same port, so every test must stop its server even when
    # an assertion fails; a leaked server may keep later tests from binding.
    _PORT = 18080
    _READY_CHECK_PORT = 18081

    @staticmethod
    def _fetch(port, path):
        """Issue ``GET path`` against localhost:port.

        Returns:
            A ``(status, body)`` tuple where ``status`` is the integer HTTP
            status code and ``body`` is the raw response bytes.

        The connection is always closed, including when the request fails,
        and the body is always consumed before closing.
        """
        # Constructing HTTPConnection does not open a socket, so it is safe
        # to create it outside the try block; request() does the connect.
        conn = HTTPConnection("localhost", port, timeout=5)
        try:
            conn.request("GET", path)
            response = conn.getresponse()
            return response.status, response.read()
        finally:
            conn.close()

    @pytest.fixture
    def health_server(self, settings):
        """Create a HealthServer instance configured with the shared test port."""
        from handler_base.health import HealthServer

        # Fixed (not random) high port, picked to avoid conflicts.
        settings.health_port = self._PORT
        return HealthServer(settings)

    def test_start_stop(self, health_server):
        """Test starting and stopping the health server."""
        health_server.start()
        try:
            time.sleep(0.1)  # Give server time to start

            # Verify server is running
            assert health_server._server is not None
            assert health_server._thread is not None
            assert health_server._thread.is_alive()
        finally:
            # Stop exactly once, on both the passing and the failing path,
            # so a failed assertion cannot leave the port occupied.
            health_server.stop()

        time.sleep(0.1)
        assert health_server._server is None

    def test_health_endpoint(self, health_server):
        """Test the /health endpoint."""
        health_server.start()
        time.sleep(0.1)
        try:
            status, body = self._fetch(self._PORT, "/health")
            assert status == 200
            data = json.loads(body.decode())
            assert data["status"] == "healthy"
        finally:
            health_server.stop()

    def test_ready_endpoint_default(self, health_server):
        """Test the /ready endpoint with no custom check."""
        health_server.start()
        time.sleep(0.1)
        try:
            status, body = self._fetch(self._PORT, "/ready")
            assert status == 200
            data = json.loads(body.decode())
            assert data["status"] == "ready"
        finally:
            health_server.stop()

    def test_ready_endpoint_with_check(self, settings):
        """Test /ready endpoint with custom readiness check."""
        from handler_base.health import HealthServer

        # The closure reads this variable at call time, so rebinding it
        # below changes what the readiness check reports.
        is_ready = False

        async def check_ready():
            return is_ready

        settings.health_port = self._READY_CHECK_PORT
        server = HealthServer(settings, ready_check=check_ready)
        server.start()
        time.sleep(0.2)
        try:
            # Should be not ready initially
            status, _ = self._fetch(self._READY_CHECK_PORT, "/ready")
            assert status == 503

            # Mark as ready; each _fetch call uses a fresh connection
            is_ready = True
            status, _ = self._fetch(self._READY_CHECK_PORT, "/ready")
            assert status == 200
        finally:
            server.stop()

    def test_404_for_unknown_path(self, health_server):
        """Test that unknown paths return 404."""
        health_server.start()
        time.sleep(0.1)
        try:
            status, _ = self._fetch(self._PORT, "/unknown")
            assert status == 404
        finally:
            health_server.stop()