Skip to content

Commit

Permalink
use custom operators from presidio for vault
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaykarle committed Sep 19, 2024
1 parent 4f2be88 commit cd6d338
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 60 deletions.
35 changes: 25 additions & 10 deletions src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,29 @@ def main():


# vault test:
from presidio_anonymizer.entities import OperatorConfig


def vault_encrypt(text):
    """Placeholder "encryption" used for the test run: append an ``x``."""
    suffix = "x"
    return text + suffix


# Apply the placeholder to every entity type through presidio's "custom"
# operator, which calls the function supplied under the "lambda" key.
operators = dict(DEFAULT=OperatorConfig("custom", {"lambda": vault_encrypt}))

t = (
    "Hi my name is Qwerty and I live in London. "
    "My number is 07440 123456."
)

# Detect entities in the sample text, then anonymize them with the operators above.
res = text_analyzer(t, "en")
anon_res = text_anonymizer(t, res, operators)

print(anon_res)
# VAULT_URL = "http://127.0.0.1:8200"
# from presidio_anonymizer.anonymizer_engine import AnonymizerEngine
# from presidio_anonymizer.deanonymize_engine import DeanonymizeEngine
# from presidio_anonymizer.entities import OperatorConfig
# from text.text import text_analyzer
# from operators.vault import VaultEncrypt, VaultDecrypt
# print("Analyze:")
# t = "Hi my name is Qwerty and I live in London. My number is 07440 123456."
# res = text_analyzer(t, "en")
# print(res)

# print("Anonymize:")
# anonymizer = AnonymizerEngine()
# anonymizer.add_anonymizer(VaultEncrypt)
# operators = {"DEFAULT": OperatorConfig("vault_encrypt", {"vault_url": VAULT_URL})}
# anon_res = anonymizer.anonymize(t, res, operators)
# print(anon_res.text)


# print("Deanonymize:")
# deanonymizer = DeanonymizeEngine()
# deanonymizer.add_deanonymizer(VaultDecrypt)
# de_ops = {"DEFAULT": OperatorConfig("vault_decrypt", {"vault_url": VAULT_URL})}
# deanon_res = deanonymizer.deanonymize(anon_res.text, anon_res.items, de_ops)
# print(deanon_res.text)
Empty file added src/operators/__init__.py
Empty file.
80 changes: 80 additions & 0 deletions src/operators/vault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import base64
from typing import Dict
from urllib.parse import urlparse

import hvac
from presidio_anonymizer.entities import InvalidParamException
from presidio_anonymizer.operators import Operator, OperatorType


class VaultEncrypt(Operator):
    """Presidio anonymizer operator that encrypts text via Vault's transit engine.

    Recognised ``params`` keys:

    * ``vault_url`` (required): URL of the Vault server, including scheme
      and host.
    * ``key_name`` (optional): name of the transit key to encrypt with.
      Defaults to ``DEFAULT_KEY_NAME``.
    """

    # Transit key used when ``params`` does not name one.
    DEFAULT_KEY_NAME = "orders"

    def _base64ify(self, bytes_or_str):
        """Return the standard base64 encoding of the input as an ASCII string.

        :param bytes_or_str: ``str`` (encoded as UTF-8 first) or bytes-like data.
        :return: base64 text using the standard alphabet (``+`` and ``/``).
        """
        if isinstance(bytes_or_str, str):
            input_bytes = bytes_or_str.encode("utf8")
        else:
            input_bytes = bytes_or_str

        # Use the standard alphabet rather than the URL-safe one: the transit
        # API takes standard base64, and VaultDecrypt decodes the round-tripped
        # value with ``base64.b64decode``.
        return base64.b64encode(input_bytes).decode("ascii")

    def operate(self, text: str, params: Dict = None) -> str:
        """Encrypt ``text`` with Vault and return the ciphertext.

        :param text: plaintext to encrypt.
        :param params: operator parameters, see the class docstring.
        :return: the ``ciphertext`` field of Vault's response.
        :raises InvalidParamException: if ``vault_url`` is missing or invalid.
        """
        # Fail with a clear parameter error instead of an AttributeError when
        # ``params`` is None or incomplete.
        self.validate(params)

        client = hvac.Client(url=params["vault_url"])
        key_name = params.get("key_name") or self.DEFAULT_KEY_NAME

        encrypt_data_response = client.secrets.transit.encrypt_data(
            name=key_name,
            plaintext=self._base64ify(text),
        )
        return encrypt_data_response["data"]["ciphertext"]

    def validate(self, params: Dict = None) -> None:
        """Check that ``params`` carries a usable ``vault_url``.

        :param params: operator parameters; ``None`` is treated as empty.
        :raises InvalidParamException: if ``vault_url`` is not a string, or
            does not parse into a URL with both a scheme and a network
            location.
        """
        vault_url = (params or {}).get("vault_url")
        if not isinstance(vault_url, str):
            raise InvalidParamException("Invalid input, vault_url must be a string.")

        try:
            parsed = urlparse(vault_url)
        except ValueError as err:
            # urlparse rejects some malformed inputs (e.g. a bad IPv6 literal).
            raise InvalidParamException(
                "Invalid input, vault_url must be a valid URL."
            ) from err

        if not (parsed.scheme and parsed.netloc):
            raise InvalidParamException("Invalid input, vault_url must be a valid URL.")

    def operator_name(self) -> str:
        """Return the name this operator is registered and referenced under."""
        return "vault_encrypt"

    def operator_type(self) -> OperatorType:
        """Return the operator category (anonymize)."""
        return OperatorType.Anonymize



class VaultDecrypt(Operator):
    """Presidio deanonymizer operator that decrypts text via Vault's transit engine.

    Recognised ``params`` keys:

    * ``vault_url`` (required): URL of the Vault server, including scheme
      and host.
    * ``key_name`` (optional): name of the transit key to decrypt with.
      Defaults to ``DEFAULT_KEY_NAME``.
    """

    # Transit key used when ``params`` does not name one.
    DEFAULT_KEY_NAME = "orders"

    def operate(self, text: str, params: Dict = None) -> str:
        """Decrypt ``text`` with Vault and return the recovered plaintext.

        :param text: ciphertext to decrypt.
        :param params: operator parameters, see the class docstring.
        :return: the response's ``plaintext`` field, base64-decoded and then
            decoded as UTF-8.
        :raises InvalidParamException: if ``vault_url`` is missing or invalid.
        """
        # Fail with a clear parameter error instead of an AttributeError when
        # ``params`` is None or incomplete.
        self.validate(params)

        client = hvac.Client(url=params["vault_url"])
        key_name = params.get("key_name") or self.DEFAULT_KEY_NAME

        decrypt_data_response = client.secrets.transit.decrypt_data(
            name=key_name,
            ciphertext=text,
        )

        encoded_plaintext = decrypt_data_response["data"]["plaintext"]
        return base64.b64decode(encoded_plaintext).decode("utf8")

    def validate(self, params: Dict = None) -> None:
        """Check that ``params`` carries a usable ``vault_url``.

        :param params: operator parameters; ``None`` is treated as empty.
        :raises InvalidParamException: if ``vault_url`` is not a string, or
            does not parse into a URL with both a scheme and a network
            location.
        """
        vault_url = (params or {}).get("vault_url")
        if not isinstance(vault_url, str):
            raise InvalidParamException("Invalid input, vault_url must be a string.")

        try:
            parsed = urlparse(vault_url)
        except ValueError as err:
            # urlparse rejects some malformed inputs (e.g. a bad IPv6 literal).
            raise InvalidParamException(
                "Invalid input, vault_url must be a valid URL."
            ) from err

        if not (parsed.scheme and parsed.netloc):
            raise InvalidParamException("Invalid input, vault_url must be a valid URL.")

    def operator_name(self) -> str:
        """Return the name this operator is registered and referenced under."""
        return "vault_decrypt"

    def operator_type(self) -> OperatorType:
        """Return the operator category (deanonymize)."""
        return OperatorType.Deanonymize
50 changes: 0 additions & 50 deletions src/vault.py

This file was deleted.

0 comments on commit cd6d338

Please sign in to comment.