From c8bfef0d4cca55c92d1d02beaf008065ffba6a0c Mon Sep 17 00:00:00 2001
From: victoria-yining-huang <44307912+victoria-yining-huang@users.noreply.github.com>
Date: Wed, 20 Nov 2024 15:50:49 -0500
Subject: [PATCH] feat(events tracking): add abstract class and logging
 implementation (#80117)

[design doc](https://www.notion.so/sentry/Conversion-rate-of-ingest-transactions-to-save-trx-1298b10e4b5d801ab517c8e2218d13d5)

We need to track the completion of each pipeline stage in order to
1) compute event conversion rates and 2) gain debugging visibility into
where events are being dropped.

Usage will be heavily sampled so the extra logging does not blow up
traffic.

This PR only adds the REDIS_PUT stage; subsequent PRs will add the other
stages listed in the TransactionStageStatus class.

**!!!!! IMPORTANT !!!!!** hash-based sampling

Here's a [blog post](https://www.rsyslog.com/doc/tutorials/hash_sampling.html)
explaining hash-based sampling, which provides "all or nothing" logging for
the sampled events across the entire pipeline. That's the idea implemented
here. For all-or-nothing sampling to work, the hashing algorithm must be
deterministic and uniformly distributed. I could not find authoritative
references confirming that md5 output is uniformly distributed, beyond
various [stackoverflow pages](https://crypto.stackexchange.com/questions/14967/distribution-for-a-subset-of-md5);
the official sources are too academic to extract a clear answer from.
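To make the idea concrete, here is a minimal sketch (illustration only,
not part of this diff; the `is_sampled` helper is hypothetical) of how a
deterministic hash turns a per-event random decision into an
all-or-nothing one:

```python
import hashlib


def is_sampled(event_id: str, sample_rate: float) -> bool:
    # Map the event id to a stable float in [0, 1). Event ids are 32-char
    # hex strings, so int(event_id, 16) would work directly; md5 is shown
    # as a generic fallback for arbitrary keys, assuming its output is
    # uniform enough for sampling purposes.
    digest = hashlib.md5(event_id.encode("utf-8")).hexdigest()
    event_float = (int(digest, 16) % 10000) / 10000
    return event_float < sample_rate


# Simulate two different pipeline stages checking the same event: the
# decision is a pure function of the id, so the stages always agree, and
# an event is logged at all of its stages or at none of them.
event_id = "9cdc4c32dff14fbbb012b0aa9e908126"
decisions = [is_sampled(event_id, 0.01) for _ in range(2)]  # e.g. redis_put, save_trx_started
assert decisions[0] == decisions[1]
```

Because every stage recomputes the same decision from the same event_id,
stage-to-stage conversion rates can be computed directly from the sampled
logs.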
"transaction" + ERROR = "error" + + +class TransactionStageStatus(StrEnum): + # the transaction is stored to rc-transactions + REDIS_PUT = "redis_put" + + # a save_transactions task is kicked off + SAVE_TRX_STARTED = "save_trx_started" + + # a save_transactions task is finished + SAVE_TRX_FINISHED = "save_trx_finished" + + # the transaction is published to the `events` topic for snuba/sbc consumers to consume + SNUBA_TOPIC_PUT = "snuba_topic_put" + + # the transaction is published to the `snuba-commit-log` topic + COMMIT_LOG_TOPIC_PUT = "commit_log_topic_put" + + # a post_process task is kicked off + POST_PROCESS_STARTED = "post_process_started" + + # the transaction is deleted from rc-transactions + REDIS_DELETED = "redis_deleted" + + +logger = logging.getLogger("EventTracker") + + +def track_sampled_event(event_id: str, event_type: str, status: TransactionStageStatus) -> None: + """ + Records how far an event has made it through the ingestion pipeline. + Each event type will pick up its sampling rate from its registered option. + """ + + sample_rate = options.get(f"performance.event-tracker.sample-rate.{event_type}") + if sample_rate == 0: + return + + event_float = (int(event_id, 16) % 10000) / 10000 + if event_float < sample_rate: + extra = { + "event_id": event_id, + "event_type": getattr(EventType, event_type.upper(), None), + "status": status, + } + _do_record(extra) + + +def _do_record(extra): + # All Python logs will be picked up by Google Cloud Logging. + # TODO: make a google Cloud Sink to filter for these EventTracker logs and put them into BigQuery and do data analysis downstream + logger.info("EventTracker.recorded", extra=extra) diff --git a/tests/sentry/utils/test_event_tracker.py b/tests/sentry/utils/test_event_tracker.py new file mode 100644 index 00000000000000..127969bcf37525 --- /dev/null +++ b/tests/sentry/utils/test_event_tracker.py @@ -0,0 +1,33 @@ +import unittest +from unittest.mock import patch + +from sentry.testutils.cases import TestCase +from sentry.testutils.helpers.options import override_options +from sentry.utils.event_tracker import EventType, TransactionStageStatus, track_sampled_event + +EVENT_ID = "9cdc4c32dff14fbbb012b0aa9e908126" +EVENT_TYPE_STR = "transaction" +STATUS = TransactionStageStatus.REDIS_PUT + +EXPECTED_EVENT_TYPE = EventType.TRANSACTION + + +class TestEventTracking(TestCase): + + @patch("sentry.utils.event_tracker._do_record") + def test_track_sampled_event_logs_event(self, mock_do_record): + with override_options({"performance.event-tracker.sample-rate.transaction": 1.0}): + track_sampled_event(EVENT_ID, EVENT_TYPE_STR, STATUS) + mock_do_record.assert_called_once_with( + {"event_id": EVENT_ID, "event_type": EXPECTED_EVENT_TYPE, "status": STATUS} + ) + + @patch("sentry.utils.event_tracker._do_record") + def test_track_sampled_event_does_not_log_event(self, mock_do_record): + with override_options({"performance.event-tracker.sample-rate.transaction": 0.0}): + track_sampled_event(EVENT_ID, EVENT_TYPE_STR, STATUS) + mock_do_record.assert_not_called() + + +if __name__ == "__main__": + unittest.main()