// Package natsutil provides a NATS/JetStream client with msgpack serialization. package natsutil import ( "fmt" "log/slog" "time" "github.com/nats-io/nats.go" "github.com/vmihailenco/msgpack/v5" ) // Client wraps a NATS connection with msgpack helpers. type Client struct { nc *nats.Conn js nats.JetStreamContext subs []*nats.Subscription url string opts []nats.Option } // New creates a NATS client configured to connect to the given URL. // Optional NATS options (e.g. credentials) can be appended. func New(url string, opts ...nats.Option) *Client { defaults := []nats.Option{ nats.ReconnectWait(2 * time.Second), nats.MaxReconnects(-1), nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { slog.Warn("NATS disconnected", "error", err) }), nats.ReconnectHandler(func(_ *nats.Conn) { slog.Info("NATS reconnected") }), } return &Client{ url: url, opts: append(defaults, opts...), } } // Connect establishes the NATS connection and JetStream context. func (c *Client) Connect() error { nc, err := nats.Connect(c.url, c.opts...) if err != nil { return fmt.Errorf("nats connect: %w", err) } js, err := nc.JetStream() if err != nil { nc.Close() return fmt.Errorf("jetstream: %w", err) } c.nc = nc c.js = js slog.Info("connected to NATS", "url", c.url) return nil } // Close drains subscriptions and closes the connection. func (c *Client) Close() { if c.nc == nil { return } for _, sub := range c.subs { _ = sub.Drain() } c.nc.Close() slog.Info("NATS connection closed") } // Conn returns the underlying *nats.Conn. func (c *Client) Conn() *nats.Conn { return c.nc } // JS returns the JetStream context. func (c *Client) JS() nats.JetStreamContext { return c.js } // IsConnected returns true if the NATS connection is active. func (c *Client) IsConnected() bool { return c.nc != nil && c.nc.IsConnected() } // Subscribe subscribes to a subject with an optional queue group. // The handler receives the raw *nats.Msg. func (c *Client) Subscribe(subject string, handler nats.MsgHandler, queue string) error { var sub *nats.Subscription var err error if queue != "" { sub, err = c.nc.QueueSubscribe(subject, queue, handler) slog.Info("subscribed", "subject", subject, "queue", queue) } else { sub, err = c.nc.Subscribe(subject, handler) slog.Info("subscribed", "subject", subject) } if err != nil { return fmt.Errorf("subscribe %s: %w", subject, err) } c.subs = append(c.subs, sub) return nil } // Publish encodes data as msgpack and publishes to the subject. func (c *Client) Publish(subject string, data any) error { payload, err := msgpack.Marshal(data) if err != nil { return fmt.Errorf("msgpack marshal: %w", err) } return c.nc.Publish(subject, payload) } // Request sends a msgpack-encoded request and decodes the response into result. func (c *Client) Request(subject string, data any, result any, timeout time.Duration) error { payload, err := msgpack.Marshal(data) if err != nil { return fmt.Errorf("msgpack marshal: %w", err) } msg, err := c.nc.Request(subject, payload, timeout) if err != nil { return fmt.Errorf("nats request: %w", err) } return msgpack.Unmarshal(msg.Data, result) } // DecodeMsgpack decodes msgpack-encoded NATS message data into dest. func DecodeMsgpack(msg *nats.Msg, dest any) error { return msgpack.Unmarshal(msg.Data, dest) } // DecodeMsgpackMap decodes msgpack data into a generic map. func DecodeMsgpackMap(data []byte) (map[string]any, error) { var m map[string]any if err := msgpack.Unmarshal(data, &m); err != nil { return nil, err } return m, nil }