From c26e4e5ef0e95a869f37d48fd891ff50ddba666f Mon Sep 17 00:00:00 2001 From: "Billy D." Date: Sun, 1 Feb 2026 20:41:13 -0500 Subject: [PATCH] feat: Add Kubeflow Pipeline definitions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - voice_pipeline: STT → RAG → LLM → TTS - document_ingestion_pipeline: Extract → Chunk → Embed → Milvus - document_ingestion_mlflow_pipeline: With MLflow tracking - evaluation_pipeline: Model benchmarking - kfp-sync-job: K8s job to sync pipelines --- README.md | 97 +++++- document_ingestion_mlflow_pipeline.py | 466 ++++++++++++++++++++++++++ document_ingestion_pipeline.py | 257 ++++++++++++++ evaluation_pipeline.py | 208 ++++++++++++ kfp-sync-job.yaml | 347 +++++++++++++++++++ voice_pipeline.py | 316 +++++++++++++++++ 6 files changed, 1690 insertions(+), 1 deletion(-) create mode 100644 document_ingestion_mlflow_pipeline.py create mode 100644 document_ingestion_pipeline.py create mode 100644 evaluation_pipeline.py create mode 100644 kfp-sync-job.yaml create mode 100644 voice_pipeline.py diff --git a/README.md b/README.md index 07923ff..fb75b32 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,97 @@ -# kubeflow +# Kubeflow Pipelines +Kubeflow Pipeline definitions for the DaviesTechLabs AI/ML platform. + +## Pipelines + +| Pipeline | Description | Output | +|----------|-------------|--------| +| `voice_pipeline.py` | STT → RAG → LLM → TTS | voice_pipeline.yaml | +| `document_ingestion_pipeline.py` | Ingest docs → chunk → embed → Milvus | document_ingestion.yaml | +| `document_ingestion_mlflow_pipeline.py` | Same with MLflow tracking | document_ingestion_mlflow.yaml | +| `evaluation_pipeline.py` | Evaluate models against benchmarks | evaluation.yaml | + +## Usage + +### Compile Pipelines + +```bash +pip install kfp==2.12.1 + +# Compile all +python voice_pipeline.py +python document_ingestion_pipeline.py +python evaluation_pipeline.py + +# Or compile individually +python -c "from kfp import compiler; from voice_pipeline import voice_assistant_pipeline; compiler.Compiler().compile(voice_assistant_pipeline, 'voice.yaml')" +``` + +### Upload to Kubeflow + +Upload the generated `.yaml` files to Kubeflow Pipelines UI or use the SDK: + +```python +import kfp +client = kfp.Client(host='http://kubeflow.ai-ml.svc.cluster.local/pipeline') +client.upload_pipeline('voice_pipeline.yaml', pipeline_name='Voice Assistant') +``` + +## Pipeline Details + +### voice_pipeline + +Full voice assistant with RAG: +1. Transcribe audio (Whisper) +2. Generate embeddings (BGE) +3. Search Milvus +4. Rerank documents (BGE Reranker) +5. Generate response (vLLM) +6. Synthesize speech (XTTS) + +### document_ingestion_pipeline + +Ingest documents into vector DB: +1. Extract text (PDF, DOCX, HTML, TXT) +2. Chunk with overlap (tiktoken) +3. Generate embeddings +4. Store in Milvus collection + +### evaluation_pipeline + +Benchmark model quality: +1. Load eval dataset (MMLU, etc.) +2. Run inference +3. Calculate metrics (accuracy, F1) +4. Log to MLflow + +## Integration + +### kfp-sync-job.yaml + +Kubernetes Job to sync compiled pipelines to Kubeflow: + +```bash +kubectl apply -f kfp-sync-job.yaml +``` + +### From Argo Workflows + +Pipelines can be triggered from Argo via the kfp-integration workflow in the [argo](https://git.daviestechlabs.io/daviestechlabs/argo) repo. + +## Service Endpoints + +| Service | Endpoint | +|---------|----------| +| Whisper STT | `http://whisper-predictor.ai-ml.svc.cluster.local` | +| Embeddings | `http://embeddings-predictor.ai-ml.svc.cluster.local` | +| Reranker | `http://reranker-predictor.ai-ml.svc.cluster.local` | +| vLLM | `http://llm-draft.ai-ml.svc.cluster.local:8000` | +| TTS | `http://tts-predictor.ai-ml.svc.cluster.local` | +| Milvus | `milvus.ai-ml.svc.cluster.local:19530` | + +## Related + +- [argo](https://git.daviestechlabs.io/daviestechlabs/argo) - Argo Workflows for training +- [voice-assistant](https://git.daviestechlabs.io/daviestechlabs/voice-assistant) - Real-time voice handler +- [homelab-design](https://git.daviestechlabs.io/daviestechlabs/homelab-design) - Architecture docs diff --git a/document_ingestion_mlflow_pipeline.py b/document_ingestion_mlflow_pipeline.py new file mode 100644 index 0000000..ebecdd2 --- /dev/null +++ b/document_ingestion_mlflow_pipeline.py @@ -0,0 +1,466 @@ +#!/usr/bin/env python3 +""" +MLflow-Integrated Document Ingestion Pipeline + +Enhanced version of document_ingestion_pipeline.py with full MLflow +experiment tracking for metrics, parameters, and artifacts. + +Usage: + pip install kfp==2.12.1 mlflow>=2.10.0 + python document_ingestion_mlflow_pipeline.py + # Upload to Kubeflow Pipelines UI +""" + +from kfp import dsl +from kfp import compiler +from typing import NamedTuple + + +# MLflow component configuration +MLFLOW_IMAGE = "python:3.13-slim" +MLFLOW_PACKAGES = [ + "mlflow>=2.10.0", + "psycopg2-binary>=2.9.0", + "httpx", + "beautifulsoup4", + "pypdf2", + "docx2txt", + "tiktoken", + "pymilvus", +] + + +@dsl.component( + base_image=MLFLOW_IMAGE, + packages_to_install=["mlflow>=2.10.0", "psycopg2-binary"] +) +def start_mlflow_run( + experiment_name: str, + run_name: str, + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", + pipeline_params: dict = None, +) -> NamedTuple('RunInfo', [('run_id', str), ('experiment_id', str)]): + """Start an MLflow run and log initial parameters.""" + import os + import mlflow + from mlflow.tracking import MlflowClient + from collections import namedtuple + + mlflow.set_tracking_uri(mlflow_tracking_uri) + client = MlflowClient() + + # Get or create experiment + experiment = client.get_experiment_by_name(experiment_name) + if experiment is None: + experiment_id = client.create_experiment( + name=experiment_name, + artifact_location=f"/mlflow/artifacts/{experiment_name}" + ) + else: + experiment_id = experiment.experiment_id + + # Start run with tags + tags = { + "pipeline.type": "document-ingestion", + "pipeline.framework": "kubeflow", + "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"), + } + + run = mlflow.start_run( + experiment_id=experiment_id, + run_name=run_name, + tags=tags, + ) + + # Log pipeline parameters + if pipeline_params: + for key, value in pipeline_params.items(): + mlflow.log_param(key, str(value)[:500]) + + run_id = run.info.run_id + mlflow.end_run() + + RunInfo = namedtuple('RunInfo', ['run_id', 'experiment_id']) + return RunInfo(run_id, experiment_id) + + +@dsl.component( + base_image=MLFLOW_IMAGE, + packages_to_install=MLFLOW_PACKAGES +) +def extract_text_with_tracking( + source_url: str, + run_id: str, + source_type: str = "auto", + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +) -> NamedTuple('ExtractionResult', [('text', str), ('char_count', int), ('source_type', str)]): + """Extract text from documents with MLflow tracking.""" + import time + import httpx + import mlflow + from mlflow.tracking import MlflowClient + from pathlib import Path + import tempfile + from collections import namedtuple + + mlflow.set_tracking_uri(mlflow_tracking_uri) + client = MlflowClient() + + start_time = time.time() + + # Download content + with httpx.Client(timeout=120.0) as http_client: + response = http_client.get(source_url) + content = response.content + + # Detect type if auto + if source_type == "auto": + if source_url.endswith(".pdf"): + source_type = "pdf" + elif source_url.endswith(".docx"): + source_type = "docx" + elif source_url.endswith(".html") or source_url.endswith(".htm"): + source_type = "html" + else: + source_type = "text" + + # Extract text based on type + if source_type == "pdf": + from PyPDF2 import PdfReader + import io + reader = PdfReader(io.BytesIO(content)) + text = "\n\n".join([page.extract_text() for page in reader.pages]) + elif source_type == "docx": + import docx2txt + with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as f: + f.write(content) + text = docx2txt.process(f.name) + elif source_type == "html": + from bs4 import BeautifulSoup + soup = BeautifulSoup(content, "html.parser") + text = soup.get_text(separator="\n") + else: + text = content.decode("utf-8", errors="ignore") + + extraction_time = time.time() - start_time + + # Log to MLflow + client.log_param(run_id, "source_url", source_url[:500]) + client.log_param(run_id, "source_type", source_type) + client.log_metric(run_id, "extraction_time_seconds", extraction_time) + client.log_metric(run_id, "source_size_bytes", len(content)) + client.log_metric(run_id, "extracted_char_count", len(text)) + + ExtractionResult = namedtuple('ExtractionResult', ['text', 'char_count', 'source_type']) + return ExtractionResult(text, len(text), source_type) + + +@dsl.component( + base_image=MLFLOW_IMAGE, + packages_to_install=MLFLOW_PACKAGES +) +def chunk_text_with_tracking( + text: str, + run_id: str, + chunk_size: int = 500, + overlap: int = 50, + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +) -> NamedTuple('ChunkResult', [('chunks', list), ('chunk_count', int)]): + """Split text into chunks with MLflow tracking.""" + import time + import tiktoken + import mlflow + from mlflow.tracking import MlflowClient + from collections import namedtuple + + mlflow.set_tracking_uri(mlflow_tracking_uri) + client = MlflowClient() + + start_time = time.time() + + enc = tiktoken.get_encoding("cl100k_base") + tokens = enc.encode(text) + + chunks = [] + start = 0 + while start < len(tokens): + end = min(start + chunk_size, len(tokens)) + chunk_tokens = tokens[start:end] + chunk_text = enc.decode(chunk_tokens) + chunks.append({ + "text": chunk_text, + "start_token": start, + "end_token": end, + "token_count": len(chunk_tokens), + }) + start += chunk_size - overlap + + chunking_time = time.time() - start_time + + # Log to MLflow + client.log_param(run_id, "chunk_size", chunk_size) + client.log_param(run_id, "chunk_overlap", overlap) + client.log_metric(run_id, "chunking_time_seconds", chunking_time) + client.log_metric(run_id, "total_tokens", len(tokens)) + client.log_metric(run_id, "chunks_created", len(chunks)) + client.log_metric(run_id, "avg_tokens_per_chunk", len(tokens) / len(chunks) if chunks else 0) + + ChunkResult = namedtuple('ChunkResult', ['chunks', 'chunk_count']) + return ChunkResult(chunks, len(chunks)) + + +@dsl.component( + base_image=MLFLOW_IMAGE, + packages_to_install=MLFLOW_PACKAGES +) +def generate_embeddings_with_tracking( + chunks: list, + run_id: str, + embeddings_url: str = "http://embeddings-predictor.ai-ml.svc.cluster.local", + embeddings_model: str = "bge-small-en-v1.5", + batch_size: int = 32, + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +) -> NamedTuple('EmbeddingResult', [('embedded_chunks', list), ('embedding_dim', int)]): + """Generate embeddings with MLflow tracking.""" + import time + import httpx + import mlflow + from mlflow.tracking import MlflowClient + from collections import namedtuple + + mlflow.set_tracking_uri(mlflow_tracking_uri) + client = MlflowClient() + + start_time = time.time() + + embedded_chunks = [] + texts = [c["text"] for c in chunks] + embedding_dim = 0 + + with httpx.Client(timeout=300.0) as http_client: + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + response = http_client.post( + f"{embeddings_url}/embeddings", + json={"input": batch, "model": embeddings_model} + ) + result = response.json() + + for j, embedding_data in enumerate(result["data"]): + chunk = chunks[i + j].copy() + chunk["embedding"] = embedding_data["embedding"] + embedded_chunks.append(chunk) + + if embedding_dim == 0: + embedding_dim = len(embedding_data["embedding"]) + + embedding_time = time.time() - start_time + + # Log to MLflow + client.log_param(run_id, "embeddings_model", embeddings_model) + client.log_param(run_id, "embedding_batch_size", batch_size) + client.log_metric(run_id, "embedding_time_seconds", embedding_time) + client.log_metric(run_id, "embedding_dimension", embedding_dim) + client.log_metric(run_id, "embeddings_per_second", len(chunks) / embedding_time if embedding_time > 0 else 0) + + EmbeddingResult = namedtuple('EmbeddingResult', ['embedded_chunks', 'embedding_dim']) + return EmbeddingResult(embedded_chunks, embedding_dim) + + +@dsl.component( + base_image=MLFLOW_IMAGE, + packages_to_install=MLFLOW_PACKAGES +) +def upsert_to_milvus_with_tracking( + chunks: list, + run_id: str, + collection_name: str, + source_name: str, + milvus_host: str = "milvus.ai-ml.svc.cluster.local", + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +) -> NamedTuple('UpsertResult', [('inserted_count', int), ('collection_name', str)]): + """Upsert to Milvus with MLflow tracking.""" + import time + import mlflow + from mlflow.tracking import MlflowClient + from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility + from collections import namedtuple + + mlflow.set_tracking_uri(mlflow_tracking_uri) + client = MlflowClient() + + start_time = time.time() + + connections.connect(host=milvus_host, port=19530) + + # Create collection if needed + if not utility.has_collection(collection_name): + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=8192), + FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=512), + FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=len(chunks[0]["embedding"])) + ] + schema = CollectionSchema(fields, description="Document embeddings") + Collection(name=collection_name, schema=schema) + + collection = Collection(collection_name) + + # Prepare and insert data + data = [ + [chunk["text"] for chunk in chunks], + [source_name for _ in chunks], + [chunk["embedding"] for chunk in chunks] + ] + + result = collection.insert(data) + collection.flush() + + # Create index if needed + if not collection.has_index(): + collection.create_index( + field_name="embedding", + index_params={ + "metric_type": "COSINE", + "index_type": "HNSW", + "params": {"M": 16, "efConstruction": 256} + } + ) + + upsert_time = time.time() - start_time + inserted_count = len(chunks) + + # Log to MLflow + client.log_param(run_id, "milvus_collection", collection_name) + client.log_param(run_id, "source_name", source_name[:500]) + client.log_metric(run_id, "upsert_time_seconds", upsert_time) + client.log_metric(run_id, "documents_inserted", inserted_count) + client.log_metric(run_id, "inserts_per_second", inserted_count / upsert_time if upsert_time > 0 else 0) + + connections.disconnect("default") + + UpsertResult = namedtuple('UpsertResult', ['inserted_count', 'collection_name']) + return UpsertResult(inserted_count, collection_name) + + +@dsl.component( + base_image=MLFLOW_IMAGE, + packages_to_install=["mlflow>=2.10.0", "psycopg2-binary"] +) +def finalize_mlflow_run( + run_id: str, + total_chunks: int, + status: str = "FINISHED", + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +) -> str: + """Finalize the MLflow run with summary metrics.""" + import mlflow + from mlflow.tracking import MlflowClient + from mlflow.entities import RunStatus + + mlflow.set_tracking_uri(mlflow_tracking_uri) + client = MlflowClient() + + # Set final status + status_map = { + "FINISHED": RunStatus.FINISHED, + "FAILED": RunStatus.FAILED, + } + run_status = status_map.get(status.upper(), RunStatus.FINISHED) + + # Log summary + client.set_tag(run_id, "pipeline.status", status) + client.log_metric(run_id, "final_chunk_count", total_chunks) + + client.set_terminated(run_id, status=run_status) + + return run_id + + +@dsl.pipeline( + name="mlflow-document-ingestion", + description="Document ingestion with MLflow experiment tracking" +) +def document_ingestion_mlflow_pipeline( + source_url: str, + collection_name: str = "knowledge_base", + source_name: str = "", + chunk_size: int = 500, + chunk_overlap: int = 50, + experiment_name: str = "document-ingestion", + run_name: str = "", + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +): + """ + Document Ingestion Pipeline with MLflow Tracking + + Ingests documents into Milvus while tracking all metrics + and parameters in MLflow for experiment comparison. + """ + import time + + # Generate run name if not provided + effective_run_name = run_name or f"ingestion-{int(time.time())}" + + # Start MLflow run + mlflow_run = start_mlflow_run( + experiment_name=experiment_name, + run_name=effective_run_name, + mlflow_tracking_uri=mlflow_tracking_uri, + pipeline_params={ + "source_url": source_url, + "collection_name": collection_name, + "chunk_size": chunk_size, + "chunk_overlap": chunk_overlap, + }, + ) + + # Extract text + extraction = extract_text_with_tracking( + source_url=source_url, + run_id=mlflow_run.outputs["run_id"], + mlflow_tracking_uri=mlflow_tracking_uri, + ) + extraction.set_caching_options(enable_caching=True) + + # Chunk text + chunking = chunk_text_with_tracking( + text=extraction.outputs["text"], + run_id=mlflow_run.outputs["run_id"], + chunk_size=chunk_size, + overlap=chunk_overlap, + mlflow_tracking_uri=mlflow_tracking_uri, + ) + chunking.set_caching_options(enable_caching=True) + + # Generate embeddings + embedding = generate_embeddings_with_tracking( + chunks=chunking.outputs["chunks"], + run_id=mlflow_run.outputs["run_id"], + mlflow_tracking_uri=mlflow_tracking_uri, + ) + embedding.set_caching_options(enable_caching=True) + + # Upsert to Milvus + upsert = upsert_to_milvus_with_tracking( + chunks=embedding.outputs["embedded_chunks"], + run_id=mlflow_run.outputs["run_id"], + collection_name=collection_name, + source_name=source_name or source_url, + mlflow_tracking_uri=mlflow_tracking_uri, + ) + + # Finalize run + finalize_mlflow_run( + run_id=mlflow_run.outputs["run_id"], + total_chunks=upsert.outputs["inserted_count"], + mlflow_tracking_uri=mlflow_tracking_uri, + ) + + +if __name__ == "__main__": + compiler.Compiler().compile( + document_ingestion_mlflow_pipeline, + "document_ingestion_mlflow_pipeline.yaml" + ) + print("Compiled: document_ingestion_mlflow_pipeline.yaml") diff --git a/document_ingestion_pipeline.py b/document_ingestion_pipeline.py new file mode 100644 index 0000000..cd5bd3d --- /dev/null +++ b/document_ingestion_pipeline.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +""" +Document Ingestion Pipeline - Kubeflow Pipelines SDK + +Ingests documents into Milvus vector database with embeddings. +Can be triggered from Argo Workflows via the kfp-trigger template. + +Usage: + pip install kfp==2.12.1 + python document_ingestion_pipeline.py + # Upload document_ingestion.yaml to Kubeflow Pipelines UI +""" + +from kfp import dsl +from kfp import compiler +from typing import List + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx", "beautifulsoup4", "pypdf2", "docx2txt"] +) +def extract_text( + source_url: str, + source_type: str = "auto" +) -> str: + """Extract text from various document formats.""" + import httpx + from pathlib import Path + import tempfile + + with httpx.Client(timeout=120.0) as client: + response = client.get(source_url) + content = response.content + + # Detect type if auto + if source_type == "auto": + if source_url.endswith(".pdf"): + source_type = "pdf" + elif source_url.endswith(".docx"): + source_type = "docx" + elif source_url.endswith(".html") or source_url.endswith(".htm"): + source_type = "html" + else: + source_type = "text" + + if source_type == "pdf": + from PyPDF2 import PdfReader + import io + reader = PdfReader(io.BytesIO(content)) + text = "\n\n".join([page.extract_text() for page in reader.pages]) + elif source_type == "docx": + import docx2txt + with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as f: + f.write(content) + text = docx2txt.process(f.name) + elif source_type == "html": + from bs4 import BeautifulSoup + soup = BeautifulSoup(content, "html.parser") + text = soup.get_text(separator="\n") + else: + text = content.decode("utf-8", errors="ignore") + + return text + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["tiktoken"] +) +def chunk_text( + text: str, + chunk_size: int = 500, + overlap: int = 50 +) -> list: + """Split text into overlapping chunks.""" + import tiktoken + + enc = tiktoken.get_encoding("cl100k_base") + tokens = enc.encode(text) + + chunks = [] + start = 0 + while start < len(tokens): + end = min(start + chunk_size, len(tokens)) + chunk_tokens = tokens[start:end] + chunk_text = enc.decode(chunk_tokens) + chunks.append({ + "text": chunk_text, + "start_token": start, + "end_token": end + }) + start += chunk_size - overlap + + return chunks + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def generate_embeddings_batch( + chunks: list, + embeddings_url: str = "http://embeddings-predictor.ai-ml.svc.cluster.local", + batch_size: int = 32 +) -> list: + """Generate embeddings for all chunks.""" + import httpx + + embedded_chunks = [] + texts = [c["text"] for c in chunks] + + with httpx.Client(timeout=300.0) as client: + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + response = client.post( + f"{embeddings_url}/embeddings", + json={"input": batch, "model": "bge-small-en-v1.5"} + ) + result = response.json() + + for j, embedding_data in enumerate(result["data"]): + chunk = chunks[i + j].copy() + chunk["embedding"] = embedding_data["embedding"] + embedded_chunks.append(chunk) + + return embedded_chunks + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["pymilvus"] +) +def upsert_to_milvus( + chunks: list, + collection_name: str, + source_name: str, + milvus_host: str = "milvus.ai-ml.svc.cluster.local" +) -> int: + """Upsert embeddings to Milvus collection.""" + from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility + + connections.connect(host=milvus_host, port=19530) + + # Create collection if it doesn't exist + if not utility.has_collection(collection_name): + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=8192), + FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=512), + FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024) + ] + schema = CollectionSchema(fields, description="Document embeddings") + Collection(name=collection_name, schema=schema) + + collection = Collection(collection_name) + + # Prepare data + data = [ + [chunk["text"] for chunk in chunks], + [source_name for _ in chunks], + [chunk["embedding"] for chunk in chunks] + ] + + # Insert + result = collection.insert(data) + collection.flush() + + # Create index if needed + if not collection.has_index(): + collection.create_index( + field_name="embedding", + index_params={ + "metric_type": "COSINE", + "index_type": "HNSW", + "params": {"M": 16, "efConstruction": 256} + } + ) + + return len(chunks) + + +@dsl.pipeline( + name="document-ingestion-pipeline", + description="Ingest documents into Milvus: Extract -> Chunk -> Embed -> Store" +) +def document_ingestion_pipeline( + source_url: str, + collection_name: str = "knowledge_base", + source_name: str = "", + chunk_size: int = 500, + chunk_overlap: int = 50 +): + """ + Document Ingestion Pipeline + + Args: + source_url: URL to the document (PDF, DOCX, HTML, or plain text) + collection_name: Milvus collection to store embeddings + source_name: Human-readable name for the source + chunk_size: Token count per chunk + chunk_overlap: Overlap between chunks + """ + + # Step 1: Extract text from document + extract_task = extract_text(source_url=source_url) + extract_task.set_caching_options(enable_caching=True) + + # Step 2: Chunk the text + chunk_task = chunk_text( + text=extract_task.output, + chunk_size=chunk_size, + overlap=chunk_overlap + ) + chunk_task.set_caching_options(enable_caching=True) + + # Step 3: Generate embeddings + embed_task = generate_embeddings_batch(chunks=chunk_task.output) + embed_task.set_caching_options(enable_caching=True) + + # Step 4: Store in Milvus + store_task = upsert_to_milvus( + chunks=embed_task.output, + collection_name=collection_name, + source_name=source_name if source_name else source_url + ) + + +@dsl.pipeline( + name="batch-document-ingestion", + description="Ingest multiple documents in parallel" +) +def batch_document_ingestion_pipeline( + source_urls: List[str], + collection_name: str = "knowledge_base" +): + """ + Batch Document Ingestion + + Args: + source_urls: List of document URLs to ingest + collection_name: Target Milvus collection + """ + with dsl.ParallelFor(source_urls) as url: + document_ingestion_pipeline( + source_url=url, + collection_name=collection_name + ) + + +if __name__ == "__main__": + # Primary pipeline - filename must match Python file for sync + compiler.Compiler().compile( + document_ingestion_pipeline, + "document_ingestion_pipeline.yaml" + ) + print("Compiled: document_ingestion_pipeline.yaml") diff --git a/evaluation_pipeline.py b/evaluation_pipeline.py new file mode 100644 index 0000000..2397f2b --- /dev/null +++ b/evaluation_pipeline.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +""" +Model Evaluation Pipeline - Kubeflow Pipelines SDK + +Evaluates fine-tuned models against benchmarks. +Integrates with Argo Workflows for automated model deployment. + +Usage: + pip install kfp==2.12.1 + python evaluation_pipeline.py +""" + +from kfp import dsl +from kfp import compiler +from typing import Dict + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def load_eval_dataset( + dataset_name: str = "mmlu", + subset: str = "test", + limit: int = 100 +) -> list: + """Load evaluation dataset samples.""" + import httpx + import json + + # For now, use a simple test set + # In production, this would load from HuggingFace or S3 + test_samples = [ + { + "question": "What is the capital of France?", + "choices": ["London", "Berlin", "Paris", "Madrid"], + "answer": "C" + }, + { + "question": "Which planet is known as the Red Planet?", + "choices": ["Venus", "Mars", "Jupiter", "Saturn"], + "answer": "B" + }, + { + "question": "What is 2 + 2?", + "choices": ["3", "4", "5", "6"], + "answer": "B" + } + ] + + return test_samples[:limit] + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def run_inference( + samples: list, + model_endpoint: str, + model_name: str = "default" +) -> list: + """Run inference on evaluation samples.""" + import httpx + + results = [] + + with httpx.Client(timeout=120.0) as client: + for sample in samples: + prompt = f"""Answer the following multiple choice question. + +Question: {sample['question']} +Choices: +A) {sample['choices'][0]} +B) {sample['choices'][1]} +C) {sample['choices'][2]} +D) {sample['choices'][3]} + +Answer with just the letter (A, B, C, or D):""" + + response = client.post( + f"{model_endpoint}/v1/chat/completions", + json={ + "model": model_name, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 10, + "temperature": 0 + } + ) + + result = response.json() + answer = result["choices"][0]["message"]["content"].strip().upper() + + results.append({ + "question": sample["question"], + "expected": sample["answer"], + "predicted": answer[0] if answer else "X", + "correct": answer.startswith(sample["answer"]) + }) + + return results + + +@dsl.component( + base_image="python:3.13-slim" +) +def calculate_metrics( + results: list +) -> dict: + """Calculate evaluation metrics.""" + correct = sum(1 for r in results if r["correct"]) + total = len(results) + + accuracy = correct / total if total > 0 else 0 + + return { + "accuracy": accuracy, + "correct": correct, + "total": total, + "pass": accuracy >= 0.7 # 70% threshold + } + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def publish_results( + metrics: dict, + model_name: str, + nats_url: str = "nats://nats.ai-ml.svc.cluster.local:4222" +) -> str: + """Publish evaluation results to NATS.""" + import subprocess + import sys + subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "nats-py"]) + + import asyncio + import json + import nats + + async def publish(): + nc = await nats.connect(nats_url) + await nc.publish( + f"ai.evaluation.results.{model_name}", + json.dumps({ + "model": model_name, + "metrics": metrics, + "status": "passed" if metrics["pass"] else "failed" + }).encode() + ) + await nc.close() + + asyncio.run(publish()) + return "published" + + +@dsl.pipeline( + name="model-evaluation-pipeline", + description="Evaluate model performance on benchmarks" +) +def model_evaluation_pipeline( + model_endpoint: str = "http://llm-draft.ai-ml.svc.cluster.local:8000", + model_name: str = "default", + dataset_name: str = "mmlu", + sample_limit: int = 100 +): + """ + Model Evaluation Pipeline + + Args: + model_endpoint: URL of the model inference endpoint + model_name: Name of the model being evaluated + dataset_name: Evaluation dataset to use + sample_limit: Maximum samples to evaluate + """ + + # Load dataset + load_task = load_eval_dataset( + dataset_name=dataset_name, + limit=sample_limit + ) + load_task.set_caching_options(enable_caching=True) + + # Run inference + inference_task = run_inference( + samples=load_task.output, + model_endpoint=model_endpoint, + model_name=model_name + ) + inference_task.set_caching_options(enable_caching=False) + + # Calculate metrics + metrics_task = calculate_metrics(results=inference_task.output) + + # Publish results + publish_task = publish_results( + metrics=metrics_task.output, + model_name=model_name + ) + + +if __name__ == "__main__": + compiler.Compiler().compile( + model_evaluation_pipeline, + "evaluation_pipeline.yaml" + ) + print("Compiled: evaluation_pipeline.yaml") diff --git a/kfp-sync-job.yaml b/kfp-sync-job.yaml new file mode 100644 index 0000000..cbfe938 --- /dev/null +++ b/kfp-sync-job.yaml @@ -0,0 +1,347 @@ +# KFP Pipeline Sync Job +# Automatically compiles Python pipeline definitions and uploads to Kubeflow +# Runs as a CronJob to keep pipelines in sync with Git +--- +# RBAC to allow reading GitRepository status for artifact URL +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: kfp-sync-flux-reader + namespace: ai-ml +rules: + - apiGroups: ["source.toolkit.fluxcd.io"] + resources: ["gitrepositories"] + verbs: ["get", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: kfp-sync-flux-reader + namespace: ai-ml +subjects: + - kind: ServiceAccount + name: pipeline-bridge + namespace: ai-ml +roleRef: + kind: Role + name: kfp-sync-flux-reader + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: batch/v1 +kind: CronJob +metadata: + name: kfp-pipeline-sync + namespace: ai-ml + labels: + app.kubernetes.io/name: kfp-pipeline-sync + app.kubernetes.io/part-of: llm-workflows +spec: + # Run every 30 minutes + schedule: "*/30 * * * *" + concurrencyPolicy: Forbid + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 + jobTemplate: + spec: + backoffLimit: 2 + template: + metadata: + labels: + app: kfp-pipeline-sync + spec: + schedulerName: volcano + restartPolicy: OnFailure + serviceAccountName: pipeline-bridge + containers: + - name: sync + image: python:3.13-slim + command: + - python + - /scripts/sync_pipelines.py + env: + - name: KUBEFLOW_HOST + value: "http://ml-pipeline.kubeflow.svc.cluster.local:8888" + - name: GIT_REPO_NAME + value: "llm-workflows" + - name: GIT_REPO_NAMESPACE + value: "ai-ml" + volumeMounts: + - name: scripts + mountPath: /scripts + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + volumes: + - name: scripts + configMap: + name: kfp-sync-scripts +--- +# Manual trigger job (run this to sync immediately) +apiVersion: batch/v1 +kind: Job +metadata: + name: kfp-pipeline-sync-manual + namespace: ai-ml + labels: + app.kubernetes.io/name: kfp-pipeline-sync + app.kubernetes.io/part-of: llm-workflows + annotations: + description: "Delete and recreate to manually trigger sync" +spec: + backoffLimit: 1 + template: + metadata: + labels: + app: kfp-pipeline-sync + spec: + schedulerName: volcano + restartPolicy: Never + serviceAccountName: pipeline-bridge + containers: + - name: sync + image: python:3.13-slim + command: + - python + - /scripts/sync_pipelines.py + env: + - name: KUBEFLOW_HOST + value: "http://ml-pipeline.kubeflow.svc.cluster.local:8888" + - name: GIT_REPO_NAME + value: "llm-workflows" + - name: GIT_REPO_NAMESPACE + value: "ai-ml" + volumeMounts: + - name: scripts + mountPath: /scripts + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + volumes: + - name: scripts + configMap: + name: kfp-sync-scripts +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: kfp-sync-scripts + namespace: ai-ml +data: + sync_pipelines.py: | + #!/usr/bin/env python3 + """ + KFP Pipeline Sync + + Compiles Python pipeline definitions and uploads to Kubeflow Pipelines. + Downloads from Flux source-controller artifact for secure access. + """ + import os + import sys + import subprocess + import tempfile + import tarfile + import hashlib + from pathlib import Path + from datetime import datetime + + # Install KFP and kubernetes client + subprocess.check_call([ + sys.executable, "-m", "pip", "install", "-q", + "kfp==2.12.1", "httpx", "kubernetes" + ]) + + from kfp import Client + from kfp import compiler + import httpx + from kubernetes import client as k8s_client, config as k8s_config + + KUBEFLOW_HOST = os.environ.get("KUBEFLOW_HOST", "http://ml-pipeline.kubeflow.svc.cluster.local:8888") + PIPELINES_DIR = os.environ.get("PIPELINES_DIR", "/pipelines") + # GitRepository to get artifact from + GIT_REPO_NAME = os.environ.get("GIT_REPO_NAME", "llm-workflows") + GIT_REPO_NAMESPACE = os.environ.get("GIT_REPO_NAMESPACE", "ai-ml") + + def get_flux_artifact_url() -> str: + """Get artifact URL from Flux GitRepository status.""" + try: + k8s_config.load_incluster_config() + api = k8s_client.CustomObjectsApi() + gitrepo = api.get_namespaced_custom_object( + group="source.toolkit.fluxcd.io", + version="v1", + namespace=GIT_REPO_NAMESPACE, + plural="gitrepositories", + name=GIT_REPO_NAME + ) + return gitrepo.get("status", {}).get("artifact", {}).get("url", "") + except Exception as e: + print(f"Error getting GitRepository: {e}") + return "" + + def get_file_hash(filepath: str) -> str: + """Get MD5 hash of file for change detection.""" + with open(filepath, "rb") as f: + return hashlib.md5(f.read()).hexdigest() + + def compile_pipeline(py_file: Path, output_dir: Path) -> Path: + """Compile a Python pipeline file to YAML.""" + output_file = output_dir / f"{py_file.stem}.yaml" + + # Execute the pipeline file to compile it + result = subprocess.run( + [sys.executable, str(py_file)], + cwd=str(py_file.parent), + capture_output=True, + text=True + ) + + if result.returncode != 0: + print(f"Warning: Failed to compile {py_file}: {result.stderr}") + return None + + # Check if YAML was generated + generated = py_file.parent / f"{py_file.stem}.yaml" + if generated.exists(): + generated.rename(output_file) + return output_file + + return None + + def upload_pipeline(client: Client, yaml_path: Path, pipeline_name: str) -> bool: + """Upload or update a pipeline in Kubeflow.""" + try: + # Check if pipeline exists by listing and filtering client-side + # KFP v2 API has different filter syntax + all_pipelines = client.list_pipelines(page_size=100) + existing = None + if all_pipelines.pipelines: + for p in all_pipelines.pipelines: + if p.display_name == pipeline_name: + existing = p + break + + if existing: + # Update existing pipeline + print(f"Updating existing pipeline: {pipeline_name}") + + # Create new version + version_name = f"v{datetime.now().strftime('%Y%m%d-%H%M%S')}" + client.upload_pipeline_version( + pipeline_package_path=str(yaml_path), + pipeline_version_name=version_name, + pipeline_id=existing.pipeline_id + ) + print(f"Created version {version_name}") + else: + # Create new pipeline + print(f"Creating new pipeline: {pipeline_name}") + client.upload_pipeline( + pipeline_package_path=str(yaml_path), + pipeline_name=pipeline_name + ) + + return True + + except Exception as e: + print(f"Error uploading {pipeline_name}: {e}") + return False + + def sync_from_flux_artifact(client: Client, artifact_url: str): + """Download from Flux source-controller and sync pipelines.""" + with tempfile.TemporaryDirectory() as tmpdir: + tarball = Path(tmpdir) / "source.tar.gz" + extract_dir = Path(tmpdir) / "source" + extract_dir.mkdir() + + # Download artifact from Flux source-controller + print(f"Downloading from Flux: {artifact_url}") + response = httpx.get(artifact_url, follow_redirects=True, timeout=60.0) + response.raise_for_status() + tarball.write_bytes(response.content) + + # Extract tarball + with tarfile.open(tarball, 'r:gz') as tar: + tar.extractall(extract_dir) + + pipelines_dir = extract_dir / "pipelines" + if not pipelines_dir.exists(): + print("No pipelines directory found in artifact") + return + + sync_directory(client, pipelines_dir) + + def sync_directory(client: Client, pipelines_dir: Path): + """Sync all pipeline files from a directory.""" + output_dir = Path("/tmp/compiled") + output_dir.mkdir(exist_ok=True) + + # Find all Python pipeline files + py_files = list(pipelines_dir.glob("*_pipeline.py")) + list(pipelines_dir.glob("*.pipeline.py")) + + if not py_files: + # Fall back to all Python files + py_files = list(pipelines_dir.glob("*.py")) + + print(f"Found {len(py_files)} pipeline files") + + for py_file in py_files: + print(f"\nProcessing: {py_file.name}") + + # Compile pipeline + yaml_path = compile_pipeline(py_file, output_dir) + if not yaml_path: + # Try direct YAML files + yaml_path = pipelines_dir / f"{py_file.stem}.yaml" + if not yaml_path.exists(): + continue + + pipeline_name = py_file.stem.replace("_", "-") + upload_pipeline(client, yaml_path, pipeline_name) + + def main(): + print(f"KFP Pipeline Sync starting...") + print(f"Kubeflow host: {KUBEFLOW_HOST}") + + # Wait for Kubeflow to be ready + client = None + for i in range(5): + try: + client = Client(host=KUBEFLOW_HOST) + # Test connection + client.list_pipelines(page_size=1) + print("Connected to Kubeflow Pipelines") + break + except Exception as e: + print(f"Waiting for Kubeflow... ({e})") + import time + time.sleep(10) + + if not client: + print("Failed to connect to Kubeflow") + sys.exit(1) + + # Get artifact URL from Flux GitRepository + artifact_url = get_flux_artifact_url() + if artifact_url: + print(f"\nSyncing from Flux artifact...") + sync_from_flux_artifact(client, artifact_url) + elif PIPELINES_DIR and Path(PIPELINES_DIR).exists(): + # Sync from local directory + print(f"\nSyncing from local: {PIPELINES_DIR}") + sync_directory(client, Path(PIPELINES_DIR)) + else: + print("No pipeline source configured!") + sys.exit(1) + + print("\nSync complete!") + + if __name__ == "__main__": + main() diff --git a/voice_pipeline.py b/voice_pipeline.py new file mode 100644 index 0000000..886f6c0 --- /dev/null +++ b/voice_pipeline.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python3 +""" +Voice Pipeline - Kubeflow Pipelines SDK + +Compile this to create a Kubeflow Pipeline for voice assistant workflows. + +Usage: + pip install kfp==2.12.1 + python voice_pipeline.py + # Upload voice_pipeline.yaml to Kubeflow Pipelines UI +""" + +from kfp import dsl +from kfp import compiler + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def transcribe_audio( + audio_b64: str, + whisper_url: str = "http://whisper-predictor.ai-ml.svc.cluster.local" +) -> str: + """Transcribe audio using Whisper STT service.""" + import base64 + import httpx + + audio_bytes = base64.b64decode(audio_b64) + + with httpx.Client(timeout=120.0) as client: + response = client.post( + f"{whisper_url}/v1/audio/transcriptions", + files={"file": ("audio.wav", audio_bytes, "audio/wav")}, + data={"model": "whisper-large-v3", "language": "en"} + ) + result = response.json() + + return result.get("text", "") + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def generate_embeddings( + text: str, + embeddings_url: str = "http://embeddings-predictor.ai-ml.svc.cluster.local" +) -> list: + """Generate embeddings for RAG retrieval.""" + import httpx + + with httpx.Client(timeout=60.0) as client: + response = client.post( + f"{embeddings_url}/embeddings", + json={"input": text, "model": "bge-small-en-v1.5"} + ) + result = response.json() + + return result["data"][0]["embedding"] + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["pymilvus"] +) +def retrieve_context( + embedding: list, + milvus_host: str = "milvus.ai-ml.svc.cluster.local", + collection_name: str = "knowledge_base", + top_k: int = 5 +) -> list: + """Retrieve relevant documents from Milvus vector database.""" + from pymilvus import connections, Collection, utility + + connections.connect(host=milvus_host, port=19530) + + if not utility.has_collection(collection_name): + return [] + + collection = Collection(collection_name) + collection.load() + + results = collection.search( + data=[embedding], + anns_field="embedding", + param={"metric_type": "COSINE", "params": {"nprobe": 10}}, + limit=top_k, + output_fields=["text", "source"] + ) + + documents = [] + for hits in results: + for hit in hits: + documents.append({ + "text": hit.entity.get("text"), + "source": hit.entity.get("source"), + "score": hit.distance + }) + + return documents + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def rerank_documents( + query: str, + documents: list, + reranker_url: str = "http://reranker-predictor.ai-ml.svc.cluster.local", + top_k: int = 3 +) -> list: + """Rerank documents using BGE reranker.""" + import httpx + + if not documents: + return [] + + with httpx.Client(timeout=60.0) as client: + response = client.post( + f"{reranker_url}/v1/rerank", + json={ + "query": query, + "documents": [doc["text"] for doc in documents], + "model": "bge-reranker-v2-m3" + } + ) + result = response.json() + + # Sort by rerank score + reranked = sorted( + zip(documents, result.get("scores", [0] * len(documents))), + key=lambda x: x[1], + reverse=True + )[:top_k] + + return [doc for doc, score in reranked] + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def generate_response( + query: str, + context: list, + vllm_url: str = "http://llm-draft.ai-ml.svc.cluster.local:8000", + model: str = "mistralai/Mistral-7B-Instruct-v0.3" +) -> str: + """Generate response using vLLM.""" + import httpx + + # Build context + if context: + context_text = "\n\n".join([doc["text"] for doc in context]) + user_content = f"Context:\n{context_text}\n\nQuestion: {query}" + else: + user_content = query + + system_prompt = """You are a helpful voice assistant. +Answer questions based on the provided context when available. +Keep responses concise and natural for speech synthesis.""" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_content} + ] + + with httpx.Client(timeout=180.0) as client: + response = client.post( + f"{vllm_url}/v1/chat/completions", + json={ + "model": model, + "messages": messages, + "max_tokens": 512, + "temperature": 0.7 + } + ) + result = response.json() + + return result["choices"][0]["message"]["content"] + + +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["httpx"] +) +def synthesize_speech( + text: str, + tts_url: str = "http://tts-predictor.ai-ml.svc.cluster.local" +) -> str: + """Convert text to speech using TTS service.""" + import base64 + import httpx + + with httpx.Client(timeout=120.0) as client: + response = client.post( + f"{tts_url}/v1/audio/speech", + json={ + "input": text, + "voice": "en_US-lessac-high", + "response_format": "wav" + } + ) + audio_b64 = base64.b64encode(response.content).decode("utf-8") + + return audio_b64 + + +@dsl.pipeline( + name="voice-assistant-rag-pipeline", + description="End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS" +) +def voice_assistant_pipeline( + audio_b64: str, + collection_name: str = "knowledge_base" +): + """ + Voice Assistant Pipeline with RAG + + Args: + audio_b64: Base64-encoded audio file + collection_name: Milvus collection for RAG + """ + + # Step 1: Transcribe audio with Whisper + transcribe_task = transcribe_audio(audio_b64=audio_b64) + transcribe_task.set_caching_options(enable_caching=False) + + # Step 2: Generate embeddings + embed_task = generate_embeddings(text=transcribe_task.output) + embed_task.set_caching_options(enable_caching=True) + + # Step 3: Retrieve context from Milvus + retrieve_task = retrieve_context( + embedding=embed_task.output, + collection_name=collection_name + ) + + # Step 4: Rerank documents + rerank_task = rerank_documents( + query=transcribe_task.output, + documents=retrieve_task.output + ) + + # Step 5: Generate response with context + llm_task = generate_response( + query=transcribe_task.output, + context=rerank_task.output + ) + + # Step 6: Synthesize speech + tts_task = synthesize_speech(text=llm_task.output) + + +@dsl.pipeline( + name="text-to-speech-pipeline", + description="Simple text to speech pipeline" +) +def text_to_speech_pipeline(text: str): + """Simple TTS pipeline for testing.""" + tts_task = synthesize_speech(text=text) + + +@dsl.pipeline( + name="rag-query-pipeline", + description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM" +) +def rag_query_pipeline( + query: str, + collection_name: str = "knowledge_base" +): + """ + RAG Query Pipeline (text input, no voice) + + Args: + query: Text query + collection_name: Milvus collection name + """ + # Embed the query + embed_task = generate_embeddings(text=query) + + # Retrieve from Milvus + retrieve_task = retrieve_context( + embedding=embed_task.output, + collection_name=collection_name + ) + + # Rerank + rerank_task = rerank_documents( + query=query, + documents=retrieve_task.output + ) + + # Generate response + llm_task = generate_response( + query=query, + context=rerank_task.output + ) + + +if __name__ == "__main__": + # Compile all pipelines + pipelines = [ + ("voice_pipeline.yaml", voice_assistant_pipeline), + ("tts_pipeline.yaml", text_to_speech_pipeline), + ("rag_pipeline.yaml", rag_query_pipeline), + ] + + for filename, pipeline_func in pipelines: + compiler.Compiler().compile(pipeline_func, filename) + print(f"Compiled: {filename}") + + print("\nUpload these YAML files to Kubeflow Pipelines UI at:") + print(" http://kubeflow.example.com/pipelines")