feature/go-handler-refactor #1

Merged
billy merged 3 commits from feature/go-handler-refactor into main 2026-02-20 12:34:07 +00:00
5 changed files with 97 additions and 75 deletions
Showing only changes of commit 7cdcbfbff3 - Show all commits

9
.dockerignore Normal file
View File

@@ -0,0 +1,9 @@
.git
.gitignore
*.md
LICENSE
renovate.json
*_test.go
e2e_test.go
__pycache__
.env*

View File

@@ -14,7 +14,7 @@ RUN go mod download
COPY . . COPY . .
# Build static binary # Build static binary
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-w -s" -o /pipeline-bridge . RUN CGO_ENABLED=0 GOOS=linux GOAMD64=v3 go build -ldflags="-w -s" -o /pipeline-bridge .
# Runtime stage - scratch for minimal image # Runtime stage - scratch for minimal image
FROM scratch FROM scratch

View File

@@ -6,6 +6,8 @@ import (
"net/http/httptest" "net/http/httptest"
"sync/atomic" "sync/atomic"
"testing" "testing"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
) )
// ──────────────────────────────────────────────────────────────────────────── // ────────────────────────────────────────────────────────────────────────────
@@ -117,34 +119,28 @@ func TestPipelineDispatchE2E_AllEngines(t *testing.T) {
} }
func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) { func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) {
// Simulate what main.go's OnMessage does for unknown pipeline // Verify unknown pipeline is rejected and available list is provided
data := map[string]any{ pipelineName := "nonexistent-pipeline"
"request_id": "req-bad",
"pipeline": "nonexistent-pipeline",
}
pipelineName := strVal(data, "pipeline", "")
_, ok := pipelines[pipelineName] _, ok := pipelines[pipelineName]
if ok { if ok {
t.Error("nonexistent pipeline should not be found") t.Error("nonexistent pipeline should not be found")
} }
// Build error response like main.go
names := make([]string, 0, len(pipelines)) names := make([]string, 0, len(pipelines))
for k := range pipelines { for k := range pipelines {
names = append(names, k) names = append(names, k)
} }
resp := map[string]any{ resp := &messages.PipelineStatus{
"request_id": strVal(data, "request_id", ""), RequestID: "req-bad",
"status": "error", Status: "error",
"error": "Unknown pipeline: nonexistent-pipeline", Error: "Unknown pipeline: nonexistent-pipeline",
"available_pipelines": names, AvailablePipelines: names,
} }
if resp["status"] != "error" { if resp.Status != "error" {
t.Error("expected error status") t.Error("expected error status")
} }
if len(resp["available_pipelines"].([]string)) != len(pipelines) { if len(resp.AvailablePipelines) != len(pipelines) {
t.Errorf("available_pipelines count mismatch") t.Errorf("available_pipelines count mismatch")
} }
} }

72
main.go
View File

@@ -15,6 +15,8 @@ import (
"git.daviestechlabs.io/daviestechlabs/handler-base/config" "git.daviestechlabs.io/daviestechlabs/handler-base/config"
"git.daviestechlabs.io/daviestechlabs/handler-base/handler" "git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
) )
// Pipeline definitions — maps pipeline name to engine config. // Pipeline definitions — maps pipeline name to engine config.
@@ -45,10 +47,21 @@ func main() {
h := handler.New("ai.pipeline.trigger", cfg) h := handler.New("ai.pipeline.trigger", cfg)
h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) { h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
requestID := strVal(data, "request_id", "unknown") req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data)
pipelineName := strVal(data, "pipeline", "") if err != nil {
params := mapVal(data, "parameters") return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
}
requestID := req.RequestID
if requestID == "" {
requestID = "unknown"
}
pipelineName := req.Pipeline
params := req.Parameters
if params == nil {
params = map[string]any{}
}
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID) slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
@@ -59,16 +72,15 @@ func main() {
for k := range pipelines { for k := range pipelines {
names = append(names, k) names = append(names, k)
} }
return map[string]any{ return &messages.PipelineStatus{
"request_id": requestID, RequestID: requestID,
"status": "error", Status: "error",
"error": fmt.Sprintf("Unknown pipeline: %s", pipelineName), Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
"available_pipelines": names, AvailablePipelines: names,
}, nil }, nil
} }
var runID string var runID string
var err error
if pipeline.Engine == "argo" { if pipeline.Engine == "argo" {
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID) runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
@@ -78,20 +90,20 @@ func main() {
if err != nil { if err != nil {
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err) slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
return map[string]any{ return &messages.PipelineStatus{
"request_id": requestID, RequestID: requestID,
"status": "error", Status: "error",
"error": err.Error(), Error: err.Error(),
}, nil }, nil
} }
result := map[string]any{ result := &messages.PipelineStatus{
"request_id": requestID, RequestID: requestID,
"status": "submitted", Status: "submitted",
"run_id": runID, RunID: runID,
"engine": pipeline.Engine, Engine: pipeline.Engine,
"pipeline": pipelineName, Pipeline: pipelineName,
"submitted_at": time.Now().UTC().Format(time.RFC3339), SubmittedAt: time.Now().UTC().Format(time.RFC3339),
} }
// Publish status update // Publish status update
@@ -204,24 +216,6 @@ func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID s
// Helpers // Helpers
func strVal(m map[string]any, key, fallback string) string {
if v, ok := m[key]; ok {
if s, ok := v.(string); ok {
return s
}
}
return fallback
}
func mapVal(m map[string]any, key string) map[string]any {
if v, ok := m[key]; ok {
if sub, ok := v.(map[string]any); ok {
return sub
}
}
return map[string]any{}
}
func getEnv(key, fallback string) string { func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" { if v := os.Getenv(key); v != "" {
return v return v

View File

@@ -5,35 +5,58 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"github.com/vmihailenco/msgpack/v5"
) )
func TestStrVal(t *testing.T) { func TestPipelineTriggerDecode(t *testing.T) {
m := map[string]any{"key": "value", "num": 42} req := messages.PipelineTrigger{
if got := strVal(m, "key", ""); got != "value" { RequestID: "req-001",
t.Errorf("strVal(key) = %q, want %q", got, "value") Pipeline: "document-ingestion",
Parameters: map[string]any{"source": "s3://bucket"},
} }
if got := strVal(m, "missing", "default"); got != "default" { data, err := msgpack.Marshal(&req)
t.Errorf("strVal(missing) = %q, want %q", got, "default") if err != nil {
t.Fatal(err)
} }
if got := strVal(m, "num", "fallback"); got != "fallback" { decoded, err := natsutil.Decode[messages.PipelineTrigger](data)
t.Errorf("strVal(num) = %q, want %q", got, "fallback") if err != nil {
t.Fatal(err)
}
if decoded.RequestID != "req-001" {
t.Errorf("RequestID = %q", decoded.RequestID)
}
if decoded.Pipeline != "document-ingestion" {
t.Errorf("Pipeline = %q", decoded.Pipeline)
}
if decoded.Parameters["source"] != "s3://bucket" {
t.Errorf("Parameters = %v", decoded.Parameters)
} }
} }
func TestMapVal(t *testing.T) { func TestPipelineStatusRoundtrip(t *testing.T) {
inner := map[string]any{"a": "b"} status := messages.PipelineStatus{
m := map[string]any{"nested": inner, "scalar": "hi"} RequestID: "req-002",
got := mapVal(m, "nested") Status: "submitted",
if got["a"] != "b" { RunID: "argo-abc123",
t.Errorf("mapVal(nested) = %v, want {a:b}", got) Engine: "argo",
Pipeline: "batch-inference",
} }
got2 := mapVal(m, "missing") data, err := msgpack.Marshal(&status)
if len(got2) != 0 { if err != nil {
t.Errorf("mapVal(missing) should be empty, got %v", got2) t.Fatal(err)
} }
got3 := mapVal(m, "scalar") var got messages.PipelineStatus
if len(got3) != 0 { if err := msgpack.Unmarshal(data, &got); err != nil {
t.Errorf("mapVal(scalar) should be empty, got %v", got3) t.Fatal(err)
}
if got.RunID != "argo-abc123" {
t.Errorf("RunID = %q", got.RunID)
}
if got.Engine != "argo" {
t.Errorf("Engine = %q", got.Engine)
} }
} }