// Package config provides environment-based configuration for handler services
// with optional live reload of secrets and service endpoints.
package config

import (
	"context"
	"log/slog"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
)

// Settings holds base configuration for all handler services.
//
// The unexported service-URL fields form the "hot-reload" section: they are
// guarded by mu, read through the getter methods, and may be rewritten at
// runtime while WatchSecrets() is running. Every other field is populated once
// by Load() and is not modified by this package afterwards.
//
// Settings embeds a sync.RWMutex, so it must be shared by pointer and never
// copied.
type Settings struct {
	// Service identification (immutable).
	ServiceName      string
	ServiceVersion   string
	ServiceNamespace string
	DeploymentEnv    string

	// NATS configuration (immutable).
	NATSURL        string
	NATSUser       string
	NATSPassword   string
	NATSQueueGroup string

	// Redis/Valkey configuration (immutable).
	RedisURL      string
	RedisPassword string

	// Milvus configuration (immutable).
	MilvusHost       string
	MilvusPort       int
	MilvusCollection string

	// OpenTelemetry configuration (immutable).
	OTELEnabled  bool
	OTELEndpoint string
	OTELUseHTTP  bool

	// HyperDX configuration (immutable).
	HyperDXEnabled  bool
	HyperDXAPIKey   string
	HyperDXEndpoint string

	// MLflow configuration (immutable).
	MLflowTrackingURI    string
	MLflowExperimentName string
	MLflowEnabled        bool

	// Health check configuration (immutable).
	HealthPort int
	HealthPath string
	ReadyPath  string

	// Timeouts (immutable).
	HTTPTimeout time.Duration
	NATSTimeout time.Duration

	// Hot-reloadable fields — access via getter methods.
	// mu guards the five URL fields below; reloadFromSecrets() refreshes them
	// from the files embeddings-url, reranker-url, llm-url, tts-url and
	// stt-url inside SecretsPath.
	mu            sync.RWMutex
	embeddingsURL string
	rerankerURL   string
	llmURL        string
	ttsURL        string
	sttURL        string

	// Secrets path for file-based hot reload (Kubernetes secret mounts).
	// This is the directory WatchSecrets() watches; when it is empty,
	// WatchSecrets() returns immediately. It is set once by Load() and is not
	// itself hot-reloaded or guarded by mu.
	SecretsPath string
}

// Load creates a Settings populated from environment variables with defaults.
func Load() *Settings {
	cfg := new(Settings)

	// Service identification.
	cfg.ServiceName = getEnv("SERVICE_NAME", "handler")
	cfg.ServiceVersion = getEnv("SERVICE_VERSION", "1.0.0")
	cfg.ServiceNamespace = getEnv("SERVICE_NAMESPACE", "ai-ml")
	cfg.DeploymentEnv = getEnv("DEPLOYMENT_ENV", "production")

	// Messaging, cache and vector store.
	cfg.NATSURL = getEnv("NATS_URL", "nats://nats.ai-ml.svc.cluster.local:4222")
	cfg.NATSUser = getEnv("NATS_USER", "")
	cfg.NATSPassword = getEnv("NATS_PASSWORD", "")
	cfg.NATSQueueGroup = getEnv("NATS_QUEUE_GROUP", "")

	cfg.RedisURL = getEnv("REDIS_URL", "redis://valkey.ai-ml.svc.cluster.local:6379")
	cfg.RedisPassword = getEnv("REDIS_PASSWORD", "")

	cfg.MilvusHost = getEnv("MILVUS_HOST", "milvus.ai-ml.svc.cluster.local")
	cfg.MilvusPort = getEnvInt("MILVUS_PORT", 19530)
	cfg.MilvusCollection = getEnv("MILVUS_COLLECTION", "documents")

	// Initial values of the hot-reloadable service URLs. No locking is needed
	// here because cfg has not been shared with any other goroutine yet.
	cfg.embeddingsURL = getEnv("EMBEDDINGS_URL", "http://embeddings-predictor.ai-ml.svc.cluster.local")
	cfg.rerankerURL = getEnv("RERANKER_URL", "http://reranker-predictor.ai-ml.svc.cluster.local")
	cfg.llmURL = getEnv("LLM_URL", "http://vllm-predictor.ai-ml.svc.cluster.local")
	cfg.ttsURL = getEnv("TTS_URL", "http://tts-predictor.ai-ml.svc.cluster.local")
	cfg.sttURL = getEnv("STT_URL", "http://whisper-predictor.ai-ml.svc.cluster.local")

	// Observability and experiment tracking.
	cfg.OTELEnabled = getEnvBool("OTEL_ENABLED", true)
	cfg.OTELEndpoint = getEnv("OTEL_ENDPOINT", "http://opentelemetry-collector.observability.svc.cluster.local:4317")
	cfg.OTELUseHTTP = getEnvBool("OTEL_USE_HTTP", false)

	cfg.HyperDXEnabled = getEnvBool("HYPERDX_ENABLED", false)
	cfg.HyperDXAPIKey = getEnv("HYPERDX_API_KEY", "")
	cfg.HyperDXEndpoint = getEnv("HYPERDX_ENDPOINT", "https://in-otel.hyperdx.io")

	cfg.MLflowTrackingURI = getEnv("MLFLOW_TRACKING_URI", "http://mlflow.mlflow.svc.cluster.local:80")
	cfg.MLflowExperimentName = getEnv("MLFLOW_EXPERIMENT_NAME", "")
	cfg.MLflowEnabled = getEnvBool("MLFLOW_ENABLED", true)

	// Health endpoints and timeouts.
	cfg.HealthPort = getEnvInt("HEALTH_PORT", 8080)
	cfg.HealthPath = getEnv("HEALTH_PATH", "/health")
	cfg.ReadyPath = getEnv("READY_PATH", "/ready")

	cfg.HTTPTimeout = getEnvDuration("HTTP_TIMEOUT", 60*time.Second)
	cfg.NATSTimeout = getEnvDuration("NATS_TIMEOUT", 30*time.Second)

	// Directory of mounted secret files; empty disables hot reload.
	cfg.SecretsPath = getEnv("SECRETS_PATH", "")

	return cfg
}

// EmbeddingsURL reports the embeddings service URL currently in effect.
// It takes the read lock, so it is safe to call while a reload is running.
func (s *Settings) EmbeddingsURL() string {
	s.mu.RLock()
	url := s.embeddingsURL
	s.mu.RUnlock()
	return url
}

// RerankerURL reports the reranker service URL currently in effect.
// It takes the read lock, so it is safe to call while a reload is running.
func (s *Settings) RerankerURL() string {
	s.mu.RLock()
	url := s.rerankerURL
	s.mu.RUnlock()
	return url
}

// LLMURL reports the LLM service URL currently in effect.
// It takes the read lock, so it is safe to call while a reload is running.
func (s *Settings) LLMURL() string {
	s.mu.RLock()
	url := s.llmURL
	s.mu.RUnlock()
	return url
}

// TTSURL reports the TTS service URL currently in effect.
// It takes the read lock, so it is safe to call while a reload is running.
func (s *Settings) TTSURL() string {
	s.mu.RLock()
	url := s.ttsURL
	s.mu.RUnlock()
	return url
}

// STTURL reports the STT service URL currently in effect.
// It takes the read lock, so it is safe to call while a reload is running.
func (s *Settings) STTURL() string {
	s.mu.RLock()
	url := s.sttURL
	s.mu.RUnlock()
	return url
}

// WatchSecrets watches the SecretsPath directory and calls reloadFromSecrets
// whenever a file in it is created or written.
//
// It returns immediately when SecretsPath is empty, or (after logging) when
// the watcher cannot be created or attached to the directory. Otherwise it
// blocks until ctx is cancelled or the watcher's channels are closed, so it is
// normally run in its own goroutine. Values already present in the directory
// are not loaded up front; only subsequent change events trigger a reload.
func (s *Settings) WatchSecrets(ctx context.Context) {
	dir := s.SecretsPath
	if dir == "" {
		return
	}

	w, err := fsnotify.NewWatcher()
	if err != nil {
		slog.Error("config: failed to create fsnotify watcher", "error", err)
		return
	}
	// Close's error is deliberately dropped: the watcher is being torn down
	// and there is nothing useful to do with a failure at this point.
	defer func() { _ = w.Close() }()

	if err = w.Add(dir); err != nil {
		slog.Error("config: failed to watch secrets path", "error", err, "path", dir)
		return
	}

	slog.Info("config: watching secrets for hot reload", "path", dir)

	for {
		select {
		case <-ctx.Done():
			return

		case watchErr, open := <-w.Errors:
			if !open {
				return
			}
			slog.Error("config: fsnotify error", "error", watchErr)

		case ev, open := <-w.Events:
			if !open {
				return
			}
			// Only creations and writes can carry new content; every other
			// operation is skipped.
			if !ev.Has(fsnotify.Create) && !ev.Has(fsnotify.Write) {
				continue
			}
			s.reloadFromSecrets()
		}
	}
}

// reloadFromSecrets reads hot-reloadable values from the secrets directory.
func (s *Settings) reloadFromSecrets() { s.mu.Lock() defer s.mu.Unlock() updated := 0 reload := func(filename string, target *string) { path := filepath.Join(s.SecretsPath, filename) data, err := os.ReadFile(path) if err != nil { return } val := strings.TrimSpace(string(data)) if val != "" && val != *target { *target = val updated++ slog.Info("config: reloaded secret", "key", filename) } } reload("embeddings-url", &s.embeddingsURL) reload("reranker-url", &s.rerankerURL) reload("llm-url", &s.llmURL) reload("tts-url", &s.ttsURL) reload("stt-url", &s.sttURL) if updated > 0 { slog.Info("config: secrets reloaded", "updated", updated) } } func getEnv(key, fallback string) string { if v := os.Getenv(key); v != "" { return v } return fallback } func getEnvInt(key string, fallback int) int { if v := os.Getenv(key); v != "" { if i, err := strconv.Atoi(v); err == nil { return i } } return fallback } func getEnvBool(key string, fallback bool) bool { if v := os.Getenv(key); v != "" { if b, err := strconv.ParseBool(v); err == nil { return b } } return fallback } func getEnvDuration(key string, fallback time.Duration) time.Duration { if v := os.Getenv(key); v != "" { if f, err := strconv.ParseFloat(v, 64); err == nil { return time.Duration(f * float64(time.Second)) } } return fallback }