From 54b71ef63e71d2c408e225cd287d91d6d49a4e26 Mon Sep 17 00:00:00 2001
From: Kevin Michel
Date: Tue, 16 Jan 2024 13:59:53 +0100
Subject: [PATCH] Make restore workers pool configurable

Starting as many Python interpreters as there are CPUs can be
problematic, particularly when running tests that use myhoard, and even
more so when those tests are themselves run in parallel.
---
 README.md                        | 5 +++++
 myhoard/controller.py            | 3 +++
 myhoard/myhoard.py               | 5 +++++
 myhoard/restore_coordinator.py   | 5 +++--
 test/__init__.py                 | 1 +
 test/test_restore_coordinator.py | 2 ++
 6 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index be71228..22e6fba 100644
--- a/README.md
+++ b/README.md
@@ -344,6 +344,11 @@ The full path and file name prefix of relay logs. This must be the same as
 the corresponding MySQL configuration option except full path is always
 required here.
 
+**restore_download_workers_count**
+
+Number of worker processes used to download binlogs during a restore.
+Defaults to `max(cpu_count - 1, 1)`.
+
 **restore_free_memory_percentage**
 
 Maximum percentage of system memory to allow xtrabackup to use while
diff --git a/myhoard/controller.py b/myhoard/controller.py
index 0abc372..b66b039 100644
--- a/myhoard/controller.py
+++ b/myhoard/controller.py
@@ -156,6 +156,7 @@ def __init__(
         state_dir,
         stats,
         temp_dir,
+        restore_download_workers_count: int,
         restore_free_memory_percentage=None,
         xtrabackup_settings: Dict[str, int],
     ):
@@ -197,6 +198,7 @@ def __init__(
         self.optimize_tables_before_backup = optimize_tables_before_backup
         self.restart_mysqld_callback = restart_mysqld_callback
         self.restore_max_binlog_bytes = restore_max_binlog_bytes
+        self.restore_download_workers_count = restore_download_workers_count
         self.restore_free_memory_percentage: Optional[int] = restore_free_memory_percentage
         self.restore_coordinator: Optional[RestoreCoordinator] = None
         self.seen_basebackup_infos: Dict[str, BaseBackup] = {}
@@ -849,6 +851,7 @@ def _create_restore_coordinator_if_missing(self):
         self.log.info("Creating new restore coordinator")
         self.restore_coordinator = RestoreCoordinator(
             binlog_streams=options["binlog_streams"],
+            download_workers_count=self.restore_download_workers_count,
             file_storage_config=storage_config,
             max_binlog_bytes=self.restore_max_binlog_bytes,
             free_memory_percentage=self.restore_free_memory_percentage,
diff --git a/myhoard/myhoard.py b/myhoard/myhoard.py
index a1ab85f..08dcb32 100644
--- a/myhoard/myhoard.py
+++ b/myhoard/myhoard.py
@@ -9,6 +9,7 @@
 import asyncio
 import json
 import logging
+import multiprocessing
 import os
 import signal
 import subprocess
@@ -191,6 +192,9 @@ async def _start(self):
             tags=statsd_config["tags"],
         )
         mysql = self.config["mysql"]
+        restore_download_workers_count = self.config.get("restore_download_workers_count", None)
+        if restore_download_workers_count is None:
+            restore_download_workers_count = max(multiprocessing.cpu_count() - 1, 1)
         self.controller = Controller(
             backup_settings=self.config["backup_settings"],
             backup_sites=self.config["backup_sites"],
@@ -204,6 +208,7 @@ async def _start(self):
             optimize_tables_before_backup=self.config.get("optimize_tables_before_backup", False),
             restart_mysqld_callback=self._restart_mysqld,
             restore_max_binlog_bytes=self.config["restore_max_binlog_bytes"],
+            restore_download_workers_count=restore_download_workers_count,
             restore_free_memory_percentage=self.config.get("restore_free_memory_percentage"),
             server_id=self.config["server_id"],
             state_dir=self.config["state_directory"],
diff --git a/myhoard/restore_coordinator.py b/myhoard/restore_coordinator.py
index a3241da..6db49e8 100644
--- a/myhoard/restore_coordinator.py
+++ b/myhoard/restore_coordinator.py
@@ -155,6 +155,7 @@ def __init__(
         self,
         *,
         binlog_streams: List[BinlogStream],
+        download_workers_count: int,
         file_storage_config,
         free_memory_percentage,
         max_binlog_bytes=None,
@@ -184,6 +185,7 @@ def __init__(
         # a basebackup fails for any reason but earlier backups are available and basebackup from one of those
         # can be successfully restored.
         self.binlog_streams = binlog_streams
+        self.download_workers_count = download_workers_count
         self.current_file = None
         self.file_storage: Optional[BaseTransfer] = None
         self.file_storage_config = file_storage_config
@@ -1518,14 +1520,13 @@ def _relay_log_prefetch_name(self, *, index: int) -> str:
         return f"{local_name}.prefetch"
 
     def _start_process_pool(self) -> None:
-        process_count = max(multiprocessing.cpu_count() - 1, 1)
         config = {
             "object_storage": self.file_storage_config,
             "rsa_private_key_pem": self.rsa_private_key_pem.decode("ascii"),
         }
         self.worker_processes = [
             self.mp_context.Process(target=download_binlog, args=(config, self.queue_out, self.queue_in))
-            for _ in range(process_count)
+            for _ in range(self.download_workers_count)
         ]
         for worker in self.worker_processes:
             worker.start()
diff --git a/test/__init__.py b/test/__init__.py
index 1d66b78..9f67479 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -98,6 +98,7 @@ def build_controller(
         mysql_relay_log_prefix=mysql_config.config_options.relay_log_file_prefix,
         restart_mysqld_callback=lambda **kwargs: restart_mysql(mysql_config, **kwargs),
         restore_max_binlog_bytes=2 * 1024 * 1024,
+        restore_download_workers_count=2,
         server_id=mysql_config.server_id,
         state_dir=state_dir,
         stats=build_statsd_client(),
diff --git a/test/test_restore_coordinator.py b/test/test_restore_coordinator.py
index 0d03557..d9c8fce 100644
--- a/test/test_restore_coordinator.py
+++ b/test/test_restore_coordinator.py
@@ -284,6 +284,7 @@ def stream_is_closed(stream):
             {"site": "default", "stream_id": bs1.state["stream_id"]},
             {"site": "default", "stream_id": bs2.state["stream_id"]},
         ],
+        download_workers_count=2,
         file_storage_config={
             "directory": backup_target_location,
             "storage_type": "local",
@@ -391,6 +392,7 @@ def test_empty_last_relay(running_state, session_tmpdir, mysql_master, mysql_emp
     # We do not connect to the db in this call we just need a RestoreCoordinator object
     rc = RestoreCoordinator(
         binlog_streams=[],
+        download_workers_count=2,
         file_storage_config={},
         free_memory_percentage=80,
         mysql_client_params=restored_connect_options,
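
Note (not part of the patch): a minimal, runnable sketch of how the resolved
worker count could be derived from a myhoard-style JSON configuration. Only
the `restore_download_workers_count` key and the max(cpu_count - 1, 1)
default come from the hunks above; the helper name and the inline JSON are
hypothetical.

    import json
    import multiprocessing

    def resolve_restore_download_workers_count(config: dict) -> int:
        # Default mirrors the patched myhoard.py: leave one CPU free for
        # the main process, but always start at least one worker.
        count = config.get("restore_download_workers_count")
        if count is None:
            count = max(multiprocessing.cpu_count() - 1, 1)
        return count

    # Example: a parallelized test environment capping the pool at 2 workers.
    config = json.loads('{"restore_download_workers_count": 2}')
    assert resolve_restore_download_workers_count(config) == 2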
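
Note (not part of the patch): the _start_process_pool hunk keeps the same
worker-pool shape and only changes where its size comes from. Below is a
self-contained sketch of that pattern; download_binlog here is a placeholder
worker loop (the real one downloads binlogs from object storage) and the
queue wiring is simplified relative to RestoreCoordinator.

    import multiprocessing

    def download_binlog(config, queue_in, queue_out):
        # Placeholder worker loop: consume items until a None sentinel.
        for item in iter(queue_in.get, None):
            queue_out.put(item)

    def start_process_pool(workers_count: int, config: dict):
        ctx = multiprocessing.get_context("spawn")
        queue_in, queue_out = ctx.Queue(), ctx.Queue()
        workers = [
            ctx.Process(target=download_binlog, args=(config, queue_in, queue_out))
            for _ in range(workers_count)  # configurable instead of cpu_count - 1
        ]
        for worker in workers:
            worker.start()
        return workers, queue_in, queue_out

    if __name__ == "__main__":
        workers, queue_in, queue_out = start_process_pool(2, {})
        queue_in.put("binlog-0001")
        print(queue_out.get())  # echoes "binlog-0001"
        for worker in workers:
            queue_in.put(None)  # one stop sentinel per worker
        for worker in workers:
            worker.join()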