feat!: replace msgpack with protobuf for all NATS messages
Some checks failed
CI / Lint (push) Failing after 3m2s
CI / Test (push) Successful in 3m44s
CI / Release (push) Has been skipped
CI / Notify Downstream (chat-handler) (push) Has been skipped
CI / Notify Downstream (pipeline-bridge) (push) Has been skipped
CI / Notify Downstream (stt-module) (push) Has been skipped
CI / Notify Downstream (tts-module) (push) Has been skipped
CI / Notify Downstream (voice-assistant) (push) Has been skipped
CI / Notify (push) Successful in 1s

BREAKING CHANGE: All NATS message serialization now uses Protocol Buffers.
- Added proto/messages/v1/messages.proto with 22 message types
- Generated Go code at gen/messagespb/
- messages/ package now exports type aliases to proto types
- natsutil.Publish/Request/Decode use proto.Marshal/Unmarshal
- Removed legacy MessageHandler, OnMessage, wrapMapHandler
- TypedMessageHandler now returns (proto.Message, error)
- EffectiveQuery is now a free function: messages.EffectiveQuery(req)
- Removed msgpack dependency entirely
This commit is contained in:
2026-02-21 14:58:05 -05:00
parent 3585d81ff5
commit 13ef1df109
12 changed files with 3074 additions and 1293 deletions

View File

@@ -1,70 +1,70 @@
// Package natsutil provides a NATS/JetStream client with msgpack serialization.
// Package natsutil provides a NATS/JetStream client with protobuf serialization.
package natsutil
import (
"fmt"
"log/slog"
"time"
"fmt"
"log/slog"
"time"
"github.com/nats-io/nats.go"
"github.com/vmihailenco/msgpack/v5"
"github.com/nats-io/nats.go"
"google.golang.org/protobuf/proto"
)
// Client wraps a NATS connection with msgpack helpers.
// Client wraps a NATS connection with protobuf helpers.
type Client struct {
nc *nats.Conn
js nats.JetStreamContext
subs []*nats.Subscription
url string
opts []nats.Option
nc *nats.Conn
js nats.JetStreamContext
subs []*nats.Subscription
url string
opts []nats.Option
}
// New creates a NATS client configured to connect to the given URL.
// Optional NATS options (e.g. credentials) can be appended.
func New(url string, opts ...nats.Option) *Client {
defaults := []nats.Option{
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
slog.Warn("NATS disconnected", "error", err)
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
slog.Info("NATS reconnected")
}),
}
return &Client{
url: url,
opts: append(defaults, opts...),
}
defaults := []nats.Option{
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
slog.Warn("NATS disconnected", "error", err)
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
slog.Info("NATS reconnected")
}),
}
return &Client{
url: url,
opts: append(defaults, opts...),
}
}
// Connect establishes the NATS connection and JetStream context.
func (c *Client) Connect() error {
nc, err := nats.Connect(c.url, c.opts...)
if err != nil {
return fmt.Errorf("nats connect: %w", err)
}
js, err := nc.JetStream()
if err != nil {
nc.Close()
return fmt.Errorf("jetstream: %w", err)
}
c.nc = nc
c.js = js
slog.Info("connected to NATS", "url", c.url)
return nil
nc, err := nats.Connect(c.url, c.opts...)
if err != nil {
return fmt.Errorf("nats connect: %w", err)
}
js, err := nc.JetStream()
if err != nil {
nc.Close()
return fmt.Errorf("jetstream: %w", err)
}
c.nc = nc
c.js = js
slog.Info("connected to NATS", "url", c.url)
return nil
}
// Close drains subscriptions and closes the connection.
func (c *Client) Close() {
if c.nc == nil {
return
}
for _, sub := range c.subs {
_ = sub.Drain()
}
c.nc.Close()
slog.Info("NATS connection closed")
if c.nc == nil {
return
}
for _, sub := range c.subs {
_ = sub.Drain()
}
c.nc.Close()
slog.Info("NATS connection closed")
}
// Conn returns the underlying *nats.Conn.
@@ -75,68 +75,56 @@ func (c *Client) JS() nats.JetStreamContext { return c.js }
// IsConnected returns true if the NATS connection is active.
func (c *Client) IsConnected() bool {
return c.nc != nil && c.nc.IsConnected()
return c.nc != nil && c.nc.IsConnected()
}
// Subscribe subscribes to a subject with an optional queue group.
// The handler receives the raw *nats.Msg.
func (c *Client) Subscribe(subject string, handler nats.MsgHandler, queue string) error {
var sub *nats.Subscription
var err error
if queue != "" {
sub, err = c.nc.QueueSubscribe(subject, queue, handler)
slog.Info("subscribed", "subject", subject, "queue", queue)
} else {
sub, err = c.nc.Subscribe(subject, handler)
slog.Info("subscribed", "subject", subject)
}
if err != nil {
return fmt.Errorf("subscribe %s: %w", subject, err)
}
c.subs = append(c.subs, sub)
return nil
var sub *nats.Subscription
var err error
if queue != "" {
sub, err = c.nc.QueueSubscribe(subject, queue, handler)
slog.Info("subscribed", "subject", subject, "queue", queue)
} else {
sub, err = c.nc.Subscribe(subject, handler)
slog.Info("subscribed", "subject", subject)
}
if err != nil {
return fmt.Errorf("subscribe %s: %w", subject, err)
}
c.subs = append(c.subs, sub)
return nil
}
// Publish encodes data as msgpack and publishes to the subject.
func (c *Client) Publish(subject string, data any) error {
payload, err := msgpack.Marshal(data)
if err != nil {
return fmt.Errorf("msgpack marshal: %w", err)
}
return c.nc.Publish(subject, payload)
// Publish encodes data as protobuf and publishes to the subject.
func (c *Client) Publish(subject string, data proto.Message) error {
payload, err := proto.Marshal(data)
if err != nil {
return fmt.Errorf("proto marshal: %w", err)
}
return c.nc.Publish(subject, payload)
}
// Request sends a msgpack-encoded request and decodes the response into result.
func (c *Client) Request(subject string, data any, result any, timeout time.Duration) error {
payload, err := msgpack.Marshal(data)
if err != nil {
return fmt.Errorf("msgpack marshal: %w", err)
}
msg, err := c.nc.Request(subject, payload, timeout)
if err != nil {
return fmt.Errorf("nats request: %w", err)
}
return msgpack.Unmarshal(msg.Data, result)
// PublishRaw publishes pre-encoded bytes to the subject.
func (c *Client) PublishRaw(subject string, data []byte) error {
return c.nc.Publish(subject, data)
}
// DecodeMsgpack decodes msgpack-encoded NATS message data into dest.
func DecodeMsgpack(msg *nats.Msg, dest any) error {
return msgpack.Unmarshal(msg.Data, dest)
// Request sends a protobuf-encoded request and decodes the response into result.
func (c *Client) Request(subject string, data proto.Message, result proto.Message, timeout time.Duration) error {
payload, err := proto.Marshal(data)
if err != nil {
return fmt.Errorf("proto marshal: %w", err)
}
msg, err := c.nc.Request(subject, payload, timeout)
if err != nil {
return fmt.Errorf("nats request: %w", err)
}
return proto.Unmarshal(msg.Data, result)
}
// Decode is a generic helper that unmarshals msgpack bytes into T.
// Usage: req, err := natsutil.Decode[messages.ChatRequest](msg.Data)
func Decode[T any](data []byte) (T, error) {
var v T
err := msgpack.Unmarshal(data, &v)
return v, err
}
// DecodeMsgpackMap decodes msgpack data into a generic map.
func DecodeMsgpackMap(data []byte) (map[string]any, error) {
var m map[string]any
if err := msgpack.Unmarshal(data, &m); err != nil {
return nil, err
}
return m, nil
// Decode unmarshals protobuf bytes into dest.
func Decode(data []byte, dest proto.Message) error {
return proto.Unmarshal(data, dest)
}

View File

@@ -3,254 +3,212 @@ package natsutil
import (
"testing"
"github.com/vmihailenco/msgpack/v5"
"google.golang.org/protobuf/proto"
pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb"
)
// ────────────────────────────────────────────────────────────────────────────
// DecodeMsgpackMap tests
// Decode tests
// ────────────────────────────────────────────────────────────────────────────
func TestDecodeMsgpackMap_Roundtrip(t *testing.T) {
orig := map[string]any{
"request_id": "req-001",
"user_id": "user-42",
"premium": true,
"top_k": int64(10), // msgpack decodes ints as int64
func TestDecode_ChatRequest_Roundtrip(t *testing.T) {
orig := &pb.ChatRequest{
RequestId: "req-001",
UserId: "user-42",
Premium: true,
TopK: 10,
}
data, err := msgpack.Marshal(orig)
data, err := proto.Marshal(orig)
if err != nil {
t.Fatal(err)
}
decoded, err := DecodeMsgpackMap(data)
if err != nil {
var decoded pb.ChatRequest
if err := Decode(data, &decoded); err != nil {
t.Fatal(err)
}
if decoded["request_id"] != "req-001" {
t.Errorf("request_id = %v", decoded["request_id"])
if decoded.GetRequestId() != "req-001" {
t.Errorf("RequestId = %v", decoded.GetRequestId())
}
if decoded["premium"] != true {
t.Errorf("premium = %v", decoded["premium"])
if decoded.GetUserId() != "user-42" {
t.Errorf("UserId = %v", decoded.GetUserId())
}
if decoded.GetPremium() != true {
t.Errorf("Premium = %v", decoded.GetPremium())
}
if decoded.GetTopK() != 10 {
t.Errorf("TopK = %v", decoded.GetTopK())
}
}
func TestDecodeMsgpackMap_Empty(t *testing.T) {
data, _ := msgpack.Marshal(map[string]any{})
m, err := DecodeMsgpackMap(data)
func TestDecode_EmptyMessage(t *testing.T) {
data, err := proto.Marshal(&pb.ChatRequest{})
if err != nil {
t.Fatal(err)
}
if len(m) != 0 {
t.Errorf("expected empty map, got %v", m)
var decoded pb.ChatRequest
if err := Decode(data, &decoded); err != nil {
t.Fatal(err)
}
if decoded.GetRequestId() != "" {
t.Errorf("expected empty RequestId, got %q", decoded.GetRequestId())
}
}
func TestDecodeMsgpackMap_InvalidData(t *testing.T) {
_, err := DecodeMsgpackMap([]byte{0xFF, 0xFE})
func TestDecode_InvalidData(t *testing.T) {
err := Decode([]byte{0xFF, 0xFE}, &pb.ChatRequest{})
if err == nil {
t.Error("expected error for invalid msgpack data")
t.Error("expected error for invalid protobuf data")
}
}
// ────────────────────────────────────────────────────────────────────────────
// DecodeMsgpack (typed struct) tests
// Typed struct roundtrip tests
// ────────────────────────────────────────────────────────────────────────────
type testMessage struct {
RequestID string `msgpack:"request_id"`
UserID string `msgpack:"user_id"`
Count int `msgpack:"count"`
Active bool `msgpack:"active"`
}
func TestDecodeMsgpackTyped_Roundtrip(t *testing.T) {
orig := testMessage{
RequestID: "req-typed-001",
UserID: "user-7",
Count: 42,
Active: true,
func TestDecode_VoiceResponse_Roundtrip(t *testing.T) {
orig := &pb.VoiceResponse{
RequestId: "vr-001",
Response: "The capital of France is Paris.",
Transcription: "What is the capital of France?",
}
data, err := msgpack.Marshal(orig)
data, err := proto.Marshal(orig)
if err != nil {
t.Fatal(err)
}
// Simulate nats.Msg data decoding.
var decoded testMessage
if err := msgpack.Unmarshal(data, &decoded); err != nil {
var decoded pb.VoiceResponse
if err := Decode(data, &decoded); err != nil {
t.Fatal(err)
}
if decoded.RequestID != orig.RequestID {
t.Errorf("RequestID = %q, want %q", decoded.RequestID, orig.RequestID)
if decoded.GetRequestId() != orig.GetRequestId() {
t.Errorf("RequestId = %q, want %q", decoded.GetRequestId(), orig.GetRequestId())
}
if decoded.Count != orig.Count {
t.Errorf("Count = %d, want %d", decoded.Count, orig.Count)
if decoded.GetResponse() != orig.GetResponse() {
t.Errorf("Response = %q, want %q", decoded.GetResponse(), orig.GetResponse())
}
if decoded.Active != orig.Active {
t.Errorf("Active = %v, want %v", decoded.Active, orig.Active)
if decoded.GetTranscription() != orig.GetTranscription() {
t.Errorf("Transcription = %q, want %q", decoded.GetTranscription(), orig.GetTranscription())
}
}
// TestTypedStructDecodesMapEncoding verifies that a typed struct can be
// decoded from data that was encoded as map[string]any (backwards compat).
func TestTypedStructDecodesMapEncoding(t *testing.T) {
// Encode as map (the old way).
mapData := map[string]any{
"request_id": "req-compat",
"user_id": "user-compat",
"count": int64(99),
"active": false,
func TestDecode_ErrorResponse_Roundtrip(t *testing.T) {
orig := &pb.ErrorResponse{
Error: true,
Message: "something broke",
Type: "InternalError",
}
data, err := msgpack.Marshal(mapData)
data, err := proto.Marshal(orig)
if err != nil {
t.Fatal(err)
}
// Decode into typed struct (the new way).
var msg testMessage
if err := msgpack.Unmarshal(data, &msg); err != nil {
var decoded pb.ErrorResponse
if err := Decode(data, &decoded); err != nil {
t.Fatal(err)
}
if msg.RequestID != "req-compat" {
t.Errorf("RequestID = %q", msg.RequestID)
if !decoded.GetError() {
t.Error("expected Error=true")
}
if msg.Count != 99 {
t.Errorf("Count = %d, want 99", msg.Count)
if decoded.GetMessage() != "something broke" {
t.Errorf("Message = %q", decoded.GetMessage())
}
if decoded.GetType() != "InternalError" {
t.Errorf("Type = %q", decoded.GetType())
}
}
// ────────────────────────────────────────────────────────────────────────────
// Binary data tests (audio []byte in msgpack)
// Binary data tests (audio []byte in protobuf)
// ────────────────────────────────────────────────────────────────────────────
type audioMessage struct {
SessionID string `msgpack:"session_id"`
Audio []byte `msgpack:"audio"`
SampleRate int `msgpack:"sample_rate"`
}
func TestBinaryDataRoundtrip(t *testing.T) {
audio := make([]byte, 32768)
for i := range audio {
audio[i] = byte(i % 256)
}
orig := audioMessage{
SessionID: "sess-audio-001",
orig := &pb.TTSAudioChunk{
SessionId: "sess-audio-001",
Audio: audio,
SampleRate: 24000,
}
data, err := msgpack.Marshal(orig)
data, err := proto.Marshal(orig)
if err != nil {
t.Fatal(err)
}
var decoded audioMessage
if err := msgpack.Unmarshal(data, &decoded); err != nil {
var decoded pb.TTSAudioChunk
if err := Decode(data, &decoded); err != nil {
t.Fatal(err)
}
if len(decoded.Audio) != len(orig.Audio) {
t.Fatalf("audio len = %d, want %d", len(decoded.Audio), len(orig.Audio))
if len(decoded.GetAudio()) != len(orig.GetAudio()) {
t.Fatalf("audio len = %d, want %d", len(decoded.GetAudio()), len(orig.GetAudio()))
}
for i := range decoded.Audio {
if decoded.Audio[i] != orig.Audio[i] {
t.Fatalf("audio[%d] = %d, want %d", i, decoded.Audio[i], orig.Audio[i])
for i := range decoded.GetAudio() {
if decoded.GetAudio()[i] != orig.GetAudio()[i] {
t.Fatalf("audio[%d] = %d, want %d", i, decoded.GetAudio()[i], orig.GetAudio()[i])
}
}
}
// TestBinaryVsBase64Size shows the wire-size win of raw bytes vs base64 string.
func TestBinaryVsBase64Size(t *testing.T) {
// TestProtoWireSize shows protobuf wire size for binary payloads.
func TestProtoWireSize(t *testing.T) {
audio := make([]byte, 16384)
// Old approach: base64 string in map.
import_b64 := make([]byte, (len(audio)*4+2)/3) // approximate base64 size
mapMsg := map[string]any{
"session_id": "sess-1",
"audio_b64": string(import_b64),
}
mapData, _ := msgpack.Marshal(mapMsg)
// New approach: raw bytes in struct.
structMsg := audioMessage{
SessionID: "sess-1",
msg := &pb.TTSAudioChunk{
SessionId: "sess-1",
Audio: audio,
}
structData, _ := msgpack.Marshal(structMsg)
data, _ := proto.Marshal(msg)
t.Logf("base64-in-map: %d bytes, raw-bytes-in-struct: %d bytes (%.0f%% smaller)",
len(mapData), len(structData),
100*(1-float64(len(structData))/float64(len(mapData))))
t.Logf("TTSAudioChunk with 16KB audio: %d bytes on wire", len(data))
}
// ────────────────────────────────────────────────────────────────────────────
// Benchmarks
// ────────────────────────────────────────────────────────────────────────────
func BenchmarkEncodeMap(b *testing.B) {
data := map[string]any{
"request_id": "req-bench",
"user_id": "user-bench",
"message": "What is the weather today?",
"premium": true,
"top_k": 10,
func BenchmarkEncode_ChatRequest(b *testing.B) {
data := &pb.ChatRequest{
RequestId: "req-bench",
UserId: "user-bench",
Message: "What is the weather today?",
Premium: true,
TopK: 10,
}
for b.Loop() {
_, _ = msgpack.Marshal(data)
_, _ = proto.Marshal(data)
}
}
func BenchmarkEncodeStruct(b *testing.B) {
data := testMessage{
RequestID: "req-bench",
UserID: "user-bench",
Count: 10,
Active: true,
}
for b.Loop() {
_, _ = msgpack.Marshal(data)
}
}
func BenchmarkDecodeMap(b *testing.B) {
raw, _ := msgpack.Marshal(map[string]any{
"request_id": "req-bench",
"user_id": "user-bench",
"message": "What is the weather today?",
"premium": true,
"top_k": 10,
func BenchmarkDecode_ChatRequest(b *testing.B) {
raw, _ := proto.Marshal(&pb.ChatRequest{
RequestId: "req-bench",
UserId: "user-bench",
Message: "What is the weather today?",
Premium: true,
TopK: 10,
})
for b.Loop() {
var m map[string]any
_ = msgpack.Unmarshal(raw, &m)
var m pb.ChatRequest
_ = Decode(raw, &m)
}
}
func BenchmarkDecodeStruct(b *testing.B) {
raw, _ := msgpack.Marshal(testMessage{
RequestID: "req-bench",
UserID: "user-bench",
Count: 10,
Active: true,
})
for b.Loop() {
var m testMessage
_ = msgpack.Unmarshal(raw, &m)
}
}
func BenchmarkDecodeAudio32KB(b *testing.B) {
raw, _ := msgpack.Marshal(audioMessage{
SessionID: "s1",
func BenchmarkDecode_Audio32KB(b *testing.B) {
raw, _ := proto.Marshal(&pb.TTSAudioChunk{
SessionId: "s1",
Audio: make([]byte, 32768),
SampleRate: 24000,
})
for b.Loop() {
var m audioMessage
_ = msgpack.Unmarshal(raw, &m)
var m pb.TTSAudioChunk
_ = Decode(raw, &m)
}
}