2 Commits

Author SHA1 Message Date
87d0545d2c feat: replace fake streaming with real SSE StreamGenerate
Some checks failed
CI / Lint (push) Successful in 3m0s
CI / Test (push) Successful in 3m23s
CI / Docker Build & Push (push) Failing after 4m55s
CI / Release (push) Successful in 1m4s
CI / Notify (push) Successful in 1s
Use handler-base's StreamGenerate() to publish real token-by-token
ChatStreamChunk messages to NATS as they arrive from Ray Serve,
instead of calling Generate() and splitting into 4-word chunks.

Add 8 streaming tests: happy path, system prompt, RAG context,
nil callback, timeout, HTTP error, context canceled, fallback.
2026-02-21 09:23:57 -05:00
7678d911fe chore: bump handler-base to v0.1.5, add netrc secret mount to Dockerfile
Some checks failed
CI / Release (push) Successful in 1m11s
CI / Lint (push) Successful in 3m55s
CI / Test (push) Successful in 3m36s
CI / Docker Build & Push (push) Failing after 8m10s
CI / Notify (push) Successful in 1s
2026-02-20 18:15:47 -05:00
5 changed files with 267 additions and 34 deletions

View File

@@ -3,10 +3,13 @@ FROM golang:1.25-alpine AS builder
WORKDIR /app WORKDIR /app
RUN apk add --no-cache ca-certificates RUN apk add --no-cache ca-certificates git
ENV GOPRIVATE=git.daviestechlabs.io
ENV GONOSUMCHECK=git.daviestechlabs.io
COPY go.mod go.sum ./ COPY go.mod go.sum ./
RUN go mod download RUN --mount=type=secret,id=netrc,target=/root/.netrc go mod download
COPY . . COPY . .

View File

@@ -3,8 +3,11 @@ package main
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings"
"sync"
"testing" "testing"
"time" "time"
@@ -198,6 +201,220 @@ func TestChatPipeline_TypedDecoding(t *testing.T) {
} }
} }
// ────────────────────────────────────────────────────────────────────────────
// Streaming tests: exercise StreamGenerate path (the real SSE pipeline)
// ────────────────────────────────────────────────────────────────────────────
// sseChunk builds an OpenAI-compatible SSE data line.
//
// The content is encoded with json.Marshal rather than %q: Go's %q emits
// escapes such as \x01 for control characters, which are not valid JSON,
// whereas json.Marshal always produces a valid JSON string literal.
func sseChunk(content string) string {
	quoted, _ := json.Marshal(content) // marshaling a string never fails
	return fmt.Sprintf("data: {\"choices\":[{\"delta\":{\"content\":%s}}]}\n\n", quoted)
}
// newStreamingLLM creates a mock LLM server that responds with SSE-streamed
// tokens. It first emits a role-only delta (which StreamGenerate must skip),
// then one data line per token, and finally the [DONE] sentinel. The handler
// also asserts that the client requested stream=true.
func newStreamingLLM(t *testing.T, tokens []string) *httptest.Server {
	t.Helper()
	handler := func(w http.ResponseWriter, r *http.Request) {
		var body map[string]any
		_ = json.NewDecoder(r.Body).Decode(&body)
		// Verify stream=true was requested.
		if want, ok := body["stream"].(bool); !ok || !want {
			t.Error("expected stream=true in request body")
		}

		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		flusher, canFlush := w.(http.Flusher)
		flush := func() {
			if canFlush {
				flusher.Flush()
			}
		}

		// Role-only chunk (should be skipped by StreamGenerate)
		_, _ = fmt.Fprintf(w, "data: {\"choices\":[{\"delta\":{\"role\":\"assistant\"}}]}\n\n")
		flush()
		for _, tok := range tokens {
			_, _ = fmt.Fprint(w, sseChunk(tok))
			flush()
		}
		_, _ = fmt.Fprintf(w, "data: [DONE]\n\n")
		flush()
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(srv.Close)
	return srv
}
// TestChatPipeline_StreamGenerate is the happy path: every token arrives via
// the callback in order, and the returned string is their concatenation.
func TestChatPipeline_StreamGenerate(t *testing.T) {
	tokens := []string{"Paris", " is", " the", " capital", " of", " France", "."}
	srv := newStreamingLLM(t, tokens)
	llm := clients.NewLLMClient(srv.URL, 5*time.Second)

	// The mutex guards received in case the client invokes the callback
	// from another goroutine.
	var mu sync.Mutex
	var received []string
	full, err := llm.StreamGenerate(context.Background(), "capital of France?", "", "", func(token string) {
		mu.Lock()
		defer mu.Unlock()
		received = append(received, token)
	})
	if err != nil {
		t.Fatal(err)
	}
	if full != "Paris is the capital of France." {
		t.Errorf("full = %q", full)
	}
	// Fatalf, not Errorf: the loop below indexes received[i] and would
	// panic with index-out-of-range if fewer tokens arrived than expected.
	if len(received) != len(tokens) {
		t.Fatalf("callback count = %d, want %d", len(received), len(tokens))
	}
	for i, tok := range tokens {
		if received[i] != tok {
			t.Errorf("token[%d] = %q, want %q", i, received[i], tok)
		}
	}
}
// TestChatPipeline_StreamWithSystemPrompt verifies StreamGenerate succeeds and
// assembles the full reply when a system prompt is supplied.
func TestChatPipeline_StreamWithSystemPrompt(t *testing.T) {
	srv := newStreamingLLM(t, []string{"Hello", "!"})
	llm := clients.NewLLMClient(srv.URL, 5*time.Second)

	const sysPrompt = "You are a friendly assistant."
	got, err := llm.StreamGenerate(context.Background(), "greet me", "", sysPrompt, func(token string) {})
	if err != nil {
		t.Fatal(err)
	}
	if want := "Hello!"; got != want {
		t.Errorf("full = %q", got)
	}
}
// TestChatPipeline_StreamWithRAGContext exercises the embed-then-stream RAG
// flow: embed the query against the mock embeddings backend, then stream the
// LLM answer with retrieved context attached.
func TestChatPipeline_StreamWithRAGContext(t *testing.T) {
	m := newMockBackends(t)
	srv := newStreamingLLM(t, []string{"The", " answer", " is", " 42"})
	embeddings := clients.NewEmbeddingsClient(m.Embeddings.URL, 5*time.Second, "bge")
	llm := clients.NewLLMClient(srv.URL, 5*time.Second)
	ctx := context.Background()

	// 1. Embed
	vec, err := embeddings.EmbedSingle(ctx, "deep thought")
	if err != nil {
		t.Fatal(err)
	}
	if len(vec) == 0 {
		t.Fatal("empty embedding")
	}

	// 2. Stream with context
	var streamed []string
	full, err := llm.StreamGenerate(ctx, "deep thought", "The answer to everything is 42.", "", func(tok string) {
		streamed = append(streamed, tok)
	})
	if err != nil {
		t.Fatal(err)
	}
	if full != "The answer is 42" {
		t.Errorf("full = %q", full)
	}
	if got := len(streamed); got != 4 {
		t.Errorf("token count = %d, want 4", got)
	}
}
func TestChatPipeline_StreamNilCallback(t *testing.T) {
srv := newStreamingLLM(t, []string{"ok"})
llm := clients.NewLLMClient(srv.URL, 5*time.Second)
full, err := llm.StreamGenerate(context.Background(), "test", "", "", nil)
if err != nil {
t.Fatal(err)
}
if full != "ok" {
t.Errorf("full = %q", full)
}
}
// TestChatPipeline_StreamTimeout verifies that a server slower than the
// client's deadline surfaces an error from StreamGenerate.
func TestChatPipeline_StreamTimeout(t *testing.T) {
	handler := func(w http.ResponseWriter, r *http.Request) {
		// Stall well past the client's 100ms timeout before responding.
		time.Sleep(200 * time.Millisecond)
		w.Header().Set("Content-Type", "text/event-stream")
		_, _ = fmt.Fprint(w, sseChunk("late"))
		_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
	}
	slow := httptest.NewServer(http.HandlerFunc(handler))
	defer slow.Close()

	llm := clients.NewLLMClient(slow.URL, 100*time.Millisecond)
	if _, err := llm.StreamGenerate(context.Background(), "hello", "", "", nil); err == nil {
		t.Error("expected timeout error")
	}
}
// TestChatPipeline_StreamHTTPError verifies that a non-2xx upstream response
// is reported as an error mentioning the HTTP status code.
func TestChatPipeline_StreamHTTPError(t *testing.T) {
	errSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("internal error"))
	}))
	defer errSrv.Close()

	llm := clients.NewLLMClient(errSrv.URL, 5*time.Second)
	_, err := llm.StreamGenerate(context.Background(), "hello", "", "", nil)
	// Fatal, not Error: the strings.Contains check below calls err.Error()
	// and would nil-panic if execution continued past a nil error.
	if err == nil {
		t.Fatal("expected error for HTTP 500")
	}
	if !strings.Contains(err.Error(), "500") {
		t.Errorf("error = %q, should mention status 500", err)
	}
}
// TestChatPipeline_StreamContextCanceled verifies that an already-canceled
// context makes StreamGenerate fail rather than stream tokens.
func TestChatPipeline_StreamContextCanceled(t *testing.T) {
	srv := newStreamingLLM(t, []string{"should", "not", "arrive"})
	llm := clients.NewLLMClient(srv.URL, 5*time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately
	if _, err := llm.StreamGenerate(ctx, "hello", "", "", nil); err == nil {
		t.Error("expected context canceled error")
	}
}
// TestChatPipeline_StreamFallbackToNonStreaming mirrors the branch in main.go:
// non-streaming requests use Generate(), streaming requests use
// StreamGenerate(). Both paths must work against their respective mocks.
func TestChatPipeline_StreamFallbackToNonStreaming(t *testing.T) {
	m := newMockBackends(t)
	streamSrv := newStreamingLLM(t, []string{"streamed", " answer"})
	ctx := context.Background()

	// Non-streaming path
	plain := clients.NewLLMClient(m.LLM.URL, 5*time.Second)
	got, err := plain.Generate(ctx, "hello", "", "")
	if err != nil {
		t.Fatal(err)
	}
	if got != "Paris is the capital of France." {
		t.Errorf("non-stream = %q", got)
	}

	// Streaming path
	streaming := clients.NewLLMClient(streamSrv.URL, 5*time.Second)
	var chunks []string
	got, err = streaming.StreamGenerate(ctx, "hello", "", "", func(tok string) {
		chunks = append(chunks, tok)
	})
	if err != nil {
		t.Fatal(err)
	}
	if got != "streamed answer" {
		t.Errorf("stream = %q", got)
	}
	if len(chunks) != 2 {
		t.Errorf("token count = %d", len(chunks))
	}
}
// ──────────────────────────────────────────────────────────────────────────── // ────────────────────────────────────────────────────────────────────────────
// Benchmark: full chat pipeline overhead (mock backends) // Benchmark: full chat pipeline overhead (mock backends)
// ──────────────────────────────────────────────────────────────────────────── // ────────────────────────────────────────────────────────────────────────────
@@ -245,3 +462,23 @@ func BenchmarkChatPipeline_RAGFlow(b *testing.B) {
_, _ = llm.Generate(ctx, "question", "context", "") _, _ = llm.Generate(ctx, "question", "context", "")
} }
} }
// BenchmarkChatPipeline_StreamGenerate measures per-call overhead of the SSE
// streaming path against a local mock server.
func BenchmarkChatPipeline_StreamGenerate(b *testing.B) {
	tokens := []string{"one", " two", " three", " four", " five"}
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		for _, tok := range tokens {
			_, _ = fmt.Fprintf(w, "data: {\"choices\":[{\"delta\":{\"content\":%q}}]}\n\n", tok)
		}
		_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
	})
	srv := httptest.NewServer(handler)
	defer srv.Close()

	llm := clients.NewLLMClient(srv.URL, 10*time.Second)
	ctx := context.Background()
	b.ResetTimer()
	for b.Loop() {
		_, _ = llm.StreamGenerate(ctx, "question", "", "", func(string) {})
	}
}

2
go.mod
View File

@@ -3,7 +3,7 @@ module git.daviestechlabs.io/daviestechlabs/chat-handler
go 1.25.1 go 1.25.1
require ( require (
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3 git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5
github.com/nats-io/nats.go v1.48.0 github.com/nats-io/nats.go v1.48.0
github.com/vmihailenco/msgpack/v5 v5.4.1 github.com/vmihailenco/msgpack/v5 v5.4.1
) )

4
go.sum
View File

@@ -1,5 +1,5 @@
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3 h1:uYog8B839ulqrWoht3qqCvT7CnR3e2skpaLZc2Pg3GI= git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5 h1:DqYZpeluTXh5QKqdVFgN8YIMh4Ycqzw5E9+5FTNDFCA=
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE= git.daviestechlabs.io/daviestechlabs/handler-base v0.1.5/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=

51
main.go
View File

@@ -157,8 +157,27 @@ func main() {
} }
} }
// 5. Generate LLM response // 5. Generate LLM response (streaming when requested)
responseText, err := llm.Generate(ctx, query, contextText, systemPrompt) var responseText string
if req.EnableStreaming {
streamSubject := fmt.Sprintf("ai.chat.response.stream.%s", requestID)
responseText, err = llm.StreamGenerate(ctx, query, contextText, systemPrompt, func(token string) {
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "chunk",
Content: token,
Timestamp: messages.Timestamp(),
})
})
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "done",
Done: true,
Timestamp: messages.Timestamp(),
})
} else {
responseText, err = llm.Generate(ctx, query, contextText, systemPrompt)
}
if err != nil { if err != nil {
slog.Error("LLM generation failed", "error", err) slog.Error("LLM generation failed", "error", err)
return &messages.ChatResponse{ return &messages.ChatResponse{
@@ -168,33 +187,7 @@ func main() {
}, nil }, nil
} }
// 6. Stream chunks if requested // 6. Optional TTS — audio as raw bytes (no base64)
if req.EnableStreaming {
streamSubject := fmt.Sprintf("ai.chat.response.stream.%s", requestID)
words := strings.Fields(responseText)
chunkSize := 4
for i := 0; i < len(words); i += chunkSize {
end := i + chunkSize
if end > len(words) {
end = len(words)
}
chunk := strings.Join(words[i:end], " ")
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "chunk",
Content: chunk,
Timestamp: messages.Timestamp(),
})
}
_ = h.NATS.Publish(streamSubject, &messages.ChatStreamChunk{
RequestID: requestID,
Type: "done",
Done: true,
Timestamp: messages.Timestamp(),
})
}
// 7. Optional TTS — audio as raw bytes (no base64)
var audio []byte var audio []byte
if reqEnableTTS && tts != nil { if reqEnableTTS && tts != nil {
audioBytes, err := tts.Synthesize(ctx, responseText, ttsLanguage, "") audioBytes, err := tts.Synthesize(ctx, responseText, ttsLanguage, "")