
Commit

move some shared code around to top level test_base and finish test that validates process_data_packet
saponifi3d committed Dec 13, 2024
1 parent 67be3f2 commit b9f8d52
Showing 7 changed files with 85 additions and 37 deletions.
2 changes: 2 additions & 0 deletions src/sentry/workflow_engine/models/data_source.py
@@ -29,6 +29,8 @@ class DataSource(DefaultFieldsModel):
__relocation_scope__ = RelocationScope.Organization

organization = FlexibleForeignKey("sentry.Organization")

# Should this be a string so we can support UUID / ints?
query_id = BoundedBigIntegerField()

# TODO - Add a type here
2 changes: 0 additions & 2 deletions src/sentry/workflow_engine/processors/__init__.py
@@ -5,7 +5,5 @@
"process_data_packet",
]

from .data_packet import process_data_packet
from .data_source import process_data_sources
from .detector import process_detectors
from .workflow import process_workflows
9 changes: 6 additions & 3 deletions src/sentry/workflow_engine/processors/data_packet.py
@@ -1,6 +1,7 @@
from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
from sentry.workflow_engine.models import DataPacket, Detector
from sentry.workflow_engine.processors import process_data_sources, process_detectors
from sentry.workflow_engine.processors.data_source import process_data_sources
from sentry.workflow_engine.processors.detector import process_detectors
from sentry.workflow_engine.types import DetectorGroupKey


@@ -15,7 +16,9 @@ def process_data_packets(

results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]] = []
for data_packet, detectors in processed_sources:
detector_result = process_detectors(data_packet, detectors)
results.append(detector_result)
detector_results = process_detectors(data_packet, detectors)

for detector, detector_state in detector_results:
results.append((detector, detector_state))

return results
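
For reference, a minimal usage sketch (not part of the commit) of the updated process_data_packets; the data_packet and the expected HIGH priority mirror the new test added below, and the None group key covers ungrouped detectors:

from sentry.workflow_engine.processors.data_packet import process_data_packets
from sentry.workflow_engine.types import DetectorPriorityLevel

# data_packet is assumed to be built like the one returned by create_test_query_data_source below.
results = process_data_packets([data_packet], "snuba_query_subscription")
for detector, result_by_group in results:
    # Each entry is now a flat (detector, results-by-group) tuple; result_by_group maps
    # DetectorGroupKey (None for ungrouped detectors) to a DetectorEvaluationResult.
    assert result_by_group[None].priority == DetectorPriorityLevel.HIGH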
7 changes: 4 additions & 3 deletions src/sentry/workflow_engine/processors/data_source.py
@@ -14,7 +14,8 @@ def process_data_sources(
) -> list[tuple[DataPacket, list[Detector]]]:
metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type})

data_packet_ids = {packet.query_id for packet in data_packets}
# TODO - change data_source.query_id to be a string to support UUIDs
data_packet_ids = {int(packet.query_id) for packet in data_packets}

# Fetch all data sources and associated detectors for the given data packets
with sentry_sdk.start_span(op="sentry.workflow_engine.process_data_sources.fetch_data_sources"):
@@ -23,12 +24,12 @@
).prefetch_related(Prefetch("detectors"))

# Build a lookup dict for query_id to detectors
query_id_to_detectors = {ds.query_id: list(ds.detectors.all()) for ds in data_sources}
query_id_to_detectors = {int(ds.query_id): list(ds.detectors.all()) for ds in data_sources}

# Create the result tuples
result = []
for packet in data_packets:
detectors = query_id_to_detectors.get(packet.query_id)
detectors = query_id_to_detectors.get(int(packet.query_id))

if detectors:
data_packet_tuple = (packet, detectors)
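
A self-contained sketch (illustrative values only) of the mismatch the int() casts above guard against: the packet's query_id originates from a subscription_id string such as "123", while DataSource.query_id is a BoundedBigIntegerField, so an uncast lookup would silently miss:

# Illustrative only: 123 / "123" stand in for a real query_id pair.
query_id_to_detectors = {123: ["detector-a"]}  # keyed by the integer column value
packet_query_id = "123"  # string taken from the subscription update payload
assert query_id_to_detectors.get(packet_query_id) is None  # str key misses the int key
assert query_id_to_detectors.get(int(packet_query_id)) == ["detector-a"]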
21 changes: 21 additions & 0 deletions tests/sentry/workflow_engine/processors/test_data_packet.py
@@ -0,0 +1,21 @@
from sentry.workflow_engine.processors.data_packet import process_data_packets
from sentry.workflow_engine.types import DetectorPriorityLevel
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


class TestProcessDataPacket(BaseWorkflowTest):
def setUp(self):
self.snuba_query = self.create_snuba_query()

(self.workflow, self.detector, self.detector_workflow, self.workflow_triggers) = (
self.create_detector_and_workflow()
)

self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

def test_single_data_packet(self):
results = process_data_packets([self.data_packet], "snuba_query_subscription")
assert len(results) == 1

detector, detector_evaluation_result = results[0]
assert detector_evaluation_result[None].priority == DetectorPriorityLevel.HIGH
53 changes: 50 additions & 3 deletions tests/sentry/workflow_engine/test_base.py
@@ -1,20 +1,23 @@
from datetime import datetime
from datetime import UTC, datetime
from uuid import uuid4

from sentry.eventstore.models import Event, GroupEvent
from sentry.incidents.utils.types import QuerySubscriptionUpdate
from sentry.models.group import Group
from sentry.snuba.models import SnubaQuery
from sentry.testutils.cases import TestCase
from sentry.testutils.factories import EventType
from sentry.workflow_engine.models import (
Action,
DataConditionGroup,
DataPacket,
DataSource,
Detector,
DetectorWorkflow,
Workflow,
)
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.types import DetectorType
from sentry.workflow_engine.types import DetectorPriorityLevel, DetectorType
from tests.sentry.issues.test_utils import OccurrenceTestMixin


@@ -66,9 +69,13 @@ def create_detector_and_workflow(
self,
name_prefix="test",
workflow_triggers: DataConditionGroup | None = None,
detector_type: DetectorType | str = "TestDetector",
detector_type: DetectorType | str = DetectorType.METRIC_ALERT_FIRE,
**kwargs,
) -> tuple[Workflow, Detector, DetectorWorkflow, DataConditionGroup]:
"""
        Create a Workflow, Detector, DetectorWorkflow, and DataConditionGroup for testing.
        These models are configured to be related to each other.
"""
workflow_triggers = workflow_triggers or self.create_data_condition_group()

if not workflow_triggers.conditions.exists():
Expand Down Expand Up @@ -100,6 +107,46 @@ def create_detector_and_workflow(

return workflow, detector, detector_workflow, workflow_triggers

def create_test_query_data_source(self, detector) -> tuple[DataSource, DataPacket]:
"""
Create a DataSource and DataPacket for testing; this will create a fake QuerySubscriptionUpdate and link it to a data_source.
A detector is required to create this test data, so we can ensure that the detector
        has a condition to evaluate for the data_packet that evaluates to true.
"""
subscription_update: QuerySubscriptionUpdate = {
"subscription_id": "123",
"values": {"foo": 1},
"timestamp": datetime.now(UTC),
"entity": "test-entity",
}

data_source = self.create_data_source(
query_id=subscription_update["subscription_id"],
organization=self.organization,
)

data_source.detectors.add(detector)

if detector.workflow_condition_group is None:
detector.workflow_condition_group = self.create_data_condition_group(logic_type="any")
detector.save()

self.create_data_condition(
condition_group=detector.workflow_condition_group,
type=Condition.EQUAL,
condition_result=DetectorPriorityLevel.HIGH,
comparison=1,
)

# Create a data_packet from the update for testing
data_packet = DataPacket[QuerySubscriptionUpdate](
query_id=subscription_update["subscription_id"],
packet=subscription_update,
)

return data_source, data_packet

def create_workflow_action(
self,
workflow: Workflow,
28 changes: 2 additions & 26 deletions tests/sentry/workflow_engine/test_integration.py
@@ -3,13 +3,11 @@

from sentry.eventstream.types import EventStreamEventType
from sentry.incidents.grouptype import MetricAlertFire
from sentry.incidents.utils.types import QuerySubscriptionUpdate
from sentry.issues.grouptype import ErrorGroupType
from sentry.issues.ingest import save_issue_occurrence
from sentry.models.group import Group
from sentry.tasks.post_process import post_process_group
from sentry.testutils.helpers.features import with_feature
from sentry.workflow_engine.models import DataPacket, DataSource
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors import process_data_sources, process_detectors
from sentry.workflow_engine.types import DetectorPriorityLevel
@@ -77,36 +75,14 @@ def call_post_process_group(

return cache_key

def create_test_data_source(self) -> DataSource:
self.subscription_update: QuerySubscriptionUpdate = {
"subscription_id": "123",
"values": {"foo": 1},
"timestamp": datetime.utcnow(),
"entity": "test-entity",
}

self.data_source = self.create_data_source(
query_id=self.subscription_update["subscription_id"],
organization=self.organization,
)
self.data_source.detectors.add(self.detector)

# Create a data_packet from the update for testing
self.data_packet = DataPacket[QuerySubscriptionUpdate](
query_id=self.subscription_update["subscription_id"],
packet=self.subscription_update,
)

return self.data_source


class TestWorkflowEngineIntegrationToIssuePlatform(BaseWorkflowIntegrationTest):
@with_feature("organizations:workflow-engine-metric-alert-processing")
def test_workflow_engine__data_source__to_metric_issue_workflow(self):
"""
This test ensures that a data_source can create the correct event in Issue Platform
"""
self.create_test_data_source()
self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

with mock.patch(
"sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"
@@ -121,7 +97,7 @@ def test_workflow_engine__data_source__to_metric_issue_workflow(self):

@with_feature("organizations:workflow-engine-metric-alert-processing")
def test_workflow_engine__data_source__different_type(self):
self.create_test_data_source()
self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

with mock.patch(
"sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"
