From 8200d690f90b89dae06c7ecdf5ebcedc1306b721 Mon Sep 17 00:00:00 2001 From: Cathy Teng Date: Fri, 13 Dec 2024 14:18:56 -0800 Subject: [PATCH] replace GroupEvent with PostProcessJob --- .../handlers/action/notification.py | 4 +- .../handlers/condition/__init__.py | 3 +- .../handlers/condition/event_state.py | 19 ------- .../handlers/condition/group_event.py | 6 +-- src/sentry/workflow_engine/models/action.py | 6 +-- .../workflow_engine/models/data_condition.py | 12 ----- src/sentry/workflow_engine/models/workflow.py | 6 +-- .../workflow_engine/processors/action.py | 6 +-- .../processors/data_condition_group.py | 3 +- .../workflow_engine/processors/detector.py | 10 ++-- .../workflow_engine/processors/workflow.py | 20 ++++---- src/sentry/workflow_engine/types.py | 4 +- .../workflow_engine/models/test_workflow.py | 8 +-- .../workflow_engine/processors/test_action.py | 8 +-- .../processors/test_workflow.py | 51 ++++++++++--------- tests/sentry/workflow_engine/test_base.py | 2 +- 16 files changed, 70 insertions(+), 98 deletions(-) delete mode 100644 src/sentry/workflow_engine/handlers/condition/event_state.py diff --git a/src/sentry/workflow_engine/handlers/action/notification.py b/src/sentry/workflow_engine/handlers/action/notification.py index 97e8cf84f39818..d77e31eb2da1d1 100644 --- a/src/sentry/workflow_engine/handlers/action/notification.py +++ b/src/sentry/workflow_engine/handlers/action/notification.py @@ -1,4 +1,4 @@ -from sentry.eventstore.models import GroupEvent +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.models import Action, Detector from sentry.workflow_engine.registry import action_handler_registry from sentry.workflow_engine.types import ActionHandler @@ -8,7 +8,7 @@ class NotificationActionHandler(ActionHandler): @staticmethod def execute( - evt: GroupEvent, + job: PostProcessJob, action: Action, detector: Detector, ) -> None: diff --git a/src/sentry/workflow_engine/handlers/condition/__init__.py b/src/sentry/workflow_engine/handlers/condition/__init__.py index 031b65bcc73fb9..ea243b024a06f1 100644 --- a/src/sentry/workflow_engine/handlers/condition/__init__.py +++ b/src/sentry/workflow_engine/handlers/condition/__init__.py @@ -1,4 +1,3 @@ -__all__ = ["GroupEventConditionHandler", "EventStateConditionHandler"] +__all__ = ["GroupEventConditionHandler"] -from .event_state import EventStateConditionHandler from .group_event import GroupEventConditionHandler diff --git a/src/sentry/workflow_engine/handlers/condition/event_state.py b/src/sentry/workflow_engine/handlers/condition/event_state.py deleted file mode 100644 index 3755fb4f91f111..00000000000000 --- a/src/sentry/workflow_engine/handlers/condition/event_state.py +++ /dev/null @@ -1,19 +0,0 @@ -from typing import Any - -from sentry.workflow_engine.models.data_condition import Condition -from sentry.workflow_engine.registry import condition_handler_registry -from sentry.workflow_engine.types import DataConditionHandler - -from .group_event import get_nested_value - - -@condition_handler_registry.register(Condition.EVENT_STATE_COMPARISON) -class EventStateConditionHandler(DataConditionHandler[dict[str, Any]]): - @staticmethod - def evaluate_value(data: dict[str, Any], comparison: Any, data_filter: str) -> bool: - event_value = get_nested_value(data, data_filter) - return event_value == comparison - - @staticmethod - def get_expected_value(value: Any, **kwargs) -> Any: - return kwargs diff --git a/src/sentry/workflow_engine/handlers/condition/group_event.py b/src/sentry/workflow_engine/handlers/condition/group_event.py index e392db084cfdd9..1828443efbfa7c 100644 --- a/src/sentry/workflow_engine/handlers/condition/group_event.py +++ b/src/sentry/workflow_engine/handlers/condition/group_event.py @@ -1,6 +1,6 @@ from typing import Any -from sentry.eventstore.models import GroupEvent +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.registry import condition_handler_registry from sentry.workflow_engine.types import DataConditionHandler @@ -22,8 +22,8 @@ def get_nested_value(data: Any, path: str, default: Any = None) -> Any | None: @condition_handler_registry.register(Condition.GROUP_EVENT_ATTR_COMPARISON) -class GroupEventConditionHandler(DataConditionHandler[GroupEvent]): +class GroupEventConditionHandler(DataConditionHandler[PostProcessJob]): @staticmethod - def evaluate_value(data: GroupEvent, comparison: Any, data_filter: str) -> bool: + def evaluate_value(data: PostProcessJob, comparison: Any, data_filter: str) -> bool: event_value = get_nested_value(data, data_filter) return event_value == comparison diff --git a/src/sentry/workflow_engine/models/action.py b/src/sentry/workflow_engine/models/action.py index 027ef5cf11c933..700ddb07c2ed99 100644 --- a/src/sentry/workflow_engine/models/action.py +++ b/src/sentry/workflow_engine/models/action.py @@ -7,8 +7,8 @@ from sentry.backup.scopes import RelocationScope from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey -from sentry.eventstore.models import GroupEvent from sentry.notifications.models.notificationaction import ActionTarget +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.registry import action_handler_registry from sentry.workflow_engine.types import ActionHandler @@ -59,7 +59,7 @@ def get_handler(self) -> ActionHandler: action_type = Action.Type(self.type) return action_handler_registry.get(action_type) - def trigger(self, evt: GroupEvent, detector: Detector) -> None: + def trigger(self, job: PostProcessJob, detector: Detector) -> None: # get the handler for the action type handler = self.get_handler() - handler.execute(evt, self, detector) + handler.execute(job, self, detector) diff --git a/src/sentry/workflow_engine/models/data_condition.py b/src/sentry/workflow_engine/models/data_condition.py index d18e849ff8d777..622b5babd55a85 100644 --- a/src/sentry/workflow_engine/models/data_condition.py +++ b/src/sentry/workflow_engine/models/data_condition.py @@ -27,7 +27,6 @@ class Condition(StrEnum): LESS = "lt" NOT_EQUAL = "ne" GROUP_EVENT_ATTR_COMPARISON = "group_event_attr_comparison" - EVENT_STATE_COMPARISON = "event_state_comparison" condition_ops = { @@ -95,17 +94,6 @@ def get_condition_handler(self) -> DataConditionHandler[T] | None: return condition_handler_registry.get(condition_type) - def get_expected_value(self, value: T, **kwargs) -> T | dict[str, Any]: - try: - condition_handler: DataConditionHandler[T] | None = self.get_condition_handler() - except NoRegistrationExistsError: - return value - - if not condition_handler: - return value - - return condition_handler.get_expected_value(value, **kwargs) - def evaluate_value(self, value: T) -> DataConditionResult: condition_handler: DataConditionHandler[T] | None = None op: Callable | None = None diff --git a/src/sentry/workflow_engine/models/workflow.py b/src/sentry/workflow_engine/models/workflow.py index 249bda8ecc7c4b..e11bd3a518529a 100644 --- a/src/sentry/workflow_engine/models/workflow.py +++ b/src/sentry/workflow_engine/models/workflow.py @@ -8,8 +8,8 @@ from sentry.backup.scopes import RelocationScope from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey -from sentry.eventstore.models import GroupEvent from sentry.models.owner_base import OwnerModel +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group from .json_config import JSONConfigBase @@ -53,7 +53,7 @@ class Meta: ) ] - def evaluate_trigger_conditions(self, evt: GroupEvent, **kwargs) -> bool: + def evaluate_trigger_conditions(self, job: PostProcessJob) -> bool: """ Evaluate the conditions for the workflow trigger and return if the evaluation was successful. If there aren't any workflow trigger conditions, the workflow is considered triggered. @@ -61,7 +61,7 @@ def evaluate_trigger_conditions(self, evt: GroupEvent, **kwargs) -> bool: if self.when_condition_group is None: return True - evaluation, _ = evaluate_condition_group(self.when_condition_group, evt, **kwargs) + evaluation, _ = evaluate_condition_group(self.when_condition_group, job) return evaluation diff --git a/src/sentry/workflow_engine/processors/action.py b/src/sentry/workflow_engine/processors/action.py index 0e57ee44441aea..c603bcd3bedb21 100644 --- a/src/sentry/workflow_engine/processors/action.py +++ b/src/sentry/workflow_engine/processors/action.py @@ -1,11 +1,11 @@ from sentry.db.models.manager.base_query_set import BaseQuerySet -from sentry.eventstore.models import GroupEvent +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.models import Action, DataConditionGroup, Workflow from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group def evaluate_workflow_action_filters( - workflows: set[Workflow], evt: GroupEvent + workflows: set[Workflow], job: PostProcessJob ) -> BaseQuerySet[Action]: filtered_action_groups: set[DataConditionGroup] = set() @@ -17,7 +17,7 @@ def evaluate_workflow_action_filters( ).distinct() for action_condition in action_conditions: - evaluation, result = evaluate_condition_group(action_condition, evt) + evaluation, result = evaluate_condition_group(action_condition, job) if evaluation: filtered_action_groups.add(action_condition) diff --git a/src/sentry/workflow_engine/processors/data_condition_group.py b/src/sentry/workflow_engine/processors/data_condition_group.py index a7beb98e5d021e..637e91c5a6b34d 100644 --- a/src/sentry/workflow_engine/processors/data_condition_group.py +++ b/src/sentry/workflow_engine/processors/data_condition_group.py @@ -21,7 +21,6 @@ def get_data_conditions_for_group(data_condition_group_id: int) -> list[DataCond def evaluate_condition_group( data_condition_group: DataConditionGroup, value: T, - **kwargs, ) -> ProcessedDataConditionResult: """ Evaluate the conditions for a given group and value. @@ -39,7 +38,7 @@ def evaluate_condition_group( return True, [] for condition in conditions: - evaluation_result = condition.evaluate_value(condition.get_expected_value(value, **kwargs)) + evaluation_result = condition.evaluate_value(value) is_condition_triggered = evaluation_result is not None if is_condition_triggered: diff --git a/src/sentry/workflow_engine/processors/detector.py b/src/sentry/workflow_engine/processors/detector.py index ba7c6e9718cacd..c5291a9e1e304a 100644 --- a/src/sentry/workflow_engine/processors/detector.py +++ b/src/sentry/workflow_engine/processors/detector.py @@ -2,10 +2,10 @@ import logging -from sentry.eventstore.models import GroupEvent from sentry.issues.grouptype import ErrorGroupType from sentry.issues.issue_occurrence import IssueOccurrence from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult from sentry.workflow_engine.models import DataPacket, Detector from sentry.workflow_engine.types import DetectorGroupKey @@ -14,11 +14,13 @@ # TODO - cache these by evt.group_id? :thinking: -def get_detector_by_event(evt: GroupEvent) -> Detector: - issue_occurrence = evt.occurrence +def get_detector_by_event(job: PostProcessJob) -> Detector: + issue_occurrence = job["event"].occurrence if issue_occurrence is None: - detector = Detector.objects.get(project_id=evt.project_id, type=ErrorGroupType.slug) + detector = Detector.objects.get( + project_id=job["event"].project_id, type=ErrorGroupType.slug + ) else: detector = Detector.objects.get(id=issue_occurrence.evidence_data.get("detector_id", None)) diff --git a/src/sentry/workflow_engine/processors/workflow.py b/src/sentry/workflow_engine/processors/workflow.py index 7200885124aae3..1a21a47a23cff8 100644 --- a/src/sentry/workflow_engine/processors/workflow.py +++ b/src/sentry/workflow_engine/processors/workflow.py @@ -2,7 +2,7 @@ import sentry_sdk -from sentry.eventstore.models import GroupEvent +from sentry.tasks.post_process import PostProcessJob from sentry.utils import metrics from sentry.workflow_engine.models import Detector, Workflow from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters @@ -11,19 +11,17 @@ logger = logging.getLogger(__name__) -def evaluate_workflow_triggers( - workflows: set[Workflow], evt: GroupEvent, **kwargs -) -> set[Workflow]: +def evaluate_workflow_triggers(workflows: set[Workflow], job: PostProcessJob) -> set[Workflow]: triggered_workflows: set[Workflow] = set() for workflow in workflows: - if workflow.evaluate_trigger_conditions(evt, **kwargs): + if workflow.evaluate_trigger_conditions(job): triggered_workflows.add(workflow) return triggered_workflows -def process_workflows(evt: GroupEvent, **kwargs) -> set[Workflow]: +def process_workflows(job: PostProcessJob) -> set[Workflow]: """ This method will get the detector based on the event, and then gather the associated workflows. Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met, @@ -35,19 +33,19 @@ def process_workflows(evt: GroupEvent, **kwargs) -> set[Workflow]: """ # Check to see if the GroupEvent has an issue occurrence try: - detector = get_detector_by_event(evt) + detector = get_detector_by_event(job) except Detector.DoesNotExist: metrics.incr("workflow_engine.process_workflows.error") - logger.exception("Detector not found for event", extra={"event_id": evt.event_id}) + logger.exception("Detector not found for event", extra={"event_id": job["event"].event_id}) return set() # Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct()) - triggered_workflows = evaluate_workflow_triggers(workflows, evt, **kwargs) - actions = evaluate_workflow_action_filters(triggered_workflows, evt) + triggered_workflows = evaluate_workflow_triggers(workflows, job) + actions = evaluate_workflow_action_filters(triggered_workflows, job) with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"): for action in actions: - action.trigger(evt, detector) + action.trigger(job, detector) return triggered_workflows diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py index 96022e94d8a807..65765aec05ad12 100644 --- a/src/sentry/workflow_engine/types.py +++ b/src/sentry/workflow_engine/types.py @@ -6,7 +6,7 @@ from sentry.types.group import PriorityLevel if TYPE_CHECKING: - from sentry.eventstore.models import GroupEvent + from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.models import Action, Detector T = TypeVar("T") @@ -30,7 +30,7 @@ class DetectorPriorityLevel(IntEnum): class ActionHandler: @staticmethod - def execute(group_event: GroupEvent, action: Action, detector: Detector) -> None: + def execute(job: PostProcessJob, action: Action, detector: Detector) -> None: raise NotImplementedError diff --git a/tests/sentry/workflow_engine/models/test_workflow.py b/tests/sentry/workflow_engine/models/test_workflow.py index c00edd89a02432..f038820e687e84 100644 --- a/tests/sentry/workflow_engine/models/test_workflow.py +++ b/tests/sentry/workflow_engine/models/test_workflow.py @@ -1,3 +1,4 @@ +from sentry.tasks.post_process import PostProcessJob from tests.sentry.workflow_engine.test_base import BaseWorkflowTest @@ -8,21 +9,22 @@ def setUp(self): ) self.data_condition = self.data_condition_group.conditions.first() self.group, self.event, self.group_event = self.create_group_event() + self.job = PostProcessJob({"event": self.group_event}) def test_evaluate_trigger_conditions__condition_new_event__True(self): - evaluation = self.workflow.evaluate_trigger_conditions(self.group_event) + evaluation = self.workflow.evaluate_trigger_conditions(self.job) assert evaluation is True def test_evaluate_trigger_conditions__condition_new_event__False(self): # Update event to have been seen before self.group_event.group.times_seen = 5 - evaluation = self.workflow.evaluate_trigger_conditions(self.group_event) + evaluation = self.workflow.evaluate_trigger_conditions(self.job) assert evaluation is False def test_evaluate_trigger_conditions__no_conditions(self): self.workflow.when_condition_group = None self.workflow.save() - evaluation = self.workflow.evaluate_trigger_conditions(self.group_event) + evaluation = self.workflow.evaluate_trigger_conditions(self.job) assert evaluation is True diff --git a/tests/sentry/workflow_engine/processors/test_action.py b/tests/sentry/workflow_engine/processors/test_action.py index 097890ab8fc535..839ff4ac0c0857 100644 --- a/tests/sentry/workflow_engine/processors/test_action.py +++ b/tests/sentry/workflow_engine/processors/test_action.py @@ -1,3 +1,4 @@ +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters from tests.sentry.workflow_engine.test_base import BaseWorkflowTest @@ -17,9 +18,10 @@ def setUp(self): self.group, self.event, self.group_event = self.create_group_event( occurrence=self.build_occurrence_data(evidence_data={"detector_id": self.detector.id}) ) + self.job = PostProcessJob({"event": self.group_event}) def test_basic__no_filter(self): - triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event) + triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job) assert set(triggered_actions) == {self.action} def test_basic__with_filter__passes(self): @@ -31,7 +33,7 @@ def test_basic__with_filter__passes(self): condition_result=True, ) - triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event) + triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job) assert set(triggered_actions) == {self.action} def test_basic__with_filter__filtered(self): @@ -43,5 +45,5 @@ def test_basic__with_filter__filtered(self): comparison=self.detector.id + 1, ) - triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event) + triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job) assert not triggered_actions diff --git a/tests/sentry/workflow_engine/processors/test_workflow.py b/tests/sentry/workflow_engine/processors/test_workflow.py index d07d47abb93009..52ce3521c15a04 100644 --- a/tests/sentry/workflow_engine/processors/test_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_workflow.py @@ -1,7 +1,8 @@ from unittest import mock +from sentry.eventstream.base import GroupState from sentry.incidents.grouptype import MetricAlertFire -from sentry.rules.base import EventState +from sentry.tasks.post_process import PostProcessJob from sentry.workflow_engine.models import DataConditionGroup from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.processors.workflow import evaluate_workflow_triggers, process_workflows @@ -25,16 +26,24 @@ def setUp(self): ) self.group, self.event, self.group_event = self.create_group_event() + self.job = PostProcessJob( + { + "event": self.group_event, + "group_state": GroupState( + id=1, is_new=False, is_regression=True, is_new_group_environment=False + ), + } + ) def test_error_event(self): - triggered_workflows = process_workflows(self.group_event) + triggered_workflows = process_workflows(self.job) assert triggered_workflows == {self.error_workflow} def test_error_event_with_kwargs(self): dcg = self.create_data_condition_group() self.create_data_condition( - type=Condition.EVENT_STATE_COMPARISON, - condition="state.is_regression", + type=Condition.GROUP_EVENT_ATTR_COMPARISON, + condition="group_state.is_regression", comparison=True, condition_result=True, condition_group=dcg, @@ -46,31 +55,24 @@ def test_error_event_with_kwargs(self): workflow=workflow, ) - triggered_workflows = process_workflows( - self.group_event, - state=EventState( - is_new=False, - is_regression=True, - is_new_group_environment=False, - has_reappeared=False, - has_escalated=False, - ), - ) + triggered_workflows = process_workflows(self.job) assert triggered_workflows == {self.error_workflow, workflow} def test_issue_occurrence_event(self): issue_occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id}) self.group_event.occurrence = issue_occurrence + self.job = PostProcessJob({"event": self.group_event}) - triggered_workflows = process_workflows(self.group_event) + triggered_workflows = process_workflows(self.job) assert triggered_workflows == {self.workflow} def test_no_detector(self): self.group_event.occurrence = self.build_occurrence(evidence_data={}) + self.job = PostProcessJob({"event": self.group_event}) with mock.patch("sentry.workflow_engine.processors.workflow.logger") as mock_logger: with mock.patch("sentry.workflow_engine.processors.workflow.metrics") as mock_metrics: - triggered_workflows = process_workflows(self.group_event) + triggered_workflows = process_workflows(self.job) assert not triggered_workflows @@ -94,13 +96,14 @@ def setUp(self): self.group, self.event, self.group_event = self.create_group_event( occurrence=occurrence, ) + self.job = PostProcessJob({"event": self.group_event}) def test_workflow_trigger(self): - triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.group_event) + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) assert triggered_workflows == {self.workflow} def test_no_workflow_trigger(self): - triggered_workflows = evaluate_workflow_triggers(set(), self.group_event) + triggered_workflows = evaluate_workflow_triggers(set(), self.job) assert not triggered_workflows def test_workflow_many_filters(self): @@ -110,12 +113,12 @@ def test_workflow_many_filters(self): self.create_data_condition( condition_group=self.workflow.when_condition_group, type=Condition.GROUP_EVENT_ATTR_COMPARISON, - condition="occurrence.evidence_data.detector_id", + condition="event.occurrence.evidence_data.detector_id", comparison=self.detector.id, condition_result=75, ) - triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.group_event) + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) assert triggered_workflows == {self.workflow} def test_workflow_filterd_out(self): @@ -125,17 +128,15 @@ def test_workflow_filterd_out(self): self.create_data_condition( condition_group=self.workflow.when_condition_group, type=Condition.GROUP_EVENT_ATTR_COMPARISON, - condition="occurrence.evidence_data.detector_id", + condition="event.occurrence.evidence_data.detector_id", comparison=self.detector.id + 1, ) - triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.group_event) + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) assert not triggered_workflows def test_many_workflows(self): workflow_two, _, _, _ = self.create_detector_and_workflow(name_prefix="two") - triggered_workflows = evaluate_workflow_triggers( - {self.workflow, workflow_two}, self.group_event - ) + triggered_workflows = evaluate_workflow_triggers({self.workflow, workflow_two}, self.job) assert triggered_workflows == {self.workflow, workflow_two} diff --git a/tests/sentry/workflow_engine/test_base.py b/tests/sentry/workflow_engine/test_base.py index ab3090d95889e1..f58d57bf256a4f 100644 --- a/tests/sentry/workflow_engine/test_base.py +++ b/tests/sentry/workflow_engine/test_base.py @@ -76,7 +76,7 @@ def create_detector_and_workflow( self.create_data_condition( condition_group=workflow_triggers, type=Condition.GROUP_EVENT_ATTR_COMPARISON, - condition="group.times_seen", + condition="event.group.times_seen", comparison=1, condition_result=True, )