diff --git a/lib/charms/mysql/v0/backups.py b/lib/charms/mysql/v0/backups.py index 2a31c2a39..266e45496 100644 --- a/lib/charms/mysql/v0/backups.py +++ b/lib/charms/mysql/v0/backups.py @@ -55,12 +55,14 @@ def is_unit_blocked(self) -> bool: from charms.mysql.v0.mysql import ( MySQLConfigureInstanceError, MySQLCreateClusterError, + MySQLCreateClusterSetError, MySQLDeleteTempBackupDirectoryError, MySQLDeleteTempRestoreDirectoryError, MySQLEmptyDataDirectoryError, MySQLExecuteBackupCommandsError, MySQLGetMemberStateError, MySQLInitializeJujuOperationsTableError, + MySQLKillSessionError, MySQLOfflineModeAndHiddenInstanceExistsError, MySQLPrepareBackupForRestoreError, MySQLRescanClusterError, @@ -80,13 +82,14 @@ def is_unit_blocked(self) -> bool: from ops.charm import ActionEvent from ops.framework import Object from ops.jujuversion import JujuVersion -from ops.model import ActiveStatus, BlockedStatus +from ops.model import BlockedStatus, MaintenanceStatus from constants import MYSQL_DATA_DIR logger = logging.getLogger(__name__) MYSQL_BACKUPS = "mysql-backups" +S3_INTEGRATOR_RELATION_NAME = "s3-parameters" # The unique Charmhub library identifier, never change it LIBID = "183844304be247129572309a5fb1e47c" @@ -96,7 +99,7 @@ def is_unit_blocked(self) -> bool: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 8 +LIBPATCH = 9 if typing.TYPE_CHECKING: @@ -117,6 +120,10 @@ def __init__(self, charm: "MySQLOperatorCharm", s3_integrator: S3Requirer) -> No self.framework.observe(self.charm.on.restore_action, self._on_restore) # ------------------ Helpers ------------------ + @property + def _s3_integrator_relation_exists(self) -> bool: + """Returns whether a relation with the s3-integrator exists.""" + return bool(self.model.get_relation(S3_INTEGRATOR_RELATION_NAME)) def _retrieve_s3_parameters(self) -> Tuple[Dict[str, str], List[str]]: """Retrieve S3 parameters from the S3 integrator relation. @@ -176,11 +183,7 @@ def _upload_logs_to_s3( Returns: bool indicating success """ - logs = f"""Stdout: -{stdout} - -Stderr: -{stderr}""" + logs = f"Stdout:\n{stdout}\n\nStderr:\n{stderr}" logger.debug(f"Logs to upload to S3 at location {log_filename}:\n{logs}") logger.info( @@ -206,7 +209,7 @@ def _on_list_backups(self, event: ActionEvent) -> None: List backups available to restore by this application. 
""" - if not self.charm.s3_integrator_relation_exists: + if not self._s3_integrator_relation_exists: event.fail("Missing relation with S3 integrator charm") return @@ -222,7 +225,9 @@ def _on_list_backups(self, event: ActionEvent) -> None: event.set_results({"backups": self._format_backups_list(backups)}) except Exception as e: error_message = ( - e.message if hasattr(e, "message") else "Failed to retrieve backup ids from S3" + getattr(e, "message") + if hasattr(e, "message") + else "Failed to retrieve backup ids from S3" ) logger.error(error_message) event.fail(error_message) @@ -233,7 +238,7 @@ def _on_create_backup(self, event: ActionEvent) -> None: """Handle the create backup action.""" logger.info("A backup has been requested on unit") - if not self.charm.s3_integrator_relation_exists: + if not self._s3_integrator_relation_exists: logger.error("Backup failed: missing relation with S3 integrator charm") event.fail("Missing relation with S3 integrator charm") return @@ -263,12 +268,13 @@ def _on_create_backup(self, event: ActionEvent) -> None: # Test uploading metadata to S3 to test credentials before backup juju_version = JujuVersion.from_environ() - metadata = f"""Date Backup Requested: {datetime_backup_requested} -Model Name: {self.model.name} -Application Name: {self.model.app.name} -Unit Name: {self.charm.unit.name} -Juju Version: {str(juju_version)} -""" + metadata = ( + f"Date Backup Requested: {datetime_backup_requested}\n" + f"Model Name: {self.model.name}\n" + f"Application Name: {self.model.app.name}\n" + f"Unit Name: {self.charm.unit.name}\n" + f"Juju Version: {str(juju_version)}\n" + ) if not upload_content_to_s3(metadata, f"{backup_path}.metadata", s3_parameters): logger.error("Backup failed: Failed to upload metadata to provided S3") @@ -313,6 +319,7 @@ def _on_create_backup(self, event: ActionEvent) -> None: "backup-id": datetime_backup_requested, } ) + self.charm._on_update_status(None) def _can_unit_perform_backup(self) -> Tuple[bool, Optional[str]]: """Validates whether this unit can perform a backup. 
@@ -380,8 +387,9 @@ def _backup(self, backup_path: str, s3_parameters: Dict) -> Tuple[bool, Optional Returns: tuple of (success, error_message) """ try: + self.charm.unit.status = MaintenanceStatus("Running backup...") logger.info("Running the xtrabackup commands") - stdout = self.charm._mysql.execute_backup_commands( + stdout, _ = self.charm._mysql.execute_backup_commands( backup_path, s3_parameters, ) @@ -436,7 +444,7 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: Returns: a boolean indicating whether restore should be run """ - if not self.charm.s3_integrator_relation_exists: + if not self._s3_integrator_relation_exists: error_message = "Missing relation with S3 integrator charm" logger.error(f"Restore failed: {error_message}") event.fail(error_message) @@ -481,7 +489,7 @@ def _on_restore(self, event: ActionEvent) -> None: if not self._pre_restore_checks(event): return - backup_id = event.params.get("backup-id").strip().strip("/") + backup_id = event.params["backup-id"].strip().strip("/") logger.info(f"A restore with backup-id {backup_id} has been requested on unit") # Retrieve and validate missing S3 parameters @@ -500,6 +508,7 @@ def _on_restore(self, event: ActionEvent) -> None: return # Run operations to prepare for the restore + self.charm.unit.status = MaintenanceStatus("Running pre-restore operations") success, error_message = self._pre_restore() if not success: logger.error(f"Restore failed: {error_message}") @@ -520,6 +529,7 @@ def _on_restore(self, event: ActionEvent) -> None: return # Run post-restore operations + self.charm.unit.status = MaintenanceStatus("Running post-restore operations") success, error_message = self._post_restore() if not success: logger.error(f"Restore failed: {error_message}") @@ -533,22 +543,29 @@ def _on_restore(self, event: ActionEvent) -> None: "completed": "ok", } ) + # update status as soon as possible + self.charm._on_update_status(None) - def _pre_restore(self) -> Tuple[bool, Optional[str]]: + def _pre_restore(self) -> Tuple[bool, str]: """Perform operations that need to be done before performing a restore. Returns: tuple of (success, error_message) """ + if not self.charm._mysql.is_mysqld_running(): + return True, "" + try: + logger.info("Stopping mysqld before restoring the backup") + self.charm._mysql.kill_client_sessions() self.charm._mysql.stop_mysqld() + except MySQLKillSessionError: + return False, "Failed to kill client sessions" except MySQLStopMySQLDError: return False, "Failed to stop mysqld" - return True, None + return True, "" - def _restore( - self, backup_id: str, s3_parameters: Dict[str, str] - ) -> Tuple[bool, bool, Optional[str]]: + def _restore(self, backup_id: str, s3_parameters: Dict[str, str]) -> Tuple[bool, bool, str]: """Run the restore operations. 
Args: @@ -558,18 +575,21 @@ def _restore( Returns: tuple of (success, recoverable_error, error_message) """ try: - logger.info("Running xbcloud get commands to retrieve the backup") + logger.info( + "Running xbcloud get commands to retrieve the backup\n" + "This operation can take long time depending on backup size and network speed" + ) + self.charm.unit.status = MaintenanceStatus("Downloading backup...") stdout, stderr, backup_location = self.charm._mysql.retrieve_backup_with_xbcloud( backup_id, s3_parameters, ) - logger.debug(f"Stdout of xbcloud get commands: {stdout}") - logger.debug(f"Stderr of xbcloud get commands: {stderr}") except MySQLRetrieveBackupWithXBCloudError: return False, True, f"Failed to retrieve backup {backup_id}" try: logger.info("Preparing retrieved backup using xtrabackup prepare") + self.charm.unit.status = MaintenanceStatus("Preparing for restore backup...") stdout, stderr = self.charm._mysql.prepare_backup_for_restore(backup_location) logger.debug(f"Stdout of xtrabackup prepare command: {stdout}") logger.debug(f"Stderr of xtrabackup prepare command: {stderr}") @@ -583,6 +603,7 @@ def _restore( return False, False, "Failed to empty the data directory" try: + self.charm.unit.status = MaintenanceStatus("Restoring backup...") logger.info("Restoring the backup") stdout, stderr = self.charm._mysql.restore_backup(backup_location) logger.debug(f"Stdout of xtrabackup move-back command: {stdout}") @@ -590,9 +611,9 @@ def _restore( except MySQLRestoreBackupError: return False, False, f"Failed to restore backup {backup_id}" - return True, True, None + return True, True, "" - def _clean_data_dir_and_start_mysqld(self) -> Tuple[bool, Optional[str]]: + def _clean_data_dir_and_start_mysqld(self) -> Tuple[bool, str]: """Run idempotent operations run after restoring a backup. Returns tuple of (success, error_message) @@ -613,9 +634,9 @@ def _clean_data_dir_and_start_mysqld(self) -> Tuple[bool, Optional[str]]: except MySQLStartMySQLDError: return False, "Failed to start mysqld" - return True, None + return True, "" - def _post_restore(self) -> Tuple[bool, Optional[str]]: + def _post_restore(self) -> Tuple[bool, str]: """Run operations required after restoring a backup. 
Returns: tuple of (success, error_message) @@ -638,24 +659,18 @@ def _post_restore(self) -> Tuple[bool, Optional[str]]: logger.info("Creating cluster on restored node") unit_label = self.charm.unit.name.replace("/", "-") self.charm._mysql.create_cluster(unit_label) + self.charm._mysql.create_cluster_set() self.charm._mysql.initialize_juju_units_operations_table() self.charm._mysql.rescan_cluster() - logger.info("Retrieving instance cluster state and role") - state, role = self.charm._mysql.get_member_state() except MySQLCreateClusterError: return False, "Failed to create InnoDB cluster on restored instance" + except MySQLCreateClusterSetError: + return False, "Failed to create InnoDB cluster-set on restored instance" except MySQLInitializeJujuOperationsTableError: return False, "Failed to initialize the juju operations table" except MySQLRescanClusterError: return False, "Failed to rescan the cluster" - except MySQLGetMemberStateError: - return False, "Failed to retrieve member state in restored instance" - self.charm.unit_peer_data["member-role"] = role - self.charm.unit_peer_data["member-state"] = state - - self.charm.unit.status = ActiveStatus() - - return True, None + return True, "" diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py index dba87decf..f1f165393 100644 --- a/lib/charms/mysql/v0/mysql.py +++ b/lib/charms/mysql/v0/mysql.py @@ -130,7 +130,7 @@ def wait_until_mysql_connection(self) -> None: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 60 +LIBPATCH = 61 UNIT_TEARDOWN_LOCKNAME = "unit-teardown" UNIT_ADD_LOCKNAME = "unit-add" @@ -2736,50 +2736,50 @@ def execute_backup_commands( group: Optional[str] = None, ) -> Tuple[str, str]: """Executes commands to create a backup with the given args.""" - nproc_command = "nproc".split() + nproc_command = ["nproc"] make_temp_dir_command = f"mktemp --directory {tmp_base_directory}/xtra_backup_XXXX".split() try: nproc, _ = self._execute_commands(nproc_command) tmp_dir, _ = self._execute_commands(make_temp_dir_command, user=user, group=group) - except MySQLExecError as e: + except MySQLExecError: logger.exception("Failed to execute commands prior to running backup") - raise MySQLExecuteBackupCommandsError(e.message) - except Exception as e: + raise MySQLExecuteBackupCommandsError + except Exception: # Catch all other exceptions to prevent the database being stuck in # a bad state due to pre-backup operations logger.exception("Failed to execute commands prior to running backup") - raise MySQLExecuteBackupCommandsError(e) + raise MySQLExecuteBackupCommandsError # TODO: remove flags --no-server-version-check # when MySQL and XtraBackup versions are in sync - xtrabackup_commands = f""" -{xtrabackup_location} --defaults-file={defaults_config_file} - --defaults-group=mysqld - --no-version-check - --parallel={nproc} - --user={self.backups_user} - --password={self.backups_password} - --socket={mysqld_socket_file} - --lock-ddl - --backup - --stream=xbstream - --xtrabackup-plugin-dir={xtrabackup_plugin_dir} - --target-dir={tmp_dir} - --no-server-version-check - | {xbcloud_location} put - --curl-retriable-errors=7 - --insecure - --parallel=10 - --md5 - --storage=S3 - --s3-region={s3_parameters["region"]} - --s3-bucket={s3_parameters["bucket"]} - --s3-endpoint={s3_parameters["endpoint"]} - --s3-api-version={s3_parameters["s3-api-version"]} - --s3-bucket-lookup={s3_parameters["s3-uri-style"]} - {s3_path} -""".split() + xtrabackup_commands = [ + 
f"{xtrabackup_location} --defaults-file={defaults_config_file}", + "--defaults-group=mysqld", + "--no-version-check", + f"--parallel={nproc}", + f"--user={self.backups_user}", + f"--password={self.backups_password}", + f"--socket={mysqld_socket_file}", + "--lock-ddl", + "--backup", + "--stream=xbstream", + f"--xtrabackup-plugin-dir={xtrabackup_plugin_dir}", + f"--target-dir={tmp_dir}", + "--no-server-version-check", + f"| {xbcloud_location} put", + "--curl-retriable-errors=7", + "--insecure", + "--parallel=10", + "--md5", + "--storage=S3", + f"--s3-region={s3_parameters['region']}", + f"--s3-bucket={s3_parameters['bucket']}", + f"--s3-endpoint={s3_parameters['endpoint']}", + f"--s3-api-version={s3_parameters['s3-api-version']}", + f"--s3-bucket-lookup={s3_parameters['s3-uri-style']}", + f"{s3_path}", + ] try: logger.debug( @@ -2796,15 +2796,16 @@ def execute_backup_commands( "ACCESS_KEY_ID": s3_parameters["access-key"], "SECRET_ACCESS_KEY": s3_parameters["secret-key"], }, + stream_output="stderr", ) except MySQLExecError as e: logger.exception("Failed to execute backup commands") raise MySQLExecuteBackupCommandsError(e.message) - except Exception as e: + except Exception: # Catch all other exceptions to prevent the database being stuck in # a bad state due to pre-backup operations logger.exception("Failed to execute backup commands") - raise MySQLExecuteBackupCommandsError(e) + raise MySQLExecuteBackupCommandsError def delete_temp_backup_directory( self, @@ -2848,7 +2849,7 @@ def retrieve_backup_with_xbcloud( mysql container. This temp dir is supposed to be on the same volume as the mysql data directory to reduce latency for IOPS. """ - nproc_command = "nproc".split() + nproc_command = ["nproc"] make_temp_dir_command = ( f"mktemp --directory {temp_restore_directory}/#mysql_sst_XXXX".split() ) @@ -2865,23 +2866,23 @@ def retrieve_backup_with_xbcloud( logger.exception("Failed to execute commands prior to running xbcloud get") raise MySQLRetrieveBackupWithXBCloudError(e.message) - retrieve_backup_command = f""" -{xbcloud_location} get - --curl-retriable-errors=7 - --parallel=10 - --storage=S3 - --s3-region={s3_parameters["region"]} - --s3-bucket={s3_parameters["bucket"]} - --s3-endpoint={s3_parameters["endpoint"]} - --s3-bucket-lookup={s3_parameters["s3-uri-style"]} - --s3-api-version={s3_parameters["s3-api-version"]} - {s3_parameters["path"]}/{backup_id} - | {xbstream_location} - --decompress - -x - -C {tmp_dir} - --parallel={nproc} -""".split() + retrieve_backup_command = [ + f"{xbcloud_location} get", + "--curl-retriable-errors=7", + "--parallel=10", + "--storage=S3", + f"--s3-region={s3_parameters['region']}", + f"--s3-bucket={s3_parameters['bucket']}", + f"--s3-endpoint={s3_parameters['endpoint']}", + f"--s3-bucket-lookup={s3_parameters['s3-uri-style']}", + f"--s3-api-version={s3_parameters['s3-api-version']}", + f"{s3_parameters['path']}/{backup_id}", + f"| {xbstream_location}", + "--decompress", + "-x", + f"-C {tmp_dir}", + f"--parallel={nproc}", + ] try: logger.debug(f"Command to retrieve backup: {' '.join(retrieve_backup_command)}") @@ -2896,6 +2897,7 @@ def retrieve_backup_with_xbcloud( }, user=user, group=group, + stream_output="stderr", ) return (stdout, stderr, tmp_dir) except MySQLExecError as e: @@ -2921,14 +2923,15 @@ def prepare_backup_for_restore( except MySQLGetAutoTunningParametersError as e: raise MySQLPrepareBackupForRestoreError(e.message) - prepare_backup_command = f""" -{xtrabackup_location} --prepare - --use-memory={innodb_buffer_pool_size} - --no-version-check - 
--rollback-prepared-trx - --xtrabackup-plugin-dir={xtrabackup_plugin_dir} - --target-dir={backup_location} -""".split() + prepare_backup_command = [ + xtrabackup_location, + "--prepare", + f"--use-memory={innodb_buffer_pool_size}", + "--no-version-check", + "--rollback-prepared-trx", + f"--xtrabackup-plugin-dir={xtrabackup_plugin_dir}", + f"--target-dir={backup_location}", + ] try: logger.debug( @@ -2954,7 +2957,17 @@ def empty_data_files( group=None, ) -> None: """Empty the mysql data directory in preparation of backup restore.""" - empty_data_files_command = f"find {mysql_data_directory} -not -path {mysql_data_directory}/#mysql_sst_* -not -path {mysql_data_directory} -delete".split() + empty_data_files_command = [ + "find", + mysql_data_directory, + "-not", + "-path", + f"{mysql_data_directory}/#mysql_sst_*", + "-not", + "-path", + mysql_data_directory, + "-delete", + ] try: logger.debug(f"Command to empty data directory: {' '.join(empty_data_files_command)}") @@ -2966,9 +2979,9 @@ def empty_data_files( except MySQLExecError as e: logger.exception("Failed to empty data directory in prep for backup restore") raise MySQLEmptyDataDirectoryError(e.message) - except Exception as e: + except Exception: logger.exception("Failed to empty data directory in prep for backup restore") - raise MySQLEmptyDataDirectoryError(e) + raise MySQLEmptyDataDirectoryError def restore_backup( self, @@ -2981,16 +2994,17 @@ def restore_backup( group=None, ) -> Tuple[str, str]: """Restore the provided prepared backup.""" - restore_backup_command = f""" -{xtrabackup_location} --defaults-file={defaults_config_file} - --defaults-group=mysqld - --datadir={mysql_data_directory} - --no-version-check - --move-back - --force-non-empty-directories - --xtrabackup-plugin-dir={xtrabackup_plugin_directory} - --target-dir={backup_location} -""".split() + restore_backup_command = [ + xtrabackup_location, + f"--defaults-file={defaults_config_file}", + "--defaults-group=mysqld", + f"--datadir={mysql_data_directory}", + "--no-version-check", + "--move-back", + "--force-non-empty-directories", + f"--xtrabackup-plugin-dir={xtrabackup_plugin_directory}", + f"--target-dir={backup_location}", + ] try: logger.debug(f"Command to restore backup: {' '.join(restore_backup_command)}") @@ -3003,9 +3017,9 @@ def restore_backup( except MySQLExecError as e: logger.exception("Failed to restore backup") raise MySQLRestoreBackupError(e.message) - except Exception as e: + except Exception: logger.exception("Failed to restore backup") - raise MySQLRestoreBackupError(e) + raise MySQLRestoreBackupError def delete_temp_restore_directory( self, @@ -3015,7 +3029,13 @@ def delete_temp_restore_directory( ) -> None: """Delete the temp restore directory from the mysql data directory.""" logger.info(f"Deleting temp restore directory in {temp_restore_directory}") - delete_temp_restore_directory_command = f"find {temp_restore_directory} -wholename {temp_restore_directory}/#mysql_sst_* -delete".split() + delete_temp_restore_directory_command = [ + "find", + temp_restore_directory, + "-wholename", + f"{temp_restore_directory}/#mysql_sst_*", + "-delete", + ] try: logger.debug( @@ -3037,7 +3057,8 @@ def _execute_commands( bash: bool = False, user: Optional[str] = None, group: Optional[str] = None, - env_extra: Dict = None, + env_extra: Dict = {}, + stream_output: Optional[str] = None, ) -> Tuple[str, str]: """Execute commands on the server where MySQL is running.""" raise NotImplementedError @@ -3078,7 +3099,10 @@ def tls_setup( raise MySQLTLSSetupError("Failed to set 
custom TLS configuration") def kill_unencrypted_sessions(self) -> None: - """Kill non local, non system open unencrypted connections.""" + """Kill non local, non system open unencrypted connections. + + Raises: MySQLKillSessionError if there is an issue killing the sessions. + """ kill_connections_command = ( f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", ( @@ -3097,6 +3121,29 @@ def kill_unencrypted_sessions(self) -> None: logger.exception("Failed to kill external sessions") raise MySQLKillSessionError + def kill_client_sessions(self) -> None: + """Kill non local, non system open unencrypted connections. + + Raises: MySQLKillSessionError if there is an issue killing the sessions. + """ + kill_connections_command = ( + f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + ( + 'processes = session.run_sql("' + "SELECT processlist_id FROM performance_schema.threads WHERE " + "type = 'FOREGROUND' and connection_type is not NULL and processlist_id != CONNECTION_ID();" + '")' + ), + "process_id_list = [id[0] for id in processes.fetch_all()]", + 'for process_id in process_id_list:\n session.run_sql(f"KILL CONNECTION {process_id}")', + ) + + try: + self._run_mysqlsh_script("\n".join(kill_connections_command)) + except MySQLClientError: + logger.exception("Failed to kill external sessions") + raise MySQLKillSessionError + def check_mysqlsh_connection(self) -> bool: """Checks if it is possible to connect to the server with mysqlsh.""" connect_commands = ( diff --git a/src/charm.py b/src/charm.py index 9ba797dbc..3a4d6deec 100755 --- a/src/charm.py +++ b/src/charm.py @@ -19,7 +19,7 @@ MySQLAsyncReplicationConsumer, MySQLAsyncReplicationOffer, ) -from charms.mysql.v0.backups import MySQLBackups +from charms.mysql.v0.backups import S3_INTEGRATOR_RELATION_NAME, MySQLBackups from charms.mysql.v0.mysql import ( BYTES_1MB, Error, @@ -83,7 +83,6 @@ PASSWORD_LENGTH, PEER, ROOT_PASSWORD_KEY, - S3_INTEGRATOR_RELATION_NAME, SERVER_CONFIG_PASSWORD_KEY, SERVER_CONFIG_USERNAME, TRACING_PROTOCOL, @@ -587,11 +586,6 @@ def _has_blocked_status(self) -> bool: """Returns whether the unit is in a blocked state.""" return isinstance(self.unit.status, BlockedStatus) - @property - def s3_integrator_relation_exists(self) -> bool: - """Returns whether a relation with the s3 integrator exists.""" - return bool(self.model.get_relation(S3_INTEGRATOR_RELATION_NAME)) - @property def unit_fqdn(self) -> str: """Returns the unit's FQDN.""" diff --git a/src/constants.py b/src/constants.py index 1bae53f3b..872fbaf13 100644 --- a/src/constants.py +++ b/src/constants.py @@ -38,7 +38,6 @@ MYSQLD_CUSTOM_CONFIG_FILE = f"{MYSQLD_CONFIG_DIRECTORY}/z-custom-mysqld.cnf" MYSQL_SYSTEM_USER = "snap_daemon" MYSQL_DATA_DIR = f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/lib/mysql" -S3_INTEGRATOR_RELATION_NAME = "s3-parameters" CHARMED_MYSQL_XTRABACKUP_LOCATION = "/snap/bin/charmed-mysql.xtrabackup" CHARMED_MYSQL_XBCLOUD_LOCATION = "/snap/bin/charmed-mysql.xbcloud" CHARMED_MYSQL_XBSTREAM_LOCATION = "/snap/bin/charmed-mysql.xbstream" diff --git a/src/flush_mysql_logs.py b/src/flush_mysql_logs.py index 562bc1d80..588d75ce8 100644 --- a/src/flush_mysql_logs.py +++ b/src/flush_mysql_logs.py @@ -42,6 +42,15 @@ def __init__(self, charm: "MySQLOperatorCharm"): def _flush_mysql_logs(self, _) -> None: """Flush the specified (via LOGS_TYPE env var) mysql logs.""" + if ( + self.charm.peers is None + or self.charm.unit_peer_data.get("unit-initialized") != "True" + or 
not self.charm.upgrade.idle + or not self.charm._mysql.is_mysqld_running() + ): + # skip when not initialized, during an upgrade, or when mysqld is not running + return + logs_type = os.environ.get("LOGS_TYPE", "") try: diff --git a/src/mysql_vm_helpers.py b/src/mysql_vm_helpers.py index 4aa2d8688..df1fb616f 100644 --- a/src/mysql_vm_helpers.py +++ b/src/mysql_vm_helpers.py @@ -20,6 +20,7 @@ MySQLExecError, MySQLGetAutoTunningParametersError, MySQLGetAvailableMemoryError, + MySQLKillSessionError, MySQLRestoreBackupError, MySQLServiceNotRunningError, MySQLStartMySQLDError, @@ -169,7 +170,7 @@ def install_and_configure_mysql_dependencies() -> None: logger.debug( f"Installing {CHARMED_MYSQL_SNAP_NAME} revision {CHARMED_MYSQL_SNAP_REVISION}" ) - charmed_mysql.ensure(snap.SnapState.Present, revision=CHARMED_MYSQL_SNAP_REVISION) + charmed_mysql.ensure(snap.SnapState.Present, revision=str(CHARMED_MYSQL_SNAP_REVISION)) if not charmed_mysql.held: # hold the snap in charm determined revision charmed_mysql.hold() @@ -460,9 +461,9 @@ def restore_backup( capture_output=True, text=True, ) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: logger.exception("Failed to change data directory permissions before restoring") - raise MySQLRestoreBackupError(e) + raise MySQLRestoreBackupError stdout, stderr = super().restore_backup( backup_location, @@ -495,11 +496,11 @@ def restore_backup( capture_output=True, text=True, ) - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError: logger.exception( "Failed to change data directory permissions or ownershp after restoring" ) - raise MySQLRestoreBackupError(e) + raise MySQLRestoreBackupError return (stdout, stderr) @@ -517,7 +518,8 @@ def _execute_commands( bash: bool = False, user: str = None, group: str = None, - env_extra: Dict = None, + env_extra: Dict = {}, + stream_output: Optional[str] = None, ) -> Tuple[str, str]: """Execute commands on the server where mysql is running. 
@@ -527,31 +529,54 @@ def _execute_commands( user: the user with which to execute the commands group: the group with which to execute the commands env_extra: the environment variables to add to the current process’ environment + stream_output: whether to stream the output to stdout, stderr or None Returns: tuple of (stdout, stderr) Raises: MySQLExecError if there was an error executing the commands """ + stdout = stderr = "" env = os.environ.copy() if env_extra: env.update(env_extra) - try: - if bash: - commands = ["bash", "-c", "set -o pipefail; " + " ".join(commands)] - - process = subprocess.run( - commands, - user=user, - group=group, - env=env, - capture_output=True, - check=True, - encoding="utf-8", + if bash: + commands = ["bash", "-c", "set -o pipefail; " + " ".join(commands)] + + process = subprocess.Popen( + commands, + user=user, + group=group, + env=env, + encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + if stream_output == "stderr": + while process.stderr and (line := process.stderr.readline()): + logger.debug(line.strip()) + stderr += line + elif stream_output == "stdout": + while process.stdout and (line := process.stdout.readline()): + logger.debug(line.strip()) + stdout += line + + return_code = process.wait() + if return_code != 0: + message = ( + "Failed command: " + f"{' '.join(commands).replace(self.backups_password, 'xxxxxxx')};" + f" {user=}; {group=}" ) - return (process.stdout.strip(), process.stderr.strip()) - except subprocess.CalledProcessError as e: - logger.debug(f"Failed command: {commands}; user={user}; group={group}") - raise MySQLExecError(e.stderr) + logger.debug(message) + raise MySQLExecError(message) + + if not stdout and process.stdout: + stdout = process.stdout.read() + if not stderr and process.stderr: + stderr = process.stderr.read() + + return (stdout.strip(), stderr.strip()) def is_mysqld_running(self) -> bool: """Returns whether mysqld is running.""" @@ -570,7 +595,7 @@ def stop_mysqld(self) -> None: try: snap_service_operation(CHARMED_MYSQL_SNAP_NAME, CHARMED_MYSQLD_SERVICE, "stop") - except SnapServiceOperationError as e: + except (SnapServiceOperationError, MySQLKillSessionError) as e: raise MySQLStopMySQLDError(e.message) def start_mysqld(self) -> None: @@ -685,8 +710,8 @@ def _run_mysqlsh_script(self, script: str, timeout=None) -> str: return subprocess.check_output( command, stderr=subprocess.PIPE, timeout=timeout ).decode("utf-8") - except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: - raise MySQLClientError(e.stderr) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + raise MySQLClientError def _run_mysqlcli_script( self, script: str, user: str = "root", password: str = None, timeout: Optional[int] = None diff --git a/tests/integration/test_backup_aws.py b/tests/integration/test_backup_aws.py new file mode 100644 index 000000000..a75b6fa2a --- /dev/null +++ b/tests/integration/test_backup_aws.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +import uuid +from pathlib import Path + +import boto3 +import pytest +from pytest_operator.plugin import OpsTest + +from . 
import juju_ +from .helpers import ( + execute_queries_on_unit, + get_primary_unit_wrapper, + get_server_config_credentials, + get_unit_ip, + rotate_credentials, + scale_application, +) +from .high_availability.high_availability_helpers import ( + deploy_and_scale_mysql, + insert_data_into_mysql_and_validate_replication, +) + +logger = logging.getLogger(__name__) + +S3_INTEGRATOR = "s3-integrator" +S3_INTEGRATOR_CHANNEL = "latest/stable" +TIMEOUT = 10 * 60 +CLUSTER_ADMIN_USER = "clusteradmin" +CLUSTER_ADMIN_PASSWORD = "clusteradminpassword" +SERVER_CONFIG_USER = "serverconfig" +SERVER_CONFIG_PASSWORD = "serverconfigpassword" +ROOT_USER = "root" +ROOT_PASSWORD = "rootpassword" +DATABASE_NAME = "backup-database" +TABLE_NAME = "backup-table" + +backup_id, value_before_backup, value_after_backup = "", None, None + + +@pytest.fixture(scope="session") +def cloud_credentials(github_secrets) -> dict[str, str]: + """Read cloud credentials.""" + return { + "access-key": github_secrets["AWS_ACCESS_KEY"], + "secret-key": github_secrets["AWS_SECRET_KEY"], + } + + +@pytest.fixture(scope="session") +def cloud_configs(): + # Add UUID to path to avoid conflict with tests running in parallel (e.g. multiple Juju + # versions on a PR, multiple PRs) + return { + "endpoint": "https://s3.amazonaws.com", + "bucket": "data-charms-testing", + "path": f"mysql/{uuid.uuid4()}", + "region": "us-east-1", + } + + +@pytest.fixture(scope="session", autouse=True) +def clean_backups_from_buckets(cloud_configs, cloud_credentials): + """Teardown to clean up created backups from clouds.""" + yield + + logger.info("Cleaning backups from cloud buckets") + session = boto3.session.Session( # pyright: ignore + aws_access_key_id=cloud_credentials["access-key"], + aws_secret_access_key=cloud_credentials["secret-key"], + region_name=cloud_configs["region"], + ) + s3 = session.resource("s3", endpoint_url=cloud_configs["endpoint"]) + bucket = s3.Bucket(cloud_configs["bucket"]) + + # GCS doesn't support batch delete operation, so delete the objects one by one + backup_path = str(Path(cloud_configs["path"]) / backup_id) + for bucket_object in bucket.objects.filter(Prefix=backup_path): + bucket_object.delete() + + +@pytest.mark.group(1) +async def test_build_and_deploy(ops_test: OpsTest, mysql_charm_series: str) -> None: + """Simple test to ensure that the mysql charm gets deployed.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + primary_mysql = await get_primary_unit_wrapper(ops_test, mysql_application_name) + + logger.info("Rotating all mysql credentials") + + await rotate_credentials( + primary_mysql, username=CLUSTER_ADMIN_USER, password=CLUSTER_ADMIN_PASSWORD + ) + await rotate_credentials( + primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD + ) + await rotate_credentials(primary_mysql, username=ROOT_USER, password=ROOT_PASSWORD) + + logger.info("Deploying s3 integrator") + + await ops_test.model.deploy(S3_INTEGRATOR, channel=S3_INTEGRATOR_CHANNEL) + await ops_test.model.relate(mysql_application_name, S3_INTEGRATOR) + + await ops_test.model.wait_for_idle( + apps=[S3_INTEGRATOR], + status="blocked", + raise_on_blocked=False, + timeout=TIMEOUT, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_backup( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to create a backup and list backups.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + global 
backup_id, value_before_backup, value_after_backup + + zeroth_unit = ops_test.model.units[f"{mysql_application_name}/0"] + assert zeroth_unit + + primary_unit = await get_primary_unit_wrapper(ops_test, mysql_application_name) + non_primary_units = [ + unit + for unit in ops_test.model.applications[mysql_application_name].units + if unit.name != primary_unit.name + ] + + # insert data into cluster before + logger.info("Inserting value before backup") + value_before_backup = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # list backups + logger.info("Listing existing backup ids") + + results = await juju_.run_action(zeroth_unit, "list-backups") + output = results["backups"] + backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] + + # create backup + logger.info("Creating backup") + + results = await juju_.run_action(non_primary_units[0], "create-backup", **{"--wait": "5m"}) + backup_id = results["backup-id"] + + # list backups again and ensure new backup id exists + logger.info("Listing backup ids post backup") + + results = await juju_.run_action(zeroth_unit, "list-backups") + output = results["backups"] + new_backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] + + assert sorted(new_backup_ids) == sorted(backup_ids + [backup_id]) + + # insert data into cluster after backup + logger.info("Inserting value after backup") + value_after_backup = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_restore_on_same_cluster( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to restore a backup to the same mysql cluster.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + logger.info("Scaling mysql application to 1 unit") + async with ops_test.fast_forward(): + await scale_application(ops_test, mysql_application_name, 1) + + mysql_unit = ops_test.model.units[f"{mysql_application_name}/0"] + assert mysql_unit + mysql_unit_address = await get_unit_ip(ops_test, mysql_unit.name) + server_config_credentials = await get_server_config_credentials(mysql_unit) + + select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # restore the backup + logger.info(f"Restoring backup {backup_id=}") + + await juju_.run_action(mysql_unit, action_name="restore", **{"backup-id": backup_id}) + + # ensure the correct inserted values exist + logger.info( + "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted value 
does not" + ) + + values = await execute_queries_on_unit( + mysql_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert values == [value_before_backup] + + # insert data into cluster after restore + logger.info("Inserting value after restore") + value_after_restore = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + logger.info("Ensuring that pre-backup and post-restore values exist in the database") + + values = await execute_queries_on_unit( + mysql_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert value_before_backup + assert sorted(values) == sorted([value_before_backup, value_after_restore]) + + logger.info("Scaling mysql application to 3 units") + await scale_application(ops_test, mysql_application_name, 3) + + logger.info("Ensuring inserted values before backup and after restore exist on all units") + for unit in ops_test.model.applications[mysql_application_name].units: + unit_address = await get_unit_ip(ops_test, unit.name) + + values = await execute_queries_on_unit( + unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + + assert sorted(values) == sorted([value_before_backup, value_after_restore]) + + # scale down the cluster to preserve resources for the following tests + await scale_application(ops_test, mysql_application_name, 0) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_restore_on_new_cluster( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to restore a backup on a new mysql cluster.""" + logger.info("Deploying a new mysql cluster") + + new_mysql_application_name = await deploy_and_scale_mysql( + ops_test, + mysql_charm_series, + check_for_existing_application=False, + mysql_application_name="another-mysql", + num_units=1, + ) + + # relate to S3 integrator + await ops_test.model.relate(new_mysql_application_name, S3_INTEGRATOR) + + await ops_test.model.wait_for_idle( + apps=[new_mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # rotate all credentials + logger.info("Rotating all mysql credentials") + + primary_mysql = ops_test.model.units[f"{new_mysql_application_name}/0"] + assert primary_mysql + primary_unit_address = await get_unit_ip(ops_test, primary_mysql.name) + + await rotate_credentials( + primary_mysql, username=CLUSTER_ADMIN_USER, password=CLUSTER_ADMIN_PASSWORD + ) + await rotate_credentials( + primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD + ) + await rotate_credentials(primary_mysql, username=ROOT_PASSWORD, password=ROOT_PASSWORD) + + server_config_credentials = await get_server_config_credentials(primary_mysql) + select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[new_mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # restore the backup + logger.info(f"Restoring {backup_id=}") + + await juju_.run_action(primary_mysql, action_name="restore", 
**{"backup-id": backup_id}) + + # ensure the correct inserted values exist + logger.info( + "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted value does not" + ) + + values = await execute_queries_on_unit( + primary_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert values == [value_before_backup] + + # insert data into cluster after restore + logger.info("Inserting value after restore") + value_after_restore = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + mysql_application_substring="another-mysql", + ) + + logger.info("Ensuring that pre-backup and post-restore values exist in the database") + + values = await execute_queries_on_unit( + primary_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert value_before_backup + assert sorted(values) == sorted([value_before_backup, value_after_restore]) diff --git a/tests/integration/test_backup_ceph.py b/tests/integration/test_backup_ceph.py new file mode 100644 index 000000000..a0a0bb622 --- /dev/null +++ b/tests/integration/test_backup_ceph.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +import socket +from pathlib import Path + +import boto3 +import pytest +import pytest_microceph +from pytest_operator.plugin import OpsTest + +from . import juju_ +from .helpers import ( + execute_queries_on_unit, + get_primary_unit_wrapper, + get_server_config_credentials, + get_unit_ip, + rotate_credentials, + scale_application, +) +from .high_availability.high_availability_helpers import ( + deploy_and_scale_mysql, + insert_data_into_mysql_and_validate_replication, +) + +logger = logging.getLogger(__name__) + +S3_INTEGRATOR = "s3-integrator" +S3_INTEGRATOR_CHANNEL = "latest/stable" +TIMEOUT = 10 * 60 +CLUSTER_ADMIN_USER = "clusteradmin" +CLUSTER_ADMIN_PASSWORD = "clusteradminpassword" +SERVER_CONFIG_USER = "serverconfig" +SERVER_CONFIG_PASSWORD = "serverconfigpassword" +ROOT_USER = "root" +ROOT_PASSWORD = "rootpassword" +DATABASE_NAME = "backup-database" +TABLE_NAME = "backup-table" + +backup_id, value_before_backup, value_after_backup = "", None, None + + +@pytest.fixture(scope="session") +def cloud_credentials(microceph: pytest_microceph.ConnectionInformation) -> dict[str, str]: + """Read cloud credentials.""" + return { + "access-key": microceph.access_key_id, + "secret-key": microceph.secret_access_key, + } + + +@pytest.fixture(scope="session") +def cloud_configs(microceph: pytest_microceph.ConnectionInformation): + host_ip = socket.gethostbyname(socket.gethostname()) + return { + "endpoint": f"http://{host_ip}", + "bucket": microceph.bucket, + "path": "mysql", + "region": "", + } + + +@pytest.fixture(scope="session", autouse=True) +def clean_backups_from_buckets(cloud_configs, cloud_credentials): + """Teardown to clean up created backups from clouds.""" + yield + + logger.info("Cleaning backups from cloud buckets") + session = boto3.session.Session( # pyright: ignore + aws_access_key_id=cloud_credentials["access-key"], + aws_secret_access_key=cloud_credentials["secret-key"], + region_name=cloud_configs["region"], + ) + s3 = session.resource("s3", endpoint_url=cloud_configs["endpoint"]) + bucket = s3.Bucket(cloud_configs["bucket"]) + + # GCS doesn't support batch delete operation, so delete the objects one by one + 
backup_path = str(Path(cloud_configs["path"]) / backup_id) + for bucket_object in bucket.objects.filter(Prefix=backup_path): + bucket_object.delete() + + +@pytest.mark.group(1) +async def test_build_and_deploy(ops_test: OpsTest, mysql_charm_series: str) -> None: + """Simple test to ensure that the mysql charm gets deployed.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + primary_mysql = await get_primary_unit_wrapper(ops_test, mysql_application_name) + + logger.info("Rotating all mysql credentials") + + await rotate_credentials( + primary_mysql, username=CLUSTER_ADMIN_USER, password=CLUSTER_ADMIN_PASSWORD + ) + await rotate_credentials( + primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD + ) + await rotate_credentials(primary_mysql, username=ROOT_USER, password=ROOT_PASSWORD) + + logger.info("Deploying s3 integrator") + + await ops_test.model.deploy(S3_INTEGRATOR, channel=S3_INTEGRATOR_CHANNEL) + await ops_test.model.relate(mysql_application_name, S3_INTEGRATOR) + + await ops_test.model.wait_for_idle( + apps=[S3_INTEGRATOR], + status="blocked", + raise_on_blocked=False, + timeout=TIMEOUT, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_backup( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to create a backup and list backups.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + global backup_id, value_before_backup, value_after_backup + + zeroth_unit = ops_test.model.units[f"{mysql_application_name}/0"] + assert zeroth_unit + + primary_unit = await get_primary_unit_wrapper(ops_test, mysql_application_name) + non_primary_units = [ + unit + for unit in ops_test.model.applications[mysql_application_name].units + if unit.name != primary_unit.name + ] + + # insert data into cluster before + logger.info("Inserting value before backup") + value_before_backup = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # list backups + logger.info("Listing existing backup ids") + + results = await juju_.run_action(zeroth_unit, "list-backups") + output = results["backups"] + backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] + + # create backup + logger.info("Creating backup") + + results = await juju_.run_action(non_primary_units[0], "create-backup", **{"--wait": "5m"}) + backup_id = results["backup-id"] + + # list backups again and ensure new backup id exists + logger.info("Listing backup ids post backup") + + results = await juju_.run_action(zeroth_unit, "list-backups") + output = results["backups"] + new_backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] + + assert sorted(new_backup_ids) == sorted(backup_ids + [backup_id]) + + # insert data into cluster after backup + logger.info("Inserting value after backup") + value_after_backup = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail 
+async def test_restore_on_same_cluster( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to restore a backup to the same mysql cluster.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + logger.info("Scaling mysql application to 1 unit") + async with ops_test.fast_forward(): + await scale_application(ops_test, mysql_application_name, 1) + + mysql_unit = ops_test.model.units[f"{mysql_application_name}/0"] + assert mysql_unit + mysql_unit_address = await get_unit_ip(ops_test, mysql_unit.name) + server_config_credentials = await get_server_config_credentials(mysql_unit) + + select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # restore the backup + logger.info(f"Restoring backup {backup_id=}") + + await juju_.run_action(mysql_unit, action_name="restore", **{"backup-id": backup_id}) + + # ensure the correct inserted values exist + logger.info( + "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted value does not" + ) + + values = await execute_queries_on_unit( + mysql_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert values == [value_before_backup] + + # insert data into cluster after restore + logger.info("Inserting value after restore") + value_after_restore = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + logger.info("Ensuring that pre-backup and post-restore values exist in the database") + + values = await execute_queries_on_unit( + mysql_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert value_before_backup + assert sorted(values) == sorted([value_before_backup, value_after_restore]) + + logger.info("Scaling mysql application to 3 units") + await scale_application(ops_test, mysql_application_name, 3) + + logger.info("Ensuring inserted values before backup and after restore exist on all units") + for unit in ops_test.model.applications[mysql_application_name].units: + unit_address = await get_unit_ip(ops_test, unit.name) + + values = await execute_queries_on_unit( + unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + + assert sorted(values) == sorted([value_before_backup, value_after_restore]) + + # scale down the cluster to preserve resources for the following tests + await scale_application(ops_test, mysql_application_name, 0) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_restore_on_new_cluster( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to restore a backup on a new mysql cluster.""" + logger.info("Deploying a new mysql cluster") + + new_mysql_application_name = await deploy_and_scale_mysql( + ops_test, + mysql_charm_series, + check_for_existing_application=False, + mysql_application_name="another-mysql", + num_units=1, + ) + + # relate to S3 
integrator + await ops_test.model.relate(new_mysql_application_name, S3_INTEGRATOR) + + await ops_test.model.wait_for_idle( + apps=[new_mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # rotate all credentials + logger.info("Rotating all mysql credentials") + + primary_mysql = ops_test.model.units[f"{new_mysql_application_name}/0"] + assert primary_mysql + primary_unit_address = await get_unit_ip(ops_test, primary_mysql.name) + + await rotate_credentials( + primary_mysql, username=CLUSTER_ADMIN_USER, password=CLUSTER_ADMIN_PASSWORD + ) + await rotate_credentials( + primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD + ) + await rotate_credentials(primary_mysql, username=ROOT_PASSWORD, password=ROOT_PASSWORD) + + server_config_credentials = await get_server_config_credentials(primary_mysql) + select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[new_mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # restore the backup + logger.info(f"Restoring {backup_id=}") + + await juju_.run_action(primary_mysql, action_name="restore", **{"backup-id": backup_id}) + + # ensure the correct inserted values exist + logger.info( + "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted value does not" + ) + + values = await execute_queries_on_unit( + primary_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert values == [value_before_backup] + + # insert data into cluster after restore + logger.info("Inserting value after restore") + value_after_restore = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + mysql_application_substring="another-mysql", + ) + + logger.info("Ensuring that pre-backup and post-restore values exist in the database") + + values = await execute_queries_on_unit( + primary_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert value_before_backup + assert sorted(values) == sorted([value_before_backup, value_after_restore]) diff --git a/tests/integration/test_backup_gcp.py b/tests/integration/test_backup_gcp.py new file mode 100644 index 000000000..2538401fa --- /dev/null +++ b/tests/integration/test_backup_gcp.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +import uuid +from pathlib import Path + +import boto3 +import pytest +from pytest_operator.plugin import OpsTest + +from . 
import juju_ +from .helpers import ( + execute_queries_on_unit, + get_primary_unit_wrapper, + get_server_config_credentials, + get_unit_ip, + rotate_credentials, + scale_application, +) +from .high_availability.high_availability_helpers import ( + deploy_and_scale_mysql, + insert_data_into_mysql_and_validate_replication, +) + +logger = logging.getLogger(__name__) + +S3_INTEGRATOR = "s3-integrator" +S3_INTEGRATOR_CHANNEL = "latest/stable" +TIMEOUT = 10 * 60 +CLUSTER_ADMIN_USER = "clusteradmin" +CLUSTER_ADMIN_PASSWORD = "clusteradminpassword" +SERVER_CONFIG_USER = "serverconfig" +SERVER_CONFIG_PASSWORD = "serverconfigpassword" +ROOT_USER = "root" +ROOT_PASSWORD = "rootpassword" +DATABASE_NAME = "backup-database" +TABLE_NAME = "backup-table" + +backup_id, value_before_backup, value_after_backup = "", None, None + + +@pytest.fixture(scope="session") +def cloud_credentials(github_secrets) -> dict[str, str]: + """Read cloud credentials.""" + return { + "access-key": github_secrets["GCP_ACCESS_KEY"], + "secret-key": github_secrets["GCP_SECRET_KEY"], + } + + +@pytest.fixture(scope="session") +def cloud_configs(): + # Add UUID to path to avoid conflict with tests running in parallel (e.g. multiple Juju + # versions on a PR, multiple PRs) + return { + "endpoint": "https://storage.googleapis.com", + "bucket": "data-charms-testing", + "path": f"mysql/{uuid.uuid4()}", + "region": "", + } + + +@pytest.fixture(scope="session", autouse=True) +def clean_backups_from_buckets(cloud_configs, cloud_credentials): + """Teardown to clean up created backups from clouds.""" + yield + + logger.info("Cleaning backups from cloud buckets") + session = boto3.session.Session( # pyright: ignore + aws_access_key_id=cloud_credentials["access-key"], + aws_secret_access_key=cloud_credentials["secret-key"], + region_name=cloud_configs["region"], + ) + s3 = session.resource("s3", endpoint_url=cloud_configs["endpoint"]) + bucket = s3.Bucket(cloud_configs["bucket"]) + + # GCS doesn't support batch delete operation, so delete the objects one by one + backup_path = str(Path(cloud_configs["path"]) / backup_id) + for bucket_object in bucket.objects.filter(Prefix=backup_path): + bucket_object.delete() + + +@pytest.mark.group(1) +async def test_build_and_deploy(ops_test: OpsTest, mysql_charm_series: str) -> None: + """Simple test to ensure that the mysql charm gets deployed.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + primary_mysql = await get_primary_unit_wrapper(ops_test, mysql_application_name) + + logger.info("Rotating all mysql credentials") + + await rotate_credentials( + primary_mysql, username=CLUSTER_ADMIN_USER, password=CLUSTER_ADMIN_PASSWORD + ) + await rotate_credentials( + primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD + ) + await rotate_credentials(primary_mysql, username=ROOT_USER, password=ROOT_PASSWORD) + + logger.info("Deploying s3 integrator") + + await ops_test.model.deploy(S3_INTEGRATOR, channel=S3_INTEGRATOR_CHANNEL) + await ops_test.model.relate(mysql_application_name, S3_INTEGRATOR) + + await ops_test.model.wait_for_idle( + apps=[S3_INTEGRATOR], + status="blocked", + raise_on_blocked=False, + timeout=TIMEOUT, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_backup( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to create a backup and list backups.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + global backup_id, 
value_before_backup, value_after_backup + + zeroth_unit = ops_test.model.units[f"{mysql_application_name}/0"] + assert zeroth_unit + + primary_unit = await get_primary_unit_wrapper(ops_test, mysql_application_name) + non_primary_units = [ + unit + for unit in ops_test.model.applications[mysql_application_name].units + if unit.name != primary_unit.name + ] + + # insert data into cluster before + logger.info("Inserting value before backup") + value_before_backup = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # list backups + logger.info("Listing existing backup ids") + + results = await juju_.run_action(zeroth_unit, "list-backups") + output = results["backups"] + backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] + + # create backup + logger.info("Creating backup") + + results = await juju_.run_action(non_primary_units[0], "create-backup", **{"--wait": "5m"}) + backup_id = results["backup-id"] + + # list backups again and ensure new backup id exists + logger.info("Listing backup ids post backup") + + results = await juju_.run_action(zeroth_unit, "list-backups") + output = results["backups"] + new_backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] + + assert sorted(new_backup_ids) == sorted(backup_ids + [backup_id]) + + # insert data into cluster after backup + logger.info("Inserting value after backup") + value_after_backup = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + ) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_restore_on_same_cluster( + ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials +) -> None: + """Test to restore a backup to the same mysql cluster.""" + mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) + + logger.info("Scaling mysql application to 1 unit") + async with ops_test.fast_forward(): + await scale_application(ops_test, mysql_application_name, 1) + + mysql_unit = ops_test.model.units[f"{mysql_application_name}/0"] + assert mysql_unit + mysql_unit_address = await get_unit_ip(ops_test, mysql_unit.name) + server_config_credentials = await get_server_config_credentials(mysql_unit) + + select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] + + # set the s3 config and credentials + logger.info("Syncing credentials") + + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + # restore the backup + logger.info(f"Restoring backup {backup_id=}") + + await juju_.run_action(mysql_unit, action_name="restore", **{"backup-id": backup_id}) + + # ensure the correct inserted values exist + logger.info( + "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted value does not" + 
)
+
+    values = await execute_queries_on_unit(
+        mysql_unit_address,
+        server_config_credentials["username"],
+        server_config_credentials["password"],
+        select_values_sql,
+    )
+    assert values == [value_before_backup]
+
+    # insert data into cluster after restore
+    logger.info("Inserting value after restore")
+    value_after_restore = await insert_data_into_mysql_and_validate_replication(
+        ops_test,
+        DATABASE_NAME,
+        TABLE_NAME,
+    )
+
+    logger.info("Ensuring that pre-backup and post-restore values exist in the database")
+
+    values = await execute_queries_on_unit(
+        mysql_unit_address,
+        server_config_credentials["username"],
+        server_config_credentials["password"],
+        select_values_sql,
+    )
+    assert value_before_backup
+    assert sorted(values) == sorted([value_before_backup, value_after_restore])
+
+    logger.info("Scaling mysql application to 3 units")
+    await scale_application(ops_test, mysql_application_name, 3)
+
+    logger.info("Ensuring inserted values before backup and after restore exist on all units")
+    for unit in ops_test.model.applications[mysql_application_name].units:
+        unit_address = await get_unit_ip(ops_test, unit.name)
+
+        values = await execute_queries_on_unit(
+            unit_address,
+            server_config_credentials["username"],
+            server_config_credentials["password"],
+            select_values_sql,
+        )
+
+        assert sorted(values) == sorted([value_before_backup, value_after_restore])
+
+    # scale down the cluster to preserve resources for the following tests
+    await scale_application(ops_test, mysql_application_name, 0)
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_restore_on_new_cluster(
+    ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials
+) -> None:
+    """Test to restore a backup on a new mysql cluster."""
+    logger.info("Deploying a new mysql cluster")
+
+    new_mysql_application_name = await deploy_and_scale_mysql(
+        ops_test,
+        mysql_charm_series,
+        check_for_existing_application=False,
+        mysql_application_name="another-mysql",
+        num_units=1,
+    )
+
+    # relate to S3 integrator
+    await ops_test.model.relate(new_mysql_application_name, S3_INTEGRATOR)
+
+    await ops_test.model.wait_for_idle(
+        apps=[new_mysql_application_name, S3_INTEGRATOR],
+        status="active",
+        timeout=TIMEOUT,
+    )
+
+    # rotate all credentials
+    logger.info("Rotating all mysql credentials")
+
+    primary_mysql = ops_test.model.units[f"{new_mysql_application_name}/0"]
+    assert primary_mysql
+    primary_unit_address = await get_unit_ip(ops_test, primary_mysql.name)
+
+    await rotate_credentials(
+        primary_mysql, username=CLUSTER_ADMIN_USER, password=CLUSTER_ADMIN_PASSWORD
+    )
+    await rotate_credentials(
+        primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD
+    )
+    await rotate_credentials(primary_mysql, username=ROOT_USER, password=ROOT_PASSWORD)
+
+    server_config_credentials = await get_server_config_credentials(primary_mysql)
+    select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"]
+
+    # set the s3 config and credentials
+    logger.info("Syncing credentials")
+
+    await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs)
+    await juju_.run_action(
+        ops_test.model.units[f"{S3_INTEGRATOR}/0"],  # pyright: ignore
+        "sync-s3-credentials",
+        **cloud_credentials,
+    )
+
+    await ops_test.model.wait_for_idle(
+        apps=[new_mysql_application_name, S3_INTEGRATOR],
+        status="active",
+        timeout=TIMEOUT,
+    )
+
+    # restore the backup
+    logger.info(f"Restoring {backup_id=}")
+
+    await juju_.run_action(primary_mysql, action_name="restore", **{"backup-id":
backup_id}) + + # ensure the correct inserted values exist + logger.info( + "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted value does not" + ) + + values = await execute_queries_on_unit( + primary_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert values == [value_before_backup] + + # insert data into cluster after restore + logger.info("Inserting value after restore") + value_after_restore = await insert_data_into_mysql_and_validate_replication( + ops_test, + DATABASE_NAME, + TABLE_NAME, + mysql_application_substring="another-mysql", + ) + + logger.info("Ensuring that pre-backup and post-restore values exist in the database") + + values = await execute_queries_on_unit( + primary_unit_address, + server_config_credentials["username"], + server_config_credentials["password"], + select_values_sql, + ) + assert value_before_backup + assert sorted(values) == sorted([value_before_backup, value_after_restore]) diff --git a/tests/integration/test_backups.py b/tests/integration/test_backups.py deleted file mode 100644 index 13ca76b46..000000000 --- a/tests/integration/test_backups.py +++ /dev/null @@ -1,439 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -import logging -import socket -import uuid -from pathlib import Path - -import boto3 -import pytest -import pytest_microceph -from pytest_operator.plugin import OpsTest - -from . import juju_ -from .helpers import ( - execute_queries_on_unit, - get_primary_unit_wrapper, - get_server_config_credentials, - get_unit_ip, - rotate_credentials, - scale_application, -) -from .high_availability.high_availability_helpers import ( - deploy_and_scale_mysql, - insert_data_into_mysql_and_validate_replication, -) - -logger = logging.getLogger(__name__) - -S3_INTEGRATOR = "s3-integrator" -S3_INTEGRATOR_CHANNEL = "latest/stable" -TIMEOUT = 10 * 60 -CLUSTER_ADMIN_USER = "clusteradmin" -CLUSTER_ADMIN_PASSWORD = "clusteradminpassword" -SERVER_CONFIG_USER = "serverconfig" -SERVER_CONFIG_PASSWORD = "serverconfigpassword" -ROOT_USER = "root" -ROOT_PASSWORD = "rootpassword" -DATABASE_NAME = "backup-database" -TABLE_NAME = "backup-table" - -backups_by_cloud = {} -value_before_backup, value_after_backup = None, None - - -@pytest.fixture(scope="session") -def cloud_configs(microceph: pytest_microceph.ConnectionInformation): - # Add UUID to path to avoid conflict with tests running in parallel (e.g. 
multiple Juju - # versions on a PR, multiple PRs) - path = f"mysql/{uuid.uuid4()}" - - host_ip = socket.gethostbyname(socket.gethostname()) - return { - "aws": { - "endpoint": "https://s3.amazonaws.com", - "bucket": "data-charms-testing", - "path": path, - "region": "us-east-1", - }, - "gcp": { - "endpoint": "https://storage.googleapis.com", - "bucket": "data-charms-testing", - "path": path, - "region": "", - }, - "ceph": { - "endpoint": f"http://{host_ip}", - "bucket": microceph.bucket, - "path": path, - "region": "", - }, - } - - -@pytest.fixture(scope="session") -def cloud_credentials( - github_secrets, microceph: pytest_microceph.ConnectionInformation -) -> dict[str, dict[str, str]]: - """Read cloud credentials.""" - return { - "aws": { - "access-key": github_secrets["AWS_ACCESS_KEY"], - "secret-key": github_secrets["AWS_SECRET_KEY"], - }, - "gcp": { - "access-key": github_secrets["GCP_ACCESS_KEY"], - "secret-key": github_secrets["GCP_SECRET_KEY"], - }, - "ceph": { - "access-key": microceph.access_key_id, - "secret-key": microceph.secret_access_key, - }, - } - - -@pytest.fixture(scope="session", autouse=True) -def clean_backups_from_buckets(cloud_configs, cloud_credentials) -> None: - """Teardown to clean up created backups from clouds.""" - yield - - logger.info("Cleaning backups from cloud buckets") - for cloud_name, config in cloud_configs.items(): - backup = backups_by_cloud.get(cloud_name) - - if not backup: - continue - - session = boto3.session.Session( - aws_access_key_id=cloud_credentials[cloud_name]["access-key"], - aws_secret_access_key=cloud_credentials[cloud_name]["secret-key"], - region_name=config["region"], - ) - s3 = session.resource("s3", endpoint_url=config["endpoint"]) - bucket = s3.Bucket(config["bucket"]) - - # GCS doesn't support batch delete operation, so delete the objects one by one - backup_path = str(Path(config["path"]) / backups_by_cloud[cloud_name]) - for bucket_object in bucket.objects.filter(Prefix=backup_path): - bucket_object.delete() - - -@pytest.mark.group(1) -async def test_build_and_deploy(ops_test: OpsTest, mysql_charm_series: str) -> None: - """Simple test to ensure that the mysql charm gets deployed.""" - mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) - - primary_mysql = await get_primary_unit_wrapper(ops_test, mysql_application_name) - - logger.info("Rotating all mysql credentials") - - await rotate_credentials( - primary_mysql, username=CLUSTER_ADMIN_USER, password=CLUSTER_ADMIN_PASSWORD - ) - await rotate_credentials( - primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD - ) - await rotate_credentials(primary_mysql, username=ROOT_USER, password=ROOT_PASSWORD) - - logger.info("Deploying s3 integrator") - - await ops_test.model.deploy(S3_INTEGRATOR, channel=S3_INTEGRATOR_CHANNEL) - await ops_test.model.relate(mysql_application_name, S3_INTEGRATOR) - - await ops_test.model.wait_for_idle( - apps=[S3_INTEGRATOR], - status="blocked", - raise_on_blocked=False, - timeout=TIMEOUT, - ) - - -@pytest.mark.group(1) -@pytest.mark.abort_on_fail -async def test_backup( - ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials -) -> None: - """Test to create a backup and list backups.""" - mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) - - global backups_by_cloud, value_before_backup, value_after_backup - - zeroth_unit = ops_test.model.units[f"{mysql_application_name}/0"] - - primary_unit = await get_primary_unit_wrapper(ops_test, 
mysql_application_name) - non_primary_units = [ - unit - for unit in ops_test.model.applications[mysql_application_name].units - if unit.name != primary_unit.name - ] - - # insert data into cluster before - logger.info("Inserting value before backup") - value_before_backup = await insert_data_into_mysql_and_validate_replication( - ops_test, - DATABASE_NAME, - TABLE_NAME, - ) - - for cloud_name, config in cloud_configs.items(): - # set the s3 config and credentials - logger.info(f"Syncing credentials for {cloud_name}") - - await ops_test.model.applications[S3_INTEGRATOR].set_config(config) - await juju_.run_action( - ops_test.model.units[f"{S3_INTEGRATOR}/0"], - "sync-s3-credentials", - **cloud_credentials[cloud_name], - ) - - await ops_test.model.wait_for_idle( - apps=[mysql_application_name, S3_INTEGRATOR], - status="active", - timeout=TIMEOUT, - ) - - # list backups - logger.info("Listing existing backup ids") - - results = await juju_.run_action(zeroth_unit, "list-backups") - output = results["backups"] - backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] - - # create backup - logger.info("Creating backup") - - results = await juju_.run_action(non_primary_units[0], "create-backup") - backup_id = results["backup-id"] - - # list backups again and ensure new backup id exists - logger.info("Listing backup ids post backup") - - results = await juju_.run_action(zeroth_unit, "list-backups") - output = results["backups"] - new_backup_ids = [line.split("|")[0].strip() for line in output.split("\n")[2:]] - - assert sorted(new_backup_ids) == sorted(backup_ids + [backup_id]) - - backups_by_cloud[cloud_name] = backup_id - - # insert data into cluster after backup - logger.info("Inserting value after backup") - value_after_backup = await insert_data_into_mysql_and_validate_replication( - ops_test, - DATABASE_NAME, - TABLE_NAME, - ) - - -@pytest.mark.group(1) -@pytest.mark.abort_on_fail -async def test_restore_on_same_cluster( - ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials -) -> None: - """Test to restore a backup to the same mysql cluster.""" - mysql_application_name = await deploy_and_scale_mysql(ops_test, mysql_charm_series) - - logger.info("Scaling mysql application to 1 unit") - async with ops_test.fast_forward(): - await scale_application(ops_test, mysql_application_name, 1) - - mysql_unit = ops_test.model.units[f"{mysql_application_name}/0"] - mysql_unit_address = await get_unit_ip(ops_test, mysql_unit.name) - server_config_credentials = await get_server_config_credentials(mysql_unit) - - select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] - - for cloud_name, config in cloud_configs.items(): - assert backups_by_cloud[cloud_name] - - # set the s3 config and credentials - logger.info(f"Syncing credentials for {cloud_name}") - - await ops_test.model.applications[S3_INTEGRATOR].set_config(config) - await juju_.run_action( - ops_test.model.units[f"{S3_INTEGRATOR}/0"], - "sync-s3-credentials", - **cloud_credentials[cloud_name], - ) - - await ops_test.model.wait_for_idle( - apps=[mysql_application_name, S3_INTEGRATOR], - status="active", - timeout=TIMEOUT, - ) - - # restore the backup - logger.info(f"Restoring backup with id {backups_by_cloud[cloud_name]}") - - await juju_.run_action( - mysql_unit, action_name="restore", **{"backup-id": backups_by_cloud[cloud_name]} - ) - - # ensure the correct inserted values exist - logger.info( - "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted 
value does not" - ) - - values = await execute_queries_on_unit( - mysql_unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_values_sql, - ) - assert values == [value_before_backup] - - # insert data into cluster after restore - logger.info(f"Inserting value after restore from {cloud_name}") - value_after_restore = await insert_data_into_mysql_and_validate_replication( - ops_test, - DATABASE_NAME, - TABLE_NAME, - ) - - logger.info("Ensuring that pre-backup and post-restore values exist in the database") - - values = await execute_queries_on_unit( - mysql_unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_values_sql, - ) - assert sorted(values) == sorted([value_before_backup, value_after_restore]) - - logger.info("Scaling mysql application to 3 units") - await scale_application(ops_test, mysql_application_name, 3) - - logger.info("Ensuring inserted values before backup and after restore exist on all units") - for unit in ops_test.model.applications[mysql_application_name].units: - unit_address = await get_unit_ip(ops_test, unit.name) - - values = await execute_queries_on_unit( - unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_values_sql, - ) - - assert sorted(values) == sorted([value_before_backup, value_after_restore]) - - # scale down the cluster to preserve resources for the following tests - await scale_application(ops_test, mysql_application_name, 0) - - -@pytest.mark.group(1) -@pytest.mark.abort_on_fail -async def test_restore_on_new_cluster( - ops_test: OpsTest, mysql_charm_series: str, cloud_configs, cloud_credentials -) -> None: - """Test to restore a backup on a new mysql cluster.""" - logger.info("Deploying a new mysql cluster") - - new_mysql_application_name = await deploy_and_scale_mysql( - ops_test, - mysql_charm_series, - check_for_existing_application=False, - mysql_application_name="another-mysql", - num_units=1, - ) - - # relate to S3 integrator - await ops_test.model.relate(new_mysql_application_name, S3_INTEGRATOR) - - await ops_test.model.wait_for_idle( - apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", - timeout=TIMEOUT, - ) - - # rotate all credentials - logger.info("Rotating all mysql credentials") - - primary_mysql = ops_test.model.units[f"{new_mysql_application_name}/0"] - primary_unit_address = await get_unit_ip(ops_test, primary_mysql.name) - - await rotate_credentials( - primary_mysql, username="clusteradmin", password=CLUSTER_ADMIN_PASSWORD - ) - await rotate_credentials( - primary_mysql, username="serverconfig", password=SERVER_CONFIG_PASSWORD - ) - await rotate_credentials(primary_mysql, username="root", password=ROOT_PASSWORD) - - server_config_credentials = await get_server_config_credentials(primary_mysql) - select_values_sql = [f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] - - for cloud_name, config in cloud_configs.items(): - assert backups_by_cloud[cloud_name] - - # set the s3 config and credentials - logger.info(f"Syncing credentials for {cloud_name}") - - await ops_test.model.applications[S3_INTEGRATOR].set_config(config) - await juju_.run_action( - ops_test.model.units[f"{S3_INTEGRATOR}/0"], - "sync-s3-credentials", - **cloud_credentials[cloud_name], - ) - - await ops_test.model.wait_for_idle( - apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", - timeout=TIMEOUT, - ) - - # restore the backup - logger.info(f"Restoring backup with id 
{backups_by_cloud[cloud_name]}") - - await juju_.run_action( - primary_mysql, action_name="restore", **{"backup-id": backups_by_cloud[cloud_name]} - ) - - # ensure the correct inserted values exist - logger.info( - "Ensuring that the pre-backup inserted value exists in database, while post-backup inserted value does not" - ) - - values = await execute_queries_on_unit( - primary_unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_values_sql, - ) - assert values == [value_before_backup] - - # insert data into cluster after restore - logger.info(f"Inserting value after restore from {cloud_name}") - value_after_restore = await insert_data_into_mysql_and_validate_replication( - ops_test, - DATABASE_NAME, - TABLE_NAME, - mysql_application_substring="another-mysql", - ) - - logger.info("Ensuring that pre-backup and post-restore values exist in the database") - - values = await execute_queries_on_unit( - primary_unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_values_sql, - ) - assert sorted(values) == sorted([value_before_backup, value_after_restore]) - - logger.info("Scaling mysql application to 3 units") - await scale_application(ops_test, new_mysql_application_name, 3) - - logger.info("Ensuring inserted values before backup and after restore exist on all units") - for unit in ops_test.model.applications[new_mysql_application_name].units: - unit_address = await get_unit_ip(ops_test, unit.name) - - values = await execute_queries_on_unit( - unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_values_sql, - ) - - assert sorted(values) == sorted([value_before_backup, value_after_restore]) diff --git a/tests/unit/test_backups.py b/tests/unit/test_backups.py index b6c2239ca..d6ae06ddb 100644 --- a/tests/unit/test_backups.py +++ b/tests/unit/test_backups.py @@ -28,7 +28,7 @@ from ops.testing import Harness from charm import MySQLOperatorCharm -from constants import S3_INTEGRATOR_RELATION_NAME +from lib.charms.mysql.v0.backups import S3_INTEGRATOR_RELATION_NAME from .helpers import patch_network_get @@ -430,7 +430,7 @@ def test_pre_backup_failure( self.assertEqual(error_message, "Error setting instance option tag:_hidden") @patch_network_get(private_address="1.1.1.1") - @patch("mysql_vm_helpers.MySQL.execute_backup_commands", return_value="stdout") + @patch("mysql_vm_helpers.MySQL.execute_backup_commands", return_value=("stdout", "stderr")) @patch("charms.mysql.v0.backups.MySQLBackups._upload_logs_to_s3") def test_backup( self, @@ -451,7 +451,7 @@ def test_backup( _upload_logs_to_s3.assert_called_once_with("stdout", "", "/path.backup.log", s3_params) @patch_network_get(private_address="1.1.1.1") - @patch("mysql_vm_helpers.MySQL.execute_backup_commands", return_value="stdout") + @patch("mysql_vm_helpers.MySQL.execute_backup_commands", return_value=("stdout", "stderr")) @patch("charms.mysql.v0.backups.MySQLBackups._upload_logs_to_s3") def test_backup_failure( self, @@ -670,7 +670,6 @@ def test_on_restore_failure( # test failure of recoverable _restore() _restore.return_value = (False, True, "restore error") - self.charm.unit.status = ActiveStatus() event = MagicMock() params_mock = {} @@ -682,13 +681,11 @@ def test_on_restore_failure( event.set_results.assert_not_called() event.fail.assert_called_once_with("restore error") _clean_data_dir_and_start_mysqld.assert_called_once() - self.assertTrue(isinstance(self.charm.unit.status, ActiveStatus)) 
_clean_data_dir_and_start_mysqld.reset_mock() # test failure of unrecoverable _restore() _restore.return_value = (False, False, "restore error") - self.charm.unit.status = ActiveStatus() event = MagicMock() params_mock = {} @@ -750,17 +747,24 @@ def test_on_restore_failure( event.fail.assert_not_called() @patch_network_get(private_address="1.1.1.1") + @patch("mysql_vm_helpers.MySQL.is_mysqld_running", return_value=True) + @patch("mysql_vm_helpers.MySQL.kill_client_sessions") @patch("mysql_vm_helpers.MySQL.stop_mysqld") - def test_pre_restore(self, _stop_mysqld): + def test_pre_restore(self, _stop_mysqld, _kill_client_sessions, _mysqld_running): """Test _pre_restore().""" success, error = self.mysql_backups._pre_restore() self.assertTrue(success) - self.assertIsNone(error) + self.assertEqual(error, "") + _mysqld_running.assert_called_once() + _stop_mysqld.assert_called_once() + _kill_client_sessions.assert_called_once() @patch_network_get(private_address="1.1.1.1") + @patch("mysql_vm_helpers.MySQL.is_mysqld_running", return_value=True) + @patch("mysql_vm_helpers.MySQL.kill_client_sessions") @patch("mysql_vm_helpers.MySQL.stop_mysqld") - def test_pre_restore_failure(self, _stop_mysqld): + def test_pre_restore_failure(self, _stop_mysqld, _kill_client_sessions, _mysqld_running): """Test failure of _pre_restore().""" _stop_mysqld.side_effect = MySQLStopMySQLDError() @@ -796,7 +800,7 @@ def test_restore( self.assertTrue(success) self.assertTrue(recoverable) - self.assertIsNone(error) + self.assertEqual(error, "") @patch_network_get(private_address="1.1.1.1") @patch( @@ -863,7 +867,7 @@ def test_clean_data_dir_and_start_mysqld(self, ___, __, _): success, error = self.mysql_backups._clean_data_dir_and_start_mysqld() self.assertTrue(success) - self.assertIsNone(error) + self.assertEqual(error, "") @patch_network_get(private_address="1.1.1.1") @patch("mysql_vm_helpers.MySQL.start_mysqld") @@ -901,37 +905,33 @@ def test_clean_data_dir_and_start_mysqld_failure( ) @patch("mysql_vm_helpers.MySQL.configure_instance") @patch("mysql_vm_helpers.MySQL.wait_until_mysql_connection") + @patch("mysql_vm_helpers.MySQL.create_cluster_set") @patch("mysql_vm_helpers.MySQL.create_cluster") @patch("mysql_vm_helpers.MySQL.initialize_juju_units_operations_table") @patch("mysql_vm_helpers.MySQL.rescan_cluster") - @patch("mysql_vm_helpers.MySQL.get_member_state", return_value=("online", "primary")) def test_post_restore( self, - _get_member_state, _rescan_cluster, _initialize_juju_units_operations_table, _create_cluster, + _create_cluster_set, _wait_until_mysql_connection, _configure_instance, _clean_data_dir_and_start_mysqld, ): """Test _post_restore().""" - self.charm.unit.status = MaintenanceStatus() - success, error_message = self.mysql_backups._post_restore() self.assertTrue(success) - self.assertIsNone(error_message) + self.assertEqual(error_message, "") _clean_data_dir_and_start_mysqld.assert_called_once() _configure_instance.assert_called_once_with(create_cluster_admin=False) _wait_until_mysql_connection.assert_called_once() _create_cluster.assert_called_once() + _create_cluster_set.assert_called_once() _initialize_juju_units_operations_table.assert_called_once() _rescan_cluster.assert_called_once() - _get_member_state.assert_called_once() - - self.assertTrue(isinstance(self.charm.unit.status, ActiveStatus)) @patch_network_get(private_address="1.1.1.1") @patch( @@ -940,16 +940,16 @@ def test_post_restore( ) @patch("mysql_vm_helpers.MySQL.configure_instance") 
@patch("mysql_vm_helpers.MySQL.wait_until_mysql_connection") + @patch("mysql_vm_helpers.MySQL.create_cluster_set") @patch("mysql_vm_helpers.MySQL.create_cluster") @patch("mysql_vm_helpers.MySQL.initialize_juju_units_operations_table") @patch("mysql_vm_helpers.MySQL.rescan_cluster") - @patch("mysql_vm_helpers.MySQL.get_member_state", return_value=("online", "primary")) def test_post_restore_failure( self, - _get_member_state, _rescan_cluster, _initialize_juju_units_operations_table, _create_cluster, + _create_cluster_set, _wait_until_mysql_connection, _configure_instance, _clean_data_dir_and_start_mysqld, @@ -957,14 +957,6 @@ def test_post_restore_failure( """Test failure of _post_restore().""" self.charm.unit.status = MaintenanceStatus() - # test failure of get_member_state() - _get_member_state.side_effect = MySQLGetMemberStateError() - - success, error_message = self.mysql_backups._post_restore() - self.assertFalse(success) - self.assertEqual(error_message, "Failed to retrieve member state in restored instance") - self.assertTrue(isinstance(self.charm.unit.status, MaintenanceStatus)) - # test failure of rescan_cluster() _rescan_cluster.side_effect = MySQLRescanClusterError() diff --git a/tests/unit/test_mysql.py b/tests/unit/test_mysql.py index 3024f0955..23fb20665 100644 --- a/tests/unit/test_mysql.py +++ b/tests/unit/test_mysql.py @@ -1094,33 +1094,33 @@ def test_execute_backup_commands(self, _execute_commands): _expected_tmp_dir_commands = ( "mktemp --directory /tmp/base/directory/xtra_backup_XXXX".split() ) - _expected_xtrabackup_commands = """ -/xtrabackup/location --defaults-file=/defaults/file.cnf - --defaults-group=mysqld - --no-version-check - --parallel=16 - --user=backups - --password=backupspassword - --socket=/mysqld/socket/file.sock - --lock-ddl - --backup - --stream=xbstream - --xtrabackup-plugin-dir=/xtrabackup/plugin/dir - --target-dir=/tmp/base/directory/xtra_backup_ABCD - --no-server-version-check - | /xbcloud/location put - --curl-retriable-errors=7 - --insecure - --parallel=10 - --md5 - --storage=S3 - --s3-region=s3_region - --s3-bucket=s3_bucket - --s3-endpoint=s3_endpoint - --s3-api-version=s3_api_version - --s3-bucket-lookup=s3_uri_style - s3_directory -""".split() + _expected_xtrabackup_commands = [ + "/xtrabackup/location --defaults-file=/defaults/file.cnf", + "--defaults-group=mysqld", + "--no-version-check", + "--parallel=16", + "--user=backups", + "--password=backupspassword", + "--socket=/mysqld/socket/file.sock", + "--lock-ddl", + "--backup", + "--stream=xbstream", + "--xtrabackup-plugin-dir=/xtrabackup/plugin/dir", + "--target-dir=/tmp/base/directory/xtra_backup_ABCD", + "--no-server-version-check", + "| /xbcloud/location put", + "--curl-retriable-errors=7", + "--insecure", + "--parallel=10", + "--md5", + "--storage=S3", + "--s3-region=s3_region", + "--s3-bucket=s3_bucket", + "--s3-endpoint=s3_endpoint", + "--s3-api-version=s3_api_version", + "--s3-bucket-lookup=s3_uri_style", + "s3_directory", + ] self.assertEqual( sorted(_execute_commands.mock_calls), @@ -1137,6 +1137,7 @@ def test_execute_backup_commands(self, _execute_commands): "ACCESS_KEY_ID": "s3_access_key", "SECRET_ACCESS_KEY": "s3_secret_key", }, + stream_output="stderr", ), ] ), @@ -1258,23 +1259,23 @@ def test_retrieve_backup_with_xbcloud( _expected_temp_dir_commands = ( "mktemp --directory mysql/data/directory/#mysql_sst_XXXX".split() ) - _expected_retrieve_backup_commands = """ -xbcloud/location get - --curl-retriable-errors=7 - --parallel=10 - --storage=S3 - --s3-region=s3_region - 
--s3-bucket=s3_bucket - --s3-endpoint=s3_endpoint - --s3-bucket-lookup=s3_uri_style - --s3-api-version=s3_api_version - s3_path/backup-id - | xbstream/location - --decompress - -x - -C mysql/data/directory/#mysql_sst_ABCD - --parallel=16 -""".split() + _expected_retrieve_backup_commands = [ + "xbcloud/location get", + "--curl-retriable-errors=7", + "--parallel=10", + "--storage=S3", + "--s3-region=s3_region", + "--s3-bucket=s3_bucket", + "--s3-endpoint=s3_endpoint", + "--s3-bucket-lookup=s3_uri_style", + "--s3-api-version=s3_api_version", + "s3_path/backup-id", + "| xbstream/location", + "--decompress", + "-x", + "-C mysql/data/directory/#mysql_sst_ABCD", + "--parallel=16", + ] self.assertEqual( sorted(_execute_commands.mock_calls), @@ -1291,6 +1292,7 @@ def test_retrieve_backup_with_xbcloud( }, user="test-user", group="test-group", + stream_output="stderr", ), ] ), diff --git a/tests/unit/test_mysqlsh_helpers.py b/tests/unit/test_mysqlsh_helpers.py index 0d7aacb42..cdfe248bf 100644 --- a/tests/unit/test_mysqlsh_helpers.py +++ b/tests/unit/test_mysqlsh_helpers.py @@ -349,9 +349,12 @@ def test_create_custom_mysqld_config_exception( with self.assertRaises(MySQLCreateCustomMySQLDConfigError): self.mysql.write_mysqld_config(profile="production", memory_limit=None) - @patch("subprocess.run") - def test_execute_commands(self, _run): + @patch("subprocess.Popen") + def test_execute_commands(self, _popen): """Test a successful execution of _execute_commands.""" + process = MagicMock() + _popen.return_value = process + process.wait.return_value = 0 self.mysql._execute_commands( ["ls", "-la", "|", "wc", "-l"], bash=True, @@ -359,22 +362,24 @@ def test_execute_commands(self, _run): group="test_group", env_extra={"envA": "valueA"}, ) - env = os.environ + env = os.environ.copy() env.update({"envA": "valueA"}) - _run.assert_called_once_with( + _popen.assert_called_once_with( ["bash", "-c", "set -o pipefail; ls -la | wc -l"], user="test_user", group="test_group", env=env, - capture_output=True, - check=True, encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) - @patch("subprocess.run") - def test_execute_commands_exception(self, _run): + @patch("subprocess.Popen") + def test_execute_commands_exception(self, _popen): """Test a failure in execution of _execute_commands.""" - _run.side_effect = subprocess.CalledProcessError(cmd="", returncode=-1) + process = MagicMock() + _popen.return_value = process + process.wait.return_value = -1 with self.assertRaises(MySQLExecError): self.mysql._execute_commands( @@ -402,8 +407,9 @@ def test_stop_mysqld(self, _snap_service_operation): CHARMED_MYSQL_SNAP_NAME, CHARMED_MYSQLD_SERVICE, "stop" ) + @patch("mysql_vm_helpers.MySQL.kill_client_sessions") @patch("mysql_vm_helpers.snap_service_operation") - def test_stop_mysqld_failure(self, _snap_service_operation): + def test_stop_mysqld_failure(self, _snap_service_operation, _): """Test failure of stop_mysqld().""" _snap_service_operation.side_effect = SnapServiceOperationError("failure")