feat: add DVD/video transcription pipeline

5-step KFP pipeline:
1. extract_audio: ffmpeg extracts 16kHz mono WAV from DVD/video
2. chunk_audio: splits into 5-minute segments for Whisper
3. transcribe_chunks: sends each chunk to Whisper STT endpoint
4. format_transcript: produces SRT, VTT, or TXT with timestamps
5. log_metrics: logs run to MLflow (dvd-transcription experiment)
This commit is contained in:
2026-02-13 09:22:56 -05:00
parent bc4b230dd9
commit 45996a8dbf
2 changed files with 959 additions and 0 deletions

View File

@@ -0,0 +1,410 @@
#!/usr/bin/env python3
"""
DVD / Video Transcription Pipeline Kubeflow Pipelines SDK
Extracts audio from a DVD (ISO, VOB) or video file (MKV, MP4, etc.),
chunks it into segments, sends each to the Whisper STT service, and
merges the results into a complete timestamped transcript.
Usage:
pip install kfp==2.12.1
python dvd_transcription_pipeline.py
# Upload dvd_transcription_pipeline.yaml to Kubeflow Pipelines UI
"""
from kfp import compiler, dsl
from typing import NamedTuple
# ──────────────────────────────────────────────────────────────
# 1. Extract audio from DVD / video file via ffmpeg
# ──────────────────────────────────────────────────────────────
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["requests"],
)
def extract_audio(
source_path: str,
sample_rate: int = 16000,
mono: bool = True,
) -> NamedTuple("AudioOutput", [("wav_path", str), ("duration_s", float)]):
"""Extract audio from a video/DVD file using ffmpeg.
Args:
source_path: Path to DVD ISO, VOB, MKV, MP4, or any ffmpeg-
supported file. Can also be a /dev/sr0 device.
sample_rate: Target sample rate (16 kHz is optimal for Whisper).
mono: Down-mix to mono (Whisper expects single-channel).
"""
import os
import subprocess
import json
# Install ffmpeg inside the container
subprocess.run(
["apt-get", "update", "-qq"],
check=True, capture_output=True,
)
subprocess.run(
["apt-get", "install", "-y", "-qq", "ffmpeg"],
check=True, capture_output=True,
)
out_dir = "/tmp/dvd_audio"
os.makedirs(out_dir, exist_ok=True)
wav_path = os.path.join(out_dir, "full_audio.wav")
# Build ffmpeg command
cmd = ["ffmpeg", "-y"]
# Handle DVD ISOs mount via concat demuxer or direct input
if source_path.lower().endswith(".iso"):
# For ISOs, ffmpeg can read via dvdread protocol
cmd += ["-i", f"dvd://{source_path}"]
else:
cmd += ["-i", source_path]
# Audio extraction options
cmd += [
"-vn", # drop video
"-acodec", "pcm_s16le", # 16-bit WAV
"-ar", str(sample_rate), # resample
]
if mono:
cmd += ["-ac", "1"] # down-mix to mono
cmd += [wav_path]
print(f"Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=7200)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg failed:\n{result.stderr}")
# Get duration via ffprobe
probe = subprocess.run(
[
"ffprobe", "-v", "quiet",
"-show_entries", "format=duration",
"-of", "json", wav_path,
],
capture_output=True, text=True,
)
duration_s = float(json.loads(probe.stdout)["format"]["duration"])
file_size_mb = os.path.getsize(wav_path) / (1024 * 1024)
print(f"Extracted: {wav_path} ({file_size_mb:.1f} MB, {duration_s:.1f}s)")
from collections import namedtuple
AudioOutput = namedtuple("AudioOutput", ["wav_path", "duration_s"])
return AudioOutput(wav_path=wav_path, duration_s=duration_s)
# ──────────────────────────────────────────────────────────────
# 2. Split audio into manageable chunks
# ──────────────────────────────────────────────────────────────
@dsl.component(
base_image="python:3.13-slim",
)
def chunk_audio(
wav_path: str,
chunk_duration_s: int = 300,
) -> NamedTuple("ChunkOutput", [("chunk_paths", list), ("num_chunks", int)]):
"""Split a WAV file into fixed-duration chunks.
Args:
wav_path: Path to the mono 16 kHz WAV.
chunk_duration_s: Seconds per chunk (default 5 minutes).
"""
import os
import subprocess
import glob
subprocess.run(
["apt-get", "update", "-qq"],
check=True, capture_output=True,
)
subprocess.run(
["apt-get", "install", "-y", "-qq", "ffmpeg"],
check=True, capture_output=True,
)
out_dir = "/tmp/dvd_chunks"
os.makedirs(out_dir, exist_ok=True)
pattern = os.path.join(out_dir, "chunk_%04d.wav")
cmd = [
"ffmpeg", "-y",
"-i", wav_path,
"-f", "segment",
"-segment_time", str(chunk_duration_s),
"-c", "copy",
pattern,
]
print(f"Chunking: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg chunk failed:\n{result.stderr}")
chunks = sorted(glob.glob(os.path.join(out_dir, "chunk_*.wav")))
print(f"Created {len(chunks)} chunks of ~{chunk_duration_s}s each")
from collections import namedtuple
ChunkOutput = namedtuple("ChunkOutput", ["chunk_paths", "num_chunks"])
return ChunkOutput(chunk_paths=chunks, num_chunks=len(chunks))
# ──────────────────────────────────────────────────────────────
# 3. Transcribe all chunks via Whisper STT endpoint
# ──────────────────────────────────────────────────────────────
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["requests"],
)
def transcribe_chunks(
chunk_paths: list,
whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
language: str = "en",
response_format: str = "verbose_json",
) -> NamedTuple("TranscriptOutput", [("segments", list), ("full_text", str), ("total_duration_s", float)]):
"""Send each audio chunk to the Whisper STT endpoint.
Args:
chunk_paths: List of WAV file paths to transcribe.
whisper_url: In-cluster Whisper endpoint URL.
language: Language code for Whisper (None for auto-detect).
response_format: 'json', 'verbose_json', or 'text'.
"""
import base64
import json
import time
import requests
all_segments = []
full_text_parts = []
total_audio_duration = 0.0
time_offset = 0.0 # cumulative offset for absolute timestamps
for i, path in enumerate(chunk_paths):
print(f"Transcribing chunk {i + 1}/{len(chunk_paths)}: {path}")
# Read and base64-encode the chunk
with open(path, "rb") as f:
audio_b64 = base64.b64encode(f.read()).decode("utf-8")
payload = {
"audio": audio_b64,
"audio_format": "wav",
"language": language,
"task": "transcribe",
"response_format": response_format,
"word_timestamps": False,
}
start = time.time()
resp = requests.post(whisper_url, json=payload, timeout=600)
elapsed = time.time() - start
resp.raise_for_status()
data = resp.json()
chunk_duration = data.get("duration", 0.0)
total_audio_duration += chunk_duration
if "segments" in data:
for seg in data["segments"]:
# Offset timestamps to be absolute within the full audio
seg["start"] += time_offset
seg["end"] += time_offset
all_segments.append(seg)
chunk_text = data.get("text", "")
full_text_parts.append(chunk_text)
time_offset += chunk_duration
rtf = elapsed / chunk_duration if chunk_duration > 0 else 0
print(f"{len(chunk_text)} chars, {chunk_duration:.1f}s audio in {elapsed:.1f}s (RTF={rtf:.2f})")
full_text = "\n".join(full_text_parts)
print(f"\nTotal: {len(all_segments)} segments, {total_audio_duration:.1f}s audio")
print(f"Transcript length: {len(full_text)} characters")
from collections import namedtuple
TranscriptOutput = namedtuple("TranscriptOutput", ["segments", "full_text", "total_duration_s"])
return TranscriptOutput(
segments=all_segments,
full_text=full_text.strip(),
total_duration_s=total_audio_duration,
)
# ──────────────────────────────────────────────────────────────
# 4. Format and save the final transcript
# ──────────────────────────────────────────────────────────────
@dsl.component(
base_image="python:3.13-slim",
)
def format_transcript(
segments: list,
full_text: str,
total_duration_s: float,
output_format: str = "srt",
) -> NamedTuple("FormattedOutput", [("transcript", str), ("output_path", str)]):
"""Format the transcript as SRT, VTT, or plain text.
Args:
segments: List of segment dicts with start/end/text.
full_text: Full concatenated text.
total_duration_s: Total audio duration in seconds.
output_format: 'srt', 'vtt', or 'txt'.
"""
import os
def _fmt_ts_srt(seconds: float) -> str:
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
ms = int((seconds % 1) * 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def _fmt_ts_vtt(seconds: float) -> str:
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
ms = int((seconds % 1) * 1000)
return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"
out_dir = "/tmp/dvd_transcript"
os.makedirs(out_dir, exist_ok=True)
if output_format == "srt":
lines = []
for i, seg in enumerate(segments, 1):
start_ts = _fmt_ts_srt(seg["start"])
end_ts = _fmt_ts_srt(seg["end"])
text = seg.get("text", "").strip()
lines.append(f"{i}\n{start_ts} --> {end_ts}\n{text}\n")
transcript = "\n".join(lines)
ext = "srt"
elif output_format == "vtt":
lines = ["WEBVTT\n"]
for seg in segments:
start_ts = _fmt_ts_vtt(seg["start"])
end_ts = _fmt_ts_vtt(seg["end"])
text = seg.get("text", "").strip()
lines.append(f"{start_ts} --> {end_ts}\n{text}\n")
transcript = "\n".join(lines)
ext = "vtt"
else: # txt
transcript = full_text
ext = "txt"
out_path = os.path.join(out_dir, f"transcript.{ext}")
with open(out_path, "w", encoding="utf-8") as f:
f.write(transcript)
h = int(total_duration_s // 3600)
m = int((total_duration_s % 3600) // 60)
print(f"Transcript saved: {out_path}")
print(f"Audio duration: {h}h {m}m, Segments: {len(segments)}")
print(f"Format: {output_format.upper()}, Size: {len(transcript)} chars")
from collections import namedtuple
FormattedOutput = namedtuple("FormattedOutput", ["transcript", "output_path"])
return FormattedOutput(transcript=transcript, output_path=out_path)
# ──────────────────────────────────────────────────────────────
# 5. (Optional) Log metrics to MLflow
# ──────────────────────────────────────────────────────────────
@dsl.component(
base_image="python:3.13-slim",
packages_to_install=["mlflow==2.22.0"],
)
def log_transcription_metrics(
total_duration_s: float,
full_text: str,
source_path: str,
mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
experiment_name: str = "dvd-transcription",
):
"""Log transcription run metrics to MLflow."""
import mlflow
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=f"transcribe-{source_path.split('/')[-1]}"):
mlflow.log_params({
"source_path": source_path,
"model": "whisper-large-v3",
})
mlflow.log_metrics({
"audio_duration_s": total_duration_s,
"transcript_chars": float(len(full_text)),
})
# ──────────────────────────────────────────────────────────────
# Pipeline definition
# ──────────────────────────────────────────────────────────────
@dsl.pipeline(
name="DVD / Video Transcription",
description=(
"Extract audio from a DVD or video file, transcribe it via Whisper STT, "
"and produce a timestamped transcript (SRT/VTT/TXT)."
),
)
def dvd_transcription_pipeline(
source_path: str = "/data/dvd/movie.mkv",
chunk_duration_s: int = 300,
language: str = "en",
output_format: str = "srt",
whisper_url: str = "http://ai-inference-serve-svc.kuberay.svc.cluster.local:8000/whisper",
mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:80",
):
# Step 1: Extract audio from the video/DVD source
audio = extract_audio(
source_path=source_path,
sample_rate=16000,
mono=True,
)
# Step 2: Chunk the audio into manageable segments
chunks = chunk_audio(
wav_path=audio.outputs["wav_path"],
chunk_duration_s=chunk_duration_s,
)
# Step 3: Transcribe each chunk via the Whisper endpoint
transcript = transcribe_chunks(
chunk_paths=chunks.outputs["chunk_paths"],
whisper_url=whisper_url,
language=language,
response_format="verbose_json",
)
# Step 4: Format the transcript into the desired output format
formatted = format_transcript(
segments=transcript.outputs["segments"],
full_text=transcript.outputs["full_text"],
total_duration_s=transcript.outputs["total_duration_s"],
output_format=output_format,
)
# Step 5: Log metrics to MLflow
log_transcription_metrics(
total_duration_s=transcript.outputs["total_duration_s"],
full_text=transcript.outputs["full_text"],
source_path=source_path,
mlflow_tracking_uri=mlflow_tracking_uri,
)
# ──────────────────────────────────────────────────────────────
# Compile
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
compiler.Compiler().compile(
pipeline_func=dvd_transcription_pipeline,
package_path="dvd_transcription_pipeline.yaml",
)
print("Compiled: dvd_transcription_pipeline.yaml")