From 5c886bf6a5606cc0636e03c771c9e2ac83cf36b0 Mon Sep 17 00:00:00 2001 From: "Billy D." Date: Fri, 13 Feb 2026 10:54:04 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20add=20voice=20cloning=20pipeline=20(S3?= =?UTF-8?q?=20audio=20=E2=86=92=20Whisper=20=E2=86=92=20VITS=20training=20?= =?UTF-8?q?=E2=86=92=20Gitea)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- voice_cloning_pipeline.py | 686 ++++++++++++++++++++++++++++ voice_cloning_pipeline.yaml | 876 ++++++++++++++++++++++++++++++++++++ 2 files changed, 1562 insertions(+) create mode 100644 voice_cloning_pipeline.py create mode 100644 voice_cloning_pipeline.yaml diff --git a/voice_cloning_pipeline.py b/voice_cloning_pipeline.py new file mode 100644 index 0000000..8172f1b --- /dev/null +++ b/voice_cloning_pipeline.py @@ -0,0 +1,686 @@ +#!/usr/bin/env python3 +""" +Voice Cloning Pipeline – Kubeflow Pipelines SDK + +Takes an audio file and a transcript, extracts a target speaker's +segments, preprocesses into LJSpeech-format training data, fine-tunes +a Coqui VITS voice model, pushes the model to Gitea, and logs to MLflow. + +Usage: + pip install kfp==2.12.1 + python voice_cloning_pipeline.py + # Upload voice_cloning_pipeline.yaml to Kubeflow Pipelines UI +""" + +from kfp import compiler, dsl +from typing import NamedTuple + + +# ────────────────────────────────────────────────────────────── +# 1. 
# Transcribe + diarise audio via Whisper to identify speakers
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["requests", "boto3"],
)
def transcribe_and_diarise(
    s3_endpoint: str,
    s3_bucket: str,
    s3_key: str,
    whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
) -> NamedTuple("TranscriptOutput", [("transcript_json", str), ("speakers", str), ("audio_path", str)]):
    """Download audio from Quobjects S3, transcribe via Whisper with timestamps.

    Args:
        s3_endpoint: Host (no scheme) of the S3-compatible endpoint; it is
            contacted over plain ``http://`` with unsigned requests.
        s3_bucket: Bucket holding the source recording.
        s3_key: Object key of the recording; its extension is kept for the
            downloaded file (``.wav`` when the key has none).
        whisper_url: HTTP endpoint that accepts a JSON body with the
            base64-encoded audio and returns a JSON document with ``segments``.

    Returns:
        ``transcript_json``: JSON list of the returned segments, each carrying
        a ``speaker`` field; ``speakers``: JSON list of the sorted distinct
        speaker labels; ``audio_path``: path of the normalised 16 kHz mono WAV.

    Raises:
        subprocess.CalledProcessError: ffmpeg install or conversion failed.
        requests.HTTPError: the Whisper endpoint answered with an error status.
    """
    import base64
    import json
    import os
    import subprocess
    import tempfile

    import boto3
    import requests
    from botocore import UNSIGNED
    from botocore.config import Config

    out = NamedTuple("TranscriptOutput", [("transcript_json", str), ("speakers", str), ("audio_path", str)])
    work = tempfile.mkdtemp()

    # ── Download audio from Quobjects S3 ─────────────────────
    ext = os.path.splitext(s3_key)[-1] or ".wav"
    audio_path = os.path.join(work, f"audio_raw{ext}")

    # Anonymous access needs the botocore.UNSIGNED sentinel; the *string*
    # "UNSIGNED" is treated as the name of a signer and fails at request time.
    client = boto3.client(
        "s3",
        endpoint_url=f"http://{s3_endpoint}",
        aws_access_key_id="",
        aws_secret_access_key="",
        config=Config(signature_version=UNSIGNED),
    )
    print(f"Downloading s3://{s3_bucket}/{s3_key} from {s3_endpoint}")
    client.download_file(s3_bucket, s3_key, audio_path)
    print(f"Downloaded {os.path.getsize(audio_path)} bytes")

    # ── Normalise to 16 kHz mono WAV ─────────────────────────
    wav_path = os.path.join(work, "audio.wav")
    # The slim base image ships without ffmpeg. The index refresh is
    # best-effort; the install itself must succeed.
    subprocess.run(
        ["apt-get", "update", "-qq"],
        capture_output=True,
    )
    subprocess.run(
        ["apt-get", "install", "-y", "-qq", "ffmpeg"],
        capture_output=True, check=True,
    )
    subprocess.run(
        ["ffmpeg", "-y", "-i", audio_path, "-ac", "1",
         "-ar", "16000", "-sample_fmt", "s16", wav_path],
        capture_output=True, check=True,
    )

    # ── Send to Whisper for timestamped transcription ─────────
    with open(wav_path, "rb") as f:
        audio_b64 = base64.b64encode(f.read()).decode()

    payload = {
        "audio": audio_b64,
        "response_format": "verbose_json",
        "timestamp_granularities": ["segment"],
    }
    resp = requests.post(whisper_url, json=payload, timeout=600)
    resp.raise_for_status()
    result = resp.json()

    segments = result.get("segments", [])
    print(f"Whisper returned {len(segments)} segments")

    # ── Group segments by speaker if diarisation is present ───
    # Whisper may not diarise, but we still produce segments with
    # start/end timestamps that the next step can use. Segments without a
    # label get a synthetic one that changes every 10 segments; these are
    # positional buckets, not real speaker identities.
    speakers = set()
    for i, seg in enumerate(segments):
        spk = seg.get("speaker", f"SPEAKER_{i // 10}")
        seg["speaker"] = spk
        speakers.add(spk)

    speakers_list = sorted(speakers)
    print(f"Detected speakers: {speakers_list}")

    # NOTE(review): audio_path points into this step's local temp directory;
    # confirm that the downstream step can actually read it (KFP tasks
    # normally run in separate containers).
    return out(
        transcript_json=json.dumps(segments),
        speakers=json.dumps(speakers_list),
        audio_path=wav_path,
    )


# ──────────────────────────────────────────────────────────────
# 2. Extract target speaker's audio segments
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=[],
)
def extract_speaker_segments(
    transcript_json: str,
    audio_path: str,
    target_speaker: str,
    min_duration_s: float = 1.0,
    max_duration_s: float = 15.0,
) -> NamedTuple("SpeakerSegments", [("segments_json", str), ("num_segments", int), ("total_duration_s", float)]):
    """Slice the audio into per-utterance WAV files for the target speaker.

    Args:
        transcript_json: JSON list of segments with ``start``, ``end``,
            ``text`` and ``speaker`` fields.
        audio_path: Path of the WAV file the segments refer to.
        target_speaker: Speaker label to keep. Exact (case-insensitive)
            matches win; if there are none, a partial match is tried; if
            that also yields nothing, every segment is used.
        min_duration_s: Segments shorter than this are dropped.
        max_duration_s: Longer segments are truncated to this length.

    Returns:
        ``segments_json``: JSON object with ``wavs_dir`` and the list of kept
        ``utterances``; ``num_segments``: number of clips written;
        ``total_duration_s``: summed clip duration, rounded to 2 decimals.

    Raises:
        subprocess.CalledProcessError: ffmpeg install or slicing failed.
    """
    import json
    import os
    import subprocess
    import tempfile

    out = NamedTuple("SpeakerSegments", [("segments_json", str), ("num_segments", int), ("total_duration_s", float)])
    work = tempfile.mkdtemp()
    wavs_dir = os.path.join(work, "wavs")
    os.makedirs(wavs_dir, exist_ok=True)

    # Install ffmpeg
    subprocess.run(["apt-get", "update", "-qq"], capture_output=True)
    subprocess.run(["apt-get", "install", "-y", "-qq", "ffmpeg"], capture_output=True, check=True)

    segments = json.loads(transcript_json)

    # Filter by speaker (case-insensitive). Exact label matches take
    # priority so that "SPEAKER_1" does not also pull in "SPEAKER_10".
    target_lower = target_speaker.lower()
    labelled = [(seg, str(seg.get("speaker") or "").lower()) for seg in segments]
    matched = [seg for seg, spk in labelled if spk == target_lower]

    # Fuzzy fallback (partial match). Empty labels are skipped because
    # "" is a substring of every target and would match everything.
    if not matched:
        matched = [
            seg for seg, spk in labelled
            if spk and (target_lower in spk or spk in target_lower)
        ]

    # If no speaker labels matched, the user may have given a name
    # that doesn't appear. Fall back to using ALL segments.
    if not matched:
        print(f"WARNING: No segments matched speaker '{target_speaker}'. "
              f"Using all {len(segments)} segments.")
        matched = segments

    print(f"Matched {len(matched)} segments for speaker '{target_speaker}'")

    kept = []
    total_dur = 0.0
    for i, seg in enumerate(matched):
        start = float(seg.get("start", 0))
        end = float(seg.get("end", start + 5))
        duration = end - start
        text = seg.get("text", "").strip()

        if duration < min_duration_s or not text:
            continue
        # Over-long segments are truncated rather than dropped; the full
        # segment text is still kept for the shortened clip.
        if duration > max_duration_s:
            end = start + max_duration_s
            duration = max_duration_s

        wav_name = f"utt_{i:04d}.wav"
        wav_out = os.path.join(wavs_dir, wav_name)
        subprocess.run(
            ["ffmpeg", "-y", "-i", audio_path,
             "-ss", str(start), "-to", str(end),
             "-ac", "1", "-ar", "22050", "-sample_fmt", "s16",
             wav_out],
            capture_output=True, check=True,
        )

        kept.append({
            "wav": wav_name,
            "text": text,
            "start": start,
            "end": end,
            "duration": round(duration, 2),
        })
        total_dur += duration

    print(f"Extracted {len(kept)} utterances, total {total_dur:.1f}s")

    return out(
        segments_json=json.dumps({"wavs_dir": wavs_dir, "utterances": kept}),
        num_segments=len(kept),
        total_duration_s=round(total_dur, 2),
    )


# ──────────────────────────────────────────────────────────────
# 3.
# Prepare LJSpeech-format dataset for Coqui TTS
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=[],
)
def prepare_ljspeech_dataset(
    segments_json: str,
    voice_name: str,
    language: str = "en",
) -> NamedTuple("DatasetOutput", [("dataset_dir", str), ("num_samples", int)]):
    """Create metadata.csv + wavs/ in LJSpeech format.

    Args:
        segments_json: JSON object with ``wavs_dir`` (directory holding the
            clips) and ``utterances`` (list of dicts with ``wav`` and
            ``text``).
        voice_name: Name recorded in ``dataset_config.json``.
        language: Language code recorded in ``dataset_config.json``.

    Returns:
        ``dataset_dir``: directory created next to ``wavs_dir`` containing
        ``wavs/``, ``metadata.csv`` and ``dataset_config.json``;
        ``num_samples``: number of metadata rows written.
    """
    import json
    import os
    import shutil

    out = NamedTuple("DatasetOutput", [("dataset_dir", str), ("num_samples", int)])

    data = json.loads(segments_json)
    wavs_src = data["wavs_dir"]
    utterances = data["utterances"]

    dataset_dir = os.path.join(os.path.dirname(wavs_src), "dataset")
    wavs_dst = os.path.join(dataset_dir, "wavs")
    os.makedirs(wavs_dst, exist_ok=True)

    lines = []
    for utt in utterances:
        src = os.path.join(wavs_src, utt["wav"])
        dst = os.path.join(wavs_dst, utt["wav"])
        shutil.copy2(src, dst)
        stem = os.path.splitext(utt["wav"])[0]
        # LJSpeech format: id|text|text
        # metadata.csv is one row per line with "|" as the separator, so the
        # separator and any embedded newlines/tabs are collapsed to single
        # spaces; otherwise one utterance would spill into several rows.
        text = " ".join(utt["text"].replace("|", " ").split())
        lines.append(f"{stem}|{text}|{text}")

    metadata_path = os.path.join(dataset_dir, "metadata.csv")
    with open(metadata_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))

    # Dataset config for reference
    config = {
        "name": voice_name,
        "language": language,
        "num_samples": len(lines),
        "format": "ljspeech",
        "sample_rate": 22050,
    }
    with open(os.path.join(dataset_dir, "dataset_config.json"), "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    print(f"LJSpeech dataset ready: {len(lines)} samples")
    return out(dataset_dir=dataset_dir, num_samples=len(lines))


# ──────────────────────────────────────────────────────────────
# 4.
# Fine-tune Coqui VITS voice model
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="ghcr.io/coqui-ai/tts:latest",
    packages_to_install=[],
)
def train_vits_voice(
    dataset_dir: str,
    voice_name: str,
    language: str = "en",
    base_model: str = "tts_models/en/ljspeech/vits",
    num_epochs: int = 100,
    batch_size: int = 16,
    learning_rate: float = 0.0001,
) -> NamedTuple("TrainOutput", [("model_dir", str), ("best_checkpoint", str), ("final_loss", float)]):
    """Fine-tune a VITS model on the speaker dataset.

    Args:
        dataset_dir: LJSpeech-format dataset directory (``metadata.csv`` plus
            ``wavs/``).
        voice_name: Used as the trainer run name.
        language: Dataset language and phoneme language.
        base_model: Coqui model name to download and restore from; an empty
            string or ``"none"`` trains from scratch.
        num_epochs: Number of training epochs.
        batch_size: Training batch size; evaluation uses half of it (min 1).
        learning_rate: Passed to the VITS config as ``lr``.

    Returns:
        ``model_dir``: the trainer output directory (``/tmp/vits_output``);
        ``best_checkpoint``: path of the newest ``best_model*.pth`` (or the
        newest ``*.pth`` when there is none), ``""`` if nothing was saved;
        ``final_loss``: the trainer's average training loss, or ``0.0`` when
        it could not be read.
    """
    import glob
    import os

    out = NamedTuple("TrainOutput", [("model_dir", str), ("best_checkpoint", str), ("final_loss", float)])

    OUTPUT_DIR = "/tmp/vits_output"
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print("=== Coqui VITS Voice Training ===")
    print(f"Voice name : {voice_name}")
    print(f"Base model : {base_model}")
    print(f"Dataset    : {dataset_dir}")
    print(f"Epochs     : {num_epochs}")
    print(f"Batch size : {batch_size}")
    print(f"LR         : {learning_rate}")

    # ── Download base model checkpoint ────────────────────────
    restore_path = None
    if base_model and base_model != "none":
        from TTS.utils.manage import ModelManager
        manager = ModelManager()
        # Only the checkpoint path is needed; the downloaded config is not
        # used because a fresh VitsConfig is built below.
        model_path, _, _ = manager.download_model(base_model)
        restore_path = model_path
        print(f"Base model checkpoint: {restore_path}")

    # ── Configure and train ───────────────────────────────────
    from trainer import Trainer, TrainerArgs
    from TTS.tts.configs.vits_config import VitsConfig
    from TTS.tts.configs.shared_configs import BaseDatasetConfig
    from TTS.tts.datasets import load_tts_samples
    from TTS.tts.models.vits import Vits
    from TTS.tts.utils.text.tokenizer import TTSTokenizer
    from TTS.utils.audio import AudioProcessor

    dataset_config = BaseDatasetConfig(
        formatter="ljspeech",
        meta_file_train="metadata.csv",
        path=dataset_dir,
        language=language,
    )

    config = VitsConfig(
        run_name=voice_name,
        output_path=OUTPUT_DIR,
        datasets=[dataset_config],
        batch_size=batch_size,
        eval_batch_size=max(1, batch_size // 2),
        num_loader_workers=4,
        num_eval_loader_workers=2,
        run_eval=True,
        test_delay_epochs=5,
        epochs=num_epochs,
        text_cleaner="phoneme_cleaners",
        use_phonemes=True,
        phoneme_language=language,
        phoneme_cache_path=os.path.join(OUTPUT_DIR, "phoneme_cache"),
        compute_input_seq_cache=True,
        print_step=25,
        print_eval=False,
        mixed_precision=True,
        save_step=500,
        save_n_checkpoints=3,
        save_best_after=1000,
        lr=learning_rate,
        audio={
            "sample_rate": 22050,
            "resample": True,
            "do_trim_silence": True,
            "trim_db": 45,
        },
    )

    ap = AudioProcessor.init_from_config(config)
    tokenizer, config = TTSTokenizer.init_from_config(config)

    train_samples, eval_samples = load_tts_samples(
        dataset_config,
        eval_split=True,
        eval_split_max_size=config.eval_split_max_size,
        eval_split_size=config.eval_split_size,
    )
    print(f"Training samples: {len(train_samples)}")
    print(f"Eval samples: {len(eval_samples)}")

    model = Vits(config, ap, tokenizer, speaker_manager=None)

    trainer_args = TrainerArgs(
        restore_path=restore_path,
        skip_train_epoch=False,
    )

    trainer = Trainer(
        trainer_args,
        config,
        output_path=OUTPUT_DIR,
        model=model,
        train_samples=train_samples,
        eval_samples=eval_samples,
    )

    trainer.fit()

    # ── Find best checkpoint ──────────────────────────────────
    best_files = glob.glob(os.path.join(OUTPUT_DIR, "**/best_model*.pth"), recursive=True)
    if not best_files:
        best_files = glob.glob(os.path.join(OUTPUT_DIR, "**/*.pth"), recursive=True)
    best_files.sort(key=os.path.getmtime, reverse=True)
    best_checkpoint = best_files[0] if best_files else ""

    # Try to read final loss from trainer. This stays best-effort (a missing
    # metric must not fail a finished training run), but the fallback is
    # reported instead of silently logging a loss of 0.0.
    final_loss = 0.0
    try:
        final_loss = float(trainer.keep_avg_train["avg_loss"])
    except Exception as exc:
        print(f"WARNING: could not read final loss from trainer ({exc!r}); reporting 0.0")

    print(f"Training complete. Best checkpoint: {best_checkpoint}")
    print(f"Final loss: {final_loss:.4f}")

    return out(model_dir=OUTPUT_DIR, best_checkpoint=best_checkpoint, final_loss=final_loss)


# ──────────────────────────────────────────────────────────────
# 5. Push trained voice model to Gitea repository
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["requests"],
)
def push_model_to_gitea(
    model_dir: str,
    voice_name: str,
    gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000",
    gitea_owner: str = "daviestechlabs",
    gitea_repo: str = "voice-models",
    gitea_username: str = "",
    gitea_password: str = "",
) -> NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)]):
    """Package and push the trained model to a Gitea repository.

    The newest checkpoint, the first ``config.json`` found and a generated
    ``model_info.json`` are uploaded under ``<voice_name>/`` through the
    Gitea contents API. Individual upload failures are printed and reflected
    in the returned count rather than raised.

    Args:
        model_dir: Directory searched recursively for model files.
        voice_name: Top-level folder in the repository and model name.
        gitea_url: Base URL of the Gitea instance.
        gitea_owner: Organisation (or user) owning the repository.
        gitea_repo: Repository name; created when it does not exist.
        gitea_username: Basic-auth user; requests are anonymous when empty.
        gitea_password: Basic-auth password.

    Returns:
        ``repo_url``: web URL of the repository; ``files_pushed``: number of
        files uploaded successfully.

    Raises:
        requests.HTTPError: the repository was missing and could not be
            created.
    """
    import base64
    import glob
    import json
    import os
    from urllib.parse import quote

    import requests

    out = NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)])
    session = requests.Session()
    session.auth = (gitea_username, gitea_password) if gitea_username else None

    api = f"{gitea_url}/api/v1"
    repo_url = f"{gitea_url}/{gitea_owner}/{gitea_repo}"

    # ── Ensure repo exists ────────────────────────────────────
    r = session.get(f"{api}/repos/{gitea_owner}/{gitea_repo}", timeout=30)
    if r.status_code == 404:
        print(f"Creating repository: {gitea_owner}/{gitea_repo}")
        r = session.post(
            f"{api}/orgs/{gitea_owner}/repos",
            json={
                "name": gitea_repo,
                "description": "Trained voice models from voice cloning pipeline",
                "private": False,
                "auto_init": True,
            },
            timeout=30,
        )
        # Owner may be a user rather than an organisation: retry on the
        # authenticated user's own namespace.
        if r.status_code not in (200, 201):
            r = session.post(
                f"{api}/user/repos",
                json={"name": gitea_repo, "description": "Trained voice models", "auto_init": True},
                timeout=30,
            )
        r.raise_for_status()
        print("Repository created")

    # ── Collect model files ───────────────────────────────────
    files_to_push = []

    # Best model checkpoint: newest best_model*.pth, else newest *.pth
    for pattern in ["**/best_model*.pth", "**/*.pth"]:
        found = glob.glob(os.path.join(model_dir, pattern), recursive=True)
        if found:
            found.sort(key=os.path.getmtime, reverse=True)
            files_to_push.append(found[0])
            break

    # Config
    found = glob.glob(os.path.join(model_dir, "**/config.json"), recursive=True)
    if found:
        files_to_push.append(found[0])

    # Model info
    model_info = {
        "name": voice_name,
        "type": "coqui-vits",
        "base_model": "tts_models/en/ljspeech/vits",
        "sample_rate": 22050,
    }
    info_path = os.path.join(model_dir, "model_info.json")
    with open(info_path, "w") as f:
        json.dump(model_info, f, indent=2)
    files_to_push.append(info_path)

    # ── Push each file ────────────────────────────────────────
    pushed = 0
    for fpath in files_to_push:
        rel = os.path.relpath(fpath, model_dir)
        gitea_path = f"{voice_name}/{rel}"
        # Percent-encode the path (keeping "/") so names with spaces, "#"
        # or "?" still address the intended file.
        contents_url = f"{api}/repos/{gitea_owner}/{gitea_repo}/contents/{quote(gitea_path)}"
        print(f"Pushing: {gitea_path} ({os.path.getsize(fpath)} bytes)")

        with open(fpath, "rb") as f:
            content_b64 = base64.b64encode(f.read()).decode()

        # Check if file exists
        r = session.get(contents_url, timeout=30)

        payload = {
            "content": content_b64,
            "message": f"Upload {voice_name}: {rel}",
        }

        if r.status_code == 200:
            # Existing file: the update (PUT) must carry the current blob sha.
            sha = r.json().get("sha", "")
            payload["sha"] = sha
            r = session.put(contents_url, json=payload, timeout=120)
        else:
            r = session.post(contents_url, json=payload, timeout=120)

        if r.status_code in (200, 201):
            pushed += 1
            print("  ✓ Pushed")
        else:
            print(f"  ✗ Failed ({r.status_code}): {r.text[:200]}")

    print(f"\nPushed {pushed}/{len(files_to_push)} files to {repo_url}")
    return out(repo_url=repo_url, files_pushed=pushed)


# ──────────────────────────────────────────────────────────────
# 6.
Log metrics to MLflow +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["mlflow>=2.10.0", "requests"], +) +def log_training_metrics( + voice_name: str, + num_segments: int, + total_duration_s: float, + final_loss: float, + num_epochs: int, + batch_size: int, + learning_rate: float, + repo_url: str, + files_pushed: int, + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", + experiment_name: str = "voice-cloning", +) -> NamedTuple("LogOutput", [("run_id", str)]): + """Log training run to MLflow.""" + import mlflow + from datetime import datetime + + out = NamedTuple("LogOutput", [("run_id", str)]) + + mlflow.set_tracking_uri(mlflow_tracking_uri) + mlflow.set_experiment(experiment_name) + + with mlflow.start_run(run_name=f"voice-clone-{voice_name}-{datetime.now():%Y%m%d-%H%M}") as run: + mlflow.log_params({ + "voice_name": voice_name, + "base_model": "tts_models/en/ljspeech/vits", + "model_type": "coqui-vits", + "num_epochs": num_epochs, + "batch_size": batch_size, + "learning_rate": learning_rate, + "sample_rate": 22050, + }) + mlflow.log_metrics({ + "num_training_segments": num_segments, + "total_audio_duration_s": total_duration_s, + "final_loss": final_loss, + "files_pushed": files_pushed, + }) + mlflow.set_tags({ + "pipeline": "voice-cloning", + "gitea_repo": repo_url, + "voice_name": voice_name, + }) + print(f"Logged to MLflow run: {run.info.run_id}") + return out(run_id=run.info.run_id) + + +# ────────────────────────────────────────────────────────────── +# Pipeline definition +# ────────────────────────────────────────────────────────────── +@dsl.pipeline( + name="Voice Cloning Pipeline", + description=( + "Extract a speaker from audio+transcript, fine-tune a Coqui VITS " + "voice model, push to Gitea, and log metrics to MLflow." 
+ ), +) +def voice_cloning_pipeline( + s3_endpoint: str = "candlekeep.lab.daviestechlabs.io", + s3_bucket: str = "training-data", + s3_key: str = "", + target_speaker: str = "SPEAKER_0", + voice_name: str = "custom-voice", + language: str = "en", + base_model: str = "tts_models/en/ljspeech/vits", + num_epochs: int = 100, + batch_size: int = 16, + learning_rate: float = 0.0001, + min_segment_duration_s: float = 1.0, + max_segment_duration_s: float = 15.0, + # Whisper / inference endpoints + whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper", + # Gitea + gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000", + gitea_owner: str = "daviestechlabs", + gitea_repo: str = "voice-models", + gitea_username: str = "", + gitea_password: str = "", + # MLflow + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +): + # 1 - Download from Quobjects S3 and transcribe with Whisper + transcribed = transcribe_and_diarise( + s3_endpoint=s3_endpoint, + s3_bucket=s3_bucket, + s3_key=s3_key, + whisper_url=whisper_url, + ) + + # 2 - Extract target speaker's segments + extracted = extract_speaker_segments( + transcript_json=transcribed.outputs["transcript_json"], + audio_path=transcribed.outputs["audio_path"], + target_speaker=target_speaker, + min_duration_s=min_segment_duration_s, + max_duration_s=max_segment_duration_s, + ) + + # 3 - Build LJSpeech dataset + dataset = prepare_ljspeech_dataset( + segments_json=extracted.outputs["segments_json"], + voice_name=voice_name, + language=language, + ) + + # 4 - Train VITS model + trained = train_vits_voice( + dataset_dir=dataset.outputs["dataset_dir"], + voice_name=voice_name, + language=language, + base_model=base_model, + num_epochs=num_epochs, + batch_size=batch_size, + learning_rate=learning_rate, + ) + trained.set_accelerator_type("gpu") + trained.set_gpu_limit(1) + trained.set_memory_request("16Gi") + trained.set_memory_limit("32Gi") + trained.set_cpu_request("4") + 
trained.set_cpu_limit("8") + + # 5 - Push model to Gitea + pushed = push_model_to_gitea( + model_dir=trained.outputs["model_dir"], + voice_name=voice_name, + gitea_url=gitea_url, + gitea_owner=gitea_owner, + gitea_repo=gitea_repo, + gitea_username=gitea_username, + gitea_password=gitea_password, + ) + + # 6 - Log to MLflow + log_training_metrics( + voice_name=voice_name, + num_segments=extracted.outputs["num_segments"], + total_duration_s=extracted.outputs["total_duration_s"], + final_loss=trained.outputs["final_loss"], + num_epochs=num_epochs, + batch_size=batch_size, + learning_rate=learning_rate, + repo_url=pushed.outputs["repo_url"], + files_pushed=pushed.outputs["files_pushed"], + mlflow_tracking_uri=mlflow_tracking_uri, + ) + + +# ────────────────────────────────────────────────────────────── +# Compile +# ────────────────────────────────────────────────────────────── +if __name__ == "__main__": + compiler.Compiler().compile( + pipeline_func=voice_cloning_pipeline, + package_path="voice_cloning_pipeline.yaml", + ) + print("Compiled: voice_cloning_pipeline.yaml") diff --git a/voice_cloning_pipeline.yaml b/voice_cloning_pipeline.yaml new file mode 100644 index 0000000..c8ff783 --- /dev/null +++ b/voice_cloning_pipeline.yaml @@ -0,0 +1,876 @@ +# PIPELINE DEFINITION +# Name: voice-cloning-pipeline +# Description: Extract a speaker from audio+transcript, fine-tune a Coqui VITS voice model, push to Gitea, and log metrics to MLflow. 
+# Inputs: +# base_model: str [Default: 'tts_models/en/ljspeech/vits'] +# batch_size: int [Default: 16.0] +# gitea_owner: str [Default: 'daviestechlabs'] +# gitea_password: str [Default: ''] +# gitea_repo: str [Default: 'voice-models'] +# gitea_url: str [Default: 'http://gitea-http.gitea.svc.cluster.local:3000'] +# gitea_username: str [Default: ''] +# language: str [Default: 'en'] +# learning_rate: float [Default: 0.0001] +# max_segment_duration_s: float [Default: 15.0] +# min_segment_duration_s: float [Default: 1.0] +# mlflow_tracking_uri: str [Default: 'http://mlflow.mlflow.svc.cluster.local:80'] +# num_epochs: int [Default: 100.0] +# s3_bucket: str [Default: 'training-data'] +# s3_endpoint: str [Default: 'candlekeep.lab.daviestechlabs.io'] +# s3_key: str [Default: ''] +# target_speaker: str [Default: 'SPEAKER_0'] +# voice_name: str [Default: 'custom-voice'] +# whisper_url: str [Default: 'http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper'] +components: + comp-extract-speaker-segments: + executorLabel: exec-extract-speaker-segments + inputDefinitions: + parameters: + audio_path: + parameterType: STRING + max_duration_s: + defaultValue: 15.0 + isOptional: true + parameterType: NUMBER_DOUBLE + min_duration_s: + defaultValue: 1.0 + isOptional: true + parameterType: NUMBER_DOUBLE + target_speaker: + parameterType: STRING + transcript_json: + parameterType: STRING + outputDefinitions: + parameters: + num_segments: + parameterType: NUMBER_INTEGER + segments_json: + parameterType: STRING + total_duration_s: + parameterType: NUMBER_DOUBLE + comp-log-training-metrics: + executorLabel: exec-log-training-metrics + inputDefinitions: + parameters: + batch_size: + parameterType: NUMBER_INTEGER + experiment_name: + defaultValue: voice-cloning + isOptional: true + parameterType: STRING + files_pushed: + parameterType: NUMBER_INTEGER + final_loss: + parameterType: NUMBER_DOUBLE + learning_rate: + parameterType: NUMBER_DOUBLE + mlflow_tracking_uri: + 
defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + num_epochs: + parameterType: NUMBER_INTEGER + num_segments: + parameterType: NUMBER_INTEGER + repo_url: + parameterType: STRING + total_duration_s: + parameterType: NUMBER_DOUBLE + voice_name: + parameterType: STRING + outputDefinitions: + parameters: + run_id: + parameterType: STRING + comp-prepare-ljspeech-dataset: + executorLabel: exec-prepare-ljspeech-dataset + inputDefinitions: + parameters: + language: + defaultValue: en + isOptional: true + parameterType: STRING + segments_json: + parameterType: STRING + voice_name: + parameterType: STRING + outputDefinitions: + parameters: + dataset_dir: + parameterType: STRING + num_samples: + parameterType: NUMBER_INTEGER + comp-push-model-to-gitea: + executorLabel: exec-push-model-to-gitea + inputDefinitions: + parameters: + gitea_owner: + defaultValue: daviestechlabs + isOptional: true + parameterType: STRING + gitea_password: + defaultValue: '' + isOptional: true + parameterType: STRING + gitea_repo: + defaultValue: voice-models + isOptional: true + parameterType: STRING + gitea_url: + defaultValue: http://gitea-http.gitea.svc.cluster.local:3000 + isOptional: true + parameterType: STRING + gitea_username: + defaultValue: '' + isOptional: true + parameterType: STRING + model_dir: + parameterType: STRING + voice_name: + parameterType: STRING + outputDefinitions: + parameters: + files_pushed: + parameterType: NUMBER_INTEGER + repo_url: + parameterType: STRING + comp-train-vits-voice: + executorLabel: exec-train-vits-voice + inputDefinitions: + parameters: + base_model: + defaultValue: tts_models/en/ljspeech/vits + isOptional: true + parameterType: STRING + batch_size: + defaultValue: 16.0 + isOptional: true + parameterType: NUMBER_INTEGER + dataset_dir: + parameterType: STRING + language: + defaultValue: en + isOptional: true + parameterType: STRING + learning_rate: + defaultValue: 0.0001 + isOptional: true + 
parameterType: NUMBER_DOUBLE + num_epochs: + defaultValue: 100.0 + isOptional: true + parameterType: NUMBER_INTEGER + voice_name: + parameterType: STRING + outputDefinitions: + parameters: + best_checkpoint: + parameterType: STRING + final_loss: + parameterType: NUMBER_DOUBLE + model_dir: + parameterType: STRING + comp-transcribe-and-diarise: + executorLabel: exec-transcribe-and-diarise + inputDefinitions: + parameters: + s3_bucket: + parameterType: STRING + s3_endpoint: + parameterType: STRING + s3_key: + parameterType: STRING + whisper_url: + defaultValue: http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + audio_path: + parameterType: STRING + speakers: + parameterType: STRING + transcript_json: + parameterType: STRING +deploymentSpec: + executors: + exec-extract-speaker-segments: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - extract_speaker_segments + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef extract_speaker_segments(\n transcript_json: str,\n audio_path:\ + \ str,\n target_speaker: str,\n min_duration_s: float = 1.0,\n \ + \ max_duration_s: float = 15.0,\n) -> NamedTuple(\"SpeakerSegments\", [(\"\ + segments_json\", str), (\"num_segments\", int), (\"total_duration_s\", float)]):\n\ + \ \"\"\"Slice the audio into per-utterance WAV files for the target speaker.\"\ + \"\"\n import json\n import os\n import subprocess\n import\ + \ tempfile\n\n out = NamedTuple(\"SpeakerSegments\", [(\"segments_json\"\ + , str), (\"num_segments\", int), (\"total_duration_s\", float)])\n work\ + \ = tempfile.mkdtemp()\n wavs_dir = os.path.join(work, \"wavs\")\n \ + \ os.makedirs(wavs_dir, exist_ok=True)\n\n # Install ffmpeg\n subprocess.run([\"\ + apt-get\", \"update\", \"-qq\"], capture_output=True)\n subprocess.run([\"\ + apt-get\", \"install\", \"-y\", \"-qq\", \"ffmpeg\"], capture_output=True,\ + \ check=True)\n\n segments = json.loads(transcript_json)\n\n # Filter\ + \ by speaker \u2014 fuzzy match (case-insensitive, partial)\n target_lower\ + \ = target_speaker.lower()\n matched = []\n for seg in segments:\n\ + \ spk = seg.get(\"speaker\", \"\").lower()\n if target_lower\ + \ in spk or spk in target_lower:\n matched.append(seg)\n\n \ + \ # If no speaker labels matched, the user may have given a name\n #\ + \ 
that doesn't appear. Fall back to using ALL segments.\n if not matched:\n\ + \ print(f\"WARNING: No segments matched speaker '{target_speaker}'.\ + \ \"\n f\"Using all {len(segments)} segments.\")\n matched\ + \ = segments\n\n print(f\"Matched {len(matched)} segments for speaker\ + \ '{target_speaker}'\")\n\n kept = []\n total_dur = 0.0\n for i,\ + \ seg in enumerate(matched):\n start = float(seg.get(\"start\", 0))\n\ + \ end = float(seg.get(\"end\", start + 5))\n duration = end\ + \ - start\n text = seg.get(\"text\", \"\").strip()\n\n if\ + \ duration < min_duration_s or not text:\n continue\n \ + \ if duration > max_duration_s:\n end = start + max_duration_s\n\ + \ duration = max_duration_s\n\n wav_name = f\"utt_{i:04d}.wav\"\ + \n wav_out = os.path.join(wavs_dir, wav_name)\n subprocess.run(\n\ + \ [\"ffmpeg\", \"-y\", \"-i\", audio_path,\n \"-ss\"\ + , str(start), \"-to\", str(end),\n \"-ac\", \"1\", \"-ar\",\ + \ \"22050\", \"-sample_fmt\", \"s16\",\n wav_out],\n \ + \ capture_output=True, check=True,\n )\n\n kept.append({\n\ + \ \"wav\": wav_name,\n \"text\": text,\n \ + \ \"start\": start,\n \"end\": end,\n \"duration\"\ + : round(duration, 2),\n })\n total_dur += duration\n\n \ + \ print(f\"Extracted {len(kept)} utterances, total {total_dur:.1f}s\")\n\ + \n return out(\n segments_json=json.dumps({\"wavs_dir\": wavs_dir,\ + \ \"utterances\": kept}),\n num_segments=len(kept),\n total_duration_s=round(total_dur,\ + \ 2),\n )\n\n" + image: python:3.13-slim + exec-log-training-metrics: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - log_training_metrics + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'mlflow>=2.10.0'\ + \ 'requests' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef log_training_metrics(\n voice_name: str,\n num_segments:\ + \ int,\n total_duration_s: float,\n final_loss: float,\n num_epochs:\ + \ int,\n batch_size: int,\n learning_rate: float,\n repo_url: str,\n\ + \ files_pushed: int,\n mlflow_tracking_uri: str = \"http://mlflow.mlflow.svc.cluster.local:80\"\ + ,\n experiment_name: str = \"voice-cloning\",\n) -> NamedTuple(\"LogOutput\"\ + , [(\"run_id\", str)]):\n \"\"\"Log training run to MLflow.\"\"\"\n \ + \ import mlflow\n from datetime import datetime\n\n out = NamedTuple(\"\ + LogOutput\", [(\"run_id\", str)])\n\n mlflow.set_tracking_uri(mlflow_tracking_uri)\n\ + \ mlflow.set_experiment(experiment_name)\n\n with mlflow.start_run(run_name=f\"\ + voice-clone-{voice_name}-{datetime.now():%Y%m%d-%H%M}\") as run:\n \ + \ mlflow.log_params({\n \"voice_name\": voice_name,\n \ + \ \"base_model\": \"tts_models/en/ljspeech/vits\",\n \"\ + model_type\": \"coqui-vits\",\n \"num_epochs\": num_epochs,\n\ + \ \"batch_size\": batch_size,\n \"learning_rate\"\ + : learning_rate,\n \"sample_rate\": 22050,\n })\n \ + \ mlflow.log_metrics({\n \"num_training_segments\": num_segments,\n\ + \ \"total_audio_duration_s\": total_duration_s,\n \ + \ \"final_loss\": final_loss,\n \"files_pushed\": 
def prepare_ljspeech_dataset(
    segments_json: str,
    voice_name: str,
    language: str = "en",
) -> NamedTuple("DatasetOutput", [("dataset_dir", str), ("num_samples", int)]):
    """Create ``metadata.csv`` + ``wavs/`` in LJSpeech format.

    Args:
        segments_json: JSON object with a ``wavs_dir`` key (directory that
            holds the segment WAV files) and an ``utterances`` list whose
            items carry ``wav`` (file name inside ``wavs_dir``) and ``text``.
        voice_name: Stored as ``name`` in ``dataset_config.json``.
        language: Stored as ``language`` in ``dataset_config.json``.

    Returns:
        ``DatasetOutput(dataset_dir, num_samples)``. The dataset is written to
        a ``dataset`` directory that sits next to ``wavs_dir``.

    Raises:
        KeyError: If ``wavs_dir``, ``utterances``, ``wav`` or ``text`` is missing.
        FileNotFoundError: If a referenced WAV file does not exist.
    """
    import json
    import os
    import shutil

    out = NamedTuple("DatasetOutput", [("dataset_dir", str), ("num_samples", int)])

    data = json.loads(segments_json)
    # normpath drops a trailing separator; without it dirname() would return
    # wavs_dir itself and the dataset would be nested inside the wavs folder.
    wavs_src = os.path.normpath(data["wavs_dir"])
    utterances = data["utterances"]

    dataset_dir = os.path.join(os.path.dirname(wavs_src), "dataset")
    wavs_dst = os.path.join(dataset_dir, "wavs")
    os.makedirs(wavs_dst, exist_ok=True)

    lines = []
    for utt in utterances:
        wav_name = utt["wav"]
        shutil.copy2(os.path.join(wavs_src, wav_name), os.path.join(wavs_dst, wav_name))
        stem = os.path.splitext(wav_name)[0]
        # LJSpeech format: id|text|text, one record per line. Neither the
        # field separator nor a line break may survive inside the transcript,
        # so '|' becomes a space and all whitespace runs collapse to one space.
        text = " ".join(utt["text"].replace("|", " ").split())
        lines.append(f"{stem}|{text}|{text}")

    metadata_path = os.path.join(dataset_dir, "metadata.csv")
    with open(metadata_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))

    # Dataset config for reference
    config = {
        "name": voice_name,
        "language": language,
        "num_samples": len(lines),
        "format": "ljspeech",
        "sample_rate": 22050,
    }
    with open(os.path.join(dataset_dir, "dataset_config.json"), "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    print(f"LJSpeech dataset ready: {len(lines)} samples")
    return out(dataset_dir=dataset_dir, num_samples=len(lines))
def push_model_to_gitea(
    model_dir: str,
    voice_name: str,
    gitea_url: str = "http://gitea-http.gitea.svc.cluster.local:3000",
    gitea_owner: str = "daviestechlabs",
    gitea_repo: str = "voice-models",
    gitea_username: str = "",
    gitea_password: str = "",
) -> NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)]):
    """Package and push the trained model to a Gitea repository.

    Uploads the newest checkpoint (preferring ``best_model*.pth``), a
    ``config.json`` and a generated ``model_info.json`` under
    ``<voice_name>/`` through the Gitea contents API, creating the repository
    first when it does not exist.

    Args:
        model_dir: Directory that holds the training output.
        voice_name: Top-level folder name inside the repository.
        gitea_url: Base URL of the Gitea instance.
        gitea_owner: Organisation (or user) that owns the repository.
        gitea_repo: Repository name.
        gitea_username: Basic-auth user; when empty no auth is sent.
        gitea_password: Basic-auth password.

    Returns:
        ``PushOutput(repo_url, files_pushed)``. Individual upload failures are
        printed and skipped, so ``files_pushed`` may be lower than the number
        of files collected.

    Raises:
        FileNotFoundError: If ``model_dir`` is not an existing directory.
        requests.HTTPError: If the repository lookup fails with an error other
            than 404, or the repository cannot be created.
    """
    import base64
    import glob
    import json
    import os
    from urllib.parse import quote

    import requests

    out = NamedTuple("PushOutput", [("repo_url", str), ("files_pushed", int)])

    # NOTE(review): model_dir is a plain path handed over from the training
    # step; confirm both steps actually share that filesystem.
    if not os.path.isdir(model_dir):
        raise FileNotFoundError(f"Model directory not found: {model_dir}")

    session = requests.Session()
    session.auth = (gitea_username, gitea_password) if gitea_username else None

    base_url = gitea_url.rstrip("/")  # avoid '//' when the URL ends with '/'
    api = f"{base_url}/api/v1"
    repo_url = f"{base_url}/{gitea_owner}/{gitea_repo}"
    repo_api = f"{api}/repos/{gitea_owner}/{gitea_repo}"

    # ── Ensure repo exists ────────────────────────────────────
    r = session.get(repo_api, timeout=30)
    if r.status_code == 404:
        print(f"Creating repository: {gitea_owner}/{gitea_repo}")
        r = session.post(
            f"{api}/orgs/{gitea_owner}/repos",
            json={
                "name": gitea_repo,
                "description": "Trained voice models from voice cloning pipeline",
                "private": False,
                "auto_init": True,
            },
            timeout=30,
        )
        if r.status_code not in (200, 201):
            # The owner may be a user rather than an organisation.
            r = session.post(
                f"{api}/user/repos",
                json={"name": gitea_repo, "description": "Trained voice models", "auto_init": True},
                timeout=30,
            )
            r.raise_for_status()
        print("Repository created")
    else:
        # Auth or server errors would otherwise only show up later as a run
        # of failed uploads; fail here with the real cause instead.
        r.raise_for_status()

    # ── Collect model files ───────────────────────────────────
    files_to_push = []

    # Best model checkpoint (newest match wins)
    checkpoint = None
    for pattern in ("**/best_model*.pth", "**/*.pth"):
        found = glob.glob(os.path.join(model_dir, pattern), recursive=True)
        if found:
            checkpoint = max(found, key=os.path.getmtime)
            files_to_push.append(checkpoint)
            break
    if checkpoint is None:
        print(f"No .pth checkpoint found under {model_dir}")

    # Config: prefer the one written next to the chosen checkpoint so the
    # pair belongs to the same run; otherwise take the newest one.
    configs = glob.glob(os.path.join(model_dir, "**/config.json"), recursive=True)
    if configs:
        sibling = os.path.join(os.path.dirname(checkpoint), "config.json") if checkpoint else ""
        if sibling and os.path.isfile(sibling):
            files_to_push.append(sibling)
        else:
            files_to_push.append(max(configs, key=os.path.getmtime))

    # Model info
    model_info = {
        "name": voice_name,
        "type": "coqui-vits",
        "base_model": "tts_models/en/ljspeech/vits",
        "sample_rate": 22050,
    }
    info_path = os.path.join(model_dir, "model_info.json")
    with open(info_path, "w", encoding="utf-8") as f:
        json.dump(model_info, f, indent=2)
    files_to_push.append(info_path)

    # ── Push each file ────────────────────────────────────────
    pushed = 0
    for fpath in files_to_push:
        rel = os.path.relpath(fpath, model_dir).replace(os.sep, "/")
        gitea_path = f"{voice_name}/{rel}"
        # Quote the path so spaces or other reserved characters in the voice
        # or run name do not corrupt the request URL ('/' is kept as-is).
        contents_url = f"{repo_api}/contents/{quote(gitea_path)}"
        print(f"Pushing: {gitea_path} ({os.path.getsize(fpath)} bytes)")

        # NOTE(review): the whole file is base64-encoded in memory; confirm
        # the server accepts checkpoint-sized payloads on this endpoint.
        with open(fpath, "rb") as f:
            content_b64 = base64.b64encode(f.read()).decode()

        payload = {
            "content": content_b64,
            "message": f"Upload {voice_name}: {rel}",
        }

        # Check if file exists: an update (PUT) must carry the current sha.
        r = session.get(contents_url, timeout=30)
        if r.status_code == 200:
            payload["sha"] = r.json().get("sha", "")
            r = session.put(contents_url, json=payload, timeout=120)
        else:
            r = session.post(contents_url, json=payload, timeout=120)

        if r.status_code in (200, 201):
            pushed += 1
            print("  \u2713 Pushed")
        else:
            print(f"  \u2717 Failed ({r.status_code}): {r.text[:200]}")

    print(f"\nPushed {pushed}/{len(files_to_push)} files to {repo_url}")
    return out(repo_url=repo_url, files_pushed=pushed)
def train_vits_voice(
    dataset_dir: str,
    voice_name: str,
    language: str = "en",
    base_model: str = "tts_models/en/ljspeech/vits",
    num_epochs: int = 100,
    batch_size: int = 16,
    learning_rate: float = 0.0001,
) -> NamedTuple("TrainOutput", [("model_dir", str), ("best_checkpoint", str), ("final_loss", float)]):
    """Fine-tune a VITS model on the speaker dataset.

    Args:
        dataset_dir: LJSpeech-format dataset directory (``metadata.csv`` + wavs).
        voice_name: Used as the trainer run name.
        language: Dataset language and phoneme language.
        base_model: Coqui model name to restore weights from; an empty value
            or ``"none"`` trains without restoring a checkpoint.
        num_epochs: Number of training epochs.
        batch_size: Training batch size (evaluation uses half, at least 1).
        learning_rate: Learning rate applied to both VITS optimisers.

    Returns:
        ``TrainOutput(model_dir, best_checkpoint, final_loss)``.
        ``best_checkpoint`` is ``""`` when no ``.pth`` file was produced and
        ``final_loss`` is ``0.0`` when the trainer did not expose an average
        training loss.
    """
    import glob
    import os

    out = NamedTuple("TrainOutput", [("model_dir", str), ("best_checkpoint", str), ("final_loss", float)])

    OUTPUT_DIR = "/tmp/vits_output"
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print("=== Coqui VITS Voice Training ===")
    print(f"Voice name : {voice_name}")
    print(f"Base model : {base_model}")
    print(f"Dataset    : {dataset_dir}")
    print(f"Epochs     : {num_epochs}")
    print(f"Batch size : {batch_size}")
    print(f"LR         : {learning_rate}")

    # ── Download base model checkpoint ────────────────────────
    restore_path = None
    if base_model and base_model.lower() != "none":
        from TTS.utils.manage import ModelManager

        manager = ModelManager()
        model_path, _, _ = manager.download_model(base_model)
        restore_path = model_path
        print(f"Base model checkpoint: {restore_path}")

    # ── Configure and train ───────────────────────────────────
    from trainer import Trainer, TrainerArgs
    from TTS.tts.configs.vits_config import VitsConfig
    from TTS.tts.configs.shared_configs import BaseDatasetConfig
    from TTS.tts.datasets import load_tts_samples
    from TTS.tts.models.vits import Vits
    from TTS.tts.utils.text.tokenizer import TTSTokenizer
    from TTS.utils.audio import AudioProcessor

    dataset_config = BaseDatasetConfig(
        formatter="ljspeech",
        meta_file_train="metadata.csv",
        path=dataset_dir,
        language=language,
    )

    # NOTE(review): `audio` is passed as a plain dict; confirm VitsConfig
    # accepts that in place of its audio config object.
    config = VitsConfig(
        run_name=voice_name,
        output_path=OUTPUT_DIR,
        datasets=[dataset_config],
        batch_size=batch_size,
        eval_batch_size=max(1, batch_size // 2),
        num_loader_workers=4,
        num_eval_loader_workers=2,
        run_eval=True,
        test_delay_epochs=5,
        epochs=num_epochs,
        text_cleaner="phoneme_cleaners",
        use_phonemes=True,
        phoneme_language=language,
        phoneme_cache_path=os.path.join(OUTPUT_DIR, "phoneme_cache"),
        compute_input_seq_cache=True,
        print_step=25,
        print_eval=False,
        mixed_precision=True,
        save_step=500,
        save_n_checkpoints=3,
        save_best_after=1000,
        lr=learning_rate,
        # VITS trains a generator and a discriminator with separate
        # optimisers configured through lr_gen / lr_disc; setting only `lr`
        # would leave both at their defaults and ignore `learning_rate`.
        lr_gen=learning_rate,
        lr_disc=learning_rate,
        audio={
            "sample_rate": 22050,
            "resample": True,
            "do_trim_silence": True,
            "trim_db": 45,
        },
    )

    ap = AudioProcessor.init_from_config(config)
    tokenizer, config = TTSTokenizer.init_from_config(config)

    train_samples, eval_samples = load_tts_samples(
        dataset_config,
        eval_split=True,
        eval_split_max_size=config.eval_split_max_size,
        eval_split_size=config.eval_split_size,
    )
    print(f"Training samples: {len(train_samples)}")
    print(f"Eval samples:     {len(eval_samples)}")

    model = Vits(config, ap, tokenizer, speaker_manager=None)

    trainer_args = TrainerArgs(
        restore_path=restore_path,
        skip_train_epoch=False,
    )

    trainer = Trainer(
        trainer_args,
        config,
        output_path=OUTPUT_DIR,
        model=model,
        train_samples=train_samples,
        eval_samples=eval_samples,
    )

    trainer.fit()

    # ── Find best checkpoint ──────────────────────────────────
    # Prefer an explicit best_model file; otherwise fall back to any
    # checkpoint. The most recently written match wins in both cases.
    best_files = glob.glob(os.path.join(OUTPUT_DIR, "**/best_model*.pth"), recursive=True)
    if not best_files:
        best_files = glob.glob(os.path.join(OUTPUT_DIR, "**/*.pth"), recursive=True)
    best_checkpoint = max(best_files, key=os.path.getmtime) if best_files else ""

    # Try to read final loss from trainer. This stays best-effort, but a
    # failure is reported so a logged 0.0 is not mistaken for a real loss.
    final_loss = 0.0
    try:
        final_loss = float(trainer.keep_avg_train["avg_loss"])
    except (AttributeError, KeyError, TypeError, ValueError) as err:
        print(f"Could not read final loss from trainer ({err!r}); reporting 0.0")

    print(f"Training complete. Best checkpoint: {best_checkpoint}")
    print(f"Final loss: {final_loss:.4f}")

    return out(model_dir=OUTPUT_DIR, best_checkpoint=best_checkpoint, final_loss=final_loss)
def transcribe_and_diarise(
    s3_endpoint: str,
    s3_bucket: str,
    s3_key: str,
    whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
) -> NamedTuple("TranscriptOutput", [("transcript_json", str), ("speakers", str), ("audio_path", str)]):
    """Download audio from Quobjects S3, transcribe via Whisper with timestamps.

    Args:
        s3_endpoint: S3 host (``http://`` is assumed) or a full endpoint URL.
        s3_bucket: Bucket that holds the audio object.
        s3_key: Object key of the audio file.
        whisper_url: HTTP endpoint that accepts base64 audio and returns a
            JSON body with a ``segments`` list.

    Returns:
        ``TranscriptOutput(transcript_json, speakers, audio_path)``:
        JSON-encoded segments (each with a ``speaker`` field), the JSON-encoded
        sorted list of speaker labels, and the path of the 16 kHz mono WAV.

    Raises:
        ValueError: If ``s3_key`` is empty.
        subprocess.CalledProcessError: If installing or running ffmpeg fails.
        requests.HTTPError: If the Whisper endpoint returns an error status.
    """
    import base64
    import json
    import os
    import shutil
    import subprocess
    import tempfile

    import boto3
    import requests
    from botocore import UNSIGNED
    from botocore.config import Config

    out = NamedTuple("TranscriptOutput", [("transcript_json", str), ("speakers", str), ("audio_path", str)])

    if not s3_key:
        raise ValueError("s3_key must name the audio object to download")

    work = tempfile.mkdtemp()

    # ── Download audio from Quobjects S3 ─────────────────────
    ext = os.path.splitext(s3_key)[-1] or ".wav"
    audio_path = os.path.join(work, f"audio_raw{ext}")

    # Accept a bare host (historical behaviour) as well as a full URL.
    endpoint_url = s3_endpoint if "://" in s3_endpoint else f"http://{s3_endpoint}"
    client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        # Anonymous access needs the botocore.UNSIGNED sentinel; the string
        # "UNSIGNED" is not a known signature version.
        config=Config(signature_version=UNSIGNED),
    )
    print(f"Downloading s3://{s3_bucket}/{s3_key} from {s3_endpoint}")
    client.download_file(s3_bucket, s3_key, audio_path)
    print(f"Downloaded {os.path.getsize(audio_path)} bytes")

    # ── Normalise to 16 kHz mono WAV ─────────────────────────
    wav_path = os.path.join(work, "audio.wav")
    if shutil.which("ffmpeg") is None:
        # Refreshing the package index is best-effort; the install decides.
        update = subprocess.run(
            ["apt-get", "update", "-qq"],
            capture_output=True, text=True,
        )
        if update.returncode != 0:
            print(f"apt-get update failed (continuing): {update.stderr.strip()}")
        install = subprocess.run(
            ["apt-get", "install", "-y", "-qq", "ffmpeg"],
            capture_output=True, text=True,
        )
        if install.returncode != 0:
            # Show why before raising; captured output is otherwise lost.
            print(install.stderr)
            install.check_returncode()

    convert = subprocess.run(
        ["ffmpeg", "-y", "-i", audio_path, "-ac", "1",
         "-ar", "16000", "-sample_fmt", "s16", wav_path],
        capture_output=True, text=True,
    )
    if convert.returncode != 0:
        print(convert.stderr)
        convert.check_returncode()

    # ── Send to Whisper for timestamped transcription ─────────
    with open(wav_path, "rb") as f:
        audio_b64 = base64.b64encode(f.read()).decode()

    payload = {
        "audio": audio_b64,
        "response_format": "verbose_json",
        "timestamp_granularities": ["segment"],
    }
    resp = requests.post(whisper_url, json=payload, timeout=600)
    resp.raise_for_status()
    result = resp.json()

    segments = result.get("segments", [])
    print(f"Whisper returned {len(segments)} segments")

    # ── Group segments by speaker if diarisation is present ───
    # Whisper may not diarise. Segments without a label are all attributed to
    # one speaker, SPEAKER_0 (the pipeline's default target), rather than
    # being split into made-up speakers by position, which would discard most
    # of an undiarised recording downstream.
    speakers = set()
    for seg in segments:
        spk = seg.get("speaker") or "SPEAKER_0"
        seg["speaker"] = spk
        speakers.add(spk)

    speakers_list = sorted(speakers)
    print(f"Detected speakers: {speakers_list}")

    # NOTE(review): audio_path is local to this container; confirm the next
    # step can actually read it.
    return out(
        transcript_json=json.dumps(segments),
        speakers=json.dumps(speakers_list),
        audio_path=wav_path,
    )
name: log-training-metrics + prepare-ljspeech-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-prepare-ljspeech-dataset + dependentTasks: + - extract-speaker-segments + inputs: + parameters: + language: + componentInputParameter: language + segments_json: + taskOutputParameter: + outputParameterKey: segments_json + producerTask: extract-speaker-segments + voice_name: + componentInputParameter: voice_name + taskInfo: + name: prepare-ljspeech-dataset + push-model-to-gitea: + cachingOptions: + enableCache: true + componentRef: + name: comp-push-model-to-gitea + dependentTasks: + - train-vits-voice + inputs: + parameters: + gitea_owner: + componentInputParameter: gitea_owner + gitea_password: + componentInputParameter: gitea_password + gitea_repo: + componentInputParameter: gitea_repo + gitea_url: + componentInputParameter: gitea_url + gitea_username: + componentInputParameter: gitea_username + model_dir: + taskOutputParameter: + outputParameterKey: model_dir + producerTask: train-vits-voice + voice_name: + componentInputParameter: voice_name + taskInfo: + name: push-model-to-gitea + train-vits-voice: + cachingOptions: + enableCache: true + componentRef: + name: comp-train-vits-voice + dependentTasks: + - prepare-ljspeech-dataset + inputs: + parameters: + base_model: + componentInputParameter: base_model + batch_size: + componentInputParameter: batch_size + dataset_dir: + taskOutputParameter: + outputParameterKey: dataset_dir + producerTask: prepare-ljspeech-dataset + language: + componentInputParameter: language + learning_rate: + componentInputParameter: learning_rate + num_epochs: + componentInputParameter: num_epochs + voice_name: + componentInputParameter: voice_name + taskInfo: + name: train-vits-voice + transcribe-and-diarise: + cachingOptions: + enableCache: true + componentRef: + name: comp-transcribe-and-diarise + inputs: + parameters: + s3_bucket: + componentInputParameter: s3_bucket + s3_endpoint: + componentInputParameter: 
s3_endpoint + s3_key: + componentInputParameter: s3_key + whisper_url: + componentInputParameter: whisper_url + taskInfo: + name: transcribe-and-diarise + inputDefinitions: + parameters: + base_model: + defaultValue: tts_models/en/ljspeech/vits + isOptional: true + parameterType: STRING + batch_size: + defaultValue: 16.0 + isOptional: true + parameterType: NUMBER_INTEGER + gitea_owner: + defaultValue: daviestechlabs + isOptional: true + parameterType: STRING + gitea_password: + defaultValue: '' + isOptional: true + parameterType: STRING + gitea_repo: + defaultValue: voice-models + isOptional: true + parameterType: STRING + gitea_url: + defaultValue: http://gitea-http.gitea.svc.cluster.local:3000 + isOptional: true + parameterType: STRING + gitea_username: + defaultValue: '' + isOptional: true + parameterType: STRING + language: + defaultValue: en + isOptional: true + parameterType: STRING + learning_rate: + defaultValue: 0.0001 + isOptional: true + parameterType: NUMBER_DOUBLE + max_segment_duration_s: + defaultValue: 15.0 + isOptional: true + parameterType: NUMBER_DOUBLE + min_segment_duration_s: + defaultValue: 1.0 + isOptional: true + parameterType: NUMBER_DOUBLE + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + num_epochs: + defaultValue: 100.0 + isOptional: true + parameterType: NUMBER_INTEGER + s3_bucket: + defaultValue: training-data + isOptional: true + parameterType: STRING + s3_endpoint: + defaultValue: candlekeep.lab.daviestechlabs.io + isOptional: true + parameterType: STRING + s3_key: + defaultValue: '' + isOptional: true + parameterType: STRING + target_speaker: + defaultValue: SPEAKER_0 + isOptional: true + parameterType: STRING + voice_name: + defaultValue: custom-voice + isOptional: true + parameterType: STRING + whisper_url: + defaultValue: http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 
+sdkVersion: kfp-2.12.1