llm-workflows repo has been phased out. Update labels to reflect the new ai-ml-pipelines naming convention.
271 lines
8.0 KiB
YAML
271 lines
8.0 KiB
YAML
# Argo Events EventSource combining KFP webhooks with NATS subjects.
# Lets Argo Workflows and Kubeflow Pipelines trigger each other in both directions.
---
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kfp-events
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: kfp-events
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  service:
    ports:
      - name: webhook
        port: 12000
        targetPort: 12000

  # HTTP endpoints on which KFP pipeline completion events are received.
  # The webhook port is a string here, unlike the integer service port above.
  webhook:
    kfp-completion:
      port: '12000'
      endpoint: /kfp/completion
      method: POST
    kfp-failure:
      port: '12000'
      endpoint: /kfp/failure
      method: POST

  # NATS subjects carrying pipeline trigger requests; each event is named
  # after the kind of run it asks for and is delivered with a JSON body.
  nats:
    pipeline-trigger:
      url: nats://nats.ai-ml.svc.cluster.local:4222
      subject: ai.pipeline.trigger
      jsonBody: true
    argo-trigger:
      url: nats://nats.ai-ml.svc.cluster.local:4222
      subject: ai.argo.trigger
      jsonBody: true
    kfp-trigger:
      url: nats://nats.ai-ml.svc.cluster.local:4222
      subject: ai.kfp.trigger
      jsonBody: true
---
# Sensor for handling KFP completion events.
# Listens to the kfp-completion and kfp-failure webhook events of the
# kfp-events EventSource and reacts to each outcome independently.
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: kfp-completion-sensor
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: kfp-completion-sensor
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  dependencies:
    # Only completion events whose body.status is exactly "SUCCEEDED" count.
    - name: kfp-success
      eventSourceName: kfp-events
      eventName: kfp-completion
      filters:
        data:
          - path: body.status
            type: string
            value:
              - "SUCCEEDED"
    - name: kfp-failure
      eventSourceName: kfp-events
      eventName: kfp-failure
  triggers:
    # On KFP success, publish run_id / pipeline_name / status to NATS
    - template:
        name: notify-kfp-success
        # Without an explicit condition a trigger waits for ALL dependencies
        # (success AND failure); bind it to the success dependency only.
        conditions: kfp-success
        nats:
          url: nats://nats.ai-ml.svc.cluster.local:4222
          subject: ai.pipeline.status.completed
          payload:
            - src:
                dependencyName: kfp-success
                dataKey: body.run_id
              dest: run_id
            - src:
                dependencyName: kfp-success
                dataKey: body.pipeline_name
              dest: pipeline_name
            - src:
                dependencyName: kfp-success
                dataKey: body.status
              dest: status
      retryStrategy:
        steps: 3
    # On KFP failure, trigger recovery workflow
    - template:
        name: kfp-failure-recovery
        # Fire on the failure dependency alone (see note on the trigger above).
        conditions: kfp-failure
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: kfp-failure-handler-
                namespace: ai-ml
              spec:
                entrypoint: notify-failure
                arguments:
                  # Values are filled in by the trigger parameters below,
                  # addressed by list index (0, 1, 2) - keep the order stable.
                  parameters:
                    - name: run-id
                    - name: pipeline-name
                    - name: error-message
                templates:
                  - name: notify-failure
                    inputs:
                      parameters:
                        - name: run-id
                        - name: pipeline-name
                        - name: error-message
                    script:
                      image: python:3.13-slim
                      command: [python]
                      # Event data reaches the script through environment
                      # variables instead of being spliced into the Python
                      # source, so quotes or newlines in an error message can
                      # neither break the script nor inject code.
                      env:
                        - name: RUN_ID
                          value: "{{inputs.parameters.run-id}}"
                        - name: PIPELINE_NAME
                          value: "{{inputs.parameters.pipeline-name}}"
                        - name: ERROR_MESSAGE
                          value: "{{inputs.parameters.error-message}}"
                      source: |
                        import subprocess
                        import sys
                        # nats-py is not part of the base image; installed at start-up.
                        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "nats-py"])

                        import asyncio
                        import json
                        import os
                        import nats

                        async def notify():
                            nc = await nats.connect("nats://nats.ai-ml.svc.cluster.local:4222")
                            try:
                                await nc.publish(
                                    "ai.pipeline.status.failed",
                                    json.dumps({
                                        "run_id": os.environ["RUN_ID"],
                                        "pipeline_name": os.environ["PIPELINE_NAME"],
                                        "error": os.environ["ERROR_MESSAGE"],
                                        "source": "kubeflow"
                                    }).encode()
                                )
                            finally:
                                await nc.close()

                        asyncio.run(notify())
          parameters:
            - src:
                dependencyName: kfp-failure
                dataKey: body.run_id
              dest: spec.arguments.parameters.0.value
            - src:
                dependencyName: kfp-failure
                dataKey: body.pipeline_name
              dest: spec.arguments.parameters.1.value
            - src:
                dependencyName: kfp-failure
                dataKey: body.error
              dest: spec.arguments.parameters.2.value
      retryStrategy:
        steps: 3
---
# Sensor for NATS-triggered Argo Workflows.
# Each argo-trigger event creates a Workflow from the WorkflowTemplate named
# in the event body, passing along the parameters the event carries.
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: nats-argo-sensor
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: nats-argo-sensor
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  dependencies:
    - name: argo-trigger
      eventSourceName: kfp-events
      eventName: argo-trigger
  triggers:
    - template:
        name: trigger-argo-workflow
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: nats-triggered-
                namespace: ai-ml
              spec:
                workflowTemplateRef:
                  # Overwritten with body.template by the parameters below.
                  name: placeholder
                arguments:
                  # Overwritten with body.parameters by the parameters below.
                  parameters: []
          parameters:
            - src:
                dependencyName: argo-trigger
                dataKey: body.template
              dest: spec.workflowTemplateRef.name
            - src:
                dependencyName: argo-trigger
                dataKey: body.parameters
                # The destination is a list field. Without useRawData the
                # extracted JSON is injected as one string, which is not a
                # valid value for spec.arguments.parameters.
                # NOTE(review): assumes body.parameters is a JSON array of
                # {name, value} objects - confirm with the publisher.
                useRawData: true
              dest: spec.arguments.parameters
      retryStrategy:
        steps: 3
---
# Sensor for NATS-triggered KFP Pipelines.
# Each kfp-trigger event creates an Argo Workflow from the kfp-trigger
# WorkflowTemplate, with its arguments taken from the event body.
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: nats-kfp-sensor
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: nats-kfp-sensor
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  dependencies:
    - name: kfp-trigger
      eventSourceName: kfp-events
      eventName: kfp-trigger
  triggers:
    # Trigger KFP via Argo Workflow (uses kfp-trigger template)
    - template:
        name: trigger-kfp-via-argo
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: kfp-via-nats-
                namespace: ai-ml
              spec:
                workflowTemplateRef:
                  name: kfp-trigger
                arguments:
                  # Placeholder values; each one is overwritten by index from
                  # the event body via the parameters below.
                  parameters:
                    - name: pipeline-id
                      value: ""
                    - name: pipeline-params
                      value: "{}"
                    - name: wait-for-completion
                      value: "true"
          # No `operation` is set on these parameters: the only supported
          # operations are append / prepend / overwrite, and the default is to
          # overwrite the destination. Because useRawData is not set, extracted
          # values (including the JSON object and the boolean) are injected as
          # strings, which is what Workflow parameter values require.
          parameters:
            - src:
                dependencyName: kfp-trigger
                dataKey: body.pipeline_id
              dest: spec.arguments.parameters.0.value
            - src:
                dependencyName: kfp-trigger
                dataKey: body.parameters
              dest: spec.arguments.parameters.1.value
            - src:
                dependencyName: kfp-trigger
                dataKey: body.wait
              dest: spec.arguments.parameters.2.value
      retryStrategy:
        steps: 3
---
# Service fronting the EventSource webhook: forwards port 12000 to pods
# labelled eventsource-name=kfp-events.
apiVersion: v1
kind: Service
metadata:
  name: kfp-events-webhook
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: kfp-events
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  selector:
    eventsource-name: kfp-events
  ports:
    - name: webhook
      port: 12000
      targetPort: 12000