diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c2ce4bf7..b4b45d26 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -2,7 +2,7 @@
 # See https://pre-commit.com/hooks.html for more hooks
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.5.0
+    rev: v4.6.0
     hooks:
       - id: trailing-whitespace
         exclude: "^tests/"
@@ -18,7 +18,7 @@ repos:
   # Python
   - repo: https://github.com/astral-sh/ruff-pre-commit
     # Ruff version.
-    rev: v0.3.4
+    rev: v0.4.2
     hooks:
       - id: ruff
         args: ["--fix"]
diff --git a/dpdispatcher/contexts/hdfs_context.py b/dpdispatcher/contexts/hdfs_context.py
index 4661ca79..890d3cfa 100644
--- a/dpdispatcher/contexts/hdfs_context.py
+++ b/dpdispatcher/contexts/hdfs_context.py
@@ -138,7 +138,7 @@ def download(
         shutil.rmtree(gz_dir, ignore_errors=True)
         os.mkdir(os.path.join(self.local_root, "tmp"))
         rfile_tgz = f"{self.remote_root}/{submission.submission_hash}_*_download.tar.gz"
-        lfile_tgz = "%s/tmp/" % (self.local_root)
+        lfile_tgz = f"{self.local_root}/tmp/"
         HDFS.copy_to_local(rfile_tgz, lfile_tgz)
 
         tgz_file_list = glob(os.path.join(self.local_root, "tmp/*_download.tar.gz"))
@@ -164,7 +164,7 @@ def download(
                             os.path.join(
                                 self.local_root,
                                 task.task_work_path,
-                                "tag_failure_download_%s" % jj,
+                                f"tag_failure_download_{jj}",
                             ),
                             "w",
                         ) as fp:
@@ -198,9 +198,7 @@ def download(
                 if check_exists:
                     if mark_failure:
                         with open(
-                            os.path.join(
-                                self.local_root, "tag_failure_download_%s" % jj
-                            ),
+                            os.path.join(self.local_root, f"tag_failure_download_{jj}"),
                             "w",
                         ) as fp:
                             pass
diff --git a/dpdispatcher/contexts/local_context.py b/dpdispatcher/contexts/local_context.py
index 4a1ff631..905a8d6e 100644
--- a/dpdispatcher/contexts/local_context.py
+++ b/dpdispatcher/contexts/local_context.py
@@ -153,7 +153,7 @@ def download(
                         tag_file_path = os.path.join(
                             self.local_root,
                             ii.task_work_path,
-                            "tag_failure_download_%s" % kk,
+                            f"tag_failure_download_{kk}",
                         )
                         with open(tag_file_path, "w") as fp:
                             pass
@@ -181,7 +181,7 @@ def download(
                         tag_file_path = os.path.join(
                             self.local_root,
                             ii.task_work_path,
-                            "tag_failure_download_%s" % jj,
+                            f"tag_failure_download_{jj}",
                         )
                         with open(tag_file_path, "w") as fp:
                             pass
@@ -227,7 +227,7 @@ def download(
             if check_exists:
                 if mark_failure:
                     tag_file_path = os.path.join(
-                        self.local_root, "tag_failure_download_%s" % kk
+                        self.local_root, f"tag_failure_download_{kk}"
                     )
                     with open(tag_file_path, "w") as fp:
                         pass
@@ -252,7 +252,7 @@ def download(
                     if mark_failure:
                         with open(
                             os.path.join(
-                                self.local_root, "tag_failure_download_%s" % jj
+                                self.local_root, f"tag_failure_download_{jj}"
                             ),
                             "w",
                         ) as fp:
diff --git a/dpdispatcher/contexts/ssh_context.py b/dpdispatcher/contexts/ssh_context.py
index e465e149..959b6445 100644
--- a/dpdispatcher/contexts/ssh_context.py
+++ b/dpdispatcher/contexts/ssh_context.py
@@ -300,7 +300,7 @@ def exec_command(self, cmd):
             # retry for up to 3 times
             # ensure alive
             self.ensure_alive()
-            raise RetrySignal("SSH session not active in calling %s" % cmd) from e
+            raise RetrySignal(f"SSH session not active in calling {cmd}") from e
 
     @property
     def sftp(self):
@@ -628,8 +628,7 @@ def upload(
         # check sha256
         # `:` means pass: https://stackoverflow.com/a/2421592/9567349
         _, stdout, _ = self.block_checkcall(
-            "sha256sum -c %s --quiet >.sha256sum_stdout 2>/dev/null || :"
-            % shlex.quote(sha256_file)
+            f"sha256sum -c {shlex.quote(sha256_file)} --quiet >.sha256sum_stdout 2>/dev/null || :"
         )
         self.sftp.remove(sha256_file)
         # regenerate file list
@@ -708,7 +707,7 @@ def download(
                         os.path.join(
                             self.local_root,
                             ii.task_work_path,
-                            "tag_failure_download_%s" % jj,
+                            f"tag_failure_download_{jj}",
                         ),
                         "w",
                     ) as fp:
@@ -758,9 +757,9 @@ def block_checkcall(self, cmd, asynchronously=False, stderr_whitelist=None):
         assert self.remote_root is not None
         self.ssh_session.ensure_alive()
         if asynchronously:
-            cmd = "nohup %s >/dev/null &" % cmd
+            cmd = f"nohup {cmd} >/dev/null &"
         stdin, stdout, stderr = self.ssh_session.exec_command(
-            ("cd %s ;" % shlex.quote(self.remote_root)) + cmd
+            (f"cd {shlex.quote(self.remote_root)} ;") + cmd
         )
         exit_status = stdout.channel.recv_exit_status()
         if exit_status != 0:
@@ -779,7 +778,7 @@ def block_call(self, cmd):
         assert self.remote_root is not None
         self.ssh_session.ensure_alive()
         stdin, stdout, stderr = self.ssh_session.exec_command(
-            ("cd %s ;" % shlex.quote(self.remote_root)) + cmd
+            (f"cd {shlex.quote(self.remote_root)} ;") + cmd
         )
         exit_status = stdout.channel.recv_exit_status()
         return exit_status, stdin, stdout, stderr
@@ -846,12 +845,12 @@ def _rmtree(self, remotepath, verbose=False):
         # Thus, it's better to use system's `rm` to remove a directory, which may
         # save a lot of time.
         if verbose:
-            dlog.info("removing %s" % remotepath)
+            dlog.info(f"removing {remotepath}")
         # In some supercomputers, it's very slow to remove large numbers of files
         # (e.g. directory containing trajectory) due to bad I/O performance.
         # So an asynchronously option is provided.
         self.block_checkcall(
-            "rm -rf %s" % shlex.quote(remotepath),
+            f"rm -rf {shlex.quote(remotepath)}",
             asynchronously=self.clean_asynchronously,
         )
 
@@ -921,7 +920,7 @@ def _put_files(
                 f"from {from_f} to {self.ssh_session.username} @ {self.ssh_session.hostname} : {to_f} Error!"
             )
         # remote extract
-        self.block_checkcall("tar xf %s" % of)
+        self.block_checkcall(f"tar xf {of}")
         # clean up
         os.remove(from_f)
         self.sftp.remove(to_f)
diff --git a/dpdispatcher/machine.py b/dpdispatcher/machine.py
index 214a4902..be3db075 100644
--- a/dpdispatcher/machine.py
+++ b/dpdispatcher/machine.py
@@ -261,7 +261,7 @@ def gen_script_env(self, job):
 
         source_list = job.resources.source_list
         for ii in source_list:
-            line = "{ source %s; } \n" % ii
+            line = f"{{ source {ii}; }} \n"
             source_files_part += line
 
         export_envs_part = ""
@@ -466,7 +466,7 @@ def kill(self, job):
         job : Job
             job
         """
-        dlog.warning("Job %s should be manually killed" % job.job_id)
+        dlog.warning(f"Job {job.job_id} should be manually killed")
 
     def get_exit_code(self, job):
         """Get exit code of the job.
diff --git a/dpdispatcher/machines/distributed_shell.py b/dpdispatcher/machines/distributed_shell.py
index 70a7a94e..d9097173 100644
--- a/dpdispatcher/machines/distributed_shell.py
+++ b/dpdispatcher/machines/distributed_shell.py
@@ -64,7 +64,7 @@ def gen_script_env(self, job):
 
         source_list = job.resources.source_list
         for ii in source_list:
-            line = "{ source %s; } \n" % ii
+            line = f"{{ source {ii}; }} \n"
             source_files_part += line
 
         export_envs_part = ""
@@ -96,7 +96,7 @@ def gen_script_env(self, job):
 
     def gen_script_end(self, job):
         all_task_dirs = ""
         for task in job.job_task_list:
-            all_task_dirs += "%s " % task.task_work_path
+            all_task_dirs += f"{task.task_work_path} "
         job_tag_finished = job.job_hash + "_job_tag_finished"
         flag_if_job_task_fail = job.job_hash + "_flag_if_job_task_fail"
diff --git a/dpdispatcher/machines/lsf.py b/dpdispatcher/machines/lsf.py
index 94a225c4..346291e7 100644
--- a/dpdispatcher/machines/lsf.py
+++ b/dpdispatcher/machines/lsf.py
@@ -121,7 +121,7 @@ def check_status(self, job):
             return JobStatus.unsubmitted
         ret, stdin, stdout, stderr = self.context.block_call("bjobs " + job_id)
         err_str = stderr.read().decode("utf-8")
-        if ("Job <%s> is not found" % job_id) in err_str:
+        if (f"Job <{job_id}> is not found") in err_str:
             if self.check_finish_tag(job):
                 return JobStatus.finished
             else:
diff --git a/dpdispatcher/machines/slurm.py b/dpdispatcher/machines/slurm.py
index cb0a27e6..28f47b25 100644
--- a/dpdispatcher/machines/slurm.py
+++ b/dpdispatcher/machines/slurm.py
@@ -254,7 +254,7 @@ def gen_script_header(self, job):
                 ).as_posix()
                 if not self.context.check_file_exists(task_tag_finished):
                     job_array.add(ii // slurm_job_size)
-            return super().gen_script_header(job) + "\n#SBATCH --array=%s" % (
+            return super().gen_script_header(job) + "\n#SBATCH --array={}".format(
                 ",".join(map(str, job_array))
            )
         return super().gen_script_header(job) + "\n#SBATCH --array=0-%d" % (
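
Aside from the two hook version bumps, every Python change in this patch is the same mechanical rewrite: printf-style `%` interpolation replaced by an f-string (or by `str.format` in slurm.py, where the argument is a longer expression). Below is a minimal standalone sketch of the pattern, not code from the patch itself; the variable values are hypothetical, and the names `jj`, `ii`, and `job_array` merely echo identifiers seen above. The one subtlety is that literal braces must be doubled inside f-strings, which is why `"{ source %s; } \n"` becomes `f"{{ source {ii}; }} \n"`.

```python
jj = 3
ii = "/etc/profile.d/env.sh"  # hypothetical value, for illustration only

# printf-style interpolation (the old form removed in this patch)
old_tag = "tag_failure_download_%s" % jj
old_src = "{ source %s; } \n" % ii

# f-string equivalents (the new form); literal { } are escaped by doubling
new_tag = f"tag_failure_download_{jj}"
new_src = f"{{ source {ii}; }} \n"

# str.format, as used in slurm.py where the argument is a joined sequence
job_array = [0, 2, 5]
old_hdr = "\n#SBATCH --array=%s" % (",".join(map(str, job_array)))
new_hdr = "\n#SBATCH --array={}".format(",".join(map(str, job_array)))

# all three rewrites are behavior-preserving
assert old_tag == new_tag and old_src == new_src and old_hdr == new_hdr
```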