#!/usr/bin/env python3 """ Distributed CPU Fine-Tuning Pipeline – Kubeflow Pipelines SDK Trains a small language model (1B–8B) using LoRA on CPU only, distributed across multiple cluster nodes via KubeRay RayJob. GPUs remain 100 % dedicated to inference serving. Architecture: 1. Fetch PDFs from Quobjects S3 2. Prepare instruction-tuning dataset 3. Upload prepared data to S3 (shared storage for Ray workers) 4. Submit a KubeRay RayJob that runs Ray Train TorchTrainer with N CPU-only workers across cluster nodes 5. Download the trained adapter from S3 6. Sanity evaluation on CPU 7. Push adapter to Gitea 8. Log metrics to MLflow Scaling: Increase `num_workers` to spread training across more nodes. Each worker runs data-parallel LoRA training; gradients are synchronised via AllReduce (Gloo backend). Usage: pip install kfp==2.12.1 python cpu_training_pipeline.py # Upload cpu_training_pipeline.yaml to Kubeflow Pipelines UI Prerequisites in-cluster: - Secret mlpipeline-minio-artifact (namespace kubeflow) for S3 creds - Secret gitea-admin-secret (namespace gitea) for Gitea push - KubeRay operator installed (namespace ray-system) - MLflow tracking server (namespace mlflow) - RBAC: pipeline SA needs create/get/list/delete on rayjobs.ray.io and configmaps in the training namespace """ from kfp import compiler, dsl from typing import NamedTuple # ────────────────────────────────────────────────────────────── # 1. Fetch PDFs from Quobjects S3 # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=["boto3"], ) def fetch_pdfs_from_s3( s3_endpoint: str, s3_bucket: str, s3_prefix: str, aws_access_key_id: str, aws_secret_access_key: str, ) -> NamedTuple("PDFOutput", [("pdf_dir", str), ("num_files", int)]): """Download all PDFs from a Quobjects S3 bucket.""" import os import boto3 from botocore.client import Config out_dir = "/tmp/pdfs" os.makedirs(out_dir, exist_ok=True) client = boto3.client( "s3", endpoint_url=f"http://{s3_endpoint}", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name="us-east-1", config=Config(signature_version="s3v4"), ) paginator = client.get_paginator("list_objects_v2") count = 0 for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix): for obj in page.get("Contents", []): key = obj["Key"] if key.lower().endswith(".pdf"): local_path = os.path.join(out_dir, os.path.basename(key)) print(f"Downloading: {key} → {local_path}") client.download_file(s3_bucket, key, local_path) count += 1 print(f"Downloaded {count} PDFs to {out_dir}") from collections import namedtuple return namedtuple("PDFOutput", ["pdf_dir", "num_files"])( pdf_dir=out_dir, num_files=count ) # ────────────────────────────────────────────────────────────── # 2. Extract text from PDFs → instruction-tuning dataset # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=["pymupdf"], ) def prepare_training_data( pdf_dir: str, max_seq_length: int = 2048, chunk_size: int = 512, chunk_overlap: int = 64, ) -> NamedTuple( "DataOutput", [("dataset_path", str), ("num_train", int), ("num_val", int)] ): """Extract text from PDFs, chunk it, and format as instruction-tuning pairs.""" import json import os import fitz # PyMuPDF out_dir = "/tmp/training_data" os.makedirs(out_dir, exist_ok=True) # 1. Extract text from all PDFs all_chunks: list[dict] = [] for fname in sorted(os.listdir(pdf_dir)): if not fname.lower().endswith(".pdf"): continue path = os.path.join(pdf_dir, fname) print(f"Extracting: {fname}") try: doc = fitz.open(path) full_text = "" for page in doc: full_text += page.get_text() + "\n" doc.close() except Exception as e: print(f" SKIP ({e})") continue # 2. Chunk text with overlap words = full_text.split() for i in range(0, len(words), chunk_size - chunk_overlap): chunk_words = words[i : i + chunk_size] if len(chunk_words) < 50: continue # skip tiny trailing chunks chunk_text = " ".join(chunk_words) all_chunks.append({"text": chunk_text, "source": fname}) print(f"Total chunks: {len(all_chunks)}") if not all_chunks: raise ValueError("No text extracted from PDFs — check your bucket") # 3. Format as chat training pairs (self-supervised continuation) samples = [] for chunk in all_chunks: text = chunk["text"] source = chunk["source"] words = text.split() mid = len(words) // 2 context = " ".join(words[:mid]) continuation = " ".join(words[mid:]) samples.append( { "messages": [ { "role": "system", "content": ( "You are a knowledgeable assistant. " "Continue the information accurately and coherently." ), }, { "role": "user", "content": f"Continue the following passage from {source}:\n\n{context}", }, {"role": "assistant", "content": continuation}, ] } ) # 4. Train/val split (90/10) import random random.seed(42) random.shuffle(samples) split = int(len(samples) * 0.9) train = samples[:split] val = samples[split:] train_path = os.path.join(out_dir, "train.json") val_path = os.path.join(out_dir, "val.json") with open(train_path, "w") as f: json.dump(train, f) with open(val_path, "w") as f: json.dump(val, f) print(f"Train: {len(train)} samples, Val: {len(val)} samples") from collections import namedtuple return namedtuple("DataOutput", ["dataset_path", "num_train", "num_val"])( dataset_path=out_dir, num_train=len(train), num_val=len(val) ) # ────────────────────────────────────────────────────────────── # 3. Upload prepared data to S3 (so Ray workers can access it) # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=["boto3"], ) def upload_data_to_s3( dataset_path: str, s3_endpoint: str, s3_bucket: str, aws_access_key_id: str, aws_secret_access_key: str, run_id: str = "", ) -> NamedTuple("UploadOutput", [("data_s3_prefix", str)]): """Upload train.json and val.json to S3 for distributed Ray workers.""" import os import uuid import boto3 from botocore.client import Config if not run_id: run_id = uuid.uuid4().hex[:8] prefix = f"ray-training-runs/{run_id}/data" client = boto3.client( "s3", endpoint_url=f"http://{s3_endpoint}", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name="us-east-1", config=Config(signature_version="s3v4"), ) for fname in ["train.json", "val.json"]: local = os.path.join(dataset_path, fname) key = f"{prefix}/{fname}" print(f"Uploading {local} → s3://{s3_bucket}/{key}") client.upload_file(local, s3_bucket, key) print(f"Data uploaded to s3://{s3_bucket}/{prefix}") from collections import namedtuple return namedtuple("UploadOutput", ["data_s3_prefix"])(data_s3_prefix=prefix) # ────────────────────────────────────────────────────────────── # Training script (embedded in the RayJob ConfigMap) # ────────────────────────────────────────────────────────────── _RAY_TRAIN_SCRIPT = '''\ #!/usr/bin/env python3 """Distributed LoRA training on CPU via Ray Train TorchTrainer.""" import json import os import boto3 import ray import torch from botocore.client import Config from datasets import Dataset from peft import LoraConfig, get_peft_model from ray.train import RunConfig, ScalingConfig from ray.train.huggingface.transformers import ( RayTrainReportCallback, prepare_trainer, ) from ray.train.torch import TorchTrainer from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments from trl import SFTTrainer def _s3_client(cfg): return boto3.client( "s3", endpoint_url=f"http://{cfg['s3_endpoint']}", aws_access_key_id=cfg["aws_access_key_id"], aws_secret_access_key=cfg["aws_secret_access_key"], region_name="us-east-1", config=Config(signature_version="s3v4"), ) def train_func(config): """Each Ray worker executes this function.""" import ray.train as rt # ── Download data from S3 (each worker independently) ── s3 = _s3_client(config) data_dir = "/tmp/training_data" os.makedirs(data_dir, exist_ok=True) prefix = config["data_s3_prefix"] bucket = config["s3_bucket"] for fname in ["train.json", "val.json"]: s3.download_file(bucket, f"{prefix}/{fname}", f"{data_dir}/{fname}") with open(f"{data_dir}/train.json") as f: train_data = json.load(f) with open(f"{data_dir}/val.json") as f: val_data = json.load(f) rank = rt.get_context().get_world_rank() world = rt.get_context().get_world_size() print(f"[worker {rank}/{world}] Loaded {len(train_data)} train / " f"{len(val_data)} val samples") # ── Tokenizer ────────────────────────────────────────── base_model = config["base_model"] tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.padding_side = "right" def format_chat(sample): return { "text": tokenizer.apply_chat_template( sample["messages"], tokenize=False, add_generation_prompt=False ) } train_ds = Dataset.from_list(train_data).map(format_chat) val_ds = Dataset.from_list(val_data).map(format_chat) # ── Model (float32, CPU) ─────────────────────────────── print(f"[worker {rank}] Loading {base_model} (float32, CPU)") model = AutoModelForCausalLM.from_pretrained( base_model, torch_dtype=torch.float32, device_map="cpu", trust_remote_code=True, ) lora_cfg = LoraConfig( r=config["lora_r"], lora_alpha=config["lora_alpha"], target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], lora_dropout=config["lora_dropout"], bias="none", task_type="CAUSAL_LM", ) model = get_peft_model(model, lora_cfg) model.print_trainable_parameters() # ── Training args (CPU-optimised) ────────────────────── args = TrainingArguments( output_dir="/tmp/checkpoints", num_train_epochs=config["num_epochs"], per_device_train_batch_size=config["batch_size"], per_device_eval_batch_size=config["batch_size"], gradient_accumulation_steps=config["gradient_accumulation_steps"], learning_rate=config["learning_rate"], bf16=False, fp16=False, logging_steps=10, eval_strategy="steps", eval_steps=100, save_strategy="no", # Ray handles checkpointing warmup_ratio=0.05, lr_scheduler_type="cosine", optim="adamw_torch", max_grad_norm=1.0, group_by_length=True, gradient_checkpointing=True, dataloader_num_workers=2, dataloader_pin_memory=False, no_cuda=True, report_to="none", ) # ── SFTTrainer + Ray integration ─────────────────────── trainer = SFTTrainer( model=model, args=args, train_dataset=train_ds, eval_dataset=val_ds, tokenizer=tokenizer, max_seq_length=config["max_seq_length"], dataset_text_field="text", packing=True, ) # Ray Train callback reports metrics; prepare_trainer wraps # the model with DDP and adds DistributedSampler trainer.add_callback(RayTrainReportCallback()) trainer = prepare_trainer(trainer) print(f"[worker {rank}] Starting distributed LoRA training …") result = trainer.train() eval_result = trainer.evaluate() train_loss = result.training_loss eval_loss = eval_result.get("eval_loss", 0.0) print(f"[worker {rank}] train_loss={train_loss:.4f} eval_loss={eval_loss:.4f}") # ── Rank 0: save adapter + upload to S3 ──────────────── if rank == 0: adapter_path = "/tmp/adapter" unwrapped = ( trainer.model.module if hasattr(trainer.model, "module") else trainer.model ) unwrapped.save_pretrained(adapter_path) tokenizer.save_pretrained(adapter_path) metadata = { "base_model": base_model, "train_loss": train_loss, "eval_loss": eval_loss, "training_device": "cpu", "distributed": True, "num_workers": world, "dtype": "float32", "lora_r": config["lora_r"], "lora_alpha": config["lora_alpha"], "lora_dropout": config["lora_dropout"], "learning_rate": config["learning_rate"], "num_epochs": config["num_epochs"], "batch_size": config["batch_size"], "gradient_accumulation_steps": config["gradient_accumulation_steps"], "max_seq_length": config["max_seq_length"], } with open(f"{adapter_path}/training_metadata.json", "w") as f: json.dump(metadata, f, indent=2) # Upload adapter to S3 adapter_prefix = config["adapter_s3_prefix"] for root, _, files in os.walk(adapter_path): for fname in files: fpath = os.path.join(root, fname) rel = os.path.relpath(fpath, adapter_path) key = f"{adapter_prefix}/{rel}" s3.upload_file(fpath, bucket, key) print(f" ✓ s3://{bucket}/{key}") # Results marker for the KFP step to read s3.put_object( Bucket=bucket, Key=f"{adapter_prefix}/results.json", Body=json.dumps(metadata), ) print(f"Adapter uploaded to s3://{bucket}/{adapter_prefix}") if __name__ == "__main__": cfg = json.loads(os.environ["TRAIN_CONFIG"]) num_workers = cfg.pop("num_workers", 4) cpus_per_worker = cfg.pop("cpus_per_worker", 4) ray.init() trainer = TorchTrainer( train_func, scaling_config=ScalingConfig( num_workers=num_workers, use_gpu=False, resources_per_worker={"CPU": cpus_per_worker}, ), run_config=RunConfig(name="cpu-lora-training"), train_loop_config=cfg, ) result = trainer.fit() print(f"Training complete: {result}") ''' # ────────────────────────────────────────────────────────────── # 4. Submit a KubeRay RayJob for distributed CPU training # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=["kubernetes", "boto3"], ) def submit_ray_training_job( data_s3_prefix: str, s3_endpoint: str, s3_bucket: str, aws_access_key_id: str, aws_secret_access_key: str, base_model: str, learning_rate: float, num_epochs: int, batch_size: int, gradient_accumulation_steps: int, max_seq_length: int, lora_r: int, lora_alpha: int, lora_dropout: float, num_workers: int = 4, cpus_per_worker: int = 4, memory_per_worker: str = "16Gi", ray_image: str = "rayproject/ray:2.44.1-py311-cpu", namespace: str = "kubeflow", poll_interval_seconds: int = 30, training_script: str = "", ) -> NamedTuple( "RayTrainOutput", [ ("adapter_s3_prefix", str), ("train_loss", float), ("eval_loss", float), ("num_workers_used", int), ], ): """Create a KubeRay RayJob for distributed CPU LoRA training and wait.""" import json import time import uuid import boto3 from botocore.client import Config from kubernetes import client as k8s_client from kubernetes import config as k8s_config # ── Build identifiers ─────────────────────────────────── run_id = uuid.uuid4().hex[:8] job_name = f"cpu-lora-{run_id}" cm_name = f"train-script-{run_id}" adapter_prefix = f"ray-training-runs/{run_id}/adapter" # ── Training config (passed as TRAIN_CONFIG env var) ──── train_config = json.dumps( { "base_model": base_model, "learning_rate": learning_rate, "num_epochs": num_epochs, "batch_size": batch_size, "gradient_accumulation_steps": gradient_accumulation_steps, "max_seq_length": max_seq_length, "lora_r": lora_r, "lora_alpha": lora_alpha, "lora_dropout": lora_dropout, "num_workers": num_workers, "cpus_per_worker": cpus_per_worker, "data_s3_prefix": data_s3_prefix, "adapter_s3_prefix": adapter_prefix, "s3_endpoint": s3_endpoint, "s3_bucket": s3_bucket, "aws_access_key_id": aws_access_key_id, "aws_secret_access_key": aws_secret_access_key, } ) # ── Kubernetes client ─────────────────────────────────── k8s_config.load_incluster_config() core_v1 = k8s_client.CoreV1Api() custom_api = k8s_client.CustomObjectsApi() # ── Create ConfigMap with training script ─────────────── cm = k8s_client.V1ConfigMap( metadata=k8s_client.V1ObjectMeta(name=cm_name, namespace=namespace), data={"train.py": training_script}, ) core_v1.create_namespaced_config_map(namespace=namespace, body=cm) print(f"Created ConfigMap {cm_name}") # ── Common container spec ─────────────────────────────── pip_deps = [ "torch", "transformers", "peft", "datasets", "accelerate", "scipy", "trl", "boto3", ] def make_container(name, cpu_limit, mem_limit, is_head=False): return { "name": name, "image": ray_image, "resources": { "limits": {"cpu": cpu_limit, "memory": mem_limit}, "requests": { "cpu": str(max(1, int(cpu_limit) // 2)), "memory": mem_limit, }, }, "env": [ {"name": "TRAIN_CONFIG", "value": train_config}, {"name": "RAY_DEDUP_LOGS", "value": "0"}, ], "volumeMounts": [ { "name": "train-script", "mountPath": "/home/ray/train.py", "subPath": "train.py", } ], } # ── RayJob spec ───────────────────────────────────────── ray_job = { "apiVersion": "ray.io/v1", "kind": "RayJob", "metadata": {"name": job_name, "namespace": namespace}, "spec": { "entrypoint": "python /home/ray/train.py", "shutdownAfterJobFinishes": True, "ttlSecondsAfterFinished": 300, "runtimeEnvYAML": json.dumps({"pip": pip_deps}), "rayClusterSpec": { "rayVersion": "2.44.1", "headGroupSpec": { "rayStartParams": { "dashboard-host": "0.0.0.0", "num-cpus": "0", }, "template": { "spec": { "containers": [ make_container( "ray-head", "2", "4Gi", is_head=True ) ], "volumes": [ { "name": "train-script", "configMap": {"name": cm_name}, } ], } }, }, "workerGroupSpecs": [ { "replicas": num_workers, "minReplicas": num_workers, "maxReplicas": num_workers, "groupName": "cpu-workers", "rayStartParams": { "num-cpus": str(cpus_per_worker), }, "template": { "spec": { "containers": [ make_container( "ray-worker", str(cpus_per_worker), memory_per_worker, ) ], "volumes": [ { "name": "train-script", "configMap": {"name": cm_name}, } ], } }, } ], }, }, } # ── Submit RayJob ─────────────────────────────────────── custom_api.create_namespaced_custom_object( group="ray.io", version="v1", namespace=namespace, plural="rayjobs", body=ray_job, ) print(f"Submitted RayJob {job_name} ({num_workers} workers × " f"{cpus_per_worker} CPUs)") # ── Poll until completion ─────────────────────────────── terminal_states = {"SUCCEEDED", "FAILED", "STOPPED"} while True: time.sleep(poll_interval_seconds) obj = custom_api.get_namespaced_custom_object( group="ray.io", version="v1", namespace=namespace, plural="rayjobs", name=job_name, ) status = obj.get("status", {}) job_status = status.get("jobStatus", "PENDING") print(f" RayJob {job_name}: {job_status}") if job_status in terminal_states: break if job_status != "SUCCEEDED": msg = status.get("message", "unknown error") raise RuntimeError(f"RayJob {job_name} failed: {job_status} — {msg}") print(f"RayJob {job_name} completed successfully") # ── Read results from S3 ──────────────────────────────── s3 = boto3.client( "s3", endpoint_url=f"http://{s3_endpoint}", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name="us-east-1", config=Config(signature_version="s3v4"), ) results_obj = s3.get_object( Bucket=s3_bucket, Key=f"{adapter_prefix}/results.json" ) results = json.loads(results_obj["Body"].read().decode("utf-8")) train_loss = float(results.get("train_loss", 0.0)) eval_loss = float(results.get("eval_loss", 0.0)) print(f"Results: train_loss={train_loss:.4f}, eval_loss={eval_loss:.4f}") # ── Cleanup ConfigMap ─────────────────────────────────── try: core_v1.delete_namespaced_config_map(name=cm_name, namespace=namespace) print(f"Deleted ConfigMap {cm_name}") except Exception as e: print(f"ConfigMap cleanup warning: {e}") from collections import namedtuple return namedtuple( "RayTrainOutput", ["adapter_s3_prefix", "train_loss", "eval_loss", "num_workers_used"], )( adapter_s3_prefix=adapter_prefix, train_loss=train_loss, eval_loss=eval_loss, num_workers_used=num_workers, ) # ────────────────────────────────────────────────────────────── # 5. Download trained adapter from S3 # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=["boto3"], ) def download_adapter_from_s3( adapter_s3_prefix: str, s3_endpoint: str, s3_bucket: str, aws_access_key_id: str, aws_secret_access_key: str, ) -> NamedTuple("AdapterOutput", [("adapter_path", str)]): """Download the trained LoRA adapter from S3.""" import os import boto3 from botocore.client import Config client = boto3.client( "s3", endpoint_url=f"http://{s3_endpoint}", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name="us-east-1", config=Config(signature_version="s3v4"), ) adapter_path = "/tmp/adapter" os.makedirs(adapter_path, exist_ok=True) paginator = client.get_paginator("list_objects_v2") count = 0 for page in paginator.paginate(Bucket=s3_bucket, Prefix=adapter_s3_prefix): for obj in page.get("Contents", []): key = obj["Key"] rel = key[len(adapter_s3_prefix) :].lstrip("/") if not rel or rel == "results.json": continue local = os.path.join(adapter_path, rel) os.makedirs(os.path.dirname(local), exist_ok=True) client.download_file(s3_bucket, key, local) count += 1 print(f" ↓ {rel}") print(f"Downloaded {count} adapter files to {adapter_path}") from collections import namedtuple return namedtuple("AdapterOutput", ["adapter_path"])(adapter_path=adapter_path) # ────────────────────────────────────────────────────────────── # 6. Sanity evaluation (CPU-only) # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=[ "torch", "transformers", "peft", "accelerate", "scipy", ], ) def evaluate_adapter_cpu( adapter_path: str, base_model: str, ) -> NamedTuple("EvalOutput", [("report", str), ("passed", bool)]): """Load the LoRA adapter on CPU and run sanity-check prompts.""" import torch from peft import PeftModel from transformers import AutoModelForCausalLM, AutoTokenizer print(f"Loading base model {base_model} (CPU) …") model = AutoModelForCausalLM.from_pretrained( base_model, torch_dtype=torch.float32, device_map="cpu", trust_remote_code=True, ) tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) print(f"Loading adapter from {adapter_path} …") model = PeftModel.from_pretrained(model, adapter_path) model.eval() test_prompts = [ "Summarise the key points from the training material.", "What are the main topics covered in the source documents?", "Explain the most important concept from the training data.", ] lines = [] for prompt in test_prompts: messages = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}, ] input_text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) inputs = tokenizer(input_text, return_tensors="pt") with torch.no_grad(): out = model.generate( **inputs, max_new_tokens=128, temperature=0.7, do_sample=True ) response = tokenizer.decode( out[0][inputs["input_ids"].shape[1] :], skip_special_tokens=True ) lines.append(f"Q: {prompt}\nA: {response}\n") print(lines[-1]) report = "\n".join(lines) passed = all(len(line.split("A:")[1].strip()) > 10 for line in lines) print(f"Evaluation passed: {passed}") from collections import namedtuple return namedtuple("EvalOutput", ["report", "passed"])( report=report, passed=passed ) # ────────────────────────────────────────────────────────────── # 7. Push adapter to Gitea repo # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=["requests"], ) def push_adapter_to_gitea( adapter_path: str, gitea_url: str, gitea_owner: str, gitea_repo: str, gitea_username: str, gitea_password: str, branch: str = "main", commit_message: str = "feat: add LoRA adapter from distributed CPU training", ) -> NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)]): """Push the LoRA adapter files to a Gitea repository via the API.""" import base64 import os import requests api_base = f"{gitea_url}/api/v1" auth = (gitea_username, gitea_password) repo_api = f"{api_base}/repos/{gitea_owner}/{gitea_repo}" # Check if repo exists, create if not resp = requests.get(repo_api, auth=auth, timeout=30) if resp.status_code == 404: print(f"Creating repo {gitea_owner}/{gitea_repo} …") create_resp = requests.post( f"{api_base}/orgs/{gitea_owner}/repos" if gitea_owner != gitea_username else f"{api_base}/user/repos", auth=auth, json={ "name": gitea_repo, "description": "LoRA adapters trained via distributed CPU pipeline", "private": False, "auto_init": True, }, timeout=30, ) create_resp.raise_for_status() print(f"Created: {create_resp.json().get('html_url')}") # Collect all adapter files files_to_push = [] for root, _dirs, files in os.walk(adapter_path): for fname in files: fpath = os.path.join(root, fname) rel_path = os.path.relpath(fpath, adapter_path) with open(fpath, "rb") as f: content = base64.b64encode(f.read()).decode("utf-8") files_to_push.append({"path": rel_path, "content": content}) print(f"Pushing {len(files_to_push)} files to {gitea_owner}/{gitea_repo}") pushed = 0 for item in files_to_push: file_api = f"{repo_api}/contents/{item['path']}" existing = requests.get( file_api, auth=auth, params={"ref": branch}, timeout=30 ) payload = { "message": commit_message, "content": item["content"], "branch": branch, } if existing.status_code == 200: payload["sha"] = existing.json()["sha"] resp = requests.put(file_api, auth=auth, json=payload, timeout=60) else: resp = requests.post(file_api, auth=auth, json=payload, timeout=60) if resp.status_code in (200, 201): pushed += 1 print(f" ✓ {item['path']}") else: print(f" ✗ {item['path']}: {resp.status_code} {resp.text[:200]}") repo_url = f"{gitea_url}/{gitea_owner}/{gitea_repo}" print(f"Pushed {pushed}/{len(files_to_push)} files to {repo_url}") from collections import namedtuple return namedtuple("PushOutput", ["repo_url", "files_pushed"])( repo_url=repo_url, files_pushed=pushed ) # ────────────────────────────────────────────────────────────── # 8. Log metrics to MLflow # ────────────────────────────────────────────────────────────── @dsl.component( base_image="python:3.13-slim", packages_to_install=["mlflow==2.22.0"], ) def log_training_metrics( base_model: str, train_loss: float, eval_loss: float, num_train: int, num_val: int, num_pdfs: int, lora_r: int, lora_alpha: int, learning_rate: float, num_epochs: int, num_workers: int, repo_url: str, mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", experiment_name: str = "cpu-lora-training", ): """Log the full training run to MLflow.""" import mlflow mlflow.set_tracking_uri(mlflow_tracking_uri) mlflow.set_experiment(experiment_name) with mlflow.start_run(run_name=f"cpu-lora-{base_model.split('/')[-1]}"): mlflow.log_params( { "base_model": base_model, "training_device": "cpu", "distributed": True, "num_workers": num_workers, "dtype": "float32", "lora_r": lora_r, "lora_alpha": lora_alpha, "learning_rate": learning_rate, "num_epochs": num_epochs, "num_pdfs": num_pdfs, "backend": "ray-train-gloo", "data_source": "quobjects/training-data", } ) mlflow.log_metrics( { "train_loss": train_loss, "eval_loss": eval_loss, "train_samples": float(num_train), "val_samples": float(num_val), } ) mlflow.set_tag("adapter_repo", repo_url) mlflow.set_tag("training_type", "distributed-cpu-lora") mlflow.set_tag("orchestrator", "kuberay-rayjob") # ────────────────────────────────────────────────────────────── # Pipeline definition # ────────────────────────────────────────────────────────────── @dsl.pipeline( name="Distributed CPU LoRA Fine-Tuning", description=( "Fine-tune a small language model (1B–8B) using LoRA distributed " "across cluster CPU nodes via KubeRay RayJob + Ray Train. " "Scale by increasing num_workers. Pushes adapter to Gitea and " "logs to MLflow." ), ) def cpu_training_pipeline( # ── S3 / Quobjects ── s3_endpoint: str = "candlekeep.lab.daviestechlabs.io", s3_bucket: str = "training-data", s3_prefix: str = "", aws_access_key_id: str = "", aws_secret_access_key: str = "", # ── Model (small, CPU-friendly) ── base_model: str = "Qwen/Qwen2.5-3B-Instruct", # ── Training hyper-params (conservative for CPU) ── learning_rate: float = 2e-5, num_epochs: int = 3, batch_size: int = 1, gradient_accumulation_steps: int = 16, max_seq_length: int = 1024, lora_r: int = 16, lora_alpha: int = 32, lora_dropout: float = 0.05, # ── Data prep ── chunk_size: int = 512, chunk_overlap: int = 64, # ── Ray Train (scale here) ── num_workers: int = 4, cpus_per_worker: int = 4, memory_per_worker: str = "16Gi", ray_image: str = "rayproject/ray:2.44.1-py311-cpu", ray_namespace: str = "kubeflow", # ── Gitea ── gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000", gitea_owner: str = "daviestechlabs", gitea_repo: str = "lora-adapters-cpu", gitea_username: str = "", gitea_password: str = "", # ── MLflow ── mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", ): # Step 1 — Fetch PDFs from S3 pdfs = fetch_pdfs_from_s3( s3_endpoint=s3_endpoint, s3_bucket=s3_bucket, s3_prefix=s3_prefix, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, ) # Step 2 — Extract text and build training dataset data = prepare_training_data( pdf_dir=pdfs.outputs["pdf_dir"], max_seq_length=max_seq_length, chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) # Step 3 — Upload training data to S3 for Ray workers uploaded = upload_data_to_s3( dataset_path=data.outputs["dataset_path"], s3_endpoint=s3_endpoint, s3_bucket=s3_bucket, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, ) # Step 4 — Distributed CPU training via KubeRay RayJob trained = submit_ray_training_job( data_s3_prefix=uploaded.outputs["data_s3_prefix"], s3_endpoint=s3_endpoint, s3_bucket=s3_bucket, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, base_model=base_model, learning_rate=learning_rate, num_epochs=num_epochs, batch_size=batch_size, gradient_accumulation_steps=gradient_accumulation_steps, max_seq_length=max_seq_length, lora_r=lora_r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, num_workers=num_workers, cpus_per_worker=cpus_per_worker, memory_per_worker=memory_per_worker, ray_image=ray_image, namespace=ray_namespace, training_script=_RAY_TRAIN_SCRIPT, ) # The RayJob submission pod itself is lightweight trained.set_cpu_limit("1") trained.set_memory_limit("1Gi") # Step 5 — Download trained adapter from S3 adapter = download_adapter_from_s3( adapter_s3_prefix=trained.outputs["adapter_s3_prefix"], s3_endpoint=s3_endpoint, s3_bucket=s3_bucket, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, ) # Step 6 — Quick evaluation (CPU-only) evaluated = evaluate_adapter_cpu( adapter_path=adapter.outputs["adapter_path"], base_model=base_model, ) evaluated.set_cpu_limit("4") evaluated.set_memory_limit("16Gi") evaluated.set_memory_request("8Gi") # Step 7 — Push adapter to Gitea pushed = push_adapter_to_gitea( adapter_path=adapter.outputs["adapter_path"], gitea_url=gitea_url, gitea_owner=gitea_owner, gitea_repo=gitea_repo, gitea_username=gitea_username, gitea_password=gitea_password, ) # Step 8 — Log to MLflow log_training_metrics( base_model=base_model, train_loss=trained.outputs["train_loss"], eval_loss=trained.outputs["eval_loss"], num_train=data.outputs["num_train"], num_val=data.outputs["num_val"], num_pdfs=pdfs.outputs["num_files"], lora_r=lora_r, lora_alpha=lora_alpha, learning_rate=learning_rate, num_epochs=num_epochs, num_workers=trained.outputs["num_workers_used"], repo_url=pushed.outputs["repo_url"], mlflow_tracking_uri=mlflow_tracking_uri, ) # ────────────────────────────────────────────────────────────── # Compile # ────────────────────────────────────────────────────────────── if __name__ == "__main__": compiler.Compiler().compile( pipeline_func=cpu_training_pipeline, package_path="cpu_training_pipeline.yaml", ) print("Compiled: cpu_training_pipeline.yaml")