diff --git a/VMEncryption/main/BekUtil.py b/VMEncryption/main/BekUtil.py index 20336a3ca..91ca51c25 100644 --- a/VMEncryption/main/BekUtil.py +++ b/VMEncryption/main/BekUtil.py @@ -15,7 +15,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import sys +import os from Common import CommonVariables from IMDSUtil import IMDSStoredResults from BekUtilVolumeImpl import BekUtilVolumeImpl @@ -43,6 +44,19 @@ def __init__(self, disk_util, logger, encryption_environment=None): def generate_passphrase(self): return self.bekUtilImpl.generate_passphrase() + def store_bek_passphrase_file_name(self,encryption_config, passphrase,key_file_name): + '''this function is used to store passphrase to specific file name''' + #update new passphrase to key_file_path. + key_file_dir = os.path.dirname(self.get_bek_passphrase_file(encryption_config)) + key_file_path = os.path.join(key_file_dir,key_file_name) + if sys.version_info[0] < 3: + if isinstance(passphrase, str): + passphrase = passphrase.decode('utf-8') + with open(key_file_path, 'wb') as f: + f.write(passphrase) + #making sure the permissions are read only to root user. + os.chmod(key_file_path,0o400) + def store_bek_passphrase(self, encryption_config, passphrase): return self.bekUtilImpl.store_bek_passphrase(encryption_config,passphrase) diff --git a/VMEncryption/main/Common.py b/VMEncryption/main/Common.py index bae6e7fb5..ee3395440 100644 --- a/VMEncryption/main/Common.py +++ b/VMEncryption/main/Common.py @@ -60,9 +60,17 @@ class CommonVariables: """ CVM LUKS2 header token """ + #this token id is used to store Primary ADE (encryption setting + wrapped passphrase) token. cvm_ade_vm_encryption_token_id = 5 + #this token id is used to store backup of token id 5, during KEK rotation. + cvm_ade_vm_encryption_backup_token_id = 6 ADEEncryptionVersionInLuksToken_1_0='1.0' - PassphraseNameValue = 'LUKSPasswordProtector' + PassphraseNameValueProtected = 'LUKSPasswordProtector' + #this token type is used to store Primary ADE token. Type id: 5 + AzureDiskEncryptionToken = 'Azure_Disk_Encryption' + #this token type is used to store backup ADE token. Type id: 6 + #this token used for recovery to token 5 in case of reboot/interruption happened during KEK rotation. + AzureDiskEncryptionBackUpToken='Azure_Disk_Encryption_BackUp' """ IMDS IP: """ diff --git a/VMEncryption/main/CryptMountConfigUtil.py b/VMEncryption/main/CryptMountConfigUtil.py index 7ae6e27ce..4d3de3440 100644 --- a/VMEncryption/main/CryptMountConfigUtil.py +++ b/VMEncryption/main/CryptMountConfigUtil.py @@ -142,7 +142,7 @@ def _restore_backup_crypttab_info(self,crypt_item,passphrase_file): return False if not os.path.exists(azure_crypt_mount_backup_location) and not os.path.exists(crypttab_backup_location): - self.logger.log(msg=("MountPoint info not found for" + device_item_real_path), level=CommonVariables.ErrorLevel) + self.logger.log(msg=("MountPoint info not found for" + crypt_item.dev_path), level=CommonVariables.ErrorLevel) # Not sure when this happens.. # in this case also, just add an entry to the azure_crypt_mount without a mount point. self.add_crypt_item(crypt_item) @@ -265,19 +265,21 @@ def device_unlock_using_luks2_header(self): threads = [] lock = threading.Lock() for device_item in device_items: - if device_item.file_system == "crypto_LUKS": + if device_item.file_system == "crypto_LUKS": + #restore LUKS2 token using BackUp. + self.disk_util.restore_luks2_token(device_name=device_item.name) device_item_path = self.disk_util.get_device_path(device_item.name) azure_item_path = azure_name_table[device_item_path] if device_item_path in azure_name_table else device_item_path thread = threading.Thread(target=self._device_unlock_using_luks2_header,args=(device_item.name,device_item_path,azure_item_path,lock)) threads.append(thread) thread.start() for thread in threads: - thread.join() + thread.join() self.logger.log("device_unlock_using_luks2_header End") def consolidate_azure_crypt_mount(self, passphrase_file): """ - Reads the backup files from block devices that have a LUKS header and adds it to the cenral azure_crypt_mount file + Reads the backup files from block devices that have a LUKS2 header and adds it to the central azure_crypt_mount file """ self.logger.log("Consolidating azure_crypt_mount") diff --git a/VMEncryption/main/DiskUtil.py b/VMEncryption/main/DiskUtil.py index 5de5e9b69..f15ff1eff 100644 --- a/VMEncryption/main/DiskUtil.py +++ b/VMEncryption/main/DiskUtil.py @@ -24,6 +24,7 @@ from subprocess import Popen import traceback import glob +import tempfile from EncryptionConfig import EncryptionConfig from DecryptionMarkConfig import DecryptionMarkConfig @@ -192,73 +193,146 @@ def secure_key_release_operation(self,protectorbase64,kekUrl,operation,attestati return None self.logger.log("secure_key_release_operation {0} end.".format(operation)) return process_comm.stdout.strip() - - def import_token(self,device_path,passphrase_file,public_settings): - '''this function reads passphrase from passphrase file, wrap it and update in token field of LUKS2 header.''' + + def import_token_data(self, device_path, token_data, token_id): + '''Updating token_data json object to LUKS2 header's Tokens field. + token data is as follow for version 1.0. + "version": "1.0", + "type": "Azure_Disk_Encryption", + "keyslots": [], + "KekVaultResourceId": "", + "KeyEncryptionKeyURL": "", + "KeyVaultResourceId": "", + "KeyVaultURL": "https://.vault.azure.net/", + "AttestationURL": null, + "PassphraseName": "LUKSPasswordProtector", + "Passphrase": "M53XE09n7O9r2AdKa7FYRYe..." + ''' + self.logger.log(msg="import_token_data for device: {0} started.".format(device_path)) + if not token_data or not isinstance(token_data, dict): + self.logger.log(level=CommonVariables.WarningLevel, msg="import_token_data: token_data: {0} for device: {1} is not valid.".format(token_data,device_path)) + return False + if not token_id: + self.logger.log(level= CommonVariables.WarningLevel, msg = "import_token_data: token_id: {0} for device: {1} is not valid.".format(token_id,device_path) ) + return False + temp_file = tempfile.NamedTemporaryFile(delete=False,mode='w+') + json.dump(token_data,temp_file,indent=4) + temp_file.close() + cmd = "cryptsetup token import --json-file {0} --token-id {1} {2}".format(temp_file.name,token_id,device_path) + process_comm = ProcessCommunicator() + status = self.command_executor.Execute(cmd,communicator=process_comm) + self.logger.log(msg="import_token_data: device: {0} status: {1}".format(device_path,status)) + os.unlink(temp_file.name) + return status==CommonVariables.process_success + + def import_token(self, device_path, passphrase_file, public_settings, + passphrase_name_value=CommonVariables.PassphraseNameValueProtected): + '''This function reads passphrase from passphrase_file, do SKR and wrap passphrase with securely + released key. Then it updates metadata (required encryption settings for SKR + wrapped passphrase) + to primary token id: 5 type: Azure_Disk_Encryption in Tokens field of LUKS2 header.''' self.logger.log(msg="import_token for device: {0} started.".format(device_path)) - protector = "" + self.logger.log(msg="import_token for passphrase file path: {0}.".format(passphrase_file)) + if not passphrase_file or not os.path.exists(passphrase_file): + self.logger.log(level=CommonVariables.WarningLevel,msg="import_token for passphrase file path: {0} not exists.".format(passphrase_file)) + return False + protector= "" with open(passphrase_file,"rb") as protector_file: #passphrase stored in keyfile is base64 protector = protector_file.read().decode('utf-8') - KekVaultResourceId=public_settings.get(CommonVariables.KekVaultResourceIdKey) - KeyEncryptionKeyUrl=public_settings.get(CommonVariables.KeyEncryptionKeyURLKey) - AttestationUrl = public_settings.get(CommonVariables.AttestationURLKey) - wrappedProtector = self.secure_key_release_operation(protectorbase64=protector, - kekUrl=KeyEncryptionKeyUrl, + kek_vault_resource_id=public_settings.get(CommonVariables.KekVaultResourceIdKey) + key_encryption_key_url=public_settings.get(CommonVariables.KeyEncryptionKeyURLKey) + attestation_url = public_settings.get(CommonVariables.AttestationURLKey) + if passphrase_name_value == CommonVariables.PassphraseNameValueProtected: + protector = self.secure_key_release_operation(protectorbase64=protector, + kekUrl=key_encryption_key_url, operation=CommonVariables.secure_key_release_wrap, - attestationUrl=AttestationUrl) - if not wrappedProtector: + attestationUrl=attestation_url) + else: + self.logger.log(msg="import_token passphrase is not wrapped, value of passphrase name key: {0}".format(passphrase_name_value)) + + if not protector: self.logger.log("import_token protector wrapping is unsuccessful for device {0}".format(device_path)) return False data={ "version":CommonVariables.ADEEncryptionVersionInLuksToken_1_0, "type":"Azure_Disk_Encryption", "keyslots":[], - CommonVariables.KekVaultResourceIdKey:KekVaultResourceId, - CommonVariables.KeyEncryptionKeyURLKey:KeyEncryptionKeyUrl, + CommonVariables.KekVaultResourceIdKey:kek_vault_resource_id, + CommonVariables.KeyEncryptionKeyURLKey:key_encryption_key_url, CommonVariables.KeyVaultResourceIdKey:public_settings.get(CommonVariables.KeyVaultResourceIdKey), CommonVariables.KeyVaultURLKey:public_settings.get(CommonVariables.KeyVaultURLKey), - CommonVariables.AttestationURLKey:AttestationUrl, - CommonVariables.PassphraseNameKey:CommonVariables.PassphraseNameValue, - CommonVariables.PassphraseKey:wrappedProtector + CommonVariables.AttestationURLKey:attestation_url, + CommonVariables.PassphraseNameKey:passphrase_name_value, + CommonVariables.PassphraseKey:protector } - #TODO: needed to decide on temp path. - custom_cmk = os.path.join("/var/lib/azure_disk_encryption_config/","custom_cmk.json") - out_file = open(custom_cmk,"w") - json.dump(data,out_file,indent=4) - out_file.close() - cmd = "cryptsetup token import --json-file {0} --token-id {1} {2}".format(custom_cmk,CommonVariables.cvm_ade_vm_encryption_token_id,device_path) - process_comm = ProcessCommunicator() - status = self.command_executor.Execute(cmd,communicator=process_comm) - self.logger.log(msg="import_token: device: {0} status: {1}".format(device_path,status)) - os.remove(custom_cmk) + status = self.import_token_data(device_path=device_path, + token_data=data, + token_id=CommonVariables.cvm_ade_vm_encryption_token_id) self.logger.log(msg="import_token: device: {0} end.".format(device_path)) - return status==CommonVariables.process_success - - def export_token(self,device_name): - '''This function reads token id from luks2 header field and unwrap passphrase''' - self.logger.log("export_token to device {0} started.".format(device_name)) - device_path = os.path.join("/dev",device_name) - protector = None - cmd = "cryptsetup token export --token-id {0} {1}".format(CommonVariables.cvm_ade_vm_encryption_token_id,device_path) + return status + + def read_token(self, device_name, token_id): + '''this functions reads tokens from LUKS2 header.''' + device_path = self.get_device_path(dev_name=device_name) + if not device_path or not token_id: + self.logger.log(level=CommonVariables.WarningLevel, + msg="read_token: Inputs are not valid. device_name: {0}, token id: {1}".format(device_name,token_id)) + return None + cmd = "cryptsetup token export --token-id {0} {1}".format(token_id,device_path) process_comm = ProcessCommunicator() status = self.command_executor.Execute(cmd, communicator=process_comm) if status != 0: - self.logger.log("export_token token id {0} not found in device {1} LUKS header".format(CommonVariables.cvm_ade_vm_encryption_token_id,device_name)) + self.logger.log(level=CommonVariables.WarningLevel, + msg="read_token: token id: {0} is not found for device_name: {1} in LUKS header".format(token_id,device_name)) return None token = process_comm.stdout - disk_encryption_setting=json.loads(token) + return json.loads(token) + + def remove_token(self, device_name, token_id): + '''this function remove the token''' + device_path = self.get_device_path(dev_name=device_name) + if not device_path or not token_id: + self.logger.log(level=CommonVariables.WarningLevel, + msg="remove_token: Inputs are not valid. device name: {0}, token_id: {1}".format(device_name,token_id)) + return False + cmd = "cryptsetup token remove --token-id {0} {1}".format(token_id,device_path) + process_comm = ProcessCommunicator() + status = self.command_executor.Execute(cmd, communicator=process_comm) + if status != 0: + self.logger.log(level=CommonVariables.WarningLevel, + msg="remove_token: token id: {0} is not found for device_name: {1} in LUKS header".format(token_id,device_name)) + return False + return True + + def export_token(self,device_name): + '''This function reads wrapped passphrase from LUKS2 Tokens for + token id:5, which belongs to primary token type: Azure_Disk_Encryption + and do SKR and returns unwrapped passphrase''' + self.logger.log("export_token: for device_name: {0} started.".format(device_name)) + device_path = self.get_device_path(device_name) + if not device_path: + self.logger.log(level= CommonVariables.WarningLevel, msg="export_token Input is not valid. device name: {0}".format(device_name)) + return None + protector = None + cvm_ade_vm_encryption_token_id = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionToken) + if not cvm_ade_vm_encryption_token_id: + self.logger.log("export_token token id {0} not found in device {1} LUKS header".format(cvm_ade_vm_encryption_token_id,device_name)) + return None + disk_encryption_setting=self.read_token(device_name=device_name,token_id=cvm_ade_vm_encryption_token_id) if disk_encryption_setting['version'] != CommonVariables.ADEEncryptionVersionInLuksToken_1_0: self.logger.log("export_token token version {0} is not a vaild version.".format(disk_encryption_setting['version'])) return None - keyEncryptionKeyUrl=disk_encryption_setting[CommonVariables.KeyEncryptionKeyURLKey] - wrappedProtector = disk_encryption_setting[CommonVariables.PassphraseKey] - attestationUrl = disk_encryption_setting[CommonVariables.AttestationURLKey] - if wrappedProtector: + key_encryption_key_url=disk_encryption_setting[CommonVariables.KeyEncryptionKeyURLKey] + wrapped_protector = disk_encryption_setting[CommonVariables.PassphraseKey] + attestation_url = disk_encryption_setting[CommonVariables.AttestationURLKey] + if disk_encryption_setting[CommonVariables.PassphraseNameKey] != CommonVariables.PassphraseNameValueProtected: + self.logger.log(level=CommonVariables.WarningLevel, msg="passphrase is not Protected. No need to do SKR.") + return wrapped_protector if wrapped_protector else None + if wrapped_protector: #unwrap the protector. - protector=self.secure_key_release_operation(attestationUrl=attestationUrl, - kekUrl=keyEncryptionKeyUrl, - protectorbase64=wrappedProtector, + protector=self.secure_key_release_operation(attestationUrl=attestation_url, + kekUrl=key_encryption_key_url, + protectorbase64=wrapped_protector, operation=CommonVariables.secure_key_release_unwrap) self.logger.log("export_token to device {0} end.".format(device_name)) return protector @@ -397,6 +471,51 @@ def luks_get_uuid(self, header_or_dev_path): return splits[1] return None + def get_token_id(self, header_or_dev_path, token_name): + '''if LUKS2 header has token name return the id else return none.''' + if not header_or_dev_path or not os.path.exists(header_or_dev_path) or not token_name: + self.logger.log("get_token_id: invalid input, header_or_dev_path:{0} token_name:{1}".format(header_or_dev_path,token_name)) + return None + luks_dump_out = self._luks_get_header_dump(header_or_dev_path) + tokens = self._extract_luksv2_token(luks_dump_out) + for token in tokens: + if len(token) == 2 and token[1] == token_name: + return token[0] + return None + + def restore_luks2_token(self, device_name=None): + '''this function restores token + type:Azure_Disk_Encryption_BackUp, id:6 to type:Azure_Disk_Encryption id:5, + this function acts on 4 scenarios. + 1. both token id: 5 and 6 present in LUKS2 Tokens field, due to reboot/interrupt during + KEK rotation, such case remove token id 5 has latest data so remove token id 6. + 2. token id 5 present but 6 is not present in LUKS2 Tokens field. do nothing. + 3. token id 5 not present but 6 present in LUKS2 Tokens field, restore token id 5 using + token id 6, then remove token id 6. + 4. no token ids 5 or 6 present in LUKS2 Tokens field, do nothing.''' + device_path = self.get_device_path(device_name) + if not device_path: + self.logger.log(level=CommonVariables.WarningLevel,msg="restore_luks2_token invalid input. device_name = {0}".format(device_name)) + return + ade_token_id_primary = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionToken) + ade_token_id_backup = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionBackUpToken) + if not ade_token_id_backup: + #do nothing + return + if ade_token_id_primary: + #remove backup token id + self.remove_token(device_name=device_name,token_id=ade_token_id_backup) + return + #ade_token_id_backup having value but ade_token_id_primary is none + self.logger.log("restore luks2 token for device {0} is started.".format(device_name)) + #read from backup and update AzureDiskEncryptionToken + data = self.read_token(device_name=device_name,token_id=ade_token_id_backup) + data['type']=CommonVariables.AzureDiskEncryptionToken + self.import_token_data(device_path=device_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_token_id) + #remove backup + self.remove_token(device_name=device_name,token_id=ade_token_id_backup) + self.logger.log("restore luks2 token id {0} to {1} for device {2} is successful.".format(ade_token_id_backup,CommonVariables.cvm_ade_vm_encryption_token_id,device_name)) + def _get_cryptsetup_version(self): # get version of currently installed cryptsetup cryptsetup_cmd = "{0} --version".format(self.distro_patcher.cryptsetup_path) @@ -410,6 +529,47 @@ def _extract_luks_version_from_dump(self, luks_dump_out): if "version:" in line.lower(): return line.split()[-1] + def _extract_luksv2_token(self, luks_dump_out): + """ + A luks v2 luksheader looks kind of like this: (inessential stuff removed) + + LUKS header information + Version: 2 + Data segments: + 0: crypt + offset: 0 [bytes] + length: 5539430400 [bytes] + cipher: aes-xts-plain64 + sector: 512 [bytes] + Keyslots: + 1: luks2 + Key: 512 bits + 3: reencrypt (unbound) + Key: 8 bits + Tokens: + 6: Azure_Disk_Encryption_BackUp + 5: Azure_Disk_Encryption + ... + """ + if not luks_dump_out: + return [] + lines = luks_dump_out.split("\n") + token_segment = False + token_lines = [] + for line in lines: + parts = line.split(":") + if len(parts)<2: + continue + if token_segment and parts[1].strip() == '': + break + if "tokens" in parts[0].strip().lower(): + token_segment = True + continue + if token_segment and self._isnumeric(parts[0].strip()): + token_lines.append([int(parts[0].strip()),parts[1].strip()]) + continue + return token_lines + def _extract_luksv2_keyslot_lines(self, luks_dump_out): """ A luks v2 luksheader looks kind of like this: (inessential stuff removed) diff --git a/VMEncryption/main/ExtensionParameter.py b/VMEncryption/main/ExtensionParameter.py index f1c4a0cf4..2b682eb67 100644 --- a/VMEncryption/main/ExtensionParameter.py +++ b/VMEncryption/main/ExtensionParameter.py @@ -174,6 +174,14 @@ def _is_kv_equivalent(self, a, b): if b[-1] == '/': b = b[:-1] return a==b + def cmk_changed(self): + '''current config CMK changed from effective config CMK.''' + if (self.KeyEncryptionKeyURL or self.get_kek_url()) and \ + (not self._is_kv_equivalent(self.KeyEncryptionKeyURL, self.get_kek_url())): + self.logger.log('Current config KeyEncryptionKeyURL {0} differs from effective config KeyEncryptionKeyURL {1}'.format(self.KeyEncryptionKeyURL, self.get_kek_url())) + return True + return False + def config_changed(self): if (self.command or self.get_command()) and \ (self.command != self.get_command() and \ diff --git a/VMEncryption/main/handle.py b/VMEncryption/main/handle.py index 479fc00b3..a2bd07fed 100644 --- a/VMEncryption/main/handle.py +++ b/VMEncryption/main/handle.py @@ -185,15 +185,17 @@ def disable_encryption(): def stamp_disks_with_settings(items_to_encrypt, encryption_config, encryption_marker=None): - if security_Type == CommonVariables.ConfidentialVM: - logger.log(msg="Do not send vm setting to host for stamping.",level=CommonVariables.InfoLevel) - return disk_util = DiskUtil(hutil=hutil, patching=DistroPatcher, logger=logger, encryption_environment=encryption_environment) crypt_mount_config_util = CryptMountConfigUtil(logger=logger, encryption_environment=encryption_environment, disk_util=disk_util) bek_util = BekUtil(disk_util, logger,encryption_environment) current_passphrase_file = bek_util.get_bek_passphrase_file(encryption_config) public_settings = get_public_settings() extension_parameter = ExtensionParameter(hutil, logger, DistroPatcher, encryption_environment, get_protected_settings(), public_settings) + if security_Type == CommonVariables.ConfidentialVM: + logger.log(msg="Do not send CVM encryption setting to host for stamping.", + level=CommonVariables.InfoLevel) + extension_parameter.commit() + return has_keystore_flag = CommonVariables.KeyStoreTypeKey in public_settings # post new encryption settings via wire server protocol @@ -270,6 +272,196 @@ def get_protected_settings(): else: return protected_settings_str +def create_temp_file(file_content): + '''This function is creating a temp file of file_content. returning a file name.''' + temp_keyfile = tempfile.NamedTemporaryFile(delete=False) + if isinstance(file_content, bytes): + temp_keyfile.write(file_content) + else: + temp_keyfile.write(file_content.encode("utf-8")) + temp_keyfile.close() + return temp_keyfile.name + +def add_new_passphrase_luks2_key_slot(disk_util,\ + existing_passphrase,\ + new_passphrase,\ + device_path,\ + luks_header_path): + '''This function is used to add a new passphrase in luks key slot.''' + logger.log("add_new_passphrase_luks2_key_slot: start!") + ret = False + try: + before_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("Before key addition, key slots for {0}: {1}".format(device_path, before_key_slots)) + logger.log("Adding new key for {0}".format(device_path)) + existing_passphrase_file_name = create_temp_file(existing_passphrase) + new_passphrase_file_name = create_temp_file(new_passphrase) + luks_add_result = disk_util.luks_add_key(passphrase_file=existing_passphrase_file_name, + dev_path=device_path, + mapper_name=None, + header_file=luks_header_path, + new_key_path=new_passphrase_file_name) + logger.log("luks add result is {0}".format(luks_add_result)) + after_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("After key addition, key slots for {0}: {1}".format(device_path, after_key_slots)) + new_key_slot = list([x[0] != x[1] for x in zip(before_key_slots, after_key_slots)]).index(True) + logger.log("New key was added in key slot {0}".format(new_key_slot)) + os.unlink(existing_passphrase_file_name) + os.unlink(new_passphrase_file_name) + ret = True + except Exception as e: + msg="add_new_passphrase_luks2_key_slot failed with error {0}, stack trace: {1}".format(e, traceback.format_exc()) + logger.log(msg=msg,level=CommonVariables.WarningLevel) + logger.log("add_new_passphrase_luks2_key_slot: end!") + return ret + +def remove_passphrase_luks2_key_slot(disk_util,\ + passphrase,\ + device_path,\ + luks_header_path): + '''This function is used for removing a passphrase from luks key slot.''' + logger.log("remove_passphrase_luks2_key_slot: start!") + ret = False + try: + before_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("Before key removal, key slots for {0}: {1}".format(device_path, before_key_slots)) + logger.log("Removing new key for {0}".format(device_path)) + passphrase_file_name = create_temp_file(passphrase) + luks_remove_result = disk_util.luks_remove_key(passphrase_file=passphrase_file_name, + dev_path=device_path, + header_file=luks_header_path) + logger.log("luks remove result is {0}".format(luks_remove_result)) + after_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("After key removal, key slots for {0}: {1}".format(device_path, after_key_slots)) + os.unlink(passphrase_file_name) + ret = True + except Exception as e: + msg="remove_passphrase_luks2_key_slot failed with error {0}, stack trace: {1}".format(e, traceback.format_exc()) + logger.log(msg=msg,level=CommonVariables.WarningLevel) + logger.log("remove_passphrase_luks2_key_slot: end!") + return ret + +def update_encryption_settings_luks2_header(extra_items_to_encrypt=None): + '''This function is used for CMK passphrase wrapping with new KEK URL and update + metadata in LUKS2 header.''' + if extra_items_to_encrypt is None: + extra_items_to_encrypt=[] + hutil.do_parse_context('UpdateEncryptionSettingsLuks2Header') + logger.log('Updating encryption settings LUKS-2 header') + # ensure cryptsetup package is still available in case it was for some reason removed after enable + try: + DistroPatcher.install_cryptsetup() + except Exception as e: + hutil.save_seq() + message = "Failed to update encryption settings with error: {0}, stack trace: {1}".format(e, traceback.format_exc()) + hutil.do_exit(exit_code=CommonVariables.missing_dependency, + operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_error_status, + code=str(CommonVariables.missing_dependency), + message=message) + try: + public_setting = get_public_settings() + encryption_config = EncryptionConfig(encryption_environment, logger) + extension_parameter = ExtensionParameter(hutil, logger, DistroPatcher, encryption_environment, get_protected_settings(), public_setting) + disk_util = DiskUtil(hutil=hutil, patching=DistroPatcher, logger=logger, encryption_environment=encryption_environment) + bek_util = BekUtil(disk_util, logger,encryption_environment) + device_items = disk_util.get_device_items(None) + if extension_parameter.passphrase is None or extension_parameter.passphrase == "": + extension_parameter.passphrase = bek_util.generate_passphrase() + + for device_item in device_items: + device_item_path = disk_util.get_device_path(device_item.name) + if not disk_util.is_luks_device(device_item_path,None): + logger.log("Not a LUKS device, device path: {0}".format(device_item_path)) + continue + #restoring the token data to type Azure_Disk_Encryption + #It is necessary to restore if we are resuming from previous attempt, otherwise its no-op. + disk_util.restore_luks2_token(device_name=device_item.name) + logger.log("Reading passphrase from LUKS2 header, device name: {0}".format(device_item.name)) + #copy primary token to backup token for recovery, if reboot or interrupt happened during KEK rotation. + ade_primary_token_id = disk_util.get_token_id(header_or_dev_path=device_item_path,token_name=CommonVariables.AzureDiskEncryptionToken) + if not ade_primary_token_id: + logger.log("primary token type: Azure_Disk_Encryption not found for device {0}".format(device_item.name)) + continue + data = disk_util.read_token(device_name=device_item.name,token_id=ade_primary_token_id) + #writing primary token data to backup token, update token type to back up. + data['type']=CommonVariables.AzureDiskEncryptionBackUpToken + #update backup token data to backup token id:6. + disk_util.import_token_data(device_path=device_item_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) + #get the unwrapped passphrase from LUKS2 header. + passphrase=disk_util.export_token(device_name=device_item.name) + if not passphrase: + logger.log(level=CommonVariables.WarningLevel, + msg="No passphrase found in LUKS2 header, device name: {0}".format(device_item.name)) + continue + #remove primary token from Tokens field of LUKS2 header. + disk_util.remove_token(device_name=device_item.name,token_id=ade_primary_token_id) + logger.log("Updating wrapped passphrase to LUKS2 header with current public setting. device name {0}".format(device_item.name)) + #add new slot with new passphrase. + is_added = add_new_passphrase_luks2_key_slot(disk_util=disk_util, + existing_passphrase=passphrase, + new_passphrase=extension_parameter.passphrase, + device_path= device_item_path, + luks_header_path=None) + if not is_added: + logger.log(level=CommonVariables.WarningLevel, + msg="new passphrase is not added to LUKS2 slot. Skip operation for device: {0}".format(device_item.name)) + continue + #protect passphrase before updating to LUKS2 is done in import_token + new_passphrase_file = create_temp_file(extension_parameter.passphrase) + #save passphrase to LUKS2 header with PassphraseNameValueProtected + ret = disk_util.import_token(device_path=device_item_path, + passphrase_file=new_passphrase_file, + public_settings=public_setting, + passphrase_name_value=CommonVariables.PassphraseNameValueProtected) + if not ret: + logger.log(level=CommonVariables.WarningLevel, + msg="Update passphrase with current public setting to LUKS2 header is not successful. device path {0}".format(device_item_path)) + return None + os.unlink(new_passphrase_file) + #removing old password form key slot. + is_removed = remove_passphrase_luks2_key_slot(disk_util=disk_util, + passphrase=passphrase, + device_path=device_item_path, + luks_header_path=None) + if not is_removed: + logger.log(level=CommonVariables.WarningLevel, + msg="old passphrase is not removed from LUKS2 slot. Skip operation for device: {0}".format(device_item.name)) + #removing backup token as KEK rotation is successful here. + disk_util.remove_token(device_name=device_item.name, + token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) + #update passphrase file for auto unlock + key_file_name = CommonVariables.encryption_key_file_name + scsi_lun_numbers = disk_util.get_azure_data_disk_controller_and_lun_numbers([os.path.realpath(device_item_path)]) + if len(scsi_lun_numbers) != 0: + scsi_controller, lun_number = scsi_lun_numbers[0] + key_file_name = "{0}_{1}_{2}".format(key_file_name,str(scsi_controller),str(lun_number)) + bek_util.store_bek_passphrase_file_name(encryption_config=encryption_config, + passphrase=extension_parameter.passphrase, + key_file_name=key_file_name) + #committing the extension parameter if KEK rotation is successful. + extension_parameter.commit() + + if len(extra_items_to_encrypt) > 0: + hutil.do_status_report(operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_success_status, + status_code=str(CommonVariables.success), + message='Encryption settings updated in LUKS2 header') + else: + hutil.do_exit(exit_code=0, + operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_success_status, + code=str(CommonVariables.success), + message='Encryption settings updated in LUKS2 header') + except Exception as e: + hutil.save_seq() + message = "Failed to update encryption settings Luks2 header with error: {0}, stack trace: {1}".format(e, traceback.format_exc()) + logger.log(msg=message, level=CommonVariables.ErrorLevel) + hutil.do_exit(exit_code=CommonVariables.unknown_error, + operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_error_status, + code=str(CommonVariables.unknown_error), + message=message) def update_encryption_settings(extra_items_to_encrypt=[]): hutil.do_parse_context('UpdateEncryptionSettings') @@ -944,15 +1136,26 @@ def handle_encryption(public_settings, encryption_status, disk_util, bek_util, e if is_daemon_running(): logger.log("An operation already running. Cannot accept an update settings request.") hutil.reject_settings() - are_devices_encrypted, items_to_encrypt = are_required_devices_encrypted(volume_type, encryption_status, disk_util, bek_util, encryption_operation) - if not are_devices_encrypted: - logger.log('Required devices not encrypted for volume type {0}. Calling update to stamp encryption settings.'.format(volume_type)) - update_encryption_settings(items_to_encrypt) - logger.log('Encryption Settings stamped. Calling enable to encrypt new devices.') - enable_encryption() + are_devices_encrypted, items_to_encrypt = are_required_devices_encrypted(volume_type=volume_type, + encryption_status=encryption_status, + disk_util=disk_util, + bek_util=bek_util, + encryption_operation=encryption_operation) + if security_Type==CommonVariables.ConfidentialVM: + logger.log('Calling Update Encryption Setting in LUKS2 header.') + if extension_parameter.cmk_changed(): + update_encryption_settings_luks2_header(items_to_encrypt) + if not are_devices_encrypted: + enable_encryption() else: - logger.log('Calling Update Encryption Setting.') - update_encryption_settings() + if not are_devices_encrypted: + logger.log('Required devices not encrypted for volume type {0}. Calling update to stamp encryption settings.'.format(volume_type)) + update_encryption_settings(items_to_encrypt) + logger.log('Encryption Settings stamped. Calling enable to encrypt new devices.') + enable_encryption() + else: + logger.log('Calling Update Encryption Setting.') + update_encryption_settings() else: logger.log("Config did not change or first call, enabling encryption") encryption_marker = EncryptionMarkConfig(logger, encryption_environment) @@ -2098,7 +2301,7 @@ def mapped_device_item_match(device_item): return None -def daemon_encrypt(): +def daemon_encrypt(): logger.log("daemon_encrypt security type is {0}".format(security_Type)) public_settings = get_public_settings() if security_Type == CommonVariables.ConfidentialVM: diff --git a/VMEncryption/main/test/test_disk_util.py b/VMEncryption/main/test/test_disk_util.py index 0a6f339f8..59ace6f10 100644 --- a/VMEncryption/main/test/test_disk_util.py +++ b/VMEncryption/main/test/test_disk_util.py @@ -416,3 +416,115 @@ def test_get_luks_header_size_luks2_badoffset(self, ver_mock, dump_mock): dump_mock.return_value = "" header_size = self.disk_util.get_luks_header_size("/mocked/device/path") self.assertEqual(header_size, None) + + @mock.patch("CommandExecutor.CommandExecutor.Execute", return_value=0) + def test_import_token_data(self,cmd_exc_mock): + '''update dict type data to luks2 header''' + cmd_exc_mock.return_value = 0 + data = {} + result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + self.assertEqual(result,False) + data = None + result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + self.assertEqual(result,False) + data = [1,3] + result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + self.assertEqual(result,False) + data = "test data" + result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + self.assertEqual(result,False) + data = {'version':'1.0','type':'ADE'} + result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + self.assertEqual(result,True) + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("CommandExecutor.CommandExecutor.Execute", return_value=0) + def test_import_token(self,cmd_exc_mock,path_exit): + '''read passphrase from Passphrase_file, and update to LUKS2 header''' + cmd_exc_mock.return_value = 0 + path_exit.return_value=False + result = self.disk_util.import_token(device_path="/dev/sdc", + passphrase_file=None, + public_settings=None) + self.assertEqual(result,False) + path_exit.return_value=False + result = self.disk_util.import_token(device_path="/dev/sdc", + passphrase_file="/var/lib/file_path", + public_settings=None) + self.assertEqual(result,False) + + @mock.patch("os.path.exists") + @mock.patch("CommandExecutor.CommandExecutor.Execute", return_value=0) + def test_read_token(self,cmd_exc_mock,path_exists): + '''read token from LUKS2 header token''' + cmd_exc_mock.return_value = 1 + path_exists.return_value = False + result = self.disk_util.read_token(device_name="",token_id=None) + self.assertEqual(result,None) + cmd_exc_mock.return_value = 1 + path_exists.return_value = True + result = self.disk_util.read_token(device_name="sda",token_id=None) + self.assertEqual(result,None) + cmd_exc_mock.return_value = 1 + path_exists.return_value = True + result = self.disk_util.read_token(device_name="sda",token_id=1) + self.assertEqual(result,None) + + @mock.patch("DiskUtil.DiskUtil.get_token_id") + @mock.patch("os.path.exists") + @mock.patch("CommandExecutor.CommandExecutor.Execute", return_value=0) + def test_export_token(self,cmd_exc_mock,path_exists,ade_encryption_token): + '''read passphrase from token type Azure_Disk_Encryption''' + ade_encryption_token.return_value = 5 + cmd_exc_mock.return_value = 1 + path_exists.return_value = False + protector = self.disk_util.export_token("sda") + self.assertEqual(protector, None) + path_exists.return_value = True + ade_encryption_token.return_value = None + protector = self.disk_util.export_token("sda") + self.assertEqual(protector, None) + + def test_extract_luksv2_token(self): + '''extracting Tokens field to get token from LUKS2 header. returns list''' + luks_dump_out = None + tokens = self.disk_util._extract_luksv2_token(luks_dump_out) + self.assertEqual(len(tokens), 0) + luks_dump_out = "" + tokens = self.disk_util._extract_luksv2_token(luks_dump_out) + self.assertEqual(len(tokens), 0) + luks_dump_out = "Tokens:\n\ + 1: Azure_Disk_Encryption_BackUp\n\ + 5: Azure_Disk_Encryption" + tokens = self.disk_util._extract_luksv2_token(luks_dump_out) + self.assertEqual(len(tokens), 2) + self.assertEqual(tokens[0][0], 1) + self.assertEqual(tokens[0][1], "Azure_Disk_Encryption_BackUp") + self.assertEqual(tokens[1][0], 5) + self.assertEqual(tokens[1][1], "Azure_Disk_Encryption") + + @mock.patch("os.path.exists") + @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + def test_get_token_id(self,luks_dump,header_or_dev_path_exist): + '''getting the token id from LUKS2 header, if not present return None''' + header_or_dev_path_exist.return_value = True + luks_dump.return_value="Tokens:\n\ + 1: Azure_Disk_Encryption_BackUp\n\ + 5: Azure_Disk_Encryption" + token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption_BackUp") + self.assertEqual(token_id,1) + token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") + self.assertEqual(token_id,5) + luks_dump.return_value="Tokens:\n\ + 1: Azure_Disk_Encryption_BackUp" + token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption_BackUp") + self.assertEqual(token_id,1) + token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") + self.assertEqual(token_id,None) + token_id = self.disk_util.get_token_id("","Azure_Disk_Encryption_BackUp") + self.assertEqual(token_id,None) + token_id = self.disk_util.get_token_id("/dev/sda","") + self.assertEqual(token_id,None) + header_or_dev_path_exist.return_value = False + token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") + self.assertEqual(token_id,None) \ No newline at end of file