Lowering retry period for high priority emails to 25 seconds (#2031)
* Decoupled malware scan code + lowered retry period for high priority emails

* Extract common email retry handling logic into its own function

* Cleaned up import

* Forgot to provide default value to optional fn arg

* Fixed test import

* Isolated retry task param builder in a class

* Cleaned up import

* Fixed moved refs

* Trying a different strategy to fix circular import

* Fixing another bad import ref

* Introducing celery utils module instead of using celery root one

* Cover edge cases + modified tests

* Formatting

* Sort imports

* Make notification_process_type param optional

* Fixed edge case when template not associated with notification obj

* Fixing params order

* Fixing regression tests

* More tests

* Added null protection against a potential NPE

* Formatting

* Fix imports

---------

Co-authored-by: Steve Astels <[email protected]>
jimleroyer and sastels authored Dec 14, 2023
1 parent 643ff13 commit 68f2d3c
Showing 6 changed files with 130 additions and 88 deletions.
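
At a high level, this commit replaces the module-level `build_retry_task_params` helper with a `CeleryParams` class keyed on the notification's process type rather than on SMS vs. email, so high priority emails now retry after 25 seconds instead of the 5-minute default. Below is a minimal sketch of the call pattern the delivery tasks follow; the task name `deliver_example` is hypothetical, and the decorator arguments mirror the real `deliver_email` task in this diff.

```python
# Sketch only: how a delivery task is expected to use the new CeleryParams
# builder. The real tasks are deliver_sms / deliver_email in
# app/celery/provider_tasks.py; deliver_example is a made-up name.
from app import notify_celery
from app.celery.utils import CeleryParams
from app.dao import notifications_dao


@notify_celery.task(bind=True, name="deliver_example", max_retries=48, default_retry_delay=300)
def deliver_example(self, notification_id):
    notification = None
    try:
        notification = notifications_dao.get_notification_by_id(notification_id)
        # ... hand the notification off to the provider ...
    except Exception:
        # PRIORITY (and unknown) process types retry after 25s, BULK/NORMAL after 300s.
        process_type = None if notification is None else notification.template.process_type
        self.retry(**CeleryParams.retry(process_type))
```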
77 changes: 43 additions & 34 deletions app/celery/provider_tasks.py
@@ -1,10 +1,13 @@
from typing import Optional

from flask import current_app
from notifications_utils.recipients import InvalidEmailError
from notifications_utils.statsd_decorators import statsd
from sqlalchemy.orm.exc import NoResultFound

from app import notify_celery
from app.config import Config, QueueNames
from app.celery.utils import CeleryParams
from app.config import Config
from app.dao import notifications_dao
from app.dao.notifications_dao import update_notification_status_by_id
from app.delivery import send_to_providers
@@ -14,9 +17,9 @@
MalwareScanInProgressException,
NotificationTechnicalFailureException,
)
from app.models import NOTIFICATION_TECHNICAL_FAILURE
from app.notifications import build_retry_task_params
from app.models import NOTIFICATION_TECHNICAL_FAILURE, Notification
from app.notifications.callbacks import _check_and_queue_callback_task
from celery import Task


# Celery rate limits are per worker instance and not a global rate limit.
@@ -62,6 +65,7 @@ def deliver_sms(self, notification_id):
@notify_celery.task(bind=True, name="deliver_email", max_retries=48, default_retry_delay=300)
@statsd(namespace="tasks")
def deliver_email(self, notification_id):
notification = None
try:
current_app.logger.debug("Start sending email for notification id: {}".format(notification_id))
notification = notifications_dao.get_notification_by_id(notification_id)
@@ -81,30 +85,17 @@ def deliver_email(self, notification_id):
_check_and_queue_callback_task(notification)
except MalwareDetectedException:
_check_and_queue_callback_task(notification)
except Exception as e:
if isinstance(e, MalwareScanInProgressException) and self.request.retries <= SCAN_MAX_BACKOFF_RETRIES:
countdown = SCAN_RETRY_BACKOFF * (self.request.retries + 1) # do we need to add 1 here?
except MalwareScanInProgressException as me:
if self.request.retries <= SCAN_MAX_BACKOFF_RETRIES:
countdown = SCAN_RETRY_BACKOFF * (self.request.retries + 1)
else:
countdown = None
try:
current_app.logger.warning(f"The exception is {repr(e)}")
if self.request.retries <= 10:
current_app.logger.warning("RETRY {}: Email notification {} failed".format(self.request.retries, notification_id))
else:
current_app.logger.exception("RETRY: Email notification {} failed".format(notification_id))
if countdown is not None:
self.retry(queue=QueueNames.RETRY, countdown=countdown)
else:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. "
"The task send_email_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(notification_id)
)
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
_check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(message)
current_app.logger.warning(
"RETRY {}: Email notification {} is waiting on pending malware scanning".format(self.request.retries, notification_id)
)
_handle_error_with_email_retry(self, me, notification_id, notification, countdown)
except Exception as e:
_handle_error_with_email_retry(self, e, notification_id, notification)


def _deliver_sms(self, notification_id):
@@ -120,15 +111,8 @@ def _deliver_sms(self, notification_id):
_check_and_queue_callback_task(notification)
except Exception:
try:
if self.request.retries == 0:
# Retry immediately, especially as a common failure is for the database data
# replication to be delayed. The immediate retry likely succeeds in these scenarios.
self.retry(queue=QueueNames.RETRY, countdown=0)
else:
# Once the previous retry failed, log the exception and this time,
# retry with the default delay.
current_app.logger.exception("SMS notification delivery for id: {} failed".format(notification_id))
self.retry(**build_retry_task_params(notification.notification_type, notification.template.process_type))
current_app.logger.exception("SMS notification delivery for id: {} failed".format(notification_id))
self.retry(**CeleryParams.retry(None if notification is None else notification.template.process_type))
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. "
@@ -137,3 +121,28 @@ def _deliver_sms(self, notification_id):
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
_check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(message)


def _handle_error_with_email_retry(
task: Task, e: Exception, notification_id: int, notification: Optional[Notification], countdown: Optional[int] = None
):
try:
if task.request.retries <= 10:
current_app.logger.warning("RETRY {}: Email notification {} failed".format(task.request.retries, notification_id))
else:
current_app.logger.exception("RETRY: Email notification {} failed".format(notification_id), exc_info=e)
# Edge case: the notification or its template could not be loaded from the database.
if notification is None or notification.template is None:
task.retry(**CeleryParams.retry(countdown=countdown))
else:
task.retry(**CeleryParams.retry(notification.template.process_type, countdown))
except task.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. "
"The task send_email_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(notification_id)
)
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
if notification is not None:
_check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(message)
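
For the malware-scan branch in `deliver_email` above, the retry countdown grows linearly with the retry count until `SCAN_MAX_BACKOFF_RETRIES` is exceeded, after which the override is dropped and `CeleryParams.retry` falls back to the process-type period. A small sketch of that schedule follows; the two constant values are placeholders, since the real constants are defined earlier in `provider_tasks.py` and are not part of this diff.

```python
# Placeholder values: the real SCAN_RETRY_BACKOFF / SCAN_MAX_BACKOFF_RETRIES
# are module constants in app/celery/provider_tasks.py and are not shown here.
SCAN_RETRY_BACKOFF = 10
SCAN_MAX_BACKOFF_RETRIES = 5


def scan_retry_countdown(retries):
    """Mirror the branch in deliver_email: linear backoff, then no override."""
    if retries <= SCAN_MAX_BACKOFF_RETRIES:
        return SCAN_RETRY_BACKOFF * (retries + 1)
    return None  # deliver_email then lets CeleryParams.retry() pick the period


# With the placeholder values, retries 0..5 wait 10, 20, 30, 40, 50, 60 seconds;
# from retry 6 onward the countdown comes from the RETRY_PERIODS table instead.
print([scan_retry_countdown(r) for r in range(7)])  # [10, 20, 30, 40, 50, 60, None]
```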
43 changes: 43 additions & 0 deletions app/celery/utils.py
@@ -0,0 +1,43 @@
from typing import Any, Dict, Optional

from flask import current_app

from app import config, models

# Default retry periods for sending notifications.
RETRY_DEFAULT = 300
RETRY_HIGH = 25


class CeleryParams(object):
# Important to load from the object and not the module to avoid
# circular imports, back and forth between the app and celery modules.

RETRY_PERIODS = {
models.BULK: RETRY_DEFAULT,
models.NORMAL: RETRY_DEFAULT,
models.PRIORITY: RETRY_HIGH,
None: RETRY_HIGH, # In case we cannot identify the priority, treat it as high.
}

@staticmethod
def retry(notification_process_type: Optional[str] = None, countdown: Optional[int] = None) -> Dict[str, Any]:
"""
Build task params for the sending parameter retry tasks.
If the notification is a high priority SMS, set the retry policy to retry every 25 seconds
else fall back to the default retry policy of retrying every 5 minutes.
Provide an override parameter for cases the calling task wants to override the retry policy.
"""
params: dict[str, Any] = {"queue": config.QueueNames.RETRY}
if current_app.config["FF_CELERY_CUSTOM_TASK_PARAMS"] is False:
return params

if countdown is not None:
params["countdown"] = countdown
else:
# No explicit override: pick the retry period based on the
# notification's process type.
params["countdown"] = CeleryParams.RETRY_PERIODS[notification_process_type]
return params
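
The resulting params for a few representative calls are sketched below; this assumes a Flask app context where `FF_CELERY_CUSTOM_TASK_PARAMS` is enabled, and the `retry-tasks` queue name matches the assertions in the updated tests further down.

```python
# Illustrative only; CeleryParams.retry() reads current_app.config, so these
# calls assume an application context with FF_CELERY_CUSTOM_TASK_PARAMS=True.
from app import models
from app.celery.utils import CeleryParams

CeleryParams.retry(models.BULK)      # {"queue": "retry-tasks", "countdown": 300}
CeleryParams.retry(models.NORMAL)    # {"queue": "retry-tasks", "countdown": 300}
CeleryParams.retry(models.PRIORITY)  # {"queue": "retry-tasks", "countdown": 25}
CeleryParams.retry(None)             # unknown priority is treated as high: countdown 25
CeleryParams.retry(models.PRIORITY, countdown=60)  # explicit override wins: countdown 60

# With FF_CELERY_CUSTOM_TASK_PARAMS set to False, only {"queue": "retry-tasks"}
# is returned and the task's own default_retry_delay applies.
```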
36 changes: 0 additions & 36 deletions app/notifications/__init__.py
@@ -1,36 +0,0 @@
from typing import Any, Dict

from flask import current_app

from app.config import QueueNames
from app.models import BULK, NORMAL, PRIORITY, SMS_TYPE

# Default retry periods for sending notifications.
RETRY_DEFAULT = 300
RETRY_HIGH = 25

RETRY_PERIODS = {
BULK: RETRY_DEFAULT,
NORMAL: RETRY_DEFAULT,
PRIORITY: RETRY_HIGH,
}


def build_retry_task_params(notification_type: str, notification_process_type: str) -> Dict[str, Any]:
"""
Build task params for the sending parameter retry tasks.
If the notification is a high priority SMS, set the retry policy to retry every 25 seconds
else fall back to the default retry policy of retrying every 5 minutes.
"""
params: dict[str, Any] = {"queue": QueueNames.RETRY}
if current_app.config["FF_CELERY_CUSTOM_TASK_PARAMS"] is False:
return params

# Overring the retry policy is only supported for SMS for now;
# email support coming later.
if notification_type == SMS_TYPE:
params["countdown"] = RETRY_PERIODS[notification_process_type]
else:
params["countdown"] = RETRY_DEFAULT
return params
6 changes: 5 additions & 1 deletion app/notifications/callbacks.py
@@ -1,3 +1,5 @@
from flask import current_app

from app.celery.service_callback_tasks import send_delivery_status_to_service
from app.config import QueueNames
from app.dao.service_callback_api_dao import (
@@ -6,11 +8,13 @@


def _check_and_queue_callback_task(notification):
if notification is None:
current_app.logger.warning("No notification provided, cannot queue callback task")
return
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
notification_data = create_delivery_status_callback_data(notification, service_callback_api)

send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS)


8 changes: 4 additions & 4 deletions tests/app/celery/test_provider_tasks.py
@@ -64,7 +64,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_sms_task
sms_method(notification_id)
app.delivery.send_to_providers.send_sms_to_provider.assert_not_called()

getattr(app.celery.provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=0)
getattr(app.celery.provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=25)


def test_should_call_send_email_to_provider_from_deliver_email_task(
@@ -87,7 +87,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_email_ta

deliver_email(notification_id)
app.delivery.send_to_providers.send_email_to_provider.assert_not_called()
app.celery.provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks")
app.celery.provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks", countdown=25)


# DO THESE FOR THE 4 TYPES OF TASK
@@ -114,7 +114,7 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_sms_task(
sms_method(sample_notification.id)
assert str(sample_notification.id) in str(e.value)

getattr(provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=0)
getattr(provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=300)

assert sample_notification.status == "technical-failure"
queued_callback.assert_called_once_with(sample_notification)
@@ -135,7 +135,7 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_email_task
deliver_email(sample_notification.id)
assert str(sample_notification.id) in str(e.value)

provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks")
provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks", countdown=300)
assert sample_notification.status == "technical-failure"
queued_callback.assert_called_once_with(sample_notification)

48 changes: 35 additions & 13 deletions tests/app/notifications/test_process_notification.py
@@ -1,6 +1,5 @@
import datetime
import uuid
from typing import Any, Dict
from unittest.mock import call

import pytest
@@ -12,21 +11,19 @@
)
from sqlalchemy.exc import SQLAlchemyError

from app.celery.utils import CeleryParams
from app.config import QueueNames
from app.dao.service_sms_sender_dao import dao_update_service_sms_sender
from app.models import (
BULK,
EMAIL_TYPE,
LETTER_TYPE,
NORMAL,
PRIORITY,
SMS_TYPE,
Notification,
NotificationHistory,
ScheduledNotification,
Template,
)
from app.notifications import RETRY_PERIODS, build_retry_task_params
from app.notifications.process_notifications import (
choose_queue,
create_content_for_notification,
@@ -41,6 +38,7 @@
from app.v2.errors import BadRequestError
from tests.app.conftest import create_sample_api_key
from tests.app.db import create_service, create_service_sms_sender, create_template
from tests.conftest import set_config


class TestContentCreation:
@@ -1093,23 +1091,47 @@ def test_db_save_and_send_notification_throws_exception_deletes_notification(
assert NotificationHistory.query.count() == 0

@pytest.mark.parametrize(
("notification_type, process_type, expected_retry_period"),
("process_type, expected_retry_period"),
[
(EMAIL_TYPE, BULK, RETRY_PERIODS[BULK]),
(EMAIL_TYPE, NORMAL, RETRY_PERIODS[NORMAL]),
(EMAIL_TYPE, PRIORITY, RETRY_PERIODS[NORMAL]),
(SMS_TYPE, BULK, RETRY_PERIODS[BULK]),
(SMS_TYPE, NORMAL, RETRY_PERIODS[NORMAL]),
(SMS_TYPE, PRIORITY, RETRY_PERIODS[PRIORITY]),
(BULK, CeleryParams.RETRY_PERIODS[BULK]),
(NORMAL, CeleryParams.RETRY_PERIODS[NORMAL]),
(PRIORITY, CeleryParams.RETRY_PERIODS[PRIORITY]),
],
)
def test_retry_task_parameters(self, notify_api, notification_type, process_type, expected_retry_period):
def test_retry_task_parameters(self, notify_api, process_type, expected_retry_period):
with notify_api.app_context():
params: Dict[str, Any] = build_retry_task_params(notification_type, process_type)
params = CeleryParams.retry(process_type)

assert params["queue"] == QueueNames.RETRY
assert params["countdown"] == expected_retry_period

@pytest.mark.parametrize(
("process_type"),
[(BULK), (NORMAL), (PRIORITY), (None)],
)
def test_retry_task_parameters_with_countdown_override(self, notify_api, process_type):
with notify_api.app_context():
params = CeleryParams.retry(process_type, countdown=-1)

assert params["queue"] == QueueNames.RETRY
assert params["countdown"] == -1

@pytest.mark.parametrize(
("process_type, expected_retry_period"),
[
(BULK, CeleryParams.RETRY_PERIODS[BULK]),
(NORMAL, CeleryParams.RETRY_PERIODS[NORMAL]),
(PRIORITY, CeleryParams.RETRY_PERIODS[PRIORITY]),
(None, CeleryParams.RETRY_PERIODS[PRIORITY]),
],
)
def test_retry_task_parameters_with_ff_off(self, notify_api, process_type, expected_retry_period):
with notify_api.app_context(), set_config(notify_api, "FF_CELERY_CUSTOM_TASK_PARAMS", False):
params = CeleryParams.retry(process_type)

assert params["queue"] == QueueNames.RETRY
assert params.get("countdown") is None

def test_db_save_and_send_notification_throws_exception_when_missing_template(self, sample_api_key, mocker):
mocker.patch("app.celery.provider_tasks.deliver_sms.apply_async")
assert Notification.query.count() == 0