Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async pruning #25

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,36 @@ func (tree *MutableTree) DeleteVersions(versions ...int64) error {
return nil
}

// DeleteVersionsAsync deletes the given versions from the MutableTree using
// the asynchronous range deletion path.
//
// The versions may be passed in any order and may contain duplicates. They are
// sorted, duplicates are ignored, and every run of consecutive versions is
// collapsed into one half-open interval [from, to) that is handed to
// DeleteVersionsRangeAsync. Intervals are processed in ascending order and the
// first error aborts the operation; intervals handled before the failure are
// not rolled back here.
//
// Passing no versions is a no-op.
func (tree *MutableTree) DeleteVersionsAsync(versions ...int64) error {
	logger.Debug("DELETING VERSIONS: %v\n", versions)

	if len(versions) == 0 {
		return nil
	}

	// Sort a private copy: a caller that spreads its own slice into the
	// variadic parameter must not see that slice reordered.
	sorted := make([]int64, len(versions))
	copy(sorted, versions)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i] < sorted[j]
	})

	// Coalesce consecutive versions into [from, to) and flush each interval as
	// soon as a gap is found.
	from, to := sorted[0], sorted[0]+1
	for _, version := range sorted[1:] {
		switch {
		case version < to:
			// Duplicate of a version already covered by the current interval.
			// Counting it again would stretch the interval over a version the
			// caller never asked to delete.
		case version == to:
			to++
		default:
			if err := tree.DeleteVersionsRangeAsync(from, to); err != nil {
				return err
			}
			from, to = version, version+1
		}
	}

	return tree.DeleteVersionsRangeAsync(from, to)
}

// DeleteVersionsRange removes versions from an interval from the MutableTree (not inclusive).
// An error is returned if any single version has active readers.
// All writes happen in a single batch with a single commit.
Expand All @@ -1119,6 +1149,24 @@ func (tree *MutableTree) DeleteVersionsRange(fromVersion, toVersion int64) error
return nil
}

// DeleteVersionsRangeAsync removes the versions in [fromVersion, toVersion)
// from the MutableTree through the node database's deletion batch.
//
// The deletions are first queued on the node database, then the deletion batch
// is committed, and only after both steps succeed are the versions dropped
// from the tree's in-memory version set (under tree.mtx). Any error from the
// node database is returned unchanged and leaves the in-memory set untouched.
func (tree *MutableTree) DeleteVersionsRangeAsync(fromVersion, toVersion int64) error {
	err := tree.ndb.DeleteVersionsRangeAsync(fromVersion, toVersion)
	if err == nil {
		err = tree.ndb.CommitDeletion()
	}
	if err != nil {
		return err
	}

	tree.mtx.Lock()
	defer tree.mtx.Unlock()

	v := fromVersion
	for v < toVersion {
		delete(tree.versions, v)
		v++
	}
	return nil
}

// DeleteVersion deletes a tree version from disk. The version can then no
// longer be accessed.
func (tree *MutableTree) DeleteVersion(version int64) error {
Expand Down
8 changes: 4 additions & 4 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func TestUpgradeStorageToFast_DbErrorConstructor_Failure(t *testing.T) {
expectedError := errors.New("some db error")

dbMock.EXPECT().Get(gomock.Any()).Return(nil, expectedError).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(2)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

tree, err := NewMutableTree(dbMock, 0, false)
Expand All @@ -852,7 +852,7 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
batchMock := mock.NewMockBatch(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(nil, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(2)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

iterMock := mock.NewMockIterator(ctrl)
Expand Down Expand Up @@ -901,7 +901,7 @@ func TestFastStorageReUpgradeProtection_NoForceUpgrade_Success(t *testing.T) {
batchMock := mock.NewMockBatch(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(2)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version

tree, err := NewMutableTree(dbMock, 0, false)
Expand Down Expand Up @@ -949,7 +949,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_

// dbMock represents the underlying database under the hood of nodeDB
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(3)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(4)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
startFormat := fastKeyFormat.Key()
endFormat := fastKeyFormat.Key()
Expand Down
87 changes: 87 additions & 0 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type nodeDB struct {
mtx sync.Mutex // Read/write lock.
db dbm.DB // Persistent node storage.
batch dbm.Batch // Batched writing buffer.
deletionBatch dbm.Batch // Batched deletion buffer.
opts Options // Options to customize for pruning/writing
versionReaders map[int64]uint32 // Number of active version readers
storageVersion string // Storage version
Expand All @@ -96,6 +97,7 @@ func newNodeDB(db dbm.DB, cacheSize int, opts *Options) *nodeDB {
return &nodeDB{
db: db,
batch: db.NewBatch(),
deletionBatch: db.NewBatch(),
opts: *opts,
latestVersion: 0, // initially invalid
nodeCache: cache.New(cacheSize),
Expand Down Expand Up @@ -512,6 +514,73 @@ func (ndb *nodeDB) DeleteVersionsFrom(version int64) error {
return nil
}

// DeleteVersionsRangeAsync queues the deletion of every version in
// [fromVersion, toVersion) on ndb.deletionBatch. Nothing is written to the
// database by this method; the queued operations are flushed when the deletion
// batch is written (see CommitDeletion).
//
// ndb.mtx is held only around the individual validation steps and around each
// node-cache eviction; it is released while orphans and roots are traversed.
//
// An error is returned when:
//   - toVersion is not greater than fromVersion,
//   - toVersion is greater than the latest saved version,
//   - any version in the range has active readers,
//   - a lookup, traversal or batch operation fails.
//
// NOTE(review): when this method fails partway through, operations already
// queued on ndb.deletionBatch are left in the batch — confirm callers are
// fine with them being flushed by a later CommitDeletion.
func (ndb *nodeDB) DeleteVersionsRangeAsync(fromVersion, toVersion int64) error {
	// Reject empty or inverted ranges up front, as DeleteVersionsRange does,
	// instead of silently treating them as a successful no-op.
	if fromVersion >= toVersion {
		return errors.Errorf("toVersion (%d) must be greater than fromVersion (%d)", toVersion, fromVersion)
	}

	ndb.mtx.Lock()
	latest, err := ndb.getLatestVersion()
	ndb.mtx.Unlock()
	if err != nil {
		return err
	}
	if latest < toVersion {
		return errors.Errorf("cannot delete latest saved version (%d)", latest)
	}

	ndb.mtx.Lock()
	predecessor, err := ndb.getPreviousVersion(fromVersion)
	ndb.mtx.Unlock()
	if err != nil {
		return err
	}

	// Refuse to delete any version in (predecessor, toVersion) that still has
	// active readers.
	ndb.mtx.Lock()
	for v, r := range ndb.versionReaders {
		if v < toVersion && v > predecessor && r != 0 {
			ndb.mtx.Unlock()
			return errors.Errorf("unable to delete version %v with %v active readers", v, r)
		}
	}
	ndb.mtx.Unlock()

	for version := fromVersion; version < toVersion; version++ {
		err := ndb.traverseOrphansVersion(version, func(key, hash []byte) error {
			// Decode the version span stored in the orphan key.
			var from, to int64
			orphanKeyFormat.Scan(key, &to, &from)

			// The existing orphan entry is always dropped.
			if err := ndb.deletionBatch.Delete(key); err != nil {
				return err
			}

			if from > predecessor {
				// The span starts after the predecessor: delete the node itself
				// and evict it from the node cache.
				if err := ndb.deletionBatch.Delete(ndb.nodeKey(hash)); err != nil {
					return err
				}
				ndb.mtx.Lock()
				ndb.nodeCache.Remove(hash)
				ndb.mtx.Unlock()
				return nil
			}

			// Otherwise keep the node and re-queue its orphan entry with the
			// span now ending at the predecessor.
			return ndb.deletionBatch.Set(ndb.orphanKey(from, predecessor, hash), hash)
		})
		if err != nil {
			return err
		}
	}

	// Finally queue the deletion of the root entries for the whole range.
	return ndb.traverseRange(rootKeyFormat.Key(fromVersion), rootKeyFormat.Key(toVersion), func(k, v []byte) error {
		return ndb.deletionBatch.Delete(k)
	})
}

// DeleteVersionsRange deletes versions from an interval (not inclusive).
func (ndb *nodeDB) DeleteVersionsRange(fromVersion, toVersion int64) error {
if fromVersion >= toVersion {
Expand Down Expand Up @@ -884,6 +953,24 @@ func (ndb *nodeDB) Commit() error {
return nil
}

// CommitDeletion writes the pending deletion batch to the database and then
// replaces it with a fresh, empty batch.
//
// When ndb.opts.Sync is set the batch is written with WriteSync, otherwise
// with Write. If the write fails, the error is returned wrapped and the
// current batch is left in place (neither closed nor replaced).
func (ndb *nodeDB) CommitDeletion() error {
	flush := func() error {
		if ndb.opts.Sync {
			return ndb.deletionBatch.WriteSync()
		}
		return ndb.deletionBatch.Write()
	}

	if writeErr := flush(); writeErr != nil {
		return errors.Wrap(writeErr, "failed to write batch")
	}

	// The written batch is finished; start a new one for later deletions.
	ndb.deletionBatch.Close()
	ndb.deletionBatch = ndb.db.NewBatch()

	return nil
}

func (ndb *nodeDB) HasRoot(version int64) (bool, error) {
return ndb.db.Has(ndb.rootKey(version))
}
Expand Down
10 changes: 5 additions & 5 deletions nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestNewNoDbStorage_StorageVersionInDb_Success(t *testing.T) {
dbMock := mock.NewMockDB(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return([]byte(expectedVersion), nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(2)

ndb := newNodeDB(dbMock, 0, nil)
require.Equal(t, expectedVersion, ndb.storageVersion)
Expand All @@ -65,7 +65,7 @@ func TestNewNoDbStorage_ErrorInConstructor_DefaultSet(t *testing.T) {
dbMock := mock.NewMockDB(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(nil, errors.New("some db error")).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(2)

ndb := newNodeDB(dbMock, 0, nil)
require.Equal(t, expectedVersion, ndb.getStorageVersion())
Expand All @@ -78,7 +78,7 @@ func TestNewNoDbStorage_DoesNotExist_DefaultSet(t *testing.T) {
dbMock := mock.NewMockDB(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(nil, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(2)

ndb := newNodeDB(dbMock, 0, nil)
require.Equal(t, expectedVersion, ndb.getStorageVersion())
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestSetStorageVersion_DBFailure_OldKept(t *testing.T) {
expectedFastCacheVersion := 2

dbMock.EXPECT().Get(gomock.Any()).Return([]byte(defaultStorageVersionValue), nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(2)

// rIterMock is used to get the latest version from disk. We are mocking that rIterMock returns latestTreeVersion from disk
rIterMock.EXPECT().Valid().Return(true).Times(1)
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestSetStorageVersion_InvalidVersionFailure_OldKept(t *testing.T) {
invalidStorageVersion := fastStorageVersionValue + fastStorageVersionDelimiter + "1" + fastStorageVersionDelimiter + "2"

dbMock.EXPECT().Get(gomock.Any()).Return([]byte(invalidStorageVersion), nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(2)

ndb := newNodeDB(dbMock, 0, nil)
require.Equal(t, invalidStorageVersion, ndb.getStorageVersion())
Expand Down