Skip to content

Commit

Permalink
feat(aci): event frequency condition handler (#82551)
Browse files Browse the repository at this point in the history
  • Loading branch information
cathteng authored Jan 15, 2025
1 parent 7a072af commit 8cff0dc
Show file tree
Hide file tree
Showing 6 changed files with 570 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import contextlib
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Callable, Mapping
from datetime import datetime, timedelta
from typing import Any, Literal, TypedDict

from django.db.models import QuerySet

from sentry.issues.grouptype import GroupCategory, get_group_type_by_type_id
from sentry.models.group import Group
from sentry.rules.conditions.event_frequency import SNUBA_LIMIT
from sentry.tsdb.base import TSDBModel
from sentry.utils.iterators import chunked
from sentry.utils.snuba import options_override


class _QSTypedDict(TypedDict):
id: int
type: int
project_id: int
project__organization_id: int


class BaseEventFrequencyConditionHandler(ABC):
    """Shared Snuba/TSDB querying logic for event frequency conditions.

    Subclasses define the supported time ``intervals`` and implement
    ``batch_query`` to decide which TSDB model(s) to query for a set of groups.
    """

    @property
    @abstractmethod
    def intervals(self) -> dict[str, tuple[str, timedelta]]:
        """Map of interval key -> (human-readable label, window duration)."""
        raise NotImplementedError

    def get_query_window(self, end: datetime, duration: timedelta) -> tuple[datetime, datetime]:
        """
        Calculate the start and end times for the query.
        "duration" is the length of the window we're querying over.
        """
        start = end - duration
        return (start, end)

    def disable_consistent_snuba_mode(
        self, duration: timedelta
    ) -> contextlib.AbstractContextManager[object]:
        """For conditions with interval >= 1 hour we don't need to worry about read your writes
        consistency. Disable it so that we can scale to more nodes.

        Returns a no-op context manager for shorter windows.
        """
        option_override_cm: contextlib.AbstractContextManager[object] = contextlib.nullcontext()
        if duration >= timedelta(hours=1):
            option_override_cm = options_override({"consistent": False})
        return option_override_cm

    def get_snuba_query_result(
        self,
        tsdb_function: Callable[..., Any],
        keys: list[int],
        group_id: int,
        organization_id: int,
        model: TSDBModel,
        start: datetime,
        end: datetime,
        environment_id: int,
        referrer_suffix: str,
    ) -> Mapping[int, int]:
        """Run a single TSDB query and return a mapping of group id -> count.

        ``group_id`` is passed through as the ``jitter_value`` for the query
        cache; ``organization_id`` is forwarded as the tenant id.
        """
        result: Mapping[int, int] = tsdb_function(
            model=model,
            keys=keys,
            start=start,
            end=end,
            environment_id=environment_id,
            use_cache=True,
            jitter_value=group_id,
            tenant_ids={"organization_id": organization_id},
            referrer_suffix=referrer_suffix,
        )
        return result

    def get_chunked_result(
        self,
        tsdb_function: Callable[..., Any],
        model: TSDBModel,
        group_ids: list[int],
        organization_id: int,
        start: datetime,
        end: datetime,
        environment_id: int,
        referrer_suffix: str,
    ) -> dict[int, int]:
        """Query Snuba in chunks of at most SNUBA_LIMIT groups and merge the results."""
        batch_totals: dict[int, int] = defaultdict(int)
        if not group_ids:
            # Avoid IndexError on group_ids[0] below; nothing to query.
            return batch_totals
        # A single representative id is enough for the cache jitter value.
        jitter_group_id = group_ids[0]
        for group_chunk in chunked(group_ids, SNUBA_LIMIT):
            result = self.get_snuba_query_result(
                tsdb_function=tsdb_function,
                model=model,
                keys=list(group_chunk),
                group_id=jitter_group_id,
                organization_id=organization_id,
                start=start,
                end=end,
                environment_id=environment_id,
                referrer_suffix=referrer_suffix,
            )
            batch_totals.update(result)
        return batch_totals

    def get_group_ids_by_category(
        self,
        groups: QuerySet[Group, _QSTypedDict],
    ) -> dict[GroupCategory, list[int]]:
        """
        Separate group ids into error group ids and generic group ids
        """
        category_group_ids: dict[GroupCategory, list[int]] = defaultdict(list)

        for group in groups:
            issue_type = get_group_type_by_type_id(group["type"])
            category = GroupCategory(issue_type.category)
            category_group_ids[category].append(group["id"])

        return category_group_ids

    def get_value_from_groups(
        self,
        groups: QuerySet[Group, _QSTypedDict] | None,
        value: Literal["id", "project_id", "project__organization_id"],
    ) -> int | None:
        """Return ``value`` from the first group row, or None if there are no groups."""
        result = None
        if groups:
            group = groups[0]
            result = group.get(value)
        return result

    @abstractmethod
    def batch_query(
        self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
    ) -> dict[int, int]:
        """
        Abstract method that specifies how to query Snuba for multiple groups
        depending on the condition. Must be implemented by subclasses.
        """
        raise NotImplementedError

    def get_rate_bulk(
        self,
        duration: timedelta,
        group_ids: set[int],
        environment_id: int,
        current_time: datetime,
        comparison_interval: timedelta | None,
    ) -> dict[int, int]:
        """
        Make a batch query for multiple groups. The return value is a dictionary
        of group_id to the result for that group.
        If comparison_interval is not None, we're making the second query in a
        percent comparison condition. For example, if the condition is:
        - num of issues is {}% higher in 1 hr compared to 5 min ago
        The second query would be querying for num of events from:
        - 5 min ago to 1 hr 5 min ago
        """
        if comparison_interval:
            # Shift the window back so we query the "compared to" period.
            current_time -= comparison_interval
        start, end = self.get_query_window(end=current_time, duration=duration)

        with self.disable_consistent_snuba_mode(duration):
            result = self.batch_query(
                group_ids=group_ids,
                start=start,
                end=end,
                environment_id=environment_id,
            )
        return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any

from sentry import tsdb
from sentry.issues.constants import get_issue_tsdb_group_model
from sentry.models.group import Group
from sentry.rules.conditions.event_frequency import (
COMPARISON_INTERVALS,
STANDARD_INTERVALS,
percent_increase,
)
from sentry.tsdb.base import TSDBModel
from sentry.workflow_engine.handlers.condition.event_frequency_base_handler import (
BaseEventFrequencyConditionHandler,
)
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult


class EventFrequencyConditionHandler(BaseEventFrequencyConditionHandler):
    """Counts events per group over one of the standard time intervals."""

    @property
    def intervals(self) -> dict[str, tuple[str, timedelta]]:
        return STANDARD_INTERVALS

    def batch_query(
        self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
    ) -> dict[int, int]:
        """Return a mapping of group id -> summed event count for the window."""
        batch_sums: dict[int, int] = defaultdict(int)
        groups = Group.objects.filter(id__in=group_ids).values(
            "id", "type", "project_id", "project__organization_id"
        )
        category_group_ids = self.get_group_ids_by_category(groups)
        organization_id = self.get_value_from_groups(groups, "project__organization_id")

        if not organization_id:
            return batch_sums

        for category, issue_ids in category_group_ids.items():
            # TODO: may need to update logic for crons, metric issues, uptime
            model = get_issue_tsdb_group_model(category)
            chunk_result = self.get_chunked_result(
                tsdb_function=tsdb.backend.get_sums,
                model=model,
                group_ids=issue_ids,
                organization_id=organization_id,
                start=start,
                end=end,
                environment_id=environment_id,
                referrer_suffix="batch_alert_event_frequency",
            )
            batch_sums.update(chunk_result)

        return batch_sums


@condition_handler_registry.register(Condition.EVENT_FREQUENCY_COUNT)
class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[int]):
    """Passes when the event count in the interval exceeds the configured value."""

    comparison_json_schema = {
        "type": "object",
        "properties": {
            "interval": {"type": "string", "enum": list(STANDARD_INTERVALS)},
            "value": {"type": "integer", "minimum": 0},
        },
        "required": ["interval", "value"],
        "additionalProperties": False,
    }

    @staticmethod
    def evaluate_value(value: int, comparison: Any) -> DataConditionResult:
        threshold = comparison["value"]
        return value > threshold


@condition_handler_registry.register(Condition.EVENT_FREQUENCY_PERCENT)
class EventFrequencyPercentHandler(EventFrequencyConditionHandler, DataConditionHandler[list[int]]):
    """Passes when the percent increase between two windows exceeds the configured value."""

    comparison_json_schema = {
        "type": "object",
        "properties": {
            "interval": {"type": "string", "enum": list(STANDARD_INTERVALS)},
            "value": {"type": "integer", "minimum": 0},
            "comparison_interval": {"type": "string", "enum": list(COMPARISON_INTERVALS)},
        },
        "required": ["interval", "value", "comparison_interval"],
        "additionalProperties": False,
    }

    @staticmethod
    def evaluate_value(value: list[int], comparison: Any) -> DataConditionResult:
        # Expect exactly [current_count, comparison_count]; anything else fails.
        if len(value) != 2:
            return False
        current, previous = value
        return percent_increase(current, previous) > comparison["value"]
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any

from sentry.rules.conditions.event_attribute import EventAttributeCondition
from sentry.rules.conditions.event_frequency import ComparisonType, EventFrequencyCondition
from sentry.rules.conditions.every_event import EveryEventCondition
from sentry.rules.conditions.existing_high_priority_issue import ExistingHighPriorityIssueCondition
from sentry.rules.conditions.first_seen_event import FirstSeenEventCondition
Expand Down Expand Up @@ -47,7 +48,7 @@ def create_reappeared_event_data_condition(


@data_condition_translator_registry.register(RegressionEventCondition.id)
def create_regressed_event_data_condition(
def create_regression_event_data_condition(
data: dict[str, Any], dcg: DataConditionGroup
) -> DataCondition:
return DataCondition.objects.create(
Expand Down Expand Up @@ -259,3 +260,27 @@ def create_latest_adopted_release_data_condition(
condition_result=True,
condition_group=dcg,
)


@data_condition_translator_registry.register(EventFrequencyCondition.id)
def create_event_frequency_data_condition(
    data: dict[str, Any], dcg: DataConditionGroup
) -> DataCondition:
    """Translate a legacy EventFrequencyCondition rule payload into a DataCondition."""
    comparison_type = data["comparisonType"]  # this is camelCase, age comparison is snake_case
    comparison = {
        "interval": data["interval"],
        "value": data["value"],
    }

    # Renamed from `type` to avoid shadowing the builtin.
    if comparison_type == ComparisonType.COUNT:
        condition_type = Condition.EVENT_FREQUENCY_COUNT
    else:
        # Percent conditions additionally need the window to compare against.
        condition_type = Condition.EVENT_FREQUENCY_PERCENT
        comparison["comparison_interval"] = data["comparisonInterval"]

    return DataCondition.objects.create(
        type=condition_type,
        comparison=comparison,
        condition_result=True,
        condition_group=dcg,
    )
30 changes: 30 additions & 0 deletions src/sentry/workflow_engine/models/data_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ class Condition(models.TextChoices):
TAGGED_EVENT = "tagged_event"
ISSUE_PRIORITY_EQUALS = "issue_priority_equals"

# Event frequency conditions
EVENT_FREQUENCY_COUNT = "event_frequency_count"
EVENT_FREQUENCY_PERCENT = "event_frequency_percent"
EVENT_UNIQUE_USER_FREQUENCY_COUNT = "event_unique_user_frequency_count"
EVENT_UNIQUE_USER_FREQUENCY_PERCENT = "event_unique_user_frequency_percent"
PERCENT_SESSIONS_COUNT = "percent_sessions_count"
PERCENT_SESSIONS_PERCENT = "percent_sessions_percent"
EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT = (
"event_unique_user_frequency_with_conditions_count"
)
EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT = (
"event_unique_user_frequency_with_conditions_percent"
)


CONDITION_OPS = {
Condition.EQUAL: operator.eq,
Expand Down Expand Up @@ -126,6 +140,22 @@ def evaluate_value(self, value: T) -> DataConditionResult:
return self.get_condition_result() if result else None


# Conditions whose evaluation requires a Snuba/TSDB query rather than an
# in-memory comparison; callers use is_slow_condition() to route these to
# batched, deferred evaluation.
SLOW_CONDITIONS = [
    Condition.EVENT_FREQUENCY_COUNT,
    Condition.EVENT_FREQUENCY_PERCENT,
    Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
    Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT,
    Condition.PERCENT_SESSIONS_COUNT,
    Condition.PERCENT_SESSIONS_PERCENT,
    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT,
    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT,
]


def is_slow_condition(cond: DataCondition) -> bool:
    """Return True if evaluating this condition requires a slow (query-backed) path."""
    condition = Condition(cond.type)
    return condition in SLOW_CONDITIONS


@receiver(pre_save, sender=DataCondition)
def enforce_comparison_schema(sender, instance: DataCondition, **kwargs):

Expand Down
Loading

0 comments on commit 8cff0dc

Please sign in to comment.