Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dedupe): run dedupe only for repositories found at startup #1844

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion pkg/storage/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,14 +869,37 @@ type DedupeTaskGenerator struct {
and generating a task for each unprocessed one*/
lastDigests []godigest.Digest
done bool
repos []string // list of repos on which we run dedupe
Log zlog.Logger
}

func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
var err error

/* at first run get from storage currently found repositories so that we skip the ones that gets synced/uploaded
while this generator runs, there are deduped/restored inline, no need to run dedupe/restore again */
if len(gen.repos) == 0 {
gen.repos, err = gen.ImgStore.GetRepositories()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list could be very large potentially

if err != nil {
//nolint: dupword
gen.Log.Error().Err(err).Msg("dedupe rebuild: unable to to get list of repositories")

return nil, err
}

// if still no repos
if len(gen.repos) == 0 {
gen.Log.Info().Msg("dedupe rebuild: no repositories found in storage, finished.")

// no repositories in storage, no need to continue
gen.done = true

return nil, nil
}
}

// get all blobs from storage.imageStore and group them by digest
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.lastDigests)
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
if err != nil {
gen.Log.Error().Err(err).Msg("dedupe rebuild: failed to get next digest")

Expand Down Expand Up @@ -910,6 +933,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool {
// Reset returns the generator to its initial state so the next scheduling
// cycle starts a fresh dedupe pass: clears the processed-digest history,
// the pending duplicate-blob list, the repository snapshot taken at startup,
// the in-flight digest, and the completion flag.
func (gen *DedupeTaskGenerator) Reset() {
	gen.done = false
	gen.digest = ""
	gen.repos = []string{}
	gen.duplicateBlobs = []string{}
	gen.lastDigests = []godigest.Digest{}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest
}

if !referenced {
gced, err = gc.gcManifest(repo, index, manifestDesc, signatureType, subject.Digest)
gced, err = gc.gcManifest(repo, index, manifestDesc, signatureType, subject.Digest, gc.opts.Delay)
if err != nil {
return false, err
}
Expand All @@ -275,7 +275,7 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest
referenced := isManifestReferencedInIndex(index, subjectDigest)

if !referenced {
gced, err = gc.gcManifest(repo, index, manifestDesc, storage.CosignType, subjectDigest)
gced, err = gc.gcManifest(repo, index, manifestDesc, storage.CosignType, subjectDigest, gc.opts.Delay)
if err != nil {
return false, err
}
Expand All @@ -288,11 +288,11 @@ func (gc GarbageCollect) cleanReferrer(repo string, index *ispec.Index, manifest

// gcManifest removes a manifest entry from an index and syncs metaDB accordingly if the blob is older than gc.Delay.
func (gc GarbageCollect) gcManifest(repo string, index *ispec.Index, desc ispec.Descriptor,
signatureType string, subjectDigest godigest.Digest,
signatureType string, subjectDigest godigest.Digest, delay time.Duration,
) (bool, error) {
var gced bool

canGC, err := isBlobOlderThan(gc.imgStore, repo, desc.Digest, gc.opts.Delay, gc.log)
canGC, err := isBlobOlderThan(gc.imgStore, repo, desc.Digest, delay, gc.log)
if err != nil {
gc.log.Error().Err(err).Str("repository", repo).Str("digest", desc.Digest.String()).
Str("delay", gc.opts.Delay.String()).Msg("gc: failed to check if blob is older than delay")
Expand Down Expand Up @@ -370,7 +370,7 @@ func (gc GarbageCollect) cleanUntaggedManifests(repo string, index *ispec.Index,
if desc.MediaType == ispec.MediaTypeImageManifest || desc.MediaType == ispec.MediaTypeImageIndex {
_, ok := desc.Annotations[ispec.AnnotationRefName]
if !ok {
gced, err = gc.gcManifest(repo, index, desc, "", "")
gced, err = gc.gcManifest(repo, index, desc, "", "", gc.opts.RetentionDelay)
if err != nil {
return false, err
}
Expand Down
21 changes: 17 additions & 4 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) {
return nil
}

// skip .sync and .uploads dirs no need to try to validate them
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) {
return driver.ErrSkipDir
}

rel, err := filepath.Rel(is.rootDir, fileInfo.Path())
if err != nil {
return nil //nolint:nilerr // ignore paths that are not under root dir
Expand Down Expand Up @@ -1647,7 +1653,8 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) {
return ret, nil
}

func (is *ImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error) {
func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest,
) (godigest.Digest, []string, error) {
var lockLatency time.Time

dir := is.rootDir
Expand All @@ -1660,13 +1667,19 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest)
var digest godigest.Digest

err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
// skip blobs under .sync
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) {
// skip blobs under .sync and .uploads
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) {
return driver.ErrSkipDir
}

if fileInfo.IsDir() {
return nil
// skip repositories not found in repos
repo := path.Base(fileInfo.Path())

if !zcommon.Contains(repos, repo) && repo != "blobs" && repo != "sha256" {
return driver.ErrSkipDir
}
}

blobDigest := godigest.NewDigestFromEncoded("sha256", path.Base(fileInfo.Path()))
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,17 @@ func TestDedupeLinks(t *testing.T) {
imgStore = local.NewImageStore(dir, testCase.dedupe, true, log, metrics, nil, nil)
}

// run on empty image store
// switch dedupe to true from false
taskScheduler, cancel := runAndGetScheduler()

// rebuild with dedupe true
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
// wait until rebuild finishes
time.Sleep(1 * time.Second)

cancel()

// manifest1
upload, err := imgStore.NewBlobUpload("dedupe1")
So(err, ShouldBeNil)
Expand Down
131 changes: 46 additions & 85 deletions pkg/storage/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2289,14 +2289,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() {
Expand Down Expand Up @@ -2341,14 +2338,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger GetContent error in restoreDedupedBlobs()", t, func() {
Expand Down Expand Up @@ -2393,14 +2387,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger Stat() error in restoreDedupedBlobs()", t, func() {
Expand Down Expand Up @@ -2442,19 +2433,13 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)

Convey("Trigger Stat() error in dedupeBlobs()", func() {
taskScheduler, cancel := runAndGetScheduler()
defer cancel()

imgStore := createMockStorage(testDir, t.TempDir(), true, &StorageDriverMock{
StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) {
if path == fmt.Sprintf("path/to/%s", validDigest.Encoded()) {
Expand Down Expand Up @@ -2493,11 +2478,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(500 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, false, duplicateBlobs)
So(err, ShouldNotBeNil)
})
})

Expand Down Expand Up @@ -2544,14 +2529,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

//nolint: dupl
Expand Down Expand Up @@ -2595,14 +2577,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

//nolint: dupl
Expand Down Expand Up @@ -2646,14 +2625,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("Trigger getNextDigestWithBlobPaths err", t, func() {
Expand All @@ -2664,14 +2640,8 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
_, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldNotBeNil)
})

Convey("Trigger cache errors", t, func() {
Expand Down Expand Up @@ -2762,14 +2732,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("on dedupe blob", func() {
Expand All @@ -2787,14 +2754,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})

Convey("on else branch", func() {
Expand All @@ -2808,14 +2772,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) {
},
})

taskScheduler, cancel := runAndGetScheduler()
defer cancel()

// rebuild with dedupe false, should have all blobs with content
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{})
So(err, ShouldBeNil)

// wait until rebuild finishes
time.Sleep(200 * time.Millisecond)
err = imgStore.RunDedupeForDigest(digest, true, duplicateBlobs)
So(err, ShouldNotBeNil)
})
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type ImageStore interface { //nolint:interfacebloat
GetOrasReferrers(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error)
RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler)
RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error
GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error)
GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error)
GetAllBlobs(repo string) ([]string, error)
}

Expand Down
Loading