Skip to content

Commit

Permalink
Merge pull request #155 from kbase/dev-service
Browse files Browse the repository at this point in the history
Upload job result files to S3
  • Loading branch information
MrCreosote authored Jan 8, 2025
2 parents e8cddaa + 3bf25b2 commit bcbe82c
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ async def build_app(
jaws_client = await JAWSClient.create(cfg.jaws_url, cfg.jaws_token)
logr.info("Done")
mongodao = await MongoDAO.create(mongocli[cfg.mongo_db])
# TODO CODE once the nerscjawsflow is done merge job_state and job_submit back together
job_state = JobState(mongodao)
nerscjawsflow = NERSCJAWSRunner( # this has a lot of required args, yech
nerscman,
jaws_client,
job_state, # TODO CODE if this isn't necessary, remove and recombine with job_submit
mongodao,
s3,
s3_external,
Expand Down
11 changes: 11 additions & 0 deletions cdmtaskservice/callback_url_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
_CALLBACK = "callback"
_DOWNLOAD_COMPLETE = "download"
_JOB_COMPLETE = "job"
_UPLOAD_COMPLETE = "upload"


def get_download_complete_callback(root_url: str = None, job_id: str = None):
Expand All @@ -28,6 +29,16 @@ def get_job_complete_callback(root_url: str = None, job_id: str = None):
return _get_callback(_JOB_COMPLETE, root_url, job_id)


def get_upload_complete_callback(root_url: str = None, job_id: str = None):
"""
Get a url or path for a service callback to communicate that an upload is complete.
root_url - prepend the path with the given root url.
job_id - suffix the path with a job ID.
"""
return _get_callback(_UPLOAD_COMPLETE, root_url, job_id)


def _get_callback(subpath: str, root_url: str = None, job_id: str = None):
cb = [root_url] if root_url else []
cb += [_CALLBACK, subpath]
Expand Down
68 changes: 68 additions & 0 deletions cdmtaskservice/jaws/output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Code for handling JAWS output files.
"""

import io
import json
from typing import NamedTuple

from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
from cdmtaskservice.jaws.wdl import OUTPUT_FILES, OUTPUT_DIR, STDOUTS, STDERRS


OUTPUTS_JSON_FILE = "outputs.json"
"""
The name of the file in the JAWS output directory containing the output file paths.
"""


class OutputsJSON(NamedTuple):
"""
The parsed contents of the JAWS outputs.json file, which contains the paths to files
output by the JAWS job.
"""

output_files: dict[str, str]
"""
The result file paths of the job as a dict of the relative path in the output directory
for the job container to the relative path in the JAWS results directory.
"""
# Could maybe be a little more efficient by wrapping the paths in a class and parsing the
# container path on demand... YAGNI

stdout: list[str]
"""
The standard out file paths relative to the JAWS results directory, ordered by
container number.
"""

stderr: list[str]
"""
The standard error file paths relative to the JAWS results directory, ordered by
container number.
"""


def parse_outputs_json(outputs_json: io.BytesIO) -> OutputsJSON:
"""
Parse the JAWS outputs.json file from a completed run. If any output files have the same
path inside their specific container, only one path is returned and which path is returned
is not specified.
"""
js = json.load(_not_falsy(outputs_json, "outputs_json"))
outfiles = {}
stdo = []
stde = []
for key, val in js.items():
if key.endswith(OUTPUT_FILES):
for files in val:
outfiles.update({f.split(f"/{OUTPUT_DIR}/")[-1]: f for f in files})
# assume files are ordered correctly. If this is wrong sort by path first
elif key.endswith(STDOUTS):
stdo = val
elif key.endswith(STDERRS):
stde = val
else:
# shouldn't happen, but let's not fail silently if it does
raise ValueError(f"unexpected JAWS outputs.json key: {key}")
return OutputsJSON(outfiles, stdo, stde)
33 changes: 27 additions & 6 deletions cdmtaskservice/jaws/wdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@
from cdmtaskservice.input_file_locations import determine_file_locations


OUTPUT_FILES = "output_files"
"""
The key specifying the location of output files in the WDL output.
"""

STDOUTS = "stdouts"
"""
The key specifying the location of standard out files in the WDL output.
"""

STDERRS = "stderrs"
"""
The key specifying the location of standard error files in the WDL output.
"""

OUTPUT_DIR = "__output__"
"""
The directory containing the job output. The directory name is expected to be unique in the path.
"""


_WDL_VERSION = "1.0" # Cromwell, and therefore JAWS, only supports 1.0

_IMAGE_TRANS_CHARS = str.maketrans({".": "_", "-": "_", "/": "_", ":": "_"})
Expand Down Expand Up @@ -139,9 +160,9 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool):
}}
}}
output {{
Array[Array[File]] output_files = run_container.output_files
Array[File] stdouts = run_container.stdout
Array[File] stderrs = run_container.stderr
Array[Array[File]] {OUTPUT_FILES} = run_container.output_files
Array[File] {STDOUTS} = run_container.stdout
Array[File] {STDERRS} = run_container.stderr
}}
}}
Expand All @@ -155,11 +176,11 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool):
command <<<
# ensure host mount points exist
mkdir -p ./__input__
mkdir -p ./__output__
mkdir -p ./{OUTPUT_DIR}
# TODO MOUNTING remove hack for collections containers
ln -s __input__ collectionssource
ln -s __output__ collectionsdata
ln -s {OUTPUT_DIR} collectionsdata
# link any manifest file into the mount point
if [[ -n "~{{manifest}}" ]]; then
Expand All @@ -186,7 +207,7 @@ def _generate_wdl(job: Job, workflow_name: str, manifests: bool):
echo "Entrypoint exit code: $EC"
# list the output of the command
find ./__output__ -type f > ./output_files.txt
find ./{OUTPUT_DIR} -type f > ./output_files.txt
exit $EC
>>>
Expand Down
42 changes: 34 additions & 8 deletions cdmtaskservice/jobflows/nersc_jaws.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
"""

import logging
import os
from pathlib import Path
from typing import Any

from cdmtaskservice import models
from cdmtaskservice import timestamp
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
from cdmtaskservice.callback_url_paths import get_download_complete_callback
from cdmtaskservice.callback_url_paths import (
get_download_complete_callback,
get_upload_complete_callback,
)
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.exceptions import InvalidJobStateError
from cdmtaskservice.jaws.client import JAWSClient
from cdmtaskservice.job_state import JobState
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.nersc.manager import NERSCManager
from cdmtaskservice.s3.client import S3Client, S3ObjectMeta
from cdmtaskservice.s3.client import S3Client, S3ObjectMeta, S3PresignedPost
from cdmtaskservice.s3.paths import S3Paths

# Not sure how other flows would work and how much code they might share. For now just make
Expand All @@ -31,7 +35,6 @@ def __init__(
self,
nersc_manager: NERSCManager,
jaws_client: JAWSClient,
job_state: JobState,
mongodao: MongoDAO,
s3_client: S3Client,
s3_external_client: S3Client,
Expand All @@ -44,7 +47,6 @@ def __init__(
nersc_manager - the NERSC manager.
jaws_client - a JAWS Central client.
job_state - the job state manager.
mongodao - the Mongo DAO object.
s3_client - an S3 client pointed to the data stores.
s3_external_client - an S3 client pointing to an external URL for the S3 data stores
Expand All @@ -57,7 +59,6 @@ def __init__(
"""
self._nman = _not_falsy(nersc_manager, "nersc_manager")
self._jaws = _not_falsy(jaws_client, "jaws_client")
self._jstate = _not_falsy(job_state, "job_state")
self._mongo = _not_falsy(mongodao, "mongodao")
self._s3 = _not_falsy(s3_client, "s3_client")
self._s3ext = _not_falsy(s3_external_client, "s3_external_client")
Expand Down Expand Up @@ -182,5 +183,30 @@ async def job_complete(self, job: models.AdminJobDetails):

async def _upload_files(self, job: models.AdminJobDetails, jaws_info: dict[str, Any]):
logr = logging.getLogger(__name__)
# TODO REMOVE after implementing file upload
logr.info(f"Starting file upload for job {job.id} JAWS run {jaws_info['id']}")

async def presign(output_files: list[Path]) -> list[S3PresignedPost]:
root = job.job_input.output_dir
# TODO PERF this parses the paths yet again
# TODO RELIABILITY config / set expiration time
paths = S3Paths([os.path.join(root, f) for f in output_files])
return await self._s3ext.presign_post_urls(paths)

try:
# TODO PERF config / set concurrency
# TODO DISKSPACE will need to clean up job results @ NERSC
# TODO LOGGING make the remote code log summary of results and upload and store
upload_task_id = await self._nman.upload_JAWS_job_files(
job,
jaws_info["output_dir"],
presign,
get_upload_complete_callback(self._callback_root, job.id),
insecure_ssl=self._s3insecure,
)
logr.info(f"got upload task id {upload_task_id} for job {job.id}")
# TODO UPLOAD save task ID in mongo & update job state
# See notes above about adding the NERSC task id to the job
except Exception as e:
# TODO LOGGING figure out how logging it going to work etc.
logr.exception(f"Error starting upload for job {job.id}")
# TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
raise e
69 changes: 61 additions & 8 deletions cdmtaskservice/nersc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,26 @@
import inspect
import json
import logging
import os
from pathlib import Path
from sfapi_client import AsyncClient
from sfapi_client.exceptions import SfApiError
from sfapi_client.paths import AsyncRemotePath
from sfapi_client.compute import Machine, AsyncCompute
import sys
from types import ModuleType
from typing import Self
from typing import Self, Awaitable

from cdmtaskservice import models
from cdmtaskservice.arg_checkers import (
not_falsy as _not_falsy,
require_string as _require_string,
check_int as _check_int,
)
from cdmtaskservice.jaws import wdl
from cdmtaskservice.jaws import (
wdl,
output as jaws_output
)
from cdmtaskservice.manifest_files import generate_manifest_files
from cdmtaskservice.nersc import remote
from cdmtaskservice.s3.client import S3ObjectMeta, S3PresignedPost
Expand Down Expand Up @@ -239,8 +243,7 @@ async def _upload_file_to_nersc(
await compute.run(cmd)
# skip some API calls vs. the upload example in the NERSC docs
# don't use a directory as the target or it makes an API call
asrp = AsyncRemotePath(path=target, compute=compute)
asrp.perms = "-" # hack to prevent an unnecessary network call
asrp = self._get_async_path(compute, target)
# TODO ERRORHANDLING throw custom errors
if file:
with open(file, "rb") as f:
Expand All @@ -251,6 +254,11 @@ async def _upload_file_to_nersc(
cmd = f"{dtw}chmod {chmod} {target}"
await compute.run(cmd)

def _get_async_path(self, compute: AsyncCompute, target: Path) -> AsyncRemotePath:
asrp = AsyncRemotePath(path=target, compute=compute)
asrp.perms = "-" # hack to prevent an unnecessary network call
return asrp

async def download_s3_files(
self,
job_id: str,
Expand All @@ -268,8 +276,7 @@ async def download_s3_files(
objects - the S3 files to download.
presigned_urls - the presigned download URLs for each object, in the same order as
the objects.
callback_url - the URL to provide to NERSC as a callback for when the download is
complete.
callback_url - the URL to GET as a callback for when the download is complete.
concurrency - the number of simultaneous downloads to process.
insecure_ssl - whether to skip the cert check for the S3 URL.
Expand Down Expand Up @@ -297,8 +304,7 @@ async def upload_s3_files(
remote_files - the files to upload.
presigned_urls - the presigned upload URLs for each file, in the same order as
the file.
callback_url - the URL to provide to NERSC as a callback for when the upload is
complete.
callback_url - the URL to GET as a callback for when the upload is complete.
concurrency - the number of simultaneous uploads to process.
insecure_ssl - whether to skip the cert check for the S3 URL.
Expand Down Expand Up @@ -475,3 +481,50 @@ def _get_manifest_file_paths(self, job_id: str, count: int) -> list[Path]:
if count == 0:
return []
return [_JOB_MANIFESTS / f"{_MANIFEST_FILE_PREFIX}{c}" for c in range(1, count + 1)]

async def upload_JAWS_job_files(
self,
job: models.Job,
jaws_output_dir: Path,
files_to_urls: Callable[[list[Path]], Awaitable[list[S3PresignedPost]]],
callback_url: str,
concurrency: int = 10,
insecure_ssl: bool = False
) -> str:
"""
Upload a set of output files from a JAWS run to S3.
job - the job being processed. No other transfers should be occurring for the job.
jaws_output_dir - the NERSC output directory of the JAWS job containing the output files,
manifests, etc.
files_to_urls - an async function that provides a list of presigned S3 upload urls
given relative paths to files. The returned list must be in the same order as the input
list.
callback_url - the URL to GET as a callback for when the upload is complete.
concurrency - the number of simultaneous uploads to process.
insecure_ssl - whether to skip the cert check for the S3 URL.
Returns the NERSC task ID for the upload.
"""
_not_falsy(job, "job")
_not_falsy(files_to_urls, "files_to_urls")
cburl = _require_string(callback_url, "callback_url")
_check_int(concurrency, "concurrency")
outfilepath = Path(
_require_string(jaws_output_dir, "jaws_output_dir")) / jaws_output.OUTPUTS_JSON_FILE
cli = self._client_provider()
dtns = await cli.compute(Machine.dtns)
outputs_json = await self._get_async_path(dtns, outfilepath).download()
outputs = jaws_output.parse_outputs_json(outputs_json)
container_files = list(outputs.output_files.keys())
presigns = await files_to_urls(container_files)
# TODO CORRECTNESS IMPORTANT log etags in remote code, upload and return
# TODO LOGGING upload job stdout and stderr logs
return await self.upload_s3_files(
job.id,
[os.path.join(jaws_output_dir, outputs.output_files[f]) for f in container_files],
presigns,
cburl,
concurrency,
insecure_ssl,
)
7 changes: 7 additions & 0 deletions test/jaws/output_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# TODO TEST add tests

from cdmtaskservice.jaws import output # @UnusedImport


def test_noop():
pass

0 comments on commit bcbe82c

Please sign in to comment.