7 Commits
v0.0.8 ... main

Author SHA1 Message Date
a298568ef0 fix: use type=raw for Docker tags to preserve v prefix
Some checks failed
CI / Docker Build & Push (push) Failing after 3m3s
CI / Notify (push) Successful in 2s
CI / Lint (push) Successful in 2m54s
CI / Test (push) Successful in 3m2s
CI / Release (push) Successful in 1m15s
docker/metadata-action type=semver strips the v prefix, causing
tag mismatch between git tags (v0.1.3) and Docker tags (0.1.3).
Switch to type=raw to pass through the version as-is.
2026-02-22 09:58:40 -05:00
4c3eb4952a fix: switch Docker build to plain docker build/push with insecure registry
Some checks failed
CI / Lint (push) Successful in 2m50s
CI / Release (push) Successful in 1m32s
CI / Test (push) Successful in 2m51s
CI / Docker Build & Push (push) Failing after 3m1s
CI / Notify (push) Successful in 2s
- Drop buildx (setup-buildx-action, build-push-action)
- Use insecure HTTP registry with SIGHUP daemon reload
- Use org-level PAT secrets for registry auth
2026-02-21 22:37:55 -05:00
4de3639df0 fix: use docker login CLI instead of login-action for Gitea compat
Some checks failed
CI / Lint (push) Successful in 2m18s
CI / Release (push) Successful in 1m20s
CI / Docker Build & Push (push) Failing after 5m43s
CI / Test (push) Successful in 3m20s
CI / Notify (push) Has been cancelled
docker/login-action@v3 fails with 'Username and password required' on
Gitea Actions — secrets not passed to action with: inputs. Switch to
direct docker login CLI which reliably interpolates secrets in run: steps.
2026-02-21 19:34:27 -05:00
f6623d5473 fix: switch Docker registry to HTTPS endpoint with login-action
Some checks failed
CI / Release (push) Successful in 1m17s
CI / Docker Build & Push (push) Failing after 8m21s
CI / Test (push) Successful in 3m11s
CI / Lint (push) Successful in 2m18s
CI / Notify (push) Successful in 2s
- Replace gitea-http.gitea.svc.cluster.local:3000 with registry.lab.daviestechlabs.io
- Use docker/login-action@v3 for Gitea registry auth (proper buildx integration)
- Remove manual base64 auth to ~/.docker/config.json (not picked up by buildkit)
- Remove insecure registry daemon.json config and Docker restart
- Remove buildkitd insecure registry config
- Remove cache-from/cache-to type=gha (not supported on Gitea Actions)

Fixes 401 Unauthorized: reqPackageAccess on Docker push
2026-02-21 18:05:40 -05:00
f61488a868 style: gofmt + fix copylocks lint warning
Some checks failed
CI / Test (push) Successful in 2m49s
CI / Lint (push) Successful in 3m16s
CI / Release (push) Successful in 2m8s
CI / Docker Build & Push (push) Failing after 6m13s
CI / Notify (push) Successful in 1s
2026-02-21 15:36:46 -05:00
e2176331c8 feat: migrate from msgpack to protobuf (handler-base v1.0.0)
Some checks failed
CI / Lint (push) Failing after 2m49s
CI / Test (push) Successful in 3m36s
CI / Notify (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Release (push) Has been cancelled
- Replace msgpack encoding with protobuf wire format
- Update field names to proto convention (UserId, RequestId, EnableRag, etc.)
- Use messages.EffectiveQuery() standalone function
- Cast TopK to int32 for proto compatibility
- Rewrite tests for proto round-trips
2026-02-21 15:30:04 -05:00
87d0545d2c feat: replace fake streaming with real SSE StreamGenerate
Some checks failed
CI / Lint (push) Successful in 3m0s
CI / Test (push) Successful in 3m23s
CI / Docker Build & Push (push) Failing after 4m55s
CI / Release (push) Successful in 1m4s
CI / Notify (push) Successful in 1s
Use handler-base StreamGenerate() to publish real token-by-token
ChatStreamChunk messages to NATS as they arrive from Ray Serve,
instead of calling Generate() and splitting into 4-word chunks.

Add 8 streaming tests: happy path, system prompt, RAG context,
nil callback, timeout, HTTP error, context canceled, fallback.
2026-02-21 09:23:57 -05:00
6 changed files with 344 additions and 132 deletions

View File

@@ -121,42 +121,19 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Set up Docker Buildx - name: Configure insecure registry
uses: docker/setup-buildx-action@v3
with:
buildkitd-config-inline: |
[registry."gitea-http.gitea.svc.cluster.local:3000"]
http = true
insecure = true
- name: Login to Docker Hub
if: vars.DOCKERHUB_USERNAME != ''
uses: docker/login-action@v3
with:
username: ${{ vars.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Configure Docker for insecure registry
run: | run: |
sudo mkdir -p /etc/docker sudo mkdir -p /etc/docker
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
sudo systemctl restart docker || sudo service docker restart || true sudo kill -SIGHUP "$(pidof dockerd)" || true
sleep 2 sleep 3
- name: Login to Gitea Registry - name: Login to Gitea Registry
run: | run: echo "${{ secrets.REGISTRY_TOKEN }}" | docker login "${{ env.REGISTRY_HOST }}" -u "${{ secrets.REGISTRY_USER }}" --password-stdin
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
mkdir -p ~/.docker - name: Login to Docker Hub
cat > ~/.docker/config.json << EOF if: vars.DOCKERHUB_USERNAME != ''
{ run: echo "${{ secrets.DOCKERHUB_TOKEN }}" | docker login -u "${{ vars.DOCKERHUB_USERNAME }}" --password-stdin
"auths": {
"${{ env.REGISTRY_HOST }}": {
"auth": "$AUTH"
}
}
}
EOF
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
- name: Extract metadata - name: Extract metadata
id: meta id: meta
@@ -164,19 +141,25 @@ jobs:
with: with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: | tags: |
type=semver,pattern={{version}},value=${{ needs.release.outputs.version }} type=raw,value=${{ needs.release.outputs.version }}
type=semver,pattern={{major}}.{{minor}},value=${{ needs.release.outputs.version }}
type=raw,value=latest,enable={{is_default_branch}} type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v5 run: |
with: # Build with all tags
context: . TAGS=""
push: true while IFS= read -r tag; do
tags: ${{ steps.meta.outputs.tags }} [ -n "$tag" ] && TAGS="$TAGS -t $tag"
labels: ${{ steps.meta.outputs.labels }} done <<< "${{ steps.meta.outputs.tags }}"
cache-from: type=gha docker build $TAGS \
cache-to: type=gha,mode=max --label "org.opencontainers.image.source=${{ gitea.server_url }}/${{ gitea.repository }}" \
--label "org.opencontainers.image.revision=${{ gitea.sha }}" \
.
# Push each tag
while IFS= read -r tag; do
[ -n "$tag" ] && docker push "$tag"
done <<< "${{ steps.meta.outputs.tags }}"
notify: notify:
name: Notify name: Notify

View File

@@ -3,14 +3,17 @@ package main
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings"
"sync"
"testing" "testing"
"time" "time"
"git.daviestechlabs.io/daviestechlabs/handler-base/clients" "git.daviestechlabs.io/daviestechlabs/handler-base/clients"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"github.com/vmihailenco/msgpack/v5" "google.golang.org/protobuf/proto"
) )
// ──────────────────────────────────────────────────────────────────────────── // ────────────────────────────────────────────────────────────────────────────
@@ -164,33 +167,33 @@ func TestChatPipeline_LLMTimeout(t *testing.T) {
} }
func TestChatPipeline_TypedDecoding(t *testing.T) { func TestChatPipeline_TypedDecoding(t *testing.T) {
// Verify typed struct decoding from msgpack (same path as OnTypedMessage). // Verify typed struct decoding from proto (same path as OnTypedMessage).
raw := map[string]any{ original := &messages.ChatRequest{
"request_id": "req-e2e-001", RequestId: "req-e2e-001",
"user_id": "user-1", UserId: "user-1",
"message": "hello", Message: "hello",
"premium": true, Premium: true,
"enable_rag": false, EnableRag: false,
"enable_streaming": false, EnableStreaming: false,
"system_prompt": "Be brief.", SystemPrompt: "Be brief.",
} }
data, _ := msgpack.Marshal(raw) data, _ := proto.Marshal(original)
var req messages.ChatRequest var req messages.ChatRequest
if err := msgpack.Unmarshal(data, &req); err != nil { if err := proto.Unmarshal(data, &req); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if req.RequestID != "req-e2e-001" { if req.RequestId != "req-e2e-001" {
t.Errorf("RequestID = %q", req.RequestID) t.Errorf("RequestID = %q", req.RequestId)
} }
if req.UserID != "user-1" { if req.UserId != "user-1" {
t.Errorf("UserID = %q", req.UserID) t.Errorf("UserID = %q", req.UserId)
} }
if req.EffectiveQuery() != "hello" { if messages.EffectiveQuery(&req) != "hello" {
t.Errorf("query = %q", req.EffectiveQuery()) t.Errorf("query = %q", messages.EffectiveQuery(&req))
} }
if req.EnableRAG { if req.EnableRag {
t.Error("EnableRAG should be false") t.Error("EnableRAG should be false")
} }
if req.SystemPrompt != "Be brief." { if req.SystemPrompt != "Be brief." {
@@ -198,6 +201,220 @@ func TestChatPipeline_TypedDecoding(t *testing.T) {
} }
} }
// ────────────────────────────────────────────────────────────────────────────
// Streaming tests: exercise StreamGenerate path (the real SSE pipeline)
// ────────────────────────────────────────────────────────────────────────────
// sseChunk builds an OpenAI-compatible SSE data line whose delta content is
// the given token, terminated by the blank line that ends an SSE event.
//
// The token is encoded with encoding/json instead of fmt's %q verb. %q emits
// Go string syntax, which uses escapes such as \a, \v, \x7f and \U0001xxxx
// that are not legal JSON, so a token containing such characters would yield
// a frame that a JSON decoder rejects. For plain text tokens the output is
// byte-identical to the %q form.
func sseChunk(content string) string {
	// Marshalling a plain string cannot fail, so the error is ignored.
	encoded, _ := json.Marshal(content)
	return "data: {\"choices\":[{\"delta\":{\"content\":" + string(encoded) + "}}]}\n\n"
}
// newStreamingLLM starts a mock LLM server that answers every request with an
// SSE stream: a role-only chunk, one content chunk per entry in tokens, and a
// closing [DONE] marker. Each event is flushed as soon as it is written when
// the ResponseWriter supports flushing. A request whose JSON body does not
// carry stream=true marks the test as failed. The server is shut down through
// t.Cleanup.
func newStreamingLLM(t *testing.T, tokens []string) *httptest.Server {
	t.Helper()

	handler := func(w http.ResponseWriter, r *http.Request) {
		var body map[string]any
		_ = json.NewDecoder(r.Body).Decode(&body)

		// Verify stream=true was requested.
		if enabled, isBool := body["stream"].(bool); !(isBool && enabled) {
			t.Error("expected stream=true in request body")
		}

		header := w.Header()
		header.Set("Content-Type", "text/event-stream")
		header.Set("Cache-Control", "no-cache")

		// emit writes one SSE event and pushes it to the client immediately
		// when the writer is able to flush.
		flusher, canFlush := w.(http.Flusher)
		emit := func(event string) {
			_, _ = fmt.Fprint(w, event)
			if canFlush {
				flusher.Flush()
			}
		}

		// Role-only chunk (should be skipped by StreamGenerate)
		emit("data: {\"choices\":[{\"delta\":{\"role\":\"assistant\"}}]}\n\n")
		for i := range tokens {
			emit(sseChunk(tokens[i]))
		}
		emit("data: [DONE]\n\n")
	}

	server := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(server.Close)
	return server
}
// TestChatPipeline_StreamGenerate streams a fixed token sequence from the mock
// SSE server and verifies that StreamGenerate returns the concatenated text
// and hands the callback each content token once, in order.
func TestChatPipeline_StreamGenerate(t *testing.T) {
	tokens := []string{"Paris", " is", " the", " capital", " of", " France", "."}
	srv := newStreamingLLM(t, tokens)
	llm := clients.NewLLMClient(srv.URL, 5*time.Second)

	// The mutex guards received in case the callback is invoked from a
	// goroutine other than the test's own — TODO confirm against handler-base.
	var mu sync.Mutex
	var received []string
	full, err := llm.StreamGenerate(context.Background(), "capital of France?", "", "", func(token string) {
		mu.Lock()
		defer mu.Unlock()
		received = append(received, token)
	})
	if err != nil {
		t.Fatal(err)
	}
	if full != "Paris is the capital of France." {
		t.Errorf("full = %q", full)
	}
	// Fatalf, not Errorf: the loop below indexes received by position and
	// would panic with an out-of-range index if fewer callbacks arrived.
	if len(received) != len(tokens) {
		t.Fatalf("callback count = %d, want %d", len(received), len(tokens))
	}
	for i, tok := range tokens {
		if received[i] != tok {
			t.Errorf("token[%d] = %q, want %q", i, received[i], tok)
		}
	}
}
// TestChatPipeline_StreamWithSystemPrompt streams with a non-empty system
// prompt argument and checks that the returned text is the concatenation of
// the tokens served by the mock.
func TestChatPipeline_StreamWithSystemPrompt(t *testing.T) {
	server := newStreamingLLM(t, []string{"Hello", "!"})
	client := clients.NewLLMClient(server.URL, 5*time.Second)

	// The per-token callback is irrelevant here; only the final text matters.
	discard := func(token string) {}
	got, err := client.StreamGenerate(context.Background(), "greet me", "", "You are a friendly assistant.", discard)
	if err != nil {
		t.Fatal(err)
	}
	if got != "Hello!" {
		t.Errorf("full = %q", got)
	}
}
// TestChatPipeline_StreamWithRAGContext embeds a query through the mock
// embeddings backend, then streams an answer while supplying context text to
// StreamGenerate, and checks both the final text and the number of tokens
// delivered to the callback.
func TestChatPipeline_StreamWithRAGContext(t *testing.T) {
	backends := newMockBackends(t)
	streamSrv := newStreamingLLM(t, []string{"The", " answer", " is", " 42"})

	embedder := clients.NewEmbeddingsClient(backends.Embeddings.URL, 5*time.Second, "bge")
	llmClient := clients.NewLLMClient(streamSrv.URL, 5*time.Second)
	ctx := context.Background()

	// Step 1: the embedding call must succeed and return a non-empty result.
	vec, embedErr := embedder.EmbedSingle(ctx, "deep thought")
	if embedErr != nil {
		t.Fatal(embedErr)
	}
	if len(vec) == 0 {
		t.Fatal("empty embedding")
	}

	// Step 2: stream with context text and collect every token handed to the
	// callback.
	var seen []string
	collect := func(tok string) {
		seen = append(seen, tok)
	}
	answer, streamErr := llmClient.StreamGenerate(ctx, "deep thought", "The answer to everything is 42.", "", collect)
	if streamErr != nil {
		t.Fatal(streamErr)
	}
	if answer != "The answer is 42" {
		t.Errorf("full = %q", answer)
	}
	if n := len(seen); n != 4 {
		t.Errorf("token count = %d, want 4", n)
	}
}
// TestChatPipeline_StreamNilCallback checks that StreamGenerate accepts a nil
// token callback and still returns the streamed text.
func TestChatPipeline_StreamNilCallback(t *testing.T) {
	server := newStreamingLLM(t, []string{"ok"})
	client := clients.NewLLMClient(server.URL, 5*time.Second)

	got, err := client.StreamGenerate(context.Background(), "test", "", "", nil)
	switch {
	case err != nil:
		t.Fatal(err)
	case got != "ok":
		t.Errorf("full = %q", got)
	}
}
// TestChatPipeline_StreamTimeout points a client with a 100ms timeout at a
// server that waits 200ms before writing anything, and expects StreamGenerate
// to return an error.
func TestChatPipeline_StreamTimeout(t *testing.T) {
	const (
		serverDelay   = 200 * time.Millisecond
		clientTimeout = 100 * time.Millisecond
	)

	// The handler responds with a valid stream, but only after the client's
	// timeout has already elapsed.
	lateHandler := func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(serverDelay)
		w.Header().Set("Content-Type", "text/event-stream")
		_, _ = fmt.Fprint(w, sseChunk("late"))
		_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
	}
	slow := httptest.NewServer(http.HandlerFunc(lateHandler))
	defer slow.Close()

	client := clients.NewLLMClient(slow.URL, clientTimeout)
	if _, err := client.StreamGenerate(context.Background(), "hello", "", "", nil); err == nil {
		t.Error("expected timeout error")
	}
}
// TestChatPipeline_StreamHTTPError verifies that StreamGenerate returns an
// error mentioning the status code when the server answers with HTTP 500.
func TestChatPipeline_StreamHTTPError(t *testing.T) {
	errSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("internal error"))
	}))
	defer errSrv.Close()

	llm := clients.NewLLMClient(errSrv.URL, 5*time.Second)
	_, err := llm.StreamGenerate(context.Background(), "hello", "", "", nil)
	// Fatal, not Error: the check below calls err.Error(), which would panic
	// on a nil error instead of reporting a clean test failure.
	if err == nil {
		t.Fatal("expected error for HTTP 500")
	}
	if !strings.Contains(err.Error(), "500") {
		t.Errorf("error = %q, should mention status 500", err)
	}
}
// TestChatPipeline_StreamContextCanceled calls StreamGenerate with a context
// that has already been canceled and expects an error to be returned.
func TestChatPipeline_StreamContextCanceled(t *testing.T) {
	canceledCtx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately

	server := newStreamingLLM(t, []string{"should", "not", "arrive"})
	client := clients.NewLLMClient(server.URL, 5*time.Second)

	if _, err := client.StreamGenerate(canceledCtx, "hello", "", "", nil); err == nil {
		t.Error("expected context canceled error")
	}
}
// TestChatPipeline_StreamFallbackToNonStreaming mirrors the branch in main.go
// where a non-streaming request goes through Generate() and a streaming
// request goes through StreamGenerate(), and checks that each path returns
// the text its mock server produces.
func TestChatPipeline_StreamFallbackToNonStreaming(t *testing.T) {
	backends := newMockBackends(t)
	sseServer := newStreamingLLM(t, []string{"streamed", " answer"})

	plainClient := clients.NewLLMClient(backends.LLM.URL, 5*time.Second)
	sseClient := clients.NewLLMClient(sseServer.URL, 5*time.Second)
	ctx := context.Background()

	// Non-streaming path
	plain, plainErr := plainClient.Generate(ctx, "hello", "", "")
	if plainErr != nil {
		t.Fatal(plainErr)
	}
	if plain != "Paris is the capital of France." {
		t.Errorf("non-stream = %q", plain)
	}

	// Streaming path: collect every token passed to the callback.
	var seen []string
	collect := func(tok string) {
		seen = append(seen, tok)
	}
	streamed, streamErr := sseClient.StreamGenerate(ctx, "hello", "", "", collect)
	if streamErr != nil {
		t.Fatal(streamErr)
	}
	if streamed != "streamed answer" {
		t.Errorf("stream = %q", streamed)
	}
	if n := len(seen); n != 2 {
		t.Errorf("token count = %d", n)
	}
}
// ──────────────────────────────────────────────────────────────────────────── // ────────────────────────────────────────────────────────────────────────────
// Benchmark: full chat pipeline overhead (mock backends) // Benchmark: full chat pipeline overhead (mock backends)
// ──────────────────────────────────────────────────────────────────────────── // ────────────────────────────────────────────────────────────────────────────
@@ -245,3 +462,23 @@ func BenchmarkChatPipeline_RAGFlow(b *testing.B) {
_, _ = llm.Generate(ctx, "question", "context", "") _, _ = llm.Generate(ctx, "question", "context", "")
} }
} }
func BenchmarkChatPipeline_StreamGenerate(b *testing.B) {
tokens := []string{"one", " two", " three", " four", " five"}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
for _, tok := range tokens {
_, _ = fmt.Fprintf(w, "data: {\"choices\":[{\"delta\":{\"content\":%q}}]}\n\n", tok)
}
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
}))
defer srv.Close()
llm := clients.NewLLMClient(srv.URL, 10*time.Second)
ctx := context.Background()
b.ResetTimer()
for b.Loop() {
_, _ = llm.StreamGenerate(ctx, "question", "", "", func(string) {})
}
}

6
go.mod
View File

@@ -3,9 +3,9 @@ module git.daviestechlabs.io/daviestechlabs/chat-handler
go 1.25.1 go 1.25.1
require ( require (
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5 git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0
github.com/nats-io/nats.go v1.48.0 github.com/nats-io/nats.go v1.48.0
github.com/vmihailenco/msgpack/v5 v5.4.1 google.golang.org/protobuf v1.36.11
) )
require ( require (
@@ -19,7 +19,6 @@ require (
github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
@@ -37,5 +36,4 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/grpc v1.78.0 // indirect google.golang.org/grpc v1.78.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
) )

8
go.sum
View File

@@ -1,5 +1,5 @@
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5 h1:DqYZpeluTXh5QKqdVFgN8YIMh4Ycqzw5E9+5FTNDFCA= git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY=
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE= git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -33,10 +33,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=

79
main.go
View File

@@ -16,6 +16,7 @@ import (
"git.daviestechlabs.io/daviestechlabs/handler-base/handler" "git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"google.golang.org/protobuf/proto"
) )
func main() { func main() {
@@ -45,23 +46,23 @@ func main() {
h := handler.New("ai.chat.user.*.message", cfg) h := handler.New("ai.chat.user.*.message", cfg)
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) { h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
req, err := natsutil.Decode[messages.ChatRequest](msg.Data) var req messages.ChatRequest
if err != nil { if err := natsutil.Decode(msg.Data, &req); err != nil {
slog.Error("decode failed", "error", err) slog.Error("decode failed", "error", err)
return &messages.ErrorResponse{Error: true, Message: err.Error(), Type: "DecodeError"}, nil return &messages.ErrorResponse{Error: true, Message: err.Error(), Type: "DecodeError"}, nil
} }
query := req.EffectiveQuery() query := messages.EffectiveQuery(&req)
requestID := req.RequestID requestID := req.RequestId
if requestID == "" { if requestID == "" {
requestID = "unknown" requestID = "unknown"
} }
userID := req.UserID userID := req.UserId
if userID == "" { if userID == "" {
userID = "unknown" userID = "unknown"
} }
enableRAG := req.EnableRAG enableRAG := req.EnableRag
if !enableRAG && req.Premium { if !enableRAG && req.Premium {
enableRAG = true enableRAG = true
} }
@@ -71,13 +72,13 @@ func main() {
} }
topK := req.TopK topK := req.TopK
if topK == 0 { if topK == 0 {
topK = ragTopK topK = int32(ragTopK)
} }
collection := req.Collection collection := req.Collection
if collection == "" { if collection == "" {
collection = ragCollection collection = ragCollection
} }
reqEnableTTS := req.EnableTTS || enableTTS reqEnableTTS := req.EnableTts || enableTTS
systemPrompt := req.SystemPrompt systemPrompt := req.SystemPrompt
responseSubject := req.ResponseSubject responseSubject := req.ResponseSubject
if responseSubject == "" { if responseSubject == "" {
@@ -157,44 +158,38 @@ func main() {
} }
} }
// 5. Generate LLM response // 5. Generate LLM response (streaming when requested)
responseText, err := llm.Generate(ctx, query, contextText, systemPrompt) var responseText string
var err error
if req.EnableStreaming {
streamSubject := fmt.Sprintf("ai.chat.response.stream.%s", requestID)
responseText, err = llm.StreamGenerate(ctx, query, contextText, systemPrompt, func(token string) {
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestId: requestID,
Type: "chunk",
Content: token,
Timestamp: messages.Timestamp(),
})
})
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestId: requestID,
Type: "done",
Done: true,
Timestamp: messages.Timestamp(),
})
} else {
responseText, err = llm.Generate(ctx, query, contextText, systemPrompt)
}
if err != nil { if err != nil {
slog.Error("LLM generation failed", "error", err) slog.Error("LLM generation failed", "error", err)
return &messages.ChatResponse{ return &messages.ChatResponse{
UserID: userID, UserId: userID,
Success: false, Success: false,
Error: err.Error(), Error: err.Error(),
}, nil }, nil
} }
// 6. Stream chunks if requested // 6. Optional TTS — audio as raw bytes (no base64)
if req.EnableStreaming {
streamSubject := fmt.Sprintf("ai.chat.response.stream.%s", requestID)
words := strings.Fields(responseText)
chunkSize := 4
for i := 0; i < len(words); i += chunkSize {
end := i + chunkSize
if end > len(words) {
end = len(words)
}
chunk := strings.Join(words[i:end], " ")
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "chunk",
Content: chunk,
Timestamp: messages.Timestamp(),
})
}
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "done",
Done: true,
Timestamp: messages.Timestamp(),
})
}
// 7. Optional TTS — audio as raw bytes (no base64)
var audio []byte var audio []byte
if reqEnableTTS && tts != nil { if reqEnableTTS && tts != nil {
audioBytes, err := tts.Synthesize(ctx, responseText, ttsLanguage, "") audioBytes, err := tts.Synthesize(ctx, responseText, ttsLanguage, "")
@@ -206,15 +201,15 @@ func main() {
} }
result := &messages.ChatResponse{ result := &messages.ChatResponse{
UserID: userID, UserId: userID,
Response: responseText, Response: responseText,
ResponseText: responseText, ResponseText: responseText,
UsedRAG: usedRAG, UsedRag: usedRAG,
Success: true, Success: true,
Audio: audio, Audio: audio,
} }
if includeSources { if includeSources {
result.RAGSources = ragSources result.RagSources = ragSources
} }
// Publish to the response subject the frontend is waiting on // Publish to the response subject the frontend is waiting on

View File

@@ -5,28 +5,31 @@ import (
"testing" "testing"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"github.com/vmihailenco/msgpack/v5" "google.golang.org/protobuf/proto"
) )
func TestChatRequestDecode(t *testing.T) { func TestChatRequestDecode(t *testing.T) {
// Verify a msgpack-encoded map decodes cleanly into typed struct. // Verify a proto-encoded struct round-trips cleanly.
raw := map[string]any{ original := &messages.ChatRequest{
"request_id": "req-1", RequestId: "req-1",
"user_id": "user-1", UserId: "user-1",
"message": "hello", Message: "hello",
"premium": true, Premium: true,
"top_k": 10, TopK: 10,
} }
data, _ := msgpack.Marshal(raw) data, err := proto.Marshal(original)
var req messages.ChatRequest if err != nil {
if err := msgpack.Unmarshal(data, &req); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if req.RequestID != "req-1" { var req messages.ChatRequest
t.Errorf("RequestID = %q", req.RequestID) if err := proto.Unmarshal(data, &req); err != nil {
t.Fatal(err)
} }
if req.EffectiveQuery() != "hello" { if req.RequestId != "req-1" {
t.Errorf("EffectiveQuery = %q", req.EffectiveQuery()) t.Errorf("RequestID = %q", req.RequestId)
}
if messages.EffectiveQuery(&req) != "hello" {
t.Errorf("EffectiveQuery = %q", messages.EffectiveQuery(&req))
} }
if !req.Premium { if !req.Premium {
t.Error("Premium should be true") t.Error("Premium should be true")
@@ -38,21 +41,21 @@ func TestChatRequestDecode(t *testing.T) {
func TestChatResponseRoundtrip(t *testing.T) { func TestChatResponseRoundtrip(t *testing.T) {
resp := &messages.ChatResponse{ resp := &messages.ChatResponse{
UserID: "user-1", UserId: "user-1",
Response: "answer", Response: "answer",
Success: true, Success: true,
Audio: []byte{0x01, 0x02, 0x03}, Audio: []byte{0x01, 0x02, 0x03},
} }
data, err := msgpack.Marshal(resp) data, err := proto.Marshal(resp)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
var decoded messages.ChatResponse var decoded messages.ChatResponse
if err := msgpack.Unmarshal(data, &decoded); err != nil { if err := proto.Unmarshal(data, &decoded); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if decoded.UserID != "user-1" || !decoded.Success { if decoded.UserId != "user-1" || !decoded.Success {
t.Errorf("decoded = %+v", decoded) t.Errorf("decoded: UserId=%s, Success=%v", decoded.UserId, decoded.Success)
} }
if len(decoded.Audio) != 3 { if len(decoded.Audio) != 3 {
t.Errorf("audio len = %d", len(decoded.Audio)) t.Errorf("audio len = %d", len(decoded.Audio))