Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aci milestone 3): call process_data_sources in subscription processor #81788

Closed
wants to merge 11 commits into from
34 changes: 33 additions & 1 deletion src/sentry/incidents/grouptype.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,47 @@
from dataclasses import dataclass
from typing import Any

from sentry.incidents.endpoints.validators import MetricAlertsDetectorValidator
from sentry.incidents.utils.types import QuerySubscriptionUpdate
from sentry.issues.grouptype import GroupCategory, GroupType
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.ratelimits.sliding_windows import Quota
from sentry.types.group import PriorityLevel
from sentry.workflow_engine.handlers.detector import StatefulDetectorHandler
from sentry.workflow_engine.handlers.detector.base import DetectorEvaluationResult
from sentry.workflow_engine.models.data_source import DataPacket
from sentry.workflow_engine.types import DetectorPriorityLevel


class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]):
    """Stateful detector handler for metric alerts.

    Placeholder implementation: ``evaluate`` always reports a single
    high-priority ``"dummy_group"`` result so the new workflow-engine
    pipeline can be exercised end to end before real evaluation logic
    lands.
    """

    # NOTE(review): confirm whether overriding evaluate() is needed at all —
    # the base StatefulDetectorHandler appears to call through and check the
    # detector's conditions itself once the models are constructed.
    def evaluate(
        self, data_packet: DataPacket[QuerySubscriptionUpdate]
    ) -> dict[str | None, DetectorEvaluationResult]:
        """Return a dummy high-priority evaluation result for *data_packet*.

        Builds a real occurrence/event-data pair (rather than ``None``/``None``)
        so downstream consumers and tests see a complete result.
        """
        occurrence, event_data = self.build_occurrence_and_event_data(
            "dummy_group", 0, PriorityLevel.HIGH
        )
        return {
            "dummy_group": DetectorEvaluationResult(
                group_key="dummy_group",
                is_active=True,
                priority=DetectorPriorityLevel.HIGH,
                result=occurrence,
                event_data=event_data,
            )
        }

    def counter_names(self) -> list[str]:
        """Return the state-counter names this detector tracks (none yet)."""
        return []

    # The previous build_occurrence_and_event_data / get_dedupe_value /
    # get_group_key_values overrides delegated straight to super() with no
    # changes and have been removed; base-class behavior is unchanged.


# Example GroupType and detector handler for metric alerts. We don't create these issues yet, but we'll use something
Expand Down
22 changes: 22 additions & 0 deletions src/sentry/incidents/subscription_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
from sentry.snuba.subscriptions import delete_snuba_subscription
from sentry.utils import metrics, redis
from sentry.utils.dates import to_datetime
from sentry.workflow_engine.models import DataPacket
from sentry.workflow_engine.processors.data_source import process_data_sources
from sentry.workflow_engine.processors.detector import process_detectors

logger = logging.getLogger(__name__)
REDIS_TTL = int(timedelta(days=7).total_seconds())
Expand Down Expand Up @@ -358,6 +361,25 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
metrics.incr("incidents.alert_rules.skipping_already_processed_update")
return

if features.has(
"organizations:workflow-engine-m3-dual-write", self.subscription.project.organization
mifu67 marked this conversation as resolved.
Show resolved Hide resolved
):
# NOTE: feed the data through the new pipeline, but don't do anything with it yet.
# This will change at some point.
data_packet = DataPacket(
mifu67 marked this conversation as resolved.
Show resolved Hide resolved
query_id=self.subscription.snuba_query.id, packet=subscription_update
mifu67 marked this conversation as resolved.
Show resolved Hide resolved
)
detectors = process_data_sources([data_packet], query_type=self.subscription.type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 — I was intending `query_type` to identify the DataSource itself, so this would be something like `snuba_subscription`. Maybe we should export those values from `data_sources` as an enum and apply it to the `type` field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Colleen is writing an enum for this purpose!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: change query_type to be DataSourceType.SNUBA_QUERY_SUBSCRIPTION once Colleen's PR lands

results = []
for pair in detectors:
data_packet, detectors = pair
mifu67 marked this conversation as resolved.
Show resolved Hide resolved
results.append(process_detectors(data_packet, detectors))
logger.info(
"Results from process detectors",
extra={
"result": results,
},
)
self.last_update = subscription_update["timestamp"]

if (
Expand Down
9 changes: 9 additions & 0 deletions tests/sentry/incidents/test_subscription_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2956,6 +2956,15 @@ def test_resolved_alert_updates_metric_issue(self, mock_produce_occurrence_to_ka
assert status_change.new_status == GroupStatus.RESOLVED
assert occurrence.fingerprint == status_change.fingerprint

@with_feature("organizations:workflow-engine-m3-dual-write")
def test_process_data_sources(self):
rule = self.rule
detector = self.create_detector(name="hojicha", type="metric_alert_fire")
data_source = self.create_data_source(query_id=rule.snuba_query.id, type="incidents")
data_source.detectors.set([detector])
self.send_update(rule, 10)
# TODO: the rest of this


class MetricsCrashRateAlertProcessUpdateTest(ProcessUpdateBaseClass, BaseMetricsTestCase):
@pytest.fixture(autouse=True)
Expand Down
Loading