diff --git a/dvd_transcription_pipeline.py b/dvd_transcription_pipeline.py new file mode 100644 index 0000000..00e0ad3 --- /dev/null +++ b/dvd_transcription_pipeline.py @@ -0,0 +1,410 @@ +#!/usr/bin/env python3 +""" +DVD / Video Transcription Pipeline – Kubeflow Pipelines SDK + +Extracts audio from a DVD (ISO, VOB) or video file (MKV, MP4, etc.), +chunks it into segments, sends each to the Whisper STT service, and +merges the results into a complete timestamped transcript. + +Usage: + pip install kfp==2.12.1 + python dvd_transcription_pipeline.py + # Upload dvd_transcription_pipeline.yaml to Kubeflow Pipelines UI +""" + +from kfp import compiler, dsl +from typing import NamedTuple + + +# ────────────────────────────────────────────────────────────── +# 1. Extract audio from DVD / video file via ffmpeg +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["requests"], +) +def extract_audio( + source_path: str, + sample_rate: int = 16000, + mono: bool = True, +) -> NamedTuple("AudioOutput", [("wav_path", str), ("duration_s", float)]): + """Extract audio from a video/DVD file using ffmpeg. + + Args: + source_path: Path to DVD ISO, VOB, MKV, MP4, or any ffmpeg- + supported file. Can also be a /dev/sr0 device. + sample_rate: Target sample rate (16 kHz is optimal for Whisper). + mono: Down-mix to mono (Whisper expects single-channel). + """ + import os + import subprocess + import json + + # Install ffmpeg inside the container + subprocess.run( + ["apt-get", "update", "-qq"], + check=True, capture_output=True, + ) + subprocess.run( + ["apt-get", "install", "-y", "-qq", "ffmpeg"], + check=True, capture_output=True, + ) + + out_dir = "/tmp/dvd_audio" + os.makedirs(out_dir, exist_ok=True) + wav_path = os.path.join(out_dir, "full_audio.wav") + + # Build ffmpeg command + cmd = ["ffmpeg", "-y"] + + # Handle DVD ISOs – mount via concat demuxer or direct input + if source_path.lower().endswith(".iso"): + # For ISOs, ffmpeg can read via dvdread protocol + cmd += ["-i", f"dvd://{source_path}"] + else: + cmd += ["-i", source_path] + + # Audio extraction options + cmd += [ + "-vn", # drop video + "-acodec", "pcm_s16le", # 16-bit WAV + "-ar", str(sample_rate), # resample + ] + if mono: + cmd += ["-ac", "1"] # down-mix to mono + + cmd += [wav_path] + + print(f"Running: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True, timeout=7200) + if result.returncode != 0: + raise RuntimeError(f"ffmpeg failed:\n{result.stderr}") + + # Get duration via ffprobe + probe = subprocess.run( + [ + "ffprobe", "-v", "quiet", + "-show_entries", "format=duration", + "-of", "json", wav_path, + ], + capture_output=True, text=True, + ) + duration_s = float(json.loads(probe.stdout)["format"]["duration"]) + file_size_mb = os.path.getsize(wav_path) / (1024 * 1024) + print(f"Extracted: {wav_path} ({file_size_mb:.1f} MB, {duration_s:.1f}s)") + + from collections import namedtuple + AudioOutput = namedtuple("AudioOutput", ["wav_path", "duration_s"]) + return AudioOutput(wav_path=wav_path, duration_s=duration_s) + + +# ────────────────────────────────────────────────────────────── +# 2. Split audio into manageable chunks +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", +) +def chunk_audio( + wav_path: str, + chunk_duration_s: int = 300, +) -> NamedTuple("ChunkOutput", [("chunk_paths", list), ("num_chunks", int)]): + """Split a WAV file into fixed-duration chunks. + + Args: + wav_path: Path to the mono 16 kHz WAV. + chunk_duration_s: Seconds per chunk (default 5 minutes). + """ + import os + import subprocess + import glob + + subprocess.run( + ["apt-get", "update", "-qq"], + check=True, capture_output=True, + ) + subprocess.run( + ["apt-get", "install", "-y", "-qq", "ffmpeg"], + check=True, capture_output=True, + ) + + out_dir = "/tmp/dvd_chunks" + os.makedirs(out_dir, exist_ok=True) + pattern = os.path.join(out_dir, "chunk_%04d.wav") + + cmd = [ + "ffmpeg", "-y", + "-i", wav_path, + "-f", "segment", + "-segment_time", str(chunk_duration_s), + "-c", "copy", + pattern, + ] + print(f"Chunking: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600) + if result.returncode != 0: + raise RuntimeError(f"ffmpeg chunk failed:\n{result.stderr}") + + chunks = sorted(glob.glob(os.path.join(out_dir, "chunk_*.wav"))) + print(f"Created {len(chunks)} chunks of ~{chunk_duration_s}s each") + + from collections import namedtuple + ChunkOutput = namedtuple("ChunkOutput", ["chunk_paths", "num_chunks"]) + return ChunkOutput(chunk_paths=chunks, num_chunks=len(chunks)) + + +# ────────────────────────────────────────────────────────────── +# 3. Transcribe all chunks via Whisper STT endpoint +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["requests"], +) +def transcribe_chunks( + chunk_paths: list, + whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper", + language: str = "en", + response_format: str = "verbose_json", +) -> NamedTuple("TranscriptOutput", [("segments", list), ("full_text", str), ("total_duration_s", float)]): + """Send each audio chunk to the Whisper STT endpoint. + + Args: + chunk_paths: List of WAV file paths to transcribe. + whisper_url: In-cluster Whisper endpoint URL. + language: Language code for Whisper (None for auto-detect). + response_format: 'json', 'verbose_json', or 'text'. + """ + import base64 + import json + import time + import requests + + all_segments = [] + full_text_parts = [] + total_audio_duration = 0.0 + time_offset = 0.0 # cumulative offset for absolute timestamps + + for i, path in enumerate(chunk_paths): + print(f"Transcribing chunk {i + 1}/{len(chunk_paths)}: {path}") + + # Read and base64-encode the chunk + with open(path, "rb") as f: + audio_b64 = base64.b64encode(f.read()).decode("utf-8") + + payload = { + "audio": audio_b64, + "audio_format": "wav", + "language": language, + "task": "transcribe", + "response_format": response_format, + "word_timestamps": False, + } + + start = time.time() + resp = requests.post(whisper_url, json=payload, timeout=600) + elapsed = time.time() - start + resp.raise_for_status() + data = resp.json() + + chunk_duration = data.get("duration", 0.0) + total_audio_duration += chunk_duration + + if "segments" in data: + for seg in data["segments"]: + # Offset timestamps to be absolute within the full audio + seg["start"] += time_offset + seg["end"] += time_offset + all_segments.append(seg) + + chunk_text = data.get("text", "") + full_text_parts.append(chunk_text) + time_offset += chunk_duration + rtf = elapsed / chunk_duration if chunk_duration > 0 else 0 + print(f" → {len(chunk_text)} chars, {chunk_duration:.1f}s audio in {elapsed:.1f}s (RTF={rtf:.2f})") + + full_text = "\n".join(full_text_parts) + print(f"\nTotal: {len(all_segments)} segments, {total_audio_duration:.1f}s audio") + print(f"Transcript length: {len(full_text)} characters") + + from collections import namedtuple + TranscriptOutput = namedtuple("TranscriptOutput", ["segments", "full_text", "total_duration_s"]) + return TranscriptOutput( + segments=all_segments, + full_text=full_text.strip(), + total_duration_s=total_audio_duration, + ) + + +# ────────────────────────────────────────────────────────────── +# 4. Format and save the final transcript +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", +) +def format_transcript( + segments: list, + full_text: str, + total_duration_s: float, + output_format: str = "srt", +) -> NamedTuple("FormattedOutput", [("transcript", str), ("output_path", str)]): + """Format the transcript as SRT, VTT, or plain text. + + Args: + segments: List of segment dicts with start/end/text. + full_text: Full concatenated text. + total_duration_s: Total audio duration in seconds. + output_format: 'srt', 'vtt', or 'txt'. + """ + import os + + def _fmt_ts_srt(seconds: float) -> str: + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + s = int(seconds % 60) + ms = int((seconds % 1) * 1000) + return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}" + + def _fmt_ts_vtt(seconds: float) -> str: + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + s = int(seconds % 60) + ms = int((seconds % 1) * 1000) + return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}" + + out_dir = "/tmp/dvd_transcript" + os.makedirs(out_dir, exist_ok=True) + + if output_format == "srt": + lines = [] + for i, seg in enumerate(segments, 1): + start_ts = _fmt_ts_srt(seg["start"]) + end_ts = _fmt_ts_srt(seg["end"]) + text = seg.get("text", "").strip() + lines.append(f"{i}\n{start_ts} --> {end_ts}\n{text}\n") + transcript = "\n".join(lines) + ext = "srt" + + elif output_format == "vtt": + lines = ["WEBVTT\n"] + for seg in segments: + start_ts = _fmt_ts_vtt(seg["start"]) + end_ts = _fmt_ts_vtt(seg["end"]) + text = seg.get("text", "").strip() + lines.append(f"{start_ts} --> {end_ts}\n{text}\n") + transcript = "\n".join(lines) + ext = "vtt" + + else: # txt + transcript = full_text + ext = "txt" + + out_path = os.path.join(out_dir, f"transcript.{ext}") + with open(out_path, "w", encoding="utf-8") as f: + f.write(transcript) + + h = int(total_duration_s // 3600) + m = int((total_duration_s % 3600) // 60) + print(f"Transcript saved: {out_path}") + print(f"Audio duration: {h}h {m}m, Segments: {len(segments)}") + print(f"Format: {output_format.upper()}, Size: {len(transcript)} chars") + + from collections import namedtuple + FormattedOutput = namedtuple("FormattedOutput", ["transcript", "output_path"]) + return FormattedOutput(transcript=transcript, output_path=out_path) + + +# ────────────────────────────────────────────────────────────── +# 5. (Optional) Log metrics to MLflow +# ────────────────────────────────────────────────────────────── +@dsl.component( + base_image="python:3.13-slim", + packages_to_install=["mlflow==2.22.0"], +) +def log_transcription_metrics( + total_duration_s: float, + full_text: str, + source_path: str, + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", + experiment_name: str = "dvd-transcription", +): + """Log transcription run metrics to MLflow.""" + import mlflow + + mlflow.set_tracking_uri(mlflow_tracking_uri) + mlflow.set_experiment(experiment_name) + + with mlflow.start_run(run_name=f"transcribe-{source_path.split('/')[-1]}"): + mlflow.log_params({ + "source_path": source_path, + "model": "whisper-large-v3", + }) + mlflow.log_metrics({ + "audio_duration_s": total_duration_s, + "transcript_chars": float(len(full_text)), + }) + + +# ────────────────────────────────────────────────────────────── +# Pipeline definition +# ────────────────────────────────────────────────────────────── +@dsl.pipeline( + name="DVD / Video Transcription", + description=( + "Extract audio from a DVD or video file, transcribe it via Whisper STT, " + "and produce a timestamped transcript (SRT/VTT/TXT)." + ), +) +def dvd_transcription_pipeline( + source_path: str = "/data/dvd/movie.mkv", + chunk_duration_s: int = 300, + language: str = "en", + output_format: str = "srt", + whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper", + mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80", +): + # Step 1: Extract audio from the video/DVD source + audio = extract_audio( + source_path=source_path, + sample_rate=16000, + mono=True, + ) + + # Step 2: Chunk the audio into manageable segments + chunks = chunk_audio( + wav_path=audio.outputs["wav_path"], + chunk_duration_s=chunk_duration_s, + ) + + # Step 3: Transcribe each chunk via the Whisper endpoint + transcript = transcribe_chunks( + chunk_paths=chunks.outputs["chunk_paths"], + whisper_url=whisper_url, + language=language, + response_format="verbose_json", + ) + + # Step 4: Format the transcript into the desired output format + formatted = format_transcript( + segments=transcript.outputs["segments"], + full_text=transcript.outputs["full_text"], + total_duration_s=transcript.outputs["total_duration_s"], + output_format=output_format, + ) + + # Step 5: Log metrics to MLflow + log_transcription_metrics( + total_duration_s=transcript.outputs["total_duration_s"], + full_text=transcript.outputs["full_text"], + source_path=source_path, + mlflow_tracking_uri=mlflow_tracking_uri, + ) + + +# ────────────────────────────────────────────────────────────── +# Compile +# ────────────────────────────────────────────────────────────── +if __name__ == "__main__": + compiler.Compiler().compile( + pipeline_func=dvd_transcription_pipeline, + package_path="dvd_transcription_pipeline.yaml", + ) + print("Compiled: dvd_transcription_pipeline.yaml") diff --git a/dvd_transcription_pipeline.yaml b/dvd_transcription_pipeline.yaml new file mode 100644 index 0000000..b4f1800 --- /dev/null +++ b/dvd_transcription_pipeline.yaml @@ -0,0 +1,549 @@ +# PIPELINE DEFINITION +# Name: dvd-video-transcription +# Description: Extract audio from a DVD or video file, transcribe it via Whisper STT, and produce a timestamped transcript (SRT/VTT/TXT). +# Inputs: +# chunk_duration_s: int [Default: 300.0] +# language: str [Default: 'en'] +# mlflow_tracking_uri: str [Default: 'http://mlflow.mlflow.svc.cluster.local:80'] +# output_format: str [Default: 'srt'] +# source_path: str [Default: '/data/dvd/movie.mkv'] +# whisper_url: str [Default: 'http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper'] +components: + comp-chunk-audio: + executorLabel: exec-chunk-audio + inputDefinitions: + parameters: + chunk_duration_s: + defaultValue: 300.0 + description: Seconds per chunk (default 5 minutes). + isOptional: true + parameterType: NUMBER_INTEGER + wav_path: + description: ' Path to the mono 16 kHz WAV.' + parameterType: STRING + outputDefinitions: + parameters: + chunk_paths: + parameterType: LIST + num_chunks: + parameterType: NUMBER_INTEGER + comp-extract-audio: + executorLabel: exec-extract-audio + inputDefinitions: + parameters: + mono: + defaultValue: true + description: ' Down-mix to mono (Whisper expects single-channel).' + isOptional: true + parameterType: BOOLEAN + sample_rate: + defaultValue: 16000.0 + description: Target sample rate (16 kHz is optimal for Whisper). + isOptional: true + parameterType: NUMBER_INTEGER + source_path: + description: 'Path to DVD ISO, VOB, MKV, MP4, or any ffmpeg- + + supported file. Can also be a /dev/sr0 device.' + parameterType: STRING + outputDefinitions: + parameters: + duration_s: + parameterType: NUMBER_DOUBLE + wav_path: + parameterType: STRING + comp-format-transcript: + executorLabel: exec-format-transcript + inputDefinitions: + parameters: + full_text: + description: ' Full concatenated text.' + parameterType: STRING + output_format: + defaultValue: srt + description: ' ''srt'', ''vtt'', or ''txt''.' + isOptional: true + parameterType: STRING + segments: + description: ' List of segment dicts with start/end/text.' + parameterType: LIST + total_duration_s: + description: Total audio duration in seconds. + parameterType: NUMBER_DOUBLE + outputDefinitions: + parameters: + output_path: + parameterType: STRING + transcript: + parameterType: STRING + comp-log-transcription-metrics: + executorLabel: exec-log-transcription-metrics + inputDefinitions: + parameters: + experiment_name: + defaultValue: dvd-transcription + isOptional: true + parameterType: STRING + full_text: + parameterType: STRING + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + source_path: + parameterType: STRING + total_duration_s: + parameterType: NUMBER_DOUBLE + comp-transcribe-chunks: + executorLabel: exec-transcribe-chunks + inputDefinitions: + parameters: + chunk_paths: + description: ' List of WAV file paths to transcribe.' + parameterType: LIST + language: + defaultValue: en + description: ' Language code for Whisper (None for auto-detect).' + isOptional: true + parameterType: STRING + response_format: + defaultValue: verbose_json + description: '''json'', ''verbose_json'', or ''text''.' + isOptional: true + parameterType: STRING + whisper_url: + defaultValue: http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper + description: ' In-cluster Whisper endpoint URL.' + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + full_text: + parameterType: STRING + segments: + parameterType: LIST + total_duration_s: + parameterType: NUMBER_DOUBLE +deploymentSpec: + executors: + exec-chunk-audio: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - chunk_audio + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef chunk_audio(\n wav_path: str,\n chunk_duration_s: int =\ + \ 300,\n) -> NamedTuple(\"ChunkOutput\", [(\"chunk_paths\", list), (\"num_chunks\"\ + , int)]):\n \"\"\"Split a WAV file into fixed-duration chunks.\n\n \ + \ Args:\n wav_path: Path to the mono 16 kHz WAV.\n \ + \ chunk_duration_s: Seconds per chunk (default 5 minutes).\n \"\"\"\ + \n import os\n import subprocess\n import glob\n\n subprocess.run(\n\ + \ [\"apt-get\", \"update\", \"-qq\"],\n check=True, capture_output=True,\n\ + \ )\n subprocess.run(\n [\"apt-get\", \"install\", \"-y\",\ + \ \"-qq\", \"ffmpeg\"],\n check=True, capture_output=True,\n )\n\ + \n out_dir = \"/tmp/dvd_chunks\"\n os.makedirs(out_dir, exist_ok=True)\n\ + \ pattern = os.path.join(out_dir, \"chunk_%04d.wav\")\n\n cmd = [\n\ + \ \"ffmpeg\", \"-y\",\n \"-i\", wav_path,\n \"-f\"\ + , \"segment\",\n \"-segment_time\", str(chunk_duration_s),\n \ + \ \"-c\", \"copy\",\n pattern,\n ]\n print(f\"Chunking:\ + \ {' '.join(cmd)}\")\n result = subprocess.run(cmd, capture_output=True,\ + \ text=True, timeout=3600)\n if result.returncode != 0:\n raise\ + \ RuntimeError(f\"ffmpeg chunk failed:\\n{result.stderr}\")\n\n chunks\ + \ = sorted(glob.glob(os.path.join(out_dir, \"chunk_*.wav\")))\n print(f\"\ + Created {len(chunks)} chunks of ~{chunk_duration_s}s each\")\n\n from\ + \ collections import namedtuple\n ChunkOutput = namedtuple(\"ChunkOutput\"\ + , [\"chunk_paths\", \"num_chunks\"])\n return ChunkOutput(chunk_paths=chunks,\ + \ num_chunks=len(chunks))\n\n" + image: python:3.13-slim + exec-extract-audio: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - extract_audio + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'requests' &&\ + \ \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef extract_audio(\n source_path: str,\n sample_rate: int =\ + \ 16000,\n mono: bool = True,\n) -> NamedTuple(\"AudioOutput\", [(\"\ + wav_path\", str), (\"duration_s\", float)]):\n \"\"\"Extract audio from\ + \ a video/DVD file using ffmpeg.\n\n Args:\n source_path: Path\ + \ to DVD ISO, VOB, MKV, MP4, or any ffmpeg-\n supported\ + \ file. Can also be a /dev/sr0 device.\n sample_rate: Target sample\ + \ rate (16 kHz is optimal for Whisper).\n mono: Down-mix to\ + \ mono (Whisper expects single-channel).\n \"\"\"\n import os\n \ + \ import subprocess\n import json\n\n # Install ffmpeg inside the\ + \ container\n subprocess.run(\n [\"apt-get\", \"update\", \"-qq\"\ + ],\n check=True, capture_output=True,\n )\n subprocess.run(\n\ + \ [\"apt-get\", \"install\", \"-y\", \"-qq\", \"ffmpeg\"],\n \ + \ check=True, capture_output=True,\n )\n\n out_dir = \"/tmp/dvd_audio\"\ + \n os.makedirs(out_dir, exist_ok=True)\n wav_path = os.path.join(out_dir,\ + \ \"full_audio.wav\")\n\n # Build ffmpeg command\n cmd = [\"ffmpeg\"\ + , \"-y\"]\n\n # Handle DVD ISOs \u2013 mount via concat demuxer or direct\ + \ input\n if source_path.lower().endswith(\".iso\"):\n # For ISOs,\ + \ ffmpeg can read via dvdread protocol\n cmd += [\"-i\", f\"dvd://{source_path}\"\ + ]\n else:\n cmd += [\"-i\", source_path]\n\n # Audio extraction\ + \ options\n cmd += [\n \"-vn\", # drop\ + \ video\n \"-acodec\", \"pcm_s16le\", # 16-bit WAV\n \ + \ \"-ar\", str(sample_rate), # resample\n ]\n if mono:\n \ + \ cmd += [\"-ac\", \"1\"] # down-mix to mono\n\n cmd +=\ + \ [wav_path]\n\n print(f\"Running: {' '.join(cmd)}\")\n result = subprocess.run(cmd,\ + \ capture_output=True, text=True, timeout=7200)\n if result.returncode\ + \ != 0:\n raise RuntimeError(f\"ffmpeg failed:\\n{result.stderr}\"\ + )\n\n # Get duration via ffprobe\n probe = subprocess.run(\n \ + \ [\n \"ffprobe\", \"-v\", \"quiet\",\n \"-show_entries\"\ + , \"format=duration\",\n \"-of\", \"json\", wav_path,\n \ + \ ],\n capture_output=True, text=True,\n )\n duration_s =\ + \ float(json.loads(probe.stdout)[\"format\"][\"duration\"])\n file_size_mb\ + \ = os.path.getsize(wav_path) / (1024 * 1024)\n print(f\"Extracted: {wav_path}\ + \ ({file_size_mb:.1f} MB, {duration_s:.1f}s)\")\n\n from collections\ + \ import namedtuple\n AudioOutput = namedtuple(\"AudioOutput\", [\"wav_path\"\ + , \"duration_s\"])\n return AudioOutput(wav_path=wav_path, duration_s=duration_s)\n\ + \n" + image: python:3.13-slim + exec-format-transcript: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - format_transcript + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef format_transcript(\n segments: list,\n full_text: str,\n\ + \ total_duration_s: float,\n output_format: str = \"srt\",\n) -> NamedTuple(\"\ + FormattedOutput\", [(\"transcript\", str), (\"output_path\", str)]):\n \ + \ \"\"\"Format the transcript as SRT, VTT, or plain text.\n\n Args:\n\ + \ segments: List of segment dicts with start/end/text.\n \ + \ full_text: Full concatenated text.\n total_duration_s:\ + \ Total audio duration in seconds.\n output_format: 'srt', 'vtt',\ + \ or 'txt'.\n \"\"\"\n import os\n\n def _fmt_ts_srt(seconds: float)\ + \ -> str:\n h = int(seconds // 3600)\n m = int((seconds %\ + \ 3600) // 60)\n s = int(seconds % 60)\n ms = int((seconds\ + \ % 1) * 1000)\n return f\"{h:02d}:{m:02d}:{s:02d},{ms:03d}\"\n\n\ + \ def _fmt_ts_vtt(seconds: float) -> str:\n h = int(seconds //\ + \ 3600)\n m = int((seconds % 3600) // 60)\n s = int(seconds\ + \ % 60)\n ms = int((seconds % 1) * 1000)\n return f\"{h:02d}:{m:02d}:{s:02d}.{ms:03d}\"\ + \n\n out_dir = \"/tmp/dvd_transcript\"\n os.makedirs(out_dir, exist_ok=True)\n\ + \n if output_format == \"srt\":\n lines = []\n for i, seg\ + \ in enumerate(segments, 1):\n start_ts = _fmt_ts_srt(seg[\"\ + start\"])\n end_ts = _fmt_ts_srt(seg[\"end\"])\n text\ + \ = seg.get(\"text\", \"\").strip()\n lines.append(f\"{i}\\n{start_ts}\ + \ --> {end_ts}\\n{text}\\n\")\n transcript = \"\\n\".join(lines)\n\ + \ ext = \"srt\"\n\n elif output_format == \"vtt\":\n lines\ + \ = [\"WEBVTT\\n\"]\n for seg in segments:\n start_ts\ + \ = _fmt_ts_vtt(seg[\"start\"])\n end_ts = _fmt_ts_vtt(seg[\"\ + end\"])\n text = seg.get(\"text\", \"\").strip()\n \ + \ lines.append(f\"{start_ts} --> {end_ts}\\n{text}\\n\")\n transcript\ + \ = \"\\n\".join(lines)\n ext = \"vtt\"\n\n else: # txt\n \ + \ transcript = full_text\n ext = \"txt\"\n\n out_path = os.path.join(out_dir,\ + \ f\"transcript.{ext}\")\n with open(out_path, \"w\", encoding=\"utf-8\"\ + ) as f:\n f.write(transcript)\n\n h = int(total_duration_s //\ + \ 3600)\n m = int((total_duration_s % 3600) // 60)\n print(f\"Transcript\ + \ saved: {out_path}\")\n print(f\"Audio duration: {h}h {m}m, Segments:\ + \ {len(segments)}\")\n print(f\"Format: {output_format.upper()}, Size:\ + \ {len(transcript)} chars\")\n\n from collections import namedtuple\n\ + \ FormattedOutput = namedtuple(\"FormattedOutput\", [\"transcript\",\ + \ \"output_path\"])\n return FormattedOutput(transcript=transcript, output_path=out_path)\n\ + \n" + image: python:3.13-slim + exec-log-transcription-metrics: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - log_transcription_metrics + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'mlflow==2.22.0'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef log_transcription_metrics(\n total_duration_s: float,\n \ + \ full_text: str,\n source_path: str,\n mlflow_tracking_uri: str\ + \ = \"http://mlflow.mlflow.svc.cluster.local:80\",\n experiment_name:\ + \ str = \"dvd-transcription\",\n):\n \"\"\"Log transcription run metrics\ + \ to MLflow.\"\"\"\n import mlflow\n\n mlflow.set_tracking_uri(mlflow_tracking_uri)\n\ + \ mlflow.set_experiment(experiment_name)\n\n with mlflow.start_run(run_name=f\"\ + transcribe-{source_path.split('/')[-1]}\"):\n mlflow.log_params({\n\ + \ \"source_path\": source_path,\n \"model\": \"whisper-large-v3\"\ + ,\n })\n mlflow.log_metrics({\n \"audio_duration_s\"\ + : total_duration_s,\n \"transcript_chars\": float(len(full_text)),\n\ + \ })\n\n" + image: python:3.13-slim + exec-transcribe-chunks: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - transcribe_chunks + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'requests' &&\ + \ \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef transcribe_chunks(\n chunk_paths: list,\n whisper_url:\ + \ str = \"http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper\"\ + ,\n language: str = \"en\",\n response_format: str = \"verbose_json\"\ + ,\n) -> NamedTuple(\"TranscriptOutput\", [(\"segments\", list), (\"full_text\"\ + , str), (\"total_duration_s\", float)]):\n \"\"\"Send each audio chunk\ + \ to the Whisper STT endpoint.\n\n Args:\n chunk_paths: List\ + \ of WAV file paths to transcribe.\n whisper_url: In-cluster\ + \ Whisper endpoint URL.\n language: Language code for Whisper\ + \ (None for auto-detect).\n response_format: 'json', 'verbose_json',\ + \ or 'text'.\n \"\"\"\n import base64\n import json\n import\ + \ time\n import requests\n\n all_segments = []\n full_text_parts\ + \ = []\n total_audio_duration = 0.0\n time_offset = 0.0 # cumulative\ + \ offset for absolute timestamps\n\n for i, path in enumerate(chunk_paths):\n\ + \ print(f\"Transcribing chunk {i + 1}/{len(chunk_paths)}: {path}\"\ + )\n\n # Read and base64-encode the chunk\n with open(path,\ + \ \"rb\") as f:\n audio_b64 = base64.b64encode(f.read()).decode(\"\ + utf-8\")\n\n payload = {\n \"audio\": audio_b64,\n \ + \ \"audio_format\": \"wav\",\n \"language\": language,\n\ + \ \"task\": \"transcribe\",\n \"response_format\"\ + : response_format,\n \"word_timestamps\": False,\n }\n\ + \n start = time.time()\n resp = requests.post(whisper_url,\ + \ json=payload, timeout=600)\n elapsed = time.time() - start\n \ + \ resp.raise_for_status()\n data = resp.json()\n\n chunk_duration\ + \ = data.get(\"duration\", 0.0)\n total_audio_duration += chunk_duration\n\ + \n if \"segments\" in data:\n for seg in data[\"segments\"\ + ]:\n # Offset timestamps to be absolute within the full audio\n\ + \ seg[\"start\"] += time_offset\n seg[\"end\"\ + ] += time_offset\n all_segments.append(seg)\n\n chunk_text\ + \ = data.get(\"text\", \"\")\n full_text_parts.append(chunk_text)\n\ + \ time_offset += chunk_duration\n rtf = elapsed / chunk_duration\ + \ if chunk_duration > 0 else 0\n print(f\" \u2192 {len(chunk_text)}\ + \ chars, {chunk_duration:.1f}s audio in {elapsed:.1f}s (RTF={rtf:.2f})\"\ + )\n\n full_text = \"\\n\".join(full_text_parts)\n print(f\"\\nTotal:\ + \ {len(all_segments)} segments, {total_audio_duration:.1f}s audio\")\n \ + \ print(f\"Transcript length: {len(full_text)} characters\")\n\n from\ + \ collections import namedtuple\n TranscriptOutput = namedtuple(\"TranscriptOutput\"\ + , [\"segments\", \"full_text\", \"total_duration_s\"])\n return TranscriptOutput(\n\ + \ segments=all_segments,\n full_text=full_text.strip(),\n\ + \ total_duration_s=total_audio_duration,\n )\n\n" + image: python:3.13-slim +pipelineInfo: + description: Extract audio from a DVD or video file, transcribe it via Whisper STT, + and produce a timestamped transcript (SRT/VTT/TXT). + name: dvd-video-transcription +root: + dag: + tasks: + chunk-audio: + cachingOptions: + enableCache: true + componentRef: + name: comp-chunk-audio + dependentTasks: + - extract-audio + inputs: + parameters: + chunk_duration_s: + componentInputParameter: chunk_duration_s + wav_path: + taskOutputParameter: + outputParameterKey: wav_path + producerTask: extract-audio + taskInfo: + name: chunk-audio + extract-audio: + cachingOptions: + enableCache: true + componentRef: + name: comp-extract-audio + inputs: + parameters: + mono: + runtimeValue: + constant: true + sample_rate: + runtimeValue: + constant: 16000.0 + source_path: + componentInputParameter: source_path + taskInfo: + name: extract-audio + format-transcript: + cachingOptions: + enableCache: true + componentRef: + name: comp-format-transcript + dependentTasks: + - transcribe-chunks + inputs: + parameters: + full_text: + taskOutputParameter: + outputParameterKey: full_text + producerTask: transcribe-chunks + output_format: + componentInputParameter: output_format + segments: + taskOutputParameter: + outputParameterKey: segments + producerTask: transcribe-chunks + total_duration_s: + taskOutputParameter: + outputParameterKey: total_duration_s + producerTask: transcribe-chunks + taskInfo: + name: format-transcript + log-transcription-metrics: + cachingOptions: + enableCache: true + componentRef: + name: comp-log-transcription-metrics + dependentTasks: + - transcribe-chunks + inputs: + parameters: + full_text: + taskOutputParameter: + outputParameterKey: full_text + producerTask: transcribe-chunks + mlflow_tracking_uri: + componentInputParameter: mlflow_tracking_uri + source_path: + componentInputParameter: source_path + total_duration_s: + taskOutputParameter: + outputParameterKey: total_duration_s + producerTask: transcribe-chunks + taskInfo: + name: log-transcription-metrics + transcribe-chunks: + cachingOptions: + enableCache: true + componentRef: + name: comp-transcribe-chunks + dependentTasks: + - chunk-audio + inputs: + parameters: + chunk_paths: + taskOutputParameter: + outputParameterKey: chunk_paths + producerTask: chunk-audio + language: + componentInputParameter: language + response_format: + runtimeValue: + constant: verbose_json + whisper_url: + componentInputParameter: whisper_url + taskInfo: + name: transcribe-chunks + inputDefinitions: + parameters: + chunk_duration_s: + defaultValue: 300.0 + isOptional: true + parameterType: NUMBER_INTEGER + language: + defaultValue: en + isOptional: true + parameterType: STRING + mlflow_tracking_uri: + defaultValue: http://mlflow.mlflow.svc.cluster.local:80 + isOptional: true + parameterType: STRING + output_format: + defaultValue: srt + isOptional: true + parameterType: STRING + source_path: + defaultValue: /data/dvd/movie.mkv + isOptional: true + parameterType: STRING + whisper_url: + defaultValue: http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.12.1