From d4fafea09bd8fb59a116ad8c79817b2db831b99b Mon Sep 17 00:00:00 2001 From: "Billy D." Date: Mon, 2 Feb 2026 06:23:34 -0500 Subject: [PATCH] feat: add streaming TTS service with Coqui XTTS - tts_streaming.py: NATS-based TTS using XTTS HTTP API - Streaming audio chunks for low-latency playback - Voice cloning support via reference audio - Multi-language synthesis - OpenTelemetry instrumentation with HyperDX support --- .gitignore | 11 ++ Dockerfile | 14 ++ README.md | 169 +++++++++++++++++++++- healthcheck.py | 28 ++++ requirements.txt | 15 ++ tts_streaming.py | 356 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 592 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 healthcheck.py create mode 100644 requirements.txt create mode 100644 tts_streaming.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f024e3d --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +.venv/ +__pycache__/ +*.pyc +*.pyo +.pytest_cache/ +.mypy_cache/ +*.egg-info/ +dist/ +build/ +.env +.env.local diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8928f40 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Copy requirements and install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY tts_streaming.py . +COPY healthcheck.py . + +# Run the service +CMD ["python", "tts_streaming.py"] diff --git a/README.md b/README.md index eb6a668..cb20bb8 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,169 @@ -# tts-module +# Streaming TTS Module +A dedicated Text-to-Speech (TTS) service that processes synthesis requests from NATS using Coqui XTTS. + +## Overview + +This module enables real-time text-to-speech synthesis by accepting text via NATS and streaming audio chunks back as they're generated. This reduces latency for voice assistant applications by allowing playback to begin before synthesis completes. + +## Features + +- **NATS Integration**: Accepts TTS requests via NATS messaging +- **Streaming Audio**: Streams audio chunks back for immediate playback +- **Voice Cloning**: Support for custom speaker voices via reference audio +- **Multi-language**: Support for multiple languages via XTTS +- **OpenTelemetry**: Full observability with tracing and metrics +- **HyperDX Support**: Optional cloud observability integration + +## Architecture + +``` +┌─────────────────┐ +│ Voice App │ (voice-assistant, chat-handler) +│ │ +└────────┬────────┘ + │ Text + ▼ +┌─────────────────┐ +│ NATS Subject │ ai.voice.tts.request.{session_id} +│ TTS Request │ +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ +│ TTS Streaming │ (This Service) +│ Service │ - Calls XTTS API +│ │ - Streams audio chunks +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ +│ NATS Subject │ ai.voice.tts.audio.{session_id} +│ Audio Chunks │ +└─────────────────┘ +``` + +## NATS Message Protocol + +### TTS Request (ai.voice.tts.request.{session_id}) + +All messages use **msgpack** binary encoding. + +**Request:** +```python +{ + "text": "Hello, how can I help you today?", + "speaker": "default", # Optional: speaker ID + "language": "en", # Optional: language code + "speaker_wav_b64": "...", # Optional: base64 reference audio for voice cloning + "stream": True # Optional: stream chunks (default) or send complete audio +} +``` + +### Audio Output (ai.voice.tts.audio.{session_id}) + +**Streamed Chunk:** +```python +{ + "session_id": "unique-session-id", + "chunk_index": 0, + "total_chunks": 5, + "audio_b64": "base64-encoded-audio-chunk", + "is_last": False, + "timestamp": 1234567890.123, + "sample_rate": 24000 +} +``` + +**Complete Audio (when stream=False):** +```python +{ + "session_id": "unique-session-id", + "audio_b64": "base64-encoded-complete-audio", + "timestamp": 1234567890.123, + "sample_rate": 24000 +} +``` + +### Status Updates (ai.voice.tts.status.{session_id}) + +```python +{ + "session_id": "unique-session-id", + "status": "processing", # processing, completed, error + "message": "Synthesizing 50 characters", + "timestamp": 1234567890.123 +} +``` + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `NATS_URL` | `nats://nats.ai-ml.svc.cluster.local:4222` | NATS server URL | +| `XTTS_URL` | `http://xtts-predictor.ai-ml.svc.cluster.local` | Coqui XTTS service URL | +| `TTS_DEFAULT_SPEAKER` | `default` | Default speaker ID | +| `TTS_DEFAULT_LANGUAGE` | `en` | Default language code | +| `TTS_AUDIO_CHUNK_SIZE` | `32768` | Audio chunk size in bytes | +| `TTS_SAMPLE_RATE` | `24000` | Audio sample rate (Hz) | +| `OTEL_ENABLED` | `true` | Enable OpenTelemetry | +| `HYPERDX_ENABLED` | `false` | Enable HyperDX observability | + +## Building + +```bash +docker build -t tts-module:latest . +``` + +## Testing + +```bash +# Port-forward NATS +kubectl port-forward -n ai-ml svc/nats 4222:4222 + +# Send TTS request +python -c " +import nats +import msgpack +import asyncio + +async def test(): + nc = await nats.connect('nats://localhost:4222') + + request = { + 'text': 'Hello, this is a test of text to speech.', + 'stream': True + } + + await nc.publish( + 'ai.voice.tts.request.test-session', + msgpack.packb(request) + ) + await nc.close() + +asyncio.run(test()) +" + +# Subscribe to audio output +nats sub "ai.voice.tts.audio.>" +``` + +## Voice Cloning + +To use a custom voice, provide reference audio in the request: + +```python +import base64 + +with open("reference_voice.wav", "rb") as f: + speaker_wav_b64 = base64.b64encode(f.read()).decode() + +request = { + "text": "This will sound like the reference voice.", + "speaker_wav_b64": speaker_wav_b64 +} +``` + +## License + +MIT diff --git a/healthcheck.py b/healthcheck.py new file mode 100644 index 0000000..55a5678 --- /dev/null +++ b/healthcheck.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +""" +Health check script for Kubernetes probes +Verifies NATS connectivity +""" +import sys +import os +import asyncio + +import nats + +NATS_URL = os.environ.get("NATS_URL", "nats://nats.ai-ml.svc.cluster.local:4222") + + +async def check_health(): + """Check if service can connect to NATS.""" + try: + nc = await asyncio.wait_for(nats.connect(NATS_URL), timeout=5.0) + await nc.close() + return True + except Exception as e: + print(f"Health check failed: {e}", file=sys.stderr) + return False + + +if __name__ == "__main__": + result = asyncio.run(check_health()) + sys.exit(0 if result else 1) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ba4a966 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,15 @@ +nats-py>=2.0.0,<3.0.0 +httpx>=0.20.0,<1.0.0 +msgpack + +# OpenTelemetry core +opentelemetry-api +opentelemetry-sdk + +# OTEL exporters (gRPC for local collector, HTTP for HyperDX) +opentelemetry-exporter-otlp-proto-grpc +opentelemetry-exporter-otlp-proto-http + +# OTEL instrumentation +opentelemetry-instrumentation-httpx +opentelemetry-instrumentation-logging diff --git a/tts_streaming.py b/tts_streaming.py new file mode 100644 index 0000000..1cd98a5 --- /dev/null +++ b/tts_streaming.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +""" +Streaming TTS Service + +Real-time Text-to-Speech service that processes synthesis requests from NATS: +1. Subscribe to TTS requests on "ai.voice.tts.request.{session_id}" +2. Synthesize speech using Coqui XTTS via HTTP API +3. Stream audio chunks back via "ai.voice.tts.audio.{session_id}" +4. Support for voice cloning and multi-speaker synthesis + +This enables real-time voice synthesis for voice assistant applications. +""" +import asyncio +import base64 +import logging +import os +import signal +import time +from typing import Dict, Optional + +import httpx +import msgpack +import nats +import nats.js +from nats.aio.msg import Msg + +# OpenTelemetry imports +from opentelemetry import trace, metrics +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanExporterHTTP +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPMetricExporterHTTP +from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION, SERVICE_NAMESPACE +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.logging import LoggingInstrumentor + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("tts-streaming") + + +def setup_telemetry(): + """Initialize OpenTelemetry tracing and metrics with HyperDX support.""" + otel_enabled = os.environ.get("OTEL_ENABLED", "true").lower() == "true" + if not otel_enabled: + logger.info("OpenTelemetry disabled") + return None, None + + otel_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://opentelemetry-collector.observability.svc.cluster.local:4317") + service_name = os.environ.get("OTEL_SERVICE_NAME", "tts-streaming") + service_namespace = os.environ.get("OTEL_SERVICE_NAMESPACE", "ai-ml") + + hyperdx_api_key = os.environ.get("HYPERDX_API_KEY", "") + hyperdx_endpoint = os.environ.get("HYPERDX_ENDPOINT", "https://in-otel.hyperdx.io") + use_hyperdx = os.environ.get("HYPERDX_ENABLED", "false").lower() == "true" and hyperdx_api_key + + resource = Resource.create({ + SERVICE_NAME: service_name, + SERVICE_VERSION: os.environ.get("SERVICE_VERSION", "1.0.0"), + SERVICE_NAMESPACE: service_namespace, + "deployment.environment": os.environ.get("DEPLOYMENT_ENV", "production"), + "host.name": os.environ.get("HOSTNAME", "unknown"), + }) + + trace_provider = TracerProvider(resource=resource) + + if use_hyperdx: + logger.info(f"Configuring HyperDX exporter at {hyperdx_endpoint}") + headers = {"authorization": hyperdx_api_key} + otlp_span_exporter = OTLPSpanExporterHTTP( + endpoint=f"{hyperdx_endpoint}/v1/traces", + headers=headers + ) + otlp_metric_exporter = OTLPMetricExporterHTTP( + endpoint=f"{hyperdx_endpoint}/v1/metrics", + headers=headers + ) + else: + otlp_span_exporter = OTLPSpanExporter(endpoint=otel_endpoint, insecure=True) + otlp_metric_exporter = OTLPMetricExporter(endpoint=otel_endpoint, insecure=True) + + trace_provider.add_span_processor(BatchSpanProcessor(otlp_span_exporter)) + trace.set_tracer_provider(trace_provider) + + metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter, export_interval_millis=60000) + meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + HTTPXClientInstrumentor().instrument() + LoggingInstrumentor().instrument(set_logging_format=True) + + destination = "HyperDX" if use_hyperdx else "OTEL Collector" + logger.info(f"OpenTelemetry initialized - destination: {destination}, service: {service_name}") + + return trace.get_tracer(__name__), metrics.get_meter(__name__) + + +# Configuration from environment +XTTS_URL = os.environ.get("XTTS_URL", "http://xtts-predictor.ai-ml.svc.cluster.local") +NATS_URL = os.environ.get("NATS_URL", "nats://nats.ai-ml.svc.cluster.local:4222") + +# NATS subjects +REQUEST_SUBJECT_PREFIX = "ai.voice.tts.request" # ai.voice.tts.request.{session_id} +AUDIO_SUBJECT_PREFIX = "ai.voice.tts.audio" # ai.voice.tts.audio.{session_id} +STATUS_SUBJECT_PREFIX = "ai.voice.tts.status" # ai.voice.tts.status.{session_id} + +# TTS parameters +DEFAULT_SPEAKER = os.environ.get("TTS_DEFAULT_SPEAKER", "default") +DEFAULT_LANGUAGE = os.environ.get("TTS_DEFAULT_LANGUAGE", "en") +AUDIO_CHUNK_SIZE = int(os.environ.get("TTS_AUDIO_CHUNK_SIZE", "32768")) # 32KB chunks for streaming +SAMPLE_RATE = int(os.environ.get("TTS_SAMPLE_RATE", "24000")) # XTTS default sample rate + + +class StreamingTTS: + """Streaming Text-to-Speech service using Coqui XTTS.""" + + def __init__(self): + self.nc = None + self.js = None + self.http_client = None + self.running = True + self.is_healthy = False + self.tracer = None + self.meter = None + self.synthesis_counter = None + self.synthesis_duration = None + self.active_sessions: Dict[str, dict] = {} + + async def setup(self): + """Initialize connections.""" + self.tracer, self.meter = setup_telemetry() + + if self.meter: + self.synthesis_counter = self.meter.create_counter( + name="tts_synthesis_total", + description="Total number of TTS synthesis requests", + unit="1" + ) + self.synthesis_duration = self.meter.create_histogram( + name="tts_synthesis_duration_seconds", + description="Duration of TTS synthesis", + unit="s" + ) + + # NATS connection + self.nc = await nats.connect(NATS_URL) + logger.info(f"Connected to NATS at {NATS_URL}") + + # Initialize JetStream context + self.js = self.nc.jetstream() + + # Create or update stream for TTS messages + try: + stream_config = nats.js.api.StreamConfig( + name="AI_VOICE_TTS", + subjects=["ai.voice.tts.>"], + retention=nats.js.api.RetentionPolicy.LIMITS, + max_age=300, # 5 minutes + storage=nats.js.api.StorageType.MEMORY, + ) + await self.js.add_stream(stream_config) + logger.info("Created/updated JetStream stream: AI_VOICE_TTS") + except Exception as e: + logger.info(f"JetStream stream setup: {e}") + + # HTTP client for XTTS service + self.http_client = httpx.AsyncClient(timeout=180.0) + logger.info("HTTP client initialized") + + self.is_healthy = True + + async def synthesize(self, text: str, speaker: str = None, language: str = None, + speaker_wav_b64: str = None) -> Optional[bytes]: + """Synthesize speech using XTTS API.""" + start_time = time.time() + + try: + # Build request payload + payload = { + "text": text, + "speaker": speaker or DEFAULT_SPEAKER, + "language": language or DEFAULT_LANGUAGE, + } + + # Add speaker reference audio for voice cloning if provided + if speaker_wav_b64: + payload["speaker_wav"] = speaker_wav_b64 + + # Call XTTS API + response = await self.http_client.post( + f"{XTTS_URL}/v1/audio/speech", + json=payload + ) + response.raise_for_status() + + audio_bytes = response.content + + duration = time.time() - start_time + audio_duration = len(audio_bytes) / (SAMPLE_RATE * 2) # 16-bit audio + rtf = duration / audio_duration if audio_duration > 0 else 0 + + logger.info(f"Synthesized {len(audio_bytes)} bytes in {duration:.2f}s (RTF: {rtf:.2f})") + + if self.synthesis_duration: + self.synthesis_duration.record(duration, {"speaker": speaker or DEFAULT_SPEAKER}) + + return audio_bytes + + except Exception as e: + logger.error(f"Synthesis failed: {e}") + return None + + async def stream_audio(self, session_id: str, audio_bytes: bytes): + """Stream audio back to client in chunks.""" + total_chunks = (len(audio_bytes) + AUDIO_CHUNK_SIZE - 1) // AUDIO_CHUNK_SIZE + + for i in range(0, len(audio_bytes), AUDIO_CHUNK_SIZE): + chunk = audio_bytes[i:i + AUDIO_CHUNK_SIZE] + chunk_index = i // AUDIO_CHUNK_SIZE + is_last = (i + AUDIO_CHUNK_SIZE) >= len(audio_bytes) + + message = { + "session_id": session_id, + "chunk_index": chunk_index, + "total_chunks": total_chunks, + "audio_b64": base64.b64encode(chunk).decode(), + "is_last": is_last, + "timestamp": time.time(), + "sample_rate": SAMPLE_RATE, + } + + await self.nc.publish( + f"{AUDIO_SUBJECT_PREFIX}.{session_id}", + msgpack.packb(message) + ) + + logger.debug(f"Sent chunk {chunk_index + 1}/{total_chunks} for session {session_id}") + + logger.info(f"Streamed {total_chunks} chunks for session {session_id}") + + async def handle_request(self, msg: Msg): + """Handle incoming TTS request.""" + try: + # Extract session_id from subject: ai.voice.tts.request.{session_id} + subject_parts = msg.subject.split('.') + if len(subject_parts) < 5: + logger.warning(f"Invalid subject format: {msg.subject}") + return + + session_id = subject_parts[4] + + # Parse request using msgpack + data = msgpack.unpackb(msg.data, raw=False) + + text = data.get("text", "") + speaker = data.get("speaker") + language = data.get("language") + speaker_wav_b64 = data.get("speaker_wav_b64") # For voice cloning + stream = data.get("stream", True) # Default to streaming + + if not text: + logger.warning(f"Empty text for session {session_id}") + await self.publish_status(session_id, "error", "Empty text provided") + return + + logger.info(f"Processing TTS request for session {session_id}: {text[:50]}...") + + if self.synthesis_counter: + self.synthesis_counter.add(1, {"session_id": session_id}) + + # Publish status: processing + await self.publish_status(session_id, "processing", f"Synthesizing {len(text)} characters") + + # Synthesize audio + audio_bytes = await self.synthesize(text, speaker, language, speaker_wav_b64) + + if audio_bytes: + if stream: + # Stream audio in chunks + await self.stream_audio(session_id, audio_bytes) + else: + # Send complete audio in one message + message = { + "session_id": session_id, + "audio_b64": base64.b64encode(audio_bytes).decode(), + "timestamp": time.time(), + "sample_rate": SAMPLE_RATE, + } + await self.nc.publish( + f"{AUDIO_SUBJECT_PREFIX}.{session_id}", + msgpack.packb(message) + ) + + await self.publish_status(session_id, "completed", f"Audio size: {len(audio_bytes)} bytes") + else: + await self.publish_status(session_id, "error", "Synthesis failed") + + except Exception as e: + logger.error(f"Error handling TTS request: {e}", exc_info=True) + try: + await self.publish_status(session_id, "error", str(e)) + except: + pass + + async def publish_status(self, session_id: str, status: str, message: str = ""): + """Publish TTS status update.""" + status_msg = { + "session_id": session_id, + "status": status, + "message": message, + "timestamp": time.time(), + } + await self.nc.publish( + f"{STATUS_SUBJECT_PREFIX}.{session_id}", + msgpack.packb(status_msg) + ) + logger.debug(f"Published status '{status}' for session {session_id}") + + async def run(self): + """Main run loop.""" + await self.setup() + + # Subscribe to TTS requests + sub = await self.nc.subscribe(f"{REQUEST_SUBJECT_PREFIX}.>", cb=self.handle_request) + logger.info(f"Subscribed to {REQUEST_SUBJECT_PREFIX}.>") + + # Handle shutdown + def signal_handler(): + self.running = False + + loop = asyncio.get_event_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, signal_handler) + + # Keep running + while self.running: + await asyncio.sleep(1) + + # Cleanup + logger.info("Shutting down...") + await sub.unsubscribe() + await self.nc.close() + await self.http_client.aclose() + logger.info("Shutdown complete") + + +if __name__ == "__main__": + service = StreamingTTS() + asyncio.run(service.run())