diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 5b71c3dc4dcd..216d8e4b5223 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -219,10 +219,6 @@ async def registry_request( return response, response_headers -async def _is_registry_responsive(app: FastAPI) -> None: - await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) - - async def _setup_registry(app: FastAPI) -> None: @retry( wait=wait_fixed(1), @@ -231,15 +227,15 @@ async def _setup_registry(app: FastAPI) -> None: reraise=True, ) async def _wait_until_registry_responsive(app: FastAPI) -> None: - await _is_registry_responsive(app) + await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) with log_context(_logger, logging.INFO, msg="Connecting to docker registry"): await _wait_until_registry_responsive(app) -async def _list_all_services_task(app: FastAPI) -> None: - with log_context(_logger, logging.INFO, msg="Listing all services"): - await list_services(app, ServiceType.ALL) +async def _list_all_services_task(*, app: FastAPI) -> None: + with log_context(_logger, logging.INFO, msg="Updating cache with services"): + await list_services(app, ServiceType.ALL, update_cache=True) def setup(app: FastAPI) -> None: @@ -251,10 +247,11 @@ async def on_startup() -> None: app_settings = get_application_settings(app) app.state.auto_cache_task = None if app_settings.DIRECTOR_REGISTRY_CACHING: - app.state.auto_cache_task = await start_periodic_task( + app.state.auto_cache_task = start_periodic_task( _list_all_services_task, interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2, task_name="director-auto-cache-task", + app=app, ) async def on_shutdown() -> None: @@ -280,12 +277,12 @@ def _get_prefix(service_type: ServiceType) -> str: async def _list_repositories_gen( - app: FastAPI, service_type: ServiceType + app: FastAPI, service_type: ServiceType, *, update_cache: bool ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg="listing repositories"): path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" result, headers = await registry_request( - app, path=path, method="GET", use_cache=True + app, path=path, method="GET", use_cache=not update_cache ) # initial call while True: @@ -294,7 +291,9 @@ async def _list_repositories_gen( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path, method="GET", use_cache=True) + registry_request( + app, path=next_path, method="GET", use_cache=not update_cache + ) ) else: prefetch_task = None @@ -312,12 +311,12 @@ async def _list_repositories_gen( async def list_image_tags_gen( - app: FastAPI, image_key: str + app: FastAPI, image_key: str, *, update_cache=False ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" tags, headers = await registry_request( - app, path=path, method="GET", use_cache=True + app, path=path, method="GET", use_cache=not update_cache ) # initial call while True: if "Link" in headers: @@ -325,7 +324,9 @@ async def list_image_tags_gen( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path, method="GET", use_cache=True) + registry_request( + app, path=next_path, method="GET", use_cache=not update_cache + ) ) else: prefetch_task = None @@ -367,14 +368,14 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: async def get_image_labels( - app: FastAPI, image: str, tag: str + app: FastAPI, image: str, tag: str, *, update_cache=False ) -> tuple[dict[str, str], str | None]: """Returns image labels and the image manifest digest""" _logger.debug("getting image labels of %s:%s", image, tag) path = f"{image}/manifests/{tag}" request_result, headers = await registry_request( - app, path=path, method="GET", use_cache=True + app, path=path, method="GET", use_cache=not update_cache ) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( @@ -391,10 +392,12 @@ async def get_image_labels( async def get_image_details( - app: FastAPI, image_key: str, image_tag: str + app: FastAPI, image_key: str, image_tag: str, *, update_cache=False ) -> dict[str, Any]: image_details: dict = {} - labels, image_manifest_digest = await get_image_labels(app, image_key, image_tag) + labels, image_manifest_digest = await get_image_labels( + app, image_key, image_tag, update_cache=update_cache + ) if image_manifest_digest: # Adds manifest as extra key in the response similar to org.opencontainers.image.base.digest @@ -422,11 +425,18 @@ async def get_image_details( return image_details -async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]: +async def get_repo_details( + app: FastAPI, image_key: str, *, update_cache=False +) -> list[dict[str, Any]]: repo_details = [] - async for image_tags in list_image_tags_gen(app, image_key): + async for image_tags in list_image_tags_gen( + app, image_key, update_cache=update_cache + ): async for image_details_future in limited_as_completed( - (get_image_details(app, image_key, tag) for tag in image_tags), + ( + get_image_details(app, image_key, tag, update_cache=update_cache) + for tag in image_tags + ), limit=get_application_settings( app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, @@ -437,16 +447,23 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]] return repo_details -async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: +async def list_services( + app: FastAPI, service_type: ServiceType, *, update_cache=False +) -> list[dict]: with log_context(_logger, logging.DEBUG, msg="listing services"): services = [] concurrency_limit = get_application_settings( app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS - async for repos in _list_repositories_gen(app, service_type): + async for repos in _list_repositories_gen( + app, service_type, update_cache=update_cache + ): # only list as service if it actually contains the necessary labels async for repo_details_future in limited_as_completed( - (get_repo_details(app, repo) for repo in repos), + ( + get_repo_details(app, repo, update_cache=update_cache) + for repo in repos + ), limit=concurrency_limit, ): with log_catch(_logger, reraise=False):