diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 9fa703a24db..a1adfe977a0 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -10,6 +10,7 @@ import httpx from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] from fastapi import FastAPI, status +from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.logging_utils import log_catch, log_context from servicelib.utils import limited_as_completed from tenacity import retry @@ -189,26 +190,26 @@ async def registry_request( app: FastAPI, *, path: str, - method: str = "GET", - no_cache: bool = False, + method: str, + use_cache: bool, **session_kwargs, ) -> tuple[dict, Mapping]: cache: SimpleMemoryCache = app.state.registry_cache_memory cache_key = f"{method}_{path}" - if not no_cache and (cached_response := await cache.get(cache_key)): + if use_cache and (cached_response := await cache.get(cache_key)): assert isinstance(cached_response, tuple) # nosec return cast(tuple[dict, Mapping], cached_response) app_settings = get_application_settings(app) try: response, response_headers = await _retried_request( - app, path, method, **session_kwargs + app, path, method.upper(), **session_kwargs ) except httpx.RequestError as exc: msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}" raise DirectorRuntimeError(msg=msg) from exc - if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET": + if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET": await cache.set( cache_key, (response, response_headers), @@ -218,10 +219,6 @@ async def registry_request( return response, response_headers -async def _is_registry_responsive(app: FastAPI) -> None: - await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) - - async def _setup_registry(app: FastAPI) -> None: @retry( wait=wait_fixed(1), @@ -230,22 +227,36 @@ async def _setup_registry(app: FastAPI) -> None: reraise=True, ) async def _wait_until_registry_responsive(app: FastAPI) -> None: - await _is_registry_responsive(app) + await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) with log_context(_logger, logging.INFO, msg="Connecting to docker registry"): await _wait_until_registry_responsive(app) +async def _list_all_services_task(*, app: FastAPI) -> None: + with log_context(_logger, logging.INFO, msg="Updating cache with services"): + await list_services(app, ServiceType.ALL, update_cache=True) + + def setup(app: FastAPI) -> None: async def on_startup() -> None: cache = Cache(Cache.MEMORY) assert isinstance(cache, SimpleMemoryCache) # nosec app.state.registry_cache_memory = cache await _setup_registry(app) + app_settings = get_application_settings(app) + app.state.auto_cache_task = None + if app_settings.DIRECTOR_REGISTRY_CACHING: + app.state.auto_cache_task = start_periodic_task( + _list_all_services_task, + interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2, + task_name="director-auto-cache-task", + app=app, + ) async def on_shutdown() -> None: - # nothing to do here - ... + if app.state.auto_cache_task: + await stop_periodic_task(app.state.auto_cache_task) app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) @@ -266,11 +277,13 @@ def _get_prefix(service_type: ServiceType) -> str: async def _list_repositories_gen( - app: FastAPI, service_type: ServiceType + app: FastAPI, service_type: ServiceType, *, update_cache: bool ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg="listing repositories"): path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - result, headers = await registry_request(app, path=path) # initial call + result, headers = await registry_request( + app, path=path, method="GET", use_cache=not update_cache + ) # initial call while True: if "Link" in headers: @@ -278,7 +291,9 @@ async def _list_repositories_gen( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path) + registry_request( + app, path=next_path, method="GET", use_cache=not update_cache + ) ) else: prefetch_task = None @@ -296,27 +311,36 @@ async def _list_repositories_gen( async def list_image_tags_gen( - app: FastAPI, image_key: str + app: FastAPI, image_key: str, *, update_cache=False ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - tags, headers = await registry_request(app, path=path) # initial call + tags, headers = await registry_request( + app, path=path, method="GET", use_cache=not update_cache + ) # initial call + assert "tags" in tags # nosec while True: if "Link" in headers: next_path = ( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path) + registry_request( + app, path=next_path, method="GET", use_cache=not update_cache + ) ) else: prefetch_task = None - yield list( - filter( - VERSION_REG.match, - tags["tags"], + yield ( + list( + filter( + VERSION_REG.match, + tags["tags"], + ) ) + if tags["tags"] is not None + else [] ) if prefetch_task: tags, headers = await prefetch_task @@ -342,20 +366,22 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: SEE https://distribution.github.io/distribution/spec/api/#digest-header """ path = f"{image}/manifests/{tag}" - _, headers = await registry_request(app, path=path) + _, headers = await registry_request(app, path=path, method="GET", use_cache=True) headers = headers or {} return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None) async def get_image_labels( - app: FastAPI, image: str, tag: str + app: FastAPI, image: str, tag: str, *, update_cache=False ) -> tuple[dict[str, str], str | None]: """Returns image labels and the image manifest digest""" _logger.debug("getting image labels of %s:%s", image, tag) path = f"{image}/manifests/{tag}" - request_result, headers = await registry_request(app, path=path) + request_result, headers = await registry_request( + app, path=path, method="GET", use_cache=not update_cache + ) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( "container_config", v1_compatibility_key["config"] @@ -371,10 +397,12 @@ async def get_image_labels( async def get_image_details( - app: FastAPI, image_key: str, image_tag: str + app: FastAPI, image_key: str, image_tag: str, *, update_cache=False ) -> dict[str, Any]: image_details: dict = {} - labels, image_manifest_digest = await get_image_labels(app, image_key, image_tag) + labels, image_manifest_digest = await get_image_labels( + app, image_key, image_tag, update_cache=update_cache + ) if image_manifest_digest: # Adds manifest as extra key in the response similar to org.opencontainers.image.base.digest @@ -402,11 +430,18 @@ async def get_image_details( return image_details -async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]: +async def get_repo_details( + app: FastAPI, image_key: str, *, update_cache=False +) -> list[dict[str, Any]]: repo_details = [] - async for image_tags in list_image_tags_gen(app, image_key): + async for image_tags in list_image_tags_gen( + app, image_key, update_cache=update_cache + ): async for image_details_future in limited_as_completed( - (get_image_details(app, image_key, tag) for tag in image_tags), + ( + get_image_details(app, image_key, tag, update_cache=update_cache) + for tag in image_tags + ), limit=get_application_settings( app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, @@ -417,16 +452,23 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]] return repo_details -async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: +async def list_services( + app: FastAPI, service_type: ServiceType, *, update_cache=False +) -> list[dict]: with log_context(_logger, logging.DEBUG, msg="listing services"): services = [] concurrency_limit = get_application_settings( app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS - async for repos in _list_repositories_gen(app, service_type): + async for repos in _list_repositories_gen( + app, service_type, update_cache=update_cache + ): # only list as service if it actually contains the necessary labels async for repo_details_future in limited_as_completed( - (get_repo_details(app, repo) for repo in repos), + ( + get_repo_details(app, repo, update_cache=update_cache) + for repo in repos + ), limit=concurrency_limit, ): with log_catch(_logger, reraise=False): diff --git a/services/director/tests/unit/test_registry_proxy.py b/services/director/tests/unit/test_registry_proxy.py index 7861179323c..538e4220f0d 100644 --- a/services/director/tests/unit/test_registry_proxy.py +++ b/services/director/tests/unit/test_registry_proxy.py @@ -4,10 +4,12 @@ import asyncio import json import time +from unittest import mock import pytest from fastapi import FastAPI from pytest_benchmark.plugin import BenchmarkFixture +from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.docker_registry import RegistrySettings @@ -228,9 +230,17 @@ def configure_registry_caching( ) +@pytest.fixture +def with_disabled_auto_caching(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_director.registry_proxy._list_all_services_task", autospec=True + ) + + async def test_registry_caching( configure_registry_access: EnvVarsDict, configure_registry_caching: EnvVarsDict, + with_disabled_auto_caching: mock.Mock, app_settings: ApplicationSettings, app: FastAPI, push_services,