diff --git a/taf/api/roles.py b/taf/api/roles.py index 70234f28..22afa044 100644 --- a/taf/api/roles.py +++ b/taf/api/roles.py @@ -533,6 +533,7 @@ def rotate_signing_key( with transactional_execution(auth_repo): revoke_signing_key( path=path, + pin_manager=pin_manager, key_id=key_id, roles=roles, keystore=keystore, @@ -545,6 +546,7 @@ def rotate_signing_key( add_signing_key( path=path, + pin_manager=pin_manager, roles=roles, pub_key=pub_key, keystore=keystore, @@ -1057,6 +1059,7 @@ def remove_role( ) def remove_paths( path: str, + pin_manager: PinManager, paths: List[str], keystore: str, scheme: Optional[str] = DEFAULT_RSA_SIGNATURE_SCHEME, @@ -1082,7 +1085,7 @@ def remove_paths( Returns: True if the delegation existed, False otherwise """ - auth_repo = AuthenticationRepository(path=path) + auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager) paths_to_remove_from_roles = defaultdict(list) for path_to_remove in paths: delegated_role = auth_repo.get_role_from_target_paths([path_to_remove]) diff --git a/taf/tests/test_api/roles/cli/test_roles_cmds.py b/taf/tests/test_api/roles/cli/test_roles_cmds.py index 18905bdc..d8e22d7f 100644 --- a/taf/tests/test_api/roles/cli/test_roles_cmds.py +++ b/taf/tests/test_api/roles/cli/test_roles_cmds.py @@ -1,5 +1,3 @@ -import json - from pathlib import Path from click.testing import CliRunner diff --git a/taf/tests/test_api/targets/cli/test_targets_cmds.py b/taf/tests/test_api/targets/cli/test_targets_cmds.py index d93b501e..57002e73 100644 --- a/taf/tests/test_api/targets/cli/test_targets_cmds.py +++ b/taf/tests/test_api/targets/cli/test_targets_cmds.py @@ -8,74 +8,74 @@ from taf.tools.cli.taf import taf -# def test_targets_sign_when_target_file_is_added_expect_success( -# auth_repo_when_add_repositories_json, -# library, -# keystore_delegations, -# ): -# runner = CliRunner() - -# repo_path = library / "auth" -# FILENAME = "test.txt" -# file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME -# file_path.write_text("test") - -# runner.invoke( -# taf, -# [ -# "targets", -# "sign", -# "--path", -# f"{str(auth_repo_when_add_repositories_json.path)}", -# "--keystore", -# f"{str(keystore_delegations)}", -# ], -# ) - -# check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME) - - -# def test_targets_sign_when_target_file_is_removed_expect_success( -# auth_repo_when_add_repositories_json, -# library, -# keystore_delegations, -# ): -# runner = CliRunner() - -# repo_path = library / "auth" -# FILENAME = "test.txt" -# file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME -# file_path.write_text("test") - -# runner.invoke( -# taf, -# [ -# "targets", -# "sign", -# "--path", -# f"{str(auth_repo_when_add_repositories_json.path)}", -# "--keystore", -# f"{str(keystore_delegations)}", -# ], -# ) -# check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME) - -# file_path.unlink() - -# runner.invoke( -# taf, -# [ -# "targets", -# "sign", -# "--path", -# f"{str(auth_repo_when_add_repositories_json.path)}", -# "--keystore", -# f"{str(keystore_delegations)}", -# ], -# ) - -# signed_target_files = auth_repo_when_add_repositories_json.get_signed_target_files() -# assert FILENAME not in signed_target_files +def test_targets_sign_when_target_file_is_added_expect_success( + auth_repo_when_add_repositories_json, + library, + keystore_delegations, +): + runner = CliRunner() + + repo_path = library / "auth" + FILENAME = "test.txt" + file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME + file_path.write_text("test") + + runner.invoke( + taf, + [ + "targets", + "sign", + "--path", + f"{str(auth_repo_when_add_repositories_json.path)}", + "--keystore", + f"{str(keystore_delegations)}", + ], + ) + + check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME) + + +def test_targets_sign_when_target_file_is_removed_expect_success( + auth_repo_when_add_repositories_json, + library, + keystore_delegations, +): + runner = CliRunner() + + repo_path = library / "auth" + FILENAME = "test.txt" + file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME + file_path.write_text("test") + + runner.invoke( + taf, + [ + "targets", + "sign", + "--path", + f"{str(auth_repo_when_add_repositories_json.path)}", + "--keystore", + f"{str(keystore_delegations)}", + ], + ) + check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME) + + file_path.unlink() + + runner.invoke( + taf, + [ + "targets", + "sign", + "--path", + f"{str(auth_repo_when_add_repositories_json.path)}", + "--keystore", + f"{str(keystore_delegations)}", + ], + ) + + signed_target_files = auth_repo_when_add_repositories_json.get_signed_target_files() + assert FILENAME not in signed_target_files def test_targets_add_repo_cmd_expect_success( diff --git a/taf/tests/test_updater/test_update_library/conftest.py b/taf/tests/test_updater/test_update_library/conftest.py index 1d1a363d..5c0154d4 100644 --- a/taf/tests/test_updater/test_update_library/conftest.py +++ b/taf/tests/test_updater/test_update_library/conftest.py @@ -78,7 +78,9 @@ def library_with_dependencies(origin_dir, pin_manager, request): params, root_auth_repo.path / TARGETS_DIRECTORY_NAME / DEPENDENCIES_JSON_NAME, ) - sign_target_files(origin_dir, root_repo_name, keystore=KEYSTORE_PATH, pin_manager) + sign_target_files( + origin_dir, root_repo_name, keystore=KEYSTORE_PATH, pin_manager=pin_manager + ) library[root_auth_repo.name] = {"auth_repo": root_auth_repo, "target_repos": []} yield library diff --git a/taf/tuf/repository.py b/taf/tuf/repository.py index 41852fe4..0ce2bffa 100644 --- a/taf/tuf/repository.py +++ b/taf/tuf/repository.py @@ -1185,11 +1185,35 @@ def get_key_length_and_scheme_from_metadata( ) return pub_key, pub_key_pem, scheme except Exception: - return None, None + return None, None, None + + def get_key_names_from_metadata(self, parent_role: str) -> Tuple: + """ + Return length and signing scheme of the specified key id. + This data is specified in metadata files (root or a target role that has delegations) + """ + try: + metadata = json.loads( + Path( + self.path, METADATA_DIRECTORY_NAME, f"{parent_role}.json" + ).read_text() + ) + metadata = metadata["signed"] + if "delegations" in metadata: + metadata = metadata["delegations"] + + keys = metadata["keys"] + names = { + key_id: key_data["name"] + for key_id, key_data in keys.items() + if "name" in key_data + } + return names + except Exception: + return None def get_public_key_of_keyid(self, keyid: str): def _find_keyid(role_name, keyid): - _, pub_key_pem, scheme = self.get_key_length_and_scheme_from_metadata( role_name, keyid ) @@ -1439,14 +1463,28 @@ def modify_targets( return targets_role def load_key_names(self): + def _get_keys_of_delegations(role_name): + keys = {} + role_keys = self.get_key_names_from_metadata(role_name) + if role_keys is not None: + keys.update(role_keys) + + for delegation in self.get_delegations_of_role(role_name): + delegated_signed = self.signed_obj(delegation) + if delegated_signed.delegations: + inner_roles_keys = _get_keys_of_delegations(delegation) + if inner_roles_keys: + keys.update(inner_roles_keys) + return keys + root_metadata = self.signed_obj("root") - # target_roles = self.get_all_targets_roles() name_mapping = {} keys = root_metadata.keys for key_id, key_obj in keys.items(): name_data = key_obj.unrecognized_fields if name_data is not None and "name" in name_data: name_mapping[key_id] = name_data["name"] + name_mapping.update(_get_keys_of_delegations("targets")) return name_mapping def _modify_targets_role(