diff --git a/e2e_test.go b/e2e_test.go new file mode 100644 index 0000000..a1900d5 --- /dev/null +++ b/e2e_test.go @@ -0,0 +1,217 @@ +package main + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" +) + +// ──────────────────────────────────────────────────────────────────────────── +// E2E tests: Argo + Kubeflow pipeline submission integration +// ──────────────────────────────────────────────────────────────────────────── + +func TestSubmitArgoE2E_FullPayload(t *testing.T) { + // Verify the full Argo workflow structure + var receivedBody map[string]any + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&receivedBody) + json.NewEncoder(w).Encode(map[string]any{ + "metadata": map[string]any{"name": "doc-ingest-xyz"}, + }) + })) + defer ts.Close() + + ctx := t.Context() + params := map[string]any{ + "source": "s3://bucket/docs", + "collection": "knowledge-base", + "batch_size": 100, + } + + runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001") + if err != nil { + t.Fatal(err) + } + if runID != "doc-ingest-xyz" { + t.Errorf("runID = %q", runID) + } + + // Verify workflow structure + wf, ok := receivedBody["workflow"].(map[string]any) + if !ok { + t.Fatal("missing workflow key") + } + meta := wf["metadata"].(map[string]any) + if meta["namespace"] != "ai-ml" { + t.Errorf("namespace = %v", meta["namespace"]) + } + labels := meta["labels"].(map[string]any) + if labels["request-id"] != "req-e2e-001" { + t.Errorf("request-id label = %v", labels["request-id"]) + } + + spec := wf["spec"].(map[string]any) + templateRef := spec["workflowTemplateRef"].(map[string]any) + if templateRef["name"] != "document-ingestion" { + t.Errorf("template = %v", templateRef["name"]) + } + + args := spec["arguments"].(map[string]any) + argoParams := args["parameters"].([]any) + if len(argoParams) != 3 { + t.Errorf("param count = %d, want 3", len(argoParams)) + } +} + +func TestSubmitKubeflowE2E_FullPayload(t *testing.T) { + var receivedBody map[string]any + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&receivedBody) + json.NewEncoder(w).Encode(map[string]any{ + "run": map[string]any{"id": "kf-run-e2e-789"}, + }) + })) + defer ts.Close() + + ctx := t.Context() + params := map[string]any{ + "query": "what is kubernetes", + "top_k": 5, + "user_id": "test-user", + } + + runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "req-e2e-002") + if err != nil { + t.Fatal(err) + } + if runID != "kf-run-e2e-789" { + t.Errorf("runID = %q", runID) + } + + // Verify run name format + name := receivedBody["name"].(string) + if name != "rag-pipeline-req-e2e-" { + t.Errorf("run name = %q", name) + } +} + +func TestPipelineDispatchE2E_AllEngines(t *testing.T) { + // Verify all pipeline definitions dispatch to correct engine + for name, def := range pipelines { + t.Run(name, func(t *testing.T) { + if def.Engine != "argo" && def.Engine != "kubeflow" { + t.Errorf("engine = %q, want argo or kubeflow", def.Engine) + } + if def.Engine == "argo" && def.Template == "" { + t.Errorf("argo pipeline %q missing template", name) + } + if def.Engine == "kubeflow" && def.PipelineID == "" { + t.Errorf("kubeflow pipeline %q missing pipeline_id", name) + } + }) + } +} + +func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) { + // Simulate what main.go's OnMessage does for unknown pipeline + data := map[string]any{ + "request_id": "req-bad", + "pipeline": "nonexistent-pipeline", + } + + pipelineName := strVal(data, "pipeline", "") + _, ok := pipelines[pipelineName] + if ok { + t.Error("nonexistent pipeline should not be found") + } + + // Build error response like main.go + names := make([]string, 0, len(pipelines)) + for k := range pipelines { + names = append(names, k) + } + resp := map[string]any{ + "request_id": strVal(data, "request_id", ""), + "status": "error", + "error": "Unknown pipeline: nonexistent-pipeline", + "available_pipelines": names, + } + + if resp["status"] != "error" { + t.Error("expected error status") + } + if len(resp["available_pipelines"].([]string)) != len(pipelines) { + t.Errorf("available_pipelines count mismatch") + } +} + +func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) { + var count atomic.Int64 + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count.Add(1) + json.NewEncoder(w).Encode(map[string]any{ + "metadata": map[string]any{"name": "concurrent-wf"}, + }) + })) + defer ts.Close() + + ctx := t.Context() + errs := make(chan error, 10) + + for i := 0; i < 10; i++ { + go func() { + _, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference", + map[string]any{"batch": i}, "req-concurrent") + errs <- err + }() + } + + for i := 0; i < 10; i++ { + if err := <-errs; err != nil { + t.Errorf("concurrent request %d: %v", i, err) + } + } + + if got := count.Load(); got != 10 { + t.Errorf("request count = %d, want 10", got) + } +} + +// ──────────────────────────────────────────────────────────────────────────── +// Benchmarks +// ──────────────────────────────────────────────────────────────────────────── + +func BenchmarkSubmitArgo(b *testing.B) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"metadata":{"name":"bench-wf"}}`)) + })) + defer ts.Close() + + ctx := b.Context() + params := map[string]any{"source": "test"} + + b.ResetTimer() + for b.Loop() { + submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "bench-req") + } +} + +func BenchmarkSubmitKubeflow(b *testing.B) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"run":{"id":"bench-run"}}`)) + })) + defer ts.Close() + + ctx := b.Context() + params := map[string]any{"query": "test"} + + b.ResetTimer() + for b.Loop() { + submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "bench-req") + } +} diff --git a/go.mod b/go.mod index 5797387..5a6132a 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( require ( github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/go.sum b/go.sum index 4a3f959..b9a1b68 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/main.go b/main.go index b1f4483..20ebe37 100644 --- a/main.go +++ b/main.go @@ -122,7 +122,7 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ }, "spec": map[string]any{ "workflowTemplateRef": map[string]string{"name": template}, - "arguments": map[string]any{"parameters": argoParams}, + "arguments": map[string]any{"parameters": argoParams}, }, }