diff --git a/cdmtaskservice/app.py b/cdmtaskservice/app.py index 693a4b2..393b26e 100644 --- a/cdmtaskservice/app.py +++ b/cdmtaskservice/app.py @@ -67,6 +67,7 @@ def create_app(): app.add_middleware(GZipMiddleware) app.include_router(routes.ROUTER_GENERAL) app.include_router(routes.ROUTER_JOBS) + app.include_router(routes.ROUTER_IMAGES) app.include_router(routes.ROUTER_ADMIN) app.include_router(routes.ROUTER_CALLBACKS) diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py index 73300bf..9a74241 100644 --- a/cdmtaskservice/app_state.py +++ b/cdmtaskservice/app_state.py @@ -112,7 +112,7 @@ async def build_app( runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow} imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute()) images = Images(mongodao, imginfo) - job_state = JobState(mongodao, s3, coman, runners) + job_state = JobState(mongodao, s3, images, coman, runners) app.state._mongo = mongocli app.state._coroman = coman app.state._cdmstate = AppState( diff --git a/cdmtaskservice/image_remote_lookup.py b/cdmtaskservice/image_remote_lookup.py index 191d63d..ad1cc34 100644 --- a/cdmtaskservice/image_remote_lookup.py +++ b/cdmtaskservice/image_remote_lookup.py @@ -169,7 +169,7 @@ async def _run_crane(self, *args) -> (int, str, str): # TODO TEST add tests calling this method specifically -def parse_image_name(image_name: str) -> reference.Reference: +def parse_image_name(image_name: str) -> ParsedImageName: image_name = image_name.strip() if image_name is not None else None # Image name rules: https://docs.docker.com/reference/cli/docker/image/tag/ # Don't do an exhaustive check here, but enough that we're reasonably confident diff --git a/cdmtaskservice/images.py b/cdmtaskservice/images.py index 2a8f0ca..a80888b 100644 --- a/cdmtaskservice/images.py +++ b/cdmtaskservice/images.py @@ -4,8 +4,9 @@ from cdmtaskservice import models from cdmtaskservice.arg_checkers import not_falsy as _not_falsy -from cdmtaskservice.image_remote_lookup import DockerImageInfo +from cdmtaskservice.image_remote_lookup import DockerImageInfo, parse_image_name from cdmtaskservice.mongo import MongoDAO +from cdmtaskservice.timestamp import utcdatetime class Images: @@ -23,22 +24,39 @@ def __init__(self, mongo: MongoDAO, imageinfo: DockerImageInfo): self._mongo = _not_falsy(mongo, "mongo") self._iminfo = _not_falsy(imageinfo, "imageinfo") - async def register(self, imagename: str): + async def register(self, imagename: str, username: str) -> models.Image: + """ + Register an image to the service. + + imagename - the name of the docker image, e.g. gchr.io/kbase/checkm2:6.7.1 + username - the name of the user registering the image. + """ normedname = await self._iminfo.normalize_image_name(imagename) # Just use the sha for entrypoint lookup to ensure we get the right image entrypoint = await self._iminfo.get_entrypoint_from_name(normedname.name_with_digest) if not entrypoint: raise NoEntrypointError(f"Image {imagename} does not have an entrypoint") - # TODO DATAINTEG add username and date created # TODO REFDATA allow specifying refdata for image img = models.Image( name=normedname.name, digest = normedname.digest, tag=normedname.tag, - entrypoint=entrypoint + entrypoint=entrypoint, + registered_by=username, + registered_on=utcdatetime() ) await self._mongo.save_image(img) return img + + async def get_image(self, imagename) -> models.Image: + """ + Get an image. + """ + parsedimage = parse_image_name(imagename) + tag = parsedimage.tag + if not parsedimage.tag and not parsedimage.digest: + tag = "latest" + return await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag) class NoEntrypointError(Exception): diff --git a/cdmtaskservice/job_state.py b/cdmtaskservice/job_state.py index d6edbff..6a13560 100644 --- a/cdmtaskservice/job_state.py +++ b/cdmtaskservice/job_state.py @@ -11,7 +11,7 @@ from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string from cdmtaskservice.coroutine_manager import CoroutineWrangler from cdmtaskservice.exceptions import UnauthorizedError -from cdmtaskservice.image_remote_lookup import parse_image_name +from cdmtaskservice.images import Images from cdmtaskservice.mongo import MongoDAO from cdmtaskservice.s3.client import S3Client from cdmtaskservice.s3.paths import S3Paths @@ -26,15 +26,18 @@ def __init__( self, mongo: MongoDAO, s3client: S3Client, + images: Images, coro_manager: CoroutineWrangler, job_runners: dict[models.Cluster, Any], # Make abstract class if necessary ): """ mongo - a MongoDB DAO object. s3Client - an S3Client pointed at the S3 storage system to use. + images - a handler for getting images. coro_manager - a coroutine manager. job_runners - a mapping of remote compute cluster to the job runner for that cluster. """ + self._images = _not_falsy(images, "images") self._s3 = _not_falsy(s3client, "s3client") self._mongo = _not_falsy(mongo, "mongo") self._coman = _not_falsy(coro_manager, "coro_manager") @@ -52,11 +55,7 @@ async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> s _not_falsy(job_input, "job_input") _not_falsy(user, "user") # Could parallelize these ops but probably not worth it - parsedimage = parse_image_name(job_input.image) - tag = parsedimage.tag - if not parsedimage.tag and not parsedimage.digest: - tag = "latest" - image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag) + image = await self._images.get_image(job_input.image) await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0]) paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files] # TODO PERF may want to make concurrency configurable here diff --git a/cdmtaskservice/models.py b/cdmtaskservice/models.py index 7a5cc43..87f064a 100644 --- a/cdmtaskservice/models.py +++ b/cdmtaskservice/models.py @@ -660,6 +660,14 @@ class Image(BaseModel): description="The image tag at registration time. " + "The tag may no longer point to the same image." )] = None + registered_by: Annotated[str, Field( + example="aparkin", + description="The username of the user that registered this image." + )] + registered_on: Annotated[datetime.datetime, Field( + example="2024-10-24T22:35:40Z", + description="The time of registration." + )] # TODO REFERENCEDATA add reference data ID @property diff --git a/cdmtaskservice/routes.py b/cdmtaskservice/routes.py index 4df9541..c0bb216 100644 --- a/cdmtaskservice/routes.py +++ b/cdmtaskservice/routes.py @@ -35,6 +35,7 @@ # may need to split these into different files if this file gets too big ROUTER_GENERAL = APIRouter(tags=["General"]) ROUTER_JOBS = APIRouter(tags=["Jobs"], prefix="/jobs") +ROUTER_IMAGES = APIRouter(tags=["Images"], prefix="/images") ROUTER_ADMIN = APIRouter(tags=["Admin"], prefix="/admin") ROUTER_CALLBACKS = APIRouter(tags=["Callbacks"]) @@ -146,6 +147,32 @@ async def get_job( return await job_state.get_job(job_id, user.user) +_ANN_IMAGE_ID = Annotated[str, FastPath( + example="ghcr.io/kbase/collections:checkm2_0.1.6" + + "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380", + description="The name of a docker image. Include the SHA to ensure " + + "referencing the correct image.", + # Don't bother validating other than some basic checks, validation will occur when + # checking / getting the image SHA from the remote repository + min_length=1, + max_length=1000, +)] + + +@ROUTER_IMAGES.get( + "/{image_id:path}", + response_model=models.Image, + summary="Get an image by name", + description="Get an image previously registered to the system by the image name." +) +async def get_image( + r: Request, + image_id: _ANN_IMAGE_ID, + # Public for now - any reason for these to be private to KBase staff? +): + return await app_state.get_app_state(r).images.get_image(image_id) + + @ROUTER_ADMIN.post( "/images/{image_id:path}", response_model=models.Image, @@ -157,21 +184,12 @@ async def get_job( ) async def approve_image( r: Request, - image_id: Annotated[str, FastPath( - example="ghcr.io/kbase/collections:checkm2_0.1.6" - + "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380", - description="The Docker image to run for the job. Include the SHA to ensure the " - + "exact code requested is run.", - # Don't bother validating other than some basic checks, validation will occur when - # checking / getting the image SHA from the remote repository - min_length=1, - max_length=1000, - )], + image_id: _ANN_IMAGE_ID, user: kb_auth.KBaseUser=Depends(_AUTH) ) -> models.Image: _ensure_admin(user, "Only service administrators can approve images.") images = app_state.get_app_state(r).images - return await images.register(image_id) + return await images.register(image_id, user.user) @ROUTER_ADMIN.get(