Billy D. da6e96b9f6
All checks were successful
CI / Lint (push) Successful in 1m57s
CI / Test (push) Successful in 54s
CI / Release (push) Successful in 5s
CI / Notify (push) Successful in 1s
fix: replace astral-sh/setup-uv action with shell install
The JS-based GitHub Action doesn't work on Gitea's act runner.
Use curl installer + GITHUB_PATH instead.
2026-02-13 19:40:57 -05:00
2026-02-02 11:08:05 +00:00

Pipeline Bridge

Bridges NATS events to Kubeflow Pipelines and Argo Workflows.

Overview

The Pipeline Bridge listens for pipeline trigger requests on NATS and submits them to the appropriate workflow engine (Argo Workflows or Kubeflow Pipelines). It monitors execution and publishes status updates back to NATS.

NATS Subjects

Subject Direction Description
ai.pipeline.trigger Subscribe Pipeline trigger requests
ai.pipeline.status.{request_id} Publish Pipeline status updates

Supported Pipelines

Pipeline Engine Description
document-ingestion Argo Ingest documents into Milvus
batch-inference Argo Run batch LLM inference
model-evaluation Argo Evaluate model performance
rag-query Kubeflow Execute RAG query pipeline
voice-pipeline Kubeflow Full voice assistant pipeline

Request Format

{
  "request_id": "uuid",
  "pipeline": "document-ingestion",
  "parameters": {
    "source-url": "s3://bucket/docs/",
    "collection-name": "knowledge_base"
  }
}

Response Format

{
  "request_id": "uuid",
  "status": "submitted",
  "pipeline": "document-ingestion",
  "engine": "argo",
  "run_id": "document-ingestion-abc123",
  "message": "Pipeline submitted successfully",
  "timestamp": "2026-01-03T12:00:00Z"
}

Status Updates

The bridge publishes status updates as the workflow progresses:

  • submitted - Workflow created
  • pending - Waiting to start
  • running - In progress
  • succeeded - Completed successfully
  • failed - Failed
  • error - System error

Implementation

The pipeline bridge uses the handler-base library for standardized NATS handling, telemetry, and health checks.

Environment Variables

Variable Default Description
NATS_URL nats://nats.ai-ml.svc.cluster.local:4222 NATS server URL
KUBEFLOW_HOST http://ml-pipeline.kubeflow.svc.cluster.local:8888 Kubeflow Pipelines API
ARGO_HOST http://argo-server.argo.svc.cluster.local:2746 Argo Workflows API
ARGO_NAMESPACE ai-ml Namespace for Argo Workflows

Building

docker build -t pipeline-bridge:latest .

# With specific handler-base tag
docker build --build-arg BASE_TAG=latest -t pipeline-bridge:latest .

Testing

# Port-forward NATS
kubectl port-forward -n ai-ml svc/nats 4222:4222

# Trigger document ingestion
nats pub ai.pipeline.trigger '{
  "request_id": "test-1",
  "pipeline": "document-ingestion",
  "parameters": {"source-url": "https://example.com/docs.txt"}
}'

# Monitor status
nats sub "ai.pipeline.status.>"

License

MIT

Description
No description provided
Readme MIT 315 KiB
Languages
Go 96%
Dockerfile 4%