feat(aci milestone 3): call process_data_sources in subscription processor #81788
Conversation
Codecov Report: ✅ All tests successful. No failed tests found.

Additional details and impacted files:

```diff
@@            Coverage Diff            @@
##           master   #81788    +/-   ##
=========================================
  Coverage   80.35%   80.36%
=========================================
  Files        7252     7257      +5
  Lines      320551   320812    +261
  Branches    20859    20859
=========================================
+ Hits       257577   257812    +235
- Misses      62579    62605     +26
  Partials      395      395
```
```python
data_packet = DataPacket(
    query_id=self.subscription.snuba_query.id, packet=subscription_update
)
detectors = process_data_sources([data_packet], query_type=self.subscription.type)
```
🤔 - I was intending the `query_type` to be more of the `DataSource` type, so this would be like `snuba_subscription` or something along those lines. Maybe we should have those exported from `data_sources` as an enum and applied to the `type` field.
I think Colleen is writing an enum for this purpose!
TODO: change `query_type` to be `DataSourceType.SNUBA_QUERY_SUBSCRIPTION` once Colleen's PR lands.
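For illustration, a minimal sketch of what that enum and updated call site might look like; the member name comes from the TODO above, but the enum's home in `data_sources`, its string value, and the call-site change are assumptions, not the repo's actual API:

```python
from enum import Enum


class DataSourceType(Enum):
    # Hypothetical value; only the member name comes from the TODO above.
    SNUBA_QUERY_SUBSCRIPTION = "snuba_query_subscription"


# The call site from the diff above would then pass the enum instead of
# the raw subscription type:
detectors = process_data_sources(
    [data_packet], query_type=DataSourceType.SNUBA_QUERY_SUBSCRIPTION
)
```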
Force-pushed from 60aa52f to f0768e7.
```diff
@@ -2956,6 +2958,38 @@ def test_resolved_alert_updates_metric_issue(self, mock_produce_occurrence_to_ka
     assert status_change.new_status == GroupStatus.RESOLVED
     assert occurrence.fingerprint == status_change.fingerprint

+    @with_feature("organizations:workflow-engine-m3-process")
```
This is pretty messy. Any suggestions for how we can better log the data flowing through the pipeline?
I'm adding centralized logging to the `process_*` methods, so we shouldn't need to add much in the way of metrics / logging here. It will all be based off of the types set in each of those flows. We should only have to worry about anything that's specific to the metric alerts implementation here.

If we make the change to have `process_data_packets` as I was talking about before, you would just have to verify that `process_data_packets` is being invoked correctly in different states; then you can assume the platform is handling the rest. (yay #platforms)

If you wanted to test this stuff without that method, it might look like feeding different snuba query results to a configured detector and checking how each one invokes `process_data_sources` / `process_detectors` differently.
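For a rough sense of what that centralized logging could look like, a sketch only: the logger name, the log keys, and the `_lookup_detectors` helper are all invented for illustration, not the platform's real internals.

```python
import logging

logger = logging.getLogger("sentry.workflow_engine")  # logger name is an assumption


def _lookup_detectors(data_packets, query_type):
    # Stand-in for the real data-source-to-detector matching.
    return []


def process_data_sources(data_packets, query_type):
    # Logging lives inside the platform method, so callers such as the
    # subscription processor don't need their own metrics / log lines.
    logger.info(
        "process_data_sources.start",
        extra={"query_type": query_type, "num_packets": len(data_packets)},
    )
    results = _lookup_detectors(data_packets, query_type)
    logger.info(
        "process_data_sources.end",
        extra={"query_type": query_type, "num_results": len(results)},
    )
    return results
```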
```python
result=None,
event_data=None,
```
Might be nice for this to be a little more complete for tests by adding the occurrence and event data,

```python
occurrence, event_data = self.build_occurrence_and_event_data("dummy_group", 0, PriorityLevel.HIGH)
```

and then updating the `result` / `event_data`:

```diff
-result=None,
-event_data=None,
+result=occurrence,
+event_data=event_data,
```
```diff
 class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]):
-    pass
+    def evaluate(
```
Should we be overriding the `evaluate` method from a stateful detector? Just looking a little closer at the base class, it looks like this would call through to check the detector's conditions, and that should all work from the base if the models are constructed. (lemme double check with Dan on some of his intentions on the code in this area though)
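In other words, a sketch under the assumption that the base class behaves as described; this is illustrative, not the PR's final code:

```python
class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]):
    # If StatefulDetectorHandler.evaluate() already walks the detector's
    # conditions, no override may be needed here at all; the subclass would
    # only carry whatever is specific to the metric-alert implementation.
    pass
```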
```python
detectors = process_data_sources([data_packet], query_type=self.subscription.type)
results = []
for data_packet, detectors in detectors:
    results.append(process_detectors(data_packet, detectors))
# NOTE: this is temporary, to verify in the tests that the right information is flowing through the pipeline.
```
I was thinking of adding another process method that wraps `process_data_sources` and `process_detectors` and then returns a list of the results. That would turn this code into something along these lines:

```diff
-detectors = process_data_sources([data_packet], query_type=self.subscription.type)
-results = []
-for data_packet, detectors in detectors:
-    results.append(process_detectors(data_packet, detectors))
-# NOTE: this is temporary, to verify in the tests that the right information is flowing through the pipeline.
+results = process_data_packets([data_packet], query_type=self.subscription.type)
```
Inside of `process_data_packets` I was thinking we could centralize the metrics / logging. Whatcha think?
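A minimal sketch of such a wrapper, assuming the `process_data_sources` / `process_detectors` signatures shown in the diff above; the body and the comment placement are illustrative:

```python
def process_data_packets(data_packets, query_type):
    # Hypothetical wrapper: run both pipeline stages and return the results,
    # so callers collapse to a single invocation.
    processed = process_data_sources(data_packets, query_type=query_type)

    results = []
    for data_packet, detectors in processed:
        results.append(process_detectors(data_packet, detectors))

    # Centralized metrics / logging for both stages could live here.
    return results
```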
I think this is an excellent idea.
Closing because the rebase is too complicated and there aren't that many lines anyway; I'll start a new PR instead.
Wrap the subscription update in a `DataPacket` and call `process_data_sources`, then feed the results into `process_detectors` and log the result (`process_detectors` calls a dummy `evaluate` function for now). Step one of the pipeline.
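Condensing the diff above, the step being added looks roughly like the following. This is a sketch, not the verbatim patch: the loop variable names and the logging line (only implied by "log the result" in the description) are illustrative.

```python
# Inside the subscription processor, behind the feature flag:
data_packet = DataPacket(
    query_id=self.subscription.snuba_query.id,
    packet=subscription_update,
)
detectors = process_data_sources([data_packet], query_type=self.subscription.type)

results = []
for packet, matched_detectors in detectors:
    results.append(process_detectors(packet, matched_detectors))

# Temporary: surface the results so tests can verify what flowed through.
logger.info("process_data_sources.results", extra={"results": results})
```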