diff --git a/src/lumigo_opentelemetry/instrumentations/botocore/__init__.py b/src/lumigo_opentelemetry/instrumentations/botocore/__init__.py index daa00fc8..4030386d 100644 --- a/src/lumigo_opentelemetry/instrumentations/botocore/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/botocore/__init__.py @@ -1,5 +1,13 @@ +from typing import Dict, Any + +from lumigo_opentelemetry.libs.general_utils import lumigo_safe_execute +from lumigo_opentelemetry.resources.span_processor import set_span_skip_export +from opentelemetry.trace import Span, SpanKind + from lumigo_opentelemetry.instrumentations import AbstractInstrumentor from lumigo_opentelemetry.instrumentations.botocore.parsers import AwsParser +from lumigo_opentelemetry.libs.sampling import should_sample +from lumigo_opentelemetry.utils.span_utils import safe_get_span_attributes class BotoCoreInstrumentorWrapper(AbstractInstrumentor): @@ -21,8 +29,19 @@ def install_instrumentation(self) -> None: BotocoreInstrumentor().instrument( request_hook=AwsParser.request_hook, - response_hook=AwsParser.response_hook, + response_hook=filtered_resource_hook, ) +def filtered_resource_hook( + span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] +) -> None: + AwsParser.response_hook(span, service_name, operation_name, result) + with lumigo_safe_execute("aws: response_hook skip check"): + span_attributes = safe_get_span_attributes(span) + if span_attributes: + if not should_sample(span_attributes, SpanKind.CLIENT): + set_span_skip_export(span) + + instrumentor: AbstractInstrumentor = BotoCoreInstrumentorWrapper() diff --git a/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py b/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py index dd33ad4e..9575a409 100644 --- a/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import logging from typing import Any, Dict, Optional, Type from lumigo_core.triggers.event_trigger import parse_triggers @@ -11,6 +12,7 @@ from lumigo_opentelemetry.libs.general_utils import ( get_boolean_env_var, lumigo_safe_execute, + lumigo_safe_wrapper, ) from lumigo_opentelemetry.libs.json_utils import dump_with_context from lumigo_opentelemetry.resources.span_processor import set_span_skip_export @@ -19,6 +21,8 @@ get_resource_fullname, ) +from lumigo_opentelemetry.utils.span_utils import safe_get_span_attribute + class AwsParser: @staticmethod @@ -33,20 +37,169 @@ def get_parser( } return parsers.get(service_name, AwsParser) - @staticmethod - def parse_request( - span: Span, service_name: str, operation_name: str, api_params: Dict[Any, Any] - ) -> None: + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_region( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[str, Any]] = None, + ) -> Optional[str]: + if span: + region_from_attrs: Optional[str] = safe_get_span_attribute( + span=span, attribute_name="aws.region" + ) + if region_from_attrs: + return region_from_attrs + + # Try getting the region from the ARN + arn = cls.safe_extract_arn( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + region_from_arn: Optional[str] = ( + extract_region_from_arn(arn=arn) if arn else None + ) + if region_from_arn: + return region_from_arn + + return None + + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_url( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[str, Any]] = None, + ) -> Optional[str]: + region = cls.safe_extract_region( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + if region and service_name: + return f"https://{service_name.lower()}.{region}.amazonaws.com" + + return None + + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_arn( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[str, Any]] = None, + ) -> Optional[str]: + # Child classes need to implement this, no generic implementation is possible + return None + + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_resource_name( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[str, Any]] = None, + ) -> Optional[str]: + arn = cls.safe_extract_arn( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + return get_resource_fullname(arn) if arn else None + + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_http_request_body( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[str, Any]] = None, + ) -> Optional[str]: + return dump_with_context("requestBody", api_params) # type: ignore + + @classmethod + def _get_request_additional_attributes( + cls, + span: Span, + service_name: str, + operation_name: str, + api_params: Dict[str, Any], + ) -> Dict[str, Any]: attributes = { - "http.request.body": dump_with_context("requestBody", api_params), "aws.service": service_name, "http.method": operation_name, } + request_body = cls.safe_extract_http_request_body( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + if request_body: + attributes["http.request.body"] = request_body + + region = AwsParser.safe_extract_region( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + if region: + attributes["region"] = region + url = AwsParser.safe_extract_url( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + if url: + attributes["http.url"] = url + + resource_name = cls.safe_extract_resource_name( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + if resource_name: + attributes["aws.resource.name"] = resource_name + + return attributes + + @classmethod + def parse_request( + cls, + span: Span, + service_name: str, + operation_name: str, + api_params: Dict[str, Any], + ) -> None: + attributes = cls._get_request_additional_attributes( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) span.set_attributes(attributes) - @staticmethod + @classmethod def request_hook( - span: Span, service_name: str, operation_name: str, api_params: Dict[Any, Any] + cls, + span: Span, + service_name: str, + operation_name: str, + api_params: Dict[str, Any], ) -> None: with lumigo_safe_execute("aws: request_hook"): parser = AwsParser.get_parser(service_name=service_name) @@ -57,9 +210,9 @@ def request_hook( api_params=api_params, ) - @staticmethod + @classmethod def parse_response( - span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] + cls, span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] ) -> None: headers = result.get("ResponseMetadata", {}).get("HTTPHeaders", {}) attributes = { @@ -74,9 +227,9 @@ def parse_response( } span.set_attributes(attributes) - @staticmethod + @classmethod def response_hook( - span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] + cls, span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] ) -> None: with lumigo_safe_execute("aws: response_hook"): parser = AwsParser.get_parser(service_name=service_name) @@ -89,28 +242,33 @@ def response_hook( class SnsParser(AwsParser): - @staticmethod - def safe_extract_arn(api_params: Dict[Any, Any]) -> Optional[str]: - return api_params.get("TargetArn") - - @staticmethod - def parse_request( - span: Span, service_name: str, operation_name: str, api_params: Dict[Any, Any] - ) -> None: - arn = SnsParser.safe_extract_arn(api_params=api_params) - region = extract_region_from_arn(arn=arn) if arn else None + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_arn( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[Any, Any]] = None, + ) -> Optional[str]: + return api_params.get("TargetArn") if api_params else None - attributes = { - "http.request.body": dump_with_context( + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_http_request_body( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[Any, Any]] = None, + ) -> Optional[str]: + return ( + dump_with_context( "requestBody", api_params.get("Message", api_params or {}) - ), - "aws.service": service_name, - "region": region or "", - "http.method": operation_name, - "http.url": f"https://{service_name.lower()}.{region}.amazonaws.com", - "aws.resource.name": get_resource_fullname(arn) if arn else "", - } - span.set_attributes(attributes) + ) + if api_params + else None + ) class SqsParser(AwsParser): @@ -118,26 +276,42 @@ class SqsParser(AwsParser): def extract_queue_name_from_url(queue_url: str) -> str: return queue_url.split("/")[-1] - @staticmethod - def parse_request( - span: Span, service_name: str, operation_name: str, api_params: Dict[Any, Any] - ) -> None: - queue_url = api_params.get("QueueUrl") - resource_name = ( - SqsParser.extract_queue_name_from_url(queue_url=queue_url) - if queue_url + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_http_request_body( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[Any, Any]] = None, + ) -> Optional[str]: + return ( + dump_with_context( + "requestBody", api_params.get("MessageBody", api_params or {}) + ) + if api_params else None ) - attributes = { - "http.request.body": dump_with_context( - "requestBody", api_params.get("MessageBody", api_params or {}) - ), - "aws.service": service_name, - "http.method": operation_name, - "http.url": queue_url or "", - "aws.resource.name": resource_name or "", - } - span.set_attributes(attributes) + + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_resource_name( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[Any, Any]] = None, + ) -> Optional[str]: + queue_url = cls.safe_extract_url( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + resource_name = ( + cls.extract_queue_name_from_url(queue_url=queue_url) if queue_url else None + ) + return resource_name if resource_name else None @staticmethod def _should_skip_empty_sqs_polling_response( @@ -155,9 +329,9 @@ def _should_skip_empty_sqs_polling_response( and get_boolean_env_var(AUTO_FILTER_EMPTY_SQS, True) ) - @staticmethod + @classmethod def parse_response( - span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] + cls, span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] ) -> None: trigger_details = parse_triggers( {"service_name": service_name, "operation_name": operation_name, **result} @@ -168,7 +342,7 @@ def parse_response( ) # Filter out sqs polls with empty response - if SqsParser._should_skip_empty_sqs_polling_response(operation_name, result): + if cls._should_skip_empty_sqs_polling_response(operation_name, result): logger.debug( "Not tracing empty SQS polling requests " f"(override by setting the {AUTO_FILTER_EMPTY_SQS} env var to false)" @@ -177,32 +351,68 @@ def parse_response( class LambdaParser(AwsParser): - @staticmethod - def parse_request( - span: Span, service_name: str, operation_name: str, api_params: Dict[Any, Any] - ) -> None: - resource_name = api_params.get("FunctionName") - attributes = { - "http.request.body": dump_with_context( + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_http_request_body( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[Any, Any]] = None, + ) -> Optional[str]: + return ( + dump_with_context( "requestBody", api_params.get("Payload", api_params or {}) - ), - "aws.resource.name": resource_name or "", - "aws.service": service_name, - "http.method": operation_name, - } - span.set_attributes(attributes) + ) + if api_params + else None + ) + + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_resource_name( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[str, str]] = None, + ) -> Optional[str]: + if api_params: + resource_name = api_params.get("FunctionName") + return resource_name if resource_name else None + + return None class DynamoParser(AwsParser): - @staticmethod - def parse_request( - span: Span, service_name: str, operation_name: str, api_params: Dict[Any, Any] - ) -> None: - attributes = { - "http.request.body": dump_with_context("requestBody", api_params), - "aws.service": service_name, - "http.method": operation_name, - "aws.resource.name": api_params.get("TableName", ""), - "aws.dynamodb.method": operation_name, - } - span.set_attributes(attributes) + @classmethod + @lumigo_safe_wrapper(level=logging.DEBUG) + def safe_extract_resource_name( + cls, + span: Optional[Span] = None, + service_name: Optional[str] = None, + operation_name: Optional[str] = None, + api_params: Optional[Dict[str, Any]] = None, + ) -> Optional[str]: + if api_params: + resource_name: Optional[str] = api_params.get("TableName") + return resource_name if resource_name else None + + return None + + @classmethod + def _get_request_additional_attributes( + cls, + span: Span, + service_name: str, + operation_name: str, + api_params: Dict[Any, Any], + ) -> Dict[str, Any]: + attributes = super()._get_request_additional_attributes( + span=span, + service_name=service_name, + operation_name=operation_name, + api_params=api_params, + ) + attributes["aws.dynamodb.method"] = operation_name + return attributes diff --git a/src/lumigo_opentelemetry/libs/general_utils.py b/src/lumigo_opentelemetry/libs/general_utils.py index eee35312..c7b511a0 100644 --- a/src/lumigo_opentelemetry/libs/general_utils.py +++ b/src/lumigo_opentelemetry/libs/general_utils.py @@ -1,3 +1,6 @@ +import functools +import logging + try: from collections import Iterable except ImportError: @@ -6,7 +9,7 @@ import os from contextlib import contextmanager -from typing import Generator, List, Optional, TypeVar, Union +from typing import Generator, List, Optional, TypeVar, Union, Any, Callable, cast from opentelemetry.sdk.environment_variables import ( OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, @@ -30,6 +33,39 @@ def lumigo_safe_execute(part_name: str = "") -> Generator[None, None, None]: ) +TCallable = TypeVar("TCallable", bound=Callable) # type: ignore + + +def lumigo_safe_wrapper( + default_return_value: Any = None, level: Optional[int] = logging.ERROR +) -> Callable: # type: ignore + """ + A wrapper to safely execute a function, and return a default value in case of an exception. + @param default_return_value: + @param level: The log level for the error message. If None, no error message will be logged. + @return: The wrapper function + """ + + def decorator(func: TCallable) -> TCallable: + @functools.wraps(func) + def wrapper(*args, **kwargs): # type: ignore + try: + return func(*args, **kwargs) + except Exception as e: + if level is not None: + logger.log( + level=level, + msg=f"An exception occurred in lumigo's code {func.__name__}", + exc_info=e, + ) + + return default_return_value + + return cast(TCallable, wrapper) + + return decorator + + def safe_split_get( string: str, sep: str, index: int, default: Optional[str] = None ) -> Optional[str]: diff --git a/src/lumigo_opentelemetry/libs/sampling.py b/src/lumigo_opentelemetry/libs/sampling.py index ed254a3c..85c6df55 100644 --- a/src/lumigo_opentelemetry/libs/sampling.py +++ b/src/lumigo_opentelemetry/libs/sampling.py @@ -1,7 +1,7 @@ import json import os import re -from typing import Optional, Sequence, List +from typing import Optional, Sequence, List, Mapping, Any from opentelemetry.context import Context from opentelemetry.sdk.trace.sampling import ( @@ -47,35 +47,9 @@ def should_sample( trace_state: "TraceState" = None, ) -> "SamplingResult": decision = Decision.RECORD_AND_SAMPLE - endpoint = _extract_endpoint(attributes, kind) - if endpoint: - if ( - kind == SpanKind.SERVER - and does_endpoint_match_server_filtering_regexes(endpoint) - ): - logger.debug( - f"Dropping trace for endpoint '{endpoint}' because it matches one of the" - f" filter regexes specified by the '{LUMIGO_FILTER_HTTP_ENDPOINTS_REGEX_SERVER}' env var" - ) - decision = Decision.DROP - attributes = None - elif ( - kind == SpanKind.CLIENT - and does_endpoint_match_client_filtering_regexes(endpoint) - ): - logger.debug( - f"Dropping trace for endpoint '{endpoint}' because it matches one of the" - f" filter regexes specified by the '{LUMIGO_FILTER_HTTP_ENDPOINTS_REGEX_CLIENT}' env var" - ) - decision = Decision.DROP - attributes = None - elif does_endpoint_match_filtering_regexes(endpoint): - logger.debug( - f"Dropping trace for endpoint '{endpoint}' because it matches one of the" - f" filter regexes specified by the '{LUMIGO_FILTER_HTTP_ENDPOINTS_REGEX}' env var" - ) - decision = Decision.DROP - attributes = None + if not should_sample(attributes, kind): + decision = Decision.DROP + attributes = None return SamplingResult( decision, @@ -87,6 +61,37 @@ def get_description(self) -> str: return "SkipSampler" +def should_sample(attributes: Mapping[str, Any], span_kind: SpanKind) -> bool: + endpoint = _extract_endpoint(attributes, span_kind) + if endpoint: + if ( + span_kind == SpanKind.SERVER + and does_endpoint_match_server_filtering_regexes(endpoint) + ): + logger.debug( + f"Dropping trace for endpoint '{endpoint}' because it matches one of the" + f" filter regexes specified by the '{LUMIGO_FILTER_HTTP_ENDPOINTS_REGEX_SERVER}' env var" + ) + return False + elif ( + span_kind == SpanKind.CLIENT + and does_endpoint_match_client_filtering_regexes(endpoint) + ): + logger.debug( + f"Dropping trace for endpoint '{endpoint}' because it matches one of the" + f" filter regexes specified by the '{LUMIGO_FILTER_HTTP_ENDPOINTS_REGEX_CLIENT}' env var" + ) + return False + elif does_endpoint_match_filtering_regexes(endpoint): + logger.debug( + f"Dropping trace for endpoint '{endpoint}' because it matches one of the" + f" filter regexes specified by the '{LUMIGO_FILTER_HTTP_ENDPOINTS_REGEX}' env var" + ) + return False + + return True + + _attribute_sampler = AttributeSampler() LUMIGO_SAMPLER = ParentBased( root=_attribute_sampler, diff --git a/src/lumigo_opentelemetry/utils/span_utils.py b/src/lumigo_opentelemetry/utils/span_utils.py new file mode 100644 index 00000000..48290056 --- /dev/null +++ b/src/lumigo_opentelemetry/utils/span_utils.py @@ -0,0 +1,22 @@ +from typing import Optional, Mapping, Any + +from opentelemetry.trace import Span + + +def safe_get_span_attributes(span: Span) -> Optional[Mapping[str, Any]]: + try: + return ( + span.attributes + if hasattr(span, "attributes") and isinstance(span.attributes, Mapping) + else {} + ) + except Exception: + return None + + +def safe_get_span_attribute(span: Span, attribute_name: str) -> Optional[Any]: + try: + attributes = safe_get_span_attributes(span=span) + return attributes.get(attribute_name) if attributes else None + except Exception: + return None diff --git a/src/test/integration/boto3-sqs/requirements_others.txt b/src/test/integration/boto3-sqs/requirements_others.txt index d3513cf9..da6ef04f 100644 --- a/src/test/integration/boto3-sqs/requirements_others.txt +++ b/src/test/integration/boto3-sqs/requirements_others.txt @@ -1,3 +1,3 @@ pytest psutil -moto \ No newline at end of file +moto[sqs]==4.2.14 diff --git a/src/test/unit/libs/test_general_utils.py b/src/test/unit/libs/test_general_utils.py index 5aa5b93d..9ed49b59 100644 --- a/src/test/unit/libs/test_general_utils.py +++ b/src/test/unit/libs/test_general_utils.py @@ -1,6 +1,10 @@ import pytest -from lumigo_opentelemetry.libs.general_utils import get_max_size, get_boolean_env_var +from lumigo_opentelemetry.libs.general_utils import ( + get_max_size, + get_boolean_env_var, + lumigo_safe_wrapper, +) def test_get_max_size_otel_span_attr_limit_is_set(monkeypatch): @@ -48,3 +52,27 @@ def test_get_boolean_env_var(env_var_value, default, expected_result, monkeypatc if env_var_value is not None: monkeypatch.setenv("TEST_VAR", env_var_value) assert get_boolean_env_var("TEST_VAR", default=default) is expected_result + + +def test_lumigo_safe_wrapper_happy_flow(): + @lumigo_safe_wrapper() + def my_func(): + return 1 + + assert my_func() == 1 + + +def test_lumigo_safe_wrapper_exception(): + @lumigo_safe_wrapper() + def my_func(): + raise Exception("test") + + assert my_func() is None + + +def test_lumigo_safe_wrapper_exception_default_value(): + @lumigo_safe_wrapper(default_return_value=1) + def my_func(): + raise Exception("test") + + assert my_func() == 1