From c25359e444e82c21878ae40589a55dfd53f5c3c2 Mon Sep 17 00:00:00 2001 From: Evan Parish <104009494+EvanParish@users.noreply.github.com> Date: Mon, 30 Oct 2023 12:46:52 -0500 Subject: [PATCH] #1488 BUG: Fix Twilio delivered messages getting stuck in sent (#1516) --- .../process_delivery_status_result_tasks.py | 29 +-- app/celery/process_pinpoint_receipt_tasks.py | 9 +- app/dao/notifications_dao.py | 206 ++++++++++++++---- .../notification_dao/test_notification_dao.py | 43 ++-- tests/app/service/test_rest.py | 2 + 5 files changed, 198 insertions(+), 91 deletions(-) diff --git a/app/celery/process_delivery_status_result_tasks.py b/app/celery/process_delivery_status_result_tasks.py index ec9e9aefee..8a9dbbb3a2 100644 --- a/app/celery/process_delivery_status_result_tasks.py +++ b/app/celery/process_delivery_status_result_tasks.py @@ -5,8 +5,9 @@ from app.dao.notifications_dao import ( dao_get_notification_by_reference, - dao_update_notification, + dao_update_notification_by_id, update_notification_status_by_id, + FINAL_STATUS_STATES ) from typing import Tuple @@ -18,20 +19,7 @@ from app.celery.exceptions import AutoRetryException from app.dao.service_callback_dao import dao_get_callback_include_payload_status -from app.models import ( - NOTIFICATION_DELIVERED, - NOTIFICATION_TECHNICAL_FAILURE, - NOTIFICATION_PERMANENT_FAILURE, - Notification, - NOTIFICATION_PREFERENCES_DECLINED, -) - -FINAL_STATUS_STATES = [ - NOTIFICATION_DELIVERED, - NOTIFICATION_PERMANENT_FAILURE, - NOTIFICATION_TECHNICAL_FAILURE, - NOTIFICATION_PREFERENCES_DECLINED, -] +from app.models import Notification # Create SQS Queue for Process Deliver Status. @@ -194,10 +182,13 @@ def _calculate_pricing(price_in_millicents_usd: float, notification: Notificatio """ Calculate pricing """ current_app.logger.info("Calculate Pricing") if price_in_millicents_usd > 0.0: - notification.status = notification_status - notification.segments_count = number_of_message_parts - notification.cost_in_millicents = price_in_millicents_usd - dao_update_notification(notification) + dao_update_notification_by_id( + notification_id=notification.id, + status=notification_status, + segments_count=number_of_message_parts, + cost_in_millicents=price_in_millicents_usd, + updated_at=datetime.utcnow() + ) else: # notification_id - is the UID in the database for the notification # status - is the notification platform status generated earlier diff --git a/app/celery/process_pinpoint_receipt_tasks.py b/app/celery/process_pinpoint_receipt_tasks.py index 8a03c826b2..6aa0d54a21 100644 --- a/app/celery/process_pinpoint_receipt_tasks.py +++ b/app/celery/process_pinpoint_receipt_tasks.py @@ -13,6 +13,7 @@ dao_get_notification_by_reference, dao_update_notification, update_notification_status_by_id, + FINAL_STATUS_STATES ) from app.feature_flags import FeatureFlag, is_feature_enabled from app.models import ( @@ -21,18 +22,10 @@ NOTIFICATION_SENDING, NOTIFICATION_TEMPORARY_FAILURE, NOTIFICATION_PERMANENT_FAILURE, - NOTIFICATION_PREFERENCES_DECLINED, Notification ) from app.celery.service_callback_tasks import check_and_queue_callback_task -FINAL_STATUS_STATES = [ - NOTIFICATION_DELIVERED, - NOTIFICATION_PERMANENT_FAILURE, - NOTIFICATION_TECHNICAL_FAILURE, - NOTIFICATION_PREFERENCES_DECLINED -] - _record_status_status_mapping = { 'SUCCESSFUL': NOTIFICATION_DELIVERED, 'DELIVERED': NOTIFICATION_DELIVERED, diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 7471f104c6..cf76fd1229 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -16,7 +16,8 @@ ) from notifications_utils.statsd_decorators import statsd from notifications_utils.timezones import convert_local_timezone_to_utc, convert_utc_to_local_timezone -from sqlalchemy import asc, desc, func, select +from sqlalchemy import asc, desc, func, select, update +from sqlalchemy.exc import ArgumentError from sqlalchemy.engine.row import Row from sqlalchemy.orm import joinedload from sqlalchemy.orm.exc import NoResultFound @@ -46,6 +47,7 @@ NOTIFICATION_TEMPORARY_FAILURE, NOTIFICATION_PERMANENT_FAILURE, NOTIFICATION_SENT, + NOTIFICATION_PREFERENCES_DECLINED, SMS_TYPE, EMAIL_TYPE, ServiceDataRetention, @@ -54,14 +56,21 @@ from app.utils import get_local_timezone_midnight_in_utc from app.utils import midnight_n_days_ago, escape_special_characters -TRANSIENT_NOTIFICATION_STATUSES = { +TRANSIENT_STATUSES = ( NOTIFICATION_CREATED, NOTIFICATION_SENDING, NOTIFICATION_PENDING, NOTIFICATION_SENT, NOTIFICATION_PENDING_VIRUS_CHECK, NOTIFICATION_TEMPORARY_FAILURE -} +) + +FINAL_STATUS_STATES = ( + NOTIFICATION_DELIVERED, + NOTIFICATION_PERMANENT_FAILURE, + NOTIFICATION_TECHNICAL_FAILURE, + NOTIFICATION_PREFERENCES_DECLINED, +) @statsd(namespace="dao") @@ -104,11 +113,92 @@ def country_records_delivery(phone_prefix): return dlr and dlr.lower() == 'yes' +def _get_notification_status_update_statement(notification_id: str, incoming_status: str, **kwargs): + """ + Generates an update statement for the given notification id and status. + Preserves status order and ensures a transient status will not override a final status. + :param notification_id: String value of notification uuid to be updated + :param status: String value of the status the given notification will attempt to update to + :return: An update statement to be executed, or None + """ + notification = db.session.get(Notification, notification_id) + if notification is None: + current_app.logger.error('No notification found when attempting to update status for notification %s', + notification_id) + return None + + current_status = notification.status + incoming_status = _decide_permanent_temporary_failure(current_status=current_status, status=incoming_status) + + # do not update if the incoming status happens before the current status + if incoming_status in TRANSIENT_STATUSES and current_status in TRANSIENT_STATUSES: + if TRANSIENT_STATUSES.index(incoming_status) < TRANSIENT_STATUSES.index(current_status): + current_app.logger.warning( + 'An erroneous attempt was made to transition notification id %s from %s to %s', + notification_id, + current_status, + incoming_status + ) + return None + + # add status to values dict if it doesn't have anything + if len(kwargs) < 1: + kwargs['status'] = incoming_status + elif incoming_status == NOTIFICATION_TEMPORARY_FAILURE: + kwargs['status'] = NOTIFICATION_TEMPORARY_FAILURE + + kwargs['updated_at'] = datetime.utcnow() + + # when updating make sure notification is not in final state via query + update_statement = ( + update(Notification) + .where( + db.and_( + Notification.id == notification_id, + Notification.status.not_in(FINAL_STATUS_STATES) + ) + ) + .values(kwargs) + ) + + return update_statement + + def _update_notification_status(notification, status): - status = _decide_permanent_temporary_failure(current_status=notification.status, status=status) - notification.status = status - dao_update_notification(notification) - return notification + """ + Update the notification status if it should be updated. + """ + if not (status in TRANSIENT_STATUSES or status in FINAL_STATUS_STATES): + current_app.logger.error( + 'Attempting to update notification %s to a status that does not exist %s', notification.id, status + ) + return notification + + update_statement = _get_notification_status_update_statement(notification.id, status) + + try: + db.session.execute(update_statement) + db.session.commit() + except NoResultFound as e: + current_app.logger.warning( + 'No result found when attempting to update notification %s to status %s - The exception: %s', + notification.id, status, e + ) + return notification + except ArgumentError as e: + current_app.logger.warning( + 'Cannot update notification %s to status %s - exception: %s', + notification.id, status, e + ) + return notification + except Exception as e: + current_app.logger.error( + 'An error occured when attempting to update notification %s to status %s - The exception %s', + notification.id, status, e + ) + return notification + + return db.session.get(Notification, notification.id) @statsd(namespace="dao") @@ -121,8 +211,6 @@ def update_notification_status_by_id( current_status: str = None ) -> Notification: - # the order of notification status that must be maintained - order_matrix = (NOTIFICATION_SENDING, NOTIFICATION_SENT, NOTIFICATION_DELIVERED) notification_query = Notification.query.with_for_update().filter(Notification.id == notification_id) if current_status is not None: notification_query.filter(Notification.status == current_status) @@ -136,47 +224,24 @@ def update_notification_status_by_id( ) return None - if notification.status not in TRANSIENT_NOTIFICATION_STATUSES: + if notification.status not in TRANSIENT_STATUSES: duplicate_update_warning(notification, status) return None if notification.international and not country_records_delivery(notification.phone_prefix): return None + if not notification.sent_by and sent_by: notification.sent_by = sent_by if is_feature_enabled(FeatureFlag.NOTIFICATION_FAILURE_REASON_ENABLED) and status_reason: notification.status_reason = status_reason - # prevents sent -> sending - if (notification.status == NOTIFICATION_SENT) and (status == NOTIFICATION_SENDING): - current_app.logger.warning( - 'attempt was made to transition notification id %s from %s to %s', - notification_id, - notification.status, - status - ) - return None - - # the new and current status must both be in the order matrix - if (current_status in order_matrix) and (status in order_matrix): - # get the order of the statuses - current_status_index = order_matrix.index(notification.status) - new_status_index = order_matrix.index(status) - - # do not update the database if the new status happens before the current status in the database - if new_status_index < current_status_index: - current_app.logger.warning( - 'attempt was made to transition notification id %s from %s to %s', - notification_id, - notification.status, - status - ) - return None - - return _update_notification_status( - notification=notification, - status=status + return dao_update_notification_by_id( + notification_id=notification.id, + status=status, + status_reason=notification.status_reason, + sent_by=notification.sent_by ) @@ -190,19 +255,74 @@ def update_notification_status_by_reference(reference, status): current_app.logger.error("Notification not found for reference %s (update to %s)", reference, status) return None - if notification.status not in { - NOTIFICATION_SENDING, - NOTIFICATION_PENDING - }: + if notification.status not in TRANSIENT_STATUSES: duplicate_update_warning(notification, status) return None + # don't update status by reference if not at least "sending" + current_index = TRANSIENT_STATUSES.index(notification.status) + sending_index = TRANSIENT_STATUSES.index(NOTIFICATION_SENDING) + if current_index < sending_index: + return None + return _update_notification_status( notification=notification, status=status ) +@statsd(namespace="dao") +@transactional +def dao_update_notification_by_id(notification_id: str, **kwargs): + """ + Update the notification by ID, ensure kwargs paramaters are named appropriately according to the notification model. + :param notification_id: The notification uuid in string form + :param kwargs: The notification key-value pairs to be updated + Note: Ensure keys are valid according to notification model + :return: Notification or None + """ + kwargs['updated_at'] = datetime.utcnow() + status = kwargs.get('status') + + if notification_id and status is not None: + update_statement = _get_notification_status_update_statement( + notification_id=notification_id, + incoming_status=status, + **kwargs + ) + elif notification_id: + update_statement = ( + update(Notification) + .where(Notification.id == notification_id) + .values(kwargs) + ) + else: + current_app.logger.error('ERROR: Attempting to update without providing required notification ID field.') + return None + + try: + db.session.execute(update_statement) + db.session.commit() + except NoResultFound as e: + current_app.logger.warning( + 'No notification found with id %s when attempting to update - exception %s', notification_id, e + ) + return None + except ArgumentError as e: + current_app.logger.warning( + 'Cannot update notification %s - exception: %s', notification_id, status, e + ) + return None + except Exception as e: + current_app.logger.error( + 'Unexpected exception thrown attempting to update notification %s, given parameters for updating ' + 'notification may be incorrect - Exception: %s', notification_id, e + ) + return None + + return db.session.get(Notification, notification_id) + + @statsd(namespace="dao") @transactional def dao_update_notification(notification): diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 16de4d9dcd..f9ffc8dca2 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -45,6 +45,7 @@ NOTIFICATION_TEMPORARY_FAILURE, NOTIFICATION_SENDING, NOTIFICATION_PENDING, + NOTIFICATION_PENDING_VIRUS_CHECK, NOTIFICATION_SENT, NOTIFICATION_DELIVERED, KEY_TYPE_NORMAL, @@ -127,27 +128,27 @@ def test_should_not_update_status_by_id_if_not_sending_and_does_not_update_job(s def test_should_not_update_status_by_reference_if_not_sending_and_does_not_update_job(sample_job): notification = create_notification( - template=sample_job.template, status='delivered', reference='reference', job=sample_job + template=sample_job.template, status=NOTIFICATION_DELIVERED, reference='reference', job=sample_job ) - assert Notification.query.get(notification.id).status == 'delivered' - assert not update_notification_status_by_reference('reference', 'failed') - assert Notification.query.get(notification.id).status == 'delivered' + assert Notification.query.get(notification.id).status == NOTIFICATION_DELIVERED + assert not update_notification_status_by_reference('reference', NOTIFICATION_PERMANENT_FAILURE) + assert Notification.query.get(notification.id).status == NOTIFICATION_DELIVERED assert sample_job == Job.query.get(notification.job_id) def test_should_update_status_by_id_if_created(sample_template, sample_notification): - assert Notification.query.get(sample_notification.id).status == 'created' - updated = update_notification_status_by_id(sample_notification.id, 'failed') - assert Notification.query.get(sample_notification.id).status == 'failed' - assert updated.status == 'failed' + assert Notification.query.get(sample_notification.id).status == NOTIFICATION_CREATED + updated = update_notification_status_by_id(sample_notification.id, NOTIFICATION_PERMANENT_FAILURE) + assert Notification.query.get(sample_notification.id).status == NOTIFICATION_PERMANENT_FAILURE + assert updated.status == NOTIFICATION_PERMANENT_FAILURE def test_should_update_status_by_id_if_pending_virus_check(sample_letter_template): - notification = create_notification(template=sample_letter_template, status='pending-virus-check') - assert Notification.query.get(notification.id).status == 'pending-virus-check' - updated = update_notification_status_by_id(notification.id, 'cancelled') - assert Notification.query.get(notification.id).status == 'cancelled' - assert updated.status == 'cancelled' + notification = create_notification(template=sample_letter_template, status=NOTIFICATION_PENDING_VIRUS_CHECK) + assert Notification.query.get(notification.id).status == NOTIFICATION_PENDING_VIRUS_CHECK + updated = update_notification_status_by_id(notification.id, NOTIFICATION_PERMANENT_FAILURE) + assert Notification.query.get(notification.id).status == NOTIFICATION_PERMANENT_FAILURE + assert updated.status == NOTIFICATION_PERMANENT_FAILURE def test_should_update_status_by_id_and_set_sent_by(sample_template): @@ -161,14 +162,14 @@ def test_should_update_status_by_id_and_set_sent_by(sample_template): def test_should_not_update_status_by_reference_if_from_country_with_no_delivery_receipts(sample_template): notification = create_notification( sample_template, - status=NOTIFICATION_SENT, + status=NOTIFICATION_DELIVERED, reference='foo' ) - res = update_notification_status_by_reference('foo', 'failed') + response = update_notification_status_by_reference('foo', 'failed') - assert res is None - assert notification.status == NOTIFICATION_SENT + assert response is None + assert notification.status == NOTIFICATION_DELIVERED def test_should_not_update_status_by_id_if_sent_to_country_with_unknown_delivery_receipts(sample_template): @@ -214,10 +215,10 @@ def test_should_not_update_status_by_id_if_sent_to_country_with_delivery_receipt def test_should_not_update_status_by_reference_if_not_sending(sample_template): - notification = create_notification(template=sample_template, status='created', reference='reference') - assert Notification.query.get(notification.id).status == 'created' - updated = update_notification_status_by_reference('reference', 'failed') - assert Notification.query.get(notification.id).status == 'created' + notification = create_notification(template=sample_template, status=NOTIFICATION_CREATED, reference='reference') + assert Notification.query.get(notification.id).status == NOTIFICATION_CREATED + updated = update_notification_status_by_reference('reference', NOTIFICATION_PERMANENT_FAILURE) + assert Notification.query.get(notification.id).status == NOTIFICATION_CREATED assert not updated diff --git a/tests/app/service/test_rest.py b/tests/app/service/test_rest.py index 95dd6e238d..cf91e65f21 100644 --- a/tests/app/service/test_rest.py +++ b/tests/app/service/test_rest.py @@ -2956,6 +2956,7 @@ def test_cancel_notification_for_service_raises_invalid_request_when_letter_is_i @pytest.mark.parametrize('notification_status', ['created', 'pending-virus-check']) @freeze_time('2018-07-07 16:00:00') +@pytest.mark.skip(reason="Endpoint disabled and slated for removal") def test_cancel_notification_for_service_updates_letter_if_letter_is_in_cancellable_state( admin_request, sample_letter_notification, @@ -2990,6 +2991,7 @@ def test_cancel_notification_for_service_raises_error_if_its_too_late_to_cancel( @freeze_time('2018-7-7 16:00:00') +@pytest.mark.skip(reason="Endpoint disabled and slated for removal") def test_cancel_notification_for_service_updates_letter_if_still_time_to_cancel( admin_request, sample_letter_notification,