From 84ed981d8722249ec4c901bbed6a70846be59b15 Mon Sep 17 00:00:00 2001 From: Georgi Date: Wed, 23 Oct 2024 15:39:11 +0200 Subject: [PATCH] [Logs forwarder] Scrub log's inner messages (#860) * Add a service override test * [Logs forwarder] allow to scrub log's inner messages * properly catch exception --- aws/logs_monitoring/forwarder.py | 12 ++++++++- aws/logs_monitoring/settings.py | 2 +- .../tests/events/cloudwatch_logs_ddtags.json | 16 ++++++++++++ .../tests/test_lambda_function.py | 25 +++++++++++++++++-- 4 files changed, 51 insertions(+), 4 deletions(-) create mode 100644 aws/logs_monitoring/tests/events/cloudwatch_logs_ddtags.json diff --git a/aws/logs_monitoring/forwarder.py b/aws/logs_monitoring/forwarder.py index e09dceafa..60fe8a70b 100644 --- a/aws/logs_monitoring/forwarder.py +++ b/aws/logs_monitoring/forwarder.py @@ -82,17 +82,27 @@ def _forward_logs(self, logs, key=None): if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Forwarding {len(logs)} logs") + scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS) logs_to_forward = [] for log in logs: if key: log = add_retry_tag(log) + + # apply scrubbing rules to inner log message if exists + if isinstance(log, dict) and log.get("message"): + try: + log["message"] = scrubber.scrub(log["message"]) + except Exception as e: + logger.exception( + f"Exception while scrubbing log message {log['message']}: {e}" + ) + logs_to_forward.append(json.dumps(log, ensure_ascii=False)) logs_to_forward = filter_logs( logs_to_forward, INCLUDE_AT_MATCH, EXCLUDE_AT_MATCH ) - scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS) if DD_USE_TCP: batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1) cli = DatadogTCPClient(DD_URL, DD_PORT, DD_NO_SSL, DD_API_KEY, scrubber) diff --git a/aws/logs_monitoring/settings.py b/aws/logs_monitoring/settings.py index 7c6c81036..473385b88 100644 --- a/aws/logs_monitoring/settings.py +++ b/aws/logs_monitoring/settings.py @@ -176,7 +176,7 @@ def __init__(self, name, pattern, placeholder): ScrubbingRuleConfig( "DD_SCRUBBING_RULE", get_env_var("DD_SCRUBBING_RULE", default=None), - get_env_var("DD_SCRUBBING_RULE_REPLACEMENT", default="xxxxx"), + get_env_var("DD_SCRUBBING_RULE_REPLACEMENT", default=""), ), ] diff --git a/aws/logs_monitoring/tests/events/cloudwatch_logs_ddtags.json b/aws/logs_monitoring/tests/events/cloudwatch_logs_ddtags.json new file mode 100644 index 000000000..cc45eff47 --- /dev/null +++ b/aws/logs_monitoring/tests/events/cloudwatch_logs_ddtags.json @@ -0,0 +1,16 @@ +{ + "messageType": "DATA_MESSAGE", + "owner": "601427279990", + "logGroup": "/aws/lambda/testing-datadog", + "logStream": "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0", + "subscriptionFilters": [ + "testing-datadog" + ], + "logEvents": [ + { + "id": "35311576111948622874033876462979853992919938886093242368", + "timestamp": 1583425836114, + "message": "{\"status\":\"debug\",\"message\":\"datadog:Patched console output with trace context\",\"ddtags\":\"env:test,service:test-inner-message\"}\n" + } + ] +} diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index 42c94cd35..5d88c4b40 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -242,9 +242,30 @@ def test_overrding_service_tag_from_lambda_cache_when_dd_tags_is_set( for log in logs: self.assertEqual(log["service"], "lambda_service") - def _get_input_data(self): + @patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__") + def test_overrding_service_tag_from_message_ddtags(self, mock_cache_init): + mock_cache_init.return_value = None + cache_layer = CacheLayer("") + cache_layer._lambda_cache = MagicMock() + cache_layer._cloudwatch_log_group_cache = MagicMock() + context = Context() + input_data = self._get_input_data(path="events/cloudwatch_logs_ddtags.json") + event = { + "awslogs": {"data": self._create_cloudwatch_log_event_from_data(input_data)} + } + normalized_events = parse(event, context, cache_layer) + enriched_events = enrich(normalized_events, cache_layer) + transformed_events = transform(enriched_events) + + _, logs, _ = split(transformed_events) + self.assertEqual(len(logs), 1) + for log in logs: + self.assertEqual(log["service"], "test-inner-message") + self.assertTrue(isinstance(log["message"], str)) + + def _get_input_data(self, path="events/cloudwatch_logs.json"): my_path = os.path.abspath(os.path.dirname(__file__)) - path = os.path.join(my_path, "events/cloudwatch_logs.json") + path = os.path.join(my_path, path) with open( path,