From 66ef7588086d05b850a83cf55e656586e57241da Mon Sep 17 00:00:00 2001 From: "Billy D." Date: Sat, 21 Feb 2026 15:30:17 -0500 Subject: [PATCH] feat: migrate from msgpack to protobuf (handler-base v1.0.0) - Replace msgpack encoding with protobuf wire format - Update field names to proto convention - Change Parameters type from map[string]any to map[string]string - Rewrite tests for proto round-trips --- e2e_test.go | 17 +++++++++-------- go.mod | 6 ++---- go.sum | 8 ++------ main.go | 28 +++++++++++++++------------- main_test.go | 34 +++++++++++++++++----------------- 5 files changed, 45 insertions(+), 48 deletions(-) diff --git a/e2e_test.go b/e2e_test.go index 9eea37a..f03c114 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" "sync/atomic" @@ -27,10 +28,10 @@ func TestSubmitArgoE2E_FullPayload(t *testing.T) { defer ts.Close() ctx := t.Context() - params := map[string]any{ + params := map[string]string{ "source": "s3://bucket/docs", "collection": "knowledge-base", - "batch_size": 100, + "batch_size": "100", } runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001") @@ -80,9 +81,9 @@ func TestSubmitKubeflowE2E_FullPayload(t *testing.T) { defer ts.Close() ctx := t.Context() - params := map[string]any{ + params := map[string]string{ "query": "what is kubernetes", - "top_k": 5, + "top_k": "5", "user_id": "test-user", } @@ -131,7 +132,7 @@ func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) { names = append(names, k) } resp := &messages.PipelineStatus{ - RequestID: "req-bad", + RequestId: "req-bad", Status: "error", Error: "Unknown pipeline: nonexistent-pipeline", AvailablePipelines: names, @@ -162,7 +163,7 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) { for i := 0; i < 10; i++ { go func() { _, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference", - map[string]any{"batch": i}, "req-concurrent") + map[string]string{"batch": fmt.Sprintf("%d", i)}, "req-concurrent") errs <- err }() } @@ -189,7 +190,7 @@ func BenchmarkSubmitArgo(b *testing.B) { defer ts.Close() ctx := b.Context() - params := map[string]any{"source": "test"} + params := map[string]string{"source": "test"} b.ResetTimer() for b.Loop() { @@ -204,7 +205,7 @@ func BenchmarkSubmitKubeflow(b *testing.B) { defer ts.Close() ctx := b.Context() - params := map[string]any{"query": "test"} + params := map[string]string{"query": "test"} b.ResetTimer() for b.Loop() { diff --git a/go.mod b/go.mod index 03184e3..79d4078 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module git.daviestechlabs.io/daviestechlabs/pipeline-bridge go 1.25.1 require ( - git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5 + git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 github.com/nats-io/nats.go v1.48.0 - github.com/vmihailenco/msgpack/v5 v5.4.1 + google.golang.org/protobuf v1.36.11 ) require ( @@ -19,7 +19,6 @@ require ( github.com/klauspost/compress v1.18.0 // indirect github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect @@ -37,5 +36,4 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/grpc v1.78.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect ) diff --git a/go.sum b/go.sum index efceef0..5d7dfc1 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5 h1:DqYZpeluTXh5QKqdVFgN8YIMh4Ycqzw5E9+5FTNDFCA= -git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE= +git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY= +git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -33,10 +33,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= -github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= -github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= -github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= diff --git a/main.go b/main.go index f8dcbd0..3d3a189 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "git.daviestechlabs.io/daviestechlabs/handler-base/handler" "git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" + "google.golang.org/protobuf/proto" ) // Pipeline definitions — maps pipeline name to engine config. @@ -47,20 +48,20 @@ func main() { h := handler.New("ai.pipeline.trigger", cfg) - h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) { - req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data) - if err != nil { + h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) { + var req messages.PipelineTrigger + if err := natsutil.Decode(msg.Data, &req); err != nil { return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil } - requestID := req.RequestID + requestID := req.RequestId if requestID == "" { requestID = "unknown" } pipelineName := req.Pipeline params := req.Parameters if params == nil { - params = map[string]any{} + params = map[string]string{} } slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID) @@ -73,7 +74,7 @@ func main() { names = append(names, k) } return &messages.PipelineStatus{ - RequestID: requestID, + RequestId: requestID, Status: "error", Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName), AvailablePipelines: names, @@ -81,6 +82,7 @@ func main() { } var runID string + var err error if pipeline.Engine == "argo" { runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID) @@ -91,16 +93,16 @@ func main() { if err != nil { slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err) return &messages.PipelineStatus{ - RequestID: requestID, + RequestId: requestID, Status: "error", Error: err.Error(), }, nil } result := &messages.PipelineStatus{ - RequestID: requestID, + RequestId: requestID, Status: "submitted", - RunID: runID, + RunId: runID, Engine: pipeline.Engine, Pipeline: pipelineName, SubmittedAt: time.Now().UTC().Format(time.RFC3339), @@ -118,10 +120,10 @@ func main() { } } -func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) { +func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]string, requestID string) (string, error) { argoParams := make([]map[string]string, 0, len(params)) for k, v := range params { - argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)}) + argoParams = append(argoParams, map[string]string{"name": k, "value": v}) } workflow := map[string]any{ @@ -169,10 +171,10 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ return result.Metadata.Name, nil } -func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) { +func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]string, requestID string) (string, error) { kfParams := make([]map[string]string, 0, len(params)) for k, v := range params { - kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)}) + kfParams = append(kfParams, map[string]string{"name": k, "value": v}) } runRequest := map[string]any{ diff --git a/main_test.go b/main_test.go index 3fe4048..e540dae 100644 --- a/main_test.go +++ b/main_test.go @@ -8,25 +8,25 @@ import ( "git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" - "github.com/vmihailenco/msgpack/v5" + "google.golang.org/protobuf/proto" ) func TestPipelineTriggerDecode(t *testing.T) { - req := messages.PipelineTrigger{ - RequestID: "req-001", + req := &messages.PipelineTrigger{ + RequestId: "req-001", Pipeline: "document-ingestion", - Parameters: map[string]any{"source": "s3://bucket"}, + Parameters: map[string]string{"source": "s3://bucket"}, } - data, err := msgpack.Marshal(&req) + data, err := proto.Marshal(req) if err != nil { t.Fatal(err) } - decoded, err := natsutil.Decode[messages.PipelineTrigger](data) - if err != nil { + var decoded messages.PipelineTrigger + if err := natsutil.Decode(data, &decoded); err != nil { t.Fatal(err) } - if decoded.RequestID != "req-001" { - t.Errorf("RequestID = %q", decoded.RequestID) + if decoded.RequestId != "req-001" { + t.Errorf("RequestID = %q", decoded.RequestId) } if decoded.Pipeline != "document-ingestion" { t.Errorf("Pipeline = %q", decoded.Pipeline) @@ -38,22 +38,22 @@ func TestPipelineTriggerDecode(t *testing.T) { func TestPipelineStatusRoundtrip(t *testing.T) { status := messages.PipelineStatus{ - RequestID: "req-002", + RequestId: "req-002", Status: "submitted", - RunID: "argo-abc123", + RunId: "argo-abc123", Engine: "argo", Pipeline: "batch-inference", } - data, err := msgpack.Marshal(&status) + data, err := proto.Marshal(&status) if err != nil { t.Fatal(err) } var got messages.PipelineStatus - if err := msgpack.Unmarshal(data, &got); err != nil { + if err := proto.Unmarshal(data, &got); err != nil { t.Fatal(err) } - if got.RunID != "argo-abc123" { - t.Errorf("RunID = %q", got.RunID) + if got.RunId != "argo-abc123" { + t.Errorf("RunID = %q", got.RunId) } if got.Engine != "argo" { t.Errorf("Engine = %q", got.Engine) @@ -118,7 +118,7 @@ func TestSubmitArgo(t *testing.T) { defer ts.Close() ctx := t.Context() - runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{ + runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]string{ "source": "test", }, "req-001") if err != nil { @@ -160,7 +160,7 @@ func TestSubmitKubeflow(t *testing.T) { defer ts.Close() ctx := t.Context() - runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{ + runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]string{ "query": "test", }, "req-002") if err != nil {