- tests/conftest.py: Pytest fixtures and configuration - tests/unit/test_config.py: Settings tests - tests/unit/test_nats_client.py: NATS client tests - tests/unit/test_health.py: Health server tests - tests/unit/test_clients.py: Service client tests - pytest.ini: Pytest configuration
92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
"""
|
|
Unit tests for the handler_base.nats_client module.
|
|
"""
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
import msgpack
|
|
|
|
|
|
class TestNATSClient:
    """Unit tests for NATSClient: construction, decoding, connect, publish, subscribe."""

    @pytest.fixture
    def nats_client(self, settings):
        """Provide a NATSClient built from the ``settings`` fixture."""
        # Imported lazily, inside the fixture, exactly as the original test did.
        from handler_base.nats_client import NATSClient

        client = NATSClient(settings)
        return client

    def test_init(self, nats_client, settings):
        """A freshly built client keeps its settings and holds no connection handles."""
        assert nats_client.settings == settings
        assert nats_client._nc is None
        assert nats_client._js is None

    def test_decode_msgpack(self, nats_client):
        """decode_msgpack() returns the dict that was msgpack-packed into ``msg.data``."""
        expected = {"query": "hello", "request_id": "123"}
        # Keyword arguments to MagicMock become attributes on the mock.
        message = MagicMock(data=msgpack.packb(expected, use_bin_type=True))

        assert nats_client.decode_msgpack(message) == expected

    def test_decode_json(self, nats_client):
        """decode_json() returns the dict that was JSON-encoded into ``msg.data``."""
        import json

        expected = {"query": "hello"}
        message = MagicMock()
        message.data = json.dumps(expected).encode()

        decoded = nats_client.decode_json(message)
        assert decoded == expected

    @pytest.mark.asyncio
    async def test_connect(self, nats_client):
        """connect() stores the connection and the JetStream context it exposes."""
        jetstream_ctx = MagicMock()
        connection = AsyncMock()
        # jetstream is deliberately a plain (non-async) mock, unlike the rest
        # of the connection object.
        connection.jetstream = MagicMock(return_value=jetstream_ctx)
        connect_stub = AsyncMock(return_value=connection)

        with patch("handler_base.nats_client.nats") as mock_nats:
            mock_nats.connect = connect_stub

            await nats_client.connect()

            assert nats_client._nc == connection
            assert nats_client._js == jetstream_ctx
            connect_stub.assert_called_once()

    @pytest.mark.asyncio
    async def test_publish(self, nats_client):
        """publish() forwards the subject and a msgpack-encoded payload to the connection."""
        connection = AsyncMock()
        nats_client._nc = connection
        payload = {"key": "value"}

        await nats_client.publish("test.subject", payload)

        connection.publish.assert_called_once()
        sent = connection.publish.call_args.args
        assert sent[0] == "test.subject"

        # The second positional argument must round-trip through msgpack.
        assert msgpack.unpackb(sent[1], raw=False) == payload

    @pytest.mark.asyncio
    async def test_subscribe(self, nats_client):
        """subscribe() passes the requested queue group through to the connection."""
        connection = AsyncMock()
        connection.subscribe = AsyncMock(return_value=MagicMock())
        nats_client._nc = connection
        callback = AsyncMock()

        await nats_client.subscribe("test.subject", callback, queue="test-queue")

        connection.subscribe.assert_called_once()
        assert connection.subscribe.call_args.kwargs["queue"] == "test-queue"