Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c971f34017 | |||
| b97849e50d | |||
| a8b5d9153d | |||
| c28ceadb54 | |||
| 66ef758808 | |||
| 338210eb74 |
@@ -121,42 +121,19 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
buildkitd-config-inline: |
|
||||
[registry."gitea-http.gitea.svc.cluster.local:3000"]
|
||||
http = true
|
||||
insecure = true
|
||||
|
||||
- name: Login to Docker Hub
|
||||
if: vars.DOCKERHUB_USERNAME != ''
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ vars.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
|
||||
- name: Configure Docker for insecure registry
|
||||
- name: Configure insecure registry
|
||||
run: |
|
||||
sudo mkdir -p /etc/docker
|
||||
echo '{"insecure-registries": ["${{ env.REGISTRY_HOST }}"]}' | sudo tee /etc/docker/daemon.json
|
||||
sudo systemctl restart docker || sudo service docker restart || true
|
||||
sleep 2
|
||||
sudo kill -SIGHUP "$(pidof dockerd)" || true
|
||||
sleep 3
|
||||
|
||||
- name: Login to Gitea Registry
|
||||
run: |
|
||||
AUTH=$(echo -n "${{ secrets.REGISTRY_USER }}:${{ secrets.REGISTRY_TOKEN }}" | base64 -w0)
|
||||
mkdir -p ~/.docker
|
||||
cat > ~/.docker/config.json << EOF
|
||||
{
|
||||
"auths": {
|
||||
"${{ env.REGISTRY_HOST }}": {
|
||||
"auth": "$AUTH"
|
||||
}
|
||||
}
|
||||
}
|
||||
EOF
|
||||
echo "Auth configured for ${{ env.REGISTRY_HOST }}"
|
||||
run: echo "${{ secrets.REGISTRY_TOKEN }}" | docker login "${{ env.REGISTRY_HOST }}" -u "${{ secrets.REGISTRY_USER }}" --password-stdin
|
||||
|
||||
- name: Login to Docker Hub
|
||||
if: vars.DOCKERHUB_USERNAME != ''
|
||||
run: echo "${{ secrets.DOCKERHUB_TOKEN }}" | docker login -u "${{ vars.DOCKERHUB_USERNAME }}" --password-stdin
|
||||
|
||||
- name: Extract metadata
|
||||
id: meta
|
||||
@@ -164,19 +141,25 @@ jobs:
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
tags: |
|
||||
type=semver,pattern={{version}},value=${{ needs.release.outputs.version }}
|
||||
type=semver,pattern={{major}}.{{minor}},value=${{ needs.release.outputs.version }}
|
||||
type=raw,value=${{ needs.release.outputs.version }}
|
||||
type=raw,value=latest,enable={{is_default_branch}}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
run: |
|
||||
# Build with all tags
|
||||
TAGS=""
|
||||
while IFS= read -r tag; do
|
||||
[ -n "$tag" ] && TAGS="$TAGS -t $tag"
|
||||
done <<< "${{ steps.meta.outputs.tags }}"
|
||||
docker build $TAGS \
|
||||
--label "org.opencontainers.image.source=${{ gitea.server_url }}/${{ gitea.repository }}" \
|
||||
--label "org.opencontainers.image.revision=${{ gitea.sha }}" \
|
||||
.
|
||||
# Push each tag
|
||||
while IFS= read -r tag; do
|
||||
[ -n "$tag" ] && docker push "$tag"
|
||||
done <<< "${{ steps.meta.outputs.tags }}"
|
||||
|
||||
|
||||
notify:
|
||||
name: Notify
|
||||
|
||||
@@ -4,11 +4,14 @@ FROM golang:1.25-alpine AS builder
|
||||
WORKDIR /app
|
||||
|
||||
# Install ca-certificates for HTTPS
|
||||
RUN apk add --no-cache ca-certificates
|
||||
RUN apk add --no-cache ca-certificates git
|
||||
|
||||
ENV GOPRIVATE=git.daviestechlabs.io
|
||||
ENV GONOSUMCHECK=git.daviestechlabs.io
|
||||
|
||||
# Copy go mod files
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
RUN --mount=type=secret,id=netrc,target=/root/.netrc go mod download
|
||||
|
||||
# Copy source code
|
||||
COPY . .
|
||||
|
||||
17
e2e_test.go
17
e2e_test.go
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
@@ -27,10 +28,10 @@ func TestSubmitArgoE2E_FullPayload(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
params := map[string]any{
|
||||
params := map[string]string{
|
||||
"source": "s3://bucket/docs",
|
||||
"collection": "knowledge-base",
|
||||
"batch_size": 100,
|
||||
"batch_size": "100",
|
||||
}
|
||||
|
||||
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", params, "req-e2e-001")
|
||||
@@ -80,9 +81,9 @@ func TestSubmitKubeflowE2E_FullPayload(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
params := map[string]any{
|
||||
params := map[string]string{
|
||||
"query": "what is kubernetes",
|
||||
"top_k": 5,
|
||||
"top_k": "5",
|
||||
"user_id": "test-user",
|
||||
}
|
||||
|
||||
@@ -131,7 +132,7 @@ func TestPipelineDispatchE2E_UnknownPipeline(t *testing.T) {
|
||||
names = append(names, k)
|
||||
}
|
||||
resp := &messages.PipelineStatus{
|
||||
RequestID: "req-bad",
|
||||
RequestId: "req-bad",
|
||||
Status: "error",
|
||||
Error: "Unknown pipeline: nonexistent-pipeline",
|
||||
AvailablePipelines: names,
|
||||
@@ -162,7 +163,7 @@ func TestSubmitArgoE2E_ConcurrentRequests(t *testing.T) {
|
||||
for i := 0; i < 10; i++ {
|
||||
go func() {
|
||||
_, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "batch-inference",
|
||||
map[string]any{"batch": i}, "req-concurrent")
|
||||
map[string]string{"batch": fmt.Sprintf("%d", i)}, "req-concurrent")
|
||||
errs <- err
|
||||
}()
|
||||
}
|
||||
@@ -189,7 +190,7 @@ func BenchmarkSubmitArgo(b *testing.B) {
|
||||
defer ts.Close()
|
||||
|
||||
ctx := b.Context()
|
||||
params := map[string]any{"source": "test"}
|
||||
params := map[string]string{"source": "test"}
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
@@ -204,7 +205,7 @@ func BenchmarkSubmitKubeflow(b *testing.B) {
|
||||
defer ts.Close()
|
||||
|
||||
ctx := b.Context()
|
||||
params := map[string]any{"query": "test"}
|
||||
params := map[string]string{"query": "test"}
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
|
||||
6
go.mod
6
go.mod
@@ -3,9 +3,9 @@ module git.daviestechlabs.io/daviestechlabs/pipeline-bridge
|
||||
go 1.25.1
|
||||
|
||||
require (
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0
|
||||
github.com/nats-io/nats.go v1.48.0
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1
|
||||
google.golang.org/protobuf v1.36.11
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -19,7 +19,6 @@ require (
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/otel v1.40.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
|
||||
@@ -37,5 +36,4 @@ require (
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
|
||||
google.golang.org/grpc v1.78.0 // indirect
|
||||
google.golang.org/protobuf v1.36.11 // indirect
|
||||
)
|
||||
|
||||
8
go.sum
8
go.sum
@@ -1,5 +1,5 @@
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3 h1:uYog8B839ulqrWoht3qqCvT7CnR3e2skpaLZc2Pg3GI=
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v0.1.3/go.mod h1:M3HgvUDWnRn7cX3BE8l+HvoCUYtmRr5OoumB+hnRHoE=
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0 h1:pB3ehOKaDYQfbyRBKQXrB9curqSFteLrDveoElRKnBY=
|
||||
git.daviestechlabs.io/daviestechlabs/handler-base v1.0.0/go.mod h1:zocOHFt8yY3cW4+Xi37sNr5Tw7KcjGFSZqgWYxPWyqA=
|
||||
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
@@ -33,10 +33,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
|
||||
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
|
||||
|
||||
28
main.go
28
main.go
@@ -17,6 +17,7 @@ import (
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/handler"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// Pipeline definitions — maps pipeline name to engine config.
|
||||
@@ -47,20 +48,20 @@ func main() {
|
||||
|
||||
h := handler.New("ai.pipeline.trigger", cfg)
|
||||
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (any, error) {
|
||||
req, err := natsutil.Decode[messages.PipelineTrigger](msg.Data)
|
||||
if err != nil {
|
||||
h.OnTypedMessage(func(ctx context.Context, msg *nats.Msg) (proto.Message, error) {
|
||||
var req messages.PipelineTrigger
|
||||
if err := natsutil.Decode(msg.Data, &req); err != nil {
|
||||
return &messages.PipelineStatus{Status: "error", Error: "Invalid request encoding"}, nil
|
||||
}
|
||||
|
||||
requestID := req.RequestID
|
||||
requestID := req.RequestId
|
||||
if requestID == "" {
|
||||
requestID = "unknown"
|
||||
}
|
||||
pipelineName := req.Pipeline
|
||||
params := req.Parameters
|
||||
if params == nil {
|
||||
params = map[string]any{}
|
||||
params = map[string]string{}
|
||||
}
|
||||
|
||||
slog.Info("triggering pipeline", "pipeline", pipelineName, "request_id", requestID)
|
||||
@@ -73,7 +74,7 @@ func main() {
|
||||
names = append(names, k)
|
||||
}
|
||||
return &messages.PipelineStatus{
|
||||
RequestID: requestID,
|
||||
RequestId: requestID,
|
||||
Status: "error",
|
||||
Error: fmt.Sprintf("Unknown pipeline: %s", pipelineName),
|
||||
AvailablePipelines: names,
|
||||
@@ -81,6 +82,7 @@ func main() {
|
||||
}
|
||||
|
||||
var runID string
|
||||
var err error
|
||||
|
||||
if pipeline.Engine == "argo" {
|
||||
runID, err = submitArgo(ctx, httpClient, argoHost, argoNamespace, pipeline.Template, params, requestID)
|
||||
@@ -91,16 +93,16 @@ func main() {
|
||||
if err != nil {
|
||||
slog.Error("pipeline submit failed", "pipeline", pipelineName, "error", err)
|
||||
return &messages.PipelineStatus{
|
||||
RequestID: requestID,
|
||||
RequestId: requestID,
|
||||
Status: "error",
|
||||
Error: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
result := &messages.PipelineStatus{
|
||||
RequestID: requestID,
|
||||
RequestId: requestID,
|
||||
Status: "submitted",
|
||||
RunID: runID,
|
||||
RunId: runID,
|
||||
Engine: pipeline.Engine,
|
||||
Pipeline: pipelineName,
|
||||
SubmittedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
@@ -118,10 +120,10 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]any, requestID string) (string, error) {
|
||||
func submitArgo(ctx context.Context, client *http.Client, host, namespace, template string, params map[string]string, requestID string) (string, error) {
|
||||
argoParams := make([]map[string]string, 0, len(params))
|
||||
for k, v := range params {
|
||||
argoParams = append(argoParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
|
||||
argoParams = append(argoParams, map[string]string{"name": k, "value": v})
|
||||
}
|
||||
|
||||
workflow := map[string]any{
|
||||
@@ -169,10 +171,10 @@ func submitArgo(ctx context.Context, client *http.Client, host, namespace, templ
|
||||
return result.Metadata.Name, nil
|
||||
}
|
||||
|
||||
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]any, requestID string) (string, error) {
|
||||
func submitKubeflow(ctx context.Context, client *http.Client, host, pipelineID string, params map[string]string, requestID string) (string, error) {
|
||||
kfParams := make([]map[string]string, 0, len(params))
|
||||
for k, v := range params {
|
||||
kfParams = append(kfParams, map[string]string{"name": k, "value": fmt.Sprintf("%v", v)})
|
||||
kfParams = append(kfParams, map[string]string{"name": k, "value": v})
|
||||
}
|
||||
|
||||
runRequest := map[string]any{
|
||||
|
||||
34
main_test.go
34
main_test.go
@@ -8,25 +8,25 @@ import (
|
||||
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/messages"
|
||||
"git.daviestechlabs.io/daviestechlabs/handler-base/natsutil"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestPipelineTriggerDecode(t *testing.T) {
|
||||
req := messages.PipelineTrigger{
|
||||
RequestID: "req-001",
|
||||
req := &messages.PipelineTrigger{
|
||||
RequestId: "req-001",
|
||||
Pipeline: "document-ingestion",
|
||||
Parameters: map[string]any{"source": "s3://bucket"},
|
||||
Parameters: map[string]string{"source": "s3://bucket"},
|
||||
}
|
||||
data, err := msgpack.Marshal(&req)
|
||||
data, err := proto.Marshal(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
decoded, err := natsutil.Decode[messages.PipelineTrigger](data)
|
||||
if err != nil {
|
||||
var decoded messages.PipelineTrigger
|
||||
if err := natsutil.Decode(data, &decoded); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if decoded.RequestID != "req-001" {
|
||||
t.Errorf("RequestID = %q", decoded.RequestID)
|
||||
if decoded.RequestId != "req-001" {
|
||||
t.Errorf("RequestID = %q", decoded.RequestId)
|
||||
}
|
||||
if decoded.Pipeline != "document-ingestion" {
|
||||
t.Errorf("Pipeline = %q", decoded.Pipeline)
|
||||
@@ -38,22 +38,22 @@ func TestPipelineTriggerDecode(t *testing.T) {
|
||||
|
||||
func TestPipelineStatusRoundtrip(t *testing.T) {
|
||||
status := messages.PipelineStatus{
|
||||
RequestID: "req-002",
|
||||
RequestId: "req-002",
|
||||
Status: "submitted",
|
||||
RunID: "argo-abc123",
|
||||
RunId: "argo-abc123",
|
||||
Engine: "argo",
|
||||
Pipeline: "batch-inference",
|
||||
}
|
||||
data, err := msgpack.Marshal(&status)
|
||||
data, err := proto.Marshal(&status)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var got messages.PipelineStatus
|
||||
if err := msgpack.Unmarshal(data, &got); err != nil {
|
||||
if err := proto.Unmarshal(data, &got); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got.RunID != "argo-abc123" {
|
||||
t.Errorf("RunID = %q", got.RunID)
|
||||
if got.RunId != "argo-abc123" {
|
||||
t.Errorf("RunID = %q", got.RunId)
|
||||
}
|
||||
if got.Engine != "argo" {
|
||||
t.Errorf("Engine = %q", got.Engine)
|
||||
@@ -118,7 +118,7 @@ func TestSubmitArgo(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]any{
|
||||
runID, err := submitArgo(ctx, ts.Client(), ts.URL, "ai-ml", "document-ingestion", map[string]string{
|
||||
"source": "test",
|
||||
}, "req-001")
|
||||
if err != nil {
|
||||
@@ -160,7 +160,7 @@ func TestSubmitKubeflow(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
ctx := t.Context()
|
||||
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]any{
|
||||
runID, err := submitKubeflow(ctx, ts.Client(), ts.URL, "rag-pipeline", map[string]string{
|
||||
"query": "test",
|
||||
}, "req-002")
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user