Compare commits
9 Commits
509f392185
...
v0.1.1
| Author | SHA1 | Date | |
|---|---|---|---|
| c28ceadb54 | |||
| 66ef758808 | |||
| 338210eb74 | |||
| 8b50306568 | |||
| 1bdf92c376 | |||
| c832a837eb | |||
| b6f0e9b88f | |||
| 43fb5afcc5 | |||
| de4fa6ea90 |
@@ -8,8 +8,9 @@ on:
|
|||||||
|
|
||||||
env:
|
env:
|
||||||
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
|
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
|
||||||
REGISTRY: gitea-http.gitea.svc.cluster.local:3000/daviestechlabs
|
GOPRIVATE: git.daviestechlabs.io
|
||||||
REGISTRY_HOST: gitea-http.gitea.svc.cluster.local:3000
|
REGISTRY: registry.lab.daviestechlabs.io/daviestechlabs
|
||||||
|
REGISTRY_HOST: registry.lab.daviestechlabs.io
|
||||||
IMAGE_NAME: pipeline-bridge
|
IMAGE_NAME: pipeline-bridge
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
@@ -26,6 +27,9 @@ jobs:
|
|||||||
go-version-file: go.mod
|
go-version-file: go.mod
|
||||||
cache: true
|
cache: true
|
||||||
|
|
||||||
|
- name: Configure private modules
|
||||||
|
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
|
||||||
|
|
||||||
- name: Run go vet
|
- name: Run go vet
|
||||||
run: go vet ./...
|
run: go vet ./...
|
||||||
|
|
||||||
@@ -50,6 +54,9 @@ jobs:
|
|||||||
go-version-file: go.mod
|
go-version-file: go.mod
|
||||||
cache: true
|
cache: true
|
||||||
|
|
||||||
|
- name: Configure private modules
|
||||||
|
run: git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
|
||||||
|
|
||||||
- name: Verify dependencies
|
- name: Verify dependencies
|
||||||
run: go mod verify
|
run: go mod verify
|
||||||
|
|
||||||
@@ -116,11 +123,13 @@ jobs:
|
|||||||
|
|
||||||
- name: Set up Docker Buildx
|
- name: Set up Docker Buildx
|
||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
|
- name: Login to Gitea Registry
|
||||||
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
buildkitd-config-inline: |
|
registry: ${{ env.REGISTRY_HOST }}
|
||||||
[registry."gitea-http.gitea.svc.cluster.local:3000"]
|
username: ${{ secrets.REGISTRY_USER }}
|
||||||
http = true
|
password: ${{ secrets.REGISTRY_TOKEN }}
|
||||||
insecure = true
|
|
||||||
|
|
||||||
- name: Login to Docker Hub
|
- name: Login to Docker Hub
|
||||||
if: vars.DOCKERHUB_USERNAME != ''
|
if: vars.DOCKERHUB_USERNAME != ''
|
||||||
@@ -129,28 +138,6 @@ jobs:
|
|||||||
username: ${{ vars.DOCKERHUB_USERNAME }}
|
username: ${{ vars.DOCKERHUB_USERNAME }}
|
||||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
||||||
- name: Configure Docker for insecure registry
|
|
||||||
run: |
|
|
||||||
sudo mkdir -p /etc/docker
|
|
||||||
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
|
|
||||||
sudo systemctl restart docker || sudo service docker restart || true
|
|
||||||
sleep 2
|
|
||||||
|
|
||||||
- name: Login to Gitea Registry
|
|
||||||
run: |
|
|
||||||
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
|
|
||||||
mkdir -p ~/.docker
|
|
||||||
cat > ~/.docker/config.json << EOF
|
|
||||||
{
|
|
||||||
"auths": {
|
|
||||||
"${{ env.REGISTRY_HOST }}": {
|
|
||||||
"auth": "$AUTH"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EOF
|
|
||||||
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
|
|
||||||
|
|
||||||
- name: Extract metadata
|
- name: Extract metadata
|
||||||
id: meta
|
id: meta
|
||||||
uses: docker/metadata-action@v5
|
uses: docker/metadata-action@v5
|
||||||
@@ -168,8 +155,7 @@ jobs:
|
|||||||
push: true
|
push: true
|
||||||
tags: ${{ steps.meta.outputs.tags }}
|
tags: ${{ steps.meta.outputs.tags }}
|
||||||
labels: ${{ steps.meta.outputs.labels }}
|
labels: ${{ steps.meta.outputs.labels }}
|
||||||
cache-from: type=gha
|
|
||||||
cache-to: type=gha,mode=max
|
|
||||||
|
|
||||||
notify:
|
notify:
|
||||||
name: Notify
|
name: Notify
|
||||||
|
|||||||
@@ -1,129 +0,0 @@
|
|||||||
name: CI
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches: [main]
|
|
||||||
pull_request:
|
|
||||||
branches: [main]
|
|
||||||
|
|
||||||
env:
|
|
||||||
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
lint:
|
|
||||||
name: Lint
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: Checkout code
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Set up uv
|
|
||||||
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
|
||||||
|
|
||||||
- name: Set up Python
|
|
||||||
run: uv python install 3.13
|
|
||||||
|
|
||||||
- name: Install dependencies
|
|
||||||
run: uv sync --frozen --extra dev
|
|
||||||
|
|
||||||
- name: Run ruff check
|
|
||||||
run: uv run ruff check .
|
|
||||||
|
|
||||||
- name: Run ruff format check
|
|
||||||
run: uv run ruff format --check .
|
|
||||||
|
|
||||||
test:
|
|
||||||
name: Test
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: Checkout code
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Set up uv
|
|
||||||
run: curl -LsSf https://astral.sh/uv/install.sh | sh && echo "$HOME/.local/bin" >> $GITHUB_PATH
|
|
||||||
|
|
||||||
- name: Set up Python
|
|
||||||
run: uv python install 3.13
|
|
||||||
|
|
||||||
- name: Install dependencies
|
|
||||||
run: uv sync --frozen --extra dev
|
|
||||||
|
|
||||||
- name: Run tests
|
|
||||||
run: uv run pytest -v
|
|
||||||
|
|
||||||
release:
|
|
||||||
name: Release
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
needs: [lint, test]
|
|
||||||
if: gitea.ref == 'refs/heads/main' && gitea.event_name == 'push'
|
|
||||||
steps:
|
|
||||||
- name: Checkout
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
with:
|
|
||||||
fetch-depth: 0
|
|
||||||
|
|
||||||
- name: Determine version bump
|
|
||||||
id: version
|
|
||||||
run: |
|
|
||||||
# Get latest tag or default to v0.0.0
|
|
||||||
LATEST=$(git describe --tags --abbrev=0 2>/dev/null || echo "v0.0.0")
|
|
||||||
VERSION=${LATEST#v}
|
|
||||||
IFS='.' read -r MAJOR MINOR PATCH <<< "$VERSION"
|
|
||||||
|
|
||||||
# Check commit message for keywords
|
|
||||||
MSG="${{ gitea.event.head_commit.message }}"
|
|
||||||
if echo "$MSG" | grep -qiE "^major:|BREAKING CHANGE"; then
|
|
||||||
MAJOR=$((MAJOR + 1)); MINOR=0; PATCH=0
|
|
||||||
BUMP="major"
|
|
||||||
elif echo "$MSG" | grep -qiE "^(minor:|feat:)"; then
|
|
||||||
MINOR=$((MINOR + 1)); PATCH=0
|
|
||||||
BUMP="minor"
|
|
||||||
else
|
|
||||||
PATCH=$((PATCH + 1))
|
|
||||||
BUMP="patch"
|
|
||||||
fi
|
|
||||||
|
|
||||||
NEW_VERSION="v${MAJOR}.${MINOR}.${PATCH}"
|
|
||||||
echo "version=$NEW_VERSION" >> $GITHUB_OUTPUT
|
|
||||||
echo "bump=$BUMP" >> $GITHUB_OUTPUT
|
|
||||||
echo "Bumping $LATEST → $NEW_VERSION ($BUMP)"
|
|
||||||
|
|
||||||
- name: Create and push tag
|
|
||||||
run: |
|
|
||||||
git config user.name "gitea-actions[bot]"
|
|
||||||
git config user.email "actions@git.daviestechlabs.io"
|
|
||||||
git tag -a ${{ steps.version.outputs.version }} -m "Release ${{ steps.version.outputs.version }}"
|
|
||||||
git push origin ${{ steps.version.outputs.version }}
|
|
||||||
|
|
||||||
notify:
|
|
||||||
name: Notify
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
needs: [lint, test, release]
|
|
||||||
if: always()
|
|
||||||
steps:
|
|
||||||
- name: Notify on success
|
|
||||||
if: needs.lint.result == 'success' && needs.test.result == 'success'
|
|
||||||
run: |
|
|
||||||
curl -s \
|
|
||||||
-H "Title: ✅ CI Passed: ${{ gitea.repository }}" \
|
|
||||||
-H "Priority: default" \
|
|
||||||
-H "Tags: white_check_mark,github" \
|
|
||||||
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
|
|
||||||
-d "Branch: ${{ gitea.ref_name }}
|
|
||||||
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
|
|
||||||
Release: ${{ needs.release.result == 'success' && 'created' || 'skipped' }}" \
|
|
||||||
${{ env.NTFY_URL }}/gitea-ci
|
|
||||||
|
|
||||||
- name: Notify on failure
|
|
||||||
if: needs.lint.result == 'failure' || needs.test.result == 'failure'
|
|
||||||
run: |
|
|
||||||
curl -s \
|
|
||||||
-H "Title: ❌ CI Failed: ${{ gitea.repository }}" \
|
|
||||||
-H "Priority: high" \
|
|
||||||
-H "Tags: x,github" \
|
|
||||||
-H "Click: ${{ gitea.server_url }}/${{ gitea.repository }}/actions/runs/${{ gitea.run_id }}" \
|
|
||||||
-d "Branch: ${{ gitea.ref_name }}
|
|
||||||
Commit: ${{ gitea.event.head_commit.message || gitea.sha }}
|
|
||||||
Lint: ${{ needs.lint.result }}
|
|
||||||
Test: ${{ needs.test.result }}" \
|
|
||||||
${{ env.NTFY_URL }}/gitea-ci
|
|
||||||
60
.gitea/workflows/update-dependency.yml
Normal file
60
.gitea/workflows/update-dependency.yml
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
name: Update handler-base
|
||||||
|
|
||||||
|
on:
|
||||||
|
repository_dispatch:
|
||||||
|
types: [handler-base-release]
|
||||||
|
|
||||||
|
env:
|
||||||
|
NTFY_URL: http://ntfy.observability.svc.cluster.local:80
|
||||||
|
GOPRIVATE: git.daviestechlabs.io
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
update:
|
||||||
|
name: Update handler-base dependency
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
token: ${{ secrets.DISPATCH_TOKEN }}
|
||||||
|
|
||||||
|
- name: Set up Go
|
||||||
|
uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
go-version-file: go.mod
|
||||||
|
cache: true
|
||||||
|
|
||||||
|
- name: Configure Git
|
||||||
|
run: |
|
||||||
|
git config user.name "gitea-actions[bot]"
|
||||||
|
git config user.email "actions@git.daviestechlabs.io"
|
||||||
|
git config --global url."https://gitea-actions:${{ secrets.DISPATCH_TOKEN }}@git.daviestechlabs.io/".insteadOf "https://git.daviestechlabs.io/"
|
||||||
|
|
||||||
|
- name: Update handler-base
|
||||||
|
run: |
|
||||||
|
VERSION="${{ gitea.event.client_payload.version }}"
|
||||||
|
echo "Updating handler-base to ${VERSION}"
|
||||||
|
go get git.daviestechlabs.io/daviestechlabs/handler-base@${VERSION}
|
||||||
|
go mod tidy
|
||||||
|
|
||||||
|
- name: Commit and push
|
||||||
|
run: |
|
||||||
|
VERSION="${{ gitea.event.client_payload.version }}"
|
||||||
|
if git diff --quiet go.mod go.sum; then
|
||||||
|
echo "No changes to commit"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
git add go.mod go.sum
|
||||||
|
git commit -m "chore(deps): bump handler-base to ${VERSION}"
|
||||||
|
git push
|
||||||
|
|
||||||
|
- name: Notify
|
||||||
|
if: success()
|
||||||
|
run: |
|
||||||
|
VERSION="${{ gitea.event.client_payload.version }}"
|
||||||
|
curl -s \
|
||||||
|
-H "Title: 📦 Dep Update: ${{ gitea.repository }}" \
|
||||||
|
-H "Priority: default" \
|
||||||
|
-H "Tags: package,github" \
|
||||||
|
-d "handler-base updated to ${VERSION}" \
|
||||||
|
${{ env.NTFY_URL }}/gitea-ci
|
||||||
@@ -4,11 +4,14 @@ FROM golang:1.25-alpine AS builder
|
|||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
# Install ca-certificates for HTTPS
|
# Install ca-certificates for HTTPS
|
||||||
RUN apk add --no-cache ca-certificates
|
RUN apk add --no-cache ca-certificates git
|
||||||
|
|
||||||
|
ENV GOPRIVATE=git.daviestechlabs.io
|
||||||
|
ENV GONOSUMCHECK=git.daviestechlabs.io
|
||||||
|
|
||||||
# Copy go mod files
|
# Copy go mod files
|
||||||
COPY go.mod go.sum ./
|
COPY go.mod go.sum ./
|
||||||
RUN go mod download
|
RUN --mount=type=secret,id=netrc,target=/root/.netrc go mod download
|
||||||
|
|
||||||
# Copy source code
|
# Copy source code
|
||||||
COPY . .
|
COPY . .
|
||||||
|
|||||||
35
e2e_test.go
35
e2e_test.go
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@@ -19,18 +20,18 @@ func TestSubmitArgoE2E_FullPayload(t *testing.T) {
|
|||||||
var receivedBody map[string]any
|
var receivedBody map[string]any
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
json.NewDecoder(r.Body).Decode(&receivedBody)
|
_ = json.NewDecoder(r.Body).Decode(&receivedBody)
|
||||||
json.NewEncoder(w).Encode(map[string]any{
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
"metadata": map[string]any{"name": "doc-ingest-xyz"},
|
"metadata": map[string]any{"name": "doc-ingest-xyz"},
|
||||||
})
|
})
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
ctx := t.Context()
|
ctx := t.Context()
|
||||||
params := map[string]any{
|
params := map[string]string{
|
||||||
"source": "s3://bucket/docs",
|
"source": "s3://bucket/docs",
|
||||||
"collection": "knowledge-base",
|
"collection": "knowledge-base",
|
||||||
"batch_size": 100,
|
"batch_size": "100",
|
||||||
}
|
}
|
||||||
|
|
||||||
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001")
|
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001")
|
||||||
@@ -72,17 +73,17 @@ func TestSubmitKubeflowE2E_FullPayload(t *testing.T) {
|
|||||||
var receivedBody map[string]any
|
var receivedBody map[string]any
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
json.NewDecoder(r.Body).Decode(&receivedBody)
|
_ = json.NewDecoder(r.Body).Decode(&receivedBody)
|
||||||
json.NewEncoder(w).Encode(map[string]any{
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
"run": map[string]any{"id": "kf-run-e2e-789"},
|
"run": map[string]any{"id": "kf-run-e2e-789"},
|
||||||
})
|
})
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
ctx := t.Context()
|
ctx := t.Context()
|
||||||
params := map[string]any{
|
params := map[string]string{
|
||||||
"query": "what is kubernetes",
|
"query": "what is kubernetes",
|
||||||
"top_k": 5,
|
"top_k": "5",
|
||||||
"user_id": "test-user",
|
"user_id": "test-user",
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -131,7 +132,7 @@ func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) {
|
|||||||
names = append(names, k)
|
names = append(names, k)
|
||||||
}
|
}
|
||||||
resp := &messages.PipelineStatus{
|
resp := &messages.PipelineStatus{
|
||||||
RequestID: "req-bad",
|
RequestId: "req-bad",
|
||||||
Status: "error",
|
Status: "error",
|
||||||
Error: "Unknown pipeline: nonexistent-pipeline",
|
Error: "Unknown pipeline: nonexistent-pipeline",
|
||||||
AvailablePipelines: names,
|
AvailablePipelines: names,
|
||||||
@@ -150,7 +151,7 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
|
|||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
count.Add(1)
|
count.Add(1)
|
||||||
json.NewEncoder(w).Encode(map[string]any{
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
"metadata": map[string]any{"name": "concurrent-wf"},
|
"metadata": map[string]any{"name": "concurrent-wf"},
|
||||||
})
|
})
|
||||||
}))
|
}))
|
||||||
@@ -162,7 +163,7 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
|
|||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference",
|
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference",
|
||||||
map[string]any{"batch": i}, "req-concurrent")
|
map[string]string{"batch": fmt.Sprintf("%d", i)}, "req-concurrent")
|
||||||
errs <- err
|
errs <- err
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -184,30 +185,30 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
|
|||||||
|
|
||||||
func BenchmarkSubmitArgo(b *testing.B) {
|
func BenchmarkSubmitArgo(b *testing.B) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Write([]byte(`{"metadata":{"name":"bench-wf"}}`))
|
_, _ = w.Write([]byte(`{"metadata":{"name":"bench-wf"}}`))
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
ctx := b.Context()
|
ctx := b.Context()
|
||||||
params := map[string]any{"source": "test"}
|
params := map[string]string{"source": "test"}
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for b.Loop() {
|
for b.Loop() {
|
||||||
submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "bench-req")
|
_, _ = submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "bench-req")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkSubmitKubeflow(b *testing.B) {
|
func BenchmarkSubmitKubeflow(b *testing.B) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Write([]byte(`{"run":{"id":"bench-run"}}`))
|
_, _ = w.Write([]byte(`{"run":{"id":"bench-run"}}`))
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
ctx := b.Context()
|
ctx := b.Context()
|
||||||
params := map[string]any{"query": "test"}
|
params := map[string]string{"query": "test"}
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for b.Loop() {
|
for b.Loop() {
|
||||||
submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "bench-req")
|
_, _ = submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", params, "bench-req")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
8
go.mod
8
go.mod
@@ -3,8 +3,9 @@ module git.daviestechlabs.io/daviestechlabs/pipeline-bridge
|
|||||||
go 1.25.1
|
go 1.25.1
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.0.0
|
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0
|
||||||
github.com/nats-io/nats.go v1.48.0
|
github.com/nats-io/nats.go v1.48.0
|
||||||
|
google.golang.org/protobuf v1.36.11
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@@ -18,8 +19,6 @@ require (
|
|||||||
github.com/klauspost/compress v1.18.0 // indirect
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
|
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||||
go.opentelemetry.io/otel v1.40.0 // indirect
|
go.opentelemetry.io/otel v1.40.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
|
||||||
@@ -37,7 +36,4 @@ require (
|
|||||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
||||||
google.golang.org/grpc v1.78.0 // indirect
|
google.golang.org/grpc v1.78.0 // indirect
|
||||||
google.golang.org/protobuf v1.36.11 // indirect
|
|
||||||
)
|
)
|
||||||
|
|
||||||
replace git.daviestechlabs.io/daviestechlabs/handler-base => ../handler-base
|
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -1,3 +1,5 @@
|
|||||||
|
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY=
|
||||||
|
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA=
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
@@ -31,10 +33,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
|||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
|
|
||||||
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
|
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
|
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
|
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||||
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
|
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
|
||||||
|
|||||||
32
main.go
32
main.go
@@ -17,6 +17,7 @@ import (
|
|||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Pipeline definitions — maps pipeline name to engine config.
|
// Pipeline definitions — maps pipeline name to engine config.
|
||||||
@@ -47,20 +48,20 @@ func main() {
|
|||||||
|
|
||||||
h := handler.New("ai.pipeline.trigger", cfg)
|
h := handler.New("ai.pipeline.trigger", cfg)
|
||||||
|
|
||||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||||
req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data)
|
var req messages.PipelineTrigger
|
||||||
if err != nil {
|
if err := natsutil.Decode(msg.Data, &req); err != nil {
|
||||||
return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
|
return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
requestID := req.RequestID
|
requestID := req.RequestId
|
||||||
if requestID == "" {
|
if requestID == "" {
|
||||||
requestID = "unknown"
|
requestID = "unknown"
|
||||||
}
|
}
|
||||||
pipelineName := req.Pipeline
|
pipelineName := req.Pipeline
|
||||||
params := req.Parameters
|
params := req.Parameters
|
||||||
if params == nil {
|
if params == nil {
|
||||||
params = map[string]any{}
|
params = map[string]string{}
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
|
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
|
||||||
@@ -73,7 +74,7 @@ func main() {
|
|||||||
names = append(names, k)
|
names = append(names, k)
|
||||||
}
|
}
|
||||||
return &messages.PipelineStatus{
|
return &messages.PipelineStatus{
|
||||||
RequestID: requestID,
|
RequestId: requestID,
|
||||||
Status: "error",
|
Status: "error",
|
||||||
Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
|
Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
|
||||||
AvailablePipelines: names,
|
AvailablePipelines: names,
|
||||||
@@ -81,6 +82,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var runID string
|
var runID string
|
||||||
|
var err error
|
||||||
|
|
||||||
if pipeline.Engine == "argo" {
|
if pipeline.Engine == "argo" {
|
||||||
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
|
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
|
||||||
@@ -91,16 +93,16 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
|
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
|
||||||
return &messages.PipelineStatus{
|
return &messages.PipelineStatus{
|
||||||
RequestID: requestID,
|
RequestId: requestID,
|
||||||
Status: "error",
|
Status: "error",
|
||||||
Error: err.Error(),
|
Error: err.Error(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
result := &messages.PipelineStatus{
|
result := &messages.PipelineStatus{
|
||||||
RequestID: requestID,
|
RequestId: requestID,
|
||||||
Status: "submitted",
|
Status: "submitted",
|
||||||
RunID: runID,
|
RunId: runID,
|
||||||
Engine: pipeline.Engine,
|
Engine: pipeline.Engine,
|
||||||
Pipeline: pipelineName,
|
Pipeline: pipelineName,
|
||||||
SubmittedAt: time.Now().UTC().Format(time.RFC3339),
|
SubmittedAt: time.Now().UTC().Format(time.RFC3339),
|
||||||
@@ -118,10 +120,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) {
|
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]string, requestID string) (string, error) {
|
||||||
argoParams := make([]map[string]string, 0, len(params))
|
argoParams := make([]map[string]string, 0, len(params))
|
||||||
for k, v := range params {
|
for k, v := range params {
|
||||||
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
|
argoParams = append(argoParams, map[string]string{"name": k, "value": v})
|
||||||
}
|
}
|
||||||
|
|
||||||
workflow := map[string]any{
|
workflow := map[string]any{
|
||||||
@@ -151,7 +153,7 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("argo request: %w", err)
|
return "", fmt.Errorf("argo request: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
respBody, _ := io.ReadAll(resp.Body)
|
respBody, _ := io.ReadAll(resp.Body)
|
||||||
|
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
@@ -169,10 +171,10 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ
|
|||||||
return result.Metadata.Name, nil
|
return result.Metadata.Name, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) {
|
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]string, requestID string) (string, error) {
|
||||||
kfParams := make([]map[string]string, 0, len(params))
|
kfParams := make([]map[string]string, 0, len(params))
|
||||||
for k, v := range params {
|
for k, v := range params {
|
||||||
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
|
kfParams = append(kfParams, map[string]string{"name": k, "value": v})
|
||||||
}
|
}
|
||||||
|
|
||||||
runRequest := map[string]any{
|
runRequest := map[string]any{
|
||||||
@@ -196,7 +198,7 @@ func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID s
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("kubeflow request: %w", err)
|
return "", fmt.Errorf("kubeflow request: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
respBody, _ := io.ReadAll(resp.Body)
|
respBody, _ := io.ReadAll(resp.Body)
|
||||||
|
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
|
|||||||
42
main_test.go
42
main_test.go
@@ -8,25 +8,25 @@ import (
|
|||||||
|
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||||
"github.com/vmihailenco/msgpack/v5"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPipelineTriggerDecode(t *testing.T) {
|
func TestPipelineTriggerDecode(t *testing.T) {
|
||||||
req := messages.PipelineTrigger{
|
req := &messages.PipelineTrigger{
|
||||||
RequestID: "req-001",
|
RequestId: "req-001",
|
||||||
Pipeline: "document-ingestion",
|
Pipeline: "document-ingestion",
|
||||||
Parameters: map[string]any{"source": "s3://bucket"},
|
Parameters: map[string]string{"source": "s3://bucket"},
|
||||||
}
|
}
|
||||||
data, err := msgpack.Marshal(&req)
|
data, err := proto.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
decoded, err := natsutil.Decode[messages.PipelineTrigger](data)
|
var decoded messages.PipelineTrigger
|
||||||
if err != nil {
|
if err := natsutil.Decode(data, &decoded); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if decoded.RequestID != "req-001" {
|
if decoded.RequestId != "req-001" {
|
||||||
t.Errorf("RequestID = %q", decoded.RequestID)
|
t.Errorf("RequestID = %q", decoded.RequestId)
|
||||||
}
|
}
|
||||||
if decoded.Pipeline != "document-ingestion" {
|
if decoded.Pipeline != "document-ingestion" {
|
||||||
t.Errorf("Pipeline = %q", decoded.Pipeline)
|
t.Errorf("Pipeline = %q", decoded.Pipeline)
|
||||||
@@ -38,22 +38,22 @@ func TestPipelineTriggerDecode(t *testing.T) {
|
|||||||
|
|
||||||
func TestPipelineStatusRoundtrip(t *testing.T) {
|
func TestPipelineStatusRoundtrip(t *testing.T) {
|
||||||
status := messages.PipelineStatus{
|
status := messages.PipelineStatus{
|
||||||
RequestID: "req-002",
|
RequestId: "req-002",
|
||||||
Status: "submitted",
|
Status: "submitted",
|
||||||
RunID: "argo-abc123",
|
RunId: "argo-abc123",
|
||||||
Engine: "argo",
|
Engine: "argo",
|
||||||
Pipeline: "batch-inference",
|
Pipeline: "batch-inference",
|
||||||
}
|
}
|
||||||
data, err := msgpack.Marshal(&status)
|
data, err := proto.Marshal(&status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
var got messages.PipelineStatus
|
var got messages.PipelineStatus
|
||||||
if err := msgpack.Unmarshal(data, &got); err != nil {
|
if err := proto.Unmarshal(data, &got); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if got.RunID != "argo-abc123" {
|
if got.RunId != "argo-abc123" {
|
||||||
t.Errorf("RunID = %q", got.RunID)
|
t.Errorf("RunID = %q", got.RunId)
|
||||||
}
|
}
|
||||||
if got.Engine != "argo" {
|
if got.Engine != "argo" {
|
||||||
t.Errorf("Engine = %q", got.Engine)
|
t.Errorf("Engine = %q", got.Engine)
|
||||||
@@ -111,14 +111,14 @@ func TestSubmitArgo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
json.NewEncoder(w).Encode(map[string]any{
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
"metadata": map[string]any{"name": "document-ingestion-abc123"},
|
"metadata": map[string]any{"name": "document-ingestion-abc123"},
|
||||||
})
|
})
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
ctx := t.Context()
|
ctx := t.Context()
|
||||||
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{
|
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]string{
|
||||||
"source": "test",
|
"source": "test",
|
||||||
}, "req-001")
|
}, "req-001")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -132,7 +132,7 @@ func TestSubmitArgo(t *testing.T) {
|
|||||||
func TestSubmitArgoError(t *testing.T) {
|
func TestSubmitArgoError(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
w.Write([]byte(`{"message":"bad request"}`))
|
_, _ = w.Write([]byte(`{"message":"bad request"}`))
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
@@ -153,14 +153,14 @@ func TestSubmitKubeflow(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
json.NewEncoder(w).Encode(map[string]any{
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
"run": map[string]any{"id": "kf-run-456"},
|
"run": map[string]any{"id": "kf-run-456"},
|
||||||
})
|
})
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
ctx := t.Context()
|
ctx := t.Context()
|
||||||
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{
|
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]string{
|
||||||
"query": "test",
|
"query": "test",
|
||||||
}, "req-002")
|
}, "req-002")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -174,7 +174,7 @@ func TestSubmitKubeflow(t *testing.T) {
|
|||||||
func TestSubmitKubeflowError(t *testing.T) {
|
func TestSubmitKubeflowError(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
w.Write([]byte("internal error"))
|
_, _ = w.Write([]byte("internal error"))
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user