# PIPELINE DEFINITION
# Name: voice-cloning-pipeline
# Description: Extract a speaker from audio+transcript, fine-tune a Coqui VITS voice model, push to Gitea, and log metrics to MLflow.
# Inputs:
#    base_model: str [Default: 'tts_models/en/ljspeech/vits']
#    batch_size: int [Default: 16.0]
#    gitea_owner: str [Default: 'daviestechlabs']
#    gitea_password: str [Default: '']
#    gitea_repo: str [Default: 'voice-models']
#    gitea_url: str [Default: 'http://gitea-http.gitea.svc.cluster.local:3000']
#    gitea_username: str [Default: '']
#    language: str [Default: 'en']
#    learning_rate: float [Default: 0.0001]
#    max_segment_duration_s: float [Default: 15.0]
#    min_segment_duration_s: float [Default: 1.0]
#    mlflow_tracking_uri: str [Default: 'http://mlflow.mlflow.svc.cluster.local:80']
#    num_epochs: int [Default: 100.0]
#    s3_bucket: str [Default: 'training-data']
#    s3_endpoint: str [Default: 'candlekeep.lab.daviestechlabs.io']
#    s3_key: str [Default: '']
#    target_speaker: str [Default: 'SPEAKER_0']
#    voice_name: str [Default: 'custom-voice']
#    whisper_url: str [Default: 'http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper']

# Component interfaces: one entry per pipeline step, declaring the typed
# parameters it consumes and produces. Each entry is bound to an executor in
# deploymentSpec through executorLabel.
components:
  comp-extract-speaker-segments:
    executorLabel: exec-extract-speaker-segments
    inputDefinitions:
      parameters:
        audio_path:
          parameterType: STRING
        max_duration_s:
          defaultValue: 15.0
          isOptional: true
          parameterType: NUMBER_DOUBLE
        min_duration_s:
          defaultValue: 1.0
          isOptional: true
          parameterType: NUMBER_DOUBLE
        target_speaker:
          parameterType: STRING
        transcript_json:
          parameterType: STRING
    outputDefinitions:
      parameters:
        num_segments:
          parameterType: NUMBER_INTEGER
        segments_json:
          parameterType: STRING
        total_duration_s:
          parameterType: NUMBER_DOUBLE
  comp-log-training-metrics:
    executorLabel: exec-log-training-metrics
    inputDefinitions:
      parameters:
        # Optional so existing callers keep working; the default matches the
        # value that used to be hard-coded inside the component.
        base_model:
          defaultValue: tts_models/en/ljspeech/vits
          isOptional: true
          parameterType: STRING
        batch_size:
          parameterType: NUMBER_INTEGER
        experiment_name:
          defaultValue: voice-cloning
          isOptional: true
          parameterType: STRING
        files_pushed:
          parameterType: NUMBER_INTEGER
        final_loss:
          parameterType: NUMBER_DOUBLE
        learning_rate:
          parameterType: NUMBER_DOUBLE
        mlflow_tracking_uri:
          defaultValue: http://mlflow.mlflow.svc.cluster.local:80
          isOptional: true
          parameterType: STRING
        num_epochs:
          parameterType: NUMBER_INTEGER
        num_segments:
          parameterType: NUMBER_INTEGER
        repo_url:
          parameterType: STRING
        total_duration_s:
          parameterType: NUMBER_DOUBLE
        voice_name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        run_id:
          parameterType: STRING
  comp-prepare-ljspeech-dataset:
    executorLabel: exec-prepare-ljspeech-dataset
    inputDefinitions:
      parameters:
        language:
          defaultValue: en
          isOptional: true
          parameterType: STRING
        segments_json:
          parameterType: STRING
        voice_name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        dataset_dir:
          parameterType: STRING
        num_samples:
          parameterType: NUMBER_INTEGER
  comp-push-model-to-gitea:
    executorLabel: exec-push-model-to-gitea
    inputDefinitions:
      parameters:
        # Optional so existing callers keep working; the default matches the
        # value that used to be hard-coded inside the component.
        base_model:
          defaultValue: tts_models/en/ljspeech/vits
          isOptional: true
          parameterType: STRING
        gitea_owner:
          defaultValue: daviestechlabs
          isOptional: true
          parameterType: STRING
        gitea_password:
          defaultValue: ''
          isOptional: true
          parameterType: STRING
        gitea_repo:
          defaultValue: voice-models
          isOptional: true
          parameterType: STRING
        gitea_url:
          defaultValue: http://gitea-http.gitea.svc.cluster.local:3000
          isOptional: true
          parameterType: STRING
        gitea_username:
          defaultValue: ''
          isOptional: true
          parameterType: STRING
        model_dir:
          parameterType: STRING
        voice_name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        files_pushed:
          parameterType: NUMBER_INTEGER
        repo_url:
          parameterType: STRING
  comp-train-vits-voice:
    executorLabel: exec-train-vits-voice
    inputDefinitions:
      parameters:
        base_model:
          defaultValue: tts_models/en/ljspeech/vits
          isOptional: true
          parameterType: STRING
        batch_size:
          defaultValue: 16.0
          isOptional: true
          parameterType: NUMBER_INTEGER
        dataset_dir:
          parameterType: STRING
        language:
          defaultValue: en
          isOptional: true
          parameterType: STRING
        learning_rate:
          defaultValue: 0.0001
          isOptional: true
          parameterType: NUMBER_DOUBLE
        num_epochs:
          defaultValue: 100.0
          isOptional: true
          parameterType: NUMBER_INTEGER
        voice_name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        best_checkpoint:
          parameterType: STRING
        final_loss:
          parameterType: NUMBER_DOUBLE
        model_dir:
          parameterType: STRING
  comp-transcribe-and-diarise:
    executorLabel: exec-transcribe-and-diarise
    inputDefinitions:
      parameters:
        s3_bucket:
          parameterType: STRING
        s3_endpoint:
          parameterType: STRING
        s3_key:
          parameterType: STRING
        whisper_url:
          defaultValue: http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper
          isOptional: true
          parameterType: STRING
    outputDefinitions:
      parameters:
        audio_path:
          parameterType: STRING
        speakers:
          parameterType: STRING
        transcript_json:
          parameterType: STRING

# Executors: the container, bootstrap shell and embedded Python source for
# each component. Every executor installs kfp (plus its own extra packages)
# with pip at start-up, writes the Python source to a temporary module and
# runs it through kfp.dsl.executor_main.
#
# NOTE(review): the components hand each other container-local filesystem
# paths (audio_path, wavs_dir inside segments_json, dataset_dir, model_dir)
# as plain string parameters. Those paths presumably only resolve in a
# downstream task if all tasks share a volume -- confirm against the cluster
# setup, especially with enableCache: true on every task.
deploymentSpec:
  executors:
    exec-extract-speaker-segments:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - extract_speaker_segments
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def extract_speaker_segments(
              transcript_json: str,
              audio_path: str,
              target_speaker: str,
              min_duration_s: float = 1.0,
              max_duration_s: float = 15.0,
          ) -> NamedTuple("SpeakerSegments", [("segments_json", str), ("num_segments", int), ("total_duration_s", float)]):
              """Slice the audio into per-utterance WAV files for the target speaker.

              Args:
                  transcript_json: JSON list of segment dicts; the keys read here are
                      "speaker", "start", "end" and "text".
                  audio_path: Path of the audio file handed to ffmpeg.
                  target_speaker: Speaker label to keep (case-insensitive, partial match).
                  min_duration_s: Segments shorter than this are dropped.
                  max_duration_s: Longer segments are truncated to this length.

              Returns:
                  segments_json: JSON object with "wavs_dir" and the kept "utterances".
                  num_segments: Number of utterances written.
                  total_duration_s: Summed duration of the kept utterances, in seconds.
              """
              import json
              import os
              import subprocess
              import tempfile

              out = NamedTuple("SpeakerSegments", [("segments_json", str), ("num_segments", int), ("total_duration_s", float)])
              work = tempfile.mkdtemp()
              wavs_dir = os.path.join(work, "wavs")
              os.makedirs(wavs_dir, exist_ok=True)

              # Install ffmpeg (the base image does not ship it). A failed
              # "apt-get update" is tolerated; a failed install is fatal.
              subprocess.run(["apt-get", "update", "-qq"], capture_output=True)
              subprocess.run(["apt-get", "install", "-y", "-qq", "ffmpeg"], capture_output=True, check=True)

              segments = json.loads(transcript_json)

              # Filter by speaker — fuzzy match (case-insensitive, partial).
              # An empty or missing label must never match: "" is a substring of
              # every string, so without the guard every unlabelled segment would
              # be attributed to the target speaker.
              target_lower = target_speaker.lower()
              matched = []
              for seg in segments:
                  spk = str(seg.get("speaker") or "").lower()
                  if spk and (target_lower in spk or spk in target_lower):
                      matched.append(seg)

              # If no speaker labels matched, the user may have given a name
              # that doesn't appear. Fall back to using ALL segments.
              if not matched:
                  print(f"WARNING: No segments matched speaker '{target_speaker}'. "
                        f"Using all {len(segments)} segments.")
                  matched = segments

              print(f"Matched {len(matched)} segments for speaker '{target_speaker}'")

              kept = []
              total_dur = 0.0
              for i, seg in enumerate(matched):
                  start = float(seg.get("start", 0))
                  end = float(seg.get("end", start + 5))
                  duration = end - start
                  text = seg.get("text", "").strip()

                  # Skip clips that are too short or carry no transcript text.
                  if duration < min_duration_s or not text:
                      continue
                  # Truncate over-long clips rather than dropping them.
                  if duration > max_duration_s:
                      end = start + max_duration_s
                      duration = max_duration_s

                  # The index comes from the matched list, so names may have gaps
                  # where segments were skipped.
                  wav_name = f"utt_{i:04d}.wav"
                  wav_out = os.path.join(wavs_dir, wav_name)
                  # Mono, 22.05 kHz, 16-bit output.
                  subprocess.run(
                      ["ffmpeg", "-y", "-i", audio_path,
                       "-ss", str(start), "-to", str(end),
                       "-ac", "1", "-ar", "22050", "-sample_fmt", "s16",
                       wav_out],
                      capture_output=True, check=True,
                  )

                  kept.append({
                      "wav": wav_name,
                      "text": text,
                      "start": start,
                      "end": end,
                      "duration": round(duration, 2),
                  })
                  total_dur += duration

              print(f"Extracted {len(kept)} utterances, total {total_dur:.1f}s")

              return out(
                  segments_json=json.dumps({"wavs_dir": wavs_dir, "utterances": kept}),
                  num_segments=len(kept),
                  total_duration_s=round(total_dur, 2),
              )
        image: python:3.13-slim
    exec-log-training-metrics:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - log_training_metrics
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && python3 -m pip install --quiet --no-warn-script-location 'mlflow>=2.10.0' 'requests' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def log_training_metrics(
              voice_name: str,
              num_segments: int,
              total_duration_s: float,
              final_loss: float,
              num_epochs: int,
              batch_size: int,
              learning_rate: float,
              repo_url: str,
              files_pushed: int,
              mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
              experiment_name: str = "voice-cloning",
              base_model: str = "tts_models/en/ljspeech/vits",
          ) -> NamedTuple("LogOutput", [("run_id", str)]):
              """Log training run to MLflow.

              Records the hyper-parameters as params, the dataset/training/push
              figures as metrics, and the repository URL and voice name as tags.

              Args:
                  base_model: Base model the run was fine-tuned from. It is logged
                      as a param so the record reflects the value actually used
                      instead of a fixed string.

              Returns:
                  run_id: ID of the MLflow run that was created.
              """
              import mlflow
              from datetime import datetime

              out = NamedTuple("LogOutput", [("run_id", str)])

              mlflow.set_tracking_uri(mlflow_tracking_uri)
              mlflow.set_experiment(experiment_name)

              with mlflow.start_run(run_name=f"voice-clone-{voice_name}-{datetime.now():%Y%m%d-%H%M}") as run:
                  mlflow.log_params({
                      "voice_name": voice_name,
                      "base_model": base_model,
                      "model_type": "coqui-vits",
                      "num_epochs": num_epochs,
                      "batch_size": batch_size,
                      "learning_rate": learning_rate,
                      "sample_rate": 22050,
                  })
                  mlflow.log_metrics({
                      "num_training_segments": num_segments,
                      "total_audio_duration_s": total_duration_s,
                      "final_loss": final_loss,
                      "files_pushed": files_pushed,
                  })
                  mlflow.set_tags({
                      "pipeline": "voice-cloning",
                      "gitea_repo": repo_url,
                      "voice_name": voice_name,
                  })
                  print(f"Logged to MLflow run: {run.info.run_id}")
                  return out(run_id=run.info.run_id)
        image: python:3.13-slim
    exec-prepare-ljspeech-dataset:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - prepare_ljspeech_dataset
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def prepare_ljspeech_dataset(
              segments_json: str,
              voice_name: str,
              language: str = "en",
          ) -> NamedTuple("DatasetOutput", [("dataset_dir", str), ("num_samples", int)]):
              """Create metadata.csv + wavs/ in LJSpeech format.

              Args:
                  segments_json: JSON object with "wavs_dir" and "utterances"
                      (each utterance has "wav" and "text").
                  voice_name: Stored as "name" in dataset_config.json.
                  language: Stored as "language" in dataset_config.json.

              Returns:
                  dataset_dir: Directory holding metadata.csv, dataset_config.json
                      and wavs/; created as a sibling of the source wavs directory.
                  num_samples: Number of rows written to metadata.csv.
              """
              import json
              import os
              import shutil

              out = NamedTuple("DatasetOutput", [("dataset_dir", str), ("num_samples", int)])

              data = json.loads(segments_json)
              wavs_src = data["wavs_dir"]
              utterances = data["utterances"]

              dataset_dir = os.path.join(os.path.dirname(wavs_src), "dataset")
              wavs_dst = os.path.join(dataset_dir, "wavs")
              os.makedirs(wavs_dst, exist_ok=True)

              lines = []
              for utt in utterances:
                  src = os.path.join(wavs_src, utt["wav"])
                  dst = os.path.join(wavs_dst, utt["wav"])
                  shutil.copy2(src, dst)
                  stem = os.path.splitext(utt["wav"])[0]
                  # LJSpeech format: id|text|text
                  # "|" is the column separator and a newline ends the row, so
                  # neither may survive inside the text: replace pipes with spaces
                  # and collapse all whitespace runs (including newlines).
                  text = " ".join(utt["text"].replace("|", " ").split())
                  lines.append(f"{stem}|{text}|{text}")

              metadata_path = os.path.join(dataset_dir, "metadata.csv")
              with open(metadata_path, "w", encoding="utf-8") as f:
                  f.write("\n".join(lines))

              # Dataset config for reference
              config = {
                  "name": voice_name,
                  "language": language,
                  "num_samples": len(lines),
                  "format": "ljspeech",
                  "sample_rate": 22050,
              }
              with open(os.path.join(dataset_dir, "dataset_config.json"), "w") as f:
                  json.dump(config, f, indent=2)

              print(f"LJSpeech dataset ready: {len(lines)} samples")
              return out(dataset_dir=dataset_dir, num_samples=len(lines))
        image: python:3.13-slim
    exec-push-model-to-gitea:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - push_model_to_gitea
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && python3 -m pip install --quiet --no-warn-script-location 'requests' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def push_model_to_gitea(
              model_dir: str,
              voice_name: str,
              gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000",
              gitea_owner: str = "daviestechlabs",
              gitea_repo: str = "voice-models",
              gitea_username: str = "",
              gitea_password: str = "",
              base_model: str = "tts_models/en/ljspeech/vits",
          ) -> NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)]):
              """Package and push the trained model to a Gitea repository.

              Uploads the newest checkpoint, the first config.json found and a
              generated model_info.json under "<voice_name>/" through the Gitea
              contents API, creating the repository first if it does not exist.

              Args:
                  base_model: Base model the checkpoint was fine-tuned from; written
                      to model_info.json so it reflects the value actually used.

              Returns:
                  repo_url: Web URL of the target repository.
                  files_pushed: Number of files the API accepted. Failed uploads are
                      printed but do not raise.
              """
              import base64
              import glob
              import json
              import os
              import requests

              out = NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)])
              session = requests.Session()
              # Basic auth only when a username was supplied; otherwise anonymous.
              session.auth = (gitea_username, gitea_password) if gitea_username else None

              api = f"{gitea_url}/api/v1"
              repo_url = f"{gitea_url}/{gitea_owner}/{gitea_repo}"

              # -- Ensure repo exists --
              r = session.get(f"{api}/repos/{gitea_owner}/{gitea_repo}", timeout=30)
              if r.status_code == 404:
                  print(f"Creating repository: {gitea_owner}/{gitea_repo}")
                  # Try the owner as an organisation first ...
                  r = session.post(
                      f"{api}/orgs/{gitea_owner}/repos",
                      json={
                          "name": gitea_repo,
                          "description": "Trained voice models from voice cloning pipeline",
                          "private": False,
                          "auto_init": True,
                      },
                      timeout=30,
                  )
                  # ... and fall back to the authenticated user's own namespace.
                  if r.status_code not in (200, 201):
                      r = session.post(
                          f"{api}/user/repos",
                          json={"name": gitea_repo, "description": "Trained voice models", "auto_init": True},
                          timeout=30,
                      )
                  r.raise_for_status()
                  print("Repository created")

              # -- Collect model files --
              files_to_push = []

              # Best model checkpoint: prefer best_model*.pth, else any .pth;
              # newest by modification time wins.
              for pattern in ["**/best_model*.pth", "**/*.pth"]:
                  found = glob.glob(os.path.join(model_dir, pattern), recursive=True)
                  if found:
                      found.sort(key=os.path.getmtime, reverse=True)
                      files_to_push.append(found[0])
                      break

              # Config
              for pattern in ["**/config.json"]:
                  found = glob.glob(os.path.join(model_dir, pattern), recursive=True)
                  if found:
                      files_to_push.append(found[0])

              # Model info
              model_info = {
                  "name": voice_name,
                  "type": "coqui-vits",
                  "base_model": base_model,
                  "sample_rate": 22050,
              }
              info_path = os.path.join(model_dir, "model_info.json")
              with open(info_path, "w") as f:
                  json.dump(model_info, f, indent=2)
              files_to_push.append(info_path)

              # -- Push each file --
              pushed = 0
              for fpath in files_to_push:
                  rel = os.path.relpath(fpath, model_dir)
                  gitea_path = f"{voice_name}/{rel}"
                  print(f"Pushing: {gitea_path} ({os.path.getsize(fpath)} bytes)")

                  # The whole file is read and base64-encoded in memory.
                  with open(fpath, "rb") as f:
                      content_b64 = base64.b64encode(f.read()).decode()

                  # Check if file exists
                  r = session.get(
                      f"{api}/repos/{gitea_owner}/{gitea_repo}/contents/{gitea_path}",
                      timeout=30,
                  )

                  payload = {
                      "content": content_b64,
                      "message": f"Upload {voice_name}: {rel}",
                  }

                  if r.status_code == 200:
                      # Updating an existing file: send the current blob sha
                      # and use PUT.
                      sha = r.json().get("sha", "")
                      payload["sha"] = sha
                      r = session.put(
                          f"{api}/repos/{gitea_owner}/{gitea_repo}/contents/{gitea_path}",
                          json=payload, timeout=120,
                      )
                  else:
                      r = session.post(
                          f"{api}/repos/{gitea_owner}/{gitea_repo}/contents/{gitea_path}",
                          json=payload, timeout=120,
                      )

                  if r.status_code in (200, 201):
                      pushed += 1
                      print(f" ✓ Pushed")
                  else:
                      print(f" ✗ Failed ({r.status_code}): {r.text[:200]}")

              print(f"\nPushed {pushed}/{len(files_to_push)} files to {repo_url}")
              return out(repo_url=repo_url, files_pushed=pushed)
        image: python:3.13-slim
    exec-train-vits-voice:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train_vits_voice
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def train_vits_voice(
              dataset_dir: str,
              voice_name: str,
              language: str = "en",
              base_model: str = "tts_models/en/ljspeech/vits",
              num_epochs: int = 100,
              batch_size: int = 16,
              learning_rate: float = 0.0001,
          ) -> NamedTuple("TrainOutput", [("model_dir", str), ("best_checkpoint", str), ("final_loss", float)]):
              """Fine-tune a VITS model on the speaker dataset.

              Args:
                  dataset_dir: LJSpeech-format dataset directory (metadata.csv + wavs/).
                  voice_name: Used as the trainer run name.
                  language: Dataset and phoneme language.
                  base_model: Coqui model name to restore from; an empty string or
                      "none" skips the download and trains without a restore path.
                  num_epochs: Number of training epochs.
                  batch_size: Training batch size (eval uses half, at least 1).
                  learning_rate: Passed to VitsConfig as lr.

              Returns:
                  model_dir: Trainer output directory.
                  best_checkpoint: Newest best_model*.pth (or newest .pth), or ""
                      when no checkpoint was written.
                  final_loss: Average training loss reported by the trainer, or 0.0
                      when it could not be read (a warning is printed).
              """
              import os
              import glob

              out = NamedTuple("TrainOutput", [("model_dir", str), ("best_checkpoint", str), ("final_loss", float)])

              OUTPUT_DIR = "/tmp/vits_output"
              os.makedirs(OUTPUT_DIR, exist_ok=True)

              print(f"=== Coqui VITS Voice Training ===")
              print(f"Voice name : {voice_name}")
              print(f"Base model : {base_model}")
              print(f"Dataset : {dataset_dir}")
              print(f"Epochs : {num_epochs}")
              print(f"Batch size : {batch_size}")
              print(f"LR : {learning_rate}")

              # -- Download base model checkpoint --
              restore_path = None
              if base_model and base_model != "none":
                  from TTS.utils.manage import ModelManager
                  manager = ModelManager()
                  model_path, _config_path, _ = manager.download_model(base_model)
                  restore_path = model_path
                  print(f"Base model checkpoint: {restore_path}")

              # -- Configure and train --
              from trainer import Trainer, TrainerArgs
              from TTS.tts.configs.vits_config import VitsConfig
              from TTS.tts.configs.shared_configs import BaseDatasetConfig
              from TTS.tts.datasets import load_tts_samples
              from TTS.tts.models.vits import Vits
              from TTS.tts.utils.text.tokenizer import TTSTokenizer
              from TTS.utils.audio import AudioProcessor

              dataset_config = BaseDatasetConfig(
                  formatter="ljspeech",
                  meta_file_train="metadata.csv",
                  path=dataset_dir,
                  language=language,
              )

              config = VitsConfig(
                  run_name=voice_name,
                  output_path=OUTPUT_DIR,
                  datasets=[dataset_config],
                  batch_size=batch_size,
                  eval_batch_size=max(1, batch_size // 2),
                  num_loader_workers=4,
                  num_eval_loader_workers=2,
                  run_eval=True,
                  test_delay_epochs=5,
                  epochs=num_epochs,
                  text_cleaner="phoneme_cleaners",
                  use_phonemes=True,
                  phoneme_language=language,
                  phoneme_cache_path=os.path.join(OUTPUT_DIR, "phoneme_cache"),
                  compute_input_seq_cache=True,
                  print_step=25,
                  print_eval=False,
                  mixed_precision=True,
                  save_step=500,
                  save_n_checkpoints=3,
                  save_best_after=1000,
                  lr=learning_rate,
                  audio={
                      "sample_rate": 22050,
                      "resample": True,
                      "do_trim_silence": True,
                      "trim_db": 45,
                  },
              )

              ap = AudioProcessor.init_from_config(config)
              tokenizer, config = TTSTokenizer.init_from_config(config)

              # NOTE(review): the eval split uses the config defaults; very small
              # datasets may not yield any eval samples -- confirm against the
              # Coqui TTS version in the image.
              train_samples, eval_samples = load_tts_samples(
                  dataset_config,
                  eval_split=True,
                  eval_split_max_size=config.eval_split_max_size,
                  eval_split_size=config.eval_split_size,
              )
              print(f"Training samples: {len(train_samples)}")
              print(f"Eval samples: {len(eval_samples)}")

              model = Vits(config, ap, tokenizer, speaker_manager=None)

              trainer_args = TrainerArgs(
                  restore_path=restore_path,
                  skip_train_epoch=False,
              )

              trainer = Trainer(
                  trainer_args,
                  config,
                  output_path=OUTPUT_DIR,
                  model=model,
                  train_samples=train_samples,
                  eval_samples=eval_samples,
              )

              trainer.fit()

              # -- Find best checkpoint --
              # Prefer best_model*.pth; fall back to any .pth. Newest first.
              best_files = glob.glob(os.path.join(OUTPUT_DIR, "**/best_model*.pth"), recursive=True)
              if not best_files:
                  best_files = glob.glob(os.path.join(OUTPUT_DIR, "**/*.pth"), recursive=True)
              best_files.sort(key=os.path.getmtime, reverse=True)
              best_checkpoint = best_files[0] if best_files else ""

              # Try to read final loss from trainer. This stays best-effort (a
              # missing metric must not fail a finished training run), but the
              # fallback is reported instead of being swallowed silently.
              final_loss = 0.0
              try:
                  final_loss = float(trainer.keep_avg_train["avg_loss"])
              except Exception as exc:
                  print(f"WARNING: could not read final loss from trainer ({exc!r}); reporting 0.0")

              print(f"Training complete. Best checkpoint: {best_checkpoint}")
              print(f"Final loss: {final_loss:.4f}")

              return out(model_dir=OUTPUT_DIR, best_checkpoint=best_checkpoint, final_loss=final_loss)
        # NOTE(review): "latest" is a moving tag, so runs are not reproducible;
        # consider pinning a version or digest once a known-good one is chosen.
        image: ghcr.io/idiap/coqui-tts:latest
        resources:
          accelerator:
            resourceCount: '1'
            resourceType: gpu
          resourceCpuLimit: '8'
          resourceCpuRequest: '4'
          resourceMemoryLimit: 32Gi
          resourceMemoryRequest: 16Gi
    exec-transcribe-and-diarise:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - transcribe_and_diarise
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' && python3 -m pip install --quiet --no-warn-script-location 'requests' 'boto3' && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def transcribe_and_diarise(
              s3_endpoint: str,
              s3_bucket: str,
              s3_key: str,
              whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
          ) -> NamedTuple("TranscriptOutput", [("transcript_json", str), ("speakers", str), ("audio_path", str)]):
              """Download audio from Quobjects S3, transcribe via Whisper with timestamps.

              Args:
                  s3_endpoint: Host (no scheme) of the S3 endpoint; reached over http.
                  s3_bucket: Bucket holding the audio object.
                  s3_key: Object key; its extension names the downloaded file.
                  whisper_url: HTTP endpoint that receives the base64-encoded audio.

              Returns:
                  transcript_json: JSON list of segments, each carrying a "speaker".
                  speakers: JSON list of the distinct speaker labels, sorted.
                  audio_path: Path of the normalised 16 kHz mono WAV.
              """
              import base64
              import json
              import os
              import subprocess
              import tempfile

              import boto3
              import requests
              from botocore import UNSIGNED
              from botocore.config import Config

              out = NamedTuple("TranscriptOutput", [("transcript_json", str), ("speakers", str), ("audio_path", str)])
              work = tempfile.mkdtemp()

              # -- Download audio from Quobjects S3 --
              ext = os.path.splitext(s3_key)[-1] or ".wav"
              audio_path = os.path.join(work, f"audio_raw{ext}")

              # Anonymous access: botocore only skips request signing for the
              # UNSIGNED sentinel object; the string "UNSIGNED" is not recognised
              # as a signature version.
              client = boto3.client(
                  "s3",
                  endpoint_url=f"http://{s3_endpoint}",
                  aws_access_key_id="",
                  aws_secret_access_key="",
                  config=Config(signature_version=UNSIGNED),
              )
              print(f"Downloading s3://{s3_bucket}/{s3_key} from {s3_endpoint}")
              client.download_file(s3_bucket, s3_key, audio_path)
              print(f"Downloaded {os.path.getsize(audio_path)} bytes")

              # -- Normalise to 16 kHz mono WAV --
              wav_path = os.path.join(work, "audio.wav")
              # ffmpeg is installed at run time. A failed "apt-get update" is
              # tolerated; a failed install is fatal.
              subprocess.run(
                  ["apt-get", "update", "-qq"],
                  capture_output=True,
              )
              subprocess.run(
                  ["apt-get", "install", "-y", "-qq", "ffmpeg"],
                  capture_output=True, check=True,
              )
              subprocess.run(
                  ["ffmpeg", "-y", "-i", audio_path, "-ac", "1",
                   "-ar", "16000", "-sample_fmt", "s16", wav_path],
                  capture_output=True, check=True,
              )

              # -- Send to Whisper for timestamped transcription --
              with open(wav_path, "rb") as f:
                  audio_b64 = base64.b64encode(f.read()).decode()

              payload = {
                  "audio": audio_b64,
                  "response_format": "verbose_json",
                  "timestamp_granularities": ["segment"],
              }
              resp = requests.post(whisper_url, json=payload, timeout=600)
              resp.raise_for_status()
              result = resp.json()

              segments = result.get("segments", [])
              print(f"Whisper returned {len(segments)} segments")

              # -- Group segments by speaker if diarisation is present --
              # Whisper may not diarise, but we still produce segments with
              # start/end timestamps that the next step can use.
              # Segments without a "speaker" get a synthetic label: every run of
              # ten consecutive segments shares one SPEAKER_<n> label. Those
              # labels are positional, not real speaker identities.
              speakers = set()
              for i, seg in enumerate(segments):
                  spk = seg.get("speaker", f"SPEAKER_{i // 10}")
                  seg["speaker"] = spk
                  speakers.add(spk)

              speakers_list = sorted(speakers)
              print(f"Detected speakers: {speakers_list}")

              return out(
                  transcript_json=json.dumps(segments),
                  speakers=json.dumps(speakers_list),
                  audio_path=wav_path,
              )
        image: python:3.13-slim
pipelineInfo:
  description: >-
    Extract a speaker from audio+transcript, fine-tune a Coqui VITS voice model,
    push to Gitea, and log metrics to MLflow.
  name: voice-cloning-pipeline

# Pipeline graph. Execution order implied by dependentTasks:
#   transcribe-and-diarise -> extract-speaker-segments
#     -> prepare-ljspeech-dataset -> train-vits-voice
#     -> push-model-to-gitea -> log-training-metrics
root:
  dag:
    tasks:
      extract-speaker-segments:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-extract-speaker-segments
        dependentTasks:
        - transcribe-and-diarise
        inputs:
          parameters:
            audio_path:
              taskOutputParameter:
                outputParameterKey: audio_path
                producerTask: transcribe-and-diarise
            max_duration_s:
              componentInputParameter: max_segment_duration_s
            min_duration_s:
              componentInputParameter: min_segment_duration_s
            target_speaker:
              componentInputParameter: target_speaker
            transcript_json:
              taskOutputParameter:
                outputParameterKey: transcript_json
                producerTask: transcribe-and-diarise
        taskInfo:
          name: extract-speaker-segments
      log-training-metrics:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-log-training-metrics
        dependentTasks:
        - extract-speaker-segments
        - push-model-to-gitea
        - train-vits-voice
        inputs:
          parameters:
            base_model:
              componentInputParameter: base_model
            batch_size:
              componentInputParameter: batch_size
            files_pushed:
              taskOutputParameter:
                outputParameterKey: files_pushed
                producerTask: push-model-to-gitea
            final_loss:
              taskOutputParameter:
                outputParameterKey: final_loss
                producerTask: train-vits-voice
            learning_rate:
              componentInputParameter: learning_rate
            mlflow_tracking_uri:
              componentInputParameter: mlflow_tracking_uri
            num_epochs:
              componentInputParameter: num_epochs
            num_segments:
              taskOutputParameter:
                outputParameterKey: num_segments
                producerTask: extract-speaker-segments
            repo_url:
              taskOutputParameter:
                outputParameterKey: repo_url
                producerTask: push-model-to-gitea
            total_duration_s:
              taskOutputParameter:
                outputParameterKey: total_duration_s
                producerTask: extract-speaker-segments
            voice_name:
              componentInputParameter: voice_name
        taskInfo:
          name: log-training-metrics
      prepare-ljspeech-dataset:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-prepare-ljspeech-dataset
        dependentTasks:
        - extract-speaker-segments
        inputs:
          parameters:
            language:
              componentInputParameter: language
            segments_json:
              taskOutputParameter:
                outputParameterKey: segments_json
                producerTask: extract-speaker-segments
            voice_name:
              componentInputParameter: voice_name
        taskInfo:
          name: prepare-ljspeech-dataset
      push-model-to-gitea:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-push-model-to-gitea
        dependentTasks:
        - train-vits-voice
        inputs:
          parameters:
            base_model:
              componentInputParameter: base_model
            gitea_owner:
              componentInputParameter: gitea_owner
            gitea_password:
              componentInputParameter: gitea_password
            gitea_repo:
              componentInputParameter: gitea_repo
            gitea_url:
              componentInputParameter: gitea_url
            gitea_username:
              componentInputParameter: gitea_username
            model_dir:
              taskOutputParameter:
                outputParameterKey: model_dir
                producerTask: train-vits-voice
            voice_name:
              componentInputParameter: voice_name
        taskInfo:
          name: push-model-to-gitea
      train-vits-voice:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-train-vits-voice
        dependentTasks:
        - prepare-ljspeech-dataset
        inputs:
          parameters:
            base_model:
              componentInputParameter: base_model
            batch_size:
              componentInputParameter: batch_size
            dataset_dir:
              taskOutputParameter:
                outputParameterKey: dataset_dir
                producerTask: prepare-ljspeech-dataset
            language:
              componentInputParameter: language
            learning_rate:
              componentInputParameter: learning_rate
            num_epochs:
              componentInputParameter: num_epochs
            voice_name:
              componentInputParameter: voice_name
        taskInfo:
          name: train-vits-voice
      transcribe-and-diarise:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-transcribe-and-diarise
        inputs:
          parameters:
            s3_bucket:
              componentInputParameter: s3_bucket
            s3_endpoint:
              componentInputParameter: s3_endpoint
            s3_key:
              componentInputParameter: s3_key
            whisper_url:
              componentInputParameter: whisper_url
        taskInfo:
          name: transcribe-and-diarise
  # Pipeline-level inputs; every one is optional and carries a default.
  inputDefinitions:
    parameters:
      base_model:
        defaultValue: tts_models/en/ljspeech/vits
        isOptional: true
        parameterType: STRING
      batch_size:
        defaultValue: 16.0
        isOptional: true
        parameterType: NUMBER_INTEGER
      gitea_owner:
        defaultValue: daviestechlabs
        isOptional: true
        parameterType: STRING
      gitea_password:
        defaultValue: ''
        isOptional: true
        parameterType: STRING
      gitea_repo:
        defaultValue: voice-models
        isOptional: true
        parameterType: STRING
      gitea_url:
        defaultValue: http://gitea-http.gitea.svc.cluster.local:3000
        isOptional: true
        parameterType: STRING
      gitea_username:
        defaultValue: ''
        isOptional: true
        parameterType: STRING
      language:
        defaultValue: en
        isOptional: true
        parameterType: STRING
      learning_rate:
        defaultValue: 0.0001
        isOptional: true
        parameterType: NUMBER_DOUBLE
      max_segment_duration_s:
        defaultValue: 15.0
        isOptional: true
        parameterType: NUMBER_DOUBLE
      min_segment_duration_s:
        defaultValue: 1.0
        isOptional: true
        parameterType: NUMBER_DOUBLE
      mlflow_tracking_uri:
        defaultValue: http://mlflow.mlflow.svc.cluster.local:80
        isOptional: true
        parameterType: STRING
      num_epochs:
        defaultValue: 100.0
        isOptional: true
        parameterType: NUMBER_INTEGER
      s3_bucket:
        defaultValue: training-data
        isOptional: true
        parameterType: STRING
      s3_endpoint:
        defaultValue: candlekeep.lab.daviestechlabs.io
        isOptional: true
        parameterType: STRING
      s3_key:
        defaultValue: ''
        isOptional: true
        parameterType: STRING
      target_speaker:
        defaultValue: SPEAKER_0
        isOptional: true
        parameterType: STRING
      voice_name:
        defaultValue: custom-voice
        isOptional: true
        parameterType: STRING
      whisper_url:
        defaultValue: http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper
        isOptional: true
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.12.1