17 Commits

Author SHA1 Message Date
22eb918fb1 fix: use type=raw for Docker tags to preserve v prefix
Some checks failed
CI / Docker Build & Push (push) Failing after 3m8s
CI / Lint (push) Successful in 2m48s
CI / Test (push) Successful in 3m3s
CI / Release (push) Successful in 1m24s
CI / Notify (push) Successful in 2s
docker/metadata-action type=semver strips the v prefix, causing
tag mismatch between git tags (v0.1.3) and Docker tags (0.1.3).
Switch to type=raw to pass through the version as-is.
2026-02-22 09:58:39 -05:00
0174e31cf7 fix: switch Docker build to plain docker build/push with insecure registry
All checks were successful
CI / Lint (push) Successful in 3m3s
CI / Test (push) Successful in 2m54s
CI / Release (push) Successful in 1m31s
CI / Docker Build & Push (push) Successful in 2m59s
CI / Notify (push) Successful in 1s
- Drop buildx (setup-buildx-action, build-push-action)
- Use insecure HTTP registry with SIGHUP daemon reload
- Use org-level PAT secrets for registry auth
2026-02-21 22:38:02 -05:00
b01057e11a fix: use docker login CLI instead of login-action for Gitea compat
Some checks failed
CI / Test (push) Successful in 2m55s
CI / Lint (push) Successful in 2m49s
CI / Release (push) Successful in 1m53s
CI / Notify (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
docker/login-action@v3 fails with 'Username and password required' on
Gitea Actions — secrets not passed to action with: inputs. Switch to
direct docker login CLI which reliably interpolates secrets in run: steps.
2026-02-21 19:34:30 -05:00
7c0be6d00e fix: switch Docker registry to HTTPS endpoint with login-action
Some checks failed
CI / Test (push) Successful in 2m52s
CI / Lint (push) Successful in 2m48s
CI / Release (push) Successful in 1m36s
CI / Docker Build & Push (push) Failing after 8m21s
CI / Notify (push) Successful in 2s
- Replace gitea-http.gitea.svc.cluster.local:3000 with registry.lab.daviestechlabs.io
- Use docker/login-action@v3 for Gitea registry auth (proper buildx integration)
- Remove manual base64 auth to ~/.docker/config.json (not picked up by buildkit)
- Remove insecure registry daemon.json config and Docker restart
- Remove buildkitd insecure registry config
- Remove cache-from/cache-to type=gha (not supported on Gitea Actions)

Fixes 401 Unauthorized: reqPackageAccess on Docker push
2026-02-21 18:05:43 -05:00
6e498cc0d5 feat: migrate from msgpack to protobuf (handler-base v1.0.0)
Some checks failed
CI / Test (push) Successful in 3m0s
CI / Lint (push) Successful in 3m0s
CI / Release (push) Successful in 59s
CI / Notify (push) Successful in 2s
CI / Docker Build & Push (push) Failing after 8m4s
- Replace msgpack encoding with protobuf wire format
- Update field names to proto convention
- Use pointer slices for repeated message fields ([]*DocumentSource)
- Rewrite tests for proto round-trips
2026-02-21 15:30:26 -05:00
d891140817 chore: bump handler-base to v0.1.5, add netrc secret mount to Dockerfile
Some checks failed
CI / Lint (push) Successful in 2m58s
CI / Test (push) Successful in 2m58s
CI / Release (push) Successful in 1m34s
CI / Docker Build & Push (push) Failing after 8m30s
CI / Notify (push) Successful in 1s
2026-02-20 18:17:00 -05:00
d0115790e0 ci: retrigger build
Some checks failed
CI / Test (push) Successful in 2m26s
CI / Lint (push) Successful in 3m17s
CI / Release (push) Successful in 1m19s
CI / Docker Build & Push (push) Failing after 7m5s
CI / Notify (push) Successful in 2s
2026-02-20 10:06:35 -05:00
ef4ef41404 fix: unignore .gitea workflow yaml files, add build-push.yaml
Some checks failed
CI / Lint (push) Successful in 2m55s
CI / Test (push) Successful in 2m38s
CI / Release (push) Successful in 56s
CI / Docker Build & Push (push) Failing after 4m3s
CI / Notify (push) Successful in 1s
The .gitignore had *.yaml for compiled KFP pipelines which was
preventing .gitea/workflows/build-push.yaml from being tracked.
2026-02-20 09:29:29 -05:00
39b1199617 fix: add GOPRIVATE and git auth for private handler-base module
- Set GOPRIVATE=git.daviestechlabs.io to bypass public Go module proxy
- Configure git URL insteadOf with DISPATCH_TOKEN for private repo access
2026-02-20 09:22:38 -05:00
3a82afd483 fix: rename GITEA_TOKEN to DISPATCH_TOKEN 2026-02-20 09:10:23 -05:00
fb2d58a87f ci: add handler-base auto-update workflow, remove old Python CI 2026-02-20 09:06:02 -05:00
653445b005 fix: use tagged handler-base v0.1.3, remove local replace directive
Some checks failed
CI / Lint (push) Failing after 1m21s
CI / Test (push) Failing after 1m20s
CI / Release (push) Has been cancelled
CI / Notify (push) Has been cancelled
2026-02-20 09:00:48 -05:00
b4ed7dd5b4 fix: resolve golangci-lint errcheck warnings
Some checks failed
CI / Lint (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release (push) Has been cancelled
CI / Notify (push) Has been cancelled
- Add error checks for unchecked return values (errcheck)
- Remove unused struct fields (unused)
- Fix gofmt formatting issues
2026-02-20 08:45:43 -05:00
32b7420c34 Merge pull request 'feature/go-handler-refactor' (#1) from feature/go-handler-refactor into main
Some checks failed
CI / Lint (push) Failing after 1m22s
CI / Test (push) Failing after 1m24s
CI / Release (push) Has been skipped
CI / Notify (push) Successful in 1s
Reviewed-on: #1
2026-02-20 12:33:52 +00:00
8ef0c93e47 feat: migrate to typed messages, drop base64
Some checks failed
CI / Lint (pull_request) Failing after 1m22s
CI / Test (pull_request) Failing after 1m21s
CI / Release (pull_request) Has been skipped
CI / Notify (pull_request) Successful in 1s
- Switch OnMessage → OnTypedMessage with natsutil.Decode[messages.VoiceRequest]
- Return *messages.VoiceResponse with raw []byte audio (no base64)
- Use messages.DocumentSource for RAG sources
- Remove strVal/boolVal helpers
- Add .dockerignore, GOAMD64=v3 in Dockerfile
- Update tests for typed structs (7 tests pass)
2026-02-20 07:10:51 -05:00
f41198d8f2 feat: add e2e tests + benchmarks, fix config API
- e2e_test.go: full voice pipeline (STT->Embed->Rerank->LLM->TTS)
- main.go: fix config field->method references
- Benchmarks: full pipeline 481µs/op
2026-02-20 06:45:21 -05:00
2e66cac1e9 feat: rewrite voice-assistant in Go
Replace Python voice assistant with Go for smaller container images.
Uses handler-base Go module for NATS, health, telemetry, and all service clients.

- Full pipeline: STT → embed → Milvus → rerank → LLM → TTS
- Base64 audio encode/decode
- Dockerfile: multi-stage golang:1.25-alpine → scratch
- CI: Gitea Actions with lint/test/release/docker/notify
2026-02-19 18:00:58 -05:00
17 changed files with 891 additions and 3387 deletions

9
.dockerignore Normal file
View File

@@ -0,0 +1,9 @@
.git
.gitignore
*.md
LICENSE
renovate.json
*_test.go
e2e_test.go
__pycache__
.env*

View File

@@ -0,0 +1,196 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
GOPRIVATE: git.daviestechlabs.io
REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs
REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000
IMAGE_NAME: voice-assistant
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Configure private modules
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Run go vet
run: go vet ./...
- name: Install golangci-lint
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b "$(go env GOPATH)/bin"
echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
- name: Run golangci-lint
run: golangci-lint run ./...
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Configure private modules
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Verify dependencies
run: go mod verify
- name: Build
run: go build -v ./...
- name: Run tests
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
release:
name: Release
runs-on: ubuntu-latest
needs: [lint, test]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
outputs:
version: ${{ steps.version.outputs.version }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Determine version bump
id: version
run: |
# Get latest tag or default to v0.0.0
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
VERSION=${LATEST#v}
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
# Check commit message for keywords
MSG="${{ gitea.event.head_commit.message }}"
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
BUMP="major"
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
MINOR=$((MINOR + 1)); PATCH=0
BUMP="minor"
else
PATCH=$((PATCH + 1))
BUMP="patch"
fi
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
echo "bump=$BUMP" >> $GITHUB_OUTPUT
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
- name: Create and push tag
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
git push origin ${{ steps.version.outputs.version }}
docker:
name: Docker Build & Push
runs-on: ubuntu-latest
needs: [lint, test, release]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Configure insecure registry
run: |
sudo mkdir -p /etc/docker
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
sudo kill -SIGHUP "$(pidof dockerd)" || true
sleep 3
- name: Login to Gitea Registry
run: echo "${{ secrets.REGISTRY_TOKEN }}" | docker login "${{ env.REGISTRY_HOST }}" -u "${{ secrets.REGISTRY_USER }}" --password-stdin
- name: Login to Docker Hub
if: vars.DOCKERHUB_USERNAME != ''
run: echo "${{ secrets.DOCKERHUB_TOKEN }}" | docker login -u "${{ vars.DOCKERHUB_USERNAME }}" --password-stdin
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=raw,value=${{ needs.release.outputs.version }}
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push
run: |
# Build with all tags
TAGS=""
while IFS= read -r tag; do
[ -n "$tag" ] && TAGS="$TAGS -t $tag"
done <<< "${{ steps.meta.outputs.tags }}"
docker build $TAGS \
--label "org.opencontainers.image.source=${{ gitea.server_url }}/${{ gitea.repository }}" \
--label "org.opencontainers.image.revision=${{ gitea.sha }}" \
.
# Push each tag
while IFS= read -r tag; do
[ -n "$tag" ] && docker push "$tag"
done <<< "${{ steps.meta.outputs.tags }}"
notify:
name: Notify
runs-on: ubuntu-latest
needs: [lint, test, release, docker]
if: always()
steps:
- name: Notify on success
if: needs.lint.result == 'success' && needs.test.result == 'success'
run: |
curl -s \
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: white_check_mark,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Release: ${{ needs.release.result == 'success' && needs.release.outputs.version || 'skipped' }}
Docker: ${{ needs.docker.result }}" \
${{ env.NTFY_URL }}/gitea-ci
- name: Notify on failure
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
run: |
curl -s \
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
-H "Priority: high" \
-H "Tags: x,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Lint: ${{ needs.lint.result }}
Test: ${{ needs.test.result }}" \
${{ env.NTFY_URL }}/gitea-ci

View File

@@ -1,129 +0,0 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Set up Python
run: uv python install 3.13
- name: Install dependencies
run: uv sync --frozen --extra dev
- name: Run ruff check
run: uv run ruff check .
- name: Run ruff format check
run: uv run ruff format --check .
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Set up Python
run: uv python install 3.13
- name: Install dependencies
run: uv sync --frozen --extra dev
- name: Run tests
run: uv run pytest -v
release:
name: Release
runs-on: ubuntu-latest
needs: [lint, test]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Determine version bump
id: version
run: |
# Get latest tag or default to v0.0.0
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
VERSION=${LATEST#v}
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
# Check commit message for keywords
MSG="${{ gitea.event.head_commit.message }}"
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
BUMP="major"
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
MINOR=$((MINOR + 1)); PATCH=0
BUMP="minor"
else
PATCH=$((PATCH + 1))
BUMP="patch"
fi
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
echo "bump=$BUMP" >> $GITHUB_OUTPUT
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
- name: Create and push tag
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
git push origin ${{ steps.version.outputs.version }}
notify:
name: Notify
runs-on: ubuntu-latest
needs: [lint, test, release]
if: always()
steps:
- name: Notify on success
if: needs.lint.result == 'success' && needs.test.result == 'success'
run: |
curl -s \
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: white_check_mark,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Release: ${{ needs.release.result == 'success' && 'created' || 'skipped' }}" \
${{ env.NTFY_URL }}/gitea-ci
- name: Notify on failure
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
run: |
curl -s \
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
-H "Priority: high" \
-H "Tags: x,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Lint: ${{ needs.lint.result }}
Test: ${{ needs.test.result }}" \
${{ env.NTFY_URL }}/gitea-ci

View File

@@ -0,0 +1,60 @@
name: Update handler-base
on:
repository_dispatch:
types: [handler-base-release]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
GOPRIVATE: git.daviestechlabs.io
jobs:
update:
name: Update handler-base dependency
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
token: ${{ secrets.DISPATCH_TOKEN }}
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Configure Git
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Update handler-base
run: |
VERSION="${{ gitea.event.client_payload.version }}"
echo "Updating handler-base to ${VERSION}"
go get git.daviestechlabs.io/daviestechlabs/handler-base@${VERSION}
go mod tidy
- name: Commit and push
run: |
VERSION="${{ gitea.event.client_payload.version }}"
if git diff --quiet go.mod go.sum; then
echo "No changes to commit"
exit 0
fi
git add go.mod go.sum
git commit -m "chore(deps): bump handler-base to ${VERSION}"
git push
- name: Notify
if: success()
run: |
VERSION="${{ gitea.event.client_payload.version }}"
curl -s \
-H "Title: 📦 Dep Update: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: package,github" \
-d "handler-base updated to ${VERSION}" \
${{ env.NTFY_URL }}/gitea-ci

2
.gitignore vendored
View File

@@ -35,9 +35,11 @@ ENV/
# Compiled KFP pipelines # Compiled KFP pipelines
*.yaml *.yaml
!.pre-commit-config.yaml !.pre-commit-config.yaml
!.gitea/**/*.yaml
!pipelines/*.py !pipelines/*.py
# Local # Local
.env .env
.env.local .env.local
*.log *.log
voice-assistant

View File

@@ -1,9 +1,26 @@
# Voice Assistant - Using handler-base with audio support # Build stage
ARG BASE_TAG=latest FROM golang:1.25-alpine AS builder
FROM ghcr.io/billy-davies-2/handler-base:${BASE_TAG}
WORKDIR /app WORKDIR /app
COPY voice_assistant.py . RUN apk add --no-cache ca-certificates git
CMD ["python", "voice_assistant.py"] ENV GOPRIVATE=git.daviestechlabs.io
ENV GONOSUMCHECK=git.daviestechlabs.io
COPY go.mod go.sum ./
RUN --mount=type=secret,id=netrc,target=/root/.netrc go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOAMD64=v3 go build -ldflags="-w -s" -o /voice-assistant .
# Runtime stage
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /voice-assistant /voice-assistant
USER 65534:65534
ENTRYPOINT ["/voice-assistant"]

185
e2e_test.go Normal file
View File

@@ -0,0 +1,185 @@
package main
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"git.daviestechlabs.io/daviestechlabs/handler-base/clients"
)
// ────────────────────────────────────────────────────────────────────────────
// E2E tests: exercise the voice pipeline (STT → Embed → Rerank → LLM → TTS)
// ────────────────────────────────────────────────────────────────────────────
type voiceMocks struct {
STT *httptest.Server
Embeddings *httptest.Server
Reranker *httptest.Server
LLM *httptest.Server
TTS *httptest.Server
}
func newVoiceMocks(t *testing.T) *voiceMocks {
t.Helper()
m := &voiceMocks{}
m.STT = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]string{"text": "What is the weather today?"})
}))
t.Cleanup(m.STT.Close)
m.Embeddings = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]any{
"data": []map[string]any{{"embedding": []float64{0.5, 0.6, 0.7}}},
})
}))
t.Cleanup(m.Embeddings.Close)
m.Reranker = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]any{
"results": []map[string]any{{"index": 0, "relevance_score": 0.88}},
})
}))
t.Cleanup(m.Reranker.Close)
m.LLM = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{
{"message": map[string]any{"content": "Sunny with a high of 72."}},
},
})
}))
t.Cleanup(m.LLM.Close)
m.TTS = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(make([]byte, 8000)) // simulated audio
}))
t.Cleanup(m.TTS.Close)
return m
}
func TestVoicePipeline_FullFlow(t *testing.T) {
m := newVoiceMocks(t)
ctx := context.Background()
stt := clients.NewSTTClient(m.STT.URL, 5*time.Second)
embeddings := clients.NewEmbeddingsClient(m.Embeddings.URL, 5*time.Second, "bge")
reranker := clients.NewRerankerClient(m.Reranker.URL, 5*time.Second)
llm := clients.NewLLMClient(m.LLM.URL, 5*time.Second)
tts := clients.NewTTSClient(m.TTS.URL, 5*time.Second, "en")
// 1. STT
transcription, err := stt.Transcribe(ctx, make([]byte, 1000), "en")
if err != nil {
t.Fatal(err)
}
if transcription.Text == "" {
t.Fatal("empty transcription")
}
// 2. Embed
embedding, err := embeddings.EmbedSingle(ctx, transcription.Text)
if err != nil {
t.Fatal(err)
}
if len(embedding) == 0 {
t.Fatal("empty embedding")
}
// 3. Rerank
results, err := reranker.Rerank(ctx, transcription.Text, []string{"doc1"}, 1)
if err != nil {
t.Fatal(err)
}
if len(results) == 0 {
t.Fatal("no rerank results")
}
// 4. LLM
response, err := llm.Generate(ctx, transcription.Text, results[0].Document, "")
if err != nil {
t.Fatal(err)
}
if response == "" {
t.Fatal("empty LLM response")
}
// 5. TTS
audio, err := tts.Synthesize(ctx, response, "en", "")
if err != nil {
t.Fatal(err)
}
if len(audio) == 0 {
t.Fatal("empty audio")
}
}
func TestVoicePipeline_STTFailure(t *testing.T) {
failSTT := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(500)
_, _ = w.Write([]byte("model not loaded"))
}))
defer failSTT.Close()
stt := clients.NewSTTClient(failSTT.URL, 5*time.Second)
_, err := stt.Transcribe(context.Background(), make([]byte, 100), "")
if err == nil {
t.Error("expected error from failed STT")
}
}
func TestVoicePipeline_TTSLargeResponse(t *testing.T) {
// TTS that returns 1 MB of audio.
bigTTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(make([]byte, 1<<20))
}))
defer bigTTS.Close()
tts := clients.NewTTSClient(bigTTS.URL, 10*time.Second, "en")
audio, err := tts.Synthesize(context.Background(), "long text", "en", "")
if err != nil {
t.Fatal(err)
}
if len(audio) != 1<<20 {
t.Errorf("audio size = %d, want %d", len(audio), 1<<20)
}
}
// ────────────────────────────────────────────────────────────────────────────
// Benchmark: voice pipeline latency with mock backends
// ────────────────────────────────────────────────────────────────────────────
func BenchmarkVoicePipeline_Full(b *testing.B) {
sttSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"text":"hello"}`))
}))
defer sttSrv.Close()
llmSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"choices":[{"message":{"content":"answer"}}]}`))
}))
defer llmSrv.Close()
ttsSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(make([]byte, 4000))
}))
defer ttsSrv.Close()
stt := clients.NewSTTClient(sttSrv.URL, 10*time.Second)
llm := clients.NewLLMClient(llmSrv.URL, 10*time.Second)
tts := clients.NewTTSClient(ttsSrv.URL, 10*time.Second, "en")
ctx := context.Background()
audio := make([]byte, 16384)
b.ResetTimer()
for b.Loop() {
_, _ = stt.Transcribe(ctx, audio, "en")
_, _ = llm.Generate(ctx, "question", "", "")
_, _ = tts.Synthesize(ctx, "answer", "en", "")
}
}

39
go.mod Normal file
View File

@@ -0,0 +1,39 @@
module git.daviestechlabs.io/daviestechlabs/voice-assistant
go 1.25.1
require (
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0
github.com/nats-io/nats.go v1.48.0
google.golang.org/protobuf v1.36.11
)
require (
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/grpc v1.78.0 // indirect
)

77
go.sum Normal file
View File

@@ -0,0 +1,77 @@
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY=
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0/go.mod h1:EtekO9DEJb4/jRyN4v4Qjc2yA7AtfCBuz2FynRUWTXs=
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

218
main.go Normal file
View File

@@ -0,0 +1,218 @@
package main
import (
"context"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"
"github.com/nats-io/nats.go"
"git.daviestechlabs.io/daviestechlabs/handler-base/clients"
"git.daviestechlabs.io/daviestechlabs/handler-base/config"
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"google.golang.org/protobuf/proto"
)
func main() {
cfg := config.Load()
cfg.ServiceName = "voice-assistant"
cfg.NATSQueueGroup = "voice-assistants"
// Voice-specific settings
ragTopK := getEnvInt("RAG_TOP_K", 10)
ragRerankTopK := getEnvInt("RAG_RERANK_TOP_K", 5)
ragCollection := getEnv("RAG_COLLECTION", "documents")
sttLanguage := getEnv("STT_LANGUAGE", "") // empty = auto-detect
ttsLanguage := getEnv("TTS_LANGUAGE", "en")
includeTranscription := getEnvBool("INCLUDE_TRANSCRIPTION", true)
includeSources := getEnvBool("INCLUDE_SOURCES", false)
// Service clients
timeout := 60 * time.Second
stt := clients.NewSTTClient(cfg.STTURL(), timeout)
embeddings := clients.NewEmbeddingsClient(cfg.EmbeddingsURL(), timeout, "")
reranker := clients.NewRerankerClient(cfg.RerankerURL(), timeout)
llm := clients.NewLLMClient(cfg.LLMURL(), timeout)
tts := clients.NewTTSClient(cfg.TTSURL(), timeout, ttsLanguage)
milvus := clients.NewMilvusClient(cfg.MilvusHost, cfg.MilvusPort, ragCollection)
h := handler.New("voice.request", cfg)
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
var req messages.VoiceRequest
if err := natsutil.Decode(msg.Data, &req); err != nil {
return &messages.VoiceResponse{Error: "Invalid request encoding"}, nil
}
requestID := req.RequestId
if requestID == "" {
requestID = "unknown"
}
language := req.Language
if language == "" {
language = sttLanguage
}
collection := req.Collection
if collection == "" {
collection = ragCollection
}
slog.Info("processing voice request", "request_id", requestID)
errResp := func(msg string) (proto.Message, error) {
return &messages.VoiceResponse{RequestId: requestID, Error: msg}, nil
}
// 1. Audio arrives as raw bytes — no base64 decode needed
if len(req.Audio) == 0 {
return errResp("No audio data")
}
// 2. Transcribe audio → text
transcription, err := stt.Transcribe(ctx, req.Audio, language)
if err != nil {
slog.Error("STT failed", "error", err)
return errResp("Transcription failed")
}
query := strings.TrimSpace(transcription.Text)
if query == "" {
slog.Warn("empty transcription", "request_id", requestID)
return errResp("Could not transcribe audio")
}
slog.Info("transcribed", "text", truncate(query, 50))
// 3. Generate query embedding
embedding, err := embeddings.EmbedSingle(ctx, query)
if err != nil {
slog.Error("embedding failed", "error", err)
return errResp("Embedding failed")
}
// 4. Search Milvus for context (placeholder — requires Milvus SDK)
_ = milvus
_ = collection
_ = embedding
_ = ragTopK
type docResult struct {
Document string
Score float64
}
var documents []docResult // Milvus results placeholder
// 5. Rerank documents
if len(documents) > 0 {
texts := make([]string, len(documents))
for i, d := range documents {
texts[i] = d.Document
}
reranked, err := reranker.Rerank(ctx, query, texts, ragRerankTopK)
if err != nil {
slog.Error("rerank failed", "error", err)
} else {
documents = make([]docResult, len(reranked))
for i, r := range reranked {
documents[i] = docResult{Document: r.Document, Score: r.Score}
}
}
}
// 6. Build context
var contextParts []string
for _, d := range documents {
contextParts = append(contextParts, d.Document)
}
contextText := strings.Join(contextParts, "\n\n")
// 7. Generate LLM response
responseText, err := llm.Generate(ctx, query, contextText, "")
if err != nil {
slog.Error("LLM generation failed", "error", err)
return errResp("Generation failed")
}
// 8. Synthesize speech — response audio is raw bytes
responseAudio, err := tts.Synthesize(ctx, responseText, ttsLanguage, "")
if err != nil {
slog.Error("TTS failed", "error", err)
return errResp("Speech synthesis failed")
}
// Build typed response
result := &messages.VoiceResponse{
RequestId: requestID,
Response: responseText,
Audio: responseAudio,
}
if includeTranscription {
result.Transcription = query
}
if includeSources && len(documents) > 0 {
limit := 3
if len(documents) < limit {
limit = len(documents)
}
result.Sources = make([]*messages.DocumentSource, limit)
for i := 0; i < limit; i++ {
text := documents[i].Document
if len(text) > 200 {
text = text[:200]
}
result.Sources[i] = &messages.DocumentSource{Text: text, Score: documents[i].Score}
}
}
// Publish to response subject
responseSubject := fmt.Sprintf("voice.response.%s", requestID)
_ = h.NATS.Publish(responseSubject, result)
slog.Info("completed voice request", "request_id", requestID)
return result, nil
})
if err := h.Run(); err != nil {
slog.Error("handler failed", "error", err)
os.Exit(1)
}
}
// Helpers
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
func getEnvInt(key string, fallback int) int {
if v := os.Getenv(key); v != "" {
if i, err := strconv.Atoi(v); err == nil {
return i
}
}
return fallback
}
func getEnvBool(key string, fallback bool) bool {
if v := os.Getenv(key); v != "" {
return strings.EqualFold(v, "true") || v == "1"
}
return fallback
}
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n] + "..."
}

83
main_test.go Normal file
View File

@@ -0,0 +1,83 @@
package main
import (
"testing"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"google.golang.org/protobuf/proto"
)
func TestVoiceRequestDecode(t *testing.T) {
req := &messages.VoiceRequest{
RequestId: "req-123",
Audio: []byte{0x01, 0x02, 0x03},
Language: "en",
Collection: "docs",
}
data, err := proto.Marshal(req)
if err != nil {
t.Fatal(err)
}
var decoded messages.VoiceRequest
if err := natsutil.Decode(data, &decoded); err != nil {
t.Fatal(err)
}
if decoded.RequestId != "req-123" {
t.Errorf("RequestId = %q", decoded.RequestId)
}
if len(decoded.Audio) != 3 {
t.Errorf("Audio len = %d", len(decoded.Audio))
}
}
func TestVoiceResponseRoundtrip(t *testing.T) {
resp := &messages.VoiceResponse{
RequestId: "req-456",
Response: "It is sunny today.",
Audio: make([]byte, 8000),
Transcription: "What is the weather?",
Sources: []*messages.DocumentSource{{Text: "weather doc", Score: 0.9}},
}
data, err := proto.Marshal(resp)
if err != nil {
t.Fatal(err)
}
var got messages.VoiceResponse
if err := proto.Unmarshal(data, &got); err != nil {
t.Fatal(err)
}
if got.Response != "It is sunny today." {
t.Errorf("Response = %q", got.Response)
}
if len(got.Audio) != 8000 {
t.Errorf("Audio len = %d", len(got.Audio))
}
if len(got.Sources) != 1 || got.Sources[0].Text != "weather doc" {
t.Errorf("Sources = %v", got.Sources)
}
}
func TestGetEnvHelpers(t *testing.T) {
t.Setenv("VA_TEST", "hello")
if got := getEnv("VA_TEST", "x"); got != "hello" {
t.Errorf("getEnv = %q", got)
}
t.Setenv("VA_PORT", "9090")
if got := getEnvInt("VA_PORT", 0); got != 9090 {
t.Errorf("getEnvInt = %d", got)
}
t.Setenv("VA_FLAG", "true")
if got := getEnvBool("VA_FLAG", false); !got {
t.Error("getEnvBool should be true")
}
}
func TestTruncate(t *testing.T) {
if got := truncate("hello world", 5); got != "hello..." {
t.Errorf("truncate = %q", got)
}
if got := truncate("hi", 10); got != "hi" {
t.Errorf("truncate short = %q", got)
}
}

View File

@@ -1,43 +0,0 @@
[project]
name = "voice-assistant"
version = "1.0.0"
description = "Voice assistant pipeline - STT → RAG → LLM → TTS"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Davies Tech Labs" }]
dependencies = [
"handler-base @ git+https://git.daviestechlabs.io/daviestechlabs/handler-base.git",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"ruff>=0.1.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["."]
only-include = ["voice_assistant.py"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"
filterwarnings = ["ignore::DeprecationWarning"]

View File

@@ -1 +0,0 @@
# Voice Assistant Tests

View File

@@ -1,147 +0,0 @@
"""
Pytest configuration and fixtures for voice-assistant tests.
"""
import asyncio
import base64
import os
from unittest.mock import MagicMock, patch
import pytest
# Set test environment variables before importing
os.environ.setdefault("NATS_URL", "nats://localhost:4222")
os.environ.setdefault("REDIS_URL", "redis://localhost:6379")
os.environ.setdefault("MILVUS_HOST", "localhost")
os.environ.setdefault("OTEL_ENABLED", "false")
os.environ.setdefault("MLFLOW_ENABLED", "false")
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def sample_audio_b64():
"""Sample base64 encoded audio for testing."""
# 16-bit PCM silence (44 bytes header + 1000 samples)
wav_header = bytes(
[
0x52,
0x49,
0x46,
0x46, # "RIFF"
0x24,
0x08,
0x00,
0x00, # File size
0x57,
0x41,
0x56,
0x45, # "WAVE"
0x66,
0x6D,
0x74,
0x20, # "fmt "
0x10,
0x00,
0x00,
0x00, # Chunk size
0x01,
0x00, # PCM format
0x01,
0x00, # Mono
0x80,
0x3E,
0x00,
0x00, # Sample rate (16000)
0x00,
0x7D,
0x00,
0x00, # Byte rate
0x02,
0x00, # Block align
0x10,
0x00, # Bits per sample
0x64,
0x61,
0x74,
0x61, # "data"
0x00,
0x08,
0x00,
0x00, # Data size
]
)
silence = bytes([0x00] * 2048)
return base64.b64encode(wav_header + silence).decode()
@pytest.fixture
def sample_embedding():
"""Sample embedding vector."""
return [0.1] * 1024
@pytest.fixture
def sample_documents():
"""Sample search results."""
return [
{"text": "Machine learning is a subset of AI.", "score": 0.95},
{"text": "Deep learning uses neural networks.", "score": 0.90},
{"text": "AI enables intelligent automation.", "score": 0.85},
]
@pytest.fixture
def sample_reranked():
"""Sample reranked results."""
return [
{"document": "Machine learning is a subset of AI.", "score": 0.98},
{"document": "Deep learning uses neural networks.", "score": 0.85},
]
@pytest.fixture
def mock_nats_message():
"""Create a mock NATS message."""
msg = MagicMock()
msg.subject = "voice.request"
msg.reply = "voice.response.test-123"
return msg
@pytest.fixture
def mock_voice_request(sample_audio_b64):
"""Sample voice request payload."""
return {
"request_id": "test-request-123",
"audio": sample_audio_b64,
"language": "en",
"collection": "test_collection",
}
@pytest.fixture
def mock_clients():
"""Mock all service clients."""
with (
patch("voice_assistant.STTClient") as stt,
patch("voice_assistant.EmbeddingsClient") as embeddings,
patch("voice_assistant.RerankerClient") as reranker,
patch("voice_assistant.LLMClient") as llm,
patch("voice_assistant.TTSClient") as tts,
patch("voice_assistant.MilvusClient") as milvus,
):
yield {
"stt": stt,
"embeddings": embeddings,
"reranker": reranker,
"llm": llm,
"tts": tts,
"milvus": milvus,
}

View File

@@ -1,198 +0,0 @@
"""
Unit tests for VoiceAssistant handler.
"""
import pytest
from unittest.mock import AsyncMock, patch
# Import after environment is set up in conftest
from voice_assistant import VoiceAssistant, VoiceSettings
class TestVoiceSettings:
"""Tests for VoiceSettings configuration."""
def test_default_settings(self):
"""Test default settings values."""
settings = VoiceSettings()
assert settings.service_name == "voice-assistant"
assert settings.rag_top_k == 10
assert settings.rag_rerank_top_k == 5
assert settings.rag_collection == "documents"
assert settings.stt_language is None # Auto-detect
assert settings.tts_language == "en"
assert settings.include_transcription is True
assert settings.include_sources is False
def test_custom_settings(self, monkeypatch):
"""Test settings from environment."""
monkeypatch.setenv("RAG_TOP_K", "20")
monkeypatch.setenv("RAG_COLLECTION", "custom_collection")
# Note: Would need to re-instantiate settings to pick up env vars
settings = VoiceSettings(rag_top_k=20, rag_collection="custom_collection")
assert settings.rag_top_k == 20
assert settings.rag_collection == "custom_collection"
class TestVoiceAssistant:
"""Tests for VoiceAssistant handler."""
@pytest.fixture
def handler(self):
"""Create handler with mocked clients."""
with (
patch("voice_assistant.STTClient"),
patch("voice_assistant.EmbeddingsClient"),
patch("voice_assistant.RerankerClient"),
patch("voice_assistant.LLMClient"),
patch("voice_assistant.TTSClient"),
patch("voice_assistant.MilvusClient"),
):
handler = VoiceAssistant()
# Setup mock clients
handler.stt = AsyncMock()
handler.embeddings = AsyncMock()
handler.reranker = AsyncMock()
handler.llm = AsyncMock()
handler.tts = AsyncMock()
handler.milvus = AsyncMock()
handler.nats = AsyncMock()
yield handler
def test_init(self, handler):
"""Test handler initialization."""
assert handler.subject == "voice.request"
assert handler.queue_group == "voice-assistants"
assert handler.voice_settings.service_name == "voice-assistant"
@pytest.mark.asyncio
async def test_handle_message_success(
self,
handler,
mock_nats_message,
mock_voice_request,
sample_embedding,
sample_documents,
sample_reranked,
):
"""Test successful voice request handling."""
# Setup mocks
handler.stt.transcribe.return_value = {"text": "What is machine learning?"}
handler.embeddings.embed_single.return_value = sample_embedding
handler.milvus.search_with_texts.return_value = sample_documents
handler.reranker.rerank.return_value = sample_reranked
handler.llm.generate.return_value = "Machine learning is a type of AI."
handler.tts.synthesize.return_value = b"audio_bytes"
# Execute
result = await handler.handle_message(mock_nats_message, mock_voice_request)
# Verify
assert result["request_id"] == "test-request-123"
assert result["response"] == "Machine learning is a type of AI."
assert "audio" in result
assert result["transcription"] == "What is machine learning?"
# Verify pipeline was called
handler.stt.transcribe.assert_called_once()
handler.embeddings.embed_single.assert_called_once()
handler.milvus.search_with_texts.assert_called_once()
handler.reranker.rerank.assert_called_once()
handler.llm.generate.assert_called_once()
handler.tts.synthesize.assert_called_once()
@pytest.mark.asyncio
async def test_handle_message_empty_transcription(
self,
handler,
mock_nats_message,
mock_voice_request,
):
"""Test handling when transcription is empty."""
handler.stt.transcribe.return_value = {"text": ""}
result = await handler.handle_message(mock_nats_message, mock_voice_request)
assert "error" in result
assert result["error"] == "Could not transcribe audio"
# Verify pipeline stopped after transcription
handler.embeddings.embed_single.assert_not_called()
@pytest.mark.asyncio
async def test_handle_message_with_sources(
self,
handler,
mock_nats_message,
mock_voice_request,
sample_embedding,
sample_documents,
sample_reranked,
):
"""Test response includes sources when enabled."""
handler.voice_settings.include_sources = True
# Setup mocks
handler.stt.transcribe.return_value = {"text": "Hello"}
handler.embeddings.embed_single.return_value = sample_embedding
handler.milvus.search_with_texts.return_value = sample_documents
handler.reranker.rerank.return_value = sample_reranked
handler.llm.generate.return_value = "Hi there!"
handler.tts.synthesize.return_value = b"audio"
result = await handler.handle_message(mock_nats_message, mock_voice_request)
assert "sources" in result
assert len(result["sources"]) <= 3
def test_build_context(self, handler):
"""Test context building from documents."""
documents = [
{"document": "First doc content"},
{"document": "Second doc content"},
]
context = handler._build_context(documents)
assert "First doc content" in context
assert "Second doc content" in context
@pytest.mark.asyncio
async def test_setup_initializes_clients(self):
"""Test that setup initializes all clients."""
with (
patch("voice_assistant.STTClient") as stt_cls,
patch("voice_assistant.EmbeddingsClient") as emb_cls,
patch("voice_assistant.RerankerClient") as rer_cls,
patch("voice_assistant.LLMClient") as llm_cls,
patch("voice_assistant.TTSClient") as tts_cls,
patch("voice_assistant.MilvusClient") as mil_cls,
):
mil_cls.return_value.connect = AsyncMock()
handler = VoiceAssistant()
await handler.setup()
stt_cls.assert_called_once()
emb_cls.assert_called_once()
rer_cls.assert_called_once()
llm_cls.assert_called_once()
tts_cls.assert_called_once()
mil_cls.assert_called_once()
@pytest.mark.asyncio
async def test_teardown_closes_clients(self, handler):
"""Test that teardown closes all clients."""
await handler.teardown()
handler.stt.close.assert_called_once()
handler.embeddings.close.assert_called_once()
handler.reranker.close.assert_called_once()
handler.llm.close.assert_called_once()
handler.tts.close.assert_called_once()
handler.milvus.close.assert_called_once()

2638
uv.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,226 +0,0 @@
#!/usr/bin/env python3
"""
Voice Assistant Service (Refactored)
End-to-end voice assistant pipeline using handler-base:
1. Listen for audio on NATS subject "voice.request"
2. Transcribe with Whisper (STT)
3. Generate embeddings for RAG
4. Retrieve context from Milvus
5. Rerank with BGE reranker
6. Generate response with vLLM
7. Synthesize speech with XTTS
8. Publish result to NATS "voice.response.{request_id}"
"""
import base64
import logging
from typing import Any, Optional
from nats.aio.msg import Msg
from handler_base import Handler, Settings
from handler_base.clients import (
EmbeddingsClient,
RerankerClient,
LLMClient,
TTSClient,
STTClient,
MilvusClient,
)
from handler_base.telemetry import create_span
logger = logging.getLogger("voice-assistant")
class VoiceSettings(Settings):
"""Voice assistant specific settings."""
service_name: str = "voice-assistant"
# RAG settings
rag_top_k: int = 10
rag_rerank_top_k: int = 5
rag_collection: str = "documents"
# Audio settings
stt_language: Optional[str] = None # Auto-detect
tts_language: str = "en"
# Response settings
include_transcription: bool = True
include_sources: bool = False
class VoiceAssistant(Handler):
"""
Voice request handler with full STT -> RAG -> LLM -> TTS pipeline.
Request format (msgpack):
{
"request_id": "uuid",
"audio": "base64 encoded audio",
"language": "optional language code",
"collection": "optional collection name"
}
Response format:
{
"request_id": "uuid",
"transcription": "what the user said",
"response": "generated text response",
"audio": "base64 encoded response audio"
}
"""
def __init__(self):
self.voice_settings = VoiceSettings()
super().__init__(
subject="voice.request",
settings=self.voice_settings,
queue_group="voice-assistants",
)
async def setup(self) -> None:
"""Initialize service clients."""
logger.info("Initializing voice assistant clients...")
self.stt = STTClient(self.voice_settings)
self.embeddings = EmbeddingsClient(self.voice_settings)
self.reranker = RerankerClient(self.voice_settings)
self.llm = LLMClient(self.voice_settings)
self.tts = TTSClient(self.voice_settings)
self.milvus = MilvusClient(self.voice_settings)
await self.milvus.connect(self.voice_settings.rag_collection)
logger.info("Voice assistant clients initialized")
async def teardown(self) -> None:
"""Clean up service clients."""
logger.info("Closing voice assistant clients...")
await self.stt.close()
await self.embeddings.close()
await self.reranker.close()
await self.llm.close()
await self.tts.close()
await self.milvus.close()
logger.info("Voice assistant clients closed")
async def handle_message(self, msg: Msg, data: Any) -> Optional[dict]:
"""Handle incoming voice request."""
request_id = data.get("request_id", "unknown")
audio_b64 = data.get("audio", "")
language = data.get("language", self.voice_settings.stt_language)
collection = data.get("collection", self.voice_settings.rag_collection)
logger.info(f"Processing voice request {request_id}")
with create_span("voice.process") as span:
if span:
span.set_attribute("request.id", request_id)
# 1. Decode audio
audio_bytes = base64.b64decode(audio_b64)
# 2. Transcribe audio to text
transcription = await self._transcribe(audio_bytes, language)
query = transcription.get("text", "")
if not query.strip():
logger.warning(f"Empty transcription for request {request_id}")
return {
"request_id": request_id,
"error": "Could not transcribe audio",
}
logger.info(f"Transcribed: {query[:50]}...")
# 3. Generate query embedding
embedding = await self._get_embedding(query)
# 4. Search Milvus for context
documents = await self._search_context(embedding, collection)
# 5. Rerank documents
reranked = await self._rerank_documents(query, documents)
# 6. Build context
context = self._build_context(reranked)
# 7. Generate LLM response
response_text = await self._generate_response(query, context)
# 8. Synthesize speech
response_audio = await self._synthesize_speech(response_text)
# Build response
result = {
"request_id": request_id,
"response": response_text,
"audio": response_audio,
}
if self.voice_settings.include_transcription:
result["transcription"] = query
if self.voice_settings.include_sources:
result["sources"] = [
{"text": d["document"][:200], "score": d["score"]} for d in reranked[:3]
]
logger.info(f"Completed voice request {request_id}")
# Publish to response subject
response_subject = f"voice.response.{request_id}"
await self.nats.publish(response_subject, result)
return result
async def _transcribe(self, audio: bytes, language: Optional[str]) -> dict:
"""Transcribe audio to text."""
with create_span("voice.stt"):
return await self.stt.transcribe(audio, language=language)
async def _get_embedding(self, text: str) -> list[float]:
"""Generate embedding for query text."""
with create_span("voice.embedding"):
return await self.embeddings.embed_single(text)
async def _search_context(self, embedding: list[float], collection: str) -> list[dict]:
"""Search Milvus for relevant documents."""
with create_span("voice.search"):
return await self.milvus.search_with_texts(
embedding,
limit=self.voice_settings.rag_top_k,
text_field="text",
)
async def _rerank_documents(self, query: str, documents: list[dict]) -> list[dict]:
"""Rerank documents by relevance."""
with create_span("voice.rerank"):
texts = [d.get("text", "") for d in documents]
return await self.reranker.rerank(
query, texts, top_k=self.voice_settings.rag_rerank_top_k
)
def _build_context(self, documents: list[dict]) -> str:
"""Build context string from ranked documents."""
return "\n\n".join(d.get("document", "") for d in documents)
async def _generate_response(self, query: str, context: str) -> str:
"""Generate LLM response."""
with create_span("voice.generate"):
return await self.llm.generate(query, context=context)
async def _synthesize_speech(self, text: str) -> str:
"""Synthesize speech and return base64."""
with create_span("voice.tts"):
audio_bytes = await self.tts.synthesize(text, language=self.voice_settings.tts_language)
return base64.b64encode(audio_bytes).decode()
if __name__ == "__main__":
VoiceAssistant().run()