4 Commits

a1cf87909d  fixing ruff suggestions and tests needed updating.  (2026-02-18 07:37:13 -05:00)
    All CI checks (push) successful: Lint 1m39s, Test 1m37s, Release 6s, Notify 1s

24a4098c9a  fixing up chat-handler.  (2026-02-18 07:29:41 -05:00)
    Some CI checks (push) failed: Lint failing after 1m39s, Test failing after 1m37s, Release skipped, Notify successful in 1s

b34e8d2e1c  fix: replace astral-sh/setup-uv action with shell install  (2026-02-13 19:40:53 -05:00)
    All CI checks (push) successful: Lint 58s, Test 1m50s, Release 25s, Notify 1s
    The JS-based GitHub Action doesn't work on Gitea's act runner.
    Use curl installer + GITHUB_PATH instead.

3a4a13f0de  chore: add Renovate config for automated dependency updates  (2026-02-13 15:33:43 -05:00)
    All CI checks (push) successful: Notify 1s, Lint 1m8s, Test 2m0s, Release 4s
    Ref: ADR-0057
5 changed files with 170 additions and 60 deletions

CI workflow

@@ -18,10 +18,7 @@ jobs:
         uses: actions/checkout@v4
       - name: Set up uv
-        uses: astral-sh/setup-uv@v7
-        with:
-          version: "latest"
-          activate-environment: false
+        run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
       - name: Set up Python
         run: uv python install 3.13

@@ -43,10 +40,7 @@ jobs:
         uses: actions/checkout@v4
       - name: Set up uv
-        uses: astral-sh/setup-uv@v7
-        with:
-          version: "latest"
-          activate-environment: false
+        run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
       - name: Set up Python
         run: uv python install 3.13

Chat handler service

@@ -3,13 +3,12 @@
 Chat Handler Service (Refactored)
 
 Text-based chat pipeline using handler-base:
-1. Listen for text on NATS subject "ai.chat.request"
-2. Generate embeddings for RAG
-3. Retrieve context from Milvus
-4. Rerank with BGE reranker
-5. Generate response with vLLM
-6. Optionally synthesize speech with XTTS
-7. Publish result to NATS "ai.chat.response.{request_id}"
+1. Listen for text on NATS subject "ai.chat.user.*.message"
+2. If RAG enabled (premium/explicit): embed → Milvus search → rerank
+3. Generate response with vLLM (with or without RAG context)
+4. Optionally stream chunks to "ai.chat.response.stream.{request_id}"
+5. Optionally synthesize speech with XTTS
+6. Publish result to "ai.chat.response.{request_id}" (or custom response_subject)
 """
 
 import base64
@@ -51,28 +50,38 @@ class ChatHandler(Handler):
     """
     Chat request handler with RAG pipeline.
 
-    Request format:
+    Subscribes to: ai.chat.user.*.message (JetStream durable "chat-handler")
+
+    Request format (msgpack):
     {
         "request_id": "uuid",
-        "query": "user question",
-        "collection": "optional collection name",
-        "enable_tts": false,
+        "user_id": "user-123",
+        "username": "john_doe",
+        "message": "user question",
+        "premium": false,
+        "enable_rag": true,
+        "enable_reranker": true,
+        "enable_streaming": true,
+        "top_k": 5,
+        "session_id": "session-abc",
         "system_prompt": "optional custom system prompt"
     }
 
-    Response format:
+    Response format (msgpack):
     {
-        "request_id": "uuid",
+        "user_id": "user-123",
         "response": "generated response",
-        "sources": [{"text": "...", "score": 0.95}],
-        "audio": "base64 encoded audio (if tts enabled)"
+        "response_text": "generated response",
+        "used_rag": true,
+        "rag_sources": ["source1", "source2"],
+        "success": true
     }
     """
 
     def __init__(self):
         self.chat_settings = ChatSettings()
         super().__init__(
-            subject="ai.chat.request",
+            subject="ai.chat.user.*.message",
             settings=self.chat_settings,
             queue_group="chat-handlers",
         )
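
Given the subjects and msgpack schema documented above, a client round-trip would look roughly like the sketch below. This is an illustration, not code from this repo: it assumes the nats-py and msgpack packages, a broker at localhost:4222, and made-up user values.

# Hypothetical client sketch: publish a chat request, then wait for the reply
# on the per-request response subject described in the handler docstring.
import asyncio
import uuid

import msgpack
import nats


async def main() -> None:
    nc = await nats.connect("nats://localhost:4222")  # assumed broker address

    request_id = str(uuid.uuid4())
    # Subscribe to the reply subject before publishing the request.
    sub = await nc.subscribe(f"ai.chat.response.{request_id}")

    request = {
        "request_id": request_id,
        "user_id": "user-123",  # illustrative values, mirroring the docstring
        "username": "john_doe",
        "message": "What is machine learning?",
        "premium": True,  # premium makes enable_rag default to True
    }
    await nc.publish("ai.chat.user.user-123.message", msgpack.packb(request))

    msg = await sub.next_msg(timeout=30)
    reply = msgpack.unpackb(msg.data)
    print(reply["response"], reply["used_rag"], reply["rag_sources"])

    await nc.close()


asyncio.run(main())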
@@ -114,56 +123,96 @@ class ChatHandler(Handler):
     async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
         """Handle incoming chat request."""
         request_id = data.get("request_id", "unknown")
-        query = data.get("query", "")
+        user_id = data.get("user_id", "unknown")
+        query = data.get("message", "") or data.get("query", "")
+        premium = data.get("premium", False)
+        enable_rag = data.get("enable_rag", premium)
+        enable_reranker = data.get("enable_reranker", enable_rag)
+        enable_streaming = data.get("enable_streaming", False)
+        top_k = data.get("top_k", self.chat_settings.rag_top_k)
         collection = data.get("collection", self.chat_settings.rag_collection)
         enable_tts = data.get("enable_tts", self.chat_settings.enable_tts)
         system_prompt = data.get("system_prompt")
+        # companions-frontend may set a custom response subject
+        response_subject = data.get("response_subject", f"ai.chat.response.{request_id}")
 
         logger.info(f"Processing request {request_id}: {query[:50]}...")
 
         with create_span("chat.process") as span:
             if span:
                 span.set_attribute("request.id", request_id)
+                span.set_attribute("user.id", user_id)
                 span.set_attribute("query.length", len(query))
+                span.set_attribute("premium", premium)
+                span.set_attribute("rag.enabled", enable_rag)
 
-            # 1. Generate query embedding
-            embedding = await self._get_embedding(query)
+            context = ""
+            rag_sources: list[str] = []
+            used_rag = False
 
-            # 2. Search Milvus for context
-            documents = await self._search_context(embedding, collection)
+            # Only run RAG pipeline when enabled (premium users or explicit flag)
+            if enable_rag:
+                # 1. Generate query embedding
+                embedding = await self._get_embedding(query)
 
-            # 3. Rerank documents
-            reranked = await self._rerank_documents(query, documents)
+                # 2. Search Milvus for context
+                documents = await self._search_context(
+                    embedding,
+                    collection,
+                    top_k=top_k,
+                )
 
-            # 4. Build context from top documents
-            context = self._build_context(reranked)
+                # 3. Optionally rerank documents
+                if enable_reranker and documents:
+                    reranked = await self._rerank_documents(query, documents)
+                else:
+                    reranked = documents
 
-            # 5. Generate LLM response
-            response_text = await self._generate_response(query, context, system_prompt)
+                # 4. Build context from top documents
+                if reranked:
+                    context = self._build_context(reranked)
+                    rag_sources = [
+                        d.get("source", d.get("document", "")[:80]) for d in reranked[:3]
+                    ]
+                    used_rag = True
 
-            # 6. Optionally synthesize speech
+            # 5. Generate LLM response (with or without RAG context)
+            response_text = await self._generate_response(
+                query,
+                context or None,
+                system_prompt,
+            )
+
+            # 6. Stream response chunks if requested
+            if enable_streaming:
+                stream_subject = f"ai.chat.response.stream.{request_id}"
+                await self._publish_streaming_chunks(
+                    stream_subject,
+                    request_id,
+                    response_text,
+                )
+
+            # 7. Optionally synthesize speech
             audio_b64 = None
             if enable_tts and self.tts:
                 audio_b64 = await self._synthesize_speech(response_text)
 
-            # Build response
-            result = {
-                "request_id": request_id,
+            # Build response (compatible with companions-frontend NATSChatResponse)
+            result: dict[str, Any] = {
+                "user_id": user_id,
                 "response": response_text,
+                "response_text": response_text,
+                "used_rag": used_rag,
+                "rag_sources": rag_sources,
+                "success": True,
             }
-            if self.chat_settings.include_sources:
-                result["sources"] = [
-                    {"text": d["document"][:200], "score": d["score"]} for d in reranked[:3]
-                ]
             if audio_b64:
                 result["audio"] = audio_b64
 
-            logger.info(f"Completed request {request_id}")
+            logger.info(f"Completed request {request_id} (rag={used_rag})")
 
-            # Publish to response subject
-            response_subject = f"ai.chat.response.{request_id}"
+            # Publish to the response subject the frontend is waiting on
             await self.nats.publish(response_subject, result)
 
             return result
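
One detail worth noting in the hunk above: the feature flags default off each other, so a bare premium request gets the full RAG path without setting any RAG flag explicitly. A minimal sketch of just that chaining, using the same dict.get defaults as the diff:

# Flag-default chaining from handle_message (values mirror the hunk above).
data = {"premium": True}  # request with no explicit RAG flags set

premium = data.get("premium", False)                       # True
enable_rag = data.get("enable_rag", premium)               # defaults to premium -> True
enable_reranker = data.get("enable_reranker", enable_rag)  # defaults to enable_rag -> True
enable_streaming = data.get("enable_streaming", False)     # off unless requested

assert (enable_rag, enable_reranker, enable_streaming) == (True, True, False)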
@@ -173,12 +222,17 @@ class ChatHandler(Handler):
         with create_span("chat.embedding"):
             return await self.embeddings.embed_single(text)
 
-    async def _search_context(self, embedding: list[float], collection: str) -> list[dict]:
+    async def _search_context(
+        self,
+        embedding: list[float],
+        collection: str,
+        top_k: int | None = None,
+    ) -> list[dict]:
         """Search Milvus for relevant documents."""
         with create_span("chat.search"):
             return await self.milvus.search_with_texts(
                 embedding,
-                limit=self.chat_settings.rag_top_k,
+                limit=top_k or self.chat_settings.rag_top_k,
                 text_field="text",
                 metadata_fields=["source", "title"],
             )
@@ -202,10 +256,10 @@ class ChatHandler(Handler):
     async def _generate_response(
         self,
         query: str,
-        context: str,
+        context: Optional[str] = None,
         system_prompt: Optional[str] = None,
     ) -> str:
-        """Generate LLM response with context."""
+        """Generate LLM response, optionally augmented with RAG context."""
         with create_span("chat.generate"):
             return await self.llm.generate(
                 query,
@@ -213,6 +267,41 @@ class ChatHandler(Handler):
                 system_prompt=system_prompt,
             )
 
+    async def _publish_streaming_chunks(
+        self,
+        subject: str,
+        request_id: str,
+        full_text: str,
+    ) -> None:
+        """Publish response as streaming chunks for real-time display."""
+        import time
+
+        words = full_text.split(" ")
+        chunk_size = 4
+        for i in range(0, len(words), chunk_size):
+            token_chunk = " ".join(words[i : i + chunk_size])
+            await self.nats.publish(
+                subject,
+                {
+                    "request_id": request_id,
+                    "type": "chunk",
+                    "content": token_chunk,
+                    "done": False,
+                    "timestamp": time.time(),
+                },
+            )
+
+        # Send done marker
+        await self.nats.publish(
+            subject,
+            {
+                "request_id": request_id,
+                "type": "done",
+                "content": "",
+                "done": True,
+                "timestamp": time.time(),
+            },
+        )
+
     async def _synthesize_speech(self, text: str) -> str:
         """Synthesize speech and return base64 encoded audio."""
         with create_span("chat.tts"):
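
The chunk dicts above define a small stream protocol: content pieces with done=False, then a terminal done marker. A consumer could reassemble the text along these lines; this is a hedged sketch assuming nats-py, msgpack on the wire, and the broker address used in the earlier client example.

# Hypothetical consumer for the streaming subject (not code from this repo).
import msgpack
import nats


async def consume_stream(request_id: str) -> str:
    nc = await nats.connect("nats://localhost:4222")  # assumed broker address
    sub = await nc.subscribe(f"ai.chat.response.stream.{request_id}")

    parts: list[str] = []
    while True:
        msg = await sub.next_msg(timeout=30)
        chunk = msgpack.unpackb(msg.data)
        if chunk["done"]:  # the "done" marker ends the stream
            break
        parts.append(chunk["content"])

    await nc.close()
    # Chunks were split on spaces, so joining with spaces restores the text.
    return " ".join(parts)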

renovate.json (new file, 7 additions)

@@ -0,0 +1,7 @@
+{
+  "$schema": "https://docs.renovatebot.com/renovate-schema.json",
+  "extends": [
+    "local>daviestechlabs/renovate-config",
+    "local>daviestechlabs/renovate-config:python"
+  ]
+}

Chat handler test fixtures

@@ -53,7 +53,7 @@ def sample_reranked():
 def mock_nats_message():
     """Create a mock NATS message."""
     msg = MagicMock()
-    msg.subject = "ai.chat.request"
+    msg.subject = "ai.chat.user.test-user-1.message"
     msg.reply = "ai.chat.response.test-123"
     return msg
@@ -63,7 +63,13 @@ def mock_chat_request():
     """Sample chat request payload."""
     return {
         "request_id": "test-request-123",
-        "query": "What is machine learning?",
+        "user_id": "test-user-1",
+        "username": "testuser",
+        "message": "What is machine learning?",
+        "premium": True,
+        "enable_rag": True,
+        "enable_reranker": True,
+        "enable_streaming": False,
         "collection": "test_collection",
         "enable_tts": False,
         "system_prompt": None,
@@ -75,7 +81,13 @@ def mock_chat_request_with_tts():
     """Sample chat request with TTS enabled."""
     return {
         "request_id": "test-request-456",
-        "query": "Tell me about AI",
+        "user_id": "test-user-2",
+        "username": "testuser2",
+        "message": "Tell me about AI",
+        "premium": True,
+        "enable_rag": True,
+        "enable_reranker": True,
+        "enable_streaming": False,
         "collection": "documents",
         "enable_tts": True,
         "system_prompt": "You are a helpful assistant.",

Chat handler tests

@@ -86,7 +86,7 @@ class TestChatHandler:
     def test_init(self, handler):
         """Test handler initialization."""
-        assert handler.subject == "ai.chat.request"
+        assert handler.subject == "ai.chat.user.*.message"
         assert handler.queue_group == "chat-handlers"
         assert handler.chat_settings.service_name == "chat-handler"
@@ -111,12 +111,15 @@ class TestChatHandler:
         result = await handler.handle_message(mock_nats_message, mock_chat_request)
 
         # Verify
-        assert result["request_id"] == "test-request-123"
+        assert result["user_id"] == "test-user-1"
+        assert result["success"] is True
         assert "response" in result
         assert result["response"] == "Machine learning is a subset of AI that..."
-        assert "sources" in result  # include_sources is True by default
+        assert result["response_text"] == result["response"]
+        assert result["used_rag"] is True
+        assert isinstance(result["rag_sources"], list)
 
-        # Verify pipeline was called
+        # Verify RAG pipeline was called (enable_rag=True in fixture)
         handler.embeddings.embed_single.assert_called_once()
         handler.milvus.search_with_texts.assert_called_once()
         handler.reranker.rerank.assert_called_once()
@@ -142,7 +145,9 @@ class TestChatHandler:
         result = await handler.handle_message(mock_nats_message, mock_chat_request)
 
-        assert "sources" not in result
+        # New response format doesn't have a separate "sources" key;
+        # rag_sources is always present (may be empty)
+        assert "rag_sources" in result
 
     @pytest.mark.asyncio
     async def test_handle_message_with_tts(
@@ -180,7 +185,10 @@ class TestChatHandler:
         """Test LLM is called with custom system prompt."""
         request = {
             "request_id": "test-123",
-            "query": "Hello",
+            "user_id": "user-42",
+            "message": "Hello",
+            "premium": True,
+            "enable_rag": True,
             "system_prompt": "You are a pirate. Respond like one.",
         }