diff --git a/cdmtaskservice/jaws/output.py b/cdmtaskservice/jaws/output.py
index 653b0de..a8d4c30 100644
--- a/cdmtaskservice/jaws/output.py
+++ b/cdmtaskservice/jaws/output.py
@@ -6,7 +6,7 @@
 import json
 from typing import NamedTuple
 
-from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
+from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
 from cdmtaskservice.jaws.wdl import OUTPUT_FILES, OUTPUT_DIR, STDOUTS, STDERRS
 
 
@@ -56,7 +56,7 @@ def parse_outputs_json(outputs_json: io.BytesIO) -> OutputsJSON:
     for key, val in js.items():
         if key.endswith(OUTPUT_FILES):
             for files in val:
-                outfiles.update({f.split(f"/{OUTPUT_DIR}/")[-1]: f for f in files})
+                outfiles.update({get_relative_file_path(f): f for f in files})
                 # assume files are ordered correctly. If this is wrong sort by path first
         elif key.endswith(STDOUTS):
             stdo = val
@@ -66,3 +66,11 @@ def parse_outputs_json(outputs_json: io.BytesIO) -> OutputsJSON:
             # shouldn't happen, but let's not fail silently if it does
             raise ValueError(f"unexpected JAWS outputs.json key: {key}")
     return OutputsJSON(outfiles, stdo, stde)
+
+
+def get_relative_file_path(file: str) -> str:
+    """
+    Given a JAWS output file path, get the file path relative to the container output directory,
+    e.g. the file that was written from the container's perspective.
+    """
+    return _require_string(file, "file").split(f"/{OUTPUT_DIR}/")[-1]
diff --git a/cdmtaskservice/jobflows/nersc_jaws.py b/cdmtaskservice/jobflows/nersc_jaws.py
index 93c9493..24d8ac0 100644
--- a/cdmtaskservice/jobflows/nersc_jaws.py
+++ b/cdmtaskservice/jobflows/nersc_jaws.py
@@ -237,13 +237,36 @@ async def upload_complete(self, job: models.AdminJobDetails):
         # TDOO LOGGING Add any relevant logs from the task / download task output file in state
         #      call. Alternatively, just add code to fetch them from NERSC rather
         #      than storing them permanently. 99% of the time they'll be uninteresting
-        # TODO DATA CORRECTNESS IMPORTANT upload the file list & MD5 of the files, check the MD5s
-        #      vs the e-tags in Minio and save the files & e-tags to the job
-        # TODO DISKSPACE will need to clean up job results @ NERSC
-        await self._mongo.update_job_state(
-            job.id,
-            models.JobState.UPLOAD_SUBMITTED,
-            models.JobState.COMPLETE,
-            # TODO TEST will need a way to mock out timestamps
-            timestamp.utcdatetime()
-        )
+        await self._coman.run_coroutine(self._upload_complete(job))
+
+    async def _upload_complete(self, job: models.AdminJobDetails):
+        logr = logging.getLogger(__name__)
+        try:
+            md5s = await self._nman.get_uploaded_JAWS_files(job)
+            filemd5s = {os.path.join(job.job_input.output_dir, f): md5 for f, md5 in md5s.items()}
+            # TODO PERF parsing the paths for the zillionth time
+            # TODO PERF configure / set concurrency
+            s3objs = await self._s3.get_object_meta(S3Paths(filemd5s.keys()))
+            outfiles = []
+            for o in s3objs:
+                if o.e_tag != filemd5s[o.path]:
+                    raise ValueError(
+                        f"Expected Etag {filemd5s[o.path]} but got {o.e_tag} for uploaded "
+                        + f"file {o.path}"
+                    )
+                outfiles.append(models.S3FileOutput(file=o.path, etag=o.e_tag))
+            # TODO DISKSPACE will need to clean up job results @ NERSC
+            # TODO NEXT save output files to mongo and remove log line
+            logr.info(f"outfiles:\n{outfiles}")
+            await self._mongo.update_job_state(
+                job.id,
+                models.JobState.UPLOAD_SUBMITTED,
+                models.JobState.COMPLETE,
+                # TODO TEST will need a way to mock out timestamps
+                timestamp.utcdatetime()
+            )
+        except Exception as e:
+            # TODO LOGGING figure out how logging is going to work etc.
+            logr.exception(f"Error completing job {job.id}")
+            # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
+            raise e
diff --git a/cdmtaskservice/models.py b/cdmtaskservice/models.py
index 9b4ab91..c69ee3a 100644
--- a/cdmtaskservice/models.py
+++ b/cdmtaskservice/models.py
@@ -669,6 +669,20 @@ def name_with_digest(self):
         return f"{self.name}@{self.digest}"
 
 
+class S3FileOutput(BaseModel):
+    """ An output file in an S3 instance. """
+
+    # no validators since this is an outgoing data structure only
+    file: Annotated[str, Field(
+        example="mybucket/foo/bar/baz.jpg",
+        description="A path to an object in an S3 instance, starting with the bucket.",
+    )]
+    etag: Annotated[str, Field(
+        example="a70a4d1732484e75434df2c08570e1b2-3",
+        description="The S3 e-tag of the file. Weak e-tags are not supported.",
+    )]
+
+
 class Job(BaseModel):
     """
     Information about a job.
@@ -690,6 +704,7 @@ class Job(BaseModel):
         ],
         description="A list of tuples of (job_state, time_job_state_entered)."
     )]
+    outputs: list[S3FileOutput] | None = None
     # TODO ERRORHANDLING add error field and class
diff --git a/cdmtaskservice/nersc/manager.py b/cdmtaskservice/nersc/manager.py
index 10e4122..1a3e060 100644
--- a/cdmtaskservice/nersc/manager.py
+++ b/cdmtaskservice/nersc/manager.py
@@ -542,3 +542,20 @@ async def upload_JAWS_job_files(
         concurrency,
         insecure_ssl,
     )
+
+    async def get_uploaded_JAWS_files(self, job: models.Job) -> dict[str, str]:
+        """
+        Get the list of files that were uploaded to S3 as part of a JAWS job.
+
+        Returns a dict of file paths relative to the output directory of a container to their
+        MD5s.
+
+        Expects that the upload_JAWS_job_files function has been run, and will error otherwise.
+        """
+        _not_falsy(job, "job")
+        path = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job.id / _MD5_JSON_FILE_NAME
+        cli = self._client_provider()
+        dtns = await cli.compute(Machine.dtns)
+        md5s_json = await self._get_async_path(dtns, path).download()
+        file_to_md5 = json.load(md5s_json)
+        return {jaws_output.get_relative_file_path(f): md5 for f, md5 in file_to_md5.items()}
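
For reference, a minimal sketch of what the new `get_relative_file_path` helper does, assuming `OUTPUT_DIR` is `"__output__"` and using a made-up JAWS path (the real constant lives in `cdmtaskservice.jaws.wdl` and may differ), with the `_require_string` argument check omitted:

```python
# Hypothetical stand-in for cdmtaskservice.jaws.wdl.OUTPUT_DIR;
# the real constant may have a different value.
OUTPUT_DIR = "__output__"

def get_relative_file_path(file: str) -> str:
    # Strip everything up to and including the container output directory,
    # leaving the path the job wrote from the container's perspective.
    return file.split(f"/{OUTPUT_DIR}/")[-1]

# A made-up JAWS output path for illustration:
full = "/scratch/jaws/run_1/call-run/execution/__output__/results/out.tsv"
print(get_relative_file_path(full))  # -> results/out.tsv
```

`get_uploaded_JAWS_files` uses this to key the MD5 map by container-relative path, and `_upload_complete` then joins each relative path onto `job.job_input.output_dir` to build the S3 key whose e-tag is checked against the recorded MD5.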