Files
kubeflow/cpu_training_pipeline.py
Billy D. d4eb54d92b
All checks were successful
Compile and Upload Pipelines / Compile & Upload (push) Successful in 15s
Compile and Upload Pipelines / Notify (push) Successful in 1s
pipelines go to gravenhollow now.
2026-02-18 07:14:12 -05:00

1169 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Distributed CPU Fine-Tuning Pipeline Kubeflow Pipelines SDK
Trains a small language model (1B–8B parameters) using LoRA on CPU only,
distributed across multiple cluster nodes via KubeRay RayJob.
GPUs remain 100 % dedicated to inference serving.
Architecture:
1. Fetch PDFs from RustFS S3
2. Prepare instruction-tuning dataset
3. Upload prepared data to S3 (shared storage for Ray workers)
4. Submit a KubeRay RayJob that runs Ray Train TorchTrainer
with N CPU-only workers across cluster nodes
5. Download the trained adapter from S3
6. Sanity evaluation on CPU
7. Push adapter to Gitea
8. Log metrics to MLflow
Scaling:
Increase `num_workers` to spread training across more nodes.
Each worker runs data-parallel LoRA training; gradients are
synchronised via AllReduce (Gloo backend).
Usage:
pip install kfp==2.12.1
python cpu_training_pipeline.py
# Upload cpu_training_pipeline.yaml to Kubeflow Pipelines UI
Prerequisites in-cluster:
- Secret mlpipeline-minio-artifact (namespace kubeflow) for S3 creds
- Secret gitea-admin-secret (namespace gitea) for Gitea push
- KubeRay operator installed (namespace ray-system)
- MLflow tracking server (namespace mlflow)
- RBAC: pipeline SA needs create/get/list/delete on rayjobs.ray.io
and configmaps in the training namespace
"""
from kfp import compiler, dsl
from typing import NamedTuple
# ──────────────────────────────────────────────────────────────
# 1. Fetch PDFs from RustFS S3
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["boto3"],
)
def fetch_pdfs_from_s3(
    s3_endpoint: str,
    s3_bucket: str,
    s3_prefix: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
) -> NamedTuple("PDFOutput", [("pdf_dir", str), ("num_files", int)]):
    """Download all PDFs under *s3_prefix* from an S3 bucket.

    Returns:
        pdf_dir: local directory containing the downloaded PDFs.
        num_files: number of PDF files downloaded.
    """
    import os
    import boto3
    from botocore.client import Config

    out_dir = "/tmp/pdfs"
    os.makedirs(out_dir, exist_ok=True)
    client = boto3.client(
        "s3",
        endpoint_url=s3_endpoint,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
        # NOTE(review): TLS verification disabled — presumably a self-signed
        # RustFS certificate; confirm this is acceptable for the cluster.
        verify=False,
    )
    # Paginate so buckets with more than 1000 objects are fully listed.
    paginator = client.get_paginator("list_objects_v2")
    count = 0
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.lower().endswith(".pdf"):
                local_path = os.path.join(out_dir, os.path.basename(key))
                # Fix: separator between source key and destination was lost.
                print(f"Downloading: {key} -> {local_path}")
                client.download_file(s3_bucket, key, local_path)
                count += 1
    print(f"Downloaded {count} PDFs to {out_dir}")
    from collections import namedtuple
    return namedtuple("PDFOutput", ["pdf_dir", "num_files"])(
        pdf_dir=out_dir, num_files=count
    )
# ──────────────────────────────────────────────────────────────
# 2. Extract text from PDFs → instruction-tuning dataset
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["pymupdf"],
)
def prepare_training_data(
    pdf_dir: str,
    max_seq_length: int = 2048,
    chunk_size: int = 512,
    chunk_overlap: int = 64,
) -> NamedTuple(
    "DataOutput", [("dataset_path", str), ("num_train", int), ("num_val", int)]
):
    """Extract text from PDFs, chunk it, and format as instruction-tuning pairs.

    Args:
        pdf_dir: directory containing the PDFs to process.
        max_seq_length: unused here; kept for backward compatibility with the
            pipeline signature (sequence length is enforced by the trainer).
        chunk_size: chunk length in whitespace-separated words.
        chunk_overlap: overlap between consecutive chunks, in words.

    Returns:
        dataset_path: directory containing train.json / val.json.
        num_train / num_val: sample counts after the 90/10 split.

    Raises:
        ValueError: if no text could be extracted from any PDF.
    """
    import json
    import os
    import random
    import fitz  # PyMuPDF

    out_dir = "/tmp/training_data"
    os.makedirs(out_dir, exist_ok=True)
    # Fix: a non-positive range step (chunk_overlap >= chunk_size) would
    # raise ValueError; clamp to at least one word of progress per chunk.
    step = max(1, chunk_size - chunk_overlap)
    # 1. Extract text from all PDFs
    all_chunks: list[dict] = []
    for fname in sorted(os.listdir(pdf_dir)):
        if not fname.lower().endswith(".pdf"):
            continue
        path = os.path.join(pdf_dir, fname)
        print(f"Extracting: {fname}")
        try:
            doc = fitz.open(path)
            full_text = ""
            for page in doc:
                full_text += page.get_text() + "\n"
            doc.close()
        except Exception as e:
            # Best-effort: one corrupt PDF should not kill the whole run.
            print(f" SKIP ({e})")
            continue
        # 2. Chunk text with overlap
        words = full_text.split()
        for i in range(0, len(words), step):
            chunk_words = words[i : i + chunk_size]
            if len(chunk_words) < 50:
                continue  # skip tiny trailing chunks
            all_chunks.append({"text": " ".join(chunk_words), "source": fname})
    print(f"Total chunks: {len(all_chunks)}")
    if not all_chunks:
        raise ValueError("No text extracted from PDFs — check your bucket")
    # 3. Format as chat training pairs (self-supervised continuation):
    # the first half of each chunk is the prompt, the second half the target.
    samples = []
    for chunk in all_chunks:
        text = chunk["text"]
        source = chunk["source"]
        words = text.split()
        mid = len(words) // 2
        context = " ".join(words[:mid])
        continuation = " ".join(words[mid:])
        samples.append(
            {
                "messages": [
                    {
                        "role": "system",
                        "content": (
                            "You are a knowledgeable assistant. "
                            "Continue the information accurately and coherently."
                        ),
                    },
                    {
                        "role": "user",
                        "content": f"Continue the following passage from {source}:\n\n{context}",
                    },
                    {"role": "assistant", "content": continuation},
                ]
            }
        )
    # 4. Train/val split (90/10); fixed seed keeps the split reproducible.
    random.seed(42)
    random.shuffle(samples)
    split = int(len(samples) * 0.9)
    train = samples[:split]
    val = samples[split:]
    train_path = os.path.join(out_dir, "train.json")
    val_path = os.path.join(out_dir, "val.json")
    with open(train_path, "w") as f:
        json.dump(train, f)
    with open(val_path, "w") as f:
        json.dump(val, f)
    print(f"Train: {len(train)} samples, Val: {len(val)} samples")
    from collections import namedtuple
    return namedtuple("DataOutput", ["dataset_path", "num_train", "num_val"])(
        dataset_path=out_dir, num_train=len(train), num_val=len(val)
    )
# ──────────────────────────────────────────────────────────────
# 3. Upload prepared data to S3 (so Ray workers can access it)
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["boto3"],
)
def upload_data_to_s3(
    dataset_path: str,
    s3_endpoint: str,
    s3_bucket: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    run_id: str = "",
) -> NamedTuple("UploadOutput", [("data_s3_prefix", str)]):
    """Push train.json and val.json to S3 so distributed Ray workers can read them."""
    import os
    import uuid
    import boto3
    from botocore.client import Config

    # A fresh run id keeps concurrent pipeline runs from clobbering each other.
    effective_run_id = run_id or uuid.uuid4().hex[:8]
    prefix = f"ray-training-runs/{effective_run_id}/data"
    s3 = boto3.client(
        "s3",
        endpoint_url=s3_endpoint,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
        verify=False,
    )
    for split_file in ("train.json", "val.json"):
        local = os.path.join(dataset_path, split_file)
        key = f"{prefix}/{split_file}"
        print(f"Uploading {local} → s3://{s3_bucket}/{key}")
        s3.upload_file(local, s3_bucket, key)
    print(f"Data uploaded to s3://{s3_bucket}/{prefix}")
    from collections import namedtuple
    return namedtuple("UploadOutput", ["data_s3_prefix"])(data_s3_prefix=prefix)
# ──────────────────────────────────────────────────────────────
# Training script (embedded in the RayJob ConfigMap)
# ──────────────────────────────────────────────────────────────
_RAY_TRAIN_SCRIPT = '''\
#!/usr/bin/env python3
"""Distributed LoRA training on CPU via Ray Train TorchTrainer."""
import json
import os
import boto3
import ray
import torch
from botocore.client import Config
from datasets import Dataset
from peft import LoraConfig, get_peft_model
from ray.train import RunConfig, ScalingConfig
from ray.train.huggingface.transformers import (
RayTrainReportCallback,
prepare_trainer,
)
from ray.train.torch import TorchTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from trl import SFTTrainer
def _s3_client(cfg):
return boto3.client(
"s3",
endpoint_url=cfg['s3_endpoint'],
aws_access_key_id=cfg["aws_access_key_id"],
aws_secret_access_key=cfg["aws_secret_access_key"],
region_name="us-east-1",
config=Config(signature_version="s3v4"),
verify=False,
)
def train_func(config):
"""Each Ray worker executes this function."""
import ray.train as rt
# ── Download data from S3 (each worker independently) ──
s3 = _s3_client(config)
data_dir = "/tmp/training_data"
os.makedirs(data_dir, exist_ok=True)
prefix = config["data_s3_prefix"]
bucket = config["s3_bucket"]
for fname in ["train.json", "val.json"]:
s3.download_file(bucket, f"{prefix}/{fname}", f"{data_dir}/{fname}")
with open(f"{data_dir}/train.json") as f:
train_data = json.load(f)
with open(f"{data_dir}/val.json") as f:
val_data = json.load(f)
rank = rt.get_context().get_world_rank()
world = rt.get_context().get_world_size()
print(f"[worker {rank}/{world}] Loaded {len(train_data)} train / "
f"{len(val_data)} val samples")
# ── Tokenizer ──────────────────────────────────────────
base_model = config["base_model"]
tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"
def format_chat(sample):
return {
"text": tokenizer.apply_chat_template(
sample["messages"], tokenize=False, add_generation_prompt=False
)
}
train_ds = Dataset.from_list(train_data).map(format_chat)
val_ds = Dataset.from_list(val_data).map(format_chat)
# ── Model (float32, CPU) ───────────────────────────────
print(f"[worker {rank}] Loading {base_model} (float32, CPU)")
model = AutoModelForCausalLM.from_pretrained(
base_model,
torch_dtype=torch.float32,
device_map="cpu",
trust_remote_code=True,
)
lora_cfg = LoraConfig(
r=config["lora_r"],
lora_alpha=config["lora_alpha"],
target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
lora_dropout=config["lora_dropout"],
bias="none",
task_type="CAUSAL_LM",
)
model = get_peft_model(model, lora_cfg)
model.print_trainable_parameters()
# ── Training args (CPU-optimised) ──────────────────────
args = TrainingArguments(
output_dir="/tmp/checkpoints",
num_train_epochs=config["num_epochs"],
per_device_train_batch_size=config["batch_size"],
per_device_eval_batch_size=config["batch_size"],
gradient_accumulation_steps=config["gradient_accumulation_steps"],
learning_rate=config["learning_rate"],
bf16=False,
fp16=False,
logging_steps=10,
eval_strategy="steps",
eval_steps=100,
save_strategy="no", # Ray handles checkpointing
warmup_ratio=0.05,
lr_scheduler_type="cosine",
optim="adamw_torch",
max_grad_norm=1.0,
group_by_length=True,
gradient_checkpointing=True,
dataloader_num_workers=2,
dataloader_pin_memory=False,
no_cuda=True,
report_to="none",
)
# ── SFTTrainer + Ray integration ───────────────────────
trainer = SFTTrainer(
model=model,
args=args,
train_dataset=train_ds,
eval_dataset=val_ds,
tokenizer=tokenizer,
max_seq_length=config["max_seq_length"],
dataset_text_field="text",
packing=True,
)
# Ray Train callback reports metrics; prepare_trainer wraps
# the model with DDP and adds DistributedSampler
trainer.add_callback(RayTrainReportCallback())
trainer = prepare_trainer(trainer)
print(f"[worker {rank}] Starting distributed LoRA training …")
result = trainer.train()
eval_result = trainer.evaluate()
train_loss = result.training_loss
eval_loss = eval_result.get("eval_loss", 0.0)
print(f"[worker {rank}] train_loss={train_loss:.4f} eval_loss={eval_loss:.4f}")
# ── Rank 0: save adapter + upload to S3 ────────────────
if rank == 0:
adapter_path = "/tmp/adapter"
unwrapped = (
trainer.model.module
if hasattr(trainer.model, "module")
else trainer.model
)
unwrapped.save_pretrained(adapter_path)
tokenizer.save_pretrained(adapter_path)
metadata = {
"base_model": base_model,
"train_loss": train_loss,
"eval_loss": eval_loss,
"training_device": "cpu",
"distributed": True,
"num_workers": world,
"dtype": "float32",
"lora_r": config["lora_r"],
"lora_alpha": config["lora_alpha"],
"lora_dropout": config["lora_dropout"],
"learning_rate": config["learning_rate"],
"num_epochs": config["num_epochs"],
"batch_size": config["batch_size"],
"gradient_accumulation_steps": config["gradient_accumulation_steps"],
"max_seq_length": config["max_seq_length"],
}
with open(f"{adapter_path}/training_metadata.json", "w") as f:
json.dump(metadata, f, indent=2)
# Upload adapter to S3
adapter_prefix = config["adapter_s3_prefix"]
for root, _, files in os.walk(adapter_path):
for fname in files:
fpath = os.path.join(root, fname)
rel = os.path.relpath(fpath, adapter_path)
key = f"{adapter_prefix}/{rel}"
s3.upload_file(fpath, bucket, key)
print(f" ✓ s3://{bucket}/{key}")
# Results marker for the KFP step to read
s3.put_object(
Bucket=bucket,
Key=f"{adapter_prefix}/results.json",
Body=json.dumps(metadata),
)
print(f"Adapter uploaded to s3://{bucket}/{adapter_prefix}")
if __name__ == "__main__":
cfg = json.loads(os.environ["TRAIN_CONFIG"])
num_workers = cfg.pop("num_workers", 4)
cpus_per_worker = cfg.pop("cpus_per_worker", 4)
ray.init()
trainer = TorchTrainer(
train_func,
scaling_config=ScalingConfig(
num_workers=num_workers,
use_gpu=False,
resources_per_worker={"CPU": cpus_per_worker},
),
run_config=RunConfig(name="cpu-lora-training"),
train_loop_config=cfg,
)
result = trainer.fit()
print(f"Training complete: {result}")
'''
# ──────────────────────────────────────────────────────────────
# 4. Submit a KubeRay RayJob for distributed CPU training
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["kubernetes", "boto3"],
)
def submit_ray_training_job(
    data_s3_prefix: str,
    s3_endpoint: str,
    s3_bucket: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    base_model: str,
    learning_rate: float,
    num_epochs: int,
    batch_size: int,
    gradient_accumulation_steps: int,
    max_seq_length: int,
    lora_r: int,
    lora_alpha: int,
    lora_dropout: float,
    num_workers: int = 4,
    cpus_per_worker: int = 4,
    memory_per_worker: str = "16Gi",
    ray_image: str = "rayproject/ray:2.44.1-py311-cpu",
    namespace: str = "kubeflow",
    poll_interval_seconds: int = 30,
    training_script: str = "",
) -> NamedTuple(
    "RayTrainOutput",
    [
        ("adapter_s3_prefix", str),
        ("train_loss", float),
        ("eval_loss", float),
        ("num_workers_used", int),
    ],
):
    """Create a KubeRay RayJob for distributed CPU LoRA training and wait.

    Mounts *training_script* into every Ray pod via a ConfigMap, passes the
    hyper-parameters JSON-encoded through the TRAIN_CONFIG env var, polls the
    RayJob until it reaches a terminal state, then reads the results marker
    the script uploaded to S3.

    Raises:
        RuntimeError: if the RayJob ends in any state other than SUCCEEDED.
    """
    import json
    import time
    import uuid
    import boto3
    from botocore.client import Config
    from kubernetes import client as k8s_client
    from kubernetes import config as k8s_config

    # ── Build identifiers ───────────────────────────────────
    run_id = uuid.uuid4().hex[:8]
    job_name = f"cpu-lora-{run_id}"
    cm_name = f"train-script-{run_id}"
    adapter_prefix = f"ray-training-runs/{run_id}/adapter"
    # ── Training config (passed as TRAIN_CONFIG env var) ────
    train_config = json.dumps(
        {
            "base_model": base_model,
            "learning_rate": learning_rate,
            "num_epochs": num_epochs,
            "batch_size": batch_size,
            "gradient_accumulation_steps": gradient_accumulation_steps,
            "max_seq_length": max_seq_length,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha,
            "lora_dropout": lora_dropout,
            "num_workers": num_workers,
            "cpus_per_worker": cpus_per_worker,
            "data_s3_prefix": data_s3_prefix,
            "adapter_s3_prefix": adapter_prefix,
            "s3_endpoint": s3_endpoint,
            "s3_bucket": s3_bucket,
            "aws_access_key_id": aws_access_key_id,
            "aws_secret_access_key": aws_secret_access_key,
        }
    )
    # ── Kubernetes client ───────────────────────────────────
    k8s_config.load_incluster_config()
    core_v1 = k8s_client.CoreV1Api()
    custom_api = k8s_client.CustomObjectsApi()
    # ── Create ConfigMap with training script ───────────────
    cm = k8s_client.V1ConfigMap(
        metadata=k8s_client.V1ObjectMeta(name=cm_name, namespace=namespace),
        data={"train.py": training_script},
    )
    core_v1.create_namespaced_config_map(namespace=namespace, body=cm)
    print(f"Created ConfigMap {cm_name}")
    # ── Common container spec ───────────────────────────────
    pip_deps = [
        "torch",
        "transformers",
        "peft",
        "datasets",
        "accelerate",
        "scipy",
        "trl",
        "boto3",
    ]

    def make_container(name, cpu_limit, mem_limit, is_head=False):
        # Container spec shared by head and worker pods (is_head reserved
        # for future head-specific tweaks; currently unused).
        return {
            "name": name,
            "image": ray_image,
            "resources": {
                "limits": {"cpu": cpu_limit, "memory": mem_limit},
                # Request half the CPU limit (min 1) so pods schedule easily.
                "requests": {
                    "cpu": str(max(1, int(cpu_limit) // 2)),
                    "memory": mem_limit,
                },
            },
            "env": [
                {"name": "TRAIN_CONFIG", "value": train_config},
                {"name": "RAY_DEDUP_LOGS", "value": "0"},
            ],
            "volumeMounts": [
                {
                    "name": "train-script",
                    "mountPath": "/home/ray/train.py",
                    "subPath": "train.py",
                }
            ],
        }

    # ── RayJob spec ─────────────────────────────────────────
    ray_job = {
        "apiVersion": "ray.io/v1",
        "kind": "RayJob",
        "metadata": {"name": job_name, "namespace": namespace},
        "spec": {
            "entrypoint": "python /home/ray/train.py",
            "shutdownAfterJobFinishes": True,
            "ttlSecondsAfterFinished": 300,
            # JSON is a subset of YAML, so json.dumps output is accepted here.
            "runtimeEnvYAML": json.dumps({"pip": pip_deps}),
            "rayClusterSpec": {
                "rayVersion": "2.44.1",
                "headGroupSpec": {
                    "rayStartParams": {
                        "dashboard-host": "0.0.0.0",
                        # Head schedules no training work (num-cpus=0).
                        "num-cpus": "0",
                    },
                    "template": {
                        "spec": {
                            "containers": [
                                make_container(
                                    "ray-head", "2", "4Gi", is_head=True
                                )
                            ],
                            "volumes": [
                                {
                                    "name": "train-script",
                                    "configMap": {"name": cm_name},
                                }
                            ],
                        }
                    },
                },
                "workerGroupSpecs": [
                    {
                        "replicas": num_workers,
                        "minReplicas": num_workers,
                        "maxReplicas": num_workers,
                        "groupName": "cpu-workers",
                        "rayStartParams": {
                            "num-cpus": str(cpus_per_worker),
                        },
                        "template": {
                            "spec": {
                                "containers": [
                                    make_container(
                                        "ray-worker",
                                        str(cpus_per_worker),
                                        memory_per_worker,
                                    )
                                ],
                                "volumes": [
                                    {
                                        "name": "train-script",
                                        "configMap": {"name": cm_name},
                                    }
                                ],
                            }
                        },
                    }
                ],
            },
        },
    }
    # ── Submit RayJob, poll to completion; always clean up ──
    try:
        custom_api.create_namespaced_custom_object(
            group="ray.io",
            version="v1",
            namespace=namespace,
            plural="rayjobs",
            body=ray_job,
        )
        print(f"Submitted RayJob {job_name} ({num_workers} workers × "
              f"{cpus_per_worker} CPUs)")
        terminal_states = {"SUCCEEDED", "FAILED", "STOPPED"}
        while True:
            time.sleep(poll_interval_seconds)
            obj = custom_api.get_namespaced_custom_object(
                group="ray.io",
                version="v1",
                namespace=namespace,
                plural="rayjobs",
                name=job_name,
            )
            status = obj.get("status", {})
            job_status = status.get("jobStatus", "PENDING")
            print(f" RayJob {job_name}: {job_status}")
            if job_status in terminal_states:
                break
        if job_status != "SUCCEEDED":
            msg = status.get("message", "unknown error")
            # Fix: status and message were concatenated with no separator.
            raise RuntimeError(f"RayJob {job_name} failed: {job_status}: {msg}")
        print(f"RayJob {job_name} completed successfully")
    finally:
        # Fix: cleanup previously ran only on the success path, leaking one
        # ConfigMap per failed run; a finally block guarantees best-effort
        # deletion without masking the original exception.
        try:
            core_v1.delete_namespaced_config_map(name=cm_name, namespace=namespace)
            print(f"Deleted ConfigMap {cm_name}")
        except Exception as e:
            print(f"ConfigMap cleanup warning: {e}")
    # ── Read results from S3 ────────────────────────────────
    s3 = boto3.client(
        "s3",
        endpoint_url=s3_endpoint,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
        verify=False,
    )
    results_obj = s3.get_object(
        Bucket=s3_bucket, Key=f"{adapter_prefix}/results.json"
    )
    results = json.loads(results_obj["Body"].read().decode("utf-8"))
    train_loss = float(results.get("train_loss", 0.0))
    eval_loss = float(results.get("eval_loss", 0.0))
    print(f"Results: train_loss={train_loss:.4f}, eval_loss={eval_loss:.4f}")
    from collections import namedtuple
    return namedtuple(
        "RayTrainOutput",
        ["adapter_s3_prefix", "train_loss", "eval_loss", "num_workers_used"],
    )(
        adapter_s3_prefix=adapter_prefix,
        train_loss=train_loss,
        eval_loss=eval_loss,
        num_workers_used=num_workers,
    )
# ──────────────────────────────────────────────────────────────
# 5. Download trained adapter from S3
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["boto3"],
)
def download_adapter_from_s3(
    adapter_s3_prefix: str,
    s3_endpoint: str,
    s3_bucket: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
) -> NamedTuple("AdapterOutput", [("adapter_path", str)]):
    """Mirror the trained LoRA adapter files from S3 onto local disk."""
    import os
    import boto3
    from botocore.client import Config

    s3 = boto3.client(
        "s3",
        endpoint_url=s3_endpoint,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
        verify=False,
    )
    adapter_path = "/tmp/adapter"
    os.makedirs(adapter_path, exist_ok=True)
    downloaded = 0
    pages = s3.get_paginator("list_objects_v2").paginate(
        Bucket=s3_bucket, Prefix=adapter_s3_prefix
    )
    for page in pages:
        for entry in page.get("Contents", []):
            key = entry["Key"]
            rel = key[len(adapter_s3_prefix) :].lstrip("/")
            # Skip the bare prefix and the results marker file.
            if not rel or rel == "results.json":
                continue
            target = os.path.join(adapter_path, rel)
            os.makedirs(os.path.dirname(target), exist_ok=True)
            s3.download_file(s3_bucket, key, target)
            downloaded += 1
            print(f"{rel}")
    print(f"Downloaded {downloaded} adapter files to {adapter_path}")
    from collections import namedtuple
    return namedtuple("AdapterOutput", ["adapter_path"])(adapter_path=adapter_path)
# ──────────────────────────────────────────────────────────────
# 6. Sanity evaluation (CPU-only)
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=[
        "torch",
        "transformers",
        "peft",
        "accelerate",
        "scipy",
    ],
)
def evaluate_adapter_cpu(
    adapter_path: str,
    base_model: str,
) -> NamedTuple("EvalOutput", [("report", str), ("passed", bool)]):
    """Load the LoRA adapter on CPU and run sanity-check prompts."""
    import torch
    from peft import PeftModel
    from transformers import AutoModelForCausalLM, AutoTokenizer

    print(f"Loading base model {base_model} (CPU) …")
    model = AutoModelForCausalLM.from_pretrained(
        base_model,
        torch_dtype=torch.float32,
        device_map="cpu",
        trust_remote_code=True,
    )
    tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
    print(f"Loading adapter from {adapter_path}")
    model = PeftModel.from_pretrained(model, adapter_path)
    model.eval()
    test_prompts = [
        "Summarise the key points from the training material.",
        "What are the main topics covered in the source documents?",
        "Explain the most important concept from the training data.",
    ]
    lines = []
    for prompt in test_prompts:
        chat = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ]
        rendered = tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )
        encoded = tokenizer(rendered, return_tensors="pt")
        with torch.no_grad():
            generated = model.generate(
                **encoded, max_new_tokens=128, temperature=0.7, do_sample=True
            )
        # Decode only the newly generated tokens, not the echoed prompt.
        prompt_len = encoded["input_ids"].shape[1]
        response = tokenizer.decode(
            generated[0][prompt_len:], skip_special_tokens=True
        )
        lines.append(f"Q: {prompt}\nA: {response}\n")
        print(lines[-1])
    report = "\n".join(lines)
    # The run "passes" when every answer has more than ten non-blank chars.
    passed = True
    for entry in lines:
        if len(entry.split("A:")[1].strip()) <= 10:
            passed = False
    print(f"Evaluation passed: {passed}")
    from collections import namedtuple
    return namedtuple("EvalOutput", ["report", "passed"])(
        report=report, passed=passed
    )
# ──────────────────────────────────────────────────────────────
# 7. Push adapter to Gitea repo
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["requests"],
)
def push_adapter_to_gitea(
    adapter_path: str,
    gitea_url: str,
    gitea_owner: str,
    gitea_repo: str,
    gitea_username: str,
    gitea_password: str,
    branch: str = "main",
    commit_message: str = "feat: add LoRA adapter from distributed CPU training",
) -> NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)]):
    """Push the LoRA adapter files to a Gitea repository via the API."""
    import base64
    import os
    import requests

    api_base = f"{gitea_url}/api/v1"
    auth = (gitea_username, gitea_password)
    repo_api = f"{api_base}/repos/{gitea_owner}/{gitea_repo}"
    # Ensure the target repository exists; create it on a 404.
    if requests.get(repo_api, auth=auth, timeout=30).status_code == 404:
        print(f"Creating repo {gitea_owner}/{gitea_repo}")
        # Org-owned repos use a different creation endpoint than user repos.
        if gitea_owner != gitea_username:
            create_endpoint = f"{api_base}/orgs/{gitea_owner}/repos"
        else:
            create_endpoint = f"{api_base}/user/repos"
        created = requests.post(
            create_endpoint,
            auth=auth,
            json={
                "name": gitea_repo,
                "description": "LoRA adapters trained via distributed CPU pipeline",
                "private": False,
                "auto_init": True,
            },
            timeout=30,
        )
        created.raise_for_status()
        print(f"Created: {created.json().get('html_url')}")
    # Gather every adapter file as a base64 payload for the contents API.
    files_to_push = []
    for root, _dirs, names in os.walk(adapter_path):
        for name in names:
            full = os.path.join(root, name)
            with open(full, "rb") as fh:
                encoded = base64.b64encode(fh.read()).decode("utf-8")
            files_to_push.append(
                {"path": os.path.relpath(full, adapter_path), "content": encoded}
            )
    print(f"Pushing {len(files_to_push)} files to {gitea_owner}/{gitea_repo}")
    pushed = 0
    for item in files_to_push:
        file_api = f"{repo_api}/contents/{item['path']}"
        existing = requests.get(
            file_api, auth=auth, params={"ref": branch}, timeout=30
        )
        payload = {
            "message": commit_message,
            "content": item["content"],
            "branch": branch,
        }
        if existing.status_code == 200:
            # Updating an existing file requires its current blob sha.
            payload["sha"] = existing.json()["sha"]
            resp = requests.put(file_api, auth=auth, json=payload, timeout=60)
        else:
            resp = requests.post(file_api, auth=auth, json=payload, timeout=60)
        if resp.status_code in (200, 201):
            pushed += 1
            print(f"{item['path']}")
        else:
            print(f"{item['path']}: {resp.status_code} {resp.text[:200]}")
    repo_url = f"{gitea_url}/{gitea_owner}/{gitea_repo}"
    print(f"Pushed {pushed}/{len(files_to_push)} files to {repo_url}")
    from collections import namedtuple
    return namedtuple("PushOutput", ["repo_url", "files_pushed"])(
        repo_url=repo_url, files_pushed=pushed
    )
# ──────────────────────────────────────────────────────────────
# 8. Log metrics to MLflow
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["mlflow==2.22.0"],
)
def log_training_metrics(
    base_model: str,
    train_loss: float,
    eval_loss: float,
    num_train: int,
    num_val: int,
    num_pdfs: int,
    lora_r: int,
    lora_alpha: int,
    learning_rate: float,
    num_epochs: int,
    num_workers: int,
    repo_url: str,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
    experiment_name: str = "cpu-lora-training",
):
    """Record the full training run (params, metrics, tags) in MLflow."""
    import mlflow

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)
    run_params = {
        "base_model": base_model,
        "training_device": "cpu",
        "distributed": True,
        "num_workers": num_workers,
        "dtype": "float32",
        "lora_r": lora_r,
        "lora_alpha": lora_alpha,
        "learning_rate": learning_rate,
        "num_epochs": num_epochs,
        "num_pdfs": num_pdfs,
        "backend": "ray-train-gloo",
        "data_source": "rustfs/training-data",
    }
    run_metrics = {
        "train_loss": train_loss,
        "eval_loss": eval_loss,
        "train_samples": float(num_train),
        "val_samples": float(num_val),
    }
    with mlflow.start_run(run_name=f"cpu-lora-{base_model.split('/')[-1]}"):
        mlflow.log_params(run_params)
        mlflow.log_metrics(run_metrics)
        mlflow.set_tag("adapter_repo", repo_url)
        mlflow.set_tag("training_type", "distributed-cpu-lora")
        mlflow.set_tag("orchestrator", "kuberay-rayjob")
# ──────────────────────────────────────────────────────────────
# Pipeline definition
# ──────────────────────────────────────────────────────────────
@dsl.pipeline(
    name="Distributed CPU LoRA Fine-Tuning",
    description=(
        "Fine-tune a small language model (1B8B) using LoRA distributed "
        "across cluster CPU nodes via KubeRay RayJob + Ray Train. "
        "Scale by increasing num_workers. Pushes adapter to Gitea and "
        "logs to MLflow."
    ),
)
def cpu_training_pipeline(
    # ── S3 / RustFS ──
    s3_endpoint: str = "https://gravenhollow.lab.daviestechlabs.io:30292",
    s3_bucket: str = "training-data",
    s3_prefix: str = "",
    aws_access_key_id: str = "",
    aws_secret_access_key: str = "",
    # ── Model (small, CPU-friendly) ──
    base_model: str = "Qwen/Qwen2.5-3B-Instruct",
    # ── Training hyper-params (conservative for CPU) ──
    learning_rate: float = 2e-5,
    num_epochs: int = 3,
    batch_size: int = 1,
    gradient_accumulation_steps: int = 16,
    max_seq_length: int = 1024,
    lora_r: int = 16,
    lora_alpha: int = 32,
    lora_dropout: float = 0.05,
    # ── Data prep ──
    chunk_size: int = 512,
    chunk_overlap: int = 64,
    # ── Ray Train (scale here) ──
    num_workers: int = 4,
    cpus_per_worker: int = 4,
    memory_per_worker: str = "16Gi",
    ray_image: str = "rayproject/ray:2.44.1-py311-cpu",
    ray_namespace: str = "kubeflow",
    # ── Gitea ──
    gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000",
    gitea_owner: str = "daviestechlabs",
    gitea_repo: str = "lora-adapters-cpu",
    gitea_username: str = "",
    gitea_password: str = "",
    # ── MLflow ──
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
):
    """Wire the eight pipeline steps together.

    Data flow: fetch PDFs -> prepare dataset -> upload to S3 -> RayJob
    training -> download adapter -> CPU sanity eval -> push to Gitea ->
    MLflow logging.  Credentials default to empty strings and must be
    supplied at run time (expected to come from the cluster secrets listed
    in the module docstring).
    """
    # Step 1 — Fetch PDFs from S3
    pdfs = fetch_pdfs_from_s3(
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    # Step 2 — Extract text and build training dataset
    data = prepare_training_data(
        pdf_dir=pdfs.outputs["pdf_dir"],
        max_seq_length=max_seq_length,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    # Step 3 — Upload training data to S3 for Ray workers
    uploaded = upload_data_to_s3(
        dataset_path=data.outputs["dataset_path"],
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    # Step 4 — Distributed CPU training via KubeRay RayJob
    trained = submit_ray_training_job(
        data_s3_prefix=uploaded.outputs["data_s3_prefix"],
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        base_model=base_model,
        learning_rate=learning_rate,
        num_epochs=num_epochs,
        batch_size=batch_size,
        gradient_accumulation_steps=gradient_accumulation_steps,
        max_seq_length=max_seq_length,
        lora_r=lora_r,
        lora_alpha=lora_alpha,
        lora_dropout=lora_dropout,
        num_workers=num_workers,
        cpus_per_worker=cpus_per_worker,
        memory_per_worker=memory_per_worker,
        ray_image=ray_image,
        namespace=ray_namespace,
        training_script=_RAY_TRAIN_SCRIPT,
    )
    # The RayJob submission pod itself is lightweight — the heavy lifting
    # happens in the Ray cluster pods, not in this KFP step.
    trained.set_cpu_limit("1")
    trained.set_memory_limit("1Gi")
    # Step 5 — Download trained adapter from S3
    adapter = download_adapter_from_s3(
        adapter_s3_prefix=trained.outputs["adapter_s3_prefix"],
        s3_endpoint=s3_endpoint,
        s3_bucket=s3_bucket,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    # Step 6 — Quick evaluation (CPU-only); needs RAM for the full model
    evaluated = evaluate_adapter_cpu(
        adapter_path=adapter.outputs["adapter_path"],
        base_model=base_model,
    )
    evaluated.set_cpu_limit("4")
    evaluated.set_memory_limit("16Gi")
    evaluated.set_memory_request("8Gi")
    # Step 7 — Push adapter to Gitea
    pushed = push_adapter_to_gitea(
        adapter_path=adapter.outputs["adapter_path"],
        gitea_url=gitea_url,
        gitea_owner=gitea_owner,
        gitea_repo=gitea_repo,
        gitea_username=gitea_username,
        gitea_password=gitea_password,
    )
    # Step 8 — Log to MLflow
    log_training_metrics(
        base_model=base_model,
        train_loss=trained.outputs["train_loss"],
        eval_loss=trained.outputs["eval_loss"],
        num_train=data.outputs["num_train"],
        num_val=data.outputs["num_val"],
        num_pdfs=pdfs.outputs["num_files"],
        lora_r=lora_r,
        lora_alpha=lora_alpha,
        learning_rate=learning_rate,
        num_epochs=num_epochs,
        num_workers=trained.outputs["num_workers_used"],
        repo_url=pushed.outputs["repo_url"],
        mlflow_tracking_uri=mlflow_tracking_uri,
    )
# ──────────────────────────────────────────────────────────────
# Compile
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Emit the pipeline spec for upload to the Kubeflow Pipelines UI.
    output_path = "cpu_training_pipeline.yaml"
    compiler.Compiler().compile(
        pipeline_func=cpu_training_pipeline,
        package_path=output_path,
    )
    print(f"Compiled: {output_path}")