Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 22, 2024
1 parent 4b2fcad commit 3ed57ec
Showing 1 changed file with 42 additions and 25 deletions.
67 changes: 42 additions & 25 deletions services/director/src/simcore_service_director/registry_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,6 @@ async def registry_request(
return response, response_headers


async def _is_registry_responsive(app: FastAPI) -> None:
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)


async def _setup_registry(app: FastAPI) -> None:
@retry(
wait=wait_fixed(1),
Expand All @@ -231,15 +227,15 @@ async def _setup_registry(app: FastAPI) -> None:
reraise=True,
)
async def _wait_until_registry_responsive(app: FastAPI) -> None:
await _is_registry_responsive(app)
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)

with log_context(_logger, logging.INFO, msg="Connecting to docker registry"):
await _wait_until_registry_responsive(app)


async def _list_all_services_task(app: FastAPI) -> None:
with log_context(_logger, logging.INFO, msg="Listing all services"):
await list_services(app, ServiceType.ALL)
async def _list_all_services_task(*, app: FastAPI) -> None:
with log_context(_logger, logging.INFO, msg="Updating cache with services"):
await list_services(app, ServiceType.ALL, update_cache=True)


def setup(app: FastAPI) -> None:
Expand All @@ -251,10 +247,11 @@ async def on_startup() -> None:
app_settings = get_application_settings(app)
app.state.auto_cache_task = None
if app_settings.DIRECTOR_REGISTRY_CACHING:
app.state.auto_cache_task = await start_periodic_task(
app.state.auto_cache_task = start_periodic_task(
_list_all_services_task,
interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2,
task_name="director-auto-cache-task",
app=app,
)

async def on_shutdown() -> None:
Expand All @@ -280,12 +277,12 @@ def _get_prefix(service_type: ServiceType) -> str:


async def _list_repositories_gen(
app: FastAPI, service_type: ServiceType
app: FastAPI, service_type: ServiceType, *, update_cache: bool
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg="listing repositories"):
path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
result, headers = await registry_request(
app, path=path, method="GET", use_cache=True
app, path=path, method="GET", use_cache=not update_cache
) # initial call

while True:
Expand All @@ -294,7 +291,9 @@ async def _list_repositories_gen(
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path, method="GET", use_cache=True)
registry_request(
app, path=next_path, method="GET", use_cache=not update_cache
)
)
else:
prefetch_task = None
Expand All @@ -312,20 +311,22 @@ async def _list_repositories_gen(


async def list_image_tags_gen(
app: FastAPI, image_key: str
app: FastAPI, image_key: str, *, update_cache=False
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"):
path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
tags, headers = await registry_request(
app, path=path, method="GET", use_cache=True
app, path=path, method="GET", use_cache=not update_cache
) # initial call
while True:
if "Link" in headers:
next_path = (
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path, method="GET", use_cache=True)
registry_request(
app, path=next_path, method="GET", use_cache=not update_cache
)
)
else:
prefetch_task = None
Expand Down Expand Up @@ -367,14 +368,14 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None:


async def get_image_labels(
app: FastAPI, image: str, tag: str
app: FastAPI, image: str, tag: str, *, update_cache=False
) -> tuple[dict[str, str], str | None]:
"""Returns image labels and the image manifest digest"""

_logger.debug("getting image labels of %s:%s", image, tag)
path = f"{image}/manifests/{tag}"
request_result, headers = await registry_request(
app, path=path, method="GET", use_cache=True
app, path=path, method="GET", use_cache=not update_cache
)
v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"])
container_config: dict[str, Any] = v1_compatibility_key.get(
Expand All @@ -391,10 +392,12 @@ async def get_image_labels(


async def get_image_details(
app: FastAPI, image_key: str, image_tag: str
app: FastAPI, image_key: str, image_tag: str, *, update_cache=False
) -> dict[str, Any]:
image_details: dict = {}
labels, image_manifest_digest = await get_image_labels(app, image_key, image_tag)
labels, image_manifest_digest = await get_image_labels(
app, image_key, image_tag, update_cache=update_cache
)

if image_manifest_digest:
# Adds manifest as extra key in the response similar to org.opencontainers.image.base.digest
Expand Down Expand Up @@ -422,11 +425,18 @@ async def get_image_details(
return image_details


async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]:
async def get_repo_details(
app: FastAPI, image_key: str, *, update_cache=False
) -> list[dict[str, Any]]:
repo_details = []
async for image_tags in list_image_tags_gen(app, image_key):
async for image_tags in list_image_tags_gen(
app, image_key, update_cache=update_cache
):
async for image_details_future in limited_as_completed(
(get_image_details(app, image_key, tag) for tag in image_tags),
(
get_image_details(app, image_key, tag, update_cache=update_cache)
for tag in image_tags
),
limit=get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
Expand All @@ -437,16 +447,23 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]
return repo_details


async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]:
async def list_services(
app: FastAPI, service_type: ServiceType, *, update_cache=False
) -> list[dict]:
with log_context(_logger, logging.DEBUG, msg="listing services"):
services = []
concurrency_limit = get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS
async for repos in _list_repositories_gen(app, service_type):
async for repos in _list_repositories_gen(
app, service_type, update_cache=update_cache
):
# only list as service if it actually contains the necessary labels
async for repo_details_future in limited_as_completed(
(get_repo_details(app, repo) for repo in repos),
(
get_repo_details(app, repo, update_cache=update_cache)
for repo in repos
),
limit=concurrency_limit,
):
with log_catch(_logger, reraise=False):
Expand Down

0 comments on commit 3ed57ec

Please sign in to comment.