Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send emails using 3 new queues #1997

Merged
merged 7 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ file. Copy that file to `.env` and customize it to your needs.

## To run the queues
```
scripts/run_celery.sh
```

```
scripts/run_celery_sms.sh
scripts/run_celery_local.sh
```

```
Expand Down Expand Up @@ -179,7 +175,7 @@ To help debug full code paths of emails and SMS, we have a special email and pho
set in the application's configuration. As it stands at the moment these are the following:

| Notification Type | Test destination |
|-------------------|--------------------------|
| ----------------- | ------------------------ |
| Email | [email protected] |
| SMS | +16135550123 |

Expand Down
18 changes: 12 additions & 6 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from kombu import Exchange, Queue
from notifications_utils import logging

# from app.models import EMAIL_TYPE, SMS_TYPE, Priorities
from celery.schedules import crontab

env = Env()
Expand Down Expand Up @@ -95,8 +94,12 @@ class QueueNames(object):
# we have a limit to send per second and hence, needs to be throttled.
SEND_THROTTLED_SMS = "send-throttled-sms-tasks"

# The queue to send emails by default, normal priority.
# TODO: Deprecate to favor priority queues instead, i.e. bulk, normal, priority.
# Queues for sending all emails.
SEND_EMAIL_HIGH = "send-email-high"
SEND_EMAIL_MEDIUM = "send-email-medium"
SEND_EMAIL_LOW = "send-email-low"

# TODO: Delete this queue once we verify that it is not used anymore.
SEND_EMAIL = "send-email-tasks"

# The research mode queue for notifications that are tested by users trying
Expand Down Expand Up @@ -131,9 +134,9 @@ class QueueNames(object):
Priorities.HIGH: SEND_SMS_HIGH,
},
"email": {
Priorities.LOW: BULK,
Priorities.MEDIUM: SEND_EMAIL,
Priorities.HIGH: PRIORITY,
Priorities.LOW: SEND_EMAIL_LOW,
Priorities.MEDIUM: SEND_EMAIL_MEDIUM,
Priorities.HIGH: SEND_EMAIL_HIGH,
},
"letter": {
Priorities.LOW: BULK,
Expand All @@ -157,6 +160,9 @@ def all_queues():
QueueNames.SEND_SMS_LOW,
QueueNames.SEND_SMS,
QueueNames.SEND_THROTTLED_SMS,
QueueNames.SEND_EMAIL_HIGH,
QueueNames.SEND_EMAIL_MEDIUM,
QueueNames.SEND_EMAIL_LOW,
QueueNames.SEND_EMAIL,
QueueNames.RESEARCH_MODE,
QueueNames.REPORTING,
Expand Down
4 changes: 2 additions & 2 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def choose_queue(notification, research_mode, queue=None) -> QueueNames:
queue = QueueNames.SEND_SMS_MEDIUM
if notification.notification_type == EMAIL_TYPE:
if not queue:
queue = QueueNames.SEND_EMAIL
queue = QueueNames.SEND_EMAIL_MEDIUM
if notification.notification_type == LETTER_TYPE:
if not queue:
queue = QueueNames.CREATE_LETTERS_PDF
Expand Down Expand Up @@ -264,7 +264,7 @@ def send_notification_to_queue(notification, research_mode, queue=None):
queue = QueueNames.SEND_SMS_MEDIUM
if notification.notification_type == EMAIL_TYPE:
if not queue or queue == QueueNames.NORMAL:
queue = QueueNames.SEND_EMAIL
queue = QueueNames.SEND_EMAIL_MEDIUM
deliver_task = provider_tasks.deliver_email
if notification.notification_type == LETTER_TYPE:
if not queue or queue == QueueNames.NORMAL:
Expand Down
10 changes: 10 additions & 0 deletions scripts/run_celery_core_tasks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh

# runs celery with all celery queues except send-throttled-sms-tasks, send-sms-* and send-email-*

set -e

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,
service-callbacks,delivery-receipts
2 changes: 1 addition & 1 deletion scripts/run_celery_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ set -e

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-sms-tasks,send-sms-high,send-sms-medium,send-sms-low,send-throttled-sms-tasks,send-email-tasks,service-callbacks,delivery-receipts
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-sms-tasks,send-sms-high,send-sms-medium,send-sms-low,send-throttled-sms-tasks,send-email-high,send-email-medium,send-email-low,send-email-tasks,service-callbacks,delivery-receipts
2 changes: 1 addition & 1 deletion scripts/run_celery_no_sms_sending.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ fi

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-email-tasks,service-callbacks,delivery-receipts
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks,send-email-tasks,send-email-high,send-email-medium,send-email-low,service-callbacks,delivery-receipts
10 changes: 10 additions & 0 deletions scripts/run_celery_send_email.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh

# runs celery with only the send-email-* queues

set -e

echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}"

# TODO: we shouldn't be using the send-email-tasks queue anymore - once we verify this we can remove it
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q send-email-tasks,send-email-high,send-email-medium,send-email-low
2 changes: 1 addition & 1 deletion tests/app/celery/test_scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def test_replay_created_notifications(notify_db_session, sample_service, mocker)
save_notification(create_notification(template=email_template, created_at=datetime.utcnow(), status="created"))

replay_created_notifications()
email_delivery_queue.assert_called_once_with([str(old_email.id)], queue="send-email-tasks")
email_delivery_queue.assert_called_once_with([str(old_email.id)], queue=QueueNames.SEND_EMAIL_MEDIUM)
sms_delivery_queue.assert_called_once_with([str(old_sms.id)], queue=QueueNames.SEND_SMS_MEDIUM)


Expand Down
42 changes: 22 additions & 20 deletions tests/app/celery/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ def test_process_rows_sends_save_task(
"to": "recip",
"row_number": "row_num",
"personalisation": {"foo": "bar"},
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else "send-{}-tasks".format(template_type),
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else QueueNames.SEND_EMAIL_MEDIUM,
"client_reference": reference,
"sender_id": str(sender_id) if sender_id else None,
},
Expand All @@ -930,12 +930,12 @@ def test_process_rows_sends_save_task(
@pytest.mark.parametrize(
"csv_bulk_threshold, template_process_type, expected_queue",
[
(1_000, PRIORITY, "priority-tasks"), # keep priority when no thresholds are met
(1, PRIORITY, "bulk-tasks"), # autoswitch to bulk queue if bulk threshold is met, even if in priority.
(1, NORMAL, "bulk-tasks"), # autoswitch to bulk queue if bulk threshold is met.
(1_000, NORMAL, "send-email-tasks"), # keep normal priority
(1, BULK, "bulk-tasks"), # keep bulk priority
(1_000, BULK, "send-email-tasks"), # autoswitch to normal queue if normal threshold is met.
(1_000, PRIORITY, QueueNames.SEND_EMAIL_HIGH), # keep priority when no thresholds are met
(1, PRIORITY, QueueNames.SEND_EMAIL_LOW), # autoswitch to bulk queue if bulk threshold is met, even if in priority.
(1, NORMAL, QueueNames.SEND_EMAIL_LOW), # autoswitch to bulk queue if bulk threshold is met.
(1_000, NORMAL, QueueNames.SEND_EMAIL_MEDIUM), # keep normal priority
(1, BULK, QueueNames.SEND_EMAIL_LOW), # keep bulk priority
(1_000, BULK, QueueNames.SEND_EMAIL_MEDIUM), # autoswitch to normal queue if normal threshold is met.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So much better. 👏

],
)
def test_should_redirect_email_job_to_queue_depending_on_csv_threshold(
Expand Down Expand Up @@ -1103,7 +1103,7 @@ def test_process_rows_works_without_key_type(
"to": "recip",
"row_number": "row_num",
"personalisation": {"foo": "bar"},
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else "send-{}-tasks".format(template_type),
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else QueueNames.SEND_EMAIL_MEDIUM,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't that change trigger failures for SMS notification type test parameters? 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the SMS ones are being set to QueueNames.SEND_SMS_MEDIUM as before. Previously the else really should have been just else "send-email-tasks" since there's no letter type in the test parameters.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I read that change incorrectly, it's all orthogonal now and good

"sender_id": str(sender_id) if sender_id else None,
"client_reference": reference,
},
Expand Down Expand Up @@ -1582,7 +1582,7 @@ def test_save_emails_should_use_redis_cache_to_retrieve_service_and_template_whe
assert persisted_notification.personalisation == {"name": "Jo"}
assert persisted_notification._personalisation == signer_personalisation.sign({"name": "Jo"})
assert persisted_notification.notification_type == "email"
mocked_deliver_email.assert_called_once_with([str(persisted_notification.id)], queue="send-email-tasks")
mocked_deliver_email.assert_called_once_with([str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM)
if sender_id:
mocked_get_sender_id.assert_called_once_with(persisted_notification.service_id, sender_id)

Expand Down Expand Up @@ -1633,9 +1633,11 @@ def test_should_put_save_email_task_in_research_mode_queue_if_research_mode_serv
[str(persisted_notification.id)], queue="research-mode-tasks"
)

@pytest.mark.parametrize("process_type", ["priority", "bulk"])
@pytest.mark.parametrize(
"process_type,expected_queue", [("priority", QueueNames.SEND_EMAIL_HIGH), ("bulk", QueueNames.SEND_EMAIL_LOW)]
)
def test_should_route_save_email_task_to_appropriate_queue_according_to_template_process_type(
self, notify_db_session, mocker, process_type
self, notify_db_session, mocker, process_type, expected_queue
):
service = create_service()
template = create_template(service=service, template_type="email", process_type=process_type)
Expand All @@ -1648,14 +1650,12 @@ def test_should_route_save_email_task_to_appropriate_queue_according_to_template
save_emails(service.id, [signer_notification.sign(notification)], notification_id)

persisted_notification = Notification.query.one()
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue=f"{process_type}-tasks"
)
provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue=expected_queue)

def test_should_route_save_email_task_to_bulk_on_large_csv_file(self, notify_db_session, mocker):
service = create_service()
template = create_template(service=service, template_type="email", process_type="normal")
notification = _notification_json(template, to="[email protected]", queue="bulk-tasks")
notification = _notification_json(template, to="[email protected]", queue=QueueNames.SEND_EMAIL_LOW)

mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")

Expand All @@ -1664,7 +1664,9 @@ def test_should_route_save_email_task_to_bulk_on_large_csv_file(self, notify_db_
save_emails(service.id, [signer_notification.sign(notification)], notification_id)

persisted_notification = Notification.query.one()
provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue="bulk-tasks")
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_LOW
)

def test_should_use_email_template_and_persist(
self, notify_api, sample_email_template_with_placeholders, sample_api_key, mocker
Expand Down Expand Up @@ -1705,7 +1707,7 @@ def test_should_use_email_template_and_persist(
assert persisted_notification.notification_type == "email"

provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)
mock_over_daily_limit.assert_called_once_with("normal", sample_email_template_with_placeholders.service)

Expand Down Expand Up @@ -1734,7 +1736,7 @@ def test_save_email_should_use_template_version_from_job_not_latest(self, sample
assert not persisted_notification.sent_by
assert persisted_notification.notification_type == "email"
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)

def test_should_use_email_template_subject_placeholders(self, sample_email_template_with_placeholders, mocker):
Expand All @@ -1756,7 +1758,7 @@ def test_should_use_email_template_subject_placeholders(self, sample_email_templ
assert not persisted_notification.reference
assert persisted_notification.notification_type == "email"
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)

def test_save_email_uses_the_reply_to_text_when_provided(self, sample_email_template, mocker):
Expand Down Expand Up @@ -1813,7 +1815,7 @@ def test_should_use_email_template_and_persist_without_personalisation(self, sam
assert not persisted_notification.reference
assert persisted_notification.notification_type == "email"
provider_tasks.deliver_email.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-email-tasks"
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM
)

def test_save_email_should_go_to_retry_queue_if_database_errors(self, sample_email_template, mocker):
Expand Down
12 changes: 6 additions & 6 deletions tests/app/notifications/rest/test_send_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_send_notification_with_placeholders_replaced(notify_api, sample_email_t
notification_id = response_data["notification"]["id"]
data.update({"template_version": sample_email_template_with_placeholders.version})

mocked.assert_called_once_with([notification_id], queue="send-email-tasks")
mocked.assert_called_once_with([notification_id], queue=QueueNames.SEND_EMAIL_MEDIUM)
assert response.status_code == 201
assert response_data["body"] == "Hello Jo\nThis is an email from GOV.UK"
assert response_data["subject"] == "Jo"
Expand Down Expand Up @@ -370,7 +370,7 @@ def test_should_allow_valid_email_notification(notify_api, sample_email_template
response_data = json.loads(response.get_data(as_text=True))["data"]
notification_id = response_data["notification"]["id"]
app.celery.provider_tasks.deliver_email.apply_async.assert_called_once_with(
[notification_id], queue="send-email-tasks"
[notification_id], queue=QueueNames.SEND_EMAIL_MEDIUM
)

assert response.status_code == 201
Expand Down Expand Up @@ -563,7 +563,7 @@ def test_should_send_email_if_team_api_key_and_a_service_user(client, sample_ema
headers=[("Content-Type", "application/json"), auth_header],
)

app.celery.provider_tasks.deliver_email.apply_async.assert_called_once_with([fake_uuid], queue="send-email-tasks")
app.celery.provider_tasks.deliver_email.apply_async.assert_called_once_with([fake_uuid], queue=QueueNames.SEND_EMAIL_MEDIUM)
assert response.status_code == 201


Expand Down Expand Up @@ -660,7 +660,7 @@ def test_should_send_sms_if_team_api_key_and_a_service_user(client, sample_templ

@pytest.mark.parametrize(
"template_type,queue_name",
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, "send-email-tasks")],
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, QueueNames.SEND_EMAIL_MEDIUM)],
)
def test_should_persist_notification(
client,
Expand Down Expand Up @@ -710,7 +710,7 @@ def test_should_persist_notification(

@pytest.mark.parametrize(
"template_type,queue_name",
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, "send-email-tasks")],
[(SMS_TYPE, QueueNames.SEND_SMS_MEDIUM), (EMAIL_TYPE, QueueNames.SEND_EMAIL_MEDIUM)],
)
def test_should_delete_notification_and_return_error_if_sqs_fails(
client,
Expand Down Expand Up @@ -1028,7 +1028,7 @@ def test_send_notification_uses_appropriate_queue_when_template_has_process_type
if notification_type == SMS_TYPE:
expected_queue = QueueNames.SEND_SMS_HIGH if process_type == "priority" else QueueNames.SEND_SMS_LOW
else:
expected_queue = f"{process_type}-tasks"
expected_queue = QueueNames.SEND_EMAIL_HIGH if process_type == "priority" else QueueNames.SEND_EMAIL_LOW
mocked.assert_called_once_with([notification_id], queue=expected_queue)


Expand Down
6 changes: 3 additions & 3 deletions tests/app/notifications/test_process_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ class TestSendNotificationQueue:
"deliver_throttled_sms",
),
(False, None, "sms", "normal", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
(False, None, "email", "normal", None, "send-email-tasks", "deliver_email"),
(False, None, "email", "normal", None, QueueNames.SEND_EMAIL_MEDIUM, "deliver_email"),
(False, None, "sms", "team", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
(
False,
Expand Down Expand Up @@ -677,7 +677,7 @@ class TestChooseQueue:
"send-throttled-sms-tasks",
),
(False, None, "sms", "normal", None, QueueNames.SEND_SMS_MEDIUM),
(False, None, "email", "normal", None, "send-email-tasks"),
(False, None, "email", "normal", None, QueueNames.SEND_EMAIL_MEDIUM),
(False, None, "sms", "team", None, QueueNames.SEND_SMS_MEDIUM),
(
False,
Expand Down Expand Up @@ -975,7 +975,7 @@ def test_db_save_and_send_notification_saves_to_db(self, client, sample_template
"deliver_throttled_sms",
),
("sms", "normal", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
("email", "normal", None, "send-email-tasks", "deliver_email"),
("email", "normal", None, QueueNames.SEND_EMAIL_MEDIUM, "deliver_email"),
("sms", "team", None, QueueNames.SEND_SMS_MEDIUM, "deliver_sms"),
("sms", "test", None, "research-mode-tasks", "deliver_sms"),
(
Expand Down
Loading
Loading