// Package handler provides the base Handler pattern for NATS message-driven services. package handler import ( "context" "fmt" "log/slog" "os" "os/signal" "syscall" "github.com/nats-io/nats.go" "google.golang.org/protobuf/proto" "git.daviestechlabs.io/daviestechlabs/handler-base/config" pb "git.daviestechlabs.io/daviestechlabs/handler-base/gen/messagespb" "git.daviestechlabs.io/daviestechlabs/handler-base/health" "git.daviestechlabs.io/daviestechlabs/handler-base/natsutil" "git.daviestechlabs.io/daviestechlabs/handler-base/telemetry" ) // TypedMessageHandler processes the raw NATS message. // Services unmarshal msg.Data into their own typed structs via natsutil.Decode. // Return a proto.Message (or nil for no reply). type TypedMessageHandler func(ctx context.Context, msg *nats.Msg) (proto.Message, error) // SetupFunc is called once before the handler starts processing messages. type SetupFunc func(ctx context.Context) error // TeardownFunc is called during graceful shutdown. type TeardownFunc func(ctx context.Context) error // Handler is the base service runner that wires NATS, health, and telemetry. type Handler struct { Settings *config.Settings NATS *natsutil.Client Telemetry *telemetry.Provider Subject string QueueGroup string onSetup SetupFunc onTeardown TeardownFunc onTypedMessage TypedMessageHandler running bool } // New creates a Handler for the given NATS subject. func New(subject string, settings *config.Settings) *Handler { if settings == nil { settings = config.Load() } queueGroup := settings.NATSQueueGroup natsOpts := []nats.Option{} if settings.NATSUser != "" && settings.NATSPassword != "" { natsOpts = append(natsOpts, nats.UserInfo(settings.NATSUser, settings.NATSPassword)) } return &Handler{ Settings: settings, Subject: subject, QueueGroup: queueGroup, NATS: natsutil.New(settings.NATSURL, natsOpts...), } } // OnSetup registers the setup callback. func (h *Handler) OnSetup(fn SetupFunc) { h.onSetup = fn } // OnTeardown registers the teardown callback. func (h *Handler) OnTeardown(fn TeardownFunc) { h.onTeardown = fn } // OnTypedMessage registers the message handler callback. func (h *Handler) OnTypedMessage(fn TypedMessageHandler) { h.onTypedMessage = fn } // Run starts the handler: telemetry, health server, NATS subscription, and blocks until SIGTERM/SIGINT. func (h *Handler) Run() error { // Structured logging slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))) slog.Info("starting service", "name", h.Settings.ServiceName, "version", h.Settings.ServiceVersion) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Telemetry tp, shutdown, err := telemetry.Setup(ctx, telemetry.Config{ ServiceName: h.Settings.ServiceName, ServiceVersion: h.Settings.ServiceVersion, ServiceNamespace: h.Settings.ServiceNamespace, DeploymentEnv: h.Settings.DeploymentEnv, Enabled: h.Settings.OTELEnabled, Endpoint: h.Settings.OTELEndpoint, }) if err != nil { return fmt.Errorf("telemetry setup: %w", err) } defer shutdown(ctx) h.Telemetry = tp // Health server healthSrv := health.New( h.Settings.HealthPort, h.Settings.HealthPath, h.Settings.ReadyPath, func() bool { return h.running && h.NATS.IsConnected() }, ) healthSrv.Start() defer healthSrv.Stop(ctx) // Connect to NATS if err := h.NATS.Connect(); err != nil { return fmt.Errorf("nats: %w", err) } defer h.NATS.Close() // User setup if h.onSetup != nil { slog.Info("running service setup") if err := h.onSetup(ctx); err != nil { return fmt.Errorf("setup: %w", err) } } // Subscribe if h.onTypedMessage == nil { return fmt.Errorf("no message handler registered") } if err := h.NATS.Subscribe(h.Subject, h.wrapHandler(ctx), h.QueueGroup); err != nil { return fmt.Errorf("subscribe: %w", err) } h.running = true slog.Info("handler ready", "subject", h.Subject) // Wait for shutdown signal sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT) <-sigCh slog.Info("shutting down") h.running = false // Teardown if h.onTeardown != nil { if err := h.onTeardown(ctx); err != nil { slog.Warn("teardown error", "error", err) } } slog.Info("shutdown complete") return nil } // wrapHandler creates a nats.MsgHandler that dispatches to the registered callback. func (h *Handler) wrapHandler(ctx context.Context) nats.MsgHandler { return func(msg *nats.Msg) { response, err := h.onTypedMessage(ctx, msg) if err != nil { slog.Error("handler error", "subject", msg.Subject, "error", err) if msg.Reply != "" { _ = h.NATS.Publish(msg.Reply, &pb.ErrorResponse{ Error: true, Message: err.Error(), Type: fmt.Sprintf("%T", err), }) } return } if response != nil && msg.Reply != "" { if err := h.NATS.Publish(msg.Reply, response); err != nil { slog.Error("failed to publish reply", "error", err) } } } }