diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py index f150bb2b5e..0539bd6ce1 100644 --- a/app/celery/provider_tasks.py +++ b/app/celery/provider_tasks.py @@ -1,10 +1,13 @@ +from typing import Optional + from flask import current_app from notifications_utils.recipients import InvalidEmailError from notifications_utils.statsd_decorators import statsd from sqlalchemy.orm.exc import NoResultFound from app import notify_celery -from app.config import Config, QueueNames +from app.celery.utils import CeleryParams +from app.config import Config from app.dao import notifications_dao from app.dao.notifications_dao import update_notification_status_by_id from app.delivery import send_to_providers @@ -14,9 +17,9 @@ MalwareScanInProgressException, NotificationTechnicalFailureException, ) -from app.models import NOTIFICATION_TECHNICAL_FAILURE -from app.notifications import build_retry_task_params +from app.models import NOTIFICATION_TECHNICAL_FAILURE, Notification from app.notifications.callbacks import _check_and_queue_callback_task +from celery import Task # Celery rate limits are per worker instance and not a global rate limit. @@ -62,6 +65,7 @@ def deliver_sms(self, notification_id): @notify_celery.task(bind=True, name="deliver_email", max_retries=48, default_retry_delay=300) @statsd(namespace="tasks") def deliver_email(self, notification_id): + notification = None try: current_app.logger.debug("Start sending email for notification id: {}".format(notification_id)) notification = notifications_dao.get_notification_by_id(notification_id) @@ -81,30 +85,17 @@ def deliver_email(self, notification_id): _check_and_queue_callback_task(notification) except MalwareDetectedException: _check_and_queue_callback_task(notification) - except Exception as e: - if isinstance(e, MalwareScanInProgressException) and self.request.retries <= SCAN_MAX_BACKOFF_RETRIES: - countdown = SCAN_RETRY_BACKOFF * (self.request.retries + 1) # do we need to add 1 here? + except MalwareScanInProgressException as me: + if self.request.retries <= SCAN_MAX_BACKOFF_RETRIES: + countdown = SCAN_RETRY_BACKOFF * (self.request.retries + 1) else: countdown = None - try: - current_app.logger.warning(f"The exception is {repr(e)}") - if self.request.retries <= 10: - current_app.logger.warning("RETRY {}: Email notification {} failed".format(self.request.retries, notification_id)) - else: - current_app.logger.exception("RETRY: Email notification {} failed".format(notification_id)) - if countdown is not None: - self.retry(queue=QueueNames.RETRY, countdown=countdown) - else: - self.retry(queue=QueueNames.RETRY) - except self.MaxRetriesExceededError: - message = ( - "RETRY FAILED: Max retries reached. " - "The task send_email_to_provider failed for notification {}. " - "Notification has been updated to technical-failure".format(notification_id) - ) - update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE) - _check_and_queue_callback_task(notification) - raise NotificationTechnicalFailureException(message) + current_app.logger.warning( + "RETRY {}: Email notification {} is waiting on pending malware scanning".format(self.request.retries, notification_id) + ) + _handle_error_with_email_retry(self, me, notification_id, notification, countdown) + except Exception as e: + _handle_error_with_email_retry(self, e, notification_id, notification) def _deliver_sms(self, notification_id): @@ -120,15 +111,8 @@ def _deliver_sms(self, notification_id): _check_and_queue_callback_task(notification) except Exception: try: - if self.request.retries == 0: - # Retry immediately, especially as a common failure is for the database data - # replication to be delayed. The immediate retry likely succeeds in these scenarios. - self.retry(queue=QueueNames.RETRY, countdown=0) - else: - # Once the previous retry failed, log the exception and this time, - # retry with the default delay. - current_app.logger.exception("SMS notification delivery for id: {} failed".format(notification_id)) - self.retry(**build_retry_task_params(notification.notification_type, notification.template.process_type)) + current_app.logger.exception("SMS notification delivery for id: {} failed".format(notification_id)) + self.retry(**CeleryParams.retry(None if notification is None else notification.template.process_type)) except self.MaxRetriesExceededError: message = ( "RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. " @@ -137,3 +121,28 @@ def _deliver_sms(self, notification_id): update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE) _check_and_queue_callback_task(notification) raise NotificationTechnicalFailureException(message) + + +def _handle_error_with_email_retry( + task: Task, e: Exception, notification_id: int, notification: Optional[Notification], countdown: Optional[None] = None +): + try: + if task.request.retries <= 10: + current_app.logger.warning("RETRY {}: Email notification {} failed".format(task.request.retries, notification_id)) + else: + current_app.logger.exception("RETRY: Email notification {} failed".format(notification_id), exc_info=e) + # There is an edge case when a notification is not found in the database. + if notification is None or notification.template is None: + task.retry(**CeleryParams.retry(countdown=countdown)) + else: + task.retry(**CeleryParams.retry(notification.template.process_type, countdown)) + except task.MaxRetriesExceededError: + message = ( + "RETRY FAILED: Max retries reached. " + "The task send_email_to_provider failed for notification {}. " + "Notification has been updated to technical-failure".format(notification_id) + ) + update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE) + if notification is not None: + _check_and_queue_callback_task(notification) + raise NotificationTechnicalFailureException(message) diff --git a/app/celery/utils.py b/app/celery/utils.py new file mode 100644 index 0000000000..f849e8f02a --- /dev/null +++ b/app/celery/utils.py @@ -0,0 +1,43 @@ +from typing import Any, Dict, Optional + +from flask import current_app + +from app import config, models + +# Default retry periods for sending notifications. +RETRY_DEFAULT = 300 +RETRY_HIGH = 25 + + +class CeleryParams(object): + # Important to load from the object and not the module to avoid + # circular imports, back and forth between the app and celery modules. + + RETRY_PERIODS = { + models.BULK: RETRY_DEFAULT, + models.NORMAL: RETRY_DEFAULT, + models.PRIORITY: RETRY_HIGH, + None: RETRY_HIGH, # In case we cannot identify the priority, treat it as high. + } + + @staticmethod + def retry(notification_process_type: Optional[str] = None, countdown: Optional[int] = None) -> Dict[str, Any]: + """ + Build task params for the sending parameter retry tasks. + + If the notification is a high priority SMS, set the retry policy to retry every 25 seconds + else fall back to the default retry policy of retrying every 5 minutes. + + Provide an override parameter for cases the calling task wants to override the retry policy. + """ + params: dict[str, Any] = {"queue": config.QueueNames.RETRY} + if current_app.config["FF_CELERY_CUSTOM_TASK_PARAMS"] is False: + return params + + if countdown is not None: + params["countdown"] = countdown + else: + # Overring the retry policy is only supported for SMS for now; + # email support coming later. + params["countdown"] = CeleryParams.RETRY_PERIODS[notification_process_type] + return params diff --git a/app/notifications/__init__.py b/app/notifications/__init__.py index 1ec9036155..e69de29bb2 100644 --- a/app/notifications/__init__.py +++ b/app/notifications/__init__.py @@ -1,36 +0,0 @@ -from typing import Any, Dict - -from flask import current_app - -from app.config import QueueNames -from app.models import BULK, NORMAL, PRIORITY, SMS_TYPE - -# Default retry periods for sending notifications. -RETRY_DEFAULT = 300 -RETRY_HIGH = 25 - -RETRY_PERIODS = { - BULK: RETRY_DEFAULT, - NORMAL: RETRY_DEFAULT, - PRIORITY: RETRY_HIGH, -} - - -def build_retry_task_params(notification_type: str, notification_process_type: str) -> Dict[str, Any]: - """ - Build task params for the sending parameter retry tasks. - - If the notification is a high priority SMS, set the retry policy to retry every 25 seconds - else fall back to the default retry policy of retrying every 5 minutes. - """ - params: dict[str, Any] = {"queue": QueueNames.RETRY} - if current_app.config["FF_CELERY_CUSTOM_TASK_PARAMS"] is False: - return params - - # Overring the retry policy is only supported for SMS for now; - # email support coming later. - if notification_type == SMS_TYPE: - params["countdown"] = RETRY_PERIODS[notification_process_type] - else: - params["countdown"] = RETRY_DEFAULT - return params diff --git a/app/notifications/callbacks.py b/app/notifications/callbacks.py index 712d1c3edc..3cc6d0bb91 100644 --- a/app/notifications/callbacks.py +++ b/app/notifications/callbacks.py @@ -1,3 +1,5 @@ +from flask import current_app + from app.celery.service_callback_tasks import send_delivery_status_to_service from app.config import QueueNames from app.dao.service_callback_api_dao import ( @@ -6,11 +8,13 @@ def _check_and_queue_callback_task(notification): + if notification is None: + current_app.logger.warning("No notification provided, cannot queue callback task") + return # queue callback task only if the service_callback_api exists service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) if service_callback_api: notification_data = create_delivery_status_callback_data(notification, service_callback_api) - send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS) diff --git a/tests/app/celery/test_provider_tasks.py b/tests/app/celery/test_provider_tasks.py index aae4707941..e79d8176d3 100644 --- a/tests/app/celery/test_provider_tasks.py +++ b/tests/app/celery/test_provider_tasks.py @@ -64,7 +64,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_sms_task sms_method(notification_id) app.delivery.send_to_providers.send_sms_to_provider.assert_not_called() - getattr(app.celery.provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=0) + getattr(app.celery.provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=25) def test_should_call_send_email_to_provider_from_deliver_email_task( @@ -87,7 +87,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_email_ta deliver_email(notification_id) app.delivery.send_to_providers.send_email_to_provider.assert_not_called() - app.celery.provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks") + app.celery.provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks", countdown=25) # DO THESE FOR THE 4 TYPES OF TASK @@ -114,7 +114,7 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_sms_task( sms_method(sample_notification.id) assert str(sample_notification.id) in str(e.value) - getattr(provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=0) + getattr(provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=300) assert sample_notification.status == "technical-failure" queued_callback.assert_called_once_with(sample_notification) @@ -135,7 +135,7 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_email_task deliver_email(sample_notification.id) assert str(sample_notification.id) in str(e.value) - provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks") + provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks", countdown=300) assert sample_notification.status == "technical-failure" queued_callback.assert_called_once_with(sample_notification) diff --git a/tests/app/notifications/test_process_notification.py b/tests/app/notifications/test_process_notification.py index c9a627439e..bb194d3c28 100644 --- a/tests/app/notifications/test_process_notification.py +++ b/tests/app/notifications/test_process_notification.py @@ -1,6 +1,5 @@ import datetime import uuid -from typing import Any, Dict from unittest.mock import call import pytest @@ -12,21 +11,19 @@ ) from sqlalchemy.exc import SQLAlchemyError +from app.celery.utils import CeleryParams from app.config import QueueNames from app.dao.service_sms_sender_dao import dao_update_service_sms_sender from app.models import ( BULK, - EMAIL_TYPE, LETTER_TYPE, NORMAL, PRIORITY, - SMS_TYPE, Notification, NotificationHistory, ScheduledNotification, Template, ) -from app.notifications import RETRY_PERIODS, build_retry_task_params from app.notifications.process_notifications import ( choose_queue, create_content_for_notification, @@ -41,6 +38,7 @@ from app.v2.errors import BadRequestError from tests.app.conftest import create_sample_api_key from tests.app.db import create_service, create_service_sms_sender, create_template +from tests.conftest import set_config class TestContentCreation: @@ -1093,23 +1091,47 @@ def test_db_save_and_send_notification_throws_exception_deletes_notification( assert NotificationHistory.query.count() == 0 @pytest.mark.parametrize( - ("notification_type, process_type, expected_retry_period"), + ("process_type, expected_retry_period"), [ - (EMAIL_TYPE, BULK, RETRY_PERIODS[BULK]), - (EMAIL_TYPE, NORMAL, RETRY_PERIODS[NORMAL]), - (EMAIL_TYPE, PRIORITY, RETRY_PERIODS[NORMAL]), - (SMS_TYPE, BULK, RETRY_PERIODS[BULK]), - (SMS_TYPE, NORMAL, RETRY_PERIODS[NORMAL]), - (SMS_TYPE, PRIORITY, RETRY_PERIODS[PRIORITY]), + (BULK, CeleryParams.RETRY_PERIODS[BULK]), + (NORMAL, CeleryParams.RETRY_PERIODS[NORMAL]), + (PRIORITY, CeleryParams.RETRY_PERIODS[PRIORITY]), ], ) - def test_retry_task_parameters(self, notify_api, notification_type, process_type, expected_retry_period): + def test_retry_task_parameters(self, notify_api, process_type, expected_retry_period): with notify_api.app_context(): - params: Dict[str, Any] = build_retry_task_params(notification_type, process_type) + params = CeleryParams.retry(process_type) assert params["queue"] == QueueNames.RETRY assert params["countdown"] == expected_retry_period + @pytest.mark.parametrize( + ("process_type"), + [(BULK), (NORMAL), (PRIORITY), (None)], + ) + def test_retry_task_parameters_with_countdown_override(self, notify_api, process_type): + with notify_api.app_context(): + params = CeleryParams.retry(process_type, countdown=-1) + + assert params["queue"] == QueueNames.RETRY + assert params["countdown"] == -1 + + @pytest.mark.parametrize( + ("process_type, expected_retry_period"), + [ + (BULK, CeleryParams.RETRY_PERIODS[BULK]), + (NORMAL, CeleryParams.RETRY_PERIODS[NORMAL]), + (PRIORITY, CeleryParams.RETRY_PERIODS[PRIORITY]), + (None, CeleryParams.RETRY_PERIODS[PRIORITY]), + ], + ) + def test_retry_task_parameters_with_ff_off(self, notify_api, process_type, expected_retry_period): + with notify_api.app_context(), set_config(notify_api, "FF_CELERY_CUSTOM_TASK_PARAMS", False): + params = CeleryParams.retry(process_type) + + assert params["queue"] == QueueNames.RETRY + assert params.get("countdown") is None + def test_db_save_and_send_notification_throws_exception_when_missing_template(self, sample_api_key, mocker): mocker.patch("app.celery.provider_tasks.deliver_sms.apply_async") assert Notification.query.count() == 0