Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save job outputs to Mongo #161

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cdmtaskservice/jobflows/nersc_jaws.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,9 @@
)
outfiles.append(models.S3FileOutput(file=o.path, etag=o.e_tag))
# TODO DISKSPACE will need to clean up job results @ NERSC
# TODO NEXT save output files to mongo and remove log line
logr.info(f"outfiles:\n{outfiles}")
await self._mongo.update_job_state(
await self._mongo.add_output_files_to_job(

Check warning on line 259 in cdmtaskservice/jobflows/nersc_jaws.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/jobflows/nersc_jaws.py#L259

Added line #L259 was not covered by tests
job.id,
outfiles,
models.JobState.UPLOAD_SUBMITTED,
models.JobState.COMPLETE,
# TODO TEST will need a way to mock out timestamps
Expand Down
1 change: 1 addition & 0 deletions cdmtaskservice/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
FLD_NERSC_DETAILS_UL_TASK_ID = "upload_task_id"
FLD_JOB_JAWS_DETAILS = "jaws_details"
FLD_JAWS_DETAILS_RUN_ID = "run_id"
FLD_JOB_OUTPUTS = "outputs"


# https://en.wikipedia.org/wiki/Filename#Comparison_of_filename_limitations
Expand Down
24 changes: 23 additions & 1 deletion cdmtaskservice/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
state: models.JobState,
time: datetime.datetime,
push: dict[str, Any] | None = None,
set_: dict[str, Any] | None = None,
):
res = await self._col_jobs.update_one(
{
Expand All @@ -156,7 +157,7 @@
(_not_falsy(state, "state").value, _not_falsy(time, "time")
)
},
"$set": {models.FLD_JOB_STATE: state.value}
"$set": (set_ if set_ else {}) | {models.FLD_JOB_STATE: state.value}
},
)
if not res.matched_count:
Expand Down Expand Up @@ -250,6 +251,27 @@
await self._update_job_state(job_id, current_state, state, time, push={
self._FLD_NERSC_UL_TASK: _require_string(task_id, "task_id")
})

async def add_output_files_to_job(
self,
job_id: str,
output: list[models.S3FileOutput],
current_state: models.JobState,
state: models.JobState,
time: datetime.datetime
):
"""
Add output files to a job and update the state.

Arguments are as update_job_state except for the addition of:

output - the output files.
"""
out = [o.model_dump() for o in _not_falsy(output, "output")]
await self._update_job_state(

Check warning on line 271 in cdmtaskservice/mongo.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/mongo.py#L270-L271

Added lines #L270 - L271 were not covered by tests
job_id, current_state, state, time, set_={models.FLD_JOB_OUTPUTS: out}
)


class NoSuchImageError(Exception):
""" The image does not exist in the system. """
Expand Down
1 change: 0 additions & 1 deletion cdmtaskservice/nersc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ async def upload_JAWS_job_files(
outputs = jaws_output.parse_outputs_json(outputs_json)
container_files = list(outputs.output_files.keys())
presigns = await files_to_urls(container_files)
# TODO CORRECTNESS IMPORTANT log etags in remote code, upload and return
# TODO LOGGING upload job stdout and stderr logs
return await self.upload_s3_files(
job.id,
Expand Down
Loading