package ntfy import ( "bufio" "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "strings" "time" ) // Message represents an ntfy message type Message struct { ID string `json:"id"` Time int64 `json:"time"` Expires int64 `json:"expires,omitempty"` Event string `json:"event"` Topic string `json:"topic"` Title string `json:"title,omitempty"` Message string `json:"message,omitempty"` Priority int `json:"priority,omitempty"` Tags []string `json:"tags,omitempty"` Click string `json:"click,omitempty"` Icon string `json:"icon,omitempty"` } // Client subscribes to ntfy topics via SSE type Client struct { baseURL string topics []string client *http.Client } // NewClient creates a new ntfy SSE client func NewClient(baseURL string, topics []string) *Client { return &Client{ baseURL: strings.TrimSuffix(baseURL, "/"), topics: topics, client: &http.Client{ Timeout: 0, // No timeout for SSE }, } } // Subscribe connects to ntfy and streams messages to the channel // It automatically reconnects with exponential backoff on failure func (c *Client) Subscribe(ctx context.Context, msgCh chan<- Message) { backoff := time.Second maxBackoff := time.Minute for { select { case <-ctx.Done(): return default: } err := c.connect(ctx, msgCh) if err != nil { if ctx.Err() != nil { return // Context cancelled } slog.Error("ntfy connection failed", "error", err, "backoff", backoff) time.Sleep(backoff) backoff = min(backoff*2, maxBackoff) continue } // Reset backoff on successful connection backoff = time.Second } } func (c *Client) connect(ctx context.Context, msgCh chan<- Message) error { // Build URL with all topics topicPath := strings.Join(c.topics, ",") url := fmt.Sprintf("%s/%s/json", c.baseURL, topicPath) slog.Info("connecting to ntfy", "url", url) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return fmt.Errorf("create request: %w", err) } resp, err := c.client.Do(req) if err != nil { return fmt.Errorf("connect: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) return fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body)) } slog.Info("connected to ntfy", "topics", c.topics) // Read line by line (ntfy sends newline-delimited JSON) scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Text() if line == "" { continue } var msg Message if err := json.Unmarshal([]byte(line), &msg); err != nil { slog.Warn("failed to parse ntfy message", "error", err, "line", line) continue } // Skip keepalive/open events if msg.Event == "keepalive" || msg.Event == "open" { slog.Debug("ntfy event", "event", msg.Event) continue } if msg.Event == "message" { slog.Debug("received ntfy message", "id", msg.ID, "topic", msg.Topic, "title", msg.Title, ) msgCh <- msg } } if err := scanner.Err(); err != nil { return fmt.Errorf("read stream: %w", err) } return fmt.Errorf("stream closed") }