-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(aci milestone 3): call process_data_sources in subscription processor #81788
Changes from 3 commits
7293e82
fdccd9e
a86a300
f0768e7
fa5db78
cda5ebb
e67954a
296c3c4
99d5a0b
fe7c546
e3c7915
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -1,15 +1,47 @@ | ||||||||||
from dataclasses import dataclass | ||||||||||
from typing import Any | ||||||||||
|
||||||||||
from sentry.incidents.endpoints.validators import MetricAlertsDetectorValidator | ||||||||||
from sentry.incidents.utils.types import QuerySubscriptionUpdate | ||||||||||
from sentry.issues.grouptype import GroupCategory, GroupType | ||||||||||
from sentry.issues.issue_occurrence import IssueOccurrence | ||||||||||
from sentry.ratelimits.sliding_windows import Quota | ||||||||||
from sentry.types.group import PriorityLevel | ||||||||||
from sentry.workflow_engine.handlers.detector import StatefulDetectorHandler | ||||||||||
from sentry.workflow_engine.handlers.detector.base import DetectorEvaluationResult | ||||||||||
from sentry.workflow_engine.models.data_source import DataPacket | ||||||||||
from sentry.workflow_engine.types import DetectorPriorityLevel | ||||||||||
|
||||||||||
|
||||||||||
class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]): | ||||||||||
pass | ||||||||||
def evaluate( | ||||||||||
self, data_packet: DataPacket[QuerySubscriptionUpdate] | ||||||||||
) -> dict[str | None, DetectorEvaluationResult]: | ||||||||||
return { | ||||||||||
"dummy_group": DetectorEvaluationResult( | ||||||||||
group_key="dummy_group", | ||||||||||
is_active=True, | ||||||||||
priority=DetectorPriorityLevel.HIGH, | ||||||||||
result=None, | ||||||||||
event_data=None, | ||||||||||
Comment on lines
+26
to
+27
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be nice for this to be a little more complete for tests by adding the occurrence and event data, occurrence, event_data = self.build_occurrence_and_event_data("dummy_group", 0, PriorityLevel.HIGH) and then update the result / event_data,
Suggested change
|
||||||||||
) | ||||||||||
} | ||||||||||
|
||||||||||
def build_occurrence_and_event_data( | ||||||||||
self, group_key: str | None, value: int, new_status: PriorityLevel | ||||||||||
) -> tuple[IssueOccurrence, dict[str, Any]]: | ||||||||||
return super().build_occurrence_and_event_data(group_key, value, new_status) | ||||||||||
|
||||||||||
def counter_names(self): | ||||||||||
return [] | ||||||||||
|
||||||||||
def get_dedupe_value(self, data_packet: DataPacket[QuerySubscriptionUpdate]) -> int: | ||||||||||
return super().get_dedupe_value(data_packet) | ||||||||||
|
||||||||||
def get_group_key_values( | ||||||||||
self, data_packet: DataPacket[QuerySubscriptionUpdate] | ||||||||||
) -> dict[str, int]: | ||||||||||
return super().get_group_key_values(data_packet) | ||||||||||
|
||||||||||
|
||||||||||
# Example GroupType and detector handler for metric alerts. We don't create these issues yet, but we'll use something | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,6 +57,9 @@ | |
from sentry.snuba.subscriptions import delete_snuba_subscription | ||
from sentry.utils import metrics, redis | ||
from sentry.utils.dates import to_datetime | ||
from sentry.workflow_engine.models import DataPacket | ||
from sentry.workflow_engine.processors.data_source import process_data_sources | ||
from sentry.workflow_engine.processors.detector import process_detectors | ||
|
||
logger = logging.getLogger(__name__) | ||
REDIS_TTL = int(timedelta(days=7).total_seconds()) | ||
|
@@ -358,6 +361,25 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None: | |
metrics.incr("incidents.alert_rules.skipping_already_processed_update") | ||
return | ||
|
||
if features.has( | ||
"organizations:workflow-engine-m3-dual-write", self.subscription.project.organization | ||
mifu67 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
# NOTE: feed the data through the new pipeline, but don't do anything with it yet. | ||
# This will change at some point. | ||
data_packet = DataPacket( | ||
mifu67 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
query_id=self.subscription.snuba_query.id, packet=subscription_update | ||
mifu67 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
detectors = process_data_sources([data_packet], query_type=self.subscription.type) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 - i was intending the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Colleen is writing an enum for this purpose! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: change |
||
results = [] | ||
for pair in detectors: | ||
data_packet, detectors = pair | ||
mifu67 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
results.append(process_detectors(data_packet, detectors)) | ||
logger.info( | ||
"Results from process detectors", | ||
extra={ | ||
"result": results, | ||
}, | ||
) | ||
self.last_update = subscription_update["timestamp"] | ||
|
||
if ( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we be overriding the
evaluate
method from a stateful detector? just looking a little closer at the base class, it looks like this would call through to check the detectors conditions and that should all work from the base if the models are constructed. (lemme double check with dan on some of his intentions on code in this area though)