Skip to content

Commit

Permalink
feat(crons): Prune check-in volume at incident end (#80916)
Browse files Browse the repository at this point in the history
When a system incident recovers we need to prune volume data recorded
during the incident, since this data will be a large outlier and will
skew future metric calculations for these minutes.

Part of GH-79328
  • Loading branch information
evanpurkhiser authored Nov 18, 2024
1 parent ca07c0a commit 982db45
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 31 deletions.
29 changes: 6 additions & 23 deletions src/sentry/monitors/consumers/clock_tick_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.monitors.clock_tasks.check_missed import dispatch_check_missing
from sentry.monitors.clock_tasks.check_timeout import dispatch_check_timeout
from sentry.monitors.system_incidents import (
make_clock_tick_decision,
record_clock_tick_volume_metric,
)
from sentry.utils import metrics
from sentry.monitors.system_incidents import process_clock_tick_for_system_incidents

logger = logging.getLogger(__name__)

Expand All @@ -33,29 +29,16 @@ def process_clock_tick(message: Message[KafkaPayload | FilteredPayload]):
wrapper: ClockTick = MONITORS_CLOCK_TICK_CODEC.decode(message.payload.value)
ts = datetime.fromtimestamp(wrapper["ts"], tz=timezone.utc)

record_clock_tick_volume_metric(ts)
try:
result = make_clock_tick_decision(ts)

metrics.incr(
"monitors.tasks.clock_tick.tick_decision",
tags={"decision": result.decision},
sample_rate=1.0,
)
if result.transition:
metrics.incr(
"monitors.tasks.clock_tick.tick_transition",
tags={"transition": result.transition},
sample_rate=1.0,
)
except Exception:
logger.exception("sentry.tasks.clock_tick.clock_tick_decision_failed")

logger.info(
"process_clock_tick",
extra={"reference_datetime": str(ts)},
)

try:
process_clock_tick_for_system_incidents(ts)
except Exception:
logger.exception("failed_process_clock_tick_for_system_incidents")

dispatch_check_missing(ts)
dispatch_check_timeout(ts)

Expand Down
69 changes: 65 additions & 4 deletions src/sentry/monitors/system_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from collections import Counter
from collections.abc import Generator, Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from itertools import batched, chain

Expand All @@ -30,9 +30,12 @@
# This key is used to record the metric volume metric for the tick.
MONITOR_TICK_METRIC = "sentry.monitors.volume_metric:{ts}"

# This key is used to record the anomaly decision for a tick
# This key is used to record the anomaly decision for a tick.
MONITOR_TICK_DECISION = "sentry.monitors.tick_decision:{ts}"

# Tracks the timestamp of the first clock tick of a system incident.
MONITR_LAST_SYSTEM_INCIDENT_TS = "sentry.monitors.last_system_incident_ts"

# When fetching historic volume data to make a decision whether we have lost
# data this value will determine how many historic volume data-points we fetch
# of the window of the MONITOR_VOLUME_RETENTION. It is important to consider
Expand Down Expand Up @@ -78,10 +81,68 @@ def update_check_in_volume(ts_list: Sequence[datetime]):
pipeline.execute()


def process_clock_tick_for_system_incidents(tick: datetime) -> DecisionResult:
"""
Encapsulates logic specific to determining if we are in a system incident
during each clock tick.
"""
record_clock_tick_volume_metric(tick)
result = make_clock_tick_decision(tick)

logger.info(
"monitors.system_incidents.process_clock_tick",
extra={"decision": result.decision, "transition": result.transition},
)

# Record metrics for each tick decision
metrics.incr(
"monitors.tasks.clock_tick.tick_decision",
tags={"decision": result.decision},
sample_rate=1.0,
)
if result.transition:
metrics.incr(
"monitors.tasks.clock_tick.tick_transition",
tags={"transition": result.transition},
sample_rate=1.0,
)

# When entering an incident record the starting tiemstamp of the incident
if result.transition == AnomalyTransition.INCIDENT_STARTED:
record_last_incidnet_ts(result.ts)

# When exiting an incident prune check-in volume during that incident
if result.transition == AnomalyTransition.INCIDENT_RECOVERED:
if start := get_last_incident_ts():
prune_incident_check_in_volume(start, result.ts)
else:
logger.error("monitors.system_incidents.recovered_without_start_ts")

return result


def record_last_incidnet_ts(ts: datetime) -> None:
"""
Records the timestamp of the most recent
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
redis_client.set(MONITR_LAST_SYSTEM_INCIDENT_TS, int(ts.timestamp()))


def get_last_incident_ts() -> datetime | None:
"""
Retrieves the timestamp of the last system incident
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
value = _int_or_none(redis_client.get(MONITR_LAST_SYSTEM_INCIDENT_TS))
return datetime.fromtimestamp(value, UTC) if value else None


def prune_incident_check_in_volume(start: datetime, end: datetime) -> None:
"""
After recovering from a system incident the volume and metric data must be
discarded to avoid skewing future computations. This function does this
After recovering from a system incident the volume data must be discarded
to avoid skewing future computations. Note that the start time is inclusive
and the end time is exclusive.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

Expand Down
8 changes: 4 additions & 4 deletions tests/sentry/monitors/consumers/test_clock_tick_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ def create_consumer():

@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing")
@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_timeout")
@mock.patch("sentry.monitors.consumers.clock_tick_consumer.record_clock_tick_volume_metric")
@mock.patch("sentry.monitors.consumers.clock_tick_consumer.process_clock_tick_for_system_incidents")
def test_simple(
mock_record_clock_tick_volume_metric,
mock_process_clock_tick_for_system_incidents,
mock_dispatch_check_timeout,
mock_dispatch_check_missing,
):
Expand All @@ -56,8 +56,8 @@ def test_simple(
)
consumer.submit(Message(value))

assert mock_record_clock_tick_volume_metric.call_count == 1
assert mock_record_clock_tick_volume_metric.mock_calls[0] == mock.call(ts)
assert mock_process_clock_tick_for_system_incidents.call_count == 1
assert mock_process_clock_tick_for_system_incidents.mock_calls[0] == mock.call(ts)

assert mock_dispatch_check_timeout.call_count == 1
assert mock_dispatch_check_timeout.mock_calls[0] == mock.call(ts)
Expand Down
68 changes: 68 additions & 0 deletions tests/sentry/monitors/test_system_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
MONITOR_VOLUME_HISTORY,
MONITOR_VOLUME_RETENTION,
AnomalyTransition,
DecisionResult,
TickAnomalyDecision,
_make_reference_ts,
get_clock_tick_decision,
get_clock_tick_volume_metric,
get_last_incident_ts,
make_clock_tick_decision,
process_clock_tick_for_system_incidents,
prune_incident_check_in_volume,
record_clock_tick_volume_metric,
update_check_in_volume,
Expand Down Expand Up @@ -104,6 +107,71 @@ def make_key(offset: timedelta) -> str:
assert minute_3 == "1"


@mock.patch("sentry.monitors.system_incidents.logger")
@mock.patch("sentry.monitors.system_incidents.metrics")
@mock.patch("sentry.monitors.system_incidents.record_clock_tick_volume_metric")
@mock.patch("sentry.monitors.system_incidents.make_clock_tick_decision")
@mock.patch("sentry.monitors.system_incidents.prune_incident_check_in_volume")
def test_process_clock_tick_for_system_incident(
mock_prune_incident_check_in_volume,
mock_make_clock_tick_decision,
mock_record_clock_tick_volume_metric,
mock_metrics,
mock_logger,
):
ts = timezone.now().replace(second=0, microsecond=0)

mock_make_clock_tick_decision.return_value = DecisionResult(
ts=ts,
decision=TickAnomalyDecision.ABNORMAL,
transition=AnomalyTransition.ABNORMALITY_STARTED,
)
process_clock_tick_for_system_incidents(ts)

assert mock_record_clock_tick_volume_metric.call_count == 1
assert mock_make_clock_tick_decision.call_count == 1

# Metrics and logs are recorded
assert mock_logger.info.call_args_list[0] == mock.call(
"monitors.system_incidents.process_clock_tick",
extra={
"decision": TickAnomalyDecision.ABNORMAL,
"transition": AnomalyTransition.ABNORMALITY_STARTED,
},
)
assert mock_metrics.incr.call_args_list[0] == mock.call(
"monitors.tasks.clock_tick.tick_decision",
tags={"decision": TickAnomalyDecision.ABNORMAL},
sample_rate=1.0,
)
assert mock_metrics.incr.call_args_list[1] == mock.call(
"monitors.tasks.clock_tick.tick_transition",
tags={"transition": AnomalyTransition.ABNORMALITY_STARTED},
sample_rate=1.0,
)

# Transition into an incident records the start timestamp
mock_make_clock_tick_decision.return_value = DecisionResult(
ts=ts,
decision=TickAnomalyDecision.INCIDENT,
transition=AnomalyTransition.INCIDENT_STARTED,
)
process_clock_tick_for_system_incidents(ts)
assert get_last_incident_ts() == ts

# Transitioning out of an incident prunes volume during the incident
mock_make_clock_tick_decision.return_value = DecisionResult(
ts=ts + timedelta(minutes=5),
decision=TickAnomalyDecision.NORMAL,
transition=AnomalyTransition.INCIDENT_RECOVERED,
)
process_clock_tick_for_system_incidents(ts)
assert mock_prune_incident_check_in_volume.call_args_list[0] == mock.call(
ts,
ts + timedelta(minutes=5),
)


@mock.patch("sentry.monitors.system_incidents.logger")
@mock.patch("sentry.monitors.system_incidents.metrics")
@override_options({"crons.tick_volume_anomaly_detection": True})
Expand Down

0 comments on commit 982db45

Please sign in to comment.