Fix Dask unscheduled workers #1032

Draft
Wants to merge 1 commit into base: development
15 changes: 15 additions & 0 deletions smac/runner/dask_runner.py
@@ -4,6 +4,7 @@

import time
from pathlib import Path
import asyncio

import dask
from ConfigSpace import Configuration
@@ -50,6 +51,9 @@ class DaskParallelRunner(AbstractRunner):
A runner to run in a distributed fashion. Will be distributed using `n_workers`.
patience: int, defaults to 5
How long to wait (seconds) for workers to become available if one fails.
worker_timeout: float, defaults to 60
Time (seconds) the Dask client waits for workers to be scheduled whenever a trial is submitted.
Ensures that workers are actually scheduled.
dask_client: Client | None, defaults to None
User-created dask client, which can be used to start a dask cluster and then attach SMAC to it. This will not
be closed automatically and will have to be closed manually if provided explicitly. If none is provided
@@ -60,6 +64,7 @@ def __init__(
self,
single_worker: AbstractRunner,
patience: int = 5,
worker_timeout: float = 60,
dask_client: Client | None = None,
):
super().__init__(
@@ -76,6 +81,7 @@ def __init__(
# Dask related variables
self._scheduler_file: Path | None = None
self._patience = patience
self._worker_timeout = worker_timeout

self._client: Client
self._close_client_at_del: bool
@@ -124,6 +130,15 @@ def submit_trial(self, trial_info: TrialInfo, **dask_data_to_scatter: dict[str,
For example, when your target function uses a big dataset that is shared across all trials,
this argument is very useful.
"""
# Wait for workers to be scheduled
try:
self._client.wait_for_workers(n_workers=1, timeout=self._worker_timeout)
except asyncio.exceptions.TimeoutError as error:
logger.debug(f"No worker could be scheduled in time after {self._worker_timeout}s on the cluster. "
"Try increasing `worker_timeout`.")
raise error


# Check for resources or block till one is available
if self.count_available_workers() <= 0:
logger.debug("No worker available. Waiting for one to be available...")
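
Below is a minimal, standalone sketch (not part of this PR) of the Dask behaviour the new check relies on: `Client.wait_for_workers` blocks until the requested number of workers has registered with the scheduler, and raises a timeout error once the timeout elapses. The `LocalCluster` setup, the 5-second timeout, and the printed message are illustrative only; the exact exception type can vary across distributed and Python versions, so both variants are caught here.

import asyncio

from dask.distributed import Client, LocalCluster

# Start a local cluster with no workers so the wait below is forced to time out.
cluster = LocalCluster(n_workers=0)
client = Client(cluster)

try:
    # Mirrors the call added in submit_trial: block until at least one worker
    # is registered with the scheduler, or give up after `timeout` seconds.
    client.wait_for_workers(n_workers=1, timeout=5)
except (asyncio.exceptions.TimeoutError, TimeoutError):
    print("No worker could be scheduled in time. Increase the timeout or check the cluster setup.")
finally:
    client.close()
    cluster.close()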