From 0c3d4c14fbcd247b6eca7eca817f945e9e1f1e6f Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Fri, 1 Dec 2023 19:57:21 +0300
Subject: [PATCH] shard: Clean up inhumed containers by GC

Closes #1663.

Signed-off-by: Pavel Karpy
---
 pkg/local_object_storage/shard/gc.go         | 38 ++++++------
 pkg/local_object_storage/shard/gc_test.go    | 64 ++++++++++++++++++++
 pkg/local_object_storage/shard/shard_test.go |  6 +-
 3 files changed, 84 insertions(+), 24 deletions(-)

diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go
index 0ab09c1055f..9005ec8f357 100644
--- a/pkg/local_object_storage/shard/gc.go
+++ b/pkg/local_object_storage/shard/gc.go
@@ -193,34 +193,18 @@ func (s *Shard) removeGarbage() {
 		return
 	}
 
-	buf := make([]oid.Address, 0, s.rmBatchSize)
-
-	var iterPrm meta.GarbageIterationPrm
-	iterPrm.SetHandler(func(g meta.GarbageObject) error {
-		buf = append(buf, g.Address())
-
-		if len(buf) == s.rmBatchSize {
-			return meta.ErrInterruptIterator
-		}
-
-		return nil
-	})
-
-	// iterate over metabase's objects with GC mark
-	// (no more than s.rmBatchSize objects)
-	err := s.metaBase.IterateOverGarbage(iterPrm)
+	gObjs, gContainers, err := s.metaBase.GetGarbage(s.rmBatchSize)
 	if err != nil {
-		s.log.Warn("iterator over metabase graveyard failed",
-			zap.String("error", err.Error()),
+		s.log.Warn("fetching garbage objects",
+			zap.Error(err),
 		)
 
-		return
-	} else if len(buf) == 0 {
 		return
 	}
 
 	var deletePrm DeletePrm
-	deletePrm.SetAddresses(buf...)
+	deletePrm.SetAddresses(gObjs...)
+	deletePrm.skipNotFoundError = true
 
 	// delete accumulated objects
 	_, err = s.delete(deletePrm)
@@ -231,6 +215,18 @@
 
 		return
 	}
+
+	// objects are removed, clean up the now-empty containers' information
+	// (all their objects were deleted from the disk) from the metabase
+	for _, cID := range gContainers {
+		err = s.metaBase.DeleteContainer(cID)
+		if err != nil {
+			s.log.Warn("clean up container in metabase",
+				zap.Stringer("cID", cID),
+				zap.Error(err),
+			)
+		}
+	}
 }
 
 func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go
index bf7ab44921b..62addf6a0a6 100644
--- a/pkg/local_object_storage/shard/gc_test.go
+++ b/pkg/local_object_storage/shard/gc_test.go
@@ -2,6 +2,8 @@ package shard_test
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"path/filepath"
 	"testing"
 	"time"
@@ -14,6 +16,7 @@ import (
 	meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
 	"github.com/nspcc-dev/neofs-node/pkg/util"
+	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
@@ -117,3 +120,64 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
 		return shard.IsErrNotFound(err)
 	}, 3*time.Second, 1*time.Second, "lock expiration should free object removal")
 }
+
+func TestGC_ContainerCleanup(t *testing.T) {
+	sh := newCustomShard(t, t.TempDir(), true,
+		nil,
+		nil,
+		shard.WithGCRemoverSleepInterval(10*time.Millisecond))
+	defer releaseShard(sh, t)
+
+	const numOfObjs = 10
+	cID := cidtest.ID()
+	oo := make([]oid.Address, 0, numOfObjs)
+
+	for i := 0; i < numOfObjs; i++ {
+		var putPrm shard.PutPrm
+
+		obj := generateObjectWithCID(t, cID)
+		addAttribute(obj, fmt.Sprintf("foo%d", i), fmt.Sprintf("bar%d", i))
+		if i%2 == 0 {
+			addPayload(obj, 1<<5) // small
+		} else {
+ addPayload(obj, 1<<20) // big + } + putPrm.SetObject(obj) + + _, err := sh.Put(putPrm) + require.NoError(t, err) + + oo = append(oo, objectCore.AddressOf(obj)) + } + + res, err := sh.ListContainers(shard.ListContainersPrm{}) + require.NoError(t, err) + require.Len(t, res.Containers(), 1) + + for _, o := range oo { + var getPrm shard.GetPrm + getPrm.SetAddress(o) + + _, err = sh.Get(getPrm) + require.NoError(t, err) + } + + require.NoError(t, sh.InhumeContainer(cID)) + + require.Eventually(t, func() bool { + res, err = sh.ListContainers(shard.ListContainersPrm{}) + require.NoError(t, err) + + for _, o := range oo { + var getPrm shard.GetPrm + getPrm.SetAddress(o) + + _, err = sh.Get(getPrm) + if !errors.Is(err, apistatus.ObjectNotFound{}) { + return false + } + } + + return len(res.Containers()) == 0 + }, time.Second, 100*time.Millisecond) +} diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 0473878b6ed..41e4a9406a1 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -41,7 +41,7 @@ func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { nil) } -func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard { +func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option, options ...shard.Option) *shard.Shard { if enableWriteCache { rootPath = filepath.Join(rootPath, "wc") } else { @@ -66,7 +66,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts } } - opts := []shard.Option{ + opts := append([]shard.Option{ shard.WithLogger(zap.L()), shard.WithBlobStorOptions(bsOpts...), shard.WithMetaBaseOptions( @@ -80,7 +80,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))}, wcOpts...)..., ), - } + }, options...) sh := shard.New(opts...)
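
Reviewer note: the metabase side of this change (GetGarbage, DeleteContainer) is not part of the diff above; only its call shape is visible in gc.go. The sketch below is a self-contained toy model of the new GC round with stand-in types — not the actual neofs-node metabase implementation — included only to make the control flow explicit.

package main

import "fmt"

// Stand-ins for oid.Address and cid.ID; the real types come from the
// NeoFS SDK. This toy model only mirrors the call shapes seen in gc.go.
type address = string
type containerID = string

// toyMeta mimics the two metabase calls the patched removeGarbage makes:
// GetGarbage(limit) and DeleteContainer(id).
type toyMeta struct {
	garbage    []address            // objects carrying the GC mark
	containers map[containerID]bool // containers inhumed as a whole
}

// GetGarbage returns up to limit garbage object addresses plus every
// container whose removal was requested wholesale.
func (m *toyMeta) GetGarbage(limit int) ([]address, []containerID, error) {
	n := limit
	if n > len(m.garbage) {
		n = len(m.garbage)
	}

	cc := make([]containerID, 0, len(m.containers))
	for c := range m.containers {
		cc = append(cc, c)
	}

	return m.garbage[:n], cc, nil
}

// DeleteContainer drops per-container metadata once its objects are gone.
func (m *toyMeta) DeleteContainer(c containerID) error {
	delete(m.containers, c)
	return nil
}

func main() {
	m := &toyMeta{
		garbage:    []address{"cnr1/obj1", "cnr1/obj2"},
		containers: map[containerID]bool{"cnr1": true},
	}

	// One GC round in the shape of the patched removeGarbage: fetch a
	// batch, delete the objects (the real code tolerates already-gone
	// objects via deletePrm.skipNotFoundError), then clean up containers.
	objs, containers, err := m.GetGarbage(100)
	if err != nil {
		return
	}

	fmt.Println("deleting objects:", objs) // s.delete(deletePrm) in gc.go

	for _, c := range containers {
		if err := m.DeleteContainer(c); err != nil {
			fmt.Println("clean up container in metabase:", c, err)
		}
	}
}

Note the ordering in gc.go itself: container records are dropped only after the object batch is deleted; if s.delete fails, removeGarbage returns early and the container cleanup is retried on a later GC round.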
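
The shard_test.go hunk switches newCustomShard to a common functional-options pattern: the defaults are built as a slice literal and caller-supplied options are appended after them, so later entries can override earlier ones when shard.New applies them in order. A minimal standalone sketch of that pattern follows, with toy option names rather than the shard package's API.

package main

import "fmt"

type cfg struct {
	logLevel string
	gcSleep  int
}

// Option mirrors the shape of shard.Option (a functional option);
// the option constructors below are stand-ins, not the real shard API.
type Option func(*cfg)

func WithLogLevel(l string) Option { return func(c *cfg) { c.logLevel = l } }
func WithGCSleepMS(ms int) Option  { return func(c *cfg) { c.gcSleep = ms } }

// newCustom builds the defaults first and appends caller options after
// them; since options are applied in order, callers can override any
// default. This is the append pattern used in newCustomShard.
func newCustom(options ...Option) *cfg {
	opts := append([]Option{
		WithLogLevel("info"),
		WithGCSleepMS(1000),
	}, options...)

	c := new(cfg)
	for _, o := range opts {
		o(c)
	}

	return c
}

func main() {
	// mirrors TestGC_ContainerCleanup passing an extra shard.Option
	c := newCustom(WithGCSleepMS(10))
	fmt.Println(c.logLevel, c.gcSleep) // info 10
}

Appending rather than prepending the extra options is what lets TestGC_ContainerCleanup shorten the GC remover sleep interval without touching the helper's defaults.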