Add BatchWithFlusher to iavl tree #771

Closed · wants to merge 3 commits
Binary file added __debug_bin (not shown)
5 changes: 4 additions & 1 deletion batch.go
@@ -16,10 +16,13 @@ type BatchWithFlusher struct {
var _ dbm.Batch = &BatchWithFlusher{}

// Ethereum has found that commit of 100KB is optimal, ref ethereum/go-ethereum#15115
// var defaultFlushThreshold = 100000
var DefaultFlushThreshold = 100000

// NewBatchWithFlusher returns new BatchWithFlusher wrapping the passed in batch
func NewBatchWithFlusher(db dbm.DB, flushThreshold int) *BatchWithFlusher {
if flushThreshold <= 0 {
panic("flushThreshold can't be zero or negative")
}
return &BatchWithFlusher{
db: db,
batch: db.NewBatchWithSize(flushThreshold),
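
For context, here is a minimal sketch of the write path such a flusher implies: once the bytes accumulated in the wrapped batch cross the threshold, the batch is written out and replaced. This is not the PR's code; the `flushThreshold` field and the exact flush point are assumptions, while `NewBatchWithSize` and the `GetByteSize` check are taken from the diff and the mock expectations further down.

```go
// Sketch only, not the PR's implementation. Assumes BatchWithFlusher
// keeps db, batch, and a flushThreshold field; Set, Write, Close,
// GetByteSize, and NewBatchWithSize are from cosmos-db's interfaces.
func (b *BatchWithFlusher) Set(key, value []byte) error {
	size, err := b.batch.GetByteSize()
	if err != nil {
		return err
	}
	if size >= b.flushThreshold {
		// Flush what has accumulated and start a fresh batch so memory
		// stays bounded by roughly one threshold's worth of writes.
		if err := b.batch.Write(); err != nil {
			return err
		}
		if err := b.batch.Close(); err != nil {
			return err
		}
		b.batch = b.db.NewBatchWithSize(b.flushThreshold)
	}
	return b.batch.Set(key, value)
}
```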
2 changes: 1 addition & 1 deletion benchmarks/bench_test.go
@@ -27,7 +27,7 @@ func randBytes(length int) []byte {
}

func prepareTree(b *testing.B, db db.DB, size, keyLen, dataLen int) (*iavl.MutableTree, [][]byte) {
t, err := iavl.NewMutableTreeWithOpts(db, size, nil, false, log.NewNopLogger())
t, err := iavl.NewMutableTreeWithOpts(db, size, nil, false, log.NewNopLogger(), iavl.DefaultFlushThreshold)
require.NoError(b, err)
keys := make([][]byte, size)

3 changes: 2 additions & 1 deletion immutable_tree.go
@@ -38,7 +38,8 @@ func NewImmutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastS
return &ImmutableTree{
logger: lg,
// NodeDB-backed Tree.
ndb: newNodeDB(db, cacheSize, opts, lg),
// flushThreshold is 0 since there's no batch commit action.
ndb: newNodeDB(db, cacheSize, opts, lg, 0),
skipFastStorageUpgrade: skipFastStorageUpgrade,
}
}
4 changes: 2 additions & 2 deletions import.go
@@ -48,7 +48,7 @@ func newImporter(tree *MutableTree, version int64) (*Importer, error) {
return &Importer{
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
batch: NewBatch(tree.ndb.db, tree.ndb.flushThreshold),
stack: make([]*Node, 0, 8),
nonces: make([]int32, version+1),
}, nil
@@ -84,7 +84,7 @@ func (i *Importer) writeNode(node *Node) error {
return err
}
i.batch.Close()
i.batch = i.tree.ndb.db.NewBatch()
i.batch = NewBatch(i.tree.ndb.db, i.tree.ndb.flushThreshold)
i.batchSize = 0
}

2 changes: 1 addition & 1 deletion iterator_test.go
@@ -369,7 +369,7 @@ func TestNodeIterator_Success(t *testing.T) {
}

func TestNodeIterator_WithEmptyRoot(t *testing.T) {
itr, err := NewNodeIterator(nil, newNodeDB(dbm.NewMemDB(), 0, nil, log.NewNopLogger()))
itr, err := NewNodeIterator(nil, newNodeDB(dbm.NewMemDB(), 0, nil, log.NewNopLogger(), 0))
require.NoError(t, err)
require.False(t, itr.Valid())
}
6 changes: 3 additions & 3 deletions mutable_tree.go
@@ -46,12 +46,12 @@ type MutableTree struct {

// NewMutableTree returns a new tree with the specified cache size and datastore.
func NewMutableTree(db dbm.DB, cacheSize int, skipFastStorageUpgrade bool, lg log.Logger) (*MutableTree, error) {
return NewMutableTreeWithOpts(db, cacheSize, nil, skipFastStorageUpgrade, lg)
return NewMutableTreeWithOpts(db, cacheSize, nil, skipFastStorageUpgrade, lg, DefaultFlushThreshold)
}

// NewMutableTreeWithOpts returns a new tree with the specified options.
func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastStorageUpgrade bool, lg log.Logger) (*MutableTree, error) {
ndb := newNodeDB(db, cacheSize, opts, lg)
func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastStorageUpgrade bool, lg log.Logger, flushThreshold int) (*MutableTree, error) {
ndb := newNodeDB(db, cacheSize, opts, lg, flushThreshold)
head := &ImmutableTree{ndb: ndb, skipFastStorageUpgrade: skipFastStorageUpgrade}

return &MutableTree{
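
A usage sketch of the new constructor signature, assuming the usual cosmos-db and cosmossdk.io/log imports (paths may differ by iavl version; the cache size here is arbitrary):

```go
package main

import (
	"cosmossdk.io/log"
	dbm "github.com/cosmos/cosmos-db"
	"github.com/cosmos/iavl"
)

func main() {
	// A positive threshold gives the nodeDB a BatchWithFlusher;
	// NewMutableTree passes DefaultFlushThreshold under the hood.
	tree, err := iavl.NewMutableTreeWithOpts(
		dbm.NewMemDB(), 10000, nil, false, log.NewNopLogger(), iavl.DefaultFlushThreshold,
	)
	if err != nil {
		panic(err)
	}
	if _, err := tree.Set([]byte("key"), []byte("value")); err != nil {
		panic(err)
	}
	if _, _, err := tree.SaveVersion(); err != nil {
		panic(err)
	}
}
```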
22 changes: 12 additions & 10 deletions mutable_tree_test.go
@@ -258,7 +258,7 @@ func TestMutableTree_LoadVersion_Empty(t *testing.T) {

func TestMutableTree_InitialVersion(t *testing.T) {
memDB := db.NewMemDB()
tree, err := NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9}, false, log.NewNopLogger())
tree, err := NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9}, false, log.NewNopLogger(), DefaultFlushThreshold)
require.NoError(t, err)

_, err = tree.Set([]byte("a"), []byte{0x01})
@@ -274,20 +274,20 @@ func TestMutableTree_InitialVersion(t *testing.T) {
assert.EqualValues(t, 10, version)

// Reloading the tree with the same initial version is fine
tree, err = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9}, false, log.NewNopLogger())
tree, err = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9}, false, log.NewNopLogger(), DefaultFlushThreshold)
require.NoError(t, err)
version, err = tree.Load()
require.NoError(t, err)
assert.EqualValues(t, 10, version)

// Reloading the tree with an initial version beyond the lowest should error
tree, err = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 10}, false, log.NewNopLogger())
tree, err = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 10}, false, log.NewNopLogger(), DefaultFlushThreshold)
require.NoError(t, err)
_, err = tree.Load()
require.Error(t, err)

// Reloading the tree with a lower initial version is fine, and new versions can be produced
tree, err = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 3}, false, log.NewNopLogger())
tree, err = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 3}, false, log.NewNopLogger(), DefaultFlushThreshold)
require.NoError(t, err)
version, err = tree.Load()
require.NoError(t, err)
@@ -813,7 +813,7 @@ func TestUpgradeStorageToFast_DbErrorConstructor_Failure(t *testing.T) {
expectedError := errors.New("some db error")

dbMock.EXPECT().Get(gomock.Any()).Return(nil, expectedError).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(nil).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

tree, err := NewMutableTree(dbMock, 0, false, log.NewNopLogger())
@@ -835,12 +835,12 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
rIterMock.EXPECT().Key().Return(nodeKeyFormat.Key(1))
rIterMock.EXPECT().Close().Return(nil).Times(1)

expectedError := errors.New("some db error")
expectedError := errors.New("some error")

batchMock := mock.NewMockBatch(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(nil, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

iterMock := mock.NewMockIterator(ctrl)
@@ -849,6 +849,7 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
iterMock.EXPECT().Valid().Times(2)
iterMock.EXPECT().Close()

batchMock.EXPECT().GetByteSize().Return(100, nil).Times(1)
batchMock.EXPECT().Set(gomock.Any(), gomock.Any()).Return(expectedError).Times(1)

tree, err := NewMutableTree(dbMock, 0, false, log.NewNopLogger())
@@ -889,7 +890,7 @@ func TestFastStorageReUpgradeProtection_NoForceUpgrade_Success(t *testing.T) {
batchMock := mock.NewMockBatch(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version

tree, err := NewMutableTree(dbMock, 0, false, log.NewNopLogger())
@@ -937,7 +938,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_

// dbMock represents the underlying database under the hood of nodeDB
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(3)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(3)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
startFormat := fastKeyFormat.Key()
endFormat := fastKeyFormat.Key()
@@ -956,6 +957,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
updatedExpectedStorageVersion := make([]byte, len(expectedStorageVersion))
copy(updatedExpectedStorageVersion, expectedStorageVersion)
updatedExpectedStorageVersion[len(updatedExpectedStorageVersion)-1]++
batchMock.EXPECT().GetByteSize().Return(100, nil).Times(2)
batchMock.EXPECT().Delete(fastKeyFormat.Key(fastNodeKeyToDelete)).Return(nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), updatedExpectedStorageVersion).Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(2)
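
Since the flusher asks the batch for its byte size before deciding whether to flush, DB mocks now stub NewBatchWithSize and GetByteSize, as the expectations above show. Condensed, the extra setup looks roughly like this (call counts are illustrative):

```go
// Sketch of the flusher-related expectations; exact Times(...) counts
// depend on the code path under test.
batchMock := mock.NewMockBatch(ctrl)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).AnyTimes()
batchMock.EXPECT().GetByteSize().Return(100, nil).AnyTimes()
```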
@@ -1438,7 +1440,7 @@ func TestMutableTree_InitialVersion_FirstVersion(t *testing.T) {
initialVersion := int64(1000)
tree, err := NewMutableTreeWithOpts(db, 0, &Options{
InitialVersion: uint64(initialVersion),
}, true, log.NewNopLogger())
}, true, log.NewNopLogger(), DefaultFlushThreshold)
require.NoError(t, err)

_, err = tree.Set([]byte("hello"), []byte("world"))
18 changes: 14 additions & 4 deletions nodedb.go
@@ -70,9 +70,10 @@ type nodeDB struct {
latestVersion int64 // Latest version of nodeDB.
nodeCache cache.Cache // Cache for nodes in the regular tree that consists of key-value pairs at any version.
fastNodeCache cache.Cache // Cache for nodes in the fast index that represents only key-value pairs at the latest version.
flushThreshold int
}

func newNodeDB(db dbm.DB, cacheSize int, opts *Options, lg log.Logger) *nodeDB {
func newNodeDB(db dbm.DB, cacheSize int, opts *Options, lg log.Logger, flushThreshold int) *nodeDB {
if opts == nil {
o := DefaultOptions()
opts = &o
@@ -87,14 +88,15 @@ func newNodeDB(db dbm.DB, cacheSize int, opts *Options, lg log.Logger) *nodeDB {
return &nodeDB{
logger: lg,
db: db,
batch: db.NewBatch(),
batch: NewBatch(db, flushThreshold),
opts: *opts,
firstVersion: 0,
latestVersion: 0, // initially invalid
nodeCache: cache.New(cacheSize),
fastNodeCache: cache.New(fastNodeCacheSize),
versionReaders: make(map[int64]uint32, 8),
storageVersion: string(storeVersion),
flushThreshold: flushThreshold,
}
}

@@ -135,6 +137,14 @@ func (ndb *nodeDB) GetNode(nk *NodeKey) (*Node, error) {
return node, nil
}

func NewBatch(db dbm.DB, flushThreshold int) dbm.Batch {
if flushThreshold == 0 {
return db.NewBatch()
} else {
return NewBatchWithFlusher(db, flushThreshold)
}
}

func (ndb *nodeDB) GetFastNode(key []byte) (*fastnode.Node, error) {
if !ndb.hasUpgradedToFastStorage() {
return nil, errors.New("storage version is not fast")
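
The zero branch is what keeps read-only trees on a plain batch: NewImmutableTreeWithOpts passes 0 above, while mutable trees pass their configured threshold. A hypothetical package-internal test sketch of the dispatch (the test name and the usual dbm/testing imports are assumptions, not part of the PR):

```go
// Illustrative only: threshold 0 yields a plain cosmos-db batch,
// a positive threshold yields a *BatchWithFlusher.
func TestNewBatchDispatch(t *testing.T) {
	memDB := dbm.NewMemDB()
	if _, ok := NewBatch(memDB, DefaultFlushThreshold).(*BatchWithFlusher); !ok {
		t.Fatal("expected *BatchWithFlusher for a positive threshold")
	}
	if _, ok := NewBatch(memDB, 0).(*BatchWithFlusher); ok {
		t.Fatal("expected a plain batch when the threshold is 0")
	}
}
```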
@@ -351,7 +361,7 @@ func (ndb *nodeDB) writeBatch() error {
return err
}

ndb.batch = ndb.db.NewBatch()
ndb.batch = NewBatch(ndb.db, ndb.flushThreshold)

return nil
}
@@ -642,7 +652,7 @@ func (ndb *nodeDB) Commit() error {
}

ndb.batch.Close()
ndb.batch = ndb.db.NewBatch()
ndb.batch = NewBatch(ndb.db, ndb.flushThreshold)

return nil
}