diff --git a/.env.example b/.env.example index 8e60a9b5ae..cb36eefda5 100644 --- a/.env.example +++ b/.env.example @@ -19,3 +19,6 @@ AWS_PINPOINT_REGION=us-west-2 AWS_EMF_ENVIRONMENT=local CONTACT_FORM_EMAIL_ADDRESS = "" + +AWS_PINPOINT_SC_POOL_ID= +AWS_PINPOINT_SC_TEMPLATE_IDS= diff --git a/app/__init__.py b/app/__init__.py index 77a2a7d545..c3c144620e 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -28,6 +28,7 @@ PerformancePlatformClient, ) from app.clients.salesforce.salesforce_client import SalesforceClient +from app.clients.sms.aws_pinpoint import AwsPinpointClient from app.clients.sms.aws_sns import AwsSnsClient from app.dbsetup import RoutingSQLAlchemy from app.encryption import CryptoSigner @@ -45,6 +46,7 @@ notify_celery = NotifyCelery() aws_ses_client = AwsSesClient() aws_sns_client = AwsSnsClient() +aws_pinpoint_client = AwsPinpointClient() signer_notification = CryptoSigner() signer_personalisation = CryptoSigner() signer_complaint = CryptoSigner() @@ -107,6 +109,7 @@ def create_app(application, config=None): statsd_client.init_app(application) logging.init_app(application, statsd_client) aws_sns_client.init_app(application, statsd_client=statsd_client) + aws_pinpoint_client.init_app(application, statsd_client=statsd_client) aws_ses_client.init_app(application.config["AWS_REGION"], statsd_client=statsd_client) notify_celery.init_app(application) @@ -120,7 +123,7 @@ def create_app(application, config=None): performance_platform_client.init_app(application) document_download_client.init_app(application) - clients.init_app(sms_clients=[aws_sns_client], email_clients=[aws_ses_client]) + clients.init_app(sms_clients=[aws_sns_client, aws_pinpoint_client], email_clients=[aws_ses_client]) if application.config["FF_SALESFORCE_CONTACT"]: salesforce_client.init_app(application) diff --git a/app/aws/mocks.py b/app/aws/mocks.py index 46c6f5fe10..19aa7a4108 100644 --- a/app/aws/mocks.py +++ b/app/aws/mocks.py @@ -192,6 +192,57 @@ def sns_failed_callback(provider_response, reference=None, timestamp="2016-06-28 return _sns_callback(body) +# Note that 1467074434 = 2016-06-28 00:40:34.558 UTC +def pinpoint_success_callback(reference=None, timestamp=1467074434, destination="+1XXX5550100"): + body = { + "eventType": "TEXT_DELIVERED", + "eventVersion": "1.0", + "eventTimestamp": timestamp, + "isFinal": True, + "originationPhoneNumber": "+13655550100", + "destinationPhoneNumber": destination, + "isoCountryCode": "CA", + "mcc": "302", + "mnc": "610", + "carrierName": "Bell Cellular Inc. / Aliant Telecom", + "messageId": reference, + "messageRequestTimestamp": timestamp, + "messageEncoding": "GSM", + "messageType": "TRANSACTIONAL", + "messageStatus": "DELIVERED", + "messageStatusDescription": "Message has been accepted by phone", + "totalMessageParts": 1, + "totalMessagePrice": 0.00581, + "totalCarrierFee": 0.006, + } + + return _pinpoint_callback(body) + + +# Note that 1467074434 = 2016-06-28 00:40:34.558 UTC +def pinpoint_failed_callback(provider_response, reference=None, timestamp=1467074434, destination="+1XXX5550100"): + body = { + "eventType": "TEXT_CARRIER_UNREACHABLE", + "eventVersion": "1.0", + "eventTimestamp": timestamp, + "isFinal": True, + "originationPhoneNumber": "+13655550100", + "destinationPhoneNumber": destination, + "isoCountryCode": "CA", + "messageId": reference, + "messageRequestTimestamp": timestamp, + "messageEncoding": "GSM", + "messageType": "TRANSACTIONAL", + "messageStatus": "CARRIER_UNREACHABLE", + "messageStatusDescription": provider_response, + "totalMessageParts": 1, + "totalMessagePrice": 0.00581, + "totalCarrierFee": 0.006, + } + + return _pinpoint_callback(body) + + def _ses_bounce_callback(reference, bounce_type, bounce_subtype=None): ses_message_body = { "bounce": { @@ -267,3 +318,19 @@ def _sns_callback(body): "UnsubscribeUrl": "https://sns.ca-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]", "MessageAttributes": {}, } + + +def _pinpoint_callback(body): + return { + "Type": "Notification", + "MessageId": "8e83c020-1234-1234-1234-92a8ee9baa0a", + "TopicArn": "arn:aws:sns:ca-central-1:12341234:ses_notifications", + "Subject": None, + "Message": json.dumps(body), + "Timestamp": "2017-11-17T12:14:03.710Z", + "SignatureVersion": "1", + "Signature": "[REDACTED]", + "SigningCertUrl": "https://sns.ca-central-1.amazonaws.com/SimpleNotificationService-[REDACTED].pem", + "UnsubscribeUrl": "https://sns.ca-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]", + "MessageAttributes": {}, + } diff --git a/app/celery/process_pinpoint_receipts_tasks.py b/app/celery/process_pinpoint_receipts_tasks.py new file mode 100644 index 0000000000..5e9148b2cd --- /dev/null +++ b/app/celery/process_pinpoint_receipts_tasks.py @@ -0,0 +1,146 @@ +from datetime import datetime +from typing import Union + +from flask import current_app, json +from notifications_utils.statsd_decorators import statsd +from sqlalchemy.orm.exc import NoResultFound + +from app import notify_celery, statsd_client +from app.config import QueueNames +from app.dao import notifications_dao +from app.models import ( + NOTIFICATION_DELIVERED, + NOTIFICATION_PERMANENT_FAILURE, + NOTIFICATION_SENT, + NOTIFICATION_TECHNICAL_FAILURE, + NOTIFICATION_TEMPORARY_FAILURE, + PINPOINT_PROVIDER, +) +from app.notifications.callbacks import _check_and_queue_callback_task +from celery.exceptions import Retry + +# Pinpoint receipts are of the form: +# { +# "eventType": "TEXT_DELIVERED", +# "eventVersion": "1.0", +# "eventTimestamp": 1712944268877, +# "isFinal": true, +# "originationPhoneNumber": "+13655550100", +# "destinationPhoneNumber": "+16135550123", +# "isoCountryCode": "CA", +# "mcc": "302", +# "mnc": "610", +# "carrierName": "Bell Cellular Inc. / Aliant Telecom", +# "messageId": "221bc70c-7ee6-4987-b1ba-9684ba25be20", +# "messageRequestTimestamp": 1712944267685, +# "messageEncoding": "GSM", +# "messageType": "TRANSACTIONAL", +# "messageStatus": "DELIVERED", +# "messageStatusDescription": "Message has been accepted by phone", +# "totalMessageParts": 1, +# "totalMessagePrice": 0.00581, +# "totalCarrierFee": 0.006 +# } + + +@notify_celery.task(bind=True, name="process-pinpoint-result", max_retries=5, default_retry_delay=300) +@statsd(namespace="tasks") +def process_pinpoint_results(self, response): + try: + receipt = json.loads(response["Message"]) + reference = receipt["messageId"] + status = receipt["messageStatus"] + provider_response = receipt["messageStatusDescription"] + + notification_status = determine_pinpoint_status(status, provider_response) + if not notification_status: + current_app.logger.warning(f"unhandled provider response for reference {reference}, received '{provider_response}'") + notification_status = NOTIFICATION_TECHNICAL_FAILURE # revert to tech failure by default + + try: + notification = notifications_dao.dao_get_notification_by_reference(reference) + except NoResultFound: + try: + current_app.logger.warning( + f"RETRY {self.request.retries}: notification not found for Pinpoint reference {reference} (update to {notification_status}). " + f"Callback may have arrived before notification was persisted to the DB. Adding task to retry queue" + ) + self.retry(queue=QueueNames.RETRY) + except self.MaxRetriesExceededError: + current_app.logger.warning( + f"notification not found for Pinpoint reference: {reference} (update to {notification_status}). Giving up." + ) + return + if notification.sent_by != PINPOINT_PROVIDER: + current_app.logger.exception(f"Pinpoint callback handled notification {notification.id} not sent by Pinpoint") + return + + if notification.status != NOTIFICATION_SENT: + notifications_dao._duplicate_update_warning(notification, notification_status) + return + + notifications_dao._update_notification_status( + notification=notification, + status=notification_status, + provider_response=provider_response, + ) + + if notification_status != NOTIFICATION_DELIVERED: + current_app.logger.info( + ( + f"Pinpoint delivery failed: notification id {notification.id} and reference {reference} has error found. " + f"Provider response: {provider_response}" + ) + ) + else: + current_app.logger.info( + f"Pinpoint callback return status of {notification_status} for notification: {notification.id}" + ) + + statsd_client.incr(f"callback.pinpoint.{notification_status}") + + if notification.sent_at: + statsd_client.timing_with_dates("callback.pinpoint.elapsed-time", datetime.utcnow(), notification.sent_at) + + _check_and_queue_callback_task(notification) + + except Retry: + raise + + except Exception as e: + current_app.logger.exception(f"Error processing Pinpoint results: {str(e)}") + self.retry(queue=QueueNames.RETRY) + + +def determine_pinpoint_status(status: str, provider_response: str) -> Union[str, None]: + """Determine the notification status based on the SMS status and provider response. + + Args: + status (str): message status from AWS + provider_response (str): detailed status from the SMS provider + + Returns: + Union[str, None]: the notification status or None if the status is not handled + """ + + if status == "DELIVERED": + return NOTIFICATION_DELIVERED + + response_lower = provider_response.lower() + + if "blocked" in response_lower: + return NOTIFICATION_TECHNICAL_FAILURE + elif "invalid" in response_lower: + return NOTIFICATION_TECHNICAL_FAILURE + elif "is opted out" in response_lower: + return NOTIFICATION_PERMANENT_FAILURE + elif "unknown error" in response_lower: + return NOTIFICATION_TECHNICAL_FAILURE + elif "exceed max price" in response_lower: + return NOTIFICATION_TECHNICAL_FAILURE + elif "phone carrier is currently unreachable/unavailable" in response_lower: + return NOTIFICATION_TEMPORARY_FAILURE + elif "phone is currently unreachable/unavailable" in response_lower: + return NOTIFICATION_PERMANENT_FAILURE + else: + return None diff --git a/app/clients/sms/aws_pinpoint.py b/app/clients/sms/aws_pinpoint.py new file mode 100644 index 0000000000..37140323c0 --- /dev/null +++ b/app/clients/sms/aws_pinpoint.py @@ -0,0 +1,57 @@ +from time import monotonic + +import boto3 +import phonenumbers + +from app.clients.sms import SmsClient + + +class AwsPinpointClient(SmsClient): + """ + AWS Pinpoint SMS client + """ + + def init_app(self, current_app, statsd_client, *args, **kwargs): + self._client = boto3.client("pinpoint-sms-voice-v2", region_name="ca-central-1") + super(AwsPinpointClient, self).__init__(*args, **kwargs) + # super(SmsClient, self).__init__(*args, **kwargs) + self.current_app = current_app + self.name = "pinpoint" + self.statsd_client = statsd_client + + def get_name(self): + return self.name + + def send_sms(self, to, content, reference, multi=True, sender=None): + pool_id = self.current_app.config["AWS_PINPOINT_SC_POOL_ID"] + messageType = "TRANSACTIONAL" + matched = False + + for match in phonenumbers.PhoneNumberMatcher(to, "US"): + matched = True + to = phonenumbers.format_number(match.number, phonenumbers.PhoneNumberFormat.E164) + destinationNumber = to + + try: + start_time = monotonic() + response = self._client.send_text_message( + DestinationPhoneNumber=destinationNumber, + OriginationIdentity=pool_id, + MessageBody=content, + MessageType=messageType, + ConfigurationSetName=self.current_app.config["AWS_PINPOINT_CONFIGURATION_SET_NAME"], + ) + except Exception as e: + self.statsd_client.incr("clients.pinpoint.error") + raise Exception(e) + finally: + elapsed_time = monotonic() - start_time + self.current_app.logger.info("AWS Pinpoint request finished in {}".format(elapsed_time)) + self.statsd_client.timing("clients.pinpoint.request-time", elapsed_time) + self.statsd_client.incr("clients.pinpoint.success") + return response["MessageId"] + + if not matched: + self.statsd_client.incr("clients.pinpoint.error") + self.current_app.logger.error("No valid numbers found in {}".format(to)) + raise ValueError("No valid numbers found for SMS delivery") diff --git a/app/config.py b/app/config.py index c4bb23fdac..2fd0cc9a19 100644 --- a/app/config.py +++ b/app/config.py @@ -266,6 +266,9 @@ class Config(object): AWS_SES_ACCESS_KEY = os.getenv("AWS_SES_ACCESS_KEY") AWS_SES_SECRET_KEY = os.getenv("AWS_SES_SECRET_KEY") AWS_PINPOINT_REGION = os.getenv("AWS_PINPOINT_REGION", "us-west-2") + AWS_PINPOINT_SC_POOL_ID = os.getenv("AWS_PINPOINT_SC_POOL_ID", None) + AWS_PINPOINT_CONFIGURATION_SET_NAME = os.getenv("AWS_PINPOINT_CONFIGURATION_SET_NAME", "pinpoint-configuration") + AWS_PINPOINT_SC_TEMPLATE_IDS = env.list("AWS_PINPOINT_SC_TEMPLATE_IDS", []) AWS_US_TOLL_FREE_NUMBER = os.getenv("AWS_US_TOLL_FREE_NUMBER") CSV_UPLOAD_BUCKET_NAME = os.getenv("CSV_UPLOAD_BUCKET_NAME", "notification-alpha-canada-ca-csv-upload") ASSET_DOMAIN = os.getenv("ASSET_DOMAIN", "assets.notification.canada.ca") diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py index ce24bc1131..c291bbd16a 100644 --- a/app/delivery/send_to_providers.py +++ b/app/delivery/send_to_providers.py @@ -46,6 +46,7 @@ NOTIFICATION_SENT, NOTIFICATION_TECHNICAL_FAILURE, NOTIFICATION_VIRUS_SCAN_FAILED, + PINPOINT_PROVIDER, SMS_TYPE, BounceRateStatus, Notification, @@ -68,6 +69,7 @@ def send_sms_to_provider(notification): notification.id, notification.international, notification.reply_to_text, + template_id=notification.template_id, ) template_dict = dao_get_template_by_id(notification.template_id, notification.template_version).__dict__ @@ -334,9 +336,15 @@ def update_notification_to_sending(notification, provider): dao_update_notification(notification) -def provider_to_use(notification_type, notification_id, international=False, sender=None): +def provider_to_use(notification_type, notification_id, international=False, sender=None, template_id=None): + # Temporary redirect setup for template IDs that are meant for the short code usage. + if notification_type == SMS_TYPE and template_id is not None and str(template_id) in Config.AWS_PINPOINT_SC_TEMPLATE_IDS: + return clients.get_client_by_name_and_type("pinpoint", SMS_TYPE) + active_providers_in_order = [ - p for p in get_provider_details_by_notification_type(notification_type, international) if p.active + p + for p in get_provider_details_by_notification_type(notification_type, international) + if p.active and p.identifier != PINPOINT_PROVIDER ] if not active_providers_in_order: diff --git a/app/models.py b/app/models.py index 215ad47372..f79867918e 100644 --- a/app/models.py +++ b/app/models.py @@ -1276,9 +1276,10 @@ def get_link(self): SNS_PROVIDER = "sns" +PINPOINT_PROVIDER = "pinpoint" SES_PROVIDER = "ses" -SMS_PROVIDERS = [SNS_PROVIDER] +SMS_PROVIDERS = [SNS_PROVIDER, PINPOINT_PROVIDER] EMAIL_PROVIDERS = [SES_PROVIDER] PROVIDERS = SMS_PROVIDERS + EMAIL_PROVIDERS diff --git a/tests/app/celery/test_process_pinpoint_receipts_tasks.py b/tests/app/celery/test_process_pinpoint_receipts_tasks.py new file mode 100644 index 0000000000..03932428bd --- /dev/null +++ b/tests/app/celery/test_process_pinpoint_receipts_tasks.py @@ -0,0 +1,209 @@ +from datetime import datetime + +import pytest +from freezegun import freeze_time + +from app import statsd_client +from app.aws.mocks import pinpoint_failed_callback, pinpoint_success_callback +from app.celery.process_pinpoint_receipts_tasks import process_pinpoint_results +from app.dao.notifications_dao import get_notification_by_id +from app.models import ( + NOTIFICATION_DELIVERED, + NOTIFICATION_PERMANENT_FAILURE, + NOTIFICATION_SENT, + NOTIFICATION_TECHNICAL_FAILURE, + NOTIFICATION_TEMPORARY_FAILURE, +) +from app.notifications.callbacks import create_delivery_status_callback_data +from celery.exceptions import MaxRetriesExceededError +from tests.app.conftest import create_sample_notification +from tests.app.db import ( + create_notification, + create_service_callback_api, + save_notification, +) + + +def test_process_pinpoint_results_delivered(sample_template, notify_db, notify_db_session, mocker): + mock_logger = mocker.patch("app.celery.process_pinpoint_receipts_tasks.current_app.logger.info") + mock_callback_task = mocker.patch("app.notifications.callbacks._check_and_queue_callback_task") + + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_template, + reference="ref", + status=NOTIFICATION_SENT, + sent_by="pinpoint", + sent_at=datetime.utcnow(), + ) + assert get_notification_by_id(notification.id).status == NOTIFICATION_SENT + + process_pinpoint_results(pinpoint_success_callback(reference="ref")) + + assert mock_callback_task.called_once_with(get_notification_by_id(notification.id)) + assert get_notification_by_id(notification.id).status == NOTIFICATION_DELIVERED + assert get_notification_by_id(notification.id).provider_response == "Message has been accepted by phone" + + mock_logger.assert_called_once_with(f"Pinpoint callback return status of delivered for notification: {notification.id}") + + +@pytest.mark.parametrize( + "provider_response, expected_status, should_log_warning, should_save_provider_response", + [ + ( + "Blocked as spam by phone carrier", + NOTIFICATION_TECHNICAL_FAILURE, + False, + True, + ), + ( + "Phone carrier is currently unreachable/unavailable", + NOTIFICATION_TEMPORARY_FAILURE, + False, + True, + ), + ( + "Phone is currently unreachable/unavailable", + NOTIFICATION_PERMANENT_FAILURE, + False, + True, + ), + ("This is not a real response", NOTIFICATION_TECHNICAL_FAILURE, True, True), + ], +) +def test_process_pinpoint_results_failed( + sample_template, + notify_db, + notify_db_session, + mocker, + provider_response, + expected_status, + should_log_warning, + should_save_provider_response, +): + mock_logger = mocker.patch("app.celery.process_pinpoint_receipts_tasks.current_app.logger.info") + mock_warning_logger = mocker.patch("app.celery.process_pinpoint_receipts_tasks.current_app.logger.warning") + mock_callback_task = mocker.patch("app.notifications.callbacks._check_and_queue_callback_task") + + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_template, + reference="ref", + status=NOTIFICATION_SENT, + sent_by="pinpoint", + sent_at=datetime.utcnow(), + ) + assert get_notification_by_id(notification.id).status == NOTIFICATION_SENT + process_pinpoint_results(pinpoint_failed_callback(provider_response=provider_response, reference="ref")) + + assert mock_callback_task.called_once_with(get_notification_by_id(notification.id)) + assert get_notification_by_id(notification.id).status == expected_status + + if should_save_provider_response: + assert get_notification_by_id(notification.id).provider_response == provider_response + else: + assert get_notification_by_id(notification.id).provider_response is None + + mock_logger.assert_called_once_with( + ( + f"Pinpoint delivery failed: notification id {notification.id} and reference ref has error found. " + f"Provider response: {provider_response}" + ) + ) + + assert mock_warning_logger.call_count == int(should_log_warning) + + +def test_pinpoint_callback_should_retry_if_notification_is_missing(notify_db, mocker): + mock_retry = mocker.patch("app.celery.process_pinpoint_receipts_tasks.process_pinpoint_results.retry") + mock_callback_task = mocker.patch("app.notifications.callbacks._check_and_queue_callback_task") + + process_pinpoint_results(pinpoint_success_callback(reference="ref")) + + mock_callback_task.assert_not_called() + assert mock_retry.call_count == 1 + + +def test_pinpoint_callback_should_give_up_after_max_tries(notify_db, mocker): + mocker.patch( + "app.celery.process_pinpoint_receipts_tasks.process_pinpoint_results.retry", + side_effect=MaxRetriesExceededError, + ) + mock_logger = mocker.patch("app.celery.process_pinpoint_receipts_tasks.current_app.logger.warning") + mock_callback_task = mocker.patch("app.notifications.callbacks._check_and_queue_callback_task") + + process_pinpoint_results(pinpoint_success_callback(reference="ref")) is None + mock_callback_task.assert_not_called() + + mock_logger.assert_called_with("notification not found for Pinpoint reference: ref (update to delivered). Giving up.") + + +def test_process_pinpoint_results_retry_called(sample_template, mocker): + save_notification( + create_notification( + sample_template, + reference="ref1", + sent_at=datetime.utcnow(), + status=NOTIFICATION_SENT, + sent_by="pinpoint", + ) + ) + + mocker.patch( + "app.dao.notifications_dao._update_notification_status", + side_effect=Exception("EXPECTED"), + ) + mocked = mocker.patch("app.celery.process_pinpoint_receipts_tasks.process_pinpoint_results.retry") + process_pinpoint_results(response=pinpoint_success_callback(reference="ref1")) + assert mocked.call_count == 1 + + +def test_process_pinpoint_results_does_not_process_other_providers(sample_template, mocker): + mock_logger = mocker.patch("app.celery.process_pinpoint_receipts_tasks.current_app.logger.exception") + mock_dao = mocker.patch("app.dao.notifications_dao._update_notification_status") + save_notification( + create_notification( + sample_template, + reference="ref1", + sent_at=datetime.utcnow(), + status=NOTIFICATION_SENT, + sent_by="sns", + ) + ) + + process_pinpoint_results(response=pinpoint_success_callback(reference="ref1")) is None + assert mock_logger.called_once_with("") + assert not mock_dao.called + + +def test_process_pinpoint_results_calls_service_callback(sample_template, notify_db_session, notify_db, mocker): + with freeze_time("2021-01-01T12:00:00"): + mocker.patch("app.statsd_client.incr") + mocker.patch("app.statsd_client.timing_with_dates") + mock_send_status = mocker.patch("app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async") + mock_callback = mocker.patch("app.notifications.callbacks._check_and_queue_callback_task") + + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_template, + reference="ref", + status=NOTIFICATION_SENT, + sent_by="pinpoint", + sent_at=datetime.utcnow(), + ) + callback_api = create_service_callback_api(service=sample_template.service, url="https://example.com") + assert get_notification_by_id(notification.id).status == NOTIFICATION_SENT + + process_pinpoint_results(pinpoint_success_callback(reference="ref")) + + assert mock_callback.called_once_with(get_notification_by_id(notification.id)) + assert get_notification_by_id(notification.id).status == NOTIFICATION_DELIVERED + assert get_notification_by_id(notification.id).provider_response == "Message has been accepted by phone" + statsd_client.timing_with_dates.assert_any_call("callback.pinpoint.elapsed-time", datetime.utcnow(), notification.sent_at) + statsd_client.incr.assert_any_call("callback.pinpoint.delivered") + updated_notification = get_notification_by_id(notification.id) + signed_data = create_delivery_status_callback_data(updated_notification, callback_api) + mock_send_status.assert_called_once_with([str(notification.id), signed_data], queue="service-callbacks")