# PIPELINE DEFINITION
# Name: rag-query-pipeline
# Description: RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step latency to MLflow.
# Inputs:
#    collection_name: str [Default: 'knowledge_base']
#    query: str
#
# NOTE(review): compiled KFP v2 IR (kfp==2.12.1). This artifact is normally
# regenerated by `kfp dsl compile`; prefer editing the pipeline source over
# hand-editing this file. Embedded Python indentation was reconstructed to
# canonical 4-space form after whitespace mangling — confirm against the
# original @dsl.component sources.
components:
  comp-generate-embeddings:
    executorLabel: exec-generate-embeddings
    inputDefinitions:
      parameters:
        embeddings_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings
          isOptional: true
          parameterType: STRING
        text:
          parameterType: STRING
    outputDefinitions:
      parameters:
        embedding:
          parameterType: LIST
        latency_s:
          parameterType: NUMBER_DOUBLE
  comp-generate-response:
    executorLabel: exec-generate-response
    inputDefinitions:
      parameters:
        context:
          parameterType: LIST
        model:
          defaultValue: hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4
          isOptional: true
          parameterType: STRING
        query:
          parameterType: STRING
        vllm_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm
          isOptional: true
          parameterType: STRING
    outputDefinitions:
      parameters:
        completion_tokens:
          parameterType: NUMBER_INTEGER
        latency_s:
          parameterType: NUMBER_DOUBLE
        text:
          parameterType: STRING
  comp-rerank-documents:
    executorLabel: exec-rerank-documents
    inputDefinitions:
      parameters:
        documents:
          parameterType: LIST
        query:
          parameterType: STRING
        reranker_url:
          defaultValue: http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker
          isOptional: true
          parameterType: STRING
        top_k:
          # 3.0 is KFP protojson serialization of integer default 3.
          defaultValue: 3.0
          isOptional: true
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      parameters:
        documents:
          parameterType: LIST
        latency_s:
          parameterType: NUMBER_DOUBLE
  comp-retrieve-context:
    executorLabel: exec-retrieve-context
    inputDefinitions:
      parameters:
        collection_name:
          defaultValue: knowledge_base
          isOptional: true
          parameterType: STRING
        embedding:
          parameterType: LIST
        milvus_host:
          defaultValue: milvus.ai-ml.svc.cluster.local
          isOptional: true
          parameterType: STRING
        top_k:
          # 5.0 is KFP protojson serialization of integer default 5.
          defaultValue: 5.0
          isOptional: true
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      parameters:
        documents:
          parameterType: LIST
        latency_s:
          parameterType: NUMBER_DOUBLE
deploymentSpec:
  executors:
    exec-generate-embeddings:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - generate_embeddings
        command:
        - sh
        - -c
        - |

          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && python3 -m pip install --quiet --no-warn-script-location 'httpx' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)
          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |+

          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def generate_embeddings(
              text: str,
              embeddings_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/embeddings"
          ) -> NamedTuple("EmbedResult", [("embedding", list), ("latency_s", float)]):
              """Generate embeddings for RAG retrieval."""
              import time
              import httpx
              from collections import namedtuple

              start = time.perf_counter()
              with httpx.Client(timeout=60.0) as client:
                  response = client.post(
                      f"{embeddings_url}/embeddings",
                      json={"input": text, "model": "bge-small-en-v1.5"}
                  )
                  result = response.json()
              latency = time.perf_counter() - start

              EmbedResult = namedtuple("EmbedResult", ["embedding", "latency_s"])
              return EmbedResult(result["data"][0]["embedding"], latency)

        image: python:3.13-slim
    exec-generate-response:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - generate_response
        command:
        - sh
        - -c
        - |

          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && python3 -m pip install --quiet --no-warn-script-location 'httpx' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)
          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |+

          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def generate_response(
              query: str,
              context: list,
              vllm_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/llm",
              model: str = "hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4"
          ) -> NamedTuple("LLMResult", [("text", str), ("latency_s", float), ("completion_tokens", int)]):
              """Generate response using vLLM."""
              import time
              import httpx
              from collections import namedtuple

              # Build context
              if context:
                  context_text = "\n\n".join([doc["text"] for doc in context])
                  user_content = f"Context:\n{context_text}\n\nQuestion: {query}"
              else:
                  user_content = query

              system_prompt = """You are a helpful voice assistant.
          Answer questions based on the provided context when available.
          Keep responses concise and natural for speech synthesis."""

              messages = [
                  {"role": "system", "content": system_prompt},
                  {"role": "user", "content": user_content}
              ]

              start = time.perf_counter()
              with httpx.Client(timeout=180.0) as client:
                  response = client.post(
                      f"{vllm_url}/v1/chat/completions",
                      json={
                          "model": model,
                          "messages": messages,
                          "max_tokens": 512,
                          "temperature": 0.7
                      }
                  )
                  result = response.json()
              latency = time.perf_counter() - start

              text = result["choices"][0]["message"]["content"]
              usage = result.get("usage", {})
              completion_tokens = usage.get("completion_tokens", len(text.split()))

              LLMResult = namedtuple("LLMResult", ["text", "latency_s", "completion_tokens"])
              return LLMResult(text, latency, completion_tokens)

        image: python:3.13-slim
    exec-rerank-documents:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - rerank_documents
        command:
        - sh
        - -c
        - |

          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && python3 -m pip install --quiet --no-warn-script-location 'httpx' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)
          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |+

          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def rerank_documents(
              query: str,
              documents: list,
              reranker_url: str = "http://ai-inference-serve-svc.ai-ml.svc.cluster.local:8000/reranker",
              top_k: int = 3
          ) -> NamedTuple("RerankResult", [("documents", list), ("latency_s", float)]):
              """Rerank documents using BGE reranker."""
              import time
              import httpx
              from collections import namedtuple

              if not documents:
                  RerankResult = namedtuple("RerankResult", ["documents", "latency_s"])
                  return RerankResult([], 0.0)

              start = time.perf_counter()
              with httpx.Client(timeout=60.0) as client:
                  response = client.post(
                      f"{reranker_url}/v1/rerank",
                      json={
                          "query": query,
                          "documents": [doc["text"] for doc in documents],
                          "model": "bge-reranker-v2-m3"
                      }
                  )
                  result = response.json()
              latency = time.perf_counter() - start

              # Sort by rerank score
              reranked = sorted(
                  zip(documents, result.get("scores", [0] * len(documents))),
                  key=lambda x: x[1],
                  reverse=True
              )[:top_k]

              RerankResult = namedtuple("RerankResult", ["documents", "latency_s"])
              return RerankResult([doc for doc, score in reranked], latency)

        image: python:3.13-slim
    exec-retrieve-context:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - retrieve_context
        command:
        - sh
        - -c
        - |

          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && python3 -m pip install --quiet --no-warn-script-location 'pymilvus' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)
          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |+

          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def retrieve_context(
              embedding: list,
              milvus_host: str = "milvus.ai-ml.svc.cluster.local",
              collection_name: str = "knowledge_base",
              top_k: int = 5
          ) -> NamedTuple("RetrieveResult", [("documents", list), ("latency_s", float)]):
              """Retrieve relevant documents from Milvus vector database."""
              import time
              from pymilvus import connections, Collection, utility
              from collections import namedtuple

              start = time.perf_counter()
              connections.connect(host=milvus_host, port=19530)

              if not utility.has_collection(collection_name):
                  latency = time.perf_counter() - start
                  RetrieveResult = namedtuple("RetrieveResult", ["documents", "latency_s"])
                  return RetrieveResult([], latency)

              collection = Collection(collection_name)
              collection.load()

              results = collection.search(
                  data=[embedding],
                  anns_field="embedding",
                  param={"metric_type": "COSINE", "params": {"nprobe": 10}},
                  limit=top_k,
                  output_fields=["text", "source"]
              )
              latency = time.perf_counter() - start

              documents = []
              for hits in results:
                  for hit in hits:
                      documents.append({
                          "text": hit.entity.get("text"),
                          "source": hit.entity.get("source"),
                          "score": hit.distance
                      })

              RetrieveResult = namedtuple("RetrieveResult", ["documents", "latency_s"])
              return RetrieveResult(documents, latency)

        image: python:3.13-slim
pipelineInfo:
  description: 'RAG query pipeline: Embed -> Retrieve -> Rerank -> LLM. Logs per-step latency to MLflow.'
  name: rag-query-pipeline
root:
  dag:
    tasks:
      generate-embeddings:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-generate-embeddings
        inputs:
          parameters:
            text:
              componentInputParameter: query
        taskInfo:
          name: generate-embeddings
      generate-response:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-generate-response
        dependentTasks:
        - rerank-documents
        inputs:
          parameters:
            context:
              taskOutputParameter:
                outputParameterKey: documents
                producerTask: rerank-documents
            query:
              componentInputParameter: query
        taskInfo:
          name: generate-response
      rerank-documents:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-rerank-documents
        dependentTasks:
        - retrieve-context
        inputs:
          parameters:
            documents:
              taskOutputParameter:
                outputParameterKey: documents
                producerTask: retrieve-context
            query:
              componentInputParameter: query
        taskInfo:
          name: rerank-documents
      retrieve-context:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-retrieve-context
        dependentTasks:
        - generate-embeddings
        inputs:
          parameters:
            collection_name:
              componentInputParameter: collection_name
            embedding:
              taskOutputParameter:
                outputParameterKey: embedding
                producerTask: generate-embeddings
        taskInfo:
          name: retrieve-context
  inputDefinitions:
    parameters:
      collection_name:
        defaultValue: knowledge_base
        description: Milvus collection name
        isOptional: true
        parameterType: STRING
      query:
        description: Text query
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.12.1