feat: add MLflow tracking to evaluation pipeline

- Add create_mlflow_run and log_evaluation_to_mlflow KFP components
- Log accuracy, correct/total counts, pass/fail to MLflow experiment
- Upload evaluation_results.json as artifact
- Wire MLflow run into pipeline DAG before NATS publish
2026-02-12 06:15:13 -05:00
parent bd8c8616d0
commit cee21f124c
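
For reviewers who want to exercise the change locally, here is a minimal sketch of compiling and submitting the updated pipeline (not part of this commit; the KFP host and all argument values are illustrative placeholders):

from kfp import compiler, Client

from evaluation_pipeline import model_evaluation_pipeline

# Compile to IR YAML (the module's __main__ block presumably does the same).
compiler.Compiler().compile(
    pipeline_func=model_evaluation_pipeline,
    package_path="evaluation_pipeline.yaml",
)

# Submit a run; the host below is a hypothetical in-cluster KFP endpoint.
client = Client(host="http://ml-pipeline.kubeflow.svc.cluster.local:8888")
client.create_run_from_pipeline_package(
    "evaluation_pipeline.yaml",
    arguments={
        "model_endpoint": "http://model-serving.example.svc:8080/v1",  # placeholder
        "model_name": "demo-model",                                    # placeholder
        "dataset_name": "demo-eval-set",                               # placeholder
        "sample_limit": 100,                                           # placeholder
    },
)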

@@ -4,15 +4,98 @@ Model Evaluation Pipeline - Kubeflow Pipelines SDK
Evaluates fine-tuned models against benchmarks.
Integrates with Argo Workflows for automated model deployment.
Logs evaluation results to MLflow for experiment tracking.

Usage:
    pip install kfp==2.12.1 mlflow boto3 psycopg2-binary
    python evaluation_pipeline.py
"""

from kfp import dsl
from kfp import compiler
from typing import Dict, List, Any, NamedTuple

# ---- MLflow KFP components (inline to avoid external dep) ----
MLFLOW_IMAGE = "python:3.13-slim"
MLFLOW_PACKAGES = ["mlflow>=2.10.0", "boto3", "psycopg2-binary"]
@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def create_mlflow_run(
    experiment_name: str,
    run_name: str,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
    tags: Dict[str, str] = None,
    params: Dict[str, str] = None,
) -> NamedTuple('RunInfo', [('run_id', str), ('experiment_id', str)]):
    """Create an MLflow run and return the run_id."""
    import os, mlflow
    from mlflow.tracking import MlflowClient
    from collections import namedtuple

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()
    exp = client.get_experiment_by_name(experiment_name)
    experiment_id = exp.experiment_id if exp else client.create_experiment(
        name=experiment_name, artifact_location=f"/mlflow/artifacts/{experiment_name}"
    )
    default_tags = {
        "pipeline.type": "evaluation",
        "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
    }
    if tags:
        default_tags.update(tags)
    run = mlflow.start_run(experiment_id=experiment_id, run_name=run_name, tags=default_tags)
    if params:
        mlflow.log_params(params)
    run_id = run.info.run_id
    mlflow.end_run()
    RunInfo = namedtuple('RunInfo', ['run_id', 'experiment_id'])
    return RunInfo(run_id, experiment_id)

@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def log_evaluation_to_mlflow(
    run_id: str,
    model_name: str,
    dataset_name: str,
    metrics: dict,
    results: list,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> str:
    """Log evaluation metrics, params, and sample results to MLflow."""
    import json, tempfile, mlflow
    from mlflow.tracking import MlflowClient
    from pathlib import Path

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()
    client.log_param(run_id, "eval.model_name", model_name)
    client.log_param(run_id, "eval.dataset", dataset_name)
    for key, value in metrics.items():
        client.log_metric(run_id, f"eval.{key}", float(value))
    if results:
        with tempfile.TemporaryDirectory() as tmpdir:
            path = Path(tmpdir) / "evaluation_results.json"
            path.write_text(json.dumps(results, indent=2))
            client.log_artifact(run_id, str(path))
    passed = metrics.get("pass", metrics.get("accuracy", 0) >= 0.7)
    client.set_tag(run_id, "eval.passed", str(passed))
    client.set_tag(run_id, "model.name", model_name)
    # End the run
    client.set_terminated(run_id, status="FINISHED")
    return run_id

@dsl.component(
@@ -175,6 +258,18 @@ def model_evaluation_pipeline(
        sample_limit: Maximum samples to evaluate
    """
    # Create MLflow run for this evaluation
    mlflow_run = create_mlflow_run(
        experiment_name="model-evaluation",
        run_name=f"eval-{model_name}-{dataset_name}",
        params={
            "model_endpoint": model_endpoint,
            "model_name": model_name,
            "dataset_name": dataset_name,
            "sample_limit": str(sample_limit),
        },
    )

    # Load dataset
    load_task = load_eval_dataset(
        dataset_name=dataset_name,
@@ -193,11 +288,21 @@ def model_evaluation_pipeline(
    # Calculate metrics
    metrics_task = calculate_metrics(results=inference_task.output)

    # Log results to MLflow
    log_task = log_evaluation_to_mlflow(
        run_id=mlflow_run.outputs["run_id"],
        model_name=model_name,
        dataset_name=dataset_name,
        metrics=metrics_task.output,
        results=inference_task.output,
    )

    # Publish results to NATS
    publish_task = publish_results(
        metrics=metrics_task.output,
        model_name=model_name
    )
    publish_task.after(log_task)


if __name__ == "__main__":
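
After a pipeline run completes, the tracked results can be sanity-checked straight from MLflow. A minimal sketch, assuming the in-cluster tracking URI and the "model-evaluation" experiment created by create_mlflow_run above; the filter mirrors the default pass criterion (accuracy >= 0.7) in log_evaluation_to_mlflow, and the result limit of 5 is arbitrary:

import mlflow
from mlflow.tracking import MlflowClient

mlflow.set_tracking_uri("http://mlflow.mlflow.svc.cluster.local:80")
client = MlflowClient()

# Find recent evaluation runs whose logged accuracy cleared the threshold;
# metric keys carry the "eval." prefix added by log_evaluation_to_mlflow.
exp = client.get_experiment_by_name("model-evaluation")
runs = client.search_runs(
    [exp.experiment_id],
    filter_string="metrics.`eval.accuracy` >= 0.7",
    order_by=["attributes.start_time DESC"],
    max_results=5,
)
for run in runs:
    print(run.info.run_id, run.data.metrics.get("eval.accuracy"))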