diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py index 2b9d6ed..cf00a04 100644 --- a/cdmtaskservice/app_state.py +++ b/cdmtaskservice/app_state.py @@ -18,6 +18,7 @@ from cdmtaskservice.images import Images from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner from cdmtaskservice.job_state import JobState +from cdmtaskservice.job_submit import JobSubmit from cdmtaskservice.kb_auth import KBaseAuth from cdmtaskservice.mongo import MongoDAO from cdmtaskservice.nersc.client import NERSCSFAPIClientProvider @@ -34,6 +35,7 @@ class AppState(NamedTuple): auth: KBaseAuth sfapi_client: NERSCSFAPIClientProvider s3_client: S3Client + job_submit: JobSubmit job_state: JobState images: Images # TODO CODE make an abstract jobflow class or something. For now just duck type them @@ -86,7 +88,7 @@ async def build_app( logr.info("Done") try: mongodao = await MongoDAO.create(mongocli[cfg.mongo_db]) - job_state = JobState(mongodao, s3) + job_state = JobState(mongodao) nerscjawsflow = NERSCJAWSRunner( nerscman, job_state, @@ -100,9 +102,10 @@ async def build_app( runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow} imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute()) images = Images(mongodao, imginfo) + job_submit = JobSubmit(mongodao, s3) app.state._mongo = mongocli app.state._cdmstate = AppState( - auth, sfapi_client, s3, job_state, images, runners + auth, sfapi_client, s3, job_submit, job_state, images, runners ) except: mongocli.close() diff --git a/cdmtaskservice/error_mapping.py b/cdmtaskservice/error_mapping.py index a0c989a..fbeb6db 100644 --- a/cdmtaskservice/error_mapping.py +++ b/cdmtaskservice/error_mapping.py @@ -9,7 +9,7 @@ from cdmtaskservice.http_bearer import MissingTokenError, InvalidAuthHeaderError from cdmtaskservice.images import NoEntrypointError from cdmtaskservice.image_remote_lookup import ImageNameParseError, ImageInfoFetchError -from cdmtaskservice.job_state import ETagMismatchError +from 
cdmtaskservice.job_submit import ETagMismatchError from cdmtaskservice.kb_auth import InvalidTokenError, MissingRoleError from cdmtaskservice.mongo import ( ImageTagExistsError, diff --git a/cdmtaskservice/job_state.py b/cdmtaskservice/job_state.py index 13df97a..5f0aa83 100644 --- a/cdmtaskservice/job_state.py +++ b/cdmtaskservice/job_state.py @@ -1,83 +1,23 @@ """ -Manages submitting jobs and getting and updating their current state. +Manages getting and updating job state. """ -import uuid - from cdmtaskservice import kb_auth from cdmtaskservice import models from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string from cdmtaskservice.exceptions import UnauthorizedError -from cdmtaskservice.image_remote_lookup import parse_image_name from cdmtaskservice.mongo import MongoDAO -from cdmtaskservice.s3.client import S3Client -from cdmtaskservice.s3.paths import S3Paths -from cdmtaskservice.timestamp import utcdatetime class JobState: """ A manager for CDM job state. """ - def __init__(self, mongo: MongoDAO, s3client: S3Client): # TODO MONOGO client + def __init__(self, mongo: MongoDAO): """ - s3Client - an S3Client pointed at the S3 storage system to use. + mongo - a MongoDB DAO object. """ - self._s3 = _not_falsy(s3client, "s3client") self._mongo = _not_falsy(mongo, "mongo") - - async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str: - """ - Submit a job. - - job_input - the input for the job. - user - the username of the user submitting the job. - - Returns the opaque job ID. 
- """ - _not_falsy(job_input, "job_input") - _not_falsy(user, "user") - # Could parallelize these ops but probably not worth it - parsedimage = parse_image_name(job_input.image) - tag = parsedimage.tag - if not parsedimage.tag and not parsedimage.digest: - tag = "latest" - image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag) - await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0]) - paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files] - # TODO PERF may wan to make concurrency configurable here - # TODO PERF this checks the file path syntax again, consider some way to avoid - meta = await self._s3.get_object_meta(S3Paths(paths)) - new_input = [] - for m, f in zip(meta, job_input.input_files): - data_id = None - if isinstance(f, models.S3File): - data_id = f.data_id - if f.etag and f.etag != m.e_tag: - raise ETagMismatchError( - f"The expected ETag '{f.etag} for the path '{f.file}' does not match " - + f"the actual ETag '{m.e_tag}'" - ) - # no need to validate the path again - new_input.append(models.S3File.model_construct( - file=m.path, etag=m.e_tag, data_id=data_id) - ) - ji = job_input.model_copy(update={"input_files": new_input}) - job_id = str(uuid.uuid4()) # TODO TEST for testing we'll need to set up a mock for this - job = models.Job( - id=job_id, - job_input=ji, - user=user.user, - image=image, - state=models.JobState.CREATED, - transition_times=[ - (models.JobState.CREATED, utcdatetime()) - ] - ) - await self._mongo.save_job(job) - # TDDO JOBSUBMIT if reference data is required, is it staged? 
- return job_id - async def get_job(self, job_id: str, user: kb_auth.KBaseUser) -> models.Job: """ @@ -88,10 +28,6 @@ async def get_job(self, job_id: str, user: kb_auth.KBaseUser) -> models.Job: _not_falsy(user, "user") job = await self._mongo.get_job(_require_string(job_id, "job_id")) if job.user != user.user: - # reveals the job ID exist in the system but I don't see a problem with that + # reveals the job ID exists in the system but I don't see a problem with that raise UnauthorizedError(f"User {user.user} may not access job {job_id}") return job - - -class ETagMismatchError(Exception): - """ Thrown when an specified ETag does not match the expected ETag. """ diff --git a/cdmtaskservice/job_submit.py b/cdmtaskservice/job_submit.py new file mode 100644 index 0000000..2ecce30 --- /dev/null +++ b/cdmtaskservice/job_submit.py @@ -0,0 +1,83 @@ +""" +Manages submitting jobs. +""" + +import uuid + +from cdmtaskservice import kb_auth +from cdmtaskservice import models +from cdmtaskservice.arg_checkers import not_falsy as _not_falsy +from cdmtaskservice.image_remote_lookup import parse_image_name +from cdmtaskservice.mongo import MongoDAO +from cdmtaskservice.s3.client import S3Client +from cdmtaskservice.s3.paths import S3Paths +from cdmtaskservice.timestamp import utcdatetime + +class JobSubmit: + """ + A manager for submitting CDM jobs. + """ + + def __init__(self, mongo: MongoDAO, s3client: S3Client): + """ + mongo - a MongoDB DAO object. + s3Client - an S3Client pointed at the S3 storage system to use. + """ + self._s3 = _not_falsy(s3client, "s3client") + self._mongo = _not_falsy(mongo, "mongo") + + async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str: + """ + Submit a job. + + job_input - the input for the job. + user - the username of the user submitting the job. + + Returns the opaque job ID. 
+ """ + _not_falsy(job_input, "job_input") + _not_falsy(user, "user") + # Could parallelize these ops but probably not worth it + parsedimage = parse_image_name(job_input.image) + tag = parsedimage.tag + if not parsedimage.tag and not parsedimage.digest: + tag = "latest" + image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag) + await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0]) + paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files] + # TODO PERF may want to make concurrency configurable here + # TODO PERF this checks the file path syntax again, consider some way to avoid + meta = await self._s3.get_object_meta(S3Paths(paths)) + new_input = [] + for m, f in zip(meta, job_input.input_files): + data_id = None + if isinstance(f, models.S3File): + data_id = f.data_id + if f.etag and f.etag != m.e_tag: + raise ETagMismatchError( + f"The expected ETag '{f.etag} for the path '{f.file}' does not match " + + f"the actual ETag '{m.e_tag}'" + ) + # no need to validate the path again + new_input.append(models.S3File.model_construct( + file=m.path, etag=m.e_tag, data_id=data_id) + ) + ji = job_input.model_copy(update={"input_files": new_input}) + job_id = str(uuid.uuid4()) # TODO TEST for testing we'll need to set up a mock for this + job = models.Job( + id=job_id, + job_input=ji, + user=user.user, + image=image, + state=models.JobState.CREATED, + transition_times=[ + (models.JobState.CREATED, utcdatetime()) + ] + ) + # TODO JOBSUBMIT if reference data is required, is it staged? + await self._mongo.save_job(job) + return job_id + + +class ETagMismatchError(Exception): + """ Thrown when a specified ETag does not match the expected ETag. 
""" diff --git a/cdmtaskservice/routes.py b/cdmtaskservice/routes.py index 5cb82ed..c4fa1a4 100644 --- a/cdmtaskservice/routes.py +++ b/cdmtaskservice/routes.py @@ -105,8 +105,8 @@ async def submit_job( job_input: models.JobInput, user: kb_auth.KBaseUser=Depends(_AUTH), ): - job_state = app_state.get_app_state(r).job_state - return SubmitJobResponse(job_id=await job_state.submit(job_input, user)) + job_submit = app_state.get_app_state(r).job_submit + return SubmitJobResponse(job_id=await job_submit.submit(job_input, user)) @ROUTER_JOBS.get( diff --git a/test/job_submit_test.py b/test/job_submit_test.py new file mode 100644 index 0000000..27171ad --- /dev/null +++ b/test/job_submit_test.py @@ -0,0 +1,7 @@ +# TODO TEST add tests + +from cdmtaskservice import job_submit # @UnusedImport + + +def test_noop(): + pass