// Package handler provides the base Handler pattern for NATS message-driven services. package handler import ( "context" "fmt" "log/slog" "os" "os/signal" "syscall" "github.com/nats-io/nats.go" "git.daviestechlabs.io/daviestechlabs/handler-base/config" "git.daviestechlabs.io/daviestechlabs/handler-base/health" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" "git.daviestechlabs.io/daviestechlabs/handler-base/telemetry" ) // MessageHandler is the callback for processing decoded NATS messages. // data is the msgpack-decoded map. Return a response map (or nil for no reply). type MessageHandler func(ctx context.Context, msg *nats.Msg, data map[string]any) (map[string]any, error) // SetupFunc is called once before the handler starts processing messages. type SetupFunc func(ctx context.Context) error // TeardownFunc is called during graceful shutdown. type TeardownFunc func(ctx context.Context) error // Handler is the base service runner that wires NATS, health, and telemetry. type Handler struct { Settings *config.Settings NATS *natsutil.Client Telemetry *telemetry.Provider Subject string QueueGroup string onSetup SetupFunc onTeardown TeardownFunc onMessage MessageHandler running bool } // New creates a Handler for the given NATS subject. func New(subject string, settings *config.Settings) *Handler { if settings == nil { settings = config.Load() } queueGroup := settings.NATSQueueGroup natsOpts := []nats.Option{} if settings.NATSUser != "" && settings.NATSPassword != "" { natsOpts = append(natsOpts, nats.UserInfo(settings.NATSUser, settings.NATSPassword)) } return &Handler{ Settings: settings, Subject: subject, QueueGroup: queueGroup, NATS: natsutil.New(settings.NATSURL, natsOpts...), } } // OnSetup registers the setup callback. func (h *Handler) OnSetup(fn SetupFunc) { h.onSetup = fn } // OnTeardown registers the teardown callback. func (h *Handler) OnTeardown(fn TeardownFunc) { h.onTeardown = fn } // OnMessage registers the message handler callback. func (h *Handler) OnMessage(fn MessageHandler) { h.onMessage = fn } // Run starts the handler: telemetry, health server, NATS subscription, and blocks until SIGTERM/SIGINT. func (h *Handler) Run() error { // Structured logging slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))) slog.Info("starting service", "name", h.Settings.ServiceName, "version", h.Settings.ServiceVersion) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Telemetry tp, shutdown, err := telemetry.Setup(ctx, telemetry.Config{ ServiceName: h.Settings.ServiceName, ServiceVersion: h.Settings.ServiceVersion, ServiceNamespace: h.Settings.ServiceNamespace, DeploymentEnv: h.Settings.DeploymentEnv, Enabled: h.Settings.OTELEnabled, Endpoint: h.Settings.OTELEndpoint, }) if err != nil { return fmt.Errorf("telemetry setup: %w", err) } defer shutdown(ctx) h.Telemetry = tp // Health server healthSrv := health.New( h.Settings.HealthPort, h.Settings.HealthPath, h.Settings.ReadyPath, func() bool { return h.running && h.NATS.IsConnected() }, ) healthSrv.Start() defer healthSrv.Stop(ctx) // Connect to NATS if err := h.NATS.Connect(); err != nil { return fmt.Errorf("nats: %w", err) } defer h.NATS.Close() // User setup if h.onSetup != nil { slog.Info("running service setup") if err := h.onSetup(ctx); err != nil { return fmt.Errorf("setup: %w", err) } } // Subscribe if h.onMessage == nil { return fmt.Errorf("no message handler registered") } if err := h.NATS.Subscribe(h.Subject, h.wrapHandler(ctx), h.QueueGroup); err != nil { return fmt.Errorf("subscribe: %w", err) } h.running = true slog.Info("handler ready", "subject", h.Subject) // Wait for shutdown signal sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT) <-sigCh slog.Info("shutting down") h.running = false // Teardown if h.onTeardown != nil { if err := h.onTeardown(ctx); err != nil { slog.Warn("teardown error", "error", err) } } slog.Info("shutdown complete") return nil } // wrapHandler creates a nats.MsgHandler that decodes msgpack and dispatches to the user handler. func (h *Handler) wrapHandler(ctx context.Context) nats.MsgHandler { return func(msg *nats.Msg) { data, err := natsutil.DecodeMsgpackMap(msg.Data) if err != nil { slog.Error("failed to decode message", "subject", msg.Subject, "error", err) if msg.Reply != "" { _ = h.NATS.Publish(msg.Reply, map[string]any{ "error": true, "message": err.Error(), "type": "DecodeError", }) } return } response, err := h.onMessage(ctx, msg, data) if err != nil { slog.Error("handler error", "subject", msg.Subject, "error", err) if msg.Reply != "" { _ = h.NATS.Publish(msg.Reply, map[string]any{ "error": true, "message": err.Error(), "type": fmt.Sprintf("%T", err), }) } return } if response != nil && msg.Reply != "" { if err := h.NATS.Publish(msg.Reply, response); err != nil { slog.Error("failed to publish reply", "error", err) } } } }