Skip to content

Commit

Permalink
Merge branch 'main' into merge_layer
Browse files Browse the repository at this point in the history
Signed-off-by: Timothy Johnson <[email protected]>
  • Loading branch information
t1m0thyj committed Sep 28, 2023
2 parents 1f4821a + ebc5c3f commit 5da5f80
Show file tree
Hide file tree
Showing 4 changed files with 500 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ All notable changes to the Zowe Client Python SDK will be documented in this fil
- Feature: Added method to load profile properties from environment variables
- Bugfix: Fixed exception handling in session.py [#213] (https://github.com/zowe/zowe-client-python-sdk/issues/213)
- Feature: Added a CredentialManager class to securely retrieve values from credentials and manage multiple credential entries on Windows [#134](https://github.com/zowe/zowe-client-python-sdk/issues/134)
- Feature: Added method to Save profile properties to zowe.config.json file [#73](https://github.com/zowe/zowe-client-python-sdk/issues/73)
- Feature: Added method to Save secure profile properties to vault [#72](https://github.com/zowe/zowe-client-python-sdk/issues/72)
- Bugfix: Fixed issue for datasets and jobs with special characters in URL [#211] (https://github.com/zowe/zowe-client-python-sdk/issues/211)
- Feature: Added method to load profile properties from environment variables
- BugFix: Validation of zowe.config.json file matching the schema [#192](https://github.com/zowe/zowe-client-python-sdk/issues/192)
160 changes: 148 additions & 12 deletions src/core/zowe/core_for_zowe_sdk/config_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

# Profile datatype is used by ConfigFile to return Profile Data along with
# metadata such as profile_name and secure_props_not_found


class Profile(NamedTuple):
data: dict = {}
name: str = ""
Expand Down Expand Up @@ -236,7 +238,6 @@ def get_profile(
profile_name = self.get_profilename_from_profiletype(
profile_type=profile_type
)

props: dict = self.load_profile_properties(profile_name=profile_name)

return Profile(props, profile_name, self._missing_secure_props)
Expand Down Expand Up @@ -340,8 +341,6 @@ def load_profile_properties(self, profile_name: str) -> dict:
Load exact profile properties (without prepopulated fields from base profile)
from the profile dict and populate fields from the secure credentials storage
"""
# if self.profiles is None:
# self.init_from_file()
props = {}
lst = profile_name.split(".")
secure_fields: list = []
Expand All @@ -350,24 +349,23 @@ def load_profile_properties(self, profile_name: str) -> dict:
profile_name = ".".join(lst)
profile = self.find_profile(profile_name, self.profiles)
if profile is not None:
props = { **profile.get("properties", {}), **props }
props = {**profile.get("properties", {}), **props}
secure_fields.extend(profile.get("secure", []))
else:
warnings.warn(
f"Profile {profile_name} not found",
ProfileNotFoundWarning
)
f"Profile {profile_name} not found",
ProfileNotFoundWarning
)
lst.pop()


# load secure props only if there are secure fields
if secure_fields and self.secure_props:
# load properties with key as profile.{profile_name}.properties.{*}
for (key, value) in self.secure_props.items():
if re.match(
"profiles\\." + profile_name + "\\.properties\\.[a-z]+", key
):
property_name = key.split(".")[3]
property_name = key.split(".")[-1]
if property_name in secure_fields:
props[property_name] = value
secure_fields.remove(property_name)
Expand All @@ -377,7 +375,145 @@ def load_profile_properties(self, profile_name: str) -> dict:

return props

def load_secure_properties(self, profile_name: str):
secure_props = {}
lst = profile_name.split(".")
def __set_or_create_nested_profile(self, profile_name, profile_data):
"""
Set or create a nested profile.
"""
path = self.get_profile_path_from_name(profile_name)
keys = path.split(".")[1:]
nested_profiles = self.profiles
for key in keys:
nested_profiles = nested_profiles.setdefault(key, {})
nested_profiles.update(profile_data)

def __is_secure(self, json_path: str, property_name: str) -> bool:
"""
Check whether the given JSON path corresponds to a secure property.
Parameters:
json_path (str): The JSON path of the property to check.
property_name (str): The name of the property to check.
Returns:
bool: True if the property should be stored securely, False otherwise.
"""

profile = self.find_profile(json_path, self.profiles)
if profile and profile.get("secure"):
return property_name in profile["secure"]
return False

def set_property(self, json_path, value, secure=None) -> None:
"""
Set a property in the profile, storing it securely if necessary.
Parameters:
json_path (str): The JSON path of the property to set.
value (str): The value to be set for the property.
profile_name (str): The name of the profile to set the property in.
secure (bool): If True, the property will be stored securely. Default is None.
"""
if self.profiles is None:
self.init_from_file()

# Checking whether the property should be stored securely or in plain text
property_name = json_path.split(".")[-1]
profile_name = self.get_profile_name_from_path(json_path)
# check if the property is already secure
is_property_secure = self.__is_secure(profile_name, property_name)
is_secure = secure if secure is not None else is_property_secure

current_profile = self.find_profile(profile_name, self.profiles) or {}
current_properties = current_profile.setdefault("properties", {})
current_secure = current_profile.setdefault("secure", [])
if is_secure:
CredentialManager.load_secure_props()
if not is_property_secure:
current_secure.append(property_name)

CredentialManager.secure_props[self.filepath] = {
**CredentialManager.secure_props.get(self.filepath, {}), json_path: value}
current_properties.pop(property_name, None)

else:
if is_property_secure:
CredentialManager.secure_props[self.filepath].pop(json_path,None)
current_secure.remove(property_name)
current_properties[property_name] = value

current_profile["properties"] = current_properties
current_profile["secure"] = current_secure
self.__set_or_create_nested_profile(profile_name, current_profile)

def set_profile(self, profile_path: str, profile_data: dict) -> None:
"""
Set a profile in the config file.
Parameters:
profile_path (str): The path of the profile to be set. eg: profiles.zosmf
profile_data (dict): The data to be set for the profile.
"""
if self.profiles is None:
self.init_from_file()
profile_name = self.get_profile_name_from_path(profile_path)
if "secure" in profile_data:
# Checking if the profile has a 'secure' field with values
secure_fields = profile_data["secure"]
current_profile = self.find_profile(profile_name,self.profiles) or {}
existing_secure_fields = current_profile.get("secure", [])
new_secure_fields = [field for field in secure_fields if field not in existing_secure_fields]

# JSON paths for new secure properties and store their values in CredentialManager.secure_props
CredentialManager.load_secure_props()
CredentialManager.secure_props[self.filepath] = {}
for field in new_secure_fields:
json_path = f"{profile_path}.properties.{field}"
profile_value = profile_data["properties"][field]
CredentialManager.secure_props[self.filepath][json_path] = profile_value
# Updating the 'secure' field of the profile with the combined list of secure fields
profile_data["secure"] = existing_secure_fields + new_secure_fields
# If a field is provided in the 'secure' list and its value exists in 'profile_data', remove it
profile_data["properties"] = {
field: value
for field, value in profile_data.get("properties", {}).items()
if field not in profile_data["secure"]
}
self.__set_or_create_nested_profile(profile_name, profile_data)


def save(self, secure_props=True):
"""
Save the config file to disk. and secure props to vault
parameters:
secure_props (bool): If True, the secure properties will be stored in the vault. Default is False.
Returns:
None
"""
# Updating the config file with any changes
if self.profiles is None:
try:
self.init_from_file()
except FileNotFoundError:
pass

elif any(self.profiles.values()):
with open(self.filepath, 'w') as file:
self.jsonc["profiles"] = self.profiles
commentjson.dump(self.jsonc, file, indent=4)
if secure_props:
CredentialManager.save_secure_props()


def get_profile_name_from_path(self, path: str) -> str:
"""
Get the name of the profile from the given path.
"""
segments = path.split(".")
profile_name = ".".join(segments[i] for i in range(1, len(segments), 2) if segments[i - 1] != "properties")
return profile_name

def get_profile_path_from_name(self, short_path: str) -> str:
"""
Get the path of the profile from the given name.
"""
return re.sub(r'(^|\.)', r'\1profiles.', short_path)
101 changes: 99 additions & 2 deletions src/core/zowe/core_for_zowe_sdk/profile_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from copy import deepcopy

from .config_file import ConfigFile, Profile
from .credential_manager import CredentialManager
from .custom_warnings import (
ConfigNotFoundWarning,
ProfileNotFoundWarning,
Expand Down Expand Up @@ -57,7 +58,8 @@ def __init__(self, appname: str = "zowe", show_warnings: bool = True):
self.project_config = ConfigFile(type=TEAM_CONFIG, name=appname)
self.project_user_config = ConfigFile(type=USER_CONFIG, name=appname)

self.global_config = ConfigFile(type=TEAM_CONFIG, name=GLOBAL_CONFIG_NAME)
self.global_config = ConfigFile(
type=TEAM_CONFIG, name=GLOBAL_CONFIG_NAME)
try:
self.global_config.location = GLOBAl_CONFIG_LOCATION
except Exception:
Expand All @@ -66,7 +68,8 @@ def __init__(self, appname: str = "zowe", show_warnings: bool = True):
ConfigNotFoundWarning,
)

self.global_user_config = ConfigFile(type=USER_CONFIG, name=GLOBAL_CONFIG_NAME)
self.global_user_config = ConfigFile(
type=USER_CONFIG, name=GLOBAL_CONFIG_NAME)
try:
self.global_user_config.location = GLOBAl_CONFIG_LOCATION
except Exception:
Expand Down Expand Up @@ -333,3 +336,97 @@ def load(
profile_props[k] = env_var[k]

return profile_props

def get_highest_priority_layer(self, json_path: str) -> Optional[ConfigFile]:
"""
Get the highest priority layer (configuration file) based on the given profile name
Parameters:
profile_name (str): The name of the profile to look for in the layers.
Returns:
Optional[ConfigFile]: The highest priority layer (configuration file) that contains the specified profile,
or None if the profile is not found in any layer.
"""
highest_layer = None
longest_match = ""
layers = [
self.project_user_config,
self.project_config,
self.global_user_config,
self.global_config
]

original_name = layers[0].get_profile_name_from_path(json_path)

for layer in layers:
try:
layer.init_from_file()
except FileNotFoundError:
continue
parts = original_name.split(".")
current_name = ""

while parts:
current_name = ".".join(parts)
profile = layer.find_profile(current_name, layer.profiles)

if profile is not None and len(current_name) > len(longest_match):
highest_layer = layer
longest_match = current_name

else:
parts.pop()
if original_name == longest_match:
break

if highest_layer is None:
highest_layer = layer

if highest_layer is None:
raise FileNotFoundError(f"Could not find a valid layer for {json_path}")

return highest_layer


def set_property(self, json_path, value, secure=None) -> None:
"""
Set a property in the profile, storing it securely if necessary.
Parameters:
json_path (str): The JSON path of the property to set.
value (str): The value to be set for the property.
secure (bool): If True, the property will be stored securely. Default is None.
"""

# highest priority layer for the given profile name
highest_priority_layer = self.get_highest_priority_layer(json_path)

# Set the property in the highest priority layer

highest_priority_layer.set_property(json_path, value, secure=secure)

def set_profile(self, profile_path: str, profile_data: dict) -> None:
"""
Set a profile in the highest priority layer (configuration file) based on the given profile name
Parameters:
profile_path (str): TThe path of the profile to be set. eg: profiles.zosmf
profile_data (dict): The data of the profile to set.
"""
highest_priority_layer = self.get_highest_priority_layer(profile_path)

highest_priority_layer.set_profile(profile_path, profile_data)

def save(self) -> None:
"""
Save the layers (configuration files) to disk.
"""
layers = [self.project_user_config,
self.project_config,
self.global_user_config,
self.global_config]

for layer in layers:
layer.save(False)
CredentialManager.save_secure_props()
Loading

0 comments on commit 5da5f80

Please sign in to comment.