Some checks failed
CI / Lint (push) Failing after 3m2s
CI / Test (push) Successful in 3m44s
CI / Release (push) Has been skipped
CI / Notify Downstream (chat-handler) (push) Has been skipped
CI / Notify Downstream (pipeline-bridge) (push) Has been skipped
CI / Notify Downstream (stt-module) (push) Has been skipped
CI / Notify Downstream (tts-module) (push) Has been skipped
CI / Notify Downstream (voice-assistant) (push) Has been skipped
CI / Notify (push) Successful in 1s
BREAKING CHANGE: All NATS message serialization now uses Protocol Buffers. - Added proto/messages/v1/messages.proto with 22 message types - Generated Go code at gen/messagespb/ - messages/ package now exports type aliases to proto types - natsutil.Publish/Request/Decode use proto.Marshal/Unmarshal - Removed legacy MessageHandler, OnMessage, wrapMapHandler - TypedMessageHandler now returns (proto.Message, error) - EffectiveQuery is now a free function: messages.EffectiveQuery(req) - Removed msgpack dependency entirely
280 lines
8.5 KiB
Go
280 lines
8.5 KiB
Go
package handler
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
|
|
pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb"
|
|
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
|
)
|
|
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
// Handler construction tests
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
|
|
func TestNewHandler(t *testing.T) {
|
|
cfg := config.Load()
|
|
cfg.ServiceName = "test-handler"
|
|
cfg.NATSQueueGroup = "test-group"
|
|
|
|
h := New("ai.test.subject", cfg)
|
|
if h.Subject != "ai.test.subject" {
|
|
t.Errorf("Subject = %q", h.Subject)
|
|
}
|
|
if h.QueueGroup != "test-group" {
|
|
t.Errorf("QueueGroup = %q", h.QueueGroup)
|
|
}
|
|
if h.Settings.ServiceName != "test-handler" {
|
|
t.Errorf("ServiceName = %q", h.Settings.ServiceName)
|
|
}
|
|
}
|
|
|
|
func TestNewHandlerNilSettings(t *testing.T) {
|
|
h := New("ai.test", nil)
|
|
if h.Settings == nil {
|
|
t.Fatal("Settings should be loaded automatically")
|
|
}
|
|
if h.Settings.ServiceName != "handler" {
|
|
t.Errorf("ServiceName = %q, want default", h.Settings.ServiceName)
|
|
}
|
|
}
|
|
|
|
func TestCallbackRegistration(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
setupCalled := false
|
|
h.OnSetup(func(ctx context.Context) error {
|
|
setupCalled = true
|
|
return nil
|
|
})
|
|
|
|
teardownCalled := false
|
|
h.OnTeardown(func(ctx context.Context) error {
|
|
teardownCalled = true
|
|
return nil
|
|
})
|
|
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
return nil, nil
|
|
})
|
|
|
|
if h.onSetup == nil || h.onTeardown == nil || h.onTypedMessage == nil {
|
|
t.Error("callbacks should not be nil after registration")
|
|
}
|
|
|
|
// Verify setup/teardown work when called directly.
|
|
_ = h.onSetup(context.Background())
|
|
_ = h.onTeardown(context.Background())
|
|
if !setupCalled || !teardownCalled {
|
|
t.Error("callbacks should have been invoked")
|
|
}
|
|
}
|
|
|
|
func TestTypedMessageRegistration(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
return &pb.ChatResponse{Response: "ok"}, nil
|
|
})
|
|
|
|
if h.onTypedMessage == nil {
|
|
t.Error("onTypedMessage should not be nil after registration")
|
|
}
|
|
}
|
|
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
// wrapHandler dispatch tests (unit test the message decode + dispatch logic)
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
|
|
func TestWrapHandler_ValidMessage(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
var receivedReq pb.ChatRequest
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
if err := natsutil.Decode(msg.Data, &receivedReq); err != nil {
|
|
return nil, err
|
|
}
|
|
return &pb.ChatResponse{Response: "ok", UserId: receivedReq.GetUserId()}, nil
|
|
})
|
|
|
|
// Encode a message the same way services would.
|
|
encoded, err := proto.Marshal(&pb.ChatRequest{
|
|
RequestId: "test-001",
|
|
Message: "hello",
|
|
Premium: true,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Call wrapHandler directly without NATS.
|
|
handler := h.wrapHandler(context.Background())
|
|
handler(&nats.Msg{
|
|
Subject: "ai.test.user.42.message",
|
|
Data: encoded,
|
|
})
|
|
|
|
if receivedReq.GetRequestId() != "test-001" {
|
|
t.Errorf("request_id = %v", receivedReq.GetRequestId())
|
|
}
|
|
if receivedReq.GetPremium() != true {
|
|
t.Errorf("premium = %v", receivedReq.GetPremium())
|
|
}
|
|
}
|
|
|
|
func TestWrapHandler_InvalidMessage(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
handlerCalled := false
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
handlerCalled = true
|
|
var req pb.ChatRequest
|
|
if err := natsutil.Decode(msg.Data, &req); err != nil {
|
|
return nil, err
|
|
}
|
|
return &pb.ChatResponse{}, nil
|
|
})
|
|
|
|
handler := h.wrapHandler(context.Background())
|
|
handler(&nats.Msg{
|
|
Subject: "ai.test",
|
|
Data: []byte{0xFF, 0xFE, 0xFD}, // invalid protobuf
|
|
})
|
|
|
|
// The handler IS called (wrapHandler doesn't pre-decode), but it should
|
|
// return an error from Decode. Either way no panic.
|
|
_ = handlerCalled
|
|
}
|
|
|
|
func TestWrapHandler_HandlerError(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
return nil, context.DeadlineExceeded
|
|
})
|
|
|
|
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "err-test"})
|
|
handler := h.wrapHandler(context.Background())
|
|
|
|
// Should not panic even when handler returns error.
|
|
handler(&nats.Msg{
|
|
Subject: "ai.test",
|
|
Data: encoded,
|
|
})
|
|
}
|
|
|
|
func TestWrapHandler_NilResponse(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
return nil, nil // fire-and-forget style
|
|
})
|
|
|
|
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "nil-resp"})
|
|
handler := h.wrapHandler(context.Background())
|
|
|
|
// Should not panic with nil response and no reply subject.
|
|
handler(&nats.Msg{
|
|
Subject: "ai.test",
|
|
Data: encoded,
|
|
})
|
|
}
|
|
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
// wrapHandler dispatch tests — typed handler path
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
|
|
func TestWrapHandler_Typed(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
var received pb.ChatRequest
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
if err := natsutil.Decode(msg.Data, &received); err != nil {
|
|
return nil, err
|
|
}
|
|
return &pb.ChatResponse{UserId: received.GetUserId(), Response: "ok"}, nil
|
|
})
|
|
|
|
encoded, _ := proto.Marshal(&pb.ChatRequest{
|
|
RequestId: "typed-001",
|
|
Message: "hello typed",
|
|
})
|
|
|
|
handler := h.wrapHandler(context.Background())
|
|
handler(&nats.Msg{Subject: "ai.test", Data: encoded})
|
|
|
|
if received.GetRequestId() != "typed-001" {
|
|
t.Errorf("RequestId = %q", received.GetRequestId())
|
|
}
|
|
if received.GetMessage() != "hello typed" {
|
|
t.Errorf("Message = %q", received.GetMessage())
|
|
}
|
|
}
|
|
|
|
func TestWrapHandler_TypedError(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
return nil, context.DeadlineExceeded
|
|
})
|
|
|
|
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "err"})
|
|
handler := h.wrapHandler(context.Background())
|
|
|
|
// Should not panic.
|
|
handler(&nats.Msg{Subject: "ai.test", Data: encoded})
|
|
}
|
|
|
|
func TestWrapHandler_TypedNilResponse(t *testing.T) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
return nil, nil
|
|
})
|
|
|
|
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "nil"})
|
|
handler := h.wrapHandler(context.Background())
|
|
handler(&nats.Msg{Subject: "ai.test", Data: encoded})
|
|
}
|
|
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
// Benchmark: message decode + dispatch overhead
|
|
// ────────────────────────────────────────────────────────────────────────────
|
|
|
|
func BenchmarkWrapHandler(b *testing.B) {
|
|
cfg := config.Load()
|
|
h := New("ai.test", cfg)
|
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
|
var req pb.ChatRequest
|
|
_ = natsutil.Decode(msg.Data, &req)
|
|
return &pb.ChatResponse{Response: "ok"}, nil
|
|
})
|
|
|
|
encoded, _ := proto.Marshal(&pb.ChatRequest{
|
|
RequestId: "bench-001",
|
|
Message: "What is the capital of France?",
|
|
Premium: true,
|
|
TopK: 10,
|
|
})
|
|
handler := h.wrapHandler(context.Background())
|
|
msg := &nats.Msg{Subject: "ai.test", Data: encoded}
|
|
|
|
b.ResetTimer()
|
|
for b.Loop() {
|
|
handler(msg)
|
|
}
|
|
}
|