Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1bdf92c376 | |||
| c832a837eb | |||
| b6f0e9b88f | |||
| 43fb5afcc5 | |||
| de4fa6ea90 | |||
| 509f392185 | |||
| 7cdcbfbff3 | |||
| 8f9b2203ca | |||
| aeb1b749be |
9
.dockerignore
Normal file
9
.dockerignore
Normal file
@@ -0,0 +1,9 @@
|
||||
.git
|
||||
.gitignore
|
||||
*.md
|
||||
LICENSE
|
||||
renovate.json
|
||||
*_test.go
|
||||
e2e_test.go
|
||||
__pycache__
|
||||
.env*
|
||||
213
.gitea/workflows/build-push.yaml
Normal file
213
.gitea/workflows/build-push.yaml
Normal file
@@ -0,0 +1,213 @@
|
||||
name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
pull_request:
|
||||
branches: [main]
|
||||
|
||||
env:
|
||||
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
|
||||
GOPRIVATE: git.daviestechlabs.io
|
||||
REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs
|
||||
REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000
|
||||
IMAGE_NAME: pipeline-bridge
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
name: Lint
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version-file: go.mod
|
||||
cache: true
|
||||
|
||||
- name: Configure private modules
|
||||
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
|
||||
|
||||
- name: Run go vet
|
||||
run: go vet ./...
|
||||
|
||||
- name: Install golangci-lint
|
||||
run: |
|
||||
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b "$(go env GOPATH)/bin"
|
||||
echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Run golangci-lint
|
||||
run: golangci-lint run ./...
|
||||
|
||||
test:
|
||||
name: Test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version-file: go.mod
|
||||
cache: true
|
||||
|
||||
- name: Configure private modules
|
||||
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
|
||||
|
||||
- name: Verify dependencies
|
||||
run: go mod verify
|
||||
|
||||
- name: Build
|
||||
run: go build -v ./...
|
||||
|
||||
- name: Run tests
|
||||
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
|
||||
|
||||
release:
|
||||
name: Release
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, test]
|
||||
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
|
||||
outputs:
|
||||
version: ${{ steps.version.outputs.version }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Determine version bump
|
||||
id: version
|
||||
run: |
|
||||
# Get latest tag or default to v0.0.0
|
||||
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
|
||||
VERSION=${LATEST#v}
|
||||
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
|
||||
|
||||
# Check commit message for keywords
|
||||
MSG="${{ gitea.event.head_commit.message }}"
|
||||
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
|
||||
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
|
||||
BUMP="major"
|
||||
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
|
||||
MINOR=$((MINOR + 1)); PATCH=0
|
||||
BUMP="minor"
|
||||
else
|
||||
PATCH=$((PATCH + 1))
|
||||
BUMP="patch"
|
||||
fi
|
||||
|
||||
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
|
||||
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
|
||||
echo "bump=$BUMP" >> $GITHUB_OUTPUT
|
||||
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
|
||||
|
||||
- name: Create and push tag
|
||||
run: |
|
||||
git config user.name "gitea-actions[bot]"
|
||||
git config user.email "actions@git.daviestechlabs.io"
|
||||
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
|
||||
git push origin ${{ steps.version.outputs.version }}
|
||||
|
||||
docker:
|
||||
name: Docker Build & Push
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, test, release]
|
||||
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
buildkitd-config-inline: |
|
||||
[registry."gitea-http.gitea.svc.cluster.local:3000"]
|
||||
http = true
|
||||
insecure = true
|
||||
|
||||
- name: Login to Docker Hub
|
||||
if: vars.DOCKERHUB_USERNAME != ''
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ vars.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
|
||||
- name: Configure Docker for insecure registry
|
||||
run: |
|
||||
sudo mkdir -p /etc/docker
|
||||
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
|
||||
sudo systemctl restart docker || sudo service docker restart || true
|
||||
sleep 2
|
||||
|
||||
- name: Login to Gitea Registry
|
||||
run: |
|
||||
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
|
||||
mkdir -p ~/.docker
|
||||
cat > ~/.docker/config.json << EOF
|
||||
{
|
||||
"auths": {
|
||||
"${{ env.REGISTRY_HOST }}": {
|
||||
"auth": "$AUTH"
|
||||
}
|
||||
}
|
||||
}
|
||||
EOF
|
||||
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
|
||||
|
||||
- name: Extract metadata
|
||||
id: meta
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
tags: |
|
||||
type=semver,pattern={{version}},value=${{ needs.release.outputs.version }}
|
||||
type=semver,pattern={{major}}.{{minor}},value=${{ needs.release.outputs.version }}
|
||||
type=raw,value=latest,enable={{is_default_branch}}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
|
||||
notify:
|
||||
name: Notify
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, test, release, docker]
|
||||
if: always()
|
||||
steps:
|
||||
- name: Notify on success
|
||||
if: needs.lint.result == 'success' && needs.test.result == 'success'
|
||||
run: |
|
||||
curl -s \
|
||||
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
|
||||
-H "Priority: default" \
|
||||
-H "Tags: white_check_mark,github" \
|
||||
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
|
||||
-d "Branch: ${{ gitea.ref_name }}
|
||||
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
|
||||
Release: ${{ needs.release.result == 'success' && needs.release.outputs.version || 'skipped' }}
|
||||
Docker: ${{ needs.docker.result }}" \
|
||||
${{ env.NTFY_URL }}/gitea-ci
|
||||
|
||||
- name: Notify on failure
|
||||
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
|
||||
run: |
|
||||
curl -s \
|
||||
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
|
||||
-H "Priority: high" \
|
||||
-H "Tags: x,github" \
|
||||
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
|
||||
-d "Branch: ${{ gitea.ref_name }}
|
||||
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
|
||||
Lint: ${{ needs.lint.result }}
|
||||
Test: ${{ needs.test.result }}" \
|
||||
${{ env.NTFY_URL }}/gitea-ci
|
||||
@@ -1,129 +0,0 @@
|
||||
name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
pull_request:
|
||||
branches: [main]
|
||||
|
||||
env:
|
||||
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
name: Lint
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up uv
|
||||
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.13
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --frozen --extra dev
|
||||
|
||||
- name: Run ruff check
|
||||
run: uv run ruff check .
|
||||
|
||||
- name: Run ruff format check
|
||||
run: uv run ruff format --check .
|
||||
|
||||
test:
|
||||
name: Test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up uv
|
||||
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.13
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --frozen --extra dev
|
||||
|
||||
- name: Run tests
|
||||
run: uv run pytest -v
|
||||
|
||||
release:
|
||||
name: Release
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, test]
|
||||
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Determine version bump
|
||||
id: version
|
||||
run: |
|
||||
# Get latest tag or default to v0.0.0
|
||||
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
|
||||
VERSION=${LATEST#v}
|
||||
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
|
||||
|
||||
# Check commit message for keywords
|
||||
MSG="${{ gitea.event.head_commit.message }}"
|
||||
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
|
||||
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
|
||||
BUMP="major"
|
||||
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
|
||||
MINOR=$((MINOR + 1)); PATCH=0
|
||||
BUMP="minor"
|
||||
else
|
||||
PATCH=$((PATCH + 1))
|
||||
BUMP="patch"
|
||||
fi
|
||||
|
||||
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
|
||||
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
|
||||
echo "bump=$BUMP" >> $GITHUB_OUTPUT
|
||||
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
|
||||
|
||||
- name: Create and push tag
|
||||
run: |
|
||||
git config user.name "gitea-actions[bot]"
|
||||
git config user.email "actions@git.daviestechlabs.io"
|
||||
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
|
||||
git push origin ${{ steps.version.outputs.version }}
|
||||
|
||||
notify:
|
||||
name: Notify
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, test, release]
|
||||
if: always()
|
||||
steps:
|
||||
- name: Notify on success
|
||||
if: needs.lint.result == 'success' && needs.test.result == 'success'
|
||||
run: |
|
||||
curl -s \
|
||||
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
|
||||
-H "Priority: default" \
|
||||
-H "Tags: white_check_mark,github" \
|
||||
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
|
||||
-d "Branch: ${{ gitea.ref_name }}
|
||||
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
|
||||
Release: ${{ needs.release.result == 'success' && 'created' || 'skipped' }}" \
|
||||
${{ env.NTFY_URL }}/gitea-ci
|
||||
|
||||
- name: Notify on failure
|
||||
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
|
||||
run: |
|
||||
curl -s \
|
||||
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
|
||||
-H "Priority: high" \
|
||||
-H "Tags: x,github" \
|
||||
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
|
||||
-d "Branch: ${{ gitea.ref_name }}
|
||||
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
|
||||
Lint: ${{ needs.lint.result }}
|
||||
Test: ${{ needs.test.result }}" \
|
||||
${{ env.NTFY_URL }}/gitea-ci
|
||||
60
.gitea/workflows/update-dependency.yml
Normal file
60
.gitea/workflows/update-dependency.yml
Normal file
@@ -0,0 +1,60 @@
|
||||
name: Update handler-base
|
||||
|
||||
on:
|
||||
repository_dispatch:
|
||||
types: [handler-base-release]
|
||||
|
||||
env:
|
||||
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
|
||||
GOPRIVATE: git.daviestechlabs.io
|
||||
|
||||
jobs:
|
||||
update:
|
||||
name: Update handler-base dependency
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
token: ${{ secrets.DISPATCH_TOKEN }}
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version-file: go.mod
|
||||
cache: true
|
||||
|
||||
- name: Configure Git
|
||||
run: |
|
||||
git config user.name "gitea-actions[bot]"
|
||||
git config user.email "actions@git.daviestechlabs.io"
|
||||
git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
|
||||
|
||||
- name: Update handler-base
|
||||
run: |
|
||||
VERSION="${{ gitea.event.client_payload.version }}"
|
||||
echo "Updating handler-base to ${VERSION}"
|
||||
go get git.daviestechlabs.io/daviestechlabs/handler-base@${VERSION}
|
||||
go mod tidy
|
||||
|
||||
- name: Commit and push
|
||||
run: |
|
||||
VERSION="${{ gitea.event.client_payload.version }}"
|
||||
if git diff --quiet go.mod go.sum; then
|
||||
echo "No changes to commit"
|
||||
exit 0
|
||||
fi
|
||||
git add go.mod go.sum
|
||||
git commit -m "chore(deps): bump handler-base to ${VERSION}"
|
||||
git push
|
||||
|
||||
- name: Notify
|
||||
if: success()
|
||||
run: |
|
||||
VERSION="${{ gitea.event.client_payload.version }}"
|
||||
curl -s \
|
||||
-H "Title: 📦 Dep Update: ${{ gitea.repository }}" \
|
||||
-H "Priority: default" \
|
||||
-H "Tags: package,github" \
|
||||
-d "handler-base updated to ${VERSION}" \
|
||||
${{ env.NTFY_URL }}/gitea-ci
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -9,3 +9,4 @@ dist/
|
||||
build/
|
||||
.env
|
||||
.env.local
|
||||
pipeline-bridge
|
||||
|
||||
34
Dockerfile
34
Dockerfile
@@ -1,13 +1,31 @@
|
||||
# Pipeline Bridge - Using handler-base
|
||||
ARG BASE_TAG=latest
|
||||
FROM ghcr.io/billy-davies-2/handler-base:${BASE_TAG}
|
||||
# Build stage
|
||||
FROM golang:1.25-alpine AS builder
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Install additional dependencies from pyproject.toml
|
||||
COPY pyproject.toml .
|
||||
RUN uv pip install --system --no-cache httpx kubernetes
|
||||
# Install ca-certificates for HTTPS
|
||||
RUN apk add --no-cache ca-certificates
|
||||
|
||||
COPY pipeline_bridge.py .
|
||||
# Copy go mod files
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
CMD ["python", "pipeline_bridge.py"]
|
||||
# Copy source code
|
||||
COPY . .
|
||||
|
||||
# Build static binary
|
||||
RUN CGO_ENABLED=0 GOOS=linux GOAMD64=v3 go build -ldflags="-w -s" -o /pipeline-bridge .
|
||||
|
||||
# Runtime stage - scratch for minimal image
|
||||
FROM scratch
|
||||
|
||||
# Copy CA certificates for HTTPS
|
||||
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
|
||||
|
||||
# Copy binary
|
||||
COPY --from=builder /pipeline-bridge /pipeline-bridge
|
||||
|
||||
# Run as non-root
|
||||
USER 65534:65534
|
||||
|
||||
ENTRYPOINT ["/pipeline-bridge"]
|
||||
|
||||
110
README.md
110
README.md
@@ -1,105 +1,43 @@
|
||||
# Pipeline Bridge
|
||||
# pipeline-bridge
|
||||
|
||||
Bridges NATS events to Kubeflow Pipelines and Argo Workflows.
|
||||
|
||||
## Overview
|
||||
|
||||
The Pipeline Bridge listens for pipeline trigger requests on NATS and submits them to the appropriate workflow engine (Argo Workflows or Kubeflow Pipelines). It monitors execution and publishes status updates back to NATS.
|
||||
|
||||
## NATS Subjects
|
||||
|
||||
| Subject | Direction | Description |
|
||||
|---------|-----------|-------------|
|
||||
| `ai.pipeline.trigger` | Subscribe | Pipeline trigger requests |
|
||||
| `ai.pipeline.status.{request_id}` | Publish | Pipeline status updates |
|
||||
Bridges NATS events to Argo Workflows and Kubeflow Pipelines. Subscribes to
|
||||
`ai.pipeline.trigger` and submits workflow runs to the appropriate engine,
|
||||
publishing status updates to `ai.pipeline.status.{request_id}`.
|
||||
|
||||
## Supported Pipelines
|
||||
|
||||
| Pipeline | Engine | Description |
|
||||
|----------|--------|-------------|
|
||||
| `document-ingestion` | Argo | Ingest documents into Milvus |
|
||||
| `batch-inference` | Argo | Run batch LLM inference |
|
||||
| `model-evaluation` | Argo | Evaluate model performance |
|
||||
| `rag-query` | Kubeflow | Execute RAG query pipeline |
|
||||
| `voice-pipeline` | Kubeflow | Full voice assistant pipeline |
|
||||
| Name | Engine | Template / Pipeline ID |
|
||||
|------|--------|----------------------|
|
||||
| document-ingestion | Argo | document-ingestion |
|
||||
| batch-inference | Argo | batch-inference |
|
||||
| model-evaluation | Argo | model-evaluation |
|
||||
| rag-query | Kubeflow | rag-pipeline |
|
||||
| voice-pipeline | Kubeflow | voice-pipeline |
|
||||
|
||||
## Request Format
|
||||
|
||||
```json
|
||||
{
|
||||
"request_id": "uuid",
|
||||
"pipeline": "document-ingestion",
|
||||
"parameters": {
|
||||
"source-url": "s3://bucket/docs/",
|
||||
"collection-name": "knowledge_base"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Response Format
|
||||
|
||||
```json
|
||||
{
|
||||
"request_id": "uuid",
|
||||
"status": "submitted",
|
||||
"pipeline": "document-ingestion",
|
||||
"engine": "argo",
|
||||
"run_id": "document-ingestion-abc123",
|
||||
"message": "Pipeline submitted successfully",
|
||||
"timestamp": "2026-01-03T12:00:00Z"
|
||||
}
|
||||
```
|
||||
|
||||
## Status Updates
|
||||
|
||||
The bridge publishes status updates as the workflow progresses:
|
||||
|
||||
- `submitted` - Workflow created
|
||||
- `pending` - Waiting to start
|
||||
- `running` - In progress
|
||||
- `succeeded` - Completed successfully
|
||||
- `failed` - Failed
|
||||
- `error` - System error
|
||||
|
||||
## Implementation
|
||||
|
||||
The pipeline bridge uses the [handler-base](https://git.daviestechlabs.io/daviestechlabs/handler-base) library for standardized NATS handling, telemetry, and health checks.
|
||||
|
||||
## Environment Variables
|
||||
## Configuration
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `NATS_URL` | `nats://nats.ai-ml.svc.cluster.local:4222` | NATS server URL |
|
||||
| `KUBEFLOW_HOST` | `http://ml-pipeline.kubeflow.svc.cluster.local:8888` | Kubeflow Pipelines API |
|
||||
| `ARGO_HOST` | `http://argo-server.argo.svc.cluster.local:2746` | Argo Workflows API |
|
||||
| `ARGO_NAMESPACE` | `ai-ml` | Namespace for Argo Workflows |
|
||||
| `ARGO_HOST` | `http://argo-server.argo.svc.cluster.local:2746` | Argo Server API |
|
||||
| `ARGO_NAMESPACE` | `ai-ml` | Namespace for Argo workflows |
|
||||
|
||||
## Building
|
||||
Plus all standard handler-base settings (`NATS_URL`, `OTEL_*`, `HEALTH_PORT`, etc.).
|
||||
|
||||
## Build
|
||||
|
||||
```bash
|
||||
docker build -t pipeline-bridge:latest .
|
||||
|
||||
# With specific handler-base tag
|
||||
docker build --build-arg BASE_TAG=latest -t pipeline-bridge:latest .
|
||||
go build -o pipeline-bridge .
|
||||
```
|
||||
|
||||
## Testing
|
||||
## Test
|
||||
|
||||
```bash
|
||||
# Port-forward NATS
|
||||
kubectl port-forward -n ai-ml svc/nats 4222:4222
|
||||
|
||||
# Trigger document ingestion
|
||||
nats pub ai.pipeline.trigger '{
|
||||
"request_id": "test-1",
|
||||
"pipeline": "document-ingestion",
|
||||
"parameters": {"source-url": "https://example.com/docs.txt"}
|
||||
}'
|
||||
|
||||
# Monitor status
|
||||
nats sub "ai.pipeline.status.>"
|
||||
go test -v -race ./...
|
||||
```
|
||||
|
||||
## License
|
||||
## Docker
|
||||
|
||||
MIT
|
||||
```bash
|
||||
docker build -t pipeline-bridge .
|
||||
```
|
||||
|
||||
213
e2e_test.go
Normal file
213
e2e_test.go
Normal file
@@ -0,0 +1,213 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||
)
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// E2E tests: Argo + Kubeflow pipeline submission integration
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestSubmitArgoE2E_FullPayload(t *testing.T) {
|
||||
// Verify the full Argo workflow structure
|
||||
var receivedBody map[string]any
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_ = json.NewDecoder(r.Body).Decode(&receivedBody)
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"metadata": map[string]any{"name": "doc-ingest-xyz"},
|
||||
})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
params := map[string]any{
|
||||
"source": "s3://bucket/docs",
|
||||
"collection": "knowledge-base",
|
||||
"batch_size": 100,
|
||||
}
|
||||
|
||||
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if runID != "doc-ingest-xyz" {
|
||||
t.Errorf("runID = %q", runID)
|
||||
}
|
||||
|
||||
// Verify workflow structure
|
||||
wf, ok := receivedBody["workflow"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatal("missing workflow key")
|
||||
}
|
||||
meta := wf["metadata"].(map[string]any)
|
||||
if meta["namespace"] != "ai-ml" {
|
||||
t.Errorf("namespace = %v", meta["namespace"])
|
||||
}
|
||||
labels := meta["labels"].(map[string]any)
|
||||
if labels["request-id"] != "req-e2e-001" {
|
||||
t.Errorf("request-id label = %v", labels["request-id"])
|
||||
}
|
||||
|
||||
spec := wf["spec"].(map[string]any)
|
||||
templateRef := spec["workflowTemplateRef"].(map[string]any)
|
||||
if templateRef["name"] != "document-ingestion" {
|
||||
t.Errorf("template = %v", templateRef["name"])
|
||||
}
|
||||
|
||||
args := spec["arguments"].(map[string]any)
|
||||
argoParams := args["parameters"].([]any)
|
||||
if len(argoParams) != 3 {
|
||||
t.Errorf("param count = %d, want 3", len(argoParams))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubmitKubeflowE2E_FullPayload(t *testing.T) {
|
||||
var receivedBody map[string]any
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_ = json.NewDecoder(r.Body).Decode(&receivedBody)
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"run": map[string]any{"id": "kf-run-e2e-789"},
|
||||
})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
params := map[string]any{
|
||||
"query": "what is kubernetes",
|
||||
"top_k": 5,
|
||||
"user_id": "test-user",
|
||||
}
|
||||
|
||||
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "req-e2e-002")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if runID != "kf-run-e2e-789" {
|
||||
t.Errorf("runID = %q", runID)
|
||||
}
|
||||
|
||||
// Verify run name format
|
||||
name := receivedBody["name"].(string)
|
||||
if name != "rag-pipeline-req-e2e-" {
|
||||
t.Errorf("run name = %q", name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelineDispatchE2E_AllEngines(t *testing.T) {
|
||||
// Verify all pipeline definitions dispatch to correct engine
|
||||
for name, def := range pipelines {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
if def.Engine != "argo" && def.Engine != "kubeflow" {
|
||||
t.Errorf("engine = %q, want argo or kubeflow", def.Engine)
|
||||
}
|
||||
if def.Engine == "argo" && def.Template == "" {
|
||||
t.Errorf("argo pipeline %q missing template", name)
|
||||
}
|
||||
if def.Engine == "kubeflow" && def.PipelineID == "" {
|
||||
t.Errorf("kubeflow pipeline %q missing pipeline_id", name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) {
|
||||
// Verify unknown pipeline is rejected and available list is provided
|
||||
pipelineName := "nonexistent-pipeline"
|
||||
_, ok := pipelines[pipelineName]
|
||||
if ok {
|
||||
t.Error("nonexistent pipeline should not be found")
|
||||
}
|
||||
|
||||
names := make([]string, 0, len(pipelines))
|
||||
for k := range pipelines {
|
||||
names = append(names, k)
|
||||
}
|
||||
resp := &messages.PipelineStatus{
|
||||
RequestID: "req-bad",
|
||||
Status: "error",
|
||||
Error: "Unknown pipeline: nonexistent-pipeline",
|
||||
AvailablePipelines: names,
|
||||
}
|
||||
|
||||
if resp.Status != "error" {
|
||||
t.Error("expected error status")
|
||||
}
|
||||
if len(resp.AvailablePipelines) != len(pipelines) {
|
||||
t.Errorf("available_pipelines count mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
|
||||
var count atomic.Int64
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
count.Add(1)
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"metadata": map[string]any{"name": "concurrent-wf"},
|
||||
})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
errs := make(chan error, 10)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
go func() {
|
||||
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference",
|
||||
map[string]any{"batch": i}, "req-concurrent")
|
||||
errs <- err
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := <-errs; err != nil {
|
||||
t.Errorf("concurrent request %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
if got := count.Load(); got != 10 {
|
||||
t.Errorf("request count = %d, want 10", got)
|
||||
}
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// Benchmarks
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func BenchmarkSubmitArgo(b *testing.B) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"metadata":{"name":"bench-wf"}}`))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := b.Context()
|
||||
params := map[string]any{"source": "test"}
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "bench-req")
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSubmitKubeflow(b *testing.B) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"run":{"id":"bench-run"}}`))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := b.Context()
|
||||
params := map[string]any{"query": "test"}
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
_, _ = submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "bench-req")
|
||||
}
|
||||
}
|
||||
41
go.mod
Normal file
41
go.mod
Normal file
@@ -0,0 +1,41 @@
|
||||
module git.daviestechlabs.io/daviestechlabs/pipeline-bridge
|
||||
|
||||
go 1.25.1
|
||||
|
||||
require (
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3
|
||||
github.com/nats-io/nats.go v1.48.0
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/otel v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.40.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
|
||||
golang.org/x/crypto v0.47.0 // indirect
|
||||
golang.org/x/net v0.49.0 // indirect
|
||||
golang.org/x/sys v0.40.0 // indirect
|
||||
golang.org/x/text v0.33.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/grpc v1.78.0 // indirect
|
||||
google.golang.org/protobuf v1.36.11 // indirect
|
||||
)
|
||||
81
go.sum
Normal file
81
go.sum
Normal file
@@ -0,0 +1,81 @@
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3 h1:uYog8B839ulqrWoht3qqCvT7CnR3e2skpaLZc2Pg3GI=
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE=
|
||||
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
||||
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
|
||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
|
||||
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
|
||||
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0/go.mod h1:EtekO9DEJb4/jRyN4v4Qjc2yA7AtfCBuz2FynRUWTXs=
|
||||
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
|
||||
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
|
||||
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
|
||||
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
|
||||
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
|
||||
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
|
||||
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
|
||||
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
|
||||
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
|
||||
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
|
||||
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
|
||||
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
|
||||
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
|
||||
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
|
||||
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
|
||||
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
224
main.go
Normal file
224
main.go
Normal file
@@ -0,0 +1,224 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||
)
|
||||
|
||||
// Pipeline definitions — maps pipeline name to engine config.
// "argo" entries are submitted as Workflows referencing a WorkflowTemplate
// (see submitArgo); "kubeflow" entries are submitted as runs of an existing
// pipeline ID (see submitKubeflow). Keys are the names accepted on the
// "ai.pipeline.trigger" subject.
var pipelines = map[string]pipelineDef{
	"document-ingestion": {Engine: "argo", Template: "document-ingestion"},
	"batch-inference":    {Engine: "argo", Template: "batch-inference"},
	"rag-query":          {Engine: "kubeflow", PipelineID: "rag-pipeline"},
	"voice-pipeline":     {Engine: "kubeflow", PipelineID: "voice-pipeline"},
	"model-evaluation":   {Engine: "argo", Template: "model-evaluation"},
}
|
||||
|
||||
// pipelineDef describes how a named pipeline is executed: which workflow
// engine runs it, plus the engine-specific identifier to submit.
// Exactly one of Template / PipelineID is expected to be set, matching Engine.
type pipelineDef struct {
	Engine     string // "argo" or "kubeflow"
	Template   string // Argo WorkflowTemplate name (used when Engine == "argo")
	PipelineID string // Kubeflow pipeline ID (used when Engine == "kubeflow")
}
|
||||
|
||||
// main wires the pipeline bridge: it subscribes to "ai.pipeline.trigger" via
// handler-base, validates the requested pipeline, submits it to Argo or
// Kubeflow, publishes a status event to "ai.pipeline.status.{request_id}",
// and replies with a PipelineStatus.
func main() {
	// Shared service configuration from handler-base; queue group gives
	// at-most-one delivery per message across bridge replicas.
	cfg := config.Load()
	cfg.ServiceName = "pipeline-bridge"
	cfg.NATSQueueGroup = "pipeline-bridges"

	// Engine endpoints, overridable via environment for non-cluster runs.
	kubeflowHost := getEnv("KUBEFLOW_HOST", "http://ml-pipeline.kubeflow.svc.cluster.local:8888")
	argoHost := getEnv("ARGO_HOST", "http://argo-server.argo.svc.cluster.local:2746")
	argoNamespace := getEnv("ARGO_NAMESPACE", "ai-ml")

	// One shared client; 60s covers slow engine API responses.
	httpClient := &http.Client{Timeout: 60 * time.Second}

	h := handler.New("ai.pipeline.trigger", cfg)

	// NOTE(review): the returned value is presumably sent as the NATS reply
	// by handler-base — confirm against handler.OnTypedMessage's contract.
	h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
		req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data)
		if err != nil {
			// NOTE(review): this error reply carries no RequestID (it is not
			// decodable from the bad payload) — callers must tolerate that.
			return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
		}

		// Normalize optional fields so downstream code never sees zero values.
		requestID := req.RequestID
		if requestID == "" {
			requestID = "unknown"
		}
		pipelineName := req.Pipeline
		params := req.Parameters
		if params == nil {
			params = map[string]any{}
		}

		slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)

		// Validate pipeline
		pipeline, ok := pipelines[pipelineName]
		if !ok {
			// Unknown name: reply with the list of valid pipelines so the
			// caller can self-correct.
			names := make([]string, 0, len(pipelines))
			for k := range pipelines {
				names = append(names, k)
			}
			return &messages.PipelineStatus{
				RequestID: requestID,
				Status: "error",
				Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
				AvailablePipelines: names,
			}, nil
		}

		var runID string

		// Dispatch to the configured engine; anything not "argo" goes to
		// Kubeflow (the map only ever contains these two values).
		if pipeline.Engine == "argo" {
			runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
		} else {
			runID, err = submitKubeflow(ctx, httpClient, kubeflowHost, pipeline.PipelineID, params, requestID)
		}

		if err != nil {
			// Submission failure is reported to the caller, not returned as a
			// handler error (err == nil keeps handler-base from retrying/NAKing
			// — TODO confirm that semantic against handler-base).
			slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
			return &messages.PipelineStatus{
				RequestID: requestID,
				Status: "error",
				Error: err.Error(),
			}, nil
		}

		result := &messages.PipelineStatus{
			RequestID: requestID,
			Status: "submitted",
			RunID: runID,
			Engine: pipeline.Engine,
			Pipeline: pipelineName,
			SubmittedAt: time.Now().UTC().Format(time.RFC3339),
		}

		// Publish status update
		// Best-effort broadcast for observers; the reply below is the
		// authoritative response, so the publish error is deliberately ignored.
		_ = h.NATS.Publish(fmt.Sprintf("ai.pipeline.status.%s", requestID), result)

		slog.Info("pipeline submitted", "pipeline", pipelineName, "run_id", runID)
		return result, nil
	})

	// Run blocks until shutdown; a returned error is logged, not fatal-exited.
	if err := h.Run(); err != nil {
		slog.Error("handler failed", "error", err)
	}
}
|
||||
|
||||
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) {
|
||||
argoParams := make([]map[string]string, 0, len(params))
|
||||
for k, v := range params {
|
||||
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
|
||||
}
|
||||
|
||||
workflow := map[string]any{
|
||||
"apiVersion": "argoproj.io/v1alpha1",
|
||||
"kind": "Workflow",
|
||||
"metadata": map[string]any{
|
||||
"generateName": template + "-",
|
||||
"namespace": namespace,
|
||||
"labels": map[string]string{"request-id": requestID},
|
||||
},
|
||||
"spec": map[string]any{
|
||||
"workflowTemplateRef": map[string]string{"name": template},
|
||||
"arguments": map[string]any{"parameters": argoParams},
|
||||
},
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(map[string]any{"workflow": workflow})
|
||||
url := fmt.Sprintf("%s/api/v1/workflows/%s", host, namespace)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("argo request: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
return "", fmt.Errorf("argo %d: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Metadata struct {
|
||||
Name string `json:"name"`
|
||||
} `json:"metadata"`
|
||||
}
|
||||
if err := json.Unmarshal(respBody, &result); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return result.Metadata.Name, nil
|
||||
}
|
||||
|
||||
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) {
|
||||
kfParams := make([]map[string]string, 0, len(params))
|
||||
for k, v := range params {
|
||||
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
|
||||
}
|
||||
|
||||
runRequest := map[string]any{
|
||||
"name": fmt.Sprintf("%s-%s", pipelineID, requestID[:min(8, len(requestID))]),
|
||||
"pipeline_spec": map[string]any{
|
||||
"pipeline_id": pipelineID,
|
||||
"parameters": kfParams,
|
||||
},
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(runRequest)
|
||||
url := fmt.Sprintf("%s/apis/v1beta1/runs", host)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("kubeflow request: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
return "", fmt.Errorf("kubeflow %d: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Run struct {
|
||||
ID string `json:"id"`
|
||||
} `json:"run"`
|
||||
}
|
||||
if err := json.Unmarshal(respBody, &result); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return result.Run.ID, nil
|
||||
}
|
||||
|
||||
// Helpers
|
||||
|
||||
func getEnv(key, fallback string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
186
main_test.go
Normal file
186
main_test.go
Normal file
@@ -0,0 +1,186 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
)
|
||||
|
||||
// TestPipelineTriggerDecode verifies that a msgpack-encoded PipelineTrigger
// round-trips through natsutil.Decode with the request ID, pipeline name,
// and parameters map intact — the same decode path main's handler uses.
func TestPipelineTriggerDecode(t *testing.T) {
	req := messages.PipelineTrigger{
		RequestID: "req-001",
		Pipeline: "document-ingestion",
		Parameters: map[string]any{"source": "s3://bucket"},
	}
	// Encode exactly as a producer would publish it on the wire.
	data, err := msgpack.Marshal(&req)
	if err != nil {
		t.Fatal(err)
	}
	decoded, err := natsutil.Decode[messages.PipelineTrigger](data)
	if err != nil {
		t.Fatal(err)
	}
	if decoded.RequestID != "req-001" {
		t.Errorf("RequestID = %q", decoded.RequestID)
	}
	if decoded.Pipeline != "document-ingestion" {
		t.Errorf("Pipeline = %q", decoded.Pipeline)
	}
	if decoded.Parameters["source"] != "s3://bucket" {
		t.Errorf("Parameters = %v", decoded.Parameters)
	}
}
|
||||
|
||||
func TestPipelineStatusRoundtrip(t *testing.T) {
|
||||
status := messages.PipelineStatus{
|
||||
RequestID: "req-002",
|
||||
Status: "submitted",
|
||||
RunID: "argo-abc123",
|
||||
Engine: "argo",
|
||||
Pipeline: "batch-inference",
|
||||
}
|
||||
data, err := msgpack.Marshal(&status)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var got messages.PipelineStatus
|
||||
if err := msgpack.Unmarshal(data, &got); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got.RunID != "argo-abc123" {
|
||||
t.Errorf("RunID = %q", got.RunID)
|
||||
}
|
||||
if got.Engine != "argo" {
|
||||
t.Errorf("Engine = %q", got.Engine)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetEnv(t *testing.T) {
|
||||
t.Setenv("TEST_HOST", "http://test:8080")
|
||||
if got := getEnv("TEST_HOST", "fallback"); got != "http://test:8080" {
|
||||
t.Errorf("getEnv(TEST_HOST) = %q, want %q", got, "http://test:8080")
|
||||
}
|
||||
if got := getEnv("MISSING_VAR_XYZ", "default"); got != "default" {
|
||||
t.Errorf("getEnv(MISSING) = %q, want %q", got, "default")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelinesMap(t *testing.T) {
|
||||
expected := []string{"document-ingestion", "batch-inference", "rag-query", "voice-pipeline", "model-evaluation"}
|
||||
for _, name := range expected {
|
||||
if _, ok := pipelines[name]; !ok {
|
||||
t.Errorf("pipeline %q not found in pipelines map", name)
|
||||
}
|
||||
}
|
||||
if got := pipelines["document-ingestion"].Engine; got != "argo" {
|
||||
t.Errorf("document-ingestion engine = %q, want argo", got)
|
||||
}
|
||||
if got := pipelines["rag-query"].Engine; got != "kubeflow" {
|
||||
t.Errorf("rag-query engine = %q, want kubeflow", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubmitArgo(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
t.Errorf("expected POST, got %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/api/v1/workflows/ai-ml" {
|
||||
t.Errorf("unexpected path: %s", r.URL.Path)
|
||||
}
|
||||
if r.Header.Get("Content-Type") != "application/json" {
|
||||
t.Errorf("expected application/json content type")
|
||||
}
|
||||
|
||||
var body map[string]any
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
t.Errorf("failed to decode body: %v", err)
|
||||
}
|
||||
wf, ok := body["workflow"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatal("missing workflow key")
|
||||
}
|
||||
meta := wf["metadata"].(map[string]any)
|
||||
if meta["namespace"] != "ai-ml" {
|
||||
t.Errorf("unexpected namespace: %v", meta["namespace"])
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"metadata": map[string]any{"name": "document-ingestion-abc123"},
|
||||
})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{
|
||||
"source": "test",
|
||||
}, "req-001")
|
||||
if err != nil {
|
||||
t.Fatalf("submitArgo() error: %v", err)
|
||||
}
|
||||
if runID != "document-ingestion-abc123" {
|
||||
t.Errorf("runID = %q, want %q", runID, "document-ingestion-abc123")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubmitArgoError(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = w.Write([]byte(`{"message":"bad request"}`))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "bad-template", nil, "req-err")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for 400 response")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubmitKubeflow exercises submitKubeflow against a stub Kubeflow
// Pipelines API, checking the request method and path and that the run ID
// from the response is returned.
func TestSubmitKubeflow(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			t.Errorf("expected POST, got %s", r.Method)
		}
		if r.URL.Path != "/apis/v1beta1/runs" {
			t.Errorf("unexpected path: %s", r.URL.Path)
		}

		// Minimal successful run-creation response.
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]any{
			"run": map[string]any{"id": "kf-run-456"},
		})
	}))
	defer ts.Close()

	ctx := t.Context()
	runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{
		"query": "test",
	}, "req-002")
	if err != nil {
		t.Fatalf("submitKubeflow() error: %v", err)
	}
	if runID != "kf-run-456" {
		t.Errorf("runID = %q, want %q", runID, "kf-run-456")
	}
}
|
||||
|
||||
func TestSubmitKubeflowError(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
_, _ = w.Write([]byte("internal error"))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
_, err := submitKubeflow(ctx, ts.Client(), ts.URL, "bad-pipeline", nil, "req-err2")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for 500 response")
|
||||
}
|
||||
}
|
||||
@@ -1,228 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Pipeline Bridge Service (Refactored)
|
||||
|
||||
Bridges NATS events to workflow engines using handler-base:
|
||||
1. Listen for pipeline triggers on "ai.pipeline.trigger"
|
||||
2. Submit to Kubeflow Pipelines or Argo Workflows
|
||||
3. Monitor execution and publish status updates
|
||||
4. Publish completion to "ai.pipeline.status.{request_id}"
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
import httpx
|
||||
from nats.aio.msg import Msg
|
||||
|
||||
from handler_base import Handler, Settings
|
||||
from handler_base.telemetry import create_span
|
||||
|
||||
logger = logging.getLogger("pipeline-bridge")
|
||||
|
||||
|
||||
class PipelineSettings(Settings):
|
||||
"""Pipeline bridge specific settings."""
|
||||
|
||||
service_name: str = "pipeline-bridge"
|
||||
|
||||
# Kubeflow Pipelines
|
||||
kubeflow_host: str = "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
|
||||
|
||||
# Argo Workflows
|
||||
argo_host: str = "http://argo-server.argo.svc.cluster.local:2746"
|
||||
argo_namespace: str = "ai-ml"
|
||||
|
||||
|
||||
# Pipeline definitions
|
||||
PIPELINES = {
|
||||
"document-ingestion": {
|
||||
"engine": "argo",
|
||||
"template": "document-ingestion",
|
||||
"description": "Ingest documents into Milvus vector database",
|
||||
},
|
||||
"batch-inference": {
|
||||
"engine": "argo",
|
||||
"template": "batch-inference",
|
||||
"description": "Run batch LLM inference on a dataset",
|
||||
},
|
||||
"rag-query": {
|
||||
"engine": "kubeflow",
|
||||
"pipeline_id": "rag-pipeline",
|
||||
"description": "Execute RAG query pipeline",
|
||||
},
|
||||
"voice-pipeline": {
|
||||
"engine": "kubeflow",
|
||||
"pipeline_id": "voice-pipeline",
|
||||
"description": "Full voice assistant pipeline",
|
||||
},
|
||||
"model-evaluation": {
|
||||
"engine": "argo",
|
||||
"template": "model-evaluation",
|
||||
"description": "Evaluate model performance",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class PipelineBridge(Handler):
|
||||
"""
|
||||
Pipeline trigger handler.
|
||||
|
||||
Request format:
|
||||
{
|
||||
"request_id": "uuid",
|
||||
"pipeline": "document-ingestion",
|
||||
"parameters": {"key": "value"}
|
||||
}
|
||||
|
||||
Response format:
|
||||
{
|
||||
"request_id": "uuid",
|
||||
"status": "submitted",
|
||||
"run_id": "workflow-run-id",
|
||||
"engine": "argo|kubeflow"
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.pipeline_settings = PipelineSettings()
|
||||
super().__init__(
|
||||
subject="ai.pipeline.trigger",
|
||||
settings=self.pipeline_settings,
|
||||
queue_group="pipeline-bridges",
|
||||
)
|
||||
|
||||
self._http: Optional[httpx.AsyncClient] = None
|
||||
|
||||
async def setup(self) -> None:
|
||||
"""Initialize HTTP client."""
|
||||
logger.info("Initializing pipeline bridge...")
|
||||
|
||||
self._http = httpx.AsyncClient(timeout=60.0)
|
||||
|
||||
logger.info(f"Pipeline bridge ready. Available pipelines: {list(PIPELINES.keys())}")
|
||||
|
||||
async def teardown(self) -> None:
|
||||
"""Clean up HTTP client."""
|
||||
if self._http:
|
||||
await self._http.aclose()
|
||||
logger.info("Pipeline bridge closed")
|
||||
|
||||
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
|
||||
"""Handle pipeline trigger request."""
|
||||
request_id = data.get("request_id", "unknown")
|
||||
pipeline_name = data.get("pipeline", "")
|
||||
parameters = data.get("parameters", {})
|
||||
|
||||
logger.info(f"Triggering pipeline '{pipeline_name}' for request {request_id}")
|
||||
|
||||
with create_span("pipeline.trigger") as span:
|
||||
if span:
|
||||
span.set_attribute("request.id", request_id)
|
||||
span.set_attribute("pipeline.name", pipeline_name)
|
||||
|
||||
# Validate pipeline
|
||||
if pipeline_name not in PIPELINES:
|
||||
error = f"Unknown pipeline: {pipeline_name}"
|
||||
logger.error(error)
|
||||
return {
|
||||
"request_id": request_id,
|
||||
"status": "error",
|
||||
"error": error,
|
||||
"available_pipelines": list(PIPELINES.keys()),
|
||||
}
|
||||
|
||||
pipeline = PIPELINES[pipeline_name]
|
||||
engine = pipeline["engine"]
|
||||
|
||||
try:
|
||||
if engine == "argo":
|
||||
run_id = await self._submit_argo(pipeline["template"], parameters, request_id)
|
||||
else:
|
||||
run_id = await self._submit_kubeflow(
|
||||
pipeline["pipeline_id"], parameters, request_id
|
||||
)
|
||||
|
||||
result = {
|
||||
"request_id": request_id,
|
||||
"status": "submitted",
|
||||
"run_id": run_id,
|
||||
"engine": engine,
|
||||
"pipeline": pipeline_name,
|
||||
"submitted_at": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
# Publish status update
|
||||
await self.nats.publish(f"ai.pipeline.status.{request_id}", result)
|
||||
|
||||
logger.info(f"Pipeline {pipeline_name} submitted: {run_id}")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to submit pipeline {pipeline_name}")
|
||||
return {
|
||||
"request_id": request_id,
|
||||
"status": "error",
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
async def _submit_argo(self, template: str, parameters: dict, request_id: str) -> str:
|
||||
"""Submit workflow to Argo Workflows."""
|
||||
with create_span("pipeline.submit.argo") as span:
|
||||
if span:
|
||||
span.set_attribute("argo.template", template)
|
||||
|
||||
workflow = {
|
||||
"apiVersion": "argoproj.io/v1alpha1",
|
||||
"kind": "Workflow",
|
||||
"metadata": {
|
||||
"generateName": f"{template}-",
|
||||
"namespace": self.pipeline_settings.argo_namespace,
|
||||
"labels": {
|
||||
"request-id": request_id,
|
||||
},
|
||||
},
|
||||
"spec": {
|
||||
"workflowTemplateRef": {"name": template},
|
||||
"arguments": {
|
||||
"parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()]
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
response = await self._http.post(
|
||||
f"{self.pipeline_settings.argo_host}/api/v1/workflows/{self.pipeline_settings.argo_namespace}",
|
||||
json={"workflow": workflow},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
return result["metadata"]["name"]
|
||||
|
||||
async def _submit_kubeflow(self, pipeline_id: str, parameters: dict, request_id: str) -> str:
|
||||
"""Submit run to Kubeflow Pipelines."""
|
||||
with create_span("pipeline.submit.kubeflow") as span:
|
||||
if span:
|
||||
span.set_attribute("kubeflow.pipeline_id", pipeline_id)
|
||||
|
||||
run_request = {
|
||||
"name": f"{pipeline_id}-{request_id[:8]}",
|
||||
"pipeline_spec": {
|
||||
"pipeline_id": pipeline_id,
|
||||
"parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()],
|
||||
},
|
||||
}
|
||||
|
||||
response = await self._http.post(
|
||||
f"{self.pipeline_settings.kubeflow_host}/apis/v1beta1/runs",
|
||||
json=run_request,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
return result["run"]["id"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
PipelineBridge().run()
|
||||
@@ -1,45 +0,0 @@
|
||||
[project]
|
||||
name = "pipeline-bridge"
|
||||
version = "1.0.0"
|
||||
description = "Bridge NATS events to Kubeflow Pipelines and Argo Workflows"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
license = { text = "MIT" }
|
||||
authors = [{ name = "Davies Tech Labs" }]
|
||||
|
||||
dependencies = [
|
||||
"handler-base @ git+https://git.daviestechlabs.io/daviestechlabs/handler-base.git",
|
||||
"httpx>=0.27.0",
|
||||
"kubernetes>=28.0.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"pytest>=8.0.0",
|
||||
"pytest-asyncio>=0.23.0",
|
||||
"ruff>=0.1.0",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.metadata]
|
||||
allow-direct-references = true
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["."]
|
||||
only-include = ["pipeline_bridge.py"]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
target-version = "py311"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
asyncio_mode = "auto"
|
||||
testpaths = ["tests"]
|
||||
python_files = ["test_*.py"]
|
||||
python_classes = ["Test*"]
|
||||
python_functions = ["test_*"]
|
||||
addopts = "-v --tb=short"
|
||||
filterwarnings = ["ignore::DeprecationWarning"]
|
||||
@@ -1 +0,0 @@
|
||||
# Pipeline Bridge Tests
|
||||
@@ -1,91 +0,0 @@
|
||||
"""
|
||||
Pytest configuration and fixtures for pipeline-bridge tests.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
# Set test environment variables before importing
|
||||
os.environ.setdefault("NATS_URL", "nats://localhost:4222")
|
||||
os.environ.setdefault("OTEL_ENABLED", "false")
|
||||
os.environ.setdefault("MLFLOW_ENABLED", "false")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def event_loop():
|
||||
"""Create event loop for async tests."""
|
||||
loop = asyncio.new_event_loop()
|
||||
yield loop
|
||||
loop.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_nats_message():
|
||||
"""Create a mock NATS message."""
|
||||
msg = MagicMock()
|
||||
msg.subject = "ai.pipeline.trigger"
|
||||
msg.reply = "ai.pipeline.status.test-123"
|
||||
return msg
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def argo_pipeline_request():
|
||||
"""Sample Argo pipeline trigger request."""
|
||||
return {
|
||||
"request_id": "test-request-123",
|
||||
"pipeline": "document-ingestion",
|
||||
"parameters": {
|
||||
"source_path": "s3://bucket/documents",
|
||||
"collection_name": "test_collection",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def kubeflow_pipeline_request():
|
||||
"""Sample Kubeflow pipeline trigger request."""
|
||||
return {
|
||||
"request_id": "test-request-456",
|
||||
"pipeline": "rag-query",
|
||||
"parameters": {
|
||||
"query": "What is AI?",
|
||||
"collection": "documents",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unknown_pipeline_request():
|
||||
"""Request for unknown pipeline."""
|
||||
return {
|
||||
"request_id": "test-request-789",
|
||||
"pipeline": "nonexistent-pipeline",
|
||||
"parameters": {},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_argo_response():
|
||||
"""Mock Argo Workflows API response."""
|
||||
return {
|
||||
"metadata": {
|
||||
"name": "document-ingestion-abc123",
|
||||
"namespace": "ai-ml",
|
||||
},
|
||||
"status": {"phase": "Pending"},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_kubeflow_response():
|
||||
"""Mock Kubeflow Pipelines API response."""
|
||||
return {
|
||||
"run": {
|
||||
"id": "run-xyz-789",
|
||||
"name": "rag-query-test",
|
||||
"status": "Running",
|
||||
}
|
||||
}
|
||||
@@ -1,272 +0,0 @@
|
||||
"""
|
||||
Unit tests for PipelineBridge handler.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from pipeline_bridge import PipelineBridge, PipelineSettings, PIPELINES
|
||||
|
||||
|
||||
class TestPipelineSettings:
|
||||
"""Tests for PipelineSettings configuration."""
|
||||
|
||||
def test_default_settings(self):
|
||||
"""Test default settings values."""
|
||||
settings = PipelineSettings()
|
||||
|
||||
assert settings.service_name == "pipeline-bridge"
|
||||
assert settings.kubeflow_host == "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
|
||||
assert settings.argo_host == "http://argo-server.argo.svc.cluster.local:2746"
|
||||
assert settings.argo_namespace == "ai-ml"
|
||||
|
||||
def test_custom_settings(self):
|
||||
"""Test custom settings."""
|
||||
settings = PipelineSettings(
|
||||
kubeflow_host="http://custom-kubeflow:8888",
|
||||
argo_namespace="custom-ns",
|
||||
)
|
||||
|
||||
assert settings.kubeflow_host == "http://custom-kubeflow:8888"
|
||||
assert settings.argo_namespace == "custom-ns"
|
||||
|
||||
|
||||
class TestPipelineDefinitions:
|
||||
"""Tests for pipeline definitions."""
|
||||
|
||||
def test_required_pipelines_exist(self):
|
||||
"""Test that required pipelines are defined."""
|
||||
required = ["document-ingestion", "batch-inference", "rag-query", "voice-pipeline"]
|
||||
for name in required:
|
||||
assert name in PIPELINES, f"Pipeline {name} should be defined"
|
||||
|
||||
def test_argo_pipelines_have_template(self):
|
||||
"""Test Argo pipelines have template field."""
|
||||
for name, config in PIPELINES.items():
|
||||
if config["engine"] == "argo":
|
||||
assert "template" in config, f"Argo pipeline {name} missing template"
|
||||
|
||||
def test_kubeflow_pipelines_have_pipeline_id(self):
|
||||
"""Test Kubeflow pipelines have pipeline_id field."""
|
||||
for name, config in PIPELINES.items():
|
||||
if config["engine"] == "kubeflow":
|
||||
assert "pipeline_id" in config, f"Kubeflow pipeline {name} missing pipeline_id"
|
||||
|
||||
def test_all_pipelines_have_description(self):
|
||||
"""Test all pipelines have descriptions."""
|
||||
for name, config in PIPELINES.items():
|
||||
assert "description" in config, f"Pipeline {name} missing description"
|
||||
|
||||
|
||||
class TestPipelineBridge:
|
||||
"""Tests for PipelineBridge handler."""
|
||||
|
||||
@pytest.fixture
|
||||
def handler(self):
|
||||
"""Create handler with mocked HTTP client."""
|
||||
handler = PipelineBridge()
|
||||
handler._http = AsyncMock()
|
||||
handler.nats = AsyncMock()
|
||||
return handler
|
||||
|
||||
def test_init(self, handler):
|
||||
"""Test handler initialization."""
|
||||
assert handler.subject == "ai.pipeline.trigger"
|
||||
assert handler.queue_group == "pipeline-bridges"
|
||||
assert handler.pipeline_settings.service_name == "pipeline-bridge"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_unknown_pipeline(
|
||||
self,
|
||||
handler,
|
||||
mock_nats_message,
|
||||
unknown_pipeline_request,
|
||||
):
|
||||
"""Test handling unknown pipeline."""
|
||||
result = await handler.handle_message(mock_nats_message, unknown_pipeline_request)
|
||||
|
||||
assert result["status"] == "error"
|
||||
assert "Unknown pipeline" in result["error"]
|
||||
assert "available_pipelines" in result
|
||||
assert "document-ingestion" in result["available_pipelines"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_argo_pipeline(
|
||||
self,
|
||||
handler,
|
||||
mock_nats_message,
|
||||
argo_pipeline_request,
|
||||
mock_argo_response,
|
||||
):
|
||||
"""Test triggering Argo workflow."""
|
||||
# Setup mock response
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = mock_argo_response
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
handler._http.post.return_value = mock_response
|
||||
|
||||
result = await handler.handle_message(mock_nats_message, argo_pipeline_request)
|
||||
|
||||
assert result["status"] == "submitted"
|
||||
assert result["engine"] == "argo"
|
||||
assert result["run_id"] == "document-ingestion-abc123"
|
||||
assert result["pipeline"] == "document-ingestion"
|
||||
assert "submitted_at" in result
|
||||
|
||||
# Verify API call
|
||||
handler._http.post.assert_called_once()
|
||||
call_args = handler._http.post.call_args
|
||||
assert "argo-server" in str(call_args)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_kubeflow_pipeline(
|
||||
self,
|
||||
handler,
|
||||
mock_nats_message,
|
||||
kubeflow_pipeline_request,
|
||||
mock_kubeflow_response,
|
||||
):
|
||||
"""Test triggering Kubeflow pipeline."""
|
||||
# Setup mock response
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = mock_kubeflow_response
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
handler._http.post.return_value = mock_response
|
||||
|
||||
result = await handler.handle_message(mock_nats_message, kubeflow_pipeline_request)
|
||||
|
||||
assert result["status"] == "submitted"
|
||||
assert result["engine"] == "kubeflow"
|
||||
assert result["run_id"] == "run-xyz-789"
|
||||
assert result["pipeline"] == "rag-query"
|
||||
|
||||
# Verify API call
|
||||
handler._http.post.assert_called_once()
|
||||
call_args = handler._http.post.call_args
|
||||
assert "ml-pipeline" in str(call_args)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_api_error(
|
||||
self,
|
||||
handler,
|
||||
mock_nats_message,
|
||||
argo_pipeline_request,
|
||||
):
|
||||
"""Test handling API errors."""
|
||||
handler._http.post.side_effect = Exception("Connection refused")
|
||||
|
||||
result = await handler.handle_message(mock_nats_message, argo_pipeline_request)
|
||||
|
||||
assert result["status"] == "error"
|
||||
assert "Connection refused" in result["error"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publishes_status_update(
|
||||
self,
|
||||
handler,
|
||||
mock_nats_message,
|
||||
argo_pipeline_request,
|
||||
mock_argo_response,
|
||||
):
|
||||
"""Test that status is published to NATS."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = mock_argo_response
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
handler._http.post.return_value = mock_response
|
||||
|
||||
await handler.handle_message(mock_nats_message, argo_pipeline_request)
|
||||
|
||||
handler.nats.publish.assert_called_once()
|
||||
call_args = handler.nats.publish.call_args
|
||||
assert "ai.pipeline.status.test-request-123" in str(call_args)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_setup_creates_http_client(self):
|
||||
"""Test that setup initializes HTTP client."""
|
||||
with patch("pipeline_bridge.httpx.AsyncClient") as mock_client:
|
||||
handler = PipelineBridge()
|
||||
await handler.setup()
|
||||
|
||||
mock_client.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_teardown_closes_http_client(self, handler):
|
||||
"""Test that teardown closes HTTP client."""
|
||||
await handler.teardown()
|
||||
|
||||
handler._http.aclose.assert_called_once()
|
||||
|
||||
|
||||
class TestArgoSubmission:
|
||||
"""Tests for Argo workflow submission."""
|
||||
|
||||
@pytest.fixture
|
||||
def handler(self):
|
||||
"""Create handler with mocked HTTP client."""
|
||||
handler = PipelineBridge()
|
||||
handler._http = AsyncMock()
|
||||
return handler
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_argo_workflow_structure(
|
||||
self,
|
||||
handler,
|
||||
mock_argo_response,
|
||||
):
|
||||
"""Test Argo workflow request structure."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = mock_argo_response
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
handler._http.post.return_value = mock_response
|
||||
|
||||
await handler._submit_argo(
|
||||
template="document-ingestion",
|
||||
parameters={"key": "value"},
|
||||
request_id="test-123",
|
||||
)
|
||||
|
||||
# Verify workflow structure
|
||||
call_kwargs = handler._http.post.call_args.kwargs
|
||||
workflow = call_kwargs["json"]["workflow"]
|
||||
|
||||
assert workflow["apiVersion"] == "argoproj.io/v1alpha1"
|
||||
assert workflow["kind"] == "Workflow"
|
||||
assert "workflowTemplateRef" in workflow["spec"]
|
||||
assert workflow["spec"]["workflowTemplateRef"]["name"] == "document-ingestion"
|
||||
assert workflow["metadata"]["labels"]["request-id"] == "test-123"
|
||||
|
||||
|
||||
class TestKubeflowSubmission:
|
||||
"""Tests for Kubeflow pipeline submission."""
|
||||
|
||||
@pytest.fixture
|
||||
def handler(self):
|
||||
"""Create handler with mocked HTTP client."""
|
||||
handler = PipelineBridge()
|
||||
handler._http = AsyncMock()
|
||||
return handler
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_kubeflow_run_structure(
|
||||
self,
|
||||
handler,
|
||||
mock_kubeflow_response,
|
||||
):
|
||||
"""Test Kubeflow run request structure."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = mock_kubeflow_response
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
handler._http.post.return_value = mock_response
|
||||
|
||||
await handler._submit_kubeflow(
|
||||
pipeline_id="rag-pipeline",
|
||||
parameters={"query": "test"},
|
||||
request_id="test-456",
|
||||
)
|
||||
|
||||
# Verify run request structure
|
||||
call_kwargs = handler._http.post.call_args.kwargs
|
||||
run_request = call_kwargs["json"]
|
||||
|
||||
assert "rag-pipeline" in run_request["name"]
|
||||
assert run_request["pipeline_spec"]["pipeline_id"] == "rag-pipeline"
|
||||
Reference in New Issue
Block a user