# Document Ingestion Workflow
# Ingests documents from a source URL into Milvus vector database
# Triggered via NATS: ai.pipeline.trigger with pipeline="document-ingestion"
#
# Pipeline: fetch-documents -> chunk-documents -> generate-embeddings -> store-in-milvus
# Data is handed between steps via Argo artifacts (/tmp/... directories).
---
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: document-ingestion
  namespace: ai-ml
  labels:
    app.kubernetes.io/name: document-ingestion
    app.kubernetes.io/part-of: llm-workflows
spec:
  entrypoint: ingest-documents
  serviceAccountName: argo-workflow
  arguments:
    parameters:
      - name: source-url
        description: "URL to fetch documents from (S3, HTTP, or local path)"
      - name: collection-name
        value: "knowledge_base"
        description: "Milvus collection name"
      - name: chunk-size
        value: "512"
        description: "Text chunk size in characters"
      - name: chunk-overlap
        value: "50"
        description: "Overlap between chunks (must be smaller than chunk-size)"

  templates:
    # ------------------------------------------------------------------
    # DAG wiring: linear pipeline, each step consumes the previous step's
    # output artifact.
    # ------------------------------------------------------------------
    - name: ingest-documents
      dag:
        tasks:
          - name: fetch-documents
            template: fetch-docs
            arguments:
              parameters:
                - name: source-url
                  value: "{{workflow.parameters.source-url}}"
          - name: chunk-documents
            template: chunk-docs
            dependencies: [fetch-documents]
            arguments:
              parameters:
                - name: chunk-size
                  value: "{{workflow.parameters.chunk-size}}"
                - name: chunk-overlap
                  value: "{{workflow.parameters.chunk-overlap}}"
              artifacts:
                - name: documents
                  from: "{{tasks.fetch-documents.outputs.artifacts.documents}}"
          - name: generate-embeddings
            template: embed-docs
            dependencies: [chunk-documents]
            arguments:
              artifacts:
                - name: chunks
                  from: "{{tasks.chunk-documents.outputs.artifacts.chunks}}"
          - name: store-in-milvus
            template: store-docs
            dependencies: [generate-embeddings]
            arguments:
              parameters:
                - name: collection-name
                  value: "{{workflow.parameters.collection-name}}"
              artifacts:
                - name: embeddings
                  from: "{{tasks.generate-embeddings.outputs.artifacts.embeddings}}"

    # ------------------------------------------------------------------
    # Step 1: download source documents into /tmp/documents and write a
    # manifest.json listing the fetched files.
    # Supports s3:// (whole prefix) and http(s):// (single file).
    # ------------------------------------------------------------------
    - name: fetch-docs
      inputs:
        parameters:
          - name: source-url
      outputs:
        artifacts:
          - name: documents
            path: /tmp/documents
      container:
        image: python:3.13-slim
        command: [python]
        args:
          - -c
          - |
            import json
            import urllib.request
            from pathlib import Path

            source_url = "{{inputs.parameters.source-url}}"
            output_dir = Path("/tmp/documents")
            output_dir.mkdir(parents=True, exist_ok=True)

            print(f"Fetching documents from: {source_url}")

            if source_url.startswith("s3://"):
                # boto3 is not in the slim image; install at runtime.
                import subprocess
                subprocess.run(["pip", "install", "boto3", "-q"], check=True)
                import boto3

                s3 = boto3.client("s3")
                bucket, prefix = source_url[5:].split("/", 1)
                # Use the paginator: a single list_objects_v2 call silently
                # truncates at 1000 keys.
                paginator = s3.get_paginator("list_objects_v2")
                for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                    for obj in page.get("Contents", []):
                        key = obj["Key"]
                        local_path = output_dir / Path(key).name
                        s3.download_file(bucket, key, str(local_path))
                        print(f"Downloaded: {key}")
            elif source_url.startswith("http"):
                # Single file download; fall back to a default name when the
                # URL ends with "/".
                filename = source_url.split("/")[-1] or "document.txt"
                local_path = output_dir / filename
                urllib.request.urlretrieve(source_url, local_path)
                print(f"Downloaded: {filename}")
            else:
                # Fail the step so the workflow surfaces the bad input.
                raise SystemExit(f"Unsupported URL scheme: {source_url}")

            # Manifest tells the chunking step exactly which files to process
            # (it also lists manifest.json itself; chunk-docs tolerates that).
            files = list(output_dir.glob("*"))
            print(f"Downloaded {len(files)} files")
            manifest = {"files": [str(f) for f in files]}
            with open(output_dir / "manifest.json", "w") as f:
                json.dump(manifest, f)
      resources:
        requests:
          memory: 256Mi
          cpu: 100m

    # ------------------------------------------------------------------
    # Step 2: split each document into fixed-size overlapping character
    # chunks; emits /tmp/chunks/chunks.json.
    # ------------------------------------------------------------------
    - name: chunk-docs
      inputs:
        parameters:
          - name: chunk-size
          - name: chunk-overlap
        artifacts:
          - name: documents
            path: /tmp/documents
      outputs:
        artifacts:
          - name: chunks
            path: /tmp/chunks
      container:
        image: python:3.13-slim
        command: [python]
        args:
          - -c
          - |
            import json
            from pathlib import Path

            chunk_size = int("{{inputs.parameters.chunk-size}}")
            chunk_overlap = int("{{inputs.parameters.chunk-overlap}}")

            # Guard: if overlap >= size the sliding window never advances
            # and the loop below would spin forever.
            if chunk_size <= 0 or chunk_overlap < 0 or chunk_overlap >= chunk_size:
                raise SystemExit(
                    f"Invalid chunking parameters: chunk_size={chunk_size}, "
                    f"chunk_overlap={chunk_overlap} (need 0 <= overlap < size)"
                )

            input_dir = Path("/tmp/documents")
            output_dir = Path("/tmp/chunks")
            output_dir.mkdir(parents=True, exist_ok=True)

            with open(input_dir / "manifest.json") as f:
                manifest = json.load(f)

            all_chunks = []
            for filepath in manifest["files"]:
                filepath = Path(filepath)
                if not filepath.exists():
                    continue
                print(f"Processing: {filepath.name}")
                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        content = f.read()
                except Exception as e:
                    # Best-effort: skip unreadable/binary files, keep going.
                    print(f"Error reading {filepath}: {e}")
                    continue

                # Simple fixed-size chunking with overlap; whitespace-only
                # chunks are dropped.
                chunks = []
                start = 0
                while start < len(content):
                    end = start + chunk_size
                    chunk = content[start:end]
                    if chunk.strip():
                        chunks.append({
                            "text": chunk,
                            "source": filepath.name,
                            "chunk_index": len(chunks)
                        })
                    start = end - chunk_overlap
                all_chunks.extend(chunks)
                print(f"  Created {len(chunks)} chunks")

            with open(output_dir / "chunks.json", "w") as f:
                json.dump({"chunks": all_chunks}, f)
            print(f"Total chunks: {len(all_chunks)}")
      resources:
        requests:
          memory: 512Mi
          cpu: 100m

    # ------------------------------------------------------------------
    # Step 3: call the in-cluster embeddings service in batches; emits
    # /tmp/embeddings/embeddings.json (text + metadata + vector per chunk).
    # ------------------------------------------------------------------
    - name: embed-docs
      inputs:
        artifacts:
          - name: chunks
            path: /tmp/chunks
      outputs:
        artifacts:
          - name: embeddings
            path: /tmp/embeddings
      container:
        image: python:3.13-slim
        command: [python]
        args:
          - -c
          - |
            import subprocess
            subprocess.run(["pip", "install", "httpx", "-q"], check=True)

            import json
            import httpx
            from pathlib import Path

            EMBEDDINGS_URL = "http://embeddings-predictor.ai-ml.svc.cluster.local"
            BATCH_SIZE = 32

            input_dir = Path("/tmp/chunks")
            output_dir = Path("/tmp/embeddings")
            output_dir.mkdir(parents=True, exist_ok=True)

            with open(input_dir / "chunks.json") as f:
                data = json.load(f)
            chunks = data["chunks"]
            print(f"Generating embeddings for {len(chunks)} chunks")

            all_embeddings = []
            with httpx.Client(timeout=120.0) as client:
                for i in range(0, len(chunks), BATCH_SIZE):
                    batch = chunks[i:i+BATCH_SIZE]
                    texts = [c["text"] for c in batch]
                    response = client.post(
                        f"{EMBEDDINGS_URL}/embeddings",
                        json={"input": texts, "model": "bge"}
                    )
                    # Fail loudly on service errors instead of silently
                    # producing an empty embeddings file.
                    response.raise_for_status()
                    result = response.json()
                    for j, emb_data in enumerate(result.get("data", [])):
                        all_embeddings.append({
                            "text": batch[j]["text"],
                            "source": batch[j]["source"],
                            "chunk_index": batch[j]["chunk_index"],
                            "embedding": emb_data["embedding"]
                        })
                    print(f"  Processed batch {i//BATCH_SIZE + 1}/{(len(chunks)-1)//BATCH_SIZE + 1}")

            with open(output_dir / "embeddings.json", "w") as f:
                json.dump({"embeddings": all_embeddings}, f)
            print(f"Generated {len(all_embeddings)} embeddings")
      envFrom:
        - configMapRef:
            name: ai-services-config
      resources:
        requests:
          memory: 1Gi
          cpu: 200m

    # ------------------------------------------------------------------
    # Step 4: upsert embeddings into Milvus, creating the collection and
    # HNSW index on first use.
    # ------------------------------------------------------------------
    - name: store-docs
      inputs:
        parameters:
          - name: collection-name
        artifacts:
          - name: embeddings
            path: /tmp/embeddings
      container:
        image: python:3.13-slim
        command: [python]
        args:
          - -c
          - |
            import subprocess
            subprocess.run(["pip", "install", "pymilvus", "-q"], check=True)

            import json
            from pathlib import Path
            from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

            MILVUS_HOST = "milvus.ai-ml.svc.cluster.local"
            MILVUS_PORT = 19530
            COLLECTION_NAME = "{{inputs.parameters.collection-name}}"
            EMBEDDING_DIM = 1024  # BGE-large dimension

            input_dir = Path("/tmp/embeddings")
            with open(input_dir / "embeddings.json") as f:
                data = json.load(f)
            embeddings = data["embeddings"]
            print(f"Storing {len(embeddings)} embeddings in Milvus")

            connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
            print("Connected to Milvus")

            if not utility.has_collection(COLLECTION_NAME):
                fields = [
                    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
                    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
                    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1024),
                    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM)
                ]
                schema = CollectionSchema(fields, description="Knowledge base documents")
                collection = Collection(COLLECTION_NAME, schema)
                # HNSW index tuned for recall; COSINE matches the BGE model.
                index_params = {
                    "metric_type": "COSINE",
                    "index_type": "HNSW",
                    "params": {"M": 16, "efConstruction": 256}
                }
                collection.create_index("embedding", index_params)
                print(f"Created collection: {COLLECTION_NAME}")
            else:
                collection = Collection(COLLECTION_NAME)
                print(f"Using existing collection: {COLLECTION_NAME}")

            # Insert column-wise in batches; column order must match the
            # schema above (id is auto-generated and therefore omitted).
            BATCH_SIZE = 100
            for i in range(0, len(embeddings), BATCH_SIZE):
                batch = embeddings[i:i+BATCH_SIZE]
                data = [
                    [e["text"] for e in batch],
                    [e["source"] for e in batch],
                    [e["embedding"] for e in batch]
                ]
                collection.insert(data)
                print(f"  Inserted batch {i//BATCH_SIZE + 1}/{(len(embeddings)-1)//BATCH_SIZE + 1}")

            # Flush to ensure data is persisted before the pod exits.
            collection.flush()
            print(f"Successfully stored {len(embeddings)} documents")
            connections.disconnect("default")
      envFrom:
        - configMapRef:
            name: ai-services-config
      resources:
        requests:
          memory: 512Mi
          cpu: 100m