feat(workflow_engine): Adding support for process_workflows in the IssuePlatform (#81975)

## Description
- This PR adds a new post process group pipeline for our test `MetricIssue`.
- Adds a `test_integration` with two core tests (sketched below):
  - One for processing the data and creating issue occurrences (workflow_engine -> IssuePlatform)
  - Another for taking occurrences and invoking workflows (IssuePlatform -> workflow_engine.workflows)
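
A rough sketch of the two flows those tests exercise (`detector` and `make_group_event` are hypothetical stand-ins for fixtures in `tests/sentry/workflow_engine/test_base.py`):

```python
from sentry.incidents.grouptype import MetricAlertDetectorHandler
from sentry.workflow_engine.models.data_source import DataPacket
from sentry.workflow_engine.processors.workflow import process_workflows

# 1) workflow_engine -> IssuePlatform: the detector handler evaluates a
#    data packet and builds an IssueOccurrence for the platform to consume.
handler = MetricAlertDetectorHandler(detector)  # `detector` is a test fixture
packet = DataPacket(query_id="1", packet={"values": {"foo": 100}})
results = handler.evaluate(packet)  # evaluation results per DetectorGroupKey

# 2) IssuePlatform -> workflow_engine: once the occurrence is persisted and
#    a GroupEvent exists, post processing hands it to process_workflows.
group_event = make_group_event()  # hypothetical helper returning a GroupEvent
process_workflows(group_event)
```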

---------

Co-authored-by: Snigdha Sharma <[email protected]>
saponifi3d and snigdhas authored Dec 13, 2024
1 parent 24dabbc commit fcda2c8
Showing 10 changed files with 319 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/sentry/features/temporary.py
@@ -543,6 +543,8 @@ def register_temporary_features(manager: FeatureManager):
     manager.add("organizations:workflow-engine-m3-read", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
     # Enable new workflow_engine UI (see: alerts create issues)
     manager.add("organizations:workflow-engine-ui", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
+    # Enable Processing for Metric Alerts in the workflow_engine
+    manager.add("organizations:workflow-engine-metric-alert-processing", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
     # Enable EventUniqueUserFrequencyConditionWithConditions special alert condition
     manager.add("organizations:event-unique-user-frequency-condition-with-conditions", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
     # Use spans instead of transactions for dynamic sampling calculations. This will become the new default.
59 changes: 58 additions & 1 deletion src/sentry/incidents/grouptype.py
@@ -1,15 +1,68 @@
 from __future__ import annotations

 from dataclasses import dataclass
+from datetime import UTC, datetime
+from typing import Any
+from uuid import uuid4

+from sentry import features
 from sentry.incidents.endpoints.validators import MetricAlertsDetectorValidator
 from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.issues.grouptype import GroupCategory, GroupType
+from sentry.issues.issue_occurrence import IssueOccurrence
+from sentry.models.organization import Organization
 from sentry.ratelimits.sliding_windows import Quota
 from sentry.types.group import PriorityLevel
 from sentry.workflow_engine.handlers.detector import StatefulDetectorHandler
+from sentry.workflow_engine.models.data_source import DataPacket
+from sentry.workflow_engine.types import DetectorGroupKey


 class MetricAlertDetectorHandler(StatefulDetectorHandler[QuerySubscriptionUpdate]):
-    pass
+    def build_occurrence_and_event_data(
+        self, group_key: DetectorGroupKey, value: int, new_status: PriorityLevel
+    ) -> tuple[IssueOccurrence, dict[str, Any]]:
+        # Returning a placeholder for now, this may require us passing more info
+
+        occurrence = IssueOccurrence(
+            id=str(uuid4()),
+            project_id=self.detector.project_id,
+            event_id=str(uuid4()),
+            fingerprint=self.build_fingerprint(group_key),
+            issue_title="Some Issue",
+            subtitle="Some subtitle",
+            resource_id=None,
+            evidence_data={"detector_id": self.detector.id, "value": value},
+            evidence_display=[],
+            type=MetricAlertFire,
+            detection_time=datetime.now(UTC),
+            level="error",
+            culprit="Some culprit",
+            initial_issue_priority=new_status.value,
+        )
+        event_data = {
+            "timestamp": occurrence.detection_time,
+            "project_id": occurrence.project_id,
+            "event_id": occurrence.event_id,
+            "platform": "python",
+            "received": occurrence.detection_time,
+            "tags": {},
+        }
+        return occurrence, event_data
+
+    @property
+    def counter_names(self) -> list[str]:
+        # Placeholder for now, this should be a list of counters that we want to update as we go above warning / critical
+        return []
+
+    def get_dedupe_value(self, data_packet: DataPacket[QuerySubscriptionUpdate]) -> int:
+        return int(data_packet.packet.get("timestamp", datetime.now(UTC)).timestamp())
+
+    def get_group_key_values(
+        self, data_packet: DataPacket[QuerySubscriptionUpdate]
+    ) -> dict[DetectorGroupKey, int]:
+        # This is for testing purposes, we'll need to update the values inspected.
+        return {None: data_packet.packet["values"]["foo"]}

 # Example GroupType and detector handler for metric alerts. We don't create these issues yet, but we'll use something
@@ -27,3 +80,7 @@ class MetricAlertFire(GroupType):
     detector_handler = MetricAlertDetectorHandler
     detector_validator = MetricAlertsDetectorValidator
     detector_config_schema = {}  # TODO(colleen): update this
+
+    @classmethod
+    def allow_post_process_group(cls, organization: Organization) -> bool:
+        return features.has("organizations:workflow-engine-metric-alert-processing", organization)
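
How this gate is meant to be consumed (a hypothetical sketch; the real call site lives in the issue platform's post-processing path, not in this diff):

```python
from sentry.models.group import Group
from sentry.models.organization import Organization

def should_schedule_post_process(group: Group, organization: Organization) -> bool:
    # group.issue_type resolves the registered GroupType subclass,
    # e.g. MetricAlertFire for occurrences created by the detector handler.
    return group.issue_type.allow_post_process_group(organization)
```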
25 changes: 21 additions & 4 deletions src/sentry/tasks/post_process.py
@@ -720,12 +720,12 @@ def run_post_process_job(job: PostProcessJob) -> None:
     ):
         return

-    if issue_category not in GROUP_CATEGORY_POST_PROCESS_PIPELINE:
-        # pipeline for generic issues
-        pipeline = GENERIC_POST_PROCESS_PIPELINE
-    else:
+    if issue_category in GROUP_CATEGORY_POST_PROCESS_PIPELINE:
         # specific pipelines for issue types
         pipeline = GROUP_CATEGORY_POST_PROCESS_PIPELINE[issue_category]
+    else:
+        # pipeline for generic issues
+        pipeline = GENERIC_POST_PROCESS_PIPELINE

     for pipeline_step in pipeline:
         try:
@@ -994,6 +994,20 @@ def _get_replay_id(event):
     )


+def process_workflow_engine(job: PostProcessJob) -> None:
+    if job["is_reprocessed"]:
+        return
+
+    # TODO - Add a rollout flag check here, if it's not enabled, call process_rules
+    # If the flag is enabled, use the code below
+    from sentry.workflow_engine.processors.workflow import process_workflows
+
+    evt = job["event"]
+
+    with sentry_sdk.start_span(op="tasks.post_process_group.workflow_engine.process_workflow"):
+        process_workflows(evt)
+
+
 def process_rules(job: PostProcessJob) -> None:
     if job["is_reprocessed"]:
         return
@@ -1558,6 +1572,9 @@ def detect_base_urls_for_uptime(job: PostProcessJob):
         feedback_filter_decorator(process_inbox_adds),
         feedback_filter_decorator(process_rules),
     ],
+    GroupCategory.METRIC_ALERT: [
+        process_workflow_engine,
+    ],
 }

 GENERIC_POST_PROCESS_PIPELINE = [
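
The TODO in `process_workflow_engine` could later be resolved by gating on the flag added in this PR; a sketch, not part of the diff (the `organization` lookup via the event's project is an assumption):

```python
from sentry import features

def process_workflow_engine_or_rules(job: PostProcessJob) -> None:
    organization = job["event"].project.organization
    if features.has("organizations:workflow-engine-metric-alert-processing", organization):
        process_workflow_engine(job)  # new workflow_engine path
    else:
        process_rules(job)  # legacy rule processing
```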
11 changes: 9 additions & 2 deletions src/sentry/workflow_engine/handlers/detector/stateful.py
@@ -54,7 +54,7 @@ def get_dedupe_value(self, data_packet: DataPacket[T]) -> int:
         pass

     @abc.abstractmethod
-    def get_group_key_values(self, data_packet: DataPacket[T]) -> dict[str, int]:
+    def get_group_key_values(self, data_packet: DataPacket[T]) -> dict[DetectorGroupKey, int]:
         """
         Extracts the values for all the group keys that exist in the given data packet,
         and returns then as a dict keyed by group_key.
@@ -70,6 +70,9 @@ def build_occurrence_and_event_data(
     def build_fingerprint(self, group_key) -> list[str]:
         """
         Builds a fingerprint to uniquely identify a detected issue
+
+        TODO - Take into account the data source / query that triggered the detector,
+        we'll want to create a new issue if the query changes.
         """
         return [f"{self.detector.id}{':' + group_key if group_key is not None else ''}"]

@@ -84,13 +87,17 @@ def get_state_data(
         group_key_detectors = self.bulk_get_detector_state(group_keys)
         dedupe_keys = [self.build_dedupe_value_key(gk) for gk in group_keys]
         pipeline = get_redis_client().pipeline()
+
         for dk in dedupe_keys:
             pipeline.get(dk)
+
         group_key_dedupe_values = {
             gk: int(dv) if dv else 0 for gk, dv in zip(group_keys, pipeline.execute())
         }
+
         pipeline.reset()
         counter_updates = {}
+
         if self.counter_names:
             counter_keys = [
                 self.build_counter_value_key(gk, name)
@@ -117,7 +124,7 @@
                 else DetectorPriorityLevel.OK
             ),
             dedupe_value=group_key_dedupe_values[gk],
-            counter_updates=counter_updates[gk],
+            counter_updates=counter_updates.get(gk, {}),
         )
     return results
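
The dedupe read in `get_state_data` batches its GETs through a single Redis pipeline and zips the results back to their keys; the same pattern in miniature (key names hypothetical, a plain `redis` client standing in for the module's `get_redis_client()`):

```python
import redis

client = redis.Redis()  # stand-in for get_redis_client()
keys = ["detector:1:dedupe", "detector:1:group-a:dedupe"]  # hypothetical keys

pipeline = client.pipeline()
for key in keys:
    pipeline.get(key)  # queued locally; nothing is sent yet

# execute() flushes every queued command in one round trip and returns
# the results in queue order, so zip() restores the key/value pairing.
values = {k: int(v) if v else 0 for k, v in zip(keys, pipeline.execute())}
```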
1 change: 1 addition & 0 deletions src/sentry/workflow_engine/models/data_condition.py
@@ -19,6 +19,7 @@
 logger = logging.getLogger(__name__)


+# TODO should this be in types?
 class Condition(StrEnum):
     EQUAL = "eq"
     GREATER_OR_EQUAL = "gte"
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/processors/data_source.py
@@ -14,7 +14,7 @@ def process_data_sources(
 ) -> list[tuple[DataPacket, list[Detector]]]:
     metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type})

-    data_packet_ids = {packet.query_id for packet in data_packets}
+    data_packet_ids = {int(packet.query_id) for packet in data_packets}

     # Fetch all data sources and associated detectors for the given data packets
     with sentry_sdk.start_span(op="sentry.workflow_engine.process_data_sources.fetch_data_sources"):
@@ -23,12 +23,12 @@
         ).prefetch_related(Prefetch("detectors"))

     # Build a lookup dict for query_id to detectors
-    query_id_to_detectors = {ds.query_id: list(ds.detectors.all()) for ds in data_sources}
+    query_id_to_detectors = {int(ds.query_id): list(ds.detectors.all()) for ds in data_sources}

     # Create the result tuples
     result = []
     for packet in data_packets:
-        detectors = query_id_to_detectors.get(packet.query_id)
+        detectors = query_id_to_detectors.get(int(packet.query_id))

         if detectors:
             data_packet_tuple = (packet, detectors)
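
The `int()` casts normalize `query_id` on both sides before the dict lookup; a minimal illustration of the str/int key mismatch they guard against:

```python
# Dict lookups do not coerce between str and int keys.
lookup = {1: ["detector-a"]}
assert lookup.get("1") is None                  # str key misses the int key
assert lookup.get(int("1")) == ["detector-a"]   # normalized key hits
```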
1 change: 1 addition & 0 deletions src/sentry/workflow_engine/types.py
@@ -48,3 +48,4 @@ def evaluate_value(value: T, comparison: Any, condition: str) -> DataConditionResult:

 class DetectorType(StrEnum):
     ERROR = "ErrorDetector"
+    METRIC_ALERT_FIRE = "metric_alert_fire"
@@ -64,7 +64,7 @@ class MockDetectorStateHandler(StatefulDetectorHandler[dict]):
     def get_dedupe_value(self, data_packet: DataPacket[dict]) -> int:
         return data_packet.packet.get("dedupe", 0)

-    def get_group_key_values(self, data_packet: DataPacket[dict]) -> dict[str, int]:
+    def get_group_key_values(self, data_packet: DataPacket[dict]) -> dict[str | None, int]:
         return data_packet.packet.get("group_vals", {})

     def build_occurrence_and_event_data(
20 changes: 16 additions & 4 deletions tests/sentry/workflow_engine/test_base.py
@@ -123,15 +123,27 @@ def create_workflow_action(

         return action_group, action

-    def create_group_event(self, project=None, occurrence=None) -> tuple[Group, Event, GroupEvent]:
+    def create_group_event(
+        self,
+        project=None,
+        event=None,
+        occurrence=None,
+        fingerprint="test_fingerprint",
+    ) -> tuple[Group, Event, GroupEvent]:
         project = project or self.project
-        group = self.create_group(project)
-        event = self.create_event(
+        group = self.create_group(project=project)
+
+        event = event or self.create_event(
             project.id,
             datetime.now(),
-            "test_fingerprint",
+            fingerprint,
         )

+        event.for_group = group
+
+        if occurrence is not None and occurrence.get("event_id", None) is None:
+            occurrence["event_id"] = event.event_id
+
         group_event = GroupEvent(
             self.project.id,
             event.event_id,
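
A hypothetical use of the extended helper in a workflow_engine test, showing the new `fingerprint` parameter and the occurrence `event_id` back-fill (names assumed from the diff):

```python
# Inside a test using this base class:
occurrence_data = {"fingerprint": ["my-fingerprint"]}  # no event_id yet
group, event, group_event = self.create_group_event(
    fingerprint="my-fingerprint",
    occurrence=occurrence_data,
)
assert occurrence_data["event_id"] == event.event_id  # back-filled by the helper
```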