Skip to content

Commit

Permalink
replace GroupEvent with PostProcessJob
Browse files Browse the repository at this point in the history
  • Loading branch information
cathteng committed Dec 13, 2024
1 parent 94a8900 commit 8200d69
Show file tree
Hide file tree
Showing 16 changed files with 70 additions and 98 deletions.
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/handlers/action/notification.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler
Expand All @@ -8,7 +8,7 @@
class NotificationActionHandler(ActionHandler):
@staticmethod
def execute(
evt: GroupEvent,
job: PostProcessJob,
action: Action,
detector: Detector,
) -> None:
Expand Down
3 changes: 1 addition & 2 deletions src/sentry/workflow_engine/handlers/condition/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
__all__ = ["GroupEventConditionHandler", "EventStateConditionHandler"]
__all__ = ["GroupEventConditionHandler"]

from .event_state import EventStateConditionHandler
from .group_event import GroupEventConditionHandler
19 changes: 0 additions & 19 deletions src/sentry/workflow_engine/handlers/condition/event_state.py

This file was deleted.

6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/handlers/condition/group_event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any

from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler
Expand All @@ -22,8 +22,8 @@ def get_nested_value(data: Any, path: str, default: Any = None) -> Any | None:


@condition_handler_registry.register(Condition.GROUP_EVENT_ATTR_COMPARISON)
class GroupEventConditionHandler(DataConditionHandler[GroupEvent]):
class GroupEventConditionHandler(DataConditionHandler[PostProcessJob]):
@staticmethod
def evaluate_value(data: GroupEvent, comparison: Any, data_filter: str) -> bool:
def evaluate_value(data: PostProcessJob, comparison: Any, data_filter: str) -> bool:
event_value = get_nested_value(data, data_filter)
return event_value == comparison
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/models/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.eventstore.models import GroupEvent
from sentry.notifications.models.notificationaction import ActionTarget
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler

Expand Down Expand Up @@ -59,7 +59,7 @@ def get_handler(self) -> ActionHandler:
action_type = Action.Type(self.type)
return action_handler_registry.get(action_type)

def trigger(self, evt: GroupEvent, detector: Detector) -> None:
def trigger(self, job: PostProcessJob, detector: Detector) -> None:
# get the handler for the action type
handler = self.get_handler()
handler.execute(evt, self, detector)
handler.execute(job, self, detector)
12 changes: 0 additions & 12 deletions src/sentry/workflow_engine/models/data_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class Condition(StrEnum):
LESS = "lt"
NOT_EQUAL = "ne"
GROUP_EVENT_ATTR_COMPARISON = "group_event_attr_comparison"
EVENT_STATE_COMPARISON = "event_state_comparison"


condition_ops = {
Expand Down Expand Up @@ -95,17 +94,6 @@ def get_condition_handler(self) -> DataConditionHandler[T] | None:

return condition_handler_registry.get(condition_type)

def get_expected_value(self, value: T, **kwargs) -> T | dict[str, Any]:
try:
condition_handler: DataConditionHandler[T] | None = self.get_condition_handler()
except NoRegistrationExistsError:
return value

if not condition_handler:
return value

return condition_handler.get_expected_value(value, **kwargs)

def evaluate_value(self, value: T) -> DataConditionResult:
condition_handler: DataConditionHandler[T] | None = None
op: Callable | None = None
Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.eventstore.models import GroupEvent
from sentry.models.owner_base import OwnerModel
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group

from .json_config import JSONConfigBase
Expand Down Expand Up @@ -53,15 +53,15 @@ class Meta:
)
]

def evaluate_trigger_conditions(self, evt: GroupEvent, **kwargs) -> bool:
def evaluate_trigger_conditions(self, job: PostProcessJob) -> bool:
"""
Evaluate the conditions for the workflow trigger and return if the evaluation was successful.
If there aren't any workflow trigger conditions, the workflow is considered triggered.
"""
if self.when_condition_group is None:
return True

evaluation, _ = evaluate_condition_group(self.when_condition_group, evt, **kwargs)
evaluation, _ = evaluate_condition_group(self.when_condition_group, job)
return evaluation


Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/processors/action.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from sentry.db.models.manager.base_query_set import BaseQuerySet
from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, DataConditionGroup, Workflow
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group


def evaluate_workflow_action_filters(
workflows: set[Workflow], evt: GroupEvent
workflows: set[Workflow], job: PostProcessJob
) -> BaseQuerySet[Action]:
filtered_action_groups: set[DataConditionGroup] = set()

Expand All @@ -17,7 +17,7 @@ def evaluate_workflow_action_filters(
).distinct()

for action_condition in action_conditions:
evaluation, result = evaluate_condition_group(action_condition, evt)
evaluation, result = evaluate_condition_group(action_condition, job)

if evaluation:
filtered_action_groups.add(action_condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def get_data_conditions_for_group(data_condition_group_id: int) -> list[DataCond
def evaluate_condition_group(
data_condition_group: DataConditionGroup,
value: T,
**kwargs,
) -> ProcessedDataConditionResult:
"""
Evaluate the conditions for a given group and value.
Expand All @@ -39,7 +38,7 @@ def evaluate_condition_group(
return True, []

for condition in conditions:
evaluation_result = condition.evaluate_value(condition.get_expected_value(value, **kwargs))
evaluation_result = condition.evaluate_value(value)
is_condition_triggered = evaluation_result is not None

if is_condition_triggered:
Expand Down
10 changes: 6 additions & 4 deletions src/sentry/workflow_engine/processors/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import logging

from sentry.eventstore.models import GroupEvent
from sentry.issues.grouptype import ErrorGroupType
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
from sentry.workflow_engine.models import DataPacket, Detector
from sentry.workflow_engine.types import DetectorGroupKey
Expand All @@ -14,11 +14,13 @@


# TODO - cache these by evt.group_id? :thinking:
def get_detector_by_event(evt: GroupEvent) -> Detector:
issue_occurrence = evt.occurrence
def get_detector_by_event(job: PostProcessJob) -> Detector:
issue_occurrence = job["event"].occurrence

if issue_occurrence is None:
detector = Detector.objects.get(project_id=evt.project_id, type=ErrorGroupType.slug)
detector = Detector.objects.get(
project_id=job["event"].project_id, type=ErrorGroupType.slug
)
else:
detector = Detector.objects.get(id=issue_occurrence.evidence_data.get("detector_id", None))

Expand Down
20 changes: 9 additions & 11 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import sentry_sdk

from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.utils import metrics
from sentry.workflow_engine.models import Detector, Workflow
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
Expand All @@ -11,19 +11,17 @@
logger = logging.getLogger(__name__)


def evaluate_workflow_triggers(
workflows: set[Workflow], evt: GroupEvent, **kwargs
) -> set[Workflow]:
def evaluate_workflow_triggers(workflows: set[Workflow], job: PostProcessJob) -> set[Workflow]:
triggered_workflows: set[Workflow] = set()

for workflow in workflows:
if workflow.evaluate_trigger_conditions(evt, **kwargs):
if workflow.evaluate_trigger_conditions(job):
triggered_workflows.add(workflow)

return triggered_workflows


def process_workflows(evt: GroupEvent, **kwargs) -> set[Workflow]:
def process_workflows(job: PostProcessJob) -> set[Workflow]:
"""
This method will get the detector based on the event, and then gather the associated workflows.
Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
Expand All @@ -35,19 +33,19 @@ def process_workflows(evt: GroupEvent, **kwargs) -> set[Workflow]:
"""
# Check to see if the GroupEvent has an issue occurrence
try:
detector = get_detector_by_event(evt)
detector = get_detector_by_event(job)
except Detector.DoesNotExist:
metrics.incr("workflow_engine.process_workflows.error")
logger.exception("Detector not found for event", extra={"event_id": evt.event_id})
logger.exception("Detector not found for event", extra={"event_id": job["event"].event_id})
return set()

# Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct())
triggered_workflows = evaluate_workflow_triggers(workflows, evt, **kwargs)
actions = evaluate_workflow_action_filters(triggered_workflows, evt)
triggered_workflows = evaluate_workflow_triggers(workflows, job)
actions = evaluate_workflow_action_filters(triggered_workflows, job)

with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
for action in actions:
action.trigger(evt, detector)
action.trigger(job, detector)

return triggered_workflows
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from sentry.types.group import PriorityLevel

if TYPE_CHECKING:
from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, Detector

T = TypeVar("T")
Expand All @@ -30,7 +30,7 @@ class DetectorPriorityLevel(IntEnum):

class ActionHandler:
@staticmethod
def execute(group_event: GroupEvent, action: Action, detector: Detector) -> None:
def execute(job: PostProcessJob, action: Action, detector: Detector) -> None:
raise NotImplementedError


Expand Down
8 changes: 5 additions & 3 deletions tests/sentry/workflow_engine/models/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sentry.tasks.post_process import PostProcessJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


Expand All @@ -8,21 +9,22 @@ def setUp(self):
)
self.data_condition = self.data_condition_group.conditions.first()
self.group, self.event, self.group_event = self.create_group_event()
self.job = PostProcessJob({"event": self.group_event})

def test_evaluate_trigger_conditions__condition_new_event__True(self):
evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True

def test_evaluate_trigger_conditions__condition_new_event__False(self):
# Update event to have been seen before
self.group_event.group.times_seen = 5

evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is False

def test_evaluate_trigger_conditions__no_conditions(self):
self.workflow.when_condition_group = None
self.workflow.save()

evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True
8 changes: 5 additions & 3 deletions tests/sentry/workflow_engine/processors/test_action.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
Expand All @@ -17,9 +18,10 @@ def setUp(self):
self.group, self.event, self.group_event = self.create_group_event(
occurrence=self.build_occurrence_data(evidence_data={"detector_id": self.detector.id})
)
self.job = PostProcessJob({"event": self.group_event})

def test_basic__no_filter(self):
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert set(triggered_actions) == {self.action}

def test_basic__with_filter__passes(self):
Expand All @@ -31,7 +33,7 @@ def test_basic__with_filter__passes(self):
condition_result=True,
)

triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert set(triggered_actions) == {self.action}

def test_basic__with_filter__filtered(self):
Expand All @@ -43,5 +45,5 @@ def test_basic__with_filter__filtered(self):
comparison=self.detector.id + 1,
)

triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert not triggered_actions
Loading

0 comments on commit 8200d69

Please sign in to comment.