From ca5bef9664bc67dc605eb7200f11a100d2a7b1c9 Mon Sep 17 00:00:00 2001 From: "Billy D." Date: Fri, 13 Feb 2026 11:05:26 -0500 Subject: [PATCH] style: apply ruff format to all files --- mlflow_utils/cli.py | 20 ++------- mlflow_utils/client.py | 50 +++++---------------- mlflow_utils/experiment_comparison.py | 20 +++------ mlflow_utils/inference_tracker.py | 56 ++++++++---------------- mlflow_utils/kfp_components.py | 62 ++++++++------------------- mlflow_utils/model_registry.py | 46 +++++++++----------- mlflow_utils/tracker.py | 57 ++++++------------------ 7 files changed, 89 insertions(+), 222 deletions(-) diff --git a/mlflow_utils/cli.py b/mlflow_utils/cli.py index fd2362e..8c4ae19 100644 --- a/mlflow_utils/cli.py +++ b/mlflow_utils/cli.py @@ -63,10 +63,7 @@ def cmd_list_experiments(args): def cmd_compare(args): """Compare recent runs in an experiment.""" - analyzer = ExperimentAnalyzer( - args.experiment, - tracking_uri=args.tracking_uri - ) + analyzer = ExperimentAnalyzer(args.experiment, tracking_uri=args.tracking_uri) if args.run_ids: run_ids = args.run_ids.split(",") @@ -82,10 +79,7 @@ def cmd_compare(args): def cmd_best(args): """Find the best run by a metric.""" - analyzer = ExperimentAnalyzer( - args.experiment, - tracking_uri=args.tracking_uri - ) + analyzer = ExperimentAnalyzer(args.experiment, tracking_uri=args.tracking_uri) best_run = analyzer.get_best_run( metric=args.metric, @@ -115,10 +109,7 @@ def cmd_best(args): def cmd_summary(args): """Get metrics summary for an experiment.""" - analyzer = ExperimentAnalyzer( - args.experiment, - tracking_uri=args.tracking_uri - ) + analyzer = ExperimentAnalyzer(args.experiment, tracking_uri=args.tracking_uri) summary = analyzer.get_metrics_summary( hours=args.hours, @@ -201,10 +192,7 @@ def cmd_promote(args): def cmd_query(args): """Query runs with a filter.""" - analyzer = ExperimentAnalyzer( - args.experiment, - tracking_uri=args.tracking_uri - ) + analyzer = ExperimentAnalyzer(args.experiment, tracking_uri=args.tracking_uri) runs = analyzer.search_runs( filter_string=args.filter or "", diff --git a/mlflow_utils/client.py b/mlflow_utils/client.py index 290c87d..1aa5288 100644 --- a/mlflow_utils/client.py +++ b/mlflow_utils/client.py @@ -22,41 +22,24 @@ class MLflowConfig: # Tracking server URIs tracking_uri: str = field( - default_factory=lambda: os.environ.get( - "MLFLOW_TRACKING_URI", - "http://mlflow.mlflow.svc.cluster.local:80" - ) + default_factory=lambda: os.environ.get("MLFLOW_TRACKING_URI", "http://mlflow.mlflow.svc.cluster.local:80") ) external_uri: str = field( - default_factory=lambda: os.environ.get( - "MLFLOW_EXTERNAL_URI", - "https://mlflow.lab.daviestechlabs.io" - ) + default_factory=lambda: os.environ.get("MLFLOW_EXTERNAL_URI", "https://mlflow.lab.daviestechlabs.io") ) # Artifact storage (NFS PVC mount) artifact_location: str = field( - default_factory=lambda: os.environ.get( - "MLFLOW_ARTIFACT_LOCATION", - "/mlflow/artifacts" - ) + default_factory=lambda: os.environ.get("MLFLOW_ARTIFACT_LOCATION", "/mlflow/artifacts") ) # Default experiment settings default_experiment: str = field( - default_factory=lambda: os.environ.get( - "MLFLOW_DEFAULT_EXPERIMENT", - "llm-workflows" - ) + default_factory=lambda: os.environ.get("MLFLOW_DEFAULT_EXPERIMENT", "llm-workflows") ) # Service identification - service_name: str = field( - default_factory=lambda: os.environ.get( - "OTEL_SERVICE_NAME", - "unknown-service" - ) - ) + service_name: str = field(default_factory=lambda: os.environ.get("OTEL_SERVICE_NAME", "unknown-service")) # Additional tags to add to all runs default_tags: Dict[str, str] = field(default_factory=dict) @@ -85,10 +68,7 @@ def get_tracking_uri(external: bool = False) -> str: return config.external_uri if external else config.tracking_uri -def get_mlflow_client( - tracking_uri: Optional[str] = None, - configure_global: bool = True -) -> MlflowClient: +def get_mlflow_client(tracking_uri: Optional[str] = None, configure_global: bool = True) -> MlflowClient: """ Get a configured MLflow client. @@ -110,9 +90,7 @@ def get_mlflow_client( def ensure_experiment( - experiment_name: str, - artifact_location: Optional[str] = None, - tags: Optional[Dict[str, str]] = None + experiment_name: str, artifact_location: Optional[str] = None, tags: Optional[Dict[str, str]] = None ) -> str: """ Ensure an experiment exists, creating it if necessary. @@ -134,11 +112,7 @@ def ensure_experiment( if experiment is None: # Create the experiment artifact_loc = artifact_location or f"{config.artifact_location}/{experiment_name}" - experiment_id = client.create_experiment( - name=experiment_name, - artifact_location=artifact_loc, - tags=tags or {} - ) + experiment_id = client.create_experiment(name=experiment_name, artifact_location=artifact_loc, tags=tags or {}) logger.info(f"Created experiment '{experiment_name}' with ID: {experiment_id}") else: experiment_id = experiment.experiment_id @@ -148,9 +122,7 @@ def ensure_experiment( def get_or_create_registered_model( - model_name: str, - description: Optional[str] = None, - tags: Optional[Dict[str, str]] = None + model_name: str, description: Optional[str] = None, tags: Optional[Dict[str, str]] = None ) -> str: """ Get or create a registered model in the Model Registry. @@ -172,9 +144,7 @@ def get_or_create_registered_model( except mlflow.exceptions.MlflowException: # Create the model client.create_registered_model( - name=model_name, - description=description or f"Model for {model_name}", - tags=tags or {} + name=model_name, description=description or f"Model for {model_name}", tags=tags or {} ) logger.info(f"Created registered model: {model_name}") diff --git a/mlflow_utils/experiment_comparison.py b/mlflow_utils/experiment_comparison.py index 87dca68..4e84f00 100644 --- a/mlflow_utils/experiment_comparison.py +++ b/mlflow_utils/experiment_comparison.py @@ -51,6 +51,7 @@ logger = logging.getLogger(__name__) @dataclass class RunComparison: """Comparison result for multiple MLflow runs.""" + run_ids: List[str] experiment_name: str @@ -111,6 +112,7 @@ class RunComparison: @dataclass class PromotionRecommendation: """Recommendation for model promotion.""" + model_name: str version: Optional[int] recommended: bool @@ -262,13 +264,9 @@ class ExperimentAnalyzer: # Metadata comparison.run_names[run_id] = run.info.run_name or run_id[:8] - comparison.start_times[run_id] = datetime.fromtimestamp( - run.info.start_time / 1000 - ) + comparison.start_times[run_id] = datetime.fromtimestamp(run.info.start_time / 1000) if run.info.end_time: - comparison.durations[run_id] = ( - run.info.end_time - run.info.start_time - ) / 1000 + comparison.durations[run_id] = (run.info.end_time - run.info.start_time) / 1000 # Metrics for key, value in run.data.metrics.items(): @@ -288,10 +286,7 @@ class ExperimentAnalyzer: continue # Determine if lower is better based on metric name - minimize = any( - term in metric_name.lower() - for term in ["latency", "error", "loss", "time"] - ) + minimize = any(term in metric_name.lower() for term in ["latency", "error", "loss", "time"]) if minimize: best_id = min(values.keys(), key=lambda k: values[k]) @@ -330,10 +325,7 @@ class ExperimentAnalyzer: ) # Filter to only runs that have the metric - runs_with_metric = [ - r for r in runs - if metric in r.data.metrics - ] + runs_with_metric = [r for r in runs if metric in r.data.metrics] return runs_with_metric[0] if runs_with_metric else None diff --git a/mlflow_utils/inference_tracker.py b/mlflow_utils/inference_tracker.py index 9feb871..ebb4d9c 100644 --- a/mlflow_utils/inference_tracker.py +++ b/mlflow_utils/inference_tracker.py @@ -29,6 +29,7 @@ logger = logging.getLogger(__name__) @dataclass class InferenceMetrics: """Metrics collected during an inference request.""" + request_id: str user_id: Optional[str] = None session_id: Optional[str] = None @@ -190,31 +191,22 @@ class InferenceMetricsTracker: # Initialize MLflow in thread pool to avoid blocking loop = asyncio.get_event_loop() - await loop.run_in_executor( - self._executor, - self._init_mlflow - ) + await loop.run_in_executor(self._executor, self._init_mlflow) if self.enable_batching: self._flush_task = asyncio.create_task(self._periodic_flush()) - logger.info( - f"InferenceMetricsTracker started for {self.service_name} " - f"(experiment: {self.experiment_name})" - ) + logger.info(f"InferenceMetricsTracker started for {self.service_name} (experiment: {self.experiment_name})") def _init_mlflow(self) -> None: """Initialize MLflow client and experiment (runs in thread pool).""" - self._client = get_mlflow_client( - tracking_uri=self.tracking_uri, - configure_global=True - ) + self._client = get_mlflow_client(tracking_uri=self.tracking_uri, configure_global=True) self._experiment_id = ensure_experiment( self.experiment_name, tags={ "service": self.service_name, "type": "inference-metrics", - } + }, ) async def stop(self) -> None: @@ -265,10 +257,7 @@ class InferenceMetricsTracker: else: # Immediate logging in thread pool loop = asyncio.get_event_loop() - await loop.run_in_executor( - self._executor, - partial(self._log_single_inference, metrics) - ) + await loop.run_in_executor(self._executor, partial(self._log_single_inference, metrics)) async def _periodic_flush(self) -> None: """Periodically flush batched metrics.""" @@ -287,10 +276,7 @@ class InferenceMetricsTracker: # Log in thread pool loop = asyncio.get_event_loop() - await loop.run_in_executor( - self._executor, - partial(self._log_batch, batch) - ) + await loop.run_in_executor(self._executor, partial(self._log_batch, batch)) def _log_single_inference(self, metrics: InferenceMetrics) -> None: """Log a single inference request to MLflow (runs in thread pool).""" @@ -302,7 +288,7 @@ class InferenceMetricsTracker: "service": self.service_name, "request_id": metrics.request_id, "type": "single-inference", - } + }, ): mlflow.log_params(metrics.as_params_dict()) mlflow.log_metrics(metrics.as_metrics_dict()) @@ -336,7 +322,7 @@ class InferenceMetricsTracker: "service": self.service_name, "type": "batch-inference", "batch_size": str(len(batch)), - } + }, ): # Log aggregate metrics mlflow.log_metrics(aggregates) @@ -352,12 +338,14 @@ class InferenceMetricsTracker: premium_count = sum(1 for m in batch if m.is_premium) error_count = sum(1 for m in batch if m.has_error) - mlflow.log_metrics({ - "rag_enabled_pct": rag_enabled_count / len(batch) * 100, - "streaming_pct": streaming_count / len(batch) * 100, - "premium_pct": premium_count / len(batch) * 100, - "error_rate": error_count / len(batch) * 100, - }) + mlflow.log_metrics( + { + "rag_enabled_pct": rag_enabled_count / len(batch) * 100, + "streaming_pct": streaming_count / len(batch) * 100, + "premium_pct": premium_count / len(batch) * 100, + "error_rate": error_count / len(batch) * 100, + } + ) # Log model distribution model_counts: Dict[str, int] = defaultdict(int) @@ -366,20 +354,14 @@ class InferenceMetricsTracker: model_counts[m.model_name] += 1 if model_counts: - mlflow.log_dict( - {"models": dict(model_counts)}, - "model_distribution.json" - ) + mlflow.log_dict({"models": dict(model_counts)}, "model_distribution.json") logger.info(f"Logged batch of {len(batch)} inference metrics") except Exception as e: logger.error(f"Failed to log batch metrics: {e}") - def _calculate_aggregates( - self, - batch: List[InferenceMetrics] - ) -> Dict[str, float]: + def _calculate_aggregates(self, batch: List[InferenceMetrics]) -> Dict[str, float]: """Calculate aggregate statistics from a batch of metrics.""" import statistics diff --git a/mlflow_utils/kfp_components.py b/mlflow_utils/kfp_components.py index 7048f55..152642c 100644 --- a/mlflow_utils/kfp_components.py +++ b/mlflow_utils/kfp_components.py @@ -47,17 +47,14 @@ MLFLOW_PACKAGES = [ ] -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def create_mlflow_run( experiment_name: str, run_name: str, mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", tags: Dict[str, str] = None, params: Dict[str, str] = None, -) -> NamedTuple('RunInfo', [('run_id', str), ('experiment_id', str), ('artifact_uri', str)]): +) -> NamedTuple("RunInfo", [("run_id", str), ("experiment_id", str), ("artifact_uri", str)]): """ Create a new MLflow run for the pipeline. @@ -90,8 +87,7 @@ def create_mlflow_run( experiment = client.get_experiment_by_name(experiment_name) if experiment is None: experiment_id = client.create_experiment( - name=experiment_name, - artifact_location=f"/mlflow/artifacts/{experiment_name}" + name=experiment_name, artifact_location=f"/mlflow/artifacts/{experiment_name}" ) else: experiment_id = experiment.experiment_id @@ -122,14 +118,11 @@ def create_mlflow_run( # End run (KFP components are isolated, we'll resume in other components) mlflow.end_run() - RunInfo = namedtuple('RunInfo', ['run_id', 'experiment_id', 'artifact_uri']) + RunInfo = namedtuple("RunInfo", ["run_id", "experiment_id", "artifact_uri"]) return RunInfo(run_id, experiment_id, artifact_uri) -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def log_params_component( run_id: str, params: Dict[str, str], @@ -158,10 +151,7 @@ def log_params_component( return run_id -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def log_metrics_component( run_id: str, metrics: Dict[str, float], @@ -192,10 +182,7 @@ def log_metrics_component( return run_id -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def log_artifact_component( run_id: str, artifact_path: str, @@ -225,10 +212,7 @@ def log_artifact_component( return run_id -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def log_dict_artifact( run_id: str, data: Dict[str, Any], @@ -258,23 +242,20 @@ def log_dict_artifact( client = MlflowClient() # Ensure .json extension - if not filename.endswith('.json'): - filename += '.json' + if not filename.endswith(".json"): + filename += ".json" # Write to temp file and log with tempfile.TemporaryDirectory() as tmpdir: filepath = Path(tmpdir) / filename - with open(filepath, 'w') as f: + with open(filepath, "w") as f: json.dump(data, f, indent=2) client.log_artifact(run_id, str(filepath)) return run_id -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def end_mlflow_run( run_id: str, status: str = "FINISHED", @@ -310,10 +291,7 @@ def end_mlflow_run( return run_id -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES + ["httpx"] -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES + ["httpx"]) def log_training_metrics( run_id: str, model_type: str, @@ -370,7 +348,7 @@ def log_training_metrics( # Log full config as artifact with tempfile.TemporaryDirectory() as tmpdir: config_path = Path(tmpdir) / "training_config.json" - with open(config_path, 'w') as f: + with open(config_path, "w") as f: json.dump(training_config, f, indent=2) client.log_artifact(run_id, str(config_path)) @@ -382,10 +360,7 @@ def log_training_metrics( return run_id -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def log_document_ingestion_metrics( run_id: str, source_url: str, @@ -452,10 +427,7 @@ def log_document_ingestion_metrics( return run_id -@dsl.component( - base_image=MLFLOW_IMAGE, - packages_to_install=MLFLOW_PACKAGES -) +@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES) def log_evaluation_results( run_id: str, model_name: str, @@ -502,7 +474,7 @@ def log_evaluation_results( if sample_results: with tempfile.TemporaryDirectory() as tmpdir: results_path = Path(tmpdir) / "evaluation_results.json" - with open(results_path, 'w') as f: + with open(results_path, "w") as f: json.dump(sample_results, f, indent=2) client.log_artifact(run_id, str(results_path)) diff --git a/mlflow_utils/model_registry.py b/mlflow_utils/model_registry.py index b229a85..aa4d19c 100644 --- a/mlflow_utils/model_registry.py +++ b/mlflow_utils/model_registry.py @@ -192,7 +192,7 @@ def register_model_for_kserve( tags={ "model.type": model_type, "deployment.target": "kserve", - } + }, ) logger.info(f"Created registered model: {model_name}") @@ -206,13 +206,10 @@ def register_model_for_kserve( **(tags or {}), "model.type": model_type, **kserve_config.as_dict(), - } + }, ) - logger.info( - f"Registered model version {model_version.version} " - f"for {model_name} (type: {model_type})" - ) + logger.info(f"Registered model version {model_version.version} for {model_name} (type: {model_type})") return model_version @@ -412,25 +409,24 @@ def generate_kserve_manifest( if config.container_image: # Custom container - predictor["containers"] = [{ - "name": "predictor", - "image": config.container_image, - "ports": [{"containerPort": config.container_port, "protocol": "TCP"}], - "resources": { - "requests": { - "cpu": config.cpu_request, - "memory": config.memory_request, + predictor["containers"] = [ + { + "name": "predictor", + "image": config.container_image, + "ports": [{"containerPort": config.container_port, "protocol": "TCP"}], + "resources": { + "requests": { + "cpu": config.cpu_request, + "memory": config.memory_request, + }, + "limits": { + "cpu": config.cpu_limit, + "memory": config.memory_limit, + }, }, - "limits": { - "cpu": config.cpu_limit, - "memory": config.memory_limit, - }, - }, - "env": [ - {"name": k, "value": v} - for k, v in config.env_vars.items() - ], - }] + "env": [{"name": k, "value": v} for k, v in config.env_vars.items()], + } + ] # Add GPU if needed if config.gpu_count > 0: @@ -494,7 +490,7 @@ def generate_kserve_yaml( yaml_str = yaml.dump(manifest, default_flow_style=False, sort_keys=False) if output_path: - with open(output_path, 'w') as f: + with open(output_path, "w") as f: f.write(yaml_str) logger.info(f"Wrote KServe manifest to {output_path}") diff --git a/mlflow_utils/tracker.py b/mlflow_utils/tracker.py index 3438ebb..32e0853 100644 --- a/mlflow_utils/tracker.py +++ b/mlflow_utils/tracker.py @@ -23,6 +23,7 @@ logger = logging.getLogger(__name__) @dataclass class PipelineMetadata: """Metadata about the Kubeflow Pipeline run.""" + pipeline_name: str run_id: str run_name: Optional[str] = None @@ -30,12 +31,8 @@ class PipelineMetadata: namespace: str = "ai-ml" # KFP-specific metadata (populated from environment if available) - kfp_run_id: Optional[str] = field( - default_factory=lambda: os.environ.get("KFP_RUN_ID") - ) - kfp_pod_name: Optional[str] = field( - default_factory=lambda: os.environ.get("KFP_POD_NAME") - ) + kfp_run_id: Optional[str] = field(default_factory=lambda: os.environ.get("KFP_RUN_ID")) + kfp_pod_name: Optional[str] = field(default_factory=lambda: os.environ.get("KFP_POD_NAME")) def as_tags(self) -> Dict[str, str]: """Convert metadata to MLflow tags.""" @@ -142,10 +139,7 @@ class MLflowTracker: Yields: The MLflow run object """ - self.client = get_mlflow_client( - tracking_uri=self.tracking_uri, - configure_global=True - ) + self.client = get_mlflow_client(tracking_uri=self.tracking_uri, configure_global=True) # Ensure experiment exists experiment_id = ensure_experiment(self.experiment_name) @@ -163,8 +157,7 @@ class MLflowTracker: self.run_id = self.run.info.run_id logger.info( - f"Started MLflow run '{self.run_name}' " - f"(ID: {self.run_id}) in experiment '{self.experiment_name}'" + f"Started MLflow run '{self.run_name}' (ID: {self.run_id}) in experiment '{self.experiment_name}'" ) yield self.run @@ -214,11 +207,7 @@ class MLflowTracker: """Log a single parameter.""" self.log_params({key: value}) - def log_metrics( - self, - metrics: Dict[str, Union[float, int]], - step: Optional[int] = None - ) -> None: + def log_metrics(self, metrics: Dict[str, Union[float, int]], step: Optional[int] = None) -> None: """ Log metrics to the current run. @@ -233,20 +222,11 @@ class MLflowTracker: mlflow.log_metrics(metrics, step=step) logger.debug(f"Logged {len(metrics)} metrics") - def log_metric( - self, - key: str, - value: Union[float, int], - step: Optional[int] = None - ) -> None: + def log_metric(self, key: str, value: Union[float, int], step: Optional[int] = None) -> None: """Log a single metric.""" self.log_metrics({key: value}, step=step) - def log_artifact( - self, - local_path: str, - artifact_path: Optional[str] = None - ) -> None: + def log_artifact(self, local_path: str, artifact_path: Optional[str] = None) -> None: """ Log an artifact file to the current run. @@ -261,11 +241,7 @@ class MLflowTracker: mlflow.log_artifact(local_path, artifact_path) logger.info(f"Logged artifact: {local_path}") - def log_artifacts( - self, - local_dir: str, - artifact_path: Optional[str] = None - ) -> None: + def log_artifacts(self, local_dir: str, artifact_path: Optional[str] = None) -> None: """ Log all files in a directory as artifacts. @@ -280,12 +256,7 @@ class MLflowTracker: mlflow.log_artifacts(local_dir, artifact_path) logger.info(f"Logged artifacts from: {local_dir}") - def log_dict( - self, - data: Dict[str, Any], - filename: str, - artifact_path: Optional[str] = None - ) -> None: + def log_dict(self, data: Dict[str, Any], filename: str, artifact_path: Optional[str] = None) -> None: """ Log a dictionary as a JSON artifact. @@ -311,7 +282,7 @@ class MLflowTracker: model_name: str, model_path: Optional[str] = None, framework: str = "pytorch", - extra_info: Optional[Dict[str, Any]] = None + extra_info: Optional[Dict[str, Any]] = None, ) -> None: """ Log model information as parameters and tags. @@ -341,11 +312,7 @@ class MLflowTracker: mlflow.set_tag("model.name", model_name) def log_dataset_info( - self, - name: str, - source: str, - size: Optional[int] = None, - extra_info: Optional[Dict[str, Any]] = None + self, name: str, source: str, size: Optional[int] = None, extra_info: Optional[Dict[str, Any]] = None ) -> None: """ Log dataset information.