From 47a79d9a198b22da56d6f0ccd65f88be2be5f21c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 14:49:28 +0100 Subject: [PATCH 1/3] refactoring, setup background task --- .../registry_proxy.py | 46 +++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 9fa703a24db..5b71c3dc4dc 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -10,6 +10,7 @@ import httpx from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped] from fastapi import FastAPI, status +from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.logging_utils import log_catch, log_context from servicelib.utils import limited_as_completed from tenacity import retry @@ -189,26 +190,26 @@ async def registry_request( app: FastAPI, *, path: str, - method: str = "GET", - no_cache: bool = False, + method: str, + use_cache: bool, **session_kwargs, ) -> tuple[dict, Mapping]: cache: SimpleMemoryCache = app.state.registry_cache_memory cache_key = f"{method}_{path}" - if not no_cache and (cached_response := await cache.get(cache_key)): + if use_cache and (cached_response := await cache.get(cache_key)): assert isinstance(cached_response, tuple) # nosec return cast(tuple[dict, Mapping], cached_response) app_settings = get_application_settings(app) try: response, response_headers = await _retried_request( - app, path, method, **session_kwargs + app, path, method.upper(), **session_kwargs ) except httpx.RequestError as exc: msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}" raise DirectorRuntimeError(msg=msg) from exc - if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET": + if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET": await cache.set( cache_key, (response, response_headers), @@ -236,16 +237,29 @@ async def _wait_until_registry_responsive(app: FastAPI) -> None: await _wait_until_registry_responsive(app) +async def _list_all_services_task(app: FastAPI) -> None: + with log_context(_logger, logging.INFO, msg="Listing all services"): + await list_services(app, ServiceType.ALL) + + def setup(app: FastAPI) -> None: async def on_startup() -> None: cache = Cache(Cache.MEMORY) assert isinstance(cache, SimpleMemoryCache) # nosec app.state.registry_cache_memory = cache await _setup_registry(app) + app_settings = get_application_settings(app) + app.state.auto_cache_task = None + if app_settings.DIRECTOR_REGISTRY_CACHING: + app.state.auto_cache_task = await start_periodic_task( + _list_all_services_task, + interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2, + task_name="director-auto-cache-task", + ) async def on_shutdown() -> None: - # nothing to do here - ... + if app.state.auto_cache_task: + await stop_periodic_task(app.state.auto_cache_task) app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) @@ -270,7 +284,9 @@ async def _list_repositories_gen( ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg="listing repositories"): path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - result, headers = await registry_request(app, path=path) # initial call + result, headers = await registry_request( + app, path=path, method="GET", use_cache=True + ) # initial call while True: if "Link" in headers: @@ -278,7 +294,7 @@ async def _list_repositories_gen( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path) + registry_request(app, path=next_path, method="GET", use_cache=True) ) else: prefetch_task = None @@ -300,14 +316,16 @@ async def list_image_tags_gen( ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" - tags, headers = await registry_request(app, path=path) # initial call + tags, headers = await registry_request( + app, path=path, method="GET", use_cache=True + ) # initial call while True: if "Link" in headers: next_path = ( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path) + registry_request(app, path=next_path, method="GET", use_cache=True) ) else: prefetch_task = None @@ -342,7 +360,7 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: SEE https://distribution.github.io/distribution/spec/api/#digest-header """ path = f"{image}/manifests/{tag}" - _, headers = await registry_request(app, path=path) + _, headers = await registry_request(app, path=path, method="GET", use_cache=True) headers = headers or {} return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None) @@ -355,7 +373,9 @@ async def get_image_labels( _logger.debug("getting image labels of %s:%s", image, tag) path = f"{image}/manifests/{tag}" - request_result, headers = await registry_request(app, path=path) + request_result, headers = await registry_request( + app, path=path, method="GET", use_cache=True + ) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( "container_config", v1_compatibility_key["config"] From 9e5ea53e6128f75b83229da3e34b6ef2b039670e Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 22 Nov 2024 16:11:42 +0100 Subject: [PATCH 2/3] done --- .../registry_proxy.py | 67 ++++++++++++------- 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 5b71c3dc4dc..216d8e4b522 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -219,10 +219,6 @@ async def registry_request( return response, response_headers -async def _is_registry_responsive(app: FastAPI) -> None: - await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) - - async def _setup_registry(app: FastAPI) -> None: @retry( wait=wait_fixed(1), @@ -231,15 +227,15 @@ async def _setup_registry(app: FastAPI) -> None: reraise=True, ) async def _wait_until_registry_responsive(app: FastAPI) -> None: - await _is_registry_responsive(app) + await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0) with log_context(_logger, logging.INFO, msg="Connecting to docker registry"): await _wait_until_registry_responsive(app) -async def _list_all_services_task(app: FastAPI) -> None: - with log_context(_logger, logging.INFO, msg="Listing all services"): - await list_services(app, ServiceType.ALL) +async def _list_all_services_task(*, app: FastAPI) -> None: + with log_context(_logger, logging.INFO, msg="Updating cache with services"): + await list_services(app, ServiceType.ALL, update_cache=True) def setup(app: FastAPI) -> None: @@ -251,10 +247,11 @@ async def on_startup() -> None: app_settings = get_application_settings(app) app.state.auto_cache_task = None if app_settings.DIRECTOR_REGISTRY_CACHING: - app.state.auto_cache_task = await start_periodic_task( + app.state.auto_cache_task = start_periodic_task( _list_all_services_task, interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2, task_name="director-auto-cache-task", + app=app, ) async def on_shutdown() -> None: @@ -280,12 +277,12 @@ def _get_prefix(service_type: ServiceType) -> str: async def _list_repositories_gen( - app: FastAPI, service_type: ServiceType + app: FastAPI, service_type: ServiceType, *, update_cache: bool ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg="listing repositories"): path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" result, headers = await registry_request( - app, path=path, method="GET", use_cache=True + app, path=path, method="GET", use_cache=not update_cache ) # initial call while True: @@ -294,7 +291,9 @@ async def _list_repositories_gen( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path, method="GET", use_cache=True) + registry_request( + app, path=next_path, method="GET", use_cache=not update_cache + ) ) else: prefetch_task = None @@ -312,12 +311,12 @@ async def _list_repositories_gen( async def list_image_tags_gen( - app: FastAPI, image_key: str + app: FastAPI, image_key: str, *, update_cache=False ) -> AsyncGenerator[list[str], None]: with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"): path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}" tags, headers = await registry_request( - app, path=path, method="GET", use_cache=True + app, path=path, method="GET", use_cache=not update_cache ) # initial call while True: if "Link" in headers: @@ -325,7 +324,9 @@ async def list_image_tags_gen( str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/") ) prefetch_task = asyncio.create_task( - registry_request(app, path=next_path, method="GET", use_cache=True) + registry_request( + app, path=next_path, method="GET", use_cache=not update_cache + ) ) else: prefetch_task = None @@ -367,14 +368,14 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None: async def get_image_labels( - app: FastAPI, image: str, tag: str + app: FastAPI, image: str, tag: str, *, update_cache=False ) -> tuple[dict[str, str], str | None]: """Returns image labels and the image manifest digest""" _logger.debug("getting image labels of %s:%s", image, tag) path = f"{image}/manifests/{tag}" request_result, headers = await registry_request( - app, path=path, method="GET", use_cache=True + app, path=path, method="GET", use_cache=not update_cache ) v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"]) container_config: dict[str, Any] = v1_compatibility_key.get( @@ -391,10 +392,12 @@ async def get_image_labels( async def get_image_details( - app: FastAPI, image_key: str, image_tag: str + app: FastAPI, image_key: str, image_tag: str, *, update_cache=False ) -> dict[str, Any]: image_details: dict = {} - labels, image_manifest_digest = await get_image_labels(app, image_key, image_tag) + labels, image_manifest_digest = await get_image_labels( + app, image_key, image_tag, update_cache=update_cache + ) if image_manifest_digest: # Adds manifest as extra key in the response similar to org.opencontainers.image.base.digest @@ -422,11 +425,18 @@ async def get_image_details( return image_details -async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]: +async def get_repo_details( + app: FastAPI, image_key: str, *, update_cache=False +) -> list[dict[str, Any]]: repo_details = [] - async for image_tags in list_image_tags_gen(app, image_key): + async for image_tags in list_image_tags_gen( + app, image_key, update_cache=update_cache + ): async for image_details_future in limited_as_completed( - (get_image_details(app, image_key, tag) for tag in image_tags), + ( + get_image_details(app, image_key, tag, update_cache=update_cache) + for tag in image_tags + ), limit=get_application_settings( app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS, @@ -437,16 +447,23 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]] return repo_details -async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]: +async def list_services( + app: FastAPI, service_type: ServiceType, *, update_cache=False +) -> list[dict]: with log_context(_logger, logging.DEBUG, msg="listing services"): services = [] concurrency_limit = get_application_settings( app ).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS - async for repos in _list_repositories_gen(app, service_type): + async for repos in _list_repositories_gen( + app, service_type, update_cache=update_cache + ): # only list as service if it actually contains the necessary labels async for repo_details_future in limited_as_completed( - (get_repo_details(app, repo) for repo in repos), + ( + get_repo_details(app, repo, update_cache=update_cache) + for repo in repos + ), limit=concurrency_limit, ): with log_catch(_logger, reraise=False): From af754e6c8b9f7eff96dcfe949f44ba15ee64d207 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 25 Nov 2024 10:43:52 +0100 Subject: [PATCH 3/3] disable auto-caching for test --- .../src/simcore_service_director/registry_proxy.py | 13 +++++++++---- services/director/tests/unit/test_registry_proxy.py | 10 ++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 216d8e4b522..a1adfe977a0 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -318,6 +318,7 @@ async def list_image_tags_gen( tags, headers = await registry_request( app, path=path, method="GET", use_cache=not update_cache ) # initial call + assert "tags" in tags # nosec while True: if "Link" in headers: next_path = ( @@ -331,11 +332,15 @@ async def list_image_tags_gen( else: prefetch_task = None - yield list( - filter( - VERSION_REG.match, - tags["tags"], + yield ( + list( + filter( + VERSION_REG.match, + tags["tags"], + ) ) + if tags["tags"] is not None + else [] ) if prefetch_task: tags, headers = await prefetch_task diff --git a/services/director/tests/unit/test_registry_proxy.py b/services/director/tests/unit/test_registry_proxy.py index 7861179323c..538e4220f0d 100644 --- a/services/director/tests/unit/test_registry_proxy.py +++ b/services/director/tests/unit/test_registry_proxy.py @@ -4,10 +4,12 @@ import asyncio import json import time +from unittest import mock import pytest from fastapi import FastAPI from pytest_benchmark.plugin import BenchmarkFixture +from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.docker_registry import RegistrySettings @@ -228,9 +230,17 @@ def configure_registry_caching( ) +@pytest.fixture +def with_disabled_auto_caching(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_director.registry_proxy._list_all_services_task", autospec=True + ) + + async def test_registry_caching( configure_registry_access: EnvVarsDict, configure_registry_caching: EnvVarsDict, + with_disabled_auto_caching: mock.Mock, app_settings: ApplicationSettings, app: FastAPI, push_services,