From cee21f124cbf1805871e86c2fe857bc47f12f5e6 Mon Sep 17 00:00:00 2001
From: "Billy D."
Date: Thu, 12 Feb 2026 06:15:13 -0500
Subject: [PATCH] feat: add MLflow tracking to evaluation pipeline

- Add create_mlflow_run and log_evaluation_to_mlflow KFP components
- Log accuracy, correct/total counts, pass/fail to MLflow experiment
- Upload evaluation_results.json as artifact
- Wire MLflow run into pipeline DAG before NATS publish
---
 evaluation_pipeline.py | 111 +++++++++++++++++++++++++++++++++++++++--
 1 file changed, 108 insertions(+), 3 deletions(-)

diff --git a/evaluation_pipeline.py b/evaluation_pipeline.py
index 2397f2b..dfc00b8 100644
--- a/evaluation_pipeline.py
+++ b/evaluation_pipeline.py
@@ -4,15 +4,98 @@ Model Evaluation Pipeline - Kubeflow Pipelines SDK
 
 Evaluates fine-tuned models against benchmarks.
 Integrates with Argo Workflows for automated model deployment.
+Logs evaluation results to MLflow for experiment tracking.
 
 Usage:
-    pip install kfp==2.12.1
+    pip install kfp==2.12.1 mlflow boto3 psycopg2-binary
     python evaluation_pipeline.py
 """
 
 from kfp import dsl
 from kfp import compiler
-from typing import Dict
+from typing import Dict, List, Any, NamedTuple
+
+
+# ---- MLflow KFP components (inline to avoid external dep) ----
+
+MLFLOW_IMAGE = "python:3.13-slim"
+MLFLOW_PACKAGES = ["mlflow>=2.10.0", "boto3", "psycopg2-binary"]
+
+
+@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
+def create_mlflow_run(
+    experiment_name: str,
+    run_name: str,
+    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
+    tags: Dict[str, str] = None,
+    params: Dict[str, str] = None,
+) -> NamedTuple('RunInfo', [('run_id', str), ('experiment_id', str)]):
+    """Create an MLflow run and return the run_id."""
+    import os, mlflow
+    from mlflow.tracking import MlflowClient
+    from collections import namedtuple
+
+    mlflow.set_tracking_uri(mlflow_tracking_uri)
+    client = MlflowClient()
+
+    exp = client.get_experiment_by_name(experiment_name)
+    experiment_id = exp.experiment_id if exp else client.create_experiment(
+        name=experiment_name, artifact_location=f"/mlflow/artifacts/{experiment_name}"
+    )
+
+    default_tags = {
+        "pipeline.type": "evaluation",
+        "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
+    }
+    if tags:
+        default_tags.update(tags)
+
+    run = mlflow.start_run(experiment_id=experiment_id, run_name=run_name, tags=default_tags)
+    if params:
+        mlflow.log_params(params)
+    run_id = run.info.run_id
+    mlflow.end_run()
+
+    RunInfo = namedtuple('RunInfo', ['run_id', 'experiment_id'])
+    return RunInfo(run_id, experiment_id)
+
+
+@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
+def log_evaluation_to_mlflow(
+    run_id: str,
+    model_name: str,
+    dataset_name: str,
+    metrics: dict,
+    results: list,
+    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
+) -> str:
+    """Log evaluation metrics, params, and sample results to MLflow."""
+    import json, tempfile, mlflow
+    from mlflow.tracking import MlflowClient
+    from pathlib import Path
+
+    mlflow.set_tracking_uri(mlflow_tracking_uri)
+    client = MlflowClient()
+
+    client.log_param(run_id, "eval.model_name", model_name)
+    client.log_param(run_id, "eval.dataset", dataset_name)
+
+    for key, value in metrics.items():
+        client.log_metric(run_id, f"eval.{key}", float(value))
+
+    if results:
+        with tempfile.TemporaryDirectory() as tmpdir:
+            path = Path(tmpdir) / "evaluation_results.json"
+            path.write_text(json.dumps(results, indent=2))
+            client.log_artifact(run_id, str(path))
+
+    passed = metrics.get("pass", metrics.get("accuracy", 0) >= 0.7)
+    client.set_tag(run_id, "eval.passed", str(passed))
+    client.set_tag(run_id, "model.name", model_name)
+
+    # End the run
+    client.set_terminated(run_id, status="FINISHED")
+    return run_id
 
 
 @dsl.component(
@@ -175,6 +258,18 @@ def model_evaluation_pipeline(
         sample_limit: Maximum samples to evaluate
     """
 
+    # Create MLflow run for this evaluation
+    mlflow_run = create_mlflow_run(
+        experiment_name="model-evaluation",
+        run_name=f"eval-{model_name}-{dataset_name}",
+        params={
+            "model_endpoint": model_endpoint,
+            "model_name": model_name,
+            "dataset_name": dataset_name,
+            "sample_limit": str(sample_limit),
+        },
+    )
+
     # Load dataset
     load_task = load_eval_dataset(
         dataset_name=dataset_name,
@@ -193,11 +288,21 @@
     # Calculate metrics
     metrics_task = calculate_metrics(results=inference_task.output)
 
-    # Publish results
+    # Log results to MLflow
+    log_task = log_evaluation_to_mlflow(
+        run_id=mlflow_run.outputs["run_id"],
+        model_name=model_name,
+        dataset_name=dataset_name,
+        metrics=metrics_task.output,
+        results=inference_task.output,
+    )
+
+    # Publish results to NATS
     publish_task = publish_results(
         metrics=metrics_task.output,
         model_name=model_name
    )
+    publish_task.after(log_task)
 
 
 if __name__ == "__main__":