Skip to content

Commit

Permalink
Merge pull request #164 from kbase/dev-service
Browse files Browse the repository at this point in the history
Add registration info to the image structure & a GET endpoint.
  • Loading branch information
MrCreosote authored Jan 10, 2025
2 parents 76ac328 + 0edfb8a commit 5b95708
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 23 deletions.
1 change: 1 addition & 0 deletions cdmtaskservice/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def create_app():
app.add_middleware(GZipMiddleware)
app.include_router(routes.ROUTER_GENERAL)
app.include_router(routes.ROUTER_JOBS)
app.include_router(routes.ROUTER_IMAGES)
app.include_router(routes.ROUTER_ADMIN)
app.include_router(routes.ROUTER_CALLBACKS)

Expand Down
2 changes: 1 addition & 1 deletion cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async def build_app(
runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow}
imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute())
images = Images(mongodao, imginfo)
job_state = JobState(mongodao, s3, coman, runners)
job_state = JobState(mongodao, s3, images, coman, runners)
app.state._mongo = mongocli
app.state._coroman = coman
app.state._cdmstate = AppState(
Expand Down
2 changes: 1 addition & 1 deletion cdmtaskservice/image_remote_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async def _run_crane(self, *args) -> (int, str, str):


# TODO TEST add tests calling this method specifically
def parse_image_name(image_name: str) -> reference.Reference:
def parse_image_name(image_name: str) -> ParsedImageName:
image_name = image_name.strip() if image_name is not None else None
# Image name rules: https://docs.docker.com/reference/cli/docker/image/tag/
# Don't do an exhaustive check here, but enough that we're reasonably confident
Expand Down
26 changes: 22 additions & 4 deletions cdmtaskservice/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
from cdmtaskservice.image_remote_lookup import DockerImageInfo
from cdmtaskservice.image_remote_lookup import DockerImageInfo, parse_image_name
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.timestamp import utcdatetime


class Images:
Expand All @@ -23,22 +24,39 @@ def __init__(self, mongo: MongoDAO, imageinfo: DockerImageInfo):
self._mongo = _not_falsy(mongo, "mongo")
self._iminfo = _not_falsy(imageinfo, "imageinfo")

async def register(self, imagename: str):
async def register(self, imagename: str, username: str) -> models.Image:
"""
Register an image to the service.
imagename - the name of the docker image, e.g. ghcr.io/kbase/checkm2:6.7.1
username - the name of the user registering the image.
"""
normedname = await self._iminfo.normalize_image_name(imagename)
# Just use the sha for entrypoint lookup to ensure we get the right image
entrypoint = await self._iminfo.get_entrypoint_from_name(normedname.name_with_digest)
if not entrypoint:
raise NoEntrypointError(f"Image {imagename} does not have an entrypoint")
# TODO DATAINTEG add username and date created
# TODO REFDATA allow specifying refdata for image
img = models.Image(
name=normedname.name,
digest = normedname.digest,
tag=normedname.tag,
entrypoint=entrypoint
entrypoint=entrypoint,
registered_by=username,
registered_on=utcdatetime()
)
await self._mongo.save_image(img)
return img

async def get_image(self, imagename) -> models.Image:
    """
    Look up a previously registered image by name.

    imagename - the name of the docker image, optionally including a tag and / or digest.
        When the name carries neither, the lookup is made against the "latest" tag.
    """
    parsed = parse_image_name(imagename)
    # Only fall back to "latest" when the caller gave no tag *and* no digest;
    # a digest-only name is looked up with its (empty) parsed tag unchanged.
    has_tag_or_digest = parsed.tag or parsed.digest
    lookup_tag = parsed.tag if has_tag_or_digest else "latest"
    return await self._mongo.get_image(parsed.name, digest=parsed.digest, tag=lookup_tag)


class NoEntrypointError(Exception):
Expand Down
11 changes: 5 additions & 6 deletions cdmtaskservice/job_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.exceptions import UnauthorizedError
from cdmtaskservice.image_remote_lookup import parse_image_name
from cdmtaskservice.images import Images
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
Expand All @@ -26,15 +26,18 @@ def __init__(
self,
mongo: MongoDAO,
s3client: S3Client,
images: Images,
coro_manager: CoroutineWrangler,
job_runners: dict[models.Cluster, Any], # Make abstract class if necessary
):
"""
mongo - a MongoDB DAO object.
s3Client - an S3Client pointed at the S3 storage system to use.
images - a handler for getting images.
coro_manager - a coroutine manager.
job_runners - a mapping of remote compute cluster to the job runner for that cluster.
"""
self._images = _not_falsy(images, "images")
self._s3 = _not_falsy(s3client, "s3client")
self._mongo = _not_falsy(mongo, "mongo")
self._coman = _not_falsy(coro_manager, "coro_manager")
Expand All @@ -52,11 +55,7 @@ async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> s
_not_falsy(job_input, "job_input")
_not_falsy(user, "user")
# Could parallelize these ops but probably not worth it
parsedimage = parse_image_name(job_input.image)
tag = parsedimage.tag
if not parsedimage.tag and not parsedimage.digest:
tag = "latest"
image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag)
image = await self._images.get_image(job_input.image)
await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0])
paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files]
# TODO PERF may want to make concurrency configurable here
Expand Down
8 changes: 8 additions & 0 deletions cdmtaskservice/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,14 @@ class Image(BaseModel):
description="The image tag at registration time. "
+ "The tag may no longer point to the same image."
)] = None
registered_by: Annotated[str, Field(
example="aparkin",
description="The username of the user that registered this image."
)]
registered_on: Annotated[datetime.datetime, Field(
example="2024-10-24T22:35:40Z",
description="The time of registration."
)]
# TODO REFERENCEDATA add reference data ID

@property
Expand Down
40 changes: 29 additions & 11 deletions cdmtaskservice/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
# may need to split these into different files if this file gets too big
ROUTER_GENERAL = APIRouter(tags=["General"])
ROUTER_JOBS = APIRouter(tags=["Jobs"], prefix="/jobs")
ROUTER_IMAGES = APIRouter(tags=["Images"], prefix="/images")
ROUTER_ADMIN = APIRouter(tags=["Admin"], prefix="/admin")
ROUTER_CALLBACKS = APIRouter(tags=["Callbacks"])

Expand Down Expand Up @@ -146,6 +147,32 @@ async def get_job(
return await job_state.get_job(job_id, user.user)


# Path parameter annotation for a docker image name, shared by the image lookup and
# image approval endpoints. Only the string length is constrained at this layer.
_ANN_IMAGE_ID = Annotated[str, FastPath(
example="ghcr.io/kbase/collections:checkm2_0.1.6"
+ "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380",
description="The name of a docker image. Include the SHA to ensure "
+ "referencing the correct image.",
# Don't bother validating other than some basic checks, validation will occur when
# checking / getting the image SHA from the remote repository
min_length=1,
max_length=1000,
)]


@ROUTER_IMAGES.get(
    "/{image_id:path}",
    response_model=models.Image,
    summary="Get an image by name",
    description="Get an image previously registered to the system by the image name."
)
async def get_image(
    r: Request,
    image_id: _ANN_IMAGE_ID,
    # Public for now - any reason for these to be private to KBase staff?
):
    """
    Return the registered image matching the image name given in the URL path.

    r - the incoming request, used to reach the application state.
    image_id - the docker image name taken from the path.
    """
    # No auth dependency is declared here, unlike the admin approval endpoint.
    images = app_state.get_app_state(r).images
    return await images.get_image(image_id)


@ROUTER_ADMIN.post(
"/images/{image_id:path}",
response_model=models.Image,
Expand All @@ -157,21 +184,12 @@ async def get_job(
)
async def approve_image(
r: Request,
image_id: Annotated[str, FastPath(
example="ghcr.io/kbase/collections:checkm2_0.1.6"
+ "@sha256:c9291c94c382b88975184203100d119cba865c1be91b1c5891749ee02193d380",
description="The Docker image to run for the job. Include the SHA to ensure the "
+ "exact code requested is run.",
# Don't bother validating other than some basic checks, validation will occur when
# checking / getting the image SHA from the remote repository
min_length=1,
max_length=1000,
)],
image_id: _ANN_IMAGE_ID,
user: kb_auth.KBaseUser=Depends(_AUTH)
) -> models.Image:
_ensure_admin(user, "Only service administrators can approve images.")
images = app_state.get_app_state(r).images
return await images.register(image_id)
return await images.register(image_id, user.user)


@ROUTER_ADMIN.get(
Expand Down

0 comments on commit 5b95708

Please sign in to comment.