feat!: replace msgpack with protobuf for all NATS messages
BREAKING CHANGE: All NATS message serialization now uses Protocol Buffers. - Added proto/messages/v1/messages.proto with 22 message types - Generated Go code at gen/messagespb/ - messages/ package now exports type aliases to proto types - natsutil.Publish/Request/Decode use proto.Marshal/Unmarshal - Removed legacy MessageHandler, OnMessage, wrapMapHandler - TypedMessageHandler now returns (proto.Message, error) - EffectiveQuery is now a free function: messages.EffectiveQuery(req) - Removed msgpack dependency entirely
This commit is contained in:
10
buf.gen.yaml
Normal file
10
buf.gen.yaml
Normal file
@@ -0,0 +1,10 @@
|
||||
version: v2
|
||||
managed:
|
||||
enabled: true
|
||||
override:
|
||||
- file_option: go_package_prefix
|
||||
value: git.daviestechlabs.io/daviestechlabs/handler-base/gen
|
||||
plugins:
|
||||
- protoc_builtin: go
|
||||
out: gen
|
||||
opt: paths=source_relative
|
||||
9
buf.yaml
Normal file
9
buf.yaml
Normal file
@@ -0,0 +1,9 @@
|
||||
version: v2
|
||||
modules:
|
||||
- path: proto
|
||||
lint:
|
||||
use:
|
||||
- STANDARD
|
||||
breaking:
|
||||
use:
|
||||
- FILE
|
||||
2011
gen/messagespb/messages.pb.go
Normal file
2011
gen/messagespb/messages.pb.go
Normal file
File diff suppressed because it is too large
Load Diff
6
go.mod
6
go.mod
@@ -3,8 +3,8 @@ module git.daviestechlabs.io/daviestechlabs/handler-base
|
||||
go 1.25.1
|
||||
|
||||
require (
|
||||
github.com/fsnotify/fsnotify v1.9.0
|
||||
github.com/nats-io/nats.go v1.48.0
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1
|
||||
go.opentelemetry.io/otel v1.40.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0
|
||||
@@ -12,12 +12,12 @@ require (
|
||||
go.opentelemetry.io/otel/sdk v1.40.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.40.0
|
||||
go.opentelemetry.io/otel/trace v1.40.0
|
||||
google.golang.org/protobuf v1.36.11
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
@@ -25,7 +25,6 @@ require (
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
|
||||
@@ -36,5 +35,4 @@ require (
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/grpc v1.78.0 // indirect
|
||||
google.golang.org/protobuf v1.36.11 // indirect
|
||||
)
|
||||
|
||||
4
go.sum
4
go.sum
@@ -31,10 +31,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
|
||||
|
||||
@@ -10,22 +10,19 @@ import (
|
||||
"syscall"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
|
||||
pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/health"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/telemetry"
|
||||
)
|
||||
|
||||
// MessageHandler is the callback for processing decoded NATS messages.
|
||||
// data is the msgpack-decoded map. Return a response map (or nil for no reply).
|
||||
type MessageHandler func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error)
|
||||
|
||||
// TypedMessageHandler processes the raw NATS message without pre-decoding to
|
||||
// map[string]any. Services unmarshal msg.Data into their own typed structs,
|
||||
// avoiding the double-decode overhead. Return any msgpack-serialisable value
|
||||
// (a typed struct, map, or nil for no reply).
|
||||
type TypedMessageHandler func(ctx context.Context, msg *nats.Msg) (any, error)
|
||||
// TypedMessageHandler processes the raw NATS message.
|
||||
// Services unmarshal msg.Data into their own typed structs via natsutil.Decode.
|
||||
// Return a proto.Message (or nil for no reply).
|
||||
type TypedMessageHandler func(ctx context.Context, msg *nats.Msg) (proto.Message, error)
|
||||
|
||||
// SetupFunc is called once before the handler starts processing messages.
|
||||
type SetupFunc func(ctx context.Context) error
|
||||
@@ -43,7 +40,6 @@ type Handler struct {
|
||||
|
||||
onSetup SetupFunc
|
||||
onTeardown TeardownFunc
|
||||
onMessage MessageHandler
|
||||
onTypedMessage TypedMessageHandler
|
||||
running bool
|
||||
}
|
||||
@@ -74,12 +70,7 @@ func (h *Handler) OnSetup(fn SetupFunc) { h.onSetup = fn }
|
||||
// OnTeardown registers the teardown callback.
|
||||
func (h *Handler) OnTeardown(fn TeardownFunc) { h.onTeardown = fn }
|
||||
|
||||
// OnMessage registers the message handler callback.
|
||||
func (h *Handler) OnMessage(fn MessageHandler) { h.onMessage = fn }
|
||||
|
||||
// OnTypedMessage registers a typed message handler. It replaces OnMessage —
|
||||
// wrapHandler will skip the map[string]any decode and let the callback
|
||||
// unmarshal msg.Data directly.
|
||||
// OnTypedMessage registers the message handler callback.
|
||||
func (h *Handler) OnTypedMessage(fn TypedMessageHandler) { h.onTypedMessage = fn }
|
||||
|
||||
// Run starts the handler: telemetry, health server, NATS subscription, and blocks until SIGTERM/SIGINT.
|
||||
@@ -131,7 +122,7 @@ func (h *Handler) Run() error {
|
||||
}
|
||||
|
||||
// Subscribe
|
||||
if h.onMessage == nil && h.onTypedMessage == nil {
|
||||
if h.onTypedMessage == nil {
|
||||
return fmt.Errorf("no message handler registered")
|
||||
}
|
||||
if err := h.NATS.Subscribe(h.Subject, h.wrapHandler(ctx), h.QueueGroup); err != nil {
|
||||
@@ -161,26 +152,16 @@ func (h *Handler) Run() error {
|
||||
}
|
||||
|
||||
// wrapHandler creates a nats.MsgHandler that dispatches to the registered callback.
|
||||
// If OnTypedMessage was used, msg.Data is passed directly without map decode.
|
||||
// If OnMessage was used, msg.Data is decoded to map[string]any first.
|
||||
func (h *Handler) wrapHandler(ctx context.Context) nats.MsgHandler {
|
||||
if h.onTypedMessage != nil {
|
||||
return h.wrapTypedHandler(ctx)
|
||||
}
|
||||
return h.wrapMapHandler(ctx)
|
||||
}
|
||||
|
||||
// wrapTypedHandler dispatches to the TypedMessageHandler (no map decode).
|
||||
func (h *Handler) wrapTypedHandler(ctx context.Context) nats.MsgHandler {
|
||||
return func(msg *nats.Msg) {
|
||||
response, err := h.onTypedMessage(ctx, msg)
|
||||
if err != nil {
|
||||
slog.Error("handler error", "subject", msg.Subject, "error", err)
|
||||
if msg.Reply != "" {
|
||||
_ = h.NATS.Publish(msg.Reply, map[string]any{
|
||||
"error": true,
|
||||
"message": err.Error(),
|
||||
"type": fmt.Sprintf("%T", err),
|
||||
_ = h.NATS.Publish(msg.Reply, &pb.ErrorResponse{
|
||||
Error: true,
|
||||
Message: err.Error(),
|
||||
Type: fmt.Sprintf("%T", err),
|
||||
})
|
||||
}
|
||||
return
|
||||
@@ -192,40 +173,3 @@ func (h *Handler) wrapTypedHandler(ctx context.Context) nats.MsgHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wrapMapHandler dispatches to the legacy MessageHandler (decodes to map first).
|
||||
func (h *Handler) wrapMapHandler(ctx context.Context) nats.MsgHandler {
|
||||
return func(msg *nats.Msg) {
|
||||
data, err := natsutil.DecodeMsgpackMap(msg.Data)
|
||||
if err != nil {
|
||||
slog.Error("failed to decode message", "subject", msg.Subject, "error", err)
|
||||
if msg.Reply != "" {
|
||||
_ = h.NATS.Publish(msg.Reply, map[string]any{
|
||||
"error": true,
|
||||
"message": err.Error(),
|
||||
"type": "DecodeError",
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
response, err := h.onMessage(ctx, msg, data)
|
||||
if err != nil {
|
||||
slog.Error("handler error", "subject", msg.Subject, "error", err)
|
||||
if msg.Reply != "" {
|
||||
_ = h.NATS.Publish(msg.Reply, map[string]any{
|
||||
"error": true,
|
||||
"message": err.Error(),
|
||||
"type": fmt.Sprintf("%T", err),
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if response != nil && msg.Reply != "" {
|
||||
if err := h.NATS.Publish(msg.Reply, response); err != nil {
|
||||
slog.Error("failed to publish reply", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,9 +5,11 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
|
||||
pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||
)
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
@@ -57,11 +59,11 @@ func TestCallbackRegistration(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
|
||||
h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) {
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
if h.onSetup == nil || h.onTeardown == nil || h.onMessage == nil {
|
||||
if h.onSetup == nil || h.onTeardown == nil || h.onTypedMessage == nil {
|
||||
t.Error("callbacks should not be nil after registration")
|
||||
}
|
||||
|
||||
@@ -77,8 +79,8 @@ func TestTypedMessageRegistration(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
||||
return map[string]any{"ok": true}, nil
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
return &pb.ChatResponse{Response: "ok"}, nil
|
||||
})
|
||||
|
||||
if h.onTypedMessage == nil {
|
||||
@@ -94,19 +96,20 @@ func TestWrapHandler_ValidMessage(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
var receivedData map[string]any
|
||||
h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) {
|
||||
receivedData = data
|
||||
return map[string]any{"status": "ok"}, nil
|
||||
var receivedReq pb.ChatRequest
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
if err := natsutil.Decode(msg.Data, &receivedReq); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pb.ChatResponse{Response: "ok", UserId: receivedReq.GetUserId()}, nil
|
||||
})
|
||||
|
||||
// Encode a message the same way services would.
|
||||
payload := map[string]any{
|
||||
"request_id": "test-001",
|
||||
"message": "hello",
|
||||
"premium": true,
|
||||
}
|
||||
encoded, err := msgpack.Marshal(payload)
|
||||
encoded, err := proto.Marshal(&pb.ChatRequest{
|
||||
RequestId: "test-001",
|
||||
Message: "hello",
|
||||
Premium: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -118,47 +121,48 @@ func TestWrapHandler_ValidMessage(t *testing.T) {
|
||||
Data: encoded,
|
||||
})
|
||||
|
||||
if receivedData == nil {
|
||||
t.Fatal("handler was not called")
|
||||
if receivedReq.GetRequestId() != "test-001" {
|
||||
t.Errorf("request_id = %v", receivedReq.GetRequestId())
|
||||
}
|
||||
if receivedData["request_id"] != "test-001" {
|
||||
t.Errorf("request_id = %v", receivedData["request_id"])
|
||||
}
|
||||
if receivedData["premium"] != true {
|
||||
t.Errorf("premium = %v", receivedData["premium"])
|
||||
if receivedReq.GetPremium() != true {
|
||||
t.Errorf("premium = %v", receivedReq.GetPremium())
|
||||
}
|
||||
}
|
||||
|
||||
func TestWrapHandler_InvalidMsgpack(t *testing.T) {
|
||||
func TestWrapHandler_InvalidMessage(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
handlerCalled := false
|
||||
h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) {
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
handlerCalled = true
|
||||
return nil, nil
|
||||
var req pb.ChatRequest
|
||||
if err := natsutil.Decode(msg.Data, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pb.ChatResponse{}, nil
|
||||
})
|
||||
|
||||
handler := h.wrapHandler(context.Background())
|
||||
handler(&nats.Msg{
|
||||
Subject: "ai.test",
|
||||
Data: []byte{0xFF, 0xFE, 0xFD}, // invalid msgpack
|
||||
Data: []byte{0xFF, 0xFE, 0xFD}, // invalid protobuf
|
||||
})
|
||||
|
||||
if handlerCalled {
|
||||
t.Error("handler should not be called for invalid msgpack")
|
||||
}
|
||||
// The handler IS called (wrapHandler doesn't pre-decode), but it should
|
||||
// return an error from Decode. Either way no panic.
|
||||
_ = handlerCalled
|
||||
}
|
||||
|
||||
func TestWrapHandler_HandlerError(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) {
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
return nil, context.DeadlineExceeded
|
||||
})
|
||||
|
||||
encoded, _ := msgpack.Marshal(map[string]any{"key": "val"})
|
||||
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "err-test"})
|
||||
handler := h.wrapHandler(context.Background())
|
||||
|
||||
// Should not panic even when handler returns error.
|
||||
@@ -172,11 +176,11 @@ func TestWrapHandler_NilResponse(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) {
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
return nil, nil // fire-and-forget style
|
||||
})
|
||||
|
||||
encoded, _ := msgpack.Marshal(map[string]any{"x": 1})
|
||||
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "nil-resp"})
|
||||
handler := h.wrapHandler(context.Background())
|
||||
|
||||
// Should not panic with nil response and no reply subject.
|
||||
@@ -190,63 +194,58 @@ func TestWrapHandler_NilResponse(t *testing.T) {
|
||||
// wrapHandler dispatch tests — typed handler path
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestWrapTypedHandler_ValidMessage(t *testing.T) {
|
||||
func TestWrapHandler_Typed(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
type testReq struct {
|
||||
RequestID string `msgpack:"request_id"`
|
||||
Message string `msgpack:"message"`
|
||||
}
|
||||
|
||||
var received testReq
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
||||
if err := msgpack.Unmarshal(msg.Data, &received); err != nil {
|
||||
var received pb.ChatRequest
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
if err := natsutil.Decode(msg.Data, &received); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return map[string]any{"status": "ok"}, nil
|
||||
return &pb.ChatResponse{UserId: received.GetUserId(), Response: "ok"}, nil
|
||||
})
|
||||
|
||||
encoded, _ := msgpack.Marshal(map[string]any{
|
||||
"request_id": "typed-001",
|
||||
"message": "hello typed",
|
||||
encoded, _ := proto.Marshal(&pb.ChatRequest{
|
||||
RequestId: "typed-001",
|
||||
Message: "hello typed",
|
||||
})
|
||||
|
||||
handler := h.wrapHandler(context.Background())
|
||||
handler(&nats.Msg{Subject: "ai.test", Data: encoded})
|
||||
|
||||
if received.RequestID != "typed-001" {
|
||||
t.Errorf("RequestID = %q", received.RequestID)
|
||||
if received.GetRequestId() != "typed-001" {
|
||||
t.Errorf("RequestId = %q", received.GetRequestId())
|
||||
}
|
||||
if received.Message != "hello typed" {
|
||||
t.Errorf("Message = %q", received.Message)
|
||||
if received.GetMessage() != "hello typed" {
|
||||
t.Errorf("Message = %q", received.GetMessage())
|
||||
}
|
||||
}
|
||||
|
||||
func TestWrapTypedHandler_Error(t *testing.T) {
|
||||
func TestWrapHandler_TypedError(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
return nil, context.DeadlineExceeded
|
||||
})
|
||||
|
||||
encoded, _ := msgpack.Marshal(map[string]any{"key": "val"})
|
||||
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "err"})
|
||||
handler := h.wrapHandler(context.Background())
|
||||
|
||||
// Should not panic.
|
||||
handler(&nats.Msg{Subject: "ai.test", Data: encoded})
|
||||
}
|
||||
|
||||
func TestWrapTypedHandler_NilResponse(t *testing.T) {
|
||||
func TestWrapHandler_TypedNilResponse(t *testing.T) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
encoded, _ := msgpack.Marshal(map[string]any{"x": 1})
|
||||
encoded, _ := proto.Marshal(&pb.ChatRequest{RequestId: "nil"})
|
||||
handler := h.wrapHandler(context.Background())
|
||||
handler(&nats.Msg{Subject: "ai.test", Data: encoded})
|
||||
}
|
||||
@@ -258,49 +257,18 @@ func TestWrapTypedHandler_NilResponse(t *testing.T) {
|
||||
func BenchmarkWrapHandler(b *testing.B) {
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) {
|
||||
return map[string]any{"ok": true}, nil
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
var req pb.ChatRequest
|
||||
_ = natsutil.Decode(msg.Data, &req)
|
||||
return &pb.ChatResponse{Response: "ok"}, nil
|
||||
})
|
||||
|
||||
payload := map[string]any{
|
||||
"request_id": "bench-001",
|
||||
"message": "What is the capital of France?",
|
||||
"premium": true,
|
||||
"top_k": 10,
|
||||
}
|
||||
encoded, _ := msgpack.Marshal(payload)
|
||||
handler := h.wrapHandler(context.Background())
|
||||
msg := &nats.Msg{Subject: "ai.test", Data: encoded}
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
handler(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWrapTypedHandler(b *testing.B) {
|
||||
type benchReq struct {
|
||||
RequestID string `msgpack:"request_id"`
|
||||
Message string `msgpack:"message"`
|
||||
Premium bool `msgpack:"premium"`
|
||||
TopK int `msgpack:"top_k"`
|
||||
}
|
||||
|
||||
cfg := config.Load()
|
||||
h := New("ai.test", cfg)
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
||||
var req benchReq
|
||||
_ = msgpack.Unmarshal(msg.Data, &req)
|
||||
return map[string]any{"ok": true}, nil
|
||||
})
|
||||
|
||||
payload := map[string]any{
|
||||
"request_id": "bench-001",
|
||||
"message": "What is the capital of France?",
|
||||
"premium": true,
|
||||
"top_k": 10,
|
||||
}
|
||||
encoded, _ := msgpack.Marshal(payload)
|
||||
encoded, _ := proto.Marshal(&pb.ChatRequest{
|
||||
RequestId: "bench-001",
|
||||
Message: "What is the capital of France?",
|
||||
Premium: true,
|
||||
TopK: 10,
|
||||
})
|
||||
handler := h.wrapHandler(context.Background())
|
||||
msg := &nats.Msg{Subject: "ai.test", Data: encoded}
|
||||
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
// Package messages benchmarks compare three serialization strategies:
|
||||
//
|
||||
// 1. msgpack map[string]any — the old approach (dynamic, no types)
|
||||
// 2. msgpack typed struct — the new approach (compile-time safe, short keys)
|
||||
// 3. protobuf — optional future migration
|
||||
// Package messages benchmarks protobuf encoding/decoding of all message types.
|
||||
//
|
||||
// Run with:
|
||||
//
|
||||
@@ -14,52 +10,15 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
pb "git.daviestechlabs.io/daviestechlabs/handler-base/messages/proto"
|
||||
pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb"
|
||||
)
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Test fixtures — equivalent data across all three encodings
|
||||
// Test fixtures — proto message constructors
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// chatRequestMap is the legacy map[string]any representation.
|
||||
func chatRequestMap() map[string]any {
|
||||
return map[string]any{
|
||||
"request_id": "req-abc-123",
|
||||
"user_id": "user-42",
|
||||
"message": "What is the capital of France?",
|
||||
"query": "",
|
||||
"premium": true,
|
||||
"enable_rag": true,
|
||||
"enable_reranker": true,
|
||||
"enable_streaming": false,
|
||||
"top_k": 10,
|
||||
"collection": "documents",
|
||||
"enable_tts": false,
|
||||
"system_prompt": "You are a helpful assistant.",
|
||||
"response_subject": "ai.chat.response.req-abc-123",
|
||||
}
|
||||
}
|
||||
|
||||
// chatRequestStruct is the typed struct representation.
|
||||
func chatRequestStruct() ChatRequest {
|
||||
return ChatRequest{
|
||||
RequestID: "req-abc-123",
|
||||
UserID: "user-42",
|
||||
Message: "What is the capital of France?",
|
||||
Premium: true,
|
||||
EnableRAG: true,
|
||||
EnableReranker: true,
|
||||
TopK: 10,
|
||||
Collection: "documents",
|
||||
SystemPrompt: "You are a helpful assistant.",
|
||||
ResponseSubject: "ai.chat.response.req-abc-123",
|
||||
}
|
||||
}
|
||||
|
||||
// chatRequestProto is the protobuf representation.
|
||||
func chatRequestProto() *pb.ChatRequest {
|
||||
return &pb.ChatRequest{
|
||||
RequestId: "req-abc-123",
|
||||
@@ -75,25 +34,6 @@ func chatRequestProto() *pb.ChatRequest {
|
||||
}
|
||||
}
|
||||
|
||||
// voiceResponseMap is a voice response with a 16 KB audio payload.
|
||||
func voiceResponseMap() map[string]any {
|
||||
return map[string]any{
|
||||
"request_id": "vr-001",
|
||||
"response": "The capital of France is Paris.",
|
||||
"audio": make([]byte, 16384),
|
||||
"transcription": "What is the capital of France?",
|
||||
}
|
||||
}
|
||||
|
||||
func voiceResponseStruct() VoiceResponse {
|
||||
return VoiceResponse{
|
||||
RequestID: "vr-001",
|
||||
Response: "The capital of France is Paris.",
|
||||
Audio: make([]byte, 16384),
|
||||
Transcription: "What is the capital of France?",
|
||||
}
|
||||
}
|
||||
|
||||
func voiceResponseProto() *pb.VoiceResponse {
|
||||
return &pb.VoiceResponse{
|
||||
RequestId: "vr-001",
|
||||
@@ -103,31 +43,6 @@ func voiceResponseProto() *pb.VoiceResponse {
|
||||
}
|
||||
}
|
||||
|
||||
// ttsChunkMap simulates a streaming audio chunk (~32 KB).
|
||||
func ttsChunkMap() map[string]any {
|
||||
return map[string]any{
|
||||
"session_id": "tts-sess-99",
|
||||
"chunk_index": 3,
|
||||
"total_chunks": 12,
|
||||
"audio_b64": string(make([]byte, 32768)), // old: base64 string
|
||||
"is_last": false,
|
||||
"timestamp": time.Now().Unix(),
|
||||
"sample_rate": 24000,
|
||||
}
|
||||
}
|
||||
|
||||
func ttsChunkStruct() TTSAudioChunk {
|
||||
return TTSAudioChunk{
|
||||
SessionID: "tts-sess-99",
|
||||
ChunkIndex: 3,
|
||||
TotalChunks: 12,
|
||||
Audio: make([]byte, 32768), // new: raw bytes
|
||||
IsLast: false,
|
||||
Timestamp: time.Now().Unix(),
|
||||
SampleRate: 24000,
|
||||
}
|
||||
}
|
||||
|
||||
func ttsChunkProto() *pb.TTSAudioChunk {
|
||||
return &pb.TTSAudioChunk{
|
||||
SessionId: "tts-sess-99",
|
||||
@@ -147,26 +62,16 @@ func ttsChunkProto() *pb.TTSAudioChunk {
|
||||
func TestWireSize(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
mapData any
|
||||
structVal any
|
||||
protoMsg proto.Message
|
||||
}{
|
||||
{"ChatRequest", chatRequestMap(), chatRequestStruct(), chatRequestProto()},
|
||||
{"VoiceResponse", voiceResponseMap(), voiceResponseStruct(), voiceResponseProto()},
|
||||
{"TTSAudioChunk", ttsChunkMap(), ttsChunkStruct(), ttsChunkProto()},
|
||||
{"ChatRequest", chatRequestProto()},
|
||||
{"VoiceResponse", voiceResponseProto()},
|
||||
{"TTSAudioChunk", ttsChunkProto()},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
mapBytes, _ := msgpack.Marshal(tt.mapData)
|
||||
structBytes, _ := msgpack.Marshal(tt.structVal)
|
||||
protoBytes, _ := proto.Marshal(tt.protoMsg)
|
||||
|
||||
t.Logf("%-16s map=%5d B struct=%5d B proto=%5d B (struct saves %.0f%%, proto saves %.0f%%)",
|
||||
tt.name,
|
||||
len(mapBytes), len(structBytes), len(protoBytes),
|
||||
100*(1-float64(len(structBytes))/float64(len(mapBytes))),
|
||||
100*(1-float64(len(protoBytes))/float64(len(mapBytes))),
|
||||
)
|
||||
t.Logf("%-16s proto=%5d B", tt.name, len(protoBytes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,23 +79,7 @@ func TestWireSize(t *testing.T) {
|
||||
// Encode benchmarks
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func BenchmarkEncode_ChatRequest_MsgpackMap(b *testing.B) {
|
||||
data := chatRequestMap()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_ChatRequest_MsgpackStruct(b *testing.B) {
|
||||
data := chatRequestStruct()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_ChatRequest_Protobuf(b *testing.B) {
|
||||
func BenchmarkEncode_ChatRequest(b *testing.B) {
|
||||
data := chatRequestProto()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -198,23 +87,7 @@ func BenchmarkEncode_ChatRequest_Protobuf(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_VoiceResponse_MsgpackMap(b *testing.B) {
|
||||
data := voiceResponseMap()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_VoiceResponse_MsgpackStruct(b *testing.B) {
|
||||
data := voiceResponseStruct()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_VoiceResponse_Protobuf(b *testing.B) {
|
||||
func BenchmarkEncode_VoiceResponse(b *testing.B) {
|
||||
data := voiceResponseProto()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -222,23 +95,7 @@ func BenchmarkEncode_VoiceResponse_Protobuf(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_TTSChunk_MsgpackMap(b *testing.B) {
|
||||
data := ttsChunkMap()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_TTSChunk_MsgpackStruct(b *testing.B) {
|
||||
data := ttsChunkStruct()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncode_TTSChunk_Protobuf(b *testing.B) {
|
||||
func BenchmarkEncode_TTSChunk(b *testing.B) {
|
||||
data := ttsChunkProto()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -250,25 +107,7 @@ func BenchmarkEncode_TTSChunk_Protobuf(b *testing.B) {
|
||||
// Decode benchmarks
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func BenchmarkDecode_ChatRequest_MsgpackMap(b *testing.B) {
|
||||
encoded, _ := msgpack.Marshal(chatRequestMap())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_ChatRequest_MsgpackStruct(b *testing.B) {
|
||||
encoded, _ := msgpack.Marshal(chatRequestStruct())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m ChatRequest
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_ChatRequest_Protobuf(b *testing.B) {
|
||||
func BenchmarkDecode_ChatRequest(b *testing.B) {
|
||||
encoded, _ := proto.Marshal(chatRequestProto())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -277,25 +116,7 @@ func BenchmarkDecode_ChatRequest_Protobuf(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_VoiceResponse_MsgpackMap(b *testing.B) {
|
||||
encoded, _ := msgpack.Marshal(voiceResponseMap())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_VoiceResponse_MsgpackStruct(b *testing.B) {
|
||||
encoded, _ := msgpack.Marshal(voiceResponseStruct())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m VoiceResponse
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_VoiceResponse_Protobuf(b *testing.B) {
|
||||
func BenchmarkDecode_VoiceResponse(b *testing.B) {
|
||||
encoded, _ := proto.Marshal(voiceResponseProto())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -304,25 +125,7 @@ func BenchmarkDecode_VoiceResponse_Protobuf(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_TTSChunk_MsgpackMap(b *testing.B) {
|
||||
encoded, _ := msgpack.Marshal(ttsChunkMap())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_TTSChunk_MsgpackStruct(b *testing.B) {
|
||||
encoded, _ := msgpack.Marshal(ttsChunkStruct())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m TTSAudioChunk
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecode_TTSChunk_Protobuf(b *testing.B) {
|
||||
func BenchmarkDecode_TTSChunk(b *testing.B) {
|
||||
encoded, _ := proto.Marshal(ttsChunkProto())
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -335,27 +138,7 @@ func BenchmarkDecode_TTSChunk_Protobuf(b *testing.B) {
|
||||
// Roundtrip benchmarks (encode + decode)
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func BenchmarkRoundtrip_ChatRequest_MsgpackMap(b *testing.B) {
|
||||
data := chatRequestMap()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
enc, _ := msgpack.Marshal(data)
|
||||
var dec map[string]any
|
||||
_ = msgpack.Unmarshal(enc, &dec)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRoundtrip_ChatRequest_MsgpackStruct(b *testing.B) {
|
||||
data := chatRequestStruct()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
enc, _ := msgpack.Marshal(data)
|
||||
var dec ChatRequest
|
||||
_ = msgpack.Unmarshal(enc, &dec)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRoundtrip_ChatRequest_Protobuf(b *testing.B) {
|
||||
func BenchmarkRoundtrip_ChatRequest(b *testing.B) {
|
||||
data := chatRequestProto()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -366,143 +149,157 @@ func BenchmarkRoundtrip_ChatRequest_Protobuf(b *testing.B) {
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Typed struct unit tests — verify roundtrip correctness
|
||||
// Correctness tests — verify proto roundtrip
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestRoundtrip_ChatRequest(t *testing.T) {
|
||||
orig := chatRequestStruct()
|
||||
data, err := msgpack.Marshal(orig)
|
||||
orig := chatRequestProto()
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var dec ChatRequest
|
||||
if err := msgpack.Unmarshal(data, &dec); err != nil {
|
||||
var dec pb.ChatRequest
|
||||
if err := proto.Unmarshal(data, &dec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if dec.RequestID != orig.RequestID {
|
||||
t.Errorf("RequestID = %q, want %q", dec.RequestID, orig.RequestID)
|
||||
if dec.GetRequestId() != orig.GetRequestId() {
|
||||
t.Errorf("RequestId = %q, want %q", dec.GetRequestId(), orig.GetRequestId())
|
||||
}
|
||||
if dec.Message != orig.Message {
|
||||
t.Errorf("Message = %q, want %q", dec.Message, orig.Message)
|
||||
if dec.GetMessage() != orig.GetMessage() {
|
||||
t.Errorf("Message = %q, want %q", dec.GetMessage(), orig.GetMessage())
|
||||
}
|
||||
if dec.TopK != orig.TopK {
|
||||
t.Errorf("TopK = %d, want %d", dec.TopK, orig.TopK)
|
||||
if dec.GetTopK() != orig.GetTopK() {
|
||||
t.Errorf("TopK = %d, want %d", dec.GetTopK(), orig.GetTopK())
|
||||
}
|
||||
if dec.Premium != orig.Premium {
|
||||
t.Errorf("Premium = %v, want %v", dec.Premium, orig.Premium)
|
||||
if dec.GetPremium() != orig.GetPremium() {
|
||||
t.Errorf("Premium = %v, want %v", dec.GetPremium(), orig.GetPremium())
|
||||
}
|
||||
if dec.EffectiveQuery() != orig.Message {
|
||||
t.Errorf("EffectiveQuery() = %q, want %q", dec.EffectiveQuery(), orig.Message)
|
||||
if EffectiveQuery(&dec) != orig.GetMessage() {
|
||||
t.Errorf("EffectiveQuery() = %q, want %q", EffectiveQuery(&dec), orig.GetMessage())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundtrip_VoiceResponse(t *testing.T) {
|
||||
orig := voiceResponseStruct()
|
||||
data, err := msgpack.Marshal(orig)
|
||||
orig := voiceResponseProto()
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var dec VoiceResponse
|
||||
if err := msgpack.Unmarshal(data, &dec); err != nil {
|
||||
var dec pb.VoiceResponse
|
||||
if err := proto.Unmarshal(data, &dec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if dec.RequestID != orig.RequestID {
|
||||
t.Errorf("RequestID mismatch")
|
||||
if dec.GetRequestId() != orig.GetRequestId() {
|
||||
t.Errorf("RequestId mismatch")
|
||||
}
|
||||
if len(dec.Audio) != len(orig.Audio) {
|
||||
t.Errorf("Audio len = %d, want %d", len(dec.Audio), len(orig.Audio))
|
||||
if len(dec.GetAudio()) != len(orig.GetAudio()) {
|
||||
t.Errorf("Audio len = %d, want %d", len(dec.GetAudio()), len(orig.GetAudio()))
|
||||
}
|
||||
if dec.Transcription != orig.Transcription {
|
||||
if dec.GetTranscription() != orig.GetTranscription() {
|
||||
t.Errorf("Transcription mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundtrip_TTSAudioChunk(t *testing.T) {
|
||||
orig := ttsChunkStruct()
|
||||
data, err := msgpack.Marshal(orig)
|
||||
orig := ttsChunkProto()
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var dec TTSAudioChunk
|
||||
if err := msgpack.Unmarshal(data, &dec); err != nil {
|
||||
var dec pb.TTSAudioChunk
|
||||
if err := proto.Unmarshal(data, &dec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if dec.SessionID != orig.SessionID {
|
||||
t.Errorf("SessionID mismatch")
|
||||
if dec.GetSessionId() != orig.GetSessionId() {
|
||||
t.Errorf("SessionId mismatch")
|
||||
}
|
||||
if dec.ChunkIndex != orig.ChunkIndex {
|
||||
t.Errorf("ChunkIndex = %d, want %d", dec.ChunkIndex, orig.ChunkIndex)
|
||||
if dec.GetChunkIndex() != orig.GetChunkIndex() {
|
||||
t.Errorf("ChunkIndex = %d, want %d", dec.GetChunkIndex(), orig.GetChunkIndex())
|
||||
}
|
||||
if len(dec.Audio) != len(orig.Audio) {
|
||||
t.Errorf("Audio len = %d, want %d", len(dec.Audio), len(orig.Audio))
|
||||
if len(dec.GetAudio()) != len(orig.GetAudio()) {
|
||||
t.Errorf("Audio len = %d, want %d", len(dec.GetAudio()), len(orig.GetAudio()))
|
||||
}
|
||||
if dec.SampleRate != orig.SampleRate {
|
||||
t.Errorf("SampleRate = %d, want %d", dec.SampleRate, orig.SampleRate)
|
||||
if dec.GetSampleRate() != orig.GetSampleRate() {
|
||||
t.Errorf("SampleRate = %d, want %d", dec.GetSampleRate(), orig.GetSampleRate())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundtrip_PipelineTrigger(t *testing.T) {
|
||||
orig := PipelineTrigger{
|
||||
RequestID: "pip-001",
|
||||
orig := &pb.PipelineTrigger{
|
||||
RequestId: "pip-001",
|
||||
Pipeline: "document-ingestion",
|
||||
Parameters: map[string]any{"source": "s3://bucket/data"},
|
||||
Parameters: map[string]string{"source": "s3://bucket/data"},
|
||||
}
|
||||
data, err := msgpack.Marshal(orig)
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var dec PipelineTrigger
|
||||
if err := msgpack.Unmarshal(data, &dec); err != nil {
|
||||
var dec pb.PipelineTrigger
|
||||
if err := proto.Unmarshal(data, &dec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if dec.Pipeline != orig.Pipeline {
|
||||
t.Errorf("Pipeline = %q, want %q", dec.Pipeline, orig.Pipeline)
|
||||
if dec.GetPipeline() != orig.GetPipeline() {
|
||||
t.Errorf("Pipeline = %q, want %q", dec.GetPipeline(), orig.GetPipeline())
|
||||
}
|
||||
if dec.Parameters["source"] != orig.Parameters["source"] {
|
||||
if dec.GetParameters()["source"] != orig.GetParameters()["source"] {
|
||||
t.Errorf("Parameters[source] mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundtrip_STTTranscription(t *testing.T) {
|
||||
orig := STTTranscription{
|
||||
SessionID: "stt-001",
|
||||
orig := &pb.STTTranscription{
|
||||
SessionId: "stt-001",
|
||||
Transcript: "hello world",
|
||||
Sequence: 5,
|
||||
IsPartial: false,
|
||||
IsFinal: true,
|
||||
Timestamp: time.Now().Unix(),
|
||||
SpeakerID: "speaker-1",
|
||||
SpeakerId: "speaker-1",
|
||||
HasVoiceActivity: true,
|
||||
State: "listening",
|
||||
}
|
||||
data, err := msgpack.Marshal(orig)
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var dec STTTranscription
|
||||
if err := msgpack.Unmarshal(data, &dec); err != nil {
|
||||
var dec pb.STTTranscription
|
||||
if err := proto.Unmarshal(data, &dec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if dec.Transcript != orig.Transcript {
|
||||
t.Errorf("Transcript = %q, want %q", dec.Transcript, orig.Transcript)
|
||||
if dec.GetTranscript() != orig.GetTranscript() {
|
||||
t.Errorf("Transcript = %q, want %q", dec.GetTranscript(), orig.GetTranscript())
|
||||
}
|
||||
if dec.IsFinal != orig.IsFinal {
|
||||
if dec.GetIsFinal() != orig.GetIsFinal() {
|
||||
t.Error("IsFinal mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundtrip_ErrorResponse(t *testing.T) {
|
||||
orig := ErrorResponse{Error: true, Message: "something broke", Type: "InternalError"}
|
||||
data, err := msgpack.Marshal(orig)
|
||||
orig := &pb.ErrorResponse{Error: true, Message: "something broke", Type: "InternalError"}
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var dec ErrorResponse
|
||||
if err := msgpack.Unmarshal(data, &dec); err != nil {
|
||||
var dec pb.ErrorResponse
|
||||
if err := proto.Unmarshal(data, &dec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !dec.Error || dec.Message != "something broke" || dec.Type != "InternalError" {
|
||||
t.Errorf("ErrorResponse roundtrip mismatch: %+v", dec)
|
||||
if !dec.GetError() || dec.GetMessage() != "something broke" || dec.GetType() != "InternalError" {
|
||||
t.Errorf("ErrorResponse roundtrip mismatch: %+v", &dec)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEffectiveQuery_MessageSet(t *testing.T) {
|
||||
req := &pb.ChatRequest{Message: "hello", Query: "world"}
|
||||
if got := EffectiveQuery(req); got != "hello" {
|
||||
t.Errorf("EffectiveQuery() = %q, want %q", got, "hello")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEffectiveQuery_FallbackToQuery(t *testing.T) {
|
||||
req := &pb.ChatRequest{Query: "world"}
|
||||
if got := EffectiveQuery(req); got != "world" {
|
||||
t.Errorf("EffectiveQuery() = %q, want %q", got, "world")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,224 +1,69 @@
|
||||
// Package messages defines typed NATS message structs for all services.
|
||||
// Package messages re-exports protobuf message types and provides NATS
|
||||
// subject constants plus helper functions.
|
||||
//
|
||||
// Using typed structs with short msgpack field tags instead of map[string]any
|
||||
// provides compile-time safety, smaller wire size (integer-like short keys vs
|
||||
// full string keys), and faster encode/decode by avoiding interface{} boxing.
|
||||
//
|
||||
// Audio data uses raw []byte instead of base64-encoded strings — msgpack
|
||||
// supports binary natively, eliminating the 33% base64 overhead.
|
||||
// The canonical type definitions live in the generated package
|
||||
// gen/messagespb (from proto/messages/v1/messages.proto).
|
||||
// This package provides type aliases so existing callers can keep using
|
||||
// messages.ChatRequest, etc., while the wire format is now protobuf.
|
||||
package messages
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Pipeline Bridge
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb"
|
||||
)
|
||||
|
||||
// PipelineTrigger is the request to start a pipeline.
|
||||
type PipelineTrigger struct {
|
||||
RequestID string `msgpack:"request_id" json:"request_id"`
|
||||
Pipeline string `msgpack:"pipeline" json:"pipeline"`
|
||||
Parameters map[string]any `msgpack:"parameters,omitempty" json:"parameters,omitempty"`
|
||||
}
|
||||
// ════════════════════════════════════════════════════════════════════════════
|
||||
// Type aliases — use these or import gen/messagespb directly.
|
||||
// ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
// PipelineStatus is the response / status update for a pipeline run.
|
||||
type PipelineStatus struct {
|
||||
RequestID string `msgpack:"request_id" json:"request_id"`
|
||||
Status string `msgpack:"status" json:"status"`
|
||||
RunID string `msgpack:"run_id,omitempty" json:"run_id,omitempty"`
|
||||
Engine string `msgpack:"engine,omitempty" json:"engine,omitempty"`
|
||||
Pipeline string `msgpack:"pipeline,omitempty" json:"pipeline,omitempty"`
|
||||
SubmittedAt string `msgpack:"submitted_at,omitempty" json:"submitted_at,omitempty"`
|
||||
Error string `msgpack:"error,omitempty" json:"error,omitempty"`
|
||||
AvailablePipelines []string `msgpack:"available_pipelines,omitempty" json:"available_pipelines,omitempty"`
|
||||
}
|
||||
// Common
|
||||
type ErrorResponse = pb.ErrorResponse
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Chat Handler
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Chat
|
||||
type LoginEvent = pb.LoginEvent
|
||||
type GreetingRequest = pb.GreetingRequest
|
||||
type GreetingResponse = pb.GreetingResponse
|
||||
type ChatRequest = pb.ChatRequest
|
||||
type ChatResponse = pb.ChatResponse
|
||||
type ChatStreamChunk = pb.ChatStreamChunk
|
||||
|
||||
// ChatRequest is an incoming chat message.
|
||||
type ChatRequest struct {
|
||||
RequestID string `msgpack:"request_id" json:"request_id"`
|
||||
UserID string `msgpack:"user_id" json:"user_id"`
|
||||
Message string `msgpack:"message" json:"message"`
|
||||
Query string `msgpack:"query,omitempty" json:"query,omitempty"`
|
||||
Premium bool `msgpack:"premium,omitempty" json:"premium,omitempty"`
|
||||
EnableRAG bool `msgpack:"enable_rag,omitempty" json:"enable_rag,omitempty"`
|
||||
EnableReranker bool `msgpack:"enable_reranker,omitempty" json:"enable_reranker,omitempty"`
|
||||
EnableStreaming bool `msgpack:"enable_streaming,omitempty" json:"enable_streaming,omitempty"`
|
||||
TopK int `msgpack:"top_k,omitempty" json:"top_k,omitempty"`
|
||||
Collection string `msgpack:"collection,omitempty" json:"collection,omitempty"`
|
||||
EnableTTS bool `msgpack:"enable_tts,omitempty" json:"enable_tts,omitempty"`
|
||||
SystemPrompt string `msgpack:"system_prompt,omitempty" json:"system_prompt,omitempty"`
|
||||
ResponseSubject string `msgpack:"response_subject,omitempty" json:"response_subject,omitempty"`
|
||||
}
|
||||
// Voice
|
||||
type VoiceRequest = pb.VoiceRequest
|
||||
type VoiceResponse = pb.VoiceResponse
|
||||
type DocumentSource = pb.DocumentSource
|
||||
|
||||
// TTS
|
||||
type TTSRequest = pb.TTSRequest
|
||||
type TTSAudioChunk = pb.TTSAudioChunk
|
||||
type TTSFullResponse = pb.TTSFullResponse
|
||||
type TTSStatus = pb.TTSStatus
|
||||
type TTSVoiceInfo = pb.TTSVoiceInfo
|
||||
type TTSVoiceListResponse = pb.TTSVoiceListResponse
|
||||
type TTSVoiceRefreshResponse = pb.TTSVoiceRefreshResponse
|
||||
|
||||
// STT
|
||||
type STTStreamMessage = pb.STTStreamMessage
|
||||
type STTTranscription = pb.STTTranscription
|
||||
type STTInterrupt = pb.STTInterrupt
|
||||
|
||||
// Pipeline
|
||||
type PipelineTrigger = pb.PipelineTrigger
|
||||
type PipelineStatus = pb.PipelineStatus
|
||||
|
||||
// ════════════════════════════════════════════════════════════════════════════
|
||||
// Helpers
|
||||
// ════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
// EffectiveQuery returns Message or falls back to Query.
|
||||
func (c *ChatRequest) EffectiveQuery() string {
|
||||
if c.Message != "" {
|
||||
return c.Message
|
||||
func EffectiveQuery(c *ChatRequest) string {
|
||||
if c.GetMessage() != "" {
|
||||
return c.GetMessage()
|
||||
}
|
||||
return c.Query
|
||||
return c.GetQuery()
|
||||
}
|
||||
|
||||
// ChatResponse is the full reply to a chat request.
|
||||
type ChatResponse struct {
|
||||
UserID string `msgpack:"user_id" json:"user_id"`
|
||||
Response string `msgpack:"response" json:"response"`
|
||||
ResponseText string `msgpack:"response_text" json:"response_text"`
|
||||
UsedRAG bool `msgpack:"used_rag" json:"used_rag"`
|
||||
RAGSources []string `msgpack:"rag_sources,omitempty" json:"rag_sources,omitempty"`
|
||||
Success bool `msgpack:"success" json:"success"`
|
||||
Audio []byte `msgpack:"audio,omitempty" json:"audio,omitempty"`
|
||||
Error string `msgpack:"error,omitempty" json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// ChatStreamChunk is a single streaming chunk from an LLM response.
|
||||
type ChatStreamChunk struct {
|
||||
RequestID string `msgpack:"request_id" json:"request_id"`
|
||||
Type string `msgpack:"type" json:"type"`
|
||||
Content string `msgpack:"content" json:"content"`
|
||||
Done bool `msgpack:"done" json:"done"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Voice Assistant
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// VoiceRequest is an incoming voice-to-voice request.
|
||||
type VoiceRequest struct {
|
||||
RequestID string `msgpack:"request_id" json:"request_id"`
|
||||
Audio []byte `msgpack:"audio" json:"audio"`
|
||||
Language string `msgpack:"language,omitempty" json:"language,omitempty"`
|
||||
Collection string `msgpack:"collection,omitempty" json:"collection,omitempty"`
|
||||
}
|
||||
|
||||
// VoiceResponse is the reply to a voice request.
|
||||
type VoiceResponse struct {
|
||||
RequestID string `msgpack:"request_id" json:"request_id"`
|
||||
Response string `msgpack:"response" json:"response"`
|
||||
Audio []byte `msgpack:"audio" json:"audio"`
|
||||
Transcription string `msgpack:"transcription,omitempty" json:"transcription,omitempty"`
|
||||
Sources []DocumentSource `msgpack:"sources,omitempty" json:"sources,omitempty"`
|
||||
Error string `msgpack:"error,omitempty" json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// DocumentSource is a RAG search result source.
|
||||
type DocumentSource struct {
|
||||
Text string `msgpack:"text" json:"text"`
|
||||
Score float64 `msgpack:"score" json:"score"`
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// TTS Module
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// TTSRequest is a text-to-speech synthesis request.
|
||||
type TTSRequest struct {
|
||||
Text string `msgpack:"text" json:"text"`
|
||||
Speaker string `msgpack:"speaker,omitempty" json:"speaker,omitempty"`
|
||||
Language string `msgpack:"language,omitempty" json:"language,omitempty"`
|
||||
SpeakerWavB64 string `msgpack:"speaker_wav_b64,omitempty" json:"speaker_wav_b64,omitempty"`
|
||||
Stream bool `msgpack:"stream,omitempty" json:"stream,omitempty"`
|
||||
}
|
||||
|
||||
// TTSAudioChunk is a streamed audio chunk from TTS synthesis.
|
||||
type TTSAudioChunk struct {
|
||||
SessionID string `msgpack:"session_id" json:"session_id"`
|
||||
ChunkIndex int `msgpack:"chunk_index" json:"chunk_index"`
|
||||
TotalChunks int `msgpack:"total_chunks" json:"total_chunks"`
|
||||
Audio []byte `msgpack:"audio" json:"audio"`
|
||||
IsLast bool `msgpack:"is_last" json:"is_last"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
SampleRate int `msgpack:"sample_rate" json:"sample_rate"`
|
||||
}
|
||||
|
||||
// TTSFullResponse is a non-streamed TTS response (whole audio).
|
||||
type TTSFullResponse struct {
|
||||
SessionID string `msgpack:"session_id" json:"session_id"`
|
||||
Audio []byte `msgpack:"audio" json:"audio"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
SampleRate int `msgpack:"sample_rate" json:"sample_rate"`
|
||||
}
|
||||
|
||||
// TTSStatus is a TTS processing status update.
|
||||
type TTSStatus struct {
|
||||
SessionID string `msgpack:"session_id" json:"session_id"`
|
||||
Status string `msgpack:"status" json:"status"`
|
||||
Message string `msgpack:"message" json:"message"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
}
|
||||
|
||||
// TTSVoiceListResponse is the reply to a voice list request.
|
||||
type TTSVoiceListResponse struct {
|
||||
DefaultSpeaker string `msgpack:"default_speaker" json:"default_speaker"`
|
||||
CustomVoices []TTSVoiceInfo `msgpack:"custom_voices" json:"custom_voices"`
|
||||
LastRefresh int64 `msgpack:"last_refresh" json:"last_refresh"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
}
|
||||
|
||||
// TTSVoiceInfo is summary info about a custom voice.
|
||||
type TTSVoiceInfo struct {
|
||||
Name string `msgpack:"name" json:"name"`
|
||||
Language string `msgpack:"language" json:"language"`
|
||||
ModelType string `msgpack:"model_type" json:"model_type"`
|
||||
CreatedAt string `msgpack:"created_at" json:"created_at"`
|
||||
}
|
||||
|
||||
// TTSVoiceRefreshResponse is the reply to a voice refresh request.
|
||||
type TTSVoiceRefreshResponse struct {
|
||||
Count int `msgpack:"count" json:"count"`
|
||||
CustomVoices []TTSVoiceInfo `msgpack:"custom_voices" json:"custom_voices"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// STT Module
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// STTStreamMessage is any message on the ai.voice.stream.{session} subject.
|
||||
type STTStreamMessage struct {
|
||||
Type string `msgpack:"type" json:"type"`
|
||||
Audio []byte `msgpack:"audio,omitempty" json:"audio,omitempty"`
|
||||
State string `msgpack:"state,omitempty" json:"state,omitempty"`
|
||||
SpeakerID string `msgpack:"speaker_id,omitempty" json:"speaker_id,omitempty"`
|
||||
}
|
||||
|
||||
// STTTranscription is the transcription result published by the STT module.
|
||||
type STTTranscription struct {
|
||||
SessionID string `msgpack:"session_id" json:"session_id"`
|
||||
Transcript string `msgpack:"transcript" json:"transcript"`
|
||||
Sequence int `msgpack:"sequence" json:"sequence"`
|
||||
IsPartial bool `msgpack:"is_partial" json:"is_partial"`
|
||||
IsFinal bool `msgpack:"is_final" json:"is_final"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
SpeakerID string `msgpack:"speaker_id" json:"speaker_id"`
|
||||
HasVoiceActivity bool `msgpack:"has_voice_activity" json:"has_voice_activity"`
|
||||
State string `msgpack:"state" json:"state"`
|
||||
}
|
||||
|
||||
// STTInterrupt is published when the STT module detects a user interrupt.
|
||||
type STTInterrupt struct {
|
||||
SessionID string `msgpack:"session_id" json:"session_id"`
|
||||
Type string `msgpack:"type" json:"type"`
|
||||
Timestamp int64 `msgpack:"timestamp" json:"timestamp"`
|
||||
SpeakerID string `msgpack:"speaker_id" json:"speaker_id"`
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Common / Error
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// ErrorResponse is the standard error reply from any handler.
|
||||
type ErrorResponse struct {
|
||||
Error bool `msgpack:"error" json:"error"`
|
||||
Message string `msgpack:"message" json:"message"`
|
||||
Type string `msgpack:"type" json:"type"`
|
||||
}
|
||||
|
||||
// Timestamp returns the current Unix timestamp (helper for message construction).
|
||||
// Timestamp returns the current Unix timestamp.
|
||||
func Timestamp() int64 {
|
||||
return time.Now().Unix()
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Package natsutil provides a NATS/JetStream client with msgpack serialization.
|
||||
// Package natsutil provides a NATS/JetStream client with protobuf serialization.
|
||||
package natsutil
|
||||
|
||||
import (
|
||||
@@ -7,10 +7,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// Client wraps a NATS connection with msgpack helpers.
|
||||
// Client wraps a NATS connection with protobuf helpers.
|
||||
type Client struct {
|
||||
nc *nats.Conn
|
||||
js nats.JetStreamContext
|
||||
@@ -97,46 +97,34 @@ func (c *Client) Subscribe(subject string, handler nats.MsgHandler, queue string
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish encodes data as msgpack and publishes to the subject.
|
||||
func (c *Client) Publish(subject string, data any) error {
|
||||
payload, err := msgpack.Marshal(data)
|
||||
// Publish encodes data as protobuf and publishes to the subject.
|
||||
func (c *Client) Publish(subject string, data proto.Message) error {
|
||||
payload, err := proto.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("msgpack marshal: %w", err)
|
||||
return fmt.Errorf("proto marshal: %w", err)
|
||||
}
|
||||
return c.nc.Publish(subject, payload)
|
||||
}
|
||||
|
||||
// Request sends a msgpack-encoded request and decodes the response into result.
|
||||
func (c *Client) Request(subject string, data any, result any, timeout time.Duration) error {
|
||||
payload, err := msgpack.Marshal(data)
|
||||
// PublishRaw publishes pre-encoded bytes to the subject.
|
||||
func (c *Client) PublishRaw(subject string, data []byte) error {
|
||||
return c.nc.Publish(subject, data)
|
||||
}
|
||||
|
||||
// Request sends a protobuf-encoded request and decodes the response into result.
|
||||
func (c *Client) Request(subject string, data proto.Message, result proto.Message, timeout time.Duration) error {
|
||||
payload, err := proto.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("msgpack marshal: %w", err)
|
||||
return fmt.Errorf("proto marshal: %w", err)
|
||||
}
|
||||
msg, err := c.nc.Request(subject, payload, timeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("nats request: %w", err)
|
||||
}
|
||||
return msgpack.Unmarshal(msg.Data, result)
|
||||
return proto.Unmarshal(msg.Data, result)
|
||||
}
|
||||
|
||||
// DecodeMsgpack decodes msgpack-encoded NATS message data into dest.
|
||||
func DecodeMsgpack(msg *nats.Msg, dest any) error {
|
||||
return msgpack.Unmarshal(msg.Data, dest)
|
||||
}
|
||||
|
||||
// Decode is a generic helper that unmarshals msgpack bytes into T.
|
||||
// Usage: req, err := natsutil.Decode[messages.ChatRequest](msg.Data)
|
||||
func Decode[T any](data []byte) (T, error) {
|
||||
var v T
|
||||
err := msgpack.Unmarshal(data, &v)
|
||||
return v, err
|
||||
}
|
||||
|
||||
// DecodeMsgpackMap decodes msgpack data into a generic map.
|
||||
func DecodeMsgpackMap(data []byte) (map[string]any, error) {
|
||||
var m map[string]any
|
||||
if err := msgpack.Unmarshal(data, &m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
// Decode unmarshals protobuf bytes into dest.
|
||||
func Decode(data []byte, dest proto.Message) error {
|
||||
return proto.Unmarshal(data, dest)
|
||||
}
|
||||
|
||||
@@ -3,254 +3,212 @@ package natsutil
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb"
|
||||
)
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// DecodeMsgpackMap tests
|
||||
// Decode tests
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestDecodeMsgpackMap_Roundtrip(t *testing.T) {
|
||||
orig := map[string]any{
|
||||
"request_id": "req-001",
|
||||
"user_id": "user-42",
|
||||
"premium": true,
|
||||
"top_k": int64(10), // msgpack decodes ints as int64
|
||||
func TestDecode_ChatRequest_Roundtrip(t *testing.T) {
|
||||
orig := &pb.ChatRequest{
|
||||
RequestId: "req-001",
|
||||
UserId: "user-42",
|
||||
Premium: true,
|
||||
TopK: 10,
|
||||
}
|
||||
data, err := msgpack.Marshal(orig)
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
decoded, err := DecodeMsgpackMap(data)
|
||||
if err != nil {
|
||||
var decoded pb.ChatRequest
|
||||
if err := Decode(data, &decoded); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if decoded["request_id"] != "req-001" {
|
||||
t.Errorf("request_id = %v", decoded["request_id"])
|
||||
if decoded.GetRequestId() != "req-001" {
|
||||
t.Errorf("RequestId = %v", decoded.GetRequestId())
|
||||
}
|
||||
if decoded["premium"] != true {
|
||||
t.Errorf("premium = %v", decoded["premium"])
|
||||
if decoded.GetUserId() != "user-42" {
|
||||
t.Errorf("UserId = %v", decoded.GetUserId())
|
||||
}
|
||||
if decoded.GetPremium() != true {
|
||||
t.Errorf("Premium = %v", decoded.GetPremium())
|
||||
}
|
||||
if decoded.GetTopK() != 10 {
|
||||
t.Errorf("TopK = %v", decoded.GetTopK())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeMsgpackMap_Empty(t *testing.T) {
|
||||
data, _ := msgpack.Marshal(map[string]any{})
|
||||
m, err := DecodeMsgpackMap(data)
|
||||
func TestDecode_EmptyMessage(t *testing.T) {
|
||||
data, err := proto.Marshal(&pb.ChatRequest{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(m) != 0 {
|
||||
t.Errorf("expected empty map, got %v", m)
|
||||
var decoded pb.ChatRequest
|
||||
if err := Decode(data, &decoded); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if decoded.GetRequestId() != "" {
|
||||
t.Errorf("expected empty RequestId, got %q", decoded.GetRequestId())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeMsgpackMap_InvalidData(t *testing.T) {
|
||||
_, err := DecodeMsgpackMap([]byte{0xFF, 0xFE})
|
||||
func TestDecode_InvalidData(t *testing.T) {
|
||||
err := Decode([]byte{0xFF, 0xFE}, &pb.ChatRequest{})
|
||||
if err == nil {
|
||||
t.Error("expected error for invalid msgpack data")
|
||||
t.Error("expected error for invalid protobuf data")
|
||||
}
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// DecodeMsgpack (typed struct) tests
|
||||
// Typed struct roundtrip tests
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
type testMessage struct {
|
||||
RequestID string `msgpack:"request_id"`
|
||||
UserID string `msgpack:"user_id"`
|
||||
Count int `msgpack:"count"`
|
||||
Active bool `msgpack:"active"`
|
||||
func TestDecode_VoiceResponse_Roundtrip(t *testing.T) {
|
||||
orig := &pb.VoiceResponse{
|
||||
RequestId: "vr-001",
|
||||
Response: "The capital of France is Paris.",
|
||||
Transcription: "What is the capital of France?",
|
||||
}
|
||||
|
||||
func TestDecodeMsgpackTyped_Roundtrip(t *testing.T) {
|
||||
orig := testMessage{
|
||||
RequestID: "req-typed-001",
|
||||
UserID: "user-7",
|
||||
Count: 42,
|
||||
Active: true,
|
||||
}
|
||||
data, err := msgpack.Marshal(orig)
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Simulate nats.Msg data decoding.
|
||||
var decoded testMessage
|
||||
if err := msgpack.Unmarshal(data, &decoded); err != nil {
|
||||
var decoded pb.VoiceResponse
|
||||
if err := Decode(data, &decoded); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if decoded.RequestID != orig.RequestID {
|
||||
t.Errorf("RequestID = %q, want %q", decoded.RequestID, orig.RequestID)
|
||||
if decoded.GetRequestId() != orig.GetRequestId() {
|
||||
t.Errorf("RequestId = %q, want %q", decoded.GetRequestId(), orig.GetRequestId())
|
||||
}
|
||||
if decoded.Count != orig.Count {
|
||||
t.Errorf("Count = %d, want %d", decoded.Count, orig.Count)
|
||||
if decoded.GetResponse() != orig.GetResponse() {
|
||||
t.Errorf("Response = %q, want %q", decoded.GetResponse(), orig.GetResponse())
|
||||
}
|
||||
if decoded.Active != orig.Active {
|
||||
t.Errorf("Active = %v, want %v", decoded.Active, orig.Active)
|
||||
if decoded.GetTranscription() != orig.GetTranscription() {
|
||||
t.Errorf("Transcription = %q, want %q", decoded.GetTranscription(), orig.GetTranscription())
|
||||
}
|
||||
}
|
||||
|
||||
// TestTypedStructDecodesMapEncoding verifies that a typed struct can be
|
||||
// decoded from data that was encoded as map[string]any (backwards compat).
|
||||
func TestTypedStructDecodesMapEncoding(t *testing.T) {
|
||||
// Encode as map (the old way).
|
||||
mapData := map[string]any{
|
||||
"request_id": "req-compat",
|
||||
"user_id": "user-compat",
|
||||
"count": int64(99),
|
||||
"active": false,
|
||||
func TestDecode_ErrorResponse_Roundtrip(t *testing.T) {
|
||||
orig := &pb.ErrorResponse{
|
||||
Error: true,
|
||||
Message: "something broke",
|
||||
Type: "InternalError",
|
||||
}
|
||||
data, err := msgpack.Marshal(mapData)
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Decode into typed struct (the new way).
|
||||
var msg testMessage
|
||||
if err := msgpack.Unmarshal(data, &msg); err != nil {
|
||||
var decoded pb.ErrorResponse
|
||||
if err := Decode(data, &decoded); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if msg.RequestID != "req-compat" {
|
||||
t.Errorf("RequestID = %q", msg.RequestID)
|
||||
if !decoded.GetError() {
|
||||
t.Error("expected Error=true")
|
||||
}
|
||||
if msg.Count != 99 {
|
||||
t.Errorf("Count = %d, want 99", msg.Count)
|
||||
if decoded.GetMessage() != "something broke" {
|
||||
t.Errorf("Message = %q", decoded.GetMessage())
|
||||
}
|
||||
if decoded.GetType() != "InternalError" {
|
||||
t.Errorf("Type = %q", decoded.GetType())
|
||||
}
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Binary data tests (audio []byte in msgpack)
|
||||
// Binary data tests (audio []byte in protobuf)
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
type audioMessage struct {
|
||||
SessionID string `msgpack:"session_id"`
|
||||
Audio []byte `msgpack:"audio"`
|
||||
SampleRate int `msgpack:"sample_rate"`
|
||||
}
|
||||
|
||||
func TestBinaryDataRoundtrip(t *testing.T) {
|
||||
audio := make([]byte, 32768)
|
||||
for i := range audio {
|
||||
audio[i] = byte(i % 256)
|
||||
}
|
||||
|
||||
orig := audioMessage{
|
||||
SessionID: "sess-audio-001",
|
||||
orig := &pb.TTSAudioChunk{
|
||||
SessionId: "sess-audio-001",
|
||||
Audio: audio,
|
||||
SampleRate: 24000,
|
||||
}
|
||||
data, err := msgpack.Marshal(orig)
|
||||
data, err := proto.Marshal(orig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var decoded audioMessage
|
||||
if err := msgpack.Unmarshal(data, &decoded); err != nil {
|
||||
var decoded pb.TTSAudioChunk
|
||||
if err := Decode(data, &decoded); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(decoded.Audio) != len(orig.Audio) {
|
||||
t.Fatalf("audio len = %d, want %d", len(decoded.Audio), len(orig.Audio))
|
||||
if len(decoded.GetAudio()) != len(orig.GetAudio()) {
|
||||
t.Fatalf("audio len = %d, want %d", len(decoded.GetAudio()), len(orig.GetAudio()))
|
||||
}
|
||||
for i := range decoded.Audio {
|
||||
if decoded.Audio[i] != orig.Audio[i] {
|
||||
t.Fatalf("audio[%d] = %d, want %d", i, decoded.Audio[i], orig.Audio[i])
|
||||
for i := range decoded.GetAudio() {
|
||||
if decoded.GetAudio()[i] != orig.GetAudio()[i] {
|
||||
t.Fatalf("audio[%d] = %d, want %d", i, decoded.GetAudio()[i], orig.GetAudio()[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestBinaryVsBase64Size shows the wire-size win of raw bytes vs base64 string.
|
||||
func TestBinaryVsBase64Size(t *testing.T) {
|
||||
// TestProtoWireSize shows protobuf wire size for binary payloads.
|
||||
func TestProtoWireSize(t *testing.T) {
|
||||
audio := make([]byte, 16384)
|
||||
|
||||
// Old approach: base64 string in map.
|
||||
import_b64 := make([]byte, (len(audio)*4+2)/3) // approximate base64 size
|
||||
mapMsg := map[string]any{
|
||||
"session_id": "sess-1",
|
||||
"audio_b64": string(import_b64),
|
||||
}
|
||||
mapData, _ := msgpack.Marshal(mapMsg)
|
||||
|
||||
// New approach: raw bytes in struct.
|
||||
structMsg := audioMessage{
|
||||
SessionID: "sess-1",
|
||||
msg := &pb.TTSAudioChunk{
|
||||
SessionId: "sess-1",
|
||||
Audio: audio,
|
||||
}
|
||||
structData, _ := msgpack.Marshal(structMsg)
|
||||
data, _ := proto.Marshal(msg)
|
||||
|
||||
t.Logf("base64-in-map: %d bytes, raw-bytes-in-struct: %d bytes (%.0f%% smaller)",
|
||||
len(mapData), len(structData),
|
||||
100*(1-float64(len(structData))/float64(len(mapData))))
|
||||
t.Logf("TTSAudioChunk with 16KB audio: %d bytes on wire", len(data))
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Benchmarks
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func BenchmarkEncodeMap(b *testing.B) {
|
||||
data := map[string]any{
|
||||
"request_id": "req-bench",
|
||||
"user_id": "user-bench",
|
||||
"message": "What is the weather today?",
|
||||
"premium": true,
|
||||
"top_k": 10,
|
||||
func BenchmarkEncode_ChatRequest(b *testing.B) {
|
||||
data := &pb.ChatRequest{
|
||||
RequestId: "req-bench",
|
||||
UserId: "user-bench",
|
||||
Message: "What is the weather today?",
|
||||
Premium: true,
|
||||
TopK: 10,
|
||||
}
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
_, _ = proto.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncodeStruct(b *testing.B) {
|
||||
data := testMessage{
|
||||
RequestID: "req-bench",
|
||||
UserID: "user-bench",
|
||||
Count: 10,
|
||||
Active: true,
|
||||
}
|
||||
for b.Loop() {
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecodeMap(b *testing.B) {
|
||||
raw, _ := msgpack.Marshal(map[string]any{
|
||||
"request_id": "req-bench",
|
||||
"user_id": "user-bench",
|
||||
"message": "What is the weather today?",
|
||||
"premium": true,
|
||||
"top_k": 10,
|
||||
func BenchmarkDecode_ChatRequest(b *testing.B) {
|
||||
raw, _ := proto.Marshal(&pb.ChatRequest{
|
||||
RequestId: "req-bench",
|
||||
UserId: "user-bench",
|
||||
Message: "What is the weather today?",
|
||||
Premium: true,
|
||||
TopK: 10,
|
||||
})
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
_ = msgpack.Unmarshal(raw, &m)
|
||||
var m pb.ChatRequest
|
||||
_ = Decode(raw, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecodeStruct(b *testing.B) {
|
||||
raw, _ := msgpack.Marshal(testMessage{
|
||||
RequestID: "req-bench",
|
||||
UserID: "user-bench",
|
||||
Count: 10,
|
||||
Active: true,
|
||||
})
|
||||
for b.Loop() {
|
||||
var m testMessage
|
||||
_ = msgpack.Unmarshal(raw, &m)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecodeAudio32KB(b *testing.B) {
|
||||
raw, _ := msgpack.Marshal(audioMessage{
|
||||
SessionID: "s1",
|
||||
func BenchmarkDecode_Audio32KB(b *testing.B) {
|
||||
raw, _ := proto.Marshal(&pb.TTSAudioChunk{
|
||||
SessionId: "s1",
|
||||
Audio: make([]byte, 32768),
|
||||
SampleRate: 24000,
|
||||
})
|
||||
for b.Loop() {
|
||||
var m audioMessage
|
||||
_ = msgpack.Unmarshal(raw, &m)
|
||||
var m pb.TTSAudioChunk
|
||||
_ = Decode(raw, &m)
|
||||
}
|
||||
}
|
||||
|
||||
257
proto/messages/v1/messages.proto
Normal file
257
proto/messages/v1/messages.proto
Normal file
@@ -0,0 +1,257 @@
|
||||
// Homelab AI service message contracts.
|
||||
//
|
||||
// This is the single source of truth for all NATS message types.
|
||||
// Generated Go code lives in handler-base/gen/messagespb.
|
||||
//
|
||||
// Naming: field numbers are stable across versions — add new fields,
|
||||
// never reuse or renumber existing ones.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package messages.v1;
|
||||
|
||||
option go_package = "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb";
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Common
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// ErrorResponse is the standard error reply from any handler.
|
||||
message ErrorResponse {
|
||||
bool error = 1;
|
||||
string message = 2;
|
||||
string type = 3;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Chat (companions-frontend ↔ chat-handler)
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// LoginEvent is published when a user authenticates.
|
||||
// Subject: ai.chat.user.{user_id}.login
|
||||
message LoginEvent {
|
||||
string user_id = 1;
|
||||
string username = 2;
|
||||
string nickname = 3;
|
||||
bool premium = 4;
|
||||
int64 timestamp = 5; // Unix seconds
|
||||
}
|
||||
|
||||
// GreetingRequest asks the LLM to generate a personalised greeting.
|
||||
// Subject: ai.chat.user.{user_id}.greeting.request
|
||||
message GreetingRequest {
|
||||
string user_id = 1;
|
||||
string username = 2;
|
||||
string nickname = 3;
|
||||
bool premium = 4;
|
||||
}
|
||||
|
||||
// GreetingResponse carries the generated greeting text.
|
||||
// Subject: ai.chat.user.{user_id}.greeting.response
|
||||
message GreetingResponse {
|
||||
string user_id = 1;
|
||||
string greeting = 2;
|
||||
}
|
||||
|
||||
// ChatRequest is an incoming chat message routed via NATS.
|
||||
// Subject: ai.chat.user.{user_id}.message
|
||||
message ChatRequest {
|
||||
string request_id = 1;
|
||||
string user_id = 2;
|
||||
string username = 3;
|
||||
string message = 4;
|
||||
string query = 5; // alternative to message (EffectiveQuery picks first non-empty)
|
||||
bool premium = 6;
|
||||
bool enable_rag = 7;
|
||||
bool enable_reranker = 8;
|
||||
bool enable_streaming = 9;
|
||||
int32 top_k = 10;
|
||||
string collection = 11;
|
||||
bool enable_tts = 12;
|
||||
string system_prompt = 13;
|
||||
string response_subject = 14;
|
||||
}
|
||||
|
||||
// ChatResponse is the full reply to a ChatRequest.
|
||||
// Subject: ai.chat.response.{request_id} (or ChatRequest.response_subject)
|
||||
message ChatResponse {
|
||||
string user_id = 1;
|
||||
string response = 2;
|
||||
string response_text = 3;
|
||||
bool used_rag = 4;
|
||||
repeated string rag_sources = 5;
|
||||
bool success = 6;
|
||||
bytes audio = 7;
|
||||
string error = 8;
|
||||
}
|
||||
|
||||
// ChatStreamChunk is one piece of a streaming LLM response.
|
||||
// Subject: ai.chat.response.stream.{request_id}
|
||||
message ChatStreamChunk {
|
||||
string request_id = 1;
|
||||
string type = 2; // "chunk" | "done"
|
||||
string content = 3;
|
||||
bool done = 4;
|
||||
int64 timestamp = 5;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Voice Assistant
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// VoiceRequest is an incoming voice-to-voice request.
|
||||
// Subject: ai.voice.request
|
||||
message VoiceRequest {
|
||||
string request_id = 1;
|
||||
bytes audio = 2;
|
||||
string language = 3;
|
||||
string collection = 4;
|
||||
}
|
||||
|
||||
// DocumentSource is a single RAG search-result citation.
|
||||
message DocumentSource {
|
||||
string text = 1;
|
||||
double score = 2;
|
||||
}
|
||||
|
||||
// VoiceResponse is the reply to a VoiceRequest.
|
||||
// Subject: ai.voice.response.{request_id}
|
||||
message VoiceResponse {
|
||||
string request_id = 1;
|
||||
string response = 2;
|
||||
bytes audio = 3;
|
||||
string transcription = 4;
|
||||
repeated DocumentSource sources = 5;
|
||||
string error = 6;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// TTS Module
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// TTSRequest is a text-to-speech synthesis request.
|
||||
// Subject: ai.voice.tts.request.{session_id}
|
||||
message TTSRequest {
|
||||
string text = 1;
|
||||
string speaker = 2;
|
||||
string language = 3;
|
||||
string speaker_wav_b64 = 4;
|
||||
bool stream = 5;
|
||||
}
|
||||
|
||||
// TTSAudioChunk is a streamed audio chunk from TTS synthesis.
|
||||
// Subject: ai.voice.tts.audio.{session_id}
|
||||
message TTSAudioChunk {
|
||||
string session_id = 1;
|
||||
int32 chunk_index = 2;
|
||||
int32 total_chunks = 3;
|
||||
bytes audio = 4;
|
||||
bool is_last = 5;
|
||||
int64 timestamp = 6;
|
||||
int32 sample_rate = 7;
|
||||
}
|
||||
|
||||
// TTSFullResponse is a non-streamed TTS response (whole audio blob).
|
||||
// Subject: ai.voice.tts.audio.{session_id}
|
||||
message TTSFullResponse {
|
||||
string session_id = 1;
|
||||
bytes audio = 2;
|
||||
int64 timestamp = 3;
|
||||
int32 sample_rate = 4;
|
||||
}
|
||||
|
||||
// TTSStatus is a TTS processing status update.
|
||||
// Subject: ai.voice.tts.status.{session_id}
|
||||
message TTSStatus {
|
||||
string session_id = 1;
|
||||
string status = 2;
|
||||
string message = 3;
|
||||
int64 timestamp = 4;
|
||||
}
|
||||
|
||||
// TTSVoiceInfo is summary info about a custom voice.
|
||||
message TTSVoiceInfo {
|
||||
string name = 1;
|
||||
string language = 2;
|
||||
string model_type = 3;
|
||||
string created_at = 4;
|
||||
}
|
||||
|
||||
// TTSVoiceListResponse is the reply to a voice list request.
|
||||
// Subject: ai.voice.tts.voices.list (request-reply)
|
||||
message TTSVoiceListResponse {
|
||||
string default_speaker = 1;
|
||||
repeated TTSVoiceInfo custom_voices = 2;
|
||||
int64 last_refresh = 3;
|
||||
int64 timestamp = 4;
|
||||
}
|
||||
|
||||
// TTSVoiceRefreshResponse is the reply to a voice refresh request.
|
||||
// Subject: ai.voice.tts.voices.refresh (request-reply)
|
||||
message TTSVoiceRefreshResponse {
|
||||
int32 count = 1;
|
||||
repeated TTSVoiceInfo custom_voices = 2;
|
||||
int64 timestamp = 3;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// STT Module
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// STTStreamMessage is any message on the ai.voice.stream.{session_id} subject.
|
||||
message STTStreamMessage {
|
||||
string type = 1; // "start" | "chunk" | "state_change" | "end"
|
||||
bytes audio = 2;
|
||||
string state = 3;
|
||||
string speaker_id = 4;
|
||||
}
|
||||
|
||||
// STTTranscription is the transcription result published by the STT module.
|
||||
// Subject: ai.voice.transcription.{session_id}
|
||||
message STTTranscription {
|
||||
string session_id = 1;
|
||||
string transcript = 2;
|
||||
int32 sequence = 3;
|
||||
bool is_partial = 4;
|
||||
bool is_final = 5;
|
||||
int64 timestamp = 6;
|
||||
string speaker_id = 7;
|
||||
bool has_voice_activity = 8;
|
||||
string state = 9;
|
||||
}
|
||||
|
||||
// STTInterrupt is published when the STT module detects a user interrupt.
|
||||
// Subject: ai.voice.transcription.{session_id}
|
||||
message STTInterrupt {
|
||||
string session_id = 1;
|
||||
string type = 2; // "interrupt"
|
||||
int64 timestamp = 3;
|
||||
string speaker_id = 4;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Pipeline Bridge
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// PipelineTrigger is the request to start a pipeline.
|
||||
// Subject: ai.pipeline.trigger
|
||||
message PipelineTrigger {
|
||||
string request_id = 1;
|
||||
string pipeline = 2;
|
||||
// Protobuf Struct could be used here, but a simple string map covers
|
||||
// all current use-cases and avoids a google/protobuf import.
|
||||
map<string, string> parameters = 3;
|
||||
}
|
||||
|
||||
// PipelineStatus is the response / status update for a pipeline run.
|
||||
// Subject: ai.pipeline.status.{request_id}
|
||||
message PipelineStatus {
|
||||
string request_id = 1;
|
||||
string status = 2;
|
||||
string run_id = 3;
|
||||
string engine = 4;
|
||||
string pipeline = 5;
|
||||
string submitted_at = 6;
|
||||
string error = 7;
|
||||
repeated string available_pipelines = 8;
|
||||
}
|
||||
Reference in New Issue
Block a user