feat: migrate from msgpack to protobuf (handler-base v1.0.0)
Some checks failed
CI / Lint (push) Successful in 3m9s
CI / Test (push) Successful in 2m42s
CI / Release (push) Successful in 1m0s
CI / Notify (push) Successful in 2s
CI / Docker Build & Push (push) Failing after 9m28s

- Replace msgpack encoding with protobuf wire format
- Update field names to proto convention
- Change Parameters type from map[string]any to map[string]string
- Rewrite tests for proto round-trips
This commit is contained in:
2026-02-21 15:30:17 -05:00
parent 338210eb74
commit 66ef758808
5 changed files with 45 additions and 48 deletions

28
main.go
View File

@@ -17,6 +17,7 @@ import (
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"google.golang.org/protobuf/proto"
)
// Pipeline definitions — maps pipeline name to engine config.
@@ -47,20 +48,20 @@ func main() {
h := handler.New("ai.pipeline.trigger", cfg)
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data)
if err != nil {
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
var req messages.PipelineTrigger
if err := natsutil.Decode(msg.Data, &req); err != nil {
return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
}
requestID := req.RequestID
requestID := req.RequestId
if requestID == "" {
requestID = "unknown"
}
pipelineName := req.Pipeline
params := req.Parameters
if params == nil {
params = map[string]any{}
params = map[string]string{}
}
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
@@ -73,7 +74,7 @@ func main() {
names = append(names, k)
}
return &messages.PipelineStatus{
RequestID: requestID,
RequestId: requestID,
Status: "error",
Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
AvailablePipelines: names,
@@ -81,6 +82,7 @@ func main() {
}
var runID string
var err error
if pipeline.Engine == "argo" {
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
@@ -91,16 +93,16 @@ func main() {
if err != nil {
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
return &messages.PipelineStatus{
RequestID: requestID,
RequestId: requestID,
Status: "error",
Error: err.Error(),
}, nil
}
result := &messages.PipelineStatus{
RequestID: requestID,
RequestId: requestID,
Status: "submitted",
RunID: runID,
RunId: runID,
Engine: pipeline.Engine,
Pipeline: pipelineName,
SubmittedAt: time.Now().UTC().Format(time.RFC3339),
@@ -118,10 +120,10 @@ func main() {
}
}
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) {
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]string, requestID string) (string, error) {
argoParams := make([]map[string]string, 0, len(params))
for k, v := range params {
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
argoParams = append(argoParams, map[string]string{"name": k, "value": v})
}
workflow := map[string]any{
@@ -169,10 +171,10 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ
return result.Metadata.Name, nil
}
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) {
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]string, requestID string) (string, error) {
kfParams := make([]map[string]string, 0, len(params))
for k, v := range params {
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
kfParams = append(kfParams, map[string]string{"name": k, "value": v})
}
runRequest := map[string]any{