#!/usr/bin/env python3
"""
Voice Pipeline - Kubeflow Pipelines SDK

Compile this to create a Kubeflow Pipeline for voice assistant workflows.

Usage:
    pip install kfp==2.12.1
    python voice_pipeline.py
    # Upload voice_pipeline.yaml to Kubeflow Pipelines UI

Note for maintainers: every ``@dsl.component`` function below is shipped to
its own container, so each one imports what it needs inside its body and must
not rely on module-level helpers.
"""

from kfp import dsl
from kfp import compiler
from typing import NamedTuple


MLFLOW_IMAGE = "python:3.13-slim"
MLFLOW_PACKAGES = ["mlflow>=2.10.0", "boto3", "psycopg2-binary"]


@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["httpx"]
)
def transcribe_audio(
    audio_b64: str,
    whisper_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/whisper"
) -> NamedTuple("STTResult", [("text", str), ("latency_s", float), ("audio_duration_s", float)]):
    """Transcribe audio using Whisper STT service.

    Args:
        audio_b64: Base64-encoded audio payload; it is uploaded as ``audio.wav``.
        whisper_url: Base URL of the STT service; ``/v1/audio/transcriptions``
            is appended to it.

    Returns:
        STTResult with the transcribed ``text`` (empty string when the response
        has no ``text`` key), the request ``latency_s`` and the clip's
        ``audio_duration_s`` (never below 0.1).

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    import base64
    import io
    import time
    import wave
    import httpx
    from collections import namedtuple

    audio_bytes = base64.b64decode(audio_b64)

    start = time.perf_counter()
    with httpx.Client(timeout=120.0) as client:
        response = client.post(
            f"{whisper_url}/v1/audio/transcriptions",
            files={"file": ("audio.wav", audio_bytes, "audio/wav")},
            data={"model": "whisper-large-v3", "language": "en"}
        )
        # Fail the step on an HTTP error instead of treating the error body
        # as a transcription with no text.
        response.raise_for_status()
        result = response.json()
    latency = time.perf_counter() - start

    text = result.get("text", "")

    # Read the real duration from the WAV header when the payload is a
    # parseable WAV file.
    try:
        with wave.open(io.BytesIO(audio_bytes), "rb") as wav:
            frame_rate = wav.getframerate()
            frame_size = wav.getsampwidth() * wav.getnchannels()
            n_frames = wav.getnframes()
        # A streamed WAV can declare more frames than it contains; never
        # count more frames than the payload has bytes for.
        n_frames = min(n_frames, len(audio_bytes) // frame_size)
        audio_duration = n_frames / frame_rate
    except (wave.Error, EOFError, ZeroDivisionError):
        # Not a parseable WAV: fall back to assuming 16-bit mono PCM at 16kHz.
        audio_duration = len(audio_bytes) / (16000 * 2)
    # Floor at 0.1s so the downstream real-time-factor division stays sane.
    audio_duration = max(audio_duration, 0.1)

    STTResult = namedtuple("STTResult", ["text", "latency_s", "audio_duration_s"])
    return STTResult(text, latency, audio_duration)


@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["httpx"]
)
def generate_embeddings(
    text: str,
    embeddings_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings"
) -> NamedTuple("EmbedResult", [("embedding", list), ("latency_s", float)]):
    """Generate embeddings for RAG retrieval.

    Args:
        text: Text to embed.
        embeddings_url: Base URL of the embeddings service; ``/embeddings`` is
            appended to it.

    Returns:
        EmbedResult with the first ``embedding`` from the response's ``data``
        list and the request ``latency_s``.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    import time
    import httpx
    from collections import namedtuple

    start = time.perf_counter()
    with httpx.Client(timeout=60.0) as client:
        response = client.post(
            f"{embeddings_url}/embeddings",
            json={"input": text, "model": "bge-small-en-v1.5"}
        )
        # Surface HTTP errors directly rather than as a KeyError on "data".
        response.raise_for_status()
        result = response.json()
    latency = time.perf_counter() - start

    EmbedResult = namedtuple("EmbedResult", ["embedding", "latency_s"])
    return EmbedResult(result["data"][0]["embedding"], latency)


@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["pymilvus"]
)
def retrieve_context(
    embedding: list,
    milvus_host: str = "milvus.ai-ml.svc.cluster.local",
    collection_name: str = "knowledge_base",
    top_k: int = 5
) -> NamedTuple("RetrieveResult", [("documents", list), ("latency_s", float)]):
    """Retrieve relevant documents from Milvus vector database.

    Args:
        embedding: Query vector to search with.
        milvus_host: Milvus host name; the connection uses port 19530.
        collection_name: Collection to search.
        top_k: Maximum number of hits to request.

    Returns:
        RetrieveResult with ``documents`` (dicts holding ``text``, ``source``
        and ``score``) and ``latency_s``. ``documents`` is empty when the
        collection does not exist.
    """
    import time
    from pymilvus import connections, Collection, utility
    from collections import namedtuple

    RetrieveResult = namedtuple("RetrieveResult", ["documents", "latency_s"])

    start = time.perf_counter()
    connections.connect(host=milvus_host, port=19530)
    try:
        # A missing collection is not an error: the pipeline continues
        # without retrieved context.
        if not utility.has_collection(collection_name):
            return RetrieveResult([], time.perf_counter() - start)

        collection = Collection(collection_name)
        collection.load()

        results = collection.search(
            data=[embedding],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"nprobe": 10}},
            limit=top_k,
            output_fields=["text", "source"]
        )
        latency = time.perf_counter() - start

        # One result set per query vector; flatten them into plain dicts
        # while the connection is still open.
        documents = [
            {
                "text": hit.entity.get("text"),
                "source": hit.entity.get("source"),
                "score": hit.distance
            }
            for hits in results
            for hit in hits
        ]
    finally:
        # connect() above registers the connection under the "default" alias;
        # release it on every exit path.
        connections.disconnect("default")

    return RetrieveResult(documents, latency)


@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["httpx"]
)
def rerank_documents(
    query: str,
    documents: list,
    reranker_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker",
    top_k: int = 3
) -> NamedTuple("RerankResult", [("documents", list), ("latency_s", float)]):
    """Rerank documents using BGE reranker.

    Args:
        query: Query the documents are scored against.
        documents: Dicts with at least a ``text`` key.
        reranker_url: Base URL of the reranker service; ``/v1/rerank`` is
            appended to it.
        top_k: Number of highest-scoring documents to keep.

    Returns:
        RerankResult with the top ``documents`` in descending score order and
        ``latency_s``. With no input documents the service is not called and
        the latency is 0.0.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    import time
    import httpx
    from collections import namedtuple

    RerankResult = namedtuple("RerankResult", ["documents", "latency_s"])

    if not documents:
        return RerankResult([], 0.0)

    start = time.perf_counter()
    with httpx.Client(timeout=60.0) as client:
        response = client.post(
            f"{reranker_url}/v1/rerank",
            json={
                "query": query,
                "documents": [doc["text"] for doc in documents],
                "model": "bge-reranker-v2-m3"
            }
        )
        # Without this an error body would fall through to the all-zero
        # default scores and look like a successful rerank.
        response.raise_for_status()
        result = response.json()
    latency = time.perf_counter() - start

    # Sort by rerank score. When the response carries no "scores" every
    # document gets 0, and the stable sort keeps the retrieval order.
    scores = result.get("scores", [0] * len(documents))
    reranked = sorted(
        zip(documents, scores),
        key=lambda pair: pair[1],
        reverse=True
    )[:top_k]

    return RerankResult([doc for doc, _ in reranked], latency)


@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["httpx"]
)
def generate_response(
    query: str,
    context: list,
    vllm_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm",
    model: str = "hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4"
) -> NamedTuple("LLMResult", [("text", str), ("latency_s", float), ("completion_tokens", int)]):
    """Generate response using vLLM.

    Args:
        query: User question.
        context: Dicts with a ``text`` key; when non-empty their texts are
            placed ahead of the question in the user message.
        vllm_url: Base URL of the LLM service; ``/v1/chat/completions`` is
            appended to it.
        model: Model name sent in the request.

    Returns:
        LLMResult with the reply ``text``, the request ``latency_s`` and
        ``completion_tokens`` (a whitespace word count of the reply when the
        response reports no usage).

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    import time
    import httpx
    from collections import namedtuple

    # Build context
    if context:
        context_text = "\n\n".join([doc["text"] for doc in context])
        user_content = f"Context:\n{context_text}\n\nQuestion: {query}"
    else:
        user_content = query

    system_prompt = (
        "You are a helpful voice assistant. "
        "Answer questions based on the provided context when available. \n"
        "Keep responses concise and natural for speech synthesis."
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content}
    ]

    start = time.perf_counter()
    with httpx.Client(timeout=180.0) as client:
        response = client.post(
            f"{vllm_url}/v1/chat/completions",
            json={
                "model": model,
                "messages": messages,
                "max_tokens": 512,
                "temperature": 0.7
            }
        )
        # Surface HTTP errors directly rather than as a KeyError on "choices".
        response.raise_for_status()
        result = response.json()
    latency = time.perf_counter() - start

    text = result["choices"][0]["message"]["content"]
    # "usage" may be absent or null; treat both as "no usage reported".
    usage = result.get("usage") or {}
    completion_tokens = usage.get("completion_tokens", len(text.split()))

    LLMResult = namedtuple("LLMResult", ["text", "latency_s", "completion_tokens"])
    return LLMResult(text, latency, completion_tokens)


@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["httpx"]
)
def synthesize_speech(
    text: str,
    tts_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/tts"
) -> NamedTuple("TTSResult", [("audio_b64", str), ("latency_s", float)]):
    """Convert text to speech using TTS service.

    Args:
        text: Text to synthesize.
        tts_url: Base URL of the TTS service; ``/v1/audio/speech`` is appended
            to it.

    Returns:
        TTSResult with the response body base64-encoded as ``audio_b64`` and
        the ``latency_s`` (which includes the base64 encoding time).

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    import base64
    import time
    import httpx
    from collections import namedtuple

    start = time.perf_counter()
    with httpx.Client(timeout=120.0) as client:
        response = client.post(
            f"{tts_url}/v1/audio/speech",
            json={
                "input": text,
                "voice": "en_US-lessac-high",
                "response_format": "wav"
            }
        )
        # Without this check an error body would be base64-encoded and
        # returned as if it were audio.
        response.raise_for_status()
        audio_b64 = base64.b64encode(response.content).decode("utf-8")
    latency = time.perf_counter() - start

    TTSResult = namedtuple("TTSResult", ["audio_b64", "latency_s"])
    return TTSResult(audio_b64, latency)


# ---- MLflow logging component ----
@dsl.component(base_image=MLFLOW_IMAGE, packages_to_install=MLFLOW_PACKAGES)
def log_pipeline_metrics(
    stt_latency: float,
    stt_audio_duration: float,
    embed_latency: float,
    retrieve_latency: float,
    rerank_latency: float,
    llm_latency: float,
    llm_completion_tokens: int,
    tts_latency: float,
    experiment_name: str = "voice-pipeline-metrics",
    run_name: str = "voice-pipeline",
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
) -> str:
    """Log per-step latency metrics to MLflow for the full voice pipeline.

    Besides the raw per-step values, three derived metrics are logged: the
    summed pipeline latency, the STT real-time factor (latency / audio
    duration) and LLM tokens per second. Both ratios are 0 when their
    denominator is not positive.

    Args:
        stt_latency: STT step latency in seconds.
        stt_audio_duration: Duration of the transcribed audio in seconds.
        embed_latency: Embedding step latency in seconds.
        retrieve_latency: Retrieval step latency in seconds.
        rerank_latency: Rerank step latency in seconds.
        llm_latency: LLM step latency in seconds.
        llm_completion_tokens: Number of tokens the LLM generated.
        tts_latency: TTS step latency in seconds.
        experiment_name: MLflow experiment; created if it does not exist.
        run_name: Name given to the MLflow run.
        mlflow_tracking_uri: MLflow tracking server URI.

    Returns:
        The ID of the MLflow run that was created.
    """
    import os
    import mlflow
    from mlflow.tracking import MlflowClient

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    client = MlflowClient()

    exp = client.get_experiment_by_name(experiment_name)
    experiment_id = (
        exp.experiment_id
        if exp
        else client.create_experiment(
            name=experiment_name,
            artifact_location=f"/mlflow/artifacts/{experiment_name}",
        )
    )

    total_latency = (
        stt_latency + embed_latency + retrieve_latency
        + rerank_latency + llm_latency + tts_latency
    )
    stt_rtf = stt_latency / stt_audio_duration if stt_audio_duration > 0 else 0
    llm_tps = llm_completion_tokens / llm_latency if llm_latency > 0 else 0

    # The context manager ends the run even when logging raises, so a failed
    # step does not leave a run open on the tracking server.
    with mlflow.start_run(
        experiment_id=experiment_id,
        run_name=run_name,
        tags={
            "pipeline.type": "voice-assistant",
            "kfp.run_id": os.environ.get("KFP_RUN_ID", "unknown"),
        },
    ) as run:
        mlflow.log_metrics({
            "stt_latency_s": stt_latency,
            "stt_audio_duration_s": stt_audio_duration,
            "stt_realtime_factor": stt_rtf,
            "embed_latency_s": embed_latency,
            "retrieve_latency_s": retrieve_latency,
            "rerank_latency_s": rerank_latency,
            "llm_latency_s": llm_latency,
            "llm_completion_tokens": llm_completion_tokens,
            "llm_tokens_per_second": llm_tps,
            "tts_latency_s": tts_latency,
            "total_pipeline_latency_s": total_latency,
        })

    return run.info.run_id


# ---- Pipelines ----
@dsl.pipeline(
    name="voice-assistant-rag-pipeline",
    description="End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS. Logs per-step latency to MLflow."
)
def voice_assistant_pipeline(
    audio_b64: str,
    collection_name: str = "knowledge_base"
):
    """
    Voice Assistant Pipeline with RAG

    Args:
        audio_b64: Base64-encoded audio file
        collection_name: Milvus collection for RAG
    """
    # Step 1: Transcribe audio with Whisper
    transcribe_task = transcribe_audio(audio_b64=audio_b64)
    transcribe_task.set_caching_options(enable_caching=False)

    # Step 2: Generate embeddings
    embed_task = generate_embeddings(text=transcribe_task.outputs["text"])
    embed_task.set_caching_options(enable_caching=True)

    # Step 3: Retrieve context from Milvus
    retrieve_task = retrieve_context(
        embedding=embed_task.outputs["embedding"],
        collection_name=collection_name
    )

    # Step 4: Rerank documents
    rerank_task = rerank_documents(
        query=transcribe_task.outputs["text"],
        documents=retrieve_task.outputs["documents"]
    )

    # Step 5: Generate response with context
    llm_task = generate_response(
        query=transcribe_task.outputs["text"],
        context=rerank_task.outputs["documents"]
    )

    # Step 6: Synthesize speech
    tts_task = synthesize_speech(text=llm_task.outputs["text"])

    # Step 7: Log all per-step latencies to MLflow
    log_task = log_pipeline_metrics(
        stt_latency=transcribe_task.outputs["latency_s"],
        stt_audio_duration=transcribe_task.outputs["audio_duration_s"],
        embed_latency=embed_task.outputs["latency_s"],
        retrieve_latency=retrieve_task.outputs["latency_s"],
        rerank_latency=rerank_task.outputs["latency_s"],
        llm_latency=llm_task.outputs["latency_s"],
        llm_completion_tokens=llm_task.outputs["completion_tokens"],
        tts_latency=tts_task.outputs["latency_s"],
    )
    # Logging is a side effect: a cache hit would skip it, so never cache it.
    log_task.set_caching_options(enable_caching=False)


@dsl.pipeline(
    name="text-to-speech-pipeline",
    description="Simple text to speech pipeline"
)
def text_to_speech_pipeline(text: str):
    """Simple TTS pipeline for testing.

    Args:
        text: Text to synthesize.
    """
    synthesize_speech(text=text)


# The description no longer claims MLflow logging: this pipeline has no
# logging step.
@dsl.pipeline(
    name="rag-query-pipeline",
    description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM."
)
def rag_query_pipeline(
    query: str,
    collection_name: str = "knowledge_base"
):
    """
    RAG Query Pipeline (text input, no voice)

    Args:
        query: Text query
        collection_name: Milvus collection name
    """
    # Embed the query
    embed_task = generate_embeddings(text=query)

    # Retrieve from Milvus
    retrieve_task = retrieve_context(
        embedding=embed_task.outputs["embedding"],
        collection_name=collection_name
    )

    # Rerank
    rerank_task = rerank_documents(
        query=query,
        documents=retrieve_task.outputs["documents"]
    )

    # Generate response
    generate_response(
        query=query,
        context=rerank_task.outputs["documents"]
    )


if __name__ == "__main__":
    # Compile all pipelines
    pipelines = (
        ("voice_pipeline.yaml", voice_assistant_pipeline),
        ("tts_pipeline.yaml", text_to_speech_pipeline),
        ("rag_pipeline.yaml", rag_query_pipeline),
    )

    for filename, pipeline_func in pipelines:
        compiler.Compiler().compile(pipeline_func, filename)
        print(f"Compiled: {filename}")

    print("\nUpload these YAML files to Kubeflow Pipelines UI at:")
    print(" http://kubeflow.example.com/pipelines")