fix: ruff formatting and allow-direct-references for handler-base dep

2026-02-02 08:44:34 -05:00
parent 09e8135f93
commit b8ba237946
4 changed files with 107 additions and 106 deletions
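
The "allow-direct-references" half of this change lives in the packaging config, which is among the four changed files but is not shown in the excerpt below. As a rough sketch of what that setting looks like, assuming the project uses hatchling and pulls handler-base as a PEP 508 direct reference (the actual pyproject.toml contents and URL are not part of this diff):

# pyproject.toml -- sketch only, not the actual file from this commit
[tool.hatch.metadata]
# Permit PEP 508 direct references such as "pkg @ git+https://..." in dependencies.
allow-direct-references = true

[project]
dependencies = [
    # Hypothetical direct reference; the real handler-base source is not shown here.
    "handler-base @ git+https://example.com/ai/handler-base.git",
]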


@@ -11,6 +11,7 @@ Text-based chat pipeline using handler-base:
6. Optionally synthesize speech with XTTS
7. Publish result to NATS "ai.chat.response.{request_id}"
"""
import base64
import logging
from typing import Any, Optional
@@ -32,14 +33,14 @@ logger = logging.getLogger("chat-handler")
class ChatSettings(Settings):
    """Chat handler specific settings."""

    service_name: str = "chat-handler"

    # RAG settings
    rag_top_k: int = 10
    rag_rerank_top_k: int = 5
    rag_collection: str = "documents"

    # Response settings
    include_sources: bool = True
    enable_tts: bool = False
@@ -49,7 +50,7 @@ class ChatSettings(Settings):
class ChatHandler(Handler):
    """
    Chat request handler with RAG pipeline.

    Request format:
        {
            "request_id": "uuid",
@@ -58,7 +59,7 @@ class ChatHandler(Handler):
"enable_tts": false,
"system_prompt": "optional custom system prompt"
}
Response format:
{
"request_id": "uuid",
@@ -67,7 +68,7 @@ class ChatHandler(Handler):
"audio": "base64 encoded audio (if tts enabled)"
}
"""
def __init__(self):
self.chat_settings = ChatSettings()
super().__init__(
@@ -75,41 +76,41 @@ class ChatHandler(Handler):
            settings=self.chat_settings,
            queue_group="chat-handlers",
        )

    async def setup(self) -> None:
        """Initialize service clients."""
        logger.info("Initializing service clients...")

        self.embeddings = EmbeddingsClient(self.chat_settings)
        self.reranker = RerankerClient(self.chat_settings)
        self.llm = LLMClient(self.chat_settings)
        self.milvus = MilvusClient(self.chat_settings)

        # TTS is optional
        if self.chat_settings.enable_tts:
            self.tts = TTSClient(self.chat_settings)
        else:
            self.tts = None

        # Connect to Milvus
        await self.milvus.connect(self.chat_settings.rag_collection)

        logger.info("Service clients initialized")

    async def teardown(self) -> None:
        """Clean up service clients."""
        logger.info("Closing service clients...")

        await self.embeddings.close()
        await self.reranker.close()
        await self.llm.close()
        await self.milvus.close()

        if self.tts:
            await self.tts.close()

        logger.info("Service clients closed")

    async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
        """Handle incoming chat request."""
        request_id = data.get("request_id", "unknown")
@@ -117,67 +118,62 @@ class ChatHandler(Handler):
        collection = data.get("collection", self.chat_settings.rag_collection)
        enable_tts = data.get("enable_tts", self.chat_settings.enable_tts)
        system_prompt = data.get("system_prompt")

        logger.info(f"Processing request {request_id}: {query[:50]}...")

        with create_span("chat.process") as span:
            if span:
                span.set_attribute("request.id", request_id)
                span.set_attribute("query.length", len(query))

            # 1. Generate query embedding
            embedding = await self._get_embedding(query)

            # 2. Search Milvus for context
            documents = await self._search_context(embedding, collection)

            # 3. Rerank documents
            reranked = await self._rerank_documents(query, documents)

            # 4. Build context from top documents
            context = self._build_context(reranked)

            # 5. Generate LLM response
-            response_text = await self._generate_response(
-                query, context, system_prompt
-            )
+            response_text = await self._generate_response(query, context, system_prompt)

            # 6. Optionally synthesize speech
            audio_b64 = None
            if enable_tts and self.tts:
                audio_b64 = await self._synthesize_speech(response_text)

            # Build response
            result = {
                "request_id": request_id,
                "response": response_text,
            }

            if self.chat_settings.include_sources:
                result["sources"] = [
-                    {"text": d["document"][:200], "score": d["score"]}
-                    for d in reranked[:3]
+                    {"text": d["document"][:200], "score": d["score"]} for d in reranked[:3]
                ]

            if audio_b64:
                result["audio"] = audio_b64

            logger.info(f"Completed request {request_id}")

            # Publish to response subject
            response_subject = f"ai.chat.response.{request_id}"
            await self.nats.publish(response_subject, result)

            return result

    async def _get_embedding(self, text: str) -> list[float]:
        """Generate embedding for query text."""
        with create_span("chat.embedding"):
            return await self.embeddings.embed_single(text)

-    async def _search_context(
-        self, embedding: list[float], collection: str
-    ) -> list[dict]:
+    async def _search_context(self, embedding: list[float], collection: str) -> list[dict]:
        """Search Milvus for relevant documents."""
        with create_span("chat.search"):
            return await self.milvus.search_with_texts(
@@ -186,17 +182,15 @@ class ChatHandler(Handler):
                text_field="text",
                metadata_fields=["source", "title"],
            )

-    async def _rerank_documents(
-        self, query: str, documents: list[dict]
-    ) -> list[dict]:
+    async def _rerank_documents(self, query: str, documents: list[dict]) -> list[dict]:
        """Rerank documents by relevance to query."""
        with create_span("chat.rerank"):
            texts = [d.get("text", "") for d in documents]
            return await self.reranker.rerank(
                query, texts, top_k=self.chat_settings.rag_rerank_top_k
            )

    def _build_context(self, documents: list[dict]) -> str:
        """Build context string from ranked documents."""
        context_parts = []
@@ -204,7 +198,7 @@ class ChatHandler(Handler):
            text = doc.get("document", "")
            context_parts.append(f"[{i}] {text}")
        return "\n\n".join(context_parts)

    async def _generate_response(
        self,
        query: str,
@@ -218,7 +212,7 @@ class ChatHandler(Handler):
                context=context,
                system_prompt=system_prompt,
            )

    async def _synthesize_speech(self, text: str) -> str:
        """Synthesize speech and return base64 encoded audio."""
        with create_span("chat.tts"):
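
End-to-end, the handler implements a publish/subscribe request-reply flow: a caller sends a JSON request and listens on ai.chat.response.{request_id} for the result. A minimal caller sketch using the nats-py client follows (not part of this commit; the request subject "ai.chat.request" is an assumption, since the handler's subscription subject comes from handler-base settings and does not appear in this diff):

import asyncio
import json
import uuid

import nats  # nats-py client, assumed transport for this sketch


async def ask(query: str) -> dict:
    nc = await nats.connect("nats://localhost:4222")
    request_id = str(uuid.uuid4())

    # Subscribe to the per-request response subject before publishing,
    # so the reply cannot be missed.
    sub = await nc.subscribe(f"ai.chat.response.{request_id}")

    request = {
        "request_id": request_id,
        "query": query,
        "enable_tts": False,  # set True to receive base64 audio in "audio"
    }
    # "ai.chat.request" is hypothetical; the real subject comes from settings.
    await nc.publish("ai.chat.request", json.dumps(request).encode())

    msg = await sub.next_msg(timeout=60)
    await nc.drain()
    return json.loads(msg.data)  # {"request_id", "response", "sources"?, "audio"?}


if __name__ == "__main__":
    print(asyncio.run(ask("What does handler-base do?"))["response"])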