feature/go-handler-refactor #1

Merged
billy merged 3 commits from feature/go-handler-refactor into main 2026-02-20 12:33:33 +00:00
16 changed files with 953 additions and 3402 deletions

9
.dockerignore Normal file
View File

@@ -0,0 +1,9 @@
.git
.gitignore
*.md
LICENSE
renovate.json
*_test.go
e2e_test.go
__pycache__
.env*

View File

@@ -0,0 +1,206 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs
REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000
IMAGE_NAME: chat-handler
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Run go vet
run: go vet ./...
- name: Install golangci-lint
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b "$(go env GOPATH)/bin"
echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
- name: Run golangci-lint
run: golangci-lint run ./...
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Verify dependencies
run: go mod verify
- name: Build
run: go build -v ./...
- name: Run tests
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
release:
name: Release
runs-on: ubuntu-latest
needs: [lint, test]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
outputs:
version: ${{ steps.version.outputs.version }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Determine version bump
id: version
run: |
# Get latest tag or default to v0.0.0
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
VERSION=${LATEST#v}
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
# Check commit message for keywords
MSG="${{ gitea.event.head_commit.message }}"
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
BUMP="major"
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
MINOR=$((MINOR + 1)); PATCH=0
BUMP="minor"
else
PATCH=$((PATCH + 1))
BUMP="patch"
fi
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
echo "bump=$BUMP" >> $GITHUB_OUTPUT
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
- name: Create and push tag
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
git push origin ${{ steps.version.outputs.version }}
docker:
name: Docker Build & Push
runs-on: ubuntu-latest
needs: [lint, test, release]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
buildkitd-config-inline: |
[registry."gitea-http.gitea.svc.cluster.local:3000"]
http = true
insecure = true
- name: Login to Docker Hub
if: vars.DOCKERHUB_USERNAME != ''
uses: docker/login-action@v3
with:
username: ${{ vars.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Configure Docker for insecure registry
run: |
sudo mkdir -p /etc/docker
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
sudo systemctl restart docker || sudo service docker restart || true
sleep 2
- name: Login to Gitea Registry
run: |
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
mkdir -p ~/.docker
cat > ~/.docker/config.json << EOF
{
"auths": {
"${{ env.REGISTRY_HOST }}": {
"auth": "$AUTH"
}
}
}
EOF
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=semver,pattern={{version}},value=${{ needs.release.outputs.version }}
type=semver,pattern={{major}}.{{minor}},value=${{ needs.release.outputs.version }}
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
notify:
name: Notify
runs-on: ubuntu-latest
needs: [lint, test, release, docker]
if: always()
steps:
- name: Notify on success
if: needs.lint.result == 'success' && needs.test.result == 'success'
run: |
curl -s \
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: white_check_mark,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Release: ${{ needs.release.result == 'success' && needs.release.outputs.version || 'skipped' }}
Docker: ${{ needs.docker.result }}" \
${{ env.NTFY_URL }}/gitea-ci
- name: Notify on failure
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
run: |
curl -s \
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
-H "Priority: high" \
-H "Tags: x,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Lint: ${{ needs.lint.result }}
Test: ${{ needs.test.result }}" \
${{ env.NTFY_URL }}/gitea-ci

1
.gitignore vendored
View File

@@ -24,3 +24,4 @@ ENV/
.env
.env.local
*.log
chat-handler

View File

@@ -1,32 +0,0 @@
# Pre-commit hooks for chat-handler
# Install: pip install pre-commit && pre-commit install
# Run: pre-commit run --all-files
repos:
# Ruff - fast Python linter and formatter
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.4
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- id: ruff-format
# Standard pre-commit hooks
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
args: [--maxkb=500]
- id: check-merge-conflict
- id: detect-private-key
# Type checking (optional - uncomment when ready)
# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: v1.10.0
# hooks:
# - id: mypy
# additional_dependencies: [types-all]
# args: [--ignore-missing-imports]

View File

@@ -1,9 +1,23 @@
# Chat Handler - Using handler-base
ARG BASE_TAG=latest
FROM ghcr.io/billy-davies-2/handler-base:${BASE_TAG}
# Build stage
FROM golang:1.25-alpine AS builder
WORKDIR /app
COPY chat_handler.py .
RUN apk add --no-cache ca-certificates
CMD ["python", "chat_handler.py"]
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOAMD64=v3 go build -ldflags="-w -s" -o /chat-handler .
# Runtime stage
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /chat-handler /chat-handler
USER 65534:65534
ENTRYPOINT ["/chat-handler"]

View File

@@ -1,316 +0,0 @@
#!/usr/bin/env python3
"""
Chat Handler Service (Refactored)
Text-based chat pipeline using handler-base:
1. Listen for text on NATS subject "ai.chat.user.*.message"
2. If RAG enabled (premium/explicit): embed → Milvus search → rerank
3. Generate response with vLLM (with or without RAG context)
4. Optionally stream chunks to "ai.chat.response.stream.{request_id}"
5. Optionally synthesize speech with XTTS
6. Publish result to "ai.chat.response.{request_id}" (or custom response_subject)
"""
import base64
import logging
from typing import Any, Optional
from nats.aio.msg import Msg
from handler_base import Handler, Settings
from handler_base.clients import (
EmbeddingsClient,
RerankerClient,
LLMClient,
TTSClient,
MilvusClient,
)
from handler_base.telemetry import create_span
logger = logging.getLogger("chat-handler")
class ChatSettings(Settings):
"""Chat handler specific settings."""
service_name: str = "chat-handler"
# RAG settings
rag_top_k: int = 10
rag_rerank_top_k: int = 5
rag_collection: str = "documents"
# Response settings
include_sources: bool = True
enable_tts: bool = False
tts_language: str = "en"
class ChatHandler(Handler):
"""
Chat request handler with RAG pipeline.
Subscribes to: ai.chat.user.*.message (JetStream durable "chat-handler")
Request format (msgpack):
{
"request_id": "uuid",
"user_id": "user-123",
"username": "john_doe",
"message": "user question",
"premium": false,
"enable_rag": true,
"enable_reranker": true,
"enable_streaming": true,
"top_k": 5,
"session_id": "session-abc",
"system_prompt": "optional custom system prompt"
}
Response format (msgpack):
{
"user_id": "user-123",
"response": "generated response",
"response_text": "generated response",
"used_rag": true,
"rag_sources": ["source1", "source2"],
"success": true
}
"""
def __init__(self):
self.chat_settings = ChatSettings()
super().__init__(
subject="ai.chat.user.*.message",
settings=self.chat_settings,
queue_group="chat-handlers",
)
async def setup(self) -> None:
"""Initialize service clients."""
logger.info("Initializing service clients...")
self.embeddings = EmbeddingsClient(self.chat_settings)
self.reranker = RerankerClient(self.chat_settings)
self.llm = LLMClient(self.chat_settings)
self.milvus = MilvusClient(self.chat_settings)
# TTS is optional
if self.chat_settings.enable_tts:
self.tts = TTSClient(self.chat_settings)
else:
self.tts = None
# Connect to Milvus
await self.milvus.connect(self.chat_settings.rag_collection)
logger.info("Service clients initialized")
async def teardown(self) -> None:
"""Clean up service clients."""
logger.info("Closing service clients...")
await self.embeddings.close()
await self.reranker.close()
await self.llm.close()
await self.milvus.close()
if self.tts:
await self.tts.close()
logger.info("Service clients closed")
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
"""Handle incoming chat request."""
request_id = data.get("request_id", "unknown")
user_id = data.get("user_id", "unknown")
query = data.get("message", "") or data.get("query", "")
premium = data.get("premium", False)
enable_rag = data.get("enable_rag", premium)
enable_reranker = data.get("enable_reranker", enable_rag)
enable_streaming = data.get("enable_streaming", False)
top_k = data.get("top_k", self.chat_settings.rag_top_k)
collection = data.get("collection", self.chat_settings.rag_collection)
enable_tts = data.get("enable_tts", self.chat_settings.enable_tts)
system_prompt = data.get("system_prompt")
# companions-frontend may set a custom response subject
response_subject = data.get("response_subject", f"ai.chat.response.{request_id}")
logger.info(f"Processing request {request_id}: {query[:50]}...")
with create_span("chat.process") as span:
if span:
span.set_attribute("request.id", request_id)
span.set_attribute("user.id", user_id)
span.set_attribute("query.length", len(query))
span.set_attribute("premium", premium)
span.set_attribute("rag.enabled", enable_rag)
context = ""
rag_sources: list[str] = []
used_rag = False
# Only run RAG pipeline when enabled (premium users or explicit flag)
if enable_rag:
# 1. Generate query embedding
embedding = await self._get_embedding(query)
# 2. Search Milvus for context
documents = await self._search_context(
embedding,
collection,
top_k=top_k,
)
# 3. Optionally rerank documents
if enable_reranker and documents:
reranked = await self._rerank_documents(query, documents)
else:
reranked = documents
# 4. Build context from top documents
if reranked:
context = self._build_context(reranked)
rag_sources = [
d.get("source", d.get("document", "")[:80]) for d in reranked[:3]
]
used_rag = True
# 5. Generate LLM response (with or without RAG context)
response_text = await self._generate_response(
query,
context or None,
system_prompt,
)
# 6. Stream response chunks if requested
if enable_streaming:
stream_subject = f"ai.chat.response.stream.{request_id}"
await self._publish_streaming_chunks(
stream_subject,
request_id,
response_text,
)
# 7. Optionally synthesize speech
audio_b64 = None
if enable_tts and self.tts:
audio_b64 = await self._synthesize_speech(response_text)
# Build response (compatible with companions-frontend NATSChatResponse)
result: dict[str, Any] = {
"user_id": user_id,
"response": response_text,
"response_text": response_text,
"used_rag": used_rag,
"rag_sources": rag_sources,
"success": True,
}
if audio_b64:
result["audio"] = audio_b64
logger.info(f"Completed request {request_id} (rag={used_rag})")
# Publish to the response subject the frontend is waiting on
await self.nats.publish(response_subject, result)
return result
async def _get_embedding(self, text: str) -> list[float]:
"""Generate embedding for query text."""
with create_span("chat.embedding"):
return await self.embeddings.embed_single(text)
async def _search_context(
self,
embedding: list[float],
collection: str,
top_k: int | None = None,
) -> list[dict]:
"""Search Milvus for relevant documents."""
with create_span("chat.search"):
return await self.milvus.search_with_texts(
embedding,
limit=top_k or self.chat_settings.rag_top_k,
text_field="text",
metadata_fields=["source", "title"],
)
async def _rerank_documents(self, query: str, documents: list[dict]) -> list[dict]:
"""Rerank documents by relevance to query."""
with create_span("chat.rerank"):
texts = [d.get("text", "") for d in documents]
return await self.reranker.rerank(
query, texts, top_k=self.chat_settings.rag_rerank_top_k
)
def _build_context(self, documents: list[dict]) -> str:
"""Build context string from ranked documents."""
context_parts = []
for i, doc in enumerate(documents, 1):
text = doc.get("document", "")
context_parts.append(f"[{i}] {text}")
return "\n\n".join(context_parts)
async def _generate_response(
self,
query: str,
context: Optional[str] = None,
system_prompt: Optional[str] = None,
) -> str:
"""Generate LLM response, optionally augmented with RAG context."""
with create_span("chat.generate"):
return await self.llm.generate(
query,
context=context,
system_prompt=system_prompt,
)
async def _publish_streaming_chunks(
self,
subject: str,
request_id: str,
full_text: str,
) -> None:
"""Publish response as streaming chunks for real-time display."""
import time
words = full_text.split(" ")
chunk_size = 4
for i in range(0, len(words), chunk_size):
token_chunk = " ".join(words[i : i + chunk_size])
await self.nats.publish(
subject,
{
"request_id": request_id,
"type": "chunk",
"content": token_chunk,
"done": False,
"timestamp": time.time(),
},
)
# Send done marker
await self.nats.publish(
subject,
{
"request_id": request_id,
"type": "done",
"content": "",
"done": True,
"timestamp": time.time(),
},
)
async def _synthesize_speech(self, text: str) -> str:
"""Synthesize speech and return base64 encoded audio."""
with create_span("chat.tts"):
audio_bytes = await self.tts.synthesize(
text,
language=self.chat_settings.tts_language,
)
return base64.b64encode(audio_bytes).decode()
if __name__ == "__main__":
ChatHandler().run()

247
e2e_test.go Normal file
View File

@@ -0,0 +1,247 @@
package main
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"git.daviestechlabs.io/daviestechlabs/handler-base/clients"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"github.com/vmihailenco/msgpack/v5"
)
// ────────────────────────────────────────────────────────────────────────────
// E2E tests: exercise the full chat pipeline with mock backends
// ────────────────────────────────────────────────────────────────────────────
// mockBackends starts httptest servers simulating all downstream services.
type mockBackends struct {
Embeddings *httptest.Server
Reranker *httptest.Server
LLM *httptest.Server
TTS *httptest.Server
}
func newMockBackends(t *testing.T) *mockBackends {
t.Helper()
m := &mockBackends{}
m.Embeddings = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]any{
"data": []map[string]any{
{"embedding": []float64{0.1, 0.2, 0.3, 0.4}},
},
})
}))
t.Cleanup(m.Embeddings.Close)
m.Reranker = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]any{
"results": []map[string]any{
{"index": 0, "relevance_score": 0.95},
},
})
}))
t.Cleanup(m.Reranker.Close)
m.LLM = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var req map[string]any
json.NewDecoder(r.Body).Decode(&req)
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{
{"message": map[string]any{
"content": "Paris is the capital of France.",
}},
},
})
}))
t.Cleanup(m.LLM.Close)
m.TTS = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte{0xDE, 0xAD, 0xBE, 0xEF})
}))
t.Cleanup(m.TTS.Close)
return m
}
func TestChatPipeline_LLMOnly(t *testing.T) {
m := newMockBackends(t)
llm := clients.NewLLMClient(m.LLM.URL, 5*time.Second)
// Simulate what main.go does for a non-RAG request.
response, err := llm.Generate(context.Background(), "What is the capital of France?", "", "")
if err != nil {
t.Fatal(err)
}
if response != "Paris is the capital of France." {
t.Errorf("response = %q", response)
}
}
func TestChatPipeline_WithRAG(t *testing.T) {
m := newMockBackends(t)
embeddings := clients.NewEmbeddingsClient(m.Embeddings.URL, 5*time.Second, "bge")
reranker := clients.NewRerankerClient(m.Reranker.URL, 5*time.Second)
llm := clients.NewLLMClient(m.LLM.URL, 5*time.Second)
ctx := context.Background()
// 1. Embed query
embedding, err := embeddings.EmbedSingle(ctx, "What is the capital of France?")
if err != nil {
t.Fatal(err)
}
if len(embedding) == 0 {
t.Fatal("empty embedding")
}
// 2. Rerank (with mock documents)
docs := []string{"France is a country in Europe", "Paris is its capital"}
results, err := reranker.Rerank(ctx, "capital of France", docs, 2)
if err != nil {
t.Fatal(err)
}
if len(results) == 0 {
t.Fatal("no rerank results")
}
if results[0].Score == 0 {
t.Error("expected non-zero score")
}
// 3. Generate with context
contextText := results[0].Document
response, err := llm.Generate(ctx, "capital of France?", contextText, "")
if err != nil {
t.Fatal(err)
}
if response == "" {
t.Error("empty response")
}
}
func TestChatPipeline_WithTTS(t *testing.T) {
m := newMockBackends(t)
llm := clients.NewLLMClient(m.LLM.URL, 5*time.Second)
tts := clients.NewTTSClient(m.TTS.URL, 5*time.Second, "en")
ctx := context.Background()
response, err := llm.Generate(ctx, "hello", "", "")
if err != nil {
t.Fatal(err)
}
audio, err := tts.Synthesize(ctx, response, "en", "")
if err != nil {
t.Fatal(err)
}
if len(audio) == 0 {
t.Error("empty audio")
}
}
func TestChatPipeline_LLMTimeout(t *testing.T) {
// Simulate slow LLM.
slow := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(200 * time.Millisecond)
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{
{"message": map[string]any{"content": "late response"}},
},
})
}))
defer slow.Close()
llm := clients.NewLLMClient(slow.URL, 100*time.Millisecond)
_, err := llm.Generate(context.Background(), "hello", "", "")
if err == nil {
t.Error("expected timeout error")
}
}
func TestChatPipeline_TypedDecoding(t *testing.T) {
// Verify typed struct decoding from msgpack (same path as OnTypedMessage).
raw := map[string]any{
"request_id": "req-e2e-001",
"user_id": "user-1",
"message": "hello",
"premium": true,
"enable_rag": false,
"enable_streaming": false,
"system_prompt": "Be brief.",
}
data, _ := msgpack.Marshal(raw)
var req messages.ChatRequest
if err := msgpack.Unmarshal(data, &req); err != nil {
t.Fatal(err)
}
if req.RequestID != "req-e2e-001" {
t.Errorf("RequestID = %q", req.RequestID)
}
if req.UserID != "user-1" {
t.Errorf("UserID = %q", req.UserID)
}
if req.EffectiveQuery() != "hello" {
t.Errorf("query = %q", req.EffectiveQuery())
}
if req.EnableRAG {
t.Error("EnableRAG should be false")
}
if req.SystemPrompt != "Be brief." {
t.Errorf("SystemPrompt = %q", req.SystemPrompt)
}
}
// ────────────────────────────────────────────────────────────────────────────
// Benchmark: full chat pipeline overhead (mock backends)
// ────────────────────────────────────────────────────────────────────────────
func BenchmarkChatPipeline_LLMOnly(b *testing.B) {
llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"choices":[{"message":{"content":"answer"}}]}`))
}))
defer llmSrv.Close()
llm := clients.NewLLMClient(llmSrv.URL, 10*time.Second)
ctx := context.Background()
b.ResetTimer()
for b.Loop() {
llm.Generate(ctx, "question", "", "")
}
}
func BenchmarkChatPipeline_RAGFlow(b *testing.B) {
embedSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"data":[{"embedding":[0.1,0.2]}]}`))
}))
defer embedSrv.Close()
rerankSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"results":[{"index":0,"relevance_score":0.9}]}`))
}))
defer rerankSrv.Close()
llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"choices":[{"message":{"content":"answer"}}]}`))
}))
defer llmSrv.Close()
embed := clients.NewEmbeddingsClient(embedSrv.URL, 10*time.Second, "bge")
rerank := clients.NewRerankerClient(rerankSrv.URL, 10*time.Second)
llm := clients.NewLLMClient(llmSrv.URL, 10*time.Second)
ctx := context.Background()
b.ResetTimer()
for b.Loop() {
embed.EmbedSingle(ctx, "question")
rerank.Rerank(ctx, "question", []string{"doc1", "doc2"}, 2)
llm.Generate(ctx, "question", "context", "")
}
}

43
go.mod Normal file
View File

@@ -0,0 +1,43 @@
module git.daviestechlabs.io/daviestechlabs/chat-handler
go 1.25.1
require (
git.daviestechlabs.io/daviestechlabs/handler-base v0.0.0
github.com/nats-io/nats.go v1.48.0
)
require (
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/grpc v1.78.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)
replace git.daviestechlabs.io/daviestechlabs/handler-base => ../handler-base

79
go.sum Normal file
View File

@@ -0,0 +1,79 @@
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0/go.mod h1:EtekO9DEJb4/jRyN4v4Qjc2yA7AtfCBuz2FynRUWTXs=
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

256
main.go Normal file
View File

@@ -0,0 +1,256 @@
package main
import (
"context"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"
"github.com/nats-io/nats.go"
"git.daviestechlabs.io/daviestechlabs/handler-base/clients"
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
)
func main() {
cfg := config.Load()
cfg.ServiceName = "chat-handler"
cfg.NATSQueueGroup = "chat-handlers"
// Chat-specific settings
ragTopK := getEnvInt("RAG_TOP_K", 10)
ragRerankTopK := getEnvInt("RAG_RERANK_TOP_K", 5)
ragCollection := getEnv("RAG_COLLECTION", "documents")
includeSources := getEnvBool("INCLUDE_SOURCES", true)
enableTTS := getEnvBool("ENABLE_TTS", false)
ttsLanguage := getEnv("TTS_LANGUAGE", "en")
// Service clients
timeout := 60 * time.Second
embeddings := clients.NewEmbeddingsClient(cfg.EmbeddingsURL(), timeout, "")
reranker := clients.NewRerankerClient(cfg.RerankerURL(), timeout)
llm := clients.NewLLMClient(cfg.LLMURL(), timeout)
milvus := clients.NewMilvusClient(cfg.MilvusHost, cfg.MilvusPort, ragCollection)
var tts *clients.TTSClient
if enableTTS {
tts = clients.NewTTSClient(cfg.TTSURL(), timeout, ttsLanguage)
}
h := handler.New("ai.chat.user.*.message", cfg)
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
req, err := natsutil.Decode[messages.ChatRequest](msg.Data)
if err != nil {
slog.Error("decode failed", "error", err)
return &messages.ErrorResponse{Error: true, Message: err.Error(), Type: "DecodeError"}, nil
}
query := req.EffectiveQuery()
requestID := req.RequestID
if requestID == "" {
requestID = "unknown"
}
userID := req.UserID
if userID == "" {
userID = "unknown"
}
enableRAG := req.EnableRAG
if !enableRAG && req.Premium {
enableRAG = true
}
enableReranker := req.EnableReranker
if !enableReranker && enableRAG {
enableReranker = true
}
topK := req.TopK
if topK == 0 {
topK = ragTopK
}
collection := req.Collection
if collection == "" {
collection = ragCollection
}
reqEnableTTS := req.EnableTTS || enableTTS
systemPrompt := req.SystemPrompt
responseSubject := req.ResponseSubject
if responseSubject == "" {
responseSubject = fmt.Sprintf("ai.chat.response.%s", requestID)
}
slog.Info("processing request", "request_id", requestID, "query_len", len(query))
contextText := ""
var ragSources []string
usedRAG := false
// RAG pipeline
if enableRAG {
// 1. Embed query
embedding, err := embeddings.EmbedSingle(ctx, query)
if err != nil {
slog.Error("embedding failed", "error", err)
} else {
// 2. Search Milvus
_ = milvus
_ = collection
_ = topK
_ = embedding
// NOTE: Milvus search uses the gRPC SDK (requires milvus-sdk-go)
// For now, we pass through without search; Milvus client will be
// connected when the SDK is integrated.
// documents := milvus.Search(ctx, embedding, topK)
var documents []map[string]any // placeholder for Milvus results
// 3. Rerank
if enableReranker && len(documents) > 0 {
texts := make([]string, len(documents))
for i, d := range documents {
if t, ok := d["text"].(string); ok {
texts[i] = t
}
}
reranked, err := reranker.Rerank(ctx, query, texts, ragRerankTopK)
if err != nil {
slog.Error("rerank failed", "error", err)
} else {
documents = make([]map[string]any, len(reranked))
for i, r := range reranked {
documents[i] = map[string]any{"document": r.Document, "score": r.Score}
}
}
}
// 4. Build context
if len(documents) > 0 {
var parts []string
for i, d := range documents {
text := ""
if t, ok := d["document"].(string); ok {
text = t
}
parts = append(parts, fmt.Sprintf("[%d] %s", i+1, text))
}
contextText = strings.Join(parts, "\n\n")
for _, d := range documents {
if len(ragSources) >= 3 {
break
}
src := ""
if s, ok := d["source"].(string); ok {
src = s
} else if s, ok := d["document"].(string); ok && len(s) > 80 {
src = s[:80]
}
ragSources = append(ragSources, src)
}
usedRAG = true
}
}
}
// 5. Generate LLM response
responseText, err := llm.Generate(ctx, query, contextText, systemPrompt)
if err != nil {
slog.Error("LLM generation failed", "error", err)
return &messages.ChatResponse{
UserID: userID,
Success: false,
Error: err.Error(),
}, nil
}
// 6. Stream chunks if requested
if req.EnableStreaming {
streamSubject := fmt.Sprintf("ai.chat.response.stream.%s", requestID)
words := strings.Fields(responseText)
chunkSize := 4
for i := 0; i < len(words); i += chunkSize {
end := i + chunkSize
if end > len(words) {
end = len(words)
}
chunk := strings.Join(words[i:end], " ")
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "chunk",
Content: chunk,
Timestamp: messages.Timestamp(),
})
}
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "done",
Done: true,
Timestamp: messages.Timestamp(),
})
}
// 7. Optional TTS — audio as raw bytes (no base64)
var audio []byte
if reqEnableTTS && tts != nil {
audioBytes, err := tts.Synthesize(ctx, responseText, ttsLanguage, "")
if err != nil {
slog.Error("TTS failed", "error", err)
} else {
audio = audioBytes
}
}
result := &messages.ChatResponse{
UserID: userID,
Response: responseText,
ResponseText: responseText,
UsedRAG: usedRAG,
Success: true,
Audio: audio,
}
if includeSources {
result.RAGSources = ragSources
}
// Publish to the response subject the frontend is waiting on
_ = h.NATS.Publish(responseSubject, result)
slog.Info("completed request", "request_id", requestID, "rag", usedRAG)
return result, nil
})
if err := h.Run(); err != nil {
slog.Error("handler failed", "error", err)
os.Exit(1)
}
}
// Helpers
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
func getEnvInt(key string, fallback int) int {
if v := os.Getenv(key); v != "" {
if i, err := strconv.Atoi(v); err == nil {
return i
}
}
return fallback
}
func getEnvBool(key string, fallback bool) bool {
if v := os.Getenv(key); v != "" {
return strings.EqualFold(v, "true") || v == "1"
}
return fallback
}

93
main_test.go Normal file
View File

@@ -0,0 +1,93 @@
package main
import (
"os"
"testing"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"github.com/vmihailenco/msgpack/v5"
)
func TestChatRequestDecode(t *testing.T) {
// Verify a msgpack-encoded map decodes cleanly into typed struct.
raw := map[string]any{
"request_id": "req-1",
"user_id": "user-1",
"message": "hello",
"premium": true,
"top_k": 10,
}
data, _ := msgpack.Marshal(raw)
var req messages.ChatRequest
if err := msgpack.Unmarshal(data, &req); err != nil {
t.Fatal(err)
}
if req.RequestID != "req-1" {
t.Errorf("RequestID = %q", req.RequestID)
}
if req.EffectiveQuery() != "hello" {
t.Errorf("EffectiveQuery = %q", req.EffectiveQuery())
}
if !req.Premium {
t.Error("Premium should be true")
}
if req.TopK != 10 {
t.Errorf("TopK = %d", req.TopK)
}
}
func TestChatResponseRoundtrip(t *testing.T) {
resp := &messages.ChatResponse{
UserID: "user-1",
Response: "answer",
Success: true,
Audio: []byte{0x01, 0x02, 0x03},
}
data, err := msgpack.Marshal(resp)
if err != nil {
t.Fatal(err)
}
var decoded messages.ChatResponse
if err := msgpack.Unmarshal(data, &decoded); err != nil {
t.Fatal(err)
}
if decoded.UserID != "user-1" || !decoded.Success {
t.Errorf("decoded = %+v", decoded)
}
if len(decoded.Audio) != 3 {
t.Errorf("audio len = %d", len(decoded.Audio))
}
}
func TestGetEnvHelpers(t *testing.T) {
t.Setenv("CHAT_TEST", "hello")
if got := getEnv("CHAT_TEST", "x"); got != "hello" {
t.Errorf("getEnv = %q", got)
}
if got := getEnv("NO_SUCH_VAR", "x"); got != "x" {
t.Errorf("getEnv fallback = %q", got)
}
t.Setenv("CHAT_PORT", "9090")
if got := getEnvInt("CHAT_PORT", 0); got != 9090 {
t.Errorf("getEnvInt = %d", got)
}
if got := getEnvInt("NO_SUCH_VAR", 80); got != 80 {
t.Errorf("getEnvInt fallback = %d", got)
}
t.Setenv("CHAT_FLAG", "true")
if got := getEnvBool("CHAT_FLAG", false); !got {
t.Error("getEnvBool should be true")
}
if got := getEnvBool("NO_SUCH_VAR", false); got {
t.Error("getEnvBool fallback should be false")
}
}
func TestMainBinaryBuilds(t *testing.T) {
// Verify the binary exists after build
if _, err := os.Stat("main.go"); err != nil {
t.Skip("main.go not found")
}
}

View File

@@ -1,43 +0,0 @@
[project]
name = "chat-handler"
version = "1.0.0"
description = "Text chat pipeline with RAG - Query → Embeddings → Milvus → Rerank → LLM"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Davies Tech Labs" }]
dependencies = [
"handler-base @ git+https://git.daviestechlabs.io/daviestechlabs/handler-base.git",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"ruff>=0.1.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["."]
only-include = ["chat_handler.py"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"
filterwarnings = ["ignore::DeprecationWarning"]

View File

@@ -1 +0,0 @@
# Chat Handler Tests

View File

@@ -1,94 +0,0 @@
"""
Pytest configuration and fixtures for chat-handler tests.
"""
import asyncio
import os
from unittest.mock import MagicMock
import pytest
# Set test environment variables before importing
os.environ.setdefault("NATS_URL", "nats://localhost:4222")
os.environ.setdefault("REDIS_URL", "redis://localhost:6379")
os.environ.setdefault("MILVUS_HOST", "localhost")
os.environ.setdefault("OTEL_ENABLED", "false")
os.environ.setdefault("MLFLOW_ENABLED", "false")
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def sample_embedding():
"""Sample embedding vector."""
return [0.1] * 1024
@pytest.fixture
def sample_documents():
"""Sample search results."""
return [
{"text": "Machine learning is a subset of AI.", "score": 0.95},
{"text": "Deep learning uses neural networks.", "score": 0.90},
{"text": "AI enables intelligent automation.", "score": 0.85},
]
@pytest.fixture
def sample_reranked():
"""Sample reranked results."""
return [
{"document": "Machine learning is a subset of AI.", "score": 0.98},
{"document": "Deep learning uses neural networks.", "score": 0.85},
]
@pytest.fixture
def mock_nats_message():
"""Create a mock NATS message."""
msg = MagicMock()
msg.subject = "ai.chat.user.test-user-1.message"
msg.reply = "ai.chat.response.test-123"
return msg
@pytest.fixture
def mock_chat_request():
"""Sample chat request payload."""
return {
"request_id": "test-request-123",
"user_id": "test-user-1",
"username": "testuser",
"message": "What is machine learning?",
"premium": True,
"enable_rag": True,
"enable_reranker": True,
"enable_streaming": False,
"collection": "test_collection",
"enable_tts": False,
"system_prompt": None,
}
@pytest.fixture
def mock_chat_request_with_tts():
"""Sample chat request with TTS enabled."""
return {
"request_id": "test-request-456",
"user_id": "test-user-2",
"username": "testuser2",
"message": "Tell me about AI",
"premium": True,
"enable_rag": True,
"enable_reranker": True,
"enable_streaming": False,
"collection": "documents",
"enable_tts": True,
"system_prompt": "You are a helpful assistant.",
}

View File

@@ -1,273 +0,0 @@
"""
Unit tests for ChatHandler.
"""
import pytest
from unittest.mock import AsyncMock, patch
from chat_handler import ChatHandler, ChatSettings
class TestChatSettings:
"""Tests for ChatSettings configuration."""
def test_default_settings(self):
"""Test default settings values."""
settings = ChatSettings()
assert settings.service_name == "chat-handler"
assert settings.rag_top_k == 10
assert settings.rag_rerank_top_k == 5
assert settings.rag_collection == "documents"
assert settings.include_sources is True
assert settings.enable_tts is False
assert settings.tts_language == "en"
def test_custom_settings(self):
"""Test custom settings."""
settings = ChatSettings(
rag_top_k=20,
rag_collection="custom_docs",
enable_tts=True,
)
assert settings.rag_top_k == 20
assert settings.rag_collection == "custom_docs"
assert settings.enable_tts is True
class TestChatHandler:
"""Tests for ChatHandler."""
@pytest.fixture
def handler(self):
"""Create handler with mocked clients."""
with (
patch("chat_handler.EmbeddingsClient"),
patch("chat_handler.RerankerClient"),
patch("chat_handler.LLMClient"),
patch("chat_handler.TTSClient"),
patch("chat_handler.MilvusClient"),
):
handler = ChatHandler()
# Setup mock clients
handler.embeddings = AsyncMock()
handler.reranker = AsyncMock()
handler.llm = AsyncMock()
handler.milvus = AsyncMock()
handler.tts = None # TTS disabled by default
handler.nats = AsyncMock()
yield handler
@pytest.fixture
def handler_with_tts(self):
"""Create handler with TTS enabled."""
with (
patch("chat_handler.EmbeddingsClient"),
patch("chat_handler.RerankerClient"),
patch("chat_handler.LLMClient"),
patch("chat_handler.TTSClient"),
patch("chat_handler.MilvusClient"),
):
handler = ChatHandler()
handler.chat_settings.enable_tts = True
# Setup mock clients
handler.embeddings = AsyncMock()
handler.reranker = AsyncMock()
handler.llm = AsyncMock()
handler.milvus = AsyncMock()
handler.tts = AsyncMock()
handler.nats = AsyncMock()
yield handler
def test_init(self, handler):
"""Test handler initialization."""
assert handler.subject == "ai.chat.user.*.message"
assert handler.queue_group == "chat-handlers"
assert handler.chat_settings.service_name == "chat-handler"
@pytest.mark.asyncio
async def test_handle_message_success(
self,
handler,
mock_nats_message,
mock_chat_request,
sample_embedding,
sample_documents,
sample_reranked,
):
"""Test successful chat request handling."""
# Setup mocks
handler.embeddings.embed_single.return_value = sample_embedding
handler.milvus.search_with_texts.return_value = sample_documents
handler.reranker.rerank.return_value = sample_reranked
handler.llm.generate.return_value = "Machine learning is a subset of AI that..."
# Execute
result = await handler.handle_message(mock_nats_message, mock_chat_request)
# Verify
assert result["user_id"] == "test-user-1"
assert result["success"] is True
assert "response" in result
assert result["response"] == "Machine learning is a subset of AI that..."
assert result["response_text"] == result["response"]
assert result["used_rag"] is True
assert isinstance(result["rag_sources"], list)
# Verify RAG pipeline was called (enable_rag=True in fixture)
handler.embeddings.embed_single.assert_called_once()
handler.milvus.search_with_texts.assert_called_once()
handler.reranker.rerank.assert_called_once()
handler.llm.generate.assert_called_once()
@pytest.mark.asyncio
async def test_handle_message_without_sources(
self,
handler,
mock_nats_message,
mock_chat_request,
sample_embedding,
sample_documents,
sample_reranked,
):
"""Test response without sources when disabled."""
handler.chat_settings.include_sources = False
handler.embeddings.embed_single.return_value = sample_embedding
handler.milvus.search_with_texts.return_value = sample_documents
handler.reranker.rerank.return_value = sample_reranked
handler.llm.generate.return_value = "Response text"
result = await handler.handle_message(mock_nats_message, mock_chat_request)
# New response format doesn't have a separate "sources" key;
# rag_sources is always present (may be empty)
assert "rag_sources" in result
@pytest.mark.asyncio
async def test_handle_message_with_tts(
self,
handler_with_tts,
mock_nats_message,
mock_chat_request_with_tts,
sample_embedding,
sample_documents,
sample_reranked,
):
"""Test response with TTS audio."""
handler = handler_with_tts
handler.embeddings.embed_single.return_value = sample_embedding
handler.milvus.search_with_texts.return_value = sample_documents
handler.reranker.rerank.return_value = sample_reranked
handler.llm.generate.return_value = "AI response"
handler.tts.synthesize.return_value = b"audio_bytes"
result = await handler.handle_message(mock_nats_message, mock_chat_request_with_tts)
assert "audio" in result
handler.tts.synthesize.assert_called_once()
@pytest.mark.asyncio
async def test_handle_message_with_custom_system_prompt(
self,
handler,
mock_nats_message,
sample_embedding,
sample_documents,
sample_reranked,
):
"""Test LLM is called with custom system prompt."""
request = {
"request_id": "test-123",
"user_id": "user-42",
"message": "Hello",
"premium": True,
"enable_rag": True,
"system_prompt": "You are a pirate. Respond like one.",
}
handler.embeddings.embed_single.return_value = sample_embedding
handler.milvus.search_with_texts.return_value = sample_documents
handler.reranker.rerank.return_value = sample_reranked
handler.llm.generate.return_value = "Ahoy!"
await handler.handle_message(mock_nats_message, request)
# Verify system_prompt was passed to LLM
handler.llm.generate.assert_called_once()
call_kwargs = handler.llm.generate.call_args.kwargs
assert call_kwargs.get("system_prompt") == "You are a pirate. Respond like one."
def test_build_context(self, handler):
"""Test context building with numbered sources."""
documents = [
{"document": "First doc content"},
{"document": "Second doc content"},
]
context = handler._build_context(documents)
assert "[1]" in context
assert "[2]" in context
assert "First doc content" in context
assert "Second doc content" in context
@pytest.mark.asyncio
async def test_setup_initializes_clients(self):
"""Test that setup initializes all required clients."""
with (
patch("chat_handler.EmbeddingsClient") as emb_cls,
patch("chat_handler.RerankerClient") as rer_cls,
patch("chat_handler.LLMClient") as llm_cls,
patch("chat_handler.TTSClient") as tts_cls,
patch("chat_handler.MilvusClient") as mil_cls,
):
mil_cls.return_value.connect = AsyncMock()
handler = ChatHandler()
await handler.setup()
emb_cls.assert_called_once()
rer_cls.assert_called_once()
llm_cls.assert_called_once()
mil_cls.assert_called_once()
# TTS should not be initialized when disabled
tts_cls.assert_not_called()
@pytest.mark.asyncio
async def test_teardown_closes_clients(self, handler):
"""Test that teardown closes all clients."""
await handler.teardown()
handler.embeddings.close.assert_called_once()
handler.reranker.close.assert_called_once()
handler.llm.close.assert_called_once()
handler.milvus.close.assert_called_once()
@pytest.mark.asyncio
async def test_publishes_to_response_subject(
self,
handler,
mock_nats_message,
mock_chat_request,
sample_embedding,
sample_documents,
sample_reranked,
):
"""Test that result is published to response subject."""
handler.embeddings.embed_single.return_value = sample_embedding
handler.milvus.search_with_texts.return_value = sample_documents
handler.reranker.rerank.return_value = sample_reranked
handler.llm.generate.return_value = "Response"
await handler.handle_message(mock_nats_message, mock_chat_request)
handler.nats.publish.assert_called_once()
call_args = handler.nats.publish.call_args
assert "ai.chat.response.test-request-123" in str(call_args)

2638
uv.lock generated

File diff suppressed because it is too large Load Diff