Skip to content

Commit

Permalink
Merge pull request #136 from kbase/dev-service
Browse files Browse the repository at this point in the history
Generate wdl and input.json files for job submit
  • Loading branch information
MrCreosote authored Dec 20, 2024
2 parents 38079c7 + 9e18e84 commit 620d1db
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 49 deletions.
1 change: 1 addition & 0 deletions cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ async def build_app(
sfapi_client = await NERSCSFAPIClientProvider.create(Path(cfg.sfapi_cred_path), cfg.sfapi_user)
logr.info("Done")
logr.info("Setting up NERSC manager and installing code at NERSC...")
# TODO MULTICLUSTER service won't start if perlmutter is down, need to make it more dynamic
remote_code_loc = Path(cfg.nersc_remote_code_dir) / VERSION
nerscman = await NERSCManager.create(
sfapi_client.get_client, remote_code_loc, cfg.jaws_token, cfg.jaws_group
Expand Down
40 changes: 3 additions & 37 deletions cdmtaskservice/jobflows/nersc_jaws.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
"""

import logging
import os

from cdmtaskservice import models
from cdmtaskservice import timestamp
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
from cdmtaskservice.callback_url_paths import get_download_complete_callback
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.exceptions import InvalidJobStateError
from cdmtaskservice.input_file_locations import determine_file_locations
from cdmtaskservice.job_state import JobState
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.nersc.manager import NERSCManager
Expand Down Expand Up @@ -135,39 +133,7 @@ async def download_complete(self, job: models.AdminJobDetails):
await self._coman.run_coroutine(self._download_complete(job))

async def _download_complete(self, job: models.AdminJobDetails):
jaws_job_id = await self._nman.run_JAWS(job)
# TODO JAWS record job ID in DB
logr = logging.getLogger(__name__)
manifest_files = self._generate_manifest_files(job)
# TODO REMOVE these lines
logr.info(f"*** manifiles: {len(manifest_files)}")
for m in manifest_files:
logr.info(m)
logr.info("***")

def _generate_manifest_files(self, job: models.AdminJobDetails) -> list[str]:
# If we support multiple compute sites this should be moved to a general code location
# Currently leave here rather than creating yet another module
manifests = []
mani_spec = job.job_input.params.get_file_parameter()
if not mani_spec or mani_spec.type is not models.ParameterType.MANIFEST_FILE:
return manifests
file_to_rel_path = determine_file_locations(job.job_input)
for files in job.job_input.get_files_per_container().files:
manifest = ""
if mani_spec.manifest_file_header:
manifest = f"{mani_spec.manifest_file_header}\n"
for f in files:
match mani_spec.manifest_file_format:
case models.ManifestFileFormat.DATA_IDS:
manifest += f"{f.data_id}\n"
case models.ManifestFileFormat.FILES:
f = os.path.join(
job.job_input.params.input_mount_point, file_to_rel_path[f]
)
manifest += f"{f}\n"
case _:
# Can't currently happen but for future safety
raise ValueError(
f"Unknown manifest file format: {manifest.manifest_file_format}"
)
manifests.append(manifest)
return manifests
logr.info(f"JAWS job id: {jaws_job_id}")
41 changes: 41 additions & 0 deletions cdmtaskservice/manifest_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Methods for dealing with manifest files for jobs.
"""

import os

from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
from cdmtaskservice.input_file_locations import determine_file_locations


def generate_manifest_files(job: models.Job) -> list[str]:
"""
Generate a set of manifest files as strings for a job, or return an empty list if no
manifest files are specified.
"""
manifests = []
mani_spec = _not_falsy(job, "job").job_input.params.get_file_parameter()
if not mani_spec or mani_spec.type is not models.ParameterType.MANIFEST_FILE:
return manifests
file_to_rel_path = determine_file_locations(job.job_input)
for files in job.job_input.get_files_per_container().files:
manifest = ""
if mani_spec.manifest_file_header:
manifest = f"{mani_spec.manifest_file_header}\n"
for f in files:
match mani_spec.manifest_file_format:
case models.ManifestFileFormat.DATA_IDS:
manifest += f"{f.data_id}\n"
case models.ManifestFileFormat.FILES:
f = os.path.join(
job.job_input.params.input_mount_point, file_to_rel_path[f]
)
manifest += f"{f}\n"
case _:
# Can't currently happen but for future safety
raise ValueError(
f"Unknown manifest file format: {manifest.manifest_file_format}"
)
manifests.append(manifest)
return manifests
58 changes: 47 additions & 11 deletions cdmtaskservice/nersc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@
from types import ModuleType
from typing import Self

from cdmtaskservice.arg_checkers import not_falsy, require_string, check_int
from cdmtaskservice import models
from cdmtaskservice.arg_checkers import (
not_falsy as _not_falsy,
require_string as _require_string,
check_int as _check_int,
)
from cdmtaskservice.jaws import wdl
from cdmtaskservice.manifest_files import generate_manifest_files
from cdmtaskservice.nersc import remote
from cdmtaskservice.s3.client import S3ObjectMeta, S3PresignedPost

Expand All @@ -39,6 +46,9 @@
_SEC_PER_GB = 2 * 60 # may want to make this configurable

_CTS_SCRATCH_ROOT_DIR = Path("cdm_task_service")
_JOB_FILES = "files"
_MANIFESTS = "manifests"
_MANIFEST_FILE_PREFIX = "manifest-"


_JAWS_CONF_FILENAME = "jaws.conf"
Expand Down Expand Up @@ -129,11 +139,11 @@ def __init__(
client_provider: Callable[[], str],
nersc_code_path: Path,
):
self._client_provider = not_falsy(client_provider, "client_provider")
self._client_provider = _not_falsy(client_provider, "client_provider")
self._nersc_code_path = self._check_path(nersc_code_path, "nersc_code_path")

def _check_path(self, path: Path, name: str):
not_falsy(path, name)
_not_falsy(path, name)
# commands are ok with relative paths but file uploads are not
if path.expanduser().absolute() != path:
raise ValueError(f"{name} must be absolute to the NERSC root dir")
Expand Down Expand Up @@ -305,9 +315,9 @@ def _create_download_manifest(
concurrency: int,
insecure_ssl: bool,
) -> io.BytesIO:
require_string(job_id, "job_id")
not_falsy(objects, "objects")
not_falsy(presigned_urls, "presigned_urls")
_require_string(job_id, "job_id")
_not_falsy(objects, "objects")
_not_falsy(presigned_urls, "presigned_urls")
if len(objects) != len(presigned_urls):
raise ValueError("Must provide same number of paths and urls")
manifest = self._base_manifest("download", concurrency, insecure_ssl)
Expand All @@ -318,24 +328,26 @@ def _create_download_manifest(
# job ID and the minio path and have it handle the rest.
# This allows JAWS / Cromwell to cache the files if they have the
# same path, which they won't if there's a job ID in the mix
"outputpath": str(self._dtn_scratch /
_CTS_SCRATCH_ROOT_DIR/ job_id / "files" / meta.path),
"outputpath": self._localize_s3_path(job_id, meta.path),
"etag": meta.e_tag,
"partsize": meta.effective_part_size,
"size": meta.size,
} for url, meta in zip(presigned_urls, objects)
]
return io.BytesIO(json.dumps({"file-transfers": manifest}).encode())

def _localize_s3_path(self, job_id: str, s3path: str) -> str:
return str(self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR/ job_id / _JOB_FILES / s3path)

def _create_upload_manifest(
self,
remote_files: list[Path],
presigned_urls: list[S3PresignedPost],
concurrency: int,
insecure_ssl: bool,
) -> io.BytesIO:
not_falsy(remote_files, "remote_files")
not_falsy(presigned_urls, "presigned_urls")
_not_falsy(remote_files, "remote_files")
_not_falsy(presigned_urls, "presigned_urls")
if len(remote_files) != len(presigned_urls):
raise ValueError("Must provide same number of files and urls")
manifest = self._base_manifest("upload", concurrency, insecure_ssl)
Expand All @@ -351,8 +363,32 @@ def _create_upload_manifest(
def _base_manifest(self, op: str, concurrency: int, insecure_ssl: bool):
return {
"op": op,
"concurrency": check_int(concurrency, "concurrency"),
"concurrency": _check_int(concurrency, "concurrency"),
"insecure-ssl": insecure_ssl,
"min-timeout-sec": _MIN_TIMEOUT_SEC,
"sec-per-GB": _SEC_PER_GB,
}

async def run_JAWS(self, job: models.Job) -> str:
"""
Run a JAWS job at NERSC and return the job ID.
"""
if not _not_falsy(job, "job").job_input.inputs_are_S3File():
raise ValueError("Job files must be S3File objects")
manifest_files = generate_manifest_files(job)
manifest_file_paths = self._get_manifest_file_paths(job.id, len(manifest_files))
fmap = {m: self._localize_s3_path(job.id, m.file) for m in job.job_input.input_files}
wdljson = wdl.generate_wdl(job, fmap, manifest_file_paths)
# TODO REMOVE these lines
logr = logging.getLogger(__name__)
for m in manifest_files:
logr.info("***")
logr.info(m)
logr.info(f"*** wdl:\n{wdljson.wdl}\njson:\n{json.dumps(wdljson.input_json, indent=4)}")
return "fake_job_id"

def _get_manifest_file_paths(self, job_id: str, count: int) -> list[Path]:
if count == 0:
return []
pre = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job_id / _MANIFESTS
return [pre / f"{_MANIFEST_FILE_PREFIX}{c}" for c in range(1, count + 1)]
4 changes: 3 additions & 1 deletion cdmtaskservice_config.toml.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ has_nersc_account_role = "{{ KBCTS_HAS_NERSC_ACCOUNT_ROLE or "HAS_NERSC_ACCOUNT"
sfapi_cred_path = "{{ KBCTS_SFAPI_CRED_PATH or "" }}"

# The user associated with the client credentials. If the client credentials are updated but
# the user doesn't match they will not be accepted.
# the user doesn't match they will not be accepted. It is advised to create a collaboration
# user for the service. The jaws.conf file will be created in the user's home directory on
# service startup.
sfapi_user = "{{ KBCTS_SFAPI_USER or "" }}"

# Where to store remote code at NERSC. This must be writeable by the service account.
Expand Down
7 changes: 7 additions & 0 deletions test/manifest_files_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# TODO TEST add tests

from cdmtaskservice import manifest_files # @UnusedImport


def test_noop():
pass

0 comments on commit 620d1db

Please sign in to comment.