Files
kubeflow/dvd_transcription_pipeline.py
Billy D. 45996a8dbf feat: add DVD/video transcription pipeline
5-step KFP pipeline:
1. extract_audio: ffmpeg extracts 16kHz mono WAV from DVD/video
2. chunk_audio: splits into 5-minute segments for Whisper
3. transcribe_chunks: sends each chunk to Whisper STT endpoint
4. format_transcript: produces SRT, VTT, or TXT with timestamps
5. log_metrics: logs run to MLflow (dvd-transcription experiment)
2026-02-13 09:22:56 -05:00

411 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
DVD / Video Transcription Pipeline - Kubeflow Pipelines SDK
Extracts audio from a DVD (ISO, VOB) or video file (MKV, MP4, etc.),
chunks it into segments, sends each to the Whisper STT service, and
merges the results into a complete timestamped transcript.
Usage:
    pip install kfp==2.12.1
    python dvd_transcription_pipeline.py
    # Upload dvd_transcription_pipeline.yaml to Kubeflow Pipelines UI
"""
from kfp import compiler, dsl
from typing import NamedTuple
# ──────────────────────────────────────────────────────────────
# 1. Extract audio from DVD / video file via ffmpeg
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["requests"],
)
def extract_audio(
    source_path: str,
    sample_rate: int = 16000,
    mono: bool = True,
) -> NamedTuple("AudioOutput", [("wav_path", str), ("duration_s", float)]):
    """Extract the audio track of a video/DVD source into a 16-bit PCM WAV.

    ffmpeg is installed with apt-get at run time, the audio is written to
    /tmp/dvd_audio/full_audio.wav, and the duration of that file is read
    back with ffprobe.

    Args:
        source_path: Path to DVD ISO, VOB, MKV, MP4, or any ffmpeg-
            supported file. Can also be a /dev/sr0 device.
        sample_rate: Target sample rate (16 kHz is optimal for Whisper).
        mono: Down-mix to mono (Whisper expects single-channel).

    Returns:
        AudioOutput(wav_path, duration_s): path of the written WAV file and
        its duration in seconds as reported by ffprobe.

    Raises:
        subprocess.CalledProcessError: If an apt-get step fails.
        subprocess.TimeoutExpired: If ffmpeg runs longer than 7200 seconds.
        RuntimeError: If ffmpeg exits non-zero, or ffprobe fails or does not
            report a usable duration.
    """
    import json
    import os
    import subprocess
    from collections import namedtuple

    # Install ffmpeg (the code below also relies on ffprobe) inside the container.
    for apt_cmd in (
        ["apt-get", "update", "-qq"],
        ["apt-get", "install", "-y", "-qq", "ffmpeg"],
    ):
        subprocess.run(apt_cmd, check=True, capture_output=True)

    out_dir = "/tmp/dvd_audio"
    os.makedirs(out_dir, exist_ok=True)
    # NOTE(review): this path lives on the step's own filesystem; a later step
    # can only open it if /tmp/dvd_audio is on storage shared between steps -
    # confirm how the pipeline pods are mounted.
    wav_path = os.path.join(out_dir, "full_audio.wav")

    # Build the ffmpeg command; -y overwrites an existing output file.
    cmd = ["ffmpeg", "-y"]
    if source_path.lower().endswith(".iso"):
        # NOTE(review): ISO sources are passed as a dvd:// URL. Confirm the
        # installed ffmpeg build accepts that scheme; newer ffmpeg releases
        # document a `dvdvideo` demuxer (-f dvdvideo -i <iso>) instead.
        cmd += ["-i", f"dvd://{source_path}"]
    else:
        cmd += ["-i", source_path]

    # Audio extraction options
    cmd += [
        "-vn",                    # drop video
        "-acodec", "pcm_s16le",   # 16-bit WAV
        "-ar", str(sample_rate),  # resample
    ]
    if mono:
        cmd += ["-ac", "1"]       # down-mix to mono
    cmd += [wav_path]

    print(f"Running: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=7200)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg failed:\n{result.stderr}")

    # Get duration via ffprobe. "-v error" (rather than "quiet") keeps the
    # failure reason on stderr so it can be included in the exception.
    probe = subprocess.run(
        [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "json", wav_path,
        ],
        capture_output=True, text=True,
    )
    if probe.returncode != 0:
        raise RuntimeError(f"ffprobe failed:\n{probe.stderr}")
    try:
        duration_s = float(json.loads(probe.stdout)["format"]["duration"])
    except (ValueError, KeyError, TypeError) as err:
        # json.JSONDecodeError is a ValueError, so unparsable output lands here too.
        raise RuntimeError(
            f"ffprobe returned no usable duration for {wav_path}: {probe.stdout!r}"
        ) from err

    file_size_mb = os.path.getsize(wav_path) / (1024 * 1024)
    print(f"Extracted: {wav_path} ({file_size_mb:.1f} MB, {duration_s:.1f}s)")

    AudioOutput = namedtuple("AudioOutput", ["wav_path", "duration_s"])
    return AudioOutput(wav_path=wav_path, duration_s=duration_s)
# ──────────────────────────────────────────────────────────────
# 2. Split audio into manageable chunks
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
)
def chunk_audio(
    wav_path: str,
    chunk_duration_s: int = 300,
) -> NamedTuple("ChunkOutput", [("chunk_paths", list), ("num_chunks", int)]):
    """Split a WAV file into fixed-duration chunks with ffmpeg's segment muxer.

    Chunks are written to /tmp/dvd_chunks as chunk_0000.wav, chunk_0001.wav,
    ... using stream copy (no re-encoding).

    Args:
        wav_path: Path to the mono 16 kHz WAV.
        chunk_duration_s: Seconds per chunk (default 5 minutes). Must be
            greater than zero.

    Returns:
        ChunkOutput(chunk_paths, num_chunks): the chunk file paths in sorted
        (and therefore chronological) order, and how many there are.

    Raises:
        ValueError: If chunk_duration_s is not positive.
        subprocess.CalledProcessError: If an apt-get step fails.
        subprocess.TimeoutExpired: If ffmpeg runs longer than 3600 seconds.
        RuntimeError: If ffmpeg exits non-zero.
    """
    import glob
    import os
    import subprocess
    from collections import namedtuple

    # Fail fast, before the slow apt-get install, on a segment length that
    # ffmpeg cannot sensibly use.
    if chunk_duration_s <= 0:
        raise ValueError(
            f"chunk_duration_s must be positive, got {chunk_duration_s}"
        )

    for apt_cmd in (
        ["apt-get", "update", "-qq"],
        ["apt-get", "install", "-y", "-qq", "ffmpeg"],
    ):
        subprocess.run(apt_cmd, check=True, capture_output=True)

    out_dir = "/tmp/dvd_chunks"
    os.makedirs(out_dir, exist_ok=True)
    chunk_glob = os.path.join(out_dir, "chunk_*.wav")

    # The result list is built by globbing the directory, so chunks left over
    # from an earlier, longer input would otherwise be returned as well.
    for stale in glob.glob(chunk_glob):
        os.remove(stale)

    pattern = os.path.join(out_dir, "chunk_%04d.wav")
    cmd = [
        "ffmpeg", "-y",
        "-i", wav_path,
        "-f", "segment",
        "-segment_time", str(chunk_duration_s),
        "-c", "copy",
        pattern,
    ]
    print(f"Chunking: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg chunk failed:\n{result.stderr}")

    # Zero-padded numbering makes lexical order equal to segment order.
    chunks = sorted(glob.glob(chunk_glob))
    print(f"Created {len(chunks)} chunks of ~{chunk_duration_s}s each")

    ChunkOutput = namedtuple("ChunkOutput", ["chunk_paths", "num_chunks"])
    return ChunkOutput(chunk_paths=chunks, num_chunks=len(chunks))
# ──────────────────────────────────────────────────────────────
# 3. Transcribe all chunks via Whisper STT endpoint
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["requests"],
)
def transcribe_chunks(
    chunk_paths: list,
    whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
    language: str = "en",
    response_format: str = "verbose_json",
) -> NamedTuple("TranscriptOutput", [("segments", list), ("full_text", str), ("total_duration_s", float)]):
    """Send each audio chunk to the Whisper STT endpoint and merge the results.

    Every chunk is base64-encoded and POSTed as JSON. Segment timestamps in
    each response are shifted by the combined duration of the preceding
    chunks so they are absolute within the full recording.

    Args:
        chunk_paths: List of WAV file paths to transcribe, in playback order.
        whisper_url: In-cluster Whisper endpoint URL.
        language: Language code forwarded to the endpoint as-is.
            NOTE(review): the endpoint presumably auto-detects when this is
            None - confirm against the service.
        response_format: 'json', 'verbose_json', or 'text'; forwarded as-is.

    Returns:
        TranscriptOutput(segments, full_text, total_duration_s): all segment
        dicts with absolute start/end times, the chunk texts joined by
        newlines (stripped), and the summed chunk duration in seconds.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.RequestException: On connection problems or the 600 s timeout.
    """
    import base64
    import time
    import wave
    from collections import namedtuple

    import requests

    def _wav_duration_s(wav_file: str) -> float:
        """Return the length of a WAV file in seconds, or 0.0 if unreadable."""
        try:
            with wave.open(wav_file, "rb") as wav:
                rate = wav.getframerate()
                return wav.getnframes() / rate if rate else 0.0
        except (wave.Error, EOFError, OSError):
            return 0.0

    all_segments = []
    full_text_parts = []
    total_audio_duration = 0.0
    time_offset = 0.0  # cumulative offset for absolute timestamps

    for i, path in enumerate(chunk_paths):
        print(f"Transcribing chunk {i + 1}/{len(chunk_paths)}: {path}")

        # Read and base64-encode the chunk
        with open(path, "rb") as f:
            audio_b64 = base64.b64encode(f.read()).decode("utf-8")

        payload = {
            "audio": audio_b64,
            "audio_format": "wav",
            "language": language,
            "task": "transcribe",
            "response_format": response_format,
            "word_timestamps": False,
        }

        start = time.time()
        resp = requests.post(whisper_url, json=payload, timeout=600)
        elapsed = time.time() - start
        resp.raise_for_status()
        data = resp.json()

        # Prefer the duration reported by the endpoint. When it is missing,
        # null or zero, fall back to the WAV header so the offset still
        # advances; otherwise every later chunk would restart at 00:00:00.
        reported = data.get("duration")
        chunk_duration = float(reported) if reported else _wav_duration_s(path)
        total_audio_duration += chunk_duration

        for seg in data.get("segments") or []:
            # Offset timestamps to be absolute within the full audio
            seg["start"] += time_offset
            seg["end"] += time_offset
            all_segments.append(seg)

        chunk_text = data.get("text", "")
        full_text_parts.append(chunk_text)
        time_offset += chunk_duration

        # Real-time factor: processing seconds per second of audio.
        rtf = elapsed / chunk_duration if chunk_duration > 0 else 0
        print(f"{len(chunk_text)} chars, {chunk_duration:.1f}s audio in {elapsed:.1f}s (RTF={rtf:.2f})")

    full_text = "\n".join(full_text_parts)
    print(f"\nTotal: {len(all_segments)} segments, {total_audio_duration:.1f}s audio")
    print(f"Transcript length: {len(full_text)} characters")

    TranscriptOutput = namedtuple("TranscriptOutput", ["segments", "full_text", "total_duration_s"])
    return TranscriptOutput(
        segments=all_segments,
        full_text=full_text.strip(),
        total_duration_s=total_audio_duration,
    )
# ──────────────────────────────────────────────────────────────
# 4. Format and save the final transcript
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
)
def format_transcript(
    segments: list,
    full_text: str,
    total_duration_s: float,
    output_format: str = "srt",
) -> NamedTuple("FormattedOutput", [("transcript", str), ("output_path", str)]):
    """Format the transcript as SRT, VTT, or plain text and write it to disk.

    The result is written to /tmp/dvd_transcript/transcript.<ext>.

    Args:
        segments: List of segment dicts with start/end/text.
        full_text: Full concatenated text (used for the plain-text output).
        total_duration_s: Total audio duration in seconds (only printed).
        output_format: 'srt', 'vtt', or 'txt', matched case-insensitively.
            Any other value produces plain text.

    Returns:
        FormattedOutput(transcript, output_path): the formatted transcript
        text and the path of the file it was written to.
    """
    import os
    from collections import namedtuple

    def _fmt_ts(seconds: float, ms_sep: str) -> str:
        """Render seconds as HH:MM:SS<ms_sep>mmm, rounded to the millisecond."""
        # Work in integer milliseconds: truncating (seconds % 1) * 1000
        # loses a millisecond to float error (1.001 s would print as ",000").
        total_ms = max(0, round(float(seconds) * 1000))
        total_s, ms = divmod(total_ms, 1000)
        total_m, s = divmod(total_s, 60)
        h, m = divmod(total_m, 60)
        return f"{h:02d}:{m:02d}:{s:02d}{ms_sep}{ms:03d}"

    out_dir = "/tmp/dvd_transcript"
    os.makedirs(out_dir, exist_ok=True)

    # Accept "SRT" / " vtt " etc.; previously these fell through to plain text.
    fmt = output_format.strip().lower()

    if fmt == "srt":
        # SRT cues are numbered from 1 and use a comma before the milliseconds.
        lines = []
        for i, seg in enumerate(segments, 1):
            start_ts = _fmt_ts(seg["start"], ",")
            end_ts = _fmt_ts(seg["end"], ",")
            text = seg.get("text", "").strip()
            lines.append(f"{i}\n{start_ts} --> {end_ts}\n{text}\n")
        transcript = "\n".join(lines)
        ext = "srt"
    elif fmt == "vtt":
        # WebVTT starts with a header line and uses a dot before the milliseconds.
        lines = ["WEBVTT\n"]
        for seg in segments:
            start_ts = _fmt_ts(seg["start"], ".")
            end_ts = _fmt_ts(seg["end"], ".")
            text = seg.get("text", "").strip()
            lines.append(f"{start_ts} --> {end_ts}\n{text}\n")
        transcript = "\n".join(lines)
        ext = "vtt"
    else:  # txt
        transcript = full_text
        ext = "txt"

    # NOTE(review): this path is on the step's own filesystem; confirm it is
    # on shared storage if anything outside this step needs to read the file.
    out_path = os.path.join(out_dir, f"transcript.{ext}")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(transcript)

    h = int(total_duration_s // 3600)
    m = int((total_duration_s % 3600) // 60)
    print(f"Transcript saved: {out_path}")
    print(f"Audio duration: {h}h {m}m, Segments: {len(segments)}")
    print(f"Format: {output_format.upper()}, Size: {len(transcript)} chars")

    FormattedOutput = namedtuple("FormattedOutput", ["transcript", "output_path"])
    return FormattedOutput(transcript=transcript, output_path=out_path)
# ──────────────────────────────────────────────────────────────
# 5. (Optional) Log metrics to MLflow
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.13-slim",
    packages_to_install=["mlflow==2.22.0"],
)
def log_transcription_metrics(
    total_duration_s: float,
    full_text: str,
    source_path: str,
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
    experiment_name: str = "dvd-transcription",
):
    """Record one MLflow run describing a finished transcription.

    Args:
        total_duration_s: Audio duration in seconds, logged as a metric.
        full_text: Transcript text; only its character count is logged.
        source_path: Source file path, logged as a parameter and used to
            name the run.
        mlflow_tracking_uri: MLflow tracking server URL.
        experiment_name: MLflow experiment the run is filed under.
    """
    import mlflow

    # The run is named after the last "/"-separated component of the path.
    run_label = "transcribe-" + source_path.rsplit("/", 1)[-1]

    # The model name is a fixed string; it is not read from the transcription step.
    run_params = {
        "source_path": source_path,
        "model": "whisper-large-v3",
    }
    run_metrics = {
        "audio_duration_s": total_duration_s,
        "transcript_chars": float(len(full_text)),
    }

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name=run_label):
        mlflow.log_params(run_params)
        mlflow.log_metrics(run_metrics)
# ──────────────────────────────────────────────────────────────
# Pipeline definition
# ──────────────────────────────────────────────────────────────
@dsl.pipeline(
    name="DVD / Video Transcription",
    description=(
        "Extract audio from a DVD or video file, transcribe it via Whisper STT, "
        "and produce a timestamped transcript (SRT/VTT/TXT)."
    ),
)
def dvd_transcription_pipeline(
    source_path: str = "/data/dvd/movie.mkv",
    chunk_duration_s: int = 300,
    language: str = "en",
    output_format: str = "srt",
    whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
):
    """Wire the five transcription steps together.

    extract_audio feeds chunk_audio, which feeds transcribe_chunks; the
    transcription outputs then go to both format_transcript and
    log_transcription_metrics.
    """
    # 1) Pull a 16 kHz mono WAV out of the source file.
    extract_task = extract_audio(
        source_path=source_path,
        sample_rate=16000,
        mono=True,
    )

    # 2) Cut the WAV into chunk_duration_s-second pieces.
    split_task = chunk_audio(
        wav_path=extract_task.outputs["wav_path"],
        chunk_duration_s=chunk_duration_s,
    )

    # 3) Run every piece through the Whisper endpoint.
    stt_task = transcribe_chunks(
        chunk_paths=split_task.outputs["chunk_paths"],
        whisper_url=whisper_url,
        language=language,
        response_format="verbose_json",
    )
    stt_outputs = stt_task.outputs

    # 4) Render the transcript in the requested format. Nothing consumes this
    #    step's outputs inside the pipeline, so the task handle is not kept.
    format_transcript(
        segments=stt_outputs["segments"],
        full_text=stt_outputs["full_text"],
        total_duration_s=stt_outputs["total_duration_s"],
        output_format=output_format,
    )

    # 5) Log run metrics to MLflow; this depends only on the transcription step.
    log_transcription_metrics(
        total_duration_s=stt_outputs["total_duration_s"],
        full_text=stt_outputs["full_text"],
        source_path=source_path,
        mlflow_tracking_uri=mlflow_tracking_uri,
    )
# ──────────────────────────────────────────────────────────────
# Compile
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Compile the pipeline into a YAML package in the current working
    # directory; that file is what gets uploaded to the Kubeflow Pipelines UI.
    package_file = "dvd_transcription_pipeline.yaml"
    compiler.Compiler().compile(
        pipeline_func=dvd_transcription_pipeline,
        package_path=package_file,
    )
    print(f"Compiled: {package_file}")