Skip to content

Commit

Permalink
feat: when adding a new role, check if a key name already exists
Browse files Browse the repository at this point in the history
  • Loading branch information
renatav committed Jan 29, 2025
1 parent 12b0148 commit 291d98a
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 44 deletions.
69 changes: 34 additions & 35 deletions taf/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ def add_role(
commit_msg = git_commit_message("add-role", role=role)
metadata_path = Path(METADATA_DIRECTORY_NAME, f"{role}.json")

targets_parent_role = TargetsRole()
if parent_role != "targets":
targets_parent_role.name = parent_role
targets_parent_role.paths = []

new_role = TargetsRole()
new_role.name = role
new_role.parent = targets_parent_role
new_role.paths = paths
new_role.number = keys_number
new_role.threshold = threshold
new_role.yubikey = yubikey

import pdb; pdb.set_trace()
signers, _ = load_sorted_keys_of_new_roles(
roles=new_role,
auth_repo=auth_repo,
yubikeys_data=None,
keystore=keystore_path,
skip_prompt=skip_prompt,
certs_dir=auth_repo.certs_dir,
)
with manage_repo_and_signers(
auth_repo,
roles=[parent_role],
Expand All @@ -119,27 +141,6 @@ def add_role(
commit_msg=commit_msg,
paths_to_reset_on_error=[metadata_path],
):
targets_parent_role = TargetsRole()
if parent_role != "targets":
targets_parent_role.name = parent_role
targets_parent_role.paths = []

new_role = TargetsRole()
new_role.name = role
new_role.parent = targets_parent_role
new_role.paths = paths
new_role.number = keys_number
new_role.threshold = threshold
new_role.yubikey = yubikey

signers, _ = load_sorted_keys_of_new_roles(
roles=new_role,
auth_repo=auth_repo,
yubikeys_data=None,
keystore=keystore_path,
skip_prompt=skip_prompt,
certs_dir=auth_repo.certs_dir,
)
auth_repo.create_delegated_roles([new_role], signers)
auth_repo.add_new_roles_to_snapshot([new_role.name])
auth_repo.do_timestamp()
Expand Down Expand Up @@ -289,6 +290,18 @@ def add_roles(
]
keystore_path = roles_keys_data_new.keystore

all_signers = {}
for role_to_add_data in roles_to_add_data:
signers, _ = load_sorted_keys_of_new_roles(
roles=role_to_add_data,
auth_repo=auth_repo,
yubikeys_data=None,
keystore=keystore_path,
skip_prompt=not prompt_for_keys,
certs_dir=auth_repo.certs_dir,
)
all_signers.update(signers)

with manage_repo_and_signers(
auth_repo,
roles=roles_to_load,
Expand All @@ -300,17 +313,6 @@ def add_roles(
commit=commit,
push=push,
):
all_signers = {}
for role_to_add_data in roles_to_add_data:
signers, _ = load_sorted_keys_of_new_roles(
roles=role_to_add_data,
auth_repo=auth_repo,
yubikeys_data=None,
keystore=keystore_path,
skip_prompt=not prompt_for_keys,
certs_dir=auth_repo.certs_dir,
)
all_signers.update(signers)

# TODO add key name mappings
auth_repo.create_delegated_roles(roles_to_add_data, all_signers)
Expand Down Expand Up @@ -752,9 +754,6 @@ def _initialize_roles_and_keystore_for_existing_repo(
if not roles_key_infos_dict and enter_info:
roles_key_infos_dict = _enter_roles_infos(None, roles_key_infos)
elif roles_key_infos_dict:
import pdb

pdb.set_trace()
roles_key_infos_dict = _transform_roles_dict(roles_key_infos_dict, auth_repo)
roles_keys_data = from_dict(roles_key_infos_dict, RolesKeysData)
keystore = keystore or roles_keys_data.keystore
Expand Down
27 changes: 22 additions & 5 deletions taf/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,15 @@ def _load_signer_from_keystore(
return None


def _load_and_append_yubikeys(
def _load_yubikeys(
taf_repo,
role,
key_names,
retry_on_failure,
signers_yubikeys,
hide_threshold_message,
):

signers_yubikeys = []
yubikeys = yk.yubikey_prompt(
key_names=key_names,
pin_manager=taf_repo.pin_manager,
Expand Down Expand Up @@ -240,7 +240,7 @@ def _load_and_append_yubikeys(
loaded_key_names.append(key_name)
taf_logger.info(f"Successfully loaded {key_name} from inserted YubiKey")

return loaded_key_names
return signers_yubikeys, loaded_key_names


@log_on_start(INFO, "Loading signing keys of '{role:s}'", logger=taf_logger)
Expand Down Expand Up @@ -321,14 +321,14 @@ def load_signers(
# that can be used to sign the current role, but either read the name from the
# metadata, or assign a role + counter name
if initial_yk_load_attempt or use_yubikey_for_signing_confirmed:
loaded_keys = _load_and_append_yubikeys(
loaded_signers, loaded_keys = _load_yubikeys(
taf_repo=taf_repo,
role=role,
key_names=key_names,
retry_on_failure=use_yubikey_for_signing_confirmed,
signers_yubikeys=signers_yubikeys,
hide_threshold_message=hide_threshold_message,
)
signers_yubikeys.extend(loaded_signers)
if loaded_keys:
use_yubikey_for_signing_confirmed = True
num_of_loaded_keys = len(loaded_keys)
Expand Down Expand Up @@ -379,6 +379,23 @@ def setup_roles_keys(
if users_yubikeys_details is None:
users_yubikeys_details = {}


if yubikey_ids:
if auth_repo.keys_name_mappings:
# check if some of the listed key names are already defined as signing keys
# in that case, they need to be loaded and verified
existing_key_names = {
existing_key_name: existing_key_id for existing_key_id, existing_key_name in auth_repo.keys_name_mappings.items()
}
for key_name in yubikey_ids:
if key_name in existing_key_names:
public_key_pem, scheme = auth_repo.get_public_key_of_keyid(existing_key_names[key_name])
key_data = {
"public": public_key_pem,
"scheme": scheme
}
users_yubikeys_details[key_name] = UserKeyData(**key_data)

if role.is_yubikey:
yubikey_keys, yubikey_signers = _setup_yubikey_roles_keys(
auth_repo, yubikey_ids, users_yubikeys_details, role, certs_dir, key_size
Expand Down
30 changes: 27 additions & 3 deletions taf/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,10 +1172,34 @@ def get_key_length_and_scheme_from_metadata(
pub_key = serialization.load_pem_public_key(
pub_key_pem.encode(), backend=default_backend()
)
return pub_key, scheme
return pub_key, pub_key_pem, scheme
except Exception:
return None, None

def get_public_key_of_keyid(self, keyid: str):

def _find_keyid(role_name, keyid):

_, pub_key_pem, scheme = self.get_key_length_and_scheme_from_metadata(role_name, keyid)
if pub_key_pem is not None:
return pub_key_pem, scheme

for delegation in self.get_delegations_of_role(role_name):
pub_key_pem, scheme = self._find_keyid(
delegation, keyid
)
if pub_key_pem is not None:
return pub_key_pem, scheme

_, pub_key_pem, scheme = self.get_key_length_and_scheme_from_metadata("root", keyid)
if pub_key_pem is not None:
return pub_key_pem, scheme

targets_obj = self.signed_obj("targets")
if targets_obj.delegations:
return _find_keyid("targets", keyid)


def generate_roles_description(self) -> Dict:
"""
Generate a roles description dictionary, containing information
Expand All @@ -1194,7 +1218,7 @@ def _get_delegations(role_name):
"paths": delegated_role.paths,
"terminating": delegated_role.terminating,
}
pub_key, scheme = self.get_key_length_and_scheme_from_metadata(
pub_key, _, scheme = self.get_key_length_and_scheme_from_metadata(
role_name, delegated_role.keyids[0]
)

Expand All @@ -1213,7 +1237,7 @@ def _get_delegations(role_name):
"threshold": role_obj.threshold,
"number": len(role_obj.keyids),
}
pub_key, scheme = self.get_key_length_and_scheme_from_metadata(
pub_key, _, scheme = self.get_key_length_and_scheme_from_metadata(
"root", role_obj.keyids[0]
)
roles_description[role_name]["scheme"] = scheme
Expand Down
2 changes: 1 addition & 1 deletion taf/yubikey/yubikey_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import defaultdict
import contextlib
from typing import Dict, List, Tuple
from typing import Dict, List, Optional, Tuple
from taf.tuf.keys import SSlibKey


Expand Down

0 comments on commit 291d98a

Please sign in to comment.