diff --git a/CHANGELOG.md b/CHANGELOG.md index 197aa4c5..ef48d36a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ All notable changes to the Zowe Client Python SDK will be documented in this fil - Feature: Added method to load profile properties from environment variables - Feature: Added a CredentialManager class to securely retrieve values from credentials and manage multiple credential entries on Windows [#134](https://github.com/zowe/zowe-client-python-sdk/issues/134) +- Feature: Added method to Save profile properties to zowe.config.json file [#73](https://github.com/zowe/zowe-client-python-sdk/issues/73) +- Feature: Added method to Save secure profile properties to vault [#72](https://github.com/zowe/zowe-client-python-sdk/issues/72) - Bugfix: Fixed issue for datasets and jobs with special characters in URL [#211] (https://github.com/zowe/zowe-client-python-sdk/issues/211) - Bugfix: Fixed exception handling in session.py [#213] (https://github.com/zowe/zowe-client-python-sdk/issues/213) - BugFix: Validation of zowe.config.json file matching the schema [#192](https://github.com/zowe/zowe-client-python-sdk/issues/192) diff --git a/src/core/zowe/core_for_zowe_sdk/config_file.py b/src/core/zowe/core_for_zowe_sdk/config_file.py index b9e01b3d..265a08ae 100644 --- a/src/core/zowe/core_for_zowe_sdk/config_file.py +++ b/src/core/zowe/core_for_zowe_sdk/config_file.py @@ -44,6 +44,8 @@ # Profile datatype is used by ConfigFile to return Profile Data along with # metadata such as profile_name and secure_props_not_found + + class Profile(NamedTuple): data: dict = {} name: str = "" @@ -135,7 +137,7 @@ def init_from_file( # loading secure props is done in load_profile_properties # since we want to try loading secure properties only when # we know that the profile has saved properties - # self.load_secure_props() + # CredentialManager.load_secure_props() def validate_schema( self @@ -230,7 +232,6 @@ def get_profile( profile_name = self.get_profilename_from_profiletype( profile_type=profile_type ) - props: dict = self.load_profile_properties(profile_name=profile_name) return Profile(props, profile_name, self._missing_secure_props) @@ -334,8 +335,6 @@ def load_profile_properties(self, profile_name: str) -> dict: Load exact profile properties (without prepopulated fields from base profile) from the profile dict and populate fields from the secure credentials storage """ - # if self.profiles is None: - # self.init_from_file() props = {} lst = profile_name.split(".") secure_fields: list = [] @@ -344,16 +343,15 @@ def load_profile_properties(self, profile_name: str) -> dict: profile_name = ".".join(lst) profile = self.find_profile(profile_name, self.profiles) if profile is not None: - props = { **profile.get("properties", {}), **props } + props = {**profile.get("properties", {}), **props} secure_fields.extend(profile.get("secure", [])) else: warnings.warn( - f"Profile {profile_name} not found", - ProfileNotFoundWarning - ) + f"Profile {profile_name} not found", + ProfileNotFoundWarning + ) lst.pop() - # load secure props only if there are secure fields if secure_fields: CredentialManager.load_secure_props() @@ -363,7 +361,7 @@ def load_profile_properties(self, profile_name: str) -> dict: if re.match( "profiles\\." + profile_name + "\\.properties\\.[a-z]+", key ): - property_name = key.split(".")[3] + property_name = key.split(".")[-1] if property_name in secure_fields: props[property_name] = value secure_fields.remove(property_name) @@ -372,3 +370,146 @@ def load_profile_properties(self, profile_name: str) -> dict: # self._missing_secure_props.extend(secure_fields) return props + + def __set_or_create_nested_profile(self, profile_name, profile_data): + """ + Set or create a nested profile. + """ + path = self.get_profile_path_from_name(profile_name) + keys = path.split(".")[1:] + nested_profiles = self.profiles + for key in keys: + nested_profiles = nested_profiles.setdefault(key, {}) + nested_profiles.update(profile_data) + + def __is_secure(self, json_path: str, property_name: str) -> bool: + """ + Check whether the given JSON path corresponds to a secure property. + + Parameters: + json_path (str): The JSON path of the property to check. + property_name (str): The name of the property to check. + + Returns: + bool: True if the property should be stored securely, False otherwise. + """ + + profile = self.find_profile(json_path, self.profiles) + if profile and profile.get("secure"): + return property_name in profile["secure"] + return False + + def set_property(self, json_path, value, secure=None) -> None: + """ + Set a property in the profile, storing it securely if necessary. + + Parameters: + json_path (str): The JSON path of the property to set. + value (str): The value to be set for the property. + profile_name (str): The name of the profile to set the property in. + secure (bool): If True, the property will be stored securely. Default is None. + """ + if self.profiles is None: + self.init_from_file() + + # Checking whether the property should be stored securely or in plain text + property_name = json_path.split(".")[-1] + profile_name = self.get_profile_name_from_path(json_path) + # check if the property is already secure + is_property_secure = self.__is_secure(profile_name, property_name) + is_secure = secure if secure is not None else is_property_secure + + current_profile = self.find_profile(profile_name, self.profiles) or {} + current_properties = current_profile.setdefault("properties", {}) + current_secure = current_profile.setdefault("secure", []) + if is_secure: + CredentialManager.load_secure_props() + if not is_property_secure: + current_secure.append(property_name) + + CredentialManager.secure_props[self.filepath] = { + **CredentialManager.secure_props.get(self.filepath, {}), json_path: value} + current_properties.pop(property_name, None) + + else: + if is_property_secure: + CredentialManager.secure_props[self.filepath].pop(json_path,None) + current_secure.remove(property_name) + current_properties[property_name] = value + + current_profile["properties"] = current_properties + current_profile["secure"] = current_secure + self.__set_or_create_nested_profile(profile_name, current_profile) + + def set_profile(self, profile_path: str, profile_data: dict) -> None: + """ + Set a profile in the config file. + + Parameters: + profile_path (str): The path of the profile to be set. eg: profiles.zosmf + profile_data (dict): The data to be set for the profile. + """ + if self.profiles is None: + self.init_from_file() + profile_name = self.get_profile_name_from_path(profile_path) + if "secure" in profile_data: + # Checking if the profile has a 'secure' field with values + secure_fields = profile_data["secure"] + current_profile = self.find_profile(profile_name,self.profiles) or {} + existing_secure_fields = current_profile.get("secure", []) + new_secure_fields = [field for field in secure_fields if field not in existing_secure_fields] + + # JSON paths for new secure properties and store their values in CredentialManager.secure_props + CredentialManager.load_secure_props() + CredentialManager.secure_props[self.filepath] = {} + for field in new_secure_fields: + json_path = f"{profile_path}.properties.{field}" + profile_value = profile_data["properties"][field] + CredentialManager.secure_props[self.filepath][json_path] = profile_value + # Updating the 'secure' field of the profile with the combined list of secure fields + profile_data["secure"] = existing_secure_fields + new_secure_fields + # If a field is provided in the 'secure' list and its value exists in 'profile_data', remove it + profile_data["properties"] = { + field: value + for field, value in profile_data.get("properties", {}).items() + if field not in profile_data["secure"] + } + self.__set_or_create_nested_profile(profile_name, profile_data) + + + def save(self, secure_props=True): + """ + Save the config file to disk. and secure props to vault + parameters: + secure_props (bool): If True, the secure properties will be stored in the vault. Default is False. + Returns: + None + """ + # Updating the config file with any changes + if self.profiles is None: + try: + self.init_from_file() + except FileNotFoundError: + pass + + elif any(self.profiles.values()): + with open(self.filepath, 'w') as file: + self.jsonc["profiles"] = self.profiles + commentjson.dump(self.jsonc, file, indent=4) + if secure_props: + CredentialManager.save_secure_props() + + + def get_profile_name_from_path(self, path: str) -> str: + """ + Get the name of the profile from the given path. + """ + segments = path.split(".") + profile_name = ".".join(segments[i] for i in range(1, len(segments), 2) if segments[i - 1] != "properties") + return profile_name + + def get_profile_path_from_name(self, short_path: str) -> str: + """ + Get the path of the profile from the given name. + """ + return re.sub(r'(^|\.)', r'\1profiles.', short_path) \ No newline at end of file diff --git a/src/core/zowe/core_for_zowe_sdk/profile_manager.py b/src/core/zowe/core_for_zowe_sdk/profile_manager.py index aa49b954..fc57c364 100644 --- a/src/core/zowe/core_for_zowe_sdk/profile_manager.py +++ b/src/core/zowe/core_for_zowe_sdk/profile_manager.py @@ -17,6 +17,7 @@ from typing import Optional from .config_file import ConfigFile, Profile +from .credential_manager import CredentialManager from .custom_warnings import ( ConfigNotFoundWarning, ProfileNotFoundWarning, @@ -55,7 +56,8 @@ def __init__(self, appname: str = "zowe", show_warnings: bool = True): self.project_config = ConfigFile(type=TEAM_CONFIG, name=appname) self.project_user_config = ConfigFile(type=USER_CONFIG, name=appname) - self.global_config = ConfigFile(type=TEAM_CONFIG, name=GLOBAL_CONFIG_NAME) + self.global_config = ConfigFile( + type=TEAM_CONFIG, name=GLOBAL_CONFIG_NAME) try: self.global_config.location = GLOBAl_CONFIG_LOCATION except Exception: @@ -64,7 +66,8 @@ def __init__(self, appname: str = "zowe", show_warnings: bool = True): ConfigNotFoundWarning, ) - self.global_user_config = ConfigFile(type=USER_CONFIG, name=GLOBAL_CONFIG_NAME) + self.global_user_config = ConfigFile( + type=USER_CONFIG, name=GLOBAL_CONFIG_NAME) try: self.global_user_config.location = GLOBAl_CONFIG_LOCATION except Exception: @@ -325,3 +328,97 @@ def load( profile_props[k] = env_var[k] return profile_props + + def get_highest_priority_layer(self, json_path: str) -> Optional[ConfigFile]: + """ + Get the highest priority layer (configuration file) based on the given profile name + + Parameters: + profile_name (str): The name of the profile to look for in the layers. + + Returns: + Optional[ConfigFile]: The highest priority layer (configuration file) that contains the specified profile, + or None if the profile is not found in any layer. + """ + highest_layer = None + longest_match = "" + layers = [ + self.project_user_config, + self.project_config, + self.global_user_config, + self.global_config + ] + + original_name = layers[0].get_profile_name_from_path(json_path) + + for layer in layers: + try: + layer.init_from_file() + except FileNotFoundError: + continue + parts = original_name.split(".") + current_name = "" + + while parts: + current_name = ".".join(parts) + profile = layer.find_profile(current_name, layer.profiles) + + if profile is not None and len(current_name) > len(longest_match): + highest_layer = layer + longest_match = current_name + + else: + parts.pop() + if original_name == longest_match: + break + + if highest_layer is None: + highest_layer = layer + + if highest_layer is None: + raise FileNotFoundError(f"Could not find a valid layer for {json_path}") + + return highest_layer + + + def set_property(self, json_path, value, secure=None) -> None: + """ + Set a property in the profile, storing it securely if necessary. + + Parameters: + json_path (str): The JSON path of the property to set. + value (str): The value to be set for the property. + secure (bool): If True, the property will be stored securely. Default is None. + """ + + # highest priority layer for the given profile name + highest_priority_layer = self.get_highest_priority_layer(json_path) + + # Set the property in the highest priority layer + + highest_priority_layer.set_property(json_path, value, secure=secure) + + def set_profile(self, profile_path: str, profile_data: dict) -> None: + """ + Set a profile in the highest priority layer (configuration file) based on the given profile name + + Parameters: + profile_path (str): TThe path of the profile to be set. eg: profiles.zosmf + profile_data (dict): The data of the profile to set. + """ + highest_priority_layer = self.get_highest_priority_layer(profile_path) + + highest_priority_layer.set_profile(profile_path, profile_data) + + def save(self) -> None: + """ + Save the layers (configuration files) to disk. + """ + layers = [self.project_user_config, + self.project_config, + self.global_user_config, + self.global_config] + + for layer in layers: + layer.save(False) + CredentialManager.save_secure_props() diff --git a/tests/unit/test_zowe_core.py b/tests/unit/test_zowe_core.py index bf71ef6f..963ce129 100644 --- a/tests/unit/test_zowe_core.py +++ b/tests/unit/test_zowe_core.py @@ -13,7 +13,7 @@ from jsonschema import validate, ValidationError, SchemaError from pyfakefs.fake_filesystem_unittest import TestCase from unittest import mock -from unittest.mock import patch +from unittest.mock import call, patch from zowe.core_for_zowe_sdk.validators import validate_config_json from zowe.core_for_zowe_sdk import ( @@ -700,7 +700,255 @@ def test_profile_loading_with_env_variables(self, get_pass_func): } self.assertEqual(props, expected_props) + def test_get_highest_priority_layer(self): + """ + Test that get_highest_priority_layer returns the highest priority layer with a valid profile data dictionary. + """ + # Set up mock profiles in the layers + project_user_config = mock.MagicMock(spec=ConfigFile) + project_user_config.find_profile.return_value = mock.MagicMock() + project_user_config.find_profile.return_value.data = {"profiles": "zosmf"} + + # Set up the ProfileManager + profile_manager = ProfileManager() + profile_manager.project_user_config = project_user_config + project_user_config.get_profile_name_from_path.return_value = "zosmf" + # Call the function being tested + result_layer = profile_manager.get_highest_priority_layer("zosmf") + + # Assert the results + self.assertEqual(result_layer, project_user_config) + + @patch("zowe.core_for_zowe_sdk.ProfileManager.get_highest_priority_layer") + def test_profile_manager_set_property(self, get_highest_priority_layer_mock): + """ + Test that set_property calls the set_property method of the highest priority layer. + """ + json_path = "profiles.zosmf.properties.user" + value = "samadpls" + secure = True + + # Set up mock for the highest priority layer + highest_priority_layer_mock = mock.MagicMock(spec=ConfigFile) + get_highest_priority_layer_mock.return_value = highest_priority_layer_mock + + profile_manager = ProfileManager() + + # Mock the behavior of _set_property method in highest_priority_layer + highest_priority_layer_mock.set_property.return_value = None + + # Call the method being tested + profile_manager.set_property(json_path, value, secure) + + # Assert the method calls + highest_priority_layer_mock.set_property.assert_called_with(json_path, value, secure=secure) + + + @patch("zowe.core_for_zowe_sdk.ConfigFile.save") + @patch("zowe.core_for_zowe_sdk.CredentialManager.save_secure_props") + def test_profile_manager_save(self, mock_save_secure_props, mock_save): + """ + Test that save calls the save method of all layers. + """ + profile_manager = ProfileManager() + profile_manager.save() + expected_calls = [call(False) for _ in range(4)] + mock_save.assert_has_calls(expected_calls) + mock_save_secure_props.assert_called_once() + + @mock.patch("zowe.core_for_zowe_sdk.ProfileManager.get_highest_priority_layer") + def test_profile_manager_set_profile(self, get_highest_priority_layer_mock): + """ + Test that set_profile calls the set_profile method of the highest priority layer. + """ + profile_path = "profiles.zosmf" + profile_data = { + "properties": { + "user": "admin", + "password": "password1" + } + } + + highest_priority_layer_mock = mock.MagicMock(spec=ConfigFile) + get_highest_priority_layer_mock.return_value = highest_priority_layer_mock + profile_manager = ProfileManager() + + highest_priority_layer_mock.set_profile.return_value = None + profile_manager.set_profile(profile_path, profile_data) + + highest_priority_layer_mock.set_profile.assert_called_with(profile_path, profile_data) + + @mock.patch("zowe.core_for_zowe_sdk.ConfigFile.get_profile_path_from_name") + def test_set_or_create_nested_profile(self, mock_get_profile_path): + """ + Test that __set_or_create_nested_profile calls the get_profile_path_from_name method and sets the profile data. + """ + mock_get_profile_path.return_value = "profiles.zosmf" + config_file = ConfigFile( name="zowe_abcd", type="User Config", profiles={}) + profile_data = { + "properties": { + "user": "samadpls", + "password": "password1" + } + } + config_file._ConfigFile__set_or_create_nested_profile("zosmf", profile_data) + expected_profiles = { + "zosmf": { + "properties": { + "user": "samadpls", + "password": "password1" + } + } + } + self.assertEqual(config_file.profiles, expected_profiles) + + @mock.patch("zowe.core_for_zowe_sdk.ConfigFile.find_profile") + def test_is_secure(self, mock_find_profile): + """ + Test that __is_secure returns True if the property is secure and False otherwise. + """ + config_file = ConfigFile(name="zowe_abcd", type="User Config", profiles={}) + mock_find_profile.return_value = { + "properties": {"user":"samadpls"}, + "secure": ["password"] + } + is_secure_secure = config_file._ConfigFile__is_secure("zosmf", "password") + is_secure_non_secure = config_file._ConfigFile__is_secure("zosmf", "user") + + self.assertTrue(is_secure_secure) + self.assertFalse(is_secure_non_secure) + + @mock.patch("zowe.core_for_zowe_sdk.ConfigFile.get_profile_name_from_path") + @mock.patch("zowe.core_for_zowe_sdk.ConfigFile.find_profile") + @mock.patch("zowe.core_for_zowe_sdk.ConfigFile._ConfigFile__is_secure") + @patch("keyring.get_password", side_effect=keyring_get_password) + def test_config_file_set_property(self, get_pass_func, mock_is_secure, mock_find_profile, mock_get_profile_name): + """ + Test that set_property calls the __is_secure, find_profile and get_profile_name_from_path methods. + """ + cwd_up_dir_path = os.path.dirname(CWD) + cwd_up_file_path = os.path.join(cwd_up_dir_path, "zowe.config.json") + os.chdir(CWD) + shutil.copy(self.original_file_path, cwd_up_file_path) + self.setUpCreds(cwd_up_file_path, { + "profiles.zosmf.properties.user": "admin" + }) + config_file = ConfigFile(name="zowe_abcd", type="User Config", profiles= {}) + mock_is_secure.return_value = False + mock_find_profile.return_value = {"properties": {"port": 1443}, "secure": []} + mock_get_profile_name.return_value = "zosmf" + + config_file.set_property("profiles.zosmf.properties.user", "admin", secure=True) + + mock_is_secure.assert_called_with("zosmf", "user") + mock_find_profile.assert_called_with("zosmf", config_file.profiles) + mock_get_profile_name.assert_called_with("profiles.zosmf.properties.user") + self.assertEqual(config_file.profiles, { + "zosmf": { + "properties": {"port": 1443}, + "secure": ["user"] + } + }) + + def test_get_profile_name_from_path(self): + """ + Test that get_profile_name_from_path returns the profile name from the path. + """ + config_file = ConfigFile(name="zowe_abcd", type="User Config") + profile_name = config_file.get_profile_name_from_path("profiles.lpar1.profiles.zosmf.properties.user") + self.assertEqual(profile_name, "lpar1.zosmf") + + def test_get_profile_path_from_name(self): + """ + Test that get_profile_path_from_name returns the profile path from the name. + """ + config_file = ConfigFile(name="zowe_abcd", type="User Config") + profile_path_1 = config_file.get_profile_path_from_name("lpar1.zosmf") + self.assertEqual(profile_path_1, "profiles.lpar1.profiles.zosmf") + + @patch("keyring.get_password", side_effect=keyring_get_password) + def test_config_file_set_profile(self,get_pass_func): + """ + Test the set_profile method. + """ + cwd_up_dir_path = os.path.dirname(CWD) + cwd_up_file_path = os.path.join(cwd_up_dir_path, "zowe.config.json") + os.chdir(CWD) + shutil.copy(self.original_file_path, cwd_up_file_path) + self.setUpCreds(cwd_up_file_path, { + "profiles.zosmf.properties.user": "abc", + "profiles.zosmf.properties.password": "def" + }) + initial_profiles = { + "zosmf": { + "properties": { + "port": 1443 + }, + "secure": [] + } + } + config_file = ConfigFile("User Config", "zowe.config.json", cwd_up_dir_path , profiles=initial_profiles) + profile_data = { + "type": "zosmf", + "properties": { + "port": 443, + "user": "abc", + "password": "def" + }, + "secure": ["user", "password"] + } + + with patch("zowe.core_for_zowe_sdk.ConfigFile.get_profile_name_from_path", return_value="zosmf"): + with patch("zowe.core_for_zowe_sdk.ConfigFile.find_profile", return_value=initial_profiles["zosmf"]): + config_file.set_profile("profiles.zosmf", profile_data) + + expected_secure_props = { + cwd_up_file_path: { + "profiles.zosmf.properties.user": "abc", + "profiles.zosmf.properties.password": "def" + } + } + expected_profiles = { + "zosmf": { + 'type': 'zosmf', + "properties": { + "port": 443, + }, + "secure": ["user", "password"] + } + } + self.assertEqual(CredentialManager.secure_props, expected_secure_props) + self.assertEqual(config_file.profiles, expected_profiles) + + @patch("zowe.core_for_zowe_sdk.CredentialManager.save_secure_props") + def test_config_file_save(self, mock_save_secure_props): + """ + Test saving a config file with secure properties. + """ + cwd_up_dir_path = os.path.dirname(CWD) + cwd_up_file_path = os.path.join(cwd_up_dir_path, "zowe.config.json") + os.chdir(CWD) + shutil.copy(self.original_file_path, cwd_up_file_path) + config_data = { + "profiles": { + "zosmf": { + "properties": { + "user": "admin", + "port": 1443 + }, + "secure": ["user"] + } + } + } + with patch("builtins.open", mock.mock_open()) as mock_file: + config_file = ConfigFile("User Config", "zowe.config.json", cwd_up_dir_path , profiles=config_data.copy()) + config_file.jsonc = config_data + config_file.save() + mock_save_secure_props.assert_called_once() + mock_file.assert_called_once_with(cwd_up_file_path, 'w') + mock_file.return_value.__enter__.return_value.write.asser_called() + class TestValidateConfigJsonClass(unittest.TestCase): """Testing the validate_config_json function""" @@ -733,3 +981,5 @@ def test_validate_config_json_invalid(self): validate_config_json(path_to_invalid_config, path_to_invalid_schema, cwd = FIXTURES_PATH) self.assertEqual(str(actual_info.exception), str(expected_info.exception)) + + \ No newline at end of file