diff --git a/CHANGELOG.md b/CHANGELOG.md index e991ae533a..584d714f0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -136,6 +136,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3681](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3681)) - `opentelemetry-instrumentation-flask`: Fix exemplars generation for `http.server.request.duration` and `http.server.duration` metrics ([#3912](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3912)) +- `opentelemetry-instrumentation-botocore`: Add support for instrumenting `aiobotocore` + ([#4049](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4049)) ### Added diff --git a/instrumentation/README.md b/instrumentation/README.md index 1597be99af..29c9ca744f 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -13,7 +13,7 @@ | [opentelemetry-instrumentation-aws-lambda](./opentelemetry-instrumentation-aws-lambda) | aws_lambda | No | development | [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 | No | development | [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | No | development -| [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | No | development +| [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore~=1.0,aiobotocore~=2.0 | No | development | [opentelemetry-instrumentation-cassandra](./opentelemetry-instrumentation-cassandra) | cassandra-driver ~= 3.25,scylla-driver ~= 3.25 | No | development | [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | No | development | [opentelemetry-instrumentation-click](./opentelemetry-instrumentation-click) | click >= 8.1.3, < 9.0.0 | No | development diff --git a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml index cc4f480c45..a48793375d 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-botocore/pyproject.toml @@ -32,12 +32,15 @@ dependencies = [ ] [project.optional-dependencies] -instruments = [ +instruments = [] +instruments-any = [ "botocore ~= 1.0", + "aiobotocore ~= 2.0", ] [project.entry-points.opentelemetry_instrumentor] botocore = "opentelemetry.instrumentation.botocore:BotocoreInstrumentor" +aiobotocore = "opentelemetry.instrumentation.botocore:AiobotocoreInstrumentor" [project.urls] Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-botocore" diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index a5c4bd1c19..ba5b5c667c 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -97,39 +97,46 @@ def response_hook(span, service_name, operation_name, result): """ import logging -from typing import Any, Callable, Collection, Dict, Optional, Tuple +from typing import Any, Collection, Dict, Optional, Tuple from botocore.client import BaseClient from botocore.endpoint import Endpoint from botocore.exceptions import ClientError from wrapt import wrap_function_wrapper -from opentelemetry._logs import get_logger from opentelemetry.instrumentation.botocore.extensions import ( - _find_extension, - _has_extension, + _AIOBOTOCORE_EXTENSIONS, + _BOTOCORE_EXTENSIONS, +) +from opentelemetry.instrumentation.botocore.extensions.registry import ( + ExtensionRegistry, ) from opentelemetry.instrumentation.botocore.extensions.types import ( _AwsSdkCallContext, - _AwsSdkExtension, _BotocoreInstrumentorContext, ) -from opentelemetry.instrumentation.botocore.package import _instruments -from opentelemetry.instrumentation.botocore.utils import get_server_attributes -from opentelemetry.instrumentation.botocore.version import __version__ +from opentelemetry.instrumentation.botocore.package import ( + _instruments_aiobotocore, + _instruments_botocore, +) +from opentelemetry.instrumentation.botocore.utils import ( + _safe_invoke, + get_server_attributes, +) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import ( is_instrumentation_enabled, suppress_http_instrumentation, unwrap, ) -from opentelemetry.metrics import Instrument, Meter, get_meter -from opentelemetry.propagators.aws.aws_xray_propagator import AwsXRayPropagator +from opentelemetry.propagators.aws.aws_xray_propagator import ( + TRACE_HEADER_KEY, + AwsXRayPropagator, +) from opentelemetry.semconv._incubating.attributes.cloud_attributes import ( CLOUD_REGION, ) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import get_tracer from opentelemetry.trace.span import Span logger = logging.getLogger(__name__) @@ -145,23 +152,16 @@ def __init__(self): super().__init__() self.request_hook = None self.response_hook = None + self.extension_registry = ExtensionRegistry( + __name__, _BOTOCORE_EXTENSIONS, None, None, None + ) self.propagator = AwsXRayPropagator() def instrumentation_dependencies(self) -> Collection[str]: - return _instruments + return _instruments_botocore def _instrument(self, **kwargs): # pylint: disable=attribute-defined-outside-init - - # tracers are lazy initialized per-extension in _get_tracer - self._tracers = {} - # loggers are lazy initialized per-extension in _get_logger - self._loggers = {} - # meters are lazy initialized per-extension in _get_meter - self._meters = {} - # metrics are lazy initialized per-extension in _get_metrics - self._metrics: Dict[str, Dict[str, Instrument]] = {} - self.request_hook = kwargs.get("request_hook") self.response_hook = kwargs.get("response_hook") @@ -169,9 +169,13 @@ def _instrument(self, **kwargs): if propagator is not None: self.propagator = propagator - self.tracer_provider = kwargs.get("tracer_provider") - self.logger_provider = kwargs.get("logger_provider") - self.meter_provider = kwargs.get("meter_provider") + self.extension_registry = ExtensionRegistry( + __name__, + _BOTOCORE_EXTENSIONS, + kwargs.get("tracer_provider"), + kwargs.get("logger_provider"), + kwargs.get("meter_provider"), + ) wrap_function_wrapper( "botocore.client", @@ -185,84 +189,160 @@ def _instrument(self, **kwargs): self._patched_endpoint_prepare_request, ) - @staticmethod - def _get_instrumentation_name(extension: _AwsSdkExtension) -> str: - has_extension = _has_extension(extension._call_context) - return ( - f"{__name__}.{extension._call_context.service}" - if has_extension - else __name__ + def _uninstrument(self, **kwargs): + unwrap(BaseClient, "_make_api_call") + unwrap(Endpoint, "prepare_request") + + # pylint: disable=unused-argument + def _patched_endpoint_prepare_request( + self, wrapped, instance, args, kwargs + ): + request = args[0] + headers = request.headers + + # There may be situations where both Botocore and Aiobotocore are + # instrumented at the same time. To avoid double-injection of headers, + # we add a check to see if the header is already present. If it is, + # we skip injection. + if TRACE_HEADER_KEY in headers: + return wrapped(*args, **kwargs) + + # Only the x-ray header is propagated by AWS services. Using any + # other propagator will lose the trace context. + self.propagator.inject(headers) + + return wrapped(*args, **kwargs) + + # pylint: disable=too-many-branches + def _patched_api_call(self, original_func, instance, args, kwargs): + if not is_instrumentation_enabled(): + return original_func(*args, **kwargs) + + call_context = _determine_call_context(instance, args) + if call_context is None: + return original_func(*args, **kwargs) + + extension = self.extension_registry.get_extension(call_context) + if not extension.should_trace_service_call(): + return original_func(*args, **kwargs) + + attributes = { + SpanAttributes.RPC_SYSTEM: "aws-api", + SpanAttributes.RPC_SERVICE: call_context.service_id, + SpanAttributes.RPC_METHOD: call_context.operation, + CLOUD_REGION: call_context.region, + **get_server_attributes(call_context.endpoint_url), + } + + _safe_invoke(extension.extract_attributes, attributes) + end_span_on_exit = extension.should_end_span_on_exit() + + tracer = self.extension_registry.get_tracer(extension) + metrics = self.extension_registry.get_metrics(extension) + instrumentor_ctx = _BotocoreInstrumentorContext( + logger=self.extension_registry.get_logger(extension), + metrics=metrics, ) + with tracer.start_as_current_span( + call_context.span_name, + kind=call_context.span_kind, + attributes=attributes, + # tracing streaming services require to close the span manually + # at a later time after the stream has been consumed + end_on_exit=end_span_on_exit, + ) as span: + _safe_invoke(extension.before_service_call, span, instrumentor_ctx) + self._call_request_hook(span, call_context) - def _get_tracer(self, extension: _AwsSdkExtension): - """This is a multiplexer in order to have a tracer per extension""" + try: + with suppress_http_instrumentation(): + result = None + try: + result = original_func(*args, **kwargs) + except ClientError as error: + result = getattr(error, "response", None) + _apply_response_attributes(span, result) + _safe_invoke( + extension.on_error, span, error, instrumentor_ctx + ) + raise + _apply_response_attributes(span, result) + _safe_invoke( + extension.on_success, span, result, instrumentor_ctx + ) + finally: + _safe_invoke(extension.after_service_call, instrumentor_ctx) + self._call_response_hook(span, call_context, result) - instrumentation_name = self._get_instrumentation_name(extension) - tracer = self._tracers.get(instrumentation_name) - if tracer: - return tracer + return result - schema_version = extension.tracer_schema_version() - self._tracers[instrumentation_name] = get_tracer( - instrumentation_name, - __version__, - self.tracer_provider, - schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + def _call_request_hook(self, span: Span, call_context: _AwsSdkCallContext): + if not callable(self.request_hook): + return + self.request_hook( + span, + call_context.service, + call_context.operation, + call_context.params, ) - return self._tracers[instrumentation_name] - - def _get_logger(self, extension: _AwsSdkExtension): - """This is a multiplexer in order to have a logger per extension""" - - instrumentation_name = self._get_instrumentation_name(extension) - instrumentation_logger = self._loggers.get(instrumentation_name) - if instrumentation_logger: - return instrumentation_logger - - schema_version = extension.event_logger_schema_version() - self._loggers[instrumentation_name] = get_logger( - instrumentation_name, - "", - schema_url=f"https://opentelemetry.io/schemas/{schema_version}", - logger_provider=self.logger_provider, + + def _call_response_hook( + self, span: Span, call_context: _AwsSdkCallContext, result + ): + if not callable(self.response_hook): + return + self.response_hook( + span, call_context.service, call_context.operation, result ) - return self._loggers[instrumentation_name] - def _get_meter(self, extension: _AwsSdkExtension): - """This is a multiplexer in order to have a meter per extension""" +class AiobotocoreInstrumentor(BaseInstrumentor): + """An instrumentor for Aiobotocore. - instrumentation_name = self._get_instrumentation_name(extension) - meter = self._meters.get(instrumentation_name) - if meter: - return meter + See `BaseInstrumentor` + """ - schema_version = extension.meter_schema_version() - self._meters[instrumentation_name] = get_meter( - instrumentation_name, - "", - schema_url=f"https://opentelemetry.io/schemas/{schema_version}", - meter_provider=self.meter_provider, - ) + def __init__(self): + super().__init__() + self.request_hook = None + self.response_hook = None + self.extension_registry = None + self.propagator = AwsXRayPropagator() - return self._meters[instrumentation_name] + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments_aiobotocore - def _get_metrics( - self, extension: _AwsSdkExtension, meter: Meter - ) -> Dict[str, Instrument]: - """This is a multiplexer for lazy initialization of metrics required by extensions""" - instrumentation_name = self._get_instrumentation_name(extension) - metrics = self._metrics.get(instrumentation_name) - if metrics is not None: - return metrics + def _instrument(self, **kwargs): + # pylint: disable=attribute-defined-outside-init + self.request_hook = kwargs.get("request_hook") + self.response_hook = kwargs.get("response_hook") + + propagator = kwargs.get("propagator") + if propagator is not None: + self.propagator = propagator + + self.extension_registry = ExtensionRegistry( + __name__, + _AIOBOTOCORE_EXTENSIONS, + kwargs.get("tracer_provider"), + kwargs.get("logger_provider"), + kwargs.get("meter_provider"), + ) + + wrap_function_wrapper( + "aiobotocore.client", + "AioBaseClient._make_api_call", + self._patched_api_call, + ) - self._metrics.setdefault(instrumentation_name, {}) - metrics = self._metrics[instrumentation_name] - _safe_invoke(extension.setup_metrics, meter, metrics) - return metrics + wrap_function_wrapper( + "botocore.endpoint", + "Endpoint.prepare_request", + self._patched_endpoint_prepare_request, + ) def _uninstrument(self, **kwargs): - unwrap(BaseClient, "_make_api_call") + unwrap("aiobotocore.client.AioBaseClient", "_make_api_call") unwrap(Endpoint, "prepare_request") # pylint: disable=unused-argument @@ -272,6 +352,13 @@ def _patched_endpoint_prepare_request( request = args[0] headers = request.headers + # There may be situations where both Botocore and Aiobotocore are + # instrumented at the same time. To avoid double-injection of headers, + # we add a check to see if the header is already present. If it is, + # we skip injection. + if TRACE_HEADER_KEY in headers: + return wrapped(*args, **kwargs) + # Only the x-ray header is propagated by AWS services. Using any # other propagator will lose the trace context. self.propagator.inject(headers) @@ -279,17 +366,17 @@ def _patched_endpoint_prepare_request( return wrapped(*args, **kwargs) # pylint: disable=too-many-branches - def _patched_api_call(self, original_func, instance, args, kwargs): + async def _patched_api_call(self, original_func, instance, args, kwargs): if not is_instrumentation_enabled(): - return original_func(*args, **kwargs) + return await original_func(*args, **kwargs) call_context = _determine_call_context(instance, args) if call_context is None: - return original_func(*args, **kwargs) + return await original_func(*args, **kwargs) - extension = _find_extension(call_context) + extension = self.extension_registry.get_extension(call_context) if not extension.should_trace_service_call(): - return original_func(*args, **kwargs) + return await original_func(*args, **kwargs) attributes = { SpanAttributes.RPC_SYSTEM: "aws-api", @@ -302,11 +389,10 @@ def _patched_api_call(self, original_func, instance, args, kwargs): _safe_invoke(extension.extract_attributes, attributes) end_span_on_exit = extension.should_end_span_on_exit() - tracer = self._get_tracer(extension) - meter = self._get_meter(extension) - metrics = self._get_metrics(extension, meter) + tracer = self.extension_registry.get_tracer(extension) + metrics = self.extension_registry.get_metrics(extension) instrumentor_ctx = _BotocoreInstrumentorContext( - logger=self._get_logger(extension), + logger=self.extension_registry.get_logger(extension), metrics=metrics, ) with tracer.start_as_current_span( @@ -324,7 +410,7 @@ def _patched_api_call(self, original_func, instance, args, kwargs): with suppress_http_instrumentation(): result = None try: - result = original_func(*args, **kwargs) + result = await original_func(*args, **kwargs) except ClientError as error: result = getattr(error, "response", None) _apply_response_attributes(span, result) @@ -411,14 +497,3 @@ def _determine_call_context( # extracting essential attributes ('service' and 'operation') failed. logger.error("Error when initializing call context", exc_info=ex) return None - - -def _safe_invoke(function: Callable, *args): - function_name = "" - try: - function_name = function.__name__ - function(*args) - except Exception as ex: # pylint:disable=broad-except - logger.error( - "Error when invoking function '%s'", function_name, exc_info=ex - ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py index dd8ba24e9f..48b78e8efd 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py @@ -15,11 +15,6 @@ import importlib import logging -from opentelemetry.instrumentation.botocore.extensions.types import ( - _AwsSdkCallContext, - _AwsSdkExtension, -) - _logger = logging.getLogger(__name__) @@ -31,7 +26,7 @@ def loader(): return loader -_KNOWN_EXTENSIONS = { +_BOTOCORE_EXTENSIONS = { "bedrock-runtime": _lazy_load(".bedrock", "_BedrockRuntimeExtension"), "dynamodb": _lazy_load(".dynamodb", "_DynamoDbExtension"), "lambda": _lazy_load(".lmbd", "_LambdaExtension"), @@ -43,19 +38,13 @@ def loader(): "sqs": _lazy_load(".sqs", "_SqsExtension"), } - -def _has_extension(call_context: _AwsSdkCallContext) -> bool: - return call_context.service in _KNOWN_EXTENSIONS - - -def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension: - try: - loader = _KNOWN_EXTENSIONS.get(call_context.service) - if loader is None: - return _AwsSdkExtension(call_context) - - extension_cls = loader() - return extension_cls(call_context) - except Exception as ex: # pylint: disable=broad-except - _logger.error("Error when loading extension: %s", ex) - return _AwsSdkExtension(call_context) +_AIOBOTOCORE_EXTENSIONS = { + "dynamodb": _lazy_load(".dynamodb", "_DynamoDbExtension"), + "lambda": _lazy_load(".lmbd", "_LambdaExtension"), + "secretsmanager": _lazy_load( + ".secretsmanager", "_SecretsManagerExtension" + ), + "stepfunctions": _lazy_load(".sfns", "_StepFunctionsExtension"), + "sns": _lazy_load(".sns", "_SnsExtension"), + "sqs": _lazy_load(".sqs", "_SqsExtension"), +} diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py new file mode 100644 index 0000000000..f3429cf23c --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/registry.py @@ -0,0 +1,217 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Callable, Mapping, Optional + +from opentelemetry._logs import get_logger +from opentelemetry.instrumentation.botocore.extensions.types import ( + _AwsSdkCallContext, + _AwsSdkExtension, +) +from opentelemetry.instrumentation.botocore.utils import _safe_invoke +from opentelemetry.instrumentation.botocore.version import __version__ +from opentelemetry.metrics import get_meter +from opentelemetry.trace import get_tracer + +if TYPE_CHECKING: + from opentelemetry._logs import Logger, LoggerProvider + from opentelemetry.metrics import Instrument, Meter, MeterProvider + from opentelemetry.trace import Tracer, TracerProvider + + +_logger = logging.getLogger(__name__) + + +class ExtensionRegistry: + """ + Registry for AWS SDK extensions that manages extension lookup and + associated OpenTelemetry instrumentation components (tracers, loggers, meters, metrics). + """ + + def __init__( + self, + package_name: str, + extensions: Mapping[str, Callable[[], type[_AwsSdkExtension]]], + tracer_provider: Optional[TracerProvider] = None, + logger_provider: Optional[LoggerProvider] = None, + meter_provider: Optional[MeterProvider] = None, + ): + self._package_name = package_name + self._extensions: Mapping[ + str, Callable[[], type[_AwsSdkExtension]] + ] = extensions + self._tracer_provider: TracerProvider = tracer_provider + self._logger_provider: LoggerProvider = logger_provider + self._meter_provider: MeterProvider = meter_provider + self._tracers: dict[str, Tracer] = {} + self._loggers: dict[str, Logger] = {} + self._meters: dict[str, Meter] = {} + self._metrics: dict[str, dict[str, Instrument]] = {} + + def get_extension( + self, call_context: _AwsSdkCallContext + ) -> _AwsSdkExtension: + """ + Get the appropriate extension for a given call context. + + Args: + call_context: The AWS SDK call context + + Returns: + The matching extension for the service/operation + """ + try: + loader: Callable[[], type[_AwsSdkExtension]] = ( + self._extensions.get(call_context.service) + ) + if loader is None: + return _AwsSdkExtension(call_context) + extension_cls = loader() + return extension_cls(call_context) + except Exception as exc: # pylint: disable=broad-except + _logger.error("Error when loading extension: %s", exc) + return _AwsSdkExtension(call_context) + + def has_extension(self, call_context: _AwsSdkCallContext) -> bool: + """ + Check if a dedicated extension exists for the given call context. + + Args: + call_context: The AWS SDK call context + + Returns: + True if a service-specific extension exists, False otherwise + """ + return call_context.service in self._extensions + + def get_instrumentation_name(self, extension: _AwsSdkExtension) -> str: + """ + Get the instrumentation name for an extension. + + Service-specific extensions get a namespaced name (e.g., 'module.s3'), + while the default extension uses just the module name. + + Args: + extension: The AWS SDK extension + + Returns: + The instrumentation name string + """ + if self.has_extension(extension._call_context): + return f"{self._package_name}.{extension._call_context.service}" + return self._package_name + + def get_tracer(self, extension: _AwsSdkExtension) -> Tracer: + """ + Get or create a tracer for the given extension. + + Tracers are cached per instrumentation name for reuse. + + Args: + extension: The AWS SDK extension + + Returns: + A configured Tracer instance + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._tracers: + return self._tracers[instrumentation_name] + + schema_version: str = extension.tracer_schema_version() + tracer: Tracer = get_tracer( + instrumentation_name, + __version__, + self._tracer_provider, + schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + ) + self._tracers[instrumentation_name] = tracer + return tracer + + def get_logger(self, extension: _AwsSdkExtension): + """ + Get or create a logger for the given extension. + + Loggers are cached per instrumentation name for reuse. + + Args: + extension: The AWS SDK extension + + Returns: + A configured Logger instance + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._loggers: + return self._loggers[instrumentation_name] + + schema_version: str = extension.event_logger_schema_version() + logger: Logger = get_logger( + instrumentation_name, + "", + schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + logger_provider=self._logger_provider, + ) + self._loggers[instrumentation_name] = logger + return logger + + def get_meter(self, extension: _AwsSdkExtension) -> Meter: + """ + Get or create a meter for the given extension. + + Meters are cached per instrumentation name for reuse. + + Args: + extension: The AWS SDK extension + + Returns: + A configured Meter instance + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._meters: + return self._meters[instrumentation_name] + + schema_version: str = extension.meter_schema_version() + meter = get_meter( + instrumentation_name, + "", + schema_url=f"https://opentelemetry.io/schemas/{schema_version}", + meter_provider=self._meter_provider, + ) + self._meters[instrumentation_name] = meter + return meter + + def get_metrics( + self, extension: _AwsSdkExtension + ) -> dict[str, Instrument]: + """ + Get or create metrics for the given extension. + + Metrics are lazily initialized by calling the extension's setup_metrics method. + + Args: + extension: The AWS SDK extension + + Returns: + A dictionary mapping metric names to Instrument instances + """ + instrumentation_name: str = self.get_instrumentation_name(extension) + if instrumentation_name in self._metrics: + return self._metrics[instrumentation_name] + + meter: Meter = self.get_meter(extension) + metrics: dict[str, Instrument] = {} + _safe_invoke(extension.setup_metrics, meter, metrics) + self._metrics[instrumentation_name] = metrics + return metrics diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py index a06b9c206b..13b44a7b7d 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/package.py @@ -12,5 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +_instruments_botocore = ("botocore~=1.0",) +_instruments_aiobotocore = ("aiobotocore~=2.0",) -_instruments = ("botocore ~= 1.0",) +_instruments = () +_instruments_any = (*_instruments_botocore, *_instruments_aiobotocore) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py index 4309e6e9bd..29e8d12b88 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/utils.py @@ -13,6 +13,8 @@ # limitations under the License. from __future__ import annotations +import logging +from typing import Callable from urllib.parse import urlparse from opentelemetry.semconv._incubating.attributes import ( @@ -20,6 +22,8 @@ ) from opentelemetry.util.types import AttributeValue +_logger = logging.getLogger(__name__) + def get_server_attributes(endpoint_url: str) -> dict[str, AttributeValue]: """Extract server.* attributes from AWS endpoint URL.""" @@ -29,3 +33,14 @@ def get_server_attributes(endpoint_url: str) -> dict[str, AttributeValue]: attributes[ServerAttributes.SERVER_ADDRESS] = parsed.hostname attributes[ServerAttributes.SERVER_PORT] = parsed.port or 443 return attributes + + +def _safe_invoke(function: Callable, *args): + function_name = "" + try: + function_name = function.__name__ + function(*args) + except Exception as ex: # pylint:disable=broad-except + _logger.error( + "Error when invoking function '%s'", function_name, exc_info=ex + ) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt index f6243241bd..69dc82bdfa 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt +++ b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-0.txt @@ -1,7 +1,8 @@ asgiref==3.8.1 aws-xray-sdk==2.12.1 -boto3==1.28.80 -botocore==1.31.80 +boto3==1.29.4 +botocore==1.32.4 +aiobotocore==2.8.0 certifi==2024.7.4 cffi==1.17.0 charset-normalizer==3.3.2 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt index 5c7cb24a0c..518da691cc 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt +++ b/instrumentation/opentelemetry-instrumentation-botocore/test-requirements-1.txt @@ -1,7 +1,8 @@ asgiref==3.8.1 aws-xray-sdk==2.12.1 -boto3==1.35.56 -botocore==1.35.56 +boto3==1.35.16 +botocore==1.35.16 +aiobotocore==2.15.0 certifi==2024.7.4 cffi==1.17.0 charset-normalizer==3.3.2 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py new file mode 100644 index 0000000000..970c845270 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_aiobotocore_instrumentation.py @@ -0,0 +1,408 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=too-many-public-methods +import asyncio +import json +from typing import Any +from unittest.mock import Mock, patch + +import aiobotocore.session +import botocore.stub +from botocore.exceptions import ClientError + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.botocore import AiobotocoreInstrumentor +from opentelemetry.instrumentation.utils import suppress_instrumentation +from opentelemetry.semconv._incubating.attributes.cloud_attributes import ( + CLOUD_REGION, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase + +_REQUEST_ID_REGEX_MATCH = r"[A-Za-z0-9]{52}" + + +class TestAiobotocoreInstrumentor(TestBase): + """Aiobotocore integration testsuite""" + + def setUp(self): + super().setUp() + AiobotocoreInstrumentor().instrument() + self.session = aiobotocore.session.get_session() + self.session.set_credentials( + access_key="access-key", + secret_key="secret-key", + ) + self.region = "us-west-2" + + def tearDown(self): + super().tearDown() + AiobotocoreInstrumentor().uninstrument() + + def _default_span_attributes(self, service: str, operation: str): + return { + SpanAttributes.RPC_SYSTEM: "aws-api", + SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: operation, + CLOUD_REGION: self.region, + "retry_attempts": 0, + SpanAttributes.HTTP_STATUS_CODE: 200, + SpanAttributes.SERVER_ADDRESS: f"{service.lower()}.{self.region}.amazonaws.com", + SpanAttributes.SERVER_PORT: 443, + } + + def assert_only_span(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + return spans[0] + + def assert_span( + self, + service: str, + operation: str, + request_id=None, + attributes=None, + ): + span = self.assert_only_span() + expected = self._default_span_attributes(service, operation) + if attributes: + expected.update(attributes) + + span_attributes_request_id = "aws.request_id" + if request_id is _REQUEST_ID_REGEX_MATCH: + actual_request_id = span.attributes[span_attributes_request_id] + self.assertRegex(actual_request_id, _REQUEST_ID_REGEX_MATCH) + expected[span_attributes_request_id] = actual_request_id + elif request_id is not None: + expected[span_attributes_request_id] = request_id + + self.assertSpanHasAttributes(span, expected) + self.assertEqual(f"{service}.{operation}", span.name) + return span + + def _make_client(self, service: str): + return self.session.create_client(service, region_name=self.region) + + @staticmethod + def _make_response_meta(request_id: str) -> dict[str, Any]: + return { + "RequestId": request_id, + "RetryAttempts": 0, + "HTTPStatusCode": 200, + } + + def test_traced_client_ec2(self): + """Test basic EC2 client tracing with stubbed response.""" + request_id = "fdcdcab1-ae5c-489e-9c33-4637c5dda355" + + async def _test(): + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + ) + await client.describe_instances() + + asyncio.run(_test()) + self.assert_span("EC2", "DescribeInstances", request_id=request_id) + + def test_traced_client_s3(self): + """Test S3 client tracing with stubbed response.""" + request_id = "s3-request-id-12345" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_buckets", + { + "Buckets": [], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + ) + await client.list_buckets() + + asyncio.run(_test()) + self.assert_span("S3", "ListBuckets", request_id=request_id) + + def test_no_op_tracer_provider(self): + """Test that no spans are created when using NoOpTracerProvider.""" + AiobotocoreInstrumentor().uninstrument() + AiobotocoreInstrumentor().instrument( + tracer_provider=trace_api.NoOpTracerProvider() + ) + + async def _test(): + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id" + ), + }, + ) + await client.describe_instances() + + asyncio.run(_test()) + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + def test_not_recording(self): + """Test behavior when span is not recording.""" + mock_tracer = Mock() + mock_span = Mock() + mock_span.is_recording.return_value = False + mock_tracer.start_span.return_value = mock_span + + async def _test(): + with patch("opentelemetry.trace.get_tracer") as tracer: + tracer.return_value = mock_tracer + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id" + ), + }, + ) + await client.describe_instances() + self.assertFalse(mock_span.is_recording()) + self.assertTrue(mock_span.is_recording.called) + self.assertFalse(mock_span.set_attribute.called) + self.assertFalse(mock_span.set_status.called) + + asyncio.run(_test()) + + def test_client_error(self): + """Test that ClientError is properly traced with error status.""" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_client_error( + "get_object", + service_error_code="NoSuchKey", + service_message="The specified key does not exist.", + http_status_code=404, + ) + with self.assertRaises(ClientError): + await client.get_object( + Bucket="test-bucket", Key="test-key" + ) + + asyncio.run(_test()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + span = spans[0] + + self.assertIs(span.status.status_code, trace_api.StatusCode.ERROR) + self.assertEqual("S3.GetObject", span.name) + + # Verify exception event was recorded + self.assertEqual(1, len(span.events)) + event = span.events[0] + self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes) + self.assertIn(SpanAttributes.EXCEPTION_TYPE, event.attributes) + self.assertIn(SpanAttributes.EXCEPTION_MESSAGE, event.attributes) + + def test_suppress_instrumentation(self): + """Test that instrumentation can be suppressed.""" + + async def _test(): + async with self._make_client("ec2") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id" + ), + }, + ) + stubber.add_response( + "describe_instances", + { + "Reservations": [], + "ResponseMetadata": self._make_response_meta( + "test-id-2" + ), + }, + ) + with suppress_instrumentation(): + await client.describe_instances() + await client.describe_instances() + + asyncio.run(_test()) + self.assertEqual(0, len(self.get_finished_spans())) + + def test_request_hook(self): + """Test that request hook is called with correct parameters.""" + request_hook_service_attribute_name = "request_hook.service_name" + request_hook_operation_attribute_name = "request_hook.operation_name" + request_hook_api_params_attribute_name = "request_hook.api_params" + + def request_hook(span, service_name, operation_name, api_params): + hook_attributes = { + request_hook_service_attribute_name: service_name, + request_hook_operation_attribute_name: operation_name, + request_hook_api_params_attribute_name: json.dumps(api_params), + } + span.set_attributes(hook_attributes) + + AiobotocoreInstrumentor().uninstrument() + AiobotocoreInstrumentor().instrument(request_hook=request_hook) + + request_id = "hook-test-request-id" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_objects_v2", + { + "Contents": [], + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + expected_params={"Bucket": "test-bucket"}, + ) + await client.list_objects_v2(Bucket="test-bucket") + + asyncio.run(_test()) + + self.assert_span( + "S3", + "ListObjectsV2", + request_id=request_id, + attributes={ + request_hook_service_attribute_name: "s3", + request_hook_operation_attribute_name: "ListObjectsV2", + request_hook_api_params_attribute_name: json.dumps( + {"Bucket": "test-bucket"} + ), + }, + ) + + def test_response_hook(self): + """Test that response hook is called with correct parameters.""" + response_hook_service_attribute_name = "response_hook.service_name" + response_hook_operation_attribute_name = "response_hook.operation_name" + response_hook_bucket_count_attribute_name = ( + "response_hook.bucket_count" + ) + + def response_hook(span, service_name, operation_name, result): + hook_attributes = { + response_hook_service_attribute_name: service_name, + response_hook_operation_attribute_name: operation_name, + response_hook_bucket_count_attribute_name: len( + result["Buckets"] + ), + } + span.set_attributes(hook_attributes) + + AiobotocoreInstrumentor().uninstrument() + AiobotocoreInstrumentor().instrument(response_hook=response_hook) + + request_id = "response-hook-test-id" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_buckets", + { + "Buckets": [ + {"Name": "bucket1"}, + {"Name": "bucket2"}, + ], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + request_id + ), + }, + ) + await client.list_buckets() + + asyncio.run(_test()) + + self.assert_span( + "S3", + "ListBuckets", + request_id=request_id, + attributes={ + response_hook_service_attribute_name: "s3", + response_hook_operation_attribute_name: "ListBuckets", + response_hook_bucket_count_attribute_name: 2, + }, + ) + + def test_multiple_operations(self): + """Test tracing multiple sequential operations.""" + + async def _test(): + async with self._make_client("s3") as client: + with botocore.stub.Stubber(client) as stubber: + stubber.add_response( + "list_buckets", + { + "Buckets": [], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + "req-1" + ), + }, + ) + stubber.add_response( + "list_buckets", + { + "Buckets": [], + "Owner": {"ID": "owner-id"}, + "ResponseMetadata": self._make_response_meta( + "req-2" + ), + }, + ) + await client.list_buckets() + await client.list_buckets() + + asyncio.run(_test()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(2, len(spans)) + + for span in spans: + self.assertEqual("S3.ListBuckets", span.name) + + self.assertEqual("req-1", spans[0].attributes["aws.request_id"]) + self.assertEqual("req-2", spans[1].attributes["aws.request_id"]) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 2219a02372..218c867deb 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -68,6 +68,10 @@ "library": "botocore ~= 1.0", "instrumentation": "opentelemetry-instrumentation-botocore==0.61b0.dev", }, + { + "library": "aiobotocore ~= 2.0", + "instrumentation": "opentelemetry-instrumentation-botocore==0.61b0.dev", + }, { "library": "cassandra-driver ~= 3.25", "instrumentation": "opentelemetry-instrumentation-cassandra==0.61b0.dev", diff --git a/uv.lock b/uv.lock index bde58946cc..2042c46a73 100644 --- a/uv.lock +++ b/uv.lock @@ -95,6 +95,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/71/cf/efa5581760bd08263bce8dbf943f32006b6dfd5bc120f43a26257281b546/aio_pika-9.5.5-py3-none-any.whl", hash = "sha256:94e0ac3666398d6a28b0c3b530c1febf4c6d4ececb345620727cfd7bfe1c02e0", size = 54257 }, ] +[[package]] +name = "aiobotocore" +version = "2.23.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "aioitertools" }, + { name = "botocore" }, + { name = "jmespath" }, + { name = "multidict" }, + { name = "python-dateutil" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f6/1d/babe191fa10a7ecda6c6832c08231536c60cc33b4cddfb3b72133505673e/aiobotocore-2.23.1.tar.gz", hash = "sha256:a59f2a78629b97d52f10936b79c73de64e481a8c44a62c1871f088df6c1afc4f", size = 115869 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/92/d9/25a697a959a7149c93efa4d849421aa5f22bcb82350ac89b4284b0b88aa8/aiobotocore-2.23.1-py3-none-any.whl", hash = "sha256:d81c54d2eae2406ea9a473fea518fed580cf37bc4fc51ce43ba81546e5305114", size = 84219 }, +] + [[package]] name = "aiohappyeyeballs" version = "2.6.1" @@ -202,6 +220,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/4f/ca/3f44aabf63be958ee8ee0cb4c7ad24ea58cc73b0a73919bac9a0b4b92410/aiohttp-3.11.18-cp39-cp39-win_amd64.whl", hash = "sha256:5e7007b8d1d09bce37b54111f593d173691c530b80f27c6493b928dabed9e6ef", size = 443178 }, ] +[[package]] +name = "aioitertools" +version = "0.13.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fd/3c/53c4a17a05fb9ea2313ee1777ff53f5e001aefd5cc85aa2f4c2d982e1e38/aioitertools-0.13.0.tar.gz", hash = "sha256:620bd241acc0bbb9ec819f1ab215866871b4bbd1f73836a55f799200ee86950c", size = 19322 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/10/a1/510b0a7fadc6f43a6ce50152e69dbd86415240835868bb0bd9b5b88b1e06/aioitertools-0.13.0-py3-none-any.whl", hash = "sha256:0be0292b856f08dfac90e31f4739432f4cb6d7520ab9eb73e143f4f2fa5259be", size = 24182 }, +] + [[package]] name = "aiokafka" version = "0.12.0" @@ -500,7 +530,7 @@ wheels = [ [[package]] name = "botocore" -version = "1.38.21" +version = "1.38.46" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "jmespath" }, @@ -508,9 +538,9 @@ dependencies = [ { name = "urllib3", version = "1.26.20", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, { name = "urllib3", version = "2.4.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/05/4a/89f2beab6757c900b15aa301227c9447feff7d327ff0595a2b74406a388c/botocore-1.38.21.tar.gz", hash = "sha256:08d5e9c00e5cc9e0ae0e60570846011789dc7f1d4ea094b3f3e3f3ae1ff2063a", size = 13904318 } +sdist = { url = "https://files.pythonhosted.org/packages/cf/5f/d76870e4399fbfc12aa5c3bb36029edfc1a434392afc70a343c9d7d96e90/botocore-1.38.46.tar.gz", hash = "sha256:8798e5a418c27cf93195b077153644aea44cb171fcd56edc1ecebaa1e49e226e", size = 14074340 } wheels = [ - { url = "https://files.pythonhosted.org/packages/ae/bf/8e943894e0c9f898db63c6af4c590c153dff680bd02536777b0a543e94e5/botocore-1.38.21-py3-none-any.whl", hash = "sha256:567b4d338114174d0b41857002a4b1e8efb68f1654ed9f3ec6c34ebdef5e9eaf", size = 13564842 }, + { url = "https://files.pythonhosted.org/packages/a4/00/dd90b7a0255587ba1c9754d32a221adb4a9022f181df3eef401b0b9fadfc/botocore-1.38.46-py3-none-any.whl", hash = "sha256:89ca782ffbf2e8769ca9c89234cfa5ca577f1987d07d913ee3c68c4776b1eb5b", size = 13736872 }, ] [[package]] @@ -2981,19 +3011,21 @@ dependencies = [ ] [package.optional-dependencies] -instruments = [ +instruments-any = [ + { name = "aiobotocore" }, { name = "botocore" }, ] [package.metadata] requires-dist = [ - { name = "botocore", marker = "extra == 'instruments'", specifier = "~=1.0" }, + { name = "aiobotocore", marker = "extra == 'instruments-any'", specifier = "~=2.0" }, + { name = "botocore", marker = "extra == 'instruments-any'", specifier = "~=1.0" }, { name = "opentelemetry-api", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-api&branch=main" }, { name = "opentelemetry-instrumentation", editable = "opentelemetry-instrumentation" }, { name = "opentelemetry-propagator-aws-xray", editable = "propagator/opentelemetry-propagator-aws-xray" }, { name = "opentelemetry-semantic-conventions", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-semantic-conventions&branch=main" }, ] -provides-extras = ["instruments"] +provides-extras = ["instruments", "instruments-any"] [[package]] name = "opentelemetry-instrumentation-cassandra" @@ -4100,7 +4132,7 @@ dependencies = [ { name = "opentelemetry-instrumentation-aws-lambda" }, { name = "opentelemetry-instrumentation-boto", extra = ["instruments"] }, { name = "opentelemetry-instrumentation-boto3sqs", extra = ["instruments"] }, - { name = "opentelemetry-instrumentation-botocore", extra = ["instruments"] }, + { name = "opentelemetry-instrumentation-botocore" }, { name = "opentelemetry-instrumentation-cassandra", extra = ["instruments"] }, { name = "opentelemetry-instrumentation-celery", extra = ["instruments"] }, { name = "opentelemetry-instrumentation-click", extra = ["instruments"] },