- SSE subscription to ntfy with auto-reconnect - Discord webhook integration with embed formatting - Priority to color mapping, tag to emoji conversion - Native HashiCorp Vault support (Kubernetes + token auth) - Hot reload secrets via fsnotify or Vault polling - Prometheus metrics (/metrics endpoint) - Health/ready endpoints for Kubernetes probes - Comprehensive unit tests and fuzz tests - Multi-stage Docker build (~10MB scratch image) - CI/CD pipeline for Gitea Actions
117 lines
2.9 KiB
Go
117 lines
2.9 KiB
Go
package bridge
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync/atomic"
|
|
|
|
"git.daviestechlabs.io/daviestechlabs/ntfy-discord/internal/config"
|
|
"git.daviestechlabs.io/daviestechlabs/ntfy-discord/internal/discord"
|
|
"git.daviestechlabs.io/daviestechlabs/ntfy-discord/internal/ntfy"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
)
|
|
|
|
var (
|
|
messagesReceived = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "ntfy_discord_messages_received_total",
|
|
Help: "Total number of messages received from ntfy",
|
|
}, []string{"topic"})
|
|
|
|
messagesSent = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "ntfy_discord_messages_sent_total",
|
|
Help: "Total number of messages sent to Discord",
|
|
}, []string{"topic"})
|
|
|
|
messagesErrors = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "ntfy_discord_messages_errors_total",
|
|
Help: "Total number of errors sending to Discord",
|
|
}, []string{"topic"})
|
|
)
|
|
|
|
// Bridge connects ntfy to Discord
|
|
type Bridge struct {
|
|
cfg *config.Config
|
|
ntfyClient *ntfy.Client
|
|
discordClient *discord.Client
|
|
ready atomic.Bool
|
|
}
|
|
|
|
// New creates a new Bridge
|
|
func New(cfg *config.Config) *Bridge {
|
|
return &Bridge{
|
|
cfg: cfg,
|
|
ntfyClient: ntfy.NewClient(cfg.NtfyURL, cfg.NtfyTopics),
|
|
discordClient: discord.NewClient(),
|
|
}
|
|
}
|
|
|
|
// Run starts the bridge
|
|
func (b *Bridge) Run(ctx context.Context) {
|
|
if len(b.cfg.NtfyTopics) == 0 {
|
|
slog.Error("no topics configured")
|
|
return
|
|
}
|
|
|
|
msgCh := make(chan ntfy.Message, 100)
|
|
|
|
// Start SSE subscription
|
|
go b.ntfyClient.Subscribe(ctx, msgCh)
|
|
|
|
// Mark as ready once we start processing
|
|
b.ready.Store(true)
|
|
|
|
// Process messages
|
|
for {
|
|
select {
|
|
case msg := <-msgCh:
|
|
b.handleMessage(ctx, msg)
|
|
case <-ctx.Done():
|
|
b.ready.Store(false)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// IsReady returns true if the bridge is ready to process messages
|
|
func (b *Bridge) IsReady() bool {
|
|
return b.ready.Load()
|
|
}
|
|
|
|
// IsHealthy returns true if the bridge is healthy
|
|
func (b *Bridge) IsHealthy() bool {
|
|
// For now, healthy == ready
|
|
// Could add more checks like webhook URL configured
|
|
return b.cfg.WebhookURL() != ""
|
|
}
|
|
|
|
func (b *Bridge) handleMessage(ctx context.Context, msg ntfy.Message) {
|
|
messagesReceived.WithLabelValues(msg.Topic).Inc()
|
|
|
|
webhookURL := b.cfg.WebhookURL()
|
|
if webhookURL == "" {
|
|
slog.Warn("no webhook URL configured, dropping message", "topic", msg.Topic)
|
|
messagesErrors.WithLabelValues(msg.Topic).Inc()
|
|
return
|
|
}
|
|
|
|
slog.Info("forwarding message to Discord",
|
|
"id", msg.ID,
|
|
"topic", msg.Topic,
|
|
"title", msg.Title,
|
|
)
|
|
|
|
if err := b.discordClient.Send(ctx, webhookURL, msg); err != nil {
|
|
slog.Error("failed to send to Discord",
|
|
"error", err,
|
|
"topic", msg.Topic,
|
|
"id", msg.ID,
|
|
)
|
|
messagesErrors.WithLabelValues(msg.Topic).Inc()
|
|
return
|
|
}
|
|
|
|
messagesSent.WithLabelValues(msg.Topic).Inc()
|
|
slog.Debug("message sent to Discord", "id", msg.ID)
|
|
}
|