Files
handler-base/handler_base/telemetry.py
Billy D. 99c97b7973 feat: Add handler-base library for NATS AI/ML services
- Handler base class with graceful shutdown and signal handling
- NATSClient with JetStream and msgpack serialization
- Pydantic Settings for environment configuration
- HealthServer for Kubernetes probes
- OpenTelemetry telemetry setup
- Service clients: STT, TTS, LLM, Embeddings, Reranker, Milvus
2026-02-01 20:36:00 -05:00

155 lines
4.9 KiB
Python

"""
OpenTelemetry setup for tracing and metrics.
Supports both gRPC and HTTP exporters, with optional HyperDX integration.
"""
import logging
import os
from typing import Optional, Tuple
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as OTLPSpanExporterHTTP,
)
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
OTLPMetricExporter as OTLPMetricExporterHTTP,
)
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION, SERVICE_NAMESPACE
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from handler_base.config import Settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Global references
# Tracer and meter cached by setup_telemetry(); both remain None until it
# completes with telemetry enabled.
_tracer: Optional[trace.Tracer] = None
_meter: Optional[metrics.Meter] = None
# Set to True once setup_telemetry() has run (whether telemetry ended up
# enabled or disabled) so that later calls return the cached values.
_initialized = False
def _otlp_http_url(endpoint, signal_path: str) -> str:
    """Join an OTLP/HTTP base endpoint and a signal path with exactly one slash."""
    # str() keeps this working if the setting is a URL object rather than a str.
    base = str(endpoint).rstrip("/")
    return f"{base}/{signal_path}"


def _build_exporters(endpoint, headers, use_http: bool):
    """Return ``(span_exporter, metric_exporter)`` for the selected OTLP transport."""
    if use_http:
        span_exporter = OTLPSpanExporterHTTP(
            endpoint=_otlp_http_url(endpoint, "v1/traces"),
            headers=headers,
        )
        metric_exporter = OTLPMetricExporterHTTP(
            endpoint=_otlp_http_url(endpoint, "v1/metrics"),
            headers=headers,
        )
    else:
        # The gRPC exporters receive the endpoint exactly as configured.
        span_exporter = OTLPSpanExporter(endpoint=endpoint)
        metric_exporter = OTLPMetricExporter(endpoint=endpoint)
    return span_exporter, metric_exporter


def setup_telemetry(
    settings: Optional[Settings] = None,
) -> Tuple[Optional[trace.Tracer], Optional[metrics.Meter]]:
    """
    Initialize OpenTelemetry tracing and metrics.

    The first call configures the global tracer and meter providers and
    instruments httpx and logging. Every later call returns the values cached
    by the first call without reconfiguring anything.

    Args:
        settings: Configuration settings. If None, loads from environment.

    Returns:
        Tuple of (tracer, meter) or (None, None) if disabled.
    """
    global _tracer, _meter, _initialized

    if _initialized:
        return _tracer, _meter

    if settings is None:
        settings = Settings()

    if not settings.otel_enabled:
        logger.info("OpenTelemetry disabled")
        _initialized = True
        return None, None

    # Create resource with service information
    resource = Resource.create(
        {
            SERVICE_NAME: settings.service_name,
            SERVICE_VERSION: settings.service_version,
            SERVICE_NAMESPACE: settings.service_namespace,
            "deployment.environment": settings.deployment_env,
            "host.name": os.environ.get("HOSTNAME", "unknown"),
        }
    )

    # Determine endpoint and exporter type
    if settings.hyperdx_enabled and settings.hyperdx_api_key:
        # HyperDX uses HTTP with API key header
        endpoint = settings.hyperdx_endpoint
        headers = {"authorization": settings.hyperdx_api_key}
        use_http = True
        logger.info("Using HyperDX endpoint: %s", endpoint)
    else:
        if settings.hyperdx_enabled:
            # Make the fallback visible instead of silently ignoring HyperDX.
            logger.warning(
                "HyperDX is enabled but no API key is configured; "
                "falling back to the OTEL endpoint"
            )
        endpoint = settings.otel_endpoint
        headers = None
        use_http = settings.otel_use_http
        logger.info("Using OTEL endpoint: %s (HTTP: %s)", endpoint, use_http)

    # Build both exporters before touching any global provider, so a
    # construction failure cannot leave tracing configured without metrics.
    trace_exporter, metric_exporter = _build_exporters(endpoint, headers, use_http)

    # Setup tracing
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
    trace.set_tracer_provider(tracer_provider)

    # Setup metrics
    metric_reader = PeriodicExportingMetricReader(
        metric_exporter,
        export_interval_millis=60000,
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    # Instrument libraries
    HTTPXClientInstrumentor().instrument()
    LoggingInstrumentor().instrument(set_logging_format=True)

    # Create tracer and meter for this service
    _tracer = trace.get_tracer(settings.service_name, settings.service_version)
    _meter = metrics.get_meter(settings.service_name, settings.service_version)

    logger.info("OpenTelemetry initialized for %s", settings.service_name)
    _initialized = True
    return _tracer, _meter
def get_tracer() -> Optional[trace.Tracer]:
    """
    Return the tracer cached by setup_telemetry().

    The result is None when setup_telemetry() has not yet stored a tracer,
    which includes the case where telemetry is disabled.
    """
    return _tracer
def get_meter() -> Optional[metrics.Meter]:
    """
    Return the meter cached by setup_telemetry().

    The result is None when setup_telemetry() has not yet stored a meter,
    which includes the case where telemetry is disabled.
    """
    return _meter
def create_span(name: str, **kwargs):
    """
    Create a new span as a context manager.

    Usage:
        with create_span("my_operation") as span:
            span.set_attribute("key", "value")
            # do work

    Args:
        name: Name of the span.
        **kwargs: Passed through unchanged to the tracer's
            ``start_as_current_span``.

    Returns:
        A context manager. When a tracer has been set up it is the one
        returned by ``start_as_current_span``. Otherwise it is a no-op
        context manager that yields a non-recording span, so the usage
        above works even when telemetry is disabled.
    """
    if _tracer is None:
        from contextlib import nullcontext

        # Yield a non-recording span rather than None: a bare nullcontext()
        # would bind ``span`` to None and break ``span.set_attribute(...)``.
        return nullcontext(trace.INVALID_SPAN)
    return _tracer.start_as_current_span(name, **kwargs)