feat(events tracking): add abstract class and logging implementation (#80117)

[design
doc](https://www.notion.so/sentry/Conversion-rate-of-ingest-transactions-to-save-trx-1298b10e4b5d801ab517c8e2218d13d5)

We need to track the completion of each stage in order to 1) compute event conversion rates and 2) get debugging visibility into where events are being dropped.

Usage will be heavily sampled so as not to blow up traffic.

This PR only adds the REDIS_PUT stage; subsequent PRs will add the other stages listed in the TransactionStageStatus class.


**IMPORTANT: hash-based sampling**
Here's a [blog post](https://www.rsyslog.com/doc/tutorials/hash_sampling.html) explaining hash-based sampling, which provides "all or nothing" logging for sampled events across the entire pipeline. That's the idea implemented here.

The hashing algorithm must be consistent and uniformly distributed for all-or-nothing sampling to work. I could not find references confirming that MD5 output is evenly distributed other than various [stackoverflow pages](https://crypto.stackexchange.com/questions/14967/distribution-for-a-subset-of-md5); the official sources are too academic and long to be practical here.
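For illustration, here is a minimal sketch of the idea; `is_sampled` is a hypothetical helper, not code from this PR. Because the decision is a pure function of the event id, every pipeline stage that sees the same id makes the same keep/drop call. The actual implementation below skips the explicit hash and buckets the event_id directly, on the assumption that event IDs are already uniformly distributed hex strings.

import hashlib

def is_sampled(event_id: str, sample_rate: float) -> bool:
    # Hash the id so the decision is deterministic and well mixed: a sampled
    # event is logged at every stage or at none ("all or nothing").
    digest = hashlib.md5(event_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 10000  # deterministic bucket in [0, 10000)
    return bucket / 10000 < sample_rate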

----------
For reviewers:
Please review with an eye toward how this can be generalized to other pipelines as well, such as errors.
victoria-yining-huang authored and andrewshie-sentry committed Dec 2, 2024
1 parent d2c1f14 commit c8bfef0
Showing 4 changed files with 105 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/sentry/ingest/consumer/processors.py
@@ -25,6 +25,7 @@
from sentry.utils import metrics
from sentry.utils.cache import cache_key_for_event
from sentry.utils.dates import to_datetime
from sentry.utils.event_tracker import TransactionStageStatus, track_sampled_event
from sentry.utils.sdk import set_current_event_project
from sentry.utils.snuba import RateLimitExceeded

@@ -202,6 +203,11 @@ def process_event(
else:
    with metrics.timer("ingest_consumer._store_event"):
        cache_key = processing_store.store(data)
        if data.get("type") == "transaction":
            track_sampled_event(
                data["event_id"], "transaction", TransactionStageStatus.REDIS_PUT
            )

save_attachments(attachments, cache_key)

try:
5 changes: 5 additions & 0 deletions src/sentry/options/defaults.py
@@ -2915,3 +2915,8 @@
    default=[],
    flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
)
register(
    "performance.event-tracker.sample-rate.transaction",
    default=0.0,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)
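As a note on the reviewer ask above about generalizing to other pipelines: a hypothetical sketch (not part of this PR) of the matching option for errors, following the same "performance.event-tracker.sample-rate.<event_type>" naming that track_sampled_event looks up:

# Hypothetical, not part of this PR: the same option pattern applied to the
# errors pipeline, matching the lookup in track_sampled_event below.
register(
    "performance.event-tracker.sample-rate.error",
    default=0.0,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)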
61 changes: 61 additions & 0 deletions src/sentry/utils/event_tracker.py
@@ -0,0 +1,61 @@
import logging
from enum import StrEnum

from sentry import options


class EventType(StrEnum):
    TRANSACTION = "transaction"
    ERROR = "error"


class TransactionStageStatus(StrEnum):
    # the transaction is stored to rc-transactions
    REDIS_PUT = "redis_put"

    # a save_transactions task is kicked off
    SAVE_TRX_STARTED = "save_trx_started"

    # a save_transactions task is finished
    SAVE_TRX_FINISHED = "save_trx_finished"

    # the transaction is published to the `events` topic for snuba/sbc consumers to consume
    SNUBA_TOPIC_PUT = "snuba_topic_put"

    # the transaction is published to the `snuba-commit-log` topic
    COMMIT_LOG_TOPIC_PUT = "commit_log_topic_put"

    # a post_process task is kicked off
    POST_PROCESS_STARTED = "post_process_started"

    # the transaction is deleted from rc-transactions
    REDIS_DELETED = "redis_deleted"


logger = logging.getLogger("EventTracker")


def track_sampled_event(event_id: str, event_type: str, status: TransactionStageStatus) -> None:
    """
    Records how far an event has made it through the ingestion pipeline.
    Each event type picks up its sampling rate from its registered option.
    """
    sample_rate = options.get(f"performance.event-tracker.sample-rate.{event_type}")
    if sample_rate == 0:
        return

    # event_id is a hex string; map it deterministically into [0, 1) so that
    # every pipeline stage makes the same keep/drop decision for a given event
    # ("all or nothing" sampling).
    event_float = (int(event_id, 16) % 10000) / 10000
    if event_float < sample_rate:
        extra = {
            "event_id": event_id,
            "event_type": getattr(EventType, event_type.upper(), None),
            "status": status,
        }
        _do_record(extra)


def _do_record(extra):
    # All Python logs will be picked up by Google Cloud Logging.
    # TODO: make a Google Cloud Sink to filter for these EventTracker logs and
    # put them into BigQuery for downstream data analysis.
    logger.info("EventTracker.recorded", extra=extra)
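For illustration, a sketch of how a later stage (to be added in subsequent PRs) would reuse the tracker; `on_save_transactions_started` is a hypothetical hook, not code from this PR. Because sampling keys off event_id alone, all calls for a given event agree:

# Illustrative only: this call and the REDIS_PUT call in processors.py log
# for exactly the same set of events at a given sample rate.
from sentry.utils.event_tracker import TransactionStageStatus, track_sampled_event

def on_save_transactions_started(event_id: str) -> None:
    track_sampled_event(event_id, "transaction", TransactionStageStatus.SAVE_TRX_STARTED)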
33 changes: 33 additions & 0 deletions tests/sentry/utils/test_event_tracker.py
@@ -0,0 +1,33 @@
import unittest
from unittest.mock import patch

from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.options import override_options
from sentry.utils.event_tracker import EventType, TransactionStageStatus, track_sampled_event

EVENT_ID = "9cdc4c32dff14fbbb012b0aa9e908126"
EVENT_TYPE_STR = "transaction"
STATUS = TransactionStageStatus.REDIS_PUT

EXPECTED_EVENT_TYPE = EventType.TRANSACTION


class TestEventTracking(TestCase):
    @patch("sentry.utils.event_tracker._do_record")
    def test_track_sampled_event_logs_event(self, mock_do_record):
        with override_options({"performance.event-tracker.sample-rate.transaction": 1.0}):
            track_sampled_event(EVENT_ID, EVENT_TYPE_STR, STATUS)
            mock_do_record.assert_called_once_with(
                {"event_id": EVENT_ID, "event_type": EXPECTED_EVENT_TYPE, "status": STATUS}
            )

    @patch("sentry.utils.event_tracker._do_record")
    def test_track_sampled_event_does_not_log_event(self, mock_do_record):
        with override_options({"performance.event-tracker.sample-rate.transaction": 0.0}):
            track_sampled_event(EVENT_ID, EVENT_TYPE_STR, STATUS)
            mock_do_record.assert_not_called()


if __name__ == "__main__":
    unittest.main()
