fix: ruff formatting, allow-direct-references, and noqa for Kubeflow pipeline vars
All checks were successful
CI / Lint (push) Successful in 51s
CI / Test (push) Successful in 55s
CI / Release (push) Successful in 7s
CI / Notify (push) Successful in 2s

This commit is contained in:
2026-02-02 08:44:14 -05:00
parent 58465b77d8
commit 0462412353
5 changed files with 208 additions and 237 deletions

View File

@@ -14,70 +14,58 @@ from kfp import dsl
from kfp import compiler
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["httpx"]
)
@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def transcribe_audio(
audio_b64: str,
whisper_url: str = "http://whisper-predictor.ai-ml.svc.cluster.local"
audio_b64: str, whisper_url: str = "http://whisper-predictor.ai-ml.svc.cluster.local"
) -> str:
"""Transcribe audio using Whisper STT service."""
import base64
import httpx
audio_bytes = base64.b64decode(audio_b64)
with httpx.Client(timeout=120.0) as client:
response = client.post(
f"{whisper_url}/v1/audio/transcriptions",
files={"file": ("audio.wav", audio_bytes, "audio/wav")},
data={"model": "whisper-large-v3", "language": "en"}
data={"model": "whisper-large-v3", "language": "en"},
)
result = response.json()
return result.get("text", "")
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["httpx"]
)
@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def generate_embeddings(
text: str,
embeddings_url: str = "http://embeddings-predictor.ai-ml.svc.cluster.local"
text: str, embeddings_url: str = "http://embeddings-predictor.ai-ml.svc.cluster.local"
) -> list:
"""Generate embeddings for RAG retrieval."""
import httpx
with httpx.Client(timeout=60.0) as client:
response = client.post(
f"{embeddings_url}/embeddings",
json={"input": text, "model": "bge-small-en-v1.5"}
f"{embeddings_url}/embeddings", json={"input": text, "model": "bge-small-en-v1.5"}
)
result = response.json()
return result["data"][0]["embedding"]
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["pymilvus"]
)
@dsl.component(base_image="python:3.13-slim", packages_to_install=["pymilvus"])
def retrieve_context(
embedding: list,
milvus_host: str = "milvus.ai-ml.svc.cluster.local",
collection_name: str = "knowledge_base",
top_k: int = 5
top_k: int = 5,
) -> list:
"""Retrieve relevant documents from Milvus vector database."""
from pymilvus import connections, Collection, utility
connections.connect(host=milvus_host, port=19530)
if not utility.has_collection(collection_name):
return []
collection = Collection(collection_name)
collection.load()
@@ -86,30 +74,29 @@ def retrieve_context(
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=top_k,
output_fields=["text", "source"]
output_fields=["text", "source"],
)
documents = []
for hits in results:
for hit in hits:
documents.append({
"text": hit.entity.get("text"),
"source": hit.entity.get("source"),
"score": hit.distance
})
documents.append(
{
"text": hit.entity.get("text"),
"source": hit.entity.get("source"),
"score": hit.distance,
}
)
return documents
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["httpx"]
)
@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def rerank_documents(
query: str,
documents: list,
reranker_url: str = "http://reranker-predictor.ai-ml.svc.cluster.local",
top_k: int = 3
top_k: int = 3,
) -> list:
"""Rerank documents using BGE reranker."""
import httpx
@@ -123,30 +110,25 @@ def rerank_documents(
json={
"query": query,
"documents": [doc["text"] for doc in documents],
"model": "bge-reranker-v2-m3"
}
"model": "bge-reranker-v2-m3",
},
)
result = response.json()
# Sort by rerank score
reranked = sorted(
zip(documents, result.get("scores", [0] * len(documents))),
key=lambda x: x[1],
reverse=True
zip(documents, result.get("scores", [0] * len(documents))), key=lambda x: x[1], reverse=True
)[:top_k]
return [doc for doc, score in reranked]
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["httpx"]
)
@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def generate_response(
query: str,
context: list,
vllm_url: str = "http://llm-draft.ai-ml.svc.cluster.local:8000",
model: str = "mistralai/Mistral-7B-Instruct-v0.3"
model: str = "mistralai/Mistral-7B-Instruct-v0.3",
) -> str:
"""Generate response using vLLM."""
import httpx
@@ -164,31 +146,22 @@ Keep responses concise and natural for speech synthesis."""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content}
{"role": "user", "content": user_content},
]
with httpx.Client(timeout=180.0) as client:
response = client.post(
f"{vllm_url}/v1/chat/completions",
json={
"model": model,
"messages": messages,
"max_tokens": 512,
"temperature": 0.7
}
json={"model": model, "messages": messages, "max_tokens": 512, "temperature": 0.7},
)
result = response.json()
return result["choices"][0]["message"]["content"]
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["httpx"]
)
@dsl.component(base_image="python:3.13-slim", packages_to_install=["httpx"])
def synthesize_speech(
text: str,
tts_url: str = "http://tts-predictor.ai-ml.svc.cluster.local"
text: str, tts_url: str = "http://tts-predictor.ai-ml.svc.cluster.local"
) -> str:
"""Convert text to speech using TTS service."""
import base64
@@ -197,11 +170,7 @@ def synthesize_speech(
with httpx.Client(timeout=120.0) as client:
response = client.post(
f"{tts_url}/v1/audio/speech",
json={
"input": text,
"voice": "en_US-lessac-high",
"response_format": "wav"
}
json={"input": text, "voice": "en_US-lessac-high", "response_format": "wav"},
)
audio_b64 = base64.b64encode(response.content).decode("utf-8")
@@ -210,20 +179,17 @@ def synthesize_speech(
@dsl.pipeline(
name="voice-assistant-rag-pipeline",
description="End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS"
description="End-to-end voice assistant with RAG: STT -> Embeddings -> Milvus -> Rerank -> LLM -> TTS",
)
def voice_assistant_pipeline(
audio_b64: str,
collection_name: str = "knowledge_base"
):
def voice_assistant_pipeline(audio_b64: str, collection_name: str = "knowledge_base"):
"""
Voice Assistant Pipeline with RAG
Args:
audio_b64: Base64-encoded audio file
collection_name: Milvus collection for RAG
"""
# Step 1: Transcribe audio with Whisper
transcribe_task = transcribe_audio(audio_b64=audio_b64)
transcribe_task.set_caching_options(enable_caching=False)
@@ -233,70 +199,47 @@ def voice_assistant_pipeline(
embed_task.set_caching_options(enable_caching=True)
# Step 3: Retrieve context from Milvus
retrieve_task = retrieve_context(
embedding=embed_task.output,
collection_name=collection_name
)
retrieve_task = retrieve_context(embedding=embed_task.output, collection_name=collection_name)
# Step 4: Rerank documents
rerank_task = rerank_documents(
query=transcribe_task.output,
documents=retrieve_task.output
)
rerank_task = rerank_documents(query=transcribe_task.output, documents=retrieve_task.output)
# Step 5: Generate response with context
llm_task = generate_response(
query=transcribe_task.output,
context=rerank_task.output
)
llm_task = generate_response(query=transcribe_task.output, context=rerank_task.output)
# Step 6: Synthesize speech
tts_task = synthesize_speech(text=llm_task.output)
tts_task = synthesize_speech(text=llm_task.output) # noqa: F841
@dsl.pipeline(
name="text-to-speech-pipeline",
description="Simple text to speech pipeline"
)
@dsl.pipeline(name="text-to-speech-pipeline", description="Simple text to speech pipeline")
def text_to_speech_pipeline(text: str):
"""Simple TTS pipeline for testing."""
tts_task = synthesize_speech(text=text)
tts_task = synthesize_speech(text=text) # noqa: F841
@dsl.pipeline(
name="rag-query-pipeline",
description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM"
name="rag-query-pipeline", description="RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM"
)
def rag_query_pipeline(
query: str,
collection_name: str = "knowledge_base"
):
def rag_query_pipeline(query: str, collection_name: str = "knowledge_base"):
"""
RAG Query Pipeline (text input, no voice)
Args:
query: Text query
collection_name: Milvus collection name
"""
# Embed the query
embed_task = generate_embeddings(text=query)
# Retrieve from Milvus
retrieve_task = retrieve_context(
embedding=embed_task.output,
collection_name=collection_name
)
retrieve_task = retrieve_context(embedding=embed_task.output, collection_name=collection_name)
# Rerank
rerank_task = rerank_documents(
query=query,
documents=retrieve_task.output
)
rerank_task = rerank_documents(query=query, documents=retrieve_task.output)
# Generate response
llm_task = generate_response(
query=query,
context=rerank_task.output
llm_task = generate_response( # noqa: F841
query=query, context=rerank_task.output
)
@@ -307,10 +250,10 @@ if __name__ == "__main__":
("tts_pipeline.yaml", text_to_speech_pipeline),
("rag_pipeline.yaml", rag_query_pipeline),
]
for filename, pipeline_func in pipelines:
compiler.Compiler().compile(pipeline_func, filename)
print(f"Compiled: {filename}")
print("\nUpload these YAML files to Kubeflow Pipelines UI at:")
print(" http://kubeflow.example.com/pipelines")