Skip to content

Commit

Permalink
Added additional auth and encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
k-macmillan committed May 22, 2024
1 parent 20d4b5e commit 83b1d22
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""This module is used to transfer incoming twilio requests to a Vetext endpoint."""

import base64
from cryptography.fernet import Fernet, MultiFernet
import json
import logging
import os
import sys
from functools import lru_cache
from typing import Union
from urllib.parse import parse_qsl, parse_qs

import boto3
Expand All @@ -25,36 +27,78 @@
HTTPTIMEOUT = (3.05, 1)

TWILIO_AUTH_TOKEN_SSM_NAME = os.getenv('TWILIO_AUTH_TOKEN_SSM_NAME')
TWILIO_PH_AUTH_TOKEN_SSM_NAME = os.getenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME')
LOG_ENCRYPTION_SSM_NAME = os.getenv('LOG_ENCRYPTION_SSM_NAME')

if TWILIO_AUTH_TOKEN_SSM_NAME is None or TWILIO_AUTH_TOKEN_SSM_NAME == 'DEFAULT':
sys.exit('A required environment variable is not set. Please set TWILIO_AUTH_TOKEN_SSM_NAME')
if TWILIO_AUTH_TOKEN_SSM_NAME is None or TWILIO_AUTH_TOKEN_SSM_NAME == 'DEFAULT' or LOG_ENCRYPTION_SSM_NAME is None:
sys.exit('A required environment variable is not set. Please ensure all env variables are set')

TWILIO_VETEXT_PATH = '/twoway/vettext'
TWILIO_VETEXT2_PATH = '/twoway/vetext2'


def get_twilio_token() -> str:
"""
Is run on instantiation.
Defined here and in delivery_status_processor
@return: Twilio Token from SSM
def get_ssm_params(params: Union[list[str], str]) -> Union[list[str], str]:
"""Collects parameter(s) depending on params passed in
Args:
params (Union[list[str], str]): parameter names
Returns:
Union[list[str], str]: The value(s) of the given parameter(s)
"""
try:
if TWILIO_AUTH_TOKEN_SSM_NAME == 'unit_test':
return 'bad_twilio_auth'
ssm_client = boto3.client('ssm')
auth_ssm_key = os.getenv('TWILIO_AUTH_TOKEN_SSM_NAME', '')
if not auth_ssm_key:
logger.error('TWILIO_AUTH_TOKEN_SSM_NAME not set')
if isinstance(params, list):
response = ssm_client.get_parameters(
Names=params,
WithDecryption=True,
)
params_value = [parameter['Value'] for parameter in response['Parameters']]
else:
response = ssm_client.get_parameter(
Name=params,
WithDecryption=True,
)
params_value = response['Parameter']['Value']
except Exception as exc:
logger.error('Failed to get parameter value for: %s - encountered: %s', params, exc)
sys.exit('Unable to retrieve parameter store value, exiting')

return params_value


def get_twilio_tokens() -> list[str]:
"""
Is run during execution environment setup.
@return: List of Twilio auth tokens from SSM
"""
try:
if TWILIO_AUTH_TOKEN_SSM_NAME == 'unit_test' or TWILIO_PH_AUTH_TOKEN_SSM_NAME == 'unit_test':
# item 0 was the auth token used to sign the body of the request
return ['bad_twilio_auth', 'invalid_auth', 'unit_test']

response = ssm_client.get_parameter(Name=auth_ssm_key, WithDecryption=True)
return response.get('Parameter').get('Value')
return get_ssm_params([TWILIO_AUTH_TOKEN_SSM_NAME, TWILIO_PH_AUTH_TOKEN_SSM_NAME])
except Exception:
logger.error('Failed to retrieve Twilio Auth')
return None
logger.error('Failed to retrieve required paramaters from SSM')
sys.exit('Unable to retrieve required auth token(s), exiting')


def get_encryption() -> MultiFernet:
"""Collects the log encryption key(s) and sets up the MultiFernet used for log encryption"""
if LOG_ENCRYPTION_SSM_NAME == 'fake_value':
return MultiFernet([Fernet(Fernet.generate_key()), Fernet(Fernet.generate_key())])
try:
encryption_log_key_str = get_ssm_params(LOG_ENCRYPTION_SSM_NAME)
key_list: list[str] = encryption_log_key_str.replace(' ', '').split(',')
multifernet = MultiFernet([Fernet(key.encode()) for key in key_list])
except Exception as exc:
logger.error('Failed to set encryption key for failed validation logging: %s', exc)
sys.exit('Unable to retrieve/set required encryption keys, exiting')

return multifernet


auth_token = get_twilio_token()
auth_tokens = get_twilio_tokens()
encryption = get_encryption()


def validate_twilio_event(event: dict) -> bool:
Expand All @@ -68,16 +112,21 @@ def validate_twilio_event(event: dict) -> bool:

try:
signature = event['headers'].get('x-twilio-signature', '')
if not auth_token or not signature:
logger.error('TWILIO_AUTH_TOKEN or signature not set')
if not auth_tokens or not signature:
logger.error('Twilio auth token(s) or signature not set')
return False
validator = RequestValidator(auth_token)
print(f'{auth_tokens=}')
print(f'{event=}')
print(f"{signature == event['headers']['x-twilio-signature']=}", signature)
validators = [RequestValidator(auth_token) for auth_token in auth_tokens]
uri = f"https://{event['headers']['host']}/vanotify{event['path']}"

decoded = base64.b64decode(event.get('body')).decode('utf-8')
params = parse_qs(decoded, keep_blank_values=True)
params = {k: v[0] for k, v in params.items()}
return validator.validate(uri=uri, params=params, signature=signature)
valid_list = [validator.validate(uri=uri, params=params, signature=signature) for validator in validators]
print(f'{valid_list=}')
return any(valid_list)
except Exception as e:
logger.error('Error validating request origin: %s', e)
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
)
"""

from copy import deepcopy
import pytest
import json
import base64
Expand Down Expand Up @@ -134,20 +135,26 @@ def missing_domain_env_param(monkeypatch):
monkeypatch.setenv('vetext_api_endpoint_path', VETEXT_URI_PATH)
monkeypatch.setenv('vetext_api_auth_ssm_path', 'ssm')
monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value')


@pytest.fixture
def missing_api_endpoint_path_env_param(monkeypatch):
monkeypatch.setenv('vetext_api_endpoint_domain', VETEXT_DOMAIN)
monkeypatch.setenv('vetext_api_auth_ssm_path', 'ssm')
monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value')


@pytest.fixture
def missing_ssm_path_env_param(monkeypatch):
monkeypatch.setenv('vetext_api_endpoint_domain', VETEXT_DOMAIN)
monkeypatch.setenv('vetext_api_endpoint_path', VETEXT_URI_PATH)
monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value')


@pytest.fixture
Expand All @@ -165,6 +172,8 @@ def all_path_env_param_set(monkeypatch):
monkeypatch.setenv('vetext_request_dead_letter_sqs_url', 'someurl')

monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test')
monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value')


LAMBDA_MODULE = 'lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda'
Expand Down Expand Up @@ -214,8 +223,9 @@ def test_request_makes_vetext2_call(mocker, monkeypatch, all_path_env_param_set,
mocker.patch(f'{LAMBDA_MODULE}.read_from_ssm', return_value='ssm')
mock_requests = mocker.patch(f'{LAMBDA_MODULE}.requests.post', return_value=mocked_requests_post_success())

event['path'] = '/twoway/vetext2'
response = vetext_incoming_forwarder_lambda_handler(event, False)
new_event = deepcopy(event)
new_event['path'] = '/twoway/vetext2'
response = vetext_incoming_forwarder_lambda_handler(new_event, False)

assert mock_requests.call_count == 1
assert mock_requests.call_args[0][0] == 'https://some.domain-two/some/path/two'
Expand Down Expand Up @@ -472,7 +482,7 @@ def test_twilio_validate_failure(mocker, monkeypatch, all_path_env_param_set):
vetext_incoming_forwarder_lambda_handler,
)

broken_headers = albInvokedWithoutAddOn
broken_headers = deepcopy(albInvokedWithoutAddOn)
broken_headers['headers']['x-twilio-signature'] = 'spoofed'
response = vetext_incoming_forwarder_lambda_handler(broken_headers, True)
assert response['statusCode'] == 403
Expand All @@ -481,3 +491,45 @@ def test_twilio_validate_failure(mocker, monkeypatch, all_path_env_param_set):
del missing_header['headers']['x-twilio-signature']
response = vetext_incoming_forwarder_lambda_handler(missing_header, True)
assert response['statusCode'] == 403


@pytest.mark.parametrize('event', [albInvokedWithoutAddOn, albInvokeWithAddOn])
def test_ut_validate_twilio_event_returns_true(all_path_env_param_set, event):
from lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda import (
validate_twilio_event,
)

assert validate_twilio_event(event)


@pytest.mark.parametrize('event', [albInvokedWithoutAddOn, albInvokeWithAddOn])
def test_ut_validate_twilio_event_returns_false(all_path_env_param_set, event):
from lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda import (
validate_twilio_event,
)

new_event = deepcopy(event)
new_event['headers']['x-twilio-signature'] = 'invalid'
assert not validate_twilio_event(new_event)


@pytest.mark.parametrize(
'fernet',
[
'xY32dXHvcBordkq7Kjbn9M8imSZDyVUztEmgbKT5nLo=',
'xY32dXHvcBordkq7Kjbn9M8imSZDyVUztEmgbKT5nLo=,dLe_roaT68nvG-uHE0I4VRNwAsSIRGpVxybg0mPGaR4=',
],
)
@pytest.mark.parametrize('logs', ['test value', {'record': {'test': 'data'}}, None])
def test_ut_multifernet(all_path_env_param_set, mocker, logs, fernet):
mocker.patch(
f'{LAMBDA_MODULE}.get_ssm_params',
return_value=fernet, # Fake
)
from lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda import (
get_encryption,
)

encryption = get_encryption()
encrypted_value = encryption.encrypt(f'{logs}'.encode())
assert str(logs) == encryption.decrypt(encrypted_value).decode()

0 comments on commit 83b1d22

Please sign in to comment.