Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge job_submit back into job_state #162

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from cdmtaskservice.jaws.client import JAWSClient
from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner
from cdmtaskservice.job_state import JobState
from cdmtaskservice.job_submit import JobSubmit
from cdmtaskservice.kb_auth import KBaseAuth
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.nersc.client import NERSCSFAPIClientProvider
Expand All @@ -38,7 +37,6 @@
sfapi_client: NERSCSFAPIClientProvider
jaws_client: JAWSClient
s3_client: S3Client
job_submit: JobSubmit
job_state: JobState
images: Images
# TODO CODE make an abstract jobflow class or something. For now just duck type them
Expand Down Expand Up @@ -101,8 +99,6 @@
jaws_client = await JAWSClient.create(cfg.jaws_url, cfg.jaws_token)
logr.info("Done")
mongodao = await MongoDAO.create(mongocli[cfg.mongo_db])
# TODO CODE once the nerscjawsflow is done merge job_state and job_submit back together
job_state = JobState(mongodao)
nerscjawsflow = NERSCJAWSRunner( # this has a lot of required args, yech
nerscman,
jaws_client,
Expand All @@ -116,11 +112,11 @@
runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow}
imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute())
images = Images(mongodao, imginfo)
job_submit = JobSubmit(mongodao, s3, coman, runners)
job_state = JobState(mongodao, s3, coman, runners)

Check warning on line 115 in cdmtaskservice/app_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/app_state.py#L115

Added line #L115 was not covered by tests
app.state._mongo = mongocli
app.state._coroman = coman
app.state._cdmstate = AppState(
auth, sfapi_client, jaws_client, s3, job_submit, job_state, images, runners
auth, sfapi_client, jaws_client, s3, job_state, images, runners
)
except:
mongocli.close()
Expand Down
2 changes: 1 addition & 1 deletion cdmtaskservice/error_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from cdmtaskservice.http_bearer import MissingTokenError, InvalidAuthHeaderError
from cdmtaskservice.images import NoEntrypointError
from cdmtaskservice.image_remote_lookup import ImageNameParseError, ImageInfoFetchError
from cdmtaskservice.job_submit import ETagMismatchError
from cdmtaskservice.job_state import ETagMismatchError
from cdmtaskservice.kb_auth import InvalidTokenError, MissingRoleError
from cdmtaskservice.mongo import (
ImageTagExistsError,
Expand Down
82 changes: 80 additions & 2 deletions cdmtaskservice/job_state.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,98 @@
"""
Manages getting and updating job state.
Manages job state.
"""

import logging
import uuid
from typing import Any

from cdmtaskservice import models
from cdmtaskservice import kb_auth
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.exceptions import UnauthorizedError
from cdmtaskservice.image_remote_lookup import parse_image_name
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
from cdmtaskservice.timestamp import utcdatetime

class JobState:
"""
A manager for CDM job state.
"""

def __init__(self, mongo: MongoDAO):
def __init__(
self,
mongo: MongoDAO,
s3client: S3Client,
coro_manager: CoroutineWrangler,
job_runners: dict[models.Cluster, Any], # Make abstract class if necessary
):
"""
mongo - a MongoDB DAO object.
s3client - an S3Client pointed at the S3 storage system to use.
coro_manager - a coroutine manager.
job_runners - a mapping of remote compute cluster to the job runner for that cluster.
"""
self._s3 = _not_falsy(s3client, "s3client")

Check warning on line 38 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L38

Added line #L38 was not covered by tests
self._mongo = _not_falsy(mongo, "mongo")
self._coman = _not_falsy(coro_manager, "coro_manager")
self._runners = _not_falsy(job_runners, "job_runners")

Check warning on line 41 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L40-L41

Added lines #L40 - L41 were not covered by tests

async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str:
"""
Submit a job.

job_input - the input for the job.
user - the user submitting the job.

Returns the opaque job ID.
"""
_not_falsy(job_input, "job_input")
_not_falsy(user, "user")

Check warning on line 53 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L52-L53

Added lines #L52 - L53 were not covered by tests
# Could parallelize these ops but probably not worth it
parsedimage = parse_image_name(job_input.image)
tag = parsedimage.tag
if not parsedimage.tag and not parsedimage.digest:
tag = "latest"
image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag)
await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0])
paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files]

Check warning on line 61 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L55-L61

Added lines #L55 - L61 were not covered by tests
# TODO PERF may want to make concurrency configurable here
# TODO PERF this checks the file path syntax again, consider some way to avoid
meta = await self._s3.get_object_meta(S3Paths(paths))
new_input = []
for m, f in zip(meta, job_input.input_files):
data_id = None
if isinstance(f, models.S3File):
data_id = f.data_id
if f.etag and f.etag != m.e_tag:
raise ETagMismatchError(

Check warning on line 71 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L64-L71

Added lines #L64 - L71 were not covered by tests
f"The expected ETag '{f.etag} for the path '{f.file}' does not match "
+ f"the actual ETag '{m.e_tag}'"
)
# no need to validate the path again
new_input.append(models.S3File.model_construct(

Check warning on line 76 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L76

Added line #L76 was not covered by tests
file=m.path, etag=m.e_tag, data_id=data_id)
)
ji = job_input.model_copy(update={"input_files": new_input})
job_id = str(uuid.uuid4()) # TODO TEST for testing we'll need to set up a mock for this
job = models.Job(

Check warning on line 81 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L79-L81

Added lines #L79 - L81 were not covered by tests
id=job_id,
job_input=ji,
user=user.user,
image=image,
state=models.JobState.CREATED,
transition_times=[
(models.JobState.CREATED, utcdatetime())
]
)
# TODO JOBSUBMIT if reference data is required, is it staged?
await self._mongo.save_job(job)

Check warning on line 92 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L92

Added line #L92 was not covered by tests
# Pass in the meta to avoid potential race conditions w/ etag changes
await self._coman.run_coroutine(self._runners[job.job_input.cluster].start_job(job, meta))
return job_id

Check warning on line 95 in cdmtaskservice/job_state.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/job_state.py#L94-L95

Added lines #L94 - L95 were not covered by tests

async def get_job(
self,
Expand All @@ -45,3 +119,7 @@
msg = f"Admin user {user} accessed {job.user}'s job {job_id}"
logging.getLogger(__name__).info(msg)
return job


class ETagMismatchError(Exception):
""" Thrown when a specified ETag does not match the actual ETag. """
97 changes: 0 additions & 97 deletions cdmtaskservice/job_submit.py

This file was deleted.

4 changes: 2 additions & 2 deletions cdmtaskservice/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@
job_input: models.JobInput,
user: kb_auth.KBaseUser=Depends(_AUTH),
) -> SubmitJobResponse:
job_submit = app_state.get_app_state(r).job_submit
return SubmitJobResponse(job_id=await job_submit.submit(job_input, user))
job_state = app_state.get_app_state(r).job_state
return SubmitJobResponse(job_id=await job_state.submit(job_input, user))

Check warning on line 122 in cdmtaskservice/routes.py

View check run for this annotation

Codecov / codecov/patch

cdmtaskservice/routes.py#L121-L122

Added lines #L121 - L122 were not covered by tests


_ANN_JOB_ID = Annotated[str, FastPath(
Expand Down
7 changes: 0 additions & 7 deletions test/job_submit_test.py

This file was deleted.

Loading