From 9a897b840c2c6158d53cd8e618cf0c0db226ccad Mon Sep 17 00:00:00 2001 From: Evan Purkhiser Date: Mon, 13 May 2024 14:18:27 -0400 Subject: [PATCH] ref(crons): Split processing_errors into 2 modules Splits out the errors from the manager. Removes the CheckinProcessErrorsManager class (which had no state) and updated the function names to be more clear --- .../monitors/consumers/monitor_consumer.py | 4 +- ...ization_monitor_processing_errors_index.py | 10 +- ...project_monitor_processing_errors_index.py | 10 +- .../project_processing_errors_details.py | 4 +- src/sentry/monitors/processing_errors.py | 248 ------------------ .../monitors/processing_errors/errors.py | 114 ++++++++ .../monitors/processing_errors/manager.py | 153 +++++++++++ src/sentry/monitors/serializers.py | 5 +- src/sentry/monitors/testutils.py | 2 +- .../consumers/test_monitor_consumer.py | 2 +- ...ization_monitor_processing_errors_index.py | 12 +- ...project_monitor_processing_errors_index.py | 14 +- .../test_project_processing_errors_details.py | 21 +- .../monitors/processing_errors/__init__.py | 0 .../monitors/processing_errors/test_errors.py | 23 ++ .../test_manager.py} | 112 ++++---- 16 files changed, 369 insertions(+), 365 deletions(-) delete mode 100644 src/sentry/monitors/processing_errors.py create mode 100644 src/sentry/monitors/processing_errors/errors.py create mode 100644 src/sentry/monitors/processing_errors/manager.py create mode 100644 tests/sentry/monitors/processing_errors/__init__.py create mode 100644 tests/sentry/monitors/processing_errors/test_errors.py rename tests/sentry/monitors/{test_processing_errors.py => processing_errors/test_manager.py} (56%) diff --git a/src/sentry/monitors/consumers/monitor_consumer.py b/src/sentry/monitors/consumers/monitor_consumer.py index 4822f107d422ca..13472deed7097b 100644 --- a/src/sentry/monitors/consumers/monitor_consumer.py +++ b/src/sentry/monitors/consumers/monitor_consumer.py @@ -42,12 +42,12 @@ MonitorLimitsExceeded, MonitorType, ) -from sentry.monitors.processing_errors import ( +from sentry.monitors.processing_errors.errors import ( CheckinValidationError, ProcessingError, ProcessingErrorType, - handle_processing_errors, ) +from sentry.monitors.processing_errors.manager import handle_processing_errors from sentry.monitors.types import CheckinItem from sentry.monitors.utils import ( get_new_timeout_at, diff --git a/src/sentry/monitors/endpoints/organization_monitor_processing_errors_index.py b/src/sentry/monitors/endpoints/organization_monitor_processing_errors_index.py index e0499d39393273..d44317dd234325 100644 --- a/src/sentry/monitors/endpoints/organization_monitor_processing_errors_index.py +++ b/src/sentry/monitors/endpoints/organization_monitor_processing_errors_index.py @@ -11,10 +11,8 @@ from sentry.apidocs.parameters import GlobalParams from sentry.apidocs.utils import inline_sentry_response_serializer from sentry.models.organization import Organization -from sentry.monitors.processing_errors import ( - CheckinProcessErrorsManager, - CheckinProcessingErrorData, -) +from sentry.monitors.processing_errors.errors import CheckinProcessingErrorData +from sentry.monitors.processing_errors.manager import get_errors_for_projects from sentry.utils.auth import AuthenticatedHttpRequest @@ -46,9 +44,7 @@ def get(self, request: AuthenticatedHttpRequest, organization: Organization) -> Retrieves checkin processing errors for a monitor """ projects = self.get_projects(request, organization) - paginator = SequencePaginator( - list(enumerate(CheckinProcessErrorsManager().get_for_projects(projects))) - ) + paginator = SequencePaginator(list(enumerate(get_errors_for_projects(projects)))) return self.paginate( request=request, diff --git a/src/sentry/monitors/endpoints/project_monitor_processing_errors_index.py b/src/sentry/monitors/endpoints/project_monitor_processing_errors_index.py index c7b516a0b37709..4fc9df158949a1 100644 --- a/src/sentry/monitors/endpoints/project_monitor_processing_errors_index.py +++ b/src/sentry/monitors/endpoints/project_monitor_processing_errors_index.py @@ -9,10 +9,8 @@ from sentry.apidocs.constants import RESPONSE_FORBIDDEN, RESPONSE_NOT_FOUND, RESPONSE_UNAUTHORIZED from sentry.apidocs.parameters import GlobalParams, MonitorParams from sentry.apidocs.utils import inline_sentry_response_serializer -from sentry.monitors.processing_errors import ( - CheckinProcessErrorsManager, - CheckinProcessingErrorData, -) +from sentry.monitors.processing_errors.errors import CheckinProcessingErrorData +from sentry.monitors.processing_errors.manager import get_errors_for_monitor from sentry.utils.auth import AuthenticatedHttpRequest from .base import ProjectMonitorEndpoint @@ -46,9 +44,7 @@ def get(self, request: AuthenticatedHttpRequest, project, monitor) -> Response: """ Retrieves checkin processing errors for a monitor """ - paginator = SequencePaginator( - list(enumerate(CheckinProcessErrorsManager().get_for_monitor(monitor))) - ) + paginator = SequencePaginator(list(enumerate(get_errors_for_monitor(monitor)))) return self.paginate( request=request, diff --git a/src/sentry/monitors/endpoints/project_processing_errors_details.py b/src/sentry/monitors/endpoints/project_processing_errors_details.py index 67169046a9b4a4..7799b97693e8dd 100644 --- a/src/sentry/monitors/endpoints/project_processing_errors_details.py +++ b/src/sentry/monitors/endpoints/project_processing_errors_details.py @@ -20,7 +20,7 @@ ) from sentry.apidocs.parameters import GlobalParams, MonitorParams from sentry.models.project import Project -from sentry.monitors.processing_errors import CheckinProcessErrorsManager, InvalidProjectError +from sentry.monitors.processing_errors.manager import InvalidProjectError, delete_error from .base import ProjectMonitorPermission @@ -55,7 +55,7 @@ def delete(self, request: Request, project: Project, uuid: str) -> Response: except ValueError: raise ValidationError("Invalid UUID") try: - CheckinProcessErrorsManager().delete(project, parsed_uuid) + delete_error(project, parsed_uuid) except InvalidProjectError: raise ValidationError("Invalid uuid for project") return self.respond(status=204) diff --git a/src/sentry/monitors/processing_errors.py b/src/sentry/monitors/processing_errors.py deleted file mode 100644 index 4956d535903fa7..00000000000000 --- a/src/sentry/monitors/processing_errors.py +++ /dev/null @@ -1,248 +0,0 @@ -from __future__ import annotations - -import dataclasses -import logging -import uuid -from datetime import timedelta -from enum import Enum -from itertools import chain -from typing import Any, TypedDict - -from django.conf import settings -from redis.client import StrictRedis -from rediscluster import RedisCluster - -from sentry import features -from sentry.models.organization import Organization -from sentry.models.project import Project -from sentry.monitors.models import Monitor -from sentry.monitors.types import CheckinItem, CheckinItemData -from sentry.utils import json, metrics, redis - -logger = logging.getLogger(__name__) - -MAX_ERRORS_PER_SET = 10 -MONITOR_ERRORS_LIFETIME = timedelta(days=7) - - -class ProcessingErrorType(Enum): - CHECKIN_ENVIRONMENT_MISMATCH = 0 - """The environment sent with the checkin update doesn't match the environment already associated with the checkin""" - CHECKIN_FINISHED = 1 - """The checkin was already completed and we attempted to modify it""" - CHECKIN_GUID_PROJECT_MISMATCH = 2 - """The guid for the checkin matched a checkin that was related to a different project than the one provided in the DSN""" - CHECKIN_INVALID_DURATION = 3 - """We dropped a checkin due to invalid duration""" - CHECKIN_INVALID_GUID = 4 - """GUID passed with checkin is invalid""" - CHECKIN_VALIDATION_FAILED = 5 - """Checkin format was invalid""" - MONITOR_DISABLED = 6 - """Monitor was disabled for a non-billing related reason""" - MONITOR_DISABLED_NO_QUOTA = 7 - """Monitor was disabled and we couldn't assign a seat""" - MONITOR_INVALID_CONFIG = 8 - """A monitor wasn't found, and we failed to upsert due to invalid config""" - MONITOR_INVALID_ENVIRONMENT = 9 - """The environment information passed with the checkin was invalid""" - MONITOR_LIMIT_EXCEEDED = 10 - """The maximum number of monitors allowed per project has been exceeded""" - MONITOR_NOT_FOUND = 11 - """Monitor with the provided slug doesn't exist, and either no or invalid upsert data provided""" - MONITOR_OVER_QUOTA = 12 - """This monitor can't accept checkins and is over quota""" - MONITOR_ENVIRONMENT_LIMIT_EXCEEDED = 13 - """The monitor has too many environments associated with it already, can't add another""" - MONITOR_ENVIRONMENT_RATELIMITED = 14 - """This monitor environment is sending checkins too frequently""" - ORGANIZATION_KILLSWITCH_ENABLED = 15 - """We have disabled checkin ingestion for this org. Contact support for details""" - - -class CheckinValidationError(Exception): - def __init__(self, processing_errors: list[ProcessingError], monitor: Monitor | None = None): - # Monitor is optional, since we don't always have the monitor related to the checkin available - self.processing_errors = processing_errors - self.monitor = monitor - - -class ProcessingErrorData(TypedDict): - type: str - data: dict[str, Any] - - -@dataclasses.dataclass(frozen=True) -class ProcessingError: - type: ProcessingErrorType - data: dict[str, Any] = dataclasses.field(default_factory=dict) - - def to_dict(self) -> ProcessingErrorData: - return { - "type": self.type.name, - "data": self.data, - } - - @classmethod - def from_dict(cls, processing_error_data: ProcessingErrorData) -> ProcessingError: - return cls( - ProcessingErrorType[processing_error_data["type"]], - processing_error_data["data"], - ) - - -class CheckinProcessingErrorData(TypedDict): - errors: list[ProcessingErrorData] - checkin: CheckinItemData - id: str - - -@dataclasses.dataclass(frozen=True) -class CheckinProcessingError: - errors: list[ProcessingError] - checkin: CheckinItem - id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4) - - def to_dict(self) -> CheckinProcessingErrorData: - return { - "errors": [error.to_dict() for error in self.errors], - "checkin": self.checkin.to_dict(), - "id": self.id.hex, - } - - @classmethod - def from_dict(cls, data: CheckinProcessingErrorData) -> CheckinProcessingError: - return cls( - errors=[ProcessingError.from_dict(error) for error in data["errors"]], - checkin=CheckinItem.from_dict(data["checkin"]), - id=uuid.UUID(data["id"]), - ) - - def __hash__(self): - return hash(self.id.hex) - - def __eq__(self, other): - if isinstance(other, CheckinProcessingError): - return self.id.hex == other.id.hex - return False - - -class InvalidProjectError(Exception): - pass - - -class CheckinProcessErrorsManager: - def _get_cluster(self) -> RedisCluster[str] | StrictRedis[str]: - return redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) - - def _get_entity_identifier_from_error( - self, error: CheckinProcessingError, monitor: Monitor | None = None - ) -> str: - if monitor is None: - # Attempt to get the monitor from the checkin info if we failed to retrieve it during ingestion - try: - monitor = Monitor.objects.get( - project_id=error.checkin.message["project_id"], - slug=error.checkin.payload["monitor_slug"], - ) - except Monitor.DoesNotExist: - pass - if monitor: - entity_identifier = self.build_monitor_identifier(monitor) - else: - entity_identifier = self.build_project_identifier(error.checkin.message["project_id"]) - - return entity_identifier - - def store(self, error: CheckinProcessingError, monitor: Monitor | None): - entity_identifier = self._get_entity_identifier_from_error(error, monitor) - error_set_key = self.build_set_identifier(entity_identifier) - error_key = self.build_error_identifier(error.id) - serialized_error = json.dumps(error.to_dict()) - redis_client = self._get_cluster() - pipeline = redis_client.pipeline(transaction=False) - pipeline.zadd(error_set_key, {error.id.hex: error.checkin.ts.timestamp()}) - pipeline.set(error_key, serialized_error, ex=MONITOR_ERRORS_LIFETIME) - # Cap the error list to the `MAX_ERRORS_PER_SET` most recent errors - pipeline.zremrangebyrank(error_set_key, 0, -(MAX_ERRORS_PER_SET + 1)) - pipeline.expire(error_set_key, MONITOR_ERRORS_LIFETIME) - pipeline.execute() - - def build_set_identifier(self, entity_identifier: str) -> str: - return f"monitors.processing_errors_set.{entity_identifier}" - - def build_error_identifier(self, uuid: uuid.UUID) -> str: - return f"monitors.processing_errors.{uuid.hex}" - - def build_monitor_identifier(self, monitor: Monitor) -> str: - return f"monitor:{monitor.id}" - - def get_for_monitor(self, monitor: Monitor) -> list[CheckinProcessingError]: - return self._get_for_entities([self.build_monitor_identifier(monitor)]) - - def build_project_identifier(self, project_id: int) -> str: - return f"project:{project_id}" - - def get_for_projects(self, projects: list[Project]) -> list[CheckinProcessingError]: - return self._get_for_entities( - [self.build_project_identifier(project.id) for project in projects] - ) - - def delete(self, project: Project, uuid: uuid.UUID): - error_identifier = self.build_error_identifier(uuid) - redis = self._get_cluster() - raw_error = redis.get(error_identifier) - if raw_error is None: - return - error = CheckinProcessingError.from_dict(json.loads(raw_error)) - if error.checkin.message["project_id"] != project.id: - # TODO: Better exception class - raise InvalidProjectError() - - entity_identifier = self._get_entity_identifier_from_error(error) - self._delete_for_entity(entity_identifier, uuid) - - def _get_for_entities(self, entity_identifiers: list[str]) -> list[CheckinProcessingError]: - redis = self._get_cluster() - pipeline = redis.pipeline() - for identifier in entity_identifiers: - pipeline.zrange(self.build_set_identifier(identifier), 0, MAX_ERRORS_PER_SET, desc=True) - error_identifiers = [ - self.build_error_identifier(uuid.UUID(error_identifier)) - for error_identifier in chain(*pipeline.execute()) - ] - errors = [ - CheckinProcessingError.from_dict(json.loads(raw_error)) - for raw_error in redis.mget(error_identifiers) - if raw_error is not None - ] - errors.sort(key=lambda error: error.checkin.ts.timestamp(), reverse=True) - return errors - - def _delete_for_entity(self, entity_identifier: str, uuid: uuid.UUID) -> None: - pipeline = self._get_cluster().pipeline() - pipeline.zrem(self.build_set_identifier(entity_identifier), uuid.hex) - pipeline.delete(self.build_error_identifier(uuid)) - pipeline.execute() - - -def handle_processing_errors(item: CheckinItem, error: CheckinValidationError): - try: - project = Project.objects.get_from_cache(id=item.message["project_id"]) - organization = Organization.objects.get_from_cache(id=project.organization_id) - if not features.has("organizations:crons-write-user-feedback", organization): - return - - metrics.incr( - "monitors.checkin.handle_processing_error", - tags={ - "source": "consumer", - "sdk_platform": item.message["sdk"], - }, - ) - - checkin_processing_error = CheckinProcessingError(error.processing_errors, item) - manager = CheckinProcessErrorsManager() - manager.store(checkin_processing_error, error.monitor) - except Exception: - logger.exception("Failed to log processing error") diff --git a/src/sentry/monitors/processing_errors/errors.py b/src/sentry/monitors/processing_errors/errors.py new file mode 100644 index 00000000000000..63915bd0b25ed1 --- /dev/null +++ b/src/sentry/monitors/processing_errors/errors.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import dataclasses +import logging +import uuid +from enum import Enum +from typing import Any, TypedDict + +from sentry.monitors.models import Monitor +from sentry.monitors.types import CheckinItem, CheckinItemData + +logger = logging.getLogger(__name__) + + +class ProcessingErrorType(Enum): + CHECKIN_ENVIRONMENT_MISMATCH = 0 + """The environment sent with the checkin update doesn't match the environment already associated with the checkin""" + CHECKIN_FINISHED = 1 + """The checkin was already completed and we attempted to modify it""" + CHECKIN_GUID_PROJECT_MISMATCH = 2 + """The guid for the checkin matched a checkin that was related to a different project than the one provided in the DSN""" + CHECKIN_INVALID_DURATION = 3 + """We dropped a checkin due to invalid duration""" + CHECKIN_INVALID_GUID = 4 + """GUID passed with checkin is invalid""" + CHECKIN_VALIDATION_FAILED = 5 + """Checkin format was invalid""" + MONITOR_DISABLED = 6 + """Monitor was disabled for a non-billing related reason""" + MONITOR_DISABLED_NO_QUOTA = 7 + """Monitor was disabled and we couldn't assign a seat""" + MONITOR_INVALID_CONFIG = 8 + """A monitor wasn't found, and we failed to upsert due to invalid config""" + MONITOR_INVALID_ENVIRONMENT = 9 + """The environment information passed with the checkin was invalid""" + MONITOR_LIMIT_EXCEEDED = 10 + """The maximum number of monitors allowed per project has been exceeded""" + MONITOR_NOT_FOUND = 11 + """Monitor with the provided slug doesn't exist, and either no or invalid upsert data provided""" + MONITOR_OVER_QUOTA = 12 + """This monitor can't accept checkins and is over quota""" + MONITOR_ENVIRONMENT_LIMIT_EXCEEDED = 13 + """The monitor has too many environments associated with it already, can't add another""" + MONITOR_ENVIRONMENT_RATELIMITED = 14 + """This monitor environment is sending checkins too frequently""" + ORGANIZATION_KILLSWITCH_ENABLED = 15 + """We have disabled checkin ingestion for this org. Contact support for details""" + + +class CheckinValidationError(Exception): + def __init__(self, processing_errors: list[ProcessingError], monitor: Monitor | None = None): + # Monitor is optional, since we don't always have the monitor related to the checkin available + self.processing_errors = processing_errors + self.monitor = monitor + + +class ProcessingErrorData(TypedDict): + type: str + data: dict[str, Any] + + +@dataclasses.dataclass(frozen=True) +class ProcessingError: + type: ProcessingErrorType + data: dict[str, Any] = dataclasses.field(default_factory=dict) + + def to_dict(self) -> ProcessingErrorData: + return { + "type": self.type.name, + "data": self.data, + } + + @classmethod + def from_dict(cls, processing_error_data: ProcessingErrorData) -> ProcessingError: + return cls( + ProcessingErrorType[processing_error_data["type"]], + processing_error_data["data"], + ) + + +class CheckinProcessingErrorData(TypedDict): + errors: list[ProcessingErrorData] + checkin: CheckinItemData + id: str + + +@dataclasses.dataclass(frozen=True) +class CheckinProcessingError: + errors: list[ProcessingError] + checkin: CheckinItem + id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4) + + def to_dict(self) -> CheckinProcessingErrorData: + return { + "errors": [error.to_dict() for error in self.errors], + "checkin": self.checkin.to_dict(), + "id": self.id.hex, + } + + @classmethod + def from_dict(cls, data: CheckinProcessingErrorData) -> CheckinProcessingError: + return cls( + errors=[ProcessingError.from_dict(error) for error in data["errors"]], + checkin=CheckinItem.from_dict(data["checkin"]), + id=uuid.UUID(data["id"]), + ) + + def __hash__(self): + return hash(self.id.hex) + + def __eq__(self, other): + if isinstance(other, CheckinProcessingError): + return self.id.hex == other.id.hex + return False diff --git a/src/sentry/monitors/processing_errors/manager.py b/src/sentry/monitors/processing_errors/manager.py new file mode 100644 index 00000000000000..fdf054e4eb09db --- /dev/null +++ b/src/sentry/monitors/processing_errors/manager.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import logging +import uuid +from datetime import timedelta +from itertools import chain + +from django.conf import settings +from redis.client import StrictRedis +from rediscluster import RedisCluster + +from sentry import features +from sentry.models.organization import Organization +from sentry.models.project import Project +from sentry.monitors.models import Monitor +from sentry.monitors.types import CheckinItem +from sentry.utils import json, metrics, redis + +from .errors import CheckinProcessingError, CheckinValidationError + +logger = logging.getLogger(__name__) + +MAX_ERRORS_PER_SET = 10 +MONITOR_ERRORS_LIFETIME = timedelta(days=7) + + +class InvalidProjectError(Exception): + pass + + +def _get_cluster() -> RedisCluster | StrictRedis[str]: + return redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) + + +def build_set_identifier(entity_identifier: str) -> str: + return f"monitors.processing_errors_set.{entity_identifier}" + + +def build_error_identifier(uuid: uuid.UUID) -> str: + return f"monitors.processing_errors.{uuid.hex}" + + +def build_monitor_identifier(monitor: Monitor) -> str: + return f"monitor:{monitor.id}" + + +def build_project_identifier(project_id: int) -> str: + return f"project:{project_id}" + + +def _get_entity_identifier_from_error( + error: CheckinProcessingError, + monitor: Monitor | None = None, +) -> str: + if monitor is None: + # Attempt to get the monitor from the checkin info if we failed to retrieve it during ingestion + try: + monitor = Monitor.objects.get( + project_id=error.checkin.message["project_id"], + slug=error.checkin.payload["monitor_slug"], + ) + except Monitor.DoesNotExist: + pass + if monitor: + entity_identifier = build_monitor_identifier(monitor) + else: + entity_identifier = build_project_identifier(error.checkin.message["project_id"]) + + return entity_identifier + + +def _get_for_entities(entity_identifiers: list[str]) -> list[CheckinProcessingError]: + redis = _get_cluster() + pipeline = redis.pipeline() + for identifier in entity_identifiers: + pipeline.zrange(build_set_identifier(identifier), 0, MAX_ERRORS_PER_SET, desc=True) + error_identifiers = [ + build_error_identifier(uuid.UUID(error_identifier)) + for error_identifier in chain(*pipeline.execute()) + ] + errors = [ + CheckinProcessingError.from_dict(json.loads(raw_error)) + for raw_error in redis.mget(error_identifiers) + if raw_error is not None + ] + errors.sort(key=lambda error: error.checkin.ts.timestamp(), reverse=True) + return errors + + +def _delete_for_entity(entity_identifier: str, uuid: uuid.UUID) -> None: + pipeline = _get_cluster().pipeline() + pipeline.zrem(build_set_identifier(entity_identifier), uuid.hex) + pipeline.delete(build_error_identifier(uuid)) + pipeline.execute() + + +def store_error(error: CheckinProcessingError, monitor: Monitor | None): + entity_identifier = _get_entity_identifier_from_error(error, monitor) + error_set_key = build_set_identifier(entity_identifier) + error_key = build_error_identifier(error.id) + serialized_error = json.dumps(error.to_dict()) + redis_client = _get_cluster() + pipeline = redis_client.pipeline(transaction=False) + pipeline.zadd(error_set_key, {error.id.hex: error.checkin.ts.timestamp()}) + pipeline.set(error_key, serialized_error, ex=MONITOR_ERRORS_LIFETIME) + # Cap the error list to the `MAX_ERRORS_PER_SET` most recent errors + pipeline.zremrangebyrank(error_set_key, 0, -(MAX_ERRORS_PER_SET + 1)) + pipeline.expire(error_set_key, MONITOR_ERRORS_LIFETIME) + pipeline.execute() + + +def delete_error(project: Project, uuid: uuid.UUID): + error_identifier = build_error_identifier(uuid) + redis = _get_cluster() + raw_error = redis.get(error_identifier) + if raw_error is None: + return + error = CheckinProcessingError.from_dict(json.loads(raw_error)) + if error.checkin.message["project_id"] != project.id: + # TODO: Better exception class + raise InvalidProjectError() + + entity_identifier = _get_entity_identifier_from_error(error) + _delete_for_entity(entity_identifier, uuid) + + +def get_errors_for_monitor(monitor: Monitor) -> list[CheckinProcessingError]: + return _get_for_entities([build_monitor_identifier(monitor)]) + + +def get_errors_for_projects(projects: list[Project]) -> list[CheckinProcessingError]: + return _get_for_entities([build_project_identifier(project.id) for project in projects]) + + +def handle_processing_errors(item: CheckinItem, error: CheckinValidationError): + try: + project = Project.objects.get_from_cache(id=item.message["project_id"]) + organization = Organization.objects.get_from_cache(id=project.organization_id) + if not features.has("organizations:crons-write-user-feedback", organization): + return + + metrics.incr( + "monitors.checkin.handle_processing_error", + tags={ + "source": "consumer", + "sdk_platform": item.message["sdk"], + }, + ) + + checkin_processing_error = CheckinProcessingError(error.processing_errors, item) + store_error(checkin_processing_error, error.monitor) + except Exception: + logger.exception("Failed to log processing error") diff --git a/src/sentry/monitors/serializers.py b/src/sentry/monitors/serializers.py index 52790f21a15365..84073824ca0f7a 100644 --- a/src/sentry/monitors/serializers.py +++ b/src/sentry/monitors/serializers.py @@ -17,7 +17,10 @@ MonitorIncident, MonitorStatus, ) -from sentry.monitors.processing_errors import CheckinProcessingError, CheckinProcessingErrorData +from sentry.monitors.processing_errors.errors import ( + CheckinProcessingError, + CheckinProcessingErrorData, +) from sentry.monitors.utils import fetch_associated_groups from sentry.monitors.validators import IntervalNames from sentry.types.actor import Actor diff --git a/src/sentry/monitors/testutils.py b/src/sentry/monitors/testutils.py index 2957781443f4b9..9ee592ee34ed56 100644 --- a/src/sentry/monitors/testutils.py +++ b/src/sentry/monitors/testutils.py @@ -2,7 +2,7 @@ from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import CheckIn -from sentry.monitors.processing_errors import ( +from sentry.monitors.processing_errors.errors import ( CheckinProcessingError, ProcessingError, ProcessingErrorType, diff --git a/tests/sentry/monitors/consumers/test_monitor_consumer.py b/tests/sentry/monitors/consumers/test_monitor_consumer.py index d0fc419e0d3414..aac71bd521cb3b 100644 --- a/tests/sentry/monitors/consumers/test_monitor_consumer.py +++ b/tests/sentry/monitors/consumers/test_monitor_consumer.py @@ -29,7 +29,7 @@ MonitorType, ScheduleType, ) -from sentry.monitors.processing_errors import ( +from sentry.monitors.processing_errors.errors import ( CheckinValidationError, ProcessingError, ProcessingErrorType, diff --git a/tests/sentry/monitors/endpoints/test_organization_monitor_processing_errors_index.py b/tests/sentry/monitors/endpoints/test_organization_monitor_processing_errors_index.py index 45b3ecd74aaf06..6ed61002611621 100644 --- a/tests/sentry/monitors/endpoints/test_organization_monitor_processing_errors_index.py +++ b/tests/sentry/monitors/endpoints/test_organization_monitor_processing_errors_index.py @@ -1,9 +1,6 @@ from sentry.api.serializers import serialize -from sentry.monitors.processing_errors import ( - CheckinProcessErrorsManager, - ProcessingError, - ProcessingErrorType, -) +from sentry.monitors.processing_errors.errors import ProcessingError, ProcessingErrorType +from sentry.monitors.processing_errors.manager import store_error from sentry.monitors.testutils import build_checkin_processing_error from sentry.testutils.cases import APITestCase, MonitorTestCase from sentry.utils import json @@ -24,7 +21,6 @@ def test(self): monitor = self.create_monitor() project_2 = self.create_project() - manager = CheckinProcessErrorsManager() monitor_error = build_checkin_processing_error( [ProcessingError(ProcessingErrorType.CHECKIN_INVALID_GUID, {"guid": "bad"})], message_overrides={"project_id": self.project.id}, @@ -46,9 +42,9 @@ def test(self): ), ] - manager.store(monitor_error, monitor) + store_error(monitor_error, monitor) for error in project_errors: - manager.store(error, None) + store_error(error, None) resp = self.get_success_response( self.organization.slug, project=[self.project.id, project_2.id] diff --git a/tests/sentry/monitors/endpoints/test_project_monitor_processing_errors_index.py b/tests/sentry/monitors/endpoints/test_project_monitor_processing_errors_index.py index cc5ec7f2b5934f..f5ad46c65a1337 100644 --- a/tests/sentry/monitors/endpoints/test_project_monitor_processing_errors_index.py +++ b/tests/sentry/monitors/endpoints/test_project_monitor_processing_errors_index.py @@ -1,9 +1,6 @@ from sentry.api.serializers import serialize -from sentry.monitors.processing_errors import ( - CheckinProcessErrorsManager, - ProcessingError, - ProcessingErrorType, -) +from sentry.monitors.processing_errors.errors import ProcessingError, ProcessingErrorType +from sentry.monitors.processing_errors.manager import store_error from sentry.monitors.testutils import build_checkin_processing_error from sentry.testutils.cases import APITestCase, MonitorTestCase from sentry.utils import json @@ -25,7 +22,6 @@ def test_empty(self): def test(self): monitor = self.create_monitor() - manager = CheckinProcessErrorsManager() monitor_errors = [ build_checkin_processing_error( [ProcessingError(ProcessingErrorType.CHECKIN_INVALID_GUID, {"guid": "bad"})], @@ -43,9 +39,9 @@ def test(self): message_overrides={"project_id": self.project.id}, ) - manager.store(monitor_errors[0], monitor) - manager.store(monitor_errors[1], monitor) - manager.store(project_error, None) + store_error(monitor_errors[0], monitor) + store_error(monitor_errors[1], monitor) + store_error(project_error, None) resp = self.get_success_response(self.organization.slug, self.project.slug, monitor.slug) assert resp.data == json.loads(json.dumps(serialize(list(reversed(monitor_errors))))) diff --git a/tests/sentry/monitors/endpoints/test_project_processing_errors_details.py b/tests/sentry/monitors/endpoints/test_project_processing_errors_details.py index 06f161fbdef43f..59e038d683536b 100644 --- a/tests/sentry/monitors/endpoints/test_project_processing_errors_details.py +++ b/tests/sentry/monitors/endpoints/test_project_processing_errors_details.py @@ -1,8 +1,5 @@ -from sentry.monitors.processing_errors import ( - CheckinProcessErrorsManager, - ProcessingError, - ProcessingErrorType, -) +from sentry.monitors.processing_errors.errors import ProcessingError, ProcessingErrorType +from sentry.monitors.processing_errors.manager import get_errors_for_projects, store_error from sentry.monitors.testutils import build_checkin_processing_error from sentry.testutils.cases import APITestCase, MonitorTestCase @@ -19,27 +16,25 @@ def test_empty(self): self.get_error_response(self.organization.slug, self.project.slug, "hi") def test(self): - manager = CheckinProcessErrorsManager() monitor_error = build_checkin_processing_error( [ProcessingError(ProcessingErrorType.CHECKIN_INVALID_GUID, {"guid": "bad"})], message_overrides={"project_id": self.project.id}, ) - manager.store(monitor_error, None) - assert len(manager.get_for_projects([self.project])) == 1 + store_error(monitor_error, None) + assert len(get_errors_for_projects([self.project])) == 1 self.get_success_response(self.organization.slug, self.project.slug, monitor_error.id) - assert len(manager.get_for_projects([self.project])) == 0 + assert len(get_errors_for_projects([self.project])) == 0 def test_invalid_project(self): - manager = CheckinProcessErrorsManager() monitor_error = build_checkin_processing_error( [ProcessingError(ProcessingErrorType.CHECKIN_INVALID_GUID, {"guid": "bad"})], message_overrides={"project_id": self.project.id}, ) unrelated_project = self.create_project() - manager.store(monitor_error, None) - assert len(manager.get_for_projects([self.project])) == 1 + store_error(monitor_error, None) + assert len(get_errors_for_projects([self.project])) == 1 self.get_error_response( self.organization.slug, unrelated_project.slug, monitor_error.id, status_code=400 ) - assert len(manager.get_for_projects([self.project])) == 1 + assert len(get_errors_for_projects([self.project])) == 1 diff --git a/tests/sentry/monitors/processing_errors/__init__.py b/tests/sentry/monitors/processing_errors/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/tests/sentry/monitors/processing_errors/test_errors.py b/tests/sentry/monitors/processing_errors/test_errors.py new file mode 100644 index 00000000000000..c9fdef6dfd824c --- /dev/null +++ b/tests/sentry/monitors/processing_errors/test_errors.py @@ -0,0 +1,23 @@ +from sentry.monitors.processing_errors.errors import ( + CheckinProcessingError, + ProcessingError, + ProcessingErrorType, +) +from sentry.monitors.testutils import build_checkin_item + + +def test_processing_error(): + error = ProcessingError(ProcessingErrorType.CHECKIN_INVALID_GUID, {"some": "data"}) + recreated_error = ProcessingError.from_dict(error.to_dict()) + assert recreated_error.type == error.type + assert recreated_error.data == error.data + + +def test_checkin_processing_error(): + item = build_checkin_item() + error = CheckinProcessingError( + [ProcessingError(ProcessingErrorType.MONITOR_DISABLED, {"some": "data"})], + item, + ) + recreated_error = CheckinProcessingError.from_dict(error.to_dict()) + assert error == recreated_error diff --git a/tests/sentry/monitors/test_processing_errors.py b/tests/sentry/monitors/processing_errors/test_manager.py similarity index 56% rename from tests/sentry/monitors/test_processing_errors.py rename to tests/sentry/monitors/processing_errors/test_manager.py index d5e916f902f571..5a4506ffafa1d0 100644 --- a/tests/sentry/monitors/test_processing_errors.py +++ b/tests/sentry/monitors/processing_errors/test_manager.py @@ -1,69 +1,62 @@ from unittest import mock -from sentry.monitors.processing_errors import ( - CheckinProcessErrorsManager, +from sentry.monitors.processing_errors.errors import ( CheckinProcessingError, CheckinValidationError, ProcessingError, ProcessingErrorType, +) +from sentry.monitors.processing_errors.manager import ( + _get_cluster, + build_error_identifier, + delete_error, + get_errors_for_monitor, + get_errors_for_projects, handle_processing_errors, + store_error, ) from sentry.monitors.testutils import build_checkin_item, build_checkin_processing_error from sentry.testutils.cases import TestCase -class ProcessingErrorTest(TestCase): - def test(self): - error = ProcessingError(ProcessingErrorType.CHECKIN_INVALID_GUID, {"some": "data"}) - recreated_error = ProcessingError.from_dict(error.to_dict()) - assert recreated_error.type == error.type - assert recreated_error.data == error.data - - -class CheckinProcessingErrorTest(TestCase): - def test(self): - item = build_checkin_item() - error = CheckinProcessingError( - [ProcessingError(ProcessingErrorType.MONITOR_DISABLED, {"some": "data"})], - item, - ) - recreated_error = CheckinProcessingError.from_dict(error.to_dict()) - assert error == recreated_error +def assert_processing_errors_equal( + error_1: CheckinProcessingError, + error_2: CheckinProcessingError, +): + assert error_1.errors == error_2.errors + assert error_2.checkin == error_2.checkin class CheckinProcessErrorsManagerTest(TestCase): def test_store_with_monitor(self): monitor = self.create_monitor() - manager = CheckinProcessErrorsManager() processing_error = build_checkin_processing_error() - manager.store(processing_error, monitor) - fetched_processing_error = manager.get_for_monitor(monitor) + store_error(processing_error, monitor) + fetched_processing_error = get_errors_for_monitor(monitor) assert len(fetched_processing_error) == 1 - self.assert_processing_errors_equal(processing_error, fetched_processing_error[0]) + assert_processing_errors_equal(processing_error, fetched_processing_error[0]) def test_store_with_slug_exists(self): monitor = self.create_monitor() - manager = CheckinProcessErrorsManager() processing_error = build_checkin_processing_error( message_overrides={"project_id": self.project.id}, payload_overrides={"monitor_slug": monitor.slug}, ) - manager.store(processing_error, None) - fetched_processing_error = manager.get_for_monitor(monitor) + store_error(processing_error, None) + fetched_processing_error = get_errors_for_monitor(monitor) assert len(fetched_processing_error) == 1 - self.assert_processing_errors_equal(processing_error, fetched_processing_error[0]) + assert_processing_errors_equal(processing_error, fetched_processing_error[0]) def test_store_with_slug_not_exist(self): - manager = CheckinProcessErrorsManager() processing_error = build_checkin_processing_error( message_overrides={"project_id": self.project.id}, payload_overrides={"monitor_slug": "hi"}, ) - manager.store(processing_error, None) - fetched_processing_error = manager.get_for_projects([self.project]) + store_error(processing_error, None) + fetched_processing_error = get_errors_for_projects([self.project]) assert len(fetched_processing_error) == 1 - self.assert_processing_errors_equal(processing_error, fetched_processing_error[0]) + assert_processing_errors_equal(processing_error, fetched_processing_error[0]) def test_store_max(self): monitor = self.create_monitor() @@ -84,36 +77,26 @@ def test_store_max(self): payload_overrides={"monitor_slug": monitor.slug}, ), ] - manager = CheckinProcessErrorsManager() - with mock.patch("sentry.monitors.processing_errors.MAX_ERRORS_PER_SET", new=2): + with mock.patch("sentry.monitors.processing_errors.manager.MAX_ERRORS_PER_SET", new=2): for error in processing_errors: - manager.store(error, monitor) + store_error(error, monitor) - retrieved_errors = manager.get_for_monitor(monitor) + retrieved_errors = get_errors_for_monitor(monitor) assert len(retrieved_errors) == 2 - self.assert_processing_errors_equal(processing_errors[2], retrieved_errors[0]) - self.assert_processing_errors_equal(processing_errors[1], retrieved_errors[1]) - - def assert_processing_errors_equal( - self, error_1: CheckinProcessingError, error_2: CheckinProcessingError - ): - assert error_1.errors == error_2.errors - assert error_2.checkin == error_2.checkin + assert_processing_errors_equal(processing_errors[2], retrieved_errors[0]) + assert_processing_errors_equal(processing_errors[1], retrieved_errors[1]) def test_get_for_monitor_empty(self): - manager = CheckinProcessErrorsManager() monitor = self.create_monitor() - assert len(manager.get_for_monitor(monitor)) == 0 + assert len(get_errors_for_monitor(monitor)) == 0 def test_get_for_project(self): - manager = CheckinProcessErrorsManager() - assert len(manager.get_for_projects([self.project])) == 0 + assert len(get_errors_for_projects([self.project])) == 0 def test_get_missing_data(self): # Validate that we don't error if a processing error has expired but is still # in the set monitor = self.create_monitor() - manager = CheckinProcessErrorsManager() processing_errors = [ build_checkin_processing_error( [ProcessingError(ProcessingErrorType.CHECKIN_INVALID_GUID, {"guid": "bad"})], @@ -127,34 +110,32 @@ def test_get_missing_data(self): ), ] for processing_error in processing_errors: - manager.store(processing_error, monitor) - redis = manager._get_cluster() - redis.delete(manager.build_error_identifier(processing_errors[0].id)) - fetched_processing_error = manager.get_for_monitor(monitor) + store_error(processing_error, monitor) + redis = _get_cluster() + redis.delete(build_error_identifier(processing_errors[0].id)) + fetched_processing_error = get_errors_for_monitor(monitor) assert len(fetched_processing_error) == 1 - self.assert_processing_errors_equal(processing_errors[1], fetched_processing_error[0]) + assert_processing_errors_equal(processing_errors[1], fetched_processing_error[0]) def test_delete_for_monitor(self): - manager = CheckinProcessErrorsManager() monitor = self.create_monitor() processing_error = build_checkin_processing_error( message_overrides={"project_id": self.project.id}, payload_overrides={"monitor_slug": monitor.slug}, ) - manager.store(processing_error, monitor) - assert len(manager.get_for_monitor(monitor)) == 1 - manager.delete(self.project, processing_error.id) - assert len(manager.get_for_monitor(monitor)) == 0 + store_error(processing_error, monitor) + assert len(get_errors_for_monitor(monitor)) == 1 + delete_error(self.project, processing_error.id) + assert len(get_errors_for_monitor(monitor)) == 0 def test_delete_for_project(self): - manager = CheckinProcessErrorsManager() processing_error = build_checkin_processing_error( message_overrides={"project_id": self.project.id}, ) - manager.store(processing_error, None) - assert len(manager.get_for_projects([self.project])) == 1 - manager.delete(self.project, processing_error.id) - assert len(manager.get_for_projects([self.project])) == 0 + store_error(processing_error, None) + assert len(get_errors_for_projects([self.project])) == 1 + delete_error(self.project, processing_error.id) + assert len(get_errors_for_projects([self.project])) == 0 class HandleProcessingErrorsTest(TestCase): @@ -170,8 +151,7 @@ def test(self): ), exception, ) - manager = CheckinProcessErrorsManager() - errors = manager.get_for_monitor(monitor) + errors = get_errors_for_monitor(monitor) assert not errors with self.feature("organizations:crons-write-user-feedback"): handle_processing_errors( @@ -180,5 +160,5 @@ def test(self): ), exception, ) - errors = manager.get_for_monitor(monitor) + errors = get_errors_for_monitor(monitor) assert len(errors) == 1