9 Commits

Author SHA1 Message Date
1bdf92c376 fix: add GOPRIVATE and git auth for private handler-base module
Some checks failed
CI / Lint (push) Successful in 2m33s
CI / Test (push) Successful in 2m36s
CI / Release (push) Successful in 1m20s
CI / Docker Build & Push (push) Failing after 6m44s
CI / Notify (push) Successful in 1s
- Set GOPRIVATE=git.daviestechlabs.io to bypass public Go module proxy
- Configure git URL insteadOf with DISPATCH_TOKEN for private repo access
2026-02-20 09:22:36 -05:00
c832a837eb fix: rename GITEA_TOKEN to DISPATCH_TOKEN
Some checks failed
CI / Lint (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Notify (push) Has been cancelled
2026-02-20 09:10:21 -05:00
b6f0e9b88f ci: add handler-base auto-update workflow, remove old Python CI
Some checks failed
CI / Test (push) Has been cancelled
CI / Release (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Notify (push) Has been cancelled
CI / Lint (push) Has been cancelled
2026-02-20 09:06:00 -05:00
43fb5afcc5 fix: use tagged handler-base v0.1.3, remove local replace directive
Some checks failed
CI / Lint (push) Failing after 1m20s
CI / Release (push) Has been cancelled
CI / Notify (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Test (push) Failing after 1m22s
2026-02-20 09:00:46 -05:00
de4fa6ea90 fix: resolve golangci-lint errcheck warnings
Some checks failed
CI / Release (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Notify (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Lint (push) Has been cancelled
- Add error checks for unchecked return values (errcheck)
- Remove unused struct fields (unused)
- Fix gofmt formatting issues
2026-02-20 08:45:25 -05:00
509f392185 Merge pull request 'feature/go-handler-refactor' (#1) from feature/go-handler-refactor into main
Some checks failed
CI / Lint (push) Failing after 55s
CI / Test (push) Failing after 1m22s
CI / Release (push) Has been skipped
CI / Docker Build & Push (push) Has been skipped
CI / Notify (push) Successful in 1s
Reviewed-on: #1
2026-02-20 12:34:06 +00:00
7cdcbfbff3 feat: migrate to typed messages
Some checks failed
CI / Lint (pull_request) Failing after 57s
CI / Test (pull_request) Failing after 1m23s
CI / Release (pull_request) Has been skipped
CI / Docker Build & Push (pull_request) Has been skipped
CI / Notify (pull_request) Successful in 1s
- Switch OnMessage → OnTypedMessage with natsutil.Decode[messages.PipelineTrigger]
- Return *messages.PipelineStatus (not map[string]any)
- Remove strVal/mapVal helpers
- Add .dockerignore, GOAMD64=v3 in Dockerfile
- Update tests for typed structs (14 tests pass)
2026-02-20 07:11:03 -05:00
8f9b2203ca feat: add e2e tests + benchmarks
- e2e_test.go: Argo/Kubeflow full payload validation, concurrent requests
- Pipeline dispatch validation, unknown pipeline error handling
- Benchmarks: Argo 218µs/op, Kubeflow 179µs/op
2026-02-20 06:45:23 -05:00
aeb1b749be feat: rewrite pipeline-bridge in Go
Replace Python implementation with Go for smaller container images.
Uses handler-base Go module for NATS, health, and telemetry.

- main.go: pipeline bridge with Argo/Kubeflow HTTP submission
- main_test.go: 8 tests covering helpers and HTTP submit functions
- Dockerfile: multi-stage golang:1.25-alpine → scratch
- CI: Gitea Actions with lint/test/release/docker/notify
2026-02-19 17:52:31 -05:00
18 changed files with 1078 additions and 3562 deletions

9
.dockerignore Normal file
View File

@@ -0,0 +1,9 @@
.git
.gitignore
*.md
LICENSE
renovate.json
*_test.go
e2e_test.go
__pycache__
.env*

View File

@@ -0,0 +1,213 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
GOPRIVATE: git.daviestechlabs.io
REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs
REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000
IMAGE_NAME: pipeline-bridge
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Configure private modules
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Run go vet
run: go vet ./...
- name: Install golangci-lint
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b "$(go env GOPATH)/bin"
echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
- name: Run golangci-lint
run: golangci-lint run ./...
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Configure private modules
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Verify dependencies
run: go mod verify
- name: Build
run: go build -v ./...
- name: Run tests
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
release:
name: Release
runs-on: ubuntu-latest
needs: [lint, test]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
outputs:
version: ${{ steps.version.outputs.version }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Determine version bump
id: version
run: |
# Get latest tag or default to v0.0.0
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
VERSION=${LATEST#v}
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
# Check commit message for keywords
MSG="${{ gitea.event.head_commit.message }}"
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
BUMP="major"
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
MINOR=$((MINOR + 1)); PATCH=0
BUMP="minor"
else
PATCH=$((PATCH + 1))
BUMP="patch"
fi
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
echo "bump=$BUMP" >> $GITHUB_OUTPUT
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
- name: Create and push tag
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
git push origin ${{ steps.version.outputs.version }}
docker:
name: Docker Build & Push
runs-on: ubuntu-latest
needs: [lint, test, release]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
buildkitd-config-inline: |
[registry."gitea-http.gitea.svc.cluster.local:3000"]
http = true
insecure = true
- name: Login to Docker Hub
if: vars.DOCKERHUB_USERNAME != ''
uses: docker/login-action@v3
with:
username: ${{ vars.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Configure Docker for insecure registry
run: |
sudo mkdir -p /etc/docker
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
sudo systemctl restart docker || sudo service docker restart || true
sleep 2
- name: Login to Gitea Registry
run: |
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
mkdir -p ~/.docker
cat > ~/.docker/config.json << EOF
{
"auths": {
"${{ env.REGISTRY_HOST }}": {
"auth": "$AUTH"
}
}
}
EOF
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=semver,pattern={{version}},value=${{ needs.release.outputs.version }}
type=semver,pattern={{major}}.{{minor}},value=${{ needs.release.outputs.version }}
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
notify:
name: Notify
runs-on: ubuntu-latest
needs: [lint, test, release, docker]
if: always()
steps:
- name: Notify on success
if: needs.lint.result == 'success' && needs.test.result == 'success'
run: |
curl -s \
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: white_check_mark,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Release: ${{ needs.release.result == 'success' && needs.release.outputs.version || 'skipped' }}
Docker: ${{ needs.docker.result }}" \
${{ env.NTFY_URL }}/gitea-ci
- name: Notify on failure
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
run: |
curl -s \
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
-H "Priority: high" \
-H "Tags: x,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Lint: ${{ needs.lint.result }}
Test: ${{ needs.test.result }}" \
${{ env.NTFY_URL }}/gitea-ci

View File

@@ -1,129 +0,0 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Set up Python
run: uv python install 3.13
- name: Install dependencies
run: uv sync --frozen --extra dev
- name: Run ruff check
run: uv run ruff check .
- name: Run ruff format check
run: uv run ruff format --check .
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Set up Python
run: uv python install 3.13
- name: Install dependencies
run: uv sync --frozen --extra dev
- name: Run tests
run: uv run pytest -v
release:
name: Release
runs-on: ubuntu-latest
needs: [lint, test]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Determine version bump
id: version
run: |
# Get latest tag or default to v0.0.0
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
VERSION=${LATEST#v}
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
# Check commit message for keywords
MSG="${{ gitea.event.head_commit.message }}"
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
BUMP="major"
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
MINOR=$((MINOR + 1)); PATCH=0
BUMP="minor"
else
PATCH=$((PATCH + 1))
BUMP="patch"
fi
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
echo "bump=$BUMP" >> $GITHUB_OUTPUT
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
- name: Create and push tag
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
git push origin ${{ steps.version.outputs.version }}
notify:
name: Notify
runs-on: ubuntu-latest
needs: [lint, test, release]
if: always()
steps:
- name: Notify on success
if: needs.lint.result == 'success' && needs.test.result == 'success'
run: |
curl -s \
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: white_check_mark,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Release: ${{ needs.release.result == 'success' && 'created' || 'skipped' }}" \
${{ env.NTFY_URL }}/gitea-ci
- name: Notify on failure
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
run: |
curl -s \
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
-H "Priority: high" \
-H "Tags: x,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Lint: ${{ needs.lint.result }}
Test: ${{ needs.test.result }}" \
${{ env.NTFY_URL }}/gitea-ci

View File

@@ -0,0 +1,60 @@
name: Update handler-base
on:
repository_dispatch:
types: [handler-base-release]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
GOPRIVATE: git.daviestechlabs.io
jobs:
update:
name: Update handler-base dependency
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
token: ${{ secrets.DISPATCH_TOKEN }}
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Configure Git
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Update handler-base
run: |
VERSION="${{ gitea.event.client_payload.version }}"
echo "Updating handler-base to ${VERSION}"
go get git.daviestechlabs.io/daviestechlabs/handler-base@${VERSION}
go mod tidy
- name: Commit and push
run: |
VERSION="${{ gitea.event.client_payload.version }}"
if git diff --quiet go.mod go.sum; then
echo "No changes to commit"
exit 0
fi
git add go.mod go.sum
git commit -m "chore(deps): bump handler-base to ${VERSION}"
git push
- name: Notify
if: success()
run: |
VERSION="${{ gitea.event.client_payload.version }}"
curl -s \
-H "Title: 📦 Dep Update: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: package,github" \
-d "handler-base updated to ${VERSION}" \
${{ env.NTFY_URL }}/gitea-ci

1
.gitignore vendored
View File

@@ -9,3 +9,4 @@ dist/
build/ build/
.env .env
.env.local .env.local
pipeline-bridge

View File

@@ -1,13 +1,31 @@
# Pipeline Bridge - Using handler-base # Build stage
ARG BASE_TAG=latest FROM golang:1.25-alpine AS builder
FROM ghcr.io/billy-davies-2/handler-base:${BASE_TAG}
WORKDIR /app WORKDIR /app
# Install additional dependencies from pyproject.toml # Install ca-certificates for HTTPS
COPY pyproject.toml . RUN apk add --no-cache ca-certificates
RUN uv pip install --system --no-cache httpx kubernetes
COPY pipeline_bridge.py . # Copy go mod files
COPY go.mod go.sum ./
RUN go mod download
CMD ["python", "pipeline_bridge.py"] # Copy source code
COPY . .
# Build static binary
RUN CGO_ENABLED=0 GOOS=linux GOAMD64=v3 go build -ldflags="-w -s" -o /pipeline-bridge .
# Runtime stage - scratch for minimal image
FROM scratch
# Copy CA certificates for HTTPS
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy binary
COPY --from=builder /pipeline-bridge /pipeline-bridge
# Run as non-root
USER 65534:65534
ENTRYPOINT ["/pipeline-bridge"]

110
README.md
View File

@@ -1,105 +1,43 @@
# Pipeline Bridge # pipeline-bridge
Bridges NATS events to Kubeflow Pipelines and Argo Workflows. Bridges NATS events to Argo Workflows and Kubeflow Pipelines. Subscribes to
`ai.pipeline.trigger` and submits workflow runs to the appropriate engine,
## Overview publishing status updates to `ai.pipeline.status.{request_id}`.
The Pipeline Bridge listens for pipeline trigger requests on NATS and submits them to the appropriate workflow engine (Argo Workflows or Kubeflow Pipelines). It monitors execution and publishes status updates back to NATS.
## NATS Subjects
| Subject | Direction | Description |
|---------|-----------|-------------|
| `ai.pipeline.trigger` | Subscribe | Pipeline trigger requests |
| `ai.pipeline.status.{request_id}` | Publish | Pipeline status updates |
## Supported Pipelines ## Supported Pipelines
| Pipeline | Engine | Description | | Name | Engine | Template / Pipeline ID |
|----------|--------|-------------| |------|--------|----------------------|
| `document-ingestion` | Argo | Ingest documents into Milvus | | document-ingestion | Argo | document-ingestion |
| `batch-inference` | Argo | Run batch LLM inference | | batch-inference | Argo | batch-inference |
| `model-evaluation` | Argo | Evaluate model performance | | model-evaluation | Argo | model-evaluation |
| `rag-query` | Kubeflow | Execute RAG query pipeline | | rag-query | Kubeflow | rag-pipeline |
| `voice-pipeline` | Kubeflow | Full voice assistant pipeline | | voice-pipeline | Kubeflow | voice-pipeline |
## Request Format ## Configuration
```json
{
"request_id": "uuid",
"pipeline": "document-ingestion",
"parameters": {
"source-url": "s3://bucket/docs/",
"collection-name": "knowledge_base"
}
}
```
## Response Format
```json
{
"request_id": "uuid",
"status": "submitted",
"pipeline": "document-ingestion",
"engine": "argo",
"run_id": "document-ingestion-abc123",
"message": "Pipeline submitted successfully",
"timestamp": "2026-01-03T12:00:00Z"
}
```
## Status Updates
The bridge publishes status updates as the workflow progresses:
- `submitted` - Workflow created
- `pending` - Waiting to start
- `running` - In progress
- `succeeded` - Completed successfully
- `failed` - Failed
- `error` - System error
## Implementation
The pipeline bridge uses the [handler-base](https://git.daviestechlabs.io/daviestechlabs/handler-base) library for standardized NATS handling, telemetry, and health checks.
## Environment Variables
| Variable | Default | Description | | Variable | Default | Description |
|----------|---------|-------------| |----------|---------|-------------|
| `NATS_URL` | `nats://nats.ai-ml.svc.cluster.local:4222` | NATS server URL |
| `KUBEFLOW_HOST` | `http://ml-pipeline.kubeflow.svc.cluster.local:8888` | Kubeflow Pipelines API | | `KUBEFLOW_HOST` | `http://ml-pipeline.kubeflow.svc.cluster.local:8888` | Kubeflow Pipelines API |
| `ARGO_HOST` | `http://argo-server.argo.svc.cluster.local:2746` | Argo Workflows API | | `ARGO_HOST` | `http://argo-server.argo.svc.cluster.local:2746` | Argo Server API |
| `ARGO_NAMESPACE` | `ai-ml` | Namespace for Argo Workflows | | `ARGO_NAMESPACE` | `ai-ml` | Namespace for Argo workflows |
## Building Plus all standard handler-base settings (`NATS_URL`, `OTEL_*`, `HEALTH_PORT`, etc.).
## Build
```bash ```bash
docker build -t pipeline-bridge:latest . go build -o pipeline-bridge .
# With specific handler-base tag
docker build --build-arg BASE_TAG=latest -t pipeline-bridge:latest .
``` ```
## Testing ## Test
```bash ```bash
# Port-forward NATS go test -v -race ./...
kubectl port-forward -n ai-ml svc/nats 4222:4222
# Trigger document ingestion
nats pub ai.pipeline.trigger '{
"request_id": "test-1",
"pipeline": "document-ingestion",
"parameters": {"source-url": "https://example.com/docs.txt"}
}'
# Monitor status
nats sub "ai.pipeline.status.>"
``` ```
## License ## Docker
MIT ```bash
docker build -t pipeline-bridge .
```

213
e2e_test.go Normal file
View File

@@ -0,0 +1,213 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
)
// ────────────────────────────────────────────────────────────────────────────
// E2E tests: Argo + Kubeflow pipeline submission integration
// ────────────────────────────────────────────────────────────────────────────
func TestSubmitArgoE2E_FullPayload(t *testing.T) {
// Verify the full Argo workflow structure
var receivedBody map[string]any
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewDecoder(r.Body).Decode(&receivedBody)
_ = json.NewEncoder(w).Encode(map[string]any{
"metadata": map[string]any{"name": "doc-ingest-xyz"},
})
}))
defer ts.Close()
ctx := t.Context()
params := map[string]any{
"source": "s3://bucket/docs",
"collection": "knowledge-base",
"batch_size": 100,
}
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001")
if err != nil {
t.Fatal(err)
}
if runID != "doc-ingest-xyz" {
t.Errorf("runID = %q", runID)
}
// Verify workflow structure
wf, ok := receivedBody["workflow"].(map[string]any)
if !ok {
t.Fatal("missing workflow key")
}
meta := wf["metadata"].(map[string]any)
if meta["namespace"] != "ai-ml" {
t.Errorf("namespace = %v", meta["namespace"])
}
labels := meta["labels"].(map[string]any)
if labels["request-id"] != "req-e2e-001" {
t.Errorf("request-id label = %v", labels["request-id"])
}
spec := wf["spec"].(map[string]any)
templateRef := spec["workflowTemplateRef"].(map[string]any)
if templateRef["name"] != "document-ingestion" {
t.Errorf("template = %v", templateRef["name"])
}
args := spec["arguments"].(map[string]any)
argoParams := args["parameters"].([]any)
if len(argoParams) != 3 {
t.Errorf("param count = %d, want 3", len(argoParams))
}
}
func TestSubmitKubeflowE2E_FullPayload(t *testing.T) {
var receivedBody map[string]any
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewDecoder(r.Body).Decode(&receivedBody)
_ = json.NewEncoder(w).Encode(map[string]any{
"run": map[string]any{"id": "kf-run-e2e-789"},
})
}))
defer ts.Close()
ctx := t.Context()
params := map[string]any{
"query": "what is kubernetes",
"top_k": 5,
"user_id": "test-user",
}
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "req-e2e-002")
if err != nil {
t.Fatal(err)
}
if runID != "kf-run-e2e-789" {
t.Errorf("runID = %q", runID)
}
// Verify run name format
name := receivedBody["name"].(string)
if name != "rag-pipeline-req-e2e-" {
t.Errorf("run name = %q", name)
}
}
func TestPipelineDispatchE2E_AllEngines(t *testing.T) {
// Verify all pipeline definitions dispatch to correct engine
for name, def := range pipelines {
t.Run(name, func(t *testing.T) {
if def.Engine != "argo" && def.Engine != "kubeflow" {
t.Errorf("engine = %q, want argo or kubeflow", def.Engine)
}
if def.Engine == "argo" && def.Template == "" {
t.Errorf("argo pipeline %q missing template", name)
}
if def.Engine == "kubeflow" && def.PipelineID == "" {
t.Errorf("kubeflow pipeline %q missing pipeline_id", name)
}
})
}
}
func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) {
// Verify unknown pipeline is rejected and available list is provided
pipelineName := "nonexistent-pipeline"
_, ok := pipelines[pipelineName]
if ok {
t.Error("nonexistent pipeline should not be found")
}
names := make([]string, 0, len(pipelines))
for k := range pipelines {
names = append(names, k)
}
resp := &messages.PipelineStatus{
RequestID: "req-bad",
Status: "error",
Error: "Unknown pipeline: nonexistent-pipeline",
AvailablePipelines: names,
}
if resp.Status != "error" {
t.Error("expected error status")
}
if len(resp.AvailablePipelines) != len(pipelines) {
t.Errorf("available_pipelines count mismatch")
}
}
func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
var count atomic.Int64
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count.Add(1)
_ = json.NewEncoder(w).Encode(map[string]any{
"metadata": map[string]any{"name": "concurrent-wf"},
})
}))
defer ts.Close()
ctx := t.Context()
errs := make(chan error, 10)
for i := 0; i < 10; i++ {
go func() {
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference",
map[string]any{"batch": i}, "req-concurrent")
errs <- err
}()
}
for i := 0; i < 10; i++ {
if err := <-errs; err != nil {
t.Errorf("concurrent request %d: %v", i, err)
}
}
if got := count.Load(); got != 10 {
t.Errorf("request count = %d, want 10", got)
}
}
// ────────────────────────────────────────────────────────────────────────────
// Benchmarks
// ────────────────────────────────────────────────────────────────────────────
func BenchmarkSubmitArgo(b *testing.B) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"metadata":{"name":"bench-wf"}}`))
}))
defer ts.Close()
ctx := b.Context()
params := map[string]any{"source": "test"}
b.ResetTimer()
for b.Loop() {
_, _ = submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "bench-req")
}
}
func BenchmarkSubmitKubeflow(b *testing.B) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"run":{"id":"bench-run"}}`))
}))
defer ts.Close()
ctx := b.Context()
params := map[string]any{"query": "test"}
b.ResetTimer()
for b.Loop() {
_, _ = submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "bench-req")
}
}

41
go.mod Normal file
View File

@@ -0,0 +1,41 @@
module git.daviestechlabs.io/daviestechlabs/pipeline-bridge
go 1.25.1
require (
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3
github.com/nats-io/nats.go v1.48.0
github.com/vmihailenco/msgpack/v5 v5.4.1
)
require (
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/grpc v1.78.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)

81
go.sum Normal file
View File

@@ -0,0 +1,81 @@
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3 h1:uYog8B839ulqrWoht3qqCvT7CnR3e2skpaLZc2Pg3GI=
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0/go.mod h1:EtekO9DEJb4/jRyN4v4Qjc2yA7AtfCBuz2FynRUWTXs=
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

224
main.go Normal file
View File

@@ -0,0 +1,224 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"time"
"github.com/nats-io/nats.go"
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
)
// Pipeline definitions — maps pipeline name to engine config.
var pipelines = map[string]pipelineDef{
"document-ingestion": {Engine: "argo", Template: "document-ingestion"},
"batch-inference": {Engine: "argo", Template: "batch-inference"},
"rag-query": {Engine: "kubeflow", PipelineID: "rag-pipeline"},
"voice-pipeline": {Engine: "kubeflow", PipelineID: "voice-pipeline"},
"model-evaluation": {Engine: "argo", Template: "model-evaluation"},
}
type pipelineDef struct {
Engine string // "argo" or "kubeflow"
Template string // Argo WorkflowTemplate name
PipelineID string // Kubeflow pipeline ID
}
func main() {
cfg := config.Load()
cfg.ServiceName = "pipeline-bridge"
cfg.NATSQueueGroup = "pipeline-bridges"
kubeflowHost := getEnv("KUBEFLOW_HOST", "http://ml-pipeline.kubeflow.svc.cluster.local:8888")
argoHost := getEnv("ARGO_HOST", "http://argo-server.argo.svc.cluster.local:2746")
argoNamespace := getEnv("ARGO_NAMESPACE", "ai-ml")
httpClient := &http.Client{Timeout: 60 * time.Second}
h := handler.New("ai.pipeline.trigger", cfg)
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data)
if err != nil {
return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
}
requestID := req.RequestID
if requestID == "" {
requestID = "unknown"
}
pipelineName := req.Pipeline
params := req.Parameters
if params == nil {
params = map[string]any{}
}
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
// Validate pipeline
pipeline, ok := pipelines[pipelineName]
if !ok {
names := make([]string, 0, len(pipelines))
for k := range pipelines {
names = append(names, k)
}
return &messages.PipelineStatus{
RequestID: requestID,
Status: "error",
Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
AvailablePipelines: names,
}, nil
}
var runID string
if pipeline.Engine == "argo" {
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
} else {
runID, err = submitKubeflow(ctx, httpClient, kubeflowHost, pipeline.PipelineID, params, requestID)
}
if err != nil {
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
return &messages.PipelineStatus{
RequestID: requestID,
Status: "error",
Error: err.Error(),
}, nil
}
result := &messages.PipelineStatus{
RequestID: requestID,
Status: "submitted",
RunID: runID,
Engine: pipeline.Engine,
Pipeline: pipelineName,
SubmittedAt: time.Now().UTC().Format(time.RFC3339),
}
// Publish status update
_ = h.NATS.Publish(fmt.Sprintf("ai.pipeline.status.%s", requestID), result)
slog.Info("pipeline submitted", "pipeline", pipelineName, "run_id", runID)
return result, nil
})
if err := h.Run(); err != nil {
slog.Error("handler failed", "error", err)
}
}
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) {
argoParams := make([]map[string]string, 0, len(params))
for k, v := range params {
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
}
workflow := map[string]any{
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"metadata": map[string]any{
"generateName": template + "-",
"namespace": namespace,
"labels": map[string]string{"request-id": requestID},
},
"spec": map[string]any{
"workflowTemplateRef": map[string]string{"name": template},
"arguments": map[string]any{"parameters": argoParams},
},
}
body, _ := json.Marshal(map[string]any{"workflow": workflow})
url := fmt.Sprintf("%s/api/v1/workflows/%s", host, namespace)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("argo request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
return "", fmt.Errorf("argo %d: %s", resp.StatusCode, string(respBody))
}
var result struct {
Metadata struct {
Name string `json:"name"`
} `json:"metadata"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return "", err
}
return result.Metadata.Name, nil
}
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) {
kfParams := make([]map[string]string, 0, len(params))
for k, v := range params {
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
}
runRequest := map[string]any{
"name": fmt.Sprintf("%s-%s", pipelineID, requestID[:min(8, len(requestID))]),
"pipeline_spec": map[string]any{
"pipeline_id": pipelineID,
"parameters": kfParams,
},
}
body, _ := json.Marshal(runRequest)
url := fmt.Sprintf("%s/apis/v1beta1/runs", host)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("kubeflow request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
return "", fmt.Errorf("kubeflow %d: %s", resp.StatusCode, string(respBody))
}
var result struct {
Run struct {
ID string `json:"id"`
} `json:"run"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return "", err
}
return result.Run.ID, nil
}
// Helpers
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}

186
main_test.go Normal file
View File

@@ -0,0 +1,186 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"github.com/vmihailenco/msgpack/v5"
)
func TestPipelineTriggerDecode(t *testing.T) {
req := messages.PipelineTrigger{
RequestID: "req-001",
Pipeline: "document-ingestion",
Parameters: map[string]any{"source": "s3://bucket"},
}
data, err := msgpack.Marshal(&req)
if err != nil {
t.Fatal(err)
}
decoded, err := natsutil.Decode[messages.PipelineTrigger](data)
if err != nil {
t.Fatal(err)
}
if decoded.RequestID != "req-001" {
t.Errorf("RequestID = %q", decoded.RequestID)
}
if decoded.Pipeline != "document-ingestion" {
t.Errorf("Pipeline = %q", decoded.Pipeline)
}
if decoded.Parameters["source"] != "s3://bucket" {
t.Errorf("Parameters = %v", decoded.Parameters)
}
}
func TestPipelineStatusRoundtrip(t *testing.T) {
status := messages.PipelineStatus{
RequestID: "req-002",
Status: "submitted",
RunID: "argo-abc123",
Engine: "argo",
Pipeline: "batch-inference",
}
data, err := msgpack.Marshal(&status)
if err != nil {
t.Fatal(err)
}
var got messages.PipelineStatus
if err := msgpack.Unmarshal(data, &got); err != nil {
t.Fatal(err)
}
if got.RunID != "argo-abc123" {
t.Errorf("RunID = %q", got.RunID)
}
if got.Engine != "argo" {
t.Errorf("Engine = %q", got.Engine)
}
}
func TestGetEnv(t *testing.T) {
t.Setenv("TEST_HOST", "http://test:8080")
if got := getEnv("TEST_HOST", "fallback"); got != "http://test:8080" {
t.Errorf("getEnv(TEST_HOST) = %q, want %q", got, "http://test:8080")
}
if got := getEnv("MISSING_VAR_XYZ", "default"); got != "default" {
t.Errorf("getEnv(MISSING) = %q, want %q", got, "default")
}
}
func TestPipelinesMap(t *testing.T) {
expected := []string{"document-ingestion", "batch-inference", "rag-query", "voice-pipeline", "model-evaluation"}
for _, name := range expected {
if _, ok := pipelines[name]; !ok {
t.Errorf("pipeline %q not found in pipelines map", name)
}
}
if got := pipelines["document-ingestion"].Engine; got != "argo" {
t.Errorf("document-ingestion engine = %q, want argo", got)
}
if got := pipelines["rag-query"].Engine; got != "kubeflow" {
t.Errorf("rag-query engine = %q, want kubeflow", got)
}
}
func TestSubmitArgo(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/api/v1/workflows/ai-ml" {
t.Errorf("unexpected path: %s", r.URL.Path)
}
if r.Header.Get("Content-Type") != "application/json" {
t.Errorf("expected application/json content type")
}
var body map[string]any
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("failed to decode body: %v", err)
}
wf, ok := body["workflow"].(map[string]any)
if !ok {
t.Fatal("missing workflow key")
}
meta := wf["metadata"].(map[string]any)
if meta["namespace"] != "ai-ml" {
t.Errorf("unexpected namespace: %v", meta["namespace"])
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"metadata": map[string]any{"name": "document-ingestion-abc123"},
})
}))
defer ts.Close()
ctx := t.Context()
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{
"source": "test",
}, "req-001")
if err != nil {
t.Fatalf("submitArgo() error: %v", err)
}
if runID != "document-ingestion-abc123" {
t.Errorf("runID = %q, want %q", runID, "document-ingestion-abc123")
}
}
func TestSubmitArgoError(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"message":"bad request"}`))
}))
defer ts.Close()
ctx := t.Context()
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "bad-template", nil, "req-err")
if err == nil {
t.Fatal("expected error for 400 response")
}
}
func TestSubmitKubeflow(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/apis/v1beta1/runs" {
t.Errorf("unexpected path: %s", r.URL.Path)
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"run": map[string]any{"id": "kf-run-456"},
})
}))
defer ts.Close()
ctx := t.Context()
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{
"query": "test",
}, "req-002")
if err != nil {
t.Fatalf("submitKubeflow() error: %v", err)
}
if runID != "kf-run-456" {
t.Errorf("runID = %q, want %q", runID, "kf-run-456")
}
}
func TestSubmitKubeflowError(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("internal error"))
}))
defer ts.Close()
ctx := t.Context()
_, err := submitKubeflow(ctx, ts.Client(), ts.URL, "bad-pipeline", nil, "req-err2")
if err == nil {
t.Fatal("expected error for 500 response")
}
}

View File

@@ -1,228 +0,0 @@
#!/usr/bin/env python3
"""
Pipeline Bridge Service (Refactored)
Bridges NATS events to workflow engines using handler-base:
1. Listen for pipeline triggers on "ai.pipeline.trigger"
2. Submit to Kubeflow Pipelines or Argo Workflows
3. Monitor execution and publish status updates
4. Publish completion to "ai.pipeline.status.{request_id}"
"""
import logging
from typing import Any, Optional
from datetime import datetime
import httpx
from nats.aio.msg import Msg
from handler_base import Handler, Settings
from handler_base.telemetry import create_span
logger = logging.getLogger("pipeline-bridge")
class PipelineSettings(Settings):
"""Pipeline bridge specific settings."""
service_name: str = "pipeline-bridge"
# Kubeflow Pipelines
kubeflow_host: str = "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
# Argo Workflows
argo_host: str = "http://argo-server.argo.svc.cluster.local:2746"
argo_namespace: str = "ai-ml"
# Pipeline definitions
PIPELINES = {
"document-ingestion": {
"engine": "argo",
"template": "document-ingestion",
"description": "Ingest documents into Milvus vector database",
},
"batch-inference": {
"engine": "argo",
"template": "batch-inference",
"description": "Run batch LLM inference on a dataset",
},
"rag-query": {
"engine": "kubeflow",
"pipeline_id": "rag-pipeline",
"description": "Execute RAG query pipeline",
},
"voice-pipeline": {
"engine": "kubeflow",
"pipeline_id": "voice-pipeline",
"description": "Full voice assistant pipeline",
},
"model-evaluation": {
"engine": "argo",
"template": "model-evaluation",
"description": "Evaluate model performance",
},
}
class PipelineBridge(Handler):
"""
Pipeline trigger handler.
Request format:
{
"request_id": "uuid",
"pipeline": "document-ingestion",
"parameters": {"key": "value"}
}
Response format:
{
"request_id": "uuid",
"status": "submitted",
"run_id": "workflow-run-id",
"engine": "argo|kubeflow"
}
"""
def __init__(self):
self.pipeline_settings = PipelineSettings()
super().__init__(
subject="ai.pipeline.trigger",
settings=self.pipeline_settings,
queue_group="pipeline-bridges",
)
self._http: Optional[httpx.AsyncClient] = None
async def setup(self) -> None:
"""Initialize HTTP client."""
logger.info("Initializing pipeline bridge...")
self._http = httpx.AsyncClient(timeout=60.0)
logger.info(f"Pipeline bridge ready. Available pipelines: {list(PIPELINES.keys())}")
async def teardown(self) -> None:
"""Clean up HTTP client."""
if self._http:
await self._http.aclose()
logger.info("Pipeline bridge closed")
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
"""Handle pipeline trigger request."""
request_id = data.get("request_id", "unknown")
pipeline_name = data.get("pipeline", "")
parameters = data.get("parameters", {})
logger.info(f"Triggering pipeline '{pipeline_name}' for request {request_id}")
with create_span("pipeline.trigger") as span:
if span:
span.set_attribute("request.id", request_id)
span.set_attribute("pipeline.name", pipeline_name)
# Validate pipeline
if pipeline_name not in PIPELINES:
error = f"Unknown pipeline: {pipeline_name}"
logger.error(error)
return {
"request_id": request_id,
"status": "error",
"error": error,
"available_pipelines": list(PIPELINES.keys()),
}
pipeline = PIPELINES[pipeline_name]
engine = pipeline["engine"]
try:
if engine == "argo":
run_id = await self._submit_argo(pipeline["template"], parameters, request_id)
else:
run_id = await self._submit_kubeflow(
pipeline["pipeline_id"], parameters, request_id
)
result = {
"request_id": request_id,
"status": "submitted",
"run_id": run_id,
"engine": engine,
"pipeline": pipeline_name,
"submitted_at": datetime.utcnow().isoformat(),
}
# Publish status update
await self.nats.publish(f"ai.pipeline.status.{request_id}", result)
logger.info(f"Pipeline {pipeline_name} submitted: {run_id}")
return result
except Exception as e:
logger.exception(f"Failed to submit pipeline {pipeline_name}")
return {
"request_id": request_id,
"status": "error",
"error": str(e),
}
async def _submit_argo(self, template: str, parameters: dict, request_id: str) -> str:
"""Submit workflow to Argo Workflows."""
with create_span("pipeline.submit.argo") as span:
if span:
span.set_attribute("argo.template", template)
workflow = {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"metadata": {
"generateName": f"{template}-",
"namespace": self.pipeline_settings.argo_namespace,
"labels": {
"request-id": request_id,
},
},
"spec": {
"workflowTemplateRef": {"name": template},
"arguments": {
"parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()]
},
},
}
response = await self._http.post(
f"{self.pipeline_settings.argo_host}/api/v1/workflows/{self.pipeline_settings.argo_namespace}",
json={"workflow": workflow},
)
response.raise_for_status()
result = response.json()
return result["metadata"]["name"]
async def _submit_kubeflow(self, pipeline_id: str, parameters: dict, request_id: str) -> str:
"""Submit run to Kubeflow Pipelines."""
with create_span("pipeline.submit.kubeflow") as span:
if span:
span.set_attribute("kubeflow.pipeline_id", pipeline_id)
run_request = {
"name": f"{pipeline_id}-{request_id[:8]}",
"pipeline_spec": {
"pipeline_id": pipeline_id,
"parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()],
},
}
response = await self._http.post(
f"{self.pipeline_settings.kubeflow_host}/apis/v1beta1/runs",
json=run_request,
)
response.raise_for_status()
result = response.json()
return result["run"]["id"]
if __name__ == "__main__":
PipelineBridge().run()

View File

@@ -1,45 +0,0 @@
[project]
name = "pipeline-bridge"
version = "1.0.0"
description = "Bridge NATS events to Kubeflow Pipelines and Argo Workflows"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Davies Tech Labs" }]
dependencies = [
"handler-base @ git+https://git.daviestechlabs.io/daviestechlabs/handler-base.git",
"httpx>=0.27.0",
"kubernetes>=28.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"ruff>=0.1.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["."]
only-include = ["pipeline_bridge.py"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"
filterwarnings = ["ignore::DeprecationWarning"]

View File

@@ -1 +0,0 @@
# Pipeline Bridge Tests

View File

@@ -1,91 +0,0 @@
"""
Pytest configuration and fixtures for pipeline-bridge tests.
"""
import asyncio
import os
from unittest.mock import MagicMock
import pytest
# Set test environment variables before importing
os.environ.setdefault("NATS_URL", "nats://localhost:4222")
os.environ.setdefault("OTEL_ENABLED", "false")
os.environ.setdefault("MLFLOW_ENABLED", "false")
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def mock_nats_message():
"""Create a mock NATS message."""
msg = MagicMock()
msg.subject = "ai.pipeline.trigger"
msg.reply = "ai.pipeline.status.test-123"
return msg
@pytest.fixture
def argo_pipeline_request():
"""Sample Argo pipeline trigger request."""
return {
"request_id": "test-request-123",
"pipeline": "document-ingestion",
"parameters": {
"source_path": "s3://bucket/documents",
"collection_name": "test_collection",
},
}
@pytest.fixture
def kubeflow_pipeline_request():
"""Sample Kubeflow pipeline trigger request."""
return {
"request_id": "test-request-456",
"pipeline": "rag-query",
"parameters": {
"query": "What is AI?",
"collection": "documents",
},
}
@pytest.fixture
def unknown_pipeline_request():
"""Request for unknown pipeline."""
return {
"request_id": "test-request-789",
"pipeline": "nonexistent-pipeline",
"parameters": {},
}
@pytest.fixture
def mock_argo_response():
"""Mock Argo Workflows API response."""
return {
"metadata": {
"name": "document-ingestion-abc123",
"namespace": "ai-ml",
},
"status": {"phase": "Pending"},
}
@pytest.fixture
def mock_kubeflow_response():
"""Mock Kubeflow Pipelines API response."""
return {
"run": {
"id": "run-xyz-789",
"name": "rag-query-test",
"status": "Running",
}
}

View File

@@ -1,272 +0,0 @@
"""
Unit tests for PipelineBridge handler.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from pipeline_bridge import PipelineBridge, PipelineSettings, PIPELINES
class TestPipelineSettings:
"""Tests for PipelineSettings configuration."""
def test_default_settings(self):
"""Test default settings values."""
settings = PipelineSettings()
assert settings.service_name == "pipeline-bridge"
assert settings.kubeflow_host == "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
assert settings.argo_host == "http://argo-server.argo.svc.cluster.local:2746"
assert settings.argo_namespace == "ai-ml"
def test_custom_settings(self):
"""Test custom settings."""
settings = PipelineSettings(
kubeflow_host="http://custom-kubeflow:8888",
argo_namespace="custom-ns",
)
assert settings.kubeflow_host == "http://custom-kubeflow:8888"
assert settings.argo_namespace == "custom-ns"
class TestPipelineDefinitions:
"""Tests for pipeline definitions."""
def test_required_pipelines_exist(self):
"""Test that required pipelines are defined."""
required = ["document-ingestion", "batch-inference", "rag-query", "voice-pipeline"]
for name in required:
assert name in PIPELINES, f"Pipeline {name} should be defined"
def test_argo_pipelines_have_template(self):
"""Test Argo pipelines have template field."""
for name, config in PIPELINES.items():
if config["engine"] == "argo":
assert "template" in config, f"Argo pipeline {name} missing template"
def test_kubeflow_pipelines_have_pipeline_id(self):
"""Test Kubeflow pipelines have pipeline_id field."""
for name, config in PIPELINES.items():
if config["engine"] == "kubeflow":
assert "pipeline_id" in config, f"Kubeflow pipeline {name} missing pipeline_id"
def test_all_pipelines_have_description(self):
"""Test all pipelines have descriptions."""
for name, config in PIPELINES.items():
assert "description" in config, f"Pipeline {name} missing description"
class TestPipelineBridge:
"""Tests for PipelineBridge handler."""
@pytest.fixture
def handler(self):
"""Create handler with mocked HTTP client."""
handler = PipelineBridge()
handler._http = AsyncMock()
handler.nats = AsyncMock()
return handler
def test_init(self, handler):
"""Test handler initialization."""
assert handler.subject == "ai.pipeline.trigger"
assert handler.queue_group == "pipeline-bridges"
assert handler.pipeline_settings.service_name == "pipeline-bridge"
@pytest.mark.asyncio
async def test_handle_unknown_pipeline(
self,
handler,
mock_nats_message,
unknown_pipeline_request,
):
"""Test handling unknown pipeline."""
result = await handler.handle_message(mock_nats_message, unknown_pipeline_request)
assert result["status"] == "error"
assert "Unknown pipeline" in result["error"]
assert "available_pipelines" in result
assert "document-ingestion" in result["available_pipelines"]
@pytest.mark.asyncio
async def test_handle_argo_pipeline(
self,
handler,
mock_nats_message,
argo_pipeline_request,
mock_argo_response,
):
"""Test triggering Argo workflow."""
# Setup mock response
mock_response = MagicMock()
mock_response.json.return_value = mock_argo_response
mock_response.raise_for_status = MagicMock()
handler._http.post.return_value = mock_response
result = await handler.handle_message(mock_nats_message, argo_pipeline_request)
assert result["status"] == "submitted"
assert result["engine"] == "argo"
assert result["run_id"] == "document-ingestion-abc123"
assert result["pipeline"] == "document-ingestion"
assert "submitted_at" in result
# Verify API call
handler._http.post.assert_called_once()
call_args = handler._http.post.call_args
assert "argo-server" in str(call_args)
@pytest.mark.asyncio
async def test_handle_kubeflow_pipeline(
self,
handler,
mock_nats_message,
kubeflow_pipeline_request,
mock_kubeflow_response,
):
"""Test triggering Kubeflow pipeline."""
# Setup mock response
mock_response = MagicMock()
mock_response.json.return_value = mock_kubeflow_response
mock_response.raise_for_status = MagicMock()
handler._http.post.return_value = mock_response
result = await handler.handle_message(mock_nats_message, kubeflow_pipeline_request)
assert result["status"] == "submitted"
assert result["engine"] == "kubeflow"
assert result["run_id"] == "run-xyz-789"
assert result["pipeline"] == "rag-query"
# Verify API call
handler._http.post.assert_called_once()
call_args = handler._http.post.call_args
assert "ml-pipeline" in str(call_args)
@pytest.mark.asyncio
async def test_handle_api_error(
self,
handler,
mock_nats_message,
argo_pipeline_request,
):
"""Test handling API errors."""
handler._http.post.side_effect = Exception("Connection refused")
result = await handler.handle_message(mock_nats_message, argo_pipeline_request)
assert result["status"] == "error"
assert "Connection refused" in result["error"]
@pytest.mark.asyncio
async def test_publishes_status_update(
self,
handler,
mock_nats_message,
argo_pipeline_request,
mock_argo_response,
):
"""Test that status is published to NATS."""
mock_response = MagicMock()
mock_response.json.return_value = mock_argo_response
mock_response.raise_for_status = MagicMock()
handler._http.post.return_value = mock_response
await handler.handle_message(mock_nats_message, argo_pipeline_request)
handler.nats.publish.assert_called_once()
call_args = handler.nats.publish.call_args
assert "ai.pipeline.status.test-request-123" in str(call_args)
@pytest.mark.asyncio
async def test_setup_creates_http_client(self):
"""Test that setup initializes HTTP client."""
with patch("pipeline_bridge.httpx.AsyncClient") as mock_client:
handler = PipelineBridge()
await handler.setup()
mock_client.assert_called_once()
@pytest.mark.asyncio
async def test_teardown_closes_http_client(self, handler):
"""Test that teardown closes HTTP client."""
await handler.teardown()
handler._http.aclose.assert_called_once()
class TestArgoSubmission:
"""Tests for Argo workflow submission."""
@pytest.fixture
def handler(self):
"""Create handler with mocked HTTP client."""
handler = PipelineBridge()
handler._http = AsyncMock()
return handler
@pytest.mark.asyncio
async def test_argo_workflow_structure(
self,
handler,
mock_argo_response,
):
"""Test Argo workflow request structure."""
mock_response = MagicMock()
mock_response.json.return_value = mock_argo_response
mock_response.raise_for_status = MagicMock()
handler._http.post.return_value = mock_response
await handler._submit_argo(
template="document-ingestion",
parameters={"key": "value"},
request_id="test-123",
)
# Verify workflow structure
call_kwargs = handler._http.post.call_args.kwargs
workflow = call_kwargs["json"]["workflow"]
assert workflow["apiVersion"] == "argoproj.io/v1alpha1"
assert workflow["kind"] == "Workflow"
assert "workflowTemplateRef" in workflow["spec"]
assert workflow["spec"]["workflowTemplateRef"]["name"] == "document-ingestion"
assert workflow["metadata"]["labels"]["request-id"] == "test-123"
class TestKubeflowSubmission:
"""Tests for Kubeflow pipeline submission."""
@pytest.fixture
def handler(self):
"""Create handler with mocked HTTP client."""
handler = PipelineBridge()
handler._http = AsyncMock()
return handler
@pytest.mark.asyncio
async def test_kubeflow_run_structure(
self,
handler,
mock_kubeflow_response,
):
"""Test Kubeflow run request structure."""
mock_response = MagicMock()
mock_response.json.return_value = mock_kubeflow_response
mock_response.raise_for_status = MagicMock()
handler._http.post.return_value = mock_response
await handler._submit_kubeflow(
pipeline_id="rag-pipeline",
parameters={"query": "test"},
request_id="test-456",
)
# Verify run request structure
call_kwargs = handler._http.post.call_args.kwargs
run_request = call_kwargs["json"]
assert "rag-pipeline" in run_request["name"]
assert run_request["pipeline_spec"]["pipeline_id"] == "rag-pipeline"

2702
uv.lock generated

File diff suppressed because it is too large Load Diff