-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
send emails using 3 new queues #1997
Changes from 2 commits
9f9fd5e
6e0c512
bbb743e
070e90d
f2e0ea9
db6a1e6
722c5c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,11 +110,7 @@ file. Copy that file to `.env` and customize it to your needs. | |
|
||
## To run the queues | ||
``` | ||
scripts/run_celery.sh | ||
``` | ||
|
||
``` | ||
scripts/run_celery_sms.sh | ||
scripts/run_celery_local.sh | ||
``` | ||
|
||
``` | ||
|
@@ -179,7 +175,7 @@ To help debug full code paths of emails and SMS, we have a special email and pho | |
set in the application's configuration. As it stands at the moment these are the following: | ||
|
||
| Notification Type | Test destination | | ||
|-------------------|--------------------------| | ||
| ----------------- | ------------------------ | | ||
| Email | [email protected] | | ||
| SMS | +16135550123 | | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
#!/bin/sh | ||
|
||
# runs celery with all celery queues except send-throttled-sms-tasks, send-sms-* and send-email-* | ||
|
||
set -e | ||
|
||
echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}" | ||
|
||
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q database-tasks,-priority-database-tasks.fifo,-normal-database-tasks,-bulk-database-tasks,job-tasks,notify-internal-tasks,periodic-tasks,priority-tasks,normal-tasks,bulk-tasks,reporting-tasks,research-mode-tasks,retry-tasks, | ||
service-callbacks,delivery-receipts |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
#!/bin/sh | ||
|
||
# runs celery with only the send-email-* queues | ||
|
||
set -e | ||
|
||
echo "Start celery, concurrency: ${CELERY_CONCURRENCY-4}" | ||
|
||
# TODO: we shouldn't be using the send-email-tasks queue anymore - once we verify this we can remove it | ||
celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=${CELERY_CONCURRENCY-4} -Q send-email-tasks,send-email-high,send-email-medium,send-email-low |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -920,7 +920,7 @@ def test_process_rows_sends_save_task( | |
"to": "recip", | ||
"row_number": "row_num", | ||
"personalisation": {"foo": "bar"}, | ||
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else "send-{}-tasks".format(template_type), | ||
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else QueueNames.SEND_EMAIL_MEDIUM, | ||
"client_reference": reference, | ||
"sender_id": str(sender_id) if sender_id else None, | ||
}, | ||
|
@@ -930,12 +930,12 @@ def test_process_rows_sends_save_task( | |
@pytest.mark.parametrize( | ||
"csv_bulk_threshold, template_process_type, expected_queue", | ||
[ | ||
(1_000, PRIORITY, "priority-tasks"), # keep priority when no thresholds are met | ||
(1, PRIORITY, "bulk-tasks"), # autoswitch to bulk queue if bulk threshold is met, even if in priority. | ||
(1, NORMAL, "bulk-tasks"), # autoswitch to bulk queue if bulk threshold is met. | ||
(1_000, NORMAL, "send-email-tasks"), # keep normal priority | ||
(1, BULK, "bulk-tasks"), # keep bulk priority | ||
(1_000, BULK, "send-email-tasks"), # autoswitch to normal queue if normal threshold is met. | ||
(1_000, PRIORITY, QueueNames.SEND_EMAIL_HIGH), # keep priority when no thresholds are met | ||
(1, PRIORITY, QueueNames.SEND_EMAIL_LOW), # autoswitch to bulk queue if bulk threshold is met, even if in priority. | ||
(1, NORMAL, QueueNames.SEND_EMAIL_LOW), # autoswitch to bulk queue if bulk threshold is met. | ||
(1_000, NORMAL, QueueNames.SEND_EMAIL_MEDIUM), # keep normal priority | ||
(1, BULK, QueueNames.SEND_EMAIL_LOW), # keep bulk priority | ||
(1_000, BULK, QueueNames.SEND_EMAIL_MEDIUM), # autoswitch to normal queue if normal threshold is met. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So much better. 👏 |
||
], | ||
) | ||
def test_should_redirect_email_job_to_queue_depending_on_csv_threshold( | ||
|
@@ -1103,7 +1103,7 @@ def test_process_rows_works_without_key_type( | |
"to": "recip", | ||
"row_number": "row_num", | ||
"personalisation": {"foo": "bar"}, | ||
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else "send-{}-tasks".format(template_type), | ||
"queue": QueueNames.SEND_SMS_MEDIUM if template_type == SMS_TYPE else QueueNames.SEND_EMAIL_MEDIUM, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't that change trigger failures for SMS notification type test parameters? 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the SMS ones are being set to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh I read that change incorrectly, it's all orthogonal now and good |
||
"sender_id": str(sender_id) if sender_id else None, | ||
"client_reference": reference, | ||
}, | ||
|
@@ -1582,7 +1582,7 @@ def test_save_emails_should_use_redis_cache_to_retrieve_service_and_template_whe | |
assert persisted_notification.personalisation == {"name": "Jo"} | ||
assert persisted_notification._personalisation == signer_personalisation.sign({"name": "Jo"}) | ||
assert persisted_notification.notification_type == "email" | ||
mocked_deliver_email.assert_called_once_with([str(persisted_notification.id)], queue="send-email-tasks") | ||
mocked_deliver_email.assert_called_once_with([str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM) | ||
if sender_id: | ||
mocked_get_sender_id.assert_called_once_with(persisted_notification.service_id, sender_id) | ||
|
||
|
@@ -1633,9 +1633,11 @@ def test_should_put_save_email_task_in_research_mode_queue_if_research_mode_serv | |
[str(persisted_notification.id)], queue="research-mode-tasks" | ||
) | ||
|
||
@pytest.mark.parametrize("process_type", ["priority", "bulk"]) | ||
@pytest.mark.parametrize( | ||
"process_type,expected_queue", [("priority", QueueNames.SEND_EMAIL_HIGH), ("bulk", QueueNames.SEND_EMAIL_LOW)] | ||
) | ||
def test_should_route_save_email_task_to_appropriate_queue_according_to_template_process_type( | ||
self, notify_db_session, mocker, process_type | ||
self, notify_db_session, mocker, process_type, expected_queue | ||
): | ||
service = create_service() | ||
template = create_template(service=service, template_type="email", process_type=process_type) | ||
|
@@ -1648,14 +1650,12 @@ def test_should_route_save_email_task_to_appropriate_queue_according_to_template | |
save_emails(service.id, [signer_notification.sign(notification)], notification_id) | ||
|
||
persisted_notification = Notification.query.one() | ||
provider_tasks.deliver_email.apply_async.assert_called_once_with( | ||
[str(persisted_notification.id)], queue=f"{process_type}-tasks" | ||
) | ||
provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue=expected_queue) | ||
|
||
def test_should_route_save_email_task_to_bulk_on_large_csv_file(self, notify_db_session, mocker): | ||
service = create_service() | ||
template = create_template(service=service, template_type="email", process_type="normal") | ||
notification = _notification_json(template, to="[email protected]", queue="bulk-tasks") | ||
notification = _notification_json(template, to="[email protected]", queue=QueueNames.SEND_EMAIL_LOW) | ||
|
||
mocker.patch("app.celery.provider_tasks.deliver_email.apply_async") | ||
|
||
|
@@ -1664,7 +1664,9 @@ def test_should_route_save_email_task_to_bulk_on_large_csv_file(self, notify_db_ | |
save_emails(service.id, [signer_notification.sign(notification)], notification_id) | ||
|
||
persisted_notification = Notification.query.one() | ||
provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue="bulk-tasks") | ||
provider_tasks.deliver_email.apply_async.assert_called_once_with( | ||
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_LOW | ||
) | ||
|
||
def test_should_use_email_template_and_persist( | ||
self, notify_api, sample_email_template_with_placeholders, sample_api_key, mocker | ||
|
@@ -1705,7 +1707,7 @@ def test_should_use_email_template_and_persist( | |
assert persisted_notification.notification_type == "email" | ||
|
||
provider_tasks.deliver_email.apply_async.assert_called_once_with( | ||
[str(persisted_notification.id)], queue="send-email-tasks" | ||
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM | ||
) | ||
mock_over_daily_limit.assert_called_once_with("normal", sample_email_template_with_placeholders.service) | ||
|
||
|
@@ -1734,7 +1736,7 @@ def test_save_email_should_use_template_version_from_job_not_latest(self, sample | |
assert not persisted_notification.sent_by | ||
assert persisted_notification.notification_type == "email" | ||
provider_tasks.deliver_email.apply_async.assert_called_once_with( | ||
[str(persisted_notification.id)], queue="send-email-tasks" | ||
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM | ||
) | ||
|
||
def test_should_use_email_template_subject_placeholders(self, sample_email_template_with_placeholders, mocker): | ||
|
@@ -1756,7 +1758,7 @@ def test_should_use_email_template_subject_placeholders(self, sample_email_templ | |
assert not persisted_notification.reference | ||
assert persisted_notification.notification_type == "email" | ||
provider_tasks.deliver_email.apply_async.assert_called_once_with( | ||
[str(persisted_notification.id)], queue="send-email-tasks" | ||
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM | ||
) | ||
|
||
def test_save_email_uses_the_reply_to_text_when_provided(self, sample_email_template, mocker): | ||
|
@@ -1813,7 +1815,7 @@ def test_should_use_email_template_and_persist_without_personalisation(self, sam | |
assert not persisted_notification.reference | ||
assert persisted_notification.notification_type == "email" | ||
provider_tasks.deliver_email.apply_async.assert_called_once_with( | ||
[str(persisted_notification.id)], queue="send-email-tasks" | ||
[str(persisted_notification.id)], queue=QueueNames.SEND_EMAIL_MEDIUM | ||
) | ||
|
||
def test_save_email_should_go_to_retry_queue_if_database_errors(self, sample_email_template, mocker): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
run_celery_core_tasks as an alternative name maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's less clunky!