Skip to content

Commit

Permalink
add WorkflowJob type that requires GroupEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
cathteng committed Dec 20, 2024
1 parent 74109d7 commit cbce3fc
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 36 deletions.
5 changes: 2 additions & 3 deletions src/sentry/workflow_engine/handlers/action/notification.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler
from sentry.workflow_engine.types import ActionHandler, WorkflowJob


@action_handler_registry.register(Action.Type.NOTIFICATION)
class NotificationActionHandler(ActionHandler):
@staticmethod
def execute(
job: PostProcessJob,
job: WorkflowJob,
action: Action,
detector: Detector,
) -> None:
Expand Down
7 changes: 3 additions & 4 deletions src/sentry/workflow_engine/handlers/condition/group_event.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import Any

from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler
from sentry.workflow_engine.types import DataConditionHandler, WorkflowJob


def get_nested_value(data: Any, path: str, default: Any = None) -> Any | None:
Expand All @@ -22,8 +21,8 @@ def get_nested_value(data: Any, path: str, default: Any = None) -> Any | None:


@condition_handler_registry.register(Condition.GROUP_EVENT_ATTR_COMPARISON)
class GroupEventConditionHandler(DataConditionHandler[PostProcessJob]):
class GroupEventConditionHandler(DataConditionHandler[WorkflowJob]):
@staticmethod
def evaluate_value(data: PostProcessJob, comparison: Any, data_filter: str) -> bool:
def evaluate_value(data: WorkflowJob, comparison: Any, data_filter: str) -> bool:
event_value = get_nested_value(data, data_filter)
return event_value == comparison
5 changes: 2 additions & 3 deletions src/sentry/workflow_engine/models/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.notifications.models.notificationaction import ActionTarget
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler
from sentry.workflow_engine.types import ActionHandler, WorkflowJob

if TYPE_CHECKING:
from sentry.workflow_engine.models import Detector
Expand Down Expand Up @@ -59,7 +58,7 @@ def get_handler(self) -> ActionHandler:
action_type = Action.Type(self.type)
return action_handler_registry.get(action_type)

def trigger(self, job: PostProcessJob, detector: Detector) -> None:
def trigger(self, job: WorkflowJob, detector: Detector) -> None:
# get the handler for the action type
handler = self.get_handler()
handler.execute(job, self, detector)
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.models.owner_base import OwnerModel
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.types import WorkflowJob

from .json_config import JSONConfigBase

Expand Down Expand Up @@ -53,7 +53,7 @@ class Meta:
)
]

def evaluate_trigger_conditions(self, job: PostProcessJob) -> bool:
def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool:
"""
Evaluate the conditions for the workflow trigger and return if the evaluation was successful.
If there aren't any workflow trigger conditions, the workflow is considered triggered.
Expand Down
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/processors/action.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from sentry.db.models.manager.base_query_set import BaseQuerySet
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, DataConditionGroup, Workflow
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.types import WorkflowJob


def evaluate_workflow_action_filters(
workflows: set[Workflow], job: PostProcessJob
workflows: set[Workflow], job: WorkflowJob
) -> BaseQuerySet[Action]:
filtered_action_groups: set[DataConditionGroup] = set()

Expand Down
12 changes: 5 additions & 7 deletions src/sentry/workflow_engine/processors/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@
from sentry.issues.grouptype import ErrorGroupType
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
from sentry.workflow_engine.models import DataPacket, Detector
from sentry.workflow_engine.types import DetectorGroupKey
from sentry.workflow_engine.types import DetectorGroupKey, WorkflowJob

logger = logging.getLogger(__name__)


# TODO - cache these by evt.group_id? :thinking:
def get_detector_by_event(job: PostProcessJob) -> Detector:
issue_occurrence = job["event"].occurrence
def get_detector_by_event(job: WorkflowJob) -> Detector:
event = job["event"]
issue_occurrence = event.occurrence

if issue_occurrence is None:
detector = Detector.objects.get(
project_id=job["event"].project_id, type=ErrorGroupType.slug
)
detector = Detector.objects.get(project_id=event.project_id, type=ErrorGroupType.slug)
else:
detector = Detector.objects.get(id=issue_occurrence.evidence_data.get("detector_id", None))

Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

import sentry_sdk

from sentry.tasks.post_process import PostProcessJob
from sentry.utils import metrics
from sentry.workflow_engine.models import Detector, Workflow
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from sentry.workflow_engine.processors.detector import get_detector_by_event
from sentry.workflow_engine.types import WorkflowJob

logger = logging.getLogger(__name__)


def evaluate_workflow_triggers(workflows: set[Workflow], job: PostProcessJob) -> set[Workflow]:
def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]:
triggered_workflows: set[Workflow] = set()

for workflow in workflows:
Expand All @@ -21,7 +21,7 @@ def evaluate_workflow_triggers(workflows: set[Workflow], job: PostProcessJob) ->
return triggered_workflows


def process_workflows(job: PostProcessJob) -> set[Workflow]:
def process_workflows(job: WorkflowJob) -> set[Workflow]:
"""
This method will get the detector based on the event, and then gather the associated workflows.
Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
Expand Down
19 changes: 16 additions & 3 deletions src/sentry/workflow_engine/types.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import annotations

from enum import IntEnum
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from typing import TYPE_CHECKING, Any, Generic, TypedDict, TypeVar

from sentry.eventstore.models import GroupEvent
from sentry.types.group import PriorityLevel

if TYPE_CHECKING:
from sentry.tasks.post_process import PostProcessJob
from sentry.eventstream.base import GroupState
from sentry.workflow_engine.models import Action, Detector

T = TypeVar("T")
Expand All @@ -28,9 +29,21 @@ class DetectorPriorityLevel(IntEnum):
ProcessedDataConditionResult = tuple[bool, list[DataConditionResult]]


class EventJob(TypedDict):
event: GroupEvent


class WorkflowJob(EventJob, total=False):
group_state: GroupState
is_reprocessed: bool
has_reappeared: bool
has_alert: bool
has_escalated: bool


class ActionHandler:
@staticmethod
def execute(job: PostProcessJob, action: Action, detector: Detector) -> None:
def execute(job: WorkflowJob, action: Action, detector: Detector) -> None:
raise NotImplementedError


Expand Down
4 changes: 2 additions & 2 deletions tests/sentry/workflow_engine/models/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


Expand All @@ -9,7 +9,7 @@ def setUp(self):
)
self.data_condition = self.data_condition_group.conditions.first()
self.group, self.event, self.group_event = self.create_group_event()
self.job = PostProcessJob({"event": self.group_event})
self.job = WorkflowJob({"event": self.group_event})

def test_evaluate_trigger_conditions__condition_new_event__True(self):
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
Expand Down
4 changes: 2 additions & 2 deletions tests/sentry/workflow_engine/processors/test_action.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


Expand All @@ -18,7 +18,7 @@ def setUp(self):
self.group, self.event, self.group_event = self.create_group_event(
occurrence=self.build_occurrence_data(evidence_data={"detector_id": self.detector.id})
)
self.job = PostProcessJob({"event": self.group_event})
self.job = WorkflowJob({"event": self.group_event})

def test_basic__no_filter(self):
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
Expand Down
10 changes: 5 additions & 5 deletions tests/sentry/workflow_engine/processors/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from sentry.eventstream.base import GroupState
from sentry.issues.grouptype import ErrorGroupType
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import DataConditionGroup
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors.workflow import evaluate_workflow_triggers, process_workflows
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


Expand All @@ -27,7 +27,7 @@ def setUp(self):
)

self.group, self.event, self.group_event = self.create_group_event()
self.job = PostProcessJob(
self.job = WorkflowJob(
{
"event": self.group_event,
"group_state": GroupState(
Expand Down Expand Up @@ -62,14 +62,14 @@ def test_error_event_with_kwargs(self):
def test_issue_occurrence_event(self):
issue_occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id})
self.group_event.occurrence = issue_occurrence
self.job = PostProcessJob({"event": self.group_event})
self.job = WorkflowJob({"event": self.group_event})

triggered_workflows = process_workflows(self.job)
assert triggered_workflows == {self.workflow}

def test_no_detector(self):
self.group_event.occurrence = self.build_occurrence(evidence_data={})
self.job = PostProcessJob({"event": self.group_event})
self.job = WorkflowJob({"event": self.group_event})

with mock.patch("sentry.workflow_engine.processors.workflow.logger") as mock_logger:
with mock.patch("sentry.workflow_engine.processors.workflow.metrics") as mock_metrics:
Expand Down Expand Up @@ -97,7 +97,7 @@ def setUp(self):
self.group, self.event, self.group_event = self.create_group_event(
occurrence=occurrence,
)
self.job = PostProcessJob({"event": self.group_event})
self.job = WorkflowJob({"event": self.group_event})

def test_workflow_trigger(self):
triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job)
Expand Down

0 comments on commit cbce3fc

Please sign in to comment.