Skip to content

Commit

Permalink
feat(workflow_engine): Add process_data_packets method (#82002)
Browse files Browse the repository at this point in the history
## Description
Adding a wrapper method, `process_data_packets`, that merges
`process_data_sources` and `process_detectors` into a single call.

Still thinking through higher level abstractions to simplify using the
workflow_engine, but this should simplify integrations a little further
for now.
  • Loading branch information
saponifi3d authored and evanh committed Dec 17, 2024
1 parent f6caae2 commit 0142957
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 32 deletions.
4 changes: 4 additions & 0 deletions src/sentry/workflow_engine/models/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ class DataSource(DefaultFieldsModel):
__relocation_scope__ = RelocationScope.Organization

organization = FlexibleForeignKey("sentry.Organization")

# Should this be a string so we can support UUID / ints?
query_id = BoundedBigIntegerField()

# TODO - Add a type here
type = models.TextField()

detectors = models.ManyToManyField("workflow_engine.Detector", through=DataSourceDetector)
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/workflow_engine/processors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
__all__ = [
"process_data_sources",
"process_detectors",
"process_workflows",
"process_data_packet",
]

from .data_source import process_data_sources
Expand Down
24 changes: 24 additions & 0 deletions src/sentry/workflow_engine/processors/data_packet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
from sentry.workflow_engine.models import DataPacket, Detector
from sentry.workflow_engine.processors.data_source import process_data_sources
from sentry.workflow_engine.processors.detector import process_detectors
from sentry.workflow_engine.types import DetectorGroupKey


def process_data_packets(
    data_packets: list[DataPacket], query_type: str
) -> list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]:
    """
    Process a batch of data packets end-to-end.

    This method ties the two main pre-processing methods together: it resolves
    each packet to its subscribed detectors via ``process_data_sources`` and
    then evaluates those detectors via ``process_detectors``, so callers can
    create issue occurrences with a single call.

    :param data_packets: the incoming packets to evaluate
    :param query_type: passed through to ``process_data_sources`` to select
        which data sources the packets are matched against
    :return: flattened ``(detector, {group_key: evaluation_result})`` pairs
        across all packets
    """
    processed_sources = process_data_sources(data_packets, query_type)

    results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]] = []
    for data_packet, detectors in processed_sources:
        # process_detectors already yields (detector, result-dict) pairs;
        # extend the accumulator instead of appending item by item.
        results.extend(process_detectors(data_packet, detectors))

    return results
1 change: 1 addition & 0 deletions src/sentry/workflow_engine/processors/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def process_data_sources(
) -> list[tuple[DataPacket, list[Detector]]]:
metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type})

# TODO - change data_source.query_id to be a string to support UUIDs
data_packet_ids = {int(packet.query_id) for packet in data_packets}

# Fetch all data sources and associated detectors for the given data packets
Expand Down
21 changes: 21 additions & 0 deletions tests/sentry/workflow_engine/processors/test_data_packet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from sentry.workflow_engine.processors.data_packet import process_data_packets
from sentry.workflow_engine.types import DetectorPriorityLevel
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


class TestProcessDataPacket(BaseWorkflowTest):
    """End-to-end test for process_data_packets wiring a packet to detector results."""

    def setUp(self):
        # Run the base-class fixture setup first; the original override
        # skipped it, which is against unittest.TestCase convention.
        super().setUp()

        self.snuba_query = self.create_snuba_query()

        (self.workflow, self.detector, self.detector_workflow, self.workflow_triggers) = (
            self.create_detector_and_workflow()
        )

        # Creates a data source linked to self.detector plus a matching packet.
        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

    def test_single_data_packet(self):
        results = process_data_packets([self.data_packet], "snuba_query_subscription")
        assert len(results) == 1

        detector, detector_evaluation_result = results[0]
        # The fixture attaches an EQUAL-1 condition with a HIGH result, and the
        # packet carries value 1, so the default (None) group key resolves HIGH.
        assert detector_evaluation_result[None].priority == DetectorPriorityLevel.HIGH
5 changes: 3 additions & 2 deletions tests/sentry/workflow_engine/processors/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from unittest import mock

from sentry.incidents.grouptype import MetricAlertFire
from sentry.issues.grouptype import ErrorGroupType
from sentry.workflow_engine.models import DataConditionGroup
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors.workflow import evaluate_workflow_triggers, process_workflows
Expand All @@ -14,12 +14,13 @@ def setUp(self):
self.detector,
self.detector_workflow,
self.workflow_triggers,
) = self.create_detector_and_workflow(detector_type=MetricAlertFire.slug)
) = self.create_detector_and_workflow()

self.error_workflow, self.error_detector, self.detector_workflow_error, _ = (
self.create_detector_and_workflow(
name_prefix="error",
workflow_triggers=self.create_data_condition_group(),
detector_type=ErrorGroupType.slug,
)
)

Expand Down
54 changes: 51 additions & 3 deletions tests/sentry/workflow_engine/test_base.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
from datetime import datetime
from datetime import UTC, datetime
from uuid import uuid4

from sentry.eventstore.models import Event, GroupEvent
from sentry.issues.grouptype import ErrorGroupType
from sentry.incidents.grouptype import MetricAlertFire
from sentry.incidents.utils.types import QuerySubscriptionUpdate
from sentry.models.group import Group
from sentry.snuba.models import SnubaQuery
from sentry.testutils.cases import TestCase
from sentry.testutils.factories import EventType
from sentry.workflow_engine.models import (
Action,
DataConditionGroup,
DataPacket,
DataSource,
Detector,
DetectorWorkflow,
Workflow,
)
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.types import DetectorPriorityLevel
from tests.sentry.issues.test_utils import OccurrenceTestMixin


Expand Down Expand Up @@ -66,9 +70,13 @@ def create_detector_and_workflow(
self,
name_prefix="test",
workflow_triggers: DataConditionGroup | None = None,
detector_type: str = ErrorGroupType.slug,
detector_type: str = MetricAlertFire.slug,
**kwargs,
) -> tuple[Workflow, Detector, DetectorWorkflow, DataConditionGroup]:
"""
Create a Workflow, Detector, DetectorWorkflow, and DataConditionGroup for testing.
These models are configured to work together to test the workflow engine.
"""
workflow_triggers = workflow_triggers or self.create_data_condition_group()

if not workflow_triggers.conditions.exists():
Expand Down Expand Up @@ -100,6 +108,46 @@ def create_detector_and_workflow(

return workflow, detector, detector_workflow, workflow_triggers

    def create_test_query_data_source(self, detector: Detector) -> tuple[DataSource, DataPacket]:
        """
        Create a DataSource and DataPacket for testing; this builds a fake
        QuerySubscriptionUpdate and links it to a data_source.

        A detector is required so we can ensure it has a condition that
        evaluates to true for the generated data_packet.

        :param detector: the detector to subscribe to the new data source
        :return: the created ``(data_source, data_packet)`` pair
        """
        subscription_update: QuerySubscriptionUpdate = {
            "subscription_id": "123",
            "values": {"foo": 1},
            "timestamp": datetime.now(UTC),
            "entity": "test-entity",
        }

        data_source = self.create_data_source(
            # NOTE(review): subscription_id is a string while DataSource.query_id
            # is an integer field — this relies on implicit coercion; confirm.
            query_id=subscription_update["subscription_id"],
            organization=self.organization,
        )

        data_source.detectors.add(detector)

        # Ensure the detector has a condition group to attach the condition below.
        if detector.workflow_condition_group is None:
            detector.workflow_condition_group = self.create_data_condition_group(logic_type="any")
            detector.save()

        # Condition matching the packet payload (values["foo"] == 1) that
        # resolves to HIGH priority when triggered.
        self.create_data_condition(
            condition_group=detector.workflow_condition_group,
            type=Condition.EQUAL,
            condition_result=DetectorPriorityLevel.HIGH,
            comparison=1,
        )

        # Create a data_packet from the update for testing
        data_packet = DataPacket[QuerySubscriptionUpdate](
            query_id=subscription_update["subscription_id"],
            packet=subscription_update,
        )

        return data_source, data_packet

def create_workflow_action(
self,
workflow: Workflow,
Expand Down
30 changes: 3 additions & 27 deletions tests/sentry/workflow_engine/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@

from sentry.eventstream.types import EventStreamEventType
from sentry.incidents.grouptype import MetricAlertFire
from sentry.incidents.utils.types import QuerySubscriptionUpdate
from sentry.issues.grouptype import ErrorGroupType
from sentry.issues.ingest import save_issue_occurrence
from sentry.models.group import Group
from sentry.tasks.post_process import post_process_group
from sentry.testutils.helpers.features import with_feature
from sentry.workflow_engine.models import DataPacket, DataSource
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors import process_data_sources, process_detectors
from sentry.workflow_engine.types import DetectorPriorityLevel
Expand Down Expand Up @@ -77,36 +75,14 @@ def call_post_process_group(

return cache_key

def create_test_data_source(self) -> DataSource:
self.subscription_update: QuerySubscriptionUpdate = {
"subscription_id": "123",
"values": {"foo": 1},
"timestamp": datetime.utcnow(),
"entity": "test-entity",
}

self.data_source = self.create_data_source(
query_id=self.subscription_update["subscription_id"],
organization=self.organization,
)
self.data_source.detectors.add(self.detector)

# Create a data_packet from the update for testing
self.data_packet = DataPacket[QuerySubscriptionUpdate](
query_id=self.subscription_update["subscription_id"],
packet=self.subscription_update,
)

return self.data_source


class TestWorkflowEngineIntegrationToIssuePlatform(BaseWorkflowIntegrationTest):
@with_feature("organizations:workflow-engine-metric-alert-processing")
def test_workflow_engine__data_source__to_metric_issue_workflow(self):
"""
This test ensures that a data_source can create the correct event in Issue Platform
"""
self.create_test_data_source()
self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

with mock.patch(
"sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"
Expand All @@ -121,7 +97,7 @@ def test_workflow_engine__data_source__to_metric_issue_workflow(self):

@with_feature("organizations:workflow-engine-metric-alert-processing")
def test_workflow_engine__data_source__different_type(self):
self.create_test_data_source()
self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

with mock.patch(
"sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"
Expand All @@ -134,7 +110,7 @@ def test_workflow_engine__data_source__different_type(self):

@with_feature("organizations:workflow-engine-metric-alert-processing")
def test_workflow_engine__data_source__no_detectors(self):
self.create_test_data_source()
self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)
self.detector.delete()

with mock.patch(
Expand Down

0 comments on commit 0142957

Please sign in to comment.