From 982db451a8c169aa1f643d6baa82ff5b43ef3394 Mon Sep 17 00:00:00 2001 From: Evan Purkhiser Date: Mon, 18 Nov 2024 13:53:50 -0500 Subject: [PATCH] feat(crons): Prune check-in volume at incident end (#80916) When a system incident recovers we need to prune volume data recorded during the incident, since this data will be a large outlier and will skew future metric calculations for these minutes. Part of GH-79328 --- .../monitors/consumers/clock_tick_consumer.py | 29 ++------ src/sentry/monitors/system_incidents.py | 69 +++++++++++++++++-- .../consumers/test_clock_tick_consumer.py | 8 +-- .../sentry/monitors/test_system_incidents.py | 68 ++++++++++++++++++ 4 files changed, 143 insertions(+), 31 deletions(-) diff --git a/src/sentry/monitors/consumers/clock_tick_consumer.py b/src/sentry/monitors/consumers/clock_tick_consumer.py index b81bd9749d1b1f..4d8dfae7761e3c 100644 --- a/src/sentry/monitors/consumers/clock_tick_consumer.py +++ b/src/sentry/monitors/consumers/clock_tick_consumer.py @@ -15,11 +15,7 @@ from sentry.conf.types.kafka_definition import Topic, get_topic_codec from sentry.monitors.clock_tasks.check_missed import dispatch_check_missing from sentry.monitors.clock_tasks.check_timeout import dispatch_check_timeout -from sentry.monitors.system_incidents import ( - make_clock_tick_decision, - record_clock_tick_volume_metric, -) -from sentry.utils import metrics +from sentry.monitors.system_incidents import process_clock_tick_for_system_incidents logger = logging.getLogger(__name__) @@ -33,29 +29,16 @@ def process_clock_tick(message: Message[KafkaPayload | FilteredPayload]): wrapper: ClockTick = MONITORS_CLOCK_TICK_CODEC.decode(message.payload.value) ts = datetime.fromtimestamp(wrapper["ts"], tz=timezone.utc) - record_clock_tick_volume_metric(ts) - try: - result = make_clock_tick_decision(ts) - - metrics.incr( - "monitors.tasks.clock_tick.tick_decision", - tags={"decision": result.decision}, - sample_rate=1.0, - ) - if result.transition: - metrics.incr( - 
"monitors.tasks.clock_tick.tick_transition", - tags={"transition": result.transition}, - sample_rate=1.0, - ) - except Exception: - logger.exception("sentry.tasks.clock_tick.clock_tick_decision_failed") - logger.info( "process_clock_tick", extra={"reference_datetime": str(ts)}, ) + try: + process_clock_tick_for_system_incidents(ts) + except Exception: + logger.exception("failed_process_clock_tick_for_system_incidents") + dispatch_check_missing(ts) dispatch_check_timeout(ts) diff --git a/src/sentry/monitors/system_incidents.py b/src/sentry/monitors/system_incidents.py index a8926aee89f21f..cca0cfec6e2a09 100644 --- a/src/sentry/monitors/system_incidents.py +++ b/src/sentry/monitors/system_incidents.py @@ -13,7 +13,7 @@ from collections import Counter from collections.abc import Generator, Sequence from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from enum import StrEnum from itertools import batched, chain @@ -30,9 +30,12 @@ # This key is used to record the metric volume metric for the tick. MONITOR_TICK_METRIC = "sentry.monitors.volume_metric:{ts}" -# This key is used to record the anomaly decision for a tick +# This key is used to record the anomaly decision for a tick. MONITOR_TICK_DECISION = "sentry.monitors.tick_decision:{ts}" +# Tracks the timestamp of the first clock tick of a system incident. +MONITR_LAST_SYSTEM_INCIDENT_TS = "sentry.monitors.last_system_incident_ts" + # When fetching historic volume data to make a decision whether we have lost # data this value will determine how many historic volume data-points we fetch # of the window of the MONITOR_VOLUME_RETENTION. It is important to consider @@ -78,10 +81,68 @@ def update_check_in_volume(ts_list: Sequence[datetime]): pipeline.execute() +def process_clock_tick_for_system_incidents(tick: datetime) -> DecisionResult: + """ + Encapsulates logic specific to determining if we are in a system incident + during each clock tick. 
+ """ + record_clock_tick_volume_metric(tick) + result = make_clock_tick_decision(tick) + + logger.info( + "monitors.system_incidents.process_clock_tick", + extra={"decision": result.decision, "transition": result.transition}, + ) + + # Record metrics for each tick decision + metrics.incr( + "monitors.tasks.clock_tick.tick_decision", + tags={"decision": result.decision}, + sample_rate=1.0, + ) + if result.transition: + metrics.incr( + "monitors.tasks.clock_tick.tick_transition", + tags={"transition": result.transition}, + sample_rate=1.0, + ) + + # When entering an incident record the starting timestamp of the incident + if result.transition == AnomalyTransition.INCIDENT_STARTED: + record_last_incidnet_ts(result.ts) + + # When exiting an incident prune check-in volume during that incident + if result.transition == AnomalyTransition.INCIDENT_RECOVERED: + if start := get_last_incident_ts(): + prune_incident_check_in_volume(start, result.ts) + else: + logger.error("monitors.system_incidents.recovered_without_start_ts") + + return result + + +def record_last_incidnet_ts(ts: datetime) -> None: + """ + Records the timestamp of the most recent system incident + """ + redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) + redis_client.set(MONITR_LAST_SYSTEM_INCIDENT_TS, int(ts.timestamp())) + + +def get_last_incident_ts() -> datetime | None: + """ + Retrieves the timestamp of the last system incident + """ + redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) + value = _int_or_none(redis_client.get(MONITR_LAST_SYSTEM_INCIDENT_TS)) + return datetime.fromtimestamp(value, UTC) if value else None + + def prune_incident_check_in_volume(start: datetime, end: datetime) -> None: """ - After recovering from a system incident the volume and metric data must be - discarded to avoid skewing future computations. 
This function does this + After recovering from a system incident the volume data must be discarded + to avoid skewing future computations. Note that the start time is inclusive + and the end time is exclusive. """ redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) diff --git a/tests/sentry/monitors/consumers/test_clock_tick_consumer.py b/tests/sentry/monitors/consumers/test_clock_tick_consumer.py index d43fe016c9d4cb..ef2774aa1bfcb0 100644 --- a/tests/sentry/monitors/consumers/test_clock_tick_consumer.py +++ b/tests/sentry/monitors/consumers/test_clock_tick_consumer.py @@ -38,9 +38,9 @@ def create_consumer(): @mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing") @mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_timeout") -@mock.patch("sentry.monitors.consumers.clock_tick_consumer.record_clock_tick_volume_metric") +@mock.patch("sentry.monitors.consumers.clock_tick_consumer.process_clock_tick_for_system_incidents") def test_simple( - mock_record_clock_tick_volume_metric, + mock_process_clock_tick_for_system_incidents, mock_dispatch_check_timeout, mock_dispatch_check_missing, ): @@ -56,8 +56,8 @@ def test_simple( ) consumer.submit(Message(value)) - assert mock_record_clock_tick_volume_metric.call_count == 1 - assert mock_record_clock_tick_volume_metric.mock_calls[0] == mock.call(ts) + assert mock_process_clock_tick_for_system_incidents.call_count == 1 + assert mock_process_clock_tick_for_system_incidents.mock_calls[0] == mock.call(ts) assert mock_dispatch_check_timeout.call_count == 1 assert mock_dispatch_check_timeout.mock_calls[0] == mock.call(ts) diff --git a/tests/sentry/monitors/test_system_incidents.py b/tests/sentry/monitors/test_system_incidents.py index 84c31819ac4ea3..e08ef73b1a2988 100644 --- a/tests/sentry/monitors/test_system_incidents.py +++ b/tests/sentry/monitors/test_system_incidents.py @@ -12,11 +12,14 @@ MONITOR_VOLUME_HISTORY, MONITOR_VOLUME_RETENTION, 
AnomalyTransition, + DecisionResult, TickAnomalyDecision, _make_reference_ts, get_clock_tick_decision, get_clock_tick_volume_metric, + get_last_incident_ts, make_clock_tick_decision, + process_clock_tick_for_system_incidents, prune_incident_check_in_volume, record_clock_tick_volume_metric, update_check_in_volume, @@ -104,6 +107,71 @@ def make_key(offset: timedelta) -> str: assert minute_3 == "1" +@mock.patch("sentry.monitors.system_incidents.logger") +@mock.patch("sentry.monitors.system_incidents.metrics") +@mock.patch("sentry.monitors.system_incidents.record_clock_tick_volume_metric") +@mock.patch("sentry.monitors.system_incidents.make_clock_tick_decision") +@mock.patch("sentry.monitors.system_incidents.prune_incident_check_in_volume") +def test_process_clock_tick_for_system_incident( + mock_prune_incident_check_in_volume, + mock_make_clock_tick_decision, + mock_record_clock_tick_volume_metric, + mock_metrics, + mock_logger, +): + ts = timezone.now().replace(second=0, microsecond=0) + + mock_make_clock_tick_decision.return_value = DecisionResult( + ts=ts, + decision=TickAnomalyDecision.ABNORMAL, + transition=AnomalyTransition.ABNORMALITY_STARTED, + ) + process_clock_tick_for_system_incidents(ts) + + assert mock_record_clock_tick_volume_metric.call_count == 1 + assert mock_make_clock_tick_decision.call_count == 1 + + # Metrics and logs are recorded + assert mock_logger.info.call_args_list[0] == mock.call( + "monitors.system_incidents.process_clock_tick", + extra={ + "decision": TickAnomalyDecision.ABNORMAL, + "transition": AnomalyTransition.ABNORMALITY_STARTED, + }, + ) + assert mock_metrics.incr.call_args_list[0] == mock.call( + "monitors.tasks.clock_tick.tick_decision", + tags={"decision": TickAnomalyDecision.ABNORMAL}, + sample_rate=1.0, + ) + assert mock_metrics.incr.call_args_list[1] == mock.call( + "monitors.tasks.clock_tick.tick_transition", + tags={"transition": AnomalyTransition.ABNORMALITY_STARTED}, + sample_rate=1.0, + ) + + # Transition into an incident 
records the start timestamp + mock_make_clock_tick_decision.return_value = DecisionResult( + ts=ts, + decision=TickAnomalyDecision.INCIDENT, + transition=AnomalyTransition.INCIDENT_STARTED, + ) + process_clock_tick_for_system_incidents(ts) + assert get_last_incident_ts() == ts + + # Transitioning out of an incident prunes volume during the incident + mock_make_clock_tick_decision.return_value = DecisionResult( + ts=ts + timedelta(minutes=5), + decision=TickAnomalyDecision.NORMAL, + transition=AnomalyTransition.INCIDENT_RECOVERED, + ) + process_clock_tick_for_system_incidents(ts) + assert mock_prune_incident_check_in_volume.call_args_list[0] == mock.call( + ts, + ts + timedelta(minutes=5), + ) + + @mock.patch("sentry.monitors.system_incidents.logger") @mock.patch("sentry.monitors.system_incidents.metrics") @override_options({"crons.tick_volume_anomaly_detection": True})