Merge pull request #157 from kbase/dev-service
Add job completion callback endpoint and mongo update
MrCreosote authored Jan 9, 2025
2 parents de2d569 + 765dec3 commit 8c04309
Showing 2 changed files with 66 additions and 10 deletions.
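Both new endpoints follow the pattern of the existing download callback: a plain GET that the remote infrastructure hits when a stage finishes, returning 204 No Content. As a caller-side illustration of that contract (not code from this commit), a remote task might report completion as sketched below; the base URL, the httpx client, and the wrapper function are assumptions:

# Hypothetical caller-side sketch; only get_job_complete_callback() comes from
# the service code touched in this commit.
import httpx

from cdmtaskservice.callback_url_paths import get_job_complete_callback

SERVICE_URL = "https://cts.example.org"  # assumed base URL


async def report_job_complete(job_id: str) -> None:
    # The callback endpoints are GETs that return 204 No Content on success.
    url = f"{SERVICE_URL}/{get_job_complete_callback()}/{job_id}"
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        resp.raise_for_status()  # any non-2xx status means the report failed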
28 changes: 27 additions & 1 deletion cdmtaskservice/jobflows/nersc_jaws.py
@@ -193,8 +193,9 @@ async def presign(output_files: list[Path]) -> list[S3PresignedPost]:

        try:
            # TODO PERF config / set concurrency
            # TODO DISKSPACE will need to clean up job results @ NERSC
            # TODO LOGGING make the remote code log summary of results and upload and store
            # TODO LOGGING upload the container std* logs to S3 and store locations in job
            #      or maybe store in GFS? Should discuss with group how this should work
            task_id = await self._nman.upload_JAWS_job_files(
                job,
                jaws_info["output_dir"],
@@ -216,3 +217,28 @@ async def presign(output_files: list[Path]) -> list[S3PresignedPost]:
            logr.exception(f"Error starting upload for job {job.id}")
            # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
            raise e

    async def upload_complete(self, job: models.AdminJobDetails):
        """
        Complete a job after the upload is complete. The job is expected to be in the
        upload submitted state.
        """
        if _not_falsy(job, "job").state != models.JobState.UPLOAD_SUBMITTED:
            raise InvalidJobStateError("Job must be in the upload submitted state")
        # TODO ERRHANDLING IMPORTANT pull the task from the SFAPI. If it a) doesn't exist or
        #      b) has no errors, continue, otherwise put the job into an errored state.
        # TODO ERRHANDLING IMPORTANT upload the output file from the upload task and check for
        #      errors. If any exist, put the job into an errored state.
        # TODO LOGGING Add any relevant logs from the task / download task output file in state
        #      call. Alternatively, just add code to fetch them from NERSC rather
        #      than storing them permanently. 99% of the time they'll be uninteresting
        # TODO DATA CORRECTNESS IMPORTANT upload the file list & MD5 of the files, check the MD5s
        #      vs the e-tags in Minio and save the files & e-tags to the job
        # TODO DISKSPACE will need to clean up job results @ NERSC
        await self._mongo.update_job_state(
            job.id,
            models.JobState.UPLOAD_SUBMITTED,
            models.JobState.COMPLETE,
            # TODO TEST will need a way to mock out timestamps
            timestamp.utcdatetime()
        )
48 changes: 39 additions & 9 deletions cdmtaskservice/routes.py
@@ -8,18 +8,21 @@
    APIRouter,
    Depends,
    Request,
    Response,
    status,
    Query,
    Path as FastPath
)
from pydantic import BaseModel, Field
from typing import Annotated, Any

from cdmtaskservice import app_state
from cdmtaskservice import kb_auth
from cdmtaskservice import models
from cdmtaskservice.callback_url_paths import (
    get_download_complete_callback,
    get_job_complete_callback,
    get_upload_complete_callback,
)
from cdmtaskservice.exceptions import UnauthorizedError
from cdmtaskservice.git_commit import GIT_COMMIT
@@ -237,32 +240,59 @@ async def get_nersc_client_info(
f"/{get_download_complete_callback()}/{{job_id}}",
summary="Report data download complete",
description="Report that data download for a job is complete. This method is not expected "
+ "to be called by users."
+ "to be called by users.",
status_code = status.HTTP_204_NO_CONTENT,
response_class=Response,
)
async def download_complete(
r: Request,
job_id: _ANN_JOB_ID
):
    runner, job = await _callback_handling(r, "Download", job_id)
    await runner.download_complete(job)


@ROUTER_CALLBACKS.get(
    f"/{get_job_complete_callback()}/{{job_id}}",
    summary="Report job complete",
    description="Report a remote job is complete. This method is not expected "
+ "to be called by users."
+ "to be called by users.",
status_code = status.HTTP_204_NO_CONTENT,
response_class=Response,
)
async def job_complete(
r: Request,
job_id: _ANN_JOB_ID
):
    runner, job = await _callback_handling(r, "Remote job", job_id)
    await runner.job_complete(job)


@ROUTER_CALLBACKS.get(
    f"/{get_upload_complete_callback()}/{{job_id}}",
    summary="Report data upload complete",
    description="Report that data upload for a job is complete. This method is not expected "
        + "to be called by users.",
    status_code=status.HTTP_204_NO_CONTENT,
    response_class=Response,
)
async def upload_complete(
    r: Request,
    job_id: _ANN_JOB_ID
):
    runner, job = await _callback_handling(r, "Upload", job_id)
    await runner.upload_complete(job)


async def _callback_handling(
    r: Request, operation: str, job_id: str
) -> tuple[Any, models.AdminJobDetails]:
    # Any in the tuple is the job flow runner. Would need to make an abstract
    # class to type it properly - YAGNI.
    logging.getLogger(__name__).info(f"{operation} reported as complete for job {job_id}")
    appstate = app_state.get_app_state(r)
    job = await appstate.job_state.get_job(job_id, _SERVICE_USER, as_admin=True)
    return appstate.runners[job.job_input.cluster], job
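The comment above notes that typing the runner would need an abstract class. If that were ever wanted, a typing.Protocol would avoid touching the runner classes at all; a sketch of what it might look like, mirroring the three handler methods in this module (not part of this PR):

from typing import Protocol

class JobFlowRunner(Protocol):
    # Structural type for appstate.runners values; names mirror the callbacks above.
    async def download_complete(self, job: models.AdminJobDetails) -> None: ...
    async def job_complete(self, job: models.AdminJobDetails) -> None: ...
    async def upload_complete(self, job: models.AdminJobDetails) -> None: ...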


class ClientLifeTimeError(Exception):
