Compare commits
6 Commits
81581337cd
...
v0.1.5
| Author | SHA1 | Date | |
|---|---|---|---|
| 3585d81ff5 | |||
| fba7b62573 | |||
| 6fd0b9a265 | |||
| 8b6232141a | |||
| 9876cb9388 | |||
| 39673d31b8 ||||
@@ -17,20 +17,22 @@ jobs:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up uv
|
||||
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version-file: go.mod
|
||||
cache: true
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.13
|
||||
- name: Run go vet
|
||||
run: go vet ./...
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --frozen --extra dev
|
||||
- name: Install golangci-lint
|
||||
run: |
|
||||
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b "$(go env GOPATH)/bin"
|
||||
echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Run ruff check
|
||||
run: uv run ruff check .
|
||||
|
||||
- name: Run ruff format check
|
||||
run: uv run ruff format --check .
|
||||
- name: Run golangci-lint
|
||||
run: golangci-lint run ./...
|
||||
|
||||
test:
|
||||
name: Test
|
||||
@@ -39,23 +41,28 @@ jobs:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up uv
|
||||
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version-file: go.mod
|
||||
cache: true
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.13
|
||||
- name: Verify dependencies
|
||||
run: go mod verify
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --frozen --extra dev
|
||||
- name: Build
|
||||
run: go build -v ./...
|
||||
|
||||
- name: Run tests with coverage
|
||||
run: uv run pytest --cov=handler_base --cov-report=xml --cov-report=term
|
||||
- name: Run tests
|
||||
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
|
||||
|
||||
release:
|
||||
name: Release
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, test]
|
||||
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
|
||||
outputs:
|
||||
version: ${{ steps.version.outputs.version }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
@@ -95,10 +102,32 @@ jobs:
|
||||
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
|
||||
git push origin ${{ steps.version.outputs.version }}
|
||||
|
||||
notify-downstream:
|
||||
name: Notify Downstream
|
||||
runs-on: ubuntu-latest
|
||||
needs: [release]
|
||||
if: needs.release.result == 'success'
|
||||
strategy:
|
||||
matrix:
|
||||
repo:
|
||||
- chat-handler
|
||||
- pipeline-bridge
|
||||
- tts-module
|
||||
- voice-assistant
|
||||
- stt-module
|
||||
steps:
|
||||
- name: Trigger dependency update
|
||||
run: |
|
||||
curl -s -X POST \
|
||||
-H "Authorization: token ${{ secrets.DISPATCH_TOKEN }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"event_type":"handler-base-release","client_payload":{"version":"${{ needs.release.outputs.version }}"}}' \
|
||||
"${{ gitea.server_url }}/api/v1/repos/daviestechlabs/${{ matrix.repo }}/dispatches"
|
||||
|
||||
notify:
|
||||
name: Notify
|
||||
runs-on: ubuntu-latest
|
||||
needs: [lint, test, release]
|
||||
needs: [lint, test, release, notify-downstream]
|
||||
if: always()
|
||||
steps:
|
||||
- name: Notify on success
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package clients
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -124,7 +126,7 @@ resp, err := h.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("http %s %s: %w", req.Method, req.URL.Path, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
buf := getBuf()
|
||||
defer putBuf(buf)
|
||||
@@ -142,6 +144,36 @@ return nil, fmt.Errorf("http %d: %s", resp.StatusCode, string(body))
|
||||
return body, nil
|
||||
}
|
||||
|
||||
// postJSONStream sends a JSON POST and returns the raw *http.Response so the
|
||||
// caller can read the body incrementally (e.g. for SSE streaming). The caller
|
||||
// is responsible for closing resp.Body.
|
||||
func (h *httpClient) postJSONStream(ctx context.Context, path string, body any) (*http.Response, error) {
|
||||
buf := getBuf()
|
||||
defer putBuf(buf)
|
||||
if err := json.NewEncoder(buf).Encode(body); err != nil {
|
||||
return nil, fmt.Errorf("marshal: %w", err)
|
||||
}
|
||||
// Copy to a non-pooled buffer so we can safely return the pool buffer.
|
||||
payload := make([]byte, buf.Len())
|
||||
copy(payload, buf.Bytes())
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.baseURL+path, bytes.NewReader(payload))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := h.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("http %s %s: %w", req.Method, req.URL.Path, err)
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
return nil, fmt.Errorf("http %d: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (h *httpClient) healthCheck(ctx context.Context) bool {
|
||||
data, err := h.get(ctx, "/health", nil)
|
||||
_ = data
|
||||
@@ -320,6 +352,73 @@ return "", fmt.Errorf("no choices in LLM response")
|
||||
return resp.Choices[0].Message.Content, nil
|
||||
}
|
||||
|
||||
// StreamGenerate sends a streaming chat completion request and calls onToken
|
||||
// for each content delta received via SSE. Returns the fully assembled text.
|
||||
// The onToken callback is invoked synchronously on the calling goroutine; it
|
||||
// should be fast (e.g. publish a NATS message).
|
||||
func (c *LLMClient) StreamGenerate(ctx context.Context, prompt string, context_ string, systemPrompt string, onToken func(token string)) (string, error) {
|
||||
msgs := buildMessages(prompt, context_, systemPrompt)
|
||||
payload := map[string]any{
|
||||
"model": c.Model,
|
||||
"messages": msgs,
|
||||
"max_tokens": c.MaxTokens,
|
||||
"temperature": c.Temperature,
|
||||
"top_p": c.TopP,
|
||||
"stream": true,
|
||||
}
|
||||
|
||||
resp, err := c.postJSONStream(ctx, "/v1/chat/completions", payload)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
var full strings.Builder
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
// SSE lines can be up to 64 KiB for large token batches.
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 64*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
continue
|
||||
}
|
||||
data := strings.TrimPrefix(line, "data: ")
|
||||
if data == "[DONE]" {
|
||||
break
|
||||
}
|
||||
var chunk struct {
|
||||
Choices []struct {
|
||||
Delta struct {
|
||||
Content string `json:"content"`
|
||||
} `json:"delta"`
|
||||
} `json:"choices"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
|
||||
continue // skip malformed chunks
|
||||
}
|
||||
if len(chunk.Choices) == 0 {
|
||||
continue
|
||||
}
|
||||
token := chunk.Choices[0].Delta.Content
|
||||
if token == "" {
|
||||
continue
|
||||
}
|
||||
full.WriteString(token)
|
||||
if onToken != nil {
|
||||
onToken(token)
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
// If we already collected some text, return it with the error.
|
||||
if full.Len() > 0 {
|
||||
return full.String(), fmt.Errorf("stream interrupted: %w", err)
|
||||
}
|
||||
return "", fmt.Errorf("stream read: %w", err)
|
||||
}
|
||||
return full.String(), nil
|
||||
}
|
||||
|
||||
func buildMessages(prompt, ctx, systemPrompt string) []ChatMessage {
|
||||
var msgs []ChatMessage
|
||||
if systemPrompt != "" {
|
||||
@@ -420,7 +519,6 @@ type MilvusClient struct {
|
||||
Host string
|
||||
Port int
|
||||
Collection string
|
||||
connected bool
|
||||
}
|
||||
|
||||
// NewMilvusClient creates a Milvus client.
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -90,14 +91,14 @@ func TestEmbeddingsClient_Embed(t *testing.T) {
|
||||
t.Errorf("method = %s, want POST", r.Method)
|
||||
}
|
||||
var req map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&req)
|
||||
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||
input, _ := req["input"].([]any)
|
||||
if len(input) != 2 {
|
||||
t.Errorf("input len = %d, want 2", len(input))
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"data": []map[string]any{
|
||||
{"embedding": []float64{0.1, 0.2, 0.3}},
|
||||
{"embedding": []float64{0.4, 0.5, 0.6}},
|
||||
@@ -121,7 +122,7 @@ func TestEmbeddingsClient_Embed(t *testing.T) {
|
||||
|
||||
func TestEmbeddingsClient_EmbedSingle(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"data": []map[string]any{
|
||||
{"embedding": []float64{1.0, 2.0}},
|
||||
},
|
||||
@@ -141,7 +142,7 @@ func TestEmbeddingsClient_EmbedSingle(t *testing.T) {
|
||||
|
||||
func TestEmbeddingsClient_EmbedEmpty(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(map[string]any{"data": []any{}})
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"data": []any{}})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -175,11 +176,11 @@ func TestEmbeddingsClient_Health(t *testing.T) {
|
||||
func TestRerankerClient_Rerank(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var req map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&req)
|
||||
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||
if req["query"] != "test query" {
|
||||
t.Errorf("query = %v", req["query"])
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"results": []map[string]any{
|
||||
{"index": 1, "relevance_score": 0.95},
|
||||
{"index": 0, "relevance_score": 0.80},
|
||||
@@ -207,7 +208,7 @@ func TestRerankerClient_Rerank(t *testing.T) {
|
||||
|
||||
func TestRerankerClient_RerankFallbackScore(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"results": []map[string]any{
|
||||
{"index": 0, "score": 0.77, "relevance_score": 0}, // some APIs only set score
|
||||
},
|
||||
@@ -235,13 +236,13 @@ func TestLLMClient_Generate(t *testing.T) {
|
||||
t.Errorf("path = %q", r.URL.Path)
|
||||
}
|
||||
var req map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&req)
|
||||
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||
msgs, _ := req["messages"].([]any)
|
||||
if len(msgs) == 0 {
|
||||
t.Error("no messages in request")
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"choices": []map[string]any{
|
||||
{"message": map[string]any{"content": "Paris is the capital of France."}},
|
||||
},
|
||||
@@ -262,13 +263,13 @@ func TestLLMClient_Generate(t *testing.T) {
|
||||
func TestLLMClient_GenerateWithContext(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var req map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&req)
|
||||
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||
msgs, _ := req["messages"].([]any)
|
||||
// Should have system + user message
|
||||
if len(msgs) != 2 {
|
||||
t.Errorf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"choices": []map[string]any{
|
||||
{"message": map[string]any{"content": "answer with context"}},
|
||||
},
|
||||
@@ -288,7 +289,7 @@ func TestLLMClient_GenerateWithContext(t *testing.T) {
|
||||
|
||||
func TestLLMClient_GenerateNoChoices(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(map[string]any{"choices": []any{}})
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"choices": []any{}})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -299,6 +300,293 @@ func TestLLMClient_GenerateNoChoices(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// LLM client — StreamGenerate
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// sseChunk renders a single OpenAI-style chat.completion.chunk as an SSE
// "data:" line, terminated by the blank line SSE requires between events.
func sseChunk(content string) string {
	payload := map[string]any{
		"choices": []map[string]any{
			{"delta": map[string]any{"content": content}},
		},
	}
	encoded, _ := json.Marshal(payload)

	var sb strings.Builder
	sb.WriteString("data: ")
	sb.Write(encoded)
	sb.WriteString("\n\n")
	return sb.String()
}
|
||||
|
||||
func TestLLMClient_StreamGenerate(t *testing.T) {
|
||||
tokens := []string{"Hello", " world", "!"}
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/chat/completions" {
|
||||
t.Errorf("path = %q", r.URL.Path)
|
||||
}
|
||||
var req map[string]any
|
||||
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||
if req["stream"] != true {
|
||||
t.Errorf("stream = %v, want true", req["stream"])
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
for _, tok := range tokens {
|
||||
_, _ = w.Write([]byte(sseChunk(tok)))
|
||||
flusher.Flush()
|
||||
}
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
var received []string
|
||||
result, err := c.StreamGenerate(context.Background(), "hi", "", "", func(tok string) {
|
||||
received = append(received, tok)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if result != "Hello world!" {
|
||||
t.Errorf("result = %q, want %q", result, "Hello world!")
|
||||
}
|
||||
if len(received) != 3 {
|
||||
t.Fatalf("callback count = %d, want 3", len(received))
|
||||
}
|
||||
if received[0] != "Hello" || received[1] != " world" || received[2] != "!" {
|
||||
t.Errorf("received = %v", received)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateWithSystemPrompt(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var req map[string]any
|
||||
_ = json.NewDecoder(r.Body).Decode(&req)
|
||||
msgs, _ := req["messages"].([]any)
|
||||
if len(msgs) != 2 {
|
||||
t.Errorf("expected system+user, got %d messages", len(msgs))
|
||||
}
|
||||
first, _ := msgs[0].(map[string]any)
|
||||
if first["role"] != "system" || first["content"] != "You are a DM" {
|
||||
t.Errorf("system msg = %v", first)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
_, _ = w.Write([]byte(sseChunk("ok")))
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
result, err := c.StreamGenerate(context.Background(), "roll dice", "", "You are a DM", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if result != "ok" {
|
||||
t.Errorf("result = %q", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateNilCallback(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
_, _ = w.Write([]byte(sseChunk("token")))
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
// nil callback should not panic
|
||||
result, err := c.StreamGenerate(context.Background(), "hi", "", "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if result != "token" {
|
||||
t.Errorf("result = %q", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateEmptyDelta(t *testing.T) {
|
||||
// SSE chunks with empty content should be silently skipped.
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
// role-only chunk (no content) — common for first chunk from vLLM
|
||||
_, _ = w.Write([]byte("data: {\"choices\":[{\"delta\":{\"role\":\"assistant\"}}]}\n\n"))
|
||||
// empty content string
|
||||
_, _ = w.Write([]byte(sseChunk("")))
|
||||
// real token
|
||||
_, _ = w.Write([]byte(sseChunk("hello")))
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
var count int
|
||||
result, err := c.StreamGenerate(context.Background(), "q", "", "", func(tok string) {
|
||||
count++
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if result != "hello" {
|
||||
t.Errorf("result = %q", result)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Errorf("callback count = %d, want 1 (empty deltas should be skipped)", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateMalformedChunks(t *testing.T) {
|
||||
// Malformed JSON should be skipped without error.
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
_, _ = w.Write([]byte("data: {invalid json}\n\n"))
|
||||
_, _ = w.Write([]byte("data: {\"choices\":[]}\n\n")) // empty choices
|
||||
_, _ = w.Write([]byte(sseChunk("good")))
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
result, err := c.StreamGenerate(context.Background(), "q", "", "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if result != "good" {
|
||||
t.Errorf("result = %q, want %q", result, "good")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateHTTPError(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
_, _ = w.Write([]byte("internal server error"))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
_, err := c.StreamGenerate(context.Background(), "q", "", "", nil)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for 500")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "500") {
|
||||
t.Errorf("error should contain 500: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateContextCanceled(t *testing.T) {
|
||||
started := make(chan struct{})
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
// Send several tokens so the client receives some before cancel.
|
||||
for i := range 20 {
|
||||
_, _ = w.Write([]byte(sseChunk(fmt.Sprintf("tok%d ", i))))
|
||||
flusher.Flush()
|
||||
}
|
||||
close(started)
|
||||
// Block until client cancels
|
||||
<-r.Context().Done()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := NewLLMClient(ts.URL, 10*time.Second)
|
||||
|
||||
var streamErr error
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
_, streamErr = c.StreamGenerate(ctx, "q", "", "", nil)
|
||||
}()
|
||||
|
||||
<-started
|
||||
cancel()
|
||||
<-done
|
||||
|
||||
// After cancel the stream should return an error (context canceled or
|
||||
// stream interrupted). The exact partial text depends on timing.
|
||||
if streamErr == nil {
|
||||
t.Error("expected error after context cancel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateNoSSEPrefix(t *testing.T) {
|
||||
// Lines without "data: " prefix should be silently ignored (comments, blank lines, event IDs).
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
_, _ = w.Write([]byte(": this is an SSE comment\n\n"))
|
||||
_, _ = w.Write([]byte("event: message\n"))
|
||||
_, _ = w.Write([]byte(sseChunk("word")))
|
||||
_, _ = w.Write([]byte("\n")) // blank line
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
result, err := c.StreamGenerate(context.Background(), "q", "", "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if result != "word" {
|
||||
t.Errorf("result = %q, want %q", result, "word")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLLMClient_StreamGenerateManyTokens(t *testing.T) {
|
||||
// Verify token ordering and full assembly with many chunks.
|
||||
n := 100
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
for i := range n {
|
||||
tok := fmt.Sprintf("t%d ", i)
|
||||
_, _ = w.Write([]byte(sseChunk(tok)))
|
||||
flusher.Flush()
|
||||
}
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
c := NewLLMClient(ts.URL, 5*time.Second)
|
||||
var mu sync.Mutex
|
||||
var order []int
|
||||
result, err := c.StreamGenerate(context.Background(), "q", "", "", func(tok string) {
|
||||
var idx int
|
||||
fmt.Sscanf(tok, "t%d ", &idx)
|
||||
mu.Lock()
|
||||
order = append(order, idx)
|
||||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Verify all tokens arrived in order
|
||||
if len(order) != n {
|
||||
t.Fatalf("got %d tokens, want %d", len(order), n)
|
||||
}
|
||||
for i, v := range order {
|
||||
if v != i {
|
||||
t.Errorf("order[%d] = %d", i, v)
|
||||
break
|
||||
}
|
||||
}
|
||||
// Quick sanity: result should start with "t0 " and end with last token
|
||||
if !strings.HasPrefix(result, "t0 ") {
|
||||
t.Errorf("result prefix = %q", result[:10])
|
||||
}
|
||||
}
|
||||
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
// TTS client
|
||||
// ────────────────────────────────────────────────────────────────────────────
|
||||
@@ -312,7 +600,7 @@ func TestTTSClient_Synthesize(t *testing.T) {
|
||||
if r.URL.Query().Get("text") != "hello world" {
|
||||
t.Errorf("text = %q", r.URL.Query().Get("text"))
|
||||
}
|
||||
w.Write(expected)
|
||||
_, _ = w.Write(expected)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -331,7 +619,7 @@ func TestTTSClient_SynthesizeWithSpeaker(t *testing.T) {
|
||||
if r.URL.Query().Get("speaker_id") != "alice" {
|
||||
t.Errorf("speaker_id = %q", r.URL.Query().Get("speaker_id"))
|
||||
}
|
||||
w.Write([]byte{0x01})
|
||||
_, _ = w.Write([]byte{0x01})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -365,7 +653,7 @@ func TestSTTClient_Transcribe(t *testing.T) {
|
||||
t.Errorf("file size = %d, want 100", len(data))
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(map[string]string{"text": "hello world"})
|
||||
_ = json.NewEncoder(w).Encode(map[string]string{"text": "hello world"})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -384,7 +672,7 @@ func TestSTTClient_TranscribeTranslate(t *testing.T) {
|
||||
if r.URL.Path != "/v1/audio/translations" {
|
||||
t.Errorf("path = %q, want /v1/audio/translations", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]string{"text": "translated"})
|
||||
_ = json.NewEncoder(w).Encode(map[string]string{"text": "translated"})
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -406,7 +694,7 @@ func TestSTTClient_TranscribeTranslate(t *testing.T) {
|
||||
func TestHTTPError4xx(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(422)
|
||||
w.Write([]byte(`{"error": "bad input"}`))
|
||||
_, _ = w.Write([]byte(`{"error": "bad input"}`))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -423,7 +711,7 @@ func TestHTTPError4xx(t *testing.T) {
|
||||
func TestHTTPError5xx(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte("internal server error"))
|
||||
_, _ = w.Write([]byte("internal server error"))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -467,8 +755,8 @@ func TestBuildMessages(t *testing.T) {
|
||||
|
||||
func BenchmarkPostJSON(b *testing.B) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
io.Copy(io.Discard, r.Body)
|
||||
w.Write([]byte(`{"ok":true}`))
|
||||
_, _ = io.Copy(io.Discard, r.Body)
|
||||
_, _ = w.Write([]byte(`{"ok":true}`))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -482,7 +770,7 @@ func BenchmarkPostJSON(b *testing.B) {
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
c.postJSON(ctx, "/test", payload)
|
||||
_, _ = c.postJSON(ctx, "/test", payload)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -66,8 +66,8 @@ func TestCallbackRegistration(t *testing.T) {
|
||||
}
|
||||
|
||||
// Verify setup/teardown work when called directly.
|
||||
h.onSetup(context.Background())
|
||||
h.onTeardown(context.Background())
|
||||
_ = h.onSetup(context.Background())
|
||||
_ = h.onTeardown(context.Background())
|
||||
if !setupCalled || !teardownCalled {
|
||||
t.Error("callbacks should have been invoked")
|
||||
}
|
||||
@@ -290,7 +290,7 @@ func BenchmarkWrapTypedHandler(b *testing.B) {
|
||||
h := New("ai.test", cfg)
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
||||
var req benchReq
|
||||
msgpack.Unmarshal(msg.Data, &req)
|
||||
_ = msgpack.Unmarshal(msg.Data, &req)
|
||||
return map[string]any{"ok": true}, nil
|
||||
})
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -22,7 +21,6 @@ type Server struct {
|
||||
readyPath string
|
||||
readyCheck ReadyFunc
|
||||
srv *http.Server
|
||||
ready atomic.Bool
|
||||
}
|
||||
|
||||
// New creates a health server on the given port.
|
||||
|
||||
@@ -19,7 +19,7 @@ func TestHealthEndpoint(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("health request failed: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
t.Errorf("expected 200, got %d", resp.StatusCode)
|
||||
@@ -42,7 +42,7 @@ func TestReadyEndpointDefault(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("ready request failed: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
t.Errorf("expected 200, got %d", resp.StatusCode)
|
||||
@@ -60,7 +60,7 @@ func TestReadyEndpointNotReady(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("ready request failed: %v", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode != 503 {
|
||||
t.Errorf("expected 503 when not ready, got %d", resp.StatusCode)
|
||||
}
|
||||
@@ -70,7 +70,7 @@ func TestReadyEndpointNotReady(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("ready request failed: %v", err)
|
||||
}
|
||||
resp2.Body.Close()
|
||||
_ = resp2.Body.Close()
|
||||
if resp2.StatusCode != 200 {
|
||||
t.Errorf("expected 200 when ready, got %d", resp2.StatusCode)
|
||||
}
|
||||
|
||||
@@ -178,7 +178,7 @@ func BenchmarkEncode_ChatRequest_MsgpackMap(b *testing.B) {
|
||||
data := chatRequestMap()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,7 +186,7 @@ func BenchmarkEncode_ChatRequest_MsgpackStruct(b *testing.B) {
|
||||
data := chatRequestStruct()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,7 +194,7 @@ func BenchmarkEncode_ChatRequest_Protobuf(b *testing.B) {
|
||||
data := chatRequestProto()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
proto.Marshal(data)
|
||||
_, _ = proto.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,7 +202,7 @@ func BenchmarkEncode_VoiceResponse_MsgpackMap(b *testing.B) {
|
||||
data := voiceResponseMap()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,7 +210,7 @@ func BenchmarkEncode_VoiceResponse_MsgpackStruct(b *testing.B) {
|
||||
data := voiceResponseStruct()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,7 +218,7 @@ func BenchmarkEncode_VoiceResponse_Protobuf(b *testing.B) {
|
||||
data := voiceResponseProto()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
proto.Marshal(data)
|
||||
_, _ = proto.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,7 +226,7 @@ func BenchmarkEncode_TTSChunk_MsgpackMap(b *testing.B) {
|
||||
data := ttsChunkMap()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,7 +234,7 @@ func BenchmarkEncode_TTSChunk_MsgpackStruct(b *testing.B) {
|
||||
data := ttsChunkStruct()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,7 +242,7 @@ func BenchmarkEncode_TTSChunk_Protobuf(b *testing.B) {
|
||||
data := ttsChunkProto()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
proto.Marshal(data)
|
||||
_, _ = proto.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -255,7 +255,7 @@ func BenchmarkDecode_ChatRequest_MsgpackMap(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
msgpack.Unmarshal(encoded, &m)
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,7 +264,7 @@ func BenchmarkDecode_ChatRequest_MsgpackStruct(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m ChatRequest
|
||||
msgpack.Unmarshal(encoded, &m)
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -273,7 +273,7 @@ func BenchmarkDecode_ChatRequest_Protobuf(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m pb.ChatRequest
|
||||
proto.Unmarshal(encoded, &m)
|
||||
_ = proto.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,7 +282,7 @@ func BenchmarkDecode_VoiceResponse_MsgpackMap(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
msgpack.Unmarshal(encoded, &m)
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -291,7 +291,7 @@ func BenchmarkDecode_VoiceResponse_MsgpackStruct(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m VoiceResponse
|
||||
msgpack.Unmarshal(encoded, &m)
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -300,7 +300,7 @@ func BenchmarkDecode_VoiceResponse_Protobuf(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m pb.VoiceResponse
|
||||
proto.Unmarshal(encoded, &m)
|
||||
_ = proto.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,7 +309,7 @@ func BenchmarkDecode_TTSChunk_MsgpackMap(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
msgpack.Unmarshal(encoded, &m)
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -318,7 +318,7 @@ func BenchmarkDecode_TTSChunk_MsgpackStruct(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m TTSAudioChunk
|
||||
msgpack.Unmarshal(encoded, &m)
|
||||
_ = msgpack.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -327,7 +327,7 @@ func BenchmarkDecode_TTSChunk_Protobuf(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
var m pb.TTSAudioChunk
|
||||
proto.Unmarshal(encoded, &m)
|
||||
_ = proto.Unmarshal(encoded, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,7 +341,7 @@ func BenchmarkRoundtrip_ChatRequest_MsgpackMap(b *testing.B) {
|
||||
for b.Loop() {
|
||||
enc, _ := msgpack.Marshal(data)
|
||||
var dec map[string]any
|
||||
msgpack.Unmarshal(enc, &dec)
|
||||
_ = msgpack.Unmarshal(enc, &dec)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,7 +351,7 @@ func BenchmarkRoundtrip_ChatRequest_MsgpackStruct(b *testing.B) {
|
||||
for b.Loop() {
|
||||
enc, _ := msgpack.Marshal(data)
|
||||
var dec ChatRequest
|
||||
msgpack.Unmarshal(enc, &dec)
|
||||
_ = msgpack.Unmarshal(enc, &dec)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -361,7 +361,7 @@ func BenchmarkRoundtrip_ChatRequest_Protobuf(b *testing.B) {
|
||||
for b.Loop() {
|
||||
enc, _ := proto.Marshal(data)
|
||||
var dec pb.ChatRequest
|
||||
proto.Unmarshal(enc, &dec)
|
||||
_ = proto.Unmarshal(enc, &dec)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -200,7 +200,7 @@ func BenchmarkEncodeMap(b *testing.B) {
|
||||
"top_k": 10,
|
||||
}
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,7 +212,7 @@ func BenchmarkEncodeStruct(b *testing.B) {
|
||||
Active: true,
|
||||
}
|
||||
for b.Loop() {
|
||||
msgpack.Marshal(data)
|
||||
_, _ = msgpack.Marshal(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,7 +226,7 @@ func BenchmarkDecodeMap(b *testing.B) {
|
||||
})
|
||||
for b.Loop() {
|
||||
var m map[string]any
|
||||
msgpack.Unmarshal(raw, &m)
|
||||
_ = msgpack.Unmarshal(raw, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,7 +239,7 @@ func BenchmarkDecodeStruct(b *testing.B) {
|
||||
})
|
||||
for b.Loop() {
|
||||
var m testMessage
|
||||
msgpack.Unmarshal(raw, &m)
|
||||
_ = msgpack.Unmarshal(raw, &m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,6 +251,6 @@ func BenchmarkDecodeAudio32KB(b *testing.B) {
|
||||
})
|
||||
for b.Loop() {
|
||||
var m audioMessage
|
||||
msgpack.Unmarshal(raw, &m)
|
||||
_ = msgpack.Unmarshal(raw, &m)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user