From 194a431e8c41bc607dc938de01aa57ab6844d4d7 Mon Sep 17 00:00:00 2001 From: "Billy D." Date: Sun, 22 Feb 2026 10:45:58 -0500 Subject: [PATCH] feat(tts): add streaming SSE endpoint and sentence splitter - Add POST /stream SSE endpoint that splits text into sentences, synthesizes each individually, and streams base64 WAV via SSE events - Add _split_sentences() helper for robust sentence boundary detection - Enables progressive audio playback for lower time-to-first-audio --- ray_serve/serve_tts.py | 101 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 3 deletions(-) diff --git a/ray_serve/serve_tts.py b/ray_serve/serve_tts.py index b409933..e213444 100644 --- a/ray_serve/serve_tts.py +++ b/ray_serve/serve_tts.py @@ -2,19 +2,22 @@ Ray Serve deployment for Coqui TTS. Runs on: elminster (RTX 2070 8GB, CUDA) -Provides two API styles: +Provides three API styles: POST /tts — JSON body → JSON response with base64 audio + POST /tts/stream — JSON body → SSE with per-sentence base64 audio chunks GET /tts/api/tts — Coqui-compatible query params → raw WAV bytes """ import base64 import io +import json import os +import re import time from typing import Any -from fastapi import FastAPI, Query -from fastapi.responses import Response +from fastapi import FastAPI, Query, Request +from fastapi.responses import Response, StreamingResponse from ray import serve try: @@ -24,6 +27,32 @@ except ImportError: _fastapi = FastAPI() +# ── Sentence splitting for streaming ───────────────────────────────────── + +_SENTENCE_RE = re.compile(r"(?<=[.!?;])\s+|(?<=\n)\s*", re.MULTILINE) + + +def _split_sentences(text: str, max_len: int = 200) -> list[str]: + """Split text into sentences suitable for per-sentence TTS streaming.""" + text = re.sub(r"[ \t]+", " ", text).strip() + if not text: + return [] + + raw_parts = _SENTENCE_RE.split(text) + sentences: list[str] = [] + for part in raw_parts: + part = part.strip() + if not part: + continue + if len(part) > max_len: + for sp in re.split(r"(?<=[,;:])\s+", part): + sp = sp.strip() + if sp: + sentences.append(sp) + else: + sentences.append(part) + return sentences + @serve.deployment(name="TTSDeployment", num_replicas=1) @serve.ingress(_fastapi) @@ -154,6 +183,72 @@ class TTSDeployment: except Exception as e: return Response(content=str(e), status_code=500) + # ── POST /stream — SSE per-sentence streaming ────────────────────── + + @_fastapi.post("/stream") + async def generate_stream(self, request: Request) -> StreamingResponse: + """Stream TTS audio as SSE events, one per sentence. + + Request body: + {"text": "...", "language": "en", "speaker": null, "speed": 1.0} + + Each SSE event is a JSON object: + data: {"text": "sentence", "audio": "", "index": 0, + "sample_rate": 24000, "duration": 1.23, "done": false} + + Final event: + data: {"text": "", "audio": "", "index": N, "done": true} + followed by: + data: [DONE] + """ + body = await request.json() + text = body.get("text", "") + if not text: + async def _empty(): + yield 'data: {"error": "No text provided"}\n\n' + yield "data: [DONE]\n\n" + return StreamingResponse(_empty(), media_type="text/event-stream") + + speaker = body.get("speaker") + language = body.get("language") + speed = body.get("speed", 1.0) + sentences = _split_sentences(text) + + async def _generate(): + for idx, sentence in enumerate(sentences): + _start = time.time() + try: + audio_bytes, sample_rate, duration = self._synthesize( + sentence, speaker, language, speed + ) + self._log(_start, duration, len(sentence)) + chunk = { + "text": sentence, + "audio": base64.b64encode(audio_bytes).decode("utf-8"), + "index": idx, + "sample_rate": sample_rate, + "duration": duration, + "done": False, + } + except Exception as e: + chunk = { + "text": sentence, + "audio": "", + "index": idx, + "error": str(e), + "done": False, + } + yield f"data: {json.dumps(chunk)}\n\n" + + yield f'data: {json.dumps({"text": "", "audio": "", "index": len(sentences), "done": True})}\n\n' + yield "data: [DONE]\n\n" + + return StreamingResponse( + _generate(), + media_type="text/event-stream", + headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"}, + ) + # ── GET /speakers — list available speakers ────────────────────────── @_fastapi.get("/speakers")