From e11b3e4262c565574d95ef5a0b930e44543dc1cc Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Thu, 12 Oct 2023 16:51:59 +0300 Subject: [PATCH] feat(api): repair corrupted blobs when pushed again CheckBlob() returns ErrBlobNotFound on corrupted blobs closes #1922 Signed-off-by: Petu Eusebiu --- pkg/storage/imagestore/imagestore.go | 46 +++++++++++++------ pkg/storage/storage_test.go | 66 ++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 14 deletions(-) diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index a65762c10b..6bb465c9f0 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -862,20 +862,11 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig return err } - fileReader, err := is.storeDriver.Reader(src, 0) - if err != nil { - is.log.Error().Err(err).Str("blob", src).Msg("failed to open file") - - return zerr.ErrUploadNotFound - } - - defer fileReader.Close() - - srcDigest, err := godigest.FromReader(fileReader) + srcDigest, err := getBlobDigest(is, src) if err != nil { is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") - return zerr.ErrBadBlobDigest + return err } if srcDigest != dstDigest { @@ -1029,7 +1020,6 @@ retry: } else { // cache record exists, but due to GC and upgrades from older versions, // disk content and cache records may go out of sync - if is.cache.UsesRelativePaths() { dstRecord = path.Join(is.rootDir, dstRecord) } @@ -1060,6 +1050,14 @@ retry: if err := is.cache.PutBlob(dstDigest, dst); err != nil { is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record") + return err + } + } else { + // if its same file then it was already uploaded, let's overwrite + // move the blob from uploads to final dest + if err := is.storeDriver.Move(src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob") + return err } } @@ -1130,9 +1128,13 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6 binfo, err := is.storeDriver.Stat(blobPath) if err == nil && binfo.Size() > 0 { - is.log.Debug().Str("blob path", blobPath).Msg("blob path found") + blobDigest, _ := getBlobDigest(is, blobPath) + // validate that found blob is not corrupted + if digest == blobDigest { + is.log.Debug().Str("blob path", blobPath).Msg("blob path found") - return true, binfo.Size(), nil + return true, binfo.Size(), nil + } } // otherwise is a 'deduped' blob (empty file) @@ -1630,6 +1632,22 @@ func (is *ImageStore) deleteBlob(repo string, digest godigest.Digest) error { return nil } +func getBlobDigest(imgStore *ImageStore, path string) (godigest.Digest, error) { + fileReader, err := imgStore.storeDriver.Reader(path, 0) + if err != nil { + return "", zerr.ErrUploadNotFound + } + + defer fileReader.Close() + + digest, err := godigest.FromReader(fileReader) + if err != nil { + return "", zerr.ErrBadBlobDigest + } + + return digest, nil +} + func (is *ImageStore) GetAllBlobs(repo string) ([]string, error) { dir := path.Join(is.rootDir, repo, "blobs", "sha256") diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 023990677a..1c3fabec6b 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -1156,6 +1156,72 @@ func TestDeleteBlobsInUse(t *testing.T) { } } +func TestReuploadCorruptedBlob(t *testing.T) { + for _, testcase := range testCases { + testcase := testcase + t.Run(testcase.testCaseName, func(t *testing.T) { + var imgStore storageTypes.ImageStore + var testDir, tdir string + var store driver.StorageDriver + var driver storageTypes.Driver + + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + + if testcase.storageType == storageConstants.S3StorageDriverName { + tskip.SkipS3(t) + + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir = path.Join("/oci-repo-test", uuid.String()) + tdir = t.TempDir() + + store, imgStore, _ = createObjectsStore(testDir, tdir) + driver = s3.New(store) + defer cleanupStorage(store, testDir) + } else { + tdir = t.TempDir() + cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ + RootDir: tdir, + Name: "cache", + UseRelPaths: true, + }, log) + driver = local.New(true) + imgStore = imagestore.NewImageStore(tdir, tdir, true, + true, log, metrics, nil, driver, cacheDriver) + } + + Convey("CheckBlob() should return BlobNotFound on corrupted blobs", t, func() { + storeController := storage.StoreController{DefaultStore: imgStore} + + image := CreateRandomImage() + + err := WriteImageToFileSystem(image, repoName, tag, storeController) + So(err, ShouldBeNil) + + blobDigest := godigest.FromBytes(image.Layers[0]) + blobPath := imgStore.BlobPath(repoName, blobDigest) + + ok, size, err := imgStore.CheckBlob(repoName, blobDigest) + So(ok, ShouldBeTrue) + So(size, ShouldEqual, len(image.Layers[0])) + So(err, ShouldBeNil) + + _, err = driver.WriteFile(blobPath, []byte("corrupted")) + So(err, ShouldBeNil) + + ok, size, err = imgStore.CheckBlob(repoName, blobDigest) + So(ok, ShouldBeFalse) + So(size, ShouldNotEqual, len(image.Layers[0])) + So(err, ShouldEqual, zerr.ErrBlobNotFound) + }) + }) + } +} + func TestStorageHandler(t *testing.T) { for _, testcase := range testCases { testcase := testcase