Skip to content

Commit

Permalink
Use pinpoint for designated templates (#2152)
Browse files Browse the repository at this point in the history
  • Loading branch information
sastels authored May 8, 2024
1 parent 5206d78 commit 3262a4a
Show file tree
Hide file tree
Showing 9 changed files with 501 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ AWS_PINPOINT_REGION=us-west-2
AWS_EMF_ENVIRONMENT=local

CONTACT_FORM_EMAIL_ADDRESS = ""

AWS_PINPOINT_SC_POOL_ID=
AWS_PINPOINT_SC_TEMPLATE_IDS=
5 changes: 4 additions & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
PerformancePlatformClient,
)
from app.clients.salesforce.salesforce_client import SalesforceClient
from app.clients.sms.aws_pinpoint import AwsPinpointClient
from app.clients.sms.aws_sns import AwsSnsClient
from app.dbsetup import RoutingSQLAlchemy
from app.encryption import CryptoSigner
Expand All @@ -45,6 +46,7 @@
notify_celery = NotifyCelery()
aws_ses_client = AwsSesClient()
aws_sns_client = AwsSnsClient()
aws_pinpoint_client = AwsPinpointClient()
signer_notification = CryptoSigner()
signer_personalisation = CryptoSigner()
signer_complaint = CryptoSigner()
Expand Down Expand Up @@ -107,6 +109,7 @@ def create_app(application, config=None):
statsd_client.init_app(application)
logging.init_app(application, statsd_client)
aws_sns_client.init_app(application, statsd_client=statsd_client)
aws_pinpoint_client.init_app(application, statsd_client=statsd_client)
aws_ses_client.init_app(application.config["AWS_REGION"], statsd_client=statsd_client)
notify_celery.init_app(application)

Expand All @@ -120,7 +123,7 @@ def create_app(application, config=None):

performance_platform_client.init_app(application)
document_download_client.init_app(application)
clients.init_app(sms_clients=[aws_sns_client], email_clients=[aws_ses_client])
clients.init_app(sms_clients=[aws_sns_client, aws_pinpoint_client], email_clients=[aws_ses_client])

if application.config["FF_SALESFORCE_CONTACT"]:
salesforce_client.init_app(application)
Expand Down
67 changes: 67 additions & 0 deletions app/aws/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,57 @@ def sns_failed_callback(provider_response, reference=None, timestamp="2016-06-28
return _sns_callback(body)


# Note that 1467074434 = 2016-06-28 00:40:34.558 UTC
def pinpoint_success_callback(reference=None, timestamp=1467074434, destination="+1XXX5550100"):
body = {
"eventType": "TEXT_DELIVERED",
"eventVersion": "1.0",
"eventTimestamp": timestamp,
"isFinal": True,
"originationPhoneNumber": "+13655550100",
"destinationPhoneNumber": destination,
"isoCountryCode": "CA",
"mcc": "302",
"mnc": "610",
"carrierName": "Bell Cellular Inc. / Aliant Telecom",
"messageId": reference,
"messageRequestTimestamp": timestamp,
"messageEncoding": "GSM",
"messageType": "TRANSACTIONAL",
"messageStatus": "DELIVERED",
"messageStatusDescription": "Message has been accepted by phone",
"totalMessageParts": 1,
"totalMessagePrice": 0.00581,
"totalCarrierFee": 0.006,
}

return _pinpoint_callback(body)


# Note that 1467074434 = 2016-06-28 00:40:34.558 UTC
def pinpoint_failed_callback(provider_response, reference=None, timestamp=1467074434, destination="+1XXX5550100"):
body = {
"eventType": "TEXT_CARRIER_UNREACHABLE",
"eventVersion": "1.0",
"eventTimestamp": timestamp,
"isFinal": True,
"originationPhoneNumber": "+13655550100",
"destinationPhoneNumber": destination,
"isoCountryCode": "CA",
"messageId": reference,
"messageRequestTimestamp": timestamp,
"messageEncoding": "GSM",
"messageType": "TRANSACTIONAL",
"messageStatus": "CARRIER_UNREACHABLE",
"messageStatusDescription": provider_response,
"totalMessageParts": 1,
"totalMessagePrice": 0.00581,
"totalCarrierFee": 0.006,
}

return _pinpoint_callback(body)


def _ses_bounce_callback(reference, bounce_type, bounce_subtype=None):
ses_message_body = {
"bounce": {
Expand Down Expand Up @@ -267,3 +318,19 @@ def _sns_callback(body):
"UnsubscribeUrl": "https://sns.ca-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]",
"MessageAttributes": {},
}


def _pinpoint_callback(body):
return {
"Type": "Notification",
"MessageId": "8e83c020-1234-1234-1234-92a8ee9baa0a",
"TopicArn": "arn:aws:sns:ca-central-1:12341234:ses_notifications",
"Subject": None,
"Message": json.dumps(body),
"Timestamp": "2017-11-17T12:14:03.710Z",
"SignatureVersion": "1",
"Signature": "[REDACTED]",
"SigningCertUrl": "https://sns.ca-central-1.amazonaws.com/SimpleNotificationService-[REDACTED].pem",
"UnsubscribeUrl": "https://sns.ca-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]",
"MessageAttributes": {},
}
146 changes: 146 additions & 0 deletions app/celery/process_pinpoint_receipts_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from datetime import datetime
from typing import Union

from flask import current_app, json
from notifications_utils.statsd_decorators import statsd
from sqlalchemy.orm.exc import NoResultFound

from app import notify_celery, statsd_client
from app.config import QueueNames
from app.dao import notifications_dao
from app.models import (
NOTIFICATION_DELIVERED,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_SENT,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_TEMPORARY_FAILURE,
PINPOINT_PROVIDER,
)
from app.notifications.callbacks import _check_and_queue_callback_task
from celery.exceptions import Retry

# Pinpoint receipts are of the form:
# {
# "eventType": "TEXT_DELIVERED",
# "eventVersion": "1.0",
# "eventTimestamp": 1712944268877,
# "isFinal": true,
# "originationPhoneNumber": "+13655550100",
# "destinationPhoneNumber": "+16135550123",
# "isoCountryCode": "CA",
# "mcc": "302",
# "mnc": "610",
# "carrierName": "Bell Cellular Inc. / Aliant Telecom",
# "messageId": "221bc70c-7ee6-4987-b1ba-9684ba25be20",
# "messageRequestTimestamp": 1712944267685,
# "messageEncoding": "GSM",
# "messageType": "TRANSACTIONAL",
# "messageStatus": "DELIVERED",
# "messageStatusDescription": "Message has been accepted by phone",
# "totalMessageParts": 1,
# "totalMessagePrice": 0.00581,
# "totalCarrierFee": 0.006
# }


@notify_celery.task(bind=True, name="process-pinpoint-result", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def process_pinpoint_results(self, response):
try:
receipt = json.loads(response["Message"])
reference = receipt["messageId"]
status = receipt["messageStatus"]
provider_response = receipt["messageStatusDescription"]

notification_status = determine_pinpoint_status(status, provider_response)
if not notification_status:
current_app.logger.warning(f"unhandled provider response for reference {reference}, received '{provider_response}'")
notification_status = NOTIFICATION_TECHNICAL_FAILURE # revert to tech failure by default

try:
notification = notifications_dao.dao_get_notification_by_reference(reference)
except NoResultFound:
try:
current_app.logger.warning(
f"RETRY {self.request.retries}: notification not found for Pinpoint reference {reference} (update to {notification_status}). "
f"Callback may have arrived before notification was persisted to the DB. Adding task to retry queue"
)
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.warning(
f"notification not found for Pinpoint reference: {reference} (update to {notification_status}). Giving up."
)
return
if notification.sent_by != PINPOINT_PROVIDER:
current_app.logger.exception(f"Pinpoint callback handled notification {notification.id} not sent by Pinpoint")
return

if notification.status != NOTIFICATION_SENT:
notifications_dao._duplicate_update_warning(notification, notification_status)
return

notifications_dao._update_notification_status(
notification=notification,
status=notification_status,
provider_response=provider_response,
)

if notification_status != NOTIFICATION_DELIVERED:
current_app.logger.info(
(
f"Pinpoint delivery failed: notification id {notification.id} and reference {reference} has error found. "
f"Provider response: {provider_response}"
)
)
else:
current_app.logger.info(
f"Pinpoint callback return status of {notification_status} for notification: {notification.id}"
)

statsd_client.incr(f"callback.pinpoint.{notification_status}")

if notification.sent_at:
statsd_client.timing_with_dates("callback.pinpoint.elapsed-time", datetime.utcnow(), notification.sent_at)

_check_and_queue_callback_task(notification)

except Retry:
raise

except Exception as e:
current_app.logger.exception(f"Error processing Pinpoint results: {str(e)}")
self.retry(queue=QueueNames.RETRY)


def determine_pinpoint_status(status: str, provider_response: str) -> Union[str, None]:
"""Determine the notification status based on the SMS status and provider response.
Args:
status (str): message status from AWS
provider_response (str): detailed status from the SMS provider
Returns:
Union[str, None]: the notification status or None if the status is not handled
"""

if status == "DELIVERED":
return NOTIFICATION_DELIVERED

response_lower = provider_response.lower()

if "blocked" in response_lower:
return NOTIFICATION_TECHNICAL_FAILURE
elif "invalid" in response_lower:
return NOTIFICATION_TECHNICAL_FAILURE
elif "is opted out" in response_lower:
return NOTIFICATION_PERMANENT_FAILURE
elif "unknown error" in response_lower:
return NOTIFICATION_TECHNICAL_FAILURE
elif "exceed max price" in response_lower:
return NOTIFICATION_TECHNICAL_FAILURE
elif "phone carrier is currently unreachable/unavailable" in response_lower:
return NOTIFICATION_TEMPORARY_FAILURE
elif "phone is currently unreachable/unavailable" in response_lower:
return NOTIFICATION_PERMANENT_FAILURE
else:
return None
57 changes: 57 additions & 0 deletions app/clients/sms/aws_pinpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from time import monotonic

import boto3
import phonenumbers

from app.clients.sms import SmsClient


class AwsPinpointClient(SmsClient):
"""
AWS Pinpoint SMS client
"""

def init_app(self, current_app, statsd_client, *args, **kwargs):
self._client = boto3.client("pinpoint-sms-voice-v2", region_name="ca-central-1")
super(AwsPinpointClient, self).__init__(*args, **kwargs)
# super(SmsClient, self).__init__(*args, **kwargs)
self.current_app = current_app
self.name = "pinpoint"
self.statsd_client = statsd_client

def get_name(self):
return self.name

def send_sms(self, to, content, reference, multi=True, sender=None):
pool_id = self.current_app.config["AWS_PINPOINT_SC_POOL_ID"]
messageType = "TRANSACTIONAL"
matched = False

for match in phonenumbers.PhoneNumberMatcher(to, "US"):
matched = True
to = phonenumbers.format_number(match.number, phonenumbers.PhoneNumberFormat.E164)
destinationNumber = to

try:
start_time = monotonic()
response = self._client.send_text_message(
DestinationPhoneNumber=destinationNumber,
OriginationIdentity=pool_id,
MessageBody=content,
MessageType=messageType,
ConfigurationSetName=self.current_app.config["AWS_PINPOINT_CONFIGURATION_SET_NAME"],
)
except Exception as e:
self.statsd_client.incr("clients.pinpoint.error")
raise Exception(e)
finally:
elapsed_time = monotonic() - start_time
self.current_app.logger.info("AWS Pinpoint request finished in {}".format(elapsed_time))
self.statsd_client.timing("clients.pinpoint.request-time", elapsed_time)
self.statsd_client.incr("clients.pinpoint.success")
return response["MessageId"]

if not matched:
self.statsd_client.incr("clients.pinpoint.error")
self.current_app.logger.error("No valid numbers found in {}".format(to))
raise ValueError("No valid numbers found for SMS delivery")
3 changes: 3 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ class Config(object):
AWS_SES_ACCESS_KEY = os.getenv("AWS_SES_ACCESS_KEY")
AWS_SES_SECRET_KEY = os.getenv("AWS_SES_SECRET_KEY")
AWS_PINPOINT_REGION = os.getenv("AWS_PINPOINT_REGION", "us-west-2")
AWS_PINPOINT_SC_POOL_ID = os.getenv("AWS_PINPOINT_SC_POOL_ID", None)
AWS_PINPOINT_CONFIGURATION_SET_NAME = os.getenv("AWS_PINPOINT_CONFIGURATION_SET_NAME", "pinpoint-configuration")
AWS_PINPOINT_SC_TEMPLATE_IDS = env.list("AWS_PINPOINT_SC_TEMPLATE_IDS", [])
AWS_US_TOLL_FREE_NUMBER = os.getenv("AWS_US_TOLL_FREE_NUMBER")
CSV_UPLOAD_BUCKET_NAME = os.getenv("CSV_UPLOAD_BUCKET_NAME", "notification-alpha-canada-ca-csv-upload")
ASSET_DOMAIN = os.getenv("ASSET_DOMAIN", "assets.notification.canada.ca")
Expand Down
12 changes: 10 additions & 2 deletions app/delivery/send_to_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
NOTIFICATION_SENT,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_VIRUS_SCAN_FAILED,
PINPOINT_PROVIDER,
SMS_TYPE,
BounceRateStatus,
Notification,
Expand All @@ -68,6 +69,7 @@ def send_sms_to_provider(notification):
notification.id,
notification.international,
notification.reply_to_text,
template_id=notification.template_id,
)

template_dict = dao_get_template_by_id(notification.template_id, notification.template_version).__dict__
Expand Down Expand Up @@ -334,9 +336,15 @@ def update_notification_to_sending(notification, provider):
dao_update_notification(notification)


def provider_to_use(notification_type, notification_id, international=False, sender=None):
def provider_to_use(notification_type, notification_id, international=False, sender=None, template_id=None):
# Temporary redirect setup for template IDs that are meant for the short code usage.
if notification_type == SMS_TYPE and template_id is not None and str(template_id) in Config.AWS_PINPOINT_SC_TEMPLATE_IDS:
return clients.get_client_by_name_and_type("pinpoint", SMS_TYPE)

active_providers_in_order = [
p for p in get_provider_details_by_notification_type(notification_type, international) if p.active
p
for p in get_provider_details_by_notification_type(notification_type, international)
if p.active and p.identifier != PINPOINT_PROVIDER
]

if not active_providers_in_order:
Expand Down
3 changes: 2 additions & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,9 +1276,10 @@ def get_link(self):


SNS_PROVIDER = "sns"
PINPOINT_PROVIDER = "pinpoint"
SES_PROVIDER = "ses"

SMS_PROVIDERS = [SNS_PROVIDER]
SMS_PROVIDERS = [SNS_PROVIDER, PINPOINT_PROVIDER]
EMAIL_PROVIDERS = [SES_PROVIDER]
PROVIDERS = SMS_PROVIDERS + EMAIL_PROVIDERS

Expand Down
Loading

0 comments on commit 3262a4a

Please sign in to comment.