# Argo Events - EventSource, Sensors and Service for KFP and NATS integration.
# Enables bidirectional triggering between Argo Workflows and Kubeflow
# Pipelines.
---
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kfp-events
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: kfp-events
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  service:
    ports:
      - name: webhook
        port: 12000
        targetPort: 12000
  # Webhooks that receive KFP pipeline completion and failure events.
  # The webhook port is a string in the EventSource schema, hence the quotes.
  webhook:
    kfp-completion:
      port: "12000"
      endpoint: /kfp/completion
      method: POST
    kfp-failure:
      port: "12000"
      endpoint: /kfp/failure
      method: POST
  # NATS subjects that carry pipeline trigger requests.
  nats:
    pipeline-trigger:
      url: nats://nats.ai-ml.svc.cluster.local:4222
      subject: ai.pipeline.trigger
      jsonBody: true
    argo-trigger:
      url: nats://nats.ai-ml.svc.cluster.local:4222
      subject: ai.argo.trigger
      jsonBody: true
    kfp-trigger:
      url: nats://nats.ai-ml.svc.cluster.local:4222
      subject: ai.kfp.trigger
      jsonBody: true
---
# Sensor for handling KFP completion events.
# NOTE(review): no spec.template.serviceAccountName is set here; confirm the
# service account the sensor runs as is allowed to create Workflows in ai-ml.
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: kfp-completion-sensor
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: kfp-completion-sensor
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  dependencies:
    # Only completion events whose body.status is exactly "SUCCEEDED" match.
    - name: kfp-success
      eventSourceName: kfp-events
      eventName: kfp-completion
      filters:
        data:
          - path: body.status
            type: string
            value:
              - "SUCCEEDED"
    - name: kfp-failure
      eventSourceName: kfp-events
      eventName: kfp-failure
  triggers:
    # On KFP success, publish to NATS.
    - template:
        name: notify-kfp-success
        # Without an explicit condition a trigger waits for ALL dependencies
        # of the sensor, i.e. a success AND a failure event.
        conditions: "kfp-success"
        nats:
          url: nats://nats.ai-ml.svc.cluster.local:4222
          subject: ai.pipeline.status.completed
          payload:
            - src:
                dependencyName: kfp-success
                dataKey: body.run_id
              dest: run_id
            - src:
                dependencyName: kfp-success
                dataKey: body.pipeline_name
              dest: pipeline_name
            - src:
                dependencyName: kfp-success
                dataKey: body.status
              dest: status
      retryStrategy:
        steps: 3
    # On KFP failure, trigger the recovery workflow.
    - template:
        name: kfp-failure-recovery
        # Fire on the failure dependency alone (see the note on the success
        # trigger above).
        conditions: "kfp-failure"
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: kfp-failure-handler-
                namespace: ai-ml
              spec:
                entrypoint: notify-failure
                arguments:
                  # Values are filled in by the k8s trigger parameters below.
                  parameters:
                    - name: run-id
                    - name: pipeline-name
                    - name: error-message
                templates:
                  - name: notify-failure
                    inputs:
                      parameters:
                        - name: run-id
                        - name: pipeline-name
                        - name: error-message
                    script:
                      image: python:3.13-slim
                      command: [python]
                      # The inputs originate from an external webhook body.
                      # Hand them over as environment variables instead of
                      # substituting them into the Python source, where a
                      # quote or newline would break or inject code.
                      env:
                        - name: RUN_ID
                          value: "{{inputs.parameters.run-id}}"
                        - name: PIPELINE_NAME
                          value: "{{inputs.parameters.pipeline-name}}"
                        - name: ERROR_MESSAGE
                          value: "{{inputs.parameters.error-message}}"
                      source: |
                        import subprocess
                        import sys

                        # nats-py is not part of the base image; install it
                        # at start-up.
                        subprocess.check_call(
                            [sys.executable, "-m", "pip", "install", "-q", "nats-py"]
                        )

                        import asyncio
                        import json
                        import os

                        import nats


                        async def notify():
                            nc = await nats.connect(
                                "nats://nats.ai-ml.svc.cluster.local:4222"
                            )
                            await nc.publish(
                                "ai.pipeline.status.failed",
                                json.dumps({
                                    "run_id": os.environ["RUN_ID"],
                                    "pipeline_name": os.environ["PIPELINE_NAME"],
                                    "error": os.environ["ERROR_MESSAGE"],
                                    "source": "kubeflow"
                                }).encode()
                            )
                            await nc.close()


                        asyncio.run(notify())
          # Copy fields of the failure event into the Workflow arguments.
          parameters:
            - src:
                dependencyName: kfp-failure
                dataKey: body.run_id
              dest: spec.arguments.parameters.0.value
            - src:
                dependencyName: kfp-failure
                dataKey: body.pipeline_name
              dest: spec.arguments.parameters.1.value
            - src:
                dependencyName: kfp-failure
                dataKey: body.error
              dest: spec.arguments.parameters.2.value
      retryStrategy:
        steps: 3
---
# Sensor for NATS-triggered Argo Workflows.
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: nats-argo-sensor
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: nats-argo-sensor
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  dependencies:
    - name: argo-trigger
      eventSourceName: kfp-events
      eventName: argo-trigger
  triggers:
    - template:
        name: trigger-argo-workflow
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: nats-triggered-
                namespace: ai-ml
              spec:
                # Both values below are placeholders that the trigger
                # parameters overwrite from the event body.
                workflowTemplateRef:
                  name: placeholder
                arguments:
                  parameters: []
          parameters:
            - src:
                dependencyName: argo-trigger
                dataKey: body.template
              dest: spec.workflowTemplateRef.name
            - src:
                dependencyName: argo-trigger
                dataKey: body.parameters
                # Keep the JSON array as an array; by default extracted
                # values are converted to a string, which is not a valid
                # value for spec.arguments.parameters.
                useRawData: true
              dest: spec.arguments.parameters
      retryStrategy:
        steps: 3
---
# Sensor for NATS-triggered KFP Pipelines.
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: nats-kfp-sensor
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: nats-kfp-sensor
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  dependencies:
    - name: kfp-trigger
      eventSourceName: kfp-events
      eventName: kfp-trigger
  triggers:
    # Trigger KFP via Argo Workflow (uses the kfp-trigger WorkflowTemplate).
    - template:
        name: trigger-kfp-via-argo
        k8s:
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: kfp-via-nats-
                namespace: ai-ml
              spec:
                workflowTemplateRef:
                  name: kfp-trigger
                arguments:
                  parameters:
                    - name: pipeline-id
                      value: ""
                    - name: pipeline-params
                      value: "{}"
                    - name: wait-for-completion
                      value: "true"
          # Extracted values are converted to strings unless useRawData is
          # set, so the JSON object and the boolean below arrive as the
          # string values the Workflow parameters need. No `operation` is
          # given: only append, prepend and overwrite exist, and overwrite
          # is the default.
          parameters:
            - src:
                dependencyName: kfp-trigger
                dataKey: body.pipeline_id
              dest: spec.arguments.parameters.0.value
            - src:
                dependencyName: kfp-trigger
                dataKey: body.parameters
              dest: spec.arguments.parameters.1.value
            - src:
                dependencyName: kfp-trigger
                dataKey: body.wait
              dest: spec.arguments.parameters.2.value
      retryStrategy:
        steps: 3
---
# Service for the EventSource webhook.
apiVersion: v1
kind: Service
metadata:
  name: kfp-events-webhook
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: kfp-events
    app.kubernetes.io/part-of: ai-ml-pipelines
spec:
  selector:
    eventsource-name: kfp-events
  ports:
    - name: webhook
      port: 12000
      targetPort: 12000