style: fix ruff lint and formatting issues
All checks were successful
CI / Docker Build & Push (push) Successful in 5m38s
CI / Deploy to Kubernetes (push) Successful in 1m21s
CI / Notify (push) Successful in 1s
CI / Lint (push) Successful in 1m4s
CI / Release (push) Successful in 54s

- tts.py: rename ambiguous loop variable 'l' to 'line' in the batch_synthesize list comprehension (ruff E741)
- tts.py, llm.py: apply ruff formatter (long lines re-wrapped, trailing commas added; formatting only)
This commit is contained in:
2026-02-22 10:55:00 -05:00
parent dfe93ae856
commit 0d1c40725e
2 changed files with 73 additions and 25 deletions

14
llm.py
View File

@@ -168,10 +168,12 @@ async def chat_stream(
messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "system", "content": system_prompt})
for entry in history: for entry in history:
messages.append({ messages.append(
{
"role": entry["role"], "role": entry["role"],
"content": _extract_content(entry["content"]), "content": _extract_content(entry["content"]),
}) }
)
messages.append({"role": "user", "content": message}) messages.append({"role": "user", "content": message})
@@ -214,7 +216,9 @@ async def chat_stream(
continue continue
latency = time.time() - start_time latency = time.time() - start_time
logger.info("LLM streamed response: %d chars in %.1fs", len(full_text), latency) logger.info(
"LLM streamed response: %d chars in %.1fs", len(full_text), latency
)
# Best-effort metrics from the final SSE payload # Best-effort metrics from the final SSE payload
_log_llm_metrics( _log_llm_metrics(
@@ -229,9 +233,7 @@ async def chat_stream(
# Non-streaming fallback (endpoint doesn't support stream) # Non-streaming fallback (endpoint doesn't support stream)
body = await response.aread() body = await response.aread()
result = json.loads(body) result = json.loads(body)
text = _extract_content( text = _extract_content(result["choices"][0]["message"]["content"])
result["choices"][0]["message"]["content"]
)
latency = time.time() - start_time latency = time.time() - start_time
usage = result.get("usage", {}) usage = result.get("usage", {})

80
tts.py
View File

@@ -132,8 +132,16 @@ LANGUAGES = {
_SENTENCE_RE = re.compile(r"(?<=[.!?;])\s+|(?<=\n)\s*", re.MULTILINE) _SENTENCE_RE = re.compile(r"(?<=[.!?;])\s+|(?<=\n)\s*", re.MULTILINE)
_DIGIT_WORDS = { _DIGIT_WORDS = {
"0": "zero", "1": "one", "2": "two", "3": "three", "4": "four", "0": "zero",
"5": "five", "6": "six", "7": "seven", "8": "eight", "9": "nine", "1": "one",
"2": "two",
"3": "three",
"4": "four",
"5": "five",
"6": "six",
"7": "seven",
"8": "eight",
"9": "nine",
} }
@@ -190,6 +198,7 @@ def _split_sentences(text: str) -> list[str]:
# ─── Audio helpers ─────────────────────────────────────────────────────── # ─── Audio helpers ───────────────────────────────────────────────────────
def _read_wav_bytes(data: bytes) -> tuple[int, np.ndarray]: def _read_wav_bytes(data: bytes) -> tuple[int, np.ndarray]:
"""Read WAV audio from bytes, handling scipy wavfile and standard WAV. """Read WAV audio from bytes, handling scipy wavfile and standard WAV.
@@ -211,7 +220,9 @@ def _read_wav_bytes(data: bytes) -> tuple[int, np.ndarray]:
elif sampwidth == 4: elif sampwidth == 4:
audio = np.frombuffer(raw, dtype=np.int32).astype(np.float32) / 2147483648.0 audio = np.frombuffer(raw, dtype=np.int32).astype(np.float32) / 2147483648.0
elif sampwidth == 1: elif sampwidth == 1:
audio = (np.frombuffer(raw, dtype=np.uint8).astype(np.float32) - 128.0) / 128.0 audio = (
np.frombuffer(raw, dtype=np.uint8).astype(np.float32) - 128.0
) / 128.0
else: else:
raise ValueError(f"Unsupported sample width: {sampwidth}") raise ValueError(f"Unsupported sample width: {sampwidth}")
@@ -237,7 +248,8 @@ def _read_wav_bytes(data: bytes) -> tuple[int, np.ndarray]:
# Last resort: raw 16-bit PCM at 22050 Hz # Last resort: raw 16-bit PCM at 22050 Hz
logger.warning( logger.warning(
"Could not parse WAV header (len=%d, first 4 bytes=%r); raw PCM decode", "Could not parse WAV header (len=%d, first 4 bytes=%r); raw PCM decode",
len(data), data[:4], len(data),
data[:4],
) )
audio = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0 audio = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
return 22050, audio return 22050, audio
@@ -272,6 +284,7 @@ def _concat_audio(
# ─── TTS synthesis ─────────────────────────────────────────────────────── # ─── TTS synthesis ───────────────────────────────────────────────────────
def _synthesize_chunk(text: str, lang_code: str, speed: float = 1.0) -> bytes: def _synthesize_chunk(text: str, lang_code: str, speed: float = 1.0) -> bytes:
"""Synthesize a single text chunk via the TTS backend. """Synthesize a single text chunk via the TTS backend.
@@ -303,7 +316,9 @@ def _synthesize_chunk(text: str, lang_code: str, speed: float = 1.0) -> bytes:
# Non-JSON response — treat as raw audio bytes # Non-JSON response — treat as raw audio bytes
return resp.content return resp.content
except Exception: except Exception:
logger.debug("POST endpoint failed, falling back to GET /api/tts", exc_info=True) logger.debug(
"POST endpoint failed, falling back to GET /api/tts", exc_info=True
)
# Fallback: Coqui-compatible GET (no speed control) # Fallback: Coqui-compatible GET (no speed control)
resp = client.get( resp = client.get(
@@ -426,11 +441,16 @@ in multiple languages. Long text is automatically split into sentences for bette
label="Language", label="Language",
) )
speed = gr.Slider( speed = gr.Slider(
minimum=0.5, maximum=2.0, value=1.0, minimum=0.5,
step=0.1, label="Speed", maximum=2.0,
value=1.0,
step=0.1,
label="Speed",
) )
synthesize_btn = gr.Button( synthesize_btn = gr.Button(
"🔊 Synthesize", variant="primary", scale=2, "🔊 Synthesize",
variant="primary",
scale=2,
) )
with gr.Column(scale=1): with gr.Column(scale=1):
status_output = gr.Textbox(label="Status", interactive=False) status_output = gr.Textbox(label="Status", interactive=False)
@@ -446,9 +466,21 @@ in multiple languages. Long text is automatically split into sentences for bette
gr.Examples( gr.Examples(
examples=[ examples=[
["Hello! Welcome to Davies Tech Labs. This is a demonstration of our text-to-speech system.", "English", 1.0], [
["The quick brown fox jumps over the lazy dog. This sentence contains every letter of the alphabet.", "English", 1.0], "Hello! Welcome to Davies Tech Labs. This is a demonstration of our text-to-speech system.",
["Bonjour! Bienvenue au laboratoire technique de Davies.", "French", 1.0], "English",
1.0,
],
[
"The quick brown fox jumps over the lazy dog. This sentence contains every letter of the alphabet.",
"English",
1.0,
],
[
"Bonjour! Bienvenue au laboratoire technique de Davies.",
"French",
1.0,
],
["Hola! Bienvenido al laboratorio de tecnología.", "Spanish", 1.0], ["Hola! Bienvenido al laboratorio de tecnología.", "Spanish", 1.0],
["Guten Tag! Willkommen im Techniklabor.", "German", 1.0], ["Guten Tag! Willkommen im Techniklabor.", "German", 1.0],
], ],
@@ -462,9 +494,15 @@ in multiple languages. Long text is automatically split into sentences for bette
label="Text to Compare", value="Hello, how are you today?", lines=2 label="Text to Compare", value="Hello, how are you today?", lines=2
) )
with gr.Row(): with gr.Row():
lang1 = gr.Dropdown(choices=list(LANGUAGES.keys()), value="English", label="Language 1") lang1 = gr.Dropdown(
lang2 = gr.Dropdown(choices=list(LANGUAGES.keys()), value="Spanish", label="Language 2") choices=list(LANGUAGES.keys()), value="English", label="Language 1"
compare_speed = gr.Slider(minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed") )
lang2 = gr.Dropdown(
choices=list(LANGUAGES.keys()), value="Spanish", label="Language 2"
)
compare_speed = gr.Slider(
minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed"
)
compare_btn = gr.Button("Compare Languages", variant="primary") compare_btn = gr.Button("Compare Languages", variant="primary")
@@ -497,15 +535,23 @@ in multiple languages. Long text is automatically split into sentences for bette
placeholder="Enter multiple texts, one per line...", placeholder="Enter multiple texts, one per line...",
lines=6, lines=6,
) )
batch_lang = gr.Dropdown(choices=list(LANGUAGES.keys()), value="English", label="Language") batch_lang = gr.Dropdown(
batch_speed = gr.Slider(minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed") choices=list(LANGUAGES.keys()), value="English", label="Language"
)
batch_speed = gr.Slider(
minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed"
)
batch_btn = gr.Button("Synthesize All", variant="primary") batch_btn = gr.Button("Synthesize All", variant="primary")
batch_status = gr.Textbox(label="Status", interactive=False) batch_status = gr.Textbox(label="Status", interactive=False)
batch_audio = gr.Audio(label="Combined Audio", type="numpy") batch_audio = gr.Audio(label="Combined Audio", type="numpy")
def batch_synthesize(texts_raw: str, lang: str, spd: float): def batch_synthesize(texts_raw: str, lang: str, spd: float):
lines = [l.strip() for l in texts_raw.strip().splitlines() if l.strip()] lines = [
line.strip()
for line in texts_raw.strip().splitlines()
if line.strip()
]
if not lines: if not lines:
return "❌ Please enter at least one line of text", None return "❌ Please enter at least one line of text", None
combined = "\n".join(lines) combined = "\n".join(lines)