diff --git a/smac/runner/dask_runner.py b/smac/runner/dask_runner.py index b9aade401..efd06c70b 100644 --- a/smac/runner/dask_runner.py +++ b/smac/runner/dask_runner.py @@ -4,6 +4,7 @@ import time from pathlib import Path +import asyncio import dask from ConfigSpace import Configuration @@ -50,6 +51,9 @@ class DaskParallelRunner(AbstractRunner): A runner to run in a distributed fashion. Will be distributed using `n_workers`. patience: int, default to 5 How much to wait for workers (seconds) to be available if one fails. + worker_timeout: float, defaults to 60 + Time the dask client waits for workers to be scheduled whenever a trial is submitted. + Ensures that workers are actually scheduled. dask_client: Client | None, defaults to None User-created dask client, which can be used to start a dask cluster and then attach SMAC to it. This will not be closed automatically and will have to be closed manually if provided explicitly. If none is provided @@ -60,6 +64,7 @@ def __init__( self, single_worker: AbstractRunner, patience: int = 5, + worker_timeout: float = 60, dask_client: Client | None = None, ): super().__init__( @@ -76,6 +81,7 @@ def __init__( # Dask related variables self._scheduler_file: Path | None = None self._patience = patience + self._worker_timeout = worker_timeout self._client: Client self._close_client_at_del: bool @@ -124,6 +130,15 @@ def submit_trial(self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str, For example, when your target function has a big dataset shared across all the target function, this argument is very useful. """ + # Wait for workers to be scheduled + try: + self._client.wait_for_workers(n_workers=1, timeout=self._worker_timeout) + except asyncio.exceptions.TimeoutError as error: + logger.debug(f"No worker could be scheduled in time after {self._worker_timeout}s on the cluster. " + "Try increasing `worker_timeout`.") + raise error + + # Check for resources or block till one is available if self.count_available_workers() <= 0: logger.debug("No worker available. Waiting for one to be available...")