# Streaming TTS Module

A dedicated Text-to-Speech (TTS) service that processes synthesis requests from NATS using Coqui XTTS.

## Overview

This module enables real-time text-to-speech synthesis by accepting text via NATS and streaming audio chunks back as they're generated. This reduces latency for voice assistant applications by allowing playback to begin before synthesis completes.

## Features

- **NATS Integration**: Accepts TTS requests via NATS messaging
- **Streaming Audio**: Streams audio chunks back for immediate playback
- **Voice Cloning**: Support for custom speaker voices via reference audio
- **Custom Trained Voices**: Automatic discovery of voices trained by the `coqui-voice-training` Argo workflow
- **Voice Registry**: Lists available voices and refreshes on demand or periodically
- **Multi-language**: Support for multiple languages via XTTS
- **OpenTelemetry**: Full observability with tracing and metrics
- **HyperDX Support**: Optional cloud observability integration

## Architecture

```
┌─────────────────┐
│    Voice App    │  (voice-assistant, chat-handler)
│                 │
└────────┬────────┘
         │ Text
         ▼
┌─────────────────┐
│  NATS Subject   │  ai.voice.tts.request.{session_id}
│  TTS Request    │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  TTS Streaming  │  (This Service)
│     Service     │  - Calls XTTS API
│                 │  - Streams audio chunks
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  NATS Subject   │  ai.voice.tts.audio.{session_id}
│  Audio Chunks   │
└─────────────────┘
```

## NATS Message Protocol

### TTS Request (ai.voice.tts.request.{session_id})

All messages use **msgpack** binary encoding.
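As a quick illustration of the wire format, a request dictionary round-trips through msgpack like this (a minimal sketch using the `msgpack` Python package, the same library the testing examples in this README rely on):

```python
import msgpack

# A TTS request is a plain dict, packed to msgpack bytes before publishing to NATS
request = {"text": "Hello!", "language": "en", "stream": True}
payload = msgpack.packb(request)

# The service decodes the binary payload on the other side;
# raw=False yields str keys/values instead of bytes
decoded = msgpack.unpackb(payload, raw=False)
assert decoded == request
```

The same encoding applies to every subject in this protocol: requests, audio chunks, and status updates are all msgpack dictionaries.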
**Request:**

```python
{
    "text": "Hello, how can I help you today?",
    "speaker": "default",      # Optional: speaker ID or custom voice name
    "language": "en",          # Optional: language code
    "speaker_wav_b64": "...",  # Optional: base64 reference audio for ad-hoc voice cloning
    "stream": True             # Optional: stream chunks (default) or send complete audio
}
```

> **Custom voices:** When `speaker` matches the name of a custom trained voice
> in the voice registry, the service automatically routes to the trained model.
> No `speaker_wav_b64` is needed for trained voices.

### Audio Output (ai.voice.tts.audio.{session_id})

**Streamed Chunk:**

```python
{
    "session_id": "unique-session-id",
    "chunk_index": 0,
    "total_chunks": 5,
    "audio_b64": "base64-encoded-audio-chunk",
    "is_last": False,
    "timestamp": 1234567890.123,
    "sample_rate": 24000
}
```

**Complete Audio (when `stream=False`):**

```python
{
    "session_id": "unique-session-id",
    "audio_b64": "base64-encoded-complete-audio",
    "timestamp": 1234567890.123,
    "sample_rate": 24000
}
```

### Status Updates (ai.voice.tts.status.{session_id})

```python
{
    "session_id": "unique-session-id",
    "status": "processing",    # processing, completed, error
    "message": "Synthesizing 50 characters",
    "timestamp": 1234567890.123
}
```

## Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://nats.ai-ml.svc.cluster.local:4222` | NATS server URL |
| `XTTS_URL` | `http://xtts-predictor.ai-ml.svc.cluster.local` | Coqui XTTS service URL |
| `TTS_DEFAULT_SPEAKER` | `default` | Default speaker ID |
| `TTS_DEFAULT_LANGUAGE` | `en` | Default language code |
| `TTS_AUDIO_CHUNK_SIZE` | `32768` | Audio chunk size in bytes |
| `TTS_SAMPLE_RATE` | `24000` | Audio sample rate (Hz) |
| `VOICE_MODEL_STORE` | `/models/tts/custom` | Path to custom voice models (NFS mount) |
| `VOICE_REGISTRY_REFRESH_SECONDS` | `300` | Interval to rescan the model store for new voices |
| `OTEL_ENABLED` | `true` | Enable OpenTelemetry |
| `HYPERDX_ENABLED` | `false` | Enable HyperDX observability |

## Building

```bash
docker build -t tts-module:latest .
```

## Testing

```bash
# Port-forward NATS
kubectl port-forward -n ai-ml svc/nats 4222:4222

# Send TTS request
python -c "
import asyncio

import msgpack
import nats

async def test():
    nc = await nats.connect('nats://localhost:4222')
    request = {
        'text': 'Hello, this is a test of text to speech.',
        'stream': True
    }
    await nc.publish(
        'ai.voice.tts.request.test-session',
        msgpack.packb(request)
    )
    await nc.close()

asyncio.run(test())
"

# Subscribe to audio output
nats sub "ai.voice.tts.audio.>"
```

## Voice Cloning

To use a custom voice, provide reference audio in the request:

```python
import base64

with open("reference_voice.wav", "rb") as f:
    speaker_wav_b64 = base64.b64encode(f.read()).decode()

request = {
    "text": "This will sound like the reference voice.",
    "speaker_wav_b64": speaker_wav_b64
}
```

## Custom Trained Voices

The `coqui-voice-training` Argo workflow trains custom TTS models and exports them to the model store (`VOICE_MODEL_STORE`, default `/models/tts/custom`). The TTS module discovers these voices automatically on startup and periodically re-scans for newly trained voices.

### How it works

1. The voice training pipeline exports a model to `/models/tts/custom/{voice-name}/`
2. Each directory contains `model.pth`, `config.json`, and `model_info.json`
3. The TTS module scans the store and registers each voice by name
4. Requests with `"speaker": "my-voice"` automatically route to the trained model

### Using a trained voice

```python
# Just set the speaker to the voice name — no reference audio needed
request = {
    "text": "This uses a fine-tuned voice model.",
    "speaker": "my-custom-voice"  # Matches {voice-name} from the training pipeline
}
```

### Listing available voices

Send a NATS request to `ai.voice.tts.voices.list`:

```python
import asyncio

import msgpack
import nats

async def list_voices():
    nc = await nats.connect("nats://localhost:4222")
    resp = await nc.request("ai.voice.tts.voices.list", b"", timeout=5)
    data = msgpack.unpackb(resp.data, raw=False)
    print(f"Default speaker: {data['default_speaker']}")
    for voice in data["custom_voices"]:
        print(f"  - {voice['name']} ({voice['language']}, trained {voice['created_at']})")
    await nc.close()

asyncio.run(list_voices())
```

### Refreshing the voice registry

Voices are re-scanned every `VOICE_REGISTRY_REFRESH_SECONDS` (default 5 minutes). To trigger an immediate refresh, send a request to `ai.voice.tts.voices.refresh`:

```python
resp = await nc.request("ai.voice.tts.voices.refresh", b"", timeout=10)
data = msgpack.unpackb(resp.data, raw=False)
print(f"Found {data['count']} custom voice(s)")
```

## License

MIT