
Merge pull request #160 from kbase/dev-service
Check S3 etags against md5s calculated at NERSC
MrCreosote authored Jan 10, 2025
2 parents c6341e2 + 1c4c5b7 commit 239bc22
Showing 4 changed files with 75 additions and 12 deletions.
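
The core of this change relies on a standard S3 behavior: for an object uploaded in a single part, the ETag the server reports is the hex MD5 digest of the object's bytes, so MD5s calculated at NERSC can be compared directly to the ETags Minio returns. A minimal sketch of that property, with an illustrative function name and sample bytes that are not part of the commit:

import hashlib

def md5_matches_etag(data: bytes, e_tag: str) -> bool:
    # For a single-part upload the S3 ETag is the plain hex MD5 of the bytes.
    # Some clients return the ETag wrapped in quotes, so strip them first.
    return hashlib.md5(data).hexdigest() == e_tag.strip('"')

assert md5_matches_etag(b"some bytes", hashlib.md5(b"some bytes").hexdigest())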
12 changes: 10 additions & 2 deletions cdmtaskservice/jaws/output.py
@@ -6,7 +6,7 @@
 import json
 from typing import NamedTuple
 
-from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
+from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
 from cdmtaskservice.jaws.wdl import OUTPUT_FILES, OUTPUT_DIR, STDOUTS, STDERRS


@@ -56,7 +56,7 @@ def parse_outputs_json(outputs_json: io.BytesIO) -> OutputsJSON:
     for key, val in js.items():
         if key.endswith(OUTPUT_FILES):
             for files in val:
-                outfiles.update({f.split(f"/{OUTPUT_DIR}/")[-1]: f for f in files})
+                outfiles.update({get_relative_file_path(f): f for f in files})
                 # assume files are ordered correctly. If this is wrong sort by path first
         elif key.endswith(STDOUTS):
             stdo = val
@@ -66,3 +66,11 @@ def parse_outputs_json(outputs_json: io.BytesIO) -> OutputsJSON:
         # shouldn't happen, but let's not fail silently if it does
         raise ValueError(f"unexpected JAWS outputs.json key: {key}")
     return OutputsJSON(outfiles, stdo, stde)
+
+
+def get_relative_file_path(file: str) -> str:
+    """
+    Given a JAWS output file path, get the file path relative to the container output directory,
+    i.e. the file that was written from the container's perspective.
+    """
+    return _require_string(file, "file").split(f"/{OUTPUT_DIR}/")[-1]
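
A hypothetical call to the new helper, assuming for illustration that OUTPUT_DIR is "outdir"; the real constant is imported from cdmtaskservice.jaws.wdl and may differ:

OUTPUT_DIR = "outdir"  # stand-in for the constant from cdmtaskservice.jaws.wdl

def get_relative_file_path(file: str) -> str:
    # simplified copy of the helper above, minus the argument check
    return file.split(f"/{OUTPUT_DIR}/")[-1]

# a JAWS output path maps back to the path the container wrote:
assert get_relative_file_path("call-run/execution/outdir/results/data.tsv") == "results/data.tsv"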
43 changes: 33 additions & 10 deletions cdmtaskservice/jobflows/nersc_jaws.py
@@ -237,13 +237,36 @@ async def upload_complete(self, job: models.AdminJobDetails):
         # TODO LOGGING Add any relevant logs from the task / download task output file in state
         # call. Alternatively, just add code to fetch them from NERSC rather
         # than storing them permanently. 99% of the time they'll be uninteresting
-        # TODO DATA CORRECTNESS IMPORTANT upload the file list & MD5 of the files, check the MD5s
-        # vs the e-tags in Minio and save the files & e-tags to the job
-        # TODO DISKSPACE will need to clean up job results @ NERSC
-        await self._mongo.update_job_state(
-            job.id,
-            models.JobState.UPLOAD_SUBMITTED,
-            models.JobState.COMPLETE,
-            # TODO TEST will need a way to mock out timestamps
-            timestamp.utcdatetime()
-        )
+        await self._coman.run_coroutine(self._upload_complete(job))
+
+    async def _upload_complete(self, job: models.AdminJobDetails):
+        logr = logging.getLogger(__name__)
+        try:
+            md5s = await self._nman.get_uploaded_JAWS_files(job)
+            filemd5s = {os.path.join(job.job_input.output_dir, f): md5 for f, md5 in md5s.items()}
+            # TODO PERF parsing the paths for the zillionth time
+            # TODO PERF configure / set concurrency
+            s3objs = await self._s3.get_object_meta(S3Paths(filemd5s.keys()))
+            outfiles = []
+            for o in s3objs:
+                if o.e_tag != filemd5s[o.path]:
+                    raise ValueError(
+                        f"Expected Etag {filemd5s[o.path]} but got {o.e_tag} for uploaded "
+                        + f"file {o.path}"
+                    )
+                outfiles.append(models.S3FileOutput(file=o.path, etag=o.e_tag))
+            # TODO DISKSPACE will need to clean up job results @ NERSC
+            # TODO NEXT save output files to mongo and remove log line
+            logr.info(f"outfiles:\n{outfiles}")
+            await self._mongo.update_job_state(
+                job.id,
+                models.JobState.UPLOAD_SUBMITTED,
+                models.JobState.COMPLETE,
+                # TODO TEST will need a way to mock out timestamps
+                timestamp.utcdatetime()
+            )
+        except Exception as e:
+            # TODO LOGGING figure out how logging is going to work etc.
+            logr.exception(f"Error completing job {job.id}")
+            # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
+            raise e
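
The equality check above only passes if the MD5s calculated at NERSC mirror how the objects were uploaded. Single-part uploads yield a plain-MD5 ETag, while multipart uploads conventionally yield the MD5 of the concatenated binary part digests plus a part-count suffix, which is the format of the example ETag in models.py below. A hedged sketch of that convention, with made-up part contents:

import hashlib

def multipart_etag(parts: list[bytes]) -> str:
    # MD5 each part, concatenate the raw digests, MD5 the result,
    # and append "-<part count>", e.g. "a70a4d1732484e75434df2c08570e1b2-3"
    combined = b"".join(hashlib.md5(p).digest() for p in parts)
    return f"{hashlib.md5(combined).hexdigest()}-{len(parts)}"

print(multipart_etag([b"part one", b"part two", b"part three"]))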
15 changes: 15 additions & 0 deletions cdmtaskservice/models.py
@@ -669,6 +669,20 @@ def name_with_digest(self):
         return f"{self.name}@{self.digest}"
 
 
+class S3FileOutput(BaseModel):
+    """ An output file in an S3 instance. """
+
+    # no validators since this is an outgoing data structure only
+    file: Annotated[str, Field(
+        example="mybucket/foo/bar/baz.jpg",
+        description="A path to an object in an S3 instance, starting with the bucket.",
+    )]
+    etag: Annotated[str, Field(
+        example="a70a4d1732484e75434df2c08570e1b2-3",
+        description="The S3 e-tag of the file. Weak e-tags are not supported."
+    )]
+
+
 class Job(BaseModel):
     """
     Information about a job.
@@ -690,6 +704,7 @@ class Job(BaseModel):
         ],
         description="A list of tuples of (job_state, time_job_state_entered)."
     )]
+    outputs: list[S3FileOutput] | None = None
     # TODO ERRORHANDLING add error field and class


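A minimal sketch of constructing and serializing the new model, assuming pydantic v2; the Annotated/Field style mirrors what models.py already uses, with the example metadata dropped for brevity:

from typing import Annotated
from pydantic import BaseModel, Field

class S3FileOutput(BaseModel):
    file: Annotated[str, Field(description="A path to an object in an S3 instance.")]
    etag: Annotated[str, Field(description="The S3 e-tag of the file.")]

out = S3FileOutput(file="mybucket/foo/bar/baz.jpg", etag="a70a4d1732484e75434df2c08570e1b2-3")
print(out.model_dump())  # {'file': 'mybucket/foo/bar/baz.jpg', 'etag': 'a70a4d1732484e75434df2c08570e1b2-3'}
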
17 changes: 17 additions & 0 deletions cdmtaskservice/nersc/manager.py
@@ -542,3 +542,20 @@ async def upload_JAWS_job_files(
         concurrency,
         insecure_ssl,
     )

+    async def get_uploaded_JAWS_files(self, job: models.Job) -> dict[str, str]:
+        """
+        Get the list of files that were uploaded to S3 as part of a JAWS job.
+        Returns a dict of file paths relative to the output directory of a container to their
+        MD5s.
+        Expects that the upload_JAWS_job_files function has been run, and will error otherwise.
+        """
+        _not_falsy(job, "job")
+        path = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job.id / _MD5_JSON_FILE_NAME
+        cli = self._client_provider()
+        dtns = await cli.compute(Machine.dtns)
+        md5s_json = await self._get_async_path(dtns, path).download()
+        file_to_md5 = json.load(md5s_json)
+        return {jaws_output.get_relative_file_path(f): md5 for f, md5 in file_to_md5.items()}
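
For reference, a hypothetical shape for the MD5 JSON file this method downloads from NERSC scratch; the layout is an assumption inferred from how the dict is consumed above, and the path and digest are made up:

import io, json

md5s_json = io.BytesIO(
    b'{"call-run/execution/outdir/results/data.tsv": "0f343b0931126a20f133d67c2b018a3b"}'
)
file_to_md5 = json.load(md5s_json)  # full JAWS output paths -> hex MD5s
# keys are then normalized with jaws_output.get_relative_file_path(...)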
