Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a1cf87909d | |||
| 24a4098c9a | |||
| b34e8d2e1c | |||
| 3a4a13f0de | |||
| 3d71ecd8e0 |
@@ -18,10 +18,7 @@ jobs:
|
|||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
- name: Set up uv
|
- name: Set up uv
|
||||||
uses: astral-sh/setup-uv@v7
|
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||||
with:
|
|
||||||
version: "latest"
|
|
||||||
activate-environment: false
|
|
||||||
|
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
run: uv python install 3.13
|
run: uv python install 3.13
|
||||||
@@ -43,10 +40,7 @@ jobs:
|
|||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
- name: Set up uv
|
- name: Set up uv
|
||||||
uses: astral-sh/setup-uv@v7
|
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||||
with:
|
|
||||||
version: "latest"
|
|
||||||
activate-environment: false
|
|
||||||
|
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
run: uv python install 3.13
|
run: uv python install 3.13
|
||||||
|
|||||||
32
.pre-commit-config.yaml
Normal file
32
.pre-commit-config.yaml
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
# Pre-commit hooks for chat-handler
|
||||||
|
# Install: pip install pre-commit && pre-commit install
|
||||||
|
# Run: pre-commit run --all-files
|
||||||
|
|
||||||
|
repos:
|
||||||
|
# Ruff - fast Python linter and formatter
|
||||||
|
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||||
|
rev: v0.4.4
|
||||||
|
hooks:
|
||||||
|
- id: ruff
|
||||||
|
args: [--fix, --exit-non-zero-on-fix]
|
||||||
|
- id: ruff-format
|
||||||
|
|
||||||
|
# Standard pre-commit hooks
|
||||||
|
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||||
|
rev: v4.6.0
|
||||||
|
hooks:
|
||||||
|
- id: trailing-whitespace
|
||||||
|
- id: end-of-file-fixer
|
||||||
|
- id: check-yaml
|
||||||
|
- id: check-added-large-files
|
||||||
|
args: [--maxkb=500]
|
||||||
|
- id: check-merge-conflict
|
||||||
|
- id: detect-private-key
|
||||||
|
|
||||||
|
# Type checking (optional - uncomment when ready)
|
||||||
|
# - repo: https://github.com/pre-commit/mirrors-mypy
|
||||||
|
# rev: v1.10.0
|
||||||
|
# hooks:
|
||||||
|
# - id: mypy
|
||||||
|
# additional_dependencies: [types-all]
|
||||||
|
# args: [--ignore-missing-imports]
|
||||||
175
chat_handler.py
175
chat_handler.py
@@ -3,13 +3,12 @@
|
|||||||
Chat Handler Service (Refactored)
|
Chat Handler Service (Refactored)
|
||||||
|
|
||||||
Text-based chat pipeline using handler-base:
|
Text-based chat pipeline using handler-base:
|
||||||
1. Listen for text on NATS subject "ai.chat.request"
|
1. Listen for text on NATS subject "ai.chat.user.*.message"
|
||||||
2. Generate embeddings for RAG
|
2. If RAG enabled (premium/explicit): embed → Milvus search → rerank
|
||||||
3. Retrieve context from Milvus
|
3. Generate response with vLLM (with or without RAG context)
|
||||||
4. Rerank with BGE reranker
|
4. Optionally stream chunks to "ai.chat.response.stream.{request_id}"
|
||||||
5. Generate response with vLLM
|
5. Optionally synthesize speech with XTTS
|
||||||
6. Optionally synthesize speech with XTTS
|
6. Publish result to "ai.chat.response.{request_id}" (or custom response_subject)
|
||||||
7. Publish result to NATS "ai.chat.response.{request_id}"
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
@@ -51,28 +50,38 @@ class ChatHandler(Handler):
|
|||||||
"""
|
"""
|
||||||
Chat request handler with RAG pipeline.
|
Chat request handler with RAG pipeline.
|
||||||
|
|
||||||
Request format:
|
Subscribes to: ai.chat.user.*.message (JetStream durable "chat-handler")
|
||||||
|
|
||||||
|
Request format (msgpack):
|
||||||
{
|
{
|
||||||
"request_id": "uuid",
|
"request_id": "uuid",
|
||||||
"query": "user question",
|
"user_id": "user-123",
|
||||||
"collection": "optional collection name",
|
"username": "john_doe",
|
||||||
"enable_tts": false,
|
"message": "user question",
|
||||||
|
"premium": false,
|
||||||
|
"enable_rag": true,
|
||||||
|
"enable_reranker": true,
|
||||||
|
"enable_streaming": true,
|
||||||
|
"top_k": 5,
|
||||||
|
"session_id": "session-abc",
|
||||||
"system_prompt": "optional custom system prompt"
|
"system_prompt": "optional custom system prompt"
|
||||||
}
|
}
|
||||||
|
|
||||||
Response format:
|
Response format (msgpack):
|
||||||
{
|
{
|
||||||
"request_id": "uuid",
|
"user_id": "user-123",
|
||||||
"response": "generated response",
|
"response": "generated response",
|
||||||
"sources": [{"text": "...", "score": 0.95}],
|
"response_text": "generated response",
|
||||||
"audio": "base64 encoded audio (if tts enabled)"
|
"used_rag": true,
|
||||||
|
"rag_sources": ["source1", "source2"],
|
||||||
|
"success": true
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.chat_settings = ChatSettings()
|
self.chat_settings = ChatSettings()
|
||||||
super().__init__(
|
super().__init__(
|
||||||
subject="ai.chat.request",
|
subject="ai.chat.user.*.message",
|
||||||
settings=self.chat_settings,
|
settings=self.chat_settings,
|
||||||
queue_group="chat-handlers",
|
queue_group="chat-handlers",
|
||||||
)
|
)
|
||||||
@@ -114,56 +123,96 @@ class ChatHandler(Handler):
|
|||||||
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
|
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
|
||||||
"""Handle incoming chat request."""
|
"""Handle incoming chat request."""
|
||||||
request_id = data.get("request_id", "unknown")
|
request_id = data.get("request_id", "unknown")
|
||||||
query = data.get("query", "")
|
user_id = data.get("user_id", "unknown")
|
||||||
|
query = data.get("message", "") or data.get("query", "")
|
||||||
|
premium = data.get("premium", False)
|
||||||
|
enable_rag = data.get("enable_rag", premium)
|
||||||
|
enable_reranker = data.get("enable_reranker", enable_rag)
|
||||||
|
enable_streaming = data.get("enable_streaming", False)
|
||||||
|
top_k = data.get("top_k", self.chat_settings.rag_top_k)
|
||||||
collection = data.get("collection", self.chat_settings.rag_collection)
|
collection = data.get("collection", self.chat_settings.rag_collection)
|
||||||
enable_tts = data.get("enable_tts", self.chat_settings.enable_tts)
|
enable_tts = data.get("enable_tts", self.chat_settings.enable_tts)
|
||||||
system_prompt = data.get("system_prompt")
|
system_prompt = data.get("system_prompt")
|
||||||
|
# companions-frontend may set a custom response subject
|
||||||
|
response_subject = data.get("response_subject", f"ai.chat.response.{request_id}")
|
||||||
|
|
||||||
logger.info(f"Processing request {request_id}: {query[:50]}...")
|
logger.info(f"Processing request {request_id}: {query[:50]}...")
|
||||||
|
|
||||||
with create_span("chat.process") as span:
|
with create_span("chat.process") as span:
|
||||||
if span:
|
if span:
|
||||||
span.set_attribute("request.id", request_id)
|
span.set_attribute("request.id", request_id)
|
||||||
|
span.set_attribute("user.id", user_id)
|
||||||
span.set_attribute("query.length", len(query))
|
span.set_attribute("query.length", len(query))
|
||||||
|
span.set_attribute("premium", premium)
|
||||||
|
span.set_attribute("rag.enabled", enable_rag)
|
||||||
|
|
||||||
# 1. Generate query embedding
|
context = ""
|
||||||
embedding = await self._get_embedding(query)
|
rag_sources: list[str] = []
|
||||||
|
used_rag = False
|
||||||
|
|
||||||
# 2. Search Milvus for context
|
# Only run RAG pipeline when enabled (premium users or explicit flag)
|
||||||
documents = await self._search_context(embedding, collection)
|
if enable_rag:
|
||||||
|
# 1. Generate query embedding
|
||||||
|
embedding = await self._get_embedding(query)
|
||||||
|
|
||||||
# 3. Rerank documents
|
# 2. Search Milvus for context
|
||||||
reranked = await self._rerank_documents(query, documents)
|
documents = await self._search_context(
|
||||||
|
embedding,
|
||||||
|
collection,
|
||||||
|
top_k=top_k,
|
||||||
|
)
|
||||||
|
|
||||||
# 4. Build context from top documents
|
# 3. Optionally rerank documents
|
||||||
context = self._build_context(reranked)
|
if enable_reranker and documents:
|
||||||
|
reranked = await self._rerank_documents(query, documents)
|
||||||
|
else:
|
||||||
|
reranked = documents
|
||||||
|
|
||||||
# 5. Generate LLM response
|
# 4. Build context from top documents
|
||||||
response_text = await self._generate_response(query, context, system_prompt)
|
if reranked:
|
||||||
|
context = self._build_context(reranked)
|
||||||
|
rag_sources = [
|
||||||
|
d.get("source", d.get("document", "")[:80]) for d in reranked[:3]
|
||||||
|
]
|
||||||
|
used_rag = True
|
||||||
|
|
||||||
# 6. Optionally synthesize speech
|
# 5. Generate LLM response (with or without RAG context)
|
||||||
|
response_text = await self._generate_response(
|
||||||
|
query,
|
||||||
|
context or None,
|
||||||
|
system_prompt,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 6. Stream response chunks if requested
|
||||||
|
if enable_streaming:
|
||||||
|
stream_subject = f"ai.chat.response.stream.{request_id}"
|
||||||
|
await self._publish_streaming_chunks(
|
||||||
|
stream_subject,
|
||||||
|
request_id,
|
||||||
|
response_text,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 7. Optionally synthesize speech
|
||||||
audio_b64 = None
|
audio_b64 = None
|
||||||
if enable_tts and self.tts:
|
if enable_tts and self.tts:
|
||||||
audio_b64 = await self._synthesize_speech(response_text)
|
audio_b64 = await self._synthesize_speech(response_text)
|
||||||
|
|
||||||
# Build response
|
# Build response (compatible with companions-frontend NATSChatResponse)
|
||||||
result = {
|
result: dict[str, Any] = {
|
||||||
"request_id": request_id,
|
"user_id": user_id,
|
||||||
"response": response_text,
|
"response": response_text,
|
||||||
|
"response_text": response_text,
|
||||||
|
"used_rag": used_rag,
|
||||||
|
"rag_sources": rag_sources,
|
||||||
|
"success": True,
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.chat_settings.include_sources:
|
|
||||||
result["sources"] = [
|
|
||||||
{"text": d["document"][:200], "score": d["score"]} for d in reranked[:3]
|
|
||||||
]
|
|
||||||
|
|
||||||
if audio_b64:
|
if audio_b64:
|
||||||
result["audio"] = audio_b64
|
result["audio"] = audio_b64
|
||||||
|
|
||||||
logger.info(f"Completed request {request_id}")
|
logger.info(f"Completed request {request_id} (rag={used_rag})")
|
||||||
|
|
||||||
# Publish to response subject
|
# Publish to the response subject the frontend is waiting on
|
||||||
response_subject = f"ai.chat.response.{request_id}"
|
|
||||||
await self.nats.publish(response_subject, result)
|
await self.nats.publish(response_subject, result)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
@@ -173,12 +222,17 @@ class ChatHandler(Handler):
|
|||||||
with create_span("chat.embedding"):
|
with create_span("chat.embedding"):
|
||||||
return await self.embeddings.embed_single(text)
|
return await self.embeddings.embed_single(text)
|
||||||
|
|
||||||
async def _search_context(self, embedding: list[float], collection: str) -> list[dict]:
|
async def _search_context(
|
||||||
|
self,
|
||||||
|
embedding: list[float],
|
||||||
|
collection: str,
|
||||||
|
top_k: int | None = None,
|
||||||
|
) -> list[dict]:
|
||||||
"""Search Milvus for relevant documents."""
|
"""Search Milvus for relevant documents."""
|
||||||
with create_span("chat.search"):
|
with create_span("chat.search"):
|
||||||
return await self.milvus.search_with_texts(
|
return await self.milvus.search_with_texts(
|
||||||
embedding,
|
embedding,
|
||||||
limit=self.chat_settings.rag_top_k,
|
limit=top_k or self.chat_settings.rag_top_k,
|
||||||
text_field="text",
|
text_field="text",
|
||||||
metadata_fields=["source", "title"],
|
metadata_fields=["source", "title"],
|
||||||
)
|
)
|
||||||
@@ -202,10 +256,10 @@ class ChatHandler(Handler):
|
|||||||
async def _generate_response(
|
async def _generate_response(
|
||||||
self,
|
self,
|
||||||
query: str,
|
query: str,
|
||||||
context: str,
|
context: Optional[str] = None,
|
||||||
system_prompt: Optional[str] = None,
|
system_prompt: Optional[str] = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Generate LLM response with context."""
|
"""Generate LLM response, optionally augmented with RAG context."""
|
||||||
with create_span("chat.generate"):
|
with create_span("chat.generate"):
|
||||||
return await self.llm.generate(
|
return await self.llm.generate(
|
||||||
query,
|
query,
|
||||||
@@ -213,6 +267,41 @@ class ChatHandler(Handler):
|
|||||||
system_prompt=system_prompt,
|
system_prompt=system_prompt,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def _publish_streaming_chunks(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
request_id: str,
|
||||||
|
full_text: str,
|
||||||
|
) -> None:
|
||||||
|
"""Publish response as streaming chunks for real-time display."""
|
||||||
|
import time
|
||||||
|
|
||||||
|
words = full_text.split(" ")
|
||||||
|
chunk_size = 4
|
||||||
|
for i in range(0, len(words), chunk_size):
|
||||||
|
token_chunk = " ".join(words[i : i + chunk_size])
|
||||||
|
await self.nats.publish(
|
||||||
|
subject,
|
||||||
|
{
|
||||||
|
"request_id": request_id,
|
||||||
|
"type": "chunk",
|
||||||
|
"content": token_chunk,
|
||||||
|
"done": False,
|
||||||
|
"timestamp": time.time(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
# Send done marker
|
||||||
|
await self.nats.publish(
|
||||||
|
subject,
|
||||||
|
{
|
||||||
|
"request_id": request_id,
|
||||||
|
"type": "done",
|
||||||
|
"content": "",
|
||||||
|
"done": True,
|
||||||
|
"timestamp": time.time(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
async def _synthesize_speech(self, text: str) -> str:
|
async def _synthesize_speech(self, text: str) -> str:
|
||||||
"""Synthesize speech and return base64 encoded audio."""
|
"""Synthesize speech and return base64 encoded audio."""
|
||||||
with create_span("chat.tts"):
|
with create_span("chat.tts"):
|
||||||
|
|||||||
7
renovate.json
Normal file
7
renovate.json
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
|
||||||
|
"extends": [
|
||||||
|
"local>daviestechlabs/renovate-config",
|
||||||
|
"local>daviestechlabs/renovate-config:python"
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -53,7 +53,7 @@ def sample_reranked():
|
|||||||
def mock_nats_message():
|
def mock_nats_message():
|
||||||
"""Create a mock NATS message."""
|
"""Create a mock NATS message."""
|
||||||
msg = MagicMock()
|
msg = MagicMock()
|
||||||
msg.subject = "ai.chat.request"
|
msg.subject = "ai.chat.user.test-user-1.message"
|
||||||
msg.reply = "ai.chat.response.test-123"
|
msg.reply = "ai.chat.response.test-123"
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
@@ -63,7 +63,13 @@ def mock_chat_request():
|
|||||||
"""Sample chat request payload."""
|
"""Sample chat request payload."""
|
||||||
return {
|
return {
|
||||||
"request_id": "test-request-123",
|
"request_id": "test-request-123",
|
||||||
"query": "What is machine learning?",
|
"user_id": "test-user-1",
|
||||||
|
"username": "testuser",
|
||||||
|
"message": "What is machine learning?",
|
||||||
|
"premium": True,
|
||||||
|
"enable_rag": True,
|
||||||
|
"enable_reranker": True,
|
||||||
|
"enable_streaming": False,
|
||||||
"collection": "test_collection",
|
"collection": "test_collection",
|
||||||
"enable_tts": False,
|
"enable_tts": False,
|
||||||
"system_prompt": None,
|
"system_prompt": None,
|
||||||
@@ -75,7 +81,13 @@ def mock_chat_request_with_tts():
|
|||||||
"""Sample chat request with TTS enabled."""
|
"""Sample chat request with TTS enabled."""
|
||||||
return {
|
return {
|
||||||
"request_id": "test-request-456",
|
"request_id": "test-request-456",
|
||||||
"query": "Tell me about AI",
|
"user_id": "test-user-2",
|
||||||
|
"username": "testuser2",
|
||||||
|
"message": "Tell me about AI",
|
||||||
|
"premium": True,
|
||||||
|
"enable_rag": True,
|
||||||
|
"enable_reranker": True,
|
||||||
|
"enable_streaming": False,
|
||||||
"collection": "documents",
|
"collection": "documents",
|
||||||
"enable_tts": True,
|
"enable_tts": True,
|
||||||
"system_prompt": "You are a helpful assistant.",
|
"system_prompt": "You are a helpful assistant.",
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ class TestChatHandler:
|
|||||||
|
|
||||||
def test_init(self, handler):
|
def test_init(self, handler):
|
||||||
"""Test handler initialization."""
|
"""Test handler initialization."""
|
||||||
assert handler.subject == "ai.chat.request"
|
assert handler.subject == "ai.chat.user.*.message"
|
||||||
assert handler.queue_group == "chat-handlers"
|
assert handler.queue_group == "chat-handlers"
|
||||||
assert handler.chat_settings.service_name == "chat-handler"
|
assert handler.chat_settings.service_name == "chat-handler"
|
||||||
|
|
||||||
@@ -111,12 +111,15 @@ class TestChatHandler:
|
|||||||
result = await handler.handle_message(mock_nats_message, mock_chat_request)
|
result = await handler.handle_message(mock_nats_message, mock_chat_request)
|
||||||
|
|
||||||
# Verify
|
# Verify
|
||||||
assert result["request_id"] == "test-request-123"
|
assert result["user_id"] == "test-user-1"
|
||||||
|
assert result["success"] is True
|
||||||
assert "response" in result
|
assert "response" in result
|
||||||
assert result["response"] == "Machine learning is a subset of AI that..."
|
assert result["response"] == "Machine learning is a subset of AI that..."
|
||||||
assert "sources" in result # include_sources is True by default
|
assert result["response_text"] == result["response"]
|
||||||
|
assert result["used_rag"] is True
|
||||||
|
assert isinstance(result["rag_sources"], list)
|
||||||
|
|
||||||
# Verify pipeline was called
|
# Verify RAG pipeline was called (enable_rag=True in fixture)
|
||||||
handler.embeddings.embed_single.assert_called_once()
|
handler.embeddings.embed_single.assert_called_once()
|
||||||
handler.milvus.search_with_texts.assert_called_once()
|
handler.milvus.search_with_texts.assert_called_once()
|
||||||
handler.reranker.rerank.assert_called_once()
|
handler.reranker.rerank.assert_called_once()
|
||||||
@@ -142,7 +145,9 @@ class TestChatHandler:
|
|||||||
|
|
||||||
result = await handler.handle_message(mock_nats_message, mock_chat_request)
|
result = await handler.handle_message(mock_nats_message, mock_chat_request)
|
||||||
|
|
||||||
assert "sources" not in result
|
# New response format doesn't have a separate "sources" key;
|
||||||
|
# rag_sources is always present (may be empty)
|
||||||
|
assert "rag_sources" in result
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_handle_message_with_tts(
|
async def test_handle_message_with_tts(
|
||||||
@@ -180,7 +185,10 @@ class TestChatHandler:
|
|||||||
"""Test LLM is called with custom system prompt."""
|
"""Test LLM is called with custom system prompt."""
|
||||||
request = {
|
request = {
|
||||||
"request_id": "test-123",
|
"request_id": "test-123",
|
||||||
"query": "Hello",
|
"user_id": "user-42",
|
||||||
|
"message": "Hello",
|
||||||
|
"premium": True,
|
||||||
|
"enable_rag": True,
|
||||||
"system_prompt": "You are a pirate. Respond like one.",
|
"system_prompt": "You are a pirate. Respond like one.",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user