Skip to content

Commit

Permalink
#2058 Twilio status override, Celery scheduler (#2088)
Browse files Browse the repository at this point in the history
  • Loading branch information
mchlwellman authored Nov 4, 2024
1 parent a352af6 commit eae4afb
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 3 deletions.
50 changes: 50 additions & 0 deletions app/celery/twilio_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from datetime import datetime, timedelta, timezone

from flask import current_app
from sqlalchemy import select

from app import db, notify_celery, twilio_sms_client
from app.celery.exceptions import NonRetryableException
from app.constants import NOTIFICATION_STATUS_TYPES_COMPLETED
from app.models import Notification


def _get_notifications() -> list:
"""Returns a list of notifications not in final state."""

current_app.logger.info('Getting notifications to update status')
one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
stmt = (
select(Notification)
.where(Notification.notification_type == 'sms')
.where(Notification.sent_by == 'twilio')
.where(~Notification.status.in_(NOTIFICATION_STATUS_TYPES_COMPLETED))
.where(Notification.created_at < one_hour_ago)
.order_by(Notification.created_at)
.limit(current_app.config['TWILIO_STATUS_PAGE_SIZE'])
)
return db.session.scalars(stmt).all()


@notify_celery.task(name='update-twilio-status')
def update_twilio_status():
"""Update the status of notifications sent via Twilio. This task is scheduled to run every 5 minutes. It fetches
notifications that are not in a final state, limited to the config setting TWILIO_STATUS_PAGE_SIZE, and updates
their status using the app's Twilio client.
"""
notifications = _get_notifications()
current_app.logger.info('Found %s notifications to update', len(notifications))

for notification in notifications:
current_app.logger.info('Updating notification %s with status %s', notification.id, notification.status)
try:
twilio_sms_client.update_notification_status_override(notification.reference)
except NonRetryableException as e:
current_app.logger.error(
'Failed to update notification %s: %s due to rate limit, aborting.', str(notification.id), str(e)
)
break
else:
current_app.logger.info('Notification %s updated', notification.id)

current_app.logger.info('Finished updating notifications')
9 changes: 6 additions & 3 deletions app/clients/sms/twilio.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,11 @@ def get_twilio_message(self, message_sid: str) -> MessageInstance | None:
message = None
try:
message = self._client.messages(message_sid).fetch()
except TwilioRestException:
except TwilioRestException as e:
self.logger.exception('Twilio message not found: %s', message_sid)
if e.status == 429:
self.logger.exception('Twilio rate limit exceeded')
raise NonRetryableException('Twilio rate limit exceeded') from e
return message

def send_sms(
Expand Down Expand Up @@ -303,7 +306,6 @@ def update_notification_status_override(self, message_sid: str) -> None:
self.logger.info('Updating notification status for message: %s', message_sid)

message = self.get_twilio_message(message_sid)
self.logger.debug('Twilio message: %s', message)

if message:
status, status_reason = self._evaluate_status(message_sid, message.status, [])
Expand All @@ -318,8 +320,9 @@ def update_notification_status_override(self, message_sid: str) -> None:
update_dict,
)
self.logger.info(
'Updated notification status for message: %s. Updated %s notifications and %s notification history',
'Updated notification status for message: %s to %s. Updated %s notifications and %s notification history',
message_sid,
status,
updated_count,
updated_history_count,
)
Expand Down
8 changes: 8 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class Config(object):
'app.celery.process_delivery_status_result_tasks',
'app.celery.v3.notification_tasks',
'app.celery.process_ses_receipts_tasks',
'app.celery.twilio_tasks',
),
'beat_schedule': {
# app/celery/scheduled_tasks.py
Expand Down Expand Up @@ -320,6 +321,11 @@ class Config(object):
'schedule': crontab(hour='13-21', day_of_month='24-31', minute='*/2'),
'options': {'queue': QueueNames.PERIODIC},
},
'update-twilio-status': {
'task': 'update-twilio-status',
'schedule': crontab(hour='*', minute='*/5'),
'options': {'queue': QueueNames.PERIODIC},
},
},
'task_queues': [Queue(queue, Exchange('default'), routing_key=queue) for queue in QueueNames.all_queues()],
'task_routes': {
Expand Down Expand Up @@ -362,6 +368,8 @@ class Config(object):
TWILIO_CALLBACK_USERNAME = os.environ.get('TWILIO_CALLBACK_USERNAME', '')
TWILIO_CALLBACK_PASSWORD = os.environ.get('TWILIO_CALLBACK_PASSWORD', '')
TWILIO_AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN', '')
TWILIO_STATUS_PAGE_SIZE = 500

FIRETEXT_INBOUND_SMS_AUTH = json.loads(os.getenv('FIRETEXT_INBOUND_SMS_AUTH', '[]'))
MMG_INBOUND_SMS_AUTH = json.loads(os.getenv('MMG_INBOUND_SMS_AUTH', '[]'))
MMG_INBOUND_SMS_USERNAME = json.loads(os.getenv('MMG_INBOUND_SMS_USERNAME', '[]'))
Expand Down
116 changes: 116 additions & 0 deletions tests/app/celery/test_twilio_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from datetime import datetime, timedelta, timezone

from unittest.mock import patch

import pytest

from app.celery.exceptions import NonRetryableException
from app.celery.twilio_tasks import _get_notifications, update_twilio_status
from app.constants import (
NOTIFICATION_CREATED,
NOTIFICATION_SENDING,
NOTIFICATION_SENT,
NOTIFICATION_DELIVERED,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_TEMPORARY_FAILURE,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_PREFERENCES_DECLINED,
)


@pytest.mark.parametrize(
'status, expected',
[
(NOTIFICATION_CREATED, True),
(NOTIFICATION_SENDING, True),
(NOTIFICATION_SENT, False),
(NOTIFICATION_DELIVERED, False),
(NOTIFICATION_TECHNICAL_FAILURE, False),
(NOTIFICATION_TEMPORARY_FAILURE, False),
(NOTIFICATION_PERMANENT_FAILURE, False),
(NOTIFICATION_PREFERENCES_DECLINED, False),
],
)
def test_get_notifications_statuses(sample_notification, status, expected):
"""Test that _get_notifications() returns either a list with the test notification, or an empty list, depending
on the parametrized status. If the status is in the NOTIFICATION_STATUS_TYPES_COMPLETED list, the notification is
not returned."""
created_at = datetime.now(timezone.utc) - timedelta(minutes=90)
notification = sample_notification(created_at=created_at, status=status, sent_by='twilio')

notifications = _get_notifications()
notification_ids = [n.id for n in notifications]
if expected:
assert notification.id in notification_ids
else:
assert notification.id not in notification_ids


@pytest.mark.parametrize(
'minute_offset, expected',
[
(5, False),
(45, False),
(90, True),
(180, True),
],
)
def test_get_notifications_datefilter(sample_notification, minute_offset, expected):
"""Test that _get_notifications() returns either a list with the test notification, or an empty list, depending
on the parametrized minute_offset. If the notification was created less than one hour ago, it is not returned."""
created_at = datetime.now(timezone.utc) - timedelta(minutes=minute_offset)
notification = sample_notification(created_at=created_at, status=NOTIFICATION_CREATED, sent_by='twilio')

notifications = _get_notifications()
notification_ids = [n.id for n in notifications]
if expected:
assert notification.id in notification_ids
else:
assert notification.id not in notification_ids


def test_update_twilio_status_with_results(mocker, sample_notification):
"""Test that update_twilio_status() calls twilio_sms_client.update_notification_status_override() with the
notification reference when there are notifications to update."""
notification = sample_notification(status=NOTIFICATION_CREATED, sent_by='twilio')

mocker.patch('app.celery.twilio_tasks._get_notifications', return_value=[notification])

with patch(
'app.celery.twilio_tasks.twilio_sms_client.update_notification_status_override'
) as mock_update_notification_status_override:
update_twilio_status()

mock_update_notification_status_override.assert_called_once_with(notification.reference)


def test_update_twilio_status_no_results(mocker):
"""Test that update_twilio_status() does not call twilio_sms_client.update_notification_status_override() when
there are no notifications to update."""
mocker.patch('app.celery.twilio_tasks._get_notifications', return_value=[])

with patch(
'app.celery.twilio_tasks.twilio_sms_client.update_notification_status_override'
) as mock_update_notification_status_override:
update_twilio_status()

mock_update_notification_status_override.assert_not_called()


def test_update_twilio_status_exception(mocker, sample_notification):
"""Test that update_twilio_status() logs an error when twilio_sms_client.update_notification_status_override()
raises a NonRetryableException, and does not update any more notifications."""
created_at = datetime.now(timezone.utc) - timedelta(minutes=99)
notification_one = sample_notification(status=NOTIFICATION_CREATED, sent_by='twilio', created_at=created_at)
created_at = datetime.now(timezone.utc) - timedelta(minutes=90)
sample_notification(status=NOTIFICATION_CREATED, sent_by='twilio', created_at=created_at)
mock_twilio_status_override = mocker.patch(
'app.celery.twilio_tasks.twilio_sms_client.update_notification_status_override',
side_effect=[NonRetryableException('Test exception')],
)
mock_logger = mocker.patch('app.celery.twilio_tasks.current_app.logger.error')
update_twilio_status()
mock_logger.assert_called_once_with(
'Failed to update notification %s: %s due to rate limit, aborting.', str(notification_one.id), 'Test exception'
)
mock_twilio_status_override.assert_called_once_with(notification_one.reference)

0 comments on commit eae4afb

Please sign in to comment.