-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(aci milestone 3): call process_data_sources in subscription processor #81788
Changes from all commits
7293e82
fdccd9e
a86a300
f0768e7
fa5db78
cda5ebb
e67954a
296c3c4
99d5a0b
fe7c546
e3c7915
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -1,15 +1,74 @@ | ||||||||||
from dataclasses import dataclass | ||||||||||
from datetime import datetime, timezone | ||||||||||
from typing import Any | ||||||||||
|
||||||||||
from sentry.incidents.endpoints.validators import MetricAlertsDetectorValidator | ||||||||||
from sentry.incidents.utils.types import QuerySubscriptionUpdate | ||||||||||
from sentry.issues.grouptype import GroupCategory, GroupType | ||||||||||
from sentry.issues.grouptype import GroupCategory, GroupType, MetricIssuePOC | ||||||||||
from sentry.issues.issue_occurrence import IssueOccurrence | ||||||||||
from sentry.ratelimits.sliding_windows import Quota | ||||||||||
from sentry.types.group import PriorityLevel | ||||||||||
from sentry.workflow_engine.handlers.detector import StatefulDetectorHandler | ||||||||||
from sentry.workflow_engine.handlers.detector.base import DetectorEvaluationResult | ||||||||||
from sentry.workflow_engine.models.data_source import DataPacket | ||||||||||
from sentry.workflow_engine.types import DetectorPriorityLevel | ||||||||||
|
||||||||||
|
||||||||||
class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]):
    """Stateful detector handler for metric alerts.

    Placeholder implementation: every hook returns fixed dummy data so the
    workflow-engine pipeline can be exercised end-to-end before the real
    evaluation logic lands.
    """

    def evaluate(
        self, data_packet: DataPacket[QuerySubscriptionUpdate]
    ) -> dict[str | None, DetectorEvaluationResult]:
        # Per review feedback: build the occurrence/event data so the result
        # is complete enough for tests, instead of result=None/event_data=None.
        occurrence, event_data = self.build_occurrence_and_event_data(
            "dummy_group", 0, PriorityLevel.HIGH
        )
        return {
            "dummy_group": DetectorEvaluationResult(
                group_key="dummy_group",
                is_active=True,
                priority=DetectorPriorityLevel.HIGH,
                result=occurrence,
                event_data=event_data,
            )
        }

    def build_occurrence_and_event_data(
        self, group_key: str | None, value: int, new_status: PriorityLevel
    ) -> tuple[IssueOccurrence, dict[str, Any]]:
        # Placeholder return for now: hard-coded ids/project until the real
        # detector state is wired in.
        # NOTE(review): `MetricAlertFire` is not in the visible imports (only
        # MetricIssuePOC is) — presumably defined later in this module; confirm.
        occurrence = IssueOccurrence(
            id="eb4b0acffadb4d098d48cb14165ab578",
            project_id=123,
            event_id="43878ab4419f4ab181f6379ac376d5aa",
            fingerprint=["abc123"],
            issue_title="Some Issue",
            subtitle="Some subtitle",
            resource_id=None,
            evidence_data={},
            evidence_display=[],
            type=MetricAlertFire,
            detection_time=datetime.now(timezone.utc),
            level="error",
            culprit="Some culprit",
            initial_issue_priority=new_status.value,
        )
        event_data = {
            "timestamp": occurrence.detection_time,
            "project_id": occurrence.project_id,
            "event_id": occurrence.event_id,
            "platform": "python",
            "received": occurrence.detection_time,
            "tags": {},
        }
        return occurrence, event_data

    @property
    def counter_names(self) -> list[str]:
        return []  # placeholder return for now

    def get_dedupe_value(self, data_packet: DataPacket[QuerySubscriptionUpdate]) -> int:
        return 0  # placeholder return for now

    def get_group_key_values(
        self, data_packet: DataPacket[QuerySubscriptionUpdate]
    ) -> dict[str, int]:
        return {"dummy": 0}  # placeholder return for now
|
||||||||||
|
||||||||||
# Example GroupType and detector handler for metric alerts. We don't create these issues yet, but we'll use something | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,6 +57,9 @@ | |
from sentry.snuba.subscriptions import delete_snuba_subscription | ||
from sentry.utils import metrics, redis | ||
from sentry.utils.dates import to_datetime | ||
from sentry.workflow_engine.models import DataPacket | ||
from sentry.workflow_engine.processors.data_source import process_data_sources | ||
from sentry.workflow_engine.processors.detector import process_detectors | ||
|
||
logger = logging.getLogger(__name__) | ||
REDIS_TTL = int(timedelta(days=7).total_seconds()) | ||
|
@@ -358,6 +361,24 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None: | |
metrics.incr("incidents.alert_rules.skipping_already_processed_update") | ||
return | ||
|
||
if features.has( | ||
"organizations:workflow-engine-m3-process", self.subscription.project.organization | ||
): | ||
# NOTE: feed the data through the new pipeline, but don't do anything with it yet. | ||
# This will change at some point. | ||
data_packet = DataPacket[QuerySubscriptionUpdate]( | ||
query_id=self.subscription.id, packet=subscription_update | ||
) | ||
detectors = process_data_sources([data_packet], query_type=self.subscription.type) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 - i was intending the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Colleen is writing an enum for this purpose! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: change |
||
results = [] | ||
for data_packet, detectors in detectors: | ||
results.append(process_detectors(data_packet, detectors)) | ||
logger.info( | ||
"Results from process_detectors", | ||
extra={ | ||
"results": results, | ||
}, | ||
) | ||
self.last_update = subscription_update["timestamp"] | ||
|
||
if ( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2956,6 +2956,15 @@ def test_resolved_alert_updates_metric_issue(self, mock_produce_occurrence_to_ka | |
assert status_change.new_status == GroupStatus.RESOLVED | ||
assert occurrence.fingerprint == status_change.fingerprint | ||
|
||
@with_feature("organizations:workflow-engine-m3-process") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is pretty messy. Any suggestions for how we can better log the data flowing through the pipeline? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm adding centralized logging to the If we make the change to have If you wanted to test this stuff w/o that method, it may want to be like different snuba query results with a configured detector, and how does that invoke process_data_sources / process_detectors differently. |
||
@mock.patch("sentry.incidents.subscription_processor.logger") | ||
def test_process_data_sources(self, mock_logger): | ||
rule = self.rule | ||
detector = self.create_detector(name="hojicha", type="metric_alert_fire") | ||
data_source = self.create_data_source(query_id=rule.snuba_query.id, type="incidents") | ||
data_source.detectors.set([detector]) | ||
self.send_update(rule, 10) | ||
|
||
|
||
class MetricsCrashRateAlertProcessUpdateTest(ProcessUpdateBaseClass, BaseMetricsTestCase): | ||
@pytest.fixture(autouse=True) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be overriding the `evaluate` method from a stateful detector? Looking a little closer at the base class, it looks like the base implementation would call through to check the detector's conditions, and that should all work from the base class as long as the models are constructed. (Let me double-check with Dan on his intentions for the code in this area, though.)