Skip to content

Commit

Permalink
ref(aci): pass WorkflowJob into process_workflows (#82489)
Browse files Browse the repository at this point in the history
  • Loading branch information
cathteng authored Dec 20, 2024
1 parent 2ecf5fd commit fe35f16
Show file tree
Hide file tree
Showing 15 changed files with 136 additions and 51 deletions.
14 changes: 12 additions & 2 deletions src/sentry/tasks/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from sentry.utils.sdk import bind_organization_context, set_current_event_project
from sentry.utils.sdk_crashes.sdk_crash_detection_config import build_sdk_crash_detection_configs
from sentry.utils.services import build_instance_from_options_of_type
from sentry.workflow_engine.types import WorkflowJob

if TYPE_CHECKING:
from sentry.eventstore.models import Event, GroupEvent
Expand Down Expand Up @@ -1002,10 +1003,19 @@ def process_workflow_engine(job: PostProcessJob) -> None:
# If the flag is enabled, use the code below
from sentry.workflow_engine.processors.workflow import process_workflows

evt = job["event"]
# PostProcessJob event is optional, WorkflowJob event is required
if "event" not in job:
logger.error("Missing event to create WorkflowJob", extra={"job": job})
return

try:
workflow_job = WorkflowJob({**job}) # type: ignore[typeddict-item]
except Exception:
logger.exception("Could not create WorkflowJob", extra={"job": job})
return

with sentry_sdk.start_span(op="tasks.post_process_group.workflow_engine.process_workflow"):
process_workflows(evt)
process_workflows(workflow_job)


def process_rules(job: PostProcessJob) -> None:
Expand Down
5 changes: 2 additions & 3 deletions src/sentry/workflow_engine/handlers/action/notification.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from sentry.eventstore.models import GroupEvent
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler
from sentry.workflow_engine.types import ActionHandler, WorkflowJob


# TODO - Enable once the PR to allow for multiple of the same funcs is merged
Expand All @@ -10,7 +9,7 @@
class NotificationActionHandler(ActionHandler):
@staticmethod
def execute(
evt: GroupEvent,
job: WorkflowJob,
action: Action,
detector: Detector,
) -> None:
Expand Down
3 changes: 3 additions & 0 deletions src/sentry/workflow_engine/handlers/condition/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
__all__ = [
"EventCreatedByDetectorConditionHandler",
"EventSeenCountConditionHandler",
"ReappearedEventConditionHandler",
"RegressedEventConditionHandler",
]

from .group_event_handlers import (
EventCreatedByDetectorConditionHandler,
EventSeenCountConditionHandler,
)
from .group_state_handlers import ReappearedEventConditionHandler, RegressedEventConditionHandler
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
from typing import Any

from sentry.eventstore.models import GroupEvent
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler
from sentry.workflow_engine.types import DataConditionHandler, WorkflowJob


@condition_handler_registry.register(Condition.EVENT_CREATED_BY_DETECTOR)
class EventCreatedByDetectorConditionHandler(DataConditionHandler[GroupEvent]):
class EventCreatedByDetectorConditionHandler(DataConditionHandler[WorkflowJob]):
@staticmethod
def evaluate_value(event: GroupEvent, comparison: Any) -> bool:
def evaluate_value(job: WorkflowJob, comparison: Any) -> bool:
event = job["event"]
if event.occurrence is None or event.occurrence.evidence_data is None:
return False

return event.occurrence.evidence_data.get("detector_id", None) == comparison


@condition_handler_registry.register(Condition.EVENT_SEEN_COUNT)
class EventSeenCountConditionHandler(DataConditionHandler[GroupEvent]):
class EventSeenCountConditionHandler(DataConditionHandler[WorkflowJob]):
@staticmethod
def evaluate_value(event: GroupEvent, comparison: Any) -> bool:
def evaluate_value(job: WorkflowJob, comparison: Any) -> bool:
event = job["event"]
return event.group.times_seen == comparison
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Any

from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler, WorkflowJob


@condition_handler_registry.register(Condition.REGRESSED_EVENT)
class RegressedEventConditionHandler(DataConditionHandler[WorkflowJob]):
@staticmethod
def evaluate_value(job: WorkflowJob, comparison: Any) -> bool:
state = job.get("group_state", None)
if state is None:
return False

return state["is_regression"] == comparison


@condition_handler_registry.register(Condition.REAPPEARED_EVENT)
class ReappearedEventConditionHandler(DataConditionHandler[WorkflowJob]):
@staticmethod
def evaluate_value(job: WorkflowJob, comparison: Any) -> bool:
has_reappeared = job.get("has_reappeared", None)
if has_reappeared is None:
return False

return has_reappeared == comparison
7 changes: 3 additions & 4 deletions src/sentry/workflow_engine/models/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.eventstore.models import GroupEvent
from sentry.notifications.models.notificationaction import ActionTarget
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler
from sentry.workflow_engine.types import ActionHandler, WorkflowJob

if TYPE_CHECKING:
from sentry.workflow_engine.models import Detector
Expand Down Expand Up @@ -72,7 +71,7 @@ def get_handler(self) -> ActionHandler:
action_type = Action.Type(self.type)
return action_handler_registry.get(action_type)

def trigger(self, evt: GroupEvent, detector: Detector) -> None:
def trigger(self, job: WorkflowJob, detector: Detector) -> None:
# get the handler for the action type
handler = self.get_handler()
handler.execute(evt, self, detector)
handler.execute(job, self, detector)
2 changes: 2 additions & 0 deletions src/sentry/workflow_engine/models/data_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class Condition(models.TextChoices):
NOT_EQUAL = "ne"
EVENT_CREATED_BY_DETECTOR = "event_created_by_detector"
EVENT_SEEN_COUNT = "event_seen_count"
REGRESSED_EVENT = "regressed_event"
REAPPEARED_EVENT = "reappeared_event"


condition_ops = {
Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.eventstore.models import GroupEvent
from sentry.models.owner_base import OwnerModel
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.types import WorkflowJob

from .json_config import JSONConfigBase

Expand Down Expand Up @@ -53,15 +53,15 @@ class Meta:
)
]

def evaluate_trigger_conditions(self, evt: GroupEvent) -> bool:
def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool:
"""
Evaluate the conditions for the workflow trigger and return if the evaluation was successful.
If there aren't any workflow trigger conditions, the workflow is considered triggered.
"""
if self.when_condition_group is None:
return True

evaluation, _ = evaluate_condition_group(self.when_condition_group, evt)
evaluation, _ = evaluate_condition_group(self.when_condition_group, job)
return evaluation


Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/processors/action.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from sentry.db.models.manager.base_query_set import BaseQuerySet
from sentry.eventstore.models import GroupEvent
from sentry.workflow_engine.models import Action, DataConditionGroup, Workflow
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.types import WorkflowJob


def evaluate_workflow_action_filters(
workflows: set[Workflow], evt: GroupEvent
workflows: set[Workflow], job: WorkflowJob
) -> BaseQuerySet[Action]:
filtered_action_groups: set[DataConditionGroup] = set()

Expand All @@ -17,7 +17,7 @@ def evaluate_workflow_action_filters(
).distinct()

for action_condition in action_conditions:
evaluation, result = evaluate_condition_group(action_condition, evt)
evaluation, result = evaluate_condition_group(action_condition, job)

if evaluation:
filtered_action_groups.add(action_condition)
Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/processors/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

import logging

from sentry.eventstore.models import GroupEvent
from sentry.issues.grouptype import ErrorGroupType
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
from sentry.workflow_engine.models import DataPacket, Detector
from sentry.workflow_engine.types import DetectorGroupKey
from sentry.workflow_engine.types import DetectorGroupKey, WorkflowJob

logger = logging.getLogger(__name__)


# TODO - cache these by evt.group_id? :thinking:
def get_detector_by_event(evt: GroupEvent) -> Detector:
def get_detector_by_event(job: WorkflowJob) -> Detector:
evt = job["event"]
issue_occurrence = evt.occurrence

if issue_occurrence is None:
Expand Down
18 changes: 9 additions & 9 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@

import sentry_sdk

from sentry.eventstore.models import GroupEvent
from sentry.utils import metrics
from sentry.workflow_engine.models import Detector, Workflow
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from sentry.workflow_engine.processors.detector import get_detector_by_event
from sentry.workflow_engine.types import WorkflowJob

logger = logging.getLogger(__name__)


def evaluate_workflow_triggers(workflows: set[Workflow], evt: GroupEvent) -> set[Workflow]:
def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]:
triggered_workflows: set[Workflow] = set()

for workflow in workflows:
if workflow.evaluate_trigger_conditions(evt):
if workflow.evaluate_trigger_conditions(job):
triggered_workflows.add(workflow)

return triggered_workflows


def process_workflows(evt: GroupEvent) -> set[Workflow]:
def process_workflows(job: WorkflowJob) -> set[Workflow]:
"""
This method will get the detector based on the event, and then gather the associated workflows.
Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
Expand All @@ -31,19 +31,19 @@ def process_workflows(evt: GroupEvent) -> set[Workflow]:
"""
# Check to see if the GroupEvent has an issue occurrence
try:
detector = get_detector_by_event(evt)
detector = get_detector_by_event(job)
except Detector.DoesNotExist:
metrics.incr("workflow_engine.process_workflows.error")
logger.exception("Detector not found for event", extra={"event_id": evt.event_id})
logger.exception("Detector not found for event", extra={"event_id": job["event"].event_id})
return set()

# Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct())
triggered_workflows = evaluate_workflow_triggers(workflows, evt)
actions = evaluate_workflow_action_filters(triggered_workflows, evt)
triggered_workflows = evaluate_workflow_triggers(workflows, job)
actions = evaluate_workflow_action_filters(triggered_workflows, job)

with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
for action in actions:
action.trigger(evt, detector)
action.trigger(job, detector)

return triggered_workflows
17 changes: 15 additions & 2 deletions src/sentry/workflow_engine/types.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import annotations

from enum import IntEnum
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from typing import TYPE_CHECKING, Any, Generic, TypedDict, TypeVar

from sentry.types.group import PriorityLevel

if TYPE_CHECKING:
from sentry.eventstore.models import GroupEvent
from sentry.eventstream.base import GroupState
from sentry.workflow_engine.models import Action, Detector

T = TypeVar("T")
Expand All @@ -28,9 +29,21 @@ class DetectorPriorityLevel(IntEnum):
ProcessedDataConditionResult = tuple[bool, list[DataConditionResult]]


class EventJob(TypedDict):
event: GroupEvent


class WorkflowJob(EventJob, total=False):
group_state: GroupState
is_reprocessed: bool
has_reappeared: bool
has_alert: bool
has_escalated: bool


class ActionHandler:
@staticmethod
def execute(group_event: GroupEvent, action: Action, detector: Detector) -> None:
def execute(job: WorkflowJob, action: Action, detector: Detector) -> None:
raise NotImplementedError


Expand Down
8 changes: 5 additions & 3 deletions tests/sentry/workflow_engine/models/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


Expand All @@ -8,21 +9,22 @@ def setUp(self):
)
self.data_condition = self.data_condition_group.conditions.first()
self.group, self.event, self.group_event = self.create_group_event()
self.job = WorkflowJob({"event": self.group_event})

def test_evaluate_trigger_conditions__condition_new_event__True(self):
evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True

def test_evaluate_trigger_conditions__condition_new_event__False(self):
# Update event to have been seen before
self.group_event.group.times_seen = 5

evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is False

def test_evaluate_trigger_conditions__no_conditions(self):
self.workflow.when_condition_group = None
self.workflow.save()

evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True
8 changes: 5 additions & 3 deletions tests/sentry/workflow_engine/processors/test_action.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


Expand All @@ -17,9 +18,10 @@ def setUp(self):
self.group, self.event, self.group_event = self.create_group_event(
occurrence=self.build_occurrence(evidence_data={"detector_id": self.detector.id})
)
self.job = WorkflowJob({"event": self.group_event})

def test_basic__no_filter(self):
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert set(triggered_actions) == {self.action}

def test_basic__with_filter__passes(self):
Expand All @@ -30,7 +32,7 @@ def test_basic__with_filter__passes(self):
condition_result=True,
)

triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert set(triggered_actions) == {self.action}

def test_basic__with_filter__filtered(self):
Expand All @@ -41,5 +43,5 @@ def test_basic__with_filter__filtered(self):
comparison=self.detector.id + 1,
)

triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert not triggered_actions
Loading

0 comments on commit fe35f16

Please sign in to comment.