Skip to content

Commit

Permalink
#2011 - Notification status and status_reason consistency (#2030)
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbfled authored Oct 8, 2024
1 parent 1c94753 commit 3a31e91
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 118 deletions.
29 changes: 22 additions & 7 deletions app/celery/process_delivery_status_result_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def process_delivery_status(
self,
event: CeleryEvent,
) -> bool:
"""Celery task for updating the delivery status of a notification"""
"""
This is a Celery task for updating the delivery status of a notification.
"""

# preset variables to address "unbounded local variable"
sqs_message = None
Expand All @@ -56,7 +58,7 @@ def process_delivery_status(
current_app.logger.info('retrieved delivery status body: %s', body)

# get notification_platform_status
notification_platform_status = _get_notification_platform_status(self, provider, body, sqs_message)
notification_platform_status: dict = _get_notification_platform_status(self, provider, body, sqs_message)

# get parameters from notification platform status
current_app.logger.info('Get Notification Parameters')
Expand All @@ -68,7 +70,7 @@ def process_delivery_status(
price_in_millicents_usd,
) = _get_notification_parameters(notification_platform_status)

# retrieves the inbound message for this provider we are updating the status of the outbound message
# Retrieve the inbound message for this provider. We are updating the status of the outbound message.
notification, should_exit = attempt_to_get_notification(
reference, notification_status, self.request.retries * self.default_retry_delay
)
Expand All @@ -90,7 +92,10 @@ def process_delivery_status(
number_of_message_parts,
getattr(notification, 'id', 'unknown'),
)
_calculate_pricing(price_in_millicents_usd, notification, notification_status, number_of_message_parts)

_calculate_pricing_and_update_notification(
price_in_millicents_usd, notification, notification_status, number_of_message_parts
)

current_app.logger.info(
'%s callback return status of %s for notification: %s',
Expand Down Expand Up @@ -194,20 +199,30 @@ def _get_notification_parameters(notification_platform_status: dict) -> Tuple[st
return payload, reference, notification_status, number_of_message_parts, price_in_millicents_usd


def _calculate_pricing(
def _calculate_pricing_and_update_notification(
price_in_millicents_usd: float, notification: Notification, notification_status: str, number_of_message_parts: int
):
"""Calculate pricing"""
"""
Calculate pricing, and update the notification.
"""

current_app.logger.debug('Calculate pricing and update notification %s', notification.id)

# Delivered messages should not have an associated reason.
status_reason = None if (notification_status == NOTIFICATION_DELIVERED) else notification.status_reason

if price_in_millicents_usd > 0.0:
dao_update_notification_by_id(
notification_id=notification.id,
status=notification_status,
status_reason=status_reason,
segments_count=number_of_message_parts,
cost_in_millicents=price_in_millicents_usd,
)
else:
update_notification_delivery_status(notification_id=notification.id, new_status=notification_status)
update_notification_delivery_status(
notification_id=notification.id, new_status=notification_status, new_status_reason=status_reason
)


def _get_notification_platform_status(
Expand Down
9 changes: 7 additions & 2 deletions app/celery/process_pinpoint_receipt_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def process_pinpoint_results(
)

try:
# This is the new status.
notification_status = get_notification_status(event_type, record_status, reference)

notification, should_exit = attempt_to_get_notification(
Expand All @@ -133,11 +134,15 @@ def process_pinpoint_results(
notification.status_reason = 'The veteran responded with STOP.'
elif record_status == 'OPTED_OUT':
notification.status_reason = 'The veteran is opted-out at the Pinpoint level.'
elif notification_status == NOTIFICATION_DELIVERED:
# Never include a status reason for a delivered notification.
notification.status_reason = None

if price_in_millicents_usd > 0.0:
dao_update_notification_by_id(
notification_id=notification.id,
status=notification_status,
status_reason=notification.status_reason,
segments_count=number_of_message_parts,
cost_in_millicents=price_in_millicents_usd,
)
Expand Down Expand Up @@ -215,8 +220,8 @@ def attempt_to_get_notification(
def check_notification_status(notification: Notification, notification_status: str) -> bool:
"""
Check if the notification status should be updated. If the status has not changed, or if the status is in a final
state, do not update the notification. *Unless* the status is being updated to delivered from a different final
status, in which case, the notification should always be updated.
state, do not update the notification *unless* the status is being updated to delivered from a different final
status. In that case, always update the notification.
Args:
notification (Notification): The notification to check.
Expand Down
44 changes: 25 additions & 19 deletions app/celery/process_ses_receipts_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Notification,
EMAIL_TYPE,
KEY_TYPE_NORMAL,
NOTIFICATION_DELIVERED,
NOTIFICATION_SENDING,
NOTIFICATION_PENDING,
NOTIFICATION_PERMANENT_FAILURE,
Expand Down Expand Up @@ -172,6 +173,7 @@ def process_ses_results( # noqa: C901 (too complex 14 > 10)

aws_response_dict = get_aws_responses(notification_type)

# This is the prospective, updated status.
notification_status = aws_response_dict['notification_status']
reference = ses_message['mail']['messageId']

Expand All @@ -187,8 +189,27 @@ def process_ses_results( # noqa: C901 (too complex 14 > 10)
)
return

# Add status reason to notification if the status is some kind of failure
# Prevent regressing bounce status. Note that this is a test of the existing status; not the new status.
if (
notification.status_reason
and 'bounce' in notification.status_reason
and notification.status
in {
NOTIFICATION_TEMPORARY_FAILURE,
NOTIFICATION_PERMANENT_FAILURE,
}
):
# async from AWS means we may get a delivered status after a bounce, in rare cases
current_app.logger.warning(
'Notification: %s was marked as a bounce, cannot be updated to: %s',
notification.id,
notification_status,
)
return

# This is a test of the new status. Is it a bounce?
if notification_status in (NOTIFICATION_TEMPORARY_FAILURE, NOTIFICATION_PERMANENT_FAILURE):
# Add the failure status reason to the notification.
if notification_status == NOTIFICATION_PERMANENT_FAILURE:
status_reason = 'Failed to deliver email due to hard bounce'
else:
Expand All @@ -209,24 +230,9 @@ def process_ses_results( # noqa: C901 (too complex 14 > 10)
check_and_queue_va_profile_email_status_callback(notification)

return

# Prevent regressing bounce status
if (
notification.status_reason
and 'bounce' in notification.status_reason
and notification.status
in {
NOTIFICATION_TEMPORARY_FAILURE,
NOTIFICATION_PERMANENT_FAILURE,
}
):
# async from AWS means we may get a delivered status after a bounce, in rare cases
current_app.logger.warning(
'Notification: %s was marked as a bounce, cannot be updated to: %s',
notification.id,
notification_status,
)
return
elif notification_status == NOTIFICATION_DELIVERED:
# Delivered messages should never have a status reason.
notification.status_reason = None

if notification.status not in (NOTIFICATION_SENDING, NOTIFICATION_PENDING):
notifications_dao.duplicate_update_warning(notification, notification_status)
Expand Down
38 changes: 26 additions & 12 deletions app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def country_records_delivery(phone_prefix):
def _get_notification_status_update_statement(
notification_id: str,
incoming_status: str,
incoming_status_reason: str | None,
**kwargs,
):
"""
Expand All @@ -136,13 +137,15 @@ def _get_notification_status_update_statement(
Args:
notification_id (str): String value of notification uuid to be updated
incoming_status (str): String value of the status the given notification will attempt to update to
notification (Notification): The notification to update, or None
incoming_status (str): String value of the status to which the given notification will attempt to update
incoming_status_reason (str): String value of the status reason to which the given notification will
attempt to update, or None
**kwargs: Additional key-value pairs to be updated
Returns:
update_statement: An update statement to be executed, or None if the notification should not be updated
"""

notification = db.session.get(Notification, notification_id)
if notification is None:
current_app.logger.error(
Expand All @@ -156,6 +159,9 @@ def _get_notification_status_update_statement(
elif incoming_status == NOTIFICATION_TEMPORARY_FAILURE:
kwargs['status'] = NOTIFICATION_TEMPORARY_FAILURE

if incoming_status_reason is not None:
kwargs['status_reason'] = incoming_status_reason

kwargs['updated_at'] = datetime.utcnow()

# Define the allowed update conditions
Expand Down Expand Up @@ -191,8 +197,7 @@ def _get_notification_status_update_statement(


def _update_notification_status(
notification: Notification,
status: str,
notification: Notification, status: str, status_reason: str | None = None
) -> Notification:
"""
Update the notification status if it should be updated.
Expand All @@ -212,7 +217,7 @@ def _update_notification_status(
'Attempting to update notification %s to a status that does not exist %s', notification.id, status
)

update_statement = _get_notification_status_update_statement(notification.id, status)
update_statement = _get_notification_status_update_statement(notification.id, status, status_reason)

try:
db.session.execute(update_statement)
Expand Down Expand Up @@ -277,6 +282,7 @@ def update_notification_status_by_id(
def update_notification_delivery_status(
notification_id: UUID,
new_status: str,
new_status_reason: str = None,
) -> None:
"""
Update a notification's delivery status.
Expand All @@ -287,10 +293,15 @@ def update_notification_delivery_status(
notification_id (UUID): Notification id,
new_status (str): Status to update the notification to,
"""

current_app.logger.info('Update notification: %s to status: %s', notification_id, new_status)
stmt = _get_notification_status_update_statement(
notification_id=notification_id,
incoming_status=new_status,
incoming_status_reason=new_status_reason,
)

try:
stmt = _get_notification_status_update_statement(notification_id=notification_id, incoming_status=new_status)
db.session.execute(stmt)
db.session.commit()
except Exception as e:
Expand All @@ -301,10 +312,7 @@ def update_notification_delivery_status(

@statsd(namespace='dao')
@transactional
def update_notification_status_by_reference(
reference,
status,
):
def update_notification_status_by_reference(reference: UUID, status: str, status_reason: str | None = None):
# this is used to update letters and emails
stmt = select(Notification).where(Notification.reference == reference)
notification = db.session.scalar(stmt)
Expand All @@ -323,7 +331,7 @@ def update_notification_status_by_reference(
if current_index < sending_index:
return None

return _update_notification_status(notification=notification, status=status)
return _update_notification_status(notification=notification, status=status, status_reason=status_reason)


@statsd(namespace='dao')
Expand All @@ -343,12 +351,14 @@ def dao_update_notification_by_id(
Returns:
Notification: The updated notification or None if the notification was not found
"""

kwargs['updated_at'] = datetime.utcnow()
status = kwargs.get('status')
status_reason = kwargs.get('status_reason')

if notification_id and status is not None:
update_statement = _get_notification_status_update_statement(
notification_id=notification_id, incoming_status=status, **kwargs
notification_id=notification_id, incoming_status=status, incoming_status_reason=status_reason, **kwargs
)
elif notification_id:
update_statement = update(Notification).where(Notification.id == notification_id).values(kwargs)
Expand Down Expand Up @@ -820,6 +830,10 @@ def dao_update_notifications_by_reference(
references,
update_dict,
):
"""
references - A list or tuple; update or notificaitons in this iterable.
"""

stmt = update(Notification).where(Notification.reference.in_(references)).values(**update_dict)
updated_count = db.session.execute(stmt, execution_options={'synchronize_session': False}).rowcount

Expand Down
Loading

0 comments on commit 3a31e91

Please sign in to comment.