12 Commits

Author SHA1 Message Date
c971f34017 fix: use type=raw for Docker tags to preserve v prefix
Some checks failed
CI / Lint (push) Successful in 2m10s
CI / Notify (push) Successful in 1s
CI / Release (push) Successful in 1m27s
CI / Docker Build & Push (push) Failing after 2m44s
CI / Test (push) Successful in 3m2s
docker/metadata-action type=semver strips the v prefix, causing
tag mismatch between git tags (v0.1.3) and Docker tags (0.1.3).
Switch to type=raw to pass through the version as-is.
2026-02-22 09:58:43 -05:00
b97849e50d fix: switch Docker build to plain docker build/push with insecure registry
All checks were successful
CI / Test (push) Successful in 3m20s
CI / Release (push) Successful in 1m27s
CI / Lint (push) Successful in 2m59s
CI / Docker Build & Push (push) Successful in 3m29s
CI / Notify (push) Successful in 1s
- Drop buildx (setup-buildx-action, build-push-action)
- Use insecure HTTP registry with SIGHUP daemon reload
- Use org-level PAT secrets for registry auth
2026-02-21 22:37:55 -05:00
a8b5d9153d fix: use docker login CLI instead of login-action for Gitea compat
Some checks failed
CI / Lint (push) Successful in 2m55s
CI / Release (push) Successful in 1m46s
CI / Test (push) Successful in 2m57s
CI / Notify (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
docker/login-action@v3 fails with 'Username and password required' on
Gitea Actions — secrets not passed to action with: inputs. Switch to
direct docker login CLI which reliably interpolates secrets in run: steps.
2026-02-21 19:34:28 -05:00
c28ceadb54 fix: switch Docker registry to HTTPS endpoint with login-action
Some checks failed
CI / Lint (push) Successful in 2m38s
CI / Test (push) Successful in 2m54s
CI / Release (push) Successful in 1m13s
CI / Docker Build & Push (push) Failing after 9m8s
CI / Notify (push) Successful in 2s
- Replace gitea-http.gitea.svc.cluster.local:3000 with registry.lab.daviestechlabs.io
- Use docker/login-action@v3 for Gitea registry auth (proper buildx integration)
- Remove manual base64 auth to ~/.docker/config.json (not picked up by buildkit)
- Remove insecure registry daemon.json config and Docker restart
- Remove buildkitd insecure registry config
- Remove cache-from/cache-to type=gha (not supported on Gitea Actions)

Fixes 401 Unauthorized: reqPackageAccess on Docker push
2026-02-21 18:05:41 -05:00
66ef758808 feat: migrate from msgpack to protobuf (handler-base v1.0.0)
Some checks failed
CI / Lint (push) Successful in 3m9s
CI / Test (push) Successful in 2m42s
CI / Release (push) Successful in 1m0s
CI / Notify (push) Successful in 2s
CI / Docker Build & Push (push) Failing after 9m28s
- Replace msgpack encoding with protobuf wire format
- Update field names to proto convention
- Change Parameters type from map[string]any to map[string]string
- Rewrite tests for proto round-trips
2026-02-21 15:30:17 -05:00
338210eb74 chore: bump handler-base to v0.1.5, add netrc secret mount to Dockerfile
Some checks failed
CI / Lint (push) Successful in 2m56s
CI / Test (push) Successful in 2m53s
CI / Release (push) Successful in 1m34s
CI / Docker Build & Push (push) Failing after 7m4s
CI / Notify (push) Successful in 1s
2026-02-20 18:17:58 -05:00
8b50306568 ci: retrigger build
Some checks failed
CI / Lint (push) Successful in 2m41s
CI / Test (push) Successful in 2m23s
CI / Release (push) Successful in 1m15s
CI / Docker Build & Push (push) Failing after 6m26s
CI / Notify (push) Successful in 3s
2026-02-20 10:05:22 -05:00
1bdf92c376 fix: add GOPRIVATE and git auth for private handler-base module
Some checks failed
CI / Lint (push) Successful in 2m33s
CI / Test (push) Successful in 2m36s
CI / Release (push) Successful in 1m20s
CI / Docker Build & Push (push) Failing after 6m44s
CI / Notify (push) Successful in 1s
- Set GOPRIVATE=git.daviestechlabs.io to bypass public Go module proxy
- Configure git URL insteadOf with DISPATCH_TOKEN for private repo access
2026-02-20 09:22:36 -05:00
c832a837eb fix: rename GITEA_TOKEN to DISPATCH_TOKEN
Some checks failed
CI / Lint (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Release (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Notify (push) Has been cancelled
2026-02-20 09:10:21 -05:00
b6f0e9b88f ci: add handler-base auto-update workflow, remove old Python CI
Some checks failed
CI / Test (push) Has been cancelled
CI / Release (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Notify (push) Has been cancelled
CI / Lint (push) Has been cancelled
2026-02-20 09:06:00 -05:00
43fb5afcc5 fix: use tagged handler-base v0.1.3, remove local replace directive
Some checks failed
CI / Lint (push) Failing after 1m20s
CI / Release (push) Has been cancelled
CI / Notify (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Test (push) Failing after 1m22s
2026-02-20 09:00:46 -05:00
de4fa6ea90 fix: resolve golangci-lint errcheck warnings
Some checks failed
CI / Release (push) Has been cancelled
CI / Docker Build & Push (push) Has been cancelled
CI / Notify (push) Has been cancelled
CI / Test (push) Has been cancelled
CI / Lint (push) Has been cancelled
- Add error checks for unchecked return values (errcheck)
- Remove unused struct fields (unused)
- Fix gofmt formatting issues
2026-02-20 08:45:25 -05:00
9 changed files with 156 additions and 235 deletions

View File

@@ -8,6 +8,7 @@ on:
env: env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80 NTFY_URL: http://ntfy.observability.svc.cluster.local:80
GOPRIVATE: git.daviestechlabs.io
REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs
REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000 REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000
IMAGE_NAME: pipeline-bridge IMAGE_NAME: pipeline-bridge
@@ -26,6 +27,9 @@ jobs:
go-version-file: go.mod go-version-file: go.mod
cache: true cache: true
- name: Configure private modules
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Run go vet - name: Run go vet
run: go vet ./... run: go vet ./...
@@ -50,6 +54,9 @@ jobs:
go-version-file: go.mod go-version-file: go.mod
cache: true cache: true
- name: Configure private modules
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Verify dependencies - name: Verify dependencies
run: go mod verify run: go mod verify
@@ -114,42 +121,19 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Set up Docker Buildx - name: Configure insecure registry
uses: docker/setup-buildx-action@v3
with:
buildkitd-config-inline: |
[registry."gitea-http.gitea.svc.cluster.local:3000"]
http = true
insecure = true
- name: Login to Docker Hub
if: vars.DOCKERHUB_USERNAME != ''
uses: docker/login-action@v3
with:
username: ${{ vars.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Configure Docker for insecure registry
run: | run: |
sudo mkdir -p /etc/docker sudo mkdir -p /etc/docker
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
sudo systemctl restart docker || sudo service docker restart || true sudo kill -SIGHUP "$(pidof dockerd)" || true
sleep 2 sleep 3
- name: Login to Gitea Registry - name: Login to Gitea Registry
run: | run: echo "${{ secrets.REGISTRY_TOKEN }}" | docker login "${{ env.REGISTRY_HOST }}" -u "${{ secrets.REGISTRY_USER }}" --password-stdin
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
mkdir -p ~/.docker - name: Login to Docker Hub
cat > ~/.docker/config.json << EOF if: vars.DOCKERHUB_USERNAME != ''
{ run: echo "${{ secrets.DOCKERHUB_TOKEN }}" | docker login -u "${{ vars.DOCKERHUB_USERNAME }}" --password-stdin
"auths": {
"${{ env.REGISTRY_HOST }}": {
"auth": "$AUTH"
}
}
}
EOF
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
- name: Extract metadata - name: Extract metadata
id: meta id: meta
@@ -157,19 +141,25 @@ jobs:
with: with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: | tags: |
type=semver,pattern={{version}},value=${{ needs.release.outputs.version }} type=raw,value=${{ needs.release.outputs.version }}
type=semver,pattern={{major}}.{{minor}},value=${{ needs.release.outputs.version }}
type=raw,value=latest,enable={{is_default_branch}} type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v5 run: |
with: # Build with all tags
context: . TAGS=""
push: true while IFS= read -r tag; do
tags: ${{ steps.meta.outputs.tags }} [ -n "$tag" ] && TAGS="$TAGS -t $tag"
labels: ${{ steps.meta.outputs.labels }} done <<< "${{ steps.meta.outputs.tags }}"
cache-from: type=gha docker build $TAGS \
cache-to: type=gha,mode=max --label "org.opencontainers.image.source=${{ gitea.server_url }}/${{ gitea.repository }}" \
--label "org.opencontainers.image.revision=${{ gitea.sha }}" \
.
# Push each tag
while IFS= read -r tag; do
[ -n "$tag" ] && docker push "$tag"
done <<< "${{ steps.meta.outputs.tags }}"
notify: notify:
name: Notify name: Notify

View File

@@ -1,129 +0,0 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Set up Python
run: uv python install 3.13
- name: Install dependencies
run: uv sync --frozen --extra dev
- name: Run ruff check
run: uv run ruff check .
- name: Run ruff format check
run: uv run ruff format --check .
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Set up Python
run: uv python install 3.13
- name: Install dependencies
run: uv sync --frozen --extra dev
- name: Run tests
run: uv run pytest -v
release:
name: Release
runs-on: ubuntu-latest
needs: [lint, test]
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Determine version bump
id: version
run: |
# Get latest tag or default to v0.0.0
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
VERSION=${LATEST#v}
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
# Check commit message for keywords
MSG="${{ gitea.event.head_commit.message }}"
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
BUMP="major"
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
MINOR=$((MINOR + 1)); PATCH=0
BUMP="minor"
else
PATCH=$((PATCH + 1))
BUMP="patch"
fi
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
echo "bump=$BUMP" >> $GITHUB_OUTPUT
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
- name: Create and push tag
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
git push origin ${{ steps.version.outputs.version }}
notify:
name: Notify
runs-on: ubuntu-latest
needs: [lint, test, release]
if: always()
steps:
- name: Notify on success
if: needs.lint.result == 'success' && needs.test.result == 'success'
run: |
curl -s \
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: white_check_mark,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Release: ${{ needs.release.result == 'success' && 'created' || 'skipped' }}" \
${{ env.NTFY_URL }}/gitea-ci
- name: Notify on failure
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
run: |
curl -s \
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
-H "Priority: high" \
-H "Tags: x,github" \
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
-d "Branch: ${{ gitea.ref_name }}
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
Lint: ${{ needs.lint.result }}
Test: ${{ needs.test.result }}" \
${{ env.NTFY_URL }}/gitea-ci

View File

@@ -0,0 +1,60 @@
name: Update handler-base
on:
repository_dispatch:
types: [handler-base-release]
env:
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
GOPRIVATE: git.daviestechlabs.io
jobs:
update:
name: Update handler-base dependency
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
token: ${{ secrets.DISPATCH_TOKEN }}
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- name: Configure Git
run: |
git config user.name "gitea-actions[bot]"
git config user.email "actions@git.daviestechlabs.io"
git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
- name: Update handler-base
run: |
VERSION="${{ gitea.event.client_payload.version }}"
echo "Updating handler-base to ${VERSION}"
go get git.daviestechlabs.io/daviestechlabs/handler-base@${VERSION}
go mod tidy
- name: Commit and push
run: |
VERSION="${{ gitea.event.client_payload.version }}"
if git diff --quiet go.mod go.sum; then
echo "No changes to commit"
exit 0
fi
git add go.mod go.sum
git commit -m "chore(deps): bump handler-base to ${VERSION}"
git push
- name: Notify
if: success()
run: |
VERSION="${{ gitea.event.client_payload.version }}"
curl -s \
-H "Title: 📦 Dep Update: ${{ gitea.repository }}" \
-H "Priority: default" \
-H "Tags: package,github" \
-d "handler-base updated to ${VERSION}" \
${{ env.NTFY_URL }}/gitea-ci

View File

@@ -4,11 +4,14 @@ FROM golang:1.25-alpine AS builder
WORKDIR /app WORKDIR /app
# Install ca-certificates for HTTPS # Install ca-certificates for HTTPS
RUN apk add --no-cache ca-certificates RUN apk add --no-cache ca-certificates git
ENV GOPRIVATE=git.daviestechlabs.io
ENV GONOSUMCHECK=git.daviestechlabs.io
# Copy go mod files # Copy go mod files
COPY go.mod go.sum ./ COPY go.mod go.sum ./
RUN go mod download RUN --mount=type=secret,id=netrc,target=/root/.netrc go mod download
# Copy source code # Copy source code
COPY . . COPY . .

View File

@@ -2,6 +2,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"sync/atomic" "sync/atomic"
@@ -19,18 +20,18 @@ func TestSubmitArgoE2E_FullPayload(t *testing.T) {
var receivedBody map[string]any var receivedBody map[string]any
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewDecoder(r.Body).Decode(&receivedBody) _ = json.NewDecoder(r.Body).Decode(&receivedBody)
json.NewEncoder(w).Encode(map[string]any{ _ = json.NewEncoder(w).Encode(map[string]any{
"metadata": map[string]any{"name": "doc-ingest-xyz"}, "metadata": map[string]any{"name": "doc-ingest-xyz"},
}) })
})) }))
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
params := map[string]any{ params := map[string]string{
"source": "s3://bucket/docs", "source": "s3://bucket/docs",
"collection": "knowledge-base", "collection": "knowledge-base",
"batch_size": 100, "batch_size": "100",
} }
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001") runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001")
@@ -72,17 +73,17 @@ func TestSubmitKubeflowE2E_FullPayload(t *testing.T) {
var receivedBody map[string]any var receivedBody map[string]any
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewDecoder(r.Body).Decode(&receivedBody) _ = json.NewDecoder(r.Body).Decode(&receivedBody)
json.NewEncoder(w).Encode(map[string]any{ _ = json.NewEncoder(w).Encode(map[string]any{
"run": map[string]any{"id": "kf-run-e2e-789"}, "run": map[string]any{"id": "kf-run-e2e-789"},
}) })
})) }))
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
params := map[string]any{ params := map[string]string{
"query": "what is kubernetes", "query": "what is kubernetes",
"top_k": 5, "top_k": "5",
"user_id": "test-user", "user_id": "test-user",
} }
@@ -131,7 +132,7 @@ func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) {
names = append(names, k) names = append(names, k)
} }
resp := &messages.PipelineStatus{ resp := &messages.PipelineStatus{
RequestID: "req-bad", RequestId: "req-bad",
Status: "error", Status: "error",
Error: "Unknown pipeline: nonexistent-pipeline", Error: "Unknown pipeline: nonexistent-pipeline",
AvailablePipelines: names, AvailablePipelines: names,
@@ -150,7 +151,7 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count.Add(1) count.Add(1)
json.NewEncoder(w).Encode(map[string]any{ _ = json.NewEncoder(w).Encode(map[string]any{
"metadata": map[string]any{"name": "concurrent-wf"}, "metadata": map[string]any{"name": "concurrent-wf"},
}) })
})) }))
@@ -162,7 +163,7 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
go func() { go func() {
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference", _, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference",
map[string]any{"batch": i}, "req-concurrent") map[string]string{"batch": fmt.Sprintf("%d", i)}, "req-concurrent")
errs <- err errs <- err
}() }()
} }
@@ -184,30 +185,30 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
func BenchmarkSubmitArgo(b *testing.B) { func BenchmarkSubmitArgo(b *testing.B) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"metadata":{"name":"bench-wf"}}`)) _, _ = w.Write([]byte(`{"metadata":{"name":"bench-wf"}}`))
})) }))
defer ts.Close() defer ts.Close()
ctx := b.Context() ctx := b.Context()
params := map[string]any{"source": "test"} params := map[string]string{"source": "test"}
b.ResetTimer() b.ResetTimer()
for b.Loop() { for b.Loop() {
submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "bench-req") _, _ = submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "bench-req")
} }
} }
func BenchmarkSubmitKubeflow(b *testing.B) { func BenchmarkSubmitKubeflow(b *testing.B) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"run":{"id":"bench-run"}}`)) _, _ = w.Write([]byte(`{"run":{"id":"bench-run"}}`))
})) }))
defer ts.Close() defer ts.Close()
ctx := b.Context() ctx := b.Context()
params := map[string]any{"query": "test"} params := map[string]string{"query": "test"}
b.ResetTimer() b.ResetTimer()
for b.Loop() { for b.Loop() {
submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "bench-req") _, _ = submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "bench-req")
} }
} }

8
go.mod
View File

@@ -3,8 +3,9 @@ module git.daviestechlabs.io/daviestechlabs/pipeline-bridge
go 1.25.1 go 1.25.1
require ( require (
git.daviestechlabs.io/daviestechlabs/handler-base v0.0.0 git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0
github.com/nats-io/nats.go v1.48.0 github.com/nats-io/nats.go v1.48.0
google.golang.org/protobuf v1.36.11
) )
require ( require (
@@ -18,8 +19,6 @@ require (
github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/compress v1.18.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
@@ -37,7 +36,4 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/grpc v1.78.0 // indirect google.golang.org/grpc v1.78.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
) )
replace git.daviestechlabs.io/daviestechlabs/handler-base => ../handler-base

6
go.sum
View File

@@ -1,3 +1,5 @@
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY=
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -31,10 +33,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=

32
main.go
View File

@@ -17,6 +17,7 @@ import (
"git.daviestechlabs.io/daviestechlabs/handler-base/handler" "git.daviestechlabs.io/daviestechlabs/handler-base/handler"
"git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"google.golang.org/protobuf/proto"
) )
// Pipeline definitions — maps pipeline name to engine config. // Pipeline definitions — maps pipeline name to engine config.
@@ -47,20 +48,20 @@ func main() {
h := handler.New("ai.pipeline.trigger", cfg) h := handler.New("ai.pipeline.trigger", cfg)
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) { h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data) var req messages.PipelineTrigger
if err != nil { if err := natsutil.Decode(msg.Data, &req); err != nil {
return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
} }
requestID := req.RequestID requestID := req.RequestId
if requestID == "" { if requestID == "" {
requestID = "unknown" requestID = "unknown"
} }
pipelineName := req.Pipeline pipelineName := req.Pipeline
params := req.Parameters params := req.Parameters
if params == nil { if params == nil {
params = map[string]any{} params = map[string]string{}
} }
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID) slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
@@ -73,7 +74,7 @@ func main() {
names = append(names, k) names = append(names, k)
} }
return &messages.PipelineStatus{ return &messages.PipelineStatus{
RequestID: requestID, RequestId: requestID,
Status: "error", Status: "error",
Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName), Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
AvailablePipelines: names, AvailablePipelines: names,
@@ -81,6 +82,7 @@ func main() {
} }
var runID string var runID string
var err error
if pipeline.Engine == "argo" { if pipeline.Engine == "argo" {
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID) runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
@@ -91,16 +93,16 @@ func main() {
if err != nil { if err != nil {
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err) slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
return &messages.PipelineStatus{ return &messages.PipelineStatus{
RequestID: requestID, RequestId: requestID,
Status: "error", Status: "error",
Error: err.Error(), Error: err.Error(),
}, nil }, nil
} }
result := &messages.PipelineStatus{ result := &messages.PipelineStatus{
RequestID: requestID, RequestId: requestID,
Status: "submitted", Status: "submitted",
RunID: runID, RunId: runID,
Engine: pipeline.Engine, Engine: pipeline.Engine,
Pipeline: pipelineName, Pipeline: pipelineName,
SubmittedAt: time.Now().UTC().Format(time.RFC3339), SubmittedAt: time.Now().UTC().Format(time.RFC3339),
@@ -118,10 +120,10 @@ func main() {
} }
} }
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) { func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]string, requestID string) (string, error) {
argoParams := make([]map[string]string, 0, len(params)) argoParams := make([]map[string]string, 0, len(params))
for k, v := range params { for k, v := range params {
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)}) argoParams = append(argoParams, map[string]string{"name": k, "value": v})
} }
workflow := map[string]any{ workflow := map[string]any{
@@ -151,7 +153,7 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ
if err != nil { if err != nil {
return "", fmt.Errorf("argo request: %w", err) return "", fmt.Errorf("argo request: %w", err)
} }
defer resp.Body.Close() defer func() { _ = resp.Body.Close() }()
respBody, _ := io.ReadAll(resp.Body) respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
@@ -169,10 +171,10 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ
return result.Metadata.Name, nil return result.Metadata.Name, nil
} }
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) { func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]string, requestID string) (string, error) {
kfParams := make([]map[string]string, 0, len(params)) kfParams := make([]map[string]string, 0, len(params))
for k, v := range params { for k, v := range params {
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)}) kfParams = append(kfParams, map[string]string{"name": k, "value": v})
} }
runRequest := map[string]any{ runRequest := map[string]any{
@@ -196,7 +198,7 @@ func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID s
if err != nil { if err != nil {
return "", fmt.Errorf("kubeflow request: %w", err) return "", fmt.Errorf("kubeflow request: %w", err)
} }
defer resp.Body.Close() defer func() { _ = resp.Body.Close() }()
respBody, _ := io.ReadAll(resp.Body) respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {

View File

@@ -8,25 +8,25 @@ import (
"git.daviestechlabs.io/daviestechlabs/handler-base/messages" "git.daviestechlabs.io/daviestechlabs/handler-base/messages"
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
"github.com/vmihailenco/msgpack/v5" "google.golang.org/protobuf/proto"
) )
func TestPipelineTriggerDecode(t *testing.T) { func TestPipelineTriggerDecode(t *testing.T) {
req := messages.PipelineTrigger{ req := &messages.PipelineTrigger{
RequestID: "req-001", RequestId: "req-001",
Pipeline: "document-ingestion", Pipeline: "document-ingestion",
Parameters: map[string]any{"source": "s3://bucket"}, Parameters: map[string]string{"source": "s3://bucket"},
} }
data, err := msgpack.Marshal(&req) data, err := proto.Marshal(req)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
decoded, err := natsutil.Decode[messages.PipelineTrigger](data) var decoded messages.PipelineTrigger
if err != nil { if err := natsutil.Decode(data, &decoded); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if decoded.RequestID != "req-001" { if decoded.RequestId != "req-001" {
t.Errorf("RequestID = %q", decoded.RequestID) t.Errorf("RequestID = %q", decoded.RequestId)
} }
if decoded.Pipeline != "document-ingestion" { if decoded.Pipeline != "document-ingestion" {
t.Errorf("Pipeline = %q", decoded.Pipeline) t.Errorf("Pipeline = %q", decoded.Pipeline)
@@ -38,22 +38,22 @@ func TestPipelineTriggerDecode(t *testing.T) {
func TestPipelineStatusRoundtrip(t *testing.T) { func TestPipelineStatusRoundtrip(t *testing.T) {
status := messages.PipelineStatus{ status := messages.PipelineStatus{
RequestID: "req-002", RequestId: "req-002",
Status: "submitted", Status: "submitted",
RunID: "argo-abc123", RunId: "argo-abc123",
Engine: "argo", Engine: "argo",
Pipeline: "batch-inference", Pipeline: "batch-inference",
} }
data, err := msgpack.Marshal(&status) data, err := proto.Marshal(&status)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
var got messages.PipelineStatus var got messages.PipelineStatus
if err := msgpack.Unmarshal(data, &got); err != nil { if err := proto.Unmarshal(data, &got); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if got.RunID != "argo-abc123" { if got.RunId != "argo-abc123" {
t.Errorf("RunID = %q", got.RunID) t.Errorf("RunID = %q", got.RunId)
} }
if got.Engine != "argo" { if got.Engine != "argo" {
t.Errorf("Engine = %q", got.Engine) t.Errorf("Engine = %q", got.Engine)
@@ -111,14 +111,14 @@ func TestSubmitArgo(t *testing.T) {
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{ _ = json.NewEncoder(w).Encode(map[string]any{
"metadata": map[string]any{"name": "document-ingestion-abc123"}, "metadata": map[string]any{"name": "document-ingestion-abc123"},
}) })
})) }))
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{ runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]string{
"source": "test", "source": "test",
}, "req-001") }, "req-001")
if err != nil { if err != nil {
@@ -132,7 +132,7 @@ func TestSubmitArgo(t *testing.T) {
func TestSubmitArgoError(t *testing.T) { func TestSubmitArgoError(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`{"message":"bad request"}`)) _, _ = w.Write([]byte(`{"message":"bad request"}`))
})) }))
defer ts.Close() defer ts.Close()
@@ -153,14 +153,14 @@ func TestSubmitKubeflow(t *testing.T) {
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{ _ = json.NewEncoder(w).Encode(map[string]any{
"run": map[string]any{"id": "kf-run-456"}, "run": map[string]any{"id": "kf-run-456"},
}) })
})) }))
defer ts.Close() defer ts.Close()
ctx := t.Context() ctx := t.Context()
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{ runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]string{
"query": "test", "query": "test",
}, "req-002") }, "req-002")
if err != nil { if err != nil {
@@ -174,7 +174,7 @@ func TestSubmitKubeflow(t *testing.T) {
func TestSubmitKubeflowError(t *testing.T) { func TestSubmitKubeflowError(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("internal error")) _, _ = w.Write([]byte("internal error"))
})) }))
defer ts.Close() defer ts.Close()