#!/usr/bin/env python3
"""
Voice Pipeline - Kubeflow Pipelines SDK

Compile this script to create Kubeflow Pipeline definitions for voice assistant workflows.

Usage:
    pip install kfp==2.12.1
    python voice_pipeline.py
    # Upload voice_pipeline.yaml to the Kubeflow Pipelines UI
"""

from kfp import compiler
from kfp import dsl
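
# Note: each @dsl.component below is a self-contained lightweight component.
# KFP executes it in its own container built from `base_image`, which is why
# every function imports its dependencies inside its own body.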


@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def transcribe_audio(
    audio_b64: str, whisper_url: str = "http://whisper-predictor.ai-ml.svc.cluster.local"
) -> str:
    """Transcribe audio using the Whisper STT service."""
    import base64

    import httpx
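
    # Audio is passed base64-encoded, since KFP pipeline parameters are plain
    # JSON-serializable values rather than binary blobs.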
    audio_bytes = base64.b64decode(audio_b64)

    with httpx.Client(timeout=120.0) as client:
        response = client.post(
            f"{whisper_url}/v1/audio/transcriptions",
            files={"file": ("audio.wav", audio_bytes, "audio/wav")},
            data={"model": "whisper-large-v3", "language": "en"},
        )
        response.raise_for_status()
        result = response.json()

    return result.get("text", "")


@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def generate_embeddings(
    text: str, embeddings_url: str = "http://embeddings-predictor.ai-ml.svc.cluster.local"
) -> list:
    """Generate embeddings for RAG retrieval."""
    import httpx

    with httpx.Client(timeout=60.0) as client:
        response = client.post(
            f"{embeddings_url}/embeddings", json={"input": text, "model": "bge-small-en-v1.5"}
        )
        response.raise_for_status()
        result = response.json()

    return result["data"][0]["embedding"]


@dsl.component(base_image="python:3.13-slim", packages_to_install=["pymilvus"])
def retrieve_context(
    embedding: list,
    milvus_host: str = "milvus.ai-ml.svc.cluster.local",
    collection_name: str = "knowledge_base",
    top_k: int = 5,
) -> list:
    """Retrieve relevant documents from the Milvus vector database."""
    from pymilvus import Collection, connections, utility

    connections.connect(host=milvus_host, port=19530)

    if not utility.has_collection(collection_name):
        return []

    collection = Collection(collection_name)
    collection.load()
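
    # ANN search over the "embedding" field; for IVF-style indexes, nprobe
    # sets how many clusters are probed (a recall vs. latency trade-off).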
    results = collection.search(
        data=[embedding],
        anns_field="embedding",
        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=top_k,
        output_fields=["text", "source"],
    )

    documents = []
    for hits in results:
        for hit in hits:
            documents.append(
                {
                    "text": hit.entity.get("text"),
                    "source": hit.entity.get("source"),
                    "score": hit.distance,
                }
            )

    return documents


@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def rerank_documents(
    query: str,
    documents: list,
    reranker_url: str = "http://reranker-predictor.ai-ml.svc.cluster.local",
    top_k: int = 3,
) -> list:
    """Rerank documents using the BGE reranker."""
    import httpx

    if not documents:
        return []

    with httpx.Client(timeout=60.0) as client:
        response = client.post(
            f"{reranker_url}/v1/rerank",
            json={
                "query": query,
                "documents": [doc["text"] for doc in documents],
                "model": "bge-reranker-v2-m3",
            },
        )
        response.raise_for_status()
        result = response.json()
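
    # The sort below falls back to a score of 0 when the service omits the
    # "scores" field; Python's stable sort then preserves the retrieval order.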
    # Sort by rerank score
    reranked = sorted(
        zip(documents, result.get("scores", [0] * len(documents))),
        key=lambda x: x[1],
        reverse=True,
    )[:top_k]

    return [doc for doc, score in reranked]


@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def generate_response(
    query: str,
    context: list,
    vllm_url: str = "http://llm-draft.ai-ml.svc.cluster.local:8000",
    model: str = "mistralai/Mistral-7B-Instruct-v0.3",
) -> str:
    """Generate a response using vLLM."""
    import httpx

    # Build the user message, prepending retrieved context when available
    if context:
        context_text = "\n\n".join([doc["text"] for doc in context])
        user_content = f"Context:\n{context_text}\n\nQuestion: {query}"
    else:
        user_content = query

    system_prompt = """You are a helpful voice assistant.
Answer questions based on the provided context when available.
Keep responses concise and natural for speech synthesis."""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]

    with httpx.Client(timeout=180.0) as client:
        response = client.post(
            f"{vllm_url}/v1/chat/completions",
            json={"model": model, "messages": messages, "max_tokens": 512, "temperature": 0.7},
        )
        response.raise_for_status()
        result = response.json()

    return result["choices"][0]["message"]["content"]


@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def synthesize_speech(
    text: str, tts_url: str = "http://tts-predictor.ai-ml.svc.cluster.local"
) -> str:
    """Convert text to speech using the TTS service."""
    import base64

    import httpx

    with httpx.Client(timeout=120.0) as client:
        response = client.post(
            f"{tts_url}/v1/audio/speech",
            json={"input": text, "voice": "en_US-lessac-high", "response_format": "wav"},
        )
        response.raise_for_status()
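
        # The service responds with raw WAV bytes; base64-encode them so the
        # audio fits in a KFP string output.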
        audio_b64 = base64.b64encode(response.content).decode("utf-8")

    return audio_b64


@dsl.pipeline(
    name="voice-assistant-rag-pipeline",
    description="End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS",
)
def voice_assistant_pipeline(audio_b64: str, collection_name: str = "knowledge_base"):
    """
    Voice Assistant Pipeline with RAG

    Args:
        audio_b64: Base64-encoded audio file
        collection_name: Milvus collection for RAG
    """

    # Step 1: Transcribe audio with Whisper
    transcribe_task = transcribe_audio(audio_b64=audio_b64)
    transcribe_task.set_caching_options(enable_caching=False)

    # Step 2: Generate embeddings
    embed_task = generate_embeddings(text=transcribe_task.output)
    embed_task.set_caching_options(enable_caching=True)

    # Step 3: Retrieve context from Milvus
    retrieve_task = retrieve_context(embedding=embed_task.output, collection_name=collection_name)

    # Step 4: Rerank documents
    rerank_task = rerank_documents(query=transcribe_task.output, documents=retrieve_task.output)

    # Step 5: Generate response with context
    llm_task = generate_response(query=transcribe_task.output, context=rerank_task.output)

    # Step 6: Synthesize speech
    tts_task = synthesize_speech(text=llm_task.output)  # noqa: F841


@dsl.pipeline(name="text-to-speech-pipeline", description="Simple text-to-speech pipeline")
def text_to_speech_pipeline(text: str):
    """Simple TTS pipeline for testing."""
    tts_task = synthesize_speech(text=text)  # noqa: F841


@dsl.pipeline(
    name="rag-query-pipeline", description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM"
)
def rag_query_pipeline(query: str, collection_name: str = "knowledge_base"):
    """
    RAG Query Pipeline (text input, no voice)

    Args:
        query: Text query
        collection_name: Milvus collection name
    """
    # Embed the query
    embed_task = generate_embeddings(text=query)

    # Retrieve from Milvus
    retrieve_task = retrieve_context(embedding=embed_task.output, collection_name=collection_name)

    # Rerank
    rerank_task = rerank_documents(query=query, documents=retrieve_task.output)

    # Generate response
    llm_task = generate_response(query=query, context=rerank_task.output)  # noqa: F841


if __name__ == "__main__":
    # Compile all pipelines
    pipelines = [
        ("voice_pipeline.yaml", voice_assistant_pipeline),
        ("tts_pipeline.yaml", text_to_speech_pipeline),
        ("rag_pipeline.yaml", rag_query_pipeline),
    ]

    for filename, pipeline_func in pipelines:
        compiler.Compiler().compile(pipeline_func, filename)
        print(f"Compiled: {filename}")

    print("\nUpload these YAML files to the Kubeflow Pipelines UI at:")
    print("  http://kubeflow.example.com/pipelines")