Skip to content

Commit

Permalink
chore(statistical-detectors): Add feature flags for function regression detection (#58197)
Browse files Browse the repository at this point in the history

This adds feature flags to allow the incremental rollout of function
regression detection, plus some metrics for better monitoring.
  • Loading branch information
Zylphrex authored Oct 18, 2023
1 parent 28d42c9 commit f682586
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 59 deletions.
136 changes: 87 additions & 49 deletions src/sentry/tasks/statistical_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union, cast

import sentry_sdk
Expand All @@ -25,7 +25,7 @@
Request,
)

from sentry import options
from sentry import features, options
from sentry.api.serializers.snuba import SnubaTSResultSerializer
from sentry.constants import ObjectStatus
from sentry.models.project import Project
Expand Down Expand Up @@ -88,13 +88,21 @@ def run_detection() -> None:
performance_projects = []
profiling_projects = []

# TODO: make the amount of projects per batch configurable
performance_projects_count = 0
profiling_projects_count = 0

for project in RangeQuerySetWrapper(
Project.objects.filter(status=ObjectStatus.ACTIVE),
Project.objects.filter(status=ObjectStatus.ACTIVE).select_related("organization"),
step=100,
):
if project.flags.has_transactions and project.id in enabled_performance_projects:
if project.flags.has_transactions and (
features.has(
"organizations:performance-statistical-detectors-ema", project.organization
)
or project.id in enabled_performance_projects
):
performance_projects.append(project)
performance_projects_count += 1

if len(performance_projects) >= PROJECTS_PER_BATCH:
detect_transaction_trends.delay(
Expand All @@ -104,8 +112,12 @@ def run_detection() -> None:
)
performance_projects = []

if project.flags.has_profiles and project.id in enabled_profiling_projects:
if project.flags.has_profiles and (
features.has("organizations:profiling-statistical-detectors-ema", project.organization)
or project.id in enabled_profiling_projects
):
profiling_projects.append(project.id)
profiling_projects_count += 1

if len(profiling_projects) >= PROJECTS_PER_BATCH:
detect_function_trends.delay(profiling_projects, now)
Expand All @@ -121,6 +133,18 @@ def run_detection() -> None:
if profiling_projects:
detect_function_trends.delay(profiling_projects, now)

metrics.incr(
"statistical_detectors.performance.projects.total",
amount=performance_projects_count,
sample_rate=1.0,
)

metrics.incr(
"statistical_detectors.profiling.projects.total",
amount=profiling_projects_count,
sample_rate=1.0,
)


@instrumented_task(
name="sentry.tasks.statistical_detectors.detect_transaction_trends",
Expand All @@ -141,7 +165,12 @@ def detect_transaction_trends(
delay = 12 # hours
delayed_start = start + timedelta(hours=delay)

unique_project_ids: Set[int] = set()

for trends in chunked(regressions, TRANSACTIONS_PER_BATCH):
for _, payload in trends:
unique_project_ids.add(payload.project_id)

detect_transaction_change_points.apply_async(
args=[
[(payload.project_id, payload.group) for _, payload in trends],
Expand All @@ -153,6 +182,12 @@ def detect_transaction_trends(
countdown=delay * 60 * 60,
)

metrics.incr(
"statistical_detectors.performance.projects.active",
amount=len(unique_project_ids),
sample_rate=1.0,
)


@instrumented_task(
name="sentry.tasks.statistical_detectors.detect_transaction_change_points",
Expand All @@ -165,18 +200,6 @@ def detect_transaction_change_points(
if not options.get("statistical_detectors.enable"):
return

for project_id, transaction_name in transactions:
with sentry_sdk.push_scope() as scope:
scope.set_tag("regressed_project_id", project_id)
scope.set_tag("regressed_transaction", transaction_name)
scope.set_tag("breakpoint", "no")

scope.set_context(
"statistical_detectors",
{"timestamp": start.isoformat()},
)
sentry_sdk.capture_message("Potential Transaction Regression")

breakpoint_count = 0

for breakpoints in chunked(_detect_transaction_change_points(transactions, start), 10):
Expand Down Expand Up @@ -396,7 +419,12 @@ def detect_function_trends(project_ids: List[int], start: datetime, *args, **kwa
delay = 12 # hours
delayed_start = start + timedelta(hours=delay)

unique_project_ids: Set[int] = set()

for regression_chunk in chunked(regressions, FUNCTIONS_PER_BATCH):
for _, payload in regression_chunk:
unique_project_ids.add(payload.project_id)

detect_function_change_points.apply_async(
args=[
[(payload.project_id, payload.group) for _, payload in regression_chunk],
Expand All @@ -408,6 +436,12 @@ def detect_function_trends(project_ids: List[int], start: datetime, *args, **kwa
countdown=delay * 60 * 60,
)

metrics.incr(
"statistical_detectors.profiling.projects.active",
amount=len(unique_project_ids),
sample_rate=1.0,
)


@instrumented_task(
name="sentry.tasks.statistical_detectors.detect_function_change_points",
Expand All @@ -420,18 +454,6 @@ def detect_function_change_points(
if not options.get("statistical_detectors.enable"):
return

for project_id, fingerprint in functions_list:
with sentry_sdk.push_scope() as scope:
scope.set_tag("regressed_project_id", project_id)
scope.set_tag("regressed_function_id", fingerprint)
scope.set_tag("breakpoint", "no")

scope.set_context(
"statistical_detectors",
{"timestamp": start.isoformat()},
)
sentry_sdk.capture_message("Potential Function Regression")

breakpoint_count = 0
emitted_count = 0

Expand Down Expand Up @@ -612,24 +634,6 @@ def emit_function_regression_issue(
payloads = []

for entry in breakpoints:
with sentry_sdk.push_scope() as scope:
scope.set_tag("breakpoint", "yes")
scope.set_tag("regressed_project_id", entry["project"])
# the service was originally meant for transactions so this
# naming is a result of this
scope.set_tag("regressed_function_id", entry["transaction"])

breakpoint_ts = datetime.fromtimestamp(entry["breakpoint"], tz=timezone.utc)
scope.set_context(
"statistical_detectors",
{
**entry,
"timestamp": start.isoformat(),
"breakpoint_timestamp": breakpoint_ts.isoformat(),
},
)
sentry_sdk.capture_message("Potential Function Regression")

project_id = int(entry["project"])
fingerprint = int(entry["transaction"])
example = examples.get((project_id, fingerprint))
Expand Down Expand Up @@ -669,10 +673,24 @@ def all_function_payloads(
project_ids: List[int],
start: datetime,
) -> Generator[DetectorPayload, None, None]:
enabled_profiling_projects: Set[int] = set(
options.get("statistical_detectors.enable.projects.profiling")
)

projects_per_query = options.get("statistical_detectors.query.batch_size")
assert projects_per_query > 0

for projects in chunked(Project.objects.filter(id__in=project_ids), projects_per_query):
selected_projects = Project.objects.filter(id__in=project_ids).select_related("organization")
selected_projects = filter(
lambda project: (
features.has(
"organizations:profiling-statistical-detectors-breakpoint", project.organization
)
or project.id in enabled_profiling_projects
),
selected_projects,
)
for projects in chunked(selected_projects, projects_per_query):
try:
yield from query_functions(projects, start)
except Exception as e:
Expand Down Expand Up @@ -726,6 +744,26 @@ def query_transactions(
end: datetime,
transactions_per_project: int,
) -> List[DetectorPayload]:
enabled_performance_projects: Set[int] = set(
options.get("statistical_detectors.enable.projects.performance")
)

projects = [
project
for project in Project.objects.filter(id__in=project_ids).select_related("organization")
if (
project.id in enabled_performance_projects
or features.has(
"organizations:performance-statistical-detectors-breakpoint", project.organization
)
)
]
project_ids = [project.id for project in projects]
org_ids = list({project.organization_id for project in projects})

if not project_ids or not org_ids:
return []

use_case_id = UseCaseID.TRANSACTIONS

# both the metric and tag that we are using are hardcoded values in sentry_metrics.indexer.strings
Expand Down
43 changes: 33 additions & 10 deletions tests/sentry/tasks/test_statistical_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,25 @@ def test_detect_function_trends_options(
timestamp,
project,
):
with override_options({"statistical_detectors.enable": enabled}):
with override_options(
{
"statistical_detectors.enable": enabled,
"statistical_detectors.enable.projects.profiling": [project.id],
}
):
detect_function_trends([project.id], timestamp)
assert query_functions.called == enabled


@mock.patch("sentry.snuba.functions.query")
@django_db_all
def test_detect_function_trends_query_timerange(functions_query, timestamp, project):
with override_options({"statistical_detectors.enable": True}):
with override_options(
{
"statistical_detectors.enable": True,
"statistical_detectors.enable.projects.profiling": [project.id],
}
):
detect_function_trends([project.id], timestamp)

assert functions_query.called
Expand Down Expand Up @@ -290,7 +300,12 @@ def test_detect_function_trends(
for i, ts in enumerate(timestamps)
]

with override_options({"statistical_detectors.enable": True}), TaskRunner():
with override_options(
{
"statistical_detectors.enable": True,
"statistical_detectors.enable.projects.profiling": [project.id],
}
), TaskRunner():
for ts in timestamps:
detect_function_trends([project.id], ts)
assert detect_function_change_points.apply_async.called
Expand Down Expand Up @@ -511,13 +526,21 @@ def now(self):
return MetricsAPIBaseTestCase.MOCK_DATETIME

def test_transactions_query(self) -> None:
res = query_transactions(
[self.org.id],
[p.id for p in self.projects],
self.hour_ago,
self.now,
self.num_transactions + 1, # detect if any extra transactions are returned
)
with override_options(
{
"statistical_detectors.enable.projects.performance": [
project.id for project in self.projects
],
}
):
res = query_transactions(
[self.org.id],
[p.id for p in self.projects],
self.hour_ago,
self.now,
self.num_transactions + 1, # detect if any extra transactions are returned
)

assert len(res) == len(self.projects) * self.num_transactions
for trend_payload in res:
assert trend_payload.count == 2
Expand Down

0 comments on commit f682586

Please sign in to comment.