fix: ruff formatting and allow-direct-references for handler-base dep
All checks were successful
CI / Lint (push) Successful in 40s
CI / Test (push) Successful in 43s
CI / Release (push) Successful in 5s
CI / Notify (push) Successful in 1s

This commit is contained in:
2026-02-02 08:44:51 -05:00
parent 7c7a147db6
commit 49c804e234
4 changed files with 82 additions and 90 deletions

View File

@@ -8,6 +8,7 @@ Bridges NATS events to workflow engines using handler-base:
3. Monitor execution and publish status updates
4. Publish completion to "ai.pipeline.status.{request_id}"
"""
import logging
from typing import Any, Optional
from datetime import datetime
@@ -23,12 +24,12 @@ logger = logging.getLogger("pipeline-bridge")
class PipelineSettings(Settings):
    """Pipeline bridge specific settings.

    Extends the shared handler-base ``Settings`` (see module docstring) with
    the endpoints of the two supported workflow engines.
    """

    service_name: str = "pipeline-bridge"

    # Kubeflow Pipelines API endpoint (in-cluster service DNS name).
    kubeflow_host: str = "http://ml-pipeline.kubeflow.svc.cluster.local:8888"

    # Argo Workflows server endpoint and the namespace workflows are submitted to.
    argo_host: str = "http://argo-server.argo.svc.cluster.local:2746"
    argo_namespace: str = "ai-ml"
@@ -67,14 +68,14 @@ PIPELINES = {
# NOTE(review): this region is diff residue — unified-diff hunk headers
# (`@@ … @@`) are embedded below and at least one original line (the first
# keyword argument of the `super().__init__(` call, old line 90) falls
# outside the visible hunks. Kept byte-identical; do not treat as runnable.
class PipelineBridge(Handler):
"""
Pipeline trigger handler.
Request format:
{
"request_id": "uuid",
"pipeline": "document-ingestion",
"parameters": {"key": "value"}
}
Response format:
{
"request_id": "uuid",
@@ -83,7 +84,7 @@ class PipelineBridge(Handler):
"engine": "argo|kubeflow"
}
"""
def __init__(self):
self.pipeline_settings = PipelineSettings()
super().__init__(
@@ -91,36 +92,36 @@ class PipelineBridge(Handler):
settings=self.pipeline_settings,
queue_group="pipeline-bridges",
)
# Lazily created in setup(); closed in teardown().
self._http: Optional[httpx.AsyncClient] = None
async def setup(self) -> None:
    """Create the shared async HTTP client used for engine API calls."""
    logger.info("Initializing pipeline bridge...")
    client = httpx.AsyncClient(timeout=60.0)
    self._http = client
    logger.info(f"Pipeline bridge ready. Available pipelines: {list(PIPELINES.keys())}")
async def teardown(self) -> None:
"""Clean up HTTP client."""
# Close the shared httpx client if setup() ever created one.
if self._http:
await self._http.aclose()
# NOTE(review): indentation was flattened during extraction — it is
# ambiguous whether this log line is guarded by the `if` above; kept as-is.
logger.info("Pipeline bridge closed")
# NOTE(review): diff residue — hunk headers are embedded below, several
# original lines fall in gaps between hunks (old lines 127-130, 151-152),
# and some statements appear twice (pre-ruff multi-line form immediately
# followed by the post-ruff single-line form). Kept byte-identical.
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
"""Handle pipeline trigger request.

Validates the requested pipeline name against PIPELINES, dispatches to
Argo or Kubeflow based on the pipeline's "engine" field, publishes the
result to "ai.pipeline.status.{request_id}", and returns a status dict
(status "submitted" with run_id on success, "error" otherwise).
"""
request_id = data.get("request_id", "unknown")
pipeline_name = data.get("pipeline", "")
parameters = data.get("parameters", {})
logger.info(f"Triggering pipeline '{pipeline_name}' for request {request_id}")
with create_span("pipeline.trigger") as span:
if span:
span.set_attribute("request.id", request_id)
span.set_attribute("pipeline.name", pipeline_name)
# Validate pipeline
if pipeline_name not in PIPELINES:
error = f"Unknown pipeline: {pipeline_name}"
@@ -131,20 +132,18 @@ class PipelineBridge(Handler):
"error": error,
"available_pipelines": list(PIPELINES.keys()),
}
pipeline = PIPELINES[pipeline_name]
engine = pipeline["engine"]
try:
if engine == "argo":
# (diff residue: old and new versions of the same call follow)
run_id = await self._submit_argo(
pipeline["template"], parameters, request_id
)
run_id = await self._submit_argo(pipeline["template"], parameters, request_id)
else:
run_id = await self._submit_kubeflow(
pipeline["pipeline_id"], parameters, request_id
)
result = {
"request_id": request_id,
"status": "submitted",
@@ -153,15 +152,13 @@ class PipelineBridge(Handler):
"pipeline": pipeline_name,
"submitted_at": datetime.utcnow().isoformat(),
}
# Publish status update
# (diff residue: old and new versions of the same call follow)
await self.nats.publish(
f"ai.pipeline.status.{request_id}", result
)
await self.nats.publish(f"ai.pipeline.status.{request_id}", result)
logger.info(f"Pipeline {pipeline_name} submitted: {run_id}")
return result
except Exception as e:
logger.exception(f"Failed to submit pipeline {pipeline_name}")
return {
@@ -169,15 +166,13 @@ class PipelineBridge(Handler):
"status": "error",
"error": str(e),
}
# NOTE(review): diff residue — the old multi-line signature and the new
# single-line signature both appear below, and the workflow's "metadata"
# section (old lines 184-190) falls in a gap between hunks and is not in
# view. Kept byte-identical; do not treat as runnable.
async def _submit_argo(
self, template: str, parameters: dict, request_id: str
) -> str:
async def _submit_argo(self, template: str, parameters: dict, request_id: str) -> str:
"""Submit workflow to Argo Workflows.

Builds a Workflow referencing the given WorkflowTemplate, stringifies
all parameter values, POSTs it to the Argo server's create-workflow
endpoint, and returns the created workflow's metadata.name.
Raises httpx.HTTPStatusError on a non-2xx response.
"""
with create_span("pipeline.submit.argo") as span:
if span:
span.set_attribute("argo.template", template)
workflow = {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
@@ -191,48 +186,40 @@ class PipelineBridge(Handler):
"spec": {
"workflowTemplateRef": {"name": template},
"arguments": {
"parameters": [
{"name": k, "value": str(v)}
for k, v in parameters.items()
]
"parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()]
},
},
}
response = await self._http.post(
f"{self.pipeline_settings.argo_host}/api/v1/workflows/{self.pipeline_settings.argo_namespace}",
json={"workflow": workflow},
)
response.raise_for_status()
result = response.json()
return result["metadata"]["name"]
async def _submit_kubeflow(self, pipeline_id: str, parameters: dict, request_id: str) -> str:
    """Submit a run of a registered pipeline to Kubeflow Pipelines.

    Args:
        pipeline_id: ID of the pipeline already registered in Kubeflow.
        parameters: Run parameters; every value is stringified for the API.
        request_id: Originating request id; its first 8 chars suffix the run name.

    Returns:
        The Kubeflow run id from the create-run response.

    Raises:
        httpx.HTTPStatusError: if the Kubeflow API returns a non-2xx status.
    """
    # NOTE(review): this body is reconstructed from diff residue (the source
    # showed both the pre- and post-ruff form of the signature and parameter
    # list, with indentation flattened). Confirm against the committed file
    # that the HTTP call belongs inside the tracing span.
    with create_span("pipeline.submit.kubeflow") as span:
        if span:
            span.set_attribute("kubeflow.pipeline_id", pipeline_id)

        run_request = {
            "name": f"{pipeline_id}-{request_id[:8]}",
            "pipeline_spec": {
                "pipeline_id": pipeline_id,
                "parameters": [{"name": k, "value": str(v)} for k, v in parameters.items()],
            },
        }

        response = await self._http.post(
            f"{self.pipeline_settings.kubeflow_host}/apis/v1beta1/runs",
            json=run_request,
        )
        response.raise_for_status()
        result = response.json()
        return result["run"]["id"]