-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use pinpoint for designated templates #2152
Changes from 29 commits
9740520
064e36d
b9d0d3a
0fa34c1
7e69706
b3d12e5
fe263c6
d18ce27
fceab9f
b8ceb91
1a6ba9e
6e3d339
ca2da8c
7ec198b
3db48e8
d2e8471
c8eb6c5
6c090ce
3f59f27
646b86d
bde0f68
12f539f
965fbff
26d4419
5710df2
28524bb
efa0df9
d1612c2
c08cc14
aed401f
e205bc1
41e7eaa
40e4c87
f09cec5
fde4812
040a361
da4006f
188d0b6
b4e2bd8
afec912
337c594
7fb51e1
7723702
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -192,6 +192,57 @@ def sns_failed_callback(provider_response, reference=None, timestamp="2016-06-28 | |
return _sns_callback(body) | ||
|
||
|
||
# Note that 1467074434 = 2016-06-28 00:40:34.558 UTC | ||
def pinpoint_success_callback(reference=None, timestamp=1467074434, destination="+1XXX5550100"): | ||
body = { | ||
"eventType": "TEXT_DELIVERED", | ||
"eventVersion": "1.0", | ||
"eventTimestamp": timestamp, | ||
"isFinal": True, | ||
"originationPhoneNumber": "+13655550100", | ||
"destinationPhoneNumber": destination, | ||
"isoCountryCode": "CA", | ||
"mcc": "302", | ||
"mnc": "610", | ||
"carrierName": "Bell Cellular Inc. / Aliant Telecom", | ||
"messageId": reference, | ||
"messageRequestTimestamp": timestamp, | ||
"messageEncoding": "GSM", | ||
"messageType": "TRANSACTIONAL", | ||
"messageStatus": "DELIVERED", | ||
"messageStatusDescription": "Message has been accepted by phone", | ||
"totalMessageParts": 1, | ||
"totalMessagePrice": 0.00581, | ||
"totalCarrierFee": 0.006, | ||
} | ||
|
||
return _pinpoint_callback(body) | ||
|
||
|
||
# Note that 1467074434 = 2016-06-28 00:40:34.558 UTC | ||
def pinpoint_failed_callback(provider_response, reference=None, timestamp=1467074434, destination="+1XXX5550100"): | ||
body = { | ||
"eventType": "TEXT_CARRIER_UNREACHABLE", | ||
"eventVersion": "1.0", | ||
"eventTimestamp": timestamp, | ||
"isFinal": True, | ||
"originationPhoneNumber": "+13655550100", | ||
"destinationPhoneNumber": destination, | ||
"isoCountryCode": "CA", | ||
"messageId": reference, | ||
"messageRequestTimestamp": timestamp, | ||
"messageEncoding": "GSM", | ||
"messageType": "TRANSACTIONAL", | ||
"messageStatus": "CARRIER_UNREACHABLE", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I expect the returned status to be the same as AWS SNS? Maybe we could output these as a log for our testing so we can get the unique values and if that differ from the current implementation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently if we do not recognize the status we get back we log a warning. We should definitely keep an eye on these. Possibly we should add an alert that would warn us about these in Slack. |
||
"messageStatusDescription": provider_response, | ||
"totalMessageParts": 1, | ||
"totalMessagePrice": 0.00581, | ||
"totalCarrierFee": 0.006, | ||
jimleroyer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
return _pinpoint_callback(body) | ||
|
||
|
||
def _ses_bounce_callback(reference, bounce_type, bounce_subtype=None): | ||
ses_message_body = { | ||
"bounce": { | ||
|
@@ -267,3 +318,19 @@ def _sns_callback(body): | |
"UnsubscribeUrl": "https://sns.ca-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]", | ||
"MessageAttributes": {}, | ||
} | ||
|
||
|
||
def _pinpoint_callback(body): | ||
return { | ||
"Type": "Notification", | ||
"MessageId": "8e83c020-1234-1234-1234-92a8ee9baa0a", | ||
"TopicArn": "arn:aws:sns:ca-central-1:12341234:ses_notifications", | ||
"Subject": None, | ||
"Message": json.dumps(body), | ||
"Timestamp": "2017-11-17T12:14:03.710Z", | ||
"SignatureVersion": "1", | ||
"Signature": "[REDACTED]", | ||
"SigningCertUrl": "https://sns.ca-central-1.amazonaws.com/SimpleNotificationService-[REDACTED].pem", | ||
"UnsubscribeUrl": "https://sns.ca-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]", | ||
"MessageAttributes": {}, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
from datetime import datetime | ||
from typing import Literal, Union | ||
|
||
|
||
from flask import current_app, json | ||
from notifications_utils.statsd_decorators import statsd | ||
from sqlalchemy.orm.exc import NoResultFound | ||
|
||
from app import notify_celery, statsd_client | ||
from app.config import QueueNames | ||
from app.dao import notifications_dao | ||
from app.models import ( | ||
NOTIFICATION_DELIVERED, | ||
NOTIFICATION_PERMANENT_FAILURE, | ||
NOTIFICATION_SENT, | ||
NOTIFICATION_TECHNICAL_FAILURE, | ||
NOTIFICATION_TEMPORARY_FAILURE, | ||
PINPOINT_PROVIDER, | ||
) | ||
from app.notifications.callbacks import _check_and_queue_callback_task | ||
from celery.exceptions import Retry | ||
|
||
# Pinpoint receipts are of the form: | ||
# { | ||
# "eventType": "TEXT_DELIVERED", | ||
# "eventVersion": "1.0", | ||
# "eventTimestamp": 1712944268877, | ||
# "isFinal": true, | ||
# "originationPhoneNumber": "+13655550100", | ||
# "destinationPhoneNumber": "+16135550123", | ||
# "isoCountryCode": "CA", | ||
# "mcc": "302", | ||
# "mnc": "610", | ||
# "carrierName": "Bell Cellular Inc. / Aliant Telecom", | ||
# "messageId": "221bc70c-7ee6-4987-b1ba-9684ba25be20", | ||
# "messageRequestTimestamp": 1712944267685, | ||
# "messageEncoding": "GSM", | ||
# "messageType": "TRANSACTIONAL", | ||
# "messageStatus": "DELIVERED", | ||
# "messageStatusDescription": "Message has been accepted by phone", | ||
# "totalMessageParts": 1, | ||
# "totalMessagePrice": 0.00581, | ||
# "totalCarrierFee": 0.006 | ||
# } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice documentation to have in the code! |
||
|
||
|
||
@notify_celery.task(bind=True, name="process-pinpoint-result", max_retries=5, default_retry_delay=300) | ||
@statsd(namespace="tasks") | ||
def process_pinpoint_results(self, response): | ||
|
||
try: | ||
receipt = json.loads(response["Message"]) | ||
reference = receipt["messageId"] | ||
status = receipt["messageStatus"] | ||
provider_response = receipt["messageStatusDescription"] | ||
|
||
notification_status = determine_pinpoint_status(status, provider_response) | ||
if not notification_status: | ||
current_app.logger.warning(f"unhandled provider response for reference {reference}, received '{provider_response}'") | ||
notification_status = NOTIFICATION_TECHNICAL_FAILURE # revert to tech failure by default | ||
|
||
try: | ||
notification = notifications_dao.dao_get_notification_by_reference(reference) | ||
except NoResultFound: | ||
try: | ||
current_app.logger.warning( | ||
f"RETRY {self.request.retries}: notification not found for Pinpoint reference {reference} (update to {notification_status}). " | ||
f"Callback may have arrived before notification was persisted to the DB. Adding task to retry queue" | ||
) | ||
self.retry(queue=QueueNames.RETRY) | ||
except self.MaxRetriesExceededError: | ||
current_app.logger.warning( | ||
f"notification not found for Pinpoint reference: {reference} (update to {notification_status}). Giving up." | ||
) | ||
return | ||
if notification.sent_by != PINPOINT_PROVIDER: | ||
current_app.logger.exception(f"Pinpoint callback handled notification {notification.id} not sent by Pinpoint") | ||
return | ||
|
||
if notification.status != NOTIFICATION_SENT: | ||
notifications_dao._duplicate_update_warning(notification, notification_status) | ||
return | ||
|
||
notifications_dao._update_notification_status( | ||
notification=notification, | ||
status=notification_status, | ||
provider_response=provider_response, | ||
) | ||
|
||
if notification_status != NOTIFICATION_DELIVERED: | ||
current_app.logger.info( | ||
( | ||
f"Pinpoint delivery failed: notification id {notification.id} and reference {reference} has error found. " | ||
f"Provider response: {provider_response}" | ||
) | ||
) | ||
else: | ||
current_app.logger.info( | ||
f"Pinpoint callback return status of {notification_status} for notification: {notification.id}" | ||
) | ||
|
||
statsd_client.incr(f"callback.sns.{notification_status}") # TODO: do we want a Pinpoint metric here? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would say yes! it might come up very useful to have our own pinpoint metric, the downside though is we'd have to update our dashboards with duplicate widgets for the ones tracking the SNS one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I shall add, though it's not clear to me that we have the sns ones in alarms or dashboards... at least, I've found |
||
|
||
if notification.sent_at: | ||
statsd_client.timing_with_dates( | ||
"callback.sns.elapsed-time", datetime.utcnow(), notification.sent_at | ||
) # TODO: do we want a Pinpoint metric here? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also same here! I would think yes. |
||
|
||
_check_and_queue_callback_task(notification) | ||
|
||
return True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we return true here, but simple return (no value) for others? This was also picked up by our scanner over here: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is a good question. It's what the other tasks do, and all I could think of was that the tasks were returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm I wonder if this function was called elsewhere maybe and hence expected to return something, maybe the older CLI commands? 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ha! I was excluding our tests when looking to see how it's called. Our tests use the return value to determine if the function successfully processed the receipt ( I'll add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect the tests to be able to determine the end by verifying if the Do you still want to remove that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah... having code in the app just for test verification is ugly and also could lead to tests passing when it doesn't really work (like if there was a |
||
|
||
except Retry: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have the impression this could be reworked. My guess is we want to let the retry exception triggered at line 67 surface up? and not have this be caught by the generic exception clause that follows, which will also put it in to a retry? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's basically my understanding - if a task is retried then a We could add some sort of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't have anything specific in mind unfortunately and it seems it is the best way we have! |
||
raise | ||
|
||
except Exception as e: | ||
current_app.logger.exception(f"Error processing Pinpoint results: {str(e)}") | ||
self.retry(queue=QueueNames.RETRY) | ||
|
||
|
||
def determine_pinpoint_status( | ||
status: str, provider_response: str) -> Union[str, None]: | ||
|
||
"""Determine the notification status based on the SMS status and provider response. | ||
|
||
Args: | ||
status (str): message status from AWS | ||
provider_response (str): detailed status from the SMS provider | ||
|
||
Returns: | ||
Union[str, None]: the notification status or None if the status is not handled | ||
""" | ||
|
||
if status == "DELIVERED": | ||
return NOTIFICATION_DELIVERED | ||
|
||
response_lower = provider_response.lower() | ||
match response_lower: | ||
case response_lower if "blocked" in response_lower: | ||
return NOTIFICATION_TECHNICAL_FAILURE | ||
case response_lower if "invalid" in response_lower: | ||
return NOTIFICATION_TECHNICAL_FAILURE | ||
case response_lower if "is opted out" in response_lower: | ||
return NOTIFICATION_PERMANENT_FAILURE | ||
case response_lower if "unknown error" in response_lower: | ||
return NOTIFICATION_TECHNICAL_FAILURE | ||
case response_lower if "exceed max price" in response_lower: | ||
return NOTIFICATION_TECHNICAL_FAILURE | ||
case "Phone carrier is currently unreachable/unavailable": | ||
return NOTIFICATION_TEMPORARY_FAILURE | ||
case "Phone is currently unreachable/unavailable": | ||
return NOTIFICATION_PERMANENT_FAILURE | ||
case _: | ||
return None |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,61 @@ | ||||||
from time import monotonic | ||||||
|
||||||
import boto3 | ||||||
import phonenumbers | ||||||
from botocore.exceptions import ClientError | ||||||
|
||||||
|
||||||
from app.clients.sms import SmsClient | ||||||
|
||||||
|
||||||
class AwsPinpointClient(SmsClient): | ||||||
""" | ||||||
AWS Pinpoint SMS client | ||||||
""" | ||||||
|
||||||
def init_app(self, current_app, statsd_client, *args, **kwargs): | ||||||
self._client = boto3.client("pinpoint-sms-voice-v2", region_name="ca-central-1") | ||||||
super(AwsPinpointClient, self).__init__(*args, **kwargs) | ||||||
# super(SmsClient, self).__init__(*args, **kwargs) | ||||||
self.current_app = current_app | ||||||
self.name = "pinpoint" | ||||||
self.statsd_client = statsd_client | ||||||
|
||||||
def get_name(self): | ||||||
return self.name | ||||||
|
||||||
def send_sms(self, to, content, reference, multi=True, sender=None): | ||||||
pool_id = self.current_app.config["AWS_PINPOINT_SC_POOL_ID"] | ||||||
messageType = "TRANSACTIONAL" | ||||||
matched = False | ||||||
|
||||||
for match in phonenumbers.PhoneNumberMatcher(to, "US"): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's used as the default region, ie if you don't start your number with a region code then we assume it's US / CA. |
||||||
matched = True | ||||||
|
||||||
to = phonenumbers.format_number(match.number, phonenumbers.PhoneNumberFormat.E164) | ||||||
destinationNumber = to | ||||||
|
||||||
try: | ||||||
start_time = monotonic() | ||||||
response = self._client.send_text_message( | ||||||
DestinationPhoneNumber=destinationNumber, | ||||||
OriginationIdentity=pool_id, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🥳 |
||||||
MessageBody=content, | ||||||
MessageType=messageType, | ||||||
ConfigurationSetName=self.current_app.config["AWS_PINPOINT_CONFIGURATION_SET_NAME"], | ||||||
) | ||||||
except ClientError as e: | ||||||
self.statsd_client.incr("clients.pinpoint.error") | ||||||
raise Exception(e) | ||||||
except Exception as e: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can combine these two if the logic is exactly the same.
Suggested change
There might be syntactic sugar to avoid the tuple and rather use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 a |
||||||
self.statsd_client.incr("clients.pinpoint.error") | ||||||
raise Exception(e) | ||||||
finally: | ||||||
elapsed_time = monotonic() - start_time | ||||||
self.current_app.logger.info("AWS Pinpoint request finished in {}".format(elapsed_time)) | ||||||
self.statsd_client.timing("clients.pinpoint.request-time", elapsed_time) | ||||||
self.statsd_client.incr("clients.pinpoint.success") | ||||||
return response["MessageId"] | ||||||
|
||||||
if not matched: | ||||||
self.statsd_client.incr("clients.pinpoint.error") | ||||||
self.current_app.logger.error("No valid numbers found in {}".format(to)) | ||||||
raise ValueError("No valid numbers found for SMS delivery") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like these mocks and it's a good doc to know what is returned by the callback.