Skip to content

Commit

Permalink
Bugfix WDL and add mounting hacks
Browse files Browse the repository at this point in the history
Needed to add a few hacks to the WDL and the submit call to get things to
work for the collections containers. Once mounting works with JAWS, these
hacks can be removed.

Example run:
```
In [2]: genomes = [
   ...:     "test-bucket/collections_image_test/NONE/CDM/FastGenomics/GCA_000008085.1/GCA_000008085.1_ASM808v1_genomic.fna.gz",
   ...:     "test-bucket/collections_image_test/NONE/CDM/FastGenomics/GCA_000010565.1/GCA_000010565.1_ASM1056v1_genomic.fna.gz",
   ...:     "test-bucket/collections_image_test/NONE/CDM/FastGenomics/GCA_000145985.1/GCA_000145985.1_ASM14598v1_genomic.fna.gz",
   ...:     "test-bucket/collections_image_test/NONE/CDM/FastGenomics/GCA_000147015.1/GCA_000147015.1_ASM14701v1_genomic.fna.gz",
   ...: ]

In [3]: genomess3 = [
   ...:     {
   ...:         "file": f,
   ...:         "data_id": f.split("/")[-1].split("_ASM")[0],
   ...:     }
   ...:     for f in genomes
   ...: ]

In [4]: res = requests.post("http://localhost:5000/jobs", headers={"Authorization": f"Bearer {token_ci}"}, json={
   ...:     "cluster": "perlmutter-jaws",
   ...:     "image": "ghcr.io/kbase/collections:checkm2_0.1.6",
   ...:     "params": {
   ...:         "environment": {
   ...:             "DATA_ID_FILE":
   ...:                 {
   ...:                     "type": "manifest_file",
   ...:                     "manifest_file_format": "data_ids",
   ...:                     "manifest_file_header": "genome_id",
   ...:                 },
   ...:             "ENV": "NONE",
   ...:             "KBASE_COLLECTION": "CDM",
   ...:             "SOURCE_VER": "FastGenomics",
   ...:             "LOAD_VER": "FastGenomics.1",
   ...:             "ROOT_DIR": ".",
   ...:             "JOB_ID": {"type": "container_number"},
   ...:             "THREADS_PER_TOOL_RUN": "4",
   ...:             "SOURCE_FILE_EXT": ".fna.gz",
   ...:             # Hack until mounting works
   ...:             "CHECKM2DB": "/refdata/refdata/checkm2/1.0.1/uniref100.KO.1.dmnd"
   ...:          }
   ...:     },
   ...:     "input_files": genomess3,
   ...:     "input_roots": ["test-bucket/collections_image_test"],
   ...:     "output_dir": "test-bucket/collections_image_test_out",
   ...:     "runtime": 600,
   ...:     "num_containers": 3,
   ...:     "memory": "100GB",
   ...: })
```
  • Loading branch information
MrCreosote committed Jan 7, 2025
1 parent affa4fb commit 1270dac
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions cdmtaskservice/jaws/wdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
_WDL_VERSION = "1.0" # Cromwell, and therefore JAWS, only supports 1.0

_IMAGE_TRANS_CHARS = str.maketrans({".": "_", "-": "_", "/": "_", ":": "_"})
_CONTAINER_NUM = "container_num"


class JawsInput(NamedTuple):
Expand Down Expand Up @@ -75,7 +74,7 @@ def generate_wdl(
environment = []
cmdlines = []
mfl = [None] * job_files.containers if not manifest_file_list else manifest_file_list
for files, manifest in zip(job_files.files, mfl):
for i, (files, manifest) in enumerate(zip(job_files.files, mfl)):

Check warning on line 77 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L77

Added line #L77 was not covered by tests
ins = []
rels = []
for f in files:
Expand All @@ -86,15 +85,15 @@ def generate_wdl(
input_files.append(ins)
relpaths.append(rels)
cmd = [shlex.quote(c) for c in job.image.entrypoint]
cmd.extend(_process_flag_args(job, files, file_to_rel_path, manifest))
cmd.extend(_process_pos_args(job, files, file_to_rel_path, manifest))
cmd.extend(_process_flag_args(job, i, files, file_to_rel_path, manifest))
cmd.extend(_process_pos_args(job, i, files, file_to_rel_path, manifest))

Check warning on line 89 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L88-L89

Added lines #L88 - L89 were not covered by tests
cmdlines.append(cmd)
environment.append(_process_environment(job, files, file_to_rel_path, manifest))
environment.append(_process_environment(job, i, files, file_to_rel_path, manifest))

Check warning on line 91 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L91

Added line #L91 was not covered by tests
input_json = {
f"{workflow_name}.input_files_list": input_files,
f"{workflow_name}.file_locs_list": relpaths,
f"{workflow_name}.environment_list": environment,
f"{workflow_name}.cmdline_list": cmdlines
f"{workflow_name}.cmdline_list": cmdlines,
}
if manifest_file_list:
input_json[f"{workflow_name}.manifest_list"] = [str(m) for m in manifest_file_list]
Expand Down Expand Up @@ -133,7 +132,6 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool):
scatter (i in range(length(input_files_list))) {{
call run_container {{
input:
{_CONTAINER_NUM} = i,
input_files = input_files_list[i],
file_locs = file_locs_list[i],
environ = environment_list[i],
Expand All @@ -149,7 +147,6 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool):
task run_container {{
input {{
Int {_CONTAINER_NUM}
Array[File] input_files
Array[String] file_locs
Array[String] environ
Expand All @@ -160,9 +157,13 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool):
mkdir -p ./__input__
mkdir -p ./__output__
# TODO MOUNTING remove hack for collections containers
ln -s __input__ collectionssource
ln -s __output__ collectionsdata
# link any manifest file into the mount point
if [[ -n "${{manifest}}" ]]; then
ln ${{manifest}} ./__input__/$(basename ${{manifest}})
if [[ -n "~{{manifest}}" ]]; then
ln ~{{manifest}} ./__input__/$(basename ~{{manifest}})
fi
# link the input files into the mount point
Expand Down Expand Up @@ -210,6 +211,7 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool):

def _process_flag_args(
job: Job,
container_num: int,
files: list[S3File],
file_to_rel_path: dict[S3File, Path],
manifest: Path | None,
Expand All @@ -218,41 +220,48 @@ def _process_flag_args(
if job.job_input.params.flag_args:
for flag, p in job.job_input.params.flag_args.items():
cmd.extend(_process_parameter(
p, job, files, file_to_rel_path, manifest, as_list=True, flag=flag
p, job, container_num, files, file_to_rel_path, manifest, as_list=True, flag=flag
))
return cmd


def _process_pos_args(
job: Job,
container_num: int,
files: list[S3File],
file_to_rel_path: dict[S3File, Path],
manifest: Path | None,
) -> list[str]:
cmd = []
if job.job_input.params.positional_args:
for p in job.job_input.params.positional_args:
cmd.extend(_process_parameter(p, job, files, file_to_rel_path, manifest, as_list=True))
cmd.extend(_process_parameter(

Check warning on line 238 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L238

Added line #L238 was not covered by tests
p, job, container_num, files, file_to_rel_path, manifest, as_list=True
))
return cmd


def _process_environment(
job: Job,
container_num: int,
files: list[S3File],
file_to_rel_path: dict[S3File, Path],
manifest: Path | None,
) -> list[str]:
env = []
if job.job_input.params.environment:
for envkey, enval in job.job_input.params.environment.items():
enval = _process_parameter(enval, job, files, file_to_rel_path, manifest)
enval = _process_parameter(

Check warning on line 254 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L254

Added line #L254 was not covered by tests
enval, job, container_num, files, file_to_rel_path, manifest
)
env.append(f'{envkey}={enval}')
return env


def _process_parameter(
param: str | Parameter,
job: Job,
container_num: int,
files: list[S3File],
file_to_rel_path: dict[S3File, Path],
manifest: Path | None,
Expand All @@ -266,7 +275,7 @@ def _process_parameter(
case ParameterType.MANIFEST_FILE: # implies manifest file is not None
param = _handle_manifest(job, manifest, flag)
case ParameterType.CONTAINTER_NUMBER:
param = _handle_container_num(flag)
param = _handle_container_num(flag, container_num)

Check warning on line 278 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L278

Added line #L278 was not covered by tests
case _:
# should be impossible but make code future proof
raise ValueError(f"Unexpected parameter type: {_}")
Expand All @@ -280,9 +289,9 @@ def _process_parameter(
return [param] if as_list and not isinstance(param, list) else param


def _handle_container_num(flag: str) -> str | list[str]:
def _handle_container_num(flag: str, container_num: int) -> str | list[str]:
# similar to the function below
cn = f"${_CONTAINER_NUM}"
cn = str(container_num)

Check warning on line 294 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L294

Added line #L294 was not covered by tests
if flag:
if flag.endswith("="):
# TODO TEST not sure if this will work
Expand All @@ -295,7 +304,9 @@ def _handle_container_num(flag: str) -> str | list[str]:


def _handle_manifest(job: Job, manifest: Path, flag: str) -> str | list[str]:
pth = os.path.join(job.job_input.params.input_mount_point, manifest.name)
# TODO MOUNTING remove this hack when mounting works
#pth = os.path.join(job.job_input.params.input_mount_point, manifest.name)
pth = os.path.join("./__input__", manifest.name)

Check warning on line 309 in cdmtaskservice/jaws/wdl.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jaws/wdl.py#L309

Added line #L309 was not covered by tests
# This is the same as the comma separated list case below... not sure if using a common fn
# makes sense
if flag:
Expand Down

0 comments on commit 1270dac

Please sign in to comment.