Skip to content

Commit

Permalink
shard: Clean up inhumed containers by GC
Browse files Browse the repository at this point in the history
Closes #1663.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Dec 1, 2023
1 parent 4a2b3fc commit 0c3d4c1
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 24 deletions.
38 changes: 17 additions & 21 deletions pkg/local_object_storage/shard/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,34 +193,18 @@ func (s *Shard) removeGarbage() {
return
}

buf := make([]oid.Address, 0, s.rmBatchSize)

var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
buf = append(buf, g.Address())

if len(buf) == s.rmBatchSize {
return meta.ErrInterruptIterator
}

return nil
})

// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(iterPrm)
gObjs, gContainers, err := s.metaBase.GetGarbage(s.rmBatchSize)
if err != nil {
s.log.Warn("iterator over metabase graveyard failed",
zap.String("error", err.Error()),
s.log.Warn("fetching garbage objects",
zap.Error(err),

Check warning on line 199 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L198-L199

Added lines #L198 - L199 were not covered by tests
)

return
} else if len(buf) == 0 {
return
}

var deletePrm DeletePrm
deletePrm.SetAddresses(buf...)
deletePrm.SetAddresses(gObjs...)
deletePrm.skipNotFoundError = true

// delete accumulated objects
_, err = s.delete(deletePrm)
Expand All @@ -231,6 +215,18 @@ func (s *Shard) removeGarbage() {

return
}

// objects are removed, clean up empty container (all the object
// were deleted from the disk) information from the metabase
for _, cID := range gContainers {
err = s.metaBase.DeleteContainer(cID)
if err != nil {
s.log.Warn("clean up container in metabase",
zap.Stringer("cID", cID),
zap.Error(err),
)

Check warning on line 227 in pkg/local_object_storage/shard/gc.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/shard/gc.go#L224-L227

Added lines #L224 - L227 were not covered by tests
}
}
}

func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
Expand Down
64 changes: 64 additions & 0 deletions pkg/local_object_storage/shard/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package shard_test

import (
"context"
"errors"
"fmt"
"path/filepath"
"testing"
"time"
Expand All @@ -14,6 +16,7 @@ import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
Expand Down Expand Up @@ -117,3 +120,64 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
return shard.IsErrNotFound(err)
}, 3*time.Second, 1*time.Second, "lock expiration should free object removal")
}

func TestGC_ContainerCleanup(t *testing.T) {
sh := newCustomShard(t, t.TempDir(), true,
nil,
nil,
shard.WithGCRemoverSleepInterval(10*time.Millisecond))
defer releaseShard(sh, t)

const numOfObjs = 10
cID := cidtest.ID()
oo := make([]oid.Address, 0, numOfObjs)

for i := 0; i < numOfObjs; i++ {
var putPrm shard.PutPrm

obj := generateObjectWithCID(t, cID)
addAttribute(obj, fmt.Sprintf("foo%d", i), fmt.Sprintf("bar%d", i))
if i%2 == 0 {
addPayload(obj, 1<<5) // small
} else {
addPayload(obj, 1<<20) // big
}
putPrm.SetObject(obj)

_, err := sh.Put(putPrm)
require.NoError(t, err)

oo = append(oo, objectCore.AddressOf(obj))
}

res, err := sh.ListContainers(shard.ListContainersPrm{})
require.NoError(t, err)
require.Len(t, res.Containers(), 1)

for _, o := range oo {
var getPrm shard.GetPrm
getPrm.SetAddress(o)

_, err = sh.Get(getPrm)
require.NoError(t, err)
}

require.NoError(t, sh.InhumeContainer(cID))

require.Eventually(t, func() bool {
res, err = sh.ListContainers(shard.ListContainersPrm{})
require.NoError(t, err)

for _, o := range oo {
var getPrm shard.GetPrm
getPrm.SetAddress(o)

_, err = sh.Get(getPrm)
if !errors.Is(err, apistatus.ObjectNotFound{}) {
return false
}
}

return len(res.Containers()) == 0
}, time.Second, 100*time.Millisecond)
}
6 changes: 3 additions & 3 deletions pkg/local_object_storage/shard/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
nil)
}

func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option) *shard.Shard {
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts []writecache.Option, bsOpts []blobstor.Option, options ...shard.Option) *shard.Shard {
if enableWriteCache {
rootPath = filepath.Join(rootPath, "wc")
} else {
Expand All @@ -66,7 +66,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
}
}

opts := []shard.Option{
opts := append([]shard.Option{
shard.WithLogger(zap.L()),
shard.WithBlobStorOptions(bsOpts...),
shard.WithMetaBaseOptions(
Expand All @@ -80,7 +80,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
[]writecache.Option{writecache.WithPath(filepath.Join(rootPath, "wcache"))},
wcOpts...)...,
),
}
}, options...)

sh := shard.New(opts...)

Expand Down

0 comments on commit 0c3d4c1

Please sign in to comment.