Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Director-v0: added background task that pre-builds the registry cache #6814

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 75 additions & 33 deletions services/director/src/simcore_service_director/registry_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import httpx
from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped]
from fastapi import FastAPI, status
from servicelib.background_task import start_periodic_task, stop_periodic_task
from servicelib.logging_utils import log_catch, log_context
from servicelib.utils import limited_as_completed
from tenacity import retry
Expand Down Expand Up @@ -189,26 +190,26 @@ async def registry_request(
app: FastAPI,
*,
path: str,
method: str = "GET",
no_cache: bool = False,
method: str,
use_cache: bool,
**session_kwargs,
) -> tuple[dict, Mapping]:
cache: SimpleMemoryCache = app.state.registry_cache_memory
cache_key = f"{method}_{path}"
if not no_cache and (cached_response := await cache.get(cache_key)):
if use_cache and (cached_response := await cache.get(cache_key)):
assert isinstance(cached_response, tuple) # nosec
return cast(tuple[dict, Mapping], cached_response)

app_settings = get_application_settings(app)
try:
response, response_headers = await _retried_request(
app, path, method, **session_kwargs
app, path, method.upper(), **session_kwargs
)
except httpx.RequestError as exc:
msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}"
raise DirectorRuntimeError(msg=msg) from exc

if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET":
if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET":
await cache.set(
cache_key,
(response, response_headers),
Expand All @@ -218,10 +219,6 @@ async def registry_request(
return response, response_headers


async def _is_registry_responsive(app: FastAPI) -> None:
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)


async def _setup_registry(app: FastAPI) -> None:
@retry(
wait=wait_fixed(1),
Expand All @@ -230,22 +227,36 @@ async def _setup_registry(app: FastAPI) -> None:
reraise=True,
)
async def _wait_until_registry_responsive(app: FastAPI) -> None:
await _is_registry_responsive(app)
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)

with log_context(_logger, logging.INFO, msg="Connecting to docker registry"):
await _wait_until_registry_responsive(app)


async def _list_all_services_task(*, app: FastAPI) -> None:
with log_context(_logger, logging.INFO, msg="Updating cache with services"):
await list_services(app, ServiceType.ALL, update_cache=True)


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
cache = Cache(Cache.MEMORY)
assert isinstance(cache, SimpleMemoryCache) # nosec
app.state.registry_cache_memory = cache
await _setup_registry(app)
app_settings = get_application_settings(app)
app.state.auto_cache_task = None
if app_settings.DIRECTOR_REGISTRY_CACHING:
app.state.auto_cache_task = start_periodic_task(
_list_all_services_task,
interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2,
task_name="director-auto-cache-task",
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
app=app,
)

async def on_shutdown() -> None:
# nothing to do here
...
if app.state.auto_cache_task:
await stop_periodic_task(app.state.auto_cache_task)

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)
Expand All @@ -266,19 +277,23 @@ def _get_prefix(service_type: ServiceType) -> str:


async def _list_repositories_gen(
app: FastAPI, service_type: ServiceType
app: FastAPI, service_type: ServiceType, *, update_cache: bool
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg="listing repositories"):
path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
result, headers = await registry_request(app, path=path) # initial call
result, headers = await registry_request(
app, path=path, method="GET", use_cache=not update_cache
) # initial call

while True:
if "Link" in headers:
next_path = (
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path)
registry_request(
app, path=next_path, method="GET", use_cache=not update_cache
)
)
else:
prefetch_task = None
Expand All @@ -296,27 +311,36 @@ async def _list_repositories_gen(


async def list_image_tags_gen(
app: FastAPI, image_key: str
app: FastAPI, image_key: str, *, update_cache=False
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"):
path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
tags, headers = await registry_request(app, path=path) # initial call
tags, headers = await registry_request(
app, path=path, method="GET", use_cache=not update_cache
) # initial call
assert "tags" in tags # nosec
while True:
if "Link" in headers:
next_path = (
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path)
registry_request(
app, path=next_path, method="GET", use_cache=not update_cache
)
)
else:
prefetch_task = None

yield list(
filter(
VERSION_REG.match,
tags["tags"],
yield (
list(
filter(
VERSION_REG.match,
tags["tags"],
)
)
if tags["tags"] is not None
else []
)
if prefetch_task:
tags, headers = await prefetch_task
Expand All @@ -342,20 +366,22 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None:
SEE https://distribution.github.io/distribution/spec/api/#digest-header
"""
path = f"{image}/manifests/{tag}"
_, headers = await registry_request(app, path=path)
_, headers = await registry_request(app, path=path, method="GET", use_cache=True)

headers = headers or {}
return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None)


async def get_image_labels(
app: FastAPI, image: str, tag: str
app: FastAPI, image: str, tag: str, *, update_cache=False
) -> tuple[dict[str, str], str | None]:
"""Returns image labels and the image manifest digest"""

_logger.debug("getting image labels of %s:%s", image, tag)
path = f"{image}/manifests/{tag}"
request_result, headers = await registry_request(app, path=path)
request_result, headers = await registry_request(
app, path=path, method="GET", use_cache=not update_cache
)
v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"])
container_config: dict[str, Any] = v1_compatibility_key.get(
"container_config", v1_compatibility_key["config"]
Expand All @@ -371,10 +397,12 @@ async def get_image_labels(


async def get_image_details(
app: FastAPI, image_key: str, image_tag: str
app: FastAPI, image_key: str, image_tag: str, *, update_cache=False
) -> dict[str, Any]:
image_details: dict = {}
labels, image_manifest_digest = await get_image_labels(app, image_key, image_tag)
labels, image_manifest_digest = await get_image_labels(
app, image_key, image_tag, update_cache=update_cache
)

if image_manifest_digest:
# Adds manifest as extra key in the response similar to org.opencontainers.image.base.digest
Expand Down Expand Up @@ -402,11 +430,18 @@ async def get_image_details(
return image_details


async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]:
async def get_repo_details(
app: FastAPI, image_key: str, *, update_cache=False
) -> list[dict[str, Any]]:
repo_details = []
async for image_tags in list_image_tags_gen(app, image_key):
async for image_tags in list_image_tags_gen(
app, image_key, update_cache=update_cache
):
async for image_details_future in limited_as_completed(
(get_image_details(app, image_key, tag) for tag in image_tags),
(
get_image_details(app, image_key, tag, update_cache=update_cache)
for tag in image_tags
),
limit=get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
Expand All @@ -417,16 +452,23 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]
return repo_details


async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]:
async def list_services(
app: FastAPI, service_type: ServiceType, *, update_cache=False
) -> list[dict]:
with log_context(_logger, logging.DEBUG, msg="listing services"):
services = []
concurrency_limit = get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS
async for repos in _list_repositories_gen(app, service_type):
async for repos in _list_repositories_gen(
app, service_type, update_cache=update_cache
):
# only list as service if it actually contains the necessary labels
async for repo_details_future in limited_as_completed(
(get_repo_details(app, repo) for repo in repos),
(
get_repo_details(app, repo, update_cache=update_cache)
for repo in repos
),
limit=concurrency_limit,
):
with log_catch(_logger, reraise=False):
Expand Down
10 changes: 10 additions & 0 deletions services/director/tests/unit/test_registry_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import asyncio
import json
import time
from unittest import mock

import pytest
from fastapi import FastAPI
from pytest_benchmark.plugin import BenchmarkFixture
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.docker_registry import RegistrySettings
Expand Down Expand Up @@ -228,9 +230,17 @@ def configure_registry_caching(
)


@pytest.fixture
def with_disabled_auto_caching(mocker: MockerFixture) -> mock.Mock:
return mocker.patch(
"simcore_service_director.registry_proxy._list_all_services_task", autospec=True
)


async def test_registry_caching(
configure_registry_access: EnvVarsDict,
configure_registry_caching: EnvVarsDict,
with_disabled_auto_caching: mock.Mock,
app_settings: ApplicationSettings,
app: FastAPI,
push_services,
Expand Down
Loading