Skip to content

Commit

Permalink
fix(dedupe): run dedupe only for repositories found at startup (#1844)
Browse files Browse the repository at this point in the history
There is no need to run dedupe/restore on blobs for images being pushed or synced
while the dedupe task is running; those blobs are already deduped/restored inline.

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk authored Sep 27, 2023
1 parent 92e382c commit c3801dc
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 99 deletions.
26 changes: 25 additions & 1 deletion pkg/storage/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,14 +869,37 @@ type DedupeTaskGenerator struct {
and generating a task for each unprocessed one*/
lastDigests []godigest.Digest
done bool
repos []string // list of repos on which we run dedupe
Log zlog.Logger
}

func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
var err error

/* at first run get from storage currently found repositories so that we skip the ones that gets synced/uploaded
while this generator runs, there are deduped/restored inline, no need to run dedupe/restore again */
if len(gen.repos) == 0 {
gen.repos, err = gen.ImgStore.GetRepositories()
if err != nil {
//nolint: dupword
gen.Log.Error().Err(err).Msg("dedupe rebuild: unable to to get list of repositories")

return nil, err
}

// if still no repos
if len(gen.repos) == 0 {
gen.Log.Info().Msg("dedupe rebuild: no repositories found in storage, finished.")

// no repositories in storage, no need to continue
gen.done = true

return nil, nil
}
}

// get all blobs from storage.imageStore and group them by digest
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.lastDigests)
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
if err != nil {
gen.Log.Error().Err(err).Msg("dedupe rebuild: failed to get next digest")

Expand Down Expand Up @@ -910,6 +933,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool {
// Reset clears all generator state so that the next scheduling cycle starts
// from scratch: digest bookkeeping, the duplicate-blob list and the snapshot
// of repositories are emptied, and the generator is marked as not done.
func (gen *DedupeTaskGenerator) Reset() {
	gen.done = false
	gen.digest = ""
	gen.repos = []string{}
	gen.duplicateBlobs = []string{}
	gen.lastDigests = []godigest.Digest{}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest
}

if !referenced {
gced, err = gc.gcManifest(repo, index, manifestDesc, signatureType, subject.Digest)
gced, err = gc.gcManifest(repo, index, manifestDesc, signatureType, subject.Digest, gc.opts.Delay)
if err != nil {
return false, err
}
Expand All @@ -275,7 +275,7 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest
referenced := isManifestReferencedInIndex(index, subjectDigest)

if !referenced {
gced, err = gc.gcManifest(repo, index, manifestDesc, storage.CosignType, subjectDigest)
gced, err = gc.gcManifest(repo, index, manifestDesc, storage.CosignType, subjectDigest, gc.opts.Delay)
if err != nil {
return false, err
}
Expand All @@ -288,11 +288,11 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest

// gcManifest removes a manifest entry from an index and syncs metaDB accordingly if the blob is older than gc.Delay.
func (gc GarbageCollect) gcManifest(repo string, index *ispec.Index, desc ispec.Descriptor,
signatureType string, subjectDigest godigest.Digest,
signatureType string, subjectDigest godigest.Digest, delay time.Duration,
) (bool, error) {
var gced bool

canGC, err := isBlobOlderThan(gc.imgStore, repo, desc.Digest, gc.opts.Delay, gc.log)
canGC, err := isBlobOlderThan(gc.imgStore, repo, desc.Digest, delay, gc.log)
if err != nil {
gc.log.Error().Err(err).Str("repository", repo).Str("digest", desc.Digest.String()).
Str("delay", gc.opts.Delay.String()).Msg("gc: failed to check if blob is older than delay")
Expand Down Expand Up @@ -370,7 +370,7 @@ func (gc GarbageCollect) cleanUntaggedManifests(repo string, index *ispec.Index,
if desc.MediaType == ispec.MediaTypeImageManifest || desc.MediaType == ispec.MediaTypeImageIndex {
_, ok := desc.Annotations[ispec.AnnotationRefName]
if !ok {
gced, err = gc.gcManifest(repo, index, desc, "", "")
gced, err = gc.gcManifest(repo, index, desc, "", "", gc.opts.RetentionDelay)
if err != nil {
return false, err
}
Expand Down
21 changes: 17 additions & 4 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) {
return nil
}

// skip .sync and .uploads dirs no need to try to validate them
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) {
return driver.ErrSkipDir
}

rel, err := filepath.Rel(is.rootDir, fileInfo.Path())
if err != nil {
return nil //nolint:nilerr // ignore paths that are not under root dir
Expand Down Expand Up @@ -1647,7 +1653,8 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) {
return ret, nil
}

func (is *ImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error) {
func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest,
) (godigest.Digest, []string, error) {
var lockLatency time.Time

dir := is.rootDir
Expand All @@ -1660,13 +1667,19 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest)
var digest godigest.Digest

err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
// skip blobs under .sync
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) {
// skip blobs under .sync and .uploads
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) {
return driver.ErrSkipDir
}

if fileInfo.IsDir() {
return nil
// skip repositories not found in repos
repo := path.Base(fileInfo.Path())

if !zcommon.Contains(repos, repo) && repo != "blobs" && repo != "sha256" {
return driver.ErrSkipDir
}
}

blobDigest := godigest.NewDigestFromEncoded("sha256", path.Base(fileInfo.Path()))
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,17 @@ func TestDedupeLinks(t *testing.T) {
imgStore = local.NewImageStore(dir, testCase.dedupe, true, log, metrics, nil, nil)
}

// run on empty image store
// switch dedupe to true from false
taskScheduler, cancel := runAndGetScheduler()

// rebuild with dedupe true
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
// wait until rebuild finishes
time.Sleep(1 * time.Second)

cancel()

// manifest1
upload, err := imgStore.NewBlobUpload("dedupe1")
So(err, ShouldBeNil)
Expand Down
131 changes: 46 additions & 85 deletions pkg/storage/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2289,14 +2289,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() {
Expand Down Expand Up @@ -2341,14 +2338,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() {
Expand Down Expand Up @@ -2393,14 +2387,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger Stat() error in restoreDedupedBlobs()", t, func() {
Expand Down Expand Up @@ -2442,19 +2433,13 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)

Convey("Trigger Stat() error in dedupeBlobs()", func() {
taskScheduler, cancel := runAndGetScheduler()
defer cancel()

imgStore := createMockStorage(testDir, t.TempDir(), true, &StorageDriverMock{
StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) {
Expand Down Expand Up @@ -2493,11 +2478,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(500 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})
})

Expand Down Expand Up @@ -2544,14 +2529,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

//nolint: dupl
Expand Down Expand Up @@ -2595,14 +2577,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

//nolint: dupl
Expand Down Expand Up @@ -2646,14 +2625,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger getNextDigestWithBlobPaths err", t, func() {
Expand All @@ -2664,14 +2640,8 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
_, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldNotBeNil)
})

Convey("Trigger cache errors", t, func() {
Expand Down Expand Up @@ -2762,14 +2732,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("on dedupe blob", func() {
Expand All @@ -2787,14 +2754,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("on else branch", func() {
Expand All @@ -2808,14 +2772,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type ImageStore interface { //nolint:interfacebloat
GetOrasReferrers(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error)
RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler)
RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error
GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error)
GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error)
GetAllBlobs(repo string) ([]string, error)
}

Expand Down
Loading

0 comments on commit c3801dc

Please sign in to comment.