From d7006678285b4fae261acd8afdd2cd35e1fa94a7 Mon Sep 17 00:00:00 2001
From: a
Date: Fri, 3 Nov 2023 17:28:01 -0500
Subject: [PATCH] feat(sync): local tmp store
Signed-off-by: a
---
pkg/cli/server/root.go | 6 -
pkg/extensions/config/sync/config.go | 1 +
pkg/extensions/extension_sync.go | 39 +-
.../sync/{local.go => destination.go} | 29 +-
pkg/extensions/sync/service.go | 47 +-
pkg/extensions/sync/sync.go | 2 +-
pkg/extensions/sync/sync_internal_test.go | 12 +-
pkg/storage/imagestore/imagestore.go | 8 +
test/blackbox/sync_remote.bats | 534 ++++++++++++++++++
9 files changed, 623 insertions(+), 55 deletions(-)
rename pkg/extensions/sync/{local.go => destination.go} (88%)
create mode 100644 test/blackbox/sync_remote.bats
diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go
index d49f3902aa..b6b256fa3f 100644
--- a/pkg/cli/server/root.go
+++ b/pkg/cli/server/root.go
@@ -381,12 +381,6 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {
return zerr.ErrBadConfig
}
- // enforce filesystem storage in case sync feature is enabled
- if config.Extensions != nil && config.Extensions.Sync != nil {
- log.Error().Err(zerr.ErrBadConfig).Msg("sync supports only filesystem storage")
-
- return zerr.ErrBadConfig
- }
}
// enforce s3 driver on subpaths in case of using storage driver
diff --git a/pkg/extensions/config/sync/config.go b/pkg/extensions/config/sync/config.go
index 96c1361d49..eda4844b28 100644
--- a/pkg/extensions/config/sync/config.go
+++ b/pkg/extensions/config/sync/config.go
@@ -15,6 +15,7 @@ type Credentials struct {
type Config struct {
Enable *bool
CredentialsFile string
+ TmpDir string
Registries []RegistryConfig
}
diff --git a/pkg/extensions/extension_sync.go b/pkg/extensions/extension_sync.go
index 852112dc0d..a8f9c16f9f 100644
--- a/pkg/extensions/extension_sync.go
+++ b/pkg/extensions/extension_sync.go
@@ -6,6 +6,7 @@ package extensions
import (
"net"
"net/url"
+ "os"
"strings"
zerr "zotregistry.io/zot/errors"
@@ -41,23 +42,31 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0
isOnDemand := registryConfig.OnDemand
- if isPeriodical || isOnDemand {
- service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile,
- storeController, metaDB, log)
- if err != nil {
- return nil, err
- }
+ if !(isPeriodical || isOnDemand) {
+ continue
+ }
- if isPeriodical {
- // add to task scheduler periodic sync
- gen := sync.NewTaskGenerator(service, log)
- sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
- }
+ tmpDir := config.Extensions.Sync.TmpDir
+ if tmpDir == "" {
+ // use an os tmpdir as tmpdir if not set
+ tmpDir = os.TempDir()
+ }
- if isOnDemand {
- // onDemand services used in routes.go
- onDemand.Add(service)
- }
+ service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile, tmpDir,
+ storeController, metaDB, log)
+ if err != nil {
+ return nil, err
+ }
+
+ if isPeriodical {
+ // add to task scheduler periodic sync
+ gen := sync.NewTaskGenerator(service, log)
+ sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
+ }
+
+ if isOnDemand {
+ // onDemand services used in routes.go
+ onDemand.Add(service)
}
}
diff --git a/pkg/extensions/sync/local.go b/pkg/extensions/sync/destination.go
similarity index 88%
rename from pkg/extensions/sync/local.go
rename to pkg/extensions/sync/destination.go
index 86ccf357c5..36483d4cf1 100644
--- a/pkg/extensions/sync/local.go
+++ b/pkg/extensions/sync/destination.go
@@ -29,25 +29,34 @@ import (
storageTypes "zotregistry.io/zot/pkg/storage/types"
)
-type LocalRegistry struct {
+type DestinationRegistry struct {
storeController storage.StoreController
tempStorage OciLayoutStorage
metaDB mTypes.MetaDB
log log.Logger
}
-func NewLocalRegistry(storeController storage.StoreController, metaDB mTypes.MetaDB, log log.Logger) Local {
- return &LocalRegistry{
+func NewDestinationRegistry(
+ storeController storage.StoreController,
+ tmpStorage OciLayoutStorage,
+ metaDB mTypes.MetaDB,
+ log log.Logger,
+) Destination {
+ if tmpStorage == nil {
+ // to allow passing nil we can do this, noting that it will only work for a local StoreController
+ tmpStorage = NewOciLayoutStorage(storeController)
+ }
+ return &DestinationRegistry{
storeController: storeController,
metaDB: metaDB,
// first we sync from remote (using containers/image copy from docker:// to oci:) to a temp imageStore
// then we copy the image from tempStorage to zot's storage using ImageStore APIs
- tempStorage: NewOciLayoutStorage(storeController),
+ tempStorage: tmpStorage,
log: log,
}
}
-func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
+func (registry *DestinationRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
// check image already synced
imageStore := registry.storeController.GetImageStore(repo)
@@ -75,16 +84,16 @@ func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest
return true, nil
}
-func (registry *LocalRegistry) GetContext() *types.SystemContext {
+func (registry *DestinationRegistry) GetContext() *types.SystemContext {
return registry.tempStorage.GetContext()
}
-func (registry *LocalRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
+func (registry *DestinationRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
return registry.tempStorage.GetImageReference(repo, reference)
}
// finalize a syncing image.
-func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
+func (registry *DestinationRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
imageStore := registry.storeController.GetImageStore(repo)
tempImageStore := getImageStoreFromImageReference(imageReference, repo, reference)
@@ -180,7 +189,7 @@ func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference,
return nil
}
-func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte, reference string,
+func (registry *DestinationRegistry) copyManifest(repo string, manifestContent []byte, reference string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
@@ -239,7 +248,7 @@ func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte,
}
// Copy a blob from one image store to another image store.
-func (registry *LocalRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
+func (registry *DestinationRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go
index a121c7cef1..594044df8e 100644
--- a/pkg/extensions/sync/service.go
+++ b/pkg/extensions/sync/service.go
@@ -20,13 +20,14 @@ import (
"zotregistry.io/zot/pkg/log"
mTypes "zotregistry.io/zot/pkg/meta/types"
"zotregistry.io/zot/pkg/storage"
+ "zotregistry.io/zot/pkg/storage/local"
)
type BaseService struct {
config syncconf.RegistryConfig
credentials syncconf.CredentialsFile
remote Remote
- local Local
+ destination Destination
retryOptions *retry.RetryOptions
contentManager ContentManager
storeController storage.StoreController
@@ -37,8 +38,13 @@ type BaseService struct {
log log.Logger
}
-func New(opts syncconf.RegistryConfig, credentialsFilepath string,
- storeController storage.StoreController, metadb mTypes.MetaDB, log log.Logger,
+func New(
+ opts syncconf.RegistryConfig,
+ credentialsFilepath string,
+ tmpDir string,
+ storeController storage.StoreController,
+ metadb mTypes.MetaDB,
+ log log.Logger,
) (Service, error) {
service := &BaseService{}
@@ -60,7 +66,14 @@ func New(opts syncconf.RegistryConfig, credentialsFilepath string,
service.credentials = credentialsFile
service.contentManager = NewContentManager(opts.Content, log)
- service.local = NewLocalRegistry(storeController, metadb, log)
+
+ tmpImageStore := local.NewImageStore(tmpDir,
+ false, false, log, nil, nil, nil,
+ )
+
+ tmpStorage := NewOciLayoutStorage(storage.StoreController{DefaultStore: tmpImageStore})
+
+ service.destination = NewDestinationRegistry(storeController, tmpStorage, metadb, log)
retryOptions := &retry.RetryOptions{}
@@ -289,7 +302,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
service.log.Info().Str("repo", repo).Msgf("sync: syncing tags %v", tags)
// apply content.destination rule
- localRepo := service.contentManager.GetRepoDestination(repo)
+ destinationRepo := service.contentManager.GetRepoDestination(repo)
for _, tag := range tags {
select {
@@ -305,7 +318,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
var manifestDigest digest.Digest
if err = retry.RetryIfNecessary(ctx, func() error {
- manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag)
+ manifestDigest, err = service.syncTag(ctx, destinationRepo, repo, tag)
return err
}, service.retryOptions); err != nil {
@@ -322,7 +335,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
if manifestDigest != "" {
if err = retry.RetryIfNecessary(ctx, func() error {
- err = service.references.SyncAll(ctx, localRepo, repo, manifestDigest.String())
+ err = service.references.SyncAll(ctx, destinationRepo, repo, manifestDigest.String())
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
return nil
}
@@ -342,8 +355,8 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
return nil
}
-func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) {
- copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext())
+func (service *BaseService) syncTag(ctx context.Context, destinationRepo, remoteRepo, tag string) (digest.Digest, error) {
+ copyOptions := getCopyOptions(service.remote.GetContext(), service.destination.GetContext())
policyContext, err := getPolicyContext(service.log)
if err != nil {
@@ -386,38 +399,38 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo,
}
}
- skipImage, err := service.local.CanSkipImage(localRepo, tag, manifestDigest)
+ skipImage, err := service.destination.CanSkipImage(destinationRepo, tag, manifestDigest)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
- Str("repo", localRepo).Str("reference", tag).
+ Str("repo", destinationRepo).Str("reference", tag).
Msg("couldn't check if the local image can be skipped")
}
if !skipImage {
- localImageRef, err := service.local.GetImageReference(localRepo, tag)
+ localImageRef, err := service.destination.GetImageReference(destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
- Str("repo", localRepo).Str("reference", tag).Msg("couldn't get a local image reference")
+ Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't get a local image reference")
return "", err
}
service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).
- Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image")
+ Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("syncing image")
_, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, ©Options)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("remote image", remoteImageRef.DockerReference().String()).
- Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("coulnd't sync image")
+ Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("coulnd't sync image")
return "", err
}
- err = service.local.CommitImage(localImageRef, localRepo, tag)
+ err = service.destination.CommitImage(localImageRef, destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
- Str("repo", localRepo).Str("reference", tag).Msg("couldn't commit image to local image store")
+ Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't commit image to local image store")
return "", err
}
diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go
index bb4dd0bf4e..c285c5726d 100644
--- a/pkg/extensions/sync/sync.go
+++ b/pkg/extensions/sync/sync.go
@@ -65,7 +65,7 @@ type Remote interface {
}
// Local registry.
-type Local interface {
+type Destination interface {
Registry
// Check if an image is already synced
CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error)
diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go
index 4f025cd90c..538318c0cd 100644
--- a/pkg/extensions/sync/sync_internal_test.go
+++ b/pkg/extensions/sync/sync_internal_test.go
@@ -162,7 +162,7 @@ func TestService(t *testing.T) {
URLs: []string{"http://localhost"},
}
- service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
+ service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)
err = service.SyncRepo(context.Background(), "repo")
@@ -170,7 +170,7 @@ func TestService(t *testing.T) {
})
}
-func TestLocalRegistry(t *testing.T) {
+func TestDestinationRegistry(t *testing.T) {
Convey("make StoreController", t, func() {
dir := t.TempDir()
@@ -185,7 +185,7 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
repoName := "repo"
- registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
+ registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)
imageReference, err := registry.GetImageReference(repoName, "1.0")
So(err, ShouldBeNil)
So(imageReference, ShouldNotBeNil)
@@ -302,7 +302,7 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, linter, cacheDriver)
repoName := "repo"
- registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
+ registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, nil, log)
err = registry.CommitImage(imageReference, repoName, "1.0")
So(err, ShouldBeNil)
@@ -336,7 +336,7 @@ func TestLocalRegistry(t *testing.T) {
})
Convey("trigger metaDB error on index manifest in CommitImage()", func() {
- registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
+ registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo string, reference string, imageMeta mTypes.ImageMeta) error {
if reference == "1.0" {
return zerr.ErrRepoMetaNotFound
@@ -351,7 +351,7 @@ func TestLocalRegistry(t *testing.T) {
})
Convey("trigger metaDB error on image manifest in CommitImage()", func() {
- registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
+ registry := NewDestinationRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo, reference string, imageMeta mTypes.ImageMeta) error {
return zerr.ErrRepoMetaNotFound
},
diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go
index a388d9cd3f..57a845f73c 100644
--- a/pkg/storage/imagestore/imagestore.go
+++ b/pkg/storage/imagestore/imagestore.go
@@ -392,6 +392,14 @@ func (is *ImageStore) GetNextRepository(repo string) (string, error) {
driverErr := &driver.Error{}
+ // some s3 implementations (eg, digitalocean spaces) will return pathnotfounderror for walk but not list
+ // therefore, we must also catch that error here.
+ if errors.As(err, &driver.PathNotFoundError{}) {
+ is.log.Debug().Msg("empty rootDir")
+
+ return "", nil
+ }
+
if errors.Is(err, io.EOF) ||
(errors.As(err, driverErr) && errors.Is(driverErr.Enclosed, io.EOF)) {
return store, nil
diff --git a/test/blackbox/sync_remote.bats b/test/blackbox/sync_remote.bats
new file mode 100644
index 0000000000..6a1ee20261
--- /dev/null
+++ b/test/blackbox/sync_remote.bats
@@ -0,0 +1,534 @@
+# Note: Intended to be run as "make run-blackbox-tests" or "make run-blackbox-ci"
+# Makefile target installs & checks all necessary tooling
+# Extra tools that are not covered in Makefile target needs to be added in verify_prerequisites()
+
+load helpers_zot
+load helpers_wait
+load helpers_cloud
+
+
+function verify_prerequisites() {
+ if [ ! $(command -v curl) ]; then
+ echo "you need to install curl as a prerequisite to running the tests" >&3
+ return 1
+ fi
+
+ if [ ! $(command -v jq) ]; then
+ echo "you need to install jq as a prerequisite to running the tests" >&3
+ return 1
+ fi
+
+ return 0
+}
+
+function setup_file() {
+ export COSIGN_PASSWORD=""
+ export COSIGN_OCI_EXPERIMENTAL=1
+ export COSIGN_EXPERIMENTAL=1
+
+ # Verify prerequisites are available
+ if ! $(verify_prerequisites); then
+ exit 1
+ fi
+
+ # Download test data to folder common for the entire suite, not just this file
+ skopeo --insecure-policy copy --format=oci docker://ghcr.io/project-zot/golang:1.20 oci:${TEST_DATA_DIR}/golang:1.20
+
+
+ # Setup zot server
+ local zot_sync_per_root_dir=${BATS_FILE_TMPDIR}/zot-per
+ local zot_sync_ondemand_root_dir=${BATS_FILE_TMPDIR}/zot-ondemand
+
+ local zot_sync_per_config_file=${BATS_FILE_TMPDIR}/zot_sync_per_config.json
+ local zot_sync_ondemand_config_file=${BATS_FILE_TMPDIR}/zot_sync_ondemand_config.json
+
+ local zot_minimal_root_dir=${BATS_FILE_TMPDIR}/zot-minimal
+ local zot_minimal_config_file=${BATS_FILE_TMPDIR}/zot_minimal_config.json
+
+ local oci_data_dir=${BATS_FILE_TMPDIR}/oci
+ mkdir -p ${zot_sync_per_root_dir}
+ mkdir -p ${zot_sync_ondemand_root_dir}
+ mkdir -p ${zot_minimal_root_dir}
+ mkdir -p ${oci_data_dir}
+
+ cat >${zot_sync_per_config_file} <${zot_sync_ondemand_config_file} <${zot_minimal_config_file} <${trust_policy_file} < config.json
+echo "hello world" > artifact.txt
+run oras push --plain-http 127.0.0.1:9000/hello-artifact:v2 \
+ --config config.json:application/vnd.acme.rocket.config.v1+json artifact.txt:text/plain -d -v
+ [ "$status" -eq 0 ]
+ rm -f artifact.txt
+ rm -f config.json
+}
+
+@test "sync oras artifact periodically" {
+# # wait for oras artifact to be copied
+run sleep 15s
+run oras pull --plain-http 127.0.0.1:8081/hello-artifact:v2 -d -v
+[ "$status" -eq 0 ]
+grep -q "hello world" artifact.txt
+rm -f artifact.txt
+}
+
+@test "sync oras artifact on demand" {
+run oras pull --plain-http 127.0.0.1:8082/hello-artifact:v2 -d -v
+[ "$status" -eq 0 ]
+grep -q "hello world" artifact.txt
+rm -f artifact.txt
+}
+
+# sync helm chart
+@test "push helm chart" {
+run helm package ${BATS_FILE_TMPDIR}/helm-charts/charts/zot -d ${BATS_FILE_TMPDIR}
+[ "$status" -eq 0 ]
+local chart_version=$(awk '/version/{printf $2}' ${BATS_FILE_TMPDIR}/helm-charts/charts/zot/Chart.yaml)
+run helm push ${BATS_FILE_TMPDIR}/zot-${chart_version}.tgz oci://localhost:9000/zot-chart
+[ "$status" -eq 0 ]
+}
+
+@test "sync helm chart periodically" {
+# wait for helm chart to be copied
+run sleep 15s
+
+local chart_version=$(awk '/version/{printf $2}' ${BATS_FILE_TMPDIR}/helm-charts/charts/zot/Chart.yaml)
+run helm pull oci://localhost:8081/zot-chart/zot --version ${chart_version} -d ${BATS_FILE_TMPDIR}
+[ "$status" -eq 0 ]
+}
+
+@test "sync helm chart on demand" {
+local chart_version=$(awk '/version/{printf $2}' ${BATS_FILE_TMPDIR}/helm-charts/charts/zot/Chart.yaml)
+run helm pull oci://localhost:8082/zot-chart/zot --version ${chart_version} -d ${BATS_FILE_TMPDIR}
+[ "$status" -eq 0 ]
+}
+
+# sync OCI artifacts
+@test "push OCI artifact (oci image mediatype) with regclient" {
+run regctl registry set localhost:9000 --tls disabled
+run regctl registry set localhost:8081 --tls disabled
+run regctl registry set localhost:8082 --tls disabled
+
+run regctl artifact put localhost:9000/artifact:demo <