Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split job submit from job state #126

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from cdmtaskservice.images import Images
from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner
from cdmtaskservice.job_state import JobState
from cdmtaskservice.job_submit import JobSubmit
from cdmtaskservice.kb_auth import KBaseAuth
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.nersc.client import NERSCSFAPIClientProvider
Expand All @@ -34,6 +35,7 @@
auth: KBaseAuth
sfapi_client: NERSCSFAPIClientProvider
s3_client: S3Client
job_submit: JobSubmit
job_state: JobState
images: Images
# TODO CODE make an abstract jobflow class or something. For now just duck type them
Expand Down Expand Up @@ -86,7 +88,7 @@
logr.info("Done")
try:
mongodao = await MongoDAO.create(mongocli[cfg.mongo_db])
job_state = JobState(mongodao, s3)
job_state = JobState(mongodao)

Check warning on line 91 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L91

Added line #L91 was not covered by tests
nerscjawsflow = NERSCJAWSRunner(
nerscman,
job_state,
Expand All @@ -100,9 +102,10 @@
runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow}
imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute())
images = Images(mongodao, imginfo)
job_submit = JobSubmit(mongodao, s3)

Check warning on line 105 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L105

Added line #L105 was not covered by tests
app.state._mongo = mongocli
app.state._cdmstate = AppState(
auth, sfapi_client, s3, job_state, images, runners
auth, sfapi_client, s3, job_submit, job_state, images, runners
)
except:
mongocli.close()
Expand Down
2 changes: 1 addition & 1 deletion cdmtaskservice/error_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from cdmtaskservice.http_bearer import MissingTokenError, InvalidAuthHeaderError
from cdmtaskservice.images import NoEntrypointError
from cdmtaskservice.image_remote_lookup import ImageNameParseError, ImageInfoFetchError
from cdmtaskservice.job_state import ETagMismatchError
from cdmtaskservice.job_submit import ETagMismatchError
from cdmtaskservice.kb_auth import InvalidTokenError, MissingRoleError
from cdmtaskservice.mongo import (
ImageTagExistsError,
Expand Down
72 changes: 4 additions & 68 deletions cdmtaskservice/job_state.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,23 @@
"""
Manages submitting jobs and getting and updating their current state.
Manages getting and updating job state.
"""

import uuid

from cdmtaskservice import kb_auth
from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
from cdmtaskservice.exceptions import UnauthorizedError
from cdmtaskservice.image_remote_lookup import parse_image_name
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
from cdmtaskservice.timestamp import utcdatetime

class JobState:
"""
A manager for CDM job state.
"""

def __init__(self, mongo: MongoDAO, s3client: S3Client): # TODO MONOGO client
def __init__(self, mongo: MongoDAO):
"""
s3Client - an S3Client pointed at the S3 storage system to use.
mongo - a MongoDB DAO object.
"""
self._s3 = _not_falsy(s3client, "s3client")
self._mongo = _not_falsy(mongo, "mongo")

async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str:
"""
Submit a job.

job_input - the input for the job.
user - the username of the user submitting the job.

Returns the opaque job ID.
"""
_not_falsy(job_input, "job_input")
_not_falsy(user, "user")
# Could parallelize these ops but probably not worth it
parsedimage = parse_image_name(job_input.image)
tag = parsedimage.tag
if not parsedimage.tag and not parsedimage.digest:
tag = "latest"
image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag)
await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0])
paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files]
# TODO PERF may wan to make concurrency configurable here
# TODO PERF this checks the file path syntax again, consider some way to avoid
meta = await self._s3.get_object_meta(S3Paths(paths))
new_input = []
for m, f in zip(meta, job_input.input_files):
data_id = None
if isinstance(f, models.S3File):
data_id = f.data_id
if f.etag and f.etag != m.e_tag:
raise ETagMismatchError(
f"The expected ETag '{f.etag} for the path '{f.file}' does not match "
+ f"the actual ETag '{m.e_tag}'"
)
# no need to validate the path again
new_input.append(models.S3File.model_construct(
file=m.path, etag=m.e_tag, data_id=data_id)
)
ji = job_input.model_copy(update={"input_files": new_input})
job_id = str(uuid.uuid4()) # TODO TEST for testing we'll need to set up a mock for this
job = models.Job(
id=job_id,
job_input=ji,
user=user.user,
image=image,
state=models.JobState.CREATED,
transition_times=[
(models.JobState.CREATED, utcdatetime())
]
)
await self._mongo.save_job(job)
# TDDO JOBSUBMIT if reference data is required, is it staged?
return job_id


async def get_job(self, job_id: str, user: kb_auth.KBaseUser) -> models.Job:
"""
Expand All @@ -88,10 +28,6 @@ async def get_job(self, job_id: str, user: kb_auth.KBaseUser) -> models.Job:
_not_falsy(user, "user")
job = await self._mongo.get_job(_require_string(job_id, "job_id"))
if job.user != user.user:
# reveals the job ID exist in the system but I don't see a problem with that
# reveals the job ID exists in the system but I don't see a problem with that
raise UnauthorizedError(f"User {user.user} may not access job {job_id}")
return job


class ETagMismatchError(Exception):
""" Thrown when an specified ETag does not match the expected ETag. """
83 changes: 83 additions & 0 deletions cdmtaskservice/job_submit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Manages submitting jobs.
"""

import uuid

from cdmtaskservice import kb_auth
from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
from cdmtaskservice.image_remote_lookup import parse_image_name
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
from cdmtaskservice.timestamp import utcdatetime

class JobSubmit:
    """
    A manager for submitting CDM jobs.
    """

    def __init__(self, mongo: MongoDAO, s3client: S3Client):
        """
        mongo - a MongoDB DAO object.
        s3client - an S3Client pointed at the S3 storage system to use.
        """
        self._s3 = _not_falsy(s3client, "s3client")
        self._mongo = _not_falsy(mongo, "mongo")

    async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str:
        """
        Submit a job.

        job_input - the input for the job.
        user - the user submitting the job.

        Returns the opaque job ID.

        Raises ETagMismatchError if an input file specifies an ETag that differs from the
        ETag reported by the S3 client for that file.
        """
        _not_falsy(job_input, "job_input")
        _not_falsy(user, "user")
        # Could parallelize these ops but probably not worth it
        parsedimage = parse_image_name(job_input.image)
        tag = parsedimage.tag
        # With neither a tag nor a digest supplied, look the image up under the "latest" tag.
        if not parsedimage.tag and not parsedimage.digest:
            tag = "latest"
        image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag)
        # The bucket is the first path component of the output directory.
        # NOTE(review): the result is discarded, so this presumably raises when the bucket
        # is missing - confirm against S3Client.has_bucket.
        await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0])
        # Input files may be either S3File models or plain path strings.
        paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files]
        # TODO PERF may want to make concurrency configurable here
        # TODO PERF this checks the file path syntax again, consider some way to avoid
        meta = await self._s3.get_object_meta(S3Paths(paths))
        new_input = []
        for m, f in zip(meta, job_input.input_files):
            data_id = None
            if isinstance(f, models.S3File):
                data_id = f.data_id
                # Only check the ETag when the caller actually supplied one.
                if f.etag and f.etag != m.e_tag:
                    raise ETagMismatchError(
                        f"The expected ETag '{f.etag}' for the path '{f.file}' does not match "
                        + f"the actual ETag '{m.e_tag}'"
                    )
            # no need to validate the path again
            new_input.append(models.S3File.model_construct(
                file=m.path, etag=m.e_tag, data_id=data_id)
            )
        # Store the job with every input normalized to an S3File carrying the ETag
        # reported by the S3 client.
        ji = job_input.model_copy(update={"input_files": new_input})
        job_id = str(uuid.uuid4())  # TODO TEST for testing we'll need to set up a mock for this
        job = models.Job(
            id=job_id,
            job_input=ji,
            user=user.user,
            image=image,
            state=models.JobState.CREATED,
            transition_times=[
                (models.JobState.CREATED, utcdatetime())
            ]
        )
        # TODO JOBSUBMIT if reference data is required, is it staged?
        await self._mongo.save_job(job)
        return job_id

Check warning on line 79 in cdmtaskservice/job_submit.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_submit.py#L78-L79

Added lines #L78 - L79 were not covered by tests


class ETagMismatchError(Exception):
    """ Thrown when the ETag specified for a file does not match the file's actual ETag. """
4 changes: 2 additions & 2 deletions cdmtaskservice/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@
job_input: models.JobInput,
user: kb_auth.KBaseUser=Depends(_AUTH),
):
job_state = app_state.get_app_state(r).job_state
return SubmitJobResponse(job_id=await job_state.submit(job_input, user))
job_submit = app_state.get_app_state(r).job_submit
return SubmitJobResponse(job_id=await job_submit.submit(job_input, user))

Check warning on line 109 in cdmtaskservice/routes.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/routes.py#L108-L109

Added lines #L108 - L109 were not covered by tests


@ROUTER_JOBS.get(
Expand Down
7 changes: 7 additions & 0 deletions test/job_submit_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# TODO TEST add tests

from cdmtaskservice import job_submit # @UnusedImport


def test_noop():
    # Placeholder test that intentionally does nothing; real tests are still to be
    # written (see the TODO at the top of this file).
    pass
Loading