diff --git a/rag_pipeline.yaml b/rag_pipeline.yaml new file mode 100644 index 0000000..e95b6a6 --- /dev/null +++ b/rag_pipeline.yaml @@ -0,0 +1,363 @@ +# PIPELINE DEFINITION +# Name: rag-query-pipeline +# Description: RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step latency to MLflow. +# Inputs: +# collection_name: str [Default: 'knowledge_base'] +# query: str +components: + comp-generate-embeddings: + executorLabel: exec-generate-embeddings + inputDefinitions: + parameters: + embeddings_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings + isOptional: true + parameterType: STRING + text: + parameterType: STRING + outputDefinitions: + parameters: + embedding: + parameterType: LIST + latency_s: + parameterType: NUMBER_DOUBLE + comp-generate-response: + executorLabel: exec-generate-response + inputDefinitions: + parameters: + context: + parameterType: LIST + model: + defaultValue: hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 + isOptional: true + parameterType: STRING + query: + parameterType: STRING + vllm_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + completion_tokens: + parameterType: NUMBER_INTEGER + latency_s: + parameterType: NUMBER_DOUBLE + text: + parameterType: STRING + comp-rerank-documents: + executorLabel: exec-rerank-documents + inputDefinitions: + parameters: + documents: + parameterType: LIST + query: + parameterType: STRING + reranker_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker + isOptional: true + parameterType: STRING + top_k: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + documents: + parameterType: LIST + latency_s: + parameterType: NUMBER_DOUBLE + comp-retrieve-context: + executorLabel: exec-retrieve-context + inputDefinitions: + parameters: + collection_name: + 
defaultValue: knowledge_base + isOptional: true + parameterType: STRING + embedding: + parameterType: LIST + milvus_host: + defaultValue: milvus.ai-ml.svc.cluster.local + isOptional: true + parameterType: STRING + top_k: + defaultValue: 5.0 + isOptional: true + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + documents: + parameterType: LIST + latency_s: + parameterType: NUMBER_DOUBLE +deploymentSpec: + executors: + exec-generate-embeddings: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - generate_embeddings + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef generate_embeddings(\n text: str,\n embeddings_url: str\ + \ = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings\"\ + \n) -> NamedTuple(\"EmbedResult\", [(\"embedding\", list), (\"latency_s\"\ + , float)]):\n \"\"\"Generate embeddings for RAG retrieval.\"\"\"\n \ + \ import time\n import httpx\n from collections import namedtuple\n\ + \n start = time.perf_counter()\n with httpx.Client(timeout=60.0) as\ + \ client:\n response = client.post(\n f\"{embeddings_url}/embeddings\"\ + ,\n json={\"input\": text, \"model\": \"bge-small-en-v1.5\"}\n\ + \ )\n result = response.json()\n latency = time.perf_counter()\ + \ - start\n\n 
EmbedResult = namedtuple(\"EmbedResult\", [\"embedding\"\ + , \"latency_s\"])\n return EmbedResult(result[\"data\"][0][\"embedding\"\ + ], latency)\n\n" + image: python:3.13-slim + exec-generate-response: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - generate_response + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef generate_response(\n query: str,\n context: list,\n \ + \ vllm_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm\"\ + ,\n model: str = \"hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4\"\ + \n) -> NamedTuple(\"LLMResult\", [(\"text\", str), (\"latency_s\", float),\ + \ (\"completion_tokens\", int)]):\n \"\"\"Generate response using vLLM.\"\ + \"\"\n import time\n import httpx\n from collections import namedtuple\n\ + \n # Build context\n if context:\n context_text = \"\\n\\n\"\ + .join([doc[\"text\"] for doc in context])\n user_content = f\"Context:\\\ + n{context_text}\\n\\nQuestion: {query}\"\n else:\n user_content\ + \ = query\n\n system_prompt = \"\"\"You are a helpful voice assistant.\n\ + Answer questions based on the provided context when available.\nKeep responses\ + \ concise and natural for speech synthesis.\"\"\"\n\n messages = [\n\ + \ {\"role\": \"system\", 
\"content\": system_prompt},\n {\"\ + role\": \"user\", \"content\": user_content}\n ]\n\n start = time.perf_counter()\n\ + \ with httpx.Client(timeout=180.0) as client:\n response = client.post(\n\ + \ f\"{vllm_url}/v1/chat/completions\",\n json={\n\ + \ \"model\": model,\n \"messages\": messages,\n\ + \ \"max_tokens\": 512,\n \"temperature\":\ + \ 0.7\n }\n )\n result = response.json()\n latency\ + \ = time.perf_counter() - start\n\n text = result[\"choices\"][0][\"\ + message\"][\"content\"]\n usage = result.get(\"usage\", {})\n completion_tokens\ + \ = usage.get(\"completion_tokens\", len(text.split()))\n\n LLMResult\ + \ = namedtuple(\"LLMResult\", [\"text\", \"latency_s\", \"completion_tokens\"\ + ])\n return LLMResult(text, latency, completion_tokens)\n\n" + image: python:3.13-slim + exec-rerank-documents: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - rerank_documents + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef rerank_documents(\n query: str,\n documents: list,\n \ + \ reranker_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker\"\ + ,\n top_k: int = 3\n) -> NamedTuple(\"RerankResult\", [(\"documents\"\ + , list), (\"latency_s\", float)]):\n \"\"\"Rerank documents using 
BGE\ + \ reranker.\"\"\"\n import time\n import httpx\n from collections\ + \ import namedtuple\n\n if not documents:\n RerankResult = namedtuple(\"\ + RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([],\ + \ 0.0)\n\n start = time.perf_counter()\n with httpx.Client(timeout=60.0)\ + \ as client:\n response = client.post(\n f\"{reranker_url}/v1/rerank\"\ + ,\n json={\n \"query\": query,\n \ + \ \"documents\": [doc[\"text\"] for doc in documents],\n \ + \ \"model\": \"bge-reranker-v2-m3\"\n }\n )\n \ + \ result = response.json()\n latency = time.perf_counter() - start\n\ + \n # Sort by rerank score\n reranked = sorted(\n zip(documents,\ + \ result.get(\"scores\", [0] * len(documents))),\n key=lambda x:\ + \ x[1],\n reverse=True\n )[:top_k]\n\n RerankResult = namedtuple(\"\ + RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([doc\ + \ for doc, score in reranked], latency)\n\n" + image: python:3.13-slim + exec-retrieve-context: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - retrieve_context + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pymilvus' &&\ + \ \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef retrieve_context(\n embedding: list,\n milvus_host: str\ + \ = \"milvus.ai-ml.svc.cluster.local\",\n collection_name: str = \"knowledge_base\"\ + ,\n top_k: int = 5\n) -> NamedTuple(\"RetrieveResult\", [(\"documents\"\ + , list), (\"latency_s\", float)]):\n \"\"\"Retrieve relevant documents\ + \ from Milvus vector database.\"\"\"\n import time\n from pymilvus\ + \ import connections, Collection, utility\n from collections import namedtuple\n\ + \n start = time.perf_counter()\n connections.connect(host=milvus_host,\ + \ port=19530)\n\n if not utility.has_collection(collection_name):\n \ + \ latency = time.perf_counter() - start\n RetrieveResult =\ + \ namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \ + \ return RetrieveResult([], latency)\n\n collection = Collection(collection_name)\n\ + \ collection.load()\n\n results = collection.search(\n data=[embedding],\n\ + \ anns_field=\"embedding\",\n param={\"metric_type\": \"COSINE\"\ + , \"params\": {\"nprobe\": 10}},\n limit=top_k,\n output_fields=[\"\ + text\", \"source\"]\n )\n latency = time.perf_counter() - start\n\n\ + \ documents = []\n for hits in results:\n for hit in hits:\n\ + \ documents.append({\n \"text\": hit.entity.get(\"\ + text\"),\n \"source\": 
hit.entity.get(\"source\"),\n \ + \ \"score\": hit.distance\n })\n\n RetrieveResult\ + \ = namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \ + \ return RetrieveResult(documents, latency)\n\n" + image: python:3.13-slim +pipelineInfo: + description: 'RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step + latency to MLflow.' + name: rag-query-pipeline +root: + dag: + tasks: + generate-embeddings: + cachingOptions: + enableCache: true + componentRef: + name: comp-generate-embeddings + inputs: + parameters: + text: + componentInputParameter: query + taskInfo: + name: generate-embeddings + generate-response: + cachingOptions: + enableCache: true + componentRef: + name: comp-generate-response + dependentTasks: + - rerank-documents + inputs: + parameters: + context: + taskOutputParameter: + outputParameterKey: documents + producerTask: rerank-documents + query: + componentInputParameter: query + taskInfo: + name: generate-response + rerank-documents: + cachingOptions: + enableCache: true + componentRef: + name: comp-rerank-documents + dependentTasks: + - retrieve-context + inputs: + parameters: + documents: + taskOutputParameter: + outputParameterKey: documents + producerTask: retrieve-context + query: + componentInputParameter: query + taskInfo: + name: rerank-documents + retrieve-context: + cachingOptions: + enableCache: true + componentRef: + name: comp-retrieve-context + dependentTasks: + - generate-embeddings + inputs: + parameters: + collection_name: + componentInputParameter: collection_name + embedding: + taskOutputParameter: + outputParameterKey: embedding + producerTask: generate-embeddings + taskInfo: + name: retrieve-context + inputDefinitions: + parameters: + collection_name: + defaultValue: knowledge_base + description: Milvus collection name + isOptional: true + parameterType: STRING + query: + description: Text query + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.12.1 diff --git a/tts_pipeline.yaml 
b/tts_pipeline.yaml new file mode 100644 index 0000000..e731f0e --- /dev/null +++ b/tts_pipeline.yaml @@ -0,0 +1,87 @@ +# PIPELINE DEFINITION +# Name: text-to-speech-pipeline +# Description: Simple text to speech pipeline +# Inputs: +# text: str +components: + comp-synthesize-speech: + executorLabel: exec-synthesize-speech + inputDefinitions: + parameters: + text: + parameterType: STRING + tts_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + audio_b64: + parameterType: STRING + latency_s: + parameterType: NUMBER_DOUBLE +deploymentSpec: + executors: + exec-synthesize-speech: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - synthesize_speech + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef synthesize_speech(\n text: str,\n tts_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts\"\ + \n) -> NamedTuple(\"TTSResult\", [(\"audio_b64\", str), (\"latency_s\",\ + \ float)]):\n \"\"\"Convert text to speech using TTS service.\"\"\"\n\ + \ import base64\n import time\n import httpx\n from collections\ + \ import namedtuple\n\n start = time.perf_counter()\n with httpx.Client(timeout=120.0)\ 
#!/usr/bin/env python3
"""
vLLM Tuning Evaluation Pipeline - Kubeflow Pipelines SDK

Runs inference benchmarks with different vLLM configurations and logs
results to MLflow so you can compare APC, chunked prefill, speculative
decoding, and GPU memory utilisation settings side-by-side.

Usage:
    pip install kfp==2.12.1
    python vllm_tuning_pipeline.py
    # Upload vllm_tuning_pipeline.yaml to Kubeflow Pipelines UI

NOTE: every ``@dsl.component`` below is a KFP lightweight component. Only the
decorated function's own source is shipped to the container, so all imports
and helpers must live *inside* the function body; module-level names are not
available at runtime.
"""

from kfp import dsl
from kfp import compiler
from typing import NamedTuple


MLFLOW_IMAGE = "python:3.13-slim"
MLFLOW_PACKAGES = ["mlflow>=2.10.0", "boto3", "psycopg2-binary"]
BENCH_IMAGE = "python:3.13-slim"
BENCH_PACKAGES = ["httpx"]


# ---- MLflow components ----


@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def create_tuning_run(
    experiment_name: str,
    run_name: str,
    tuning_params: dict,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> NamedTuple("RunInfo", [("run_id", str), ("experiment_id", str)]):
    """Create an MLflow run for a vLLM tuning experiment.

    Looks the experiment up by name (creating it if missing), opens a run,
    logs every entry of ``tuning_params`` as a ``vllm.<key>`` parameter and
    closes the run again. Metrics are attached later by
    ``log_benchmark_results`` using the returned run id.

    Args:
        experiment_name: MLflow experiment to log under.
        run_name: Display name of the new run.
        tuning_params: Flat mapping of tuning knobs to record as params.
        mlflow_tracking_uri: MLflow tracking server URL.

    Returns:
        ``RunInfo(run_id, experiment_id)``.
    """
    import os
    import mlflow
    from mlflow.tracking import MlflowClient
    from collections import namedtuple

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()

    experiment = client.get_experiment_by_name(experiment_name)
    if experiment is not None:
        experiment_id = experiment.experiment_id
    else:
        experiment_id = client.create_experiment(
            name=experiment_name,
            artifact_location=f"/mlflow/artifacts/{experiment_name}",
        )

    tags = {
        "pipeline.type": "vllm-tuning",
        # NOTE(review): assumes the executor pod exposes KFP_RUN_ID; when it
        # does not, the tag is recorded as "unknown" - confirm on the cluster.
        "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
    }

    # The context manager guarantees the run is ended even if logging a
    # param raises, instead of leaving a dangling active run behind.
    with mlflow.start_run(
        experiment_id=experiment_id, run_name=run_name, tags=tags
    ) as run:
        if tuning_params:
            mlflow.log_params(
                {f"vllm.{key}": value for key, value in tuning_params.items()}
            )
        run_id = run.info.run_id

    RunInfo = namedtuple("RunInfo", ["run_id", "experiment_id"])
    return RunInfo(run_id, experiment_id)


@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def log_benchmark_results(
    run_id: str,
    metrics: dict,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> str:
    """Log benchmark metrics to MLflow and close the run.

    Each entry of ``metrics`` is logged as a float metric on ``run_id``; the
    full mapping is also uploaded as a ``benchmark_results.json`` artifact,
    after which the run is marked FINISHED.

    Args:
        run_id: MLflow run to attach the results to.
        metrics: Mapping of metric name to a numeric value.
        mlflow_tracking_uri: MLflow tracking server URL.

    Returns:
        The same ``run_id`` that was passed in.

    Raises:
        ValueError, TypeError: if a metric value cannot be converted to float.
    """
    import json
    import tempfile
    import mlflow
    from mlflow.tracking import MlflowClient
    from pathlib import Path

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()

    for name, value in metrics.items():
        client.log_metric(run_id, name, float(value))

    # Save full results as artifact
    with tempfile.TemporaryDirectory() as tmpdir:
        results_file = Path(tmpdir) / "benchmark_results.json"
        results_file.write_text(json.dumps(metrics, indent=2))
        client.log_artifact(run_id, str(results_file))

    client.set_terminated(run_id, status="FINISHED")
    return run_id


# ---- Benchmark components ----


# No packages_to_install: this component only returns literals, so pulling
# httpx into its container would just slow the step down.
@dsl.component(base_image=BENCH_IMAGE)
def build_prompt_suite() -> list:
    """Return a list of test prompts spanning short, medium, and long inputs.

    Each item is a dict with ``id``, ``category``, ``messages`` (chat format)
    and ``max_tokens``. ``repeat-prefix-1`` deliberately reuses the system
    prompt and the opening sentence of ``medium-1`` so the two requests share
    a prompt prefix.
    """
    assistant_system_prompt = "You are a helpful AI assistant running on a homelab."
    ml_comparison_prefix = (
        "Compare and contrast supervised and unsupervised machine learning. "
    )

    return [
        {
            "id": "short-1",
            "category": "short",
            "messages": [
                {"role": "user", "content": "What is the capital of France?"}
            ],
            "max_tokens": 64,
        },
        {
            "id": "short-2",
            "category": "short",
            "messages": [
                {"role": "user", "content": "Explain quantum computing in one sentence."}
            ],
            "max_tokens": 64,
        },
        {
            "id": "medium-1",
            "category": "medium",
            "messages": [
                {"role": "system", "content": assistant_system_prompt},
                {
                    "role": "user",
                    "content": (
                        ml_comparison_prefix
                        + "Give examples of each and explain "
                        "when you would choose one over the other."
                    ),
                },
            ],
            "max_tokens": 512,
        },
        {
            "id": "medium-2",
            "category": "medium",
            "messages": [
                {
                    "role": "user",
                    "content": (
                        "Write a Python function that implements a binary search "
                        "tree with insert, search, and delete operations. Include "
                        "docstrings and type hints."
                    ),
                },
            ],
            "max_tokens": 1024,
        },
        {
            "id": "long-1",
            "category": "long",
            "messages": [
                {
                    "role": "system",
                    "content": "You are a technical writer for a Kubernetes homelab blog.",
                },
                {
                    "role": "user",
                    "content": (
                        "Write a detailed tutorial on setting up a multi-node "
                        "Kubernetes cluster with Talos Linux, covering: "
                        "1) Hardware requirements and network topology, "
                        "2) Talos machine config generation, "
                        "3) Control plane bootstrapping, "
                        "4) Worker node joining, "
                        "5) CNI setup with Cilium, "
                        "6) Storage with Rook-Ceph, "
                        "7) GitOps with Flux CD. "
                        "Include YAML examples for each step."
                    ),
                },
            ],
            "max_tokens": 2048,
        },
        {
            "id": "repeat-prefix-1",
            "category": "prefix-cache-test",
            "messages": [
                {"role": "system", "content": assistant_system_prompt},
                {
                    "role": "user",
                    "content": (
                        ml_comparison_prefix
                        + "Now focus specifically on "
                        "reinforcement learning and how it differs."
                    ),
                },
            ],
            "max_tokens": 512,
        },
    ]


@dsl.component(base_image=BENCH_IMAGE, packages_to_install=BENCH_PACKAGES)
def run_benchmark(
    prompts: list,
    llm_endpoint: str,
    model_name: str,
    num_warmup: int = 2,
    num_iterations: int = 3,
) -> dict:
    """
    Run all prompts through the LLM endpoint and collect timing metrics.

    Sends ``num_warmup`` untimed requests, then streams every prompt
    ``num_iterations`` times against ``{llm_endpoint}/v1/chat/completions``.
    A failed request is counted and reported in the step log but does not
    abort the benchmark.

    Args:
        prompts: Items with ``messages`` and optional ``category``,
            ``max_tokens`` and ``id`` keys.
        llm_endpoint: Base URL of the chat-completions server.
        model_name: Model identifier sent in every request.
        num_warmup: Number of untimed warmup requests.
        num_iterations: How many times the whole prompt list is repeated.

    Returns aggregate metrics: request counts, mean/p50/p95 latency,
    mean/p50 tokens/sec, mean/p50/p95 TTFT and a mean latency per category.
    Statistics are 0 when no request succeeded.

    NOTE(review): unless the server reports ``usage`` in the stream, tokens
    are approximated by the number of content-bearing chunks. If a server
    packs several tokens into one chunk, tokens/sec is under-reported -
    confirm against the deployment.
    """
    import json
    import statistics
    import time

    import httpx

    completions_url = f"{llm_endpoint}/v1/chat/completions"

    def percentile(values, pct):
        """Nearest-rank percentile (pct in 1..100) of a non-empty list."""
        ordered = sorted(values)
        # ceil(n * pct / 100) in integer arithmetic, so float rounding cannot
        # shift the rank (the old int(n * 0.95) index returned the maximum
        # whenever n * 0.95 was a whole number, e.g. n == 20).
        rank = -(-len(ordered) * pct // 100)
        return ordered[max(rank - 1, 0)]

    def summarize(values, func):
        """Apply ``func`` to ``values``, or return 0 for an empty list."""
        return func(values) if values else 0

    def stream_completion(client, payload):
        """Stream one request; return (latency_s, ttft_s, completion_tokens)."""
        started = time.perf_counter()
        first_token_at = None
        content_chunks = 0
        reported_tokens = None

        with client.stream("POST", completions_url, json=payload) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line.startswith("data: "):
                    continue
                chunk = line[len("data: "):].strip()
                if chunk == "[DONE]":
                    break

                try:
                    event = json.loads(chunk)
                except ValueError:
                    event = None

                if isinstance(event, dict):
                    usage = event.get("usage")
                    if (
                        isinstance(usage, dict)
                        and usage.get("completion_tokens") is not None
                    ):
                        # Prefer the server's own count when it is streamed.
                        reported_tokens = int(usage["completion_tokens"])
                    choices = event.get("choices") or []
                    first_choice = choices[0] if choices else None
                    delta = (
                        first_choice.get("delta")
                        if isinstance(first_choice, dict)
                        else None
                    )
                    # Role-only / finish / usage-only chunks carry no text and
                    # must not count as a token or as the first token.
                    carries_text = isinstance(delta, dict) and bool(
                        delta.get("content")
                    )
                else:
                    # Not JSON: keep the legacy behaviour of counting the chunk.
                    carries_text = True

                if not carries_text:
                    continue
                if first_token_at is None:
                    first_token_at = time.perf_counter()
                content_chunks += 1

        latency = time.perf_counter() - started
        ttft = latency if first_token_at is None else first_token_at - started
        tokens = content_chunks if reported_tokens is None else reported_tokens
        return latency, ttft, tokens

    latencies: list[float] = []
    throughputs: list[float] = []
    ttfts: list[float] = []
    per_category: dict[str, list[float]] = {}
    failed_requests = 0

    with httpx.Client(timeout=300.0) as client:
        # Warmup: best effort, a failure here must not abort the benchmark.
        for _ in range(num_warmup):
            try:
                client.post(
                    completions_url,
                    json={
                        "model": model_name,
                        "messages": [{"role": "user", "content": "Hi"}],
                        "max_tokens": 8,
                        "temperature": 0,
                    },
                )
            except Exception as exc:
                print(f"warmup request failed (ignored): {exc!r}")

        # Benchmark
        for iteration in range(num_iterations):
            for prompt in prompts:
                category = prompt.get("category", "unknown")
                payload = {
                    "model": model_name,
                    "messages": prompt["messages"],
                    "max_tokens": prompt.get("max_tokens", 256),
                    "temperature": 0,
                    "stream": True,
                }

                try:
                    latency, ttft, tokens = stream_completion(client, payload)
                except Exception as exc:
                    # Record failure but keep going; surface it in the step
                    # log so a broken endpoint is not silently hidden.
                    failed_requests += 1
                    print(
                        f"request failed (iteration={iteration}, "
                        f"prompt={prompt.get('id', '?')}): {exc!r}"
                    )
                    continue

                latencies.append(latency)
                ttfts.append(ttft)
                # Requests that produced no tokens are left out of the
                # throughput statistics.
                if tokens > 0 and latency > 0:
                    throughputs.append(tokens / latency)
                per_category.setdefault(category, []).append(latency)

    metrics = {
        "total_requests": len(latencies) + failed_requests,
        "successful_requests": len(latencies),
        "failed_requests": failed_requests,
        # Latency
        "latency_mean_s": summarize(latencies, statistics.mean),
        "latency_p50_s": summarize(latencies, statistics.median),
        "latency_p95_s": summarize(latencies, lambda v: percentile(v, 95)),
        # Throughput
        "tokens_per_second_mean": summarize(throughputs, statistics.mean),
        "tokens_per_second_p50": summarize(throughputs, statistics.median),
        # Time to first token
        "ttft_mean_s": summarize(ttfts, statistics.mean),
        "ttft_p50_s": summarize(ttfts, statistics.median),
        "ttft_p95_s": summarize(ttfts, lambda v: percentile(v, 95)),
    }

    # Per-category latency
    for category, category_latencies in per_category.items():
        metrics[f"latency_mean_{category}_s"] = statistics.mean(category_latencies)

    return metrics


# ---- Pipeline ----


@dsl.pipeline(
    name="vllm-tuning-evaluation",
    description=(
        "Benchmark vLLM with different tuning configurations. "
        "Logs latency, TPS, and TTFT to MLflow for A/B comparison."
    ),
)
def vllm_tuning_pipeline(
    llm_endpoint: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm",
    model_name: str = "hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4",
    # Tuning knobs (match env vars in rayservice.yaml)
    enable_prefix_caching: str = "true",
    enable_chunked_prefill: str = "true",
    num_speculative_tokens: str = "3",
    ngram_prompt_lookup_max: str = "4",
    gpu_memory_utilization: str = "0.90",
    # Benchmark config
    num_warmup: int = 2,
    num_iterations: int = 3,
    run_label: str = "baseline",
):
    """
    vLLM Tuning Evaluation Pipeline

    Run this multiple times with different tuning params, then compare
    runs in the MLflow "vllm-tuning" experiment.

    The tuning knobs are only *recorded* as MLflow params; this pipeline does
    not reconfigure the server, so they must describe the configuration that
    is actually deployed behind ``llm_endpoint``.

    Args:
        llm_endpoint: vLLM inference endpoint URL
        model_name: HF model identifier
        enable_prefix_caching: "true" or "false"
        enable_chunked_prefill: "true" or "false"
        num_speculative_tokens: number of speculative tokens (0 = off)
        ngram_prompt_lookup_max: ngram window for spec decode (0 = off)
        gpu_memory_utilization: 0.0 - 1.0
        num_warmup: warmup requests before timing
        num_iterations: how many times to repeat the prompt suite
        run_label: human-readable label (e.g. "apc-on-spec3")
    """

    tuning_params = {
        "enable_prefix_caching": enable_prefix_caching,
        "enable_chunked_prefill": enable_chunked_prefill,
        "num_speculative_tokens": num_speculative_tokens,
        "ngram_prompt_lookup_max": ngram_prompt_lookup_max,
        "gpu_memory_utilization": gpu_memory_utilization,
        "model_name": model_name,
        "llm_endpoint": llm_endpoint,
        "num_warmup": str(num_warmup),
        "num_iterations": str(num_iterations),
    }

    # 1. Create MLflow run
    mlflow_run = create_tuning_run(
        experiment_name="vllm-tuning",
        run_name=f"vllm-{run_label}",
        tuning_params=tuning_params,
    )
    # Must never be served from the KFP cache: a cache hit would hand back
    # the run_id of an earlier execution and the new metrics would be
    # appended to that old MLflow run.
    mlflow_run.set_caching_options(enable_caching=False)

    # 2. Build prompt suite (static output, safe to cache)
    prompts_task = build_prompt_suite()
    prompts_task.set_caching_options(enable_caching=True)

    # 3. Run benchmark
    bench_task = run_benchmark(
        prompts=prompts_task.output,
        llm_endpoint=llm_endpoint,
        model_name=model_name,
        num_warmup=num_warmup,
        num_iterations=num_iterations,
    )
    bench_task.set_caching_options(enable_caching=False)

    # 4. Log results to MLflow (a side effect, so it is never cached either)
    log_task = log_benchmark_results(
        run_id=mlflow_run.outputs["run_id"],
        metrics=bench_task.output,
    )
    log_task.set_caching_options(enable_caching=False)


def main() -> None:
    """Compile the pipeline to YAML and print example submit commands."""
    compiler.Compiler().compile(
        vllm_tuning_pipeline,
        "vllm_tuning_pipeline.yaml",
    )
    # The KFP v2 CLI takes pipeline parameters as positional key=value
    # arguments; -e names the KFP experiment the run is filed under.
    submit = " kfp run create -e vllm-tuning -f vllm_tuning_pipeline.yaml"
    hints = (
        "Compiled: vllm_tuning_pipeline.yaml",
        "",
        "Example runs to compare configurations:",
        " # Baseline (current config)",
        f"{submit} run_label=baseline",
        "",
        " # APC disabled",
        f"{submit} \\",
        "   enable_prefix_caching=false run_label=no-apc",
        "",
        " # No speculative decoding",
        f"{submit} \\",
        "   num_speculative_tokens=0 run_label=no-spec",
        "",
        " # Aggressive spec decode",
        f"{submit} \\",
        "   num_speculative_tokens=5 ngram_prompt_lookup_max=6 run_label=spec5-ngram6",
    )
    print("\n".join(hints))


if __name__ == "__main__":
    main()
+# Inputs: +# enable_chunked_prefill: str [Default: 'true'] +# enable_prefix_caching: str [Default: 'true'] +# gpu_memory_utilization: str [Default: '0.90'] +# llm_endpoint: str [Default: 'http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm'] +# model_name: str [Default: 'hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4'] +# ngram_prompt_lookup_max: str [Default: '4'] +# num_iterations: int [Default: 3.0] +# num_speculative_tokens: str [Default: '3'] +# num_warmup: int [Default: 2.0] +# run_label: str [Default: 'baseline'] +components: + comp-build-prompt-suite: + executorLabel: exec-build-prompt-suite + outputDefinitions: + parameters: + Output: + parameterType: LIST + comp-create-tuning-run: + executorLabel: exec-create-tuning-run + inputDefinitions: + parameters: + experiment_name: + parameterType: STRING + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + run_name: + parameterType: STRING + tuning_params: + parameterType: STRUCT + outputDefinitions: + parameters: + experiment_id: + parameterType: STRING + run_id: + parameterType: STRING + comp-log-benchmark-results: + executorLabel: exec-log-benchmark-results + inputDefinitions: + parameters: + metrics: + parameterType: STRUCT + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + run_id: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-run-benchmark: + executorLabel: exec-run-benchmark + inputDefinitions: + parameters: + llm_endpoint: + parameterType: STRING + model_name: + parameterType: STRING + num_iterations: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + num_warmup: + defaultValue: 2.0 + isOptional: true + parameterType: NUMBER_INTEGER + prompts: + parameterType: LIST + outputDefinitions: + parameters: + Output: + parameterType: STRUCT +deploymentSpec: + executors: + 
def build_prompt_suite() -> list:
    """Return the fixed suite of chat prompts used by the benchmark.

    Every entry is a dict with the keys ``id``, ``category``, ``messages``
    (a list of ``{"role", "content"}`` chat messages) and ``max_tokens``.
    The suite holds two short, two medium and one long prompt, plus one
    ``prefix-cache-test`` prompt.

    Returns:
        A newly built list of six prompt dicts, always in the same order.
    """
    # "repeat-prefix-1" repeats the system prompt and the opening user text of
    # "medium-1". Binding both pieces once keeps the two prompts byte-identical
    # up to the point where they diverge; duplicated literals could drift apart
    # unnoticed. (Presumably this shared prefix is what the
    # "prefix-cache-test" category relies on.)
    homelab_system = "You are a helpful AI assistant running on a homelab."
    ml_prefix = (
        "Compare and contrast supervised and unsupervised "
        "machine learning. "
    )

    def prompt(prompt_id, category, max_tokens, user, system=None):
        """Assemble one suite entry, with an optional leading system message."""
        messages = []
        if system is not None:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user})
        return {
            "id": prompt_id,
            "category": category,
            "messages": messages,
            "max_tokens": max_tokens,
        }

    return [
        prompt("short-1", "short", 64, "What is the capital of France?"),
        prompt(
            "short-2",
            "short",
            64,
            "Explain quantum computing in one sentence.",
        ),
        prompt(
            "medium-1",
            "medium",
            512,
            ml_prefix
            + "Give examples of each and explain "
            "when you would choose one over the other.",
            system=homelab_system,
        ),
        prompt(
            "medium-2",
            "medium",
            1024,
            "Write a Python function that implements a binary search "
            "tree with insert, search, and delete operations. Include "
            "docstrings and type hints.",
        ),
        prompt(
            "long-1",
            "long",
            2048,
            "Write a detailed tutorial on setting up a multi-node "
            "Kubernetes cluster with Talos Linux, covering: "
            "1) Hardware requirements and network topology, "
            "2) Talos machine config generation, "
            "3) Control plane bootstrapping, "
            "4) Worker node joining, "
            "5) CNI setup with Cilium, "
            "6) Storage with Rook-Ceph, "
            "7) GitOps with Flux CD. "
            "Include YAML examples for each step.",
            system="You are a technical writer for a Kubernetes homelab blog.",
        ),
        prompt(
            "repeat-prefix-1",
            "prefix-cache-test",
            512,
            ml_prefix
            + "Now focus specifically on "
            "reinforcement learning and how it differs.",
            system=homelab_system,
        ),
    ]
def create_tuning_run(
    experiment_name: str,
    run_name: str,
    tuning_params: dict,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> NamedTuple("RunInfo", [("run_id", str), ("experiment_id", str)]):
    """Create an MLflow run for a vLLM tuning experiment and record its params.

    The experiment is looked up by name and created if it does not exist.
    Every entry of ``tuning_params`` is logged as a param named
    ``vllm.<key>``. The run is ended before returning; only its id is
    handed to later steps.

    Args:
        experiment_name: MLflow experiment to use or create.
        run_name: Display name for the new run.
        tuning_params: Mapping of tuning parameter name to value.
        mlflow_tracking_uri: MLflow tracking server URL.

    Returns:
        ``RunInfo(run_id, experiment_id)``.

    NOTE(review): the compiled pipeline enables caching for the
    ``create-tuning-run`` task. A cache hit would presumably return an
    earlier ``run_id`` without creating a new run; confirm that is intended.
    """
    import os
    import mlflow
    from mlflow.tracking import MlflowClient
    from collections import namedtuple

    RunInfo = namedtuple("RunInfo", ["run_id", "experiment_id"])

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()

    exp = client.get_experiment_by_name(experiment_name)
    experiment_id = (
        exp.experiment_id
        if exp
        else client.create_experiment(
            name=experiment_name,
            artifact_location=f"/mlflow/artifacts/{experiment_name}",
        )
    )

    tags = {
        "pipeline.type": "vllm-tuning",
        "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
    }

    # The context manager always ends the run: FINISHED on success, FAILED if
    # logging raises. A bare start_run()/end_run() pair would leave the run
    # open when an exception occurs in between.
    with mlflow.start_run(
        experiment_id=experiment_id, run_name=run_name, tags=tags
    ) as run:
        # One batched call instead of one request per parameter.
        mlflow.log_params(
            {f"vllm.{key}": value for key, value in tuning_params.items()}
        )
        run_id = run.info.run_id

    return RunInfo(run_id, experiment_id)
def log_benchmark_results(
    run_id: str,
    metrics: dict,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> str:
    """Attach benchmark metrics to an existing MLflow run and mark it finished.

    Each metric value is converted with ``float()`` and logged under its key.
    The complete ``metrics`` dict is also uploaded as the artifact
    ``benchmark_results.json``.

    Args:
        run_id: Id of the MLflow run to log to.
        metrics: Mapping of metric name to a value accepted by ``float()``.
        mlflow_tracking_uri: MLflow tracking server URL.

    Returns:
        The ``run_id`` that was passed in.
    """
    import json
    import tempfile
    import mlflow
    from mlflow.tracking import MlflowClient
    from pathlib import Path

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    tracking = MlflowClient()

    for name in metrics:
        tracking.log_metric(run_id, name, float(metrics[name]))

    # The JSON report only has to exist long enough to be uploaded.
    with tempfile.TemporaryDirectory() as scratch_dir:
        report = Path(scratch_dir).joinpath("benchmark_results.json")
        report.write_text(json.dumps(metrics, indent=2))
        tracking.log_artifact(run_id, str(report))

    tracking.set_terminated(run_id, status="FINISHED")
    return run_id
def run_benchmark(
    prompts: list,
    llm_endpoint: str,
    model_name: str,
    num_warmup: int = 2,
    num_iterations: int = 3,
) -> dict:
    """Run the prompt suite against a chat-completions endpoint and time it.

    Sends ``num_warmup`` untimed requests, then streams every prompt
    ``num_iterations`` times. For each request it measures total latency,
    time to first token (TTFT) and tokens per second. A failed request is
    counted and printed to the step log, and the benchmark keeps going.

    Args:
        prompts: Prompt dicts with a ``messages`` list and optional
            ``category`` (default ``"unknown"``) and ``max_tokens``
            (default 256).
        llm_endpoint: Base URL; ``/v1/chat/completions`` is appended.
        model_name: Value sent as the request ``model``.
        num_warmup: Number of untimed warmup requests.
        num_iterations: How many times the whole suite is repeated.

    Returns:
        Flat dict of numeric metrics: ``total_requests``,
        ``successful_requests``, ``failed_requests``, latency mean/p50/p95,
        tokens-per-second mean/p50, TTFT mean/p50/p95, and one
        ``latency_mean_<category>_s`` per category that had at least one
        successful request. A statistic over an empty sample is 0.
    """
    import json
    import statistics
    import time

    import httpx

    url = f"{llm_endpoint.rstrip('/')}/v1/chat/completions"

    def stat(values, func):
        """Apply ``func`` to ``values``, or return 0 for an empty sample."""
        return func(values) if values else 0

    def p95(values):
        """Nearest-rank 95th percentile of a non-empty sample.

        The rank is ceil(0.95 * n) in integer arithmetic. Indexing with
        int(n * 0.95) instead returns the maximum for every n <= 20.
        """
        ordered = sorted(values)
        rank = (95 * len(ordered) + 99) // 100
        return ordered[rank - 1]

    def stream_once(client, payload):
        """Stream one request; return ``(latency, ttft, completion_tokens)``."""
        started = time.perf_counter()
        first_token_at = None
        content_chunks = 0
        reported_tokens = None

        with client.stream("POST", url, json=payload) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[6:]
                if data == "[DONE]":
                    break
                try:
                    event = json.loads(data)
                except json.JSONDecodeError:
                    continue
                if not isinstance(event, dict):
                    continue

                # Prefer the server-reported token count: one streamed chunk
                # is not guaranteed to carry exactly one token.
                usage = event.get("usage") or {}
                if usage.get("completion_tokens") is not None:
                    reported_tokens = usage["completion_tokens"]

                choices = event.get("choices") or []
                delta = (choices[0].get("delta") or {}) if choices else {}
                # Only chunks that carry text count as output; role-only and
                # finish chunks must not stamp TTFT or inflate the count.
                if delta.get("content"):
                    if first_token_at is None:
                        first_token_at = time.perf_counter()
                    content_chunks += 1

        latency = time.perf_counter() - started
        ttft = (
            first_token_at - started if first_token_at is not None else latency
        )
        tokens = (
            reported_tokens if reported_tokens is not None else content_chunks
        )
        return latency, ttft, tokens

    latencies: list[float] = []
    throughputs: list[float] = []
    ttfts: list[float] = []
    per_category: dict[str, list[float]] = {}
    failures = 0

    with httpx.Client(timeout=300.0) as client:
        # Warmup is best-effort: a failure is reported but never aborts the run.
        for _ in range(num_warmup):
            try:
                warm = client.post(
                    url,
                    json={
                        "model": model_name,
                        "messages": [{"role": "user", "content": "Hi"}],
                        "max_tokens": 8,
                        "temperature": 0,
                    },
                )
                if warm.is_error:
                    print(f"[run_benchmark] warmup returned HTTP {warm.status_code}")
            except Exception as exc:
                print(f"[run_benchmark] warmup request failed: {exc!r}")

        for _ in range(num_iterations):
            for prompt in prompts:
                category = prompt.get("category", "unknown")
                payload = {
                    "model": model_name,
                    "messages": prompt["messages"],
                    "max_tokens": prompt.get("max_tokens", 256),
                    "temperature": 0,
                    "stream": True,
                    # Ask for a final chunk that reports token usage.
                    "stream_options": {"include_usage": True},
                }

                try:
                    latency, ttft, tokens = stream_once(client, payload)
                except Exception as exc:
                    # Record the failure, make it visible, and keep going.
                    failures += 1
                    print(
                        f"[run_benchmark] request {prompt.get('id', '?')!r} "
                        f"failed: {exc!r}"
                    )
                    continue

                latencies.append(latency)
                ttfts.append(ttft)
                per_category.setdefault(category, []).append(latency)
                # A request that produced no tokens has no meaningful rate.
                if tokens > 0 and latency > 0:
                    throughputs.append(tokens / latency)

    metrics = {
        "total_requests": len(latencies) + failures,
        "successful_requests": len(latencies),
        "failed_requests": failures,
        # Latency
        "latency_mean_s": stat(latencies, statistics.mean),
        "latency_p50_s": stat(latencies, statistics.median),
        "latency_p95_s": stat(latencies, p95),
        # Throughput
        "tokens_per_second_mean": stat(throughputs, statistics.mean),
        "tokens_per_second_p50": stat(throughputs, statistics.median),
        # Time to first token
        "ttft_mean_s": stat(ttfts, statistics.mean),
        "ttft_p50_s": stat(ttfts, statistics.median),
        "ttft_p95_s": stat(ttfts, p95),
    }

    # Per-category mean latency; categories only appear after a success.
    for category, samples in per_category.items():
        metrics[f"latency_mean_{category}_s"] = statistics.mean(samples)

    return metrics
vllm-{{$.inputs.parameters['pipelinechannel--run_label']}} + tuning_params: + runtimeValue: + constant: + enable_chunked_prefill: '{{$.inputs.parameters[''pipelinechannel--enable_chunked_prefill'']}}' + enable_prefix_caching: '{{$.inputs.parameters[''pipelinechannel--enable_prefix_caching'']}}' + gpu_memory_utilization: '{{$.inputs.parameters[''pipelinechannel--gpu_memory_utilization'']}}' + llm_endpoint: '{{$.inputs.parameters[''pipelinechannel--llm_endpoint'']}}' + model_name: '{{$.inputs.parameters[''pipelinechannel--model_name'']}}' + ngram_prompt_lookup_max: '{{$.inputs.parameters[''pipelinechannel--ngram_prompt_lookup_max'']}}' + num_iterations: '{{$.inputs.parameters[''pipelinechannel--num_iterations'']}}' + num_speculative_tokens: '{{$.inputs.parameters[''pipelinechannel--num_speculative_tokens'']}}' + num_warmup: '{{$.inputs.parameters[''pipelinechannel--num_warmup'']}}' + taskInfo: + name: create-tuning-run + log-benchmark-results: + cachingOptions: + enableCache: true + componentRef: + name: comp-log-benchmark-results + dependentTasks: + - create-tuning-run + - run-benchmark + inputs: + parameters: + metrics: + taskOutputParameter: + outputParameterKey: Output + producerTask: run-benchmark + run_id: + taskOutputParameter: + outputParameterKey: run_id + producerTask: create-tuning-run + taskInfo: + name: log-benchmark-results + run-benchmark: + cachingOptions: {} + componentRef: + name: comp-run-benchmark + dependentTasks: + - build-prompt-suite + inputs: + parameters: + llm_endpoint: + componentInputParameter: llm_endpoint + model_name: + componentInputParameter: model_name + num_iterations: + componentInputParameter: num_iterations + num_warmup: + componentInputParameter: num_warmup + prompts: + taskOutputParameter: + outputParameterKey: Output + producerTask: build-prompt-suite + taskInfo: + name: run-benchmark + inputDefinitions: + parameters: + enable_chunked_prefill: + defaultValue: 'true' + description: '"true" or "false"' + isOptional: true + 
parameterType: STRING + enable_prefix_caching: + defaultValue: 'true' + description: '"true" or "false"' + isOptional: true + parameterType: STRING + gpu_memory_utilization: + defaultValue: '0.90' + description: 0.0 - 1.0 + isOptional: true + parameterType: STRING + llm_endpoint: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm + description: vLLM inference endpoint URL + isOptional: true + parameterType: STRING + model_name: + defaultValue: hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 + description: HF model identifier + isOptional: true + parameterType: STRING + ngram_prompt_lookup_max: + defaultValue: '4' + description: ngram window for spec decode (0 = off) + isOptional: true + parameterType: STRING + num_iterations: + defaultValue: 3.0 + description: how many times to repeat the prompt suite + isOptional: true + parameterType: NUMBER_INTEGER + num_speculative_tokens: + defaultValue: '3' + description: number of speculative tokens (0 = off) + isOptional: true + parameterType: STRING + num_warmup: + defaultValue: 2.0 + description: warmup requests before timing + isOptional: true + parameterType: NUMBER_INTEGER + run_label: + defaultValue: baseline + description: human-readable label (e.g. 
"apc-on-spec3") + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.12.1 diff --git a/voice_pipeline.py b/voice_pipeline.py index f953fe9..a09747c 100644 --- a/voice_pipeline.py +++ b/voice_pipeline.py @@ -12,6 +12,11 @@ Usage: from kfp import dsl from kfp import compiler +from typing import NamedTuple + + +MLFLOW_IMAGE = "python:3.13-slim" +MLFLOW_PACKAGES = ["mlflow>=2.10.0", "boto3", "psycopg2-binary"] @dsl.component( @@ -21,13 +26,16 @@ from kfp import compiler def transcribe_audio( audio_b64: str, whisper_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper" -) -> str: +) -> NamedTuple("STTResult", [("text", str), ("latency_s", float), ("audio_duration_s", float)]): """Transcribe audio using Whisper STT service.""" import base64 + import time import httpx + from collections import namedtuple audio_bytes = base64.b64decode(audio_b64) + start = time.perf_counter() with httpx.Client(timeout=120.0) as client: response = client.post( f"{whisper_url}/v1/audio/transcriptions", @@ -35,8 +43,14 @@ def transcribe_audio( data={"model": "whisper-large-v3", "language": "en"} ) result = response.json() + latency = time.perf_counter() - start - return result.get("text", "") + text = result.get("text", "") + # Estimate audio duration from WAV header (16-bit PCM, 16kHz) + audio_duration = max(len(audio_bytes) / (16000 * 2), 0.1) + + STTResult = namedtuple("STTResult", ["text", "latency_s", "audio_duration_s"]) + return STTResult(text, latency, audio_duration) @dsl.component( @@ -46,18 +60,23 @@ def transcribe_audio( def generate_embeddings( text: str, embeddings_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings" -) -> list: +) -> NamedTuple("EmbedResult", [("embedding", list), ("latency_s", float)]): """Generate embeddings for RAG retrieval.""" + import time import httpx + from collections import namedtuple + start = time.perf_counter() with httpx.Client(timeout=60.0) as client: response 
= client.post( f"{embeddings_url}/embeddings", json={"input": text, "model": "bge-small-en-v1.5"} ) result = response.json() + latency = time.perf_counter() - start - return result["data"][0]["embedding"] + EmbedResult = namedtuple("EmbedResult", ["embedding", "latency_s"]) + return EmbedResult(result["data"][0]["embedding"], latency) @dsl.component( @@ -69,14 +88,19 @@ def retrieve_context( milvus_host: str = "milvus.ai-ml.svc.cluster.local", collection_name: str = "knowledge_base", top_k: int = 5 -) -> list: +) -> NamedTuple("RetrieveResult", [("documents", list), ("latency_s", float)]): """Retrieve relevant documents from Milvus vector database.""" + import time from pymilvus import connections, Collection, utility + from collections import namedtuple + start = time.perf_counter() connections.connect(host=milvus_host, port=19530) if not utility.has_collection(collection_name): - return [] + latency = time.perf_counter() - start + RetrieveResult = namedtuple("RetrieveResult", ["documents", "latency_s"]) + return RetrieveResult([], latency) collection = Collection(collection_name) collection.load() @@ -88,6 +112,7 @@ def retrieve_context( limit=top_k, output_fields=["text", "source"] ) + latency = time.perf_counter() - start documents = [] for hits in results: @@ -98,7 +123,8 @@ def retrieve_context( "score": hit.distance }) - return documents + RetrieveResult = namedtuple("RetrieveResult", ["documents", "latency_s"]) + return RetrieveResult(documents, latency) @dsl.component( @@ -110,13 +136,17 @@ def rerank_documents( documents: list, reranker_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker", top_k: int = 3 -) -> list: +) -> NamedTuple("RerankResult", [("documents", list), ("latency_s", float)]): """Rerank documents using BGE reranker.""" + import time import httpx + from collections import namedtuple if not documents: - return [] + RerankResult = namedtuple("RerankResult", ["documents", "latency_s"]) + return RerankResult([], 
0.0) + start = time.perf_counter() with httpx.Client(timeout=60.0) as client: response = client.post( f"{reranker_url}/v1/rerank", @@ -127,6 +157,7 @@ def rerank_documents( } ) result = response.json() + latency = time.perf_counter() - start # Sort by rerank score reranked = sorted( @@ -135,7 +166,8 @@ def rerank_documents( reverse=True )[:top_k] - return [doc for doc, score in reranked] + RerankResult = namedtuple("RerankResult", ["documents", "latency_s"]) + return RerankResult([doc for doc, score in reranked], latency) @dsl.component( @@ -147,9 +179,11 @@ def generate_response( context: list, vllm_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm", model: str = "hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4" -) -> str: +) -> NamedTuple("LLMResult", [("text", str), ("latency_s", float), ("completion_tokens", int)]): """Generate response using vLLM.""" + import time import httpx + from collections import namedtuple # Build context if context: @@ -167,6 +201,7 @@ Keep responses concise and natural for speech synthesis.""" {"role": "user", "content": user_content} ] + start = time.perf_counter() with httpx.Client(timeout=180.0) as client: response = client.post( f"{vllm_url}/v1/chat/completions", @@ -178,8 +213,14 @@ Keep responses concise and natural for speech synthesis.""" } ) result = response.json() + latency = time.perf_counter() - start - return result["choices"][0]["message"]["content"] + text = result["choices"][0]["message"]["content"] + usage = result.get("usage", {}) + completion_tokens = usage.get("completion_tokens", len(text.split())) + + LLMResult = namedtuple("LLMResult", ["text", "latency_s", "completion_tokens"]) + return LLMResult(text, latency, completion_tokens) @dsl.component( @@ -189,11 +230,14 @@ Keep responses concise and natural for speech synthesis.""" def synthesize_speech( text: str, tts_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts" -) -> str: +) -> NamedTuple("TTSResult", 
@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def log_pipeline_metrics(
    stt_latency: float,
    stt_audio_duration: float,
    embed_latency: float,
    retrieve_latency: float,
    rerank_latency: float,
    llm_latency: float,
    llm_completion_tokens: int,
    tts_latency: float,
    experiment_name: str = "voice-pipeline-metrics",
    run_name: str = "voice-pipeline",
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> str:
    """Log per-step latency metrics of one voice-pipeline execution to MLflow.

    Besides the raw per-step values, three derived metrics are logged:
    ``total_pipeline_latency_s`` (sum of the six step latencies),
    ``stt_realtime_factor`` (STT latency / audio duration) and
    ``llm_tokens_per_second`` (completion tokens / LLM latency). A derived
    ratio is 0 when its denominator is not positive.

    Args:
        stt_latency: Speech-to-text step latency in seconds.
        stt_audio_duration: Duration of the transcribed audio in seconds.
        embed_latency: Embedding step latency in seconds.
        retrieve_latency: Retrieval step latency in seconds.
        rerank_latency: Rerank step latency in seconds.
        llm_latency: LLM step latency in seconds.
        llm_completion_tokens: Completion token count reported by the LLM step.
        tts_latency: Text-to-speech step latency in seconds.
        experiment_name: MLflow experiment to use; created if missing.
        run_name: Display name of the MLflow run.
        mlflow_tracking_uri: MLflow tracking server URL.

    Returns:
        The id of the MLflow run that was created.
    """
    import os
    import mlflow
    from mlflow.tracking import MlflowClient

    # Derived values are computed before any run is opened, so an arithmetic
    # error cannot leave a half-written run behind.
    total_latency = (
        stt_latency + embed_latency + retrieve_latency
        + rerank_latency + llm_latency + tts_latency
    )
    stt_rtf = stt_latency / stt_audio_duration if stt_audio_duration > 0 else 0
    llm_tps = llm_completion_tokens / llm_latency if llm_latency > 0 else 0

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()

    exp = client.get_experiment_by_name(experiment_name)
    experiment_id = (
        exp.experiment_id
        if exp
        else client.create_experiment(
            name=experiment_name,
            artifact_location=f"/mlflow/artifacts/{experiment_name}",
        )
    )

    # The context manager always ends the run: FINISHED on success, FAILED if
    # logging raises. A bare start_run()/end_run() pair would leave it open.
    with mlflow.start_run(
        experiment_id=experiment_id,
        run_name=run_name,
        tags={
            "pipeline.type": "voice-assistant",
            "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
        },
    ) as run:
        mlflow.log_metrics({
            "stt_latency_s": stt_latency,
            "stt_audio_duration_s": stt_audio_duration,
            "stt_realtime_factor": stt_rtf,
            "embed_latency_s": embed_latency,
            "retrieve_latency_s": retrieve_latency,
            "rerank_latency_s": rerank_latency,
            "llm_latency_s": llm_latency,
            "llm_completion_tokens": llm_completion_tokens,
            "llm_tokens_per_second": llm_tps,
            "tts_latency_s": tts_latency,
            "total_pipeline_latency_s": total_latency,
        })
        run_id = run.info.run_id

    return run_id
per-step latencies to MLflow + log_task = log_pipeline_metrics( + stt_latency=transcribe_task.outputs["latency_s"], + stt_audio_duration=transcribe_task.outputs["audio_duration_s"], + embed_latency=embed_task.outputs["latency_s"], + retrieve_latency=retrieve_task.outputs["latency_s"], + rerank_latency=rerank_task.outputs["latency_s"], + llm_latency=llm_task.outputs["latency_s"], + llm_completion_tokens=llm_task.outputs["completion_tokens"], + tts_latency=tts_task.outputs["latency_s"], + ) @dsl.pipeline( @@ -265,7 +394,7 @@ def text_to_speech_pipeline(text: str): @dsl.pipeline( name="rag-query-pipeline", - description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM" + description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step latency to MLflow." ) def rag_query_pipeline( query: str, @@ -283,20 +412,20 @@ def rag_query_pipeline( # Retrieve from Milvus retrieve_task = retrieve_context( - embedding=embed_task.output, + embedding=embed_task.outputs["embedding"], collection_name=collection_name ) # Rerank rerank_task = rerank_documents( query=query, - documents=retrieve_task.output + documents=retrieve_task.outputs["documents"] ) # Generate response llm_task = generate_response( query=query, - context=rerank_task.output + context=rerank_task.outputs["documents"] ) diff --git a/voice_pipeline.yaml b/voice_pipeline.yaml new file mode 100644 index 0000000..fb31f4d --- /dev/null +++ b/voice_pipeline.yaml @@ -0,0 +1,656 @@ +# PIPELINE DEFINITION +# Name: voice-assistant-rag-pipeline +# Description: End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS. Logs per-step latency to MLflow. 
+# Inputs: +# audio_b64: str +# collection_name: str [Default: 'knowledge_base'] +components: + comp-generate-embeddings: + executorLabel: exec-generate-embeddings + inputDefinitions: + parameters: + embeddings_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings + isOptional: true + parameterType: STRING + text: + parameterType: STRING + outputDefinitions: + parameters: + embedding: + parameterType: LIST + latency_s: + parameterType: NUMBER_DOUBLE + comp-generate-response: + executorLabel: exec-generate-response + inputDefinitions: + parameters: + context: + parameterType: LIST + model: + defaultValue: hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 + isOptional: true + parameterType: STRING + query: + parameterType: STRING + vllm_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + completion_tokens: + parameterType: NUMBER_INTEGER + latency_s: + parameterType: NUMBER_DOUBLE + text: + parameterType: STRING + comp-log-pipeline-metrics: + executorLabel: exec-log-pipeline-metrics + inputDefinitions: + parameters: + embed_latency: + parameterType: NUMBER_DOUBLE + experiment_name: + defaultValue: voice-pipeline-metrics + isOptional: true + parameterType: STRING + llm_completion_tokens: + parameterType: NUMBER_INTEGER + llm_latency: + parameterType: NUMBER_DOUBLE + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + rerank_latency: + parameterType: NUMBER_DOUBLE + retrieve_latency: + parameterType: NUMBER_DOUBLE + run_name: + defaultValue: voice-pipeline + isOptional: true + parameterType: STRING + stt_audio_duration: + parameterType: NUMBER_DOUBLE + stt_latency: + parameterType: NUMBER_DOUBLE + tts_latency: + parameterType: NUMBER_DOUBLE + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-rerank-documents: + executorLabel: 
exec-rerank-documents + inputDefinitions: + parameters: + documents: + parameterType: LIST + query: + parameterType: STRING + reranker_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker + isOptional: true + parameterType: STRING + top_k: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + documents: + parameterType: LIST + latency_s: + parameterType: NUMBER_DOUBLE + comp-retrieve-context: + executorLabel: exec-retrieve-context + inputDefinitions: + parameters: + collection_name: + defaultValue: knowledge_base + isOptional: true + parameterType: STRING + embedding: + parameterType: LIST + milvus_host: + defaultValue: milvus.ai-ml.svc.cluster.local + isOptional: true + parameterType: STRING + top_k: + defaultValue: 5.0 + isOptional: true + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + documents: + parameterType: LIST + latency_s: + parameterType: NUMBER_DOUBLE + comp-synthesize-speech: + executorLabel: exec-synthesize-speech + inputDefinitions: + parameters: + text: + parameterType: STRING + tts_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + audio_b64: + parameterType: STRING + latency_s: + parameterType: NUMBER_DOUBLE + comp-transcribe-audio: + executorLabel: exec-transcribe-audio + inputDefinitions: + parameters: + audio_b64: + parameterType: STRING + whisper_url: + defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + audio_duration_s: + parameterType: NUMBER_DOUBLE + latency_s: + parameterType: NUMBER_DOUBLE + text: + parameterType: STRING +deploymentSpec: + executors: + exec-generate-embeddings: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - generate_embeddings + command: + - sh + - -c + - 
def generate_embeddings(
    text: str,
    embeddings_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings"
) -> NamedTuple("EmbedResult", [("embedding", list), ("latency_s", float)]):
    """Embed ``text`` with the embeddings service and time the request.

    Args:
        text: Text to embed.
        embeddings_url: Base URL of the service; ``/embeddings`` is appended.

    Returns:
        ``EmbedResult(embedding, latency_s)``. ``embedding`` is taken from
        ``data[0].embedding`` of the response; ``latency_s`` is the wall-clock
        time of the HTTP round trip, including JSON decoding.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    import time
    import httpx
    from collections import namedtuple

    EmbedResult = namedtuple("EmbedResult", ["embedding", "latency_s"])

    start = time.perf_counter()
    with httpx.Client(timeout=60.0) as client:
        response = client.post(
            f"{embeddings_url}/embeddings",
            json={"input": text, "model": "bge-small-en-v1.5"}
        )
        # Fail with the HTTP status. Without this check an error body only
        # shows up later as a KeyError or JSON decode error.
        response.raise_for_status()
        result = response.json()
    latency = time.perf_counter() - start

    return EmbedResult(result["data"][0]["embedding"], latency)
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef generate_response(\n query: str,\n context: list,\n \ + \ vllm_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm\"\ + ,\n model: str = \"hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4\"\ + \n) -> NamedTuple(\"LLMResult\", [(\"text\", str), (\"latency_s\", float),\ + \ (\"completion_tokens\", int)]):\n \"\"\"Generate response using vLLM.\"\ + \"\"\n import time\n import httpx\n from collections import namedtuple\n\ + \n # Build context\n if context:\n context_text = \"\\n\\n\"\ + .join([doc[\"text\"] for doc in context])\n user_content = f\"Context:\\\ + n{context_text}\\n\\nQuestion: {query}\"\n else:\n user_content\ + \ = query\n\n system_prompt = \"\"\"You are a helpful voice assistant.\n\ + Answer questions based on the provided context when available.\nKeep responses\ + \ concise and natural for speech synthesis.\"\"\"\n\n messages = [\n\ + \ {\"role\": \"system\", \"content\": system_prompt},\n {\"\ + role\": \"user\", \"content\": user_content}\n ]\n\n start = time.perf_counter()\n\ + \ with httpx.Client(timeout=180.0) as client:\n response = client.post(\n\ + \ f\"{vllm_url}/v1/chat/completions\",\n json={\n\ + \ \"model\": model,\n \"messages\": messages,\n\ + \ \"max_tokens\": 512,\n 
\"temperature\":\ + \ 0.7\n }\n )\n result = response.json()\n latency\ + \ = time.perf_counter() - start\n\n text = result[\"choices\"][0][\"\ + message\"][\"content\"]\n usage = result.get(\"usage\", {})\n completion_tokens\ + \ = usage.get(\"completion_tokens\", len(text.split()))\n\n LLMResult\ + \ = namedtuple(\"LLMResult\", [\"text\", \"latency_s\", \"completion_tokens\"\ + ])\n return LLMResult(text, latency, completion_tokens)\n\n" + image: python:3.13-slim + exec-log-pipeline-metrics: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - log_pipeline_metrics + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'mlflow>=2.10.0'\ + \ 'boto3' 'psycopg2-binary' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef log_pipeline_metrics(\n stt_latency: float,\n stt_audio_duration:\ + \ float,\n embed_latency: float,\n retrieve_latency: float,\n rerank_latency:\ + \ float,\n llm_latency: float,\n llm_completion_tokens: int,\n \ + \ tts_latency: float,\n experiment_name: str = \"voice-pipeline-metrics\"\ + ,\n run_name: str = \"voice-pipeline\",\n mlflow_tracking_uri: str\ + \ = \"http://mlflow.mlflow.svc.cluster.local:80\",\n) -> str:\n \"\"\"\ + Log per-step latency metrics to MLflow for the full voice pipeline.\"\"\"\ + \n import os\n import mlflow\n from mlflow.tracking import 
MlflowClient\n\ + \n mlflow.set_tracking_uri(mlflow_tracking_uri)\n client = MlflowClient()\n\ + \n exp = client.get_experiment_by_name(experiment_name)\n experiment_id\ + \ = (\n exp.experiment_id\n if exp\n else client.create_experiment(\n\ + \ name=experiment_name,\n artifact_location=f\"/mlflow/artifacts/{experiment_name}\"\ + ,\n )\n )\n\n run = mlflow.start_run(\n experiment_id=experiment_id,\n\ + \ run_name=run_name,\n tags={\n \"pipeline.type\"\ + : \"voice-assistant\",\n \"kfp.run_id\": os.environ.get(\"KFP_RUN_ID\"\ + , \"unknown\"),\n },\n )\n\n total_latency = (\n stt_latency\ + \ + embed_latency + retrieve_latency\n + rerank_latency + llm_latency\ + \ + tts_latency\n )\n stt_rtf = stt_latency / stt_audio_duration if\ + \ stt_audio_duration > 0 else 0\n llm_tps = llm_completion_tokens / llm_latency\ + \ if llm_latency > 0 else 0\n\n mlflow.log_metrics({\n \"stt_latency_s\"\ + : stt_latency,\n \"stt_audio_duration_s\": stt_audio_duration,\n\ + \ \"stt_realtime_factor\": stt_rtf,\n \"embed_latency_s\"\ + : embed_latency,\n \"retrieve_latency_s\": retrieve_latency,\n \ + \ \"rerank_latency_s\": rerank_latency,\n \"llm_latency_s\"\ + : llm_latency,\n \"llm_completion_tokens\": llm_completion_tokens,\n\ + \ \"llm_tokens_per_second\": llm_tps,\n \"tts_latency_s\"\ + : tts_latency,\n \"total_pipeline_latency_s\": total_latency,\n \ + \ })\n mlflow.end_run()\n return run.info.run_id\n\n" + image: python:3.13-slim + exec-rerank-documents: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - rerank_documents + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef rerank_documents(\n query: str,\n documents: list,\n \ + \ reranker_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker\"\ + ,\n top_k: int = 3\n) -> NamedTuple(\"RerankResult\", [(\"documents\"\ + , list), (\"latency_s\", float)]):\n \"\"\"Rerank documents using BGE\ + \ reranker.\"\"\"\n import time\n import httpx\n from collections\ + \ import namedtuple\n\n if not documents:\n RerankResult = namedtuple(\"\ + RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([],\ + \ 0.0)\n\n start = time.perf_counter()\n with httpx.Client(timeout=60.0)\ + \ as client:\n response = client.post(\n f\"{reranker_url}/v1/rerank\"\ + ,\n json={\n \"query\": query,\n \ + \ \"documents\": [doc[\"text\"] for doc in documents],\n \ + \ \"model\": \"bge-reranker-v2-m3\"\n }\n )\n \ + \ result = response.json()\n latency = time.perf_counter() - start\n\ + \n # Sort by rerank score\n reranked = sorted(\n zip(documents,\ + \ result.get(\"scores\", [0] * len(documents))),\n key=lambda x:\ + \ x[1],\n reverse=True\n )[:top_k]\n\n RerankResult = namedtuple(\"\ + RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([doc\ + \ for doc, score in reranked], latency)\n\n" + image: python:3.13-slim + 
exec-retrieve-context: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - retrieve_context + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pymilvus' &&\ + \ \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef retrieve_context(\n embedding: list,\n milvus_host: str\ + \ = \"milvus.ai-ml.svc.cluster.local\",\n collection_name: str = \"knowledge_base\"\ + ,\n top_k: int = 5\n) -> NamedTuple(\"RetrieveResult\", [(\"documents\"\ + , list), (\"latency_s\", float)]):\n \"\"\"Retrieve relevant documents\ + \ from Milvus vector database.\"\"\"\n import time\n from pymilvus\ + \ import connections, Collection, utility\n from collections import namedtuple\n\ + \n start = time.perf_counter()\n connections.connect(host=milvus_host,\ + \ port=19530)\n\n if not utility.has_collection(collection_name):\n \ + \ latency = time.perf_counter() - start\n RetrieveResult =\ + \ namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \ + \ return RetrieveResult([], latency)\n\n collection = Collection(collection_name)\n\ + \ collection.load()\n\n results = collection.search(\n data=[embedding],\n\ + \ anns_field=\"embedding\",\n param={\"metric_type\": \"COSINE\"\ + , \"params\": {\"nprobe\": 10}},\n limit=top_k,\n output_fields=[\"\ + text\", \"source\"]\n )\n latency = time.perf_counter() - 
start\n\n\ + \ documents = []\n for hits in results:\n for hit in hits:\n\ + \ documents.append({\n \"text\": hit.entity.get(\"\ + text\"),\n \"source\": hit.entity.get(\"source\"),\n \ + \ \"score\": hit.distance\n })\n\n RetrieveResult\ + \ = namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \ + \ return RetrieveResult(documents, latency)\n\n" + image: python:3.13-slim + exec-synthesize-speech: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - synthesize_speech + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef synthesize_speech(\n text: str,\n tts_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts\"\ + \n) -> NamedTuple(\"TTSResult\", [(\"audio_b64\", str), (\"latency_s\",\ + \ float)]):\n \"\"\"Convert text to speech using TTS service.\"\"\"\n\ + \ import base64\n import time\n import httpx\n from collections\ + \ import namedtuple\n\n start = time.perf_counter()\n with httpx.Client(timeout=120.0)\ + \ as client:\n response = client.post(\n f\"{tts_url}/v1/audio/speech\"\ + ,\n json={\n \"input\": text,\n \ + \ \"voice\": \"en_US-lessac-high\",\n \"response_format\"\ + : \"wav\"\n }\n )\n audio_b64 = base64.b64encode(response.content).decode(\"\ + utf-8\")\n latency = 
time.perf_counter() - start\n\n TTSResult = namedtuple(\"\ + TTSResult\", [\"audio_b64\", \"latency_s\"])\n return TTSResult(audio_b64,\ + \ latency)\n\n" + image: python:3.13-slim + exec-transcribe-audio: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - transcribe_audio + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef transcribe_audio(\n audio_b64: str,\n whisper_url: str\ + \ = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper\"\ + \n) -> NamedTuple(\"STTResult\", [(\"text\", str), (\"latency_s\", float),\ + \ (\"audio_duration_s\", float)]):\n \"\"\"Transcribe audio using Whisper\ + \ STT service.\"\"\"\n import base64\n import time\n import httpx\n\ + \ from collections import namedtuple\n\n audio_bytes = base64.b64decode(audio_b64)\n\ + \n start = time.perf_counter()\n with httpx.Client(timeout=120.0)\ + \ as client:\n response = client.post(\n f\"{whisper_url}/v1/audio/transcriptions\"\ + ,\n files={\"file\": (\"audio.wav\", audio_bytes, \"audio/wav\"\ + )},\n data={\"model\": \"whisper-large-v3\", \"language\": \"\ + en\"}\n )\n result = response.json()\n latency = time.perf_counter()\ + \ - start\n\n text = result.get(\"text\", \"\")\n # Estimate audio\ + \ duration from WAV header 
(16-bit PCM, 16kHz)\n audio_duration = max(len(audio_bytes)\ + \ / (16000 * 2), 0.1)\n\n STTResult = namedtuple(\"STTResult\", [\"text\"\ + , \"latency_s\", \"audio_duration_s\"])\n return STTResult(text, latency,\ + \ audio_duration)\n\n" + image: python:3.13-slim +pipelineInfo: + description: 'End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> + Rerank -> LLM -> TTS. Logs per-step latency to MLflow.' + name: voice-assistant-rag-pipeline +root: + dag: + tasks: + generate-embeddings: + cachingOptions: + enableCache: true + componentRef: + name: comp-generate-embeddings + dependentTasks: + - transcribe-audio + inputs: + parameters: + text: + taskOutputParameter: + outputParameterKey: text + producerTask: transcribe-audio + taskInfo: + name: generate-embeddings + generate-response: + cachingOptions: + enableCache: true + componentRef: + name: comp-generate-response + dependentTasks: + - rerank-documents + - transcribe-audio + inputs: + parameters: + context: + taskOutputParameter: + outputParameterKey: documents + producerTask: rerank-documents + query: + taskOutputParameter: + outputParameterKey: text + producerTask: transcribe-audio + taskInfo: + name: generate-response + log-pipeline-metrics: + cachingOptions: + enableCache: true + componentRef: + name: comp-log-pipeline-metrics + dependentTasks: + - generate-embeddings + - generate-response + - rerank-documents + - retrieve-context + - synthesize-speech + - transcribe-audio + inputs: + parameters: + embed_latency: + taskOutputParameter: + outputParameterKey: latency_s + producerTask: generate-embeddings + llm_completion_tokens: + taskOutputParameter: + outputParameterKey: completion_tokens + producerTask: generate-response + llm_latency: + taskOutputParameter: + outputParameterKey: latency_s + producerTask: generate-response + rerank_latency: + taskOutputParameter: + outputParameterKey: latency_s + producerTask: rerank-documents + retrieve_latency: + taskOutputParameter: + outputParameterKey: 
latency_s + producerTask: retrieve-context + stt_audio_duration: + taskOutputParameter: + outputParameterKey: audio_duration_s + producerTask: transcribe-audio + stt_latency: + taskOutputParameter: + outputParameterKey: latency_s + producerTask: transcribe-audio + tts_latency: + taskOutputParameter: + outputParameterKey: latency_s + producerTask: synthesize-speech + taskInfo: + name: log-pipeline-metrics + rerank-documents: + cachingOptions: + enableCache: true + componentRef: + name: comp-rerank-documents + dependentTasks: + - retrieve-context + - transcribe-audio + inputs: + parameters: + documents: + taskOutputParameter: + outputParameterKey: documents + producerTask: retrieve-context + query: + taskOutputParameter: + outputParameterKey: text + producerTask: transcribe-audio + taskInfo: + name: rerank-documents + retrieve-context: + cachingOptions: + enableCache: true + componentRef: + name: comp-retrieve-context + dependentTasks: + - generate-embeddings + inputs: + parameters: + collection_name: + componentInputParameter: collection_name + embedding: + taskOutputParameter: + outputParameterKey: embedding + producerTask: generate-embeddings + taskInfo: + name: retrieve-context + synthesize-speech: + cachingOptions: + enableCache: true + componentRef: + name: comp-synthesize-speech + dependentTasks: + - generate-response + inputs: + parameters: + text: + taskOutputParameter: + outputParameterKey: text + producerTask: generate-response + taskInfo: + name: synthesize-speech + transcribe-audio: + cachingOptions: {} + componentRef: + name: comp-transcribe-audio + inputs: + parameters: + audio_b64: + componentInputParameter: audio_b64 + taskInfo: + name: transcribe-audio + inputDefinitions: + parameters: + audio_b64: + description: Base64-encoded audio file + parameterType: STRING + collection_name: + defaultValue: knowledge_base + description: Milvus collection for RAG + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.12.1