diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 9fa703a24dbb..5b71c3dc4dcd 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -10,6 +10,7 @@ import httpx from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] from fastapi import FastAPI, status +from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.logging_utils import log_catch, log_context from servicelib.utils import limited_as_completed from tenacity import retry @@ -189,26 +190,26 @@ async def registry_request( app: FastAPI, *, path: str, - method: str = "GET", - no_cache: bool = False, + method: str, + use_cache: bool, **session_kwargs, ) -> tuple[dict, Mapping]: cache: SimpleMemoryCache = app.state.registry_cache_memory cache_key = f"{method}_{path}" - if not no_cache and (cached_response := await cache.get(cache_key)): + if use_cache and (cached_response := await cache.get(cache_key)): assert isinstance(cached_response, tuple) # nosec return cast(tuple[dict, Mapping], cached_response) app_settings = get_application_settings(app) try: response, response_headers = await _retried_request( - app, path, method, **session_kwargs + app, path, method.upper(), **session_kwargs ) except httpx.RequestError as exc: msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}" raise DirectorRuntimeError(msg=msg) from exc - if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET": + if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET": await cache.set( cache_key, (response, response_headers), @@ -236,16 +237,29 @@ async def _wait_until_registry_responsive(app: FastAPI) -> None: await _wait_until_registry_responsive(app) +async def _list_all_services_task(app: FastAPI) -> None: + with log_context(_logger, logging.INFO, msg="Listing all services"): + await list_services(app, ServiceType.ALL) + + def setup(app: FastAPI) -> None: async def on_startup() -> None: cache = Cache(Cache.MEMORY) assert isinstance(cache, SimpleMemoryCache) # nosec app.state.registry_cache_memory = cache await _setup_registry(app) + app_settings = get_application_settings(app) + app.state.auto_cache_task = None + if app_settings.DIRECTOR_REGISTRY_CACHING: + app.state.auto_cache_task = await start_periodic_task( + _list_all_services_task, + interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2, + task_name="director-auto-cache-task", + ) async def on_shutdown() -> None: - # nothing to do here - ... + if app.state.auto_cache_task: + await stop_periodic_task(app.state.auto_cache_task) app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) @@ -270,7 +284,9 @@ async def _list_repositories_gen( ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg="listing repositories"): path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - result, headers = await registry_request(app, path=path) # initial call + result, headers = await registry_request( + app, path=path, method="GET", use_cache=True + ) # initial call while True: if "Link" in headers: @@ -278,7 +294,7 @@ async def _list_repositories_gen( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path) + registry_request(app, path=next_path, method="GET", use_cache=True) ) else: prefetch_task = None @@ -300,14 +316,16 @@ async def list_image_tags_gen( ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - tags, headers = await registry_request(app, path=path) # initial call + tags, headers = await registry_request( + app, path=path, method="GET", use_cache=True + ) # initial call while True: if "Link" in headers: next_path = ( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path) + registry_request(app, path=next_path, method="GET", use_cache=True) ) else: prefetch_task = None @@ -342,7 +360,7 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: SEE https://distribution.github.io/distribution/spec/api/#digest-header """ path = f"{image}/manifests/{tag}" - _, headers = await registry_request(app, path=path) + _, headers = await registry_request(app, path=path, method="GET", use_cache=True) headers = headers or {} return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None) @@ -355,7 +373,9 @@ async def get_image_labels( _logger.debug("getting image labels of %s:%s", image, tag) path = f"{image}/manifests/{tag}" - request_result, headers = await registry_request(app, path=path) + request_result, headers = await registry_request( + app, path=path, method="GET", use_cache=True + ) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( "container_config", v1_compatibility_key["config"]