diff --git a/aws/logs_monitoring/customized_log_group.py b/aws/logs_monitoring/customized_log_group.py index ad63a952..475d66e3 100644 --- a/aws/logs_monitoring/customized_log_group.py +++ b/aws/logs_monitoring/customized_log_group.py @@ -26,6 +26,11 @@ def is_lambda_customized_log_group(logstream_name): ) +# For both default and customzied Step Functions log groups, the log_stream starts with "states/" +def is_step_functions_log_group(logstream_name): + return logstream_name.startswith("states/") + + def get_lambda_function_name_from_logstream_name(logstream_name): try: # Not match the pattern for customized Lambda log group diff --git a/aws/logs_monitoring/steps/enums.py b/aws/logs_monitoring/steps/enums.py index 7a25a490..c81655bf 100644 --- a/aws/logs_monitoring/steps/enums.py +++ b/aws/logs_monitoring/steps/enums.py @@ -136,7 +136,6 @@ def __init__(self, string, event_source): RDS = ("/aws/rds", AwsEventSource.RDS) # e.g. sns/us-east-1/123456779121/SnsTopicX SNS = ("sns/", AwsEventSource.SNS) - STEPFUNCTION = ("/aws/vendedlogs/states", AwsEventSource.STEPFUNCTION) TRANSITGATEWAY = ("tgw-attach", AwsEventSource.TRANSITGATEWAY) def __str__(self): diff --git a/aws/logs_monitoring/steps/handlers/awslogs_handler.py b/aws/logs_monitoring/steps/handlers/awslogs_handler.py index a3f46aad..49309889 100644 --- a/aws/logs_monitoring/steps/handlers/awslogs_handler.py +++ b/aws/logs_monitoring/steps/handlers/awslogs_handler.py @@ -13,6 +13,7 @@ ) from customized_log_group import ( is_lambda_customized_log_group, + is_step_functions_log_group, get_lambda_function_name_from_logstream_name, ) from steps.handlers.aws_attributes import AwsAttributes @@ -102,11 +103,14 @@ def set_source(self, event): source = str(AwsEventSource.BEDROCK) self.metadata[DD_SOURCE] = parse_event_source(event, source) - # Special handling for customized log group of Lambda functions - # Multiple Lambda functions can share one single customized log group - # Need to parse logStream name to determine whether it is a Lambda function + # Special handling for customized log group of Lambda Functions and Step Functions + # Multiple functions can share one single customized log group. Need to parse logStream name to determine + # Need to place the handling of customized log group at the bottom so that it can correct the source for some edge cases if is_lambda_customized_log_group(log_stream): self.metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA) + # Regardless of whether the log group is customized, the corresponding log stream starts with 'states/'." + if is_step_functions_log_group(log_stream): + self.metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION) def add_cloudwatch_tags_from_cache(self): log_group_arn = self.aws_attributes.get_log_group_arn() @@ -159,9 +163,6 @@ def handle_rds_source(self): ) def handle_step_function_source(self): - if not self.aws_attributes.get_log_stream().startswith("states/"): - return - state_machine_arn = self.get_state_machine_arn() if not state_machine_arn: return diff --git a/aws/logs_monitoring/tests/approved_files/TestAWSLogsHandler.test_awslogs_handler_step_functions_customized_log_group.approved.json b/aws/logs_monitoring/tests/approved_files/TestAWSLogsHandler.test_awslogs_handler_step_functions_customized_log_group.approved.json new file mode 100644 index 00000000..f99ff145 --- /dev/null +++ b/aws/logs_monitoring/tests/approved_files/TestAWSLogsHandler.test_awslogs_handler_step_functions_customized_log_group.approved.json @@ -0,0 +1,14 @@ +[ + { + "aws": { + "awslogs": { + "logGroup": "test/logs", + "logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9", + "owner": "425362996713" + } + }, + "id": "37199773595581154154810589279545129148442535997644275712", + "message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}", + "timestamp": 1668095539607 + } +] diff --git a/aws/logs_monitoring/tests/approved_files/TestAWSLogsHandler.test_awslogs_handler_step_functions_customized_log_group.metadata.approved.json b/aws/logs_monitoring/tests/approved_files/TestAWSLogsHandler.test_awslogs_handler_step_functions_customized_log_group.metadata.approved.json new file mode 100644 index 00000000..3e65daa4 --- /dev/null +++ b/aws/logs_monitoring/tests/approved_files/TestAWSLogsHandler.test_awslogs_handler_step_functions_customized_log_group.metadata.approved.json @@ -0,0 +1,6 @@ +{ + "ddsource": "stepfunction", + "ddtags": "env:dev,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true", + "host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2", + "service": "stepfunction" +} diff --git a/aws/logs_monitoring/tests/test_awslogs_handler.py b/aws/logs_monitoring/tests/test_awslogs_handler.py index 6c53670b..90177cd4 100644 --- a/aws/logs_monitoring/tests/test_awslogs_handler.py +++ b/aws/logs_monitoring/tests/test_awslogs_handler.py @@ -8,6 +8,8 @@ from approvaltests.approvals import verify_as_json from approvaltests.namer import NamerFactory +from aws.logs_monitoring.steps.enums import AwsEventSource + sys.modules["trace_forwarder.connection"] = MagicMock() sys.modules["datadog_lambda.wrapper"] = MagicMock() sys.modules["datadog_lambda.metric"] = MagicMock() @@ -22,6 +24,7 @@ }, ) env_patch.start() +from aws.logs_monitoring.settings import DD_HOST, DD_SOURCE from steps.handlers.awslogs_handler import AwsLogsHandler from steps.handlers.aws_attributes import AwsAttributes from caching.cache_layer import CacheLayer @@ -117,6 +120,72 @@ def test_awslogs_handler_step_functions_tags_added_properly( awslogs_handler = AwsLogsHandler(context, metadata, cache_layer) verify_as_json(list(awslogs_handler.handle(event))) verify_as_json(metadata, options=NamerFactory.with_parameters("metadata")) + # verify that the handling can properly handle SF logs with the default log group naming + self.assertEqual( + awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value + ) + self.assertEqual( + awslogs_handler.metadata[DD_HOST], + "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1", + ) + + @patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__") + @patch("caching.cloudwatch_log_group_cache.send_forwarder_internal_metrics") + @patch.dict("os.environ", {"DD_STEP_FUNCTIONS_TRACE_ENABLED": "true"}) + def test_awslogs_handler_step_functions_customized_log_group( + self, + mock_forward_metrics, + mock_cache_init, + ): + # SF customized log group + eventFromCustomizedLogGroup = { + "awslogs": { + "data": base64.b64encode( + gzip.compress( + bytes( + json.dumps( + { + "messageType": "DATA_MESSAGE", + "owner": "425362996713", + "logGroup": "test/logs", + "logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9", + "subscriptionFilters": ["testFilter"], + "logEvents": [ + { + "id": "37199773595581154154810589279545129148442535997644275712", + "timestamp": 1668095539607, + "message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}', + } + ], + } + ), + "utf-8", + ) + ) + ) + } + } + context = None + metadata = {"ddtags": "env:dev"} + mock_forward_metrics.side_effect = MagicMock() + mock_cache_init.return_value = None + cache_layer = CacheLayer("") + cache_layer._step_functions_cache.get = MagicMock( + return_value=["test_tag_key:test_tag_value"] + ) + cache_layer._cloudwatch_log_group_cache.get = MagicMock() + + awslogs_handler = AwsLogsHandler(context, metadata, cache_layer) + # for some reasons, the below two are needed to update the context of the handler + verify_as_json(list(awslogs_handler.handle(eventFromCustomizedLogGroup))) + verify_as_json(metadata, options=NamerFactory.with_parameters("metadata")) + self.assertEqual( + awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value + ) + self.assertEqual( + awslogs_handler.metadata[DD_HOST], + "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2", + ) def test_process_lambda_logs(self): # Non Lambda log diff --git a/aws/logs_monitoring/tests/test_customized_log_group.py b/aws/logs_monitoring/tests/test_customized_log_group.py index 9765ad29..7074611b 100644 --- a/aws/logs_monitoring/tests/test_customized_log_group.py +++ b/aws/logs_monitoring/tests/test_customized_log_group.py @@ -2,6 +2,7 @@ from customized_log_group import ( is_lambda_customized_log_group, get_lambda_function_name_from_logstream_name, + is_step_functions_log_group, ) @@ -58,3 +59,16 @@ def get_lambda_function_name_from_logstream_name(self): get_lambda_function_name_from_logstream_name(stepfunction_log_stream_name), None, ) + + def test_is_step_functions_log_group(self): + # Lambda logstream is false + lambda_log_stream_name = "2023/11/04/[$LATEST]4426346c2cdf4c54a74d3bd2b929fc44" + self.assertEqual(is_step_functions_log_group(lambda_log_stream_name), False) + + # SF logstream is true + step_functions_log_stream_name = ( + "states/selfmonit-statemachine/2024-11-04-15-30/00000000" + ) + self.assertEqual( + is_step_functions_log_group(step_functions_log_stream_name), True + ) diff --git a/aws/logs_monitoring/tests/test_parsing.py b/aws/logs_monitoring/tests/test_parsing.py index bb2ed83c..f5267dd8 100644 --- a/aws/logs_monitoring/tests/test_parsing.py +++ b/aws/logs_monitoring/tests/test_parsing.py @@ -295,14 +295,6 @@ def test_carbon_black_event(self): str(AwsEventSource.CARBONBLACK), ) - def test_step_function_event(self): - self.assertEqual( - parse_event_source( - {"awslogs": "logs"}, "/aws/vendedlogs/states/MyStateMachine-Logs" - ), - str(AwsEventSource.STEPFUNCTION), - ) - def test_cloudwatch_source_if_none_found(self): self.assertEqual( parse_event_source({"awslogs": "logs"}, ""), str(AwsEventSource.CLOUDWATCH)