Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f61488a868 | |||
| e2176331c8 | |||
| 87d0545d2c |
275
e2e_test.go
275
e2e_test.go
@@ -3,14 +3,17 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/clients"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/clients"
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||||
"github.com/vmihailenco/msgpack/v5"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ────────────────────────────────────────────────────────────────────────────
|
// ────────────────────────────────────────────────────────────────────────────
|
||||||
@@ -164,33 +167,33 @@ func TestChatPipeline_LLMTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestChatPipeline_TypedDecoding(t *testing.T) {
|
func TestChatPipeline_TypedDecoding(t *testing.T) {
|
||||||
// Verify typed struct decoding from msgpack (same path as OnTypedMessage).
|
// Verify typed struct decoding from proto (same path as OnTypedMessage).
|
||||||
raw := map[string]any{
|
original := &messages.ChatRequest{
|
||||||
"request_id": "req-e2e-001",
|
RequestId: "req-e2e-001",
|
||||||
"user_id": "user-1",
|
UserId: "user-1",
|
||||||
"message": "hello",
|
Message: "hello",
|
||||||
"premium": true,
|
Premium: true,
|
||||||
"enable_rag": false,
|
EnableRag: false,
|
||||||
"enable_streaming": false,
|
EnableStreaming: false,
|
||||||
"system_prompt": "Be brief.",
|
SystemPrompt: "Be brief.",
|
||||||
}
|
}
|
||||||
data, _ := msgpack.Marshal(raw)
|
data, _ := proto.Marshal(original)
|
||||||
|
|
||||||
var req messages.ChatRequest
|
var req messages.ChatRequest
|
||||||
if err := msgpack.Unmarshal(data, &req); err != nil {
|
if err := proto.Unmarshal(data, &req); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.RequestID != "req-e2e-001" {
|
if req.RequestId != "req-e2e-001" {
|
||||||
t.Errorf("RequestID = %q", req.RequestID)
|
t.Errorf("RequestID = %q", req.RequestId)
|
||||||
}
|
}
|
||||||
if req.UserID != "user-1" {
|
if req.UserId != "user-1" {
|
||||||
t.Errorf("UserID = %q", req.UserID)
|
t.Errorf("UserID = %q", req.UserId)
|
||||||
}
|
}
|
||||||
if req.EffectiveQuery() != "hello" {
|
if messages.EffectiveQuery(&req) != "hello" {
|
||||||
t.Errorf("query = %q", req.EffectiveQuery())
|
t.Errorf("query = %q", messages.EffectiveQuery(&req))
|
||||||
}
|
}
|
||||||
if req.EnableRAG {
|
if req.EnableRag {
|
||||||
t.Error("EnableRAG should be false")
|
t.Error("EnableRAG should be false")
|
||||||
}
|
}
|
||||||
if req.SystemPrompt != "Be brief." {
|
if req.SystemPrompt != "Be brief." {
|
||||||
@@ -198,6 +201,220 @@ func TestChatPipeline_TypedDecoding(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ────────────────────────────────────────────────────────────────────────────
|
||||||
|
// Streaming tests: exercise StreamGenerate path (the real SSE pipeline)
|
||||||
|
// ────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// sseChunk builds an OpenAI-compatible SSE data line whose single choice
// carries content as its delta. The line is terminated by the blank line that
// ends an SSE event.
//
// The content is encoded with encoding/json rather than fmt's %q verb: %q
// emits Go string-literal escapes (\a, \v, \x7f, ...) that are not legal JSON,
// so a token containing control characters would yield an unparseable event.
func sseChunk(content string) string {
	// Marshaling a plain string cannot fail, so the error is safely discarded.
	encoded, _ := json.Marshal(content)
	return fmt.Sprintf("data: {\"choices\":[{\"delta\":{\"content\":%s}}]}\n\n", encoded)
}
|
||||||
|
|
||||||
|
// newStreamingLLM creates a mock LLM server that responds with SSE-streamed tokens.
|
||||||
|
func newStreamingLLM(t *testing.T, tokens []string) *httptest.Server {
|
||||||
|
t.Helper()
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req map[string]any
|
||||||
|
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||||
|
// Verify stream=true was requested.
|
||||||
|
if stream, ok := req["stream"].(bool); !ok || !stream {
|
||||||
|
t.Error("expected stream=true in request body")
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
flusher, _ := w.(http.Flusher)
|
||||||
|
|
||||||
|
// Role-only chunk (should be skipped by StreamGenerate)
|
||||||
|
_, _ = fmt.Fprintf(w, "data: {\"choices\":[{\"delta\":{\"role\":\"assistant\"}}]}\n\n")
|
||||||
|
if flusher != nil {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tok := range tokens {
|
||||||
|
_, _ = fmt.Fprint(w, sseChunk(tok))
|
||||||
|
if flusher != nil {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, _ = fmt.Fprintf(w, "data: [DONE]\n\n")
|
||||||
|
if flusher != nil {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
t.Cleanup(srv.Close)
|
||||||
|
return srv
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamGenerate(t *testing.T) {
|
||||||
|
tokens := []string{"Paris", " is", " the", " capital", " of", " France", "."}
|
||||||
|
srv := newStreamingLLM(t, tokens)
|
||||||
|
llm := clients.NewLLMClient(srv.URL, 5*time.Second)
|
||||||
|
|
||||||
|
var mu sync.Mutex
|
||||||
|
var received []string
|
||||||
|
full, err := llm.StreamGenerate(context.Background(), "capital of France?", "", "", func(token string) {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
received = append(received, token)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if full != "Paris is the capital of France." {
|
||||||
|
t.Errorf("full = %q", full)
|
||||||
|
}
|
||||||
|
if len(received) != len(tokens) {
|
||||||
|
t.Errorf("callback count = %d, want %d", len(received), len(tokens))
|
||||||
|
}
|
||||||
|
for i, tok := range tokens {
|
||||||
|
if received[i] != tok {
|
||||||
|
t.Errorf("token[%d] = %q, want %q", i, received[i], tok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamWithSystemPrompt(t *testing.T) {
|
||||||
|
srv := newStreamingLLM(t, []string{"Hello", "!"})
|
||||||
|
llm := clients.NewLLMClient(srv.URL, 5*time.Second)
|
||||||
|
|
||||||
|
full, err := llm.StreamGenerate(context.Background(), "greet me", "", "You are a friendly assistant.", func(token string) {})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if full != "Hello!" {
|
||||||
|
t.Errorf("full = %q", full)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamWithRAGContext(t *testing.T) {
|
||||||
|
m := newMockBackends(t)
|
||||||
|
srv := newStreamingLLM(t, []string{"The", " answer", " is", " 42"})
|
||||||
|
embeddings := clients.NewEmbeddingsClient(m.Embeddings.URL, 5*time.Second, "bge")
|
||||||
|
llm := clients.NewLLMClient(srv.URL, 5*time.Second)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// 1. Embed
|
||||||
|
embedding, err := embeddings.EmbedSingle(ctx, "deep thought")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(embedding) == 0 {
|
||||||
|
t.Fatal("empty embedding")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Stream with context
|
||||||
|
var tokens []string
|
||||||
|
full, err := llm.StreamGenerate(ctx, "deep thought", "The answer to everything is 42.", "", func(tok string) {
|
||||||
|
tokens = append(tokens, tok)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if full != "The answer is 42" {
|
||||||
|
t.Errorf("full = %q", full)
|
||||||
|
}
|
||||||
|
if len(tokens) != 4 {
|
||||||
|
t.Errorf("token count = %d, want 4", len(tokens))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamNilCallback(t *testing.T) {
|
||||||
|
srv := newStreamingLLM(t, []string{"ok"})
|
||||||
|
llm := clients.NewLLMClient(srv.URL, 5*time.Second)
|
||||||
|
|
||||||
|
full, err := llm.StreamGenerate(context.Background(), "test", "", "", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if full != "ok" {
|
||||||
|
t.Errorf("full = %q", full)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamTimeout(t *testing.T) {
|
||||||
|
slow := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
_, _ = fmt.Fprint(w, sseChunk("late"))
|
||||||
|
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
|
||||||
|
}))
|
||||||
|
defer slow.Close()
|
||||||
|
|
||||||
|
llm := clients.NewLLMClient(slow.URL, 100*time.Millisecond)
|
||||||
|
_, err := llm.StreamGenerate(context.Background(), "hello", "", "", nil)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected timeout error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamHTTPError(t *testing.T) {
|
||||||
|
errSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
_, _ = w.Write([]byte("internal error"))
|
||||||
|
}))
|
||||||
|
defer errSrv.Close()
|
||||||
|
|
||||||
|
llm := clients.NewLLMClient(errSrv.URL, 5*time.Second)
|
||||||
|
_, err := llm.StreamGenerate(context.Background(), "hello", "", "", nil)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected error for HTTP 500")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "500") {
|
||||||
|
t.Errorf("error = %q, should mention status 500", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamContextCanceled(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel() // cancel immediately
|
||||||
|
|
||||||
|
srv := newStreamingLLM(t, []string{"should", "not", "arrive"})
|
||||||
|
llm := clients.NewLLMClient(srv.URL, 5*time.Second)
|
||||||
|
|
||||||
|
_, err := llm.StreamGenerate(ctx, "hello", "", "", nil)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected context canceled error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChatPipeline_StreamFallbackToNonStreaming(t *testing.T) {
|
||||||
|
// Simulate the branching in main.go: non-streaming uses Generate(),
|
||||||
|
// streaming uses StreamGenerate(). Verify both paths work from same mock.
|
||||||
|
m := newMockBackends(t)
|
||||||
|
streamSrv := newStreamingLLM(t, []string{"streamed", " answer"})
|
||||||
|
|
||||||
|
nonStreamLLM := clients.NewLLMClient(m.LLM.URL, 5*time.Second)
|
||||||
|
streamLLM := clients.NewLLMClient(streamSrv.URL, 5*time.Second)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Non-streaming path
|
||||||
|
resp1, err := nonStreamLLM.Generate(ctx, "hello", "", "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if resp1 != "Paris is the capital of France." {
|
||||||
|
t.Errorf("non-stream = %q", resp1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Streaming path
|
||||||
|
var tokens []string
|
||||||
|
resp2, err := streamLLM.StreamGenerate(ctx, "hello", "", "", func(tok string) {
|
||||||
|
tokens = append(tokens, tok)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if resp2 != "streamed answer" {
|
||||||
|
t.Errorf("stream = %q", resp2)
|
||||||
|
}
|
||||||
|
if len(tokens) != 2 {
|
||||||
|
t.Errorf("token count = %d", len(tokens))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ────────────────────────────────────────────────────────────────────────────
|
// ────────────────────────────────────────────────────────────────────────────
|
||||||
// Benchmark: full chat pipeline overhead (mock backends)
|
// Benchmark: full chat pipeline overhead (mock backends)
|
||||||
// ────────────────────────────────────────────────────────────────────────────
|
// ────────────────────────────────────────────────────────────────────────────
|
||||||
@@ -245,3 +462,23 @@ func BenchmarkChatPipeline_RAGFlow(b *testing.B) {
|
|||||||
_, _ = llm.Generate(ctx, "question", "context", "")
|
_, _ = llm.Generate(ctx, "question", "context", "")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkChatPipeline_StreamGenerate(b *testing.B) {
|
||||||
|
tokens := []string{"one", " two", " three", " four", " five"}
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
for _, tok := range tokens {
|
||||||
|
_, _ = fmt.Fprintf(w, "data: {\"choices\":[{\"delta\":{\"content\":%q}}]}\n\n", tok)
|
||||||
|
}
|
||||||
|
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
llm := clients.NewLLMClient(srv.URL, 10*time.Second)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for b.Loop() {
|
||||||
|
_, _ = llm.StreamGenerate(ctx, "question", "", "", func(string) {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
6
go.mod
6
go.mod
@@ -3,9 +3,9 @@ module git.daviestechlabs.io/daviestechlabs/chat-handler
|
|||||||
go 1.25.1
|
go 1.25.1
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5
|
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0
|
||||||
github.com/nats-io/nats.go v1.48.0
|
github.com/nats-io/nats.go v1.48.0
|
||||||
github.com/vmihailenco/msgpack/v5 v5.4.1
|
google.golang.org/protobuf v1.36.11
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@@ -19,7 +19,6 @@ require (
|
|||||||
github.com/klauspost/compress v1.18.0 // indirect
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||||
go.opentelemetry.io/otel v1.40.0 // indirect
|
go.opentelemetry.io/otel v1.40.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
|
||||||
@@ -37,5 +36,4 @@ require (
|
|||||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
||||||
google.golang.org/grpc v1.78.0 // indirect
|
google.golang.org/grpc v1.78.0 // indirect
|
||||||
google.golang.org/protobuf v1.36.11 // indirect
|
|
||||||
)
|
)
|
||||||
|
|||||||
8
go.sum
8
go.sum
@@ -1,5 +1,5 @@
|
|||||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5 h1:DqYZpeluTXh5QKqdVFgN8YIMh4Ycqzw5E9+5FTNDFCA=
|
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY=
|
||||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE=
|
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA=
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
@@ -33,10 +33,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
|||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
|
|
||||||
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
|
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
|
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
|
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||||
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
|
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
|
||||||
|
|||||||
79
main.go
79
main.go
@@ -16,6 +16,7 @@ import (
|
|||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -45,23 +46,23 @@ func main() {
|
|||||||
|
|
||||||
h := handler.New("ai.chat.user.*.message", cfg)
|
h := handler.New("ai.chat.user.*.message", cfg)
|
||||||
|
|
||||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||||
req, err := natsutil.Decode[messages.ChatRequest](msg.Data)
|
var req messages.ChatRequest
|
||||||
if err != nil {
|
if err := natsutil.Decode(msg.Data, &req); err != nil {
|
||||||
slog.Error("decode failed", "error", err)
|
slog.Error("decode failed", "error", err)
|
||||||
return &messages.ErrorResponse{Error: true, Message: err.Error(), Type: "DecodeError"}, nil
|
return &messages.ErrorResponse{Error: true, Message: err.Error(), Type: "DecodeError"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
query := req.EffectiveQuery()
|
query := messages.EffectiveQuery(&req)
|
||||||
requestID := req.RequestID
|
requestID := req.RequestId
|
||||||
if requestID == "" {
|
if requestID == "" {
|
||||||
requestID = "unknown"
|
requestID = "unknown"
|
||||||
}
|
}
|
||||||
userID := req.UserID
|
userID := req.UserId
|
||||||
if userID == "" {
|
if userID == "" {
|
||||||
userID = "unknown"
|
userID = "unknown"
|
||||||
}
|
}
|
||||||
enableRAG := req.EnableRAG
|
enableRAG := req.EnableRag
|
||||||
if !enableRAG && req.Premium {
|
if !enableRAG && req.Premium {
|
||||||
enableRAG = true
|
enableRAG = true
|
||||||
}
|
}
|
||||||
@@ -71,13 +72,13 @@ func main() {
|
|||||||
}
|
}
|
||||||
topK := req.TopK
|
topK := req.TopK
|
||||||
if topK == 0 {
|
if topK == 0 {
|
||||||
topK = ragTopK
|
topK = int32(ragTopK)
|
||||||
}
|
}
|
||||||
collection := req.Collection
|
collection := req.Collection
|
||||||
if collection == "" {
|
if collection == "" {
|
||||||
collection = ragCollection
|
collection = ragCollection
|
||||||
}
|
}
|
||||||
reqEnableTTS := req.EnableTTS || enableTTS
|
reqEnableTTS := req.EnableTts || enableTTS
|
||||||
systemPrompt := req.SystemPrompt
|
systemPrompt := req.SystemPrompt
|
||||||
responseSubject := req.ResponseSubject
|
responseSubject := req.ResponseSubject
|
||||||
if responseSubject == "" {
|
if responseSubject == "" {
|
||||||
@@ -157,44 +158,38 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Generate LLM response
|
// 5. Generate LLM response (streaming when requested)
|
||||||
responseText, err := llm.Generate(ctx, query, contextText, systemPrompt)
|
var responseText string
|
||||||
|
var err error
|
||||||
|
if req.EnableStreaming {
|
||||||
|
streamSubject := fmt.Sprintf("ai.chat.response.stream.%s", requestID)
|
||||||
|
responseText, err = llm.StreamGenerate(ctx, query, contextText, systemPrompt, func(token string) {
|
||||||
|
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
|
||||||
|
RequestId: requestID,
|
||||||
|
Type: "chunk",
|
||||||
|
Content: token,
|
||||||
|
Timestamp: messages.Timestamp(),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
|
||||||
|
RequestId: requestID,
|
||||||
|
Type: "done",
|
||||||
|
Done: true,
|
||||||
|
Timestamp: messages.Timestamp(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
responseText, err = llm.Generate(ctx, query, contextText, systemPrompt)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("LLM generation failed", "error", err)
|
slog.Error("LLM generation failed", "error", err)
|
||||||
return &messages.ChatResponse{
|
return &messages.ChatResponse{
|
||||||
UserID: userID,
|
UserId: userID,
|
||||||
Success: false,
|
Success: false,
|
||||||
Error: err.Error(),
|
Error: err.Error(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Stream chunks if requested
|
// 6. Optional TTS — audio as raw bytes (no base64)
|
||||||
if req.EnableStreaming {
|
|
||||||
streamSubject := fmt.Sprintf("ai.chat.response.stream.%s", requestID)
|
|
||||||
words := strings.Fields(responseText)
|
|
||||||
chunkSize := 4
|
|
||||||
for i := 0; i < len(words); i += chunkSize {
|
|
||||||
end := i + chunkSize
|
|
||||||
if end > len(words) {
|
|
||||||
end = len(words)
|
|
||||||
}
|
|
||||||
chunk := strings.Join(words[i:end], " ")
|
|
||||||
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
|
|
||||||
RequestID: requestID,
|
|
||||||
Type: "chunk",
|
|
||||||
Content: chunk,
|
|
||||||
Timestamp: messages.Timestamp(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
|
|
||||||
RequestID: requestID,
|
|
||||||
Type: "done",
|
|
||||||
Done: true,
|
|
||||||
Timestamp: messages.Timestamp(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// 7. Optional TTS — audio as raw bytes (no base64)
|
|
||||||
var audio []byte
|
var audio []byte
|
||||||
if reqEnableTTS && tts != nil {
|
if reqEnableTTS && tts != nil {
|
||||||
audioBytes, err := tts.Synthesize(ctx, responseText, ttsLanguage, "")
|
audioBytes, err := tts.Synthesize(ctx, responseText, ttsLanguage, "")
|
||||||
@@ -206,15 +201,15 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
result := &messages.ChatResponse{
|
result := &messages.ChatResponse{
|
||||||
UserID: userID,
|
UserId: userID,
|
||||||
Response: responseText,
|
Response: responseText,
|
||||||
ResponseText: responseText,
|
ResponseText: responseText,
|
||||||
UsedRAG: usedRAG,
|
UsedRag: usedRAG,
|
||||||
Success: true,
|
Success: true,
|
||||||
Audio: audio,
|
Audio: audio,
|
||||||
}
|
}
|
||||||
if includeSources {
|
if includeSources {
|
||||||
result.RAGSources = ragSources
|
result.RagSources = ragSources
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish to the response subject the frontend is waiting on
|
// Publish to the response subject the frontend is waiting on
|
||||||
|
|||||||
43
main_test.go
43
main_test.go
@@ -5,28 +5,31 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||||
"github.com/vmihailenco/msgpack/v5"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestChatRequestDecode(t *testing.T) {
|
func TestChatRequestDecode(t *testing.T) {
|
||||||
// Verify a msgpack-encoded map decodes cleanly into typed struct.
|
// Verify a proto-encoded struct round-trips cleanly.
|
||||||
raw := map[string]any{
|
original := &messages.ChatRequest{
|
||||||
"request_id": "req-1",
|
RequestId: "req-1",
|
||||||
"user_id": "user-1",
|
UserId: "user-1",
|
||||||
"message": "hello",
|
Message: "hello",
|
||||||
"premium": true,
|
Premium: true,
|
||||||
"top_k": 10,
|
TopK: 10,
|
||||||
}
|
}
|
||||||
data, _ := msgpack.Marshal(raw)
|
data, err := proto.Marshal(original)
|
||||||
var req messages.ChatRequest
|
if err != nil {
|
||||||
if err := msgpack.Unmarshal(data, &req); err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if req.RequestID != "req-1" {
|
var req messages.ChatRequest
|
||||||
t.Errorf("RequestID = %q", req.RequestID)
|
if err := proto.Unmarshal(data, &req); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if req.EffectiveQuery() != "hello" {
|
if req.RequestId != "req-1" {
|
||||||
t.Errorf("EffectiveQuery = %q", req.EffectiveQuery())
|
t.Errorf("RequestID = %q", req.RequestId)
|
||||||
|
}
|
||||||
|
if messages.EffectiveQuery(&req) != "hello" {
|
||||||
|
t.Errorf("EffectiveQuery = %q", messages.EffectiveQuery(&req))
|
||||||
}
|
}
|
||||||
if !req.Premium {
|
if !req.Premium {
|
||||||
t.Error("Premium should be true")
|
t.Error("Premium should be true")
|
||||||
@@ -38,21 +41,21 @@ func TestChatRequestDecode(t *testing.T) {
|
|||||||
|
|
||||||
func TestChatResponseRoundtrip(t *testing.T) {
|
func TestChatResponseRoundtrip(t *testing.T) {
|
||||||
resp := &messages.ChatResponse{
|
resp := &messages.ChatResponse{
|
||||||
UserID: "user-1",
|
UserId: "user-1",
|
||||||
Response: "answer",
|
Response: "answer",
|
||||||
Success: true,
|
Success: true,
|
||||||
Audio: []byte{0x01, 0x02, 0x03},
|
Audio: []byte{0x01, 0x02, 0x03},
|
||||||
}
|
}
|
||||||
data, err := msgpack.Marshal(resp)
|
data, err := proto.Marshal(resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
var decoded messages.ChatResponse
|
var decoded messages.ChatResponse
|
||||||
if err := msgpack.Unmarshal(data, &decoded); err != nil {
|
if err := proto.Unmarshal(data, &decoded); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if decoded.UserID != "user-1" || !decoded.Success {
|
if decoded.UserId != "user-1" || !decoded.Success {
|
||||||
t.Errorf("decoded = %+v", decoded)
|
t.Errorf("decoded: UserId=%s, Success=%v", decoded.UserId, decoded.Success)
|
||||||
}
|
}
|
||||||
if len(decoded.Audio) != 3 {
|
if len(decoded.Audio) != 3 {
|
||||||
t.Errorf("audio len = %d", len(decoded.Audio))
|
t.Errorf("audio len = %d", len(decoded.Audio))
|
||||||
|
|||||||
Reference in New Issue
Block a user