#!/usr/bin/env python3
"""
Pipeline Bridge Service (Refactored)

Bridges NATS events to workflow engines using handler-base:
1. Listen for pipeline triggers on "ai.pipeline.trigger"
2. Submit to Kubeflow Pipelines or Argo Workflows
3. Publish a "submitted" status update to "ai.pipeline.status.{request_id}"

Note: this service only submits runs. It does not poll run progress, so the
status subject receives the submission event, not a completion event.
"""

import json
import logging
from datetime import datetime, timezone
from typing import Any, Optional

import httpx
from nats.aio.msg import Msg

from handler_base import Handler, Settings
from handler_base.telemetry import create_span

logger = logging.getLogger("pipeline-bridge")


class PipelineSettings(Settings):
    """Pipeline bridge specific settings."""

    service_name: str = "pipeline-bridge"

    # Kubeflow Pipelines API base URL (the v1beta1 REST API is used below).
    kubeflow_host: str = "http://ml-pipeline.kubeflow.svc.cluster.local:8888"

    # Argo Workflows server base URL and the namespace workflows are created in.
    argo_host: str = "http://argo-server.argo.svc.cluster.local:2746"
    argo_namespace: str = "ai-ml"


# Pipeline definitions: trigger name -> engine plus the engine-specific target.
# Argo entries name a WorkflowTemplate ("template"); Kubeflow entries name a
# pipeline ("pipeline_id").
PIPELINES = {
    "document-ingestion": {
        "engine": "argo",
        "template": "document-ingestion",
        "description": "Ingest documents into Milvus vector database",
    },
    "batch-inference": {
        "engine": "argo",
        "template": "batch-inference",
        "description": "Run batch LLM inference on a dataset",
    },
    "rag-query": {
        "engine": "kubeflow",
        "pipeline_id": "rag-pipeline",
        "description": "Execute RAG query pipeline",
    },
    "voice-pipeline": {
        "engine": "kubeflow",
        "pipeline_id": "voice-pipeline",
        "description": "Full voice assistant pipeline",
    },
    "model-evaluation": {
        "engine": "argo",
        "template": "model-evaluation",
        "description": "Evaluate model performance",
    },
}


def _format_param_value(value: Any) -> str:
    """Render one parameter value as the string both engines expect.

    Strings pass through unchanged. Dicts and lists are JSON-encoded so the
    workflow receives valid JSON instead of a Python repr (``str({"a": 1})``
    would yield ``"{'a': 1}"``). Every other value keeps its ``str()`` form.
    """
    if isinstance(value, str):
        return value
    if isinstance(value, (dict, list)):
        return json.dumps(value)
    return str(value)


def _to_name_value_list(parameters: dict) -> list:
    """Convert ``{"k": v}`` into the ``[{"name": k, "value": "..."}]`` form.

    Both the Argo workflow ``arguments.parameters`` field and the Kubeflow
    v1beta1 ``pipeline_spec.parameters`` field built below use this shape.
    """
    return [
        {"name": name, "value": _format_param_value(value)}
        for name, value in parameters.items()
    ]


class PipelineBridge(Handler):
    """
    Pipeline trigger handler.

    Request format:
    {
        "request_id": "uuid",
        "pipeline": "document-ingestion",
        "parameters": {"key": "value"}
    }

    Response format:
    {
        "request_id": "uuid",
        "status": "submitted",
        "run_id": "workflow-run-id",
        "engine": "argo|kubeflow",
        "pipeline": "document-ingestion",
        "submitted_at": "ISO-8601 UTC timestamp"
    }

    On failure the response has "status": "error" and an "error" message.
    """

    def __init__(self):
        self.pipeline_settings = PipelineSettings()
        super().__init__(
            subject="ai.pipeline.trigger",
            settings=self.pipeline_settings,
            queue_group="pipeline-bridges",
        )
        # Created in setup() and closed in teardown().
        self._http: Optional[httpx.AsyncClient] = None

    async def setup(self) -> None:
        """Initialize HTTP client."""
        logger.info("Initializing pipeline bridge...")
        self._http = httpx.AsyncClient(timeout=60.0)
        logger.info("Pipeline bridge ready. Available pipelines: %s", list(PIPELINES))

    async def teardown(self) -> None:
        """Clean up HTTP client."""
        if self._http:
            await self._http.aclose()
            self._http = None
        logger.info("Pipeline bridge closed")

    def _require_http(self) -> httpx.AsyncClient:
        """Return the HTTP client, failing clearly if setup() has not run."""
        if self._http is None:
            raise RuntimeError("HTTP client not initialized; setup() has not been called")
        return self._http

    async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
        """Handle a pipeline trigger request.

        Args:
            msg: The raw NATS message (not used by this handler).
            data: The request payload as passed in by handler_base; expected
                to be a dict in the request format documented on the class.

        Returns:
            A "submitted" result dict on success. Otherwise a dict with
            ``"status": "error"`` explaining why the request was rejected or
            why submission failed.
        """
        if not isinstance(data, dict):
            error = f"Invalid request payload: expected an object, got {type(data).__name__}"
            logger.error(error)
            return {"request_id": "unknown", "status": "error", "error": error}

        request_id = data.get("request_id", "unknown")
        if not isinstance(request_id, str):
            # The id is sliced, used as a label value and embedded in a subject name.
            request_id = str(request_id)
        pipeline_name = data.get("pipeline", "")
        parameters = data.get("parameters")
        if parameters is None:
            parameters = {}

        logger.info("Triggering pipeline '%s' for request %s", pipeline_name, request_id)

        with create_span("pipeline.trigger") as span:
            if span:
                span.set_attribute("request.id", request_id)
                span.set_attribute("pipeline.name", str(pipeline_name))

            # Validate the pipeline name. Non-string names (e.g. a JSON list) would
            # raise TypeError on a dict lookup, so treat them as unknown.
            pipeline = PIPELINES.get(pipeline_name) if isinstance(pipeline_name, str) else None
            if pipeline is None:
                error = f"Unknown pipeline: {pipeline_name}"
                logger.error(error)
                return {
                    "request_id": request_id,
                    "status": "error",
                    "error": error,
                    "available_pipelines": list(PIPELINES.keys()),
                }

            if not isinstance(parameters, dict):
                error = (
                    f"Invalid parameters: expected an object, got {type(parameters).__name__}"
                )
                logger.error(error)
                return {"request_id": request_id, "status": "error", "error": error}

            engine = pipeline["engine"]

            try:
                if engine == "argo":
                    run_id = await self._submit_argo(
                        pipeline["template"], parameters, request_id
                    )
                else:
                    run_id = await self._submit_kubeflow(
                        pipeline["pipeline_id"], parameters, request_id
                    )
            except Exception as e:
                logger.exception("Failed to submit pipeline %s", pipeline_name)
                return {
                    "request_id": request_id,
                    "status": "error",
                    "error": str(e),
                }

            result = {
                "request_id": request_id,
                "status": "submitted",
                "run_id": run_id,
                "engine": engine,
                "pipeline": pipeline_name,
                "submitted_at": datetime.now(timezone.utc).isoformat(),
            }

            # The run already exists at this point. A failed status publish is
            # logged rather than reported as a submission error; otherwise a
            # caller reacting to "error" might retry and start a duplicate run.
            try:
                await self.nats.publish(f"ai.pipeline.status.{request_id}", result)
            except Exception:
                logger.exception("Failed to publish status update for request %s", request_id)

            logger.info("Pipeline %s submitted: %s", pipeline_name, run_id)
            return result

    async def _submit_argo(
        self, template: str, parameters: dict, request_id: str
    ) -> str:
        """Submit a Workflow that references an Argo WorkflowTemplate.

        Returns:
            The server-generated workflow name, used as the run id.

        Raises:
            httpx.HTTPStatusError: If Argo responds with a non-2xx status.
            RuntimeError: If setup() has not created the HTTP client.
        """
        with create_span("pipeline.submit.argo") as span:
            if span:
                span.set_attribute("argo.template", template)

            settings = self.pipeline_settings
            workflow = {
                "apiVersion": "argoproj.io/v1alpha1",
                "kind": "Workflow",
                "metadata": {
                    # Argo appends a random suffix to generateName.
                    "generateName": f"{template}-",
                    "namespace": settings.argo_namespace,
                    "labels": {
                        "request-id": request_id,
                    },
                },
                "spec": {
                    "workflowTemplateRef": {"name": template},
                    "arguments": {"parameters": _to_name_value_list(parameters)},
                },
            }

            response = await self._require_http().post(
                f"{settings.argo_host}/api/v1/workflows/{settings.argo_namespace}",
                json={"workflow": workflow},
            )
            response.raise_for_status()
            return response.json()["metadata"]["name"]

    async def _submit_kubeflow(
        self, pipeline_id: str, parameters: dict, request_id: str
    ) -> str:
        """Submit a run to Kubeflow Pipelines via the v1beta1 runs API.

        Returns:
            The run id assigned by Kubeflow.

        Raises:
            httpx.HTTPStatusError: If Kubeflow responds with a non-2xx status.
            RuntimeError: If setup() has not created the HTTP client.
        """
        with create_span("pipeline.submit.kubeflow") as span:
            if span:
                span.set_attribute("kubeflow.pipeline_id", pipeline_id)

            run_request = {
                # Short request-id prefix keeps the run name readable.
                "name": f"{pipeline_id}-{request_id[:8]}",
                "pipeline_spec": {
                    "pipeline_id": pipeline_id,
                    "parameters": _to_name_value_list(parameters),
                },
            }

            response = await self._require_http().post(
                f"{self.pipeline_settings.kubeflow_host}/apis/v1beta1/runs",
                json=run_request,
            )
            response.raise_for_status()
            return response.json()["run"]["id"]


if __name__ == "__main__":
    PipelineBridge().run()