From 4a7fa6e3d10b6a331b03ca2c148b1c83238ecc15 Mon Sep 17 00:00:00 2001 From: Michelle Fu Date: Mon, 6 Jan 2025 16:02:48 -0800 Subject: [PATCH] wip --- .../migration_helpers/alert_rule.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/src/sentry/workflow_engine/migration_helpers/alert_rule.py b/src/sentry/workflow_engine/migration_helpers/alert_rule.py index 79f2fdb43452bd..20ce9c917b1ac4 100644 --- a/src/sentry/workflow_engine/migration_helpers/alert_rule.py +++ b/src/sentry/workflow_engine/migration_helpers/alert_rule.py @@ -1,3 +1,4 @@ +from typing import Any from sentry.incidents.grouptype import MetricAlertFire from sentry.incidents.models.alert_rule import AlertRule from sentry.snuba.models import QuerySubscription, SnubaQuery @@ -12,6 +13,7 @@ DetectorWorkflow, Workflow, WorkflowDataConditionGroup, + workflow, ) from sentry.workflow_engine.types import DetectorPriorityLevel @@ -155,3 +157,71 @@ def migrate_alert_rule( detector_workflow, workflow_data_condition_group, ) + + +def update_migrated_alert_rule(alert_rule: AlertRule, updated_fields: dict[str, Any]) -> ( + tuple[ + DataSource, + DataConditionGroup, # ??? + Workflow, # ??? + Detector, + ] + | None +): + # TODO: figure out how to handle DataConditionGroups (once Colleen's changes to that land) + # TODO: maybe pull this into a helper method? + try: + alert_rule_detector = AlertRuleDetector.objects.get(alert_rule=alert_rule) + alert_rule_workflow = AlertRuleWorkflow.objects.get(alert_rule=alert_rule) + except AlertRuleDetector.DoesNotExist: + logger.exception( + "AlertRuleDetector does not exist", + extra={"alert_rule_id": AlertRule.id}, + ) + except AlertRuleWorkflow.DoesNotExist: + logger.exception( + "AlertRuleWorkflow does not exist", + extra={"alert_rule_id": AlertRule.id}, + ) + return None + + detector = alert_rule_detector.detector + workflow = alert_rule_workflow.workflow + try: + detector_state = DetectorState.objects.get(detector=detector) + except DetectorState.DoesNotExist: + logger.exception( + "DetectorState does not exist", + extra={"alert_rule_id": AlertRule.id, "detector_id": detector.id}, + ) + return None + + # data condition group ?? + + # TODO: pull this into a helper + updated_detector_fields: dict[str:Any] = {} + config = detector.config.copy() + + if "name" in updated_fields: + updated_detector_fields["name"] = updated_fields["name"] + if "description" in updated_fields: + updated_detector_fields["description"] = updated_fields["description"] + if "user_id" in updated_fields: + updated_detector_fields["owner_user_id"] = updated_fields["user_id"] + if "team_id" in updated_fields: + updated_detector_fields["owner_team_id"] = updated_fields["team_id"] + # update config fields + if "threshold_period" in updated_fields: + config["threshold_period"] = updated_fields["threshold_period"] + if "sensitivity" in updated_fields: + config["sensitivity"] = updated_fields["sensitivity"] + if "seasonality" in updated_fields: + config["seasonality"] = updated_fields["seasonality"] + if "comparison_delta" in updated_fields: + config["comparison_delta"] = updated_fields["comparison_delta"] + updated_detector_fields["config"] = config + + detector.update(**updated_detector_fields) + + # TODO: do we need to create an audit log entry here? + return detector