From 3bf25b26eccf2040f1b0b9974fb99bcf1d1c9739 Mon Sep 17 00:00:00 2001 From: Gavin Date: Thu, 2 Jan 2025 12:36:27 -0800 Subject: [PATCH] Upload job result files to S3 --- cdmtaskservice/app_state.py | 2 +- cdmtaskservice/callback_url_paths.py | 11 +++++ cdmtaskservice/jaws/output.py | 68 ++++++++++++++++++++++++++ cdmtaskservice/jaws/wdl.py | 33 ++++++++++--- cdmtaskservice/jobflows/nersc_jaws.py | 42 ++++++++++++---- cdmtaskservice/nersc/manager.py | 69 +++++++++++++++++++++++---- test/jaws/output_test.py | 7 +++ 7 files changed, 209 insertions(+), 23 deletions(-) create mode 100644 cdmtaskservice/jaws/output.py create mode 100644 test/jaws/output_test.py diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py index c65c7d8..8e4ea84 100644 --- a/cdmtaskservice/app_state.py +++ b/cdmtaskservice/app_state.py @@ -101,11 +101,11 @@ async def build_app( jaws_client = await JAWSClient.create(cfg.jaws_url, cfg.jaws_token) logr.info("Done") mongodao = await MongoDAO.create(mongocli[cfg.mongo_db]) + # TODO CODE once the nerscjawsflow is done merge job_state and job_submit back together job_state = JobState(mongodao) nerscjawsflow = NERSCJAWSRunner( # this has a lot of required args, yech nerscman, jaws_client, - job_state, # TODO CODE if this isn't necessary, remove and recombine with job_submit mongodao, s3, s3_external, diff --git a/cdmtaskservice/callback_url_paths.py b/cdmtaskservice/callback_url_paths.py index 156dfb3..97d0104 100644 --- a/cdmtaskservice/callback_url_paths.py +++ b/cdmtaskservice/callback_url_paths.py @@ -6,6 +6,7 @@ _CALLBACK = "callback" _DOWNLOAD_COMPLETE = "download" _JOB_COMPLETE = "job" +_UPLOAD_COMPLETE = "upload" def get_download_complete_callback(root_url: str = None, job_id: str = None): @@ -28,6 +29,16 @@ def get_job_complete_callback(root_url: str = None, job_id: str = None): return _get_callback(_JOB_COMPLETE, root_url, job_id) +def get_upload_complete_callback(root_url: str = None, job_id: str = None): + """ + Get a url or path for a service callback to communicate that an upload is complete. + + root_url - prepend the path with the given root url. + job_id - suffix the path with a job ID. + """ + return _get_callback(_UPLOAD_COMPLETE, root_url, job_id) + + def _get_callback(subpath: str, root_url: str = None, job_id: str = None): cb = [root_url] if root_url else [] cb += [_CALLBACK, subpath] diff --git a/cdmtaskservice/jaws/output.py b/cdmtaskservice/jaws/output.py new file mode 100644 index 0000000..653b0de --- /dev/null +++ b/cdmtaskservice/jaws/output.py @@ -0,0 +1,68 @@ +""" +Code for handling JAWS output files. +""" + +import io +import json +from typing import NamedTuple + +from cdmtaskservice.arg_checkers import not_falsy as _not_falsy +from cdmtaskservice.jaws.wdl import OUTPUT_FILES, OUTPUT_DIR, STDOUTS, STDERRS + + +OUTPUTS_JSON_FILE = "outputs.json" +""" +The name of the file in the JAWS output directory containing the output file paths. +""" + + +class OutputsJSON(NamedTuple): + """ + The parsed contents of the JAWS outputs.json file, which contains the paths to files + output by the JAWS job. + """ + + output_files: dict[str, str] + """ + The result file paths of the job as a dict of the relative path in the output directory + for the job container to the relative path in the JAWS results directory. + """ + # Could maybe be a little more efficient by wrapping the paths in a class and parsing the + # container path on demand... YAGNI + + stdout: list[str] + """ + The standard out file paths relative to the JAWS results directory, ordered by + container number. + """ + + stderr: list[str] + """ + The standard error file paths relative to the JAWS results directory, ordered by + container number. + """ + + +def parse_outputs_json(outputs_json: io.BytesIO) -> OutputsJSON: + """ + Parse the JAWS outputs.json file from a completed run. If any output files have the same + path inside their specific container, only one path is returned and which path is returned + is not specified. + """ + js = json.load(_not_falsy(outputs_json, "outputs_json")) + outfiles = {} + stdo = [] + stde = [] + for key, val in js.items(): + if key.endswith(OUTPUT_FILES): + for files in val: + outfiles.update({f.split(f"/{OUTPUT_DIR}/")[-1]: f for f in files}) + # assume files are ordered correctly. If this is wrong sort by path first + elif key.endswith(STDOUTS): + stdo = val + elif key.endswith(STDERRS): + stde = val + else: + # shouldn't happen, but let's not fail silently if it does + raise ValueError(f"unexpected JAWS outputs.json key: {key}") + return OutputsJSON(outfiles, stdo, stde) diff --git a/cdmtaskservice/jaws/wdl.py b/cdmtaskservice/jaws/wdl.py index 2310375..9f05e2f 100644 --- a/cdmtaskservice/jaws/wdl.py +++ b/cdmtaskservice/jaws/wdl.py @@ -22,6 +22,27 @@ from cdmtaskservice.input_file_locations import determine_file_locations +OUTPUT_FILES = "output_files" +""" +The key specifying the location of output files in the WDL output. +""" + +STDOUTS = "stdouts" +""" +The key specifying the location of standard out files in the WDL output. +""" + +STDERRS = "stderrs" +""" +The key specifying the location of standard error files in the WDL output. +""" + +OUTPUT_DIR = "__output__" +""" +The directory containing the job output. The directory name is expected to be unique in the path. +""" + + _WDL_VERSION = "1.0" # Cromwell, and therefore JAWS, only supports 1.0 _IMAGE_TRANS_CHARS = str.maketrans({".": "_", "-": "_", "/": "_", ":": "_"}) @@ -139,9 +160,9 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool): }} }} output {{ - Array[Array[File]] output_files = run_container.output_files - Array[File] stdouts = run_container.stdout - Array[File] stderrs = run_container.stderr + Array[Array[File]] {OUTPUT_FILES} = run_container.output_files + Array[File] {STDOUTS} = run_container.stdout + Array[File] {STDERRS} = run_container.stderr }} }} @@ -155,11 +176,11 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool): command <<< # ensure host mount points exist mkdir -p ./__input__ - mkdir -p ./__output__ + mkdir -p ./{OUTPUT_DIR} # TODO MOUNTING remove hack for collections containers ln -s __input__ collectionssource - ln -s __output__ collectionsdata + ln -s {OUTPUT_DIR} collectionsdata # link any manifest file into the mount point if [[ -n "~{{manifest}}" ]]; then @@ -186,7 +207,7 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool): echo "Entrypoint exit code: $EC" # list the output of the command - find ./__output__ -type f > ./output_files.txt + find ./{OUTPUT_DIR} -type f > ./output_files.txt exit $EC >>> diff --git a/cdmtaskservice/jobflows/nersc_jaws.py b/cdmtaskservice/jobflows/nersc_jaws.py index 82a8c3e..0cbd7f0 100644 --- a/cdmtaskservice/jobflows/nersc_jaws.py +++ b/cdmtaskservice/jobflows/nersc_jaws.py @@ -3,19 +3,23 @@ """ import logging +import os +from pathlib import Path from typing import Any from cdmtaskservice import models from cdmtaskservice import timestamp from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string -from cdmtaskservice.callback_url_paths import get_download_complete_callback +from cdmtaskservice.callback_url_paths import ( + get_download_complete_callback, + get_upload_complete_callback, +) from cdmtaskservice.coroutine_manager import CoroutineWrangler from cdmtaskservice.exceptions import InvalidJobStateError from cdmtaskservice.jaws.client import JAWSClient -from cdmtaskservice.job_state import JobState from cdmtaskservice.mongo import MongoDAO from cdmtaskservice.nersc.manager import NERSCManager -from cdmtaskservice.s3.client import S3Client, S3ObjectMeta +from cdmtaskservice.s3.client import S3Client, S3ObjectMeta, S3PresignedPost from cdmtaskservice.s3.paths import S3Paths # Not sure how other flows would work and how much code they might share. For now just make @@ -31,7 +35,6 @@ def __init__( self, nersc_manager: NERSCManager, jaws_client: JAWSClient, - job_state: JobState, mongodao: MongoDAO, s3_client: S3Client, s3_external_client: S3Client, @@ -44,7 +47,6 @@ def __init__( nersc_manager - the NERSC manager. jaws_client - a JAWS Central client. - job_state - the job state manager. mongodao - the Mongo DAO object. s3_client - an S3 client pointed to the data stores. s3_external_client - an S3 client pointing to an external URL for the S3 data stores @@ -57,7 +59,6 @@ def __init__( """ self._nman = _not_falsy(nersc_manager, "nersc_manager") self._jaws = _not_falsy(jaws_client, "jaws_client") - self._jstate = _not_falsy(job_state, "job_state") self._mongo = _not_falsy(mongodao, "mongodao") self._s3 = _not_falsy(s3_client, "s3_client") self._s3ext = _not_falsy(s3_external_client, "s3_external_client") @@ -182,5 +183,30 @@ async def job_complete(self, job: models.AdminJobDetails): async def _upload_files(self, job: models.AdminJobDetails, jaws_info: dict[str, Any]): logr = logging.getLogger(__name__) - # TODO REMOVE after implementing file upload - logr.info(f"Starting file upload for job {job.id} JAWS run {jaws_info['id']}") + + async def presign(output_files: list[Path]) -> list[S3PresignedPost]: + root = job.job_input.output_dir + # TODO PERF this parses the paths yet again + # TODO RELIABILITY config / set expiration time + paths = S3Paths([os.path.join(root, f) for f in output_files]) + return await self._s3ext.presign_post_urls(paths) + + try: + # TODO PERF config / set concurrency + # TODO DISKSPACE will need to clean up job results @ NERSC + # TODO LOGGING make the remote code log summary of results and upload and store + upload_task_id = await self._nman.upload_JAWS_job_files( + job, + jaws_info["output_dir"], + presign, + get_upload_complete_callback(self._callback_root, job.id), + insecure_ssl=self._s3insecure, + ) + logr.info(f"got upload task id {upload_task_id} for job {job.id}") + # TODO UPLOAD save task ID in mongo & update job state + # See notes above about adding the NERSC task id to the job + except Exception as e: + # TODO LOGGING figure out how logging it going to work etc. + logr.exception(f"Error starting upload for job {job.id}") + # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise + raise e diff --git a/cdmtaskservice/nersc/manager.py b/cdmtaskservice/nersc/manager.py index a1904a4..e3e32e8 100644 --- a/cdmtaskservice/nersc/manager.py +++ b/cdmtaskservice/nersc/manager.py @@ -8,6 +8,7 @@ import inspect import json import logging +import os from pathlib import Path from sfapi_client import AsyncClient from sfapi_client.exceptions import SfApiError @@ -15,7 +16,7 @@ from sfapi_client.compute import Machine, AsyncCompute import sys from types import ModuleType -from typing import Self +from typing import Self, Awaitable from cdmtaskservice import models from cdmtaskservice.arg_checkers import ( @@ -23,7 +24,10 @@ require_string as _require_string, check_int as _check_int, ) -from cdmtaskservice.jaws import wdl +from cdmtaskservice.jaws import ( + wdl, + output as jaws_output +) from cdmtaskservice.manifest_files import generate_manifest_files from cdmtaskservice.nersc import remote from cdmtaskservice.s3.client import S3ObjectMeta, S3PresignedPost @@ -239,8 +243,7 @@ async def _upload_file_to_nersc( await compute.run(cmd) # skip some API calls vs. the upload example in the NERSC docs # don't use a directory as the target or it makes an API call - asrp = AsyncRemotePath(path=target, compute=compute) - asrp.perms = "-" # hack to prevent an unnecessary network call + asrp = self._get_async_path(compute, target) # TODO ERRORHANDLING throw custom errors if file: with open(file, "rb") as f: @@ -251,6 +254,11 @@ async def _upload_file_to_nersc( cmd = f"{dtw}chmod {chmod} {target}" await compute.run(cmd) + def _get_async_path(self, compute: AsyncCompute, target: Path) -> AsyncRemotePath: + asrp = AsyncRemotePath(path=target, compute=compute) + asrp.perms = "-" # hack to prevent an unnecessary network call + return asrp + async def download_s3_files( self, job_id: str, @@ -268,8 +276,7 @@ async def download_s3_files( objects - the S3 files to download. presigned_urls - the presigned download URLs for each object, in the same order as the objects. - callback_url - the URL to provide to NERSC as a callback for when the download is - complete. + callback_url - the URL to GET as a callback for when the download is complete. concurrency - the number of simultaneous downloads to process. insecure_ssl - whether to skip the cert check for the S3 URL. @@ -297,8 +304,7 @@ async def upload_s3_files( remote_files - the files to upload. presigned_urls - the presigned upload URLs for each file, in the same order as the file. - callback_url - the URL to provide to NERSC as a callback for when the upload is - complete. + callback_url - the URL to GET as a callback for when the upload is complete. concurrency - the number of simultaneous uploads to process. insecure_ssl - whether to skip the cert check for the S3 URL. @@ -475,3 +481,50 @@ def _get_manifest_file_paths(self, job_id: str, count: int) -> list[Path]: if count == 0: return [] return [_JOB_MANIFESTS / f"{_MANIFEST_FILE_PREFIX}{c}" for c in range(1, count + 1)] + + async def upload_JAWS_job_files( + self, + job: models.Job, + jaws_output_dir: Path, + files_to_urls: Callable[[list[Path]], Awaitable[list[S3PresignedPost]]], + callback_url: str, + concurrency: int = 10, + insecure_ssl: bool = False + ) -> str: + """ + Upload a set of output files from a JAWS run to S3. + + job - the job being processed. No other transfers should be occurring for the job. + jaws_output_dir - the NERSC output directory of the JAWS job containing the output files, + manifests, etc. + files_to_urls - an async function that provides a list of presigned S3 upload urls + given relative paths to files. The returned list must be in the same order as the input + list. + callback_url - the URL to GET as a callback for when the upload is complete. + concurrency - the number of simultaneous uploads to process. + insecure_ssl - whether to skip the cert check for the S3 URL. + + Returns the NERSC task ID for the upload. + """ + _not_falsy(job, "job") + _not_falsy(files_to_urls, "files_to_urls") + cburl = _require_string(callback_url, "callback_url") + _check_int(concurrency, "concurrency") + outfilepath = Path( + _require_string(jaws_output_dir, "jaws_output_dir")) / jaws_output.OUTPUTS_JSON_FILE + cli = self._client_provider() + dtns = await cli.compute(Machine.dtns) + outputs_json = await self._get_async_path(dtns, outfilepath).download() + outputs = jaws_output.parse_outputs_json(outputs_json) + container_files = list(outputs.output_files.keys()) + presigns = await files_to_urls(container_files) + # TODO CORRECTNESS IMPORTANT log etags in remote code, upload and return + # TODO LOGGING upload job stdout and stderr logs + return await self.upload_s3_files( + job.id, + [os.path.join(jaws_output_dir, outputs.output_files[f]) for f in container_files], + presigns, + cburl, + concurrency, + insecure_ssl, + ) diff --git a/test/jaws/output_test.py b/test/jaws/output_test.py new file mode 100644 index 0000000..c933d98 --- /dev/null +++ b/test/jaws/output_test.py @@ -0,0 +1,7 @@ +# TODO TEST add tests + +from cdmtaskservice.jaws import output # @UnusedImport + + +def test_noop(): + pass