diff --git a/.gitea/workflows/compile-upload.yaml b/.gitea/workflows/compile-upload.yaml new file mode 100644 index 0000000..ae06a78 --- /dev/null +++ b/.gitea/workflows/compile-upload.yaml @@ -0,0 +1,221 @@ +name: Compile and Upload Pipelines + +on: + push: + branches: [main] + paths: + - "**/*_pipeline.py" + - "**/*pipeline*.py" + workflow_dispatch: + +env: + NTFY_URL: http://ntfy.observability.svc.cluster.local:80 + KUBEFLOW_HOST: http://ml-pipeline.kubeflow.svc.cluster.local:8888 + +jobs: + compile-and-upload: + name: Compile & Upload + runs-on: ubuntu-latest + outputs: + compiled: ${{ steps.compile.outputs.compiled }} + failed: ${{ steps.compile.outputs.failed }} + uploaded: ${{ steps.upload.outputs.uploaded }} + upload_failed: ${{ steps.upload.outputs.failed }} + version: ${{ steps.upload.outputs.version }} + uploaded_names: ${{ steps.upload.outputs.uploaded_names }} + failed_names: ${{ steps.upload.outputs.failed_names }} + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.13" + + - name: Install KFP + run: pip install kfp==2.12.1 + + - name: Discover pipeline files + id: discover + run: | + # Find all pipeline Python files + FILES=$(find . -maxdepth 1 -name '*_pipeline.py' -o -name '*pipeline*.py' | sort) + COUNT=$(echo "$FILES" | grep -c '.' || true) + echo "files<> $GITHUB_OUTPUT + echo "$FILES" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + echo "count=$COUNT" >> $GITHUB_OUTPUT + echo "Found $COUNT pipeline files:" + echo "$FILES" + + - name: Compile pipelines + id: compile + run: | + COMPILED=0 + FAILED=0 + COMPILED_LIST="" + FAILED_LIST="" + + for py_file in ${{ steps.discover.outputs.files }}; do + name=$(basename "$py_file" .py) + echo "::group::Compiling $name" + + if python "$py_file"; then + yaml_file="${name}.yaml" + if [ -f "$yaml_file" ]; then + echo "✓ Compiled $name → $yaml_file" + COMPILED=$((COMPILED + 1)) + COMPILED_LIST="${COMPILED_LIST}${name}\n" + else + echo "✗ $name produced no YAML output" + FAILED=$((FAILED + 1)) + FAILED_LIST="${FAILED_LIST}${name}\n" + fi + else + echo "✗ Failed to compile $name" + FAILED=$((FAILED + 1)) + FAILED_LIST="${FAILED_LIST}${name}\n" + fi + + echo "::endgroup::" + done + + echo "compiled=$COMPILED" >> $GITHUB_OUTPUT + echo "failed=$FAILED" >> $GITHUB_OUTPUT + echo "compiled_list<> $GITHUB_OUTPUT + echo -e "$COMPILED_LIST" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + echo "failed_list<> $GITHUB_OUTPUT + echo -e "$FAILED_LIST" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + + echo "" + echo "=== Summary ===" + echo "Compiled: $COMPILED" + echo "Failed: $FAILED" + + if [ "$FAILED" -gt 0 ]; then + echo "::warning::$FAILED pipeline(s) failed to compile" + fi + + - name: Upload pipelines to Kubeflow + id: upload + run: | + python3 << 'UPLOAD_SCRIPT' + import os + import sys + from pathlib import Path + from datetime import datetime + from kfp import Client + + host = os.environ["KUBEFLOW_HOST"] + print(f"Connecting to Kubeflow at {host}") + + try: + client = Client(host=host) + client.list_pipelines(page_size=1) + print("Connected to Kubeflow Pipelines") + except Exception as e: + print(f"ERROR: Cannot connect to Kubeflow: {e}") + sys.exit(1) + + # Get all compiled YAML files + yaml_files = sorted(Path(".").glob("*_pipeline.yaml")) + if not yaml_files: + yaml_files = sorted(Path(".").glob("*pipeline*.yaml")) + + uploaded = 0 + failed = 0 + uploaded_names = [] + failed_names = [] + version_tag = f"v{datetime.now().strftime('%Y%m%d-%H%M%S')}" + + for yaml_path in yaml_files: + pipeline_name = yaml_path.stem.replace("_", "-") + print(f"\n--- {pipeline_name} ---") + + try: + # Check if pipeline already exists + existing = None + all_pipelines = client.list_pipelines(page_size=200) + if all_pipelines.pipelines: + for p in all_pipelines.pipelines: + if p.display_name == pipeline_name: + existing = p + break + + if existing: + print(f" Updating: {pipeline_name} ({existing.pipeline_id})") + client.upload_pipeline_version( + pipeline_package_path=str(yaml_path), + pipeline_version_name=version_tag, + pipeline_id=existing.pipeline_id, + ) + else: + print(f" Creating: {pipeline_name}") + client.upload_pipeline( + pipeline_package_path=str(yaml_path), + pipeline_name=pipeline_name, + ) + + uploaded += 1 + uploaded_names.append(pipeline_name) + print(f" ✓ Done") + + except Exception as e: + failed += 1 + failed_names.append(pipeline_name) + print(f" ✗ Error: {e}") + + # Write outputs + with open(os.environ["GITHUB_OUTPUT"], "a") as f: + f.write(f"uploaded={uploaded}\n") + f.write(f"failed={failed}\n") + f.write(f"version={version_tag}\n") + f.write(f"uploaded_names={', '.join(uploaded_names)}\n") + f.write(f"failed_names={', '.join(failed_names)}\n") + + print(f"\n=== Upload Summary ===") + print(f"Uploaded: {uploaded}") + print(f"Failed: {failed}") + + if failed > 0: + sys.exit(1) + UPLOAD_SCRIPT + + notify: + name: Notify + runs-on: ubuntu-latest + needs: [compile-and-upload] + if: always() + steps: + - name: Notify on success + if: needs.compile-and-upload.result == 'success' + run: | + curl -s \ + -H "Title: ✅ Pipelines uploaded to Kubeflow" \ + -H "Priority: default" \ + -H "Tags: white_check_mark,rocket" \ + -H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \ + -d "Branch: ${{ gitea.ref_name }} + Commit: ${{ gitea.event.head_commit.message || gitea.sha }} + Compiled: ${{ needs.compile-and-upload.outputs.compiled || '?' }} pipeline(s) + Uploaded: ${{ needs.compile-and-upload.outputs.uploaded || '?' }} pipeline(s) + Version: ${{ needs.compile-and-upload.outputs.version || 'n/a' }}" \ + ${{ env.NTFY_URL }}/gitea-ci + + - name: Notify on failure + if: needs.compile-and-upload.result == 'failure' + run: | + curl -s \ + -H "Title: ❌ Pipeline upload failed" \ + -H "Priority: high" \ + -H "Tags: x,rocket" \ + -H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \ + -d "Branch: ${{ gitea.ref_name }} + Commit: ${{ gitea.event.head_commit.message || gitea.sha }} + Compiled: ${{ needs.compile-and-upload.outputs.compiled || '?' }}, Failed compile: ${{ needs.compile-and-upload.outputs.failed || '?' }} + Upload failures: ${{ needs.compile-and-upload.outputs.failed_names || 'unknown' }} + Check logs for details." \ + ${{ env.NTFY_URL }}/gitea-ci diff --git a/qlora_pdf_pipeline.py b/qlora_pdf_pipeline.py new file mode 100644 index 0000000..08fee84 --- /dev/null +++ b/qlora_pdf_pipeline.py @@ -0,0 +1,705 @@ +#!/usr/bin/env python3 +""" +QLoRA Fine-Tuning Pipeline – Kubeflow Pipelines SDK + +Fetches PDFs from a Quobjects S3 bucket, extracts instruction-tuning +data, trains a QLoRA adapter on the Llama 3.1 70B base model using +the Strix Halo's 128 GB unified memory, evaluates it, and pushes the +adapter weights to a Gitea repository. + +Usage: + pip install kfp==2.12.1 + python qlora_pdf_pipeline.py + # Upload qlora_pdf_pipeline.yaml to Kubeflow Pipelines UI + +Prerequisites in-cluster: + - Secret mlpipeline-minio-artifact (namespace kubeflow) for S3 creds + - Secret gitea-admin-secret (namespace gitea) for Gitea push + - Node khelben with amd.com/gpu and the ROCm PyTorch image +""" + +from kfp import compiler, dsl +from typing import NamedTuple + + +# ────────────────────────────────────────────────────────────── +# 1. Fetch PDFs from Quobjects S3 +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["boto3"], +) +def fetch_pdfs_from_s3( + s3_endpoint: str, + s3_bucket: str, + s3_prefix: str, + aws_access_key_id: str, + aws_secret_access_key: str, +) -> NamedTuple("PDFOutput", [("pdf_dir", str), ("num_files", int)]): + """Download all PDFs from a Quobjects S3 bucket.""" + import os + import boto3 + from botocore.client import Config + + out_dir = "/tmp/pdfs" + os.makedirs(out_dir, exist_ok=True) + + client = boto3.client( + "s3", + endpoint_url=f"http://{s3_endpoint}", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name="us-east-1", + config=Config(signature_version="s3v4"), + ) + + paginator = client.get_paginator("list_objects_v2") + count = 0 + for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix): + for obj in page.get("Contents", []): + key = obj["Key"] + if key.lower().endswith(".pdf"): + local_path = os.path.join(out_dir, os.path.basename(key)) + print(f"Downloading: {key} → {local_path}") + client.download_file(s3_bucket, key, local_path) + count += 1 + + print(f"Downloaded {count} PDFs to {out_dir}") + from collections import namedtuple + + return namedtuple("PDFOutput", ["pdf_dir", "num_files"])( + pdf_dir=out_dir, num_files=count + ) + + +# ────────────────────────────────────────────────────────────── +# 2. Extract text from PDFs → instruction-tuning dataset +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["pymupdf"], +) +def prepare_training_data( + pdf_dir: str, + max_seq_length: int = 2048, + chunk_size: int = 512, + chunk_overlap: int = 64, +) -> NamedTuple("DataOutput", [("dataset_path", str), ("num_train", int), ("num_val", int)]): + """Extract text from PDFs, chunk it, and format as instruction-tuning pairs.""" + import json + import os + import fitz # PyMuPDF + + out_dir = "/tmp/training_data" + os.makedirs(out_dir, exist_ok=True) + + # 1. Extract text from all PDFs + all_chunks: list[dict] = [] + for fname in sorted(os.listdir(pdf_dir)): + if not fname.lower().endswith(".pdf"): + continue + path = os.path.join(pdf_dir, fname) + print(f"Extracting: {fname}") + try: + doc = fitz.open(path) + full_text = "" + for page in doc: + full_text += page.get_text() + "\n" + doc.close() + except Exception as e: + print(f" SKIP ({e})") + continue + + # 2. Chunk text with overlap + words = full_text.split() + for i in range(0, len(words), chunk_size - chunk_overlap): + chunk_words = words[i : i + chunk_size] + if len(chunk_words) < 50: + continue # skip tiny trailing chunks + chunk_text = " ".join(chunk_words) + all_chunks.append({"text": chunk_text, "source": fname}) + + print(f"Total chunks: {len(all_chunks)}") + if not all_chunks: + raise ValueError("No text extracted from PDFs — check your bucket") + + # 3. Format as Llama 3 chat training pairs + # We create self-supervised pairs: model learns to continue/explain the content + samples = [] + for chunk in all_chunks: + text = chunk["text"] + source = chunk["source"] + # Split chunk roughly in half for input/output + words = text.split() + mid = len(words) // 2 + context = " ".join(words[:mid]) + continuation = " ".join(words[mid:]) + + samples.append( + { + "messages": [ + { + "role": "system", + "content": ( + "You are a knowledgeable assistant. " + "Continue the information accurately and coherently." + ), + }, + { + "role": "user", + "content": f"Continue the following passage from {source}:\n\n{context}", + }, + {"role": "assistant", "content": continuation}, + ] + } + ) + + # 4. Train/val split (90/10) + import random + + random.seed(42) + random.shuffle(samples) + split = int(len(samples) * 0.9) + train = samples[:split] + val = samples[split:] + + train_path = os.path.join(out_dir, "train.json") + val_path = os.path.join(out_dir, "val.json") + with open(train_path, "w") as f: + json.dump(train, f) + with open(val_path, "w") as f: + json.dump(val, f) + + print(f"Train: {len(train)} samples, Val: {len(val)} samples") + from collections import namedtuple + + return namedtuple("DataOutput", ["dataset_path", "num_train", "num_val"])( + dataset_path=out_dir, num_train=len(train), num_val=len(val) + ) + + +# ────────────────────────────────────────────────────────────── +# 3. QLoRA training on Strix Halo (ROCm, 128 GB unified) +# ────────────────────────────────────────────────────────────── +@dsl.component( + # Use a ROCm base image with PyTorch + PEFT pre-installed. + # Falls back to pip-installing if not present. + base_image="python:3.13-slim", + packages_to_install=[ + "torch", + "transformers", + "peft", + "datasets", + "accelerate", + "bitsandbytes", + "scipy", + "trl", + ], +) +def train_qlora( + dataset_path: str, + base_model: str, + learning_rate: float = 2e-4, + num_epochs: int = 3, + batch_size: int = 2, + gradient_accumulation_steps: int = 8, + max_seq_length: int = 2048, + lora_r: int = 64, + lora_alpha: int = 16, + lora_dropout: float = 0.05, +) -> NamedTuple( + "TrainOutput", + [("adapter_path", str), ("train_loss", float), ("eval_loss", float)], +): + """QLoRA fine-tune Llama 3.1 70B with 4-bit NF4 quantization.""" + import json + import os + + import torch + from datasets import Dataset + from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training + from transformers import ( + AutoModelForCausalLM, + AutoTokenizer, + BitsAndBytesConfig, + TrainingArguments, + ) + from trl import SFTTrainer + + output_dir = "/tmp/qlora_output" + os.makedirs(output_dir, exist_ok=True) + + # ── Load data ─────────────────────────────────────────── + with open(os.path.join(dataset_path, "train.json")) as f: + train_data = json.load(f) + with open(os.path.join(dataset_path, "val.json")) as f: + val_data = json.load(f) + + print(f"Loaded {len(train_data)} train / {len(val_data)} val samples") + + # ── Tokenizer ─────────────────────────────────────────── + print(f"Loading tokenizer: {base_model}") + tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + tokenizer.padding_side = "right" + + # ── Format with chat template ─────────────────────────── + def format_chat(sample): + return {"text": tokenizer.apply_chat_template( + sample["messages"], tokenize=False, add_generation_prompt=False + )} + + train_ds = Dataset.from_list(train_data).map(format_chat) + val_ds = Dataset.from_list(val_data).map(format_chat) + + # ── 4-bit quantisation ────────────────────────────────── + bnb_config = BitsAndBytesConfig( + load_in_4bit=True, + bnb_4bit_quant_type="nf4", + bnb_4bit_compute_dtype=torch.bfloat16, + bnb_4bit_use_double_quant=True, + ) + + print(f"Loading model: {base_model} (4-bit NF4)") + model = AutoModelForCausalLM.from_pretrained( + base_model, + quantization_config=bnb_config, + device_map="auto", + trust_remote_code=True, + torch_dtype=torch.bfloat16, + ) + model = prepare_model_for_kbit_training(model) + + # ── LoRA config ───────────────────────────────────────── + lora_config = LoraConfig( + r=lora_r, + lora_alpha=lora_alpha, + target_modules=[ + "q_proj", "k_proj", "v_proj", "o_proj", + "gate_proj", "up_proj", "down_proj", + ], + lora_dropout=lora_dropout, + bias="none", + task_type="CAUSAL_LM", + ) + + model = get_peft_model(model, lora_config) + model.print_trainable_parameters() + + # ── Training args ─────────────────────────────────────── + training_args = TrainingArguments( + output_dir=os.path.join(output_dir, "checkpoints"), + num_train_epochs=num_epochs, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + gradient_accumulation_steps=gradient_accumulation_steps, + learning_rate=learning_rate, + bf16=True, + logging_steps=5, + eval_strategy="steps", + eval_steps=50, + save_strategy="steps", + save_steps=100, + save_total_limit=2, + load_best_model_at_end=True, + metric_for_best_model="eval_loss", + report_to="none", + warmup_ratio=0.03, + lr_scheduler_type="cosine", + optim="paged_adamw_8bit", + max_grad_norm=0.3, + group_by_length=True, + ) + + # ── SFTTrainer ────────────────────────────────────────── + trainer = SFTTrainer( + model=model, + args=training_args, + train_dataset=train_ds, + eval_dataset=val_ds, + tokenizer=tokenizer, + max_seq_length=max_seq_length, + dataset_text_field="text", + packing=True, # pack short samples for efficiency + ) + + print("Starting QLoRA training …") + result = trainer.train() + train_loss = result.training_loss + + eval_result = trainer.evaluate() + eval_loss = eval_result.get("eval_loss", 0.0) + + print(f"Train loss: {train_loss:.4f}, Eval loss: {eval_loss:.4f}") + + # ── Save adapter ──────────────────────────────────────── + adapter_path = os.path.join(output_dir, "adapter") + model.save_pretrained(adapter_path) + tokenizer.save_pretrained(adapter_path) + + metadata = { + "base_model": base_model, + "lora_r": lora_r, + "lora_alpha": lora_alpha, + "lora_dropout": lora_dropout, + "learning_rate": learning_rate, + "num_epochs": num_epochs, + "batch_size": batch_size, + "gradient_accumulation_steps": gradient_accumulation_steps, + "max_seq_length": max_seq_length, + "train_samples": len(train_data), + "val_samples": len(val_data), + "train_loss": train_loss, + "eval_loss": eval_loss, + } + with open(os.path.join(adapter_path, "training_metadata.json"), "w") as f: + json.dump(metadata, f, indent=2) + + print(f"Adapter saved to {adapter_path}") + + from collections import namedtuple + + return namedtuple("TrainOutput", ["adapter_path", "train_loss", "eval_loss"])( + adapter_path=adapter_path, + train_loss=train_loss, + eval_loss=eval_loss, + ) + + +# ────────────────────────────────────────────────────────────── +# 4. Quick sanity evaluation +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=[ + "torch", "transformers", "peft", "bitsandbytes", "accelerate", "scipy", + ], +) +def evaluate_adapter( + adapter_path: str, + base_model: str, +) -> NamedTuple("EvalOutput", [("report", str), ("passed", bool)]): + """Load the QLoRA adapter and run a few sanity-check prompts.""" + import torch + from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig + from peft import PeftModel + + bnb_config = BitsAndBytesConfig( + load_in_4bit=True, + bnb_4bit_quant_type="nf4", + bnb_4bit_compute_dtype=torch.bfloat16, + bnb_4bit_use_double_quant=True, + ) + + print(f"Loading base model {base_model} …") + model = AutoModelForCausalLM.from_pretrained( + base_model, + quantization_config=bnb_config, + device_map="auto", + trust_remote_code=True, + torch_dtype=torch.bfloat16, + ) + tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) + + print(f"Loading adapter from {adapter_path} …") + model = PeftModel.from_pretrained(model, adapter_path) + model.eval() + + test_prompts = [ + "Summarise the key points from the training material.", + "What are the main topics covered in the source documents?", + "Explain the most important concept from the training data.", + ] + + lines = [] + for prompt in test_prompts: + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ] + input_text = tokenizer.apply_chat_template( + messages, tokenize=False, add_generation_prompt=True + ) + inputs = tokenizer(input_text, return_tensors="pt").to(model.device) + with torch.no_grad(): + out = model.generate(**inputs, max_new_tokens=128, temperature=0.7, do_sample=True) + response = tokenizer.decode(out[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True) + lines.append(f"Q: {prompt}\nA: {response}\n") + print(lines[-1]) + + report = "\n".join(lines) + # Simple heuristic: did the model produce non-empty responses? + passed = all(len(l.split("A:")[1].strip()) > 10 for l in lines) + print(f"Evaluation passed: {passed}") + + from collections import namedtuple + + return namedtuple("EvalOutput", ["report", "passed"])( + report=report, passed=passed + ) + + +# ────────────────────────────────────────────────────────────── +# 5. Push adapter to Gitea repo +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["requests"], +) +def push_adapter_to_gitea( + adapter_path: str, + gitea_url: str, + gitea_owner: str, + gitea_repo: str, + gitea_username: str, + gitea_password: str, + branch: str = "main", + commit_message: str = "feat: add QLoRA adapter from PDF training pipeline", +) -> NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)]): + """Push the QLoRA adapter files to a Gitea repository via the API.""" + import base64 + import json + import os + import requests + + api_base = f"{gitea_url}/api/v1" + auth = (gitea_username, gitea_password) + repo_api = f"{api_base}/repos/{gitea_owner}/{gitea_repo}" + + # Check if repo exists, create if not + resp = requests.get(repo_api, auth=auth, timeout=30) + if resp.status_code == 404: + print(f"Creating repo {gitea_owner}/{gitea_repo} …") + create_resp = requests.post( + f"{api_base}/orgs/{gitea_owner}/repos" + if gitea_owner != gitea_username + else f"{api_base}/user/repos", + auth=auth, + json={ + "name": gitea_repo, + "description": "QLoRA adapters trained from PDF documents", + "private": False, + "auto_init": True, + }, + timeout=30, + ) + create_resp.raise_for_status() + print(f"Created: {create_resp.json().get('html_url')}") + + # Collect all adapter files + files_to_push = [] + for root, dirs, files in os.walk(adapter_path): + for fname in files: + fpath = os.path.join(root, fname) + rel_path = os.path.relpath(fpath, adapter_path) + with open(fpath, "rb") as f: + content = base64.b64encode(f.read()).decode("utf-8") + files_to_push.append({"path": rel_path, "content": content}) + + print(f"Pushing {len(files_to_push)} files to {gitea_owner}/{gitea_repo}") + + # Push each file via Gitea contents API + pushed = 0 + for item in files_to_push: + file_api = f"{repo_api}/contents/{item['path']}" + + # Check if file already exists (need SHA for update) + existing = requests.get(file_api, auth=auth, params={"ref": branch}, timeout=30) + payload = { + "message": commit_message, + "content": item["content"], + "branch": branch, + } + if existing.status_code == 200: + payload["sha"] = existing.json()["sha"] + resp = requests.put(file_api, auth=auth, json=payload, timeout=60) + else: + resp = requests.post(file_api, auth=auth, json=payload, timeout=60) + + if resp.status_code in (200, 201): + pushed += 1 + print(f" ✓ {item['path']}") + else: + print(f" ✗ {item['path']}: {resp.status_code} {resp.text[:200]}") + + repo_url = f"{gitea_url}/{gitea_owner}/{gitea_repo}" + print(f"Pushed {pushed}/{len(files_to_push)} files to {repo_url}") + + from collections import namedtuple + + return namedtuple("PushOutput", ["repo_url", "files_pushed"])( + repo_url=repo_url, files_pushed=pushed + ) + + +# ────────────────────────────────────────────────────────────── +# 6. Log metrics to MLflow +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["mlflow==2.22.0"], +) +def log_training_metrics( + base_model: str, + train_loss: float, + eval_loss: float, + num_train: int, + num_val: int, + num_pdfs: int, + lora_r: int, + lora_alpha: int, + learning_rate: float, + num_epochs: int, + repo_url: str, + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", + experiment_name: str = "qlora-pdf-training", +): + """Log the full training run to MLflow.""" + import mlflow + + mlflow.set_tracking_uri(mlflow_tracking_uri) + mlflow.set_experiment(experiment_name) + + with mlflow.start_run(run_name=f"qlora-{base_model.split('/')[-1]}"): + mlflow.log_params( + { + "base_model": base_model, + "lora_r": lora_r, + "lora_alpha": lora_alpha, + "learning_rate": learning_rate, + "num_epochs": num_epochs, + "num_pdfs": num_pdfs, + "data_source": "quobjects/training-data", + } + ) + mlflow.log_metrics( + { + "train_loss": train_loss, + "eval_loss": eval_loss, + "train_samples": float(num_train), + "val_samples": float(num_val), + } + ) + mlflow.set_tag("adapter_repo", repo_url) + + +# ────────────────────────────────────────────────────────────── +# Pipeline definition +# ────────────────────────────────────────────────────────────── +@dsl.pipeline( + name="QLoRA PDF Fine-Tuning", + description=( + "Fine-tune Llama 3.1 70B via QLoRA on PDFs from the Quobjects " + "training-data bucket. Pushes the adapter to Gitea and logs " + "metrics to MLflow." + ), +) +def qlora_pdf_pipeline( + # ── S3 / Quobjects ── + s3_endpoint: str = "candlekeep.lab.daviestechlabs.io", + s3_bucket: str = "training-data", + s3_prefix: str = "", + aws_access_key_id: str = "", + aws_secret_access_key: str = "", + # ── Model ── + base_model: str = "meta-llama/Llama-3.1-70B-Instruct", + # ── Training hyper-params ── + learning_rate: float = 2e-4, + num_epochs: int = 3, + batch_size: int = 2, + gradient_accumulation_steps: int = 8, + max_seq_length: int = 2048, + lora_r: int = 64, + lora_alpha: int = 16, + lora_dropout: float = 0.05, + # ── Data prep ── + chunk_size: int = 512, + chunk_overlap: int = 64, + # ── Gitea ── + gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000", + gitea_owner: str = "daviestechlabs", + gitea_repo: str = "qlora-adapters", + gitea_username: str = "", + gitea_password: str = "", + # ── MLflow ── + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +): + # Step 1 — Fetch PDFs from S3 + pdfs = fetch_pdfs_from_s3( + s3_endpoint=s3_endpoint, + s3_bucket=s3_bucket, + s3_prefix=s3_prefix, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + + # Step 2 — Extract text and build training dataset + data = prepare_training_data( + pdf_dir=pdfs.outputs["pdf_dir"], + max_seq_length=max_seq_length, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + # Step 3 — QLoRA training (GPU-heavy) + trained = train_qlora( + dataset_path=data.outputs["dataset_path"], + base_model=base_model, + learning_rate=learning_rate, + num_epochs=num_epochs, + batch_size=batch_size, + gradient_accumulation_steps=gradient_accumulation_steps, + max_seq_length=max_seq_length, + lora_r=lora_r, + lora_alpha=lora_alpha, + lora_dropout=lora_dropout, + ) + # Ask for a GPU on khelben + trained.set_accelerator_type("gpu") + trained.set_gpu_limit(1) + + # Step 4 — Quick evaluation + evaluated = evaluate_adapter( + adapter_path=trained.outputs["adapter_path"], + base_model=base_model, + ) + evaluated.set_accelerator_type("gpu") + evaluated.set_gpu_limit(1) + + # Step 5 — Push adapter to Gitea + pushed = push_adapter_to_gitea( + adapter_path=trained.outputs["adapter_path"], + gitea_url=gitea_url, + gitea_owner=gitea_owner, + gitea_repo=gitea_repo, + gitea_username=gitea_username, + gitea_password=gitea_password, + ) + + # Step 6 — Log to MLflow + log_training_metrics( + base_model=base_model, + train_loss=trained.outputs["train_loss"], + eval_loss=trained.outputs["eval_loss"], + num_train=data.outputs["num_train"], + num_val=data.outputs["num_val"], + num_pdfs=pdfs.outputs["num_files"], + lora_r=lora_r, + lora_alpha=lora_alpha, + learning_rate=learning_rate, + num_epochs=num_epochs, + repo_url=pushed.outputs["repo_url"], + mlflow_tracking_uri=mlflow_tracking_uri, + ) + + +# ────────────────────────────────────────────────────────────── +# Compile +# ────────────────────────────────────────────────────────────── +if __name__ == "__main__": + compiler.Compiler().compile( + pipeline_func=qlora_pdf_pipeline, + package_path="qlora_pdf_pipeline.yaml", + ) + print("Compiled: qlora_pdf_pipeline.yaml") diff --git a/qlora_pdf_pipeline.yaml b/qlora_pdf_pipeline.yaml new file mode 100644 index 0000000..8734539 --- /dev/null +++ b/qlora_pdf_pipeline.yaml @@ -0,0 +1,904 @@ +# PIPELINE DEFINITION +# Name: qlora-pdf-fine-tuning +# Description: Fine-tune Llama 3.1 70B via QLoRA on PDFs from the Quobjects training-data bucket. Pushes the adapter to Gitea and logs metrics to MLflow. +# Inputs: +# aws_access_key_id: str [Default: ''] +# aws_secret_access_key: str [Default: ''] +# base_model: str [Default: 'meta-llama/Llama-3.1-70B-Instruct'] +# batch_size: int [Default: 2.0] +# chunk_overlap: int [Default: 64.0] +# chunk_size: int [Default: 512.0] +# gitea_owner: str [Default: 'daviestechlabs'] +# gitea_password: str [Default: ''] +# gitea_repo: str [Default: 'qlora-adapters'] +# gitea_url: str [Default: 'http://gitea-http.gitea.svc.cluster.local:3000'] +# gitea_username: str [Default: ''] +# gradient_accumulation_steps: int [Default: 8.0] +# learning_rate: float [Default: 0.0002] +# lora_alpha: int [Default: 16.0] +# lora_dropout: float [Default: 0.05] +# lora_r: int [Default: 64.0] +# max_seq_length: int [Default: 2048.0] +# mlflow_tracking_uri: str [Default: 'http://mlflow.mlflow.svc.cluster.local:80'] +# num_epochs: int [Default: 3.0] +# s3_bucket: str [Default: 'training-data'] +# s3_endpoint: str [Default: 'candlekeep.lab.daviestechlabs.io'] +# s3_prefix: str [Default: ''] +components: + comp-evaluate-adapter: + executorLabel: exec-evaluate-adapter + inputDefinitions: + parameters: + adapter_path: + parameterType: STRING + base_model: + parameterType: STRING + outputDefinitions: + parameters: + passed: + parameterType: BOOLEAN + report: + parameterType: STRING + comp-fetch-pdfs-from-s3: + executorLabel: exec-fetch-pdfs-from-s3 + inputDefinitions: + parameters: + aws_access_key_id: + parameterType: STRING + aws_secret_access_key: + parameterType: STRING + s3_bucket: + parameterType: STRING + s3_endpoint: + parameterType: STRING + s3_prefix: + parameterType: STRING + outputDefinitions: + parameters: + num_files: + parameterType: NUMBER_INTEGER + pdf_dir: + parameterType: STRING + comp-log-training-metrics: + executorLabel: exec-log-training-metrics + inputDefinitions: + parameters: + base_model: + parameterType: STRING + eval_loss: + parameterType: NUMBER_DOUBLE + experiment_name: + defaultValue: qlora-pdf-training + isOptional: true + parameterType: STRING + learning_rate: + parameterType: NUMBER_DOUBLE + lora_alpha: + parameterType: NUMBER_INTEGER + lora_r: + parameterType: NUMBER_INTEGER + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + num_epochs: + parameterType: NUMBER_INTEGER + num_pdfs: + parameterType: NUMBER_INTEGER + num_train: + parameterType: NUMBER_INTEGER + num_val: + parameterType: NUMBER_INTEGER + repo_url: + parameterType: STRING + train_loss: + parameterType: NUMBER_DOUBLE + comp-prepare-training-data: + executorLabel: exec-prepare-training-data + inputDefinitions: + parameters: + chunk_overlap: + defaultValue: 64.0 + isOptional: true + parameterType: NUMBER_INTEGER + chunk_size: + defaultValue: 512.0 + isOptional: true + parameterType: NUMBER_INTEGER + max_seq_length: + defaultValue: 2048.0 + isOptional: true + parameterType: NUMBER_INTEGER + pdf_dir: + parameterType: STRING + outputDefinitions: + parameters: + dataset_path: + parameterType: STRING + num_train: + parameterType: NUMBER_INTEGER + num_val: + parameterType: NUMBER_INTEGER + comp-push-adapter-to-gitea: + executorLabel: exec-push-adapter-to-gitea + inputDefinitions: + parameters: + adapter_path: + parameterType: STRING + branch: + defaultValue: main + isOptional: true + parameterType: STRING + commit_message: + defaultValue: 'feat: add QLoRA adapter from PDF training pipeline' + isOptional: true + parameterType: STRING + gitea_owner: + parameterType: STRING + gitea_password: + parameterType: STRING + gitea_repo: + parameterType: STRING + gitea_url: + parameterType: STRING + gitea_username: + parameterType: STRING + outputDefinitions: + parameters: + files_pushed: + parameterType: NUMBER_INTEGER + repo_url: + parameterType: STRING + comp-train-qlora: + executorLabel: exec-train-qlora + inputDefinitions: + parameters: + base_model: + parameterType: STRING + batch_size: + defaultValue: 2.0 + isOptional: true + parameterType: NUMBER_INTEGER + dataset_path: + parameterType: STRING + gradient_accumulation_steps: + defaultValue: 8.0 + isOptional: true + parameterType: NUMBER_INTEGER + learning_rate: + defaultValue: 0.0002 + isOptional: true + parameterType: NUMBER_DOUBLE + lora_alpha: + defaultValue: 16.0 + isOptional: true + parameterType: NUMBER_INTEGER + lora_dropout: + defaultValue: 0.05 + isOptional: true + parameterType: NUMBER_DOUBLE + lora_r: + defaultValue: 64.0 + isOptional: true + parameterType: NUMBER_INTEGER + max_seq_length: + defaultValue: 2048.0 + isOptional: true + parameterType: NUMBER_INTEGER + num_epochs: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + adapter_path: + parameterType: STRING + eval_loss: + parameterType: NUMBER_DOUBLE + train_loss: + parameterType: NUMBER_DOUBLE +deploymentSpec: + executors: + exec-evaluate-adapter: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - evaluate_adapter + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'torch' 'transformers'\ + \ 'peft' 'bitsandbytes' 'accelerate' 'scipy' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef evaluate_adapter(\n adapter_path: str,\n base_model: str,\n\ + ) -> NamedTuple(\"EvalOutput\", [(\"report\", str), (\"passed\", bool)]):\n\ + \ \"\"\"Load the QLoRA adapter and run a few sanity-check prompts.\"\"\ + \"\n import torch\n from transformers import AutoModelForCausalLM,\ + \ AutoTokenizer, BitsAndBytesConfig\n from peft import PeftModel\n\n\ + \ bnb_config = BitsAndBytesConfig(\n load_in_4bit=True,\n \ + \ bnb_4bit_quant_type=\"nf4\",\n bnb_4bit_compute_dtype=torch.bfloat16,\n\ + \ bnb_4bit_use_double_quant=True,\n )\n\n print(f\"Loading\ + \ base model {base_model} \u2026\")\n model = AutoModelForCausalLM.from_pretrained(\n\ + \ base_model,\n quantization_config=bnb_config,\n device_map=\"\ + auto\",\n trust_remote_code=True,\n torch_dtype=torch.bfloat16,\n\ + \ )\n tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)\n\ + \n print(f\"Loading adapter from {adapter_path} \u2026\")\n model\ + \ = PeftModel.from_pretrained(model, adapter_path)\n model.eval()\n\n\ + \ test_prompts = [\n \"Summarise the key points from the training\ + \ material.\",\n \"What are the main topics covered in the source\ + \ documents?\",\n \"Explain the most important concept from the training\ + \ data.\",\n ]\n\n lines = []\n for prompt in test_prompts:\n \ + \ messages = [\n {\"role\": \"system\", \"content\": \"\ + You are a helpful assistant.\"},\n {\"role\": \"user\", \"content\"\ + : prompt},\n ]\n input_text = tokenizer.apply_chat_template(\n\ + \ messages, tokenize=False, add_generation_prompt=True\n \ + \ )\n inputs = tokenizer(input_text, return_tensors=\"pt\").to(model.device)\n\ + \ with torch.no_grad():\n out = model.generate(**inputs,\ + \ max_new_tokens=128, temperature=0.7, do_sample=True)\n response\ + \ = tokenizer.decode(out[0][inputs[\"input_ids\"].shape[1]:], skip_special_tokens=True)\n\ + \ lines.append(f\"Q: {prompt}\\nA: {response}\\n\")\n print(lines[-1])\n\ + \n report = \"\\n\".join(lines)\n # Simple heuristic: did the model\ + \ produce non-empty responses?\n passed = all(len(l.split(\"A:\")[1].strip())\ + \ > 10 for l in lines)\n print(f\"Evaluation passed: {passed}\")\n\n\ + \ from collections import namedtuple\n\n return namedtuple(\"EvalOutput\"\ + , [\"report\", \"passed\"])(\n report=report, passed=passed\n \ + \ )\n\n" + image: python:3.13-slim + resources: + accelerator: + resourceCount: '1' + resourceType: gpu + exec-fetch-pdfs-from-s3: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - fetch_pdfs_from_s3 + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef fetch_pdfs_from_s3(\n s3_endpoint: str,\n s3_bucket: str,\n\ + \ s3_prefix: str,\n aws_access_key_id: str,\n aws_secret_access_key:\ + \ str,\n) -> NamedTuple(\"PDFOutput\", [(\"pdf_dir\", str), (\"num_files\"\ + , int)]):\n \"\"\"Download all PDFs from a Quobjects S3 bucket.\"\"\"\ + \n import os\n import boto3\n from botocore.client import Config\n\ + \n out_dir = \"/tmp/pdfs\"\n os.makedirs(out_dir, exist_ok=True)\n\ + \n client = boto3.client(\n \"s3\",\n endpoint_url=f\"\ + http://{s3_endpoint}\",\n aws_access_key_id=aws_access_key_id,\n\ + \ aws_secret_access_key=aws_secret_access_key,\n region_name=\"\ + us-east-1\",\n config=Config(signature_version=\"s3v4\"),\n )\n\ + \n paginator = client.get_paginator(\"list_objects_v2\")\n count =\ + \ 0\n for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix):\n\ + \ for obj in page.get(\"Contents\", []):\n key = obj[\"\ + Key\"]\n if key.lower().endswith(\".pdf\"):\n \ + \ local_path = os.path.join(out_dir, os.path.basename(key))\n \ + \ print(f\"Downloading: {key} \u2192 {local_path}\")\n \ + \ client.download_file(s3_bucket, key, local_path)\n count\ + \ += 1\n\n print(f\"Downloaded {count} PDFs to {out_dir}\")\n from\ + \ collections import namedtuple\n\n return namedtuple(\"PDFOutput\",\ + \ [\"pdf_dir\", \"num_files\"])(\n pdf_dir=out_dir, num_files=count\n\ + \ )\n\n" + image: python:3.13-slim + exec-log-training-metrics: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - log_training_metrics + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'mlflow==2.22.0'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef log_training_metrics(\n base_model: str,\n train_loss:\ + \ float,\n eval_loss: float,\n num_train: int,\n num_val: int,\n\ + \ num_pdfs: int,\n lora_r: int,\n lora_alpha: int,\n learning_rate:\ + \ float,\n num_epochs: int,\n repo_url: str,\n mlflow_tracking_uri:\ + \ str = \"http://mlflow.mlflow.svc.cluster.local:80\",\n experiment_name:\ + \ str = \"qlora-pdf-training\",\n):\n \"\"\"Log the full training run\ + \ to MLflow.\"\"\"\n import mlflow\n\n mlflow.set_tracking_uri(mlflow_tracking_uri)\n\ + \ mlflow.set_experiment(experiment_name)\n\n with mlflow.start_run(run_name=f\"\ + qlora-{base_model.split('/')[-1]}\"):\n mlflow.log_params(\n \ + \ {\n \"base_model\": base_model,\n \ + \ \"lora_r\": lora_r,\n \"lora_alpha\": lora_alpha,\n \ + \ \"learning_rate\": learning_rate,\n \"num_epochs\"\ + : num_epochs,\n \"num_pdfs\": num_pdfs,\n \ + \ \"data_source\": \"quobjects/training-data\",\n }\n \ + \ )\n mlflow.log_metrics(\n {\n \"train_loss\"\ + : train_loss,\n \"eval_loss\": eval_loss,\n \ + \ \"train_samples\": float(num_train),\n \"val_samples\"\ + : float(num_val),\n }\n )\n mlflow.set_tag(\"adapter_repo\"\ + , repo_url)\n\n" + image: python:3.13-slim + exec-prepare-training-data: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - prepare_training_data + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pymupdf' &&\ + \ \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef prepare_training_data(\n pdf_dir: str,\n max_seq_length:\ + \ int = 2048,\n chunk_size: int = 512,\n chunk_overlap: int = 64,\n\ + ) -> NamedTuple(\"DataOutput\", [(\"dataset_path\", str), (\"num_train\"\ + , int), (\"num_val\", int)]):\n \"\"\"Extract text from PDFs, chunk it,\ + \ and format as instruction-tuning pairs.\"\"\"\n import json\n import\ + \ os\n import fitz # PyMuPDF\n\n out_dir = \"/tmp/training_data\"\ + \n os.makedirs(out_dir, exist_ok=True)\n\n # 1. Extract text from\ + \ all PDFs\n all_chunks: list[dict] = []\n for fname in sorted(os.listdir(pdf_dir)):\n\ + \ if not fname.lower().endswith(\".pdf\"):\n continue\n\ + \ path = os.path.join(pdf_dir, fname)\n print(f\"Extracting:\ + \ {fname}\")\n try:\n doc = fitz.open(path)\n \ + \ full_text = \"\"\n for page in doc:\n full_text\ + \ += page.get_text() + \"\\n\"\n doc.close()\n except\ + \ Exception as e:\n print(f\" SKIP ({e})\")\n continue\n\ + \n # 2. Chunk text with overlap\n words = full_text.split()\n\ + \ for i in range(0, len(words), chunk_size - chunk_overlap):\n \ + \ chunk_words = words[i : i + chunk_size]\n if len(chunk_words)\ + \ < 50:\n continue # skip tiny trailing chunks\n \ + \ chunk_text = \" \".join(chunk_words)\n all_chunks.append({\"\ + text\": chunk_text, \"source\": fname})\n\n print(f\"Total chunks: {len(all_chunks)}\"\ + )\n if not all_chunks:\n raise ValueError(\"No text extracted\ + \ from PDFs \u2014 check your bucket\")\n\n # 3. Format as Llama 3 chat\ + \ training pairs\n # We create self-supervised pairs: model learns\ + \ to continue/explain the content\n samples = []\n for chunk in all_chunks:\n\ + \ text = chunk[\"text\"]\n source = chunk[\"source\"]\n \ + \ # Split chunk roughly in half for input/output\n words = text.split()\n\ + \ mid = len(words) // 2\n context = \" \".join(words[:mid])\n\ + \ continuation = \" \".join(words[mid:])\n\n samples.append(\n\ + \ {\n \"messages\": [\n {\n\ + \ \"role\": \"system\",\n \ + \ \"content\": (\n \"You are a knowledgeable\ + \ assistant. \"\n \"Continue the information\ + \ accurately and coherently.\"\n ),\n \ + \ },\n {\n \"role\": \"\ + user\",\n \"content\": f\"Continue the following\ + \ passage from {source}:\\n\\n{context}\",\n },\n \ + \ {\"role\": \"assistant\", \"content\": continuation},\n\ + \ ]\n }\n )\n\n # 4. Train/val split\ + \ (90/10)\n import random\n\n random.seed(42)\n random.shuffle(samples)\n\ + \ split = int(len(samples) * 0.9)\n train = samples[:split]\n val\ + \ = samples[split:]\n\n train_path = os.path.join(out_dir, \"train.json\"\ + )\n val_path = os.path.join(out_dir, \"val.json\")\n with open(train_path,\ + \ \"w\") as f:\n json.dump(train, f)\n with open(val_path, \"\ + w\") as f:\n json.dump(val, f)\n\n print(f\"Train: {len(train)}\ + \ samples, Val: {len(val)} samples\")\n from collections import namedtuple\n\ + \n return namedtuple(\"DataOutput\", [\"dataset_path\", \"num_train\"\ + , \"num_val\"])(\n dataset_path=out_dir, num_train=len(train), num_val=len(val)\n\ + \ )\n\n" + image: python:3.13-slim + exec-push-adapter-to-gitea: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - push_adapter_to_gitea + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'requests' &&\ + \ \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef push_adapter_to_gitea(\n adapter_path: str,\n gitea_url:\ + \ str,\n gitea_owner: str,\n gitea_repo: str,\n gitea_username:\ + \ str,\n gitea_password: str,\n branch: str = \"main\",\n commit_message:\ + \ str = \"feat: add QLoRA adapter from PDF training pipeline\",\n) -> NamedTuple(\"\ + PushOutput\", [(\"repo_url\", str), (\"files_pushed\", int)]):\n \"\"\ + \"Push the QLoRA adapter files to a Gitea repository via the API.\"\"\"\n\ + \ import base64\n import json\n import os\n import requests\n\ + \n api_base = f\"{gitea_url}/api/v1\"\n auth = (gitea_username, gitea_password)\n\ + \ repo_api = f\"{api_base}/repos/{gitea_owner}/{gitea_repo}\"\n\n \ + \ # Check if repo exists, create if not\n resp = requests.get(repo_api,\ + \ auth=auth, timeout=30)\n if resp.status_code == 404:\n print(f\"\ + Creating repo {gitea_owner}/{gitea_repo} \u2026\")\n create_resp\ + \ = requests.post(\n f\"{api_base}/orgs/{gitea_owner}/repos\"\ + \n if gitea_owner != gitea_username\n else f\"{api_base}/user/repos\"\ + ,\n auth=auth,\n json={\n \"name\"\ + : gitea_repo,\n \"description\": \"QLoRA adapters trained\ + \ from PDF documents\",\n \"private\": False,\n \ + \ \"auto_init\": True,\n },\n timeout=30,\n\ + \ )\n create_resp.raise_for_status()\n print(f\"Created:\ + \ {create_resp.json().get('html_url')}\")\n\n # Collect all adapter files\n\ + \ files_to_push = []\n for root, dirs, files in os.walk(adapter_path):\n\ + \ for fname in files:\n fpath = os.path.join(root, fname)\n\ + \ rel_path = os.path.relpath(fpath, adapter_path)\n \ + \ with open(fpath, \"rb\") as f:\n content = base64.b64encode(f.read()).decode(\"\ + utf-8\")\n files_to_push.append({\"path\": rel_path, \"content\"\ + : content})\n\n print(f\"Pushing {len(files_to_push)} files to {gitea_owner}/{gitea_repo}\"\ + )\n\n # Push each file via Gitea contents API\n pushed = 0\n for\ + \ item in files_to_push:\n file_api = f\"{repo_api}/contents/{item['path']}\"\ + \n\n # Check if file already exists (need SHA for update)\n \ + \ existing = requests.get(file_api, auth=auth, params={\"ref\": branch},\ + \ timeout=30)\n payload = {\n \"message\": commit_message,\n\ + \ \"content\": item[\"content\"],\n \"branch\": branch,\n\ + \ }\n if existing.status_code == 200:\n payload[\"\ + sha\"] = existing.json()[\"sha\"]\n resp = requests.put(file_api,\ + \ auth=auth, json=payload, timeout=60)\n else:\n resp\ + \ = requests.post(file_api, auth=auth, json=payload, timeout=60)\n\n \ + \ if resp.status_code in (200, 201):\n pushed += 1\n \ + \ print(f\" \u2713 {item['path']}\")\n else:\n \ + \ print(f\" \u2717 {item['path']}: {resp.status_code} {resp.text[:200]}\"\ + )\n\n repo_url = f\"{gitea_url}/{gitea_owner}/{gitea_repo}\"\n print(f\"\ + Pushed {pushed}/{len(files_to_push)} files to {repo_url}\")\n\n from\ + \ collections import namedtuple\n\n return namedtuple(\"PushOutput\"\ + , [\"repo_url\", \"files_pushed\"])(\n repo_url=repo_url, files_pushed=pushed\n\ + \ )\n\n" + image: python:3.13-slim + exec-train-qlora: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_qlora + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'torch' 'transformers'\ + \ 'peft' 'datasets' 'accelerate' 'bitsandbytes' 'scipy' 'trl' && \"$0\"\ + \ \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_qlora(\n dataset_path: str,\n base_model: str,\n\ + \ learning_rate: float = 2e-4,\n num_epochs: int = 3,\n batch_size:\ + \ int = 2,\n gradient_accumulation_steps: int = 8,\n max_seq_length:\ + \ int = 2048,\n lora_r: int = 64,\n lora_alpha: int = 16,\n lora_dropout:\ + \ float = 0.05,\n) -> NamedTuple(\n \"TrainOutput\",\n [(\"adapter_path\"\ + , str), (\"train_loss\", float), (\"eval_loss\", float)],\n):\n \"\"\"\ + QLoRA fine-tune Llama 3.1 70B with 4-bit NF4 quantization.\"\"\"\n import\ + \ json\n import os\n\n import torch\n from datasets import Dataset\n\ + \ from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training\n\ + \ from transformers import (\n AutoModelForCausalLM,\n \ + \ AutoTokenizer,\n BitsAndBytesConfig,\n TrainingArguments,\n\ + \ )\n from trl import SFTTrainer\n\n output_dir = \"/tmp/qlora_output\"\ + \n os.makedirs(output_dir, exist_ok=True)\n\n # \u2500\u2500 Load\ + \ data \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n with open(os.path.join(dataset_path,\ + \ \"train.json\")) as f:\n train_data = json.load(f)\n with open(os.path.join(dataset_path,\ + \ \"val.json\")) as f:\n val_data = json.load(f)\n\n print(f\"\ + Loaded {len(train_data)} train / {len(val_data)} val samples\")\n\n #\ + \ \u2500\u2500 Tokenizer \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n \ + \ print(f\"Loading tokenizer: {base_model}\")\n tokenizer = AutoTokenizer.from_pretrained(base_model,\ + \ trust_remote_code=True)\n if tokenizer.pad_token is None:\n \ + \ tokenizer.pad_token = tokenizer.eos_token\n tokenizer.padding_side\ + \ = \"right\"\n\n # \u2500\u2500 Format with chat template \u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\n def format_chat(sample):\n return {\"text\": tokenizer.apply_chat_template(\n\ + \ sample[\"messages\"], tokenize=False, add_generation_prompt=False\n\ + \ )}\n\n train_ds = Dataset.from_list(train_data).map(format_chat)\n\ + \ val_ds = Dataset.from_list(val_data).map(format_chat)\n\n # \u2500\ + \u2500 4-bit quantisation \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\n bnb_config = BitsAndBytesConfig(\n load_in_4bit=True,\n\ + \ bnb_4bit_quant_type=\"nf4\",\n bnb_4bit_compute_dtype=torch.bfloat16,\n\ + \ bnb_4bit_use_double_quant=True,\n )\n\n print(f\"Loading\ + \ model: {base_model} (4-bit NF4)\")\n model = AutoModelForCausalLM.from_pretrained(\n\ + \ base_model,\n quantization_config=bnb_config,\n device_map=\"\ + auto\",\n trust_remote_code=True,\n torch_dtype=torch.bfloat16,\n\ + \ )\n model = prepare_model_for_kbit_training(model)\n\n # \u2500\ + \u2500 LoRA config \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n lora_config = LoraConfig(\n\ + \ r=lora_r,\n lora_alpha=lora_alpha,\n target_modules=[\n\ + \ \"q_proj\", \"k_proj\", \"v_proj\", \"o_proj\",\n \ + \ \"gate_proj\", \"up_proj\", \"down_proj\",\n ],\n lora_dropout=lora_dropout,\n\ + \ bias=\"none\",\n task_type=\"CAUSAL_LM\",\n )\n\n \ + \ model = get_peft_model(model, lora_config)\n model.print_trainable_parameters()\n\ + \n # \u2500\u2500 Training args \u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n training_args = TrainingArguments(\n\ + \ output_dir=os.path.join(output_dir, \"checkpoints\"),\n \ + \ num_train_epochs=num_epochs,\n per_device_train_batch_size=batch_size,\n\ + \ per_device_eval_batch_size=batch_size,\n gradient_accumulation_steps=gradient_accumulation_steps,\n\ + \ learning_rate=learning_rate,\n bf16=True,\n logging_steps=5,\n\ + \ eval_strategy=\"steps\",\n eval_steps=50,\n save_strategy=\"\ + steps\",\n save_steps=100,\n save_total_limit=2,\n \ + \ load_best_model_at_end=True,\n metric_for_best_model=\"eval_loss\"\ + ,\n report_to=\"none\",\n warmup_ratio=0.03,\n lr_scheduler_type=\"\ + cosine\",\n optim=\"paged_adamw_8bit\",\n max_grad_norm=0.3,\n\ + \ group_by_length=True,\n )\n\n # \u2500\u2500 SFTTrainer \u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\n trainer = SFTTrainer(\n model=model,\n\ + \ args=training_args,\n train_dataset=train_ds,\n eval_dataset=val_ds,\n\ + \ tokenizer=tokenizer,\n max_seq_length=max_seq_length,\n\ + \ dataset_text_field=\"text\",\n packing=True, # pack short\ + \ samples for efficiency\n )\n\n print(\"Starting QLoRA training \u2026\ + \")\n result = trainer.train()\n train_loss = result.training_loss\n\ + \n eval_result = trainer.evaluate()\n eval_loss = eval_result.get(\"\ + eval_loss\", 0.0)\n\n print(f\"Train loss: {train_loss:.4f}, Eval loss:\ + \ {eval_loss:.4f}\")\n\n # \u2500\u2500 Save adapter \u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\ + \u2500\n adapter_path = os.path.join(output_dir, \"adapter\")\n model.save_pretrained(adapter_path)\n\ + \ tokenizer.save_pretrained(adapter_path)\n\n metadata = {\n \ + \ \"base_model\": base_model,\n \"lora_r\": lora_r,\n \"\ + lora_alpha\": lora_alpha,\n \"lora_dropout\": lora_dropout,\n \ + \ \"learning_rate\": learning_rate,\n \"num_epochs\": num_epochs,\n\ + \ \"batch_size\": batch_size,\n \"gradient_accumulation_steps\"\ + : gradient_accumulation_steps,\n \"max_seq_length\": max_seq_length,\n\ + \ \"train_samples\": len(train_data),\n \"val_samples\": len(val_data),\n\ + \ \"train_loss\": train_loss,\n \"eval_loss\": eval_loss,\n\ + \ }\n with open(os.path.join(adapter_path, \"training_metadata.json\"\ + ), \"w\") as f:\n json.dump(metadata, f, indent=2)\n\n print(f\"\ + Adapter saved to {adapter_path}\")\n\n from collections import namedtuple\n\ + \n return namedtuple(\"TrainOutput\", [\"adapter_path\", \"train_loss\"\ + , \"eval_loss\"])(\n adapter_path=adapter_path,\n train_loss=train_loss,\n\ + \ eval_loss=eval_loss,\n )\n\n" + image: python:3.13-slim + resources: + accelerator: + resourceCount: '1' + resourceType: gpu +pipelineInfo: + description: Fine-tune Llama 3.1 70B via QLoRA on PDFs from the Quobjects training-data + bucket. Pushes the adapter to Gitea and logs metrics to MLflow. + name: qlora-pdf-fine-tuning +root: + dag: + tasks: + evaluate-adapter: + cachingOptions: + enableCache: true + componentRef: + name: comp-evaluate-adapter + dependentTasks: + - train-qlora + inputs: + parameters: + adapter_path: + taskOutputParameter: + outputParameterKey: adapter_path + producerTask: train-qlora + base_model: + componentInputParameter: base_model + taskInfo: + name: evaluate-adapter + fetch-pdfs-from-s3: + cachingOptions: + enableCache: true + componentRef: + name: comp-fetch-pdfs-from-s3 + inputs: + parameters: + aws_access_key_id: + componentInputParameter: aws_access_key_id + aws_secret_access_key: + componentInputParameter: aws_secret_access_key + s3_bucket: + componentInputParameter: s3_bucket + s3_endpoint: + componentInputParameter: s3_endpoint + s3_prefix: + componentInputParameter: s3_prefix + taskInfo: + name: fetch-pdfs-from-s3 + log-training-metrics: + cachingOptions: + enableCache: true + componentRef: + name: comp-log-training-metrics + dependentTasks: + - fetch-pdfs-from-s3 + - prepare-training-data + - push-adapter-to-gitea + - train-qlora + inputs: + parameters: + base_model: + componentInputParameter: base_model + eval_loss: + taskOutputParameter: + outputParameterKey: eval_loss + producerTask: train-qlora + learning_rate: + componentInputParameter: learning_rate + lora_alpha: + componentInputParameter: lora_alpha + lora_r: + componentInputParameter: lora_r + mlflow_tracking_uri: + componentInputParameter: mlflow_tracking_uri + num_epochs: + componentInputParameter: num_epochs + num_pdfs: + taskOutputParameter: + outputParameterKey: num_files + producerTask: fetch-pdfs-from-s3 + num_train: + taskOutputParameter: + outputParameterKey: num_train + producerTask: prepare-training-data + num_val: + taskOutputParameter: + outputParameterKey: num_val + producerTask: prepare-training-data + repo_url: + taskOutputParameter: + outputParameterKey: repo_url + producerTask: push-adapter-to-gitea + train_loss: + taskOutputParameter: + outputParameterKey: train_loss + producerTask: train-qlora + taskInfo: + name: log-training-metrics + prepare-training-data: + cachingOptions: + enableCache: true + componentRef: + name: comp-prepare-training-data + dependentTasks: + - fetch-pdfs-from-s3 + inputs: + parameters: + chunk_overlap: + componentInputParameter: chunk_overlap + chunk_size: + componentInputParameter: chunk_size + max_seq_length: + componentInputParameter: max_seq_length + pdf_dir: + taskOutputParameter: + outputParameterKey: pdf_dir + producerTask: fetch-pdfs-from-s3 + taskInfo: + name: prepare-training-data + push-adapter-to-gitea: + cachingOptions: + enableCache: true + componentRef: + name: comp-push-adapter-to-gitea + dependentTasks: + - train-qlora + inputs: + parameters: + adapter_path: + taskOutputParameter: + outputParameterKey: adapter_path + producerTask: train-qlora + gitea_owner: + componentInputParameter: gitea_owner + gitea_password: + componentInputParameter: gitea_password + gitea_repo: + componentInputParameter: gitea_repo + gitea_url: + componentInputParameter: gitea_url + gitea_username: + componentInputParameter: gitea_username + taskInfo: + name: push-adapter-to-gitea + train-qlora: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-qlora + dependentTasks: + - prepare-training-data + inputs: + parameters: + base_model: + componentInputParameter: base_model + batch_size: + componentInputParameter: batch_size + dataset_path: + taskOutputParameter: + outputParameterKey: dataset_path + producerTask: prepare-training-data + gradient_accumulation_steps: + componentInputParameter: gradient_accumulation_steps + learning_rate: + componentInputParameter: learning_rate + lora_alpha: + componentInputParameter: lora_alpha + lora_dropout: + componentInputParameter: lora_dropout + lora_r: + componentInputParameter: lora_r + max_seq_length: + componentInputParameter: max_seq_length + num_epochs: + componentInputParameter: num_epochs + taskInfo: + name: train-qlora + inputDefinitions: + parameters: + aws_access_key_id: + defaultValue: '' + isOptional: true + parameterType: STRING + aws_secret_access_key: + defaultValue: '' + isOptional: true + parameterType: STRING + base_model: + defaultValue: meta-llama/Llama-3.1-70B-Instruct + isOptional: true + parameterType: STRING + batch_size: + defaultValue: 2.0 + isOptional: true + parameterType: NUMBER_INTEGER + chunk_overlap: + defaultValue: 64.0 + isOptional: true + parameterType: NUMBER_INTEGER + chunk_size: + defaultValue: 512.0 + isOptional: true + parameterType: NUMBER_INTEGER + gitea_owner: + defaultValue: daviestechlabs + isOptional: true + parameterType: STRING + gitea_password: + defaultValue: '' + isOptional: true + parameterType: STRING + gitea_repo: + defaultValue: qlora-adapters + isOptional: true + parameterType: STRING + gitea_url: + defaultValue: http://gitea-http.gitea.svc.cluster.local:3000 + isOptional: true + parameterType: STRING + gitea_username: + defaultValue: '' + isOptional: true + parameterType: STRING + gradient_accumulation_steps: + defaultValue: 8.0 + isOptional: true + parameterType: NUMBER_INTEGER + learning_rate: + defaultValue: 0.0002 + isOptional: true + parameterType: NUMBER_DOUBLE + lora_alpha: + defaultValue: 16.0 + isOptional: true + parameterType: NUMBER_INTEGER + lora_dropout: + defaultValue: 0.05 + isOptional: true + parameterType: NUMBER_DOUBLE + lora_r: + defaultValue: 64.0 + isOptional: true + parameterType: NUMBER_INTEGER + max_seq_length: + defaultValue: 2048.0 + isOptional: true + parameterType: NUMBER_INTEGER + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + num_epochs: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + s3_bucket: + defaultValue: training-data + isOptional: true + parameterType: STRING + s3_endpoint: + defaultValue: candlekeep.lab.daviestechlabs.io + isOptional: true + parameterType: STRING + s3_prefix: + defaultValue: '' + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.12.1