#!/usr/bin/env python3
"""
DVD / Video Transcription Pipeline – Kubeflow Pipelines SDK

Extracts audio from a DVD (ISO, VOB) or video file (MKV, MP4, etc.),
chunks it into segments, sends each to the Whisper STT service, and
merges the results into a complete timestamped transcript.

Usage:
    pip install kfp==2.12.1
    python dvd_transcription_pipeline.py
    # Upload dvd_transcription_pipeline.yaml to Kubeflow Pipelines UI
"""

from kfp import compiler, dsl
from typing import NamedTuple


# ──────────────────────────────────────────────────────────────
# 1. Extract audio from DVD / video file via ffmpeg
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
)
def extract_audio(
    source_path: str,
    sample_rate: int = 16000,
    mono: bool = True,
) -> NamedTuple("AudioOutput", [("wav_path", str), ("duration_s", float)]):
    """Extract audio from a video/DVD file using ffmpeg.

    Args:
        source_path: Path to DVD ISO, VOB, MKV, MP4, or any ffmpeg-
            supported file. Can also be a /dev/sr0 device.
        sample_rate: Target sample rate (16 kHz is optimal for Whisper).
        mono: Down-mix to mono (Whisper expects single-channel).

    Returns:
        AudioOutput with the extracted WAV path and its duration in seconds.

    Raises:
        RuntimeError: If ffmpeg fails or ffprobe output is unusable.
    """
    import json
    import os
    import subprocess

    # Install ffmpeg inside the container (slim base image ships without it).
    subprocess.run(
        ["apt-get", "update", "-qq"],
        check=True,
        capture_output=True,
    )
    subprocess.run(
        ["apt-get", "install", "-y", "-qq", "ffmpeg"],
        check=True,
        capture_output=True,
    )

    out_dir = "/tmp/dvd_audio"
    os.makedirs(out_dir, exist_ok=True)
    wav_path = os.path.join(out_dir, "full_audio.wav")

    # Build ffmpeg command
    cmd = ["ffmpeg", "-y"]

    # Handle DVD ISOs – mount via concat demuxer or direct input
    if source_path.lower().endswith(".iso"):
        # NOTE(review): stock ffmpeg has no dvd:// input protocol; this path
        # presumably relies on a libdvdread-enabled build — confirm, or mount
        # the ISO and point at the VIDEO_TS VOBs instead.
        cmd += ["-i", f"dvd://{source_path}"]
    else:
        cmd += ["-i", source_path]

    # Audio extraction options
    cmd += [
        "-vn",                    # drop video
        "-acodec", "pcm_s16le",   # 16-bit WAV
        "-ar", str(sample_rate),  # resample
    ]
    if mono:
        cmd += ["-ac", "1"]       # down-mix to mono
    cmd += [wav_path]

    print(f"Running: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=7200)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg failed:\n{result.stderr}")

    # Get duration via ffprobe; fail with a clear error instead of a bare
    # KeyError/ValueError when the probe output is unusable.
    probe = subprocess.run(
        [
            "ffprobe",
            "-v", "quiet",
            "-show_entries", "format=duration",
            "-of", "json",
            wav_path,
        ],
        capture_output=True,
        text=True,
    )
    if probe.returncode != 0:
        raise RuntimeError(f"ffprobe failed:\n{probe.stderr}")
    try:
        duration_s = float(json.loads(probe.stdout)["format"]["duration"])
    except (KeyError, ValueError) as err:
        raise RuntimeError(
            f"Unparseable ffprobe output: {probe.stdout!r}"
        ) from err

    file_size_mb = os.path.getsize(wav_path) / (1024 * 1024)
    print(f"Extracted: {wav_path} ({file_size_mb:.1f} MB, {duration_s:.1f}s)")

    from collections import namedtuple
    AudioOutput = namedtuple("AudioOutput", ["wav_path", "duration_s"])
    return AudioOutput(wav_path=wav_path, duration_s=duration_s)


# ──────────────────────────────────────────────────────────────
# 2. Split audio into manageable chunks
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
)
def chunk_audio(
    wav_path: str,
    chunk_duration_s: int = 300,
) -> NamedTuple("ChunkOutput", [("chunk_paths", list), ("num_chunks", int)]):
    """Split a WAV file into fixed-duration chunks.

    Args:
        wav_path: Path to the mono 16 kHz WAV.
        chunk_duration_s: Seconds per chunk (default 5 minutes).

    Returns:
        ChunkOutput with the sorted chunk paths and their count.

    Raises:
        RuntimeError: If ffmpeg fails or produces no chunks.
    """
    import glob
    import os
    import subprocess

    subprocess.run(
        ["apt-get", "update", "-qq"],
        check=True,
        capture_output=True,
    )
    subprocess.run(
        ["apt-get", "install", "-y", "-qq", "ffmpeg"],
        check=True,
        capture_output=True,
    )

    out_dir = "/tmp/dvd_chunks"
    os.makedirs(out_dir, exist_ok=True)
    pattern = os.path.join(out_dir, "chunk_%04d.wav")

    cmd = [
        "ffmpeg", "-y",
        "-i", wav_path,
        "-f", "segment",
        "-segment_time", str(chunk_duration_s),
        "-c", "copy",
        pattern,
    ]
    print(f"Chunking: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg chunk failed:\n{result.stderr}")

    chunks = sorted(glob.glob(os.path.join(out_dir, "chunk_*.wav")))
    if not chunks:
        # An exit code of 0 with no output files would otherwise flow an
        # empty list downstream and yield a silently empty transcript.
        raise RuntimeError(f"No chunks produced from {wav_path}")
    print(f"Created {len(chunks)} chunks of ~{chunk_duration_s}s each")

    from collections import namedtuple
    ChunkOutput = namedtuple("ChunkOutput", ["chunk_paths", "num_chunks"])
    return ChunkOutput(chunk_paths=chunks, num_chunks=len(chunks))


# ──────────────────────────────────────────────────────────────
# 3. Transcribe all chunks via Whisper STT endpoint
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["requests"],
)
def transcribe_chunks(
    chunk_paths: list,
    whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
    language: str = "en",
    response_format: str = "verbose_json",
) -> NamedTuple("TranscriptOutput", [("segments", list), ("full_text", str), ("total_duration_s", float)]):
    """Send each audio chunk to the Whisper STT endpoint.

    Args:
        chunk_paths: List of WAV file paths to transcribe.
        whisper_url: In-cluster Whisper endpoint URL.
        language: Language code for Whisper (None for auto-detect).
        response_format: 'json', 'verbose_json', or 'text'.

    Returns:
        TranscriptOutput with absolute-timestamped segments, the full
        concatenated text, and the total transcribed audio duration.
    """
    import base64
    import time

    import requests

    max_attempts = 3  # retry transient in-cluster HTTP failures with backoff

    all_segments = []
    full_text_parts = []
    total_audio_duration = 0.0
    time_offset = 0.0  # cumulative offset for absolute timestamps

    for i, path in enumerate(chunk_paths):
        print(f"Transcribing chunk {i + 1}/{len(chunk_paths)}: {path}")

        # Read and base64-encode the chunk
        with open(path, "rb") as f:
            audio_b64 = base64.b64encode(f.read()).decode("utf-8")

        payload = {
            "audio": audio_b64,
            "audio_format": "wav",
            "language": language,
            "task": "transcribe",
            "response_format": response_format,
            "word_timestamps": False,
        }

        start = time.time()
        for attempt in range(1, max_attempts + 1):
            try:
                resp = requests.post(whisper_url, json=payload, timeout=600)
                resp.raise_for_status()
                break
            except requests.RequestException as err:
                if attempt == max_attempts:
                    raise
                wait_s = 2 ** attempt
                print(f"  attempt {attempt} failed ({err}); retrying in {wait_s}s")
                time.sleep(wait_s)
        elapsed = time.time() - start

        data = resp.json()
        # NOTE(review): assumes the service reports the chunk's audio length
        # under "duration"; if it is missing, the absolute timestamps below
        # stop advancing — verify against the Whisper service schema.
        chunk_duration = data.get("duration", 0.0)
        total_audio_duration += chunk_duration

        if "segments" in data:
            for seg in data["segments"]:
                # Offset timestamps to be absolute within the full audio
                seg["start"] += time_offset
                seg["end"] += time_offset
                all_segments.append(seg)

        chunk_text = data.get("text", "")
        full_text_parts.append(chunk_text)
        time_offset += chunk_duration

        rtf = elapsed / chunk_duration if chunk_duration > 0 else 0
        print(f" → {len(chunk_text)} chars, {chunk_duration:.1f}s audio in {elapsed:.1f}s (RTF={rtf:.2f})")

    full_text = "\n".join(full_text_parts)
    print(f"\nTotal: {len(all_segments)} segments, {total_audio_duration:.1f}s audio")
    print(f"Transcript length: {len(full_text)} characters")

    from collections import namedtuple
    TranscriptOutput = namedtuple("TranscriptOutput", ["segments", "full_text", "total_duration_s"])
    return TranscriptOutput(
        segments=all_segments,
        full_text=full_text.strip(),
        total_duration_s=total_audio_duration,
    )


# ──────────────────────────────────────────────────────────────
# 4. Format and save the final transcript
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
)
def format_transcript(
    segments: list,
    full_text: str,
    total_duration_s: float,
    output_format: str = "srt",
) -> NamedTuple("FormattedOutput", [("transcript", str), ("output_path", str)]):
    """Format the transcript as SRT, VTT, or plain text.

    Args:
        segments: List of segment dicts with start/end/text.
        full_text: Full concatenated text.
        total_duration_s: Total audio duration in seconds.
        output_format: 'srt', 'vtt', or 'txt' (anything else falls back
            to plain text).

    Returns:
        FormattedOutput with the formatted transcript string and the path
        of the file it was written to.
    """
    import os

    def _fmt_ts(seconds: float, ms_sep: str) -> str:
        """Format seconds as HH:MM:SS<sep>mmm (',' for SRT, '.' for VTT)."""
        # Round once at millisecond precision. Truncating each field from
        # the raw float (the previous approach) could drop a millisecond,
        # e.g. 7.07 s -> (7.07 % 1) * 1000 = 69.999... -> 069 instead of 070.
        total_ms = round(seconds * 1000)
        h, rem = divmod(total_ms, 3_600_000)
        m, rem = divmod(rem, 60_000)
        s, ms = divmod(rem, 1000)
        return f"{h:02d}:{m:02d}:{s:02d}{ms_sep}{ms:03d}"

    out_dir = "/tmp/dvd_transcript"
    os.makedirs(out_dir, exist_ok=True)

    if output_format == "srt":
        lines = []
        for i, seg in enumerate(segments, 1):
            start_ts = _fmt_ts(seg["start"], ",")
            end_ts = _fmt_ts(seg["end"], ",")
            text = seg.get("text", "").strip()
            lines.append(f"{i}\n{start_ts} --> {end_ts}\n{text}\n")
        transcript = "\n".join(lines)
        ext = "srt"
    elif output_format == "vtt":
        lines = ["WEBVTT\n"]
        for seg in segments:
            start_ts = _fmt_ts(seg["start"], ".")
            end_ts = _fmt_ts(seg["end"], ".")
            text = seg.get("text", "").strip()
            lines.append(f"{start_ts} --> {end_ts}\n{text}\n")
        transcript = "\n".join(lines)
        ext = "vtt"
    else:  # txt
        transcript = full_text
        ext = "txt"

    out_path = os.path.join(out_dir, f"transcript.{ext}")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(transcript)

    h = int(total_duration_s // 3600)
    m = int((total_duration_s % 3600) // 60)
    print(f"Transcript saved: {out_path}")
    print(f"Audio duration: {h}h {m}m, Segments: {len(segments)}")
    print(f"Format: {output_format.upper()}, Size: {len(transcript)} chars")

    from collections import namedtuple
    FormattedOutput = namedtuple("FormattedOutput", ["transcript", "output_path"])
    return FormattedOutput(transcript=transcript, output_path=out_path)


# ──────────────────────────────────────────────────────────────
# 5. (Optional) Log metrics to MLflow
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["mlflow==2.22.0"],
)
def log_transcription_metrics(
    total_duration_s: float,
    full_text: str,
    source_path: str,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
    experiment_name: str = "dvd-transcription",
):
    """Log transcription run metrics to MLflow.

    Args:
        total_duration_s: Total transcribed audio duration in seconds.
        full_text: Full transcript text (only its length is logged).
        source_path: Original source file path; its basename names the run.
        mlflow_tracking_uri: MLflow tracking server URI.
        experiment_name: MLflow experiment to log under.
    """
    import mlflow

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name=f"transcribe-{source_path.split('/')[-1]}"):
        mlflow.log_params({
            "source_path": source_path,
            # NOTE(review): model name is hard-coded here; confirm it matches
            # what the Whisper endpoint actually serves.
            "model": "whisper-large-v3",
        })
        mlflow.log_metrics({
            "audio_duration_s": total_duration_s,
            "transcript_chars": float(len(full_text)),
        })


# ──────────────────────────────────────────────────────────────
# Pipeline definition
# ──────────────────────────────────────────────────────────────
@dsl.pipeline(
    name="DVD / Video Transcription",
    description=(
        "Extract audio from a DVD or video file, transcribe it via Whisper STT, "
        "and produce a timestamped transcript (SRT/VTT/TXT)."
    ),
)
def dvd_transcription_pipeline(
    source_path: str = "/data/dvd/movie.mkv",
    chunk_duration_s: int = 300,
    language: str = "en",
    output_format: str = "srt",
    whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
):
    """Wire the five components into a linear transcription DAG."""
    # Step 1: Extract audio from the video/DVD source
    audio = extract_audio(
        source_path=source_path,
        sample_rate=16000,
        mono=True,
    )

    # Step 2: Chunk the audio into manageable segments
    chunks = chunk_audio(
        wav_path=audio.outputs["wav_path"],
        chunk_duration_s=chunk_duration_s,
    )

    # Step 3: Transcribe each chunk via the Whisper endpoint
    transcript = transcribe_chunks(
        chunk_paths=chunks.outputs["chunk_paths"],
        whisper_url=whisper_url,
        language=language,
        response_format="verbose_json",
    )

    # Step 4: Format the transcript into the desired output format
    formatted = format_transcript(
        segments=transcript.outputs["segments"],
        full_text=transcript.outputs["full_text"],
        total_duration_s=transcript.outputs["total_duration_s"],
        output_format=output_format,
    )

    # Step 5: Log metrics to MLflow
    log_transcription_metrics(
        total_duration_s=transcript.outputs["total_duration_s"],
        full_text=transcript.outputs["full_text"],
        source_path=source_path,
        mlflow_tracking_uri=mlflow_tracking_uri,
    )


# ──────────────────────────────────────────────────────────────
# Compile
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=dvd_transcription_pipeline,
        package_path="dvd_transcription_pipeline.yaml",
    )
    print("Compiled: dvd_transcription_pipeline.yaml")