Files
ntfy-discord/internal/ntfy/client.go
Billy D. f97ad0e7cb
Some checks failed
Build and Push / build (push) Failing after 4m36s
Build and Push / test (push) Has been cancelled
feat: implement ntfy-discord bridge in Go
- SSE subscription to ntfy with auto-reconnect
- Discord webhook integration with embed formatting
- Priority to color mapping, tag to emoji conversion
- Native HashiCorp Vault support (Kubernetes + token auth)
- Hot reload secrets via fsnotify or Vault polling
- Prometheus metrics (/metrics endpoint)
- Health/ready endpoints for Kubernetes probes
- Comprehensive unit tests and fuzz tests
- Multi-stage Docker build (~10MB scratch image)
- CI/CD pipeline for Gitea Actions
2026-02-02 18:13:55 -05:00

138 lines
3.1 KiB
Go

package ntfy
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
)
// Message represents an ntfy message
type Message struct {
ID string `json:"id"`
Time int64 `json:"time"`
Expires int64 `json:"expires,omitempty"`
Event string `json:"event"`
Topic string `json:"topic"`
Title string `json:"title,omitempty"`
Message string `json:"message,omitempty"`
Priority int `json:"priority,omitempty"`
Tags []string `json:"tags,omitempty"`
Click string `json:"click,omitempty"`
Icon string `json:"icon,omitempty"`
}
// Client subscribes to ntfy topics via SSE
type Client struct {
baseURL string
topics []string
client *http.Client
}
// NewClient creates a new ntfy SSE client
func NewClient(baseURL string, topics []string) *Client {
return &Client{
baseURL: strings.TrimSuffix(baseURL, "/"),
topics: topics,
client: &http.Client{
Timeout: 0, // No timeout for SSE
},
}
}
// Subscribe connects to ntfy and streams messages to the channel
// It automatically reconnects with exponential backoff on failure
func (c *Client) Subscribe(ctx context.Context, msgCh chan<- Message) {
backoff := time.Second
maxBackoff := time.Minute
for {
select {
case <-ctx.Done():
return
default:
}
err := c.connect(ctx, msgCh)
if err != nil {
if ctx.Err() != nil {
return // Context cancelled
}
slog.Error("ntfy connection failed", "error", err, "backoff", backoff)
time.Sleep(backoff)
backoff = min(backoff*2, maxBackoff)
continue
}
// Reset backoff on successful connection
backoff = time.Second
}
}
func (c *Client) connect(ctx context.Context, msgCh chan<- Message) error {
// Build URL with all topics
topicPath := strings.Join(c.topics, ",")
url := fmt.Sprintf("%s/%s/json", c.baseURL, topicPath)
slog.Info("connecting to ntfy", "url", url)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
resp, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("connect: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body))
}
slog.Info("connected to ntfy", "topics", c.topics)
// Read line by line (ntfy sends newline-delimited JSON)
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
var msg Message
if err := json.Unmarshal([]byte(line), &msg); err != nil {
slog.Warn("failed to parse ntfy message", "error", err, "line", line)
continue
}
// Skip keepalive/open events
if msg.Event == "keepalive" || msg.Event == "open" {
slog.Debug("ntfy event", "event", msg.Event)
continue
}
if msg.Event == "message" {
slog.Debug("received ntfy message",
"id", msg.ID,
"topic", msg.Topic,
"title", msg.Title,
)
msgCh <- msg
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("read stream: %w", err)
}
return fmt.Errorf("stream closed")
}