From 41e9702f5ba8646bde323ff99efe47ba2e9ca618 Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Thu, 21 Sep 2023 14:47:19 +0300 Subject: [PATCH] fix(dedupe): run dedupe only for repositories found at startup no need to run dedupe/restore blobs for images being pushed or synced while running dedupe task, they are already deduped/restored inline. Signed-off-by: Petu Eusebiu --- pkg/storage/common/common.go | 26 +++++- pkg/storage/gc/gc.go | 10 +- pkg/storage/imagestore/imagestore.go | 21 ++++- pkg/storage/local/local_test.go | 11 +++ pkg/storage/s3/s3_test.go | 131 ++++++++++----------------- pkg/storage/types/types.go | 2 +- pkg/test/mocks/image_store_mock.go | 6 +- 7 files changed, 108 insertions(+), 99 deletions(-) diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index c97c92c2d..533c078f5 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -869,14 +869,37 @@ type DedupeTaskGenerator struct { and generating a task for each unprocessed one*/ lastDigests []godigest.Digest done bool + repos []string // list of repos on which we run dedupe Log zlog.Logger } func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { var err error + /* at first run get from storage currently found repositories so that we skip the ones that gets synced/uploaded + while this generator runs, there are deduped/restored inline, no need to run dedupe/restore again */ + if len(gen.repos) == 0 { + gen.repos, err = gen.ImgStore.GetRepositories() + if err != nil { + //nolint: dupword + gen.Log.Error().Err(err).Msg("dedupe rebuild: unable to to get list of repositories") + + return nil, err + } + + // if still no repos + if len(gen.repos) == 0 { + gen.Log.Info().Msg("dedupe rebuild: no repositories found in storage, finished.") + + // no repositories in storage, no need to continue + gen.done = true + + return nil, nil + } + } + // get all blobs from storage.imageStore and group them by digest - gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.lastDigests) + gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests) if err != nil { gen.Log.Error().Err(err).Msg("dedupe rebuild: failed to get next digest") @@ -910,6 +933,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool { func (gen *DedupeTaskGenerator) Reset() { gen.lastDigests = []godigest.Digest{} gen.duplicateBlobs = []string{} + gen.repos = []string{} gen.digest = "" gen.done = false } diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index 641793484..220bfd689 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -259,7 +259,7 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest } if !referenced { - gced, err = gc.gcManifest(repo, index, manifestDesc, signatureType, subject.Digest) + gced, err = gc.gcManifest(repo, index, manifestDesc, signatureType, subject.Digest, gc.opts.Delay) if err != nil { return false, err } @@ -275,7 +275,7 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest referenced := isManifestReferencedInIndex(index, subjectDigest) if !referenced { - gced, err = gc.gcManifest(repo, index, manifestDesc, storage.CosignType, subjectDigest) + gced, err = gc.gcManifest(repo, index, manifestDesc, storage.CosignType, subjectDigest, gc.opts.Delay) if err != nil { return false, err } @@ -288,11 +288,11 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest // gcManifest removes a manifest entry from an index and syncs metaDB accordingly if the blob is older than gc.Delay. func (gc GarbageCollect) gcManifest(repo string, index *ispec.Index, desc ispec.Descriptor, - signatureType string, subjectDigest godigest.Digest, + signatureType string, subjectDigest godigest.Digest, delay time.Duration, ) (bool, error) { var gced bool - canGC, err := isBlobOlderThan(gc.imgStore, repo, desc.Digest, gc.opts.Delay, gc.log) + canGC, err := isBlobOlderThan(gc.imgStore, repo, desc.Digest, delay, gc.log) if err != nil { gc.log.Error().Err(err).Str("repository", repo).Str("digest", desc.Digest.String()). Str("delay", gc.opts.Delay.String()).Msg("gc: failed to check if blob is older than delay") @@ -370,7 +370,7 @@ func (gc GarbageCollect) cleanUntaggedManifests(repo string, index *ispec.Index, if desc.MediaType == ispec.MediaTypeImageManifest || desc.MediaType == ispec.MediaTypeImageIndex { _, ok := desc.Annotations[ispec.AnnotationRefName] if !ok { - gced, err = gc.gcManifest(repo, index, desc, "", "") + gced, err = gc.gcManifest(repo, index, desc, "", "", gc.opts.RetentionDelay) if err != nil { return false, err } diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index 3c52500b5..a65762c10 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -299,6 +299,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) { return nil } + // skip .sync and .uploads dirs no need to try to validate them + if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) || + strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) { + return driver.ErrSkipDir + } + rel, err := filepath.Rel(is.rootDir, fileInfo.Path()) if err != nil { return nil //nolint:nilerr // ignore paths that are not under root dir @@ -1647,7 +1653,8 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) { return ret, nil } -func (is *ImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error) { +func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest, +) (godigest.Digest, []string, error) { var lockLatency time.Time dir := is.rootDir @@ -1660,13 +1667,19 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) var digest godigest.Digest err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error { - // skip blobs under .sync - if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) { + // skip blobs under .sync and .uploads + if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) || + strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) { return driver.ErrSkipDir } if fileInfo.IsDir() { - return nil + // skip repositories not found in repos + repo := path.Base(fileInfo.Path()) + + if !zcommon.Contains(repos, repo) && repo != "blobs" && repo != "sha256" { + return driver.ErrSkipDir + } } blobDigest := godigest.NewDigestFromEncoded("sha256", path.Base(fileInfo.Path())) diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 6b806ffbf..93d7d9a94 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -1176,6 +1176,17 @@ func TestDedupeLinks(t *testing.T) { imgStore = local.NewImageStore(dir, testCase.dedupe, true, log, metrics, nil, nil) } + // run on empty image store + // switch dedupe to true from false + taskScheduler, cancel := runAndGetScheduler() + + // rebuild with dedupe true + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + // wait until rebuild finishes + time.Sleep(1 * time.Second) + + cancel() + // manifest1 upload, err := imgStore.NewBlobUpload("dedupe1") So(err, ShouldBeNil) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 607284ef5..33af4cae3 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -2289,14 +2289,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs) + So(err, ShouldNotBeNil) }) Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() { @@ -2341,14 +2338,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs) + So(err, ShouldNotBeNil) }) Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() { @@ -2393,14 +2387,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs) + So(err, ShouldNotBeNil) }) Convey("Trigger Stat() error in restoreDedupedBlobs()", t, func() { @@ -2442,19 +2433,13 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs) + So(err, ShouldNotBeNil) Convey("Trigger Stat() error in dedupeBlobs()", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - imgStore := createMockStorage(testDir, t.TempDir(), true, &StorageDriverMock{ StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) { @@ -2493,11 +2478,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(500 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs) + So(err, ShouldNotBeNil) }) }) @@ -2544,14 +2529,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs) + So(err, ShouldNotBeNil) }) //nolint: dupl @@ -2595,14 +2577,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs) + So(err, ShouldNotBeNil) }) //nolint: dupl @@ -2646,14 +2625,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs) + So(err, ShouldNotBeNil) }) Convey("Trigger getNextDigestWithBlobPaths err", t, func() { @@ -2664,14 +2640,8 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) - - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + _, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldNotBeNil) }) Convey("Trigger cache errors", t, func() { @@ -2762,14 +2732,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs) + So(err, ShouldNotBeNil) }) Convey("on dedupe blob", func() { @@ -2787,14 +2754,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs) + So(err, ShouldNotBeNil) }) Convey("on else branch", func() { @@ -2808,14 +2772,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() - - // rebuild with dedupe false, should have all blobs with content - imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + So(err, ShouldBeNil) - // wait until rebuild finishes - time.Sleep(200 * time.Millisecond) + err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs) + So(err, ShouldNotBeNil) }) }) } diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 75efddda3..c07367bdc 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -52,7 +52,7 @@ type ImageStore interface { //nolint:interfacebloat GetOrasReferrers(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error - GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error) + GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error) GetAllBlobs(repo string) ([]string, error) } diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index f591f6eaf..2252c07a9 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -50,7 +50,7 @@ type MockedImageStore struct { RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler) RunDedupeBlobsFn func(interval time.Duration, sch *scheduler.Scheduler) RunDedupeForDigestFn func(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error - GetNextDigestWithBlobPathsFn func(lastDigests []godigest.Digest) (godigest.Digest, []string, error) + GetNextDigestWithBlobPathsFn func(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error) GetAllBlobsFn func(repo string) ([]string, error) CleanupRepoFn func(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) PutIndexContentFn func(repo string, index ispec.Index) error @@ -372,10 +372,10 @@ func (is MockedImageStore) RunDedupeForDigest(digest godigest.Digest, dedupe boo return nil } -func (is MockedImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest, +func (is MockedImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest, ) (godigest.Digest, []string, error) { if is.GetNextDigestWithBlobPathsFn != nil { - return is.GetNextDigestWithBlobPathsFn(lastDigests) + return is.GetNextDigestWithBlobPathsFn(repos, lastDigests) } return "", []string{}, nil