All checks were successful
CI / Test (push) Successful in 3m2s
CI / Lint (push) Successful in 3m7s
CI / Release (push) Successful in 1m55s
CI / Notify Downstream (stt-module) (push) Successful in 1s
CI / Notify Downstream (voice-assistant) (push) Successful in 1s
CI / Notify (push) Successful in 2s
CI / Notify Downstream (chat-handler) (push) Successful in 1s
CI / Notify Downstream (pipeline-bridge) (push) Successful in 1s
CI / Notify Downstream (tts-module) (push) Successful in 1s
269 lines
7.2 KiB
Go
269 lines
7.2 KiB
Go
// Package config provides environment-based configuration for handler services
|
|
// with optional live reload of secrets and service endpoints.
|
|
package config
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
)
|
|
|
|
// Settings holds base configuration for all handler services.
|
|
// Fields in the "hot-reload" section are protected by a RWMutex and can be
|
|
// updated at runtime via WatchSecrets(). All other fields are immutable
|
|
// after Load() returns.
|
|
type Settings struct {
|
|
// Service identification (immutable)
|
|
ServiceName string
|
|
ServiceVersion string
|
|
ServiceNamespace string
|
|
DeploymentEnv string
|
|
|
|
// NATS configuration (immutable)
|
|
NATSURL string
|
|
NATSUser string
|
|
NATSPassword string
|
|
NATSQueueGroup string
|
|
|
|
// Redis/Valkey configuration (immutable)
|
|
RedisURL string
|
|
RedisPassword string
|
|
|
|
// Milvus configuration (immutable)
|
|
MilvusHost string
|
|
MilvusPort int
|
|
MilvusCollection string
|
|
|
|
// OpenTelemetry configuration (immutable)
|
|
OTELEnabled bool
|
|
OTELEndpoint string
|
|
OTELUseHTTP bool
|
|
|
|
// HyperDX configuration (immutable)
|
|
HyperDXEnabled bool
|
|
HyperDXAPIKey string
|
|
HyperDXEndpoint string
|
|
|
|
// MLflow configuration (immutable)
|
|
MLflowTrackingURI string
|
|
MLflowExperimentName string
|
|
MLflowEnabled bool
|
|
|
|
// Health check configuration (immutable)
|
|
HealthPort int
|
|
HealthPath string
|
|
ReadyPath string
|
|
|
|
// Timeouts (immutable)
|
|
HTTPTimeout time.Duration
|
|
NATSTimeout time.Duration
|
|
|
|
// Hot-reloadable fields — access via getter methods.
|
|
mu sync.RWMutex
|
|
embeddingsURL string
|
|
rerankerURL string
|
|
llmURL string
|
|
ttsURL string
|
|
sttURL string
|
|
|
|
// Secrets path for file-based hot reload (Kubernetes secret mounts)
|
|
SecretsPath string
|
|
}
|
|
|
|
// Load creates a Settings populated from environment variables with defaults.
|
|
func Load() *Settings {
|
|
return &Settings{
|
|
ServiceName: getEnv("SERVICE_NAME", "handler"),
|
|
ServiceVersion: getEnv("SERVICE_VERSION", "1.0.0"),
|
|
ServiceNamespace: getEnv("SERVICE_NAMESPACE", "ai-ml"),
|
|
DeploymentEnv: getEnv("DEPLOYMENT_ENV", "production"),
|
|
|
|
NATSURL: getEnv("NATS_URL", "nats://nats.ai-ml.svc.cluster.local:4222"),
|
|
NATSUser: getEnv("NATS_USER", ""),
|
|
NATSPassword: getEnv("NATS_PASSWORD", ""),
|
|
NATSQueueGroup: getEnv("NATS_QUEUE_GROUP", ""),
|
|
|
|
RedisURL: getEnv("REDIS_URL", "redis://valkey.ai-ml.svc.cluster.local:6379"),
|
|
RedisPassword: getEnv("REDIS_PASSWORD", ""),
|
|
|
|
MilvusHost: getEnv("MILVUS_HOST", "milvus.ai-ml.svc.cluster.local"),
|
|
MilvusPort: getEnvInt("MILVUS_PORT", 19530),
|
|
MilvusCollection: getEnv("MILVUS_COLLECTION", "documents"),
|
|
|
|
embeddingsURL: getEnv("EMBEDDINGS_URL", "http://embeddings-predictor.ai-ml.svc.cluster.local"),
|
|
rerankerURL: getEnv("RERANKER_URL", "http://reranker-predictor.ai-ml.svc.cluster.local"),
|
|
llmURL: getEnv("LLM_URL", "http://vllm-predictor.ai-ml.svc.cluster.local"),
|
|
ttsURL: getEnv("TTS_URL", "http://tts-predictor.ai-ml.svc.cluster.local"),
|
|
sttURL: getEnv("STT_URL", "http://whisper-predictor.ai-ml.svc.cluster.local"),
|
|
|
|
OTELEnabled: getEnvBool("OTEL_ENABLED", true),
|
|
OTELEndpoint: getEnv("OTEL_ENDPOINT", "http://opentelemetry-collector.observability.svc.cluster.local:4317"),
|
|
OTELUseHTTP: getEnvBool("OTEL_USE_HTTP", false),
|
|
|
|
HyperDXEnabled: getEnvBool("HYPERDX_ENABLED", false),
|
|
HyperDXAPIKey: getEnv("HYPERDX_API_KEY", ""),
|
|
HyperDXEndpoint: getEnv("HYPERDX_ENDPOINT", "https://in-otel.hyperdx.io"),
|
|
|
|
MLflowTrackingURI: getEnv("MLFLOW_TRACKING_URI", "http://mlflow.mlflow.svc.cluster.local:80"),
|
|
MLflowExperimentName: getEnv("MLFLOW_EXPERIMENT_NAME", ""),
|
|
MLflowEnabled: getEnvBool("MLFLOW_ENABLED", true),
|
|
|
|
HealthPort: getEnvInt("HEALTH_PORT", 8080),
|
|
HealthPath: getEnv("HEALTH_PATH", "/health"),
|
|
ReadyPath: getEnv("READY_PATH", "/ready"),
|
|
|
|
HTTPTimeout: getEnvDuration("HTTP_TIMEOUT", 60*time.Second),
|
|
NATSTimeout: getEnvDuration("NATS_TIMEOUT", 30*time.Second),
|
|
|
|
SecretsPath: getEnv("SECRETS_PATH", ""),
|
|
}
|
|
}
|
|
|
|
// EmbeddingsURL returns the current embeddings service URL (thread-safe).
|
|
func (s *Settings) EmbeddingsURL() string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.embeddingsURL
|
|
}
|
|
|
|
// RerankerURL returns the current reranker service URL (thread-safe).
|
|
func (s *Settings) RerankerURL() string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.rerankerURL
|
|
}
|
|
|
|
// LLMURL returns the current LLM service URL (thread-safe).
|
|
func (s *Settings) LLMURL() string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.llmURL
|
|
}
|
|
|
|
// TTSURL returns the current TTS service URL (thread-safe).
|
|
func (s *Settings) TTSURL() string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.ttsURL
|
|
}
|
|
|
|
// STTURL returns the current STT service URL (thread-safe).
|
|
func (s *Settings) STTURL() string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.sttURL
|
|
}
|
|
|
|
// WatchSecrets watches the SecretsPath directory for changes and reloads
|
|
// hot-reloadable fields. Blocks until ctx is cancelled.
|
|
func (s *Settings) WatchSecrets(ctx context.Context) {
|
|
if s.SecretsPath == "" {
|
|
return
|
|
}
|
|
|
|
watcher, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
slog.Error("config: failed to create fsnotify watcher", "error", err)
|
|
return
|
|
}
|
|
defer func() { _ = watcher.Close() }()
|
|
|
|
if err := watcher.Add(s.SecretsPath); err != nil {
|
|
slog.Error("config: failed to watch secrets path", "error", err, "path", s.SecretsPath)
|
|
return
|
|
}
|
|
|
|
slog.Info("config: watching secrets for hot reload", "path", s.SecretsPath)
|
|
|
|
for {
|
|
select {
|
|
case event, ok := <-watcher.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
if event.Has(fsnotify.Create) || event.Has(fsnotify.Write) {
|
|
s.reloadFromSecrets()
|
|
}
|
|
case err, ok := <-watcher.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
slog.Error("config: fsnotify error", "error", err)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// reloadFromSecrets reads hot-reloadable values from the secrets directory.
|
|
func (s *Settings) reloadFromSecrets() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
updated := 0
|
|
reload := func(filename string, target *string) {
|
|
path := filepath.Join(s.SecretsPath, filename)
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return
|
|
}
|
|
val := strings.TrimSpace(string(data))
|
|
if val != "" && val != *target {
|
|
*target = val
|
|
updated++
|
|
slog.Info("config: reloaded secret", "key", filename)
|
|
}
|
|
}
|
|
|
|
reload("embeddings-url", &s.embeddingsURL)
|
|
reload("reranker-url", &s.rerankerURL)
|
|
reload("llm-url", &s.llmURL)
|
|
reload("tts-url", &s.ttsURL)
|
|
reload("stt-url", &s.sttURL)
|
|
|
|
if updated > 0 {
|
|
slog.Info("config: secrets reloaded", "updated", updated)
|
|
}
|
|
}
|
|
|
|
func getEnv(key, fallback string) string {
|
|
if v := os.Getenv(key); v != "" {
|
|
return v
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func getEnvInt(key string, fallback int) int {
|
|
if v := os.Getenv(key); v != "" {
|
|
if i, err := strconv.Atoi(v); err == nil {
|
|
return i
|
|
}
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func getEnvBool(key string, fallback bool) bool {
|
|
if v := os.Getenv(key); v != "" {
|
|
if b, err := strconv.ParseBool(v); err == nil {
|
|
return b
|
|
}
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func getEnvDuration(key string, fallback time.Duration) time.Duration {
|
|
if v := os.Getenv(key); v != "" {
|
|
if f, err := strconv.ParseFloat(v, 64); err == nil {
|
|
return time.Duration(f * float64(time.Second))
|
|
}
|
|
}
|
|
return fallback
|
|
}
|