Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pinpoint for designated templates #2152

Merged
merged 43 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
9740520
rough in pinpoint
sastels Apr 5, 2024
064e36d
add pool_id env var
sastels Apr 5, 2024
b9d0d3a
sending with pinpoint pool
sastels Apr 9, 2024
0fa34c1
use pinpoint pool for specified templates
sastels Apr 9, 2024
7e69706
format
sastels Apr 9, 2024
b3d12e5
tweak
sastels Apr 9, 2024
fe263c6
Merge branch 'main' into rough-in-pinpoint
sastels Apr 9, 2024
d18ce27
add configuration set
sastels Apr 11, 2024
fceab9f
add task for processing pinpoint receipts
sastels Apr 12, 2024
b8ceb91
add mocks for pinpoint receipt task testing
sastels Apr 12, 2024
1a6ba9e
rough in test_process_pinpoint_receipts_tasks.py
sastels Apr 12, 2024
6e3d339
make tests pass
sastels Apr 12, 2024
ca2da8c
add explicit return
sastels Apr 12, 2024
7ec198b
Merge branch 'main' into rough-in-pinpoint
sastels Apr 12, 2024
3db48e8
tweak
sastels Apr 18, 2024
d2e8471
tweak
sastels Apr 22, 2024
c8eb6c5
Merge branch 'main' into rough-in-pinpoint
sastels Apr 22, 2024
6c090ce
working now
sastels Apr 22, 2024
3f59f27
add new env vars to .env.examples
sastels Apr 23, 2024
646b86d
Update .env.example
sastels Apr 24, 2024
bde0f68
Update .env.example
sastels Apr 24, 2024
12f539f
Update app/clients/sms/aws_pinpoint.py
sastels Apr 24, 2024
965fbff
Update app/config.py
sastels Apr 24, 2024
26d4419
Update app/delivery/send_to_providers.py
sastels Apr 24, 2024
5710df2
Update app/config.py
sastels Apr 24, 2024
28524bb
Merge branch 'main' into rough-in-pinpoint
sastels Apr 24, 2024
efa0df9
add typing / docstring for determine_pinpoint_status
sastels Apr 24, 2024
d1612c2
wip add pattern matching
sastels Apr 25, 2024
c08cc14
matchy-matchy
sastels Apr 25, 2024
aed401f
Merge branch 'main' into rough-in-pinpoint
sastels May 6, 2024
e205bc1
formatting
sastels May 6, 2024
41e7eaa
add if/else alternative to matching
sastels May 6, 2024
40e4c87
fix case
sastels May 6, 2024
f09cec5
Merge branch 'main' into rough-in-pinpoint
sastels May 6, 2024
fde4812
add return
sastels May 6, 2024
040a361
use new callback.pinpoint metrics
sastels May 6, 2024
da4006f
remove redundant CliendError catching
sastels May 6, 2024
188d0b6
Merge branch 'main' into rough-in-pinpoint
sastels May 6, 2024
b4e2bd8
format
sastels May 6, 2024
afec912
refactor process_pinpoint_results tests
sastels May 7, 2024
337c594
if/else ftw
sastels May 7, 2024
7fb51e1
format
sastels May 7, 2024
7723702
Merge branch 'main' into rough-in-pinpoint
sastels May 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
PerformancePlatformClient,
)
from app.clients.salesforce.salesforce_client import SalesforceClient
from app.clients.sms.aws_pinpoint import AwsPinpointClient
from app.clients.sms.aws_sns import AwsSnsClient
from app.dbsetup import RoutingSQLAlchemy
from app.encryption import CryptoSigner
Expand All @@ -45,6 +46,7 @@
notify_celery = NotifyCelery()
aws_ses_client = AwsSesClient()
aws_sns_client = AwsSnsClient()
aws_pinpoint_client = AwsPinpointClient()
signer_notification = CryptoSigner()
signer_personalisation = CryptoSigner()
signer_complaint = CryptoSigner()
Expand Down Expand Up @@ -107,6 +109,7 @@ def create_app(application, config=None):
statsd_client.init_app(application)
logging.init_app(application, statsd_client)
aws_sns_client.init_app(application, statsd_client=statsd_client)
aws_pinpoint_client.init_app(application, statsd_client=statsd_client)
aws_ses_client.init_app(application.config["AWS_REGION"], statsd_client=statsd_client)
notify_celery.init_app(application)

Expand All @@ -120,7 +123,7 @@ def create_app(application, config=None):

performance_platform_client.init_app(application)
document_download_client.init_app(application)
clients.init_app(sms_clients=[aws_sns_client], email_clients=[aws_ses_client])
clients.init_app(sms_clients=[aws_sns_client, aws_pinpoint_client], email_clients=[aws_ses_client])

if application.config["FF_SALESFORCE_CONTACT"]:
salesforce_client.init_app(application)
Expand Down
141 changes: 141 additions & 0 deletions app/celery/process_pinpoint_receipts_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from datetime import datetime

from flask import current_app, json
from notifications_utils.statsd_decorators import statsd
from sqlalchemy.orm.exc import NoResultFound

from app import notify_celery, statsd_client
from app.config import QueueNames
from app.dao import notifications_dao
from app.models import (
NOTIFICATION_DELIVERED,
NOTIFICATION_PERMANENT_FAILURE,
NOTIFICATION_SENT,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_TEMPORARY_FAILURE,
PINPOINT_PROVIDER,
)
from app.notifications.callbacks import _check_and_queue_callback_task
from celery.exceptions import Retry


# Pinpoint receipts are of the form:
# {
# "eventType": "TEXT_DELIVERED",
# "eventVersion": "1.0",
# "eventTimestamp": 1712944268877,
# "isFinal": true,
# "originationPhoneNumber": "+13655550100",
# "destinationPhoneNumber": "+16135550123",
# "isoCountryCode": "CA",
# "mcc": "302",
# "mnc": "610",
# "carrierName": "Bell Cellular Inc. / Aliant Telecom",
# "messageId": "221bc70c-7ee6-4987-b1ba-9684ba25be20",
# "messageRequestTimestamp": 1712944267685,
# "messageEncoding": "GSM",
# "messageType": "TRANSACTIONAL",
# "messageStatus": "DELIVERED",
# "messageStatusDescription": "Message has been accepted by phone",
# "totalMessageParts": 1,
# "totalMessagePrice": 0.00581,
# "totalCarrierFee": 0.006
# }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice documentation to have in the code!


@notify_celery.task(bind=True, name="process-pinpoint-result", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def process_pinpoint_results(self, response):
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this part of the resurrected code too? Not to be done right now, but as we want to move away from Celery processing the callbacks, I was wondering if that would not be a good opportunity to have pinpoint results processing all in lambda sooner with this work. What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is mostly the old code (and also pretty much a copy of the current sns/sms receipt processing).

I thought about moving it all to the new lambda, but figured that for a first step we just get this all in and working and then we could think about refactoring.

Admittedly, the pinpoint receipts would be a good first target if, for example, we restrict the shortcode to GCNotify initially.

try:
receipt = json.loads(response)
reference = receipt["messageId"]
status = receipt["messageStatus"]
provider_response = receipt["messageStatusDescription"]

notification_status = determine_pinpoint_status(status, provider_response)
if not notification_status:
current_app.logger.warning(f"unhandled provider response for reference {reference}, received '{provider_response}'")
notification_status = NOTIFICATION_TECHNICAL_FAILURE # revert to tech failure by default

try:
notification = notifications_dao.dao_get_notification_by_reference(reference)
except NoResultFound:
try:
current_app.logger.warning(
f"RETRY {self.request.retries}: notification not found for Pinpoint reference {reference} (update to {notification_status}). "
f"Callback may have arrived before notification was persisted to the DB. Adding task to retry queue"
)
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.warning(
f"notification not found for Pinpoint reference: {reference} (update to {notification_status}). Giving up."
)
return
if notification.sent_by != PINPOINT_PROVIDER:
current_app.logger.exception(f"Pinpoint callback handled notification {notification.id} not sent by Pinpoint")
return

if notification.status != NOTIFICATION_SENT:
notifications_dao._duplicate_update_warning(notification, notification_status)
return

notifications_dao._update_notification_status(
notification=notification,
status=notification_status,
provider_response=provider_response,
)

if notification_status != NOTIFICATION_DELIVERED:
current_app.logger.info(
(
f"Pinpoint delivery failed: notification id {notification.id} and reference {reference} has error found. "
f"Provider response: {provider_response}"
)
)
else:
current_app.logger.info(f"Pinpoint callback return status of {notification_status} for notification: {notification.id}")

statsd_client.incr(f"callback.sns.{notification_status}") # TODO: do we want a Pinpoint metric here?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say yes! it might come up very useful to have our own pinpoint metric, the downside though is we'd have to update our dashboards with duplicate widgets for the ones tracking the SNS one.

Copy link
Collaborator Author

@sastels sastels May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I shall add, though it's not clear to me that we have the sns ones in alarms or dashboards... at least, I've found callback_ses_elapsed-time in our SLO dashboard, but callback_sns_elapsed-time does not appear to be used anywhere... Possibly we should have a "callbacks" dashboard with all these stats.


if notification.sent_at:
statsd_client.timing_with_dates("callback.sns.elapsed-time", datetime.utcnow(), notification.sent_at) # TODO: do we want a Pinpoint metric here?

_check_and_queue_callback_task(notification)

return True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we return true here, but simple return (no value) for others? This was also picked up by our scanner over here:
https://github.com/cds-snc/notification-api/security/code-scanning/804

Copy link
Collaborator Author

@sastels sastels Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a good question. It's what the other tasks do, and all I could think of was that the tasks were returning None if they failed and True if they succeeded. My googling wasn't able to confirm or dispute this theory however :/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I wonder if this function was called elsewhere maybe and hence expected to return something, maybe the older CLI commands? 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha! I was excluding our tests when looking to see how it's called. Our tests use the return value to determine if the function successfully processed the receipt (True) or if there was a problem (None).

I'll add a return at the end of the function to quiet down the scanner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect the tests to be able to determine the end by verifying if the _check_and_queue_callback_task function was called but the code wanted it simpler I guess.

Do you still want to remove that True here? 👀

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah... having code in the app just for test verification is ugly and also could lead to tests passing when it doesn't really work (like if there was a return True where there shouldn't be). I changed the tests to just checking that callback task as you suggested.


except Retry:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the impression this could be reworked. My guess is we want to let the retry exception triggered at line 67 surface up? and not have this be caught by the generic exception clause that follows, which will also put it in to a retry?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's basically my understanding - if a task is retried then a Retry exception is raised, and we want to guard the subsequent "something went wrong, let's manually retry it" code in this case.

We could add some sort of if e != Retry to the except Exception as e: block instead, but I think the way we already have it is more pythonic. Or were you thinking of a different approach to refactoring?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't have anything specific in mind unfortunately and it seems it is the best way we have!

raise

except Exception as e:
current_app.logger.exception(f"Error processing Pinpoint results: {str(e)}")
self.retry(queue=QueueNames.RETRY)


def determine_pinpoint_status(sns_status, provider_response):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add typing info to the function's signature please? 🙏

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! plus a docstring. Unfortunately I couldn't figure out a way to restrict the return values to NOTIFICATION_DELIVERED, NOTIFICATION_TECHNICAL_FAILURE, etc. Typing really didn't want variables involved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we'd have to turn these into Enum I guess. str should be good enough for now. Thanks!

if sns_status == "DELIVERED":
return NOTIFICATION_DELIVERED

# See all the possible provider responses
# https://docs.aws.amazon.com/sns/latest/dg/sms_stats_cloudwatch.html#sms_stats_delivery_fail_reasons
reasons = {
"Blocked as spam by phone carrier": NOTIFICATION_TECHNICAL_FAILURE,
"Destination is on a blocked list": NOTIFICATION_TECHNICAL_FAILURE,
"Invalid phone number": NOTIFICATION_TECHNICAL_FAILURE,
"Message body is invalid": NOTIFICATION_TECHNICAL_FAILURE,
"Phone carrier has blocked this message": NOTIFICATION_TECHNICAL_FAILURE,
"Phone carrier is currently unreachable/unavailable": NOTIFICATION_TEMPORARY_FAILURE,
"Phone has blocked SMS": NOTIFICATION_TECHNICAL_FAILURE,
"Phone is on a blocked list": NOTIFICATION_TECHNICAL_FAILURE,
"Phone is currently unreachable/unavailable": NOTIFICATION_PERMANENT_FAILURE,
"Phone number is opted out": NOTIFICATION_PERMANENT_FAILURE,
"This delivery would exceed max price": NOTIFICATION_TECHNICAL_FAILURE,
"Unknown error attempting to reach phone": NOTIFICATION_TECHNICAL_FAILURE,
}

status = reasons.get(provider_response) # could be None
if not status:
# TODO: Pattern matching in Python 3.10 should simplify this overall function logic.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was copied over older code I assume? we can try this pattern matching now? 👀

Copy link
Collaborator Author

@sastels sastels May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some matching but I think it's cleaner refactored into an if/elif block (see comment in the code). Thoughts?

if "is opted out" in provider_response:
return NOTIFICATION_PERMANENT_FAILURE

return status
64 changes: 64 additions & 0 deletions app/clients/sms/aws_pinpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from time import monotonic

import boto3
import phonenumbers
from botocore.exceptions import ClientError
Fixed Show fixed Hide fixed

from app.clients.sms import SmsClient


class AwsPinpointClient(SmsClient):
"""
AWS Pinpoint SMS client
"""

def init_app(self, current_app, statsd_client, *args, **kwargs):
self._client = boto3.client("pinpoint-sms-voice-v2", region_name="ca-central-1")
super(AwsPinpointClient, self).__init__(*args, **kwargs)
# super(SmsClient, self).__init__(*args, **kwargs)
self.current_app = current_app
self.name = "pinpoint"
self.statsd_client = statsd_client

def get_name(self):
return self.name

def send_sms(self, to, content, reference, multi=True, sender=None):
pool_id = self.current_app.config["AWS_PINPOINT_POOL_ID"]
sastels marked this conversation as resolved.
Show resolved Hide resolved
messageType = "TRANSACTIONAL"
matched = False

for match in phonenumbers.PhoneNumberMatcher(to, "US"): # SJA why is this a loop?
matched = True
Dismissed Show dismissed Hide dismissed
to = phonenumbers.format_number(match.number, phonenumbers.PhoneNumberFormat.E164)
destinationNumber = to

try:
start_time = monotonic()

# from https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/pinpoint-sms-voice-v2/client/send_text_message.html
response = self._client.send_text_message(
DestinationPhoneNumber=destinationNumber,
OriginationIdentity=pool_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳

MessageBody=content,
MessageType=messageType,
ConfigurationSetName=self.current_app.config["AWS_PINPOINT_CONFIGURATION_SET_NAME"],
)

except ClientError as e:
self.statsd_client.incr("clients.pinpoint.error")
raise Exception(e)
except Exception as e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can combine these two if the logic is exactly the same.

Suggested change
except Exception as e:
except (ClientError, Exception) as e:

There might be syntactic sugar to avoid the tuple and rather use the | operator nowadays, not sure..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 a ClientError is an Exception so I think we can just get rid of the ClientError bit alltogether!

self.statsd_client.incr("clients.pinpoint.error")
raise Exception(e)
finally:
elapsed_time = monotonic() - start_time
self.current_app.logger.info("AWS Pinpoint request finished in {}".format(elapsed_time))
self.statsd_client.timing("clients.pinpoint.request-time", elapsed_time)
self.statsd_client.incr("clients.pinpoint.success")
return response["MessageId"]

if not matched:
self.statsd_client.incr("clients.pinpoint.error")
self.current_app.logger.error("No valid numbers found in {}".format(to))
raise ValueError("No valid numbers found for SMS delivery")
3 changes: 3 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ class Config(object):
AWS_SES_ACCESS_KEY = os.getenv("AWS_SES_ACCESS_KEY")
AWS_SES_SECRET_KEY = os.getenv("AWS_SES_SECRET_KEY")
AWS_PINPOINT_REGION = os.getenv("AWS_PINPOINT_REGION", "us-west-2")
AWS_PINPOINT_POOL_ID = os.getenv("AWS_PINPOINT_POOL_ID", None)
sastels marked this conversation as resolved.
Show resolved Hide resolved
AWS_PINPOINT_CONFIGURATION_SET_NAME = os.getenv("AWS_PINPOINT_CONFIGURATION_SET_NAME", "pinpoint-configuration")
AWS_PINPOINT_TEMPLATE_IDS = env.list("AWS_PINPOINT_TEMPLATE_IDS", [])
sastels marked this conversation as resolved.
Show resolved Hide resolved
AWS_US_TOLL_FREE_NUMBER = os.getenv("AWS_US_TOLL_FREE_NUMBER")
CSV_UPLOAD_BUCKET_NAME = os.getenv("CSV_UPLOAD_BUCKET_NAME", "notification-alpha-canada-ca-csv-upload")
ASSET_DOMAIN = os.getenv("ASSET_DOMAIN", "assets.notification.canada.ca")
Expand Down
6 changes: 5 additions & 1 deletion app/delivery/send_to_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def send_sms_to_provider(notification):
notification.id,
notification.international,
notification.reply_to_text,
template_id=notification.template_id,
)

template_dict = dao_get_template_by_id(notification.template_id, notification.template_version).__dict__
Expand Down Expand Up @@ -334,7 +335,10 @@ def update_notification_to_sending(notification, provider):
dao_update_notification(notification)


def provider_to_use(notification_type, notification_id, international=False, sender=None):
def provider_to_use(notification_type, notification_id, international=False, sender=None, template_id=None):
if notification_type == SMS_TYPE and template_id is not None and str(template_id) in Config.AWS_PINPOINT_TEMPLATE_IDS:
sastels marked this conversation as resolved.
Show resolved Hide resolved
return clients.get_client_by_name_and_type("pinpoint", SMS_TYPE)

active_providers_in_order = [
p for p in get_provider_details_by_notification_type(notification_type, international) if p.active
]
Expand Down
3 changes: 2 additions & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1272,9 +1272,10 @@ def get_link(self):


SNS_PROVIDER = "sns"
PINPOINT_PROVIDER = "pinpoint"
SES_PROVIDER = "ses"

SMS_PROVIDERS = [SNS_PROVIDER]
SMS_PROVIDERS = [SNS_PROVIDER, PINPOINT_PROVIDER]
EMAIL_PROVIDERS = [SES_PROVIDER]
PROVIDERS = SMS_PROVIDERS + EMAIL_PROVIDERS

Expand Down
Loading