# Source code for tango._telemetry

# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later

import atexit
import contextlib
import enum
import importlib.util
import inspect
import logging
import re
import socket
import sys
import threading
import typing
import warnings
from collections import namedtuple
from contextvars import ContextVar
from dataclasses import dataclass

from tango._tango import ApiUtil, LockerLanguage, _telemetry, is_omni_thread
from tango.constants import TELEMETRY_SUPPORTED
from tango.release import Release

from ._warnings import PyTangoUserWarning, warn_once


def _truthy_env_var(name) -> bool:
    """
    Report whether the environment variable ``name`` holds a "true" value.

    The value is read through ``ApiUtil.get_env_var``.  An unset or empty
    variable is False; otherwise the value is compared case-insensitively
    against the accepted spellings ``on``, ``1``, ``true``, ``yes`` and ``y``.
    """
    raw = ApiUtil.get_env_var(name)
    if not raw:
        return False
    return raw.lower() in {"on", "1", "true", "yes", "y"}

def _module_available(module_name: str) -> bool:
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        return False


class TelemetryExporter(enum.IntEnum):
    """
    An enumeration representing the telemetry exporter types.

    Members can be looked up by upper-cased name, e.g.
    ``TelemetryExporter["GRPC"]``.

    .. versionadded:: 10.3.0
    """

    # OTLP exporter using gRPC
    GRPC = 0
    # OTLP exporter using HTTP
    HTTP = 1
    # Exporter writing to the console (stdout or stderr)
    CONSOLE = 2
class TelemetryType(enum.IntEnum):
    """
    An enumeration representing the telemetry signal types.

    .. versionadded:: 10.3.0
    """

    # Trace (span) signals
    TRACING = 0
    # Log signals
    LOGGING = 1
    # No signal type
    NONE = 2
class TelemetryTopic(enum.IntEnum):
    """
    An enumeration representing the telemetry topics.

    .. note::
        As of version 10.3.0, the only usable topic is ALL.  The DATABASE,
        EVENTS and POLLING topics are not implemented.  The USER topic leads
        to inconsistent results due to an
        `issue in cppTango <https://gitlab.com/tango-controls/cppTango/-/work_items/1645>`__
        A fix is expected in version 10.4.0, which will also bring in a new
        set of topics.

    .. versionadded:: 10.3.0
    """

    DATABASE = 0
    EVENTS = 1
    POLLING = 2
    USER = 3
    ALL = 4
@dataclass(frozen=True)
class TelemetryEndpoint:
    """
    Immutable pairing of a telemetry exporter type with its endpoint string.

    The class is frozen, so instances are hashable and compare by value.
    """

    # Which kind of exporter the endpoint is meant for.
    exporter: TelemetryExporter
    # Exporter-specific target; may be an empty string when none was given.
    endpoint: str
class TracerProviderFactory(typing.Protocol):
    """Structural type of a callable that builds a tracer provider."""

    def __call__(
        self,
        service_name: str,
        service_instance_id: None | str = None,
        extra_resource_attributes: None | dict[str, str] = None,
        endpoints: None | tuple[TelemetryEndpoint, ...] = None,
    ): ...


@dataclass(frozen=True)
class _TelemetryEnvConfig:
    """Snapshot of the telemetry settings parsed from environment variables."""

    enabled: bool
    tracing_endpoints: tuple[TelemetryEndpoint, ...]
    logging_endpoints: tuple[TelemetryEndpoint, ...]
    types: tuple[TelemetryType, ...]
    topics: tuple[TelemetryTopic, ...]
    tracing_enabled: bool
    logging_enabled: bool


@dataclass(frozen=True)
class _TelemetryHandlers:
    """Bundle of the callables implementing one telemetry backend (real or dummy)."""

    get_current_otel_context: typing.Any = None
    span_from_cpptango: typing.Any = None
    context_from_cpptango: typing.Any = None
    span_to_cpptango: typing.Any = None
    tracer_provider_factory: typing.Any = None
    create_device_tracer: typing.Any = None


_DummySpanContext = namedtuple(
    "_DummySpanContext",
    ["trace_id", "span_id", "is_remote", "trace_flags", "trace_state", "is_valid"],
)


class _DummySpan:
    """Do-nothing stand-in for a span; every mutator is a no-op."""

    def set_attributes(self, *args, **kwargs):
        pass

    def set_attribute(self, *args, **kwargs):
        pass

    def add_event(self, *args, **kwargs):
        pass

    def add_link(self, *args, **kwargs):
        pass

    def update_name(self, *args, **kwargs):
        pass

    def is_recording(self):
        return False

    def set_status(self, *args, **kwargs):
        pass

    def record_exception(self, *args, **kwargs):
        pass

    def end(self, *args, **kwargs):
        pass

    def get_span_context(self):
        # All-zero, non-remote, invalid context.
        return _DummySpanContext(0, 0, False, 0, {}, False)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()


class _DummyTracer:
    """Do-nothing stand-in for a tracer; it only ever hands out dummy spans."""

    def start_span(self, *args, **kwargs):
        # Return a span object (not None) so that callers can use the result
        # exactly as they would a real span, e.g. ``start_span(...).end()``.
        return _DummySpan()

    @contextlib.contextmanager
    def start_as_current_span(self, *args, **kwargs):
        yield _DummySpan()


class _DummyTracerProvider:
    """Do-nothing stand-in for a tracer provider."""

    def get_tracer(self, *args, **kwargs):
        return _DummyTracer()


def _dummy_get_current_otel_context():
    """Return an empty mapping in place of a telemetry context."""
    return {}


@contextlib.contextmanager
def _dummy_span_from_cpptango(device, fn):
    """Context manager yielding a dummy span; ``device`` and ``fn`` are ignored."""
    yield _DummySpan()


@contextlib.contextmanager
def _dummy_context_from_cpptango():
    """Context manager that does nothing on entry or exit."""
    yield
@contextlib.contextmanager
def _dummy_span_to_cpptango(name: str, context=None, topic: str = ""):
    """Context manager that does nothing; all arguments are ignored."""
    yield


def _dummy_telemetry_tracer_provider_factory(
    service_name,
    service_instance_id=None,
    extra_resource_attributes=None,
    endpoints=None,
):
    """Tracer provider factory that always returns a dummy provider."""
    return _DummyTracerProvider()


def _dummy_create_device_telemetry_tracer(tracer_provider) -> _DummyTracer:
    """Return a dummy tracer; ``tracer_provider`` is ignored."""
    return _DummyTracer()


_dummy_telemetry_handlers = _TelemetryHandlers(
    get_current_otel_context=_dummy_get_current_otel_context,
    span_from_cpptango=_dummy_span_from_cpptango,
    context_from_cpptango=_dummy_context_from_cpptango,
    span_to_cpptango=_dummy_span_to_cpptango,
    tracer_provider_factory=_dummy_telemetry_tracer_provider_factory,
    create_device_tracer=_dummy_create_device_telemetry_tracer,
)


def _telemetry_exporter_to_wire(exporter) -> str:
    """Return the lower-case wire name of an exporter given as a string or enum member."""
    if isinstance(exporter, str):
        return exporter.lower()
    return exporter.name.lower()


def _telemetry_endpoint_to_wire(
    endpoint: TelemetryEndpoint,
):
    """Return ``[exporter_name, endpoint_string]`` for one endpoint."""
    return [_telemetry_exporter_to_wire(endpoint.exporter), endpoint.endpoint]


def _telemetry_endpoints_to_wire(endpoints):
    """Flatten endpoints into one list of alternating exporter names and endpoint strings."""
    wire = []
    for endpoint in endpoints:
        wire.extend(_telemetry_endpoint_to_wire(endpoint))
    return wire


def _telemetry_endpoint_from_wire(exporter: str, endpoint: str) -> TelemetryEndpoint:
    """Build a TelemetryEndpoint from a wire pair; raises KeyError for an unknown exporter name."""
    return TelemetryEndpoint(TelemetryExporter[exporter.upper()], endpoint)


def _telemetry_endpoints_from_wire(endpoint_pairs) -> tuple[TelemetryEndpoint, ...]:
    """
    Rebuild endpoints from a flat sequence of alternating exporter names and endpoints.

    :raises ValueError: if the sequence has an odd number of items.
    """
    if len(endpoint_pairs) % 2 != 0:
        raise ValueError("telemetry endpoint data must contain exporter/endpoint pairs")
    return tuple(
        _telemetry_endpoint_from_wire(endpoint_pairs[index], endpoint_pairs[index + 1])
        for index in range(0, len(endpoint_pairs), 2)
    )


# Defaults describing "no OpenTelemetry available"; the try-block further
# down overwrites them when the optional packages can be imported.
_trace_api = None
_telemetry_runtime_available = False
_telemetry_sdk_available = False
_telemetry_http_exporter_available = False
_telemetry_grpc_exporter_available = False
_real_telemetry_handlers = _TelemetryHandlers()


def _call_tracer_provider_factory(
    factory,
    service_name,
    service_instance_id=None,
    extra_resource_attributes=None,
    endpoints=None,
):
    """
    Call a tracer provider factory, passing ``endpoints`` only if it can accept it.

    The factory signature is inspected: ``endpoints`` is forwarded as a keyword
    when the factory declares that parameter or takes ``**kwargs``.
    """
    signature = inspect.signature(factory)
    parameters = signature.parameters.values()
    accepts_endpoints = "endpoints" in signature.parameters or any(
        parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters
    )
    if accepts_endpoints:
        return factory(
            service_name,
            service_instance_id,
            extra_resource_attributes,
            endpoints=endpoints,
        )
    # fallback to v10.0.0 signature, before endpoints were added
    return factory(service_name, service_instance_id, extra_resource_attributes)


def _warn_if_deprecated_telemetry_env_vars_used() -> None:
    """Emit a deprecation warning for each old-style telemetry variable that is set."""
    deprecated_env_vars = (
        ("TANGO_TELEMETRY_TRACES_EXPORTER", "TANGO_TELEMETRY_TRACING_EXPORTERS"),
        ("TANGO_TELEMETRY_TRACES_ENDPOINT", "TANGO_TELEMETRY_TRACING_ENDPOINTS"),
        ("TANGO_TELEMETRY_LOGS_EXPORTER", "TANGO_TELEMETRY_LOGGING_EXPORTERS"),
        ("TANGO_TELEMETRY_LOGS_ENDPOINT", "TANGO_TELEMETRY_LOGGING_ENDPOINTS"),
    )
    for name, replacement_name in deprecated_env_vars:
        _warn_if_deprecated_env_var_used(name, replacement_name)


def _warn_if_deprecated_env_var_used(name: str, replacement_name: str) -> None:
    """Warn (through ``warn_once``) that ``name`` is deprecated, if it is set."""
    if ApiUtil.get_env_var(name) is None:
        return
    warn_once(
        f"{name} is deprecated; use {replacement_name} instead.",
        key=f"deprecated-env:{name}",
        category=DeprecationWarning,
        stacklevel=2,
    )


def _parse_telemetry_endpoints_from_env(
    enabled: bool,
    *,
    exporters_env_var: str,
    endpoints_env_var: str,
) -> tuple[TelemetryEndpoint, ...]:
    """
    Build the endpoint tuple described by a pair of comma-separated variables.

    Exporters and endpoints are matched by position.  Surplus endpoints are
    dropped with a warning, missing ones become ``""``.  An unknown exporter
    name is replaced by ``console`` with a warning, and ``none`` entries are
    skipped.  When the exporters variable is unset or empty, a single
    ``console`` exporter is assumed if ``enabled`` is true, ``none`` otherwise.
    """
    exporters_value = ApiUtil.get_env_var(exporters_env_var)
    endpoints_value = ApiUtil.get_env_var(endpoints_env_var)

    if exporters_value:
        exporter_names = [item.strip().lower() for item in exporters_value.split(",") if item.strip()]
    else:
        exporter_names = ["console" if enabled else "none"]

    endpoints = [item.strip() for item in endpoints_value.split(",")] if endpoints_value else []

    if len(endpoints) > len(exporter_names):
        warnings.warn(
            f"Ignoring extra endpoint values from {endpoints_env_var}: "
            f"expected at most {len(exporter_names)}, got {len(endpoints)}.",
            stacklevel=1,
        )
        endpoints = endpoints[: len(exporter_names)]
    # Pad with empty endpoints so both lists have equal length for the strict
    # zip below (multiplying by zero appends nothing).
    endpoints.extend([""] * (len(exporter_names) - len(endpoints)))

    parsed_endpoints = []
    for exporter_name, endpoint in zip(exporter_names, endpoints, strict=True):
        if exporter_name not in {"console", "grpc", "http", "none"}:
            warnings.warn(
                f"Unknown value '{exporter_name}' for {exporters_env_var}. Options are: "
                f"'console', 'grpc', 'http' and 'none'. Defaulting to 'console'.",
                stacklevel=1,
            )
            exporter_name = "console"
        if exporter_name == "none":
            continue
        parsed_endpoints.append(TelemetryEndpoint(TelemetryExporter[exporter_name.upper()], endpoint))
    return tuple(parsed_endpoints)


def _parse_telemetry_enum_values_from_env(
    env_var_name: str,
    enum_type,
) -> tuple:
    """
    Parse a comma-separated variable into members of ``enum_type``.

    Items are matched by upper-cased name.  Empty items are skipped and
    unknown names are ignored with a warning.  Returns ``()`` when the
    variable is unset or empty.
    """
    values = ApiUtil.get_env_var(env_var_name)
    if not values:
        return ()
    parsed_values = []
    for item in values.split(","):
        name = item.strip().upper()
        if not name:
            continue
        try:
            parsed_values.append(enum_type[name])
        except KeyError:
            warnings.warn(f"Unknown value '{item.strip()}' for {env_var_name}. Ignoring it.", stacklevel=1)
    return tuple(parsed_values)


def _get_env_telemetry_config() -> _TelemetryEnvConfig:
    """Read all telemetry-related environment variables into a config snapshot."""
    _warn_if_deprecated_telemetry_env_vars_used()
    enabled = _truthy_env_var("TANGO_TELEMETRY_ENABLE")
    telemetry_types = _parse_telemetry_enum_values_from_env(
        "TANGO_TELEMETRY_TYPES",
        TelemetryType,
    )
    # No types given: both signals on.  NONE listed: both signals off.
    # Otherwise each signal is on only if it is listed.
    tracing_enabled = not telemetry_types
    logging_enabled = not telemetry_types
    if telemetry_types and TelemetryType.NONE not in telemetry_types:
        tracing_enabled = TelemetryType.TRACING in telemetry_types
        logging_enabled = TelemetryType.LOGGING in telemetry_types
    return _TelemetryEnvConfig(
        enabled=enabled,
        tracing_endpoints=_parse_telemetry_endpoints_from_env(
            enabled,
            exporters_env_var="TANGO_TELEMETRY_TRACING_EXPORTERS",
            endpoints_env_var="TANGO_TELEMETRY_TRACING_ENDPOINTS",
        ),
        logging_endpoints=_parse_telemetry_endpoints_from_env(
            enabled,
            exporters_env_var="TANGO_TELEMETRY_LOGGING_EXPORTERS",
            endpoints_env_var="TANGO_TELEMETRY_LOGGING_ENDPOINTS",
        ),
        types=telemetry_types,
        topics=_parse_telemetry_enum_values_from_env(
            "TANGO_TELEMETRY_TOPICS",
            TelemetryTopic,
        ),
        tracing_enabled=tracing_enabled,
        logging_enabled=logging_enabled,
    )


_env_telemetry_config = _get_env_telemetry_config()

try:
    _telemetry_disabled_via_env_var = _truthy_env_var("PYTANGO_DISABLE_TELEMETRY_PATCHING")
    if not _telemetry_disabled_via_env_var and TELEMETRY_SUPPORTED:
        from opentelemetry import context as context_api
        from opentelemetry import trace as trace_api
        from opentelemetry.trace.propagation.tracecontext import (
            TraceContextTextMapPropagator,
        )

        _trace_api = trace_api
        _telemetry_runtime_available = True

        # The SDK is optional on top of the API: without it, the default
        # factory below returns a no-op tracer provider.
        try:
            from opentelemetry.sdk.resources import (
                HOST_NAME,
                SERVICE_INSTANCE_ID,
                SERVICE_NAME,
                SERVICE_NAMESPACE,
                Resource,
            )
            from opentelemetry.sdk.trace import TracerProvider
            from opentelemetry.sdk.trace.export import (
                BatchSpanProcessor,
                ConsoleSpanExporter,
                SimpleSpanProcessor,
            )

            _telemetry_http_exporter_available = _module_available(
                "opentelemetry.exporter.otlp.proto.http.trace_exporter"
            )
            _telemetry_grpc_exporter_available = _module_available(
                "opentelemetry.exporter.otlp.proto.grpc.trace_exporter"
            )
            _telemetry_sdk_available = True
        except ImportError:
            _telemetry_sdk_available = False

        def _real_get_current_otel_context() -> context_api.Context:
            """Return the current OpenTelemetry context."""
            return context_api.get_current()

        _current_telemetry_tracer = ContextVar("current_telemetry_tracer")

        @contextlib.contextmanager
        def _real_span_from_cpptango(device, fn) -> typing.Iterator[trace_api.Span]:
            """
            Run the body inside a SERVER span whose parent is the cppTango trace context.

            The span is named after ``fn`` and created with the device's tracer;
            source location, thread and client identity attributes are attached.
            """
            fn = inspect.unwrap(fn)
            # Callable objects without their own code object: describe their __call__.
            if not hasattr(fn, "__code__") and hasattr(fn, "__call__"):  # noqa: B004
                fn = fn.__call__
            name = getattr(fn, "__qualname__", getattr(fn, "__name__", "unknown"))
            code = getattr(fn, "__code__", None)
            filepath = getattr(code, "co_filename", "unknown")
            lineno = getattr(code, "co_firstlineno", 0)
            carrier = _telemetry.get_trace_context()
            ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
            device_tracer = device.get_telemetry_tracer()
            token = _current_telemetry_tracer.set(device_tracer)
            try:
                with device_tracer.start_as_current_span(name, context=ctx, kind=trace_api.SpanKind.SERVER) as span:
                    span.set_attribute("code.filepath", filepath)
                    span.set_attribute("code.lineno", lineno)
                    current_thread = threading.current_thread()
                    span.set_attribute("thread.id", hex(current_thread.ident))
                    span.set_attribute("thread.name", current_thread.name)
                    _add_client_ident_info(device, span)
                    yield span
            finally:
                _current_telemetry_tracer.reset(token)

        @contextlib.contextmanager
        def _real_context_from_cpptango():
            """Attach the cppTango trace context as current context for the duration of the body."""
            carrier = _telemetry.get_trace_context()
            ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
            token = context_api.attach(ctx)
            try:
                yield
            finally:
                context_api.detach(token)

        _PID_PATTERN = re.compile(r"\bPID=(\d+)\b", re.IGNORECASE)

        def _add_client_ident_info(device, span):
            """
            Add ``tango.client_ident.*`` attributes describing the calling client.

            Nothing is added when not running on an omni thread, or when the
            device reports no client identity.
            """
            if not is_omni_thread():
                return
            ident = device.get_client_ident()
            if not ident:
                return
            if ident.client_lang in {LockerLanguage.JAVA, LockerLanguage.JAVA_6}:
                span.set_attribute(
                    "tango.client_ident.java_ident",
                    f"{ident.java_ident[0]:x}{ident.java_ident[1]:x}",
                )
                span.set_attribute("tango.client_ident.java_main_class", ident.java_main_class)
                # For Java clients the PID is parsed out of the main class
                # string; 0 is recorded when no "PID=<n>" is found.
                match = _PID_PATTERN.search(ident.java_main_class)
                client_pid = int(match.group(1)) if match else 0
            else:
                client_pid = ident.client_pid
            span.set_attribute("tango.client_ident.location", ident.client_ip)
            span.set_attribute("tango.client_ident.pid", client_pid)
            span.set_attribute("tango.client_ident.lang", str(ident.client_lang))

        @contextlib.contextmanager
        def _real_span_to_cpptango(name: str, context=None, topic: str = ""):
            """Run the body inside a cppTango trace context scope carrying ``context``."""
            carrier = {}
            TraceContextTextMapPropagator().inject(carrier, context=context)
            traceparent = carrier.get("traceparent", "")
            tracestate = carrier.get("tracestate", "")
            with _telemetry.TraceContextScope(name, traceparent, tracestate, topic):
                yield

        def _create_span_processor_for_endpoint(endpoint: TelemetryEndpoint):
            """
            Create the span processor exporting to ``endpoint``.

            Returns None when the exporter package needed for the endpoint is
            not available.

            :raises ValueError: if the endpoint's exporter type is not recognised.
            """
            global _telemetry_grpc_exporter_available
            global _telemetry_http_exporter_available

            endpoint_value = endpoint.endpoint
            exporter_type = endpoint.exporter

            if exporter_type == TelemetryExporter.GRPC:
                if not _telemetry_grpc_exporter_available:
                    return None
                try:
                    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
                        OTLPSpanExporter as OTLPGrpcSpanExporter,
                    )
                except ImportError:
                    # Remember the failure so later calls return early.
                    _telemetry_grpc_exporter_available = False
                    warn_once(
                        "OpenTelemetry OTLP gRPC trace exporter package is not "
                        "available. Install opentelemetry-exporter-otlp-proto-grpc "
                        "to emit spans to gRPC tracing endpoints.",
                        key="telemetry-missing-exporter:grpc",
                        category=PyTangoUserWarning,
                        stacklevel=2,
                    )
                    return None
                endpoint_value = endpoint_value.lower()
                if endpoint_value.startswith("grpc://"):
                    # Rewrite only the leading scheme, never a later occurrence
                    # of "grpc://" elsewhere in the value.
                    endpoint_value = "http://" + endpoint_value.removeprefix("grpc://")
                exporter = OTLPGrpcSpanExporter(endpoint=endpoint_value)
            elif exporter_type == TelemetryExporter.HTTP:
                if not _telemetry_http_exporter_available:
                    return None
                try:
                    from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
                        OTLPSpanExporter as OTLPHttpSpanExporter,
                    )
                except ImportError:
                    # Remember the failure so later calls return early.
                    _telemetry_http_exporter_available = False
                    warn_once(
                        "OpenTelemetry OTLP HTTP trace exporter package is not "
                        "available. Install opentelemetry-exporter-otlp-proto-http "
                        "to emit spans to HTTP tracing endpoints.",
                        key="telemetry-missing-exporter:http",
                        category=PyTangoUserWarning,
                        stacklevel=2,
                    )
                    return None
                exporter = OTLPHttpSpanExporter(endpoint=endpoint_value)
            elif exporter_type == TelemetryExporter.CONSOLE:
                # "cerr" selects stderr; any other value selects stdout.
                io_out = sys.stderr if endpoint_value.lower() == "cerr" else sys.stdout
                exporter = ConsoleSpanExporter(out=io_out)
            else:
                raise ValueError(f"Unknown exporter type in {endpoint!r}")
            return _create_span_processor_for_exporter(exporter)

        def _create_span_processor_for_exporter(exporter):
            """
            Wrap ``exporter`` in a simple or batch span processor.

            ``PYTANGO_TELEMETRY_SPAN_PROCESSOR_TYPE`` (``simple`` or ``batch``)
            takes precedence; otherwise console exporters get a simple
            processor and all others a batch processor.
            """
            processor_type = ApiUtil.get_env_var("PYTANGO_TELEMETRY_SPAN_PROCESSOR_TYPE")
            if processor_type and processor_type.lower() == "simple":
                processor_class = SimpleSpanProcessor
            elif processor_type and processor_type.lower() == "batch":
                processor_class = BatchSpanProcessor
            elif isinstance(exporter, ConsoleSpanExporter):
                processor_class = SimpleSpanProcessor
            else:
                processor_class = BatchSpanProcessor
            return processor_class(exporter)

        def _real_default_telemetry_tracer_provider_factory(
            service_name: str,
            service_instance_id: None | str = None,
            extra_resource_attributes: None | dict[str, str] = None,
            endpoints: None | tuple[TelemetryEndpoint, ...] = None,
        ) -> trace_api.TracerProvider:
            """
            Default factory: an SDK tracer provider with one processor per endpoint.

            A no-op provider is returned when the SDK is unavailable or when no
            span processor could be created for the given endpoints.
            """
            if not _telemetry_sdk_available:
                return trace_api.NoOpTracerProvider()

            resource_attributes = {
                HOST_NAME: socket.getfqdn(),
                SERVICE_NAMESPACE: "tango",
                SERVICE_NAME: service_name,
            }
            if service_instance_id:
                resource_attributes[SERVICE_INSTANCE_ID] = service_instance_id
            if extra_resource_attributes:
                resource_attributes.update(extra_resource_attributes)

            processors = []
            for endpoint in endpoints or ():
                processor = _create_span_processor_for_endpoint(endpoint)
                if processor is not None:
                    processors.append(processor)
            if not processors:
                return trace_api.NoOpTracerProvider()

            tracer_provider = TracerProvider(resource=Resource.create(resource_attributes))
            for processor in processors:
                tracer_provider.add_span_processor(processor)
            return tracer_provider

        def _real_create_device_telemetry_tracer(
            tracer_provider: trace_api.TracerProvider,
        ) -> trace_api.Tracer:
            """Get the ``tango.python.server`` tracer from ``tracer_provider``."""
            return trace_api.get_tracer(
                instrumenting_module_name="tango.python.server",
                instrumenting_library_version=Release.version,
                tracer_provider=tracer_provider,
            )

        @atexit.register
        def _shutdown_telemetry():
            """At interpreter exit, clean up the default telemetry interface."""
            _telemetry.cleanup_default_telemetry_interface()

        def _set_telemetry_sdk_log_level():
            """
            Apply ``PYTANGO_TELEMETRY_SDK_LOG_LEVEL`` to the telemetry loggers.

            The level is set on the loggers listed (comma-separated) in
            ``PYTANGO_TELEMETRY_SDK_LOGGER_NAMES``, or on a default list of
            OpenTelemetry SDK/exporter loggers, and is also passed on to
            ``_telemetry.set_log_level``.  Does nothing if no level is set.

            :raises KeyError: if the level is not a known logging level name.
            """
            level_str = ApiUtil.get_env_var("PYTANGO_TELEMETRY_SDK_LOG_LEVEL")
            if not level_str:
                return
            level_int = logging.getLevelNamesMapping()[level_str.upper()]
            names_csv = ApiUtil.get_env_var("PYTANGO_TELEMETRY_SDK_LOGGER_NAMES")
            if names_csv:
                # Strip whitespace so "a, b" configures logger "b", not " b".
                names = [item.strip() for item in names_csv.split(",") if item.strip()]
            else:
                names = [
                    "opentelemetry.sdk.trace.export",
                    "opentelemetry.sdk._shared_internal",
                    "opentelemetry.exporter.otlp.proto.grpc.exporter",
                    "opentelemetry.exporter.otlp.proto.http._log_exporter",
                    "opentelemetry.exporter.otlp.proto.http.log_exporter",
                    "opentelemetry.exporter.otlp.proto.http.metric_exporter",
                    "opentelemetry.exporter.otlp.proto.http.trace_exporter",
                ]
            for name in names:
                logging.getLogger(name).setLevel(level_int)
            _telemetry.set_log_level(level_str)

        _set_telemetry_sdk_log_level()

        _real_telemetry_handlers = _TelemetryHandlers(
            get_current_otel_context=_real_get_current_otel_context,
            span_from_cpptango=_real_span_from_cpptango,
            context_from_cpptango=_real_context_from_cpptango,
            span_to_cpptango=_real_span_to_cpptango,
            tracer_provider_factory=_real_default_telemetry_tracer_provider_factory,
            create_device_tracer=_real_create_device_telemetry_tracer,
        )
except ImportError:
    # The OpenTelemetry API packages are optional; keep the defaults set above.
    pass
except Exception as exc:
    warnings.warn(
        f"Error setting up telemetry. Telemetry context may not be passed on "
        f"and traces may not be emitted. Possibly a PyTango bug.\n"
        f"Error: {exc!r}",
        stacklevel=1,
    )


class _TelemetryRuntimeManager:
    """
    Selects between the real and the dummy telemetry handlers at run time.

    It also owns the lazily created client tracer and the client tracing
    settings derived from the environment.
    """

    def __init__(
        self,
        runtime_available: bool,
        sdk_available: bool,
        real_handlers: _TelemetryHandlers,
        dummy_handlers: _TelemetryHandlers,
        telemetry_active: bool,
        client_tracing_endpoints,
        client_tracing_enabled,
    ):
        self._real_handlers = real_handlers
        self._dummy_handlers = dummy_handlers
        self._runtime_available = runtime_available
        self._sdk_available = sdk_available
        self._telemetry_active = telemetry_active
        self._using_real_handlers = False
        self._real_custom_factory = None
        self._client_tracing_endpoints = client_tracing_endpoints or ()
        self._client_tracing_enabled = client_tracing_enabled
        self._client_tracer = None
        self._skip_kernel_spans = None

    def runtime_available(self) -> bool:
        """Return the ``runtime_available`` flag given at construction."""
        return self._runtime_available

    def skip_kernel_spans(self) -> bool:
        """Return True unless ``PYTANGO_TELEMETRY_EMIT_KERNEL_SPANS`` is truthy (read once, then cached)."""
        if self._skip_kernel_spans is None:
            self._skip_kernel_spans = not _truthy_env_var("PYTANGO_TELEMETRY_EMIT_KERNEL_SPANS")
        return self._skip_kernel_spans

    def reset_client_tracer(self):
        """Drop the cached client tracer so that it is re-created on next use."""
        self._client_tracer = None

    def warn_if_telemetry_requested(self, endpoints: tuple[TelemetryEndpoint, ...]) -> tuple[bool, str]:
        """
        Warn about missing OpenTelemetry packages needed for ``endpoints``.

        Checks, in order: the API packages, the SDK packages, then the OTLP
        exporter packages required by the given endpoints.  Only the first
        problem found is reported.

        :return: ``(result of warn_once, message)`` for the problem found, or
            ``(False, "")`` when nothing is missing or telemetry patching is
            disabled through ``PYTANGO_DISABLE_TELEMETRY_PATCHING``.
        """
        if _truthy_env_var("PYTANGO_DISABLE_TELEMETRY_PATCHING"):
            return False, ""

        if not self._runtime_available:
            message = (
                "OpenTelemetry API packages are not available: telemetry context "
                "cannot be propagated and Python telemetry spans will not be "
                "emitted. Install opentelemetry-api to enable context "
                "propagation, plus opentelemetry-sdk and an OTLP exporter "
                "package to emit spans."
            )
            warn_now = warn_once(
                message,
                key="telemetry-missing-api",
                category=PyTangoUserWarning,
                stacklevel=2,
            )
            return warn_now, message

        if not self._sdk_available:
            message = (
                "OpenTelemetry SDK packages are not available: telemetry can be "
                "enabled in cppTango, but Python telemetry spans will not be "
                "emitted. Install opentelemetry-sdk and an OTLP exporter "
                "package to emit spans."
            )
            warn_now = warn_once(
                message,
                key="telemetry-missing-sdk",
                category=PyTangoUserWarning,
                stacklevel=2,
            )
            return warn_now, message

        unavailable_exporters = []
        if not _telemetry_http_exporter_available and any(
            endpoint.exporter == TelemetryExporter.HTTP for endpoint in endpoints
        ):
            unavailable_exporters.append(TelemetryExporter.HTTP)
        if not _telemetry_grpc_exporter_available and any(
            endpoint.exporter == TelemetryExporter.GRPC for endpoint in endpoints
        ):
            unavailable_exporters.append(TelemetryExporter.GRPC)

        if unavailable_exporters:
            exporter_messages = {
                TelemetryExporter.HTTP: ("OpenTelemetry OTLP HTTP trace exporter package is not available"),
                TelemetryExporter.GRPC: ("OpenTelemetry OTLP gRPC trace exporter package is not available"),
            }
            package_messages = {
                TelemetryExporter.HTTP: (
                    "Install opentelemetry-exporter-otlp-proto-http to emit spans to HTTP tracing endpoints."
                ),
                TelemetryExporter.GRPC: (
                    "Install opentelemetry-exporter-otlp-proto-grpc to emit spans to gRPC tracing endpoints."
                ),
            }
            messages = [exporter_messages[exporter] for exporter in unavailable_exporters]
            packages = [package_messages[exporter] for exporter in unavailable_exporters]
            message = f"{'; '.join(messages)}. {' '.join(packages)}"
            warn_key_suffix = ",".join(exporter.name.lower() for exporter in unavailable_exporters)
            warn_now = warn_once(
                message,
                key=f"telemetry-missing-exporter:{warn_key_suffix}",
                category=PyTangoUserWarning,
                stacklevel=2,
            )
            return warn_now, message

        return False, ""

    def switch_handlers(self, use_real: bool):
        """Select real or dummy handlers; real ones are only used if the runtime is available."""
        use_real = use_real and self._runtime_available
        self._using_real_handlers = use_real
        self.reset_client_tracer()

    def set_custom_factory(self, provider_factory):
        """Install a user-supplied tracer provider factory and drop the cached client tracer."""
        self._real_custom_factory = provider_factory
        self.reset_client_tracer()

    def get_factory(self):
        """Return the active factory: custom, else real default, else dummy when real handlers are off."""
        if self._using_real_handlers:
            return self._real_custom_factory or self._real_handlers.tracer_provider_factory
        return self._dummy_handlers.tracer_provider_factory

    def get_client_tracing_endpoints(self) -> tuple[TelemetryEndpoint, ...]:
        """Return the tracing endpoints currently used for the client tracer."""
        return self._client_tracing_endpoints

    def refresh_from_env(self):
        """
        Re-read the environment and apply any change to the client tracing settings.

        On a change the cached client tracer is dropped; if telemetry is
        enabled in the new configuration, missing packages are warned about.
        """
        config = _get_env_telemetry_config()
        enabled = self._runtime_available and config.enabled
        config_changed = (
            enabled != self._telemetry_active
            or config.tracing_endpoints != self._client_tracing_endpoints
            or config.tracing_enabled != self._client_tracing_enabled
        )
        if not config_changed:
            return
        if config.enabled:
            self.warn_if_telemetry_requested(config.tracing_endpoints)
        self._telemetry_active = enabled
        self._client_tracing_endpoints = config.tracing_endpoints
        self._client_tracing_enabled = config.tracing_enabled
        self.reset_client_tracer()

    def can_propagate_context(self):
        """Return True when the runtime is available and telemetry is active."""
        return self._runtime_available and self._telemetry_active

    def should_emit_client_spans(self):
        """Return True when context can be propagated and client tracing is enabled."""
        return self.can_propagate_context() and self._client_tracing_enabled

    def get_current_otel_context(self):
        """Return the current context from the real handlers, or the dummy one."""
        if self.can_propagate_context():
            return self._real_handlers.get_current_otel_context()
        return self._dummy_handlers.get_current_otel_context()

    def span_from_cpptango(self, device, fn):
        """Return the span context manager for ``fn``; real only if the device has telemetry enabled."""
        if self._runtime_available and device._is_telemetry_enabled():
            return self._real_handlers.span_from_cpptango(device, fn)
        return self._dummy_handlers.span_from_cpptango(device, fn)

    def context_from_cpptango(self):
        """Return the real or dummy context manager for the cppTango trace context."""
        if self.can_propagate_context():
            return self._real_handlers.context_from_cpptango()
        return self._dummy_handlers.context_from_cpptango()

    def span_to_cpptango(self, name: str, context=None, topic: str = ""):
        """Return the real or dummy context manager passing a trace context to cppTango."""
        if self.can_propagate_context():
            return self._real_handlers.span_to_cpptango(name, context=context, topic=topic)
        return self._dummy_handlers.span_to_cpptango(name, context=context, topic=topic)

    def create_device_telemetry_tracer(self, tracer_provider):
        """Create a device tracer with whichever handler set is currently selected."""
        if self._using_real_handlers:
            return self._real_handlers.create_device_tracer(tracer_provider)
        return self._dummy_handlers.create_device_tracer(tracer_provider)

    def using_real_handlers(self):
        """Return True when the real handlers are selected."""
        return self._using_real_handlers

    def get_or_create_client_tracer(self):
        """Return the cached client tracer, creating it first if the runtime is available."""
        if self._client_tracer is None and self._runtime_available:
            self.warn_if_telemetry_requested(self._client_tracing_endpoints)
            self._client_tracer = self._create_client_tracer()
        return self._client_tracer

    def _create_client_tracer(self):
        """Build the ``tango.python.client`` tracer using the active factory."""
        service_name = ApiUtil.get_env_var("PYTANGO_TELEMETRY_CLIENT_SERVICE_NAME")
        if not service_name:
            service_name = "pytango.client"
        tracer_provider = _call_tracer_provider_factory(
            self.get_factory(),
            service_name,
            endpoints=self._client_tracing_endpoints,
        )
        return _trace_api.get_tracer(
            instrumenting_module_name="tango.python.client",
            instrumenting_library_version=Release.version,
            tracer_provider=tracer_provider,
        )


_telemetry_runtime = _TelemetryRuntimeManager(
    runtime_available=_telemetry_runtime_available,
    sdk_available=_telemetry_sdk_available,
    real_handlers=_real_telemetry_handlers,
    dummy_handlers=_dummy_telemetry_handlers,
    telemetry_active=_env_telemetry_config.enabled,
    client_tracing_endpoints=_env_telemetry_config.tracing_endpoints,
    client_tracing_enabled=_env_telemetry_config.tracing_enabled,
)
_telemetry_runtime.switch_handlers(use_real=_telemetry_runtime_available)
def set_telemetry_tracer_provider_factory(provider_factory: TracerProviderFactory):
    """
    Change the factory that will be used to create tracer providers.

    The factory is stored on the module's telemetry runtime manager, which
    also drops its cached client tracer so that the next client tracer is
    created with the new factory.

    :param provider_factory: callable following :class:`TracerProviderFactory`

    .. versionadded:: 10.3.0
    """
    _telemetry_runtime.set_custom_factory(provider_factory)
def _wrap_tracer_provider_factory_with_client_env_defaults(
    provider_factory: TracerProviderFactory,
) -> TracerProviderFactory:
    """
    Wrap ``provider_factory`` so that omitted endpoints default to the environment's.

    On every call the returned factory first refreshes the runtime manager
    from the environment variables.  If ``endpoints`` is None, the current
    client tracing endpoints are used.  The call is then delegated through
    ``_call_tracer_provider_factory``.
    """

    def wrapped_factory(
        service_name: str,
        service_instance_id: None | str = None,
        extra_resource_attributes: None | dict[str, str] = None,
        endpoints: None | tuple[TelemetryEndpoint, ...] = None,
    ):
        _telemetry_runtime.refresh_from_env()
        if endpoints is None:
            endpoints = _telemetry_runtime.get_client_tracing_endpoints()
        return _call_tracer_provider_factory(
            provider_factory,
            service_name,
            service_instance_id,
            extra_resource_attributes,
            endpoints=endpoints,
        )

    return wrapped_factory
def get_telemetry_tracer_provider_factory() -> TracerProviderFactory:
    """
    Get the factory that will be used to create tracer providers.

    If ``endpoints`` is omitted when calling the returned factory, the
    current client tracing endpoints derived from the environment variables
    are used.

    The returned callable is a new wrapper around the factory that is active
    at the time of this call.

    .. versionadded:: 10.3.0
    """
    return _wrap_tracer_provider_factory_with_client_env_defaults(_telemetry_runtime.get_factory())