
feat(aci milestone 3): call process_data_sources in subscription processor #81788

Closed · wants to merge 11 commits
2 changes: 2 additions & 0 deletions src/sentry/features/temporary.py
@@ -541,6 +541,8 @@ def register_temporary_features(manager: FeatureManager):
manager.add("organizations:webhooks-unresolved", OrganizationFeature, FeatureHandlerStrategy.OPTIONS, api_expose=True)
# Enable dual writing for metric alert issues (see: alerts create issues)
manager.add("organizations:workflow-engine-m3-dual-write", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable the new workflow for processing subscription updates (see: alerts create issues)
manager.add("organizations:workflow-engine-m3-process", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable reading from new ACI tables for metric alert issues (see: alerts create issues)
manager.add("organizations:workflow-engine-m3-read", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable new workflow_engine UI (see: alerts create issues)
63 changes: 61 additions & 2 deletions src/sentry/incidents/grouptype.py
@@ -1,15 +1,74 @@
 from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Any

 from sentry.incidents.endpoints.validators import MetricAlertsDetectorValidator
 from sentry.incidents.utils.types import QuerySubscriptionUpdate
-from sentry.issues.grouptype import GroupCategory, GroupType
+from sentry.issues.grouptype import GroupCategory, GroupType, MetricIssuePOC
+from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.ratelimits.sliding_windows import Quota
 from sentry.types.group import PriorityLevel
 from sentry.workflow_engine.handlers.detector import StatefulDetectorHandler
+from sentry.workflow_engine.handlers.detector.base import DetectorEvaluationResult
+from sentry.workflow_engine.models.data_source import DataPacket
+from sentry.workflow_engine.types import DetectorPriorityLevel

 class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]):
-    pass
+    def evaluate(
+        self, data_packet: DataPacket[QuerySubscriptionUpdate]
+    ) -> dict[str | None, DetectorEvaluationResult]:
+        return {
+            "dummy_group": DetectorEvaluationResult(
+                group_key="dummy_group",
+                is_active=True,
+                priority=DetectorPriorityLevel.HIGH,
+                result=None,
+                event_data=None,
+            )
+        }

Contributor (on the evaluate override): should we be overriding the evaluate method from a stateful detector? Looking a little closer at the base class, it looks like this would call through to check the detector's conditions, and that should all work from the base if the models are constructed. (Let me double check with Dan on his intentions for the code in this area, though.)
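Concretely, the reviewer's reading would mean dropping the override and keeping only the hook methods, roughly like this (a sketch, assuming the stateful base class's evaluate really does drive the condition checks through these hooks; the reviewer is still confirming that):

    # Sketch: no evaluate() override. The stateful base class would walk the
    # detector's configured conditions itself, provided the detector models
    # are constructed; only the hooks (and the occurrence builder defined
    # later in this diff) remain on the subclass.
    class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]):
        def get_dedupe_value(self, data_packet: DataPacket[QuerySubscriptionUpdate]) -> int:
            return 0  # placeholder, as elsewhere in this PR

        def get_group_key_values(
            self, data_packet: DataPacket[QuerySubscriptionUpdate]
        ) -> dict[str, int]:
            return {"dummy": 0}  # placeholder, as elsewhere in this PR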
Comment on lines +26 to +27
Contributor: Might be nice for this to be a little more complete for tests, by adding the occurrence and event data:

    occurrence, event_data = self.build_occurrence_and_event_data("dummy_group", 0, PriorityLevel.HIGH)

and then updating the result / event_data:

Suggested change:
-                result=None,
-                event_data=None,
+                result=occurrence,
+                event_data=event_data,

+    def build_occurrence_and_event_data(
+        self, group_key: str | None, value: int, new_status: PriorityLevel
+    ) -> tuple[IssueOccurrence, dict[str, Any]]:
+        # placeholder return for now
+        occurrence = IssueOccurrence(
+            id="eb4b0acffadb4d098d48cb14165ab578",
+            project_id=123,
+            event_id="43878ab4419f4ab181f6379ac376d5aa",
+            fingerprint=["abc123"],
+            issue_title="Some Issue",
+            subtitle="Some subtitle",
+            resource_id=None,
+            evidence_data={},
+            evidence_display=[],
+            type=MetricAlertFire,
+            detection_time=datetime.now(timezone.utc),
+            level="error",
+            culprit="Some culprit",
+            initial_issue_priority=new_status.value,
+        )
+        event_data = {
+            "timestamp": occurrence.detection_time,
+            "project_id": occurrence.project_id,
+            "event_id": occurrence.event_id,
+            "platform": "python",
+            "received": occurrence.detection_time,
+            "tags": {},
+        }
+        return occurrence, event_data
+
+    @property
+    def counter_names(self) -> list[str]:
+        return []  # placeholder return for now
+
+    def get_dedupe_value(self, data_packet: DataPacket[QuerySubscriptionUpdate]) -> int:
+        return 0  # placeholder return for now
+
+    def get_group_key_values(
+        self, data_packet: DataPacket[QuerySubscriptionUpdate]
+    ) -> dict[str, int]:
+        return {"dummy": 0}  # placeholder return for now


 # Example GroupType and detector handler for metric alerts. We don't create these issues yet, but we'll use something
21 changes: 21 additions & 0 deletions src/sentry/incidents/subscription_processor.py
@@ -57,6 +57,9 @@
 from sentry.snuba.subscriptions import delete_snuba_subscription
 from sentry.utils import metrics, redis
 from sentry.utils.dates import to_datetime
+from sentry.workflow_engine.models import DataPacket
+from sentry.workflow_engine.processors.data_source import process_data_sources
+from sentry.workflow_engine.processors.detector import process_detectors

 logger = logging.getLogger(__name__)
 REDIS_TTL = int(timedelta(days=7).total_seconds())
@@ -358,6 +361,24 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
             metrics.incr("incidents.alert_rules.skipping_already_processed_update")
             return

+        if features.has(
+            "organizations:workflow-engine-m3-process", self.subscription.project.organization
+        ):
+            # NOTE: feed the data through the new pipeline, but don't do anything with it yet.
+            # This will change at some point.
+            data_packet = DataPacket[QuerySubscriptionUpdate](
+                query_id=self.subscription.id, packet=subscription_update
+            )
+            detectors = process_data_sources([data_packet], query_type=self.subscription.type)
Contributor (on the query_type argument): 🤔 - I was intending the query_type to be more of the DataSource type, so this would be something like snuba_subscription. Maybe we should export those from data_sources as an enum and apply it to the type field.

Contributor Author: I think Colleen is writing an enum for this purpose!

Contributor Author: TODO: change query_type to DataSourceType.SNUBA_QUERY_SUBSCRIPTION once Colleen's PR lands.

+            results = []
+            for data_packet, detectors in detectors:
+                results.append(process_detectors(data_packet, detectors))
+            logger.info(
+                "Results from process_detectors",
+                extra={
+                    "results": results,
+                },
+            )
         self.last_update = subscription_update["timestamp"]

         if (
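If the TODO above plays out, the call site would presumably end up something like this (a sketch only: DataSourceType and SNUBA_QUERY_SUBSCRIPTION are the names floated in this thread, not a shipped API):

    # Hypothetical follow-up per the TODO above; the enum name and member
    # are assumptions from the review thread, not an existing import.
    detectors = process_data_sources(
        [data_packet], query_type=DataSourceType.SNUBA_QUERY_SUBSCRIPTION
    )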
9 changes: 9 additions & 0 deletions tests/sentry/incidents/test_subscription_processor.py
@@ -2956,6 +2956,15 @@ def test_resolved_alert_updates_metric_issue(self, mock_produce_occurrence_to_ka
         assert status_change.new_status == GroupStatus.RESOLVED
         assert occurrence.fingerprint == status_change.fingerprint

+    @with_feature("organizations:workflow-engine-m3-process")
+    @mock.patch("sentry.incidents.subscription_processor.logger")
+    def test_process_data_sources(self, mock_logger):
+        rule = self.rule
+        detector = self.create_detector(name="hojicha", type="metric_alert_fire")
+        data_source = self.create_data_source(query_id=rule.snuba_query.id, type="incidents")
+        data_source.detectors.set([detector])
+        self.send_update(rule, 10)

Contributor Author (on the new test): This is pretty messy. Any suggestions for how we can better log the data flowing through the pipeline?

Contributor: I'm adding centralized logging to the process_* methods, so we shouldn't need to add much in the way of metrics / logging here. It will all be based off of the types set in each of those flows. We should only have to worry about anything that's specific to the metric alerts implementation here.

If we make the change to have process_data_packets as I was talking about before, you would just have to verify that process_data_packets is being invoked correctly in different states, and then you can assume the platform handles the rest. (yay #platforms)

If you wanted to test this without that method, it might look like feeding different snuba query results through a configured detector and checking how that invokes process_data_sources / process_detectors differently.
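Per that suggestion, a less log-dependent test could simply assert that the new pipeline is entered (a sketch; the test name is invented, and it patches process_data_sources as imported in subscription_processor):

    @with_feature("organizations:workflow-engine-m3-process")
    @mock.patch("sentry.incidents.subscription_processor.process_data_sources")
    def test_invokes_process_data_sources(self, mock_process_data_sources):
        # Returning no (data_packet, detectors) pairs keeps process_detectors
        # out of the picture; the assertion only covers pipeline entry.
        mock_process_data_sources.return_value = []
        self.send_update(self.rule, 10)
        assert mock_process_data_sources.call_count == 1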


 class MetricsCrashRateAlertProcessUpdateTest(ProcessUpdateBaseClass, BaseMetricsTestCase):
     @pytest.fixture(autouse=True)