From c8fc781cdffd93b625f5b18dcd67c17cf3738595 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Mon, 17 Jun 2024 16:19:11 +0200 Subject: [PATCH] Add Celery receive latency (#3174) Add new header to instrumented celery tasks to calculate `messaging.message.receive.latency`. --- sentry_sdk/consts.py | 5 +++++ sentry_sdk/integrations/celery/__init__.py | 22 +++++++++++++++++++ tests/integrations/celery/test_celery.py | 16 ++++++++++++++ .../celery/test_update_celery_task_headers.py | 13 +++++++++-- 4 files changed, 54 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 976edf86ac..99edb3ff5c 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -301,6 +301,11 @@ class SPANDATA: Number of retries/attempts to process a message. """ + MESSAGING_MESSAGE_RECEIVE_LATENCY = "messaging.message.receive.latency" + """ + The latency between when the task was enqueued and when it was started to be processed. + """ + MESSAGING_SYSTEM = "messaging.system" """ The messaging system's name, e.g. `kafka`, `aws_sqs` diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 46e8002218..2b05871d70 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -181,6 +181,12 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks): } ) + # Add the time the task was enqueued to the headers + # This is used in the consumer to calculate the latency + updated_headers.update( + {"sentry-task-enqueued-time": _now_seconds_since_epoch()} + ) + if headers: existing_baggage = updated_headers.get(BAGGAGE_HEADER_NAME) sentry_baggage = headers.get(BAGGAGE_HEADER_NAME) @@ -360,12 +366,28 @@ def _inner(*args, **kwargs): op=OP.QUEUE_PROCESS, description=task.name ) as span: _set_messaging_destination_name(task, span) + + latency = None + with capture_internal_exceptions(): + if ( + task.request.headers is not None + and "sentry-task-enqueued-time" in task.request.headers + ): + latency = _now_seconds_since_epoch() - task.request.headers.pop( + "sentry-task-enqueued-time" + ) + + if latency is not None: + span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) + with capture_internal_exceptions(): span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id) + with capture_internal_exceptions(): span.set_data( SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries ) + with capture_internal_exceptions(): span.set_data( SPANDATA.MESSAGING_SYSTEM, diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index d8308c5978..c5311a9d62 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -530,6 +530,7 @@ def dummy_task(self, x, y): # Newly added headers expected_headers["sentry-trace"] = mock.ANY expected_headers["baggage"] = mock.ANY + expected_headers["sentry-task-enqueued-time"] = mock.ANY assert result.get() == expected_headers @@ -754,3 +755,18 @@ def task(): ... assert span["data"]["messaging.message.retry.count"] == 0 monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish) + + +def test_receive_latency(init_celery, capture_events): + celery = init_celery(traces_sample_rate=1.0) + events = capture_events() + + @celery.task() + def task(): ... + + task.apply_async() + + (event,) = events + (span,) = event["spans"] + assert "messaging.message.receive.latency" in span["data"] + assert span["data"]["messaging.message.receive.latency"] > 0 diff --git a/tests/integrations/celery/test_update_celery_task_headers.py b/tests/integrations/celery/test_update_celery_task_headers.py index e94379f763..d1ab7ef0c1 100644 --- a/tests/integrations/celery/test_update_celery_task_headers.py +++ b/tests/integrations/celery/test_update_celery_task_headers.py @@ -29,11 +29,17 @@ def test_monitor_beat_tasks(monitor_beat_tasks): if monitor_beat_tasks: assert updated_headers == { - "headers": {"sentry-monitor-start-timestamp-s": mock.ANY}, + "headers": { + "sentry-monitor-start-timestamp-s": mock.ANY, + "sentry-task-enqueued-time": mock.ANY, + }, "sentry-monitor-start-timestamp-s": mock.ANY, + "sentry-task-enqueued-time": mock.ANY, } else: - assert updated_headers == headers + assert updated_headers == { + "sentry-task-enqueued-time": mock.ANY, + } @pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0]) @@ -41,6 +47,7 @@ def test_monitor_beat_tasks_with_headers(monitor_beat_tasks): headers = { "blub": "foo", "sentry-something": "bar", + "sentry-task-enqueued-time": mock.ANY, } span = None @@ -53,8 +60,10 @@ def test_monitor_beat_tasks_with_headers(monitor_beat_tasks): "headers": { "sentry-monitor-start-timestamp-s": mock.ANY, "sentry-something": "bar", + "sentry-task-enqueued-time": mock.ANY, }, "sentry-monitor-start-timestamp-s": mock.ANY, + "sentry-task-enqueued-time": mock.ANY, } else: assert updated_headers == headers