Skip to content

Commit

Permalink
refactoring, setup background task
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 25, 2024
1 parent 3fe2e6f commit 79aab7f
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions services/director/src/simcore_service_director/registry_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import httpx
from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped]
from fastapi import FastAPI, status
from servicelib.background_task import start_periodic_task, stop_periodic_task
from servicelib.logging_utils import log_catch, log_context
from servicelib.utils import limited_as_completed
from tenacity import retry
Expand Down Expand Up @@ -189,26 +190,26 @@ async def registry_request(
app: FastAPI,
*,
path: str,
method: str = "GET",
no_cache: bool = False,
method: str,
use_cache: bool,
**session_kwargs,
) -> tuple[dict, Mapping]:
cache: SimpleMemoryCache = app.state.registry_cache_memory
cache_key = f"{method}_{path}"
if not no_cache and (cached_response := await cache.get(cache_key)):
if use_cache and (cached_response := await cache.get(cache_key)):
assert isinstance(cached_response, tuple) # nosec
return cast(tuple[dict, Mapping], cached_response)

app_settings = get_application_settings(app)
try:
response, response_headers = await _retried_request(
app, path, method, **session_kwargs
app, path, method.upper(), **session_kwargs
)
except httpx.RequestError as exc:
msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}"
raise DirectorRuntimeError(msg=msg) from exc

if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET":
if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET":
await cache.set(
cache_key,
(response, response_headers),
Expand Down Expand Up @@ -236,16 +237,29 @@ async def _wait_until_registry_responsive(app: FastAPI) -> None:
await _wait_until_registry_responsive(app)


async def _list_all_services_task(app: FastAPI) -> None:
with log_context(_logger, logging.INFO, msg="Listing all services"):
await list_services(app, ServiceType.ALL)


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
cache = Cache(Cache.MEMORY)
assert isinstance(cache, SimpleMemoryCache) # nosec
app.state.registry_cache_memory = cache
await _setup_registry(app)
app_settings = get_application_settings(app)
app.state.auto_cache_task = None
if app_settings.DIRECTOR_REGISTRY_CACHING:
app.state.auto_cache_task = await start_periodic_task(
_list_all_services_task,
interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2,
task_name="director-auto-cache-task",
)

async def on_shutdown() -> None:
# nothing to do here
...
if app.state.auto_cache_task:
await stop_periodic_task(app.state.auto_cache_task)

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)
Expand All @@ -270,15 +284,17 @@ async def _list_repositories_gen(
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg="listing repositories"):
path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
result, headers = await registry_request(app, path=path) # initial call
result, headers = await registry_request(
app, path=path, method="GET", use_cache=True
) # initial call

while True:
if "Link" in headers:
next_path = (
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path)
registry_request(app, path=next_path, method="GET", use_cache=True)
)
else:
prefetch_task = None
Expand All @@ -300,14 +316,16 @@ async def list_image_tags_gen(
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"):
path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
tags, headers = await registry_request(app, path=path) # initial call
tags, headers = await registry_request(
app, path=path, method="GET", use_cache=True
) # initial call
while True:
if "Link" in headers:
next_path = (
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path)
registry_request(app, path=next_path, method="GET", use_cache=True)
)
else:
prefetch_task = None
Expand Down Expand Up @@ -342,7 +360,7 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None:
SEE https://distribution.github.io/distribution/spec/api/#digest-header
"""
path = f"{image}/manifests/{tag}"
_, headers = await registry_request(app, path=path)
_, headers = await registry_request(app, path=path, method="GET", use_cache=True)

headers = headers or {}
return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None)
Expand All @@ -355,7 +373,9 @@ async def get_image_labels(

_logger.debug("getting image labels of %s:%s", image, tag)
path = f"{image}/manifests/{tag}"
request_result, headers = await registry_request(app, path=path)
request_result, headers = await registry_request(
app, path=path, method="GET", use_cache=True
)
v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"])
container_config: dict[str, Any] = v1_compatibility_key.get(
"container_config", v1_compatibility_key["config"]
Expand Down

0 comments on commit 79aab7f

Please sign in to comment.