Files
handler-base/config/config.go
Billy D. f1dd96a42b
All checks were successful
CI / Test (push) Successful in 3m2s
CI / Lint (push) Successful in 3m7s
CI / Release (push) Successful in 1m55s
CI / Notify Downstream (stt-module) (push) Successful in 1s
CI / Notify Downstream (voice-assistant) (push) Successful in 1s
CI / Notify (push) Successful in 2s
CI / Notify Downstream (chat-handler) (push) Successful in 1s
CI / Notify Downstream (pipeline-bridge) (push) Successful in 1s
CI / Notify Downstream (tts-module) (push) Successful in 1s
style: gofmt + fix errcheck lint warning
2026-02-21 15:35:37 -05:00

269 lines
7.2 KiB
Go

// Package config provides environment-based configuration for handler services
// with optional live reload of secrets and service endpoints.
package config
import (
"context"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
// Settings holds the base configuration shared by all handler services.
//
// The exported fields are populated once by Load and are not written again
// by the code in this file, so they can be read without synchronization.
// The unexported service URLs are the hot-reloadable part: they are guarded
// by mu, may be rewritten at runtime while WatchSecrets is running, and are
// read through the getter methods (EmbeddingsURL, RerankerURL, LLMURL,
// TTSURL, STTURL).
//
// Settings embeds a sync.RWMutex by value, so a Settings must not be copied
// once it is in use; pass it around as *Settings (which is what Load returns).
type Settings struct {
// Service identification (immutable)
ServiceName string
ServiceVersion string
ServiceNamespace string
DeploymentEnv string
// NATS configuration (immutable)
NATSURL string
NATSUser string
NATSPassword string
NATSQueueGroup string
// Redis/Valkey configuration (immutable)
RedisURL string
RedisPassword string
// Milvus configuration (immutable)
MilvusHost string
MilvusPort int
MilvusCollection string
// OpenTelemetry configuration (immutable)
OTELEnabled bool
OTELEndpoint string
OTELUseHTTP bool
// HyperDX configuration (immutable)
HyperDXEnabled bool
HyperDXAPIKey string
HyperDXEndpoint string
// MLflow configuration (immutable)
MLflowTrackingURI string
MLflowExperimentName string
MLflowEnabled bool
// Health check configuration (immutable)
HealthPort int
HealthPath string
ReadyPath string
// Timeouts (immutable). Load reads them from HTTP_TIMEOUT and NATS_TIMEOUT.
HTTPTimeout time.Duration
NATSTimeout time.Duration
// Hot-reloadable fields — access via getter methods.
// mu guards the five URL fields directly below it: the getters take the
// read lock and reloadFromSecrets takes the write lock.
mu sync.RWMutex
embeddingsURL string
rerankerURL string
llmURL string
ttsURL string
sttURL string
// SecretsPath is the directory WatchSecrets monitors for file-based hot
// reload (Kubernetes secret mounts). Load reads it from SECRETS_PATH; when
// it is empty WatchSecrets returns immediately. Unlike the URL fields above
// it is not guarded by mu.
SecretsPath string
}
// Load builds a Settings from the process environment. Every field has a
// built-in default that is used when its variable is unset or empty (and,
// for numeric, boolean and duration fields, when the value does not parse).
// The fields are listed in the same order and grouping as the Settings
// struct declaration.
func Load() *Settings {
	cfg := &Settings{
		// Service identification.
		ServiceName:      getEnv("SERVICE_NAME", "handler"),
		ServiceVersion:   getEnv("SERVICE_VERSION", "1.0.0"),
		ServiceNamespace: getEnv("SERVICE_NAMESPACE", "ai-ml"),
		DeploymentEnv:    getEnv("DEPLOYMENT_ENV", "production"),

		// NATS.
		NATSURL:        getEnv("NATS_URL", "nats://nats.ai-ml.svc.cluster.local:4222"),
		NATSUser:       getEnv("NATS_USER", ""),
		NATSPassword:   getEnv("NATS_PASSWORD", ""),
		NATSQueueGroup: getEnv("NATS_QUEUE_GROUP", ""),

		// Redis/Valkey.
		RedisURL:      getEnv("REDIS_URL", "redis://valkey.ai-ml.svc.cluster.local:6379"),
		RedisPassword: getEnv("REDIS_PASSWORD", ""),

		// Milvus.
		MilvusHost:       getEnv("MILVUS_HOST", "milvus.ai-ml.svc.cluster.local"),
		MilvusPort:       getEnvInt("MILVUS_PORT", 19530),
		MilvusCollection: getEnv("MILVUS_COLLECTION", "documents"),

		// OpenTelemetry.
		OTELEnabled:  getEnvBool("OTEL_ENABLED", true),
		OTELEndpoint: getEnv("OTEL_ENDPOINT", "http://opentelemetry-collector.observability.svc.cluster.local:4317"),
		OTELUseHTTP:  getEnvBool("OTEL_USE_HTTP", false),

		// HyperDX.
		HyperDXEnabled:  getEnvBool("HYPERDX_ENABLED", false),
		HyperDXAPIKey:   getEnv("HYPERDX_API_KEY", ""),
		HyperDXEndpoint: getEnv("HYPERDX_ENDPOINT", "https://in-otel.hyperdx.io"),

		// MLflow.
		MLflowTrackingURI:    getEnv("MLFLOW_TRACKING_URI", "http://mlflow.mlflow.svc.cluster.local:80"),
		MLflowExperimentName: getEnv("MLFLOW_EXPERIMENT_NAME", ""),
		MLflowEnabled:        getEnvBool("MLFLOW_ENABLED", true),

		// Health checks.
		HealthPort: getEnvInt("HEALTH_PORT", 8080),
		HealthPath: getEnv("HEALTH_PATH", "/health"),
		ReadyPath:  getEnv("READY_PATH", "/ready"),

		// Timeouts.
		HTTPTimeout: getEnvDuration("HTTP_TIMEOUT", 60*time.Second),
		NATSTimeout: getEnvDuration("NATS_TIMEOUT", 30*time.Second),

		// Hot-reloadable service URLs; these are the initial values and may
		// later be replaced by reloadFromSecrets.
		embeddingsURL: getEnv("EMBEDDINGS_URL", "http://embeddings-predictor.ai-ml.svc.cluster.local"),
		rerankerURL:   getEnv("RERANKER_URL", "http://reranker-predictor.ai-ml.svc.cluster.local"),
		llmURL:        getEnv("LLM_URL", "http://vllm-predictor.ai-ml.svc.cluster.local"),
		ttsURL:        getEnv("TTS_URL", "http://tts-predictor.ai-ml.svc.cluster.local"),
		sttURL:        getEnv("STT_URL", "http://whisper-predictor.ai-ml.svc.cluster.local"),

		// Directory watched by WatchSecrets; empty disables watching.
		SecretsPath: getEnv("SECRETS_PATH", ""),
	}
	return cfg
}
// EmbeddingsURL reports the embeddings service URL currently in effect.
// The value is read under the read lock, so the call is safe while
// reloadFromSecrets is replacing it.
func (s *Settings) EmbeddingsURL() string {
	s.mu.RLock()
	current := s.embeddingsURL
	s.mu.RUnlock()
	return current
}
// RerankerURL reports the reranker service URL currently in effect.
// The value is read under the read lock, so the call is safe while
// reloadFromSecrets is replacing it.
func (s *Settings) RerankerURL() string {
	s.mu.RLock()
	current := s.rerankerURL
	s.mu.RUnlock()
	return current
}
// LLMURL reports the LLM service URL currently in effect.
// The value is read under the read lock, so the call is safe while
// reloadFromSecrets is replacing it.
func (s *Settings) LLMURL() string {
	s.mu.RLock()
	current := s.llmURL
	s.mu.RUnlock()
	return current
}
// TTSURL reports the TTS service URL currently in effect.
// The value is read under the read lock, so the call is safe while
// reloadFromSecrets is replacing it.
func (s *Settings) TTSURL() string {
	s.mu.RLock()
	current := s.ttsURL
	s.mu.RUnlock()
	return current
}
// STTURL reports the STT service URL currently in effect.
// The value is read under the read lock, so the call is safe while
// reloadFromSecrets is replacing it.
func (s *Settings) STTURL() string {
	s.mu.RLock()
	current := s.sttURL
	s.mu.RUnlock()
	return current
}
// WatchSecrets watches the SecretsPath directory and calls reloadFromSecrets
// whenever a file in it is created or written.
//
// It returns immediately when SecretsPath is empty, and also (after logging
// the error) when the watcher cannot be created or the path cannot be
// watched. Otherwise it blocks until ctx is cancelled or the watcher's
// channels are closed, so callers normally run it in its own goroutine.
//
// Note that no reload happens before the first event: files already present
// in SecretsPath when watching starts are not read until something changes.
func (s *Settings) WatchSecrets(ctx context.Context) {
	if s.SecretsPath == "" {
		return
	}

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		slog.Error("config: failed to create fsnotify watcher", "error", err)
		return
	}
	// Close's error is deliberately discarded; there is nothing useful to do
	// with it on the way out.
	defer func() { _ = watcher.Close() }()

	if err = watcher.Add(s.SecretsPath); err != nil {
		slog.Error("config: failed to watch secrets path", "error", err, "path", s.SecretsPath)
		return
	}
	slog.Info("config: watching secrets for hot reload", "path", s.SecretsPath)

	// Only these operations can introduce new file content.
	const reloadOps = fsnotify.Create | fsnotify.Write

	for {
		select {
		case <-ctx.Done():
			return

		case watchErr, open := <-watcher.Errors:
			if !open {
				return
			}
			slog.Error("config: fsnotify error", "error", watchErr)

		case ev, open := <-watcher.Events:
			if !open {
				return
			}
			if ev.Op&reloadOps != 0 {
				s.reloadFromSecrets()
			}
		}
	}
}
// reloadFromSecrets re-reads the hot-reloadable service URLs from files in
// the SecretsPath directory and stores any that changed.
//
// Each URL has its own file (embeddings-url, reranker-url, llm-url, tts-url,
// stt-url). For every file:
//   - if it does not exist, the current value is kept and nothing is logged;
//   - if it cannot be read for any other reason, the current value is kept
//     and the error is logged (previously such errors were dropped silently,
//     which made a broken secret mount indistinguishable from an absent key);
//   - if its whitespace-trimmed content is empty or equal to the current
//     value, nothing changes;
//   - otherwise the field is updated and the key name (never the value) is
//     logged.
//
// The write lock is held for the whole reload, so getters observe either the
// state before or the state after, and concurrent reloads are serialized.
func (s *Settings) reloadFromSecrets() {
	s.mu.Lock()
	defer s.mu.Unlock()

	fields := []struct {
		key    string  // file name inside SecretsPath
		target *string // field that receives the file's content
	}{
		{"embeddings-url", &s.embeddingsURL},
		{"reranker-url", &s.rerankerURL},
		{"llm-url", &s.llmURL},
		{"tts-url", &s.ttsURL},
		{"stt-url", &s.sttURL},
	}

	updated := 0
	for _, f := range fields {
		data, err := os.ReadFile(filepath.Join(s.SecretsPath, f.key))
		if err != nil {
			// Missing files are skipped quietly; anything else (permissions,
			// I/O errors, a directory in place of the file) is reported.
			if !os.IsNotExist(err) {
				slog.Error("config: failed to read secret", "error", err, "key", f.key)
			}
			continue
		}

		val := strings.TrimSpace(string(data))
		if val == "" || val == *f.target {
			continue
		}
		*f.target = val
		updated++
		slog.Info("config: reloaded secret", "key", f.key)
	}

	if updated > 0 {
		slog.Info("config: secrets reloaded", "updated", updated)
	}
}
// getEnv returns the value of the environment variable key, or fallback when
// the variable is unset or set to the empty string.
func getEnv(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}
// getEnvInt returns the environment variable key parsed as a base-10 int.
// It returns fallback when the variable is unset, empty, or not a valid
// integer.
func getEnvInt(key string, fallback int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return fallback
	}
	return n
}
// getEnvBool returns the environment variable key parsed with
// strconv.ParseBool. It returns fallback when the variable is unset, empty,
// or not a value ParseBool accepts.
func getEnvBool(key string, fallback bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		return fallback
	}
	return parsed
}
// getEnvDuration returns the duration configured in the environment variable
// key, or fallback when the variable is unset, empty, or unusable.
//
// Two spellings are accepted:
//   - a bare number, interpreted as seconds; fractions are allowed, so "2.5"
//     means 2.5s (this is the original format and behaves as before);
//   - a Go duration string as understood by time.ParseDuration, such as
//     "30s", "1m30s" or "250ms".
//
// A number that is not finite (NaN, ±Inf) or whose nanosecond value does not
// fit in a time.Duration yields fallback. Converting such a float to an
// integer is implementation-defined in Go, so it must not reach the
// conversion.
func getEnvDuration(key string, fallback time.Duration) time.Duration {
	v := os.Getenv(key)
	if v == "" {
		return fallback
	}

	if secs, err := strconv.ParseFloat(v, 64); err == nil {
		// 2^63 is the smallest float64 above the int64 range and -2^63 is the
		// smallest int64, so [-2^63, 2^63) is exactly the convertible range.
		// NaN fails both comparisons and is rejected as well.
		const limit = float64(1 << 63)
		ns := secs * float64(time.Second)
		if ns >= -limit && ns < limit {
			return time.Duration(ns)
		}
		return fallback
	}

	// Not a plain number: try the unit-suffixed form.
	if d, err := time.ParseDuration(v); err == nil {
		return d
	}
	return fallback
}