235 lines
7.7 KiB
Python
235 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Voice Assistant Service (Refactored)
|
|
|
|
End-to-end voice assistant pipeline using handler-base:
|
|
1. Listen for audio on NATS subject "voice.request"
|
|
2. Transcribe with Whisper (STT)
|
|
3. Generate embeddings for RAG
|
|
4. Retrieve context from Milvus
|
|
5. Rerank with BGE reranker
|
|
6. Generate response with vLLM
|
|
7. Synthesize speech with XTTS
|
|
8. Publish result to NATS "voice.response.{request_id}"
|
|
"""
|
|
import base64
|
|
import logging
|
|
from typing import Any, Optional
|
|
|
|
from nats.aio.msg import Msg
|
|
|
|
from handler_base import Handler, Settings
|
|
from handler_base.clients import (
|
|
EmbeddingsClient,
|
|
RerankerClient,
|
|
LLMClient,
|
|
TTSClient,
|
|
STTClient,
|
|
MilvusClient,
|
|
)
|
|
from handler_base.telemetry import create_span
|
|
|
|
logger = logging.getLogger("voice-assistant")
|
|
|
|
|
|
class VoiceSettings(Settings):
|
|
"""Voice assistant specific settings."""
|
|
|
|
service_name: str = "voice-assistant"
|
|
|
|
# RAG settings
|
|
rag_top_k: int = 10
|
|
rag_rerank_top_k: int = 5
|
|
rag_collection: str = "documents"
|
|
|
|
# Audio settings
|
|
stt_language: Optional[str] = None # Auto-detect
|
|
tts_language: str = "en"
|
|
|
|
# Response settings
|
|
include_transcription: bool = True
|
|
include_sources: bool = False
|
|
|
|
|
|
class VoiceAssistant(Handler):
|
|
"""
|
|
Voice request handler with full STT -> RAG -> LLM -> TTS pipeline.
|
|
|
|
Request format (msgpack):
|
|
{
|
|
"request_id": "uuid",
|
|
"audio": "base64 encoded audio",
|
|
"language": "optional language code",
|
|
"collection": "optional collection name"
|
|
}
|
|
|
|
Response format:
|
|
{
|
|
"request_id": "uuid",
|
|
"transcription": "what the user said",
|
|
"response": "generated text response",
|
|
"audio": "base64 encoded response audio"
|
|
}
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.voice_settings = VoiceSettings()
|
|
super().__init__(
|
|
subject="voice.request",
|
|
settings=self.voice_settings,
|
|
queue_group="voice-assistants",
|
|
)
|
|
|
|
async def setup(self) -> None:
|
|
"""Initialize service clients."""
|
|
logger.info("Initializing voice assistant clients...")
|
|
|
|
self.stt = STTClient(self.voice_settings)
|
|
self.embeddings = EmbeddingsClient(self.voice_settings)
|
|
self.reranker = RerankerClient(self.voice_settings)
|
|
self.llm = LLMClient(self.voice_settings)
|
|
self.tts = TTSClient(self.voice_settings)
|
|
self.milvus = MilvusClient(self.voice_settings)
|
|
|
|
await self.milvus.connect(self.voice_settings.rag_collection)
|
|
|
|
logger.info("Voice assistant clients initialized")
|
|
|
|
async def teardown(self) -> None:
|
|
"""Clean up service clients."""
|
|
logger.info("Closing voice assistant clients...")
|
|
|
|
await self.stt.close()
|
|
await self.embeddings.close()
|
|
await self.reranker.close()
|
|
await self.llm.close()
|
|
await self.tts.close()
|
|
await self.milvus.close()
|
|
|
|
logger.info("Voice assistant clients closed")
|
|
|
|
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
|
|
"""Handle incoming voice request."""
|
|
request_id = data.get("request_id", "unknown")
|
|
audio_b64 = data.get("audio", "")
|
|
language = data.get("language", self.voice_settings.stt_language)
|
|
collection = data.get("collection", self.voice_settings.rag_collection)
|
|
|
|
logger.info(f"Processing voice request {request_id}")
|
|
|
|
with create_span("voice.process") as span:
|
|
if span:
|
|
span.set_attribute("request.id", request_id)
|
|
|
|
# 1. Decode audio
|
|
audio_bytes = base64.b64decode(audio_b64)
|
|
|
|
# 2. Transcribe audio to text
|
|
transcription = await self._transcribe(audio_bytes, language)
|
|
query = transcription.get("text", "")
|
|
|
|
if not query.strip():
|
|
logger.warning(f"Empty transcription for request {request_id}")
|
|
return {
|
|
"request_id": request_id,
|
|
"error": "Could not transcribe audio",
|
|
}
|
|
|
|
logger.info(f"Transcribed: {query[:50]}...")
|
|
|
|
# 3. Generate query embedding
|
|
embedding = await self._get_embedding(query)
|
|
|
|
# 4. Search Milvus for context
|
|
documents = await self._search_context(embedding, collection)
|
|
|
|
# 5. Rerank documents
|
|
reranked = await self._rerank_documents(query, documents)
|
|
|
|
# 6. Build context
|
|
context = self._build_context(reranked)
|
|
|
|
# 7. Generate LLM response
|
|
response_text = await self._generate_response(query, context)
|
|
|
|
# 8. Synthesize speech
|
|
response_audio = await self._synthesize_speech(response_text)
|
|
|
|
# Build response
|
|
result = {
|
|
"request_id": request_id,
|
|
"response": response_text,
|
|
"audio": response_audio,
|
|
}
|
|
|
|
if self.voice_settings.include_transcription:
|
|
result["transcription"] = query
|
|
|
|
if self.voice_settings.include_sources:
|
|
result["sources"] = [
|
|
{"text": d["document"][:200], "score": d["score"]}
|
|
for d in reranked[:3]
|
|
]
|
|
|
|
logger.info(f"Completed voice request {request_id}")
|
|
|
|
# Publish to response subject
|
|
response_subject = f"voice.response.{request_id}"
|
|
await self.nats.publish(response_subject, result)
|
|
|
|
return result
|
|
|
|
async def _transcribe(
|
|
self, audio: bytes, language: Optional[str]
|
|
) -> dict:
|
|
"""Transcribe audio to text."""
|
|
with create_span("voice.stt"):
|
|
return await self.stt.transcribe(audio, language=language)
|
|
|
|
async def _get_embedding(self, text: str) -> list[float]:
|
|
"""Generate embedding for query text."""
|
|
with create_span("voice.embedding"):
|
|
return await self.embeddings.embed_single(text)
|
|
|
|
async def _search_context(
|
|
self, embedding: list[float], collection: str
|
|
) -> list[dict]:
|
|
"""Search Milvus for relevant documents."""
|
|
with create_span("voice.search"):
|
|
return await self.milvus.search_with_texts(
|
|
embedding,
|
|
limit=self.voice_settings.rag_top_k,
|
|
text_field="text",
|
|
)
|
|
|
|
async def _rerank_documents(
|
|
self, query: str, documents: list[dict]
|
|
) -> list[dict]:
|
|
"""Rerank documents by relevance."""
|
|
with create_span("voice.rerank"):
|
|
texts = [d.get("text", "") for d in documents]
|
|
return await self.reranker.rerank(
|
|
query, texts, top_k=self.voice_settings.rag_rerank_top_k
|
|
)
|
|
|
|
def _build_context(self, documents: list[dict]) -> str:
|
|
"""Build context string from ranked documents."""
|
|
return "\n\n".join(d.get("document", "") for d in documents)
|
|
|
|
async def _generate_response(self, query: str, context: str) -> str:
|
|
"""Generate LLM response."""
|
|
with create_span("voice.generate"):
|
|
return await self.llm.generate(query, context=context)
|
|
|
|
async def _synthesize_speech(self, text: str) -> str:
|
|
"""Synthesize speech and return base64."""
|
|
with create_span("voice.tts"):
|
|
audio_bytes = await self.tts.synthesize(
|
|
text, language=self.voice_settings.tts_language
|
|
)
|
|
return base64.b64encode(audio_bytes).decode()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
VoiceAssistant().run()
|