Skip to content

Commit

Permalink
Merge pull request #131 from kbase/dev-service
Browse files Browse the repository at this point in the history
Get a job as an admin
  • Loading branch information
MrCreosote authored Dec 13, 2024
2 parents 19c6978 + c643b68 commit f82c102
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 41 deletions.
23 changes: 19 additions & 4 deletions cdmtaskservice/job_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Manages getting and updating job state.
"""

import logging

from cdmtaskservice import kb_auth
from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
Expand All @@ -19,15 +21,28 @@ def __init__(self, mongo: MongoDAO):
"""
self._mongo = _not_falsy(mongo, "mongo")

async def get_job(self, job_id: str, user: kb_auth.KBaseUser) -> models.Job:
async def get_job(
self,
job_id: str,
user: kb_auth.KBaseUser,
as_admin: bool = False
) -> models.Job | models.AdminJobDetails:
"""
Get a job based on its ID. If the provided user doesn't match the job's owner,
an error is thrown.
job_id - the job ID
user - the user requesting the job.
as_admin - True if the user should always have access to the job and should access
additional job details.
"""
# TODO ADMIN add way for admins to get any job, as_admin query param maybe
_not_falsy(user, "user")
job = await self._mongo.get_job(_require_string(job_id, "job_id"))
if job.user != user.user:
job = await self._mongo.get_job(_require_string(job_id, "job_id"), as_admin=as_admin)
if not as_admin and job.user != user.user:
# reveals the job ID exists in the system but I don't see a problem with that
raise UnauthorizedError(f"User {user.user} may not access job {job_id}")
msg = f"User {user.user} accessed job {job_id}"
if as_admin:
msg = f"Admin user {user.user} accessed {job.user}'s job {job_id}"
logging.getLogger(__name__).info(msg)
return job
13 changes: 10 additions & 3 deletions cdmtaskservice/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,20 @@ async def save_job(self, job: models.Job):
# to ensure unique IDs
await self._col_jobs.insert_one(jobd)

async def get_job(self, job_id: str):
""" Get a job by its ID. """
async def get_job(
self, job_id: str, as_admin: bool = False
) -> models.Job | models.AdminJobDetails:
"""
Get a job by its ID.
job_id - the job ID.
as_admin - get additional details about the job.
"""
doc = await self._col_jobs.find_one({models.FLD_JOB_ID: _require_string(job_id, "job_id")})
if not doc:
raise NoSuchJobError(f"No job with ID '{job_id}' exists")
# TODO PERF build up the job piece by piece to skip S3 path validation
return models.Job(**doc)
return models.AdminJobDetails(**doc) if as_admin else models.Job(**doc)

_FLD_NERSC_DL_TASK = f"{models.FLD_JOB_NERSC_DETAILS}.{models.FLD_NERSC_DETAILS_DL_TASK_ID}"

Expand Down
87 changes: 53 additions & 34 deletions cdmtaskservice/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ async def submit_job(
return SubmitJobResponse(job_id=await job_submit.submit(job_input, user))


_ANN_JOB_ID = Annotated[str, FastPath(
example="f0c24820-d792-4efa-a38b-2458ed8ec88f",
description="The job ID.",
pattern=r"^[\w-]+$",
min_length=1,
max_length=50,
)]


@ROUTER_JOBS.get(
"/{job_id}",
response_model=models.Job,
Expand All @@ -117,17 +126,55 @@ async def submit_job(
)
async def get_job(
r: Request,
job_id: Annotated[str, Field(
example="f0c24820-d792-4efa-a38b-2458ed8ec88f",
description="The job ID.",
pattern=r"^[\w-]+$",
job_id: _ANN_JOB_ID,
user: kb_auth.KBaseUser=Depends(_AUTH),
):
job_state = app_state.get_app_state(r).job_state
return await job_state.get_job(job_id, user)


@ROUTER_ADMIN.post(
"/images/{image_id:path}",
response_model=models.Image,
summary="Approve an image",
description="Approve a Docker image for use with this service. "
+ "The image must be publicly acessible and have an entrypoint. "
+ "The image may not already exist in the system."
)
async def approve_image(
r: Request,
image_id: Annotated[str, FastPath(
example="ghcr.io/kbase/collections:checkm2_0.1.6"
+ "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380",
description="The Docker image to run for the job. Include the SHA to ensure the "
+ "exact code requested is run.",
# Don't bother validating other than some basic checks, validation will occur when
# checking / getting the image SHA from the remote repository
min_length=1,
max_length=50,
max_length=1000,
)],
user: kb_auth.KBaseUser=Depends(_AUTH)
):
_ensure_admin(user, "Only service administrators can approve images.")
images = app_state.get_app_state(r).images
return await images.register(image_id)


@ROUTER_ADMIN.get(
"/jobs/{job_id}",
response_model=models.AdminJobDetails,
summary="Get a job as an admin",
description="Get any job, regardless of ownership, with additional details about the job run."
)
async def get_job_admin(
r: Request,
job_id: _ANN_JOB_ID,
user: kb_auth.KBaseUser=Depends(_AUTH),
):
_ensure_admin(user, "Only service administrators can get jobs as an admin.")
job_state = app_state.get_app_state(r).job_state
return await job_state.get_job(job_id, user)
return await job_state.get_job(job_id, user, as_admin=True)


class NERSCClientInfo(BaseModel):
Expand Down Expand Up @@ -176,33 +223,5 @@ async def get_nersc_client_info(
)


@ROUTER_ADMIN.post(
"/image/register/{image_id:path}",
response_model=models.Image,
summary="Approve an image",
description="Approve a Docker image for use with this service. "
+ "The image must be publicly acessible and have an entrypoint. "
+ "The image may not already exist in the system."
)
async def approve_image(
r: Request,
image_id: Annotated[str, FastPath(
example="ghcr.io/kbase/collections:checkm2_0.1.6"
+ "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380",
description="The Docker image to run for the job. Include the SHA to ensure the "
+ "exact code requested is run.",
# Don't bother validating other than some basic checks, validation will occur when
# checking / getting the image SHA from the remote repository
min_length=1,
max_length=1000,
)],
user: kb_auth.KBaseUser=Depends(_AUTH)
):
_ensure_admin(user, "Only service administrators can approve images.")
images = app_state.get_app_state(r).images
return await images.register(image_id)


class ClientLifeTimeError(Exception):
""" An error thrown when a client's lifetime is less than required. """

0 comments on commit f82c102

Please sign in to comment.