Skip to content

Commit

Permalink
RD-11321-otel-distros-python-and-node-filter-out-empty-sqs-polls (#481)
Browse files Browse the repository at this point in the history
* feat: do not export empty sqs polling spans

* test: new span processor and empty sqs span skipping

* test: integration test empty sqs polling span skipping

* docs: update README.md with filter empty sqs messages feature

* docs: detailed possible values for LUMIGO_AUTO_FILTER_EMPTY_SQS
  • Loading branch information
sagivoululumigo authored Sep 11, 2023
1 parent c436f0c commit e21f5ac
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 5 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The `lumigo_opentelemetry` package additionally supports the following configura
* We support more granular masking using the following parameters. If not given, the above configuration is the fallback: `LUMIGO_SECRET_MASKING_REGEX_HTTP_REQUEST_BODIES`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_REQUEST_HEADERS`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_RESPONSE_BODIES`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_RESPONSE_HEADERS`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_QUERY_PARAMS`, `LUMIGO_SECRET_MASKING_REGEX_ENVIRONMENT`.
* `LUMIGO_SWITCH_OFF=true`: This option disables the Lumigo OpenTelemetry distro entirely; no instrumentation will be injected, no tracing data will be collected.
* `LUMIGO_REPORT_DEPENDENCIES=false`: This option disables the built-in dependency reporting to Lumigo SaaS. For more information, refer to the [Automated dependency reporting](#automated-dependency-reporting) section.
* `LUMIGO_AUTO_FILTER_EMPTY_SQS`: This option enables the automatic filtering of empty SQS messages from being sent to Lumigo SaaS. For more information, refer to the [Filtering out empty SQS messages](#filtering-out-empty-sqs-messages) section.

### Execution Tags

Expand Down Expand Up @@ -340,6 +341,19 @@ for message in response.get("Messages", []):

Without the scope provided by the iterator over `response["Messages"]`, `span_1` would be without a parent span, and that would result in a separate invocation and a separate transaction in Lumigo.

### Filtering out empty SQS messages

A common pattern in SQS-based applications is to continuously poll an SQS queue for messages,
and to process them as they arrive.
In order not to clutter the Lumigo Dashboard with empty SQS polling messages, the default behavior is to filter them
out from being sent to Lumigo.

You can change this behavior by setting the boolean environment variable `LUMIGO_AUTO_FILTER_EMPTY_SQS` to `false`.
The possible variations are:
* `LUMIGO_AUTO_FILTER_EMPTY_SQS=true` filter out empty SQS polling messages
* `LUMIGO_AUTO_FILTER_EMPTY_SQS=false` do not filter out empty SQS polling messages
* No environment variable set (default): filter out empty SQS polling messages

## Testing

We use [nox](https://pypi.org/project/nox/) for setting up and running our tests.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pre-commit==2.20.0
psutil==5.9.1
pytest==7.1.1
pytest-cov==3.0.0
pyyaml==6.0
pyyaml==6.0.1
requests==2.27.1
types-attrs==19.1.0
types-boto==2.49.17
Expand Down
5 changes: 3 additions & 2 deletions src/lumigo_opentelemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def init() -> Dict[str, Any]:
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import SpanLimits, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from lumigo_opentelemetry.resources.span_processor import LumigoSpanProcessor

DEFAULT_LUMIGO_ENDPOINT = (
"https://ga-otlp.lumigo-tracer-edge.golumigo.com/v1/traces"
Expand Down Expand Up @@ -125,7 +126,7 @@ def init() -> Dict[str, Any]:

if lumigo_token:
tracer_provider.add_span_processor(
BatchSpanProcessor(
LumigoSpanProcessor(
OTLPSpanExporter(
endpoint=lumigo_endpoint,
headers={"Authorization": f"LumigoToken {lumigo_token}"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
extract_region_from_arn,
get_resource_fullname,
)
from lumigo_opentelemetry.resources.span_processor import set_span_skip_export
from lumigo_opentelemetry import logger
from lumigo_opentelemetry.libs.general_utils import get_boolean_env_var
from lumigo_opentelemetry.libs.environment_variables import AUTO_FILTER_EMPTY_SQS


class AwsParser:
Expand Down Expand Up @@ -133,6 +137,22 @@ def parse_request(
}
span.set_attributes(attributes)

@staticmethod
def _should_skip_empty_sqs_polling_response(
operation_name: str, result: Dict[Any, Any]
) -> bool:
"""
checks the sqs response & returns true if the request receive messages from SQS but no messages were returned
"""

no_messages = not result or not result.get("Messages", None)
sqs_poll = operation_name == "ReceiveMessage"
return (
sqs_poll
and no_messages
and get_boolean_env_var(AUTO_FILTER_EMPTY_SQS, True)
)

@staticmethod
def parse_response(
span: Span, service_name: str, operation_name: str, result: Dict[Any, Any]
Expand All @@ -145,6 +165,14 @@ def parse_response(
{"lumigoData": json.dumps({"trigger": trigger_details})}
)

# Filter out sqs polls with empty response
if SqsParser._should_skip_empty_sqs_polling_response(operation_name, result):
logger.info(
"Not tracing empty SQS polling requests "
f"(override by setting the {AUTO_FILTER_EMPTY_SQS} env var to false)"
)
set_span_skip_export(span)


class LambdaParser(AwsParser):
@staticmethod
Expand Down
1 change: 1 addition & 0 deletions src/lumigo_opentelemetry/libs/environment_variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AUTO_FILTER_EMPTY_SQS = "LUMIGO_AUTO_FILTER_EMPTY_SQS"
27 changes: 27 additions & 0 deletions src/lumigo_opentelemetry/libs/general_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,30 @@ def get_max_size() -> int:
os.environ.get(OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, DEFAULT_MAX_ENTRY_SIZE),
)
)


def get_boolean_env_var(env_var_name: str, default: bool = False) -> bool:
"""
This function return the boolean value of the given environment variable.
If this values doesn't exist, return default.
@param env_var_name: The env var to get (case-sensitive)
@param default: Default value if env var is not set
@return: The boolean value of the env var
"""

env_var_value = os.environ.get(env_var_name, str(default))
env_var_value = (
env_var_value.lower() if isinstance(env_var_value, str) else env_var_value
)

is_truth_value = env_var_value == "true"
is_false_value = env_var_value == "false"
if not is_truth_value and not is_false_value:
logger.debug(
f'Invalid boolean value for env var "{env_var_name}", '
f'defaulting to value "{str(default).lower()}"'
)
return default

return is_truth_value
40 changes: 40 additions & 0 deletions src/lumigo_opentelemetry/resources/span_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from opentelemetry.trace import Span
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from lumigo_opentelemetry import logger


# A span attributes that if is set to True, the span will not be exported
SKIP_EXPORT_SPAN_ATTRIBUTE = "SKIP_EXPORT"


class LumigoSpanProcessor(BatchSpanProcessor):
def on_end(self, span: ReadableSpan) -> None:
if should_skip_exporting_span(span):
logger.debug("Not exporting span because it has NO_EXPORT=True attribute")
return

super().on_end(span)


def should_skip_exporting_span(span: ReadableSpan) -> bool:
"""
Given a span, returns an answer if the span should be exported or not.
@param span: A readable span to check
@return: True if the span should not be exported, False otherwise
"""
return span.attributes.get(SKIP_EXPORT_SPAN_ATTRIBUTE, False) is True


def set_span_skip_export(span: Span, skip_export: bool = True) -> None:
"""
marks the span as a span not intended for export (for example in spans that create a lot of noise and customers
do not want to trace)
@param span: The span to mark (The span is altered in place)
@param skip_export: Should the span be exported or not (default is True, the span will not be exported)
@return:
"""

span.set_attributes({SKIP_EXPORT_SPAN_ATTRIBUTE: skip_export})
5 changes: 5 additions & 0 deletions src/test/integration/boto3-sqs/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ def run():
client = boto3.client("sqs", region_name="eu-central-1")
queue = client.create_queue(QueueName="test")

# Simulate polling an empty sqs queue
client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)
client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)
client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)

client.send_message(
QueueUrl=queue["QueueUrl"],
MessageBody="Message_1",
Expand Down
27 changes: 26 additions & 1 deletion src/test/integration/boto3-sqs/tests/test_boto3_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
class TestBoto3SqsSpans(unittest.TestCase):
def test_boto3_instrumentation(self):
spans_container = SpansContainer.parse_spans_from_file()
self.assertEqual(9, len(spans_container.spans))
self.assertEqual(12, len(spans_container.spans))

[
create_queue_span,
empty_sqs_poll_1_span,
empty_sqs_poll_2_span,
empty_sqs_poll_3_span,
send_message_1_span,
send_message_2_span,
receive_message_1_span,
Expand All @@ -21,6 +24,28 @@ def test_boto3_instrumentation(self):
after_iterator_break_span,
] = spans_container.spans

# Make sure that all the empty polling spans are marked as skipped
for span in [
empty_sqs_poll_1_span,
empty_sqs_poll_2_span,
empty_sqs_poll_3_span,
]:
self.assertIsNotNone(span.get("attributes"))
self.assertEqual(span["attributes"].get("SKIP_EXPORT"), True)

# Make sure that other spans are not marked as skipped
for span in [
create_queue_span,
send_message_1_span,
send_message_2_span,
receive_message_1_span,
receive_message_2_span,
consume_message_2_span,
consume_message_1_span,
receive_message_2_span,
]:
self.assertNotEqual(span.get("attributes", {}).get("SKIP_EXPORT"), True)

self.assertEqual(create_queue_span["name"], "SQS.CreateQueue")
self.assertIsNone(create_queue_span["parent_id"])

Expand Down
108 changes: 108 additions & 0 deletions src/test/unit/instrumentations/botocore/test_parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import logging

import pytest
from unittest.mock import Mock, patch

from lumigo_opentelemetry.instrumentations.botocore.parsers import SqsParser


EMPTY_SQS_RESULT_1 = {}
EMPTY_SQS_RESULT_2 = {"Messages": []}
NON_EMPTY_SQS_RESULT = {"Messages": [{"MessageId": "1234", "Body": "test"}]}


@pytest.mark.parametrize(
"env_var_value, operation, result, should_skip",
[
# Check that empty sqs polls are skipped
("true", "ReceiveMessage", EMPTY_SQS_RESULT_1, True),
("true", "ReceiveMessage", EMPTY_SQS_RESULT_2, True),
# Check that non-empty polls are not skipped
("true", "ReceiveMessage", NON_EMPTY_SQS_RESULT, False),
# Check that other operations are not skipped
("true", "DeleteMessage", EMPTY_SQS_RESULT_1, False),
("true", "DeleteMessageBatch", EMPTY_SQS_RESULT_1, False),
("true", "SendMessage", EMPTY_SQS_RESULT_1, False),
("true", "SendMessageBatch", EMPTY_SQS_RESULT_1, False),
("true", "UnknownOperation", EMPTY_SQS_RESULT_1, False),
("true", None, EMPTY_SQS_RESULT_1, False),
# Check that empty sqs polls are not skipped if the env var is set to false
("false", "ReceiveMessage", EMPTY_SQS_RESULT_1, False),
("false", "ReceiveMessage", EMPTY_SQS_RESULT_2, False),
# Check that non-empty polls are not skipped if the env var is set to false
("false", "ReceiveMessage", NON_EMPTY_SQS_RESULT, False),
# Check that the default behavior is to skip empty sqs polls
(None, "ReceiveMessage", EMPTY_SQS_RESULT_1, True),
(None, "ReceiveMessage", EMPTY_SQS_RESULT_2, True),
("UnsupportedEnvVarValue", "ReceiveMessage", EMPTY_SQS_RESULT_2, True),
],
)
def test_sqs_skip_sqs_response(
env_var_value, operation, result, should_skip, monkeypatch
):
if env_var_value is not None:
monkeypatch.setenv("LUMIGO_AUTO_FILTER_EMPTY_SQS", env_var_value)

assert (
SqsParser._should_skip_empty_sqs_polling_response(operation, result)
== should_skip
)


@patch(
"lumigo_opentelemetry.instrumentations.botocore.parsers.SqsParser._should_skip_empty_sqs_polling_response"
)
def test_parse_sqs_response_skipping_empty_polls_outputs_log(should_skip_mock, caplog):
should_skip_mock.return_value = True
span = Mock(set_attribute=Mock())
service_name = "sqs"
operation_name = "ReceiveMessage"
result = {
"ResponseMetadata": {
"RequestId": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"date": "Thu, 07 Sep 2023 16:25:12 GMT",
"content-type": "text/xml",
"content-length": "240",
"connection": "keep-alive",
},
"RetryAttempts": 0,
}
}

with caplog.at_level(logging.INFO):
SqsParser.parse_response(span, service_name, operation_name, result)

assert "not tracing empty sqs polling requests" in caplog.text.lower()


@patch(
"lumigo_opentelemetry.instrumentations.botocore.parsers.SqsParser._should_skip_empty_sqs_polling_response"
)
def test_parse_sqs_response_not_skipping_polls_no_output_log(should_skip_mock, caplog):
should_skip_mock.return_value = False
span = Mock(set_attribute=Mock())
service_name = "sqs"
operation_name = "ReceiveMessage"
result = {
"ResponseMetadata": {
"RequestId": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"date": "Thu, 07 Sep 2023 16:25:12 GMT",
"content-type": "text/xml",
"content-length": "240",
"connection": "keep-alive",
},
"RetryAttempts": 0,
}
}

SqsParser.parse_response(span, service_name, operation_name, result)

assert "not tracing empty sqs polling requests" not in caplog.text.lower()

# Make sure that there is an info log
31 changes: 30 additions & 1 deletion src/test/unit/libs/test_general_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from lumigo_opentelemetry.libs.general_utils import get_max_size
import pytest

from lumigo_opentelemetry.libs.general_utils import get_max_size, get_boolean_env_var


def test_get_max_size_otel_span_attr_limit_is_set(monkeypatch):
Expand All @@ -19,3 +21,30 @@ def test_get_max_size_both_env_vars_are_set(monkeypatch):

def test_get_max_size_get_default_value():
assert get_max_size() == 2048


@pytest.mark.parametrize(
"env_var_value,default,expected_result",
[
# Normal truth values
("true", False, True),
("True", False, True),
("TRUE", False, True),
# Normal false values
("false", True, False),
("False", True, False),
("FALSE", True, False),
# Invalid values, use the default
("RandomValue", False, False),
("RandomValue", True, True),
# Empty values, use the default
(None, False, False),
(None, True, True),
],
)
def test_get_boolean_env_var(env_var_value, default, expected_result, monkeypatch):
"""Try getting a boolean value from env vars, and check all the different options work"""

if env_var_value is not None:
monkeypatch.setenv("TEST_VAR", env_var_value)
assert get_boolean_env_var("TEST_VAR", default=default) is expected_result
Loading

0 comments on commit e21f5ac

Please sign in to comment.