2 Commits

Author SHA1 Message Date
66ef758808 feat: migrate from msgpack to protobuf (handler-base v1.0.0)
Some checks failed
CI / Lint (push) Successful in 3m9s
CI / Test (push) Successful in 2m42s
CI / Release (push) Successful in 1m0s
CI / Notify (push) Successful in 2s
CI / Docker Build & Push (push) Failing after 9m28s
- Replace msgpack encoding with protobuf wire format
- Update field names to proto convention
- Change Parameters type from map[string]any to map[string]string
- Rewrite tests for proto round-trips
2026-02-21 15:30:17 -05:00
338210eb74 chore: bump handler-base to v0.1.5, add netrc secret mount to Dockerfile
Some checks failed
CI / Lint (push) Successful in 2m56s
CI / Test (push) Successful in 2m53s
CI / Release (push) Successful in 1m34s
CI / Docker Build & Push (push) Failing after 7m4s
CI / Notify (push) Successful in 1s
2026-02-20 18:17:58 -05:00
6 changed files with 50 additions and 50 deletions

View File

@@ -4,11 +4,14 @@ FROM golang:1.25-alpine AS builder
WORKDIR /app WORKDIR /app
# Install ca-certificates for HTTPS # Install ca-certificates for HTTPS
RUN apk add --no-cache ca-certificates RUN apk add --no-cache ca-certificates git
ENV GOPRIVATE=git.daviestechlabs.io
ENV GONOSUMCHECK=git.daviestechlabs.io
# Copy go mod files # Copy go mod files
COPY go.mod go.sum ./ COPY go.mod go.sum ./
RUN go mod download RUN --mount=type=secret,id=netrc,target=/root/.netrc go mod download
# Copy source code # Copy source code
COPY . . COPY . .

View File

@@ -2,6 +2,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"sync/atomic" "sync/atomic"
@@ -27,10 +28,10 @@ func TestSubmitArgoE2E_FullPayload(t *testing.T) {
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
params := map[string]any{ params := map[string]string{
"source": "s3://bucket/docs", "source": "s3://bucket/docs",
"collection": "knowledge-base", "collection": "knowledge-base",
"batch_size": 100, "batch_size": "100",
} }
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001") runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001")
@@ -80,9 +81,9 @@ func TestSubmitKubeflowE2E_FullPayload(t *testing.T) {
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
params := map[string]any{ params := map[string]string{
"query": "what is kubernetes", "query": "what is kubernetes",
"top_k": 5, "top_k": "5",
"user_id": "test-user", "user_id": "test-user",
} }
@@ -131,7 +132,7 @@ func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) {
names = append(names, k) names = append(names, k)
} }
resp := &messages.PipelineStatus{ resp := &messages.PipelineStatus{
RequestID: "req-bad", RequestId: "req-bad",
Status: "error", Status: "error",
Error: "Unknown pipeline: nonexistent-pipeline", Error: "Unknown pipeline: nonexistent-pipeline",
AvailablePipelines: names, AvailablePipelines: names,
@@ -162,7 +163,7 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
go func() { go func() {
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference", _, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference",
map[string]any{"batch": i}, "req-concurrent") map[string]string{"batch": fmt.Sprintf("%d", i)}, "req-concurrent")
errs <- err errs <- err
}() }()
} }
@@ -189,7 +190,7 @@ func BenchmarkSubmitArgo(b *testing.B) {
defer ts.Close() defer ts.Close()
ctx := b.Context() ctx := b.Context()
params := map[string]any{"source": "test"} params := map[string]string{"source": "test"}
b.ResetTimer() b.ResetTimer()
for b.Loop() { for b.Loop() {
@@ -204,7 +205,7 @@ func BenchmarkSubmitKubeflow(b *testing.B) {
defer ts.Close() defer ts.Close()
ctx := b.Context() ctx := b.Context()
params := map[string]any{"query": "test"} params := map[string]string{"query": "test"}
b.ResetTimer() b.ResetTimer()
for b.Loop() { for b.Loop() {

6
go.mod
View File

@@ -3,9 +3,9 @@ module git.daviestechlabs.io/daviestechlabs/pipeline-bridge
go 1.25.1 go 1.25.1
require ( require (
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3 git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0
github.com/nats-io/nats.go v1.48.0 github.com/nats-io/nats.go v1.48.0
github.com/vmihailenco/msgpack/v5 v5.4.1 google.golang.org/protobuf v1.36.11
) )
require ( require (
@@ -19,7 +19,6 @@ require (
github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
@@ -37,5 +36,4 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/grpc v1.78.0 // indirect google.golang.org/grpc v1.78.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
) )

8
go.sum
View File

@@ -1,5 +1,5 @@
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3 h1:uYog8B839ulqrWoht3qqCvT7CnR3e2skpaLZc2Pg3GI= git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY=
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE= git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -33,10 +33,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=

28
main.go
View File

@@ -17,6 +17,7 @@ import (
"git.daviestechlabs.io/daviestechlabs/handler-base/handler" "git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"google.golang.org/protobuf/proto"
) )
// Pipeline definitions — maps pipeline name to engine config. // Pipeline definitions — maps pipeline name to engine config.
@@ -47,20 +48,20 @@ func main() {
h := handler.New("ai.pipeline.trigger", cfg) h := handler.New("ai.pipeline.trigger", cfg)
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) { h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data) var req messages.PipelineTrigger
if err != nil { if err := natsutil.Decode(msg.Data, &req); err != nil {
return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
} }
requestID := req.RequestID requestID := req.RequestId
if requestID == "" { if requestID == "" {
requestID = "unknown" requestID = "unknown"
} }
pipelineName := req.Pipeline pipelineName := req.Pipeline
params := req.Parameters params := req.Parameters
if params == nil { if params == nil {
params = map[string]any{} params = map[string]string{}
} }
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID) slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
@@ -73,7 +74,7 @@ func main() {
names = append(names, k) names = append(names, k)
} }
return &messages.PipelineStatus{ return &messages.PipelineStatus{
RequestID: requestID, RequestId: requestID,
Status: "error", Status: "error",
Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName), Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
AvailablePipelines: names, AvailablePipelines: names,
@@ -81,6 +82,7 @@ func main() {
} }
var runID string var runID string
var err error
if pipeline.Engine == "argo" { if pipeline.Engine == "argo" {
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID) runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
@@ -91,16 +93,16 @@ func main() {
if err != nil { if err != nil {
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err) slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
return &messages.PipelineStatus{ return &messages.PipelineStatus{
RequestID: requestID, RequestId: requestID,
Status: "error", Status: "error",
Error: err.Error(), Error: err.Error(),
}, nil }, nil
} }
result := &messages.PipelineStatus{ result := &messages.PipelineStatus{
RequestID: requestID, RequestId: requestID,
Status: "submitted", Status: "submitted",
RunID: runID, RunId: runID,
Engine: pipeline.Engine, Engine: pipeline.Engine,
Pipeline: pipelineName, Pipeline: pipelineName,
SubmittedAt: time.Now().UTC().Format(time.RFC3339), SubmittedAt: time.Now().UTC().Format(time.RFC3339),
@@ -118,10 +120,10 @@ func main() {
} }
} }
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) { func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]string, requestID string) (string, error) {
argoParams := make([]map[string]string, 0, len(params)) argoParams := make([]map[string]string, 0, len(params))
for k, v := range params { for k, v := range params {
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)}) argoParams = append(argoParams, map[string]string{"name": k, "value": v})
} }
workflow := map[string]any{ workflow := map[string]any{
@@ -169,10 +171,10 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ
return result.Metadata.Name, nil return result.Metadata.Name, nil
} }
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) { func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]string, requestID string) (string, error) {
kfParams := make([]map[string]string, 0, len(params)) kfParams := make([]map[string]string, 0, len(params))
for k, v := range params { for k, v := range params {
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)}) kfParams = append(kfParams, map[string]string{"name": k, "value": v})
} }
runRequest := map[string]any{ runRequest := map[string]any{

View File

@@ -8,25 +8,25 @@ import (
"git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"github.com/vmihailenco/msgpack/v5" "google.golang.org/protobuf/proto"
) )
func TestPipelineTriggerDecode(t *testing.T) { func TestPipelineTriggerDecode(t *testing.T) {
req := messages.PipelineTrigger{ req := &messages.PipelineTrigger{
RequestID: "req-001", RequestId: "req-001",
Pipeline: "document-ingestion", Pipeline: "document-ingestion",
Parameters: map[string]any{"source": "s3://bucket"}, Parameters: map[string]string{"source": "s3://bucket"},
} }
data, err := msgpack.Marshal(&req) data, err := proto.Marshal(req)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
decoded, err := natsutil.Decode[messages.PipelineTrigger](data) var decoded messages.PipelineTrigger
if err != nil { if err := natsutil.Decode(data, &decoded); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if decoded.RequestID != "req-001" { if decoded.RequestId != "req-001" {
t.Errorf("RequestID = %q", decoded.RequestID) t.Errorf("RequestID = %q", decoded.RequestId)
} }
if decoded.Pipeline != "document-ingestion" { if decoded.Pipeline != "document-ingestion" {
t.Errorf("Pipeline = %q", decoded.Pipeline) t.Errorf("Pipeline = %q", decoded.Pipeline)
@@ -38,22 +38,22 @@ func TestPipelineTriggerDecode(t *testing.T) {
func TestPipelineStatusRoundtrip(t *testing.T) { func TestPipelineStatusRoundtrip(t *testing.T) {
status := messages.PipelineStatus{ status := messages.PipelineStatus{
RequestID: "req-002", RequestId: "req-002",
Status: "submitted", Status: "submitted",
RunID: "argo-abc123", RunId: "argo-abc123",
Engine: "argo", Engine: "argo",
Pipeline: "batch-inference", Pipeline: "batch-inference",
} }
data, err := msgpack.Marshal(&status) data, err := proto.Marshal(&status)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
var got messages.PipelineStatus var got messages.PipelineStatus
if err := msgpack.Unmarshal(data, &got); err != nil { if err := proto.Unmarshal(data, &got); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if got.RunID != "argo-abc123" { if got.RunId != "argo-abc123" {
t.Errorf("RunID = %q", got.RunID) t.Errorf("RunID = %q", got.RunId)
} }
if got.Engine != "argo" { if got.Engine != "argo" {
t.Errorf("Engine = %q", got.Engine) t.Errorf("Engine = %q", got.Engine)
@@ -118,7 +118,7 @@ func TestSubmitArgo(t *testing.T) {
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{ runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]string{
"source": "test", "source": "test",
}, "req-001") }, "req-001")
if err != nil { if err != nil {
@@ -160,7 +160,7 @@ func TestSubmitKubeflow(t *testing.T) {
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{ runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]string{
"query": "test", "query": "test",
}, "req-002") }, "req-002")
if err != nil { if err != nil {