From aee933172d409ee5b585bbdbecc1c454cf727669 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Sun, 21 Jan 2024 18:14:23 +0000 Subject: [PATCH] Limit concurrent `flux build`s of the same path --- flux_local/kustomize.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/flux_local/kustomize.py b/flux_local/kustomize.py index 7ac10af5..eaeb0f9d 100644 --- a/flux_local/kustomize.py +++ b/flux_local/kustomize.py @@ -35,6 +35,8 @@ """ from aiofiles.ospath import isdir +import asyncio +from contextlib import asynccontextmanager import logging from pathlib import Path import tempfile @@ -65,6 +67,8 @@ FLUX_BIN = "flux" HELM_RELEASE_KIND = "HelmRelease" +# Used to limit access to specific resources +_LOCK_MAP: dict[str, asyncio.Lock] = {} class Kustomize: """Library for issuing a kustomize command.""" @@ -189,6 +193,19 @@ async def run(self, stdin: bytes | None = None) -> bytes: return self._out +@asynccontextmanager +async def _resource_lock(key: str): + """Run while holding a lock for the specified resource. + + This is not threadsafe and expected to be run in the asyncio loop. + """ + if not (lock := _LOCK_MAP.get(key)): + lock = asyncio.Lock() + _LOCK_MAP[key] = lock + async with lock: + yield + + class FluxBuild(Task): """A task that issues a flux build command.""" @@ -230,7 +247,10 @@ async def run(self, stdin: bytes | None = None) -> bytes: input_ks = str(kustomization_data).encode("utf-8") task = Command(args, cwd=None, exc=KustomizeException) - return await task.run(stdin=input_ks) + # `flux build` may mutate `kustomization.yaml` so we need to use the path as a resource key + resource_key = str(self._path.resolve()) + async with _resource_lock(resource_key): + return await task.run(stdin=input_ks) def __str__(self) -> str: """Render as a debug string."""