diff --git a/cdmtaskservice/job_state.py b/cdmtaskservice/job_state.py index 5f0aa83..43093d8 100644 --- a/cdmtaskservice/job_state.py +++ b/cdmtaskservice/job_state.py @@ -2,6 +2,8 @@ Manages getting and updating job state. """ +import logging + from cdmtaskservice import kb_auth from cdmtaskservice import models from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string @@ -19,15 +21,28 @@ def __init__(self, mongo: MongoDAO): """ self._mongo = _not_falsy(mongo, "mongo") - async def get_job(self, job_id: str, user: kb_auth.KBaseUser) -> models.Job: + async def get_job( + self, + job_id: str, + user: kb_auth.KBaseUser, + as_admin: bool = False + ) -> models.Job | models.AdminJobDetails: """ Get a job based on its ID. If the provided user doesn't match the job's owner, an error is thrown. + + job_id - the job ID + user - the user requesting the job. + as_admin - True if the user should always have access to the job and should access + additional job details. """ - # TODO ADMIN add way for admins to get any job, as_admin query param maybe _not_falsy(user, "user") - job = await self._mongo.get_job(_require_string(job_id, "job_id")) - if job.user != user.user: + job = await self._mongo.get_job(_require_string(job_id, "job_id"), as_admin=as_admin) + if not as_admin and job.user != user.user: # reveals the job ID exists in the system but I don't see a problem with that raise UnauthorizedError(f"User {user.user} may not access job {job_id}") + msg = f"User {user.user} accessed job {job_id}" + if as_admin: + msg = f"Admin user {user.user} accessed {job.user}'s job {job_id}" + logging.getLogger(__name__).info(msg) return job diff --git a/cdmtaskservice/mongo.py b/cdmtaskservice/mongo.py index ff90624..c0d34ab 100644 --- a/cdmtaskservice/mongo.py +++ b/cdmtaskservice/mongo.py @@ -121,13 +121,20 @@ async def save_job(self, job: models.Job): # to ensure unique IDs await self._col_jobs.insert_one(jobd) - async def get_job(self, job_id: str): - """ Get a job by its ID. """ + async def get_job( + self, job_id: str, as_admin: bool = False + ) -> models.Job | models.AdminJobDetails: + """ + Get a job by its ID. + + job_id - the job ID. + as_admin - get additional details about the job. + """ doc = await self._col_jobs.find_one({models.FLD_JOB_ID: _require_string(job_id, "job_id")}) if not doc: raise NoSuchJobError(f"No job with ID '{job_id}' exists") # TODO PERF build up the job piece by piece to skip S3 path validation - return models.Job(**doc) + return models.AdminJobDetails(**doc) if as_admin else models.Job(**doc) _FLD_NERSC_DL_TASK = f"{models.FLD_JOB_NERSC_DETAILS}.{models.FLD_NERSC_DETAILS_DL_TASK_ID}" diff --git a/cdmtaskservice/routes.py b/cdmtaskservice/routes.py index c4fa1a4..eba429a 100644 --- a/cdmtaskservice/routes.py +++ b/cdmtaskservice/routes.py @@ -109,6 +109,15 @@ async def submit_job( return SubmitJobResponse(job_id=await job_submit.submit(job_input, user)) +_ANN_JOB_ID = Annotated[str, FastPath( + example="f0c24820-d792-4efa-a38b-2458ed8ec88f", + description="The job ID.", + pattern=r"^[\w-]+$", + min_length=1, + max_length=50, +)] + + @ROUTER_JOBS.get( "/{job_id}", response_model=models.Job, @@ -117,17 +126,55 @@ async def submit_job( ) async def get_job( r: Request, - job_id: Annotated[str, Field( - example="f0c24820-d792-4efa-a38b-2458ed8ec88f", - description="The job ID.", - pattern=r"^[\w-]+$", + job_id: _ANN_JOB_ID, + user: kb_auth.KBaseUser=Depends(_AUTH), +): + job_state = app_state.get_app_state(r).job_state + return await job_state.get_job(job_id, user) + + +@ROUTER_ADMIN.post( + "/images/{image_id:path}", + response_model=models.Image, + summary="Approve an image", + description="Approve a Docker image for use with this service. " + + "The image must be publicly acessible and have an entrypoint. " + + "The image may not already exist in the system." + +) +async def approve_image( + r: Request, + image_id: Annotated[str, FastPath( + example="ghcr.io/kbase/collections:checkm2_0.1.6" + + "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380", + description="The Docker image to run for the job. Include the SHA to ensure the " + + "exact code requested is run.", + # Don't bother validating other than some basic checks, validation will occur when + # checking / getting the image SHA from the remote repository min_length=1, - max_length=50, + max_length=1000, )], + user: kb_auth.KBaseUser=Depends(_AUTH) +): + _ensure_admin(user, "Only service administrators can approve images.") + images = app_state.get_app_state(r).images + return await images.register(image_id) + + +@ROUTER_ADMIN.get( + "/jobs/{job_id}", + response_model=models.AdminJobDetails, + summary="Get a job as an admin", + description="Get any job, regardless of ownership, with additional details about the job run." +) +async def get_job_admin( + r: Request, + job_id: _ANN_JOB_ID, user: kb_auth.KBaseUser=Depends(_AUTH), ): + _ensure_admin(user, "Only service administrators can get jobs as an admin.") job_state = app_state.get_app_state(r).job_state - return await job_state.get_job(job_id, user) + return await job_state.get_job(job_id, user, as_admin=True) class NERSCClientInfo(BaseModel): @@ -176,33 +223,5 @@ async def get_nersc_client_info( ) -@ROUTER_ADMIN.post( - "/image/register/{image_id:path}", - response_model=models.Image, - summary="Approve an image", - description="Approve a Docker image for use with this service. " - + "The image must be publicly acessible and have an entrypoint. " - + "The image may not already exist in the system." - -) -async def approve_image( - r: Request, - image_id: Annotated[str, FastPath( - example="ghcr.io/kbase/collections:checkm2_0.1.6" - + "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380", - description="The Docker image to run for the job. Include the SHA to ensure the " - + "exact code requested is run.", - # Don't bother validating other than some basic checks, validation will occur when - # checking / getting the image SHA from the remote repository - min_length=1, - max_length=1000, - )], - user: kb_auth.KBaseUser=Depends(_AUTH) -): - _ensure_admin(user, "Only service administrators can approve images.") - images = app_state.get_app_state(r).images - return await images.register(image_id) - - class ClientLifeTimeError(Exception): """ An error thrown when a client's lifetime is less than required. """