feature/go-handler-refactor #1

Merged
billy merged 3 commits from feature/go-handler-refactor into main 2026-02-20 12:34:07 +00:00
14 changed files with 769 additions and 3433 deletions
Showing only changes of commit aeb1b749be - Show all commits

View File

@@ -0,0 +1,206 @@
# CI pipeline for pipeline-bridge: lint + test on every push/PR to main,
# then (push to main only) tag a semver release, build/push the container
# image to the in-cluster Gitea registry, and send a ntfy notification.
name: CI

# Trigger on pushes and pull requests targeting main.
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  # In-cluster ntfy endpoint used by the notify job.
  NTFY_URL: http://ntfy.observability.svc.cluster.local:80
  # In-cluster Gitea container registry (plain HTTP — see the docker job).
  REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs
  REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000
  IMAGE_NAME: pipeline-bridge

jobs:
  # Static analysis: go vet plus golangci-lint.
  lint:
    name: Lint
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          # Track the Go version declared in go.mod.
          go-version-file: go.mod
          cache: true
      - name: Run go vet
        run: go vet ./...
      - name: Install golangci-lint
        # NOTE(review): installs whatever version HEAD's install.sh resolves —
        # unpinned; consider pinning a release tag for reproducible CI runs.
        run: |
          curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b "$(go env GOPATH)/bin"
          echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
      - name: Run golangci-lint
        run: golangci-lint run ./...
  # Build and test the module with the race detector and coverage enabled.
  test:
    name: Test
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          go-version-file: go.mod
          cache: true
      - name: Verify dependencies
        run: go mod verify
      - name: Build
        run: go build -v ./...
      - name: Run tests
        # coverage.out is produced but not uploaded anywhere yet.
        run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
release:
name: Release
runs-on: ubuntu-latest
needs: [lint, test]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
outputs:
version: ${{ steps.version.outputs.version }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Determine version bump
id: version
run: |
# Get latest tag or default to v0.0.0
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
VERSION=${LATEST#v}
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
# Check commit message for keywords
MSG="${{ gitea.event.head_commit.message }}"
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
BUMP="major"
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
MINOR=$((MINOR + 1)); PATCH=0
BUMP="minor"
else
PATCH=$((PATCH + 1))
BUMP="patch"
fi
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
echo "bump=$BUMP" >> $GITHUB_OUTPUT
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
- name: Create and push tag
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
git push origin ${{ steps.version.outputs.version }}
docker:
name: Docker Build & Push
runs-on: ubuntu-latest
needs: [lint, test, release]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
buildkitd-config-inline: |
[registry."gitea-http.gitea.svc.cluster.local:3000"]
http = true
insecure = true
- name: Login to Docker Hub
if: vars.DOCKERHUB_USERNAME != ''
uses: docker/login-action@v3
with:
username: ${{ vars.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Configure Docker for insecure registry
run: |
sudo mkdir -p /etc/docker
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
sudo systemctl restart docker || sudo service docker restart || true
sleep 2
- name: Login to Gitea Registry
run: |
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
mkdir -p ~/.docker
cat > ~/.docker/config.json << EOF
{
"auths": {
"${{ env.REGISTRY_HOST }}": {
"auth": "$AUTH"
}
}
}
EOF
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=semver,pattern={{version}},value=${{ needs.release.outputs.version }}
type=semver,pattern={{major}}.{{minor}},value=${{ needs.release.outputs.version }}
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
notify:
name: Notify
runs-on: ubuntu-latest
needs: [lint, test, release, docker]
if: always()
steps:
- name: Notify on success
if: needs.lint.result == 'success' && needs.test.result == 'success'
run: |
curl -s \
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: white_check_mark,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Release: ${{ needs.release.result == 'success' && needs.release.outputs.version || 'skipped' }}
Docker: ${{ needs.docker.result }}" \
${{ env.NTFY_URL }}/gitea-ci
- name: Notify on failure
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
run: |
curl -s \
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
-H "Priority: high" \
-H "Tags: x,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Lint: ${{ needs.lint.result }}
Test: ${{ needs.test.result }}" \
${{ env.NTFY_URL }}/gitea-ci

1
.gitignore vendored
View File

@@ -9,3 +9,4 @@ dist/
build/ build/
.env .env
.env.local .env.local
pipeline-bridge

View File

@@ -1,13 +1,31 @@
# Pipeline Bridge - Using handler-base # Build stage
ARG BASE_TAG=latest FROM golang:1.25-alpine AS builder
FROM ghcr.io/billy-davies-2/handler-base:${BASE_TAG}
WORKDIR /app WORKDIR /app
# Install additional dependencies from pyproject.toml # Install ca-certificates for HTTPS
COPY pyproject.toml . RUN apk add --no-cache ca-certificates
RUN uv pip install --system --no-cache httpx kubernetes
COPY pipeline_bridge.py . # Copy go mod files
COPY go.mod go.sum ./
RUN go mod download
CMD ["python", "pipeline_bridge.py"] # Copy source code
COPY . .
# Build static binary
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-w -s" -o /pipeline-bridge .
# Runtime stage - scratch for minimal image
FROM scratch
# Copy CA certificates for HTTPS
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy binary
COPY --from=builder /pipeline-bridge /pipeline-bridge
# Run as non-root
USER 65534:65534
ENTRYPOINT ["/pipeline-bridge"]

110
README.md
View File

@@ -1,105 +1,43 @@
# Pipeline Bridge # pipeline-bridge
Bridges NATS events to Kubeflow Pipelines and Argo Workflows. Bridges NATS events to Argo Workflows and Kubeflow Pipelines. Subscribes to
`ai.pipeline.trigger` and submits workflow runs to the appropriate engine,
## Overview publishing status updates to `ai.pipeline.status.{request_id}`.
The Pipeline Bridge listens for pipeline trigger requests on NATS and submits them to the appropriate workflow engine (Argo Workflows or Kubeflow Pipelines). It monitors execution and publishes status updates back to NATS.
## NATS Subjects
| Subject | Direction | Description |
|---------|-----------|-------------|
| `ai.pipeline.trigger` | Subscribe | Pipeline trigger requests |
| `ai.pipeline.status.{request_id}` | Publish | Pipeline status updates |
## Supported Pipelines ## Supported Pipelines
| Pipeline | Engine | Description | | Name | Engine | Template / Pipeline ID |
|----------|--------|-------------| |------|--------|----------------------|
| `document-ingestion` | Argo | Ingest documents into Milvus | | document-ingestion | Argo | document-ingestion |
| `batch-inference` | Argo | Run batch LLM inference | | batch-inference | Argo | batch-inference |
| `model-evaluation` | Argo | Evaluate model performance | | model-evaluation | Argo | model-evaluation |
| `rag-query` | Kubeflow | Execute RAG query pipeline | | rag-query | Kubeflow | rag-pipeline |
| `voice-pipeline` | Kubeflow | Full voice assistant pipeline | | voice-pipeline | Kubeflow | voice-pipeline |
## Request Format ## Configuration
```json
{
"request_id": "uuid",
"pipeline": "document-ingestion",
"parameters": {
"source-url": "s3://bucket/docs/",
"collection-name": "knowledge_base"
}
}
```
## Response Format
```json
{
"request_id": "uuid",
"status": "submitted",
"pipeline": "document-ingestion",
"engine": "argo",
"run_id": "document-ingestion-abc123",
"message": "Pipeline submitted successfully",
"timestamp": "2026-01-03T12:00:00Z"
}
```
## Status Updates
The bridge publishes status updates as the workflow progresses:
- `submitted` - Workflow created
- `pending` - Waiting to start
- `running` - In progress
- `succeeded` - Completed successfully
- `failed` - Failed
- `error` - System error
## Implementation
The pipeline bridge uses the [handler-base](https://git.daviestechlabs.io/daviestechlabs/handler-base) library for standardized NATS handling, telemetry, and health checks.
## Environment Variables
| Variable | Default | Description | | Variable | Default | Description |
|----------|---------|-------------| |----------|---------|-------------|
| `NATS_URL` | `nats://nats.ai-ml.svc.cluster.local:4222` | NATS server URL |
| `KUBEFLOW_HOST` | `http://ml-pipeline.kubeflow.svc.cluster.local:8888` | Kubeflow Pipelines API | | `KUBEFLOW_HOST` | `http://ml-pipeline.kubeflow.svc.cluster.local:8888` | Kubeflow Pipelines API |
| `ARGO_HOST` | `http://argo-server.argo.svc.cluster.local:2746` | Argo Workflows API | | `ARGO_HOST` | `http://argo-server.argo.svc.cluster.local:2746` | Argo Server API |
| `ARGO_NAMESPACE` | `ai-ml` | Namespace for Argo Workflows | | `ARGO_NAMESPACE` | `ai-ml` | Namespace for Argo workflows |
## Building Plus all standard handler-base settings (`NATS_URL`, `OTEL_*`, `HEALTH_PORT`, etc.).
## Build
```bash ```bash
docker build -t pipeline-bridge:latest . go build -o pipeline-bridge .
# With specific handler-base tag
docker build --build-arg BASE_TAG=latest -t pipeline-bridge:latest .
``` ```
## Testing ## Test
```bash ```bash
# Port-forward NATS go test -v -race ./...
kubectl port-forward -n ai-ml svc/nats 4222:4222
# Trigger document ingestion
nats pub ai.pipeline.trigger '{
"request_id": "test-1",
"pipeline": "document-ingestion",
"parameters": {"source-url": "https://example.com/docs.txt"}
}'
# Monitor status
nats sub "ai.pipeline.status.>"
``` ```
## License ## Docker
MIT ```bash
docker build -t pipeline-bridge .
```

42
go.mod Normal file
View File

@@ -0,0 +1,42 @@
module git.daviestechlabs.io/daviestechlabs/pipeline-bridge
go 1.25.1
require (
git.daviestechlabs.io/daviestechlabs/handler-base v0.0.0
github.com/nats-io/nats.go v1.48.0
)
require (
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/grpc v1.78.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)
replace git.daviestechlabs.io/daviestechlabs/handler-base => ../handler-base

77
go.sum Normal file
View File

@@ -0,0 +1,77 @@
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0/go.mod h1:EtekO9DEJb4/jRyN4v4Qjc2yA7AtfCBuz2FynRUWTXs=
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

230
main.go Normal file
View File

@@ -0,0 +1,230 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"time"
"github.com/nats-io/nats.go"
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
)
// Pipeline definitions — maps pipeline name to engine config. Argo entries
// are submitted by referencing a WorkflowTemplate; Kubeflow entries start a
// run of an existing pipeline via the v1beta1 runs API.
var pipelines = map[string]pipelineDef{
	"document-ingestion": {Engine: "argo", Template: "document-ingestion"},
	"batch-inference":    {Engine: "argo", Template: "batch-inference"},
	"rag-query":          {Engine: "kubeflow", PipelineID: "rag-pipeline"},
	"voice-pipeline":     {Engine: "kubeflow", PipelineID: "voice-pipeline"},
	"model-evaluation":   {Engine: "argo", Template: "model-evaluation"},
}

// pipelineDef describes how a named pipeline is executed.
type pipelineDef struct {
	Engine     string // "argo" or "kubeflow"
	Template   string // Argo WorkflowTemplate name (used when Engine == "argo")
	PipelineID string // Kubeflow pipeline ID (used when Engine == "kubeflow")
}
// main wires up the pipeline-bridge service: it subscribes (via handler-base)
// to "ai.pipeline.trigger", submits each request to Argo Workflows or
// Kubeflow Pipelines, and publishes the outcome to
// "ai.pipeline.status.{request_id}".
//
// Fix vs. original: the process now exits nonzero when h.Run() fails, so the
// orchestrator restarts the container instead of the process ending with
// status 0 after a fatal handler error.
func main() {
	cfg := config.Load()
	cfg.ServiceName = "pipeline-bridge"
	cfg.NATSQueueGroup = "pipeline-bridges"

	// Engine endpoints; overridable via environment for local testing.
	kubeflowHost := getEnv("KUBEFLOW_HOST", "http://ml-pipeline.kubeflow.svc.cluster.local:8888")
	argoHost := getEnv("ARGO_HOST", "http://argo-server.argo.svc.cluster.local:2746")
	argoNamespace := getEnv("ARGO_NAMESPACE", "ai-ml")

	// Shared client; 60s covers slow workflow-engine API responses.
	httpClient := &http.Client{Timeout: 60 * time.Second}

	h := handler.New("ai.pipeline.trigger", cfg)
	h.OnMessage(func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) {
		requestID := strVal(data, "request_id", "unknown")
		pipelineName := strVal(data, "pipeline", "")
		params := mapVal(data, "parameters")

		slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)

		// Unknown pipelines get an error reply listing the valid names
		// rather than surfacing a handler error.
		pipeline, ok := pipelines[pipelineName]
		if !ok {
			names := make([]string, 0, len(pipelines))
			for k := range pipelines {
				names = append(names, k)
			}
			return map[string]any{
				"request_id":          requestID,
				"status":              "error",
				"error":               fmt.Sprintf("Unknown pipeline: %s", pipelineName),
				"available_pipelines": names,
			}, nil
		}

		var runID string
		var err error
		if pipeline.Engine == "argo" {
			runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
		} else {
			runID, err = submitKubeflow(ctx, httpClient, kubeflowHost, pipeline.PipelineID, params, requestID)
		}
		if err != nil {
			slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
			return map[string]any{
				"request_id": requestID,
				"status":     "error",
				"error":      err.Error(),
			}, nil
		}

		result := map[string]any{
			"request_id":   requestID,
			"status":       "submitted",
			"run_id":       runID,
			"engine":       pipeline.Engine,
			"pipeline":     pipelineName,
			"submitted_at": time.Now().UTC().Format(time.RFC3339),
		}
		// Publish status update (best-effort: the reply carries the result
		// either way, so a publish failure is deliberately ignored).
		_ = h.NATS.Publish(fmt.Sprintf("ai.pipeline.status.%s", requestID), result)
		slog.Info("pipeline submitted", "pipeline", pipelineName, "run_id", runID)
		return result, nil
	})

	if err := h.Run(); err != nil {
		slog.Error("handler failed", "error", err)
		os.Exit(1)
	}
}
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) {
argoParams := make([]map[string]string, 0, len(params))
for k, v := range params {
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
}
workflow := map[string]any{
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"metadata": map[string]any{
"generateName": template + "-",
"namespace": namespace,
"labels": map[string]string{"request-id": requestID},
},
"spec": map[string]any{
"workflowTemplateRef": map[string]string{"name": template},
"arguments": map[string]any{"parameters": argoParams},
},
}
body, _ := json.Marshal(map[string]any{"workflow": workflow})
url := fmt.Sprintf("%s/api/v1/workflows/%s", host, namespace)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("argo request: %w", err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
return "", fmt.Errorf("argo %d: %s", resp.StatusCode, string(respBody))
}
var result struct {
Metadata struct {
Name string `json:"name"`
} `json:"metadata"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return "", err
}
return result.Metadata.Name, nil
}
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) {
kfParams := make([]map[string]string, 0, len(params))
for k, v := range params {
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
}
runRequest := map[string]any{
"name": fmt.Sprintf("%s-%s", pipelineID, requestID[:min(8, len(requestID))]),
"pipeline_spec": map[string]any{
"pipeline_id": pipelineID,
"parameters": kfParams,
},
}
body, _ := json.Marshal(runRequest)
url := fmt.Sprintf("%s/apis/v1beta1/runs", host)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("kubeflow request: %w", err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
return "", fmt.Errorf("kubeflow %d: %s", resp.StatusCode, string(respBody))
}
var result struct {
Run struct {
ID string `json:"id"`
} `json:"run"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return "", err
}
return result.Run.ID, nil
}
// Helpers

// strVal returns m[key] when it is present and a string; otherwise fallback.
// Non-string values (including numbers) are not coerced.
func strVal(m map[string]any, key, fallback string) string {
	s, ok := m[key].(string)
	if !ok {
		return fallback
	}
	return s
}
// mapVal returns m[key] when it is present and a map[string]any; otherwise a
// fresh empty map, so callers never need a nil check.
func mapVal(m map[string]any, key string) map[string]any {
	if sub, ok := m[key].(map[string]any); ok {
		return sub
	}
	return map[string]any{}
}
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}

163
main_test.go Normal file
View File

@@ -0,0 +1,163 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
// TestStrVal checks string extraction: present string, missing key, and a
// present-but-non-string value (which must fall back, not be coerced).
func TestStrVal(t *testing.T) {
	m := map[string]any{"key": "value", "num": 42}
	if got := strVal(m, "key", ""); got != "value" {
		t.Errorf("strVal(key) = %q, want %q", got, "value")
	}
	if got := strVal(m, "missing", "default"); got != "default" {
		t.Errorf("strVal(missing) = %q, want %q", got, "default")
	}
	if got := strVal(m, "num", "fallback"); got != "fallback" {
		t.Errorf("strVal(num) = %q, want %q", got, "fallback")
	}
}
// TestMapVal checks nested-map extraction: a present map is returned as-is,
// while a missing key or a scalar value yields an empty map.
func TestMapVal(t *testing.T) {
	inner := map[string]any{"a": "b"}
	m := map[string]any{"nested": inner, "scalar": "hi"}
	got := mapVal(m, "nested")
	if got["a"] != "b" {
		t.Errorf("mapVal(nested) = %v, want {a:b}", got)
	}
	got2 := mapVal(m, "missing")
	if len(got2) != 0 {
		t.Errorf("mapVal(missing) should be empty, got %v", got2)
	}
	got3 := mapVal(m, "scalar")
	if len(got3) != 0 {
		t.Errorf("mapVal(scalar) should be empty, got %v", got3)
	}
}
// TestGetEnv checks env-var lookup with fallback. t.Setenv scopes the
// variable to this test and restores it afterwards.
func TestGetEnv(t *testing.T) {
	t.Setenv("TEST_HOST", "http://test:8080")
	if got := getEnv("TEST_HOST", "fallback"); got != "http://test:8080" {
		t.Errorf("getEnv(TEST_HOST) = %q, want %q", got, "http://test:8080")
	}
	if got := getEnv("MISSING_VAR_XYZ", "default"); got != "default" {
		t.Errorf("getEnv(MISSING) = %q, want %q", got, "default")
	}
}
// TestPipelinesMap guards the static pipeline registry: all expected names
// exist and representative entries are routed to the right engine.
func TestPipelinesMap(t *testing.T) {
	expected := []string{"document-ingestion", "batch-inference", "rag-query", "voice-pipeline", "model-evaluation"}
	for _, name := range expected {
		if _, ok := pipelines[name]; !ok {
			t.Errorf("pipeline %q not found in pipelines map", name)
		}
	}
	if got := pipelines["document-ingestion"].Engine; got != "argo" {
		t.Errorf("document-ingestion engine = %q, want argo", got)
	}
	if got := pipelines["rag-query"].Engine; got != "kubeflow" {
		t.Errorf("rag-query engine = %q, want kubeflow", got)
	}
}
// TestSubmitArgo runs submitArgo against a stub Argo server and verifies the
// request shape (method, path, content type, workflow payload) as well as
// the returned workflow name.
func TestSubmitArgo(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			t.Errorf("expected POST, got %s", r.Method)
		}
		if r.URL.Path != "/api/v1/workflows/ai-ml" {
			t.Errorf("unexpected path: %s", r.URL.Path)
		}
		if r.Header.Get("Content-Type") != "application/json" {
			t.Errorf("expected application/json content type")
		}
		var body map[string]any
		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
			t.Errorf("failed to decode body: %v", err)
		}
		wf, ok := body["workflow"].(map[string]any)
		if !ok {
			t.Fatal("missing workflow key")
		}
		meta := wf["metadata"].(map[string]any)
		if meta["namespace"] != "ai-ml" {
			t.Errorf("unexpected namespace: %v", meta["namespace"])
		}
		// Respond like the Argo server: metadata.name of the created workflow.
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]any{
			"metadata": map[string]any{"name": "document-ingestion-abc123"},
		})
	}))
	defer ts.Close()
	ctx := t.Context()
	runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{
		"source": "test",
	}, "req-001")
	if err != nil {
		t.Fatalf("submitArgo() error: %v", err)
	}
	if runID != "document-ingestion-abc123" {
		t.Errorf("runID = %q, want %q", runID, "document-ingestion-abc123")
	}
}
// TestSubmitArgoError verifies that a 4xx response from the Argo server is
// surfaced as an error.
func TestSubmitArgoError(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(`{"message":"bad request"}`))
	}))
	defer ts.Close()
	ctx := t.Context()
	_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "bad-template", nil, "req-err")
	if err == nil {
		t.Fatal("expected error for 400 response")
	}
}
// TestSubmitKubeflow runs submitKubeflow against a stub Kubeflow Pipelines
// API and verifies the request method/path and the returned run ID.
func TestSubmitKubeflow(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			t.Errorf("expected POST, got %s", r.Method)
		}
		if r.URL.Path != "/apis/v1beta1/runs" {
			t.Errorf("unexpected path: %s", r.URL.Path)
		}
		// Respond like the v1beta1 runs API: run.id of the created run.
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]any{
			"run": map[string]any{"id": "kf-run-456"},
		})
	}))
	defer ts.Close()
	ctx := t.Context()
	runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{
		"query": "test",
	}, "req-002")
	if err != nil {
		t.Fatalf("submitKubeflow() error: %v", err)
	}
	if runID != "kf-run-456" {
		t.Errorf("runID = %q, want %q", runID, "kf-run-456")
	}
}
// TestSubmitKubeflowError verifies that a 5xx response from the Kubeflow API
// is surfaced as an error.
func TestSubmitKubeflowError(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte("internal error"))
	}))
	defer ts.Close()
	ctx := t.Context()
	_, err := submitKubeflow(ctx, ts.Client(), ts.URL, "bad-pipeline", nil, "req-err2")
	if err == nil {
		t.Fatal("expected error for 500 response")
	}
}

View File

@@ -1,228 +0,0 @@
#!/usr/bin/env python3
"""
Pipeline Bridge Service (Refactored)
Bridges NATS events to workflow engines using handler-base:
1. Listen for pipeline triggers on "ai.pipeline.trigger"
2. Submit to Kubeflow Pipelines or Argo Workflows
3. Monitor execution and publish status updates
4. Publish completion to "ai.pipeline.status.{request_id}"
"""
import logging
from typing import Any, Optional
from datetime import datetime
import httpx
from nats.aio.msg import Msg
from handler_base import Handler, Settings
from handler_base.telemetry import create_span
logger = logging.getLogger("pipeline-bridge")
class PipelineSettings(Settings):
"""Pipeline bridge specific settings."""
service_name: str = "pipeline-bridge"
# Kubeflow Pipelines
kubeflow_host: str = "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
# Argo Workflows
argo_host: str = "http://argo-server.argo.svc.cluster.local:2746"
argo_namespace: str = "ai-ml"
# Pipeline definitions
PIPELINES = {
"document-ingestion": {
"engine": "argo",
"template": "document-ingestion",
"description": "Ingest documents into Milvus vector database",
},
"batch-inference": {
"engine": "argo",
"template": "batch-inference",
"description": "Run batch LLM inference on a dataset",
},
"rag-query": {
"engine": "kubeflow",
"pipeline_id": "rag-pipeline",
"description": "Execute RAG query pipeline",
},
"voice-pipeline": {
"engine": "kubeflow",
"pipeline_id": "voice-pipeline",
"description": "Full voice assistant pipeline",
},
"model-evaluation": {
"engine": "argo",
"template": "model-evaluation",
"description": "Evaluate model performance",
},
}
class PipelineBridge(Handler):
    """
    Pipeline trigger handler.

    Listens on "ai.pipeline.trigger" (queue group "pipeline-bridges"),
    submits the requested pipeline to Argo Workflows or Kubeflow Pipelines,
    and publishes the submission result to "ai.pipeline.status.{request_id}".

    Request format:
    {
        "request_id": "uuid",
        "pipeline": "document-ingestion",
        "parameters": {"key": "value"}
    }
    Response format:
    {
        "request_id": "uuid",
        "status": "submitted",
        "run_id": "workflow-run-id",
        "engine": "argo|kubeflow"
    }
    """

    def __init__(self):
        # Settings are created first so they can be handed to Handler.__init__.
        self.pipeline_settings = PipelineSettings()
        super().__init__(
            subject="ai.pipeline.trigger",
            settings=self.pipeline_settings,
            queue_group="pipeline-bridges",
        )
        # Shared async HTTP client; created in setup(), closed in teardown().
        self._http: Optional[httpx.AsyncClient] = None

    async def setup(self) -> None:
        """Initialize HTTP client."""
        logger.info("Initializing pipeline bridge...")
        self._http = httpx.AsyncClient(timeout=60.0)
        # Lazy %-formatting: the list repr is only built if INFO is enabled.
        logger.info("Pipeline bridge ready. Available pipelines: %s", list(PIPELINES.keys()))

    async def teardown(self) -> None:
        """Clean up HTTP client."""
        if self._http:
            await self._http.aclose()
        logger.info("Pipeline bridge closed")

    def _require_http(self) -> httpx.AsyncClient:
        """Return the HTTP client, failing fast if setup() has not run yet."""
        if self._http is None:
            # Explicit error instead of an AttributeError on NoneType.post.
            raise RuntimeError("HTTP client not initialized; setup() must run first")
        return self._http

    async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
        """Handle pipeline trigger request.

        Args:
            msg: Incoming NATS message (subject/reply metadata).
            data: Decoded payload; expects "request_id", "pipeline" and an
                optional "parameters" mapping.

        Returns:
            A "submitted" status dict on success, or an "error" dict —
            submission failures are caught and reported, never raised.
        """
        request_id = data.get("request_id", "unknown")
        pipeline_name = data.get("pipeline", "")
        parameters = data.get("parameters", {})
        logger.info("Triggering pipeline '%s' for request %s", pipeline_name, request_id)
        with create_span("pipeline.trigger") as span:
            if span:
                span.set_attribute("request.id", request_id)
                span.set_attribute("pipeline.name", pipeline_name)
            # Validate pipeline: reject unknown names with the list of valid ones.
            if pipeline_name not in PIPELINES:
                error = f"Unknown pipeline: {pipeline_name}"
                logger.error(error)
                return {
                    "request_id": request_id,
                    "status": "error",
                    "error": error,
                    "available_pipelines": list(PIPELINES.keys()),
                }
            pipeline = PIPELINES[pipeline_name]
            engine = pipeline["engine"]
            try:
                if engine == "argo":
                    run_id = await self._submit_argo(pipeline["template"], parameters, request_id)
                else:
                    run_id = await self._submit_kubeflow(
                        pipeline["pipeline_id"], parameters, request_id
                    )
                result = {
                    "request_id": request_id,
                    "status": "submitted",
                    "run_id": run_id,
                    "engine": engine,
                    "pipeline": pipeline_name,
                    # Timezone-aware UTC: datetime.utcnow() is deprecated (3.12+).
                    "submitted_at": datetime.now(timezone.utc).isoformat(),
                }
                # Publish status update so subscribers can track the run.
                await self.nats.publish(f"ai.pipeline.status.{request_id}", result)
                logger.info("Pipeline %s submitted: %s", pipeline_name, run_id)
                return result
            except Exception as e:
                logger.exception("Failed to submit pipeline %s", pipeline_name)
                return {
                    "request_id": request_id,
                    "status": "error",
                    "error": str(e),
                }

    async def _submit_argo(self, template: str, parameters: dict, request_id: str) -> str:
        """Submit workflow to Argo Workflows.

        Args:
            template: Name of the Argo WorkflowTemplate to instantiate.
            parameters: Workflow arguments; values are stringified.
            request_id: Correlation id, attached as a workflow label.

        Returns:
            The server-generated workflow name.

        Raises:
            RuntimeError: If setup() has not been called.
            httpx.HTTPStatusError: On a non-success Argo API response.
        """
        http = self._require_http()
        with create_span("pipeline.submit.argo") as span:
            if span:
                span.set_attribute("argo.template", template)
            workflow = {
                "apiVersion": "argoproj.io/v1alpha1",
                "kind": "Workflow",
                "metadata": {
                    # generateName lets the API server append a unique suffix.
                    "generateName": f"{template}-",
                    "namespace": self.pipeline_settings.argo_namespace,
                    "labels": {
                        "request-id": request_id,
                    },
                },
                "spec": {
                    "workflowTemplateRef": {"name": template},
                    "arguments": {
                        "parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()]
                    },
                },
            }
            response = await http.post(
                f"{self.pipeline_settings.argo_host}/api/v1/workflows/{self.pipeline_settings.argo_namespace}",
                json={"workflow": workflow},
            )
            response.raise_for_status()
            result = response.json()
            return result["metadata"]["name"]

    async def _submit_kubeflow(self, pipeline_id: str, parameters: dict, request_id: str) -> str:
        """Submit run to Kubeflow Pipelines.

        Args:
            pipeline_id: Kubeflow pipeline id to run.
            parameters: Run parameters; values are stringified.
            request_id: Correlation id; first 8 chars go into the run name.

        Returns:
            The Kubeflow run id.

        Raises:
            RuntimeError: If setup() has not been called.
            httpx.HTTPStatusError: On a non-success Kubeflow API response.
        """
        http = self._require_http()
        with create_span("pipeline.submit.kubeflow") as span:
            if span:
                span.set_attribute("kubeflow.pipeline_id", pipeline_id)
            run_request = {
                "name": f"{pipeline_id}-{request_id[:8]}",
                "pipeline_spec": {
                    "pipeline_id": pipeline_id,
                    "parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()],
                },
            }
            response = await http.post(
                f"{self.pipeline_settings.kubeflow_host}/apis/v1beta1/runs",
                json=run_request,
            )
            response.raise_for_status()
            result = response.json()
            return result["run"]["id"]
# Entry point: run the bridge as a standalone service when executed directly.
if __name__ == "__main__":
    PipelineBridge().run()

View File

@@ -1,45 +0,0 @@
[project]
name = "pipeline-bridge"
version = "1.0.0"
description = "Bridge NATS events to Kubeflow Pipelines and Argo Workflows"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Davies Tech Labs" }]
dependencies = [
"handler-base @ git+https://git.daviestechlabs.io/daviestechlabs/handler-base.git",
"httpx>=0.27.0",
"kubernetes>=28.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"ruff>=0.1.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["."]
only-include = ["pipeline_bridge.py"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"
filterwarnings = ["ignore::DeprecationWarning"]

View File

@@ -1 +0,0 @@
# Pipeline Bridge Tests

View File

@@ -1,91 +0,0 @@
"""
Pytest configuration and fixtures for pipeline-bridge tests.
"""
import asyncio
import os
from unittest.mock import MagicMock
import pytest
# Set test environment variables before importing
# NOTE(review): presumably handler_base reads these at import/construction
# time — setdefault keeps any values already exported by the CI environment.
os.environ.setdefault("NATS_URL", "nats://localhost:4222")
os.environ.setdefault("OTEL_ENABLED", "false")
os.environ.setdefault("MLFLOW_ENABLED", "false")
@pytest.fixture(scope="session")
def event_loop():
    """Create event loop for async tests.

    NOTE(review): redefining the ``event_loop`` fixture is deprecated in
    pytest-asyncio >= 0.23 (the version pyproject requires); consider
    removing this in favor of pytest-asyncio's own loop management —
    confirm against the installed plugin version before changing.
    """
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()
@pytest.fixture
def mock_nats_message():
    """Provide a stand-in NATS message carrying the trigger subject/reply."""
    fake_msg = MagicMock()
    fake_msg.subject = "ai.pipeline.trigger"
    fake_msg.reply = "ai.pipeline.status.test-123"
    return fake_msg
@pytest.fixture
def argo_pipeline_request():
    """Provide a sample trigger payload for an Argo-backed pipeline."""
    params = {
        "source_path": "s3://bucket/documents",
        "collection_name": "test_collection",
    }
    return {
        "request_id": "test-request-123",
        "pipeline": "document-ingestion",
        "parameters": params,
    }
@pytest.fixture
def kubeflow_pipeline_request():
    """Provide a sample trigger payload for a Kubeflow-backed pipeline."""
    params = {
        "query": "What is AI?",
        "collection": "documents",
    }
    return {
        "request_id": "test-request-456",
        "pipeline": "rag-query",
        "parameters": params,
    }
@pytest.fixture
def unknown_pipeline_request():
    """Provide a trigger payload naming a pipeline that is not registered."""
    payload = {
        "request_id": "test-request-789",
        "pipeline": "nonexistent-pipeline",
    }
    payload["parameters"] = {}
    return payload
@pytest.fixture
def mock_argo_response():
    """Provide a canned Argo Workflows API response body."""
    workflow_metadata = {
        "name": "document-ingestion-abc123",
        "namespace": "ai-ml",
    }
    return {
        "metadata": workflow_metadata,
        "status": {"phase": "Pending"},
    }
@pytest.fixture
def mock_kubeflow_response():
    """Provide a canned Kubeflow Pipelines API response body."""
    run_info = {
        "id": "run-xyz-789",
        "name": "rag-query-test",
        "status": "Running",
    }
    return {"run": run_info}

View File

@@ -1,272 +0,0 @@
"""
Unit tests for PipelineBridge handler.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from pipeline_bridge import PipelineBridge, PipelineSettings, PIPELINES
class TestPipelineSettings:
    """Tests for PipelineSettings configuration."""

    def test_default_settings(self):
        """Defaults point at the in-cluster engine endpoints."""
        cfg = PipelineSettings()
        assert cfg.service_name == "pipeline-bridge"
        assert cfg.kubeflow_host == "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
        assert cfg.argo_host == "http://argo-server.argo.svc.cluster.local:2746"
        assert cfg.argo_namespace == "ai-ml"

    def test_custom_settings(self):
        """Constructor keyword arguments override the defaults."""
        cfg = PipelineSettings(
            kubeflow_host="http://custom-kubeflow:8888",
            argo_namespace="custom-ns",
        )
        assert cfg.kubeflow_host == "http://custom-kubeflow:8888"
        assert cfg.argo_namespace == "custom-ns"
class TestPipelineDefinitions:
    """Sanity checks on the PIPELINES registry."""

    def test_required_pipelines_exist(self):
        """Every pipeline the platform depends on is registered."""
        for name in ("document-ingestion", "batch-inference", "rag-query", "voice-pipeline"):
            assert name in PIPELINES, f"Pipeline {name} should be defined"

    def test_argo_pipelines_have_template(self):
        """Each Argo-backed entry names its WorkflowTemplate."""
        for name, cfg in PIPELINES.items():
            if cfg["engine"] == "argo":
                assert "template" in cfg, f"Argo pipeline {name} missing template"

    def test_kubeflow_pipelines_have_pipeline_id(self):
        """Each Kubeflow-backed entry names its pipeline id."""
        for name, cfg in PIPELINES.items():
            if cfg["engine"] == "kubeflow":
                assert "pipeline_id" in cfg, f"Kubeflow pipeline {name} missing pipeline_id"

    def test_all_pipelines_have_description(self):
        """Every entry carries a human-readable description."""
        for name, cfg in PIPELINES.items():
            assert "description" in cfg, f"Pipeline {name} missing description"
class TestPipelineBridge:
    """Tests for PipelineBridge handler.

    Fixtures from conftest.py (mock_nats_message, *_pipeline_request,
    mock_argo_response) are injected by pytest by parameter name.
    """

    @pytest.fixture
    def handler(self):
        """Create handler with mocked HTTP client."""
        # NOTE(review): constructs the real PipelineBridge — assumes
        # Handler.__init__ performs no network I/O; confirm in handler_base.
        handler = PipelineBridge()
        handler._http = AsyncMock()
        handler.nats = AsyncMock()
        return handler

    def test_init(self, handler):
        """Test handler initialization."""
        # Subject and queue group are hard-coded in PipelineBridge.__init__.
        assert handler.subject == "ai.pipeline.trigger"
        assert handler.queue_group == "pipeline-bridges"
        assert handler.pipeline_settings.service_name == "pipeline-bridge"

    @pytest.mark.asyncio
    async def test_handle_unknown_pipeline(
        self,
        handler,
        mock_nats_message,
        unknown_pipeline_request,
    ):
        """Test handling unknown pipeline."""
        result = await handler.handle_message(mock_nats_message, unknown_pipeline_request)
        # Unknown names yield an error dict listing the valid pipelines.
        assert result["status"] == "error"
        assert "Unknown pipeline" in result["error"]
        assert "available_pipelines" in result
        assert "document-ingestion" in result["available_pipelines"]

    @pytest.mark.asyncio
    async def test_handle_argo_pipeline(
        self,
        handler,
        mock_nats_message,
        argo_pipeline_request,
        mock_argo_response,
    ):
        """Test triggering Argo workflow."""
        # Setup mock response
        mock_response = MagicMock()
        mock_response.json.return_value = mock_argo_response
        mock_response.raise_for_status = MagicMock()
        handler._http.post.return_value = mock_response
        result = await handler.handle_message(mock_nats_message, argo_pipeline_request)
        # run_id should echo metadata.name from the mocked Argo response.
        assert result["status"] == "submitted"
        assert result["engine"] == "argo"
        assert result["run_id"] == "document-ingestion-abc123"
        assert result["pipeline"] == "document-ingestion"
        assert "submitted_at" in result
        # Verify API call
        handler._http.post.assert_called_once()
        call_args = handler._http.post.call_args
        # Loose check: the posted URL (in the call repr) targets the Argo server.
        assert "argo-server" in str(call_args)

    @pytest.mark.asyncio
    async def test_handle_kubeflow_pipeline(
        self,
        handler,
        mock_nats_message,
        kubeflow_pipeline_request,
        mock_kubeflow_response,
    ):
        """Test triggering Kubeflow pipeline."""
        # Setup mock response
        mock_response = MagicMock()
        mock_response.json.return_value = mock_kubeflow_response
        mock_response.raise_for_status = MagicMock()
        handler._http.post.return_value = mock_response
        result = await handler.handle_message(mock_nats_message, kubeflow_pipeline_request)
        # run_id should echo run.id from the mocked Kubeflow response.
        assert result["status"] == "submitted"
        assert result["engine"] == "kubeflow"
        assert result["run_id"] == "run-xyz-789"
        assert result["pipeline"] == "rag-query"
        # Verify API call
        handler._http.post.assert_called_once()
        call_args = handler._http.post.call_args
        # Loose check: the posted URL targets the Kubeflow Pipelines service.
        assert "ml-pipeline" in str(call_args)

    @pytest.mark.asyncio
    async def test_handle_api_error(
        self,
        handler,
        mock_nats_message,
        argo_pipeline_request,
    ):
        """Test handling API errors."""
        # A raised exception from the HTTP layer must surface as an error dict.
        handler._http.post.side_effect = Exception("Connection refused")
        result = await handler.handle_message(mock_nats_message, argo_pipeline_request)
        assert result["status"] == "error"
        assert "Connection refused" in result["error"]

    @pytest.mark.asyncio
    async def test_publishes_status_update(
        self,
        handler,
        mock_nats_message,
        argo_pipeline_request,
        mock_argo_response,
    ):
        """Test that status is published to NATS."""
        mock_response = MagicMock()
        mock_response.json.return_value = mock_argo_response
        mock_response.raise_for_status = MagicMock()
        handler._http.post.return_value = mock_response
        await handler.handle_message(mock_nats_message, argo_pipeline_request)
        # Status subject embeds the request id from the trigger payload.
        handler.nats.publish.assert_called_once()
        call_args = handler.nats.publish.call_args
        assert "ai.pipeline.status.test-request-123" in str(call_args)

    @pytest.mark.asyncio
    async def test_setup_creates_http_client(self):
        """Test that setup initializes HTTP client."""
        # Patch the client class so no real network resources are allocated.
        with patch("pipeline_bridge.httpx.AsyncClient") as mock_client:
            handler = PipelineBridge()
            await handler.setup()
            mock_client.assert_called_once()

    @pytest.mark.asyncio
    async def test_teardown_closes_http_client(self, handler):
        """Test that teardown closes HTTP client."""
        await handler.teardown()
        handler._http.aclose.assert_called_once()
class TestArgoSubmission:
    """Tests for Argo workflow submission."""

    @pytest.fixture
    def handler(self):
        """Build a bridge whose HTTP client is replaced by an async mock."""
        bridge = PipelineBridge()
        bridge._http = AsyncMock()
        return bridge

    @pytest.mark.asyncio
    async def test_argo_workflow_structure(self, handler, mock_argo_response):
        """The posted manifest references the template and carries labels."""
        fake_response = MagicMock()
        fake_response.json.return_value = mock_argo_response
        fake_response.raise_for_status = MagicMock()
        handler._http.post.return_value = fake_response

        await handler._submit_argo(
            template="document-ingestion",
            parameters={"key": "value"},
            request_id="test-123",
        )

        # Inspect the JSON payload of the single POST the handler issued.
        submitted = handler._http.post.call_args.kwargs["json"]["workflow"]
        assert submitted["apiVersion"] == "argoproj.io/v1alpha1"
        assert submitted["kind"] == "Workflow"
        assert "workflowTemplateRef" in submitted["spec"]
        assert submitted["spec"]["workflowTemplateRef"]["name"] == "document-ingestion"
        assert submitted["metadata"]["labels"]["request-id"] == "test-123"
class TestKubeflowSubmission:
    """Tests for Kubeflow pipeline submission."""

    @pytest.fixture
    def handler(self):
        """Build a bridge whose HTTP client is replaced by an async mock."""
        bridge = PipelineBridge()
        bridge._http = AsyncMock()
        return bridge

    @pytest.mark.asyncio
    async def test_kubeflow_run_structure(self, handler, mock_kubeflow_response):
        """The run request names the pipeline and embeds its id in the spec."""
        fake_response = MagicMock()
        fake_response.json.return_value = mock_kubeflow_response
        fake_response.raise_for_status = MagicMock()
        handler._http.post.return_value = fake_response

        await handler._submit_kubeflow(
            pipeline_id="rag-pipeline",
            parameters={"query": "test"},
            request_id="test-456",
        )

        # Inspect the JSON payload of the single POST the handler issued.
        submitted = handler._http.post.call_args.kwargs["json"]
        assert "rag-pipeline" in submitted["name"]
        assert submitted["pipeline_spec"]["pipeline_id"] == "rag-pipeline"

2702
uv.lock generated

File diff suppressed because it is too large Load Diff