diff --git a/src/sentry/features/temporary.py b/src/sentry/features/temporary.py index a204d53343bece..3e22db09e24a44 100644 --- a/src/sentry/features/temporary.py +++ b/src/sentry/features/temporary.py @@ -541,6 +541,8 @@ def register_temporary_features(manager: FeatureManager): manager.add("organizations:webhooks-unresolved", OrganizationFeature, FeatureHandlerStrategy.OPTIONS, api_expose=True) # Enable dual writing for metric alert issues (see: alerts create issues) manager.add("organizations:workflow-engine-m3-dual-write", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) + # Enable the new workflow for processing subscription updates (see: alerts create issues) + manager.add("organizations:workflow-engine-m3-process", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) # Enable reading from new ACI tables for metric alert issues (see: alerts create issues) manager.add("organizations:workflow-engine-m3-read", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) # Enable new workflow_engine UI (see: alerts create issues) diff --git a/src/sentry/incidents/grouptype.py b/src/sentry/incidents/grouptype.py index b7078c7c2f7a40..4ec3d21390d1ca 100644 --- a/src/sentry/incidents/grouptype.py +++ b/src/sentry/incidents/grouptype.py @@ -1,15 +1,74 @@ from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any from sentry.incidents.endpoints.validators import MetricAlertsDetectorValidator from sentry.incidents.utils.types import QuerySubscriptionUpdate -from sentry.issues.grouptype import GroupCategory, GroupType +from sentry.issues.grouptype import GroupCategory, GroupType, MetricIssuePOC +from sentry.issues.issue_occurrence import IssueOccurrence from sentry.ratelimits.sliding_windows import Quota from sentry.types.group import PriorityLevel from sentry.workflow_engine.handlers.detector import StatefulDetectorHandler +from sentry.workflow_engine.handlers.detector.base 
import DetectorEvaluationResult +from sentry.workflow_engine.models.data_source import DataPacket +from sentry.workflow_engine.types import DetectorPriorityLevel class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]): - pass + def evaluate( + self, data_packet: DataPacket[QuerySubscriptionUpdate] + ) -> dict[str | None, DetectorEvaluationResult]: + return { + "dummy_group": DetectorEvaluationResult( + group_key="dummy_group", + is_active=True, + priority=DetectorPriorityLevel.HIGH, + result=None, + event_data=None, + ) + } + + def build_occurrence_and_event_data( + self, group_key: str | None, value: int, new_status: PriorityLevel + ) -> tuple[IssueOccurrence, dict[str, Any]]: + # placeholder return for now + occurrence = IssueOccurrence( + id="eb4b0acffadb4d098d48cb14165ab578", + project_id=123, + event_id="43878ab4419f4ab181f6379ac376d5aa", + fingerprint=["abc123"], + issue_title="Some Issue", + subtitle="Some subtitle", + resource_id=None, + evidence_data={}, + evidence_display=[], + type=MetricAlertFire, + detection_time=datetime.now(timezone.utc), + level="error", + culprit="Some culprit", + initial_issue_priority=new_status.value, + ) + event_data = { + "timestamp": occurrence.detection_time, + "project_id": occurrence.project_id, + "event_id": occurrence.event_id, + "platform": "python", + "received": occurrence.detection_time, + "tags": {}, + } + return occurrence, event_data + + @property + def counter_names(self) -> list[str]: + return [] # placeholder return for now + + def get_dedupe_value(self, data_packet: DataPacket[QuerySubscriptionUpdate]) -> int: + return 0 # placeholder return for now + + def get_group_key_values( + self, data_packet: DataPacket[QuerySubscriptionUpdate] + ) -> dict[str, int]: + return {"dummy": 0} # placeholder return for now # Example GroupType and detector handler for metric alerts. 
We don't create these issues yet, but we'll use something diff --git a/src/sentry/incidents/subscription_processor.py b/src/sentry/incidents/subscription_processor.py index 4654948204437a..15b166fd07ce5e 100644 --- a/src/sentry/incidents/subscription_processor.py +++ b/src/sentry/incidents/subscription_processor.py @@ -57,6 +57,9 @@ from sentry.snuba.subscriptions import delete_snuba_subscription from sentry.utils import metrics, redis from sentry.utils.dates import to_datetime +from sentry.workflow_engine.models import DataPacket +from sentry.workflow_engine.processors.data_source import process_data_sources +from sentry.workflow_engine.processors.detector import process_detectors logger = logging.getLogger(__name__) REDIS_TTL = int(timedelta(days=7).total_seconds()) @@ -358,6 +361,24 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None: metrics.incr("incidents.alert_rules.skipping_already_processed_update") return + if features.has( + "organizations:workflow-engine-m3-process", self.subscription.project.organization + ): + # NOTE: feed the data through the new pipeline, but don't do anything with it yet. + # This will change at some point. 
+            data_packet = DataPacket[QuerySubscriptionUpdate](
+                query_id=self.subscription.id, packet=subscription_update
+            )
+            processed = process_data_sources([data_packet], query_type=self.subscription.type)
+            results = []
+            for packet, detectors in processed:
+                results.append(process_detectors(packet, detectors))
+            logger.info(
+                "Results from process_detectors",
+                extra={
+                    "results": results,
+                },
+            )
         self.last_update = subscription_update["timestamp"]
 
         if (
diff --git a/tests/sentry/incidents/test_subscription_processor.py b/tests/sentry/incidents/test_subscription_processor.py
index 24589e0a4692d9..47a6b9892da89b 100644
--- a/tests/sentry/incidents/test_subscription_processor.py
+++ b/tests/sentry/incidents/test_subscription_processor.py
@@ -2956,6 +2956,16 @@ def test_resolved_alert_updates_metric_issue(self, mock_produce_occurrence_to_ka
         assert status_change.new_status == GroupStatus.RESOLVED
         assert occurrence.fingerprint == status_change.fingerprint
 
+    @with_feature("organizations:workflow-engine-m3-process")
+    @mock.patch("sentry.incidents.subscription_processor.logger")
+    def test_process_data_sources(self, mock_logger):
+        rule = self.rule
+        detector = self.create_detector(name="hojicha", type="metric_alert_fire")
+        data_source = self.create_data_source(query_id=rule.snuba_query.id, type="incidents")
+        data_source.detectors.set([detector])
+        self.send_update(rule, 10)
+        mock_logger.info.assert_any_call("Results from process_detectors", extra={"results": mock.ANY})
+
 
 class MetricsCrashRateAlertProcessUpdateTest(ProcessUpdateBaseClass, BaseMetricsTestCase):
     @pytest.fixture(autouse=True)