From 486749a3496750a53fbf0f8d6aac50f99b54487a Mon Sep 17 00:00:00 2001 From: Gavin Date: Fri, 3 Jan 2025 15:43:30 -0800 Subject: [PATCH] Merge job_submit back into job_state oops --- cdmtaskservice/app_state.py | 8 +-- cdmtaskservice/error_mapping.py | 2 +- cdmtaskservice/job_state.py | 82 +++++++++++++++++++++++++++- cdmtaskservice/job_submit.py | 97 --------------------------------- cdmtaskservice/routes.py | 4 +- test/job_submit_test.py | 7 --- 6 files changed, 85 insertions(+), 115 deletions(-) delete mode 100644 cdmtaskservice/job_submit.py delete mode 100644 test/job_submit_test.py diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py index 8e4ea84..73300bf 100644 --- a/cdmtaskservice/app_state.py +++ b/cdmtaskservice/app_state.py @@ -20,7 +20,6 @@ from cdmtaskservice.jaws.client import JAWSClient from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner from cdmtaskservice.job_state import JobState -from cdmtaskservice.job_submit import JobSubmit from cdmtaskservice.kb_auth import KBaseAuth from cdmtaskservice.mongo import MongoDAO from cdmtaskservice.nersc.client import NERSCSFAPIClientProvider @@ -38,7 +37,6 @@ class AppState(NamedTuple): sfapi_client: NERSCSFAPIClientProvider jaws_client: JAWSClient s3_client: S3Client - job_submit: JobSubmit job_state: JobState images: Images # TODO CODE make an abstract jobflow class or something. For now just duck type them @@ -101,8 +99,6 @@ async def build_app( jaws_client = await JAWSClient.create(cfg.jaws_url, cfg.jaws_token) logr.info("Done") mongodao = await MongoDAO.create(mongocli[cfg.mongo_db]) - # TODO CODE once the nerscjawsflow is done merge job_state and job_submit back together - job_state = JobState(mongodao) nerscjawsflow = NERSCJAWSRunner( # this has a lot of required args, yech nerscman, jaws_client, @@ -116,11 +112,11 @@ async def build_app( runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow} imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute()) images = Images(mongodao, imginfo) - job_submit = JobSubmit(mongodao, s3, coman, runners) + job_state = JobState(mongodao, s3, coman, runners) app.state._mongo = mongocli app.state._coroman = coman app.state._cdmstate = AppState( - auth, sfapi_client, jaws_client, s3, job_submit, job_state, images, runners + auth, sfapi_client, jaws_client, s3, job_state, images, runners ) except: mongocli.close() diff --git a/cdmtaskservice/error_mapping.py b/cdmtaskservice/error_mapping.py index 5a74b22..c81170c 100644 --- a/cdmtaskservice/error_mapping.py +++ b/cdmtaskservice/error_mapping.py @@ -10,7 +10,7 @@ from cdmtaskservice.http_bearer import MissingTokenError, InvalidAuthHeaderError from cdmtaskservice.images import NoEntrypointError from cdmtaskservice.image_remote_lookup import ImageNameParseError, ImageInfoFetchError -from cdmtaskservice.job_submit import ETagMismatchError +from cdmtaskservice.job_state import ETagMismatchError from cdmtaskservice.kb_auth import InvalidTokenError, MissingRoleError from cdmtaskservice.mongo import ( ImageTagExistsError, diff --git a/cdmtaskservice/job_state.py b/cdmtaskservice/job_state.py index 4968ba8..f122555 100644 --- a/cdmtaskservice/job_state.py +++ b/cdmtaskservice/job_state.py @@ -1,24 +1,98 @@ """ -Manages getting and updating job state. +Manages job state. """ import logging +import uuid +from typing import Any from cdmtaskservice import models +from cdmtaskservice import kb_auth from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string +from cdmtaskservice.coroutine_manager import CoroutineWrangler from cdmtaskservice.exceptions import UnauthorizedError +from cdmtaskservice.image_remote_lookup import parse_image_name from cdmtaskservice.mongo import MongoDAO +from cdmtaskservice.s3.client import S3Client +from cdmtaskservice.s3.paths import S3Paths +from cdmtaskservice.timestamp import utcdatetime class JobState: """ A manager for CDM job state. """ - def __init__(self, mongo: MongoDAO): + def __init__( + self, + mongo: MongoDAO, + s3client: S3Client, + coro_manager: CoroutineWrangler, + job_runners: dict[models.Cluster, Any], # Make abstract class if necessary + ): """ mongo - a MongoDB DAO object. + s3Client - an S3Client pointed at the S3 storage system to use. + coro_manager - a coroutine manager. + job_runners - a mapping of remote compute cluster to the job runner for that cluster. """ + self._s3 = _not_falsy(s3client, "s3client") self._mongo = _not_falsy(mongo, "mongo") + self._coman = _not_falsy(coro_manager, "coro_manager") + self._runners = _not_falsy(job_runners, "job_runners") + + async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str: + """ + Submit a job. + + job_input - the input for the job. + user - the username of the user submitting the job. + + Returns the opaque job ID. + """ + _not_falsy(job_input, "job_input") + _not_falsy(user, "user") + # Could parallelize these ops but probably not worth it + parsedimage = parse_image_name(job_input.image) + tag = parsedimage.tag + if not parsedimage.tag and not parsedimage.digest: + tag = "latest" + image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag) + await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0]) + paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files] + # TODO PERF may wan to make concurrency configurable here + # TODO PERF this checks the file path syntax again, consider some way to avoid + meta = await self._s3.get_object_meta(S3Paths(paths)) + new_input = [] + for m, f in zip(meta, job_input.input_files): + data_id = None + if isinstance(f, models.S3File): + data_id = f.data_id + if f.etag and f.etag != m.e_tag: + raise ETagMismatchError( + f"The expected ETag '{f.etag} for the path '{f.file}' does not match " + + f"the actual ETag '{m.e_tag}'" + ) + # no need to validate the path again + new_input.append(models.S3File.model_construct( + file=m.path, etag=m.e_tag, data_id=data_id) + ) + ji = job_input.model_copy(update={"input_files": new_input}) + job_id = str(uuid.uuid4()) # TODO TEST for testing we'll need to set up a mock for this + job = models.Job( + id=job_id, + job_input=ji, + user=user.user, + image=image, + state=models.JobState.CREATED, + transition_times=[ + (models.JobState.CREATED, utcdatetime()) + ] + ) + # TDDO JOBSUBMIT if reference data is required, is it staged? + await self._mongo.save_job(job) + # Pass in the meta to avoid potential race conditions w/ etag changes + await self._coman.run_coroutine(self._runners[job.job_input.cluster].start_job(job, meta)) + return job_id async def get_job( self, @@ -45,3 +119,7 @@ async def get_job( msg = f"Admin user {user} accessed {job.user}'s job {job_id}" logging.getLogger(__name__).info(msg) return job + + +class ETagMismatchError(Exception): + """ Thrown when an specified ETag does not match the expected ETag. """ diff --git a/cdmtaskservice/job_submit.py b/cdmtaskservice/job_submit.py deleted file mode 100644 index 318057e..0000000 --- a/cdmtaskservice/job_submit.py +++ /dev/null @@ -1,97 +0,0 @@ -""" -Manages submitting jobs. -""" - -import uuid -from typing import Any - -from cdmtaskservice import kb_auth -from cdmtaskservice import models -from cdmtaskservice.arg_checkers import not_falsy as _not_falsy -from cdmtaskservice.coroutine_manager import CoroutineWrangler -from cdmtaskservice.image_remote_lookup import parse_image_name -from cdmtaskservice.mongo import MongoDAO -from cdmtaskservice.s3.client import S3Client -from cdmtaskservice.s3.paths import S3Paths -from cdmtaskservice.timestamp import utcdatetime - -class JobSubmit: - """ - A manager for submitting CDM jobs. - """ - - def __init__( - self, - mongo: MongoDAO, - s3client: S3Client, - coro_manager: CoroutineWrangler, - job_runners: dict[models.Cluster, Any], # Make abstract class if necessary - ): - """ - mongo - a MongoDB DAO object. - s3Client - an S3Client pointed at the S3 storage system to use. - coro_manager - a coroutine manager. - job_runners - a mapping of remote compute cluster to the job runner for that cluster. - """ - self._s3 = _not_falsy(s3client, "s3client") - self._mongo = _not_falsy(mongo, "mongo") - self._coman = _not_falsy(coro_manager, "coro_manager") - self._runners = _not_falsy(job_runners, "job_runners") - - async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str: - """ - Submit a job. - - job_input - the input for the job. - user - the username of the user submitting the job. - - Returns the opaque job ID. - """ - _not_falsy(job_input, "job_input") - _not_falsy(user, "user") - # Could parallelize these ops but probably not worth it - parsedimage = parse_image_name(job_input.image) - tag = parsedimage.tag - if not parsedimage.tag and not parsedimage.digest: - tag = "latest" - image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag) - await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0]) - paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files] - # TODO PERF may wan to make concurrency configurable here - # TODO PERF this checks the file path syntax again, consider some way to avoid - meta = await self._s3.get_object_meta(S3Paths(paths)) - new_input = [] - for m, f in zip(meta, job_input.input_files): - data_id = None - if isinstance(f, models.S3File): - data_id = f.data_id - if f.etag and f.etag != m.e_tag: - raise ETagMismatchError( - f"The expected ETag '{f.etag} for the path '{f.file}' does not match " - + f"the actual ETag '{m.e_tag}'" - ) - # no need to validate the path again - new_input.append(models.S3File.model_construct( - file=m.path, etag=m.e_tag, data_id=data_id) - ) - ji = job_input.model_copy(update={"input_files": new_input}) - job_id = str(uuid.uuid4()) # TODO TEST for testing we'll need to set up a mock for this - job = models.Job( - id=job_id, - job_input=ji, - user=user.user, - image=image, - state=models.JobState.CREATED, - transition_times=[ - (models.JobState.CREATED, utcdatetime()) - ] - ) - # TDDO JOBSUBMIT if reference data is required, is it staged? - await self._mongo.save_job(job) - # Pass in the meta to avoid potential race conditions w/ etag changes - await self._coman.run_coroutine(self._runners[job.job_input.cluster].start_job(job, meta)) - return job_id - - -class ETagMismatchError(Exception): - """ Thrown when an specified ETag does not match the expected ETag. """ diff --git a/cdmtaskservice/routes.py b/cdmtaskservice/routes.py index 52c05f4..4df9541 100644 --- a/cdmtaskservice/routes.py +++ b/cdmtaskservice/routes.py @@ -118,8 +118,8 @@ async def submit_job( job_input: models.JobInput, user: kb_auth.KBaseUser=Depends(_AUTH), ) -> SubmitJobResponse: - job_submit = app_state.get_app_state(r).job_submit - return SubmitJobResponse(job_id=await job_submit.submit(job_input, user)) + job_state = app_state.get_app_state(r).job_state + return SubmitJobResponse(job_id=await job_state.submit(job_input, user)) _ANN_JOB_ID = Annotated[str, FastPath( diff --git a/test/job_submit_test.py b/test/job_submit_test.py deleted file mode 100644 index 27171ad..0000000 --- a/test/job_submit_test.py +++ /dev/null @@ -1,7 +0,0 @@ -# TODO TEST add tests - -from cdmtaskservice import job_submit # @UnusedImport - - -def test_noop(): - pass