feat: add vLLM tuning pipeline + recompile voice pipelines with MLflow

New:
- vllm_tuning_pipeline.py: A/B benchmark different vLLM configs,
  logs latency/TPS/TTFT to MLflow (vllm-tuning experiment)
- vllm_tuning_pipeline.yaml: compiled KFP YAML

Updated:
- voice_pipeline.py: per-step NamedTuple outputs with latency tracking,
  new log_pipeline_metrics MLflow component
- voice_pipeline.yaml, tts_pipeline.yaml, rag_pipeline.yaml: recompiled
This commit is contained in:
2026-02-13 08:24:11 -05:00
parent cee21f124c
commit bc4b230dd9
6 changed files with 2216 additions and 26 deletions

363
rag_pipeline.yaml Normal file
View File

@@ -0,0 +1,363 @@
# NOTE(review): machine-generated by the KFP compiler (sdkVersion kfp-2.12.1,
# see trailing metadata) -- regenerate from the pipeline's Python source
# rather than hand-editing this file.
# NOTE(review): indentation appears flattened in this view; the on-disk
# compiled YAML is indentation-significant -- verify before committing edits.
# PIPELINE DEFINITION
# Name: rag-query-pipeline
# Description: RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step latency to MLflow.
# Inputs:
# collection_name: str [Default: 'knowledge_base']
# query: str
# Component interfaces: one comp-* entry per pipeline step, declaring its
# input/output parameter types.
components:
comp-generate-embeddings:
executorLabel: exec-generate-embeddings
inputDefinitions:
parameters:
embeddings_url:
defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings
isOptional: true
parameterType: STRING
text:
parameterType: STRING
outputDefinitions:
parameters:
embedding:
parameterType: LIST
latency_s:
parameterType: NUMBER_DOUBLE
comp-generate-response:
executorLabel: exec-generate-response
inputDefinitions:
parameters:
context:
parameterType: LIST
model:
defaultValue: hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4
isOptional: true
parameterType: STRING
query:
parameterType: STRING
vllm_url:
defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm
isOptional: true
parameterType: STRING
outputDefinitions:
parameters:
completion_tokens:
parameterType: NUMBER_INTEGER
latency_s:
parameterType: NUMBER_DOUBLE
text:
parameterType: STRING
comp-rerank-documents:
executorLabel: exec-rerank-documents
inputDefinitions:
parameters:
documents:
parameterType: LIST
query:
parameterType: STRING
reranker_url:
defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker
isOptional: true
parameterType: STRING
top_k:
defaultValue: 3.0
isOptional: true
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
documents:
parameterType: LIST
latency_s:
parameterType: NUMBER_DOUBLE
comp-retrieve-context:
executorLabel: exec-retrieve-context
inputDefinitions:
parameters:
collection_name:
defaultValue: knowledge_base
isOptional: true
parameterType: STRING
embedding:
parameterType: LIST
milvus_host:
defaultValue: milvus.ai-ml.svc.cluster.local
isOptional: true
parameterType: STRING
top_k:
defaultValue: 5.0
isOptional: true
parameterType: NUMBER_INTEGER
outputDefinitions:
parameters:
documents:
parameterType: LIST
latency_s:
parameterType: NUMBER_DOUBLE
# Executor containers: each one pip-installs kfp plus its client dependency
# (httpx or pymilvus) at startup, then runs the inlined component function
# through kfp's ephemeral-component executor.
deploymentSpec:
executors:
exec-generate-embeddings:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- generate_embeddings
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef generate_embeddings(\n text: str,\n embeddings_url: str\
\ = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings\"\
\n) -> NamedTuple(\"EmbedResult\", [(\"embedding\", list), (\"latency_s\"\
, float)]):\n \"\"\"Generate embeddings for RAG retrieval.\"\"\"\n \
\ import time\n import httpx\n from collections import namedtuple\n\
\n start = time.perf_counter()\n with httpx.Client(timeout=60.0) as\
\ client:\n response = client.post(\n f\"{embeddings_url}/embeddings\"\
,\n json={\"input\": text, \"model\": \"bge-small-en-v1.5\"}\n\
\ )\n result = response.json()\n latency = time.perf_counter()\
\ - start\n\n EmbedResult = namedtuple(\"EmbedResult\", [\"embedding\"\
, \"latency_s\"])\n return EmbedResult(result[\"data\"][0][\"embedding\"\
], latency)\n\n"
image: python:3.13-slim
exec-generate-response:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- generate_response
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef generate_response(\n query: str,\n context: list,\n \
\ vllm_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm\"\
,\n model: str = \"hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4\"\
\n) -> NamedTuple(\"LLMResult\", [(\"text\", str), (\"latency_s\", float),\
\ (\"completion_tokens\", int)]):\n \"\"\"Generate response using vLLM.\"\
\"\"\n import time\n import httpx\n from collections import namedtuple\n\
\n # Build context\n if context:\n context_text = \"\\n\\n\"\
.join([doc[\"text\"] for doc in context])\n user_content = f\"Context:\\\
n{context_text}\\n\\nQuestion: {query}\"\n else:\n user_content\
\ = query\n\n system_prompt = \"\"\"You are a helpful voice assistant.\n\
Answer questions based on the provided context when available.\nKeep responses\
\ concise and natural for speech synthesis.\"\"\"\n\n messages = [\n\
\ {\"role\": \"system\", \"content\": system_prompt},\n {\"\
role\": \"user\", \"content\": user_content}\n ]\n\n start = time.perf_counter()\n\
\ with httpx.Client(timeout=180.0) as client:\n response = client.post(\n\
\ f\"{vllm_url}/v1/chat/completions\",\n json={\n\
\ \"model\": model,\n \"messages\": messages,\n\
\ \"max_tokens\": 512,\n \"temperature\":\
\ 0.7\n }\n )\n result = response.json()\n latency\
\ = time.perf_counter() - start\n\n text = result[\"choices\"][0][\"\
message\"][\"content\"]\n usage = result.get(\"usage\", {})\n completion_tokens\
\ = usage.get(\"completion_tokens\", len(text.split()))\n\n LLMResult\
\ = namedtuple(\"LLMResult\", [\"text\", \"latency_s\", \"completion_tokens\"\
])\n return LLMResult(text, latency, completion_tokens)\n\n"
image: python:3.13-slim
exec-rerank-documents:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- rerank_documents
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef rerank_documents(\n query: str,\n documents: list,\n \
\ reranker_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker\"\
,\n top_k: int = 3\n) -> NamedTuple(\"RerankResult\", [(\"documents\"\
, list), (\"latency_s\", float)]):\n \"\"\"Rerank documents using BGE\
\ reranker.\"\"\"\n import time\n import httpx\n from collections\
\ import namedtuple\n\n if not documents:\n RerankResult = namedtuple(\"\
RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([],\
\ 0.0)\n\n start = time.perf_counter()\n with httpx.Client(timeout=60.0)\
\ as client:\n response = client.post(\n f\"{reranker_url}/v1/rerank\"\
,\n json={\n \"query\": query,\n \
\ \"documents\": [doc[\"text\"] for doc in documents],\n \
\ \"model\": \"bge-reranker-v2-m3\"\n }\n )\n \
\ result = response.json()\n latency = time.perf_counter() - start\n\
\n # Sort by rerank score\n reranked = sorted(\n zip(documents,\
\ result.get(\"scores\", [0] * len(documents))),\n key=lambda x:\
\ x[1],\n reverse=True\n )[:top_k]\n\n RerankResult = namedtuple(\"\
RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([doc\
\ for doc, score in reranked], latency)\n\n"
image: python:3.13-slim
exec-retrieve-context:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- retrieve_context
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'pymilvus' &&\
\ \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef retrieve_context(\n embedding: list,\n milvus_host: str\
\ = \"milvus.ai-ml.svc.cluster.local\",\n collection_name: str = \"knowledge_base\"\
,\n top_k: int = 5\n) -> NamedTuple(\"RetrieveResult\", [(\"documents\"\
, list), (\"latency_s\", float)]):\n \"\"\"Retrieve relevant documents\
\ from Milvus vector database.\"\"\"\n import time\n from pymilvus\
\ import connections, Collection, utility\n from collections import namedtuple\n\
\n start = time.perf_counter()\n connections.connect(host=milvus_host,\
\ port=19530)\n\n if not utility.has_collection(collection_name):\n \
\ latency = time.perf_counter() - start\n RetrieveResult =\
\ namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \
\ return RetrieveResult([], latency)\n\n collection = Collection(collection_name)\n\
\ collection.load()\n\n results = collection.search(\n data=[embedding],\n\
\ anns_field=\"embedding\",\n param={\"metric_type\": \"COSINE\"\
, \"params\": {\"nprobe\": 10}},\n limit=top_k,\n output_fields=[\"\
text\", \"source\"]\n )\n latency = time.perf_counter() - start\n\n\
\ documents = []\n for hits in results:\n for hit in hits:\n\
\ documents.append({\n \"text\": hit.entity.get(\"\
text\"),\n \"source\": hit.entity.get(\"source\"),\n \
\ \"score\": hit.distance\n })\n\n RetrieveResult\
\ = namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \
\ return RetrieveResult(documents, latency)\n\n"
image: python:3.13-slim
pipelineInfo:
description: 'RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step
latency to MLflow.'
name: rag-query-pipeline
# DAG wiring: generate-embeddings -> retrieve-context -> rerank-documents ->
# generate-response, chained via taskOutputParameter references.
root:
dag:
tasks:
generate-embeddings:
cachingOptions:
enableCache: true
componentRef:
name: comp-generate-embeddings
inputs:
parameters:
text:
componentInputParameter: query
taskInfo:
name: generate-embeddings
generate-response:
cachingOptions:
enableCache: true
componentRef:
name: comp-generate-response
dependentTasks:
- rerank-documents
inputs:
parameters:
context:
taskOutputParameter:
outputParameterKey: documents
producerTask: rerank-documents
query:
componentInputParameter: query
taskInfo:
name: generate-response
rerank-documents:
cachingOptions:
enableCache: true
componentRef:
name: comp-rerank-documents
dependentTasks:
- retrieve-context
inputs:
parameters:
documents:
taskOutputParameter:
outputParameterKey: documents
producerTask: retrieve-context
query:
componentInputParameter: query
taskInfo:
name: rerank-documents
retrieve-context:
cachingOptions:
enableCache: true
componentRef:
name: comp-retrieve-context
dependentTasks:
- generate-embeddings
inputs:
parameters:
collection_name:
componentInputParameter: collection_name
embedding:
taskOutputParameter:
outputParameterKey: embedding
producerTask: generate-embeddings
taskInfo:
name: retrieve-context
inputDefinitions:
parameters:
collection_name:
defaultValue: knowledge_base
description: Milvus collection name
isOptional: true
parameterType: STRING
query:
description: Text query
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.12.1

87
tts_pipeline.yaml Normal file
View File

@@ -0,0 +1,87 @@
# NOTE(review): machine-generated by the KFP compiler (sdkVersion kfp-2.12.1);
# regenerate from the pipeline's Python source rather than hand-editing.
# NOTE(review): indentation appears flattened in this view; the on-disk
# compiled YAML is indentation-significant -- verify before committing edits.
# PIPELINE DEFINITION
# Name: text-to-speech-pipeline
# Description: Simple text to speech pipeline
# Inputs:
# text: str
# Component interface: a single synthesize-speech step.
components:
comp-synthesize-speech:
executorLabel: exec-synthesize-speech
inputDefinitions:
parameters:
text:
parameterType: STRING
tts_url:
defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts
isOptional: true
parameterType: STRING
outputDefinitions:
parameters:
audio_b64:
parameterType: STRING
latency_s:
parameterType: NUMBER_DOUBLE
# Executor container: pip-installs kfp + httpx, then runs the inlined
# synthesize_speech component function via kfp's executor.
deploymentSpec:
executors:
exec-synthesize-speech:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- synthesize_speech
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef synthesize_speech(\n text: str,\n tts_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts\"\
\n) -> NamedTuple(\"TTSResult\", [(\"audio_b64\", str), (\"latency_s\",\
\ float)]):\n \"\"\"Convert text to speech using TTS service.\"\"\"\n\
\ import base64\n import time\n import httpx\n from collections\
\ import namedtuple\n\n start = time.perf_counter()\n with httpx.Client(timeout=120.0)\
\ as client:\n response = client.post(\n f\"{tts_url}/v1/audio/speech\"\
,\n json={\n \"input\": text,\n \
\ \"voice\": \"en_US-lessac-high\",\n \"response_format\"\
: \"wav\"\n }\n )\n audio_b64 = base64.b64encode(response.content).decode(\"\
utf-8\")\n latency = time.perf_counter() - start\n\n TTSResult = namedtuple(\"\
TTSResult\", [\"audio_b64\", \"latency_s\"])\n return TTSResult(audio_b64,\
\ latency)\n\n"
image: python:3.13-slim
pipelineInfo:
description: Simple text to speech pipeline
name: text-to-speech-pipeline
# Single-task DAG: pipeline input `text` feeds synthesize-speech directly.
root:
dag:
tasks:
synthesize-speech:
cachingOptions:
enableCache: true
componentRef:
name: comp-synthesize-speech
inputs:
parameters:
text:
componentInputParameter: text
taskInfo:
name: synthesize-speech
inputDefinitions:
parameters:
text:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.12.1

454
vllm_tuning_pipeline.py Normal file
View File

@@ -0,0 +1,454 @@
#!/usr/bin/env python3
"""
vLLM Tuning Evaluation Pipeline - Kubeflow Pipelines SDK
Runs inference benchmarks with different vLLM configurations and logs
results to MLflow so you can compare APC, chunked prefill, speculative
decoding, and GPU memory utilization settings side-by-side.
Usage:
pip install kfp==2.12.1
python vllm_tuning_pipeline.py
# Upload vllm_tuning_pipeline.yaml to Kubeflow Pipelines UI
"""
from kfp import dsl
from kfp import compiler
from typing import NamedTuple
# Base image and packages for the MLflow logging components.
MLFLOW_IMAGE = "python:3.13-slim"
MLFLOW_PACKAGES = ["mlflow>=2.10.0", "boto3", "psycopg2-binary"]
# Packages for the HTTP benchmark components.
BENCH_PACKAGES = ["httpx"]
# ---- MLflow components ----
@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def create_tuning_run(
    experiment_name: str,
    run_name: str,
    tuning_params: dict,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> NamedTuple("RunInfo", [("run_id", str), ("experiment_id", str)]):
    """Open an MLflow run for one vLLM tuning experiment and log its params.

    Args:
        experiment_name: MLflow experiment to reuse or create.
        run_name: Human-readable name for the new run.
        tuning_params: Flat dict of vLLM knobs; each is logged as ``vllm.<key>``.
        mlflow_tracking_uri: MLflow tracking server URL.

    Returns:
        RunInfo(run_id, experiment_id) for downstream logging components.
    """
    import os
    import mlflow
    from mlflow.tracking import MlflowClient
    from collections import namedtuple

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()

    # Reuse the experiment when it already exists; otherwise create it with
    # a deterministic artifact location.
    existing = client.get_experiment_by_name(experiment_name)
    if existing:
        experiment_id = existing.experiment_id
    else:
        experiment_id = client.create_experiment(
            name=experiment_name,
            artifact_location=f"/mlflow/artifacts/{experiment_name}",
        )

    run_tags = {
        "pipeline.type": "vllm-tuning",
        "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
    }
    active_run = mlflow.start_run(
        experiment_id=experiment_id, run_name=run_name, tags=run_tags
    )
    # Record every tuning knob so runs can be compared side-by-side.
    for knob, value in tuning_params.items():
        mlflow.log_param(f"vllm.{knob}", value)
    new_run_id = active_run.info.run_id
    mlflow.end_run()

    RunInfo = namedtuple("RunInfo", ["run_id", "experiment_id"])
    return RunInfo(new_run_id, experiment_id)
@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def log_benchmark_results(
    run_id: str,
    metrics: dict,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> str:
    """Log benchmark metrics to MLflow and close the run.

    Args:
        run_id: MLflow run id created by create_tuning_run.
        metrics: Metric name -> numeric value mapping from run_benchmark.
        mlflow_tracking_uri: MLflow tracking server URL.

    Returns:
        The same run_id, so callers can chain on completion.
    """
    import json
    import tempfile
    import mlflow
    from mlflow.tracking import MlflowClient
    from pathlib import Path

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()

    for metric_name, metric_value in metrics.items():
        client.log_metric(run_id, metric_name, float(metric_value))

    # Also attach the raw results as a JSON artifact for later inspection.
    with tempfile.TemporaryDirectory() as tmpdir:
        results_file = Path(tmpdir) / "benchmark_results.json"
        results_file.write_text(json.dumps(metrics, indent=2))
        client.log_artifact(run_id, str(results_file))

    client.set_terminated(run_id, status="FINISHED")
    return run_id
# ---- Benchmark components ----
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=BENCH_PACKAGES,
)
def build_prompt_suite() -> list:
    """Return a list of test prompts spanning short, medium, and long inputs.

    Each entry is a dict with keys: id, category, messages, max_tokens.
    The last entry repeats the medium-1 system prompt and opening sentence so
    prefix-caching configurations get a cache hit to exercise.
    """

    def case(case_id, category, messages, max_tokens):
        # Keep every suite entry in an identical shape.
        return {
            "id": case_id,
            "category": category,
            "messages": messages,
            "max_tokens": max_tokens,
        }

    homelab_system = {
        "role": "system",
        "content": "You are a helpful AI assistant running on a homelab.",
    }

    return [
        case(
            "short-1",
            "short",
            [{"role": "user", "content": "What is the capital of France?"}],
            64,
        ),
        case(
            "short-2",
            "short",
            [{"role": "user", "content": "Explain quantum computing in one sentence."}],
            64,
        ),
        case(
            "medium-1",
            "medium",
            [
                homelab_system,
                {
                    "role": "user",
                    "content": (
                        "Compare and contrast supervised and unsupervised "
                        "machine learning. Give examples of each and explain "
                        "when you would choose one over the other."
                    ),
                },
            ],
            512,
        ),
        case(
            "medium-2",
            "medium",
            [
                {
                    "role": "user",
                    "content": (
                        "Write a Python function that implements a binary search "
                        "tree with insert, search, and delete operations. Include "
                        "docstrings and type hints."
                    ),
                },
            ],
            1024,
        ),
        case(
            "long-1",
            "long",
            [
                {
                    "role": "system",
                    "content": "You are a technical writer for a Kubernetes homelab blog.",
                },
                {
                    "role": "user",
                    "content": (
                        "Write a detailed tutorial on setting up a multi-node "
                        "Kubernetes cluster with Talos Linux, covering: "
                        "1) Hardware requirements and network topology, "
                        "2) Talos machine config generation, "
                        "3) Control plane bootstrapping, "
                        "4) Worker node joining, "
                        "5) CNI setup with Cilium, "
                        "6) Storage with Rook-Ceph, "
                        "7) GitOps with Flux CD. "
                        "Include YAML examples for each step."
                    ),
                },
            ],
            2048,
        ),
        case(
            "repeat-prefix-1",
            "prefix-cache-test",
            [
                homelab_system,
                {
                    "role": "user",
                    "content": (
                        "Compare and contrast supervised and unsupervised "
                        "machine learning. Now focus specifically on "
                        "reinforcement learning and how it differs."
                    ),
                },
            ],
            512,
        ),
    ]
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=BENCH_PACKAGES,
)
def run_benchmark(
    prompts: list,
    llm_endpoint: str,
    model_name: str,
    num_warmup: int = 2,
    num_iterations: int = 3,
) -> dict:
    """
    Run all prompts through the LLM endpoint and collect timing metrics.

    Streams each chat completion and records total latency, time to first
    token (TTFT), and tokens/sec. Token throughput is approximated by
    counting SSE data chunks, which for vLLM streaming is roughly one chunk
    per token -- good enough for A/B comparison between configs.

    Args:
        prompts: Prompt suite entries (dicts with messages/max_tokens/category).
        llm_endpoint: Base URL of the OpenAI-compatible chat endpoint.
        model_name: Model identifier sent with every request.
        num_warmup: Untimed warmup requests issued before measuring.
        num_iterations: How many times the full prompt suite is repeated.

    Returns:
        Aggregate metrics: p50/p95/mean latency, tokens/sec, TTFT, per-category
        mean latency, and success/failure counts.
    """
    import time
    import statistics
    import httpx

    all_latencies: list[float] = []
    all_tps: list[float] = []
    all_ttft: list[float] = []
    per_category: dict[str, list[float]] = {}

    with httpx.Client(timeout=300.0) as client:
        # Warmup: prime the server; failures here are tolerated (the server
        # may still be loading) and deliberately not recorded.
        for _ in range(num_warmup):
            try:
                client.post(
                    f"{llm_endpoint}/v1/chat/completions",
                    json={
                        "model": model_name,
                        "messages": [{"role": "user", "content": "Hi"}],
                        "max_tokens": 8,
                        "temperature": 0,
                    },
                )
            except Exception:
                pass

        # Benchmark: stream every prompt num_iterations times.
        for iteration in range(num_iterations):
            for prompt in prompts:
                category = prompt.get("category", "unknown")
                payload = {
                    "model": model_name,
                    "messages": prompt["messages"],
                    "max_tokens": prompt.get("max_tokens", 256),
                    "temperature": 0,
                    "stream": True,
                }
                try:
                    t_start = time.perf_counter()
                    first_token_time = None
                    with client.stream(
                        "POST",
                        f"{llm_endpoint}/v1/chat/completions",
                        json=payload,
                    ) as resp:
                        resp.raise_for_status()
                        completion_tokens = 0
                        for line in resp.iter_lines():
                            if not line.startswith("data: "):
                                continue
                            chunk = line[6:]
                            if chunk == "[DONE]":
                                break
                            if first_token_time is None:
                                first_token_time = time.perf_counter()
                            completion_tokens += 1
                    t_end = time.perf_counter()
                    latency = t_end - t_start
                    # If no token ever arrived, fall back to total latency.
                    ttft = (
                        (first_token_time - t_start)
                        if first_token_time
                        else latency
                    )
                    tps = (
                        completion_tokens / latency if latency > 0 else 0
                    )
                    all_latencies.append(latency)
                    all_tps.append(tps)
                    all_ttft.append(ttft)
                    per_category.setdefault(category, []).append(latency)
                except Exception as exc:
                    # Record the failure with sentinel values but keep going
                    # so one bad request does not abort the whole benchmark.
                    # Surface the error in the pod log instead of swallowing
                    # it silently (previously `exc` was captured but unused).
                    print(
                        f"Benchmark request failed "
                        f"(iteration={iteration}, prompt={prompt.get('id', '?')}): {exc}"
                    )
                    all_latencies.append(-1)
                    all_tps.append(0)
                    all_ttft.append(-1)

    # Failures were recorded as sentinels; aggregate over successes only.
    valid_latencies = [l for l in all_latencies if l > 0]
    valid_tps = [t for t in all_tps if t > 0]
    valid_ttft = [t for t in all_ttft if t > 0]

    def safe_stat(values, func):
        # Guard against statistics errors on an empty sample.
        return func(values) if values else 0

    def p95(values):
        # Index-based p95 without interpolation (int(len*0.95) < len for any
        # non-empty list, so the index is always in range).
        return sorted(values)[int(len(values) * 0.95)] if values else 0

    metrics = {
        "total_requests": len(all_latencies),
        "successful_requests": len(valid_latencies),
        "failed_requests": len(all_latencies) - len(valid_latencies),
        # Latency
        "latency_mean_s": safe_stat(valid_latencies, statistics.mean),
        "latency_p50_s": safe_stat(valid_latencies, statistics.median),
        "latency_p95_s": p95(valid_latencies),
        # Throughput
        "tokens_per_second_mean": safe_stat(valid_tps, statistics.mean),
        "tokens_per_second_p50": safe_stat(valid_tps, statistics.median),
        # Time to first token
        "ttft_mean_s": safe_stat(valid_ttft, statistics.mean),
        "ttft_p50_s": safe_stat(valid_ttft, statistics.median),
        "ttft_p95_s": p95(valid_ttft),
    }
    # Per-category latency
    for cat, lats in per_category.items():
        valid = [l for l in lats if l > 0]
        if valid:
            metrics[f"latency_mean_{cat}_s"] = statistics.mean(valid)
    return metrics
# ---- Pipeline ----
@dsl.pipeline(
    name="vllm-tuning-evaluation",
    description=(
        "Benchmark vLLM with different tuning configurations. "
        "Logs latency, TPS, and TTFT to MLflow for A/B comparison."
    ),
)
def vllm_tuning_pipeline(
    llm_endpoint: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm",
    model_name: str = "hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4",
    # Tuning knobs (match env vars in rayservice.yaml)
    enable_prefix_caching: str = "true",
    enable_chunked_prefill: str = "true",
    num_speculative_tokens: str = "3",
    ngram_prompt_lookup_max: str = "4",
    gpu_memory_utilization: str = "0.90",
    # Benchmark config
    num_warmup: int = 2,
    num_iterations: int = 3,
    run_label: str = "baseline",
):
    """
    vLLM Tuning Evaluation Pipeline
    Run this multiple times with different tuning params, then compare
    runs in the MLflow "vllm-tuning" experiment.
    Args:
        llm_endpoint: vLLM inference endpoint URL
        model_name: HF model identifier
        enable_prefix_caching: "true" or "false"
        enable_chunked_prefill: "true" or "false"
        num_speculative_tokens: number of speculative tokens (0 = off)
        ngram_prompt_lookup_max: ngram window for spec decode (0 = off)
        gpu_memory_utilization: 0.0 - 1.0
        num_warmup: warmup requests before timing
        num_iterations: how many times to repeat the prompt suite
        run_label: human-readable label (e.g. "apc-on-spec3")
    """
    # Everything worth recording against the MLflow run, stringified.
    config_params = {
        "enable_prefix_caching": enable_prefix_caching,
        "enable_chunked_prefill": enable_chunked_prefill,
        "num_speculative_tokens": num_speculative_tokens,
        "ngram_prompt_lookup_max": ngram_prompt_lookup_max,
        "gpu_memory_utilization": gpu_memory_utilization,
        "model_name": model_name,
        "llm_endpoint": llm_endpoint,
        "num_warmup": str(num_warmup),
        "num_iterations": str(num_iterations),
    }

    # Step 1: open the MLflow run and record every knob as a param.
    run_info_task = create_tuning_run(
        experiment_name="vllm-tuning",
        run_name=f"vllm-{run_label}",
        tuning_params=config_params,
    )

    # Step 2: the prompt suite is static, so caching it is safe.
    suite_task = build_prompt_suite()
    suite_task.set_caching_options(enable_caching=True)

    # Step 3: never cache the benchmark itself -- we want fresh timings.
    benchmark_task = run_benchmark(
        prompts=suite_task.output,
        llm_endpoint=llm_endpoint,
        model_name=model_name,
        num_warmup=num_warmup,
        num_iterations=num_iterations,
    )
    benchmark_task.set_caching_options(enable_caching=False)

    # Step 4: push the aggregated metrics into the MLflow run and close it.
    log_benchmark_results(
        run_id=run_info_task.outputs["run_id"],
        metrics=benchmark_task.output,
    )
if __name__ == "__main__":
    compiler.Compiler().compile(
        vllm_tuning_pipeline,
        "vllm_tuning_pipeline.yaml",
    )
    # Same usage hints as before, assembled as one block for readability.
    # NOTE(review): the `kfp run submit --<param>=<value>` flag syntax below
    # is illustrative -- confirm against the installed kfp CLI version.
    usage = [
        "Compiled: vllm_tuning_pipeline.yaml",
        "",
        "Example runs to compare configurations:",
        " # Baseline (current config)",
        " kfp run submit vllm_tuning_pipeline.yaml --run-label=baseline",
        "",
        " # APC disabled",
        " kfp run submit vllm_tuning_pipeline.yaml \\",
        " --enable-prefix-caching=false --run-label=no-apc",
        "",
        " # No speculative decoding",
        " kfp run submit vllm_tuning_pipeline.yaml \\",
        " --num-speculative-tokens=0 --run-label=no-spec",
        "",
        " # Aggressive spec decode",
        " kfp run submit vllm_tuning_pipeline.yaml \\",
        " --num-speculative-tokens=5 --ngram-prompt-lookup-max=6 --run-label=spec5-ngram6",
    ]
    print("\n".join(usage))

501
vllm_tuning_pipeline.yaml Normal file
View File

@@ -0,0 +1,501 @@
# PIPELINE DEFINITION
# Name: vllm-tuning-evaluation
# Description: Benchmark vLLM with different tuning configurations. Logs latency, TPS, and TTFT to MLflow for A/B comparison.
# Inputs:
# enable_chunked_prefill: str [Default: 'true']
# enable_prefix_caching: str [Default: 'true']
# gpu_memory_utilization: str [Default: '0.90']
# llm_endpoint: str [Default: 'http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm']
# model_name: str [Default: 'hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4']
# ngram_prompt_lookup_max: str [Default: '4']
# num_iterations: int [Default: 3.0]
# num_speculative_tokens: str [Default: '3']
# num_warmup: int [Default: 2.0]
# run_label: str [Default: 'baseline']
components:
  # Component interface schemas (KFP IR). The matching executor bodies live
  # under deploymentSpec.executors with the same exec-* labels.
  comp-build-prompt-suite:
    executorLabel: exec-build-prompt-suite
    outputDefinitions:
      parameters:
        Output:
          parameterType: LIST
  comp-create-tuning-run:
    executorLabel: exec-create-tuning-run
    inputDefinitions:
      parameters:
        experiment_name:
          parameterType: STRING
        mlflow_tracking_uri:
          # In-cluster MLflow tracking server.
          defaultValue: http://mlflow.mlflow.svc.cluster.local:80
          isOptional: true
          parameterType: STRING
        run_name:
          parameterType: STRING
        tuning_params:
          parameterType: STRUCT
    outputDefinitions:
      parameters:
        experiment_id:
          parameterType: STRING
        run_id:
          parameterType: STRING
  comp-log-benchmark-results:
    executorLabel: exec-log-benchmark-results
    inputDefinitions:
      parameters:
        metrics:
          parameterType: STRUCT
        mlflow_tracking_uri:
          defaultValue: http://mlflow.mlflow.svc.cluster.local:80
          isOptional: true
          parameterType: STRING
        run_id:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-run-benchmark:
    executorLabel: exec-run-benchmark
    inputDefinitions:
      parameters:
        llm_endpoint:
          parameterType: STRING
        model_name:
          parameterType: STRING
        num_iterations:
          # Compiler output: integer defaults are rendered as floats (3.0)
          # even though parameterType is NUMBER_INTEGER.
          defaultValue: 3.0
          isOptional: true
          parameterType: NUMBER_INTEGER
        num_warmup:
          defaultValue: 2.0
          isOptional: true
          parameterType: NUMBER_INTEGER
        prompts:
          parameterType: LIST
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRUCT
deploymentSpec:
executors:
exec-build-prompt-suite:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- build_prompt_suite
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef build_prompt_suite() -> list:\n \"\"\"Return a list of test\
\ prompts spanning short, medium, and long inputs.\"\"\"\n return [\n\
\ {\n \"id\": \"short-1\",\n \"category\":\
\ \"short\",\n \"messages\": [\n {\"role\": \"\
user\", \"content\": \"What is the capital of France?\"}\n ],\n\
\ \"max_tokens\": 64,\n },\n {\n \"\
id\": \"short-2\",\n \"category\": \"short\",\n \"\
messages\": [\n {\"role\": \"user\", \"content\": \"Explain\
\ quantum computing in one sentence.\"}\n ],\n \"\
max_tokens\": 64,\n },\n {\n \"id\": \"medium-1\"\
,\n \"category\": \"medium\",\n \"messages\": [\n\
\ {\n \"role\": \"system\",\n \
\ \"content\": \"You are a helpful AI assistant running on a\
\ homelab.\",\n },\n {\n \
\ \"role\": \"user\",\n \"content\": (\n \
\ \"Compare and contrast supervised and unsupervised \"\n \
\ \"machine learning. Give examples of each and explain\
\ \"\n \"when you would choose one over the other.\"\
\n ),\n },\n ],\n \
\ \"max_tokens\": 512,\n },\n {\n \"id\": \"\
medium-2\",\n \"category\": \"medium\",\n \"messages\"\
: [\n {\n \"role\": \"user\",\n \
\ \"content\": (\n \"Write a Python\
\ function that implements a binary search \"\n \"\
tree with insert, search, and delete operations. Include \"\n \
\ \"docstrings and type hints.\"\n ),\n\
\ },\n ],\n \"max_tokens\": 1024,\n\
\ },\n {\n \"id\": \"long-1\",\n \"\
category\": \"long\",\n \"messages\": [\n {\n\
\ \"role\": \"system\",\n \"content\"\
: \"You are a technical writer for a Kubernetes homelab blog.\",\n \
\ },\n {\n \"role\": \"user\"\
,\n \"content\": (\n \"Write a\
\ detailed tutorial on setting up a multi-node \"\n \
\ \"Kubernetes cluster with Talos Linux, covering: \"\n \
\ \"1) Hardware requirements and network topology, \"\n \
\ \"2) Talos machine config generation, \"\n \
\ \"3) Control plane bootstrapping, \"\n \
\ \"4) Worker node joining, \"\n \"5) CNI setup\
\ with Cilium, \"\n \"6) Storage with Rook-Ceph,\
\ \"\n \"7) GitOps with Flux CD. \"\n \
\ \"Include YAML examples for each step.\"\n \
\ ),\n },\n ],\n \"max_tokens\"\
: 2048,\n },\n {\n \"id\": \"repeat-prefix-1\"\
,\n \"category\": \"prefix-cache-test\",\n \"messages\"\
: [\n {\n \"role\": \"system\",\n \
\ \"content\": \"You are a helpful AI assistant running on\
\ a homelab.\",\n },\n {\n \
\ \"role\": \"user\",\n \"content\": (\n \
\ \"Compare and contrast supervised and unsupervised \"\n\
\ \"machine learning. Now focus specifically on \"\
\n \"reinforcement learning and how it differs.\"\
\n ),\n },\n ],\n \
\ \"max_tokens\": 512,\n },\n ]\n\n"
image: python:3.13-slim
exec-create-tuning-run:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- create_tuning_run
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'mlflow>=2.10.0'\
\ 'boto3' 'psycopg2-binary' && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef create_tuning_run(\n experiment_name: str,\n run_name:\
\ str,\n tuning_params: dict,\n mlflow_tracking_uri: str = \"http://mlflow.mlflow.svc.cluster.local:80\"\
,\n) -> NamedTuple(\"RunInfo\", [(\"run_id\", str), (\"experiment_id\",\
\ str)]):\n \"\"\"Create an MLflow run for a vLLM tuning experiment.\"\
\"\"\n import os\n import mlflow\n from mlflow.tracking import\
\ MlflowClient\n from collections import namedtuple\n\n mlflow.set_tracking_uri(mlflow_tracking_uri)\n\
\ client = MlflowClient()\n\n exp = client.get_experiment_by_name(experiment_name)\n\
\ experiment_id = (\n exp.experiment_id\n if exp\n \
\ else client.create_experiment(\n name=experiment_name,\n\
\ artifact_location=f\"/mlflow/artifacts/{experiment_name}\"\
,\n )\n )\n\n tags = {\n \"pipeline.type\": \"vllm-tuning\"\
,\n \"kfp.run_id\": os.environ.get(\"KFP_RUN_ID\", \"unknown\"),\n\
\ }\n\n run = mlflow.start_run(\n experiment_id=experiment_id,\
\ run_name=run_name, tags=tags\n )\n # Log every tuning param\n \
\ for key, value in tuning_params.items():\n mlflow.log_param(f\"\
vllm.{key}\", value)\n run_id = run.info.run_id\n mlflow.end_run()\n\
\n RunInfo = namedtuple(\"RunInfo\", [\"run_id\", \"experiment_id\"])\n\
\ return RunInfo(run_id, experiment_id)\n\n"
image: python:3.13-slim
exec-log-benchmark-results:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- log_benchmark_results
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'mlflow>=2.10.0'\
\ 'boto3' 'psycopg2-binary' && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef log_benchmark_results(\n run_id: str,\n metrics: dict,\n\
\ mlflow_tracking_uri: str = \"http://mlflow.mlflow.svc.cluster.local:80\"\
,\n) -> str:\n \"\"\"Log benchmark metrics to MLflow and close the run.\"\
\"\"\n import json\n import tempfile\n import mlflow\n from\
\ mlflow.tracking import MlflowClient\n from pathlib import Path\n\n\
\ mlflow.set_tracking_uri(mlflow_tracking_uri)\n client = MlflowClient()\n\
\n for key, value in metrics.items():\n client.log_metric(run_id,\
\ key, float(value))\n\n # Save full results as artifact\n with tempfile.TemporaryDirectory()\
\ as tmpdir:\n path = Path(tmpdir) / \"benchmark_results.json\"\n\
\ path.write_text(json.dumps(metrics, indent=2))\n client.log_artifact(run_id,\
\ str(path))\n\n client.set_terminated(run_id, status=\"FINISHED\")\n\
\ return run_id\n\n"
image: python:3.13-slim
exec-run-benchmark:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- run_benchmark
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef run_benchmark(\n prompts: list,\n llm_endpoint: str,\n\
\ model_name: str,\n num_warmup: int = 2,\n num_iterations: int\
\ = 3,\n) -> dict:\n \"\"\"\n Run all prompts through the LLM endpoint\
\ and collect timing metrics.\n\n Returns aggregate metrics: p50/p95/mean\
\ latency, tokens/sec, TTFT.\n \"\"\"\n import time\n import statistics\n\
\ import httpx\n\n all_latencies: list[float] = []\n all_tps: list[float]\
\ = []\n all_ttft: list[float] = []\n per_category: dict[str, list[float]]\
\ = {}\n\n with httpx.Client(timeout=300.0) as client:\n # Warmup\n\
\ for _ in range(num_warmup):\n try:\n \
\ client.post(\n f\"{llm_endpoint}/v1/chat/completions\"\
,\n json={\n \"model\": model_name,\n\
\ \"messages\": [{\"role\": \"user\", \"content\"\
: \"Hi\"}],\n \"max_tokens\": 8,\n \
\ \"temperature\": 0,\n },\n \
\ )\n except Exception:\n pass\n\n # Benchmark\n\
\ for iteration in range(num_iterations):\n for prompt\
\ in prompts:\n category = prompt.get(\"category\", \"unknown\"\
)\n payload = {\n \"model\": model_name,\n\
\ \"messages\": prompt[\"messages\"],\n \
\ \"max_tokens\": prompt.get(\"max_tokens\", 256),\n \
\ \"temperature\": 0,\n \"stream\": True,\n \
\ }\n\n try:\n t_start = time.perf_counter()\n\
\ first_token_time = None\n\n with\
\ client.stream(\n \"POST\",\n \
\ f\"{llm_endpoint}/v1/chat/completions\",\n \
\ json=payload,\n ) as resp:\n \
\ resp.raise_for_status()\n completion_tokens =\
\ 0\n for line in resp.iter_lines():\n \
\ if not line.startswith(\"data: \"):\n \
\ continue\n chunk = line[6:]\n\
\ if chunk == \"[DONE]\":\n \
\ break\n if first_token_time is\
\ None:\n first_token_time = time.perf_counter()\n\
\ completion_tokens += 1\n\n \
\ t_end = time.perf_counter()\n latency = t_end -\
\ t_start\n ttft = (\n (first_token_time\
\ - t_start)\n if first_token_time\n \
\ else latency\n )\n tps\
\ = (\n completion_tokens / latency if latency >\
\ 0 else 0\n )\n\n all_latencies.append(latency)\n\
\ all_tps.append(tps)\n all_ttft.append(ttft)\n\
\ per_category.setdefault(category, []).append(latency)\n\
\n except Exception as exc:\n # Record\
\ failure but keep going\n all_latencies.append(-1)\n\
\ all_tps.append(0)\n all_ttft.append(-1)\n\
\n # Compute aggregates\n valid_latencies = [l for l in all_latencies\
\ if l > 0]\n valid_tps = [t for t in all_tps if t > 0]\n valid_ttft\
\ = [t for t in all_ttft if t > 0]\n\n def safe_stat(values, func):\n\
\ return func(values) if values else 0\n\n metrics = {\n \
\ \"total_requests\": len(all_latencies),\n \"successful_requests\"\
: len(valid_latencies),\n \"failed_requests\": len(all_latencies)\
\ - len(valid_latencies),\n # Latency\n \"latency_mean_s\"\
: safe_stat(valid_latencies, statistics.mean),\n \"latency_p50_s\"\
: safe_stat(\n valid_latencies,\n lambda v: statistics.median(v),\n\
\ ),\n \"latency_p95_s\": safe_stat(\n valid_latencies,\n\
\ lambda v: sorted(v)[int(len(v) * 0.95)] if v else 0,\n \
\ ),\n # Throughput\n \"tokens_per_second_mean\": safe_stat(valid_tps,\
\ statistics.mean),\n \"tokens_per_second_p50\": safe_stat(\n \
\ valid_tps, lambda v: statistics.median(v)\n ),\n \
\ # Time to first token\n \"ttft_mean_s\": safe_stat(valid_ttft,\
\ statistics.mean),\n \"ttft_p50_s\": safe_stat(valid_ttft, lambda\
\ v: statistics.median(v)),\n \"ttft_p95_s\": safe_stat(\n \
\ valid_ttft,\n lambda v: sorted(v)[int(len(v) * 0.95)]\
\ if v else 0,\n ),\n }\n\n # Per-category latency\n for\
\ cat, lats in per_category.items():\n valid = [l for l in lats if\
\ l > 0]\n if valid:\n metrics[f\"latency_mean_{cat}_s\"\
] = statistics.mean(valid)\n\n return metrics\n\n"
image: python:3.13-slim
pipelineInfo:
  description: Benchmark vLLM with different tuning configurations. Logs latency,
    TPS, and TTFT to MLflow for A/B comparison.
  name: vllm-tuning-evaluation
root:
  dag:
    tasks:
      build-prompt-suite:
        # Static, deterministic prompt suite: safe and cheap to cache.
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-build-prompt-suite
        taskInfo:
          name: build-prompt-suite
      create-tuning-run:
        # NOTE(review): caching disabled (was enableCache: true). Creating an
        # MLflow run is a side effect: a cache hit on a re-run with identical
        # params would reuse the stale run_id, so new benchmark metrics would
        # overwrite the previous (already FINISHED) run instead of producing a
        # fresh run per execution — collapsing repeated A/B runs into one.
        # Mirror this in vllm_tuning_pipeline.py via
        # mlflow_run.set_caching_options(enable_caching=False) so the next
        # recompile does not revert this fix.
        cachingOptions: {}
        componentRef:
          name: comp-create-tuning-run
        inputs:
          parameters:
            experiment_name:
              runtimeValue:
                constant: vllm-tuning
            pipelinechannel--enable_chunked_prefill:
              componentInputParameter: enable_chunked_prefill
            pipelinechannel--enable_prefix_caching:
              componentInputParameter: enable_prefix_caching
            pipelinechannel--gpu_memory_utilization:
              componentInputParameter: gpu_memory_utilization
            pipelinechannel--llm_endpoint:
              componentInputParameter: llm_endpoint
            pipelinechannel--model_name:
              componentInputParameter: model_name
            pipelinechannel--ngram_prompt_lookup_max:
              componentInputParameter: ngram_prompt_lookup_max
            pipelinechannel--num_iterations:
              componentInputParameter: num_iterations
            pipelinechannel--num_speculative_tokens:
              componentInputParameter: num_speculative_tokens
            pipelinechannel--num_warmup:
              componentInputParameter: num_warmup
            pipelinechannel--run_label:
              componentInputParameter: run_label
            run_name:
              runtimeValue:
                constant: vllm-{{$.inputs.parameters['pipelinechannel--run_label']}}
            tuning_params:
              runtimeValue:
                constant:
                  enable_chunked_prefill: '{{$.inputs.parameters[''pipelinechannel--enable_chunked_prefill'']}}'
                  enable_prefix_caching: '{{$.inputs.parameters[''pipelinechannel--enable_prefix_caching'']}}'
                  gpu_memory_utilization: '{{$.inputs.parameters[''pipelinechannel--gpu_memory_utilization'']}}'
                  llm_endpoint: '{{$.inputs.parameters[''pipelinechannel--llm_endpoint'']}}'
                  model_name: '{{$.inputs.parameters[''pipelinechannel--model_name'']}}'
                  ngram_prompt_lookup_max: '{{$.inputs.parameters[''pipelinechannel--ngram_prompt_lookup_max'']}}'
                  num_iterations: '{{$.inputs.parameters[''pipelinechannel--num_iterations'']}}'
                  num_speculative_tokens: '{{$.inputs.parameters[''pipelinechannel--num_speculative_tokens'']}}'
                  num_warmup: '{{$.inputs.parameters[''pipelinechannel--num_warmup'']}}'
        taskInfo:
          name: create-tuning-run
      log-benchmark-results:
        # NOTE(review): caching disabled (was enableCache: true). This step's
        # only purpose is the side effect of writing metrics to MLflow and
        # terminating the run; a cache hit would silently skip the logging.
        cachingOptions: {}
        componentRef:
          name: comp-log-benchmark-results
        dependentTasks:
        - create-tuning-run
        - run-benchmark
        inputs:
          parameters:
            metrics:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: run-benchmark
            run_id:
              taskOutputParameter:
                outputParameterKey: run_id
                producerTask: create-tuning-run
        taskInfo:
          name: log-benchmark-results
      run-benchmark:
        # Empty cachingOptions == caching off (compiled from
        # set_caching_options(enable_caching=False)): every run re-measures.
        cachingOptions: {}
        componentRef:
          name: comp-run-benchmark
        dependentTasks:
        - build-prompt-suite
        inputs:
          parameters:
            llm_endpoint:
              componentInputParameter: llm_endpoint
            model_name:
              componentInputParameter: model_name
            num_iterations:
              componentInputParameter: num_iterations
            num_warmup:
              componentInputParameter: num_warmup
            prompts:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: build-prompt-suite
        taskInfo:
          name: run-benchmark
  inputDefinitions:
    parameters:
      enable_chunked_prefill:
        defaultValue: 'true'
        description: '"true" or "false"'
        isOptional: true
        parameterType: STRING
      enable_prefix_caching:
        defaultValue: 'true'
        description: '"true" or "false"'
        isOptional: true
        parameterType: STRING
      gpu_memory_utilization:
        defaultValue: '0.90'
        # Quoted so the range reads unambiguously as a string scalar.
        description: '0.0 - 1.0'
        isOptional: true
        parameterType: STRING
      llm_endpoint:
        defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm
        description: vLLM inference endpoint URL
        isOptional: true
        parameterType: STRING
      model_name:
        defaultValue: hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4
        description: HF model identifier
        isOptional: true
        parameterType: STRING
      ngram_prompt_lookup_max:
        defaultValue: '4'
        description: ngram window for spec decode (0 = off)
        isOptional: true
        parameterType: STRING
      num_iterations:
        defaultValue: 3.0
        description: how many times to repeat the prompt suite
        isOptional: true
        parameterType: NUMBER_INTEGER
      num_speculative_tokens:
        defaultValue: '3'
        description: number of speculative tokens (0 = off)
        isOptional: true
        parameterType: STRING
      num_warmup:
        defaultValue: 2.0
        description: warmup requests before timing
        isOptional: true
        parameterType: NUMBER_INTEGER
      run_label:
        defaultValue: baseline
        description: human-readable label (e.g. "apc-on-spec3")
        isOptional: true
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.12.1

View File

@@ -12,6 +12,11 @@ Usage:
from kfp import dsl from kfp import dsl
from kfp import compiler from kfp import compiler
from typing import NamedTuple
MLFLOW_IMAGE = "python:3.13-slim"
MLFLOW_PACKAGES = ["mlflow>=2.10.0", "boto3", "psycopg2-binary"]
@dsl.component( @dsl.component(
@@ -21,13 +26,16 @@ from kfp import compiler
def transcribe_audio( def transcribe_audio(
audio_b64: str, audio_b64: str,
whisper_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper" whisper_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper"
) -> str: ) -> NamedTuple("STTResult", [("text", str), ("latency_s", float), ("audio_duration_s", float)]):
"""Transcribe audio using Whisper STT service.""" """Transcribe audio using Whisper STT service."""
import base64 import base64
import time
import httpx import httpx
from collections import namedtuple
audio_bytes = base64.b64decode(audio_b64) audio_bytes = base64.b64decode(audio_b64)
start = time.perf_counter()
with httpx.Client(timeout=120.0) as client: with httpx.Client(timeout=120.0) as client:
response = client.post( response = client.post(
f"{whisper_url}/v1/audio/transcriptions", f"{whisper_url}/v1/audio/transcriptions",
@@ -35,8 +43,14 @@ def transcribe_audio(
data={"model": "whisper-large-v3", "language": "en"} data={"model": "whisper-large-v3", "language": "en"}
) )
result = response.json() result = response.json()
latency = time.perf_counter() - start
return result.get("text", "") text = result.get("text", "")
# Estimate audio duration from WAV header (16-bit PCM, 16kHz)
audio_duration = max(len(audio_bytes) / (16000 * 2), 0.1)
STTResult = namedtuple("STTResult", ["text", "latency_s", "audio_duration_s"])
return STTResult(text, latency, audio_duration)
@dsl.component( @dsl.component(
@@ -46,18 +60,23 @@ def transcribe_audio(
def generate_embeddings( def generate_embeddings(
text: str, text: str,
embeddings_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings" embeddings_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings"
) -> list: ) -> NamedTuple("EmbedResult", [("embedding", list), ("latency_s", float)]):
"""Generate embeddings for RAG retrieval.""" """Generate embeddings for RAG retrieval."""
import time
import httpx import httpx
from collections import namedtuple
start = time.perf_counter()
with httpx.Client(timeout=60.0) as client: with httpx.Client(timeout=60.0) as client:
response = client.post( response = client.post(
f"{embeddings_url}/embeddings", f"{embeddings_url}/embeddings",
json={"input": text, "model": "bge-small-en-v1.5"} json={"input": text, "model": "bge-small-en-v1.5"}
) )
result = response.json() result = response.json()
latency = time.perf_counter() - start
return result["data"][0]["embedding"] EmbedResult = namedtuple("EmbedResult", ["embedding", "latency_s"])
return EmbedResult(result["data"][0]["embedding"], latency)
@dsl.component( @dsl.component(
@@ -69,14 +88,19 @@ def retrieve_context(
milvus_host: str = "milvus.ai-ml.svc.cluster.local", milvus_host: str = "milvus.ai-ml.svc.cluster.local",
collection_name: str = "knowledge_base", collection_name: str = "knowledge_base",
top_k: int = 5 top_k: int = 5
) -> list: ) -> NamedTuple("RetrieveResult", [("documents", list), ("latency_s", float)]):
"""Retrieve relevant documents from Milvus vector database.""" """Retrieve relevant documents from Milvus vector database."""
import time
from pymilvus import connections, Collection, utility from pymilvus import connections, Collection, utility
from collections import namedtuple
start = time.perf_counter()
connections.connect(host=milvus_host, port=19530) connections.connect(host=milvus_host, port=19530)
if not utility.has_collection(collection_name): if not utility.has_collection(collection_name):
return [] latency = time.perf_counter() - start
RetrieveResult = namedtuple("RetrieveResult", ["documents", "latency_s"])
return RetrieveResult([], latency)
collection = Collection(collection_name) collection = Collection(collection_name)
collection.load() collection.load()
@@ -88,6 +112,7 @@ def retrieve_context(
limit=top_k, limit=top_k,
output_fields=["text", "source"] output_fields=["text", "source"]
) )
latency = time.perf_counter() - start
documents = [] documents = []
for hits in results: for hits in results:
@@ -98,7 +123,8 @@ def retrieve_context(
"score": hit.distance "score": hit.distance
}) })
return documents RetrieveResult = namedtuple("RetrieveResult", ["documents", "latency_s"])
return RetrieveResult(documents, latency)
@dsl.component( @dsl.component(
@@ -110,13 +136,17 @@ def rerank_documents(
documents: list, documents: list,
reranker_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker", reranker_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker",
top_k: int = 3 top_k: int = 3
) -> list: ) -> NamedTuple("RerankResult", [("documents", list), ("latency_s", float)]):
"""Rerank documents using BGE reranker.""" """Rerank documents using BGE reranker."""
import time
import httpx import httpx
from collections import namedtuple
if not documents: if not documents:
return [] RerankResult = namedtuple("RerankResult", ["documents", "latency_s"])
return RerankResult([], 0.0)
start = time.perf_counter()
with httpx.Client(timeout=60.0) as client: with httpx.Client(timeout=60.0) as client:
response = client.post( response = client.post(
f"{reranker_url}/v1/rerank", f"{reranker_url}/v1/rerank",
@@ -127,6 +157,7 @@ def rerank_documents(
} }
) )
result = response.json() result = response.json()
latency = time.perf_counter() - start
# Sort by rerank score # Sort by rerank score
reranked = sorted( reranked = sorted(
@@ -135,7 +166,8 @@ def rerank_documents(
reverse=True reverse=True
)[:top_k] )[:top_k]
return [doc for doc, score in reranked] RerankResult = namedtuple("RerankResult", ["documents", "latency_s"])
return RerankResult([doc for doc, score in reranked], latency)
@dsl.component( @dsl.component(
@@ -147,9 +179,11 @@ def generate_response(
context: list, context: list,
vllm_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm", vllm_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm",
model: str = "hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4" model: str = "hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4"
) -> str: ) -> NamedTuple("LLMResult", [("text", str), ("latency_s", float), ("completion_tokens", int)]):
"""Generate response using vLLM.""" """Generate response using vLLM."""
import time
import httpx import httpx
from collections import namedtuple
# Build context # Build context
if context: if context:
@@ -167,6 +201,7 @@ Keep responses concise and natural for speech synthesis."""
{"role": "user", "content": user_content} {"role": "user", "content": user_content}
] ]
start = time.perf_counter()
with httpx.Client(timeout=180.0) as client: with httpx.Client(timeout=180.0) as client:
response = client.post( response = client.post(
f"{vllm_url}/v1/chat/completions", f"{vllm_url}/v1/chat/completions",
@@ -178,8 +213,14 @@ Keep responses concise and natural for speech synthesis."""
} }
) )
result = response.json() result = response.json()
latency = time.perf_counter() - start
return result["choices"][0]["message"]["content"] text = result["choices"][0]["message"]["content"]
usage = result.get("usage", {})
completion_tokens = usage.get("completion_tokens", len(text.split()))
LLMResult = namedtuple("LLMResult", ["text", "latency_s", "completion_tokens"])
return LLMResult(text, latency, completion_tokens)
@dsl.component( @dsl.component(
@@ -189,11 +230,14 @@ Keep responses concise and natural for speech synthesis."""
def synthesize_speech( def synthesize_speech(
text: str, text: str,
tts_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts" tts_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts"
) -> str: ) -> NamedTuple("TTSResult", [("audio_b64", str), ("latency_s", float)]):
"""Convert text to speech using TTS service.""" """Convert text to speech using TTS service."""
import base64 import base64
import time
import httpx import httpx
from collections import namedtuple
start = time.perf_counter()
with httpx.Client(timeout=120.0) as client: with httpx.Client(timeout=120.0) as client:
response = client.post( response = client.post(
f"{tts_url}/v1/audio/speech", f"{tts_url}/v1/audio/speech",
@@ -204,13 +248,86 @@ def synthesize_speech(
} }
) )
audio_b64 = base64.b64encode(response.content).decode("utf-8") audio_b64 = base64.b64encode(response.content).decode("utf-8")
latency = time.perf_counter() - start
return audio_b64 TTSResult = namedtuple("TTSResult", ["audio_b64", "latency_s"])
return TTSResult(audio_b64, latency)
# ---- MLflow logging component ----
@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def log_pipeline_metrics(
stt_latency: float,
stt_audio_duration: float,
embed_latency: float,
retrieve_latency: float,
rerank_latency: float,
llm_latency: float,
llm_completion_tokens: int,
tts_latency: float,
experiment_name: str = "voice-pipeline-metrics",
run_name: str = "voice-pipeline",
mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> str:
"""Log per-step latency metrics to MLflow for the full voice pipeline."""
import os
import mlflow
from mlflow.tracking import MlflowClient
mlflow.set_tracking_uri(mlflow_tracking_uri)
client = MlflowClient()
exp = client.get_experiment_by_name(experiment_name)
experiment_id = (
exp.experiment_id
if exp
else client.create_experiment(
name=experiment_name,
artifact_location=f"/mlflow/artifacts/{experiment_name}",
)
)
run = mlflow.start_run(
experiment_id=experiment_id,
run_name=run_name,
tags={
"pipeline.type": "voice-assistant",
"kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
},
)
total_latency = (
stt_latency + embed_latency + retrieve_latency
+ rerank_latency + llm_latency + tts_latency
)
stt_rtf = stt_latency / stt_audio_duration if stt_audio_duration > 0 else 0
llm_tps = llm_completion_tokens / llm_latency if llm_latency > 0 else 0
mlflow.log_metrics({
"stt_latency_s": stt_latency,
"stt_audio_duration_s": stt_audio_duration,
"stt_realtime_factor": stt_rtf,
"embed_latency_s": embed_latency,
"retrieve_latency_s": retrieve_latency,
"rerank_latency_s": rerank_latency,
"llm_latency_s": llm_latency,
"llm_completion_tokens": llm_completion_tokens,
"llm_tokens_per_second": llm_tps,
"tts_latency_s": tts_latency,
"total_pipeline_latency_s": total_latency,
})
mlflow.end_run()
return run.info.run_id
# ---- Pipelines ----
@dsl.pipeline( @dsl.pipeline(
name="voice-assistant-rag-pipeline", name="voice-assistant-rag-pipeline",
description="End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS" description="End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS. Logs per-step latency to MLflow."
) )
def voice_assistant_pipeline( def voice_assistant_pipeline(
audio_b64: str, audio_b64: str,
@@ -229,29 +346,41 @@ def voice_assistant_pipeline(
transcribe_task.set_caching_options(enable_caching=False) transcribe_task.set_caching_options(enable_caching=False)
# Step 2: Generate embeddings # Step 2: Generate embeddings
embed_task = generate_embeddings(text=transcribe_task.output) embed_task = generate_embeddings(text=transcribe_task.outputs["text"])
embed_task.set_caching_options(enable_caching=True) embed_task.set_caching_options(enable_caching=True)
# Step 3: Retrieve context from Milvus # Step 3: Retrieve context from Milvus
retrieve_task = retrieve_context( retrieve_task = retrieve_context(
embedding=embed_task.output, embedding=embed_task.outputs["embedding"],
collection_name=collection_name collection_name=collection_name
) )
# Step 4: Rerank documents # Step 4: Rerank documents
rerank_task = rerank_documents( rerank_task = rerank_documents(
query=transcribe_task.output, query=transcribe_task.outputs["text"],
documents=retrieve_task.output documents=retrieve_task.outputs["documents"]
) )
# Step 5: Generate response with context # Step 5: Generate response with context
llm_task = generate_response( llm_task = generate_response(
query=transcribe_task.output, query=transcribe_task.outputs["text"],
context=rerank_task.output context=rerank_task.outputs["documents"]
) )
# Step 6: Synthesize speech # Step 6: Synthesize speech
tts_task = synthesize_speech(text=llm_task.output) tts_task = synthesize_speech(text=llm_task.outputs["text"])
# Step 7: Log all per-step latencies to MLflow
log_task = log_pipeline_metrics(
stt_latency=transcribe_task.outputs["latency_s"],
stt_audio_duration=transcribe_task.outputs["audio_duration_s"],
embed_latency=embed_task.outputs["latency_s"],
retrieve_latency=retrieve_task.outputs["latency_s"],
rerank_latency=rerank_task.outputs["latency_s"],
llm_latency=llm_task.outputs["latency_s"],
llm_completion_tokens=llm_task.outputs["completion_tokens"],
tts_latency=tts_task.outputs["latency_s"],
)
@dsl.pipeline( @dsl.pipeline(
@@ -265,7 +394,7 @@ def text_to_speech_pipeline(text: str):
@dsl.pipeline( @dsl.pipeline(
name="rag-query-pipeline", name="rag-query-pipeline",
description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM" description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step latency to MLflow."
) )
def rag_query_pipeline( def rag_query_pipeline(
query: str, query: str,
@@ -283,20 +412,20 @@ def rag_query_pipeline(
# Retrieve from Milvus # Retrieve from Milvus
retrieve_task = retrieve_context( retrieve_task = retrieve_context(
embedding=embed_task.output, embedding=embed_task.outputs["embedding"],
collection_name=collection_name collection_name=collection_name
) )
# Rerank # Rerank
rerank_task = rerank_documents( rerank_task = rerank_documents(
query=query, query=query,
documents=retrieve_task.output documents=retrieve_task.outputs["documents"]
) )
# Generate response # Generate response
llm_task = generate_response( llm_task = generate_response(
query=query, query=query,
context=rerank_task.output context=rerank_task.outputs["documents"]
) )

656
voice_pipeline.yaml Normal file
View File

@@ -0,0 +1,656 @@
# PIPELINE DEFINITION
# Name: voice-assistant-rag-pipeline
# Description: End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS. Logs per-step latency to MLflow.
# Inputs:
#    audio_b64: str
#    collection_name: str [Default: 'knowledge_base']
#
# NOTE(review): This is compiled KFP IR (generated by the KFP compiler from
# voice_pipeline.py). Do not hand-edit pipeline logic here; change the Python
# source and recompile. This copy's indentation was restored by hand after it
# was mangled in transit — the embedded Python source strings below appear to
# have had interior whitespace collapsed as well (their indentation may no
# longer be valid Python). Regenerate this file from voice_pipeline.py to be
# safe.
#
# Component interface declarations: one entry per pipeline step, listing its
# input parameters (with defaults) and its typed outputs. Every step exposes a
# latency_s output that feeds comp-log-pipeline-metrics.
components:
  comp-generate-embeddings:
    executorLabel: exec-generate-embeddings
    inputDefinitions:
      parameters:
        embeddings_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings
          isOptional: true
          parameterType: STRING
        text:
          parameterType: STRING
    outputDefinitions:
      parameters:
        embedding:
          parameterType: LIST
        latency_s:
          parameterType: NUMBER_DOUBLE
  comp-generate-response:
    executorLabel: exec-generate-response
    inputDefinitions:
      parameters:
        context:
          parameterType: LIST
        model:
          defaultValue: hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4
          isOptional: true
          parameterType: STRING
        query:
          parameterType: STRING
        vllm_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm
          isOptional: true
          parameterType: STRING
    outputDefinitions:
      parameters:
        completion_tokens:
          parameterType: NUMBER_INTEGER
        latency_s:
          parameterType: NUMBER_DOUBLE
        text:
          parameterType: STRING
  comp-log-pipeline-metrics:
    executorLabel: exec-log-pipeline-metrics
    inputDefinitions:
      parameters:
        embed_latency:
          parameterType: NUMBER_DOUBLE
        experiment_name:
          defaultValue: voice-pipeline-metrics
          isOptional: true
          parameterType: STRING
        llm_completion_tokens:
          parameterType: NUMBER_INTEGER
        llm_latency:
          parameterType: NUMBER_DOUBLE
        mlflow_tracking_uri:
          defaultValue: http://mlflow.mlflow.svc.cluster.local:80
          isOptional: true
          parameterType: STRING
        rerank_latency:
          parameterType: NUMBER_DOUBLE
        retrieve_latency:
          parameterType: NUMBER_DOUBLE
        run_name:
          defaultValue: voice-pipeline
          isOptional: true
          parameterType: STRING
        stt_audio_duration:
          parameterType: NUMBER_DOUBLE
        stt_latency:
          parameterType: NUMBER_DOUBLE
        tts_latency:
          parameterType: NUMBER_DOUBLE
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-rerank-documents:
    executorLabel: exec-rerank-documents
    inputDefinitions:
      parameters:
        documents:
          parameterType: LIST
        query:
          parameterType: STRING
        reranker_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker
          isOptional: true
          parameterType: STRING
        top_k:
          defaultValue: 3.0
          isOptional: true
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      parameters:
        documents:
          parameterType: LIST
        latency_s:
          parameterType: NUMBER_DOUBLE
  comp-retrieve-context:
    executorLabel: exec-retrieve-context
    inputDefinitions:
      parameters:
        collection_name:
          defaultValue: knowledge_base
          isOptional: true
          parameterType: STRING
        embedding:
          parameterType: LIST
        milvus_host:
          defaultValue: milvus.ai-ml.svc.cluster.local
          isOptional: true
          parameterType: STRING
        top_k:
          defaultValue: 5.0
          isOptional: true
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      parameters:
        documents:
          parameterType: LIST
        latency_s:
          parameterType: NUMBER_DOUBLE
  comp-synthesize-speech:
    executorLabel: exec-synthesize-speech
    inputDefinitions:
      parameters:
        text:
          parameterType: STRING
        tts_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts
          isOptional: true
          parameterType: STRING
    outputDefinitions:
      parameters:
        audio_b64:
          parameterType: STRING
        latency_s:
          parameterType: NUMBER_DOUBLE
  comp-transcribe-audio:
    executorLabel: exec-transcribe-audio
    inputDefinitions:
      parameters:
        audio_b64:
          parameterType: STRING
        whisper_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper
          isOptional: true
          parameterType: STRING
    outputDefinitions:
      parameters:
        audio_duration_s:
          parameterType: NUMBER_DOUBLE
        latency_s:
          parameterType: NUMBER_DOUBLE
        text:
          parameterType: STRING
# Executor definitions: each step runs as a lightweight python:3.13-slim
# container that pip-installs its deps at startup and executes the inlined
# component source via kfp.dsl.executor_main.
deploymentSpec:
  executors:
    # NOTE(review): embeddings_url's default already ends in /embeddings and
    # the component posts to f"{embeddings_url}/embeddings", yielding
    # .../embeddings/embeddings. The other services append /v1/... paths —
    # verify the gateway actually routes this double path.
    exec-generate-embeddings:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - generate_embeddings
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef generate_embeddings(\n text: str,\n embeddings_url: str\
          \ = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings\"\
          \n) -> NamedTuple(\"EmbedResult\", [(\"embedding\", list), (\"latency_s\"\
          , float)]):\n \"\"\"Generate embeddings for RAG retrieval.\"\"\"\n \
          \ import time\n import httpx\n from collections import namedtuple\n\
          \n start = time.perf_counter()\n with httpx.Client(timeout=60.0) as\
          \ client:\n response = client.post(\n f\"{embeddings_url}/embeddings\"\
          ,\n json={\"input\": text, \"model\": \"bge-small-en-v1.5\"}\n\
          \ )\n result = response.json()\n latency = time.perf_counter()\
          \ - start\n\n EmbedResult = namedtuple(\"EmbedResult\", [\"embedding\"\
          , \"latency_s\"])\n return EmbedResult(result[\"data\"][0][\"embedding\"\
          ], latency)\n\n"
        image: python:3.13-slim
    exec-generate-response:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - generate_response
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef generate_response(\n query: str,\n context: list,\n \
          \ vllm_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm\"\
          ,\n model: str = \"hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4\"\
          \n) -> NamedTuple(\"LLMResult\", [(\"text\", str), (\"latency_s\", float),\
          \ (\"completion_tokens\", int)]):\n \"\"\"Generate response using vLLM.\"\
          \"\"\n import time\n import httpx\n from collections import namedtuple\n\
          \n # Build context\n if context:\n context_text = \"\\n\\n\"\
          .join([doc[\"text\"] for doc in context])\n user_content = f\"Context:\\\
          n{context_text}\\n\\nQuestion: {query}\"\n else:\n user_content\
          \ = query\n\n system_prompt = \"\"\"You are a helpful voice assistant.\n\
          Answer questions based on the provided context when available.\nKeep responses\
          \ concise and natural for speech synthesis.\"\"\"\n\n messages = [\n\
          \ {\"role\": \"system\", \"content\": system_prompt},\n {\"\
          role\": \"user\", \"content\": user_content}\n ]\n\n start = time.perf_counter()\n\
          \ with httpx.Client(timeout=180.0) as client:\n response = client.post(\n\
          \ f\"{vllm_url}/v1/chat/completions\",\n json={\n\
          \ \"model\": model,\n \"messages\": messages,\n\
          \ \"max_tokens\": 512,\n \"temperature\":\
          \ 0.7\n }\n )\n result = response.json()\n latency\
          \ = time.perf_counter() - start\n\n text = result[\"choices\"][0][\"\
          message\"][\"content\"]\n usage = result.get(\"usage\", {})\n completion_tokens\
          \ = usage.get(\"completion_tokens\", len(text.split()))\n\n LLMResult\
          \ = namedtuple(\"LLMResult\", [\"text\", \"latency_s\", \"completion_tokens\"\
          ])\n return LLMResult(text, latency, completion_tokens)\n\n"
        image: python:3.13-slim
    # Fan-in step: installs mlflow (+ boto3/psycopg2 for the MLflow backend)
    # and pushes all per-step latencies as one MLflow run.
    exec-log-pipeline-metrics:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - log_pipeline_metrics
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'mlflow>=2.10.0'\
          \ 'boto3' 'psycopg2-binary' && \"$0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef log_pipeline_metrics(\n stt_latency: float,\n stt_audio_duration:\
          \ float,\n embed_latency: float,\n retrieve_latency: float,\n rerank_latency:\
          \ float,\n llm_latency: float,\n llm_completion_tokens: int,\n \
          \ tts_latency: float,\n experiment_name: str = \"voice-pipeline-metrics\"\
          ,\n run_name: str = \"voice-pipeline\",\n mlflow_tracking_uri: str\
          \ = \"http://mlflow.mlflow.svc.cluster.local:80\",\n) -> str:\n \"\"\"\
          Log per-step latency metrics to MLflow for the full voice pipeline.\"\"\"\
          \n import os\n import mlflow\n from mlflow.tracking import MlflowClient\n\
          \n mlflow.set_tracking_uri(mlflow_tracking_uri)\n client = MlflowClient()\n\
          \n exp = client.get_experiment_by_name(experiment_name)\n experiment_id\
          \ = (\n exp.experiment_id\n if exp\n else client.create_experiment(\n\
          \ name=experiment_name,\n artifact_location=f\"/mlflow/artifacts/{experiment_name}\"\
          ,\n )\n )\n\n run = mlflow.start_run(\n experiment_id=experiment_id,\n\
          \ run_name=run_name,\n tags={\n \"pipeline.type\"\
          : \"voice-assistant\",\n \"kfp.run_id\": os.environ.get(\"KFP_RUN_ID\"\
          , \"unknown\"),\n },\n )\n\n total_latency = (\n stt_latency\
          \ + embed_latency + retrieve_latency\n + rerank_latency + llm_latency\
          \ + tts_latency\n )\n stt_rtf = stt_latency / stt_audio_duration if\
          \ stt_audio_duration > 0 else 0\n llm_tps = llm_completion_tokens / llm_latency\
          \ if llm_latency > 0 else 0\n\n mlflow.log_metrics({\n \"stt_latency_s\"\
          : stt_latency,\n \"stt_audio_duration_s\": stt_audio_duration,\n\
          \ \"stt_realtime_factor\": stt_rtf,\n \"embed_latency_s\"\
          : embed_latency,\n \"retrieve_latency_s\": retrieve_latency,\n \
          \ \"rerank_latency_s\": rerank_latency,\n \"llm_latency_s\"\
          : llm_latency,\n \"llm_completion_tokens\": llm_completion_tokens,\n\
          \ \"llm_tokens_per_second\": llm_tps,\n \"tts_latency_s\"\
          : tts_latency,\n \"total_pipeline_latency_s\": total_latency,\n \
          \ })\n mlflow.end_run()\n return run.info.run_id\n\n"
        image: python:3.13-slim
    exec-rerank-documents:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - rerank_documents
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef rerank_documents(\n query: str,\n documents: list,\n \
          \ reranker_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker\"\
          ,\n top_k: int = 3\n) -> NamedTuple(\"RerankResult\", [(\"documents\"\
          , list), (\"latency_s\", float)]):\n \"\"\"Rerank documents using BGE\
          \ reranker.\"\"\"\n import time\n import httpx\n from collections\
          \ import namedtuple\n\n if not documents:\n RerankResult = namedtuple(\"\
          RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([],\
          \ 0.0)\n\n start = time.perf_counter()\n with httpx.Client(timeout=60.0)\
          \ as client:\n response = client.post(\n f\"{reranker_url}/v1/rerank\"\
          ,\n json={\n \"query\": query,\n \
          \ \"documents\": [doc[\"text\"] for doc in documents],\n \
          \ \"model\": \"bge-reranker-v2-m3\"\n }\n )\n \
          \ result = response.json()\n latency = time.perf_counter() - start\n\
          \n # Sort by rerank score\n reranked = sorted(\n zip(documents,\
          \ result.get(\"scores\", [0] * len(documents))),\n key=lambda x:\
          \ x[1],\n reverse=True\n )[:top_k]\n\n RerankResult = namedtuple(\"\
          RerankResult\", [\"documents\", \"latency_s\"])\n return RerankResult([doc\
          \ for doc, score in reranked], latency)\n\n"
        image: python:3.13-slim
    exec-retrieve-context:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - retrieve_context
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'pymilvus' &&\
          \ \"$0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef retrieve_context(\n embedding: list,\n milvus_host: str\
          \ = \"milvus.ai-ml.svc.cluster.local\",\n collection_name: str = \"knowledge_base\"\
          ,\n top_k: int = 5\n) -> NamedTuple(\"RetrieveResult\", [(\"documents\"\
          , list), (\"latency_s\", float)]):\n \"\"\"Retrieve relevant documents\
          \ from Milvus vector database.\"\"\"\n import time\n from pymilvus\
          \ import connections, Collection, utility\n from collections import namedtuple\n\
          \n start = time.perf_counter()\n connections.connect(host=milvus_host,\
          \ port=19530)\n\n if not utility.has_collection(collection_name):\n \
          \ latency = time.perf_counter() - start\n RetrieveResult =\
          \ namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \
          \ return RetrieveResult([], latency)\n\n collection = Collection(collection_name)\n\
          \ collection.load()\n\n results = collection.search(\n data=[embedding],\n\
          \ anns_field=\"embedding\",\n param={\"metric_type\": \"COSINE\"\
          , \"params\": {\"nprobe\": 10}},\n limit=top_k,\n output_fields=[\"\
          text\", \"source\"]\n )\n latency = time.perf_counter() - start\n\n\
          \ documents = []\n for hits in results:\n for hit in hits:\n\
          \ documents.append({\n \"text\": hit.entity.get(\"\
          text\"),\n \"source\": hit.entity.get(\"source\"),\n \
          \ \"score\": hit.distance\n })\n\n RetrieveResult\
          \ = namedtuple(\"RetrieveResult\", [\"documents\", \"latency_s\"])\n \
          \ return RetrieveResult(documents, latency)\n\n"
        image: python:3.13-slim
    exec-synthesize-speech:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - synthesize_speech
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef synthesize_speech(\n text: str,\n tts_url: str = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts\"\
          \n) -> NamedTuple(\"TTSResult\", [(\"audio_b64\", str), (\"latency_s\",\
          \ float)]):\n \"\"\"Convert text to speech using TTS service.\"\"\"\n\
          \ import base64\n import time\n import httpx\n from collections\
          \ import namedtuple\n\n start = time.perf_counter()\n with httpx.Client(timeout=120.0)\
          \ as client:\n response = client.post(\n f\"{tts_url}/v1/audio/speech\"\
          ,\n json={\n \"input\": text,\n \
          \ \"voice\": \"en_US-lessac-high\",\n \"response_format\"\
          : \"wav\"\n }\n )\n audio_b64 = base64.b64encode(response.content).decode(\"\
          utf-8\")\n latency = time.perf_counter() - start\n\n TTSResult = namedtuple(\"\
          TTSResult\", [\"audio_b64\", \"latency_s\"])\n return TTSResult(audio_b64,\
          \ latency)\n\n"
        image: python:3.13-slim
    exec-transcribe-audio:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - transcribe_audio
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
          \ python3 -m pip install --quiet --no-warn-script-location 'httpx' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef transcribe_audio(\n audio_b64: str,\n whisper_url: str\
          \ = \"http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper\"\
          \n) -> NamedTuple(\"STTResult\", [(\"text\", str), (\"latency_s\", float),\
          \ (\"audio_duration_s\", float)]):\n \"\"\"Transcribe audio using Whisper\
          \ STT service.\"\"\"\n import base64\n import time\n import httpx\n\
          \ from collections import namedtuple\n\n audio_bytes = base64.b64decode(audio_b64)\n\
          \n start = time.perf_counter()\n with httpx.Client(timeout=120.0)\
          \ as client:\n response = client.post(\n f\"{whisper_url}/v1/audio/transcriptions\"\
          ,\n files={\"file\": (\"audio.wav\", audio_bytes, \"audio/wav\"\
          )},\n data={\"model\": \"whisper-large-v3\", \"language\": \"\
          en\"}\n )\n result = response.json()\n latency = time.perf_counter()\
          \ - start\n\n text = result.get(\"text\", \"\")\n # Estimate audio\
          \ duration from WAV header (16-bit PCM, 16kHz)\n audio_duration = max(len(audio_bytes)\
          \ / (16000 * 2), 0.1)\n\n STTResult = namedtuple(\"STTResult\", [\"text\"\
          , \"latency_s\", \"audio_duration_s\"])\n return STTResult(text, latency,\
          \ audio_duration)\n\n"
        image: python:3.13-slim
pipelineInfo:
  description: 'End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus ->
    Rerank -> LLM -> TTS. Logs per-step latency to MLflow.'
  name: voice-assistant-rag-pipeline
# Execution DAG: transcribe-audio -> generate-embeddings -> retrieve-context
# -> rerank-documents -> generate-response -> synthesize-speech, with
# log-pipeline-metrics fanning in every step's latency output.
root:
  dag:
    tasks:
      generate-embeddings:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-generate-embeddings
        dependentTasks:
        - transcribe-audio
        inputs:
          parameters:
            text:
              taskOutputParameter:
                outputParameterKey: text
                producerTask: transcribe-audio
        taskInfo:
          name: generate-embeddings
      generate-response:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-generate-response
        dependentTasks:
        - rerank-documents
        - transcribe-audio
        inputs:
          parameters:
            context:
              taskOutputParameter:
                outputParameterKey: documents
                producerTask: rerank-documents
            query:
              taskOutputParameter:
                outputParameterKey: text
                producerTask: transcribe-audio
        taskInfo:
          name: generate-response
      # NOTE(review): enableCache: true on a side-effecting MLflow-logging
      # step — a cache hit would skip logging entirely. Confirm this is
      # intended (cache hits are unlikely since float latencies vary, but
      # identical replays would silently log nothing).
      log-pipeline-metrics:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-log-pipeline-metrics
        dependentTasks:
        - generate-embeddings
        - generate-response
        - rerank-documents
        - retrieve-context
        - synthesize-speech
        - transcribe-audio
        inputs:
          parameters:
            embed_latency:
              taskOutputParameter:
                outputParameterKey: latency_s
                producerTask: generate-embeddings
            llm_completion_tokens:
              taskOutputParameter:
                outputParameterKey: completion_tokens
                producerTask: generate-response
            llm_latency:
              taskOutputParameter:
                outputParameterKey: latency_s
                producerTask: generate-response
            rerank_latency:
              taskOutputParameter:
                outputParameterKey: latency_s
                producerTask: rerank-documents
            retrieve_latency:
              taskOutputParameter:
                outputParameterKey: latency_s
                producerTask: retrieve-context
            stt_audio_duration:
              taskOutputParameter:
                outputParameterKey: audio_duration_s
                producerTask: transcribe-audio
            stt_latency:
              taskOutputParameter:
                outputParameterKey: latency_s
                producerTask: transcribe-audio
            tts_latency:
              taskOutputParameter:
                outputParameterKey: latency_s
                producerTask: synthesize-speech
        taskInfo:
          name: log-pipeline-metrics
      rerank-documents:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-rerank-documents
        dependentTasks:
        - retrieve-context
        - transcribe-audio
        inputs:
          parameters:
            documents:
              taskOutputParameter:
                outputParameterKey: documents
                producerTask: retrieve-context
            query:
              taskOutputParameter:
                outputParameterKey: text
                producerTask: transcribe-audio
        taskInfo:
          name: rerank-documents
      retrieve-context:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-retrieve-context
        dependentTasks:
        - generate-embeddings
        inputs:
          parameters:
            collection_name:
              componentInputParameter: collection_name
            embedding:
              taskOutputParameter:
                outputParameterKey: embedding
                producerTask: generate-embeddings
        taskInfo:
          name: retrieve-context
      synthesize-speech:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-synthesize-speech
        dependentTasks:
        - generate-response
        inputs:
          parameters:
            text:
              taskOutputParameter:
                outputParameterKey: text
                producerTask: generate-response
        taskInfo:
          name: synthesize-speech
      # Caching left at its empty/default setting here (unlike the other
      # tasks, which set enableCache: true explicitly).
      transcribe-audio:
        cachingOptions: {}
        componentRef:
          name: comp-transcribe-audio
        inputs:
          parameters:
            audio_b64:
              componentInputParameter: audio_b64
        taskInfo:
          name: transcribe-audio
  inputDefinitions:
    parameters:
      audio_b64:
        description: Base64-encoded audio file
        parameterType: STRING
      collection_name:
        defaultValue: knowledge_base
        description: Milvus collection for RAG
        isOptional: true
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.12.1