From 83b1d221595937579e61385d6ddb6f27f7ffdc15 Mon Sep 17 00:00:00 2001 From: Kyle MacMillan Date: Wed, 22 May 2024 18:42:14 -0400 Subject: [PATCH] Added additional auth and encryption --- .../vetext_incoming_forwarder_lambda.py | 91 ++++++++++++++----- .../test_vetext_incoming_forwarder_lambda.py | 58 +++++++++++- 2 files changed, 125 insertions(+), 24 deletions(-) diff --git a/lambda_functions/vetext_incoming_forwarder_lambda/vetext_incoming_forwarder_lambda.py b/lambda_functions/vetext_incoming_forwarder_lambda/vetext_incoming_forwarder_lambda.py index c0eb0c9219..48d1c56c61 100644 --- a/lambda_functions/vetext_incoming_forwarder_lambda/vetext_incoming_forwarder_lambda.py +++ b/lambda_functions/vetext_incoming_forwarder_lambda/vetext_incoming_forwarder_lambda.py @@ -1,11 +1,13 @@ """This module is used to transfer incoming twilio requests to a Vetext endpoint.""" import base64 +from cryptography.fernet import Fernet, MultiFernet import json import logging import os import sys from functools import lru_cache +from typing import Union from urllib.parse import parse_qsl, parse_qs import boto3 @@ -25,36 +27,78 @@ HTTPTIMEOUT = (3.05, 1) TWILIO_AUTH_TOKEN_SSM_NAME = os.getenv('TWILIO_AUTH_TOKEN_SSM_NAME') +TWILIO_PH_AUTH_TOKEN_SSM_NAME = os.getenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME') +LOG_ENCRYPTION_SSM_NAME = os.getenv('LOG_ENCRYPTION_SSM_NAME') -if TWILIO_AUTH_TOKEN_SSM_NAME is None or TWILIO_AUTH_TOKEN_SSM_NAME == 'DEFAULT': - sys.exit('A required environment variable is not set. Please set TWILIO_AUTH_TOKEN_SSM_NAME') +if TWILIO_AUTH_TOKEN_SSM_NAME is None or TWILIO_AUTH_TOKEN_SSM_NAME == 'DEFAULT' or LOG_ENCRYPTION_SSM_NAME is None: + sys.exit('A required environment variable is not set. Please ensure all env variables are set') TWILIO_VETEXT_PATH = '/twoway/vettext' TWILIO_VETEXT2_PATH = '/twoway/vetext2' -def get_twilio_token() -> str: - """ - Is run on instantiation. - Defined here and in delivery_status_processor - @return: Twilio Token from SSM +def get_ssm_params(params: Union[list[str], str]) -> Union[list[str], str]: + """Collects parameter(s) depending on params passed in + + Args: + params (Union[list[str], str]): parameter names + Returns: + Union[list[str], str]: The value(s) of the given parameter(s) """ try: - if TWILIO_AUTH_TOKEN_SSM_NAME == 'unit_test': - return 'bad_twilio_auth' ssm_client = boto3.client('ssm') - auth_ssm_key = os.getenv('TWILIO_AUTH_TOKEN_SSM_NAME', '') - if not auth_ssm_key: - logger.error('TWILIO_AUTH_TOKEN_SSM_NAME not set') + if isinstance(params, list): + response = ssm_client.get_parameters( + Names=params, + WithDecryption=True, + ) + params_value = [parameter['Value'] for parameter in response['Parameters']] + else: + response = ssm_client.get_parameter( + Name=params, + WithDecryption=True, + ) + params_value = response['Parameter']['Value'] + except Exception as exc: + logger.error('Failed to get parameter value for: %s - encountered: %s', params, exc) + sys.exit('Unable to retrieve parameter store value, exiting') + + return params_value + + +def get_twilio_tokens() -> list[str]: + """ + Is run during execution environment setup. + @return: List of Twilio auth tokens from SSM + """ + try: + if TWILIO_AUTH_TOKEN_SSM_NAME == 'unit_test' or TWILIO_PH_AUTH_TOKEN_SSM_NAME == 'unit_test': + # item 0 was the auth token used to sign the body of the request + return ['bad_twilio_auth', 'invalid_auth', 'unit_test'] - response = ssm_client.get_parameter(Name=auth_ssm_key, WithDecryption=True) - return response.get('Parameter').get('Value') + return get_ssm_params([TWILIO_AUTH_TOKEN_SSM_NAME, TWILIO_PH_AUTH_TOKEN_SSM_NAME]) except Exception: - logger.error('Failed to retrieve Twilio Auth') - return None + logger.error('Failed to retrieve required paramaters from SSM') + sys.exit('Unable to retrieve required auth token(s), exiting') + + +def get_encryption() -> MultiFernet: + """Collects the log encryption key(s) and sets up the MultiFernet used for log encryption""" + if LOG_ENCRYPTION_SSM_NAME == 'fake_value': + return MultiFernet([Fernet(Fernet.generate_key()), Fernet(Fernet.generate_key())]) + try: + encryption_log_key_str = get_ssm_params(LOG_ENCRYPTION_SSM_NAME) + key_list: list[str] = encryption_log_key_str.replace(' ', '').split(',') + multifernet = MultiFernet([Fernet(key.encode()) for key in key_list]) + except Exception as exc: + logger.error('Failed to set encryption key for failed validation logging: %s', exc) + sys.exit('Unable to retrieve/set required encryption keys, exiting') + + return multifernet -auth_token = get_twilio_token() +auth_tokens = get_twilio_tokens() +encryption = get_encryption() def validate_twilio_event(event: dict) -> bool: @@ -68,16 +112,21 @@ def validate_twilio_event(event: dict) -> bool: try: signature = event['headers'].get('x-twilio-signature', '') - if not auth_token or not signature: - logger.error('TWILIO_AUTH_TOKEN or signature not set') + if not auth_tokens or not signature: + logger.error('Twilio auth token(s) or signature not set') return False - validator = RequestValidator(auth_token) + print(f'{auth_tokens=}') + print(f'{event=}') + print(f"{signature == event['headers']['x-twilio-signature']=}", signature) + validators = [RequestValidator(auth_token) for auth_token in auth_tokens] uri = f"https://{event['headers']['host']}/vanotify{event['path']}" decoded = base64.b64decode(event.get('body')).decode('utf-8') params = parse_qs(decoded, keep_blank_values=True) params = {k: v[0] for k, v in params.items()} - return validator.validate(uri=uri, params=params, signature=signature) + valid_list = [validator.validate(uri=uri, params=params, signature=signature) for validator in validators] + print(f'{valid_list=}') + return any(valid_list) except Exception as e: logger.error('Error validating request origin: %s', e) return False diff --git a/tests/lambda_functions/vetext_incoming_forwarder_lambda/test_vetext_incoming_forwarder_lambda.py b/tests/lambda_functions/vetext_incoming_forwarder_lambda/test_vetext_incoming_forwarder_lambda.py index c26136b4e6..e9f67e5e3e 100644 --- a/tests/lambda_functions/vetext_incoming_forwarder_lambda/test_vetext_incoming_forwarder_lambda.py +++ b/tests/lambda_functions/vetext_incoming_forwarder_lambda/test_vetext_incoming_forwarder_lambda.py @@ -6,6 +6,7 @@ ) """ +from copy import deepcopy import pytest import json import base64 @@ -134,6 +135,8 @@ def missing_domain_env_param(monkeypatch): monkeypatch.setenv('vetext_api_endpoint_path', VETEXT_URI_PATH) monkeypatch.setenv('vetext_api_auth_ssm_path', 'ssm') monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value') @pytest.fixture @@ -141,6 +144,8 @@ def missing_api_endpoint_path_env_param(monkeypatch): monkeypatch.setenv('vetext_api_endpoint_domain', VETEXT_DOMAIN) monkeypatch.setenv('vetext_api_auth_ssm_path', 'ssm') monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value') @pytest.fixture @@ -148,6 +153,8 @@ def missing_ssm_path_env_param(monkeypatch): monkeypatch.setenv('vetext_api_endpoint_domain', VETEXT_DOMAIN) monkeypatch.setenv('vetext_api_endpoint_path', VETEXT_URI_PATH) monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value') @pytest.fixture @@ -165,6 +172,8 @@ def all_path_env_param_set(monkeypatch): monkeypatch.setenv('vetext_request_dead_letter_sqs_url', 'someurl') monkeypatch.setenv('TWILIO_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('TWILIO_PH_AUTH_TOKEN_SSM_NAME', 'unit_test') + monkeypatch.setenv('LOG_ENCRYPTION_SSM_NAME', 'fake_value') LAMBDA_MODULE = 'lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda' @@ -214,8 +223,9 @@ def test_request_makes_vetext2_call(mocker, monkeypatch, all_path_env_param_set, mocker.patch(f'{LAMBDA_MODULE}.read_from_ssm', return_value='ssm') mock_requests = mocker.patch(f'{LAMBDA_MODULE}.requests.post', return_value=mocked_requests_post_success()) - event['path'] = '/twoway/vetext2' - response = vetext_incoming_forwarder_lambda_handler(event, False) + new_event = deepcopy(event) + new_event['path'] = '/twoway/vetext2' + response = vetext_incoming_forwarder_lambda_handler(new_event, False) assert mock_requests.call_count == 1 assert mock_requests.call_args[0][0] == 'https://some.domain-two/some/path/two' @@ -472,7 +482,7 @@ def test_twilio_validate_failure(mocker, monkeypatch, all_path_env_param_set): vetext_incoming_forwarder_lambda_handler, ) - broken_headers = albInvokedWithoutAddOn + broken_headers = deepcopy(albInvokedWithoutAddOn) broken_headers['headers']['x-twilio-signature'] = 'spoofed' response = vetext_incoming_forwarder_lambda_handler(broken_headers, True) assert response['statusCode'] == 403 @@ -481,3 +491,45 @@ def test_twilio_validate_failure(mocker, monkeypatch, all_path_env_param_set): del missing_header['headers']['x-twilio-signature'] response = vetext_incoming_forwarder_lambda_handler(missing_header, True) assert response['statusCode'] == 403 + + +@pytest.mark.parametrize('event', [albInvokedWithoutAddOn, albInvokeWithAddOn]) +def test_ut_validate_twilio_event_returns_true(all_path_env_param_set, event): + from lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda import ( + validate_twilio_event, + ) + + assert validate_twilio_event(event) + + +@pytest.mark.parametrize('event', [albInvokedWithoutAddOn, albInvokeWithAddOn]) +def test_ut_validate_twilio_event_returns_false(all_path_env_param_set, event): + from lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda import ( + validate_twilio_event, + ) + + new_event = deepcopy(event) + new_event['headers']['x-twilio-signature'] = 'invalid' + assert not validate_twilio_event(new_event) + + +@pytest.mark.parametrize( + 'fernet', + [ + 'xY32dXHvcBordkq7Kjbn9M8imSZDyVUztEmgbKT5nLo=', + 'xY32dXHvcBordkq7Kjbn9M8imSZDyVUztEmgbKT5nLo=,dLe_roaT68nvG-uHE0I4VRNwAsSIRGpVxybg0mPGaR4=', + ], +) +@pytest.mark.parametrize('logs', ['test value', {'record': {'test': 'data'}}, None]) +def test_ut_multifernet(all_path_env_param_set, mocker, logs, fernet): + mocker.patch( + f'{LAMBDA_MODULE}.get_ssm_params', + return_value=fernet, # Fake + ) + from lambda_functions.vetext_incoming_forwarder_lambda.vetext_incoming_forwarder_lambda import ( + get_encryption, + ) + + encryption = get_encryption() + encrypted_value = encryption.encrypt(f'{logs}'.encode()) + assert str(logs) == encryption.decrypt(encrypted_value).decode()