Parallel torsiondrive optimisations (#277)
* make cli workers use the solo celery pool, enable parallel torsiondrives

* remove logging calls in the torsiondrive

* try concurrent.futures pool to stop slurm hang

* switch back to multiprocessing and use spawn in the pool

* clean up comment

* typo in context

* fix worker test
jthorton authored Aug 4, 2023
1 parent 4c4422f commit ef63c9f
Showing 5 changed files with 26 additions and 9 deletions.
5 changes: 4 additions & 1 deletion openff/bespokefit/cli/worker.py
@@ -45,10 +45,13 @@ def worker_cli(worker_type: str):

     settings = current_settings()

+    worker_kwargs = {}
+
     if worker_type == "fragmenter":
         worker_settings = settings.fragmenter_settings
     elif worker_type == "qc-compute":
         worker_settings = settings.qc_compute_settings
+        worker_kwargs["pool"] = "solo"
     else:
         worker_settings = settings.optimizer_settings

@@ -59,4 +62,4 @@ def worker_cli(worker_type: str):
     worker_status.stop()
     console.print(f"[[green]✓[/green]] bespoke {worker_type} worker launched")

-    spawn_worker(worker_app, concurrency=1, asynchronous=False)
+    spawn_worker(worker_app, concurrency=1, asynchronous=False, **worker_kwargs)
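Why the solo pool matters here: Celery's default prefork pool runs tasks in daemonic child processes, and Python's multiprocessing forbids daemonic processes from creating children of their own, so a prefork task that tries to open its own multiprocessing pool typically fails. The "solo" pool runs tasks in the worker's main process instead. A minimal sketch of the idea, assuming a hypothetical app name and broker URL (not bespokefit's actual wiring):

from celery import Celery

app = Celery("qc-compute", broker="redis://localhost:6379/0")  # hypothetical broker

if __name__ == "__main__":
    # "solo" executes tasks in the worker's main process, leaving it free to
    # spawn subprocesses; roughly equivalent to: celery worker --pool=solo
    app.Worker(pool="solo", concurrency=1).start()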
10 changes: 8 additions & 2 deletions openff/bespokefit/executor/services/qcgenerator/qcengine.py
@@ -1,4 +1,4 @@
-from multiprocessing import Pool
+from multiprocessing import current_process, get_context
 from typing import Dict, List, Union

 from qcelemental.models import FailedOperation

@@ -43,6 +43,10 @@ def _spawn_optimizations(
     settings = current_settings()
     program = input_model.optimization_spec.keywords["program"]
     opts_per_worker = settings.BEFLOW_QC_COMPUTE_WORKER_N_TASKS
+    # we can only split the tasks if the celery worker is the main process so if not set back to 1
+    if current_process().name != "MainProcess":
+        opts_per_worker = 1
+
     if program == "psi4" and opts_per_worker == "auto":
         # we recommend 8 cores per worker for psi4 from our qcfractal jobs
         opts_per_worker = max([int(config.ncores / 8), 1])

@@ -55,7 +59,9 @@ def _spawn_optimizations(
     # split the resources based on the number of tasks
     n_workers = int(min([n_jobs, opts_per_worker]))
     opt_config = _divide_config(config=config, n_workers=n_workers)
-    with Pool(processes=n_workers) as pool:
+
+    # Using fork can hang on our local HPC so pin to use spawn
+    with get_context("spawn").Pool(processes=n_workers) as pool:
         tasks = {
             grid_point: [
                 pool.apply_async(
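The two guards above, in isolation: a runnable sketch (the _square task and run_parallel helper are hypothetical) showing the MainProcess check and the spawn-context pool. "spawn" starts fresh interpreters rather than fork()ing the parent, which avoids inheriting locks and threads that can deadlock on some HPC setups; the trade-off is that spawned workers re-import the module, hence the __main__ guard.

from multiprocessing import current_process, get_context

def _square(x: int) -> int:
    return x * x

def run_parallel(n_tasks: int = 4) -> list:
    # A child process may not create children of its own, so fall back to
    # serial execution when we are not running in the main process.
    if current_process().name != "MainProcess":
        return [_square(i) for i in range(n_tasks)]

    with get_context("spawn").Pool(processes=n_tasks) as pool:
        async_results = [pool.apply_async(_square, (i,)) for i in range(n_tasks)]
        return [result.get() for result in async_results]

if __name__ == "__main__":
    print(run_parallel())  # -> [0, 1, 4, 9]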
11 changes: 9 additions & 2 deletions openff/bespokefit/executor/services/qcgenerator/worker.py
@@ -147,11 +147,18 @@ def compute_torsion_drive(task_json: str) -> TorsionDriveResult:
         ),
     )

+    # run all torsiondrives through our custom procedure which handles parallel optimisations
     return_value = qcengine.compute_procedure(
-        input_schema, "torsiondrive", raise_error=True, local_options=_task_config()
+        input_schema,
+        "TorsionDriveParallel",
+        raise_error=True,
+        task_config=_task_config(),
     )

     if isinstance(return_value, TorsionDriveResult):
+        _task_logger.info(
+            f"1D TorsionDrive successfully completed in {return_value.provenance.wall_time}"
+        )
         return_value = TorsionDriveResult(
             **return_value.dict(exclude={"optimization_history", "stdout", "stderr"}),
             optimization_history={},

@@ -204,7 +211,7 @@ def compute_optimization(
         input_schema,
         task.optimization_spec.program,
         raise_error=True,
-        local_options=_task_config(),
+        task_config=_task_config(),
     )

     if isinstance(return_value, OptimizationResult):
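Besides routing torsiondrives through the custom "TorsionDriveParallel" procedure, this file also picks up the local_options → task_config rename in QCEngine's call signature. A hedged sketch of that calling pattern (the water molecule, method, and resource numbers are illustrative only; running it would need geomeTRIC and an xtb install):

import qcengine
from qcelemental.models import Molecule, OptimizationInput

water = Molecule(
    symbols=["O", "H", "H"],
    geometry=[[0.0, 0.0, 0.0], [0.0, 0.0, 1.8], [1.7, 0.0, -0.4]],  # bohr
)
opt_input = OptimizationInput(
    initial_molecule=water,
    input_specification={"driver": "gradient", "model": {"method": "gfn2-xtb"}},
    keywords={"program": "xtb"},
)
result = qcengine.compute_procedure(
    opt_input,
    "geometric",
    raise_error=True,
    task_config={"ncores": 4, "memory": 8},  # was local_options in older QCEngine
)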
7 changes: 4 additions & 3 deletions openff/bespokefit/executor/utilities/celery.py
@@ -55,19 +55,20 @@ def configure_celery_app(
     return celery_app


-def _spawn_worker(celery_app, concurrency: int = 1):
+def _spawn_worker(celery_app, concurrency: int = 1, **kwargs):
     worker = celery_app.Worker(
         concurrency=concurrency,
         loglevel="INFO",
         logfile=f"celery-{celery_app.main}.log",
         quiet=True,
         hostname=celery_app.main,
+        **kwargs,
     )
     worker.start()


 def spawn_worker(
-    celery_app, concurrency: int = 1, asynchronous: bool = True
+    celery_app, concurrency: int = 1, asynchronous: bool = True, **kwargs
 ) -> Optional[multiprocessing.Process]:
     if concurrency < 1:
         return

@@ -81,7 +82,7 @@ def spawn_worker(
         return worker_process

     else:
-        _spawn_worker(celery_app, concurrency)
+        _spawn_worker(celery_app, concurrency, **kwargs)


 def get_task_information(app: Celery, task_id: str) -> TaskInformation:
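Usage after this change, mirroring the qc-compute branch of the CLI: keyword arguments fall straight through spawn_worker and _spawn_worker onto celery_app.Worker, so new Worker options need no extra plumbing. A sketch, where worker_app is a bare stand-in for the app that configure_celery_app actually builds:

from celery import Celery
from openff.bespokefit.executor.utilities.celery import spawn_worker

worker_app = Celery("qc-compute")  # stand-in; the real app comes from configure_celery_app

# asynchronous=False runs the worker in-process; pool="solo" is forwarded via **kwargs
spawn_worker(worker_app, concurrency=1, asynchronous=False, pool="solo")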
2 changes: 1 addition & 1 deletion openff/bespokefit/tests/cli/test_worker.py
@@ -17,7 +17,7 @@ def test_launch_worker(worker_type, runner, monkeypatch):
     in test_celery/test_spawn_worker
     """

-    def mock_spawn(*args):
+    def mock_spawn(*args, **kwargs):
         return True

     monkeypatch.setattr(celery, "_spawn_worker", mock_spawn)
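The stub grows **kwargs because _spawn_worker is now called with keyword arguments; a positional-only stub would raise TypeError. A tiny illustration (both stub names are hypothetical):

def mock_spawn_positional(*args):
    return True

def mock_spawn(*args, **kwargs):
    return True

mock_spawn(object(), 1, pool="solo")              # fine
# mock_spawn_positional(object(), 1, pool="solo") # TypeError: unexpected keyword argument 'pool'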
