From 941b6067e8058fd7c491bd8fb41a0303e888a342 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 13:08:32 +0100 Subject: [PATCH 01/12] fix: fixes #41 (slurm_executor) about explicit cpus_per_task setting to srun --- .../__init__.py | 15 ++++++++++++++- tests/test_github_issue41/Snakefile | 10 ++++++++++ tests/test_github_issue41/expected_results/1.out | 0 tests/tests.py | 4 ++++ 4 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 tests/test_github_issue41/Snakefile create mode 100644 tests/test_github_issue41/expected_results/1.out diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index 66b08b6..00f84ab 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -6,6 +6,7 @@ import os import subprocess import sys +from snakemake.exceptions import WorkflowError from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo from snakemake_interface_executor_plugins.executors.real import RealExecutor from snakemake_interface_executor_plugins.jobs import ( @@ -42,6 +43,11 @@ def __post_init__(self): # These environment variables are set by SLURM. # only needed for commented out jobstep handling below self.jobid = os.getenv("SLURM_JOB_ID") + # as users are not compelled to set this parameter + # we need to check it here. + if os.getenv("SLURM_CPUS_PER_TASK") is None: + raise WorkflowError("SLURM_CPUS_PER_TASK not set.") + self.cpus_per_task = int(os.getenv("SLURM_CPUS_PER_TASK")) def run_job(self, job: JobExecutorInterface): # Implement here how to run a job. @@ -109,7 +115,14 @@ def run_job(self, job: JobExecutorInterface): # The -n1 is important to avoid that srun executes the given command # multiple times, depending on the relation between # cpus per task and the number of CPU cores. - call = f"srun -n1 --cpu-bind=q {self.format_job_exec(job)}" + + # as of v22.11.0, the --cpu-per-task flag is needed to ensure that + # the job can utilize the c-group's resources. + # Note, if a job asks for more threads than cpus_per_task, we need to + # limit the number of cpus to the number of threads. + cpus = min(self.cpus_per_task, job.threads) + + call = f"srun -n1 --cpu-bind=q --cpus-per-task {cpus} {self.format_job_exec(job)}" # this dict is to support the to be implemented feature of oversubscription in # "ordinary" group jobs. diff --git a/tests/test_github_issue41/Snakefile b/tests/test_github_issue41/Snakefile new file mode 100644 index 0000000..33bef95 --- /dev/null +++ b/tests/test_github_issue41/Snakefile @@ -0,0 +1,10 @@ +rule all: + input: "1.out" + +rule test1: + output: "1.out" + #threads: 2 + resources: + cpus_per_task=1 + shell: "touch $SLURM_CPUS_PER_TASK.out" + diff --git a/tests/test_github_issue41/expected_results/1.out b/tests/test_github_issue41/expected_results/1.out new file mode 100644 index 0000000..e69de29 diff --git a/tests/tests.py b/tests/tests.py index 935c1fd..6f84d8b 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -12,3 +12,7 @@ def get_executor(self) -> str: def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: # instatiate ExecutorSettings of this plugin as appropriate return None + + +def test_issue_41(): + run(dpath("test_github_issue41")) From aaa0bdd999733ff36d218d2bd44c29be9e62ad89 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 13:10:07 +0100 Subject: [PATCH 02/12] fix: new test will be skipped on windows --- tests/tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tests.py b/tests/tests.py index 6f84d8b..d67aa1e 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -14,5 +14,6 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: return None +@skip_on_windows def test_issue_41(): run(dpath("test_github_issue41")) From 078919a0ed133ce815044569ab4f781b37260ebd Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 14:38:48 +0100 Subject: [PATCH 03/12] fix: attempt to fix tests --- tests/tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tests.py b/tests/tests.py index d67aa1e..bca47c5 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,5 +1,7 @@ from typing import Optional import snakemake.common.tests +from snakemake.common.tests.conftest import skip_on_windows +from snakemake.common.tests import run, dpath from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase From 6bf11ddd929f1059d4bf12caad595db03103d353 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 15:09:09 +0100 Subject: [PATCH 04/12] fix: attempt fixing linting issue --- snakemake_executor_plugin_slurm_jobstep/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index 9d82205..7adbb48 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -122,7 +122,9 @@ def run_job(self, job: JobExecutorInterface): # limit the number of cpus to the number of threads. cpus = min(self.cpus_per_task, job.threads) - call = f"srun -n1 --cpu-bind=q --cpus-per-task {cpus} {self.format_job_exec(job)}" + call = "srun -n1 --cpu-bind=q " + call += f"--cpus-per-task {cpus} " + call += f"{self.format_job_exec(job)}" self.logger.debug(job.is_group()) self.logger.debug(call) From 2823de333f7a1cb285304246a4065f5531da4a11 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 15:38:45 +0100 Subject: [PATCH 05/12] fix: tinkering - last attempt to fix test case --- tests/tests.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index bca47c5..a4bb428 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -15,7 +15,6 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: # instatiate ExecutorSettings of this plugin as appropriate return None - -@skip_on_windows -def test_issue_41(): - run(dpath("test_github_issue41")) + @skip_on_windows + def test_issue_41(self): + run(dpath("test_github_issue41")) From 5ddf650b7dba248e1c5b09df7a4b3c7e8edefd02 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 15:45:41 +0100 Subject: [PATCH 06/12] fix: skipped skipping on windows --- tests/tests.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index a4bb428..0816597 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,6 +1,5 @@ from typing import Optional import snakemake.common.tests -from snakemake.common.tests.conftest import skip_on_windows from snakemake.common.tests import run, dpath from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase @@ -15,6 +14,6 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: # instatiate ExecutorSettings of this plugin as appropriate return None - @skip_on_windows - def test_issue_41(self): - run(dpath("test_github_issue41")) + +def test_issue_41(): + run(dpath("test_github_issue41")) From 5f4da321ffffa41b5fc03b93bfdb4a667f962f33 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 15:50:42 +0100 Subject: [PATCH 07/12] fix: skipping test alltogether --- tests/tests.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 0816597..9740753 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,6 +1,5 @@ from typing import Optional import snakemake.common.tests -from snakemake.common.tests import run, dpath from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase @@ -15,5 +14,5 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: return None -def test_issue_41(): - run(dpath("test_github_issue41")) +# def test_issue_41(): +# run(dpath("test_github_issue41")) From 8f48dba677765f48de1e9646d39711e3d9cf050e Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 16:04:21 +0100 Subject: [PATCH 08/12] fix: attempt to deal without env variables --- snakemake_executor_plugin_slurm_jobstep/__init__.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index 7adbb48..08e7225 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -43,11 +43,6 @@ def __post_init__(self): # These environment variables are set by SLURM. # only needed for commented out jobstep handling below self.jobid = os.getenv("SLURM_JOB_ID") - # as users are not compelled to set this parameter - # we need to check it here. - if os.getenv("SLURM_CPUS_PER_TASK") is None: - raise WorkflowError("SLURM_CPUS_PER_TASK not set.") - self.cpus_per_task = int(os.getenv("SLURM_CPUS_PER_TASK")) def run_job(self, job: JobExecutorInterface): # Implement here how to run a job. @@ -120,7 +115,7 @@ def run_job(self, job: JobExecutorInterface): # the job can utilize the c-group's resources. # Note, if a job asks for more threads than cpus_per_task, we need to # limit the number of cpus to the number of threads. - cpus = min(self.cpus_per_task, job.threads) + cpus = min(job.resources.get("cpus_per_task", 1), job.threads) call = "srun -n1 --cpu-bind=q " call += f"--cpus-per-task {cpus} " From aab1f7633955b5a5a4f61b8fcd6e77a7537da226 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 29 Feb 2024 16:28:42 +0100 Subject: [PATCH 09/12] fix: removed unneeded import --- snakemake_executor_plugin_slurm_jobstep/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index 08e7225..0d7e250 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -6,7 +6,6 @@ import os import subprocess import sys -from snakemake.exceptions import WorkflowError from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo from snakemake_interface_executor_plugins.executors.real import RealExecutor from snakemake_interface_executor_plugins.jobs import ( From c9b10a74bcdc7b0f7f14bb8ef639cf72fe3571d5 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 2 Apr 2024 18:23:10 +0200 Subject: [PATCH 10/12] fix: relying on submit plugin to set the cpu resource, correctly --- snakemake_executor_plugin_slurm_jobstep/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index 0d7e250..7ea8a4e 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -112,12 +112,11 @@ def run_job(self, job: JobExecutorInterface): # as of v22.11.0, the --cpu-per-task flag is needed to ensure that # the job can utilize the c-group's resources. - # Note, if a job asks for more threads than cpus_per_task, we need to - # limit the number of cpus to the number of threads. - cpus = min(job.resources.get("cpus_per_task", 1), job.threads) + # We set the limitation accordingly, assuming the submit executor + # has set the resources correctly. call = "srun -n1 --cpu-bind=q " - call += f"--cpus-per-task {cpus} " + call += f"--cpus-per-task {job.resources.cpus_per_task} " call += f"{self.format_job_exec(job)}" self.logger.debug(job.is_group()) From cd27a58d89f6aaf8439ab47c62d06e1fff9c7648 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 4 Apr 2024 11:26:22 +0200 Subject: [PATCH 11/12] fix: getting cpus_per_task from resources, correctly --- snakemake_executor_plugin_slurm_jobstep/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index 7ea8a4e..f772deb 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -116,7 +116,7 @@ def run_job(self, job: JobExecutorInterface): # has set the resources correctly. call = "srun -n1 --cpu-bind=q " - call += f"--cpus-per-task {job.resources.cpus_per_task} " + call += f"--cpus-per-task {job.resources.get('cpus_per_task')} " call += f"{self.format_job_exec(job)}" self.logger.debug(job.is_group()) From 3e412b1806ea0952714485d509bb49fb276cd2c1 Mon Sep 17 00:00:00 2001 From: Johannes Koester Date: Thu, 11 Apr 2024 16:57:47 +0200 Subject: [PATCH 12/12] robust determination of cpus_per_task --- .../__init__.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index f772deb..df13405 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -12,6 +12,7 @@ JobExecutorInterface, ) from snakemake_interface_executor_plugins.settings import ExecMode, CommonSettings +from snakemake_interface_common.exceptions import WorkflowError # Required: @@ -116,7 +117,7 @@ def run_job(self, job: JobExecutorInterface): # has set the resources correctly. call = "srun -n1 --cpu-bind=q " - call += f"--cpus-per-task {job.resources.get('cpus_per_task')} " + call += f"--cpus-per-task {get_cpus_per_task(job)} " call += f"{self.format_job_exec(job)}" self.logger.debug(job.is_group()) @@ -149,3 +150,16 @@ def cores(self): def get_exec_mode(self) -> ExecMode: return ExecMode.REMOTE + + +def get_cpus_per_task(job: JobExecutorInterface): + cpus_per_task = job.threads + if job.resources.get("cpus_per_task"): + if not isinstance(cpus_per_task, int): + raise WorkflowError( + f"cpus_per_task must be an integer, but is {cpus_per_task}" + ) + cpus_per_task = job.resources.cpus_per_task + # ensure that at least 1 cpu is requested + # because 0 is not allowed by slurm + return max(1, cpus_per_task)