Skip to content

Commit

Permalink
Merge branch 'main' into 2137-send-va-profile-sms-delivery-status
Browse files Browse the repository at this point in the history
  • Loading branch information
MackHalliday authored Dec 4, 2024
2 parents ae3c97e + 3613186 commit d49251f
Show file tree
Hide file tree
Showing 25 changed files with 265 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fileignoreconfig:
- filename: lambda_functions/va_profile/va_profile_opt_in_out_lambda.py
checksum: a05165537ffbfac90000c5d04d8628251d771f6d1334c91c3aed28bf6c32368c
- filename: poetry.lock
checksum: 375493d3f6a4c5d0530b40cc06c15f68cd87f1e929f3b73ec456f414a1fc2d57
checksum: 0d77076ee13746c0b551ea8bd40cf15a6f92277102b6d2020bf87a39394af49c
- filename: scripts/trigger_task.py
checksum: 0e9d244dbe285de23fc84bb643407963dacf7d25a3358373f01f6272fb217778
- filename: tests/README.md
Expand Down
3 changes: 3 additions & 0 deletions app/authentication/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
from typing import Callable
from uuid import uuid4

from flask import request, current_app, g
from flask_jwt_extended import verify_jwt_in_request, current_user
Expand Down Expand Up @@ -165,6 +166,8 @@ def wrapper(


def validate_service_api_key_auth(): # noqa: C901
# Set the id here for tracking purposes - becomes notification id
g.request_id = str(uuid4())
request_helper.check_proxy_header_before_request()

auth_token = get_auth_token(request)
Expand Down
86 changes: 84 additions & 2 deletions app/celery/celery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import time

from celery import Celery, Task
from celery.signals import worker_process_shutdown, worker_shutting_down, worker_process_init
from celery import Celery, signals, Task
from celery.signals import task_prerun, task_postrun, worker_process_shutdown, worker_shutting_down, worker_process_init
from flask import current_app


Expand Down Expand Up @@ -90,3 +91,84 @@ def init_app(
)

self.conf.update(app.config['CELERY_SETTINGS'])


class CeleryRequestIdFilter(logging.Filter):
def __init__(self, request_id: str, name=''):
self.request_id = request_id
super().__init__(name)

def filter(self, record) -> bool:
"""Determine if the specified record is to be logged.
Args:
record (LogRecord): The log record representing this log
Returns:
bool: If the record should be logged
"""
record.requestId = self.request_id
return True


def _get_request_id(task_id: str, *args, **kwargs) -> str:
"""Get the notification id if it is available, otherwise use the task id.
Args:
task_id (str): Celery task id
Returns:
str: The request_id to use for all logging related to this task
"""
request_id = ''
try:
# Depending on the call it may be an arg
if len(args) > 1:
# Example: tasks = [deliver_email.si(notification_id=str(notification.id))]; chain(*tasks).apply_async()
request_id = args[1].get('kwargs', {}).get('notification_id', '')

# or kwarg - separated for readability
if not request_id:
# Example: deliver_email.apply_async(args=(),kwargs={'notification_id':str(notification.id)})
request_id = kwargs.get('kwargs', {}).get('notification_id', task_id)
except AttributeError:
logger = logging.getLogger()
logger.exception('celery prerun args: %s | kwargs: %s | task_id: %s', args, kwargs, task_id)
request_id = task_id
return request_id


@task_prerun.connect
def add_id_to_logger(task_id, task, *args, **kwargs):
"""Create filter for all logs related to this task.
Args:
task_id (str): The celery task id
task (Task): The celery Task object
"""
request_id = _get_request_id(task_id, args, kwargs)
current_app.logger.addFilter(CeleryRequestIdFilter(request_id, f'celery-{request_id}'))


@task_postrun.connect
def id_cleanup_logger(task_id, task, *args, **kwargs):
"""Removes previously created filters when they are no longer necessary.
Args:
task_id (str): The celery task id
task (Task): The celery Task object
"""
request_id = _get_request_id(task_id, args, kwargs)
for filter in current_app.logger.filters:
if filter.name == f'celery-{request_id}':
current_app.logger.removeFilter(filter)


@signals.setup_logging.connect
def remove_log_handler(*args, **kwargs) -> None:
"""Remove Celery log handler.
Just by using .connect this will disable the logger hijacking.
https://docs.celeryq.dev/en/stable/userguide/signals.html#setup-logging
"""
pass
4 changes: 2 additions & 2 deletions app/celery/contact_information_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def handle_lookup_contact_info_exception(
f"Can't proceed after querying VA Profile for contact information for {notification.id}. "
'Stopping execution of following tasks. Notification has been updated to permanent-failure.'
)
current_app.logger.warning('%s - %s: %s', e.__class__.__name__, str(e), message)
current_app.logger.info('%s - %s: %s', e.__class__.__name__, str(e), message)

update_notification_status_by_id(
notification.id, NOTIFICATION_PERMANENT_FAILURE, status_reason=e.failure_reason
Expand All @@ -149,11 +149,11 @@ def handle_lookup_contact_info_exception(
# Expected chain termination
lookup_task.request.chain = None
elif isinstance(e, (VAProfileIDNotFoundException, VAProfileNonRetryableException)):
current_app.logger.exception(e)
message = (
f'The task lookup_contact_info failed for notification {notification.id}. '
'Notification has been updated to permanent-failure'
)
current_app.logger.info(message)
update_notification_status_by_id(
notification.id, NOTIFICATION_PERMANENT_FAILURE, status_reason=e.failure_reason
)
Expand Down
2 changes: 2 additions & 0 deletions app/celery/process_ses_receipts_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from uuid import uuid4

import iso8601
from app.celery.common import log_notification_total_time
Expand Down Expand Up @@ -330,6 +331,7 @@ def process_ses_smtp_results(
created_at=headers['date'],
status=notification_status,
reference=ses_message['mail']['messageId'],
notification_id=uuid4(),
)

if notification_type == 'Complaint':
Expand Down
25 changes: 19 additions & 6 deletions app/celery/service_callback_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,13 @@ def check_and_queue_service_callback_task(notification: Notification, payload=No
# build dictionary for notification
notification_data = create_delivery_status_callback_data(notification, service_callback_api, payload)
send_delivery_status_to_service.apply_async(
[service_callback_api.id, str(notification.id), notification_data], queue=QueueNames.CALLBACKS
args=(),
kwargs={
'service_callback_id': service_callback_api.id,
'notification_id': str(notification.id),
'encrypted_status_update': notification_data,
},
queue=QueueNames.CALLBACKS,
)
else:
current_app.logger.debug(
Expand All @@ -367,6 +373,7 @@ def send_delivery_status_from_notification(
callback_signature: str,
callback_url: str,
notification_data: dict[str, str],
notification_id: str,
) -> None:
"""
Send a delivery status notification to the given callback URL.
Expand All @@ -393,7 +400,7 @@ def send_delivery_status_from_notification(
response.raise_for_status()
except Timeout as e:
current_app.logger.warning(
'Timeout error sending callback for notification %s, url %s', notification_data['id'], callback_url
'Timeout error sending callback for notification %s, url %s', notification_id, callback_url
)
raise AutoRetryException(f'Found {type(e).__name__}, autoretrying...', e)
except RequestException as e:
Expand All @@ -403,7 +410,7 @@ def send_delivery_status_from_notification(
if e.response is not None and e.response.status_code == 429 or e.response.status_code >= 500:
current_app.logger.warning(
'Retryable error sending callback for notification %s, url %s | status code: %s, exception: %s',
notification_data.get('id'),
notification_id,
callback_url,
e.response.status_code if e.response is not None else 'unknown',
str(e),
Expand All @@ -412,7 +419,7 @@ def send_delivery_status_from_notification(
else:
current_app.logger.warning(
'Non-retryable error sending callback for notification %s, url %s | status code: %s, exception: %s',
notification_data.get('id'),
notification_id,
callback_url,
e.response.status_code if e.response is not None else 'unknown',
str(e),
Expand All @@ -421,7 +428,7 @@ def send_delivery_status_from_notification(

current_app.logger.debug(
'Callback successfully sent for notification %s, url: %s | status code: %d',
notification_data.get('id'),
notification_id,
callback_url,
response.status_code,
)
Expand All @@ -440,7 +447,13 @@ def check_and_queue_notification_callback_task(notification: Notification) -> No
callback_signature = generate_callback_signature(notification.api_key_id, notification_data)

send_delivery_status_from_notification.apply_async(
[callback_signature, notification.callback_url, notification_data],
args=(),
kwargs={
'callback_signature': callback_signature,
'callback_url': notification.callback_url,
'notification_data': notification_data,
'notification_id': str(notification.id),
},
queue=QueueNames.CALLBACKS,
)

Expand Down
9 changes: 6 additions & 3 deletions app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,14 @@ def save_sms(

if is_feature_enabled(FeatureFlag.SMS_SENDER_RATE_LIMIT_ENABLED) and sms_sender and sms_sender.rate_limit:
provider_tasks.deliver_sms_with_rate_limiting.apply_async(
[str(saved_notification.id)],
args=(),
kwargs={'notification_id': str(saved_notification.id)},
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.NOTIFY,
)
else:
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
args=(),
kwargs={'notification_id': str(saved_notification.id)},
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.NOTIFY,
)

Expand Down Expand Up @@ -289,7 +291,8 @@ def save_email(
)

provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
args=(),
kwargs={'notification_id': str(saved_notification.id)},
queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.NOTIFY,
)

Expand Down
3 changes: 3 additions & 0 deletions app/integrations/comp_and_pen/scheduled_message_helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from uuid import uuid4

import boto3
from boto3.dynamodb.conditions import Attr
from flask import current_app
Expand Down Expand Up @@ -179,6 +181,7 @@ def send_comp_and_pen_sms(
sms_sender_id=sms_sender_id,
recipient=recipient,
recipient_item=recipient_item,
notification_id=uuid4(),
)
except Exception as e:
current_app.logger.critical(
Expand Down
18 changes: 11 additions & 7 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from datetime import datetime

from flask import current_app
from flask import current_app, g
from celery import chain

from notifications_utils.clients import redis
Expand Down Expand Up @@ -92,7 +92,9 @@ def persist_notification(
notification_created_at = created_at or datetime.utcnow()

if notification_id is None:
notification_id = uuid.uuid4()
# utils sets this so we can unify logging
# Any internal code that calls this method in a loop cannot use g (Example: send_notification_to_service_users)
notification_id = g.request_id if getattr(g, 'request_id', '') else uuid.uuid4()

notification = Notification(
id=notification_id,
Expand Down Expand Up @@ -170,12 +172,14 @@ def send_notification_to_queue(

if communication_item_id is not None:
if recipient_id_type != IdentifierType.VA_PROFILE_ID.value:
tasks.append(lookup_va_profile_id.si(notification.id).set(queue=QueueNames.LOOKUP_VA_PROFILE_ID))
tasks.append(
lookup_va_profile_id.si(notification_id=notification.id).set(queue=QueueNames.LOOKUP_VA_PROFILE_ID)
)

# Including sms_sender_id is necessary so the correct sender can be chosen.
# https://docs.celeryq.dev/en/v4.4.7/userguide/canvas.html#immutability
deliver_task, queue = _get_delivery_task(notification, research_mode, queue, sms_sender_id)
tasks.append(deliver_task.si(str(notification.id), sms_sender_id).set(queue=queue))
tasks.append(deliver_task.si(notification_id=str(notification.id), sms_sender_id=sms_sender_id).set(queue=queue))

try:
# This executes the task list. Each task calls a function that makes a request to
Expand Down Expand Up @@ -261,15 +265,15 @@ def send_to_queue_for_recipient_info_based_on_recipient_identifier(

else:
tasks = [
lookup_va_profile_id.si(notification.id).set(queue=QueueNames.LOOKUP_VA_PROFILE_ID),
lookup_va_profile_id.si(notification_id=notification.id).set(queue=QueueNames.LOOKUP_VA_PROFILE_ID),
send_va_onsite_notification_task.s(str(notification.template.id), onsite_enabled).set(
queue=QueueNames.NOTIFY
),
]

tasks.append(lookup_contact_info.si(notification.id).set(queue=QueueNames.LOOKUP_CONTACT_INFO))
tasks.append(lookup_contact_info.si(notification_id=notification.id).set(queue=QueueNames.LOOKUP_CONTACT_INFO))
deliver_task, deliver_queue = _get_delivery_task(notification)
tasks.append(deliver_task.si(notification.id).set(queue=deliver_queue))
tasks.append(deliver_task.si(notification_id=notification.id).set(queue=deliver_queue))

try:
# This executes the task list. Each task calls a function that makes a request to
Expand Down
2 changes: 2 additions & 0 deletions app/notifications/send_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def send_notification_bypass_route(
sms_sender_id: str = None,
recipient_item: dict = None,
api_key_type: str = KEY_TYPE_NORMAL,
notification_id: UUID | None = None,
):
"""
This will create a notification and add it to the proper celery queue using the given parameters.
Expand Down Expand Up @@ -118,6 +119,7 @@ def send_notification_bypass_route(
recipient_identifier=recipient_item,
sms_sender_id=sms_sender_id,
reply_to_text=reply_to_text,
notification_id=notification_id,
)

if recipient_item is not None:
Expand Down
3 changes: 3 additions & 0 deletions app/service/sender.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from uuid import uuid4

from flask import current_app

from app.config import QueueNames
Expand Down Expand Up @@ -32,6 +34,7 @@ def send_notification_to_service_users(
api_key_id=None,
key_type=KEY_TYPE_NORMAL,
reply_to_text=notify_service.get_default_reply_to_email_address(),
notification_id=uuid4(),
)
send_notification_to_queue(notification, False, queue=QueueNames.NOTIFY)

Expand Down
2 changes: 1 addition & 1 deletion app/va/va_profile/va_profile_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def get_mobile_telephone_from_contact_info(self, contact_info: ContactInformatio
self.statsd_client.incr('clients.va-profile.get-telephone.failure')
self.statsd_client.incr(f'clients.va-profile.get-{self.PHONE_BIO_TYPE}.no-{self.PHONE_BIO_TYPE}')
raise NoContactInfoException(
f'No {self.PHONE_BIO_TYPE} in response for VA Profile ID {contact_info.get("vaProfileId")}'
f'No {self.PHONE_BIO_TYPE} in response for VA Profile ID {contact_info.get("vaProfileId")} '
f'with AuditId {contact_info.get(self.TX_AUDIT_ID)}'
)

Expand Down
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion tests/app/celery/test_nightly_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,15 @@ def test_timeout_notifications_sends_status_update_to_service(
notify_db_session.session.refresh(notification)
encrypted_data = create_delivery_status_callback_data(notification, callback_api)

mocked.assert_called_with([callback_id, str(notification.id), encrypted_data], queue=QueueNames.CALLBACKS)
mocked.assert_called_with(
args=(),
kwargs={
'service_callback_id': callback_id,
'notification_id': str(notification.id),
'encrypted_status_update': encrypted_data,
},
queue=QueueNames.CALLBACKS,
)


def test_send_daily_performance_stats_calls_does_not_send_if_inactive(client, mocker):
Expand Down
Loading

0 comments on commit d49251f

Please sign in to comment.