From 9e18e8430532afbff93e29991150abb3fde95dbf Mon Sep 17 00:00:00 2001 From: Gavin Date: Thu, 19 Dec 2024 13:45:21 -0800 Subject: [PATCH] Generate wdl and input.json files for job submit Moved the code into the NERSC manager as it makes sense for it to figure out the various file paths --- cdmtaskservice/app_state.py | 1 + cdmtaskservice/jobflows/nersc_jaws.py | 40 ++---------------- cdmtaskservice/manifest_files.py | 41 +++++++++++++++++++ cdmtaskservice/nersc/manager.py | 58 ++++++++++++++++++++++----- cdmtaskservice_config.toml.jinja | 4 +- test/manifest_files_test.py | 7 ++++ 6 files changed, 102 insertions(+), 49 deletions(-) create mode 100644 cdmtaskservice/manifest_files.py create mode 100644 test/manifest_files_test.py diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py index 87356bb..a42c84d 100644 --- a/cdmtaskservice/app_state.py +++ b/cdmtaskservice/app_state.py @@ -70,6 +70,7 @@ async def build_app( sfapi_client = await NERSCSFAPIClientProvider.create(Path(cfg.sfapi_cred_path), cfg.sfapi_user) logr.info("Done") logr.info("Setting up NERSC manager and installing code at NERSC...") + # TODO MULTICLUSTER service won't start if perlmutter is down, need to make it more dynamic remote_code_loc = Path(cfg.nersc_remote_code_dir) / VERSION nerscman = await NERSCManager.create( sfapi_client.get_client, remote_code_loc, cfg.jaws_token, cfg.jaws_group diff --git a/cdmtaskservice/jobflows/nersc_jaws.py b/cdmtaskservice/jobflows/nersc_jaws.py index f2930cb..b5dc0f6 100644 --- a/cdmtaskservice/jobflows/nersc_jaws.py +++ b/cdmtaskservice/jobflows/nersc_jaws.py @@ -3,7 +3,6 @@ """ import logging -import os from cdmtaskservice import models from cdmtaskservice import timestamp @@ -11,7 +10,6 @@ from cdmtaskservice.callback_url_paths import get_download_complete_callback from cdmtaskservice.coroutine_manager import CoroutineWrangler from cdmtaskservice.exceptions import InvalidJobStateError -from cdmtaskservice.input_file_locations import determine_file_locations from cdmtaskservice.job_state import JobState from cdmtaskservice.mongo import MongoDAO from cdmtaskservice.nersc.manager import NERSCManager @@ -135,39 +133,7 @@ async def download_complete(self, job: models.AdminJobDetails): await self._coman.run_coroutine(self._download_complete(job)) async def _download_complete(self, job: models.AdminJobDetails): + jaws_job_id = await self._nman.run_JAWS(job) + # TODO JAWS record job ID in DB logr = logging.getLogger(__name__) - manifest_files = self._generate_manifest_files(job) - # TODO REMOVE these lines - logr.info(f"*** manifiles: {len(manifest_files)}") - for m in manifest_files: - logr.info(m) - logr.info("***") - - def _generate_manifest_files(self, job: models.AdminJobDetails) -> list[str]: - # If we support multiple compute sites this should be moved to a general code location - # Currently leave here rather than creating yet another module - manifests = [] - mani_spec = job.job_input.params.get_file_parameter() - if not mani_spec or mani_spec.type is not models.ParameterType.MANIFEST_FILE: - return manifests - file_to_rel_path = determine_file_locations(job.job_input) - for files in job.job_input.get_files_per_container().files: - manifest = "" - if mani_spec.manifest_file_header: - manifest = f"{mani_spec.manifest_file_header}\n" - for f in files: - match mani_spec.manifest_file_format: - case models.ManifestFileFormat.DATA_IDS: - manifest += f"{f.data_id}\n" - case models.ManifestFileFormat.FILES: - f = os.path.join( - job.job_input.params.input_mount_point, file_to_rel_path[f] - ) - manifest += f"{f}\n" - case _: - # Can't currently happen but for future safety - raise ValueError( - f"Unknown manifest file format: {manifest.manifest_file_format}" - ) - manifests.append(manifest) - return manifests + logr.info(f"JAWS job id: {jaws_job_id}") diff --git a/cdmtaskservice/manifest_files.py b/cdmtaskservice/manifest_files.py new file mode 100644 index 0000000..49d9a92 --- /dev/null +++ b/cdmtaskservice/manifest_files.py @@ -0,0 +1,41 @@ +""" +Methods for dealing with manifest files for jobs. +""" + +import os + +from cdmtaskservice import models +from cdmtaskservice.arg_checkers import not_falsy as _not_falsy +from cdmtaskservice.input_file_locations import determine_file_locations + + +def generate_manifest_files(job: models.Job) -> list[str]: + """ + Generate a set of manifest files as strings for a job, or return an empty list if no + manifest files are specified. + """ + manifests = [] + mani_spec = _not_falsy(job, "job").job_input.params.get_file_parameter() + if not mani_spec or mani_spec.type is not models.ParameterType.MANIFEST_FILE: + return manifests + file_to_rel_path = determine_file_locations(job.job_input) + for files in job.job_input.get_files_per_container().files: + manifest = "" + if mani_spec.manifest_file_header: + manifest = f"{mani_spec.manifest_file_header}\n" + for f in files: + match mani_spec.manifest_file_format: + case models.ManifestFileFormat.DATA_IDS: + manifest += f"{f.data_id}\n" + case models.ManifestFileFormat.FILES: + f = os.path.join( + job.job_input.params.input_mount_point, file_to_rel_path[f] + ) + manifest += f"{f}\n" + case _: + # Can't currently happen but for future safety + raise ValueError( + f"Unknown manifest file format: {manifest.manifest_file_format}" + ) + manifests.append(manifest) + return manifests diff --git a/cdmtaskservice/nersc/manager.py b/cdmtaskservice/nersc/manager.py index 09c3944..354f8ee 100644 --- a/cdmtaskservice/nersc/manager.py +++ b/cdmtaskservice/nersc/manager.py @@ -16,7 +16,14 @@ from types import ModuleType from typing import Self -from cdmtaskservice.arg_checkers import not_falsy, require_string, check_int +from cdmtaskservice import models +from cdmtaskservice.arg_checkers import ( + not_falsy as _not_falsy, + require_string as _require_string, + check_int as _check_int, +) +from cdmtaskservice.jaws import wdl +from cdmtaskservice.manifest_files import generate_manifest_files from cdmtaskservice.nersc import remote from cdmtaskservice.s3.client import S3ObjectMeta, S3PresignedPost @@ -39,6 +46,9 @@ _SEC_PER_GB = 2 * 60 # may want to make this configurable _CTS_SCRATCH_ROOT_DIR = Path("cdm_task_service") +_JOB_FILES = "files" +_MANIFESTS = "manifests" +_MANIFEST_FILE_PREFIX = "manifest-" _JAWS_CONF_FILENAME = "jaws.conf" @@ -129,11 +139,11 @@ def __init__( client_provider: Callable[[], str], nersc_code_path: Path, ): - self._client_provider = not_falsy(client_provider, "client_provider") + self._client_provider = _not_falsy(client_provider, "client_provider") self._nersc_code_path = self._check_path(nersc_code_path, "nersc_code_path") def _check_path(self, path: Path, name: str): - not_falsy(path, name) + _not_falsy(path, name) # commands are ok with relative paths but file uploads are not if path.expanduser().absolute() != path: raise ValueError(f"{name} must be absolute to the NERSC root dir") @@ -305,9 +315,9 @@ def _create_download_manifest( concurrency: int, insecure_ssl: bool, ) -> io.BytesIO: - require_string(job_id, "job_id") - not_falsy(objects, "objects") - not_falsy(presigned_urls, "presigned_urls") + _require_string(job_id, "job_id") + _not_falsy(objects, "objects") + _not_falsy(presigned_urls, "presigned_urls") if len(objects) != len(presigned_urls): raise ValueError("Must provide same number of paths and urls") manifest = self._base_manifest("download", concurrency, insecure_ssl) @@ -318,8 +328,7 @@ def _create_download_manifest( # job ID and the minio path and have it handle the rest. # This allows JAWS / Cromwell to cache the files if they have the # same path, which they won't if there's a job ID in the mix - "outputpath": str(self._dtn_scratch / - _CTS_SCRATCH_ROOT_DIR/ job_id / "files" / meta.path), + "outputpath": self._localize_s3_path(job_id, meta.path), "etag": meta.e_tag, "partsize": meta.effective_part_size, "size": meta.size, @@ -327,6 +336,9 @@ def _create_download_manifest( ] return io.BytesIO(json.dumps({"file-transfers": manifest}).encode()) + def _localize_s3_path(self, job_id: str, s3path: str) -> str: + return str(self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR/ job_id / _JOB_FILES / s3path) + def _create_upload_manifest( self, remote_files: list[Path], @@ -334,8 +346,8 @@ def _create_upload_manifest( concurrency: int, insecure_ssl: bool, ) -> io.BytesIO: - not_falsy(remote_files, "remote_files") - not_falsy(presigned_urls, "presigned_urls") + _not_falsy(remote_files, "remote_files") + _not_falsy(presigned_urls, "presigned_urls") if len(remote_files) != len(presigned_urls): raise ValueError("Must provide same number of files and urls") manifest = self._base_manifest("upload", concurrency, insecure_ssl) @@ -351,8 +363,32 @@ def _create_upload_manifest( def _base_manifest(self, op: str, concurrency: int, insecure_ssl: bool): return { "op": op, - "concurrency": check_int(concurrency, "concurrency"), + "concurrency": _check_int(concurrency, "concurrency"), "insecure-ssl": insecure_ssl, "min-timeout-sec": _MIN_TIMEOUT_SEC, "sec-per-GB": _SEC_PER_GB, } + + async def run_JAWS(self, job: models.Job) -> str: + """ + Run a JAWS job at NERSC and return the job ID. + """ + if not _not_falsy(job, "job").job_input.inputs_are_S3File(): + raise ValueError("Job files must be S3File objects") + manifest_files = generate_manifest_files(job) + manifest_file_paths = self._get_manifest_file_paths(job.id, len(manifest_files)) + fmap = {m: self._localize_s3_path(job.id, m.file) for m in job.job_input.input_files} + wdljson = wdl.generate_wdl(job, fmap, manifest_file_paths) + # TODO REMOVE these lines + logr = logging.getLogger(__name__) + for m in manifest_files: + logr.info("***") + logr.info(m) + logr.info(f"*** wdl:\n{wdljson.wdl}\njson:\n{json.dumps(wdljson.input_json, indent=4)}") + return "fake_job_id" + + def _get_manifest_file_paths(self, job_id: str, count: int) -> list[Path]: + if count == 0: + return [] + pre = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job_id / _MANIFESTS + return [pre / f"{_MANIFEST_FILE_PREFIX}{c}" for c in range(1, count + 1)] diff --git a/cdmtaskservice_config.toml.jinja b/cdmtaskservice_config.toml.jinja index d8013a0..03c19ba 100644 --- a/cdmtaskservice_config.toml.jinja +++ b/cdmtaskservice_config.toml.jinja @@ -28,7 +28,9 @@ has_nersc_account_role = "{{ KBCTS_HAS_NERSC_ACCOUNT_ROLE or "HAS_NERSC_ACCOUNT" sfapi_cred_path = "{{ KBCTS_SFAPI_CRED_PATH or "" }}" # The user associated with the client credentials. If the client credentials are updated but -# the user doesn't match they will not be accepted. +# the user doesn't match they will not be accepted. It is advised to create a collaboration +# user for the service. The jaws.conf file will be created in the user's home directory on +# service startup. sfapi_user = "{{ KBCTS_SFAPI_USER or "" }}" # Where to store remote code at NERSC. This must be writeable by the service account. diff --git a/test/manifest_files_test.py b/test/manifest_files_test.py new file mode 100644 index 0000000..09ebf84 --- /dev/null +++ b/test/manifest_files_test.py @@ -0,0 +1,7 @@ +# TODO TEST add tests + +from cdmtaskservice import manifest_files # @UnusedImport + + +def test_noop(): + pass