Skip to content

Commit

Permalink
REL-4242 - Python SDK v16.6.2 - Empty fields and no dependency on hel…
Browse files Browse the repository at this point in the history
…per (#527)

* Update Python SDK to version 16.6.2 and updates change log

* KSM-463 - Python SDK - fix for record's data JSON where "fields": null (#525)

* KSM-458 Remove core dep on helper module

Refactor add_custom_field not to use the helper module. The method
will accept an instance of FieldType, however it is not tied to the
keeper_secrets_manager_helper.v3.field_type.FieldType module.

Allow the ability to set the type, label, and value not using the
FieldType instance. The method will still take the param `field`,
however it will also take `field_type`, `label`, and `value`. This
allows adding a custom field without have to use the helper module.

Removed references `keeper-secrets-manager-helper` from setup.py
and requirements.txt. This should break the circular reference.
  • Loading branch information
maksimu authored Sep 15, 2023
1 parent 3c4839b commit 67b1dc4
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 37 deletions.
2 changes: 1 addition & 1 deletion integration/keeper_secrets_manager_cli/tests/misc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_config_mode_dog_food(self):
sp = subprocess.run(["icacls.exe", Config.default_ini_file], capture_output=True)
if sp.stderr is not None and sp.stderr.decode() != "":
self.fail("Could not icacls.exe {}: {}".format(Config.default_ini_file,sp.stderr.decode()))
allowed_users = [user.lower(), "Administrators".lower()]
allowed_users = [user.decode().lower(), "Administrators".lower()]
for line in sp.stdout.decode().split("\n"):
parts = line[len(Config.default_ini_file):].split(":")
if len(parts) == 2:
Expand Down
Empty file removed sdk/python/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions sdk/python/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ For more information see our official documentation page https://docs.keeper.io/

# Change Log

## 16.6.2
* KSM-463 - Python SDK - Fix a bug when fields is null
* KSM-458 - Python SDK - Remove core's dependency on the helper module. Fixes [issue 488](https://github.com/Keeper-Security/secrets-manager/issues/488)

## 16.6.1
* KSM 444 - Python - Added folderUid and innerFolderUid to Record

Expand Down
35 changes: 26 additions & 9 deletions sdk/python/core/keeper_secrets_manager_core/dto/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from keeper_secrets_manager_core import utils, helpers
from keeper_secrets_manager_core.crypto import CryptoUtils
from keeper_secrets_manager_core.exceptions import KeeperError
from keeper_secrets_manager_helper.v3.field_type import FieldType


class Record:
Expand Down Expand Up @@ -54,6 +53,8 @@ def __init__(self, record_dict, secret_key, folder_uid = ''):

self.raw_json = record_data_json
self.dict = utils.json_to_dict(self.raw_json)
if self.dict and self.dict.get('fields') is None:
self.dict['fields'] = []
self.title = self.dict.get('title')
self.type = self.dict.get('type')
self.revision = record_dict.get('revision')
Expand Down Expand Up @@ -184,16 +185,32 @@ def set_custom_field_value(self, field_type, value):
field["value"] = value
self._update()

def add_custom_field(self, field: FieldType) -> bool:
if issubclass(type(field), FieldType):
if self.dict.get('custom', None) is None:
self.dict['custom'] = []
custom = self.dict['custom']
def add_custom_field(self, field=None, field_type=None, label=None, value=None) -> bool:
if self.dict.get('custom', None) is None:
self.dict['custom'] = []
custom = self.dict['custom']

# Make backward compatible. Assumes keeper_secrets_manager_helper.v#.field_type.FieldType is passed in.
if field is not None:
if field.__class__.__name__ != "FieldType":
raise ValueError("The field is not an instance of FieldType")
fdict = field.to_dict()
custom.append(fdict)
self._update()
return True
return False
else:
if field_type is None:
return False
if isinstance(value, list) is False:
value = [value]
field_dict = {
"type": field_type,
"value": value
}
if label is not None:
field_dict["label"] = label
custom.append(field_dict)

self._update()
return True

# TODO: Deprecate this for better getter and setters
def field(self, field_type, value=None, single=False):
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/core/keeper_secrets_manager_core/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def add_file(self, name, title=None, content_type=None, url=None, content=None,

def dump(self, secret, flags=None):

fields = list(self._fields)
fields = list(self._fields) if isinstance(self._fields, list) else self._fields

# If no files, the JSON has null
files = None
Expand Down
1 change: 0 additions & 1 deletion sdk/python/core/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
keeper-secrets-manager-helper>=1.0.4
ecdsa
cryptography>=39.0.1
requests>=2.28.2
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
long_description = fp.read()

install_requires = [
'keeper-secrets-manager-helper>=1.0.4',
'ecdsa',
'requests',
'cryptography>=39.0.1',
Expand All @@ -20,7 +19,7 @@

setup(
name="keeper-secrets-manager-core",
version="16.6.1",
version="16.6.2",
description="Keeper Secrets Manager for Python 3",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
176 changes: 153 additions & 23 deletions sdk/python/core/tests/record_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest
import tempfile
import os
import tempfile
import unittest

from keeper_secrets_manager_core.storage import FileKeyValueStorage
from keeper_secrets_manager_core import SecretsManager
Expand All @@ -20,34 +20,40 @@ def tearDown(self):
os.chdir(self.orig_working_dir)

def test_the_login_record_password(self):

""" If the record type is login, the password will be placed in the instance attribute.
""" If the record type is login, the password will be placed
in the instance attribute.
"""

try:
with tempfile.NamedTemporaryFile("w", delete=False) as fh:
fh.write(MockConfig.make_json())
fh.seek(0)
secrets_manager = SecretsManager(config=FileKeyValueStorage(config_file_location=fh.name))
secrets_manager = SecretsManager(
config=FileKeyValueStorage(config_file_location=fh.name))

# A good record.
# 'fields': [...{'type': 'password', 'value': ['My Password']}...]
# 'fields':[{'type': 'password', 'value': ['My Password']}...]
good_res = mock.Response()
good = good_res.add_record(title="Good Record", record_type='login')
good = good_res.add_record(
title="Good Record", record_type='login')
good.field("login", "My Login")
good.field("password", "My Password")

# A bad record. This would be like if someone removed a password text from an existing field.
# A bad record. This would be like if someone removed
# a password text from an existing field.
# 'fields': [...{'type': 'password', 'value': []}...]
bad_res = mock.Response()
bad = bad_res.add_record(title="Bad Record", record_type='login')
bad = bad_res.add_record(
title="Bad Record", record_type='login')
bad.field("login", "My Login")
bad.field("password", [])

# An ugly record. The application didn't even add the field. We need to set flags to prune empty fields.
# An ugly record. The application didn't even add the field.
# We need to set flags to prune empty fields.
# 'fields': [...]
ugly_res = mock.Response(flags={"prune_empty_fields": True})
ugly = ugly_res.add_record(title="Ugly Record", record_type='login')
ugly = ugly_res.add_record(
title="Ugly Record", record_type='login')
ugly.field("login", "My Login")

# this will be removed from the fields array.
Expand All @@ -59,16 +65,23 @@ def test_the_login_record_password(self):
res_queue.add_response(ugly_res)

records = secrets_manager.get_secrets()
self.assertEqual(1, len(records), "didn't get 1 record for the good")
self.assertEqual("My Password", records[0].password, "did not get correct password for the good")
self.assertEqual(
1, len(records), "didn't get 1 record for the good")
self.assertEqual(
"My Password", records[0].password,
"did not get correct password for the good")

records = secrets_manager.get_secrets()
self.assertEqual(1, len(records), "didn't get 1 record for the bad")
self.assertIsNone(records[0].password, "password is defined for the bad")
self.assertEqual(
1, len(records), "didn't get 1 record for the bad")
self.assertIsNone(records[0].password,
"password is defined for the bad")

records = secrets_manager.get_secrets()
self.assertEqual(1, len(records), "didn't get 1 record for the ugly")
self.assertIsNone(records[0].password, "password is defined for the ugly")
self.assertEqual(
1, len(records), "didn't get 1 record for the ugly")
self.assertIsNone(records[0].password,
"password is defined for the ugly")
finally:
try:
os.unlink(fh.name)
Expand All @@ -77,17 +90,21 @@ def test_the_login_record_password(self):

def test_record_field(self):

rf = RecordField(field_type="login", value="test", label="Test", required=True, enforceGeneration=False,
rf = RecordField(field_type="login", value="test", label="Test",
required=True, enforceGeneration=False,
privacyScreen=True, complexity={"foo": "bar"})

value = helpers.obj_to_dict(rf)
self.assertEqual("login", value.get("type"), "type is not correct")
self.assertEqual(["test"], value.get("value"), "value is not correct")
self.assertEqual("Test", value.get("label"), "label is not correct")
self.assertTrue(value.get("required"), "required is not correct")
self.assertFalse(value.get("enforceGeneration"), "enforceGeneration is not correct")
self.assertTrue(value.get("privacyScreen"), "privacyScreen is not correct")
self.assertIsNotNone(value.get("complexity"), "complexity is not correct")
self.assertFalse(value.get("enforceGeneration"),
"enforceGeneration is not correct")
self.assertTrue(value.get("privacyScreen"),
"privacyScreen is not correct")
self.assertIsNotNone(value.get("complexity"),
"complexity is not correct")

rf = RecordField(field_type="login", value="test", privacyScreen=None)

Expand All @@ -96,7 +113,120 @@ def test_record_field(self):
self.assertEqual(["test"], value.get("value"), "value is not correct")
self.assertIsNone(value.get("label"), "label is not correct")
self.assertIsNone(value.get("required"), "required is not correct")
self.assertIsNone(value.get("enforceGeneration"), "enforceGeneration is not correct")
self.assertIsNone(value.get("enforceGeneration"),
"enforceGeneration is not correct")
assert "privacyScreen" not in value, "privacyScreen exists in dictionary"
self.assertIsNone(value.get("privacyScreen"), "privacyScreen is not correct")
self.assertIsNone(value.get("privacyScreen"),
"privacyScreen is not correct")
self.assertIsNone(value.get("complexity"), "complexity is not correct")

def test_add_custom_field_by_param(self):

try:
with tempfile.NamedTemporaryFile("w", delete=False) as fh:
fh.write(MockConfig.make_json())
fh.seek(0)
secrets_manager = SecretsManager(config=FileKeyValueStorage(config_file_location=fh.name))

res = mock.Response()
mock_record = res.add_record(title="Good Record", record_type='login')
mock_record.field("login", "My Login")
mock_record.field("password", "My Password")

res_queue = mock.ResponseQueue(client=secrets_manager)
res_queue.add_response(res)

records = secrets_manager.get_secrets()
record = records[0]

record.add_custom_field(
field_type='text',
label="My Label",
value="My Value"
)
new_value = record.get_custom_field("My Label")
self.assertEqual("text", new_value.get("type"))
self.assertEqual("My Label", new_value.get("label"))
self.assertEqual(['My Value'], new_value.get("value"))
finally:
try:
os.unlink(fh.name)
except OSError:
pass

def test_add_custom_field_by_field_type(self):

class FieldType:

def __init__(self, field_type, label, value):
self.field_type = field_type
self.label = label
self.value = value

def to_dict(self):
return {
"type": self.field_type,
"label": self.label,
"value": self.value
}

try:
with tempfile.NamedTemporaryFile("w", delete=False) as fh:
fh.write(MockConfig.make_json())
fh.seek(0)
secrets_manager = SecretsManager(config=FileKeyValueStorage(config_file_location=fh.name))

res = mock.Response()
mock_record = res.add_record(title="Good Record", record_type='login')
mock_record.field("login", "My Login")
mock_record.field("password", "My Password")

res_queue = mock.ResponseQueue(client=secrets_manager)
res_queue.add_response(res)

records = secrets_manager.get_secrets()
record = records[0]

field = FieldType(
field_type="text",
label="My Label",
value=["My Value"]
)

record.add_custom_field(field=field)

new_value = record.get_custom_field("My Label")
self.assertEqual("text", new_value.get("type"))
self.assertEqual("My Label", new_value.get("label"))
self.assertEqual(['My Value'], new_value.get("value"))
finally:
try:
os.unlink(fh.name)
except OSError:
pass

def test_missing_fields_section(self):
""" Test for clients that may set "fields": null in JSON data """

try:
with tempfile.NamedTemporaryFile("w", delete=False) as fh:
fh.write(MockConfig.make_json())
fh.seek(0)
secrets_manager = SecretsManager(
config=FileKeyValueStorage(config_file_location=fh.name))

res = mock.Response()
rec = res.add_record(title="MyLogin", record_type='login')
res.records[rec.uid]._fields = None
res_queue = mock.ResponseQueue(client=secrets_manager)
res_queue.add_response(res)

records = secrets_manager.get_secrets()
self.assertEqual(
1, len(records), "didn't get 1 record for MyLogin")
self.assertEqual([], records[0].dict.get('fields'))
finally:
try:
os.unlink(fh.name)
except OSError:
pass

0 comments on commit 67b1dc4

Please sign in to comment.