Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lowering retry period for high priority emails to 25 seconds #2031

Merged
merged 28 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8d4a8f8
Decoupled scan malware code + lowering retry period for high priority…
jimleroyer Nov 20, 2023
8b3ad4e
Extract common email retry handling logic into its own function
jimleroyer Nov 21, 2023
4b40f65
Cleaned up import
jimleroyer Nov 21, 2023
dab3a12
Forgot to provide default value to optional fn arg
jimleroyer Nov 21, 2023
5ec97ae
Fixed test import
jimleroyer Nov 21, 2023
a6a418a
Isolated retry task param builder in a class
jimleroyer Nov 22, 2023
fd12319
Merge remote-tracking branch 'origin/main' into feat/decrease-retry-f…
jimleroyer Nov 22, 2023
1cef267
Cleaned up import
jimleroyer Nov 22, 2023
f4a6470
Fixed moved refs
jimleroyer Nov 22, 2023
17482f5
Trying a different strategy to fix circular import
jimleroyer Nov 22, 2023
e124f21
Fixing another bad import ref
jimleroyer Nov 23, 2023
e5bfea8
Introducing celery utils module instead of using celery root one
jimleroyer Nov 23, 2023
c7f6057
Merge remote-tracking branch 'origin/main' into feat/decrease-retry-f…
jimleroyer Nov 23, 2023
ec41793
Cover edge cases + modified tests
jimleroyer Nov 24, 2023
e084c8f
Merge remote-tracking branch 'origin/main' into feat/decrease-retry-f…
jimleroyer Nov 24, 2023
5433518
Formatting
jimleroyer Nov 24, 2023
85f5140
Sort imports
jimleroyer Nov 24, 2023
9e4bfe6
Make notification_process_type param optional
jimleroyer Nov 24, 2023
d064161
Merge remote-tracking branch 'origin/main' into feat/decrease-retry-f…
jimleroyer Dec 7, 2023
c3e5ee6
Fixed edge case when template not associated with notification obj
jimleroyer Dec 7, 2023
3b33d29
Merge remote-tracking branch 'origin/main' into feat/decrease-retry-f…
jimleroyer Dec 11, 2023
bcd3f16
Fixing params order
jimleroyer Dec 11, 2023
4e236d4
Fixing regression tests
jimleroyer Dec 11, 2023
7264ae8
More tests
jimleroyer Dec 12, 2023
cc991f2
Added null protection against a potential NPE
jimleroyer Dec 12, 2023
5bf854b
Formatting
jimleroyer Dec 12, 2023
2ea5a4b
Fix imports
jimleroyer Dec 12, 2023
e7daf30
Merge branch 'main' into feat/decrease-retry-for-email-high
sastels Dec 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 43 additions & 34 deletions app/celery/provider_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from typing import Optional

from flask import current_app
from notifications_utils.recipients import InvalidEmailError
from notifications_utils.statsd_decorators import statsd
from sqlalchemy.orm.exc import NoResultFound

from app import notify_celery
from app.config import Config, QueueNames
from app.celery.utils import CeleryParams
from app.config import Config
from app.dao import notifications_dao
from app.dao.notifications_dao import update_notification_status_by_id
from app.delivery import send_to_providers
Expand All @@ -14,9 +17,9 @@
MalwareScanInProgressException,
NotificationTechnicalFailureException,
)
from app.models import NOTIFICATION_TECHNICAL_FAILURE
from app.notifications import build_retry_task_params
from app.models import NOTIFICATION_TECHNICAL_FAILURE, Notification
from app.notifications.callbacks import _check_and_queue_callback_task
from celery import Task


# Celery rate limits are per worker instance and not a global rate limit.
Expand Down Expand Up @@ -62,6 +65,7 @@ def deliver_sms(self, notification_id):
@notify_celery.task(bind=True, name="deliver_email", max_retries=48, default_retry_delay=300)
@statsd(namespace="tasks")
def deliver_email(self, notification_id):
notification = None
try:
current_app.logger.debug("Start sending email for notification id: {}".format(notification_id))
notification = notifications_dao.get_notification_by_id(notification_id)
Expand All @@ -81,30 +85,17 @@ def deliver_email(self, notification_id):
_check_and_queue_callback_task(notification)
except MalwareDetectedException:
_check_and_queue_callback_task(notification)
except Exception as e:
if isinstance(e, MalwareScanInProgressException) and self.request.retries <= SCAN_MAX_BACKOFF_RETRIES:
countdown = SCAN_RETRY_BACKOFF * (self.request.retries + 1) # do we need to add 1 here?
except MalwareScanInProgressException as me:
if self.request.retries <= SCAN_MAX_BACKOFF_RETRIES:
countdown = SCAN_RETRY_BACKOFF * (self.request.retries + 1)
else:
countdown = None
try:
current_app.logger.warning(f"The exception is {repr(e)}")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we add this log warning in a specific context? On its own it is not informative about that context. Nonetheless, I now pass the exception as a parameter when logging the exception further below.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was probably debugging code that slipped through

if self.request.retries <= 10:
current_app.logger.warning("RETRY {}: Email notification {} failed".format(self.request.retries, notification_id))
else:
current_app.logger.exception("RETRY: Email notification {} failed".format(notification_id))
if countdown is not None:
self.retry(queue=QueueNames.RETRY, countdown=countdown)
else:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. "
"The task send_email_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(notification_id)
)
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
_check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(message)
current_app.logger.warning(
"RETRY {}: Email notification {} is waiting on pending malware scanning".format(self.request.retries, notification_id)
)
_handle_error_with_email_retry(self, me, notification_id, notification, countdown)
except Exception as e:
_handle_error_with_email_retry(self, e, notification_id, notification)


def _deliver_sms(self, notification_id):
Expand All @@ -120,15 +111,8 @@ def _deliver_sms(self, notification_id):
_check_and_queue_callback_task(notification)
except Exception:
try:
if self.request.retries == 0:
# Retry immediately, especially as a common failure is for the database data
# replication to be delayed. The immediate retry likely succeeds in these scenarios.
self.retry(queue=QueueNames.RETRY, countdown=0)
else:
# Once the previous retry failed, log the exception and this time,
# retry with the default delay.
current_app.logger.exception("SMS notification delivery for id: {} failed".format(notification_id))
self.retry(**build_retry_task_params(notification.notification_type, notification.template.process_type))
current_app.logger.exception("SMS notification delivery for id: {} failed".format(notification_id))
self.retry(**CeleryParams.retry(None if notification is None else notification.template.process_type))
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. "
Expand All @@ -137,3 +121,28 @@ def _deliver_sms(self, notification_id):
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
_check_and_queue_callback_task(notification)
raise NotificationTechnicalFailureException(message)


def _handle_error_with_email_retry(
    task: Task, e: Exception, notification_id: int, notification: Optional[Notification], countdown: Optional[int] = None
):
    """Retry a failed email delivery task, or fail the notification once retries run out.

    The failure is logged as a warning for the first retries (``retries <= 10``)
    and as an exception, with ``e`` attached, afterwards. The task is then
    retried with the parameters built by ``CeleryParams.retry``.

    Args:
        task: The bound Celery task that failed; its ``request.retries``
            counter and ``retry`` method are used.
        e: The exception that caused the delivery to fail (used for logging).
        notification_id: Identifier of the notification being delivered.
        notification: The notification object, or ``None`` when it could not
            be loaded before the failure occurred.
        countdown: Optional delay in seconds overriding the retry period
            otherwise derived from the template's process type.

    Raises:
        NotificationTechnicalFailureException: When the task has exhausted
            its retries. The notification status is set to technical failure
            first and, if a notification object is available, the callback
            task is queued.
    """
    try:
        if task.request.retries <= 10:
            current_app.logger.warning("RETRY {}: Email notification {} failed".format(task.request.retries, notification_id))
        else:
            current_app.logger.exception("RETRY: Email notification {} failed".format(notification_id), exc_info=e)
        # There is an edge case when a notification is not found in the database
        # (or has no template attached): no process type is available, so let
        # CeleryParams pick its default retry period for an unknown priority.
        if notification is None or notification.template is None:
            task.retry(**CeleryParams.retry(countdown=countdown))
        else:
            task.retry(**CeleryParams.retry(notification.template.process_type, countdown))
    except task.MaxRetriesExceededError:
        message = (
            "RETRY FAILED: Max retries reached. "
            "The task send_email_to_provider failed for notification {}. "
            "Notification has been updated to technical-failure".format(notification_id)
        )
        update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
        # The callback needs the notification object; skip it when none was loaded.
        if notification is not None:
            _check_and_queue_callback_task(notification)
        raise NotificationTechnicalFailureException(message)
43 changes: 43 additions & 0 deletions app/celery/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Any, Dict, Optional
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to move the parameter-building logic into its own module to avoid a circular import: it previously lived in the notifications `__init__` module, and importing it from there would create import cycles.


from flask import current_app

from app import config, models

# Default retry periods for sending notifications.
RETRY_DEFAULT = 300
RETRY_HIGH = 25


class CeleryParams(object):
    """Builders for the keyword arguments handed to Celery task calls."""

    # Important to load from the object and not the module to avoid
    # circular imports, back and forth between the app and celery modules.

    # Retry period, in seconds, for each template process type.
    RETRY_PERIODS = {
        models.BULK: RETRY_DEFAULT,
        models.NORMAL: RETRY_DEFAULT,
        models.PRIORITY: RETRY_HIGH,
        None: RETRY_HIGH,  # In case we cannot identify the priority, treat it as high.
    }

    @staticmethod
    def retry(notification_process_type: Optional[str] = None, countdown: Optional[int] = None) -> Dict[str, Any]:
        """
        Build the keyword arguments for retrying a notification sending task.

        The returned dict always targets the retry queue. When the
        FF_CELERY_CUSTOM_TASK_PARAMS feature flag is False, nothing else is
        set and the task's own default retry delay applies.

        Otherwise a ``countdown`` (in seconds) is added:

        * the ``countdown`` argument, when provided, overrides the policy;
        * else the period registered in ``RETRY_PERIODS`` for the given
          process type is used: 25 seconds for priority notifications,
          5 minutes for normal and bulk ones.

        A missing (``None``) or unrecognised process type is treated as high
        priority, so that a retry is still scheduled instead of failing on
        the lookup.
        """
        params: Dict[str, Any] = {"queue": config.QueueNames.RETRY}
        if current_app.config["FF_CELERY_CUSTOM_TASK_PARAMS"] is False:
            return params

        if countdown is not None:
            # Explicit override requested by the calling task.
            params["countdown"] = countdown
        else:
            # Unknown process types fall back to the high priority period,
            # consistent with how a missing (None) process type is handled.
            params["countdown"] = CeleryParams.RETRY_PERIODS.get(notification_process_type, RETRY_HIGH)
        return params
36 changes: 0 additions & 36 deletions app/notifications/__init__.py
Original file line number Diff line number Diff line change
@@ -1,36 +0,0 @@
from typing import Any, Dict

from flask import current_app

from app.config import QueueNames
from app.models import BULK, NORMAL, PRIORITY, SMS_TYPE

# Default retry periods for sending notifications.
RETRY_DEFAULT = 300
RETRY_HIGH = 25

RETRY_PERIODS = {
BULK: RETRY_DEFAULT,
NORMAL: RETRY_DEFAULT,
PRIORITY: RETRY_HIGH,
}


def build_retry_task_params(notification_type: str, notification_process_type: str) -> Dict[str, Any]:
    """
    Assemble the keyword arguments used when retrying a sending task.

    The retry queue is always set. Unless the FF_CELERY_CUSTOM_TASK_PARAMS
    feature flag is False, a countdown is added as well: SMS notifications
    use the period registered for their process type (25 seconds for high
    priority), every other notification type uses the 5 minute default.
    """
    task_params: Dict[str, Any] = {"queue": QueueNames.RETRY}
    if current_app.config["FF_CELERY_CUSTOM_TASK_PARAMS"] is not False:
        # Only SMS gets a retry period tied to its process type;
        # all other notification types keep the default period.
        is_sms = notification_type == SMS_TYPE
        task_params["countdown"] = RETRY_PERIODS[notification_process_type] if is_sms else RETRY_DEFAULT
    return task_params
6 changes: 5 additions & 1 deletion app/notifications/callbacks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from flask import current_app

from app.celery.service_callback_tasks import send_delivery_status_to_service
from app.config import QueueNames
from app.dao.service_callback_api_dao import (
Expand All @@ -6,11 +8,13 @@


def _check_and_queue_callback_task(notification):
if notification is None:
current_app.logger.warning("No notification provided, cannot queue callback task")
return
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit more null protection. I think I added this while pairing with someone; it is not part of the scope of these changes.

# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
notification_data = create_delivery_status_callback_data(notification, service_callback_api)

send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS)


Expand Down
8 changes: 4 additions & 4 deletions tests/app/celery/test_provider_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_sms_task
sms_method(notification_id)
app.delivery.send_to_providers.send_sms_to_provider.assert_not_called()

getattr(app.celery.provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=0)
getattr(app.celery.provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=25)


def test_should_call_send_email_to_provider_from_deliver_email_task(
Expand All @@ -87,7 +87,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_email_ta

deliver_email(notification_id)
app.delivery.send_to_providers.send_email_to_provider.assert_not_called()
app.celery.provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks")
app.celery.provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks", countdown=25)


# DO THESE FOR THE 4 TYPES OF TASK
Expand All @@ -114,7 +114,7 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_sms_task(
sms_method(sample_notification.id)
assert str(sample_notification.id) in str(e.value)

getattr(provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=0)
getattr(provider_tasks, sms_method_name).retry.assert_called_with(queue="retry-tasks", countdown=300)

assert sample_notification.status == "technical-failure"
queued_callback.assert_called_once_with(sample_notification)
Expand All @@ -135,7 +135,7 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_email_task
deliver_email(sample_notification.id)
assert str(sample_notification.id) in str(e.value)

provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks")
provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks", countdown=300)
assert sample_notification.status == "technical-failure"
queued_callback.assert_called_once_with(sample_notification)

Expand Down
48 changes: 35 additions & 13 deletions tests/app/notifications/test_process_notification.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datetime
import uuid
from typing import Any, Dict
from unittest.mock import call

import pytest
Expand All @@ -12,21 +11,19 @@
)
from sqlalchemy.exc import SQLAlchemyError

from app.celery.utils import CeleryParams
from app.config import QueueNames
from app.dao.service_sms_sender_dao import dao_update_service_sms_sender
from app.models import (
BULK,
EMAIL_TYPE,
LETTER_TYPE,
NORMAL,
PRIORITY,
SMS_TYPE,
Notification,
NotificationHistory,
ScheduledNotification,
Template,
)
from app.notifications import RETRY_PERIODS, build_retry_task_params
from app.notifications.process_notifications import (
choose_queue,
create_content_for_notification,
Expand All @@ -41,6 +38,7 @@
from app.v2.errors import BadRequestError
from tests.app.conftest import create_sample_api_key
from tests.app.db import create_service, create_service_sms_sender, create_template
from tests.conftest import set_config


class TestContentCreation:
Expand Down Expand Up @@ -1093,23 +1091,47 @@ def test_db_save_and_send_notification_throws_exception_deletes_notification(
assert NotificationHistory.query.count() == 0

@pytest.mark.parametrize(
("notification_type, process_type, expected_retry_period"),
("process_type, expected_retry_period"),
[
(EMAIL_TYPE, BULK, RETRY_PERIODS[BULK]),
(EMAIL_TYPE, NORMAL, RETRY_PERIODS[NORMAL]),
(EMAIL_TYPE, PRIORITY, RETRY_PERIODS[NORMAL]),
(SMS_TYPE, BULK, RETRY_PERIODS[BULK]),
(SMS_TYPE, NORMAL, RETRY_PERIODS[NORMAL]),
(SMS_TYPE, PRIORITY, RETRY_PERIODS[PRIORITY]),
(BULK, CeleryParams.RETRY_PERIODS[BULK]),
(NORMAL, CeleryParams.RETRY_PERIODS[NORMAL]),
(PRIORITY, CeleryParams.RETRY_PERIODS[PRIORITY]),
],
)
def test_retry_task_parameters(self, notify_api, notification_type, process_type, expected_retry_period):
def test_retry_task_parameters(self, notify_api, process_type, expected_retry_period):
with notify_api.app_context():
params: Dict[str, Any] = build_retry_task_params(notification_type, process_type)
params = CeleryParams.retry(process_type)

assert params["queue"] == QueueNames.RETRY
assert params["countdown"] == expected_retry_period

@pytest.mark.parametrize(
("process_type"),
[(BULK), (NORMAL), (PRIORITY), (None)],
)
def test_retry_task_parameters_with_countdown_override(self, notify_api, process_type):
with notify_api.app_context():
params = CeleryParams.retry(process_type, countdown=-1)

assert params["queue"] == QueueNames.RETRY
assert params["countdown"] == -1

@pytest.mark.parametrize(
    ("process_type"),
    [(BULK), (NORMAL), (PRIORITY), (None)],
)
def test_retry_task_parameters_with_ff_off(self, notify_api, process_type):
    """With the feature flag off, only the retry queue is set, whatever the process type."""
    with notify_api.app_context(), set_config(notify_api, "FF_CELERY_CUSTOM_TASK_PARAMS", False):
        params = CeleryParams.retry(process_type)

        assert params["queue"] == QueueNames.RETRY
        # No countdown override: the task's own default retry delay applies.
        assert "countdown" not in params

def test_db_save_and_send_notification_throws_exception_when_missing_template(self, sample_api_key, mocker):
mocker.patch("app.celery.provider_tasks.deliver_sms.apply_async")
assert Notification.query.count() == 0
Expand Down
Loading