Skip to content

Commit

Permalink
fix: iterate over target roles while looking for keys
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Jan 29, 2025
1 parent 09ef0cc commit 891905d
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 75 deletions.
5 changes: 4 additions & 1 deletion taf/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ def rotate_signing_key(
with transactional_execution(auth_repo):
revoke_signing_key(
path=path,
pin_manager=pin_manager,
key_id=key_id,
roles=roles,
keystore=keystore,
Expand All @@ -545,6 +546,7 @@ def rotate_signing_key(

add_signing_key(
path=path,
pin_manager=pin_manager,
roles=roles,
pub_key=pub_key,
keystore=keystore,
Expand Down Expand Up @@ -1057,6 +1059,7 @@ def remove_role(
)
def remove_paths(
path: str,
pin_manager: PinManager,
paths: List[str],
keystore: str,
scheme: Optional[str] = DEFAULT_RSA_SIGNATURE_SCHEME,
Expand All @@ -1082,7 +1085,7 @@ def remove_paths(
Returns:
True if the delegation existed, False otherwise
"""
auth_repo = AuthenticationRepository(path=path)
auth_repo = AuthenticationRepository(path=path, pin_manager=pin_manager)
paths_to_remove_from_roles = defaultdict(list)
for path_to_remove in paths:
delegated_role = auth_repo.get_role_from_target_paths([path_to_remove])
Expand Down
2 changes: 0 additions & 2 deletions taf/tests/test_api/roles/cli/test_roles_cmds.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import json

from pathlib import Path

from click.testing import CliRunner
Expand Down
136 changes: 68 additions & 68 deletions taf/tests/test_api/targets/cli/test_targets_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,74 +8,74 @@
from taf.tools.cli.taf import taf


# def test_targets_sign_when_target_file_is_added_expect_success(
# auth_repo_when_add_repositories_json,
# library,
# keystore_delegations,
# ):
# runner = CliRunner()

# repo_path = library / "auth"
# FILENAME = "test.txt"
# file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME
# file_path.write_text("test")

# runner.invoke(
# taf,
# [
# "targets",
# "sign",
# "--path",
# f"{str(auth_repo_when_add_repositories_json.path)}",
# "--keystore",
# f"{str(keystore_delegations)}",
# ],
# )

# check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME)


# def test_targets_sign_when_target_file_is_removed_expect_success(
# auth_repo_when_add_repositories_json,
# library,
# keystore_delegations,
# ):
# runner = CliRunner()

# repo_path = library / "auth"
# FILENAME = "test.txt"
# file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME
# file_path.write_text("test")

# runner.invoke(
# taf,
# [
# "targets",
# "sign",
# "--path",
# f"{str(auth_repo_when_add_repositories_json.path)}",
# "--keystore",
# f"{str(keystore_delegations)}",
# ],
# )
# check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME)

# file_path.unlink()

# runner.invoke(
# taf,
# [
# "targets",
# "sign",
# "--path",
# f"{str(auth_repo_when_add_repositories_json.path)}",
# "--keystore",
# f"{str(keystore_delegations)}",
# ],
# )

# signed_target_files = auth_repo_when_add_repositories_json.get_signed_target_files()
# assert FILENAME not in signed_target_files
def test_targets_sign_when_target_file_is_added_expect_success(
auth_repo_when_add_repositories_json,
library,
keystore_delegations,
):
runner = CliRunner()

repo_path = library / "auth"
FILENAME = "test.txt"
file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME
file_path.write_text("test")

runner.invoke(
taf,
[
"targets",
"sign",
"--path",
f"{str(auth_repo_when_add_repositories_json.path)}",
"--keystore",
f"{str(keystore_delegations)}",
],
)

check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME)


def test_targets_sign_when_target_file_is_removed_expect_success(
auth_repo_when_add_repositories_json,
library,
keystore_delegations,
):
runner = CliRunner()

repo_path = library / "auth"
FILENAME = "test.txt"
file_path = repo_path / TARGETS_DIRECTORY_NAME / FILENAME
file_path.write_text("test")

runner.invoke(
taf,
[
"targets",
"sign",
"--path",
f"{str(auth_repo_when_add_repositories_json.path)}",
"--keystore",
f"{str(keystore_delegations)}",
],
)
check_if_targets_signed(auth_repo_when_add_repositories_json, "targets", FILENAME)

file_path.unlink()

runner.invoke(
taf,
[
"targets",
"sign",
"--path",
f"{str(auth_repo_when_add_repositories_json.path)}",
"--keystore",
f"{str(keystore_delegations)}",
],
)

signed_target_files = auth_repo_when_add_repositories_json.get_signed_target_files()
assert FILENAME not in signed_target_files


def test_targets_add_repo_cmd_expect_success(
Expand Down
4 changes: 3 additions & 1 deletion taf/tests/test_updater/test_update_library/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ def library_with_dependencies(origin_dir, pin_manager, request):
params,
root_auth_repo.path / TARGETS_DIRECTORY_NAME / DEPENDENCIES_JSON_NAME,
)
sign_target_files(origin_dir, root_repo_name, keystore=KEYSTORE_PATH, pin_manager)
sign_target_files(
origin_dir, root_repo_name, keystore=KEYSTORE_PATH, pin_manager=pin_manager
)

library[root_auth_repo.name] = {"auth_repo": root_auth_repo, "target_repos": []}
yield library
44 changes: 41 additions & 3 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,11 +1185,35 @@ def get_key_length_and_scheme_from_metadata(
)
return pub_key, pub_key_pem, scheme
except Exception:
return None, None
return None, None, None

def get_key_names_from_metadata(self, parent_role: str) -> Tuple:
"""
Return length and signing scheme of the specified key id.
This data is specified in metadata files (root or a target role that has delegations)
"""
try:
metadata = json.loads(
Path(
self.path, METADATA_DIRECTORY_NAME, f"{parent_role}.json"
).read_text()
)
metadata = metadata["signed"]
if "delegations" in metadata:
metadata = metadata["delegations"]

keys = metadata["keys"]
names = {
key_id: key_data["name"]
for key_id, key_data in keys.items()
if "name" in key_data
}
return names
except Exception:
return None

def get_public_key_of_keyid(self, keyid: str):
def _find_keyid(role_name, keyid):

_, pub_key_pem, scheme = self.get_key_length_and_scheme_from_metadata(
role_name, keyid
)
Expand Down Expand Up @@ -1439,14 +1463,28 @@ def modify_targets(
return targets_role

def load_key_names(self):
def _get_keys_of_delegations(role_name):
keys = {}
role_keys = self.get_key_names_from_metadata(role_name)
if role_keys is not None:
keys.update(role_keys)

for delegation in self.get_delegations_of_role(role_name):
delegated_signed = self.signed_obj(delegation)
if delegated_signed.delegations:
inner_roles_keys = _get_keys_of_delegations(delegation)
if inner_roles_keys:
keys.update(inner_roles_keys)
return keys

root_metadata = self.signed_obj("root")
# target_roles = self.get_all_targets_roles()
name_mapping = {}
keys = root_metadata.keys
for key_id, key_obj in keys.items():
name_data = key_obj.unrecognized_fields
if name_data is not None and "name" in name_data:
name_mapping[key_id] = name_data["name"]
name_mapping.update(_get_keys_of_delegations("targets"))
return name_mapping

def _modify_targets_role(
Expand Down

0 comments on commit 891905d

Please sign in to comment.