diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index a65762c10b..3f90b93fd0 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -862,20 +862,11 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig return err } - fileReader, err := is.storeDriver.Reader(src, 0) - if err != nil { - is.log.Error().Err(err).Str("blob", src).Msg("failed to open file") - - return zerr.ErrUploadNotFound - } - - defer fileReader.Close() - - srcDigest, err := godigest.FromReader(fileReader) + srcDigest, err := getBlobDigest(is, src) if err != nil { is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") - return zerr.ErrBadBlobDigest + return err } if srcDigest != dstDigest { @@ -902,7 +893,7 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig defer is.Unlock(&lockLatency) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { - err = is.DedupeBlob(src, dstDigest, dst) + err = is.DedupeBlob(src, dstDigest, repo, dst) if err := inject.Error(err); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to dedupe blob") @@ -981,7 +972,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi dst := is.BlobPath(repo, dstDigest) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { - if err := is.DedupeBlob(src, dstDigest, dst); err != nil { + if err := is.DedupeBlob(src, dstDigest, repo, dst); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to dedupe blob") @@ -999,7 +990,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi return uuid, int64(nbytes), nil } -func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error { +func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo string, dst string) error { retry: is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: enter") @@ -1029,12 +1020,11 @@ retry: } else { // cache record exists, but due to GC and upgrades from older versions, // disk content and cache records may go out of sync - if is.cache.UsesRelativePaths() { dstRecord = path.Join(is.rootDir, dstRecord) } - _, err := is.storeDriver.Stat(dstRecord) + blobInfo, err := is.storeDriver.Stat(dstRecord) if err != nil { is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") // the actual blob on disk may have been removed by GC, so sync the cache @@ -1062,6 +1052,21 @@ retry: return err } + } else { + // if it's same file then it was already uploaded, check if blob is corrupted + if size, err := getBlobSizeFromRepoManifests(is, dstRepo, dstDigest, is.log); err == nil { + if size != blobInfo.Size() { + if err := is.storeDriver.Move(src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob") + + return err + } + + is.log.Debug().Str("src", src).Msg("dedupe: remove") + + return nil + } + } } // remove temp blobupload @@ -1130,9 +1135,20 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6 binfo, err := is.storeDriver.Stat(blobPath) if err == nil && binfo.Size() > 0 { - is.log.Debug().Str("blob path", blobPath).Msg("blob path found") + // try to find blob size in blob descriptors, if blob can not be found + size, err := getBlobSizeFromRepoManifests(is, repo, digest, is.log) + if err != nil || size == binfo.Size() { + // blob not found in descriptors, can not compare, just return + is.log.Debug().Str("blob path", blobPath).Msg("blob path found") + + return true, binfo.Size(), nil //nolint: nilerr + } + + if size != binfo.Size() { + is.log.Debug().Str("blob path", blobPath).Msg("blob path found, but it's corrupted") - return true, binfo.Size(), nil + return false, -1, zerr.ErrBlobNotFound + } } // otherwise is a 'deduped' blob (empty file) @@ -1630,6 +1646,74 @@ func (is *ImageStore) deleteBlob(repo string, digest godigest.Digest) error { return nil } +// Get blob size from it's manifest contents, if blob can not be found (manifest not uploaded yet) it will return -1. +func getBlobSizeFromRepoManifests(imgStore *ImageStore, repo string, blobDigest godigest.Digest, log zlog.Logger, +) (int64, error) { + index, err := common.GetIndex(imgStore, repo, log) + if err != nil { + return -1, err + } + + for _, desc := range index.Manifests { + switch desc.MediaType { + case ispec.MediaTypeImageManifest: + if size, _ := getBlobSizeFromManifest(imgStore, repo, blobDigest, desc, log); size > -1 { + return size, nil + } + case ispec.MediaTypeImageIndex: + indexImage, err := common.GetImageIndex(imgStore, repo, desc.Digest, log) + if err != nil { + return -1, err + } + + for _, indexManifestDesc := range indexImage.Manifests { + if size, _ := getBlobSizeFromManifest(imgStore, repo, blobDigest, indexManifestDesc, log); size > -1 { + return size, nil + } + } + } + } + + return -1, zerr.ErrBlobNotFound +} + +func getBlobSizeFromManifest(imgStore *ImageStore, repo string, blobDigest godigest.Digest, + desc ispec.Descriptor, log zlog.Logger, +) (int64, error) { + manifest, err := common.GetImageManifest(imgStore, repo, desc.Digest, log) + if err != nil { + return -1, err + } + + if manifest.Config.Digest == blobDigest { + return manifest.Config.Size, nil + } + + for _, layer := range manifest.Layers { + if layer.Digest == blobDigest { + return layer.Size, nil + } + } + + return -1, nil +} + +func getBlobDigest(imgStore *ImageStore, path string) (godigest.Digest, error) { + fileReader, err := imgStore.storeDriver.Reader(path, 0) + if err != nil { + return "", zerr.ErrUploadNotFound + } + + defer fileReader.Close() + + digest, err := godigest.FromReader(fileReader) + if err != nil { + return "", zerr.ErrBadBlobDigest + } + + return digest, nil +} + func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) { dir := path.Join(is.rootDir, repo, "blobs", "sha256") diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 571475be31..79c6bc8ae9 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -818,7 +818,7 @@ func FuzzDedupeBlob(f *testing.F) { t.Error(err) } - err = imgStore.DedupeBlob(src, blobDigest, dst) + err = imgStore.DedupeBlob(src, blobDigest, "repoName", dst) if err != nil { t.Error(err) } @@ -1514,7 +1514,7 @@ func TestDedupe(t *testing.T) { Convey("Dedupe", t, func(c C) { Convey("Nil ImageStore", func() { var is storageTypes.ImageStore - So(func() { _ = is.DedupeBlob("", "", "") }, ShouldPanic) + So(func() { _ = is.DedupeBlob("", "", "", "") }, ShouldPanic) }) Convey("Valid ImageStore", func() { @@ -1530,7 +1530,7 @@ func TestDedupe(t *testing.T) { il := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) - So(il.DedupeBlob("", "", ""), ShouldNotBeNil) + So(il.DedupeBlob("", "", "", ""), ShouldNotBeNil) }) }) } diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index a96486dd10..cd6e05924e 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -3552,7 +3552,7 @@ func TestS3DedupeErr(t *testing.T) { digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") // trigger unable to insert blob record - err := imgStore.DedupeBlob("", digest, "") + err := imgStore.DedupeBlob("", digest, "", "") So(err, ShouldNotBeNil) imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ @@ -3565,11 +3565,11 @@ func TestS3DedupeErr(t *testing.T) { }) // trigger unable to rename blob - err = imgStore.DedupeBlob("", digest, "dst") + err = imgStore.DedupeBlob("", digest, "", "dst") So(err, ShouldNotBeNil) // trigger retry - err = imgStore.DedupeBlob("", digest, "dst") + err = imgStore.DedupeBlob("", digest, "", "dst") So(err, ShouldNotBeNil) }) @@ -3586,11 +3586,11 @@ func TestS3DedupeErr(t *testing.T) { }) digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") - err := imgStore.DedupeBlob("", digest, "dst") + err := imgStore.DedupeBlob("", digest, "", "dst") So(err, ShouldBeNil) // error will be triggered in driver.SameFile() - err = imgStore.DedupeBlob("", digest, "dst2") + err = imgStore.DedupeBlob("", digest, "", "dst2") So(err, ShouldBeNil) }) @@ -3606,10 +3606,10 @@ func TestS3DedupeErr(t *testing.T) { }) digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") - err := imgStore.DedupeBlob("", digest, "dst") + err := imgStore.DedupeBlob("", digest, "", "dst") So(err, ShouldBeNil) - err = imgStore.DedupeBlob("", digest, "dst2") + err = imgStore.DedupeBlob("", digest, "", "dst2") So(err, ShouldNotBeNil) }) @@ -3622,10 +3622,10 @@ func TestS3DedupeErr(t *testing.T) { }) digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") - err := imgStore.DedupeBlob("", digest, "dst") + err := imgStore.DedupeBlob("", digest, "", "dst") So(err, ShouldBeNil) - err = imgStore.DedupeBlob("", digest, "") + err = imgStore.DedupeBlob("", digest, "", "") So(err, ShouldNotBeNil) }) @@ -3641,10 +3641,10 @@ func TestS3DedupeErr(t *testing.T) { }) digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") - err := imgStore.DedupeBlob("", digest, "dst") + err := imgStore.DedupeBlob("", digest, "", "dst") So(err, ShouldBeNil) - err = imgStore.DedupeBlob("", digest, "dst") + err = imgStore.DedupeBlob("", digest, "", "dst") So(err, ShouldNotBeNil) }) @@ -3665,7 +3665,7 @@ func TestS3DedupeErr(t *testing.T) { digest := godigest.NewDigestFromEncoded(godigest.SHA256, "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") - err := imgStore.DedupeBlob("repo", digest, "dst") + err := imgStore.DedupeBlob("repo", digest, "", "dst") So(err, ShouldBeNil) _, _, err = imgStore.CheckBlob("repo", digest) @@ -3686,7 +3686,7 @@ func TestS3DedupeErr(t *testing.T) { digest := godigest.NewDigestFromEncoded(godigest.SHA256, "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") - err := imgStore.DedupeBlob("repo", digest, "dst") + err := imgStore.DedupeBlob("repo", digest, "", "dst") So(err, ShouldBeNil) _, _, err = imgStore.CheckBlob("repo", digest) @@ -3704,7 +3704,7 @@ func TestS3DedupeErr(t *testing.T) { digest := godigest.NewDigestFromEncoded(godigest.SHA256, "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") - err := imgStore.DedupeBlob("repo", digest, "dst") + err := imgStore.DedupeBlob("repo", digest, "", "dst") So(err, ShouldBeNil) _, _, err = imgStore.CheckBlob("repo", digest) @@ -3719,10 +3719,10 @@ func TestS3DedupeErr(t *testing.T) { digest := godigest.NewDigestFromEncoded(godigest.SHA256, "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") - err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1") + err := imgStore.DedupeBlob("/src/dst", digest, "", "/repo1/dst1") So(err, ShouldBeNil) - err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2") + err = imgStore.DedupeBlob("/src/dst", digest, "", "/repo2/dst2") So(err, ShouldBeNil) // copy cache db to the new imagestore @@ -3767,10 +3767,10 @@ func TestS3DedupeErr(t *testing.T) { digest := godigest.NewDigestFromEncoded(godigest.SHA256, "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") - err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1") + err := imgStore.DedupeBlob("/src/dst", digest, "", "/repo1/dst1") So(err, ShouldBeNil) - err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2") + err = imgStore.DedupeBlob("/src/dst", digest, "", "/repo2/dst2") So(err, ShouldBeNil) // copy cache db to the new imagestore @@ -3871,7 +3871,7 @@ func TestS3DedupeErr(t *testing.T) { }, }) - err := imgStore.DedupeBlob("repo", digest, blobPath) + err := imgStore.DedupeBlob("repo", digest, "", blobPath) So(err, ShouldBeNil) _, _, err = imgStore.CheckBlob("repo2", digest) @@ -3922,11 +3922,11 @@ func TestInjectDedupe(t *testing.T) { return &FileInfoMock{}, errS3 }, }) - err := imgStore.DedupeBlob("blob", "digest", "newblob") + err := imgStore.DedupeBlob("blob", "digest", "", "newblob") So(err, ShouldBeNil) injected := inject.InjectFailure(0) - err = imgStore.DedupeBlob("blob", "digest", "newblob") + err = imgStore.DedupeBlob("blob", "digest", "", "newblob") if injected { So(err, ShouldNotBeNil) } else { @@ -3934,7 +3934,7 @@ func TestInjectDedupe(t *testing.T) { } injected = inject.InjectFailure(1) - err = imgStore.DedupeBlob("blob", "digest", "newblob") + err = imgStore.DedupeBlob("blob", "digest", "", "newblob") if injected { So(err, ShouldNotBeNil) } else { diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 023990677a..1c3fabec6b 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -1156,6 +1156,72 @@ func TestDeleteBlobsInUse(t *testing.T) { } } +func TestReuploadCorruptedBlob(t *testing.T) { + for _, testcase := range testCases { + testcase := testcase + t.Run(testcase.testCaseName, func(t *testing.T) { + var imgStore storageTypes.ImageStore + var testDir, tdir string + var store driver.StorageDriver + var driver storageTypes.Driver + + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + + if testcase.storageType == storageConstants.S3StorageDriverName { + tskip.SkipS3(t) + + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir = path.Join("/oci-repo-test", uuid.String()) + tdir = t.TempDir() + + store, imgStore, _ = createObjectsStore(testDir, tdir) + driver = s3.New(store) + defer cleanupStorage(store, testDir) + } else { + tdir = t.TempDir() + cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ + RootDir: tdir, + Name: "cache", + UseRelPaths: true, + }, log) + driver = local.New(true) + imgStore = imagestore.NewImageStore(tdir, tdir, true, + true, log, metrics, nil, driver, cacheDriver) + } + + Convey("CheckBlob() should return BlobNotFound on corrupted blobs", t, func() { + storeController := storage.StoreController{DefaultStore: imgStore} + + image := CreateRandomImage() + + err := WriteImageToFileSystem(image, repoName, tag, storeController) + So(err, ShouldBeNil) + + blobDigest := godigest.FromBytes(image.Layers[0]) + blobPath := imgStore.BlobPath(repoName, blobDigest) + + ok, size, err := imgStore.CheckBlob(repoName, blobDigest) + So(ok, ShouldBeTrue) + So(size, ShouldEqual, len(image.Layers[0])) + So(err, ShouldBeNil) + + _, err = driver.WriteFile(blobPath, []byte("corrupted")) + So(err, ShouldBeNil) + + ok, size, err = imgStore.CheckBlob(repoName, blobDigest) + So(ok, ShouldBeFalse) + So(size, ShouldNotEqual, len(image.Layers[0])) + So(err, ShouldEqual, zerr.ErrBlobNotFound) + }) + }) + } +} + func TestStorageHandler(t *testing.T) { for _, testcase := range testCases { testcase := testcase diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 14e5569a8c..65f33cae1a 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -41,7 +41,7 @@ type ImageStore interface { //nolint:interfacebloat BlobUploadInfo(repo, uuid string) (int64, error) FinishBlobUpload(repo, uuid string, body io.Reader, digest godigest.Digest) error FullBlobUpload(repo string, body io.Reader, digest godigest.Digest) (string, int64, error) - DedupeBlob(src string, dstDigest godigest.Digest, dst string) error + DedupeBlob(src string, dstDigest godigest.Digest, dstRepo, dst string) error DeleteBlobUpload(repo, uuid string) error BlobPath(repo string, digest godigest.Digest) string CheckBlob(repo string, digest godigest.Digest) (bool, int64, error) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index 2252c07a9e..4d998de358 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -31,7 +31,7 @@ type MockedImageStore struct { PutBlobChunkFn func(repo string, uuid string, from int64, to int64, body io.Reader) (int64, error) FinishBlobUploadFn func(repo string, uuid string, body io.Reader, digest godigest.Digest) error FullBlobUploadFn func(repo string, body io.Reader, digest godigest.Digest) (string, int64, error) - DedupeBlobFn func(src string, dstDigest godigest.Digest, dst string) error + DedupeBlobFn func(src string, dstDigest godigest.Digest, dstRepo, dst string) error DeleteBlobUploadFn func(repo string, uuid string) error BlobPathFn func(repo string, digest godigest.Digest) string CheckBlobFn func(repo string, digest godigest.Digest) (bool, int64, error) @@ -231,9 +231,9 @@ func (is MockedImageStore) FullBlobUpload(repo string, body io.Reader, digest go return "", 0, nil } -func (is MockedImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error { +func (is MockedImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo, dst string) error { if is.DedupeBlobFn != nil { - return is.DedupeBlobFn(src, dstDigest, dst) + return is.DedupeBlobFn(src, dstDigest, dstRepo, dst) } return nil