Review notes (free text placed ahead of the first "diff --git" header):
- The flattened patch is laid out one diff line per line again; Python
  indentation was inferred from syntax.
- src/cli.py hunk: the header counts 4 context lines but only
  "# vault test:" is visible in the flattened text; the other three are
  assumed to be blank lines directly ahead of it. TODO: confirm against the
  working tree.
- src/operators/vault.py: the added file content was revised and the hunk
  length updated to 97. The blob id on its "index" line is carried over from
  the original patch and must be regenerated.

diff --git a/src/cli.py b/src/cli.py
index 8d05fc1..09b6be4 100644
--- a/src/cli.py
+++ b/src/cli.py
@@ -64,14 +64,29 @@ def main():
 
 
 
 # vault test:
-from presidio_anonymizer.entities import OperatorConfig
-def vault_encrypt(text):
-    return text + "x"
-operators = {"DEFAULT": OperatorConfig("custom", {"lambda": vault_encrypt})}
-
-t = "Hi my name is Qwerty and I live in London. My number is 07440 123456."
-res = text_analyzer(t, "en")
-anon_res = text_anonymizer(t, res, operators)
-
-print(anon_res)
+# VAULT_URL = "http://127.0.0.1:8200"
+# from presidio_anonymizer.anonymizer_engine import AnonymizerEngine
+# from presidio_anonymizer.deanonymize_engine import DeanonymizeEngine
+# from presidio_anonymizer.entities import OperatorConfig
+# from text.text import text_analyzer
+# from operators.vault import VaultEncrypt, VaultDecrypt
+# print("Analyze:")
+# t = "Hi my name is Qwerty and I live in London. My number is 07440 123456."
+# res = text_analyzer(t, "en")
+# print(res)
+
+# print("Anonymize:")
+# anonymizer = AnonymizerEngine()
+# anonymizer.add_anonymizer(VaultEncrypt)
+# operators = {"DEFAULT": OperatorConfig("vault_encrypt", {"vault_url": VAULT_URL})}
+# anon_res = anonymizer.anonymize(t, res, operators)
+# print(anon_res.text)
+
+
+# print("Deanonymize:")
+# deanonymizer = DeanonymizeEngine()
+# deanonymizer.add_deanonymizer(VaultDecrypt)
+# de_ops = {"DEFAULT": OperatorConfig("vault_decrypt", {"vault_url": VAULT_URL})}
+# deanon_res = deanonymizer.deanonymize(anon_res.text, anon_res.items, de_ops)
+# print(deanon_res.text)
diff --git a/src/operators/__init__.py b/src/operators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/operators/vault.py b/src/operators/vault.py
new file mode 100644
index 0000000..978104d
--- /dev/null
+++ b/src/operators/vault.py
@@ -0,0 +1,97 @@
+"""Presidio operators that encrypt and decrypt text via Vault's transit engine."""
+import base64
+from typing import Dict
+from urllib.parse import urlparse
+
+import hvac
+from presidio_anonymizer.entities import InvalidParamException
+from presidio_anonymizer.operators import Operator, OperatorType
+
+# Transit key shared by both operators so that a round trip works.
+TRANSIT_KEY_NAME = 'orders'
+
+
+def _vault_url(params: Dict = None):
+    """Return the ``vault_url`` entry of *params*, or None when it is absent."""
+    return (params or {}).get("vault_url")
+
+
+def _validate_vault_url(params: Dict = None) -> None:
+    """Check that *params* carries a usable ``vault_url``.
+
+    :raises InvalidParamException: if the value is not a string, or lacks a
+        scheme or network location when parsed as a URL.
+    """
+    vault_url = _vault_url(params)
+    if not isinstance(vault_url, str):
+        raise InvalidParamException("Invalid input, vault_url must be a string.")
+    parsed = urlparse(vault_url)
+    if not (parsed.scheme and parsed.netloc):
+        raise InvalidParamException("Invalid input, vault_url must be a valid URL.")
+
+
+class VaultEncrypt(Operator):
+    """Anonymize operator that replaces text with its transit ciphertext."""
+
+    def _base64ify(self, bytes_or_str):
+        """Return *bytes_or_str* as standard base64 text; str is UTF-8 encoded first."""
+        if isinstance(bytes_or_str, str):
+            input_bytes = bytes_or_str.encode('utf8')
+        else:
+            input_bytes = bytes_or_str
+
+        # Standard alphabet rather than the URL-safe one: VaultDecrypt decodes
+        # with base64.b64decode, which does not understand '-' and '_'.
+        return base64.b64encode(input_bytes).decode('ascii')
+
+    def operate(self, text: str, params: Dict = None) -> str:
+        """Encrypt *text* with the transit key and return the ciphertext."""
+        client = hvac.Client(url=_vault_url(params))
+
+        encrypt_data_response = client.secrets.transit.encrypt_data(
+            name=TRANSIT_KEY_NAME,
+            plaintext=self._base64ify(text),
+        )
+
+        return encrypt_data_response['data']['ciphertext']
+
+    def validate(self, params: Dict = None) -> None:
+        """Raise InvalidParamException unless ``vault_url`` is a valid URL string."""
+        _validate_vault_url(params)
+
+    def operator_name(self) -> str:
+        """Return the name that selects this operator in an OperatorConfig."""
+        return "vault_encrypt"
+
+    def operator_type(self) -> OperatorType:
+        """Report this operator as an anonymizer."""
+        return OperatorType.Anonymize
+
+
+class VaultDecrypt(Operator):
+    """Deanonymize operator that restores text from its transit ciphertext."""
+
+    def operate(self, text: str, params: Dict = None) -> str:
+        """Decrypt ciphertext *text* with the transit key and return the plaintext."""
+        client = hvac.Client(url=_vault_url(params))
+
+        decrypt_data_response = client.secrets.transit.decrypt_data(
+            name=TRANSIT_KEY_NAME,
+            ciphertext=text,
+        )
+
+        # The response carries the plaintext base64-encoded.
+        encodedtext = decrypt_data_response['data']['plaintext']
+        return base64.b64decode(encodedtext).decode('utf8')
+
+    def validate(self, params: Dict = None) -> None:
+        """Raise InvalidParamException unless ``vault_url`` is a valid URL string."""
+        _validate_vault_url(params)
+
+    def operator_name(self) -> str:
+        """Return the name that selects this operator in an OperatorConfig."""
+        return "vault_decrypt"
+
+    def operator_type(self) -> OperatorType:
+        """Report this operator as a deanonymizer."""
+        return OperatorType.Deanonymize
diff --git a/src/vault.py b/src/vault.py
deleted file mode 100644
index 8304748..0000000
--- a/src/vault.py
+++ /dev/null
@@ -1,50 +0,0 @@
-from text.text import text_analyzer, text_anonymizer
-from presidio_anonymizer.entities import OperatorConfig
-import base64
-import hvac
-import sys
-
-VAULT_URL = "http://127.0.0.1:8200"
-
-
-def base64ify(bytes_or_str):
-    """Helper method to perform base64 encoding across Python 2.7 and Python 3.X"""
-    if isinstance(bytes_or_str, str):
-        input_bytes = bytes_or_str.encode('utf8')
-    else:
-        input_bytes = bytes_or_str
-
-    output_bytes = base64.urlsafe_b64encode(input_bytes)
-    return output_bytes.decode('ascii')
-
-
-def vault_encrypt(plaintext):
-    client = hvac.Client(url=VAULT_URL)
-
-    encrypt_data_response = client.secrets.transit.encrypt_data(
-        name='orders',
-        plaintext=base64ify(plaintext),
-    )
-
-    ciphertext = encrypt_data_response['data']['ciphertext']
-    return ciphertext
-
-
-def vault_decrypt(ciphertext):
-    client = hvac.Client(url=VAULT_URL)
-
-    decrypt_data_response = client.secrets.transit.decrypt_data(
-        name='orders',
-        ciphertext=ciphertext,
-    )
-
-    encodedtext = decrypt_data_response['data']['plaintext']
-    plaintext = base64.b64decode(encodedtext).decode('utf8')
-    return plaintext
-
-operators = {"DEFAULT": OperatorConfig("custom", {"lambda": vault_encrypt})}
-t = "Hi my name is Qwerty and I live in London. My number is 07440 123456."
-res = text_analyzer(t, "en")
-anon_res = text_anonymizer(t, res, operators)
-
-print(anon_res)