diff --git a/cpu_training_pipeline.py b/cpu_training_pipeline.py new file mode 100644 index 0000000..8073191 --- /dev/null +++ b/cpu_training_pipeline.py @@ -0,0 +1,1163 @@ +#!/usr/bin/env python3 +""" +Distributed CPU Fine-Tuning Pipeline – Kubeflow Pipelines SDK + +Trains a small language model (1B–8B) using LoRA on CPU only, +distributed across multiple cluster nodes via KubeRay RayJob. +GPUs remain 100 % dedicated to inference serving. + +Architecture: + 1. Fetch PDFs from Quobjects S3 + 2. Prepare instruction-tuning dataset + 3. Upload prepared data to S3 (shared storage for Ray workers) + 4. Submit a KubeRay RayJob that runs Ray Train TorchTrainer + with N CPU-only workers across cluster nodes + 5. Download the trained adapter from S3 + 6. Sanity evaluation on CPU + 7. Push adapter to Gitea + 8. Log metrics to MLflow + +Scaling: + Increase `num_workers` to spread training across more nodes. + Each worker runs data-parallel LoRA training; gradients are + synchronised via AllReduce (Gloo backend). + +Usage: + pip install kfp==2.12.1 + python cpu_training_pipeline.py + # Upload cpu_training_pipeline.yaml to Kubeflow Pipelines UI + +Prerequisites in-cluster: + - Secret mlpipeline-minio-artifact (namespace kubeflow) for S3 creds + - Secret gitea-admin-secret (namespace gitea) for Gitea push + - KubeRay operator installed (namespace ray-system) + - MLflow tracking server (namespace mlflow) + - RBAC: pipeline SA needs create/get/list/delete on rayjobs.ray.io + and configmaps in the training namespace +""" + +from kfp import compiler, dsl +from typing import NamedTuple + + +# ────────────────────────────────────────────────────────────── +# 1. 
# Fetch PDFs from Quobjects S3
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["boto3"],
)
def fetch_pdfs_from_s3(
    s3_endpoint: str,
    s3_bucket: str,
    s3_prefix: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
) -> NamedTuple("PDFOutput", [("pdf_dir", str), ("num_files", int)]):
    """Download every PDF under an S3 prefix into one flat local directory.

    Args:
        s3_endpoint: Host (optionally ``host:port``) of the S3 service. A
            value that already contains a scheme (``https://…``) is used
            as-is; otherwise ``http://`` is prepended.
        s3_bucket: Bucket to list.
        s3_prefix: Key prefix to list under ("" lists the whole bucket).
        aws_access_key_id: S3 access key.
        aws_secret_access_key: S3 secret key.

    Returns:
        ``PDFOutput(pdf_dir, num_files)`` – the local directory holding the
        PDFs and how many files were written to it.
    """
    import os
    from collections import namedtuple

    import boto3
    from botocore.client import Config

    # NOTE(review): this is a path on the step's own filesystem; downstream
    # steps presumably need a shared volume to see it — confirm.
    out_dir = "/tmp/pdfs"
    os.makedirs(out_dir, exist_ok=True)

    endpoint_url = s3_endpoint if "://" in s3_endpoint else f"http://{s3_endpoint}"
    client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )

    paginator = client.get_paginator("list_objects_v2")
    used_names = set()
    count = 0
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.lower().endswith(".pdf"):
                continue

            # Keys are flattened to their basename. Two keys such as
            # "a/report.pdf" and "b/report.pdf" would otherwise overwrite
            # each other while still being counted twice, so later
            # duplicates get a numeric prefix (keeps the ".pdf" suffix).
            base_name = os.path.basename(key)
            name = base_name
            dup = 1
            while name in used_names:
                name = f"{dup}_{base_name}"
                dup += 1
            used_names.add(name)

            local_path = os.path.join(out_dir, name)
            print(f"Downloading: {key} → {local_path}")
            client.download_file(s3_bucket, key, local_path)
            count += 1

    print(f"Downloaded {count} PDFs to {out_dir}")
    return namedtuple("PDFOutput", ["pdf_dir", "num_files"])(
        pdf_dir=out_dir, num_files=count
    )


# ──────────────────────────────────────────────────────────────
# 2.
# Extract text from PDFs → instruction-tuning dataset
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["pymupdf"],
)
def prepare_training_data(
    pdf_dir: str,
    max_seq_length: int = 2048,
    chunk_size: int = 512,
    chunk_overlap: int = 64,
) -> NamedTuple(
    "DataOutput", [("dataset_path", str), ("num_train", int), ("num_val", int)]
):
    """Extract text from PDFs, chunk it, and format as instruction-tuning pairs.

    Each chunk is split in half: the first half becomes the user prompt and
    the second half the assistant continuation (self-supervised).

    Args:
        pdf_dir: Directory containing ``*.pdf`` files (not searched
            recursively).
        max_seq_length: Accepted for interface compatibility; not used here.
        chunk_size: Chunk length in whitespace-separated words.
        chunk_overlap: Words shared between consecutive chunks; must be
            smaller than ``chunk_size``.

    Returns:
        ``DataOutput(dataset_path, num_train, num_val)`` – the directory that
        holds ``train.json`` / ``val.json`` and the two sample counts.

    Raises:
        ValueError: On an invalid chunk configuration, when no text could be
            extracted, or when fewer than two samples are available (a
            train/val split needs at least one sample on each side).
    """
    import json
    import os
    import random
    from collections import namedtuple

    import fitz  # PyMuPDF

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= chunk_overlap < chunk_size:
        # A step of zero makes range() raise; a negative one silently
        # yields no chunks at all.
        raise ValueError(
            f"chunk_overlap must be in [0, chunk_size), got {chunk_overlap} "
            f"with chunk_size={chunk_size}"
        )

    out_dir = "/tmp/training_data"
    os.makedirs(out_dir, exist_ok=True)

    # 1. Extract text from all PDFs
    all_chunks: list[dict] = []
    step = chunk_size - chunk_overlap
    for fname in sorted(os.listdir(pdf_dir)):
        if not fname.lower().endswith(".pdf"):
            continue
        path = os.path.join(pdf_dir, fname)
        print(f"Extracting: {fname}")
        try:
            # The context manager closes the document even when a page
            # fails to render.
            with fitz.open(path) as doc:
                full_text = "".join(page.get_text() + "\n" for page in doc)
        except Exception as e:
            # Best effort: one unreadable PDF must not abort the run.
            print(f" SKIP ({e})")
            continue

        # 2. Chunk text with overlap
        words = full_text.split()
        for i in range(0, len(words), step):
            chunk_words = words[i : i + chunk_size]
            if len(chunk_words) < 50:
                continue  # skip tiny trailing chunks
            all_chunks.append({"text": " ".join(chunk_words), "source": fname})

    print(f"Total chunks: {len(all_chunks)}")
    if not all_chunks:
        raise ValueError("No text extracted from PDFs — check your bucket")

    # 3. Format as chat training pairs (self-supervised continuation)
    samples = []
    for chunk in all_chunks:
        source = chunk["source"]
        words = chunk["text"].split()
        mid = len(words) // 2
        context = " ".join(words[:mid])
        continuation = " ".join(words[mid:])

        samples.append(
            {
                "messages": [
                    {
                        "role": "system",
                        "content": (
                            "You are a knowledgeable assistant. "
                            "Continue the information accurately and coherently."
                        ),
                    },
                    {
                        "role": "user",
                        "content": f"Continue the following passage from {source}:\n\n{context}",
                    },
                    {"role": "assistant", "content": continuation},
                ]
            }
        )

    # 4. Train/val split (90/10)
    if len(samples) < 2:
        # int(1 * 0.9) == 0 would put the only sample into validation and
        # leave the training set empty.
        raise ValueError(
            f"Need at least 2 samples for a train/val split, got {len(samples)}"
        )
    random.seed(42)
    random.shuffle(samples)
    split = int(len(samples) * 0.9)
    train = samples[:split]
    val = samples[split:]

    train_path = os.path.join(out_dir, "train.json")
    val_path = os.path.join(out_dir, "val.json")
    with open(train_path, "w", encoding="utf-8") as f:
        json.dump(train, f)
    with open(val_path, "w", encoding="utf-8") as f:
        json.dump(val, f)

    print(f"Train: {len(train)} samples, Val: {len(val)} samples")
    return namedtuple("DataOutput", ["dataset_path", "num_train", "num_val"])(
        dataset_path=out_dir, num_train=len(train), num_val=len(val)
    )


# ──────────────────────────────────────────────────────────────
# 3. Upload prepared data to S3 (so Ray workers can access it)
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["boto3"],
)
def upload_data_to_s3(
    dataset_path: str,
    s3_endpoint: str,
    s3_bucket: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    run_id: str = "",
) -> NamedTuple("UploadOutput", [("data_s3_prefix", str)]):
    """Upload train.json and val.json to S3 for distributed Ray workers.

    Args:
        dataset_path: Local directory containing ``train.json`` and
            ``val.json``.
        s3_endpoint: S3 host; ``http://`` is prepended unless a scheme is
            already present.
        s3_bucket: Destination bucket.
        aws_access_key_id: S3 access key.
        aws_secret_access_key: S3 secret key.
        run_id: Identifier used in the key prefix; a random 8-hex-char id is
            generated when empty.

    Returns:
        ``UploadOutput(data_s3_prefix)`` – the key prefix
        ``ray-training-runs/<run_id>/data`` the files were written under.
    """
    import os
    import uuid
    from collections import namedtuple

    import boto3
    from botocore.client import Config

    if not run_id:
        run_id = uuid.uuid4().hex[:8]
    prefix = f"ray-training-runs/{run_id}/data"

    endpoint_url = s3_endpoint if "://" in s3_endpoint else f"http://{s3_endpoint}"
    client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )

    for fname in ["train.json", "val.json"]:
        local = os.path.join(dataset_path, fname)
        key = f"{prefix}/{fname}"
        print(f"Uploading {local} → s3://{s3_bucket}/{key}")
        client.upload_file(local, s3_bucket, key)

    print(f"Data uploaded to s3://{s3_bucket}/{prefix}")
    return namedtuple("UploadOutput", ["data_s3_prefix"])(data_s3_prefix=prefix)


# ──────────────────────────────────────────────────────────────
# Training script (embedded in the RayJob ConfigMap)
# ──────────────────────────────────────────────────────────────
# NOTE(review): the SFTTrainer keyword arguments used below (tokenizer,
# max_seq_length, dataset_text_field, packing) differ between TRL releases,
# and the RayJob installs unpinned packages — confirm the versions that
# actually get installed accept them.
_RAY_TRAIN_SCRIPT = '''\
#!/usr/bin/env python3
"""Distributed LoRA training on CPU via Ray Train TorchTrainer."""
import json
import os

import boto3
import ray
import torch
from botocore.client import Config
from datasets import Dataset
from peft import LoraConfig, get_peft_model
from ray.train import RunConfig, ScalingConfig
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from ray.train.torch import TorchTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from trl import SFTTrainer


def _s3_client(cfg):
    """Build an S3 client from the training config dict."""
    endpoint = cfg["s3_endpoint"]
    if "://" not in endpoint:
        endpoint = f"http://{endpoint}"
    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=cfg["aws_access_key_id"],
        aws_secret_access_key=cfg["aws_secret_access_key"],
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )


def train_func(config):
    """Each Ray worker executes this function."""
    import ray.train as rt

    # ── Download data from S3 (each worker independently) ──
    s3 = _s3_client(config)
    data_dir = "/tmp/training_data"
    os.makedirs(data_dir, exist_ok=True)

    prefix = config["data_s3_prefix"]
    bucket = config["s3_bucket"]
    for fname in ["train.json", "val.json"]:
        s3.download_file(bucket, f"{prefix}/{fname}", f"{data_dir}/{fname}")

    with open(f"{data_dir}/train.json") as f:
        train_data = json.load(f)
    with open(f"{data_dir}/val.json") as f:
        val_data = json.load(f)

    rank = rt.get_context().get_world_rank()
    world = rt.get_context().get_world_size()
    print(f"[worker {rank}/{world}] Loaded {len(train_data)} train / "
          f"{len(val_data)} val samples")

    # ── Tokenizer ──────────────────────────────────────────
    base_model = config["base_model"]
    tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    def format_chat(sample):
        return {
            "text": tokenizer.apply_chat_template(
                sample["messages"], tokenize=False, add_generation_prompt=False
            )
        }

    train_ds = Dataset.from_list(train_data).map(format_chat)
    val_ds = Dataset.from_list(val_data).map(format_chat)

    # ── Model (float32, CPU) ───────────────────────────────
    print(f"[worker {rank}] Loading {base_model} (float32, CPU)")
    model = AutoModelForCausalLM.from_pretrained(
        base_model,
        torch_dtype=torch.float32,
        device_map="cpu",
        trust_remote_code=True,
    )
    # With a frozen base model and gradient checkpointing, the embedding
    # output must require grad or no gradient reaches the LoRA weights.
    model.enable_input_require_grads()

    lora_cfg = LoraConfig(
        r=config["lora_r"],
        lora_alpha=config["lora_alpha"],
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
        lora_dropout=config["lora_dropout"],
        bias="none",
        task_type="CAUSAL_LM",
    )
    model = get_peft_model(model, lora_cfg)
    model.print_trainable_parameters()

    # ── Training args (CPU-optimised) ──────────────────────
    args = TrainingArguments(
        output_dir="/tmp/checkpoints",
        num_train_epochs=config["num_epochs"],
        per_device_train_batch_size=config["batch_size"],
        per_device_eval_batch_size=config["batch_size"],
        gradient_accumulation_steps=config["gradient_accumulation_steps"],
        learning_rate=config["learning_rate"],
        bf16=False,
        fp16=False,
        logging_steps=10,
        eval_strategy="steps",
        eval_steps=100,
        save_strategy="no",  # Ray handles checkpointing
        warmup_ratio=0.05,
        lr_scheduler_type="cosine",
        optim="adamw_torch",
        max_grad_norm=1.0,
        group_by_length=True,
        gradient_checkpointing=True,
        dataloader_num_workers=2,
        dataloader_pin_memory=False,
        use_cpu=True,  # replaces the deprecated no_cuda flag
        report_to="none",
    )

    # ── SFTTrainer + Ray integration ───────────────────────
    trainer = SFTTrainer(
        model=model,
        args=args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        tokenizer=tokenizer,
        max_seq_length=config["max_seq_length"],
        dataset_text_field="text",
        packing=True,
    )

    # Ray Train callback reports metrics; prepare_trainer wraps
    # the model with DDP and adds DistributedSampler
    trainer.add_callback(RayTrainReportCallback())
    trainer = prepare_trainer(trainer)

    print(f"[worker {rank}] Starting distributed LoRA training …")
    result = trainer.train()
    eval_result = trainer.evaluate()

    train_loss = result.training_loss
    eval_loss = eval_result.get("eval_loss", 0.0)
    print(f"[worker {rank}] train_loss={train_loss:.4f} eval_loss={eval_loss:.4f}")

    # ── Rank 0: save adapter + upload to S3 ────────────────
    if rank == 0:
        adapter_path = "/tmp/adapter"
        unwrapped = (
            trainer.model.module
            if hasattr(trainer.model, "module")
            else trainer.model
        )
        unwrapped.save_pretrained(adapter_path)
        tokenizer.save_pretrained(adapter_path)

        metadata = {
            "base_model": base_model,
            "train_loss": train_loss,
            "eval_loss": eval_loss,
            "training_device": "cpu",
            "distributed": True,
            "num_workers": world,
            "dtype": "float32",
            "lora_r": config["lora_r"],
            "lora_alpha": config["lora_alpha"],
            "lora_dropout": config["lora_dropout"],
            "learning_rate": config["learning_rate"],
            "num_epochs": config["num_epochs"],
            "batch_size": config["batch_size"],
            "gradient_accumulation_steps": config["gradient_accumulation_steps"],
            "max_seq_length": config["max_seq_length"],
        }
        with open(f"{adapter_path}/training_metadata.json", "w") as f:
            json.dump(metadata, f, indent=2)

        # Upload adapter to S3
        adapter_prefix = config["adapter_s3_prefix"]
        for root, _, files in os.walk(adapter_path):
            for fname in files:
                fpath = os.path.join(root, fname)
                rel = os.path.relpath(fpath, adapter_path)
                key = f"{adapter_prefix}/{rel}"
                s3.upload_file(fpath, bucket, key)
                print(f" ✓ s3://{bucket}/{key}")

        # Results marker for the KFP step to read
        s3.put_object(
            Bucket=bucket,
            Key=f"{adapter_prefix}/results.json",
            Body=json.dumps(metadata),
        )
        print(f"Adapter uploaded to s3://{bucket}/{adapter_prefix}")


if __name__ == "__main__":
    cfg = json.loads(os.environ["TRAIN_CONFIG"])
    num_workers = cfg.pop("num_workers", 4)
    cpus_per_worker = cfg.pop("cpus_per_worker", 4)

    ray.init()
    trainer = TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(
            num_workers=num_workers,
            use_gpu=False,
            resources_per_worker={"CPU": cpus_per_worker},
        ),
        run_config=RunConfig(name="cpu-lora-training"),
        train_loop_config=cfg,
    )
    result = trainer.fit()
    print(f"Training complete: {result}")
'''


# ──────────────────────────────────────────────────────────────
# 4. Submit a KubeRay RayJob for distributed CPU training
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["kubernetes", "boto3"],
)
def submit_ray_training_job(
    data_s3_prefix: str,
    s3_endpoint: str,
    s3_bucket: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    base_model: str,
    learning_rate: float,
    num_epochs: int,
    batch_size: int,
    gradient_accumulation_steps: int,
    max_seq_length: int,
    lora_r: int,
    lora_alpha: int,
    lora_dropout: float,
    num_workers: int = 4,
    cpus_per_worker: int = 4,
    memory_per_worker: str = "16Gi",
    ray_image: str = "rayproject/ray:2.44.1-py311-cpu",
    namespace: str = "kubeflow",
    poll_interval_seconds: int = 30,
    training_script: str = "",
) -> NamedTuple(
    "RayTrainOutput",
    [
        ("adapter_s3_prefix", str),
        ("train_loss", float),
        ("eval_loss", float),
        ("num_workers_used", int),
    ],
):
    """Create a KubeRay RayJob for distributed CPU LoRA training and wait.

    The training script is stored in a ConfigMap that is mounted into the
    head and worker pods; hyper-parameters and S3 settings are passed through
    the ``TRAIN_CONFIG`` environment variable. After the job succeeds, the
    losses are read from ``<adapter prefix>/results.json`` in S3.

    Args:
        data_s3_prefix: Key prefix holding ``train.json`` / ``val.json``.
        s3_endpoint: S3 host; ``http://`` is prepended unless a scheme is
            already present.
        s3_bucket: Bucket used for both the data and the adapter.
        aws_access_key_id: S3 access key.
        aws_secret_access_key: S3 secret key.
        base_model: Model id forwarded to the training script.
        learning_rate, num_epochs, batch_size, gradient_accumulation_steps,
        max_seq_length, lora_r, lora_alpha, lora_dropout: Hyper-parameters
            forwarded unchanged to the training script.
        num_workers: Number of Ray worker pods (fixed, no autoscaling).
        cpus_per_worker: CPU limit per worker pod and Ray ``num-cpus``.
        memory_per_worker: Memory request/limit per worker pod.
        ray_image: Container image for head and worker pods.
        namespace: Namespace for the ConfigMap and the RayJob.
        poll_interval_seconds: Seconds between status polls.
        training_script: Source of the script executed as the job entrypoint.

    Returns:
        ``RayTrainOutput(adapter_s3_prefix, train_loss, eval_loss,
        num_workers_used)``.

    Raises:
        ValueError: If ``training_script`` is empty.
        RuntimeError: If the RayJob ends in any state other than SUCCEEDED,
            or its deployment fails before a job status is reported.
    """
    import json
    import time
    import uuid
    from collections import namedtuple

    import boto3
    from botocore.client import Config
    from kubernetes import client as k8s_client
    from kubernetes import config as k8s_config

    if not training_script.strip():
        # An empty script would be mounted and "run" without training.
        raise ValueError("training_script must not be empty")

    # ── Build identifiers ───────────────────────────────────
    run_id = uuid.uuid4().hex[:8]
    job_name = f"cpu-lora-{run_id}"
    cm_name = f"train-script-{run_id}"
    adapter_prefix = f"ray-training-runs/{run_id}/adapter"

    # ── Training config (passed as TRAIN_CONFIG env var) ────
    # NOTE(review): the S3 credentials end up in the pod spec in clear text;
    # consider a secretKeyRef instead.
    train_config = json.dumps(
        {
            "base_model": base_model,
            "learning_rate": learning_rate,
            "num_epochs": num_epochs,
            "batch_size": batch_size,
            "gradient_accumulation_steps": gradient_accumulation_steps,
            "max_seq_length": max_seq_length,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha,
            "lora_dropout": lora_dropout,
            "num_workers": num_workers,
            "cpus_per_worker": cpus_per_worker,
            "data_s3_prefix": data_s3_prefix,
            "adapter_s3_prefix": adapter_prefix,
            "s3_endpoint": s3_endpoint,
            "s3_bucket": s3_bucket,
            "aws_access_key_id": aws_access_key_id,
            "aws_secret_access_key": aws_secret_access_key,
        }
    )

    # ── Kubernetes client ───────────────────────────────────
    k8s_config.load_incluster_config()
    core_v1 = k8s_client.CoreV1Api()
    custom_api = k8s_client.CustomObjectsApi()

    # ── Common container spec ───────────────────────────────
    # NOTE(review): unpinned; the training script's Trainer/SFTTrainer
    # arguments depend on the installed versions — consider pinning.
    pip_deps = [
        "torch",
        "transformers",
        "peft",
        "datasets",
        "accelerate",
        "scipy",
        "trl",
        "boto3",
    ]

    def make_container(name, cpu_limit, mem_limit):
        """Return the container spec shared by head and worker pods."""
        return {
            "name": name,
            "image": ray_image,
            "resources": {
                "limits": {"cpu": cpu_limit, "memory": mem_limit},
                "requests": {
                    # Request half of the CPU limit, but at least one core.
                    "cpu": str(max(1, int(cpu_limit) // 2)),
                    "memory": mem_limit,
                },
            },
            "env": [
                {"name": "TRAIN_CONFIG", "value": train_config},
                {"name": "RAY_DEDUP_LOGS", "value": "0"},
            ],
            "volumeMounts": [
                {
                    "name": "train-script",
                    "mountPath": "/home/ray/train.py",
                    "subPath": "train.py",
                }
            ],
        }

    script_volume = {"name": "train-script", "configMap": {"name": cm_name}}

    # ── RayJob spec ─────────────────────────────────────────
    ray_job = {
        "apiVersion": "ray.io/v1",
        "kind": "RayJob",
        "metadata": {"name": job_name, "namespace": namespace},
        "spec": {
            "entrypoint": "python /home/ray/train.py",
            "shutdownAfterJobFinishes": True,
            "ttlSecondsAfterFinished": 300,
            "runtimeEnvYAML": json.dumps({"pip": pip_deps}),
            "rayClusterSpec": {
                "rayVersion": "2.44.1",
                "headGroupSpec": {
                    "rayStartParams": {
                        "dashboard-host": "0.0.0.0",
                        # Advertise no CPUs on the head.
                        "num-cpus": "0",
                    },
                    "template": {
                        "spec": {
                            "containers": [make_container("ray-head", "2", "4Gi")],
                            "volumes": [script_volume],
                        }
                    },
                },
                "workerGroupSpecs": [
                    {
                        "replicas": num_workers,
                        "minReplicas": num_workers,
                        "maxReplicas": num_workers,
                        "groupName": "cpu-workers",
                        "rayStartParams": {
                            "num-cpus": str(cpus_per_worker),
                        },
                        "template": {
                            "spec": {
                                "containers": [
                                    make_container(
                                        "ray-worker",
                                        str(cpus_per_worker),
                                        memory_per_worker,
                                    )
                                ],
                                "volumes": [script_volume],
                            }
                        },
                    }
                ],
            },
        },
    }

    # ── Create ConfigMap with training script ───────────────
    cm = k8s_client.V1ConfigMap(
        metadata=k8s_client.V1ObjectMeta(name=cm_name, namespace=namespace),
        data={"train.py": training_script},
    )
    core_v1.create_namespaced_config_map(namespace=namespace, body=cm)
    print(f"Created ConfigMap {cm_name}")

    # Everything after the ConfigMap exists runs under try/finally so the
    # ConfigMap is removed on failure paths as well as on success.
    try:
        # ── Submit RayJob ───────────────────────────────────
        custom_api.create_namespaced_custom_object(
            group="ray.io",
            version="v1",
            namespace=namespace,
            plural="rayjobs",
            body=ray_job,
        )
        print(f"Submitted RayJob {job_name} ({num_workers} workers × "
              f"{cpus_per_worker} CPUs)")

        # ── Poll until completion ───────────────────────────
        terminal_states = {"SUCCEEDED", "FAILED", "STOPPED"}
        while True:
            time.sleep(poll_interval_seconds)
            obj = custom_api.get_namespaced_custom_object(
                group="ray.io",
                version="v1",
                namespace=namespace,
                plural="rayjobs",
                name=job_name,
            )
            status = obj.get("status", {})
            # jobStatus may be absent or empty before the job is submitted.
            job_status = status.get("jobStatus") or "PENDING"
            print(f" RayJob {job_name}: {job_status}")

            if job_status in terminal_states:
                break
            if status.get("jobDeploymentStatus") == "Failed":
                # The deployment failed before any job status was set;
                # without this check the loop would never terminate.
                job_status = "FAILED"
                break

        if job_status != "SUCCEEDED":
            msg = status.get("message", "unknown error")
            raise RuntimeError(f"RayJob {job_name} failed: {job_status} — {msg}")

        print(f"RayJob {job_name} completed successfully")

        # ── Read results from S3 ────────────────────────────
        endpoint_url = (
            s3_endpoint if "://" in s3_endpoint else f"http://{s3_endpoint}"
        )
        s3 = boto3.client(
            "s3",
            endpoint_url=endpoint_url,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name="us-east-1",
            config=Config(signature_version="s3v4"),
        )

        results_obj = s3.get_object(
            Bucket=s3_bucket, Key=f"{adapter_prefix}/results.json"
        )
        results = json.loads(results_obj["Body"].read().decode("utf-8"))

        train_loss = float(results.get("train_loss", 0.0))
        eval_loss = float(results.get("eval_loss", 0.0))
        print(f"Results: train_loss={train_loss:.4f}, eval_loss={eval_loss:.4f}")
    finally:
        # ── Cleanup ConfigMap (best effort) ─────────────────
        try:
            core_v1.delete_namespaced_config_map(name=cm_name, namespace=namespace)
            print(f"Deleted ConfigMap {cm_name}")
        except Exception as e:
            print(f"ConfigMap cleanup warning: {e}")

    return namedtuple(
        "RayTrainOutput",
        ["adapter_s3_prefix", "train_loss", "eval_loss", "num_workers_used"],
    )(
        adapter_s3_prefix=adapter_prefix,
        train_loss=train_loss,
        eval_loss=eval_loss,
        num_workers_used=num_workers,
    )


# ──────────────────────────────────────────────────────────────
# 5.
# Download trained adapter from S3
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["boto3"],
)
def download_adapter_from_s3(
    adapter_s3_prefix: str,
    s3_endpoint: str,
    s3_bucket: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
) -> NamedTuple("AdapterOutput", [("adapter_path", str)]):
    """Download the trained LoRA adapter from S3.

    Every object under ``adapter_s3_prefix`` except the ``results.json``
    marker is copied to ``/tmp/adapter``, keeping its relative path.

    Args:
        adapter_s3_prefix: Key prefix the adapter files were uploaded under.
        s3_endpoint: S3 host; ``http://`` is prepended unless a scheme is
            already present.
        s3_bucket: Bucket to read from.
        aws_access_key_id: S3 access key.
        aws_secret_access_key: S3 secret key.

    Returns:
        ``AdapterOutput(adapter_path)`` – the local directory with the files.

    Raises:
        FileNotFoundError: If no adapter file exists under the prefix.
    """
    import os
    from collections import namedtuple

    import boto3
    from botocore.client import Config

    endpoint_url = s3_endpoint if "://" in s3_endpoint else f"http://{s3_endpoint}"
    client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )

    adapter_path = "/tmp/adapter"
    os.makedirs(adapter_path, exist_ok=True)
    root = os.path.abspath(adapter_path)

    # List with a trailing slash so ".../adapter" does not also match
    # sibling prefixes such as ".../adapter-old/".
    prefix = adapter_s3_prefix.strip("/")
    list_prefix = f"{prefix}/" if prefix else ""

    paginator = client.get_paginator("list_objects_v2")
    count = 0
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=list_prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            rel = key[len(list_prefix) :].lstrip("/")
            # Skip the prefix itself, directory-marker keys, and the
            # results marker.
            if not rel or rel.endswith("/") or rel == "results.json":
                continue
            local = os.path.abspath(os.path.join(root, rel))
            if os.path.commonpath([root, local]) != root:
                # A key containing ".." must not escape the target directory.
                print(f" skip (outside target dir): {key}")
                continue
            os.makedirs(os.path.dirname(local), exist_ok=True)
            client.download_file(s3_bucket, key, local)
            count += 1
            print(f" ↓ {rel}")

    if count == 0:
        raise FileNotFoundError(
            f"No adapter files found under s3://{s3_bucket}/{list_prefix}"
        )

    print(f"Downloaded {count} adapter files to {adapter_path}")
    return namedtuple("AdapterOutput", ["adapter_path"])(adapter_path=adapter_path)


# ──────────────────────────────────────────────────────────────
# 6.
# Sanity evaluation (CPU-only)
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=[
        "torch",
        "transformers",
        "peft",
        "accelerate",
        "scipy",
    ],
)
def evaluate_adapter_cpu(
    adapter_path: str,
    base_model: str,
) -> NamedTuple("EvalOutput", [("report", str), ("passed", bool)]):
    """Load the LoRA adapter on CPU and run sanity-check prompts.

    Args:
        adapter_path: Directory containing the saved adapter.
        base_model: Model id of the base model the adapter was trained on.

    Returns:
        ``EvalOutput(report, passed)`` – a Q/A transcript and whether every
        response is longer than 10 characters after stripping whitespace.
    """
    from collections import namedtuple

    import torch
    from peft import PeftModel
    from transformers import AutoModelForCausalLM, AutoTokenizer

    print(f"Loading base model {base_model} (CPU) …")
    model = AutoModelForCausalLM.from_pretrained(
        base_model,
        torch_dtype=torch.float32,
        device_map="cpu",
        trust_remote_code=True,
    )
    tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)

    print(f"Loading adapter from {adapter_path} …")
    model = PeftModel.from_pretrained(model, adapter_path)
    model.eval()

    # generate() needs a pad id; fall back to EOS when the tokenizer has none.
    pad_token_id = tokenizer.pad_token_id
    if pad_token_id is None:
        pad_token_id = tokenizer.eos_token_id

    test_prompts = [
        "Summarise the key points from the training material.",
        "What are the main topics covered in the source documents?",
        "Explain the most important concept from the training data.",
    ]

    lines = []
    responses = []
    for prompt in test_prompts:
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ]
        input_text = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        # The chat template already emits the special tokens; adding them
        # again would duplicate e.g. the BOS token.
        inputs = tokenizer(input_text, return_tensors="pt", add_special_tokens=False)
        with torch.no_grad():
            out = model.generate(
                **inputs,
                max_new_tokens=128,
                temperature=0.7,
                do_sample=True,
                pad_token_id=pad_token_id,
            )
        # Decode only the newly generated tokens, not the prompt.
        response = tokenizer.decode(
            out[0][inputs["input_ids"].shape[1] :], skip_special_tokens=True
        )
        responses.append(response)
        lines.append(f"Q: {prompt}\nA: {response}\n")
        print(lines[-1])

    report = "\n".join(lines)
    # Judge the raw responses; re-parsing the report by splitting on "A:"
    # truncates any response that itself contains "A:".
    passed = all(len(response.strip()) > 10 for response in responses)
    print(f"Evaluation passed: {passed}")

    return namedtuple("EvalOutput", ["report", "passed"])(
        report=report, passed=passed
    )


# ──────────────────────────────────────────────────────────────
# 7. Push adapter to Gitea repo
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["requests"],
)
def push_adapter_to_gitea(
    adapter_path: str,
    gitea_url: str,
    gitea_owner: str,
    gitea_repo: str,
    gitea_username: str,
    gitea_password: str,
    branch: str = "main",
    commit_message: str = "feat: add LoRA adapter from distributed CPU training",
) -> NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)]):
    """Push the LoRA adapter files to a Gitea repository via the API.

    The repository is created when it does not exist. Each file is created
    or updated through the contents API, one request per file. A failed file
    is logged and skipped; compare ``files_pushed`` with the expected count
    to detect partial pushes.

    Args:
        adapter_path: Local directory whose files are pushed (recursively).
        gitea_url: Base URL of the Gitea instance.
        gitea_owner: User or organisation owning the repository.
        gitea_repo: Repository name.
        gitea_username: Username for basic auth.
        gitea_password: Password for basic auth.
        branch: Target branch.
        commit_message: Commit message used for every file.

    Returns:
        ``PushOutput(repo_url, files_pushed)``.

    Raises:
        requests.HTTPError: If the repository lookup fails with anything
            other than 404, or repository creation fails.
    """
    import base64
    import os
    from collections import namedtuple
    from urllib.parse import quote

    import requests

    api_base = f"{gitea_url}/api/v1"
    auth = (gitea_username, gitea_password)
    repo_api = f"{api_base}/repos/{gitea_owner}/{gitea_repo}"

    # Check if repo exists, create if not
    resp = requests.get(repo_api, auth=auth, timeout=30)
    if resp.status_code == 404:
        print(f"Creating repo {gitea_owner}/{gitea_repo} …")
        # Repos owned by someone other than the authenticated user are
        # created through the organisation endpoint.
        create_url = (
            f"{api_base}/orgs/{gitea_owner}/repos"
            if gitea_owner != gitea_username
            else f"{api_base}/user/repos"
        )
        create_resp = requests.post(
            create_url,
            auth=auth,
            json={
                "name": gitea_repo,
                "description": "LoRA adapters trained via distributed CPU pipeline",
                "private": False,
                "auto_init": True,
            },
            timeout=30,
        )
        create_resp.raise_for_status()
        print(f"Created: {create_resp.json().get('html_url')}")
    else:
        # Auth or server errors must not be mistaken for "repo exists".
        resp.raise_for_status()

    # Collect adapter file paths only; contents are read one at a time
    # while pushing so large weight files are not all held in memory.
    files_to_push = []
    for root, _dirs, files in os.walk(adapter_path):
        for fname in files:
            fpath = os.path.join(root, fname)
            rel_path = os.path.relpath(fpath, adapter_path).replace(os.sep, "/")
            files_to_push.append((rel_path, fpath))
    files_to_push.sort()

    print(f"Pushing {len(files_to_push)} files to {gitea_owner}/{gitea_repo}")

    pushed = 0
    for rel_path, fpath in files_to_push:
        # quote() keeps "/" but escapes spaces and other reserved characters.
        file_api = f"{repo_api}/contents/{quote(rel_path)}"

        with open(fpath, "rb") as f:
            content = base64.b64encode(f.read()).decode("utf-8")

        existing = requests.get(
            file_api, auth=auth, params={"ref": branch}, timeout=30
        )
        payload = {
            "message": commit_message,
            "content": content,
            "branch": branch,
        }
        if existing.status_code == 200:
            # Updating an existing file requires its current blob sha.
            payload["sha"] = existing.json()["sha"]
            resp = requests.put(file_api, auth=auth, json=payload, timeout=60)
        else:
            resp = requests.post(file_api, auth=auth, json=payload, timeout=60)

        if resp.status_code in (200, 201):
            pushed += 1
            print(f" ✓ {rel_path}")
        else:
            print(f" ✗ {rel_path}: {resp.status_code} {resp.text[:200]}")

    repo_url = f"{gitea_url}/{gitea_owner}/{gitea_repo}"
    print(f"Pushed {pushed}/{len(files_to_push)} files to {repo_url}")

    return namedtuple("PushOutput", ["repo_url", "files_pushed"])(
        repo_url=repo_url, files_pushed=pushed
    )


# ──────────────────────────────────────────────────────────────
# 8. Log metrics to MLflow
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["mlflow==2.22.0"],
)
def log_training_metrics(
    base_model: str,
    train_loss: float,
    eval_loss: float,
    num_train: int,
    num_val: int,
    num_pdfs: int,
    lora_r: int,
    lora_alpha: int,
    learning_rate: float,
    num_epochs: int,
    num_workers: int,
    repo_url: str,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
    experiment_name: str = "cpu-lora-training",
):
    """Log the full training run to MLflow.

    Creates one run named ``cpu-lora-<model name>`` in ``experiment_name``
    and records the hyper-parameters, the losses and sample counts, and tags
    pointing at the adapter repository.
    """
    import mlflow

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)

    # Use only the last path component of the model id in the run name.
    run_name = f"cpu-lora-{base_model.split('/')[-1]}"
    params = {
        "base_model": base_model,
        "training_device": "cpu",
        "distributed": True,
        "num_workers": num_workers,
        "dtype": "float32",
        "lora_r": lora_r,
        "lora_alpha": lora_alpha,
        "learning_rate": learning_rate,
        "num_epochs": num_epochs,
        "num_pdfs": num_pdfs,
        "backend": "ray-train-gloo",
        "data_source": "quobjects/training-data",
    }
    metrics = {
        "train_loss": train_loss,
        "eval_loss": eval_loss,
        "train_samples": float(num_train),
        "val_samples": float(num_val),
    }

    with mlflow.start_run(run_name=run_name):
        mlflow.log_params(params)
        mlflow.log_metrics(metrics)
        mlflow.set_tag("adapter_repo", repo_url)
        mlflow.set_tag("training_type", "distributed-cpu-lora")
        mlflow.set_tag("orchestrator", "kuberay-rayjob")


# ──────────────────────────────────────────────────────────────
# Pipeline definition
# ──────────────────────────────────────────────────────────────
@dsl.pipeline(
    name="Distributed CPU LoRA Fine-Tuning",
    description=(
        "Fine-tune a small language model (1B–8B) using LoRA distributed "
        "across cluster CPU nodes via KubeRay RayJob + Ray Train. "
        "Scale by increasing num_workers. Pushes adapter to Gitea and "
        "logs to MLflow."
    ),
)
def cpu_training_pipeline(
    # ── S3 / Quobjects ──
    s3_endpoint: str = "candlekeep.lab.daviestechlabs.io",
    s3_bucket: str = "training-data",
    s3_prefix: str = "",
    aws_access_key_id: str = "",
    aws_secret_access_key: str = "",
    # ── Model (small, CPU-friendly) ──
    base_model: str = "Qwen/Qwen2.5-3B-Instruct",
    # ── Training hyper-params (conservative for CPU) ──
    learning_rate: float = 2e-5,
    num_epochs: int = 3,
    batch_size: int = 1,
    gradient_accumulation_steps: int = 16,
    max_seq_length: int = 1024,
    lora_r: int = 16,
    lora_alpha: int = 32,
    lora_dropout: float = 0.05,
    # ── Data prep ──
    chunk_size: int = 512,
    chunk_overlap: int = 64,
    # ── Ray Train (scale here) ──
    num_workers: int = 4,
    cpus_per_worker: int = 4,
    memory_per_worker: str = "16Gi",
    ray_image: str = "rayproject/ray:2.44.1-py311-cpu",
    ray_namespace: str = "kubeflow",
    # ── Gitea ──
    gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000",
    gitea_owner: str = "daviestechlabs",
    gitea_repo: str = "lora-adapters-cpu",
    gitea_username: str = "",
    gitea_password: str = "",
    # ── MLflow ──
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
):
    # Wires the eight steps together: fetch → prepare → upload → train →
    # download → evaluate → push → log.
    # NOTE(review): steps 1→2, 2→3, 5→6 and 5→7 pass local "/tmp/..." paths
    # between components; this presumably requires a volume shared by those
    # steps — confirm.

    # Step 1 — Fetch PDFs from S3
    pdfs = fetch_pdfs_from_s3(
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    # Step 2 — Extract text and build training dataset
    data = prepare_training_data(
        pdf_dir=pdfs.outputs["pdf_dir"],
        max_seq_length=max_seq_length,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )

    # Step 3 — Upload training data to S3 for Ray workers
    uploaded = upload_data_to_s3(
        dataset_path=data.outputs["dataset_path"],
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    # Step 4 — Distributed CPU training via KubeRay RayJob
    trained = submit_ray_training_job(
        data_s3_prefix=uploaded.outputs["data_s3_prefix"],
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        base_model=base_model,
        learning_rate=learning_rate,
        num_epochs=num_epochs,
        batch_size=batch_size,
        gradient_accumulation_steps=gradient_accumulation_steps,
        max_seq_length=max_seq_length,
        lora_r=lora_r,
        lora_alpha=lora_alpha,
        lora_dropout=lora_dropout,
        num_workers=num_workers,
        cpus_per_worker=cpus_per_worker,
        memory_per_worker=memory_per_worker,
        ray_image=ray_image,
        namespace=ray_namespace,
        training_script=_RAY_TRAIN_SCRIPT,
    )
    # The RayJob submission pod itself is lightweight
    trained.set_cpu_limit("1")
    trained.set_memory_limit("1Gi")

    # Step 5 — Download trained adapter from S3
    adapter = download_adapter_from_s3(
        adapter_s3_prefix=trained.outputs["adapter_s3_prefix"],
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    # Step 6 — Quick evaluation (CPU-only)
    evaluated = evaluate_adapter_cpu(
        adapter_path=adapter.outputs["adapter_path"],
        base_model=base_model,
    )
    evaluated.set_cpu_limit("4")
    evaluated.set_memory_limit("16Gi")
    evaluated.set_memory_request("8Gi")

    # Step 7 — Push adapter to Gitea
    pushed = push_adapter_to_gitea(
        adapter_path=adapter.outputs["adapter_path"],
        gitea_url=gitea_url,
        gitea_owner=gitea_owner,
        gitea_repo=gitea_repo,
        gitea_username=gitea_username,
        gitea_password=gitea_password,
    )

    # Step 8 — Log to MLflow
    log_training_metrics(
        base_model=base_model,
        train_loss=trained.outputs["train_loss"],
        eval_loss=trained.outputs["eval_loss"],
        num_train=data.outputs["num_train"],
        num_val=data.outputs["num_val"],
        num_pdfs=pdfs.outputs["num_files"],
        lora_r=lora_r,
        lora_alpha=lora_alpha,
        learning_rate=learning_rate,
        num_epochs=num_epochs,
        num_workers=trained.outputs["num_workers_used"],
        repo_url=pushed.outputs["repo_url"],
        mlflow_tracking_uri=mlflow_tracking_uri,
    )


# ──────────────────────────────────────────────────────────────
# Compile
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Emit the pipeline spec that is uploaded to the Kubeflow Pipelines UI.
    compiler.Compiler().compile(
        pipeline_func=cpu_training_pipeline,
        package_path="cpu_training_pipeline.yaml",
    )
    print("Compiled: cpu_training_pipeline.yaml")