Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KSM-463 - Python SDK - fix for record's data JSON where "fields": null #525

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_config_mode_dog_food(self):
sp = subprocess.run(["icacls.exe", Config.default_ini_file], capture_output=True)
if sp.stderr is not None and sp.stderr.decode() != "":
self.fail("Could not icacls.exe {}: {}".format(Config.default_ini_file,sp.stderr.decode()))
allowed_users = [user.lower(), "Administrators".lower()]
allowed_users = [user.decode().lower(), "Administrators".lower()]
for line in sp.stdout.decode().split("\n"):
parts = line[len(Config.default_ini_file):].split(":")
if len(parts) == 2:
Expand Down
7 changes: 4 additions & 3 deletions integration/keeper_secrets_manager_cli/tests/secret_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ def test_get(self):
# JSON Output w/ JQ to stdout
runner = CliRunner()
result = runner.invoke(cli, [
'secret', 'get', '-u', one.uid, '--json',
'secret', 'get', '-u', one.uid, '--json', '--inflate',
'--query', 'fields[*]'
], catch_exceptions=False)

self.assertEqual(0, result.exit_code, "the exit code was not 0")
fields = json.loads(result.output)
self.assertEqual(4, len(fields), "didn't find 4 objects in array")
self.assertEqual(6, len(fields), "didn't find 4 objects in array")

# Text Output to file
tf_name = self._make_temp_file()
Expand All @@ -183,8 +183,9 @@ def test_get(self):
result = runner.invoke(cli, [
'secret', 'get', '-u', one.uid,
'--query', '[*].fields[*].type',
'--force-array'
'--force-array', '--deflate',
], catch_exceptions=True)
print(f'result.output: {result.output}') # TODO: remove after test
data = json.loads(result.output)
self.assertEqual(4, len(data), "found 4 rows")
self.assertEqual(0, result.exit_code, "the exit code was not 0")
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/core/keeper_secrets_manager_core/dto/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(self, record_dict, secret_key, folder_uid = ''):

self.raw_json = record_data_json
self.dict = utils.json_to_dict(self.raw_json)
if self.dict and self.dict.get('fields') is None:
self.dict['fields'] = []
self.title = self.dict.get('title')
self.type = self.dict.get('type')
self.revision = record_dict.get('revision')
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/core/keeper_secrets_manager_core/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def add_file(self, name, title=None, content_type=None, url=None, content=None,

def dump(self, secret, flags=None):

fields = list(self._fields)
fields = self._fields

# If no files, the JSON has null
files = None
Expand Down
91 changes: 68 additions & 23 deletions sdk/python/core/tests/record_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest
import tempfile
import os
import tempfile
import unittest

from keeper_secrets_manager_core.storage import FileKeyValueStorage
from keeper_secrets_manager_core import SecretsManager
Expand All @@ -20,34 +20,40 @@ def tearDown(self):
os.chdir(self.orig_working_dir)

def test_the_login_record_password(self):

""" If the record type is login, the password will be placed in the instance attribute.
""" If the record type is login, the password will be placed
in the instance attribute.
"""

try:
with tempfile.NamedTemporaryFile("w", delete=False) as fh:
fh.write(MockConfig.make_json())
fh.seek(0)
secrets_manager = SecretsManager(config=FileKeyValueStorage(config_file_location=fh.name))
secrets_manager = SecretsManager(
config=FileKeyValueStorage(config_file_location=fh.name))

# A good record.
# 'fields': [...{'type': 'password', 'value': ['My Password']}...]
# 'fields':[{'type': 'password', 'value': ['My Password']}...]
good_res = mock.Response()
good = good_res.add_record(title="Good Record", record_type='login')
good = good_res.add_record(
title="Good Record", record_type='login')
good.field("login", "My Login")
good.field("password", "My Password")

# A bad record. This would be like if someone removed a password text from an existing field.
# A bad record. This would be like if someone removed
# a password text from an existing field.
# 'fields': [...{'type': 'password', 'value': []}...]
bad_res = mock.Response()
bad = bad_res.add_record(title="Bad Record", record_type='login')
bad = bad_res.add_record(
title="Bad Record", record_type='login')
bad.field("login", "My Login")
bad.field("password", [])

# An ugly record. The application didn't even add the field. We need to set flags to prune empty fields.
# An ugly record. The application didn't even add the field.
# We need to set flags to prune empty fields.
# 'fields': [...]
ugly_res = mock.Response(flags={"prune_empty_fields": True})
ugly = ugly_res.add_record(title="Ugly Record", record_type='login')
ugly = ugly_res.add_record(
title="Ugly Record", record_type='login')
ugly.field("login", "My Login")

# this will be removed from the fields array.
Expand All @@ -59,16 +65,23 @@ def test_the_login_record_password(self):
res_queue.add_response(ugly_res)

records = secrets_manager.get_secrets()
self.assertEqual(1, len(records), "didn't get 1 record for the good")
self.assertEqual("My Password", records[0].password, "did not get correct password for the good")
self.assertEqual(
1, len(records), "didn't get 1 record for the good")
self.assertEqual(
"My Password", records[0].password,
"did not get correct password for the good")

records = secrets_manager.get_secrets()
self.assertEqual(1, len(records), "didn't get 1 record for the bad")
self.assertIsNone(records[0].password, "password is defined for the bad")
self.assertEqual(
1, len(records), "didn't get 1 record for the bad")
self.assertIsNone(records[0].password,
"password is defined for the bad")

records = secrets_manager.get_secrets()
self.assertEqual(1, len(records), "didn't get 1 record for the ugly")
self.assertIsNone(records[0].password, "password is defined for the ugly")
self.assertEqual(
1, len(records), "didn't get 1 record for the ugly")
self.assertIsNone(records[0].password,
"password is defined for the ugly")
finally:
try:
os.unlink(fh.name)
Expand All @@ -77,17 +90,21 @@ def test_the_login_record_password(self):

def test_record_field(self):

rf = RecordField(field_type="login", value="test", label="Test", required=True, enforceGeneration=False,
rf = RecordField(field_type="login", value="test", label="Test",
required=True, enforceGeneration=False,
privacyScreen=True, complexity={"foo": "bar"})

value = helpers.obj_to_dict(rf)
self.assertEqual("login", value.get("type"), "type is not correct")
self.assertEqual(["test"], value.get("value"), "value is not correct")
self.assertEqual("Test", value.get("label"), "label is not correct")
self.assertTrue(value.get("required"), "required is not correct")
self.assertFalse(value.get("enforceGeneration"), "enforceGeneration is not correct")
self.assertTrue(value.get("privacyScreen"), "privacyScreen is not correct")
self.assertIsNotNone(value.get("complexity"), "complexity is not correct")
self.assertFalse(value.get("enforceGeneration"),
"enforceGeneration is not correct")
self.assertTrue(value.get("privacyScreen"),
"privacyScreen is not correct")
self.assertIsNotNone(value.get("complexity"),
"complexity is not correct")

rf = RecordField(field_type="login", value="test", privacyScreen=None)

Expand All @@ -96,7 +113,35 @@ def test_record_field(self):
self.assertEqual(["test"], value.get("value"), "value is not correct")
self.assertIsNone(value.get("label"), "label is not correct")
self.assertIsNone(value.get("required"), "required is not correct")
self.assertIsNone(value.get("enforceGeneration"), "enforceGeneration is not correct")
self.assertIsNone(value.get("enforceGeneration"),
"enforceGeneration is not correct")
assert "privacyScreen" not in value, "privacyScreen exists in dictionary"
self.assertIsNone(value.get("privacyScreen"), "privacyScreen is not correct")
self.assertIsNone(value.get("privacyScreen"),
"privacyScreen is not correct")
self.assertIsNone(value.get("complexity"), "complexity is not correct")

def test_missing_fields_section(self):
""" Test for clients that may set "fields": null in JSON data """

try:
with tempfile.NamedTemporaryFile("w", delete=False) as fh:
fh.write(MockConfig.make_json())
fh.seek(0)
secrets_manager = SecretsManager(
config=FileKeyValueStorage(config_file_location=fh.name))

res = mock.Response()
rec = res.add_record(title="MyLogin", record_type='login')
res.records[rec.uid]._fields = None
res_queue = mock.ResponseQueue(client=secrets_manager)
res_queue.add_response(res)

records = secrets_manager.get_secrets()
self.assertEqual(
1, len(records), "didn't get 1 record for MyLogin")
self.assertEqual([], records[0].dict.get('fields'))
finally:
try:
os.unlink(fh.name)
except OSError:
pass
Loading