Skip to content

Commit

Permalink
#1488 BUG: Fix Twilio delivered messages getting stuck in sent (#1516)
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanParish authored Oct 30, 2023
1 parent 8061a84 commit c25359e
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 91 deletions.
29 changes: 10 additions & 19 deletions app/celery/process_delivery_status_result_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

from app.dao.notifications_dao import (
dao_get_notification_by_reference,
dao_update_notification,
dao_update_notification_by_id,
update_notification_status_by_id,
FINAL_STATUS_STATES
)

from typing import Tuple
Expand All @@ -18,20 +19,7 @@
from app.celery.exceptions import AutoRetryException
from app.dao.service_callback_dao import dao_get_callback_include_payload_status

from app.models import (
NOTIFICATION_DELIVERED,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_PERMANENT_FAILURE,
Notification,
NOTIFICATION_PREFERENCES_DECLINED,
)

FINAL_STATUS_STATES = [
NOTIFICATION_DELIVERED,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_PREFERENCES_DECLINED,
]
from app.models import Notification


# Create SQS Queue for Process Deliver Status.
Expand Down Expand Up @@ -194,10 +182,13 @@ def _calculate_pricing(price_in_millicents_usd: float, notification: Notificatio
""" Calculate pricing """
current_app.logger.info("Calculate Pricing")
if price_in_millicents_usd > 0.0:
notification.status = notification_status
notification.segments_count = number_of_message_parts
notification.cost_in_millicents = price_in_millicents_usd
dao_update_notification(notification)
dao_update_notification_by_id(
notification_id=notification.id,
status=notification_status,
segments_count=number_of_message_parts,
cost_in_millicents=price_in_millicents_usd,
updated_at=datetime.utcnow()
)
else:
# notification_id - is the UID in the database for the notification
# status - is the notification platform status generated earlier
Expand Down
9 changes: 1 addition & 8 deletions app/celery/process_pinpoint_receipt_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
dao_get_notification_by_reference,
dao_update_notification,
update_notification_status_by_id,
FINAL_STATUS_STATES
)
from app.feature_flags import FeatureFlag, is_feature_enabled
from app.models import (
Expand All @@ -21,18 +22,10 @@
NOTIFICATION_SENDING,
NOTIFICATION_TEMPORARY_FAILURE,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_PREFERENCES_DECLINED,
Notification
)
from app.celery.service_callback_tasks import check_and_queue_callback_task

FINAL_STATUS_STATES = [
NOTIFICATION_DELIVERED,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_PREFERENCES_DECLINED
]

_record_status_status_mapping = {
'SUCCESSFUL': NOTIFICATION_DELIVERED,
'DELIVERED': NOTIFICATION_DELIVERED,
Expand Down
206 changes: 163 additions & 43 deletions app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
)
from notifications_utils.statsd_decorators import statsd
from notifications_utils.timezones import convert_local_timezone_to_utc, convert_utc_to_local_timezone
from sqlalchemy import asc, desc, func, select
from sqlalchemy import asc, desc, func, select, update
from sqlalchemy.exc import ArgumentError
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import NoResultFound
Expand Down Expand Up @@ -46,6 +47,7 @@
NOTIFICATION_TEMPORARY_FAILURE,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_SENT,
NOTIFICATION_PREFERENCES_DECLINED,
SMS_TYPE,
EMAIL_TYPE,
ServiceDataRetention,
Expand All @@ -54,14 +56,21 @@
from app.utils import get_local_timezone_midnight_in_utc
from app.utils import midnight_n_days_ago, escape_special_characters

TRANSIENT_NOTIFICATION_STATUSES = {
TRANSIENT_STATUSES = (
NOTIFICATION_CREATED,
NOTIFICATION_SENDING,
NOTIFICATION_PENDING,
NOTIFICATION_SENT,
NOTIFICATION_PENDING_VIRUS_CHECK,
NOTIFICATION_TEMPORARY_FAILURE
}
)

FINAL_STATUS_STATES = (
NOTIFICATION_DELIVERED,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_PREFERENCES_DECLINED,
)


@statsd(namespace="dao")
Expand Down Expand Up @@ -104,11 +113,92 @@ def country_records_delivery(phone_prefix):
return dlr and dlr.lower() == 'yes'


def _get_notification_status_update_statement(notification_id: str, incoming_status: str, **kwargs):
"""
Generates an update statement for the given notification id and status.
Preserves status order and ensures a transient status will not override a final status.
:param notification_id: String value of notification uuid to be updated
:param status: String value of the status the given notification will attempt to update to
:return: An update statement to be executed, or None
"""
notification = db.session.get(Notification, notification_id)
if notification is None:
current_app.logger.error('No notification found when attempting to update status for notification %s',
notification_id)
return None

current_status = notification.status
incoming_status = _decide_permanent_temporary_failure(current_status=current_status, status=incoming_status)

# do not update if the incoming status happens before the current status
if incoming_status in TRANSIENT_STATUSES and current_status in TRANSIENT_STATUSES:
if TRANSIENT_STATUSES.index(incoming_status) < TRANSIENT_STATUSES.index(current_status):
current_app.logger.warning(
'An erroneous attempt was made to transition notification id %s from %s to %s',
notification_id,
current_status,
incoming_status
)
return None

# add status to values dict if it doesn't have anything
if len(kwargs) < 1:
kwargs['status'] = incoming_status
elif incoming_status == NOTIFICATION_TEMPORARY_FAILURE:
kwargs['status'] = NOTIFICATION_TEMPORARY_FAILURE

kwargs['updated_at'] = datetime.utcnow()

# when updating make sure notification is not in final state via query
update_statement = (
update(Notification)
.where(
db.and_(
Notification.id == notification_id,
Notification.status.not_in(FINAL_STATUS_STATES)
)
)
.values(kwargs)
)

return update_statement


def _update_notification_status(notification, status):
status = _decide_permanent_temporary_failure(current_status=notification.status, status=status)
notification.status = status
dao_update_notification(notification)
return notification
"""
Update the notification status if it should be updated.
"""
if not (status in TRANSIENT_STATUSES or status in FINAL_STATUS_STATES):
current_app.logger.error(
'Attempting to update notification %s to a status that does not exist %s', notification.id, status
)
return notification

update_statement = _get_notification_status_update_statement(notification.id, status)

try:
db.session.execute(update_statement)
db.session.commit()
except NoResultFound as e:
current_app.logger.warning(
'No result found when attempting to update notification %s to status %s - The exception: %s',
notification.id, status, e
)
return notification
except ArgumentError as e:
current_app.logger.warning(
'Cannot update notification %s to status %s - exception: %s',
notification.id, status, e
)
return notification
except Exception as e:
current_app.logger.error(
'An error occured when attempting to update notification %s to status %s - The exception %s',
notification.id, status, e
)
return notification

return db.session.get(Notification, notification.id)


@statsd(namespace="dao")
Expand All @@ -121,8 +211,6 @@ def update_notification_status_by_id(
current_status: str = None
) -> Notification:

# the order of notification status that must be maintained
order_matrix = (NOTIFICATION_SENDING, NOTIFICATION_SENT, NOTIFICATION_DELIVERED)
notification_query = Notification.query.with_for_update().filter(Notification.id == notification_id)
if current_status is not None:
notification_query.filter(Notification.status == current_status)
Expand All @@ -136,47 +224,24 @@ def update_notification_status_by_id(
)
return None

if notification.status not in TRANSIENT_NOTIFICATION_STATUSES:
if notification.status not in TRANSIENT_STATUSES:
duplicate_update_warning(notification, status)
return None

if notification.international and not country_records_delivery(notification.phone_prefix):
return None

if not notification.sent_by and sent_by:
notification.sent_by = sent_by

if is_feature_enabled(FeatureFlag.NOTIFICATION_FAILURE_REASON_ENABLED) and status_reason:
notification.status_reason = status_reason

# prevents sent -> sending
if (notification.status == NOTIFICATION_SENT) and (status == NOTIFICATION_SENDING):
current_app.logger.warning(
'attempt was made to transition notification id %s from %s to %s',
notification_id,
notification.status,
status
)
return None

# the new and current status must both be in the order matrix
if (current_status in order_matrix) and (status in order_matrix):
# get the order of the statuses
current_status_index = order_matrix.index(notification.status)
new_status_index = order_matrix.index(status)

# do not update the database if the new status happens before the current status in the database
if new_status_index < current_status_index:
current_app.logger.warning(
'attempt was made to transition notification id %s from %s to %s',
notification_id,
notification.status,
status
)
return None

return _update_notification_status(
notification=notification,
status=status
return dao_update_notification_by_id(
notification_id=notification.id,
status=status,
status_reason=notification.status_reason,
sent_by=notification.sent_by
)


Expand All @@ -190,19 +255,74 @@ def update_notification_status_by_reference(reference, status):
current_app.logger.error("Notification not found for reference %s (update to %s)", reference, status)
return None

if notification.status not in {
NOTIFICATION_SENDING,
NOTIFICATION_PENDING
}:
if notification.status not in TRANSIENT_STATUSES:
duplicate_update_warning(notification, status)
return None

# don't update status by reference if not at least "sending"
current_index = TRANSIENT_STATUSES.index(notification.status)
sending_index = TRANSIENT_STATUSES.index(NOTIFICATION_SENDING)
if current_index < sending_index:
return None

return _update_notification_status(
notification=notification,
status=status
)


@statsd(namespace="dao")
@transactional
def dao_update_notification_by_id(notification_id: str, **kwargs):
"""
Update the notification by ID, ensure kwargs paramaters are named appropriately according to the notification model.
:param notification_id: The notification uuid in string form
:param kwargs: The notification key-value pairs to be updated
Note: Ensure keys are valid according to notification model
:return: Notification or None
"""
kwargs['updated_at'] = datetime.utcnow()
status = kwargs.get('status')

if notification_id and status is not None:
update_statement = _get_notification_status_update_statement(
notification_id=notification_id,
incoming_status=status,
**kwargs
)
elif notification_id:
update_statement = (
update(Notification)
.where(Notification.id == notification_id)
.values(kwargs)
)
else:
current_app.logger.error('ERROR: Attempting to update without providing required notification ID field.')
return None

try:
db.session.execute(update_statement)
db.session.commit()
except NoResultFound as e:
current_app.logger.warning(
'No notification found with id %s when attempting to update - exception %s', notification_id, e
)
return None
except ArgumentError as e:
current_app.logger.warning(
'Cannot update notification %s - exception: %s', notification_id, status, e
)
return None
except Exception as e:
current_app.logger.error(
'Unexpected exception thrown attempting to update notification %s, given parameters for updating '
'notification may be incorrect - Exception: %s', notification_id, e
)
return None

return db.session.get(Notification, notification_id)


@statsd(namespace="dao")
@transactional
def dao_update_notification(notification):
Expand Down
Loading

0 comments on commit c25359e

Please sign in to comment.