Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redact multiline secrets #238

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions schemachange/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import copy
import hashlib
import json
import os
Expand Down Expand Up @@ -74,6 +75,7 @@
_log_auth_type ="Proceeding with %s authentication"
_log_pk_enc ="No private key passphrase provided. Assuming the key is not encrypted."
_log_okta_ep ="Okta Endpoint: %s"
_err_non_str_secret ="Found a secret that is not of type str."

#endregion Global Variables

Expand Down Expand Up @@ -183,7 +185,7 @@ def redact(self, context: str) -> str:
redacted = context
if redacted:
for secret in self.__secrets:
redacted = redacted.replace(secret, "*" * len(secret))
redacted = redacted.replace(secret, '\n'.join(["*" * len(l) for l in secret.split('\n')]))
return redacted


Expand Down Expand Up @@ -799,7 +801,10 @@ def inner_extract_dictionary_secrets(dictionary: Dict[str, Any], child_of_secret
else :
extracted_secrets = extracted_secrets | inner_extract_dictionary_secrets(value, child_of_secrets)
elif child_of_secrets or "SECRET" in key.upper():
if not isinstance(value, str):
raise ValueError(_err_non_str_secret)
extracted_secrets.add(value.strip())

return extracted_secrets

extracted = set()
Expand All @@ -809,6 +814,33 @@ def inner_extract_dictionary_secrets(dictionary: Dict[str, Any], child_of_secret
extracted = inner_extract_dictionary_secrets(config["vars"])
return extracted


def redact_config_vars(config_vars: Dict[str, Any]) -> Dict[str, Any]:
# Our goal is simply to print the config vars to stdout, but:
# - If we serialize to YAML and then redact, we won't redact multiline secrets,
# because YAML dump would add leading whitespace to them, changing them.
# - If we serialize to JSON and then redact, we may end up redacting floats, integers, etc,
# and we could not deserialise anymore in order to finally serialize to redacted YAML.

# Therefore, we have this function here, which sole purpose is to redact the config_vars
# dict leaf nodes (only those of type str). This dict can then be serialized to YAML
# or any other format by the caller.

# Using an inner function, cause I don't want to expose this generic signature to
# the caller, given the limited scope of what we are trying to accomplish
def inner_redact_config_vars(ob: Any) -> Any:
if isinstance(ob, dict):
return {key: inner_redact_config_vars(value) for key, value in ob.items()}
elif isinstance(ob, list):
return [inner_redact_config_vars(value) for value in ob]
elif isinstance(ob, str):
return SecretManager.global_redact(ob)
# Just to be sure, making a deep copy, cause ob can be a reference type
return copy.deepcopy(ob)

return inner_redact_config_vars(config_vars)


def main(argv=sys.argv):
parser = argparse.ArgumentParser(prog = 'schemachange', description = 'Apply schema changes to a Snowflake account. Full readme at https://github.com/Snowflake-Labs/schemachange', formatter_class = argparse.RawTextHelpFormatter)
subcommands = parser.add_subparsers(dest='subcommand')
Expand Down Expand Up @@ -883,11 +915,10 @@ def main(argv=sys.argv):
print("Using variables: {}")
else:
print("Using variables:")
print(textwrap.indent( \
SecretManager.global_redact(yaml.dump( \
config['vars'], \
sort_keys=False, \
default_flow_style=False)), prefix = " "))
print(textwrap.indent(yaml.dump( \
redact_config_vars(config['vars']), \
sort_keys=False, \
default_flow_style=False), prefix = " "))

# Finally, execute the command
if args.subcommand == 'render':
Expand Down
7 changes: 7 additions & 0 deletions tests/test_SecretManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ def test_SecretManager_given_secrets_when_redact_then_return_redacted_value():
assert result == "Hello *****!"


def test_SecretManager_given_multiline_secrets_when_redact_then_return_redacted_value():
sm = SecretManager()
sm.add("Hello\nworld")
result = sm.redact("Hello\nworld!")
assert result == "*****\n*****!"


def test_SecretManager_given_secrets_when_clear_then_should_hold_zero_secrets():
sm = SecretManager()
sm.add("world")
Expand Down
9 changes: 9 additions & 0 deletions tests/test_extract_config_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,12 @@ def test_extract_config_secrets_given__vars_with_same_secret_twice_then_only_ext

assert len(results) == 1
assert "SECRET_VALUE" in results


def test_extract_config_secrets_given__vars_with_multiline_secret_then_preserve_newlines():
config = {"vars": {"secrets" : {"database_name": "SECRET\nVALUE"} } }

results = extract_config_secrets(config)

assert len(results) == 1
assert "SECRET\nVALUE" in results
61 changes: 61 additions & 0 deletions tests/test_redact_config_vars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import pytest
from schemachange.cli import redact_config_vars, SecretManager


def test_redact_config_vars_given_secret_val_should_redact_it():
config_vars = {'secret' : 'secret_val'}
sm = SecretManager()
SecretManager.set_global_manager(sm)
sm.add('secret_val')
results = redact_config_vars(config_vars)
redacted = {'secret' : '**********'}
assert results == redacted


def test_redact_config_vars_given_number_like_secret_should_redact_it():
config_vars = {'secret' : '123'}
sm = SecretManager()
SecretManager.set_global_manager(sm)
sm.add('123')
results = redact_config_vars(config_vars)
redacted = {'secret' : '***'}
assert results == redacted


def test_redact_config_vars_given_number_like_secret_should_redact_it_but_not_an_actual_number():
config_vars = {'secret' : '123', 'public': 123}
sm = SecretManager()
SecretManager.set_global_manager(sm)
sm.add('123')
results = redact_config_vars(config_vars)
redacted = {'secret' : '***', 'public': 123}
assert results == redacted


def test_redact_config_vars_given_secret_should_redact_it_in_any_string_it_finds_it():
config_vars = {'secrets': {'password' : 'hi'}, 'jdbc': 'jdbc:mysql://127.0.0.1;user=john;password=hi'}
sm = SecretManager()
SecretManager.set_global_manager(sm)
sm.add('hi')
results = redact_config_vars(config_vars)
redacted = {'secrets': {'password' : '**'}, 'jdbc': 'jdbc:mysql://127.0.0.1;user=john;password=**'}
assert results == redacted


def test_redact_config_vars_given_secret_should_redact_it_in_any_string_it_finds_it_even_in_lists():
# At the moment, it's not documented that we could specify lists as values for variables, but,
# nothing is preventing people from trying things. Just in case, redact_config_vars is
# able to also redact strings inside lists. That's why this test is here.
# Finally note that tuples are not scanned, just making the point that this redact function cannot
# do everything; YAML deserialises lists as Python lists, not tuples, so we have no use case for tuples.
config_vars = {'secrets': {'password' : 'hi'}, 'jdbc': 'jdbc:mysql://127.0.0.1',
'accounts': [{'user': 'john', 'password': 'hi'}], 'greetings': ['hi', 'hello'], 'greetings_tuple': ('hi',)}
sm = SecretManager()
SecretManager.set_global_manager(sm)
sm.add('hi')
results = redact_config_vars(config_vars)
redacted = {'secrets': {'password' : '**'}, 'jdbc': 'jdbc:mysql://127.0.0.1',
'accounts': [{'user': 'john', 'password': '**'}], 'greetings': ['**', 'hello'], 'greetings_tuple': ('hi',)}
assert results == redacted