Skip to content

Commit

Permalink
make vault key a param and start adding some unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaykarle committed Sep 19, 2024
1 parent cd6d338 commit 570d2b4
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ def main():
# print("Anonymize:")
# anonymizer = AnonymizerEngine()
# anonymizer.add_anonymizer(VaultEncrypt)
# operators = {"DEFAULT": OperatorConfig("vault_encrypt", {"vault_url": VAULT_URL})}
# operators = {"DEFAULT": OperatorConfig("vault_encrypt", {"vault_url": VAULT_URL, "key": "orders"})}
# anon_res = anonymizer.anonymize(t, res, operators)
# print(anon_res.text)


# print("Deanonymize:")
# deanonymizer = DeanonymizeEngine()
# deanonymizer.add_deanonymizer(VaultDecrypt)
# de_ops = {"DEFAULT": OperatorConfig("vault_decrypt", {"vault_url": VAULT_URL})}
# de_ops = {"DEFAULT": OperatorConfig("vault_decrypt", {"vault_url": VAULT_URL, "key": "orders"})}
# deanon_res = deanonymizer.deanonymize(anon_res.text, anon_res.items, de_ops)
# print(deanon_res.text)
26 changes: 20 additions & 6 deletions src/operators/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ def _base64ify(self, bytes_or_str):

def operate(self, text: str, params: Dict = None) -> str:
vault_url = params.get("vault_url")
client = hvac.Client(url=vault_url)
key = params.get("key")

client = hvac.Client(url=vault_url)
encrypt_data_response = client.secrets.transit.encrypt_data(
name='orders',
name=key,
plaintext=self._base64ify(text),
)

ciphertext = encrypt_data_response['data']['ciphertext']

return ciphertext

def validate(self, params: Dict = None) -> None:
Expand All @@ -40,6 +41,12 @@ def validate(self, params: Dict = None) -> None:
else:
raise InvalidParamException(f"Invalid input, vault_url must be a string.")

key = params.get("key")
if isinstance(key, str) and key:
pass
else:
raise InvalidParamException(f"Invalid input, key must be a valid encryption key name.")

def operator_name(self) -> str:
return "vault_encrypt"

Expand All @@ -51,15 +58,16 @@ def operator_type(self) -> OperatorType:
class VaultDecrypt(Operator):
def operate(self, text: str, params: Dict = None) -> str:
vault_url = params.get("vault_url")
client = hvac.Client(url=vault_url)
key = params.get("key")

client = hvac.Client(url=vault_url)
decrypt_data_response = client.secrets.transit.decrypt_data(
name='orders',
name=key,
ciphertext=text,
)

encodedtext = decrypt_data_response['data']['plaintext']
plaintext = base64.b64decode(encodedtext).decode('utf8')

return plaintext

def validate(self, params: Dict = None) -> None:
Expand All @@ -73,6 +81,12 @@ def validate(self, params: Dict = None) -> None:
else:
raise InvalidParamException(f"Invalid input, vault_url must be a string.")

key = params.get("key")
if isinstance(key, str) and key:
pass
else:
raise InvalidParamException(f"Invalid input, key must be a valid encryption key name.")

def operator_name(self) -> str:
return "vault_decrypt"

Expand Down
47 changes: 47 additions & 0 deletions tests/operators/vault_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pytest

from presidio_anonymizer.entities import InvalidParamException
from operators.vault import VaultEncrypt, VaultDecrypt


class TestVaultEncrypt:
def test_given_valid_key_raises_no_exceptions(self):
VaultEncrypt().validate(params={"vault_url": "http://127.0.0.1:8200", "key": "foobar"})

def test_given_invalid_key_raises_exceptions(self):
with pytest.raises(
InvalidParamException,
match="Invalid input, key must be a valid encryption key name.",
):
VaultEncrypt().validate(params={"vault_url": "http://127.0.0.1:8200", "key": 1})

def test_given_valid_url_raises_no_exceptions(self):
VaultEncrypt().validate(params={"vault_url": "http://127.0.0.1:8200", "key": "foobar"})

def test_given_invalid_url_raises_exceptions(self):
with pytest.raises(
InvalidParamException,
match="Invalid input, vault_url must be a valid URL.",
):
VaultEncrypt().validate(params={"vault_url": "http:/127.0.0.1:8200", "key": "foobar"})

class TestVaultDecrypt:
def test_given_valid_key_raises_no_exceptions(self):
VaultDecrypt().validate(params={"vault_url": "http://127.0.0.1:8200", "key": "foobar"})

def test_given_invalid_key_raises_exceptions(self):
with pytest.raises(
InvalidParamException,
match="Invalid input, key must be a valid encryption key name.",
):
VaultDecrypt().validate(params={"vault_url": "http://127.0.0.1:8200", "key": 1})

def test_given_valid_url_raises_no_exceptions(self):
VaultDecrypt().validate(params={"vault_url": "http://127.0.0.1:8200", "key": "foobar"})

def test_given_invalid_url_raises_exceptions(self):
with pytest.raises(
InvalidParamException,
match="Invalid input, vault_url must be a valid URL.",
):
VaultDecrypt().validate(params={"vault_url": "http:/127.0.0.1:8200", "key": "foobar"})

0 comments on commit 570d2b4

Please sign in to comment.