diff --git a/CHANGELOG.md b/CHANGELOG.md index d35d81d3f..c50db7974 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#952](https://github.com/cosmos/iavl/pull/952) Add `DeleteVersionsFrom(int64)` API. - [#961](https://github.com/cosmos/iavl/pull/961) Add new `GetLatestVersion` API to get the latest version. +- [#965](https://github.com/cosmos/iavl/pull/965) Use expected interface for expected IAVL `Logger`. ## v1.2.0 May 13, 2024 diff --git a/migrate_test.go b/migrate_test.go index 3ace1ab15..573cbfe75 100644 --- a/migrate_test.go +++ b/migrate_test.go @@ -8,7 +8,6 @@ import ( "os/exec" "path" "testing" - "time" "github.com/stretchr/testify/require" @@ -254,18 +253,6 @@ func TestPruning(t *testing.T) { } } - // Wait for pruning to finish - for i := 0; i < 100; i++ { - _, _, err := tree.SaveVersion() - require.NoError(t, err) - isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion)) - require.NoError(t, err) - if !isLeacy { - break - } - // Simulate the consensus state update - time.Sleep(500 * time.Millisecond) - } // Reload the tree tree = NewMutableTree(db, 0, false, NewNopLogger()) versions := tree.AvailableVersions() diff --git a/mutable_tree.go b/mutable_tree.go index a671c3cd3..b88cca899 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -693,6 +693,17 @@ func (tree *MutableTree) GetVersioned(key []byte, version int64) ([]byte, error) return nil, nil } +// SetCommitting sets a flag to indicate that the tree is in the process of being saved. +// This is used to prevent parallel writing from async pruning. +func (tree *MutableTree) SetCommitting() { + tree.ndb.SetCommitting() +} + +// UnsetCommitting unsets the flag to indicate that the tree is no longer in the process of being saved. +func (tree *MutableTree) UnsetCommitting() { + tree.ndb.UnsetCommitting() +} + // SaveVersion saves a new tree version to disk, based on the current state of // the tree. Returns the hash and new version number. 
func (tree *MutableTree) SaveVersion() ([]byte, int64, error) { @@ -1025,10 +1036,7 @@ func (tree *MutableTree) saveNewNodes(version int64) error { var recursiveAssignKey func(*Node) ([]byte, error) recursiveAssignKey = func(node *Node) ([]byte, error) { if node.nodeKey != nil { - if node.nodeKey.nonce != 0 { - return node.nodeKey.GetKey(), nil - } - return node.hash, nil + return node.GetKey(), nil } nonce++ node.nodeKey = &NodeKey{ diff --git a/nodedb.go b/nodedb.go index a71708b31..cd334a564 100644 --- a/nodedb.go +++ b/nodedb.go @@ -37,10 +37,6 @@ const ( defaultStorageVersionValue = "1.0.0" fastStorageVersionValue = "1.1.0" fastNodeCacheSize = 100000 - - // This is used to avoid the case which pruning blocks the main process. - deleteBatchCount = 1000 - deletePauseDuration = 100 * time.Millisecond ) var ( @@ -86,9 +82,12 @@ type nodeDB struct { storageVersion string // Storage version firstVersion int64 // First version of nodeDB. latestVersion int64 // Latest version of nodeDB. + pruneVersion int64 // Version to prune up to. legacyLatestVersion int64 // Latest version of nodeDB in legacy format. nodeCache cache.Cache // Cache for nodes in the regular tree that consists of key-value pairs at any version. fastNodeCache cache.Cache // Cache for nodes in the fast index that represents only key-value pairs at the latest version. + isCommitting bool // Flag to indicate that the nodeDB is committing. + chCommitting chan struct{} // Channel to signal that the committing is done. 
} func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB { @@ -98,7 +97,7 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB { storeVersion = []byte(defaultStorageVersionValue) } - return &nodeDB{ + ndb := &nodeDB{ logger: lg, db: db, batch: NewBatchWithFlusher(db, opts.FlushThreshold), @@ -106,11 +105,19 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB { firstVersion: 0, latestVersion: 0, // initially invalid legacyLatestVersion: 0, + pruneVersion: 0, nodeCache: cache.New(cacheSize), fastNodeCache: cache.New(fastNodeCacheSize), versionReaders: make(map[int64]uint32, 8), storageVersion: string(storeVersion), + chCommitting: make(chan struct{}, 1), + } + + if opts.AsyncPruning { + go ndb.startPruning() } + + return ndb } // GetNode gets a node from memory or disk. If it is an inner node, it does not @@ -243,6 +250,33 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *fastnode.Node) error { return ndb.saveFastNodeUnlocked(node, false) } +// SetCommitting sets the committing flag to true. +// This is used to let the pruning process know that the nodeDB is committing. +func (ndb *nodeDB) SetCommitting() { + for len(ndb.chCommitting) > 0 { + <-ndb.chCommitting + } + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + ndb.isCommitting = true +} + +// UnsetCommitting sets the committing flag to false. +// This is used to let the pruning process know that the nodeDB is done committing. +func (ndb *nodeDB) UnsetCommitting() { + ndb.mtx.Lock() + ndb.isCommitting = false + ndb.mtx.Unlock() + ndb.chCommitting <- struct{}{} +} + +// IsCommitting returns true if the nodeDB is committing, false otherwise. +func (ndb *nodeDB) IsCommitting() bool { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + return ndb.isCommitting +} + // SetFastStorageVersionToBatch sets storage version to fast where the version is // 1.1.0-. Returns error if storage version is incorrect or on // db error, nil otherwise. 
Requires changes to be committed after to be persisted. @@ -330,6 +364,37 @@ func (ndb *nodeDB) Has(nk []byte) (bool, error) { return ndb.db.Has(ndb.nodeKey(nk)) } +// deleteFromPruning queues the deletion of the given key in the batch on behalf of the pruning process. +func (ndb *nodeDB) deleteFromPruning(key []byte) error { + if ndb.IsCommitting() { + // If the nodeDB is committing, block until the commit is done before continuing to prune. + <-ndb.chCommitting + } + + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + return ndb.batch.Delete(key) +} + +// saveNodeFromPruning writes the given node to the batch on behalf of the pruning process. +func (ndb *nodeDB) saveNodeFromPruning(node *Node) error { + if ndb.IsCommitting() { + // If the nodeDB is committing, block until the commit is done before continuing to prune. + <-ndb.chCommitting + } + + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + + // Save node bytes to db. + var buf bytes.Buffer + buf.Grow(node.encodedSize()) + if err := node.writeBytes(&buf); err != nil { + return err + } + return ndb.batch.Set(ndb.nodeKey(node.GetKey()), buf.Bytes()) +} + // deleteVersion deletes a tree version from disk. // deletes orphans func (ndb *nodeDB) deleteVersion(version int64) error { @@ -342,7 +407,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error { if orphan.nodeKey.nonce == 0 && !orphan.isLegacy { // if the orphan is a reformatted root, it can be a legacy root // so it should be removed from the pruning process.
- if err := ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)); err != nil { + if err := ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)); err != nil { return err } } @@ -354,9 +419,9 @@ func (ndb *nodeDB) deleteVersion(version int64) error { } nk := orphan.GetKey() if orphan.isLegacy { - return ndb.batch.Delete(ndb.legacyNodeKey(nk)) + return ndb.deleteFromPruning(ndb.legacyNodeKey(nk)) } - return ndb.batch.Delete(ndb.nodeKey(nk)) + return ndb.deleteFromPruning(ndb.nodeKey(nk)) }); err != nil { return err } @@ -365,7 +430,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error { if rootKey == nil || !bytes.Equal(rootKey, literalRootKey) { // if the root key is not matched with the literal root key, it means the given root // is a reference root to the previous version. - if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil { + if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil { return err } } @@ -381,12 +446,12 @@ func (ndb *nodeDB) deleteVersion(version int64) error { return err } // ensure that the given version is not included in the root search - if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil { + if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil { return err } // instead, the root should be reformatted to (version, 0) root.nodeKey.nonce = 0 - if err := ndb.SaveNode(root); err != nil { + if err := ndb.saveNodeFromPruning(root); err != nil { return err } } @@ -420,27 +485,22 @@ func (ndb *nodeDB) deleteLegacyNodes(version int64, nk []byte) error { // deleteLegacyVersions deletes all legacy versions from disk. 
func (ndb *nodeDB) deleteLegacyVersions(legacyLatestVersion int64) error { - count := 0 - - checkDeletePause := func() { - count++ - if count%deleteBatchCount == 0 { - time.Sleep(deletePauseDuration) - count = 0 - } + // Delete the last version for the legacyLastVersion + if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error { + return ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)) + }); err != nil { + return err } // Delete orphans for all legacy versions if err := ndb.traversePrefix(legacyOrphanKeyFormat.Key(), func(key, value []byte) error { - checkDeletePause() - if err := ndb.batch.Delete(key); err != nil { + if err := ndb.deleteFromPruning(key); err != nil { return err } var fromVersion, toVersion int64 legacyOrphanKeyFormat.Scan(key, &toVersion, &fromVersion) if (fromVersion <= legacyLatestVersion && toVersion < legacyLatestVersion) || fromVersion > legacyLatestVersion { - checkDeletePause() - return ndb.batch.Delete(ndb.legacyNodeKey(value)) + return ndb.deleteFromPruning(ndb.legacyNodeKey(value)) } return nil }); err != nil { @@ -448,8 +508,7 @@ func (ndb *nodeDB) deleteLegacyVersions(legacyLatestVersion int64) error { } // Delete all legacy roots if err := ndb.traversePrefix(legacyRootKeyFormat.Key(), func(key, _ []byte) error { - checkDeletePause() - return ndb.batch.Delete(key) + return ndb.deleteFromPruning(key) }); err != nil { return err } @@ -515,8 +574,45 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error { return nil } +// startPruning starts the pruning process. 
+func (ndb *nodeDB) startPruning() { + for { + ndb.mtx.Lock() + toVersion := ndb.pruneVersion + ndb.mtx.Unlock() + + if toVersion == 0 { + time.Sleep(100 * time.Millisecond) + continue + } + + if err := ndb.deleteVersionsTo(toVersion); err != nil { + ndb.logger.Error("Error while pruning", "err", err) + time.Sleep(1 * time.Second) + continue + } + + ndb.mtx.Lock() + if ndb.pruneVersion <= toVersion { + ndb.pruneVersion = 0 + } + ndb.mtx.Unlock() + } +} + // DeleteVersionsTo deletes the oldest versions up to the given version from disk. func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error { + if !ndb.opts.AsyncPruning { + return ndb.deleteVersionsTo(toVersion) + } + + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + ndb.pruneVersion = toVersion + return nil +} + +func (ndb *nodeDB) deleteVersionsTo(toVersion int64) error { legacyLatestVersion, err := ndb.getLegacyLatestVersion() if err != nil { return err @@ -553,20 +649,12 @@ func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error { // Delete the legacy versions if legacyLatestVersion >= first { - // Delete the last version for the legacyLastVersion - if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error { - return ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)) - }); err != nil { - return err + if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil { + ndb.logger.Error("Error deleting legacy versions", "err", err) } + first = legacyLatestVersion + 1 // reset the legacy latest version forcibly to avoid multiple calls ndb.resetLegacyLatestVersion(-1) - go func() { - if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil { - ndb.logger.Error("Error deleting legacy versions", "err", err) - } - }() - first = legacyLatestVersion + 1 } for version := first; version <= toVersion; version++ { diff --git a/options.go b/options.go index c679cfca1..520c2170f 100644 --- a/options.go +++ b/options.go @@ -84,6 +84,9 @@ type Options struct { // Ethereum has 
found that commit of 100KB is optimal, ref ethereum/go-ethereum#15115 FlushThreshold int + + // AsyncPruning is a flag to enable async pruning + AsyncPruning bool } // DefaultOptions returns the default options for IAVL. @@ -118,3 +121,10 @@ func FlushThresholdOption(ft int) Option { opts.FlushThreshold = ft } } + +// AsyncPruningOption sets the AsyncPruning for the tree. +func AsyncPruningOption(asyncPruning bool) Option { + return func(opts *Options) { + opts.AsyncPruning = asyncPruning + } +} diff --git a/prune_test.go b/prune_test.go new file mode 100644 index 000000000..be75b98f2 --- /dev/null +++ b/prune_test.go @@ -0,0 +1,68 @@ +package iavl + +import ( + "fmt" + "testing" + "time" + + dbm "github.com/cosmos/iavl/db" + "github.com/stretchr/testify/require" +) + +func TestAsyncPruning(t *testing.T) { + db, err := dbm.NewGoLevelDB("test", t.TempDir()) + require.NoError(t, err) + defer db.Close() + + tree := NewMutableTree(db, 0, false, NewNopLogger(), AsyncPruningOption(true), FlushThresholdOption(1000)) + + toVersion := 10000 + keyCount := 10 + pruneInterval := int64(100) + keepRecent := int64(300) + for i := 0; i < toVersion; i++ { + for j := 0; j < keyCount; j++ { + _, err := tree.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j))) + require.NoError(t, err) + } + + tree.SetCommitting() + _, v, err := tree.SaveVersion() + require.NoError(t, err) + tree.UnsetCommitting() + + if v%pruneInterval == 0 && v > keepRecent { + ti := time.Now() + require.NoError(t, tree.DeleteVersionsTo(v-keepRecent)) + t.Logf("Pruning %d versions took %v\n", keepRecent, time.Since(ti)) + } + } + + // wait for async pruning to finish + for i := 0; i < 100; i++ { + tree.SetCommitting() + _, _, err := tree.SaveVersion() + require.NoError(t, err) + tree.UnsetCommitting() + + firstVersion, err := tree.ndb.getFirstVersion() + require.NoError(t, err) + t.Logf("Iteration: %d First version: %d\n", i, firstVersion) + if firstVersion == 
int64(toVersion)-keepRecent+1 { + break + } + // simulate the consensus process + time.Sleep(500 * time.Millisecond) + } + + // Reload the tree + tree = NewMutableTree(db, 0, false, NewNopLogger()) + _, err = tree.LoadVersion(int64(toVersion) - keepRecent) + require.Error(t, err) + versions := tree.AvailableVersions() + require.Equal(t, versions[0], toVersion-int(keepRecent)+1) + for _, v := range versions { + _, err := tree.LoadVersion(int64(v)) + require.NoError(t, err) + } +} diff --git a/tree_test.go b/tree_test.go index 3fec9237b..376b5d9b3 100644 --- a/tree_test.go +++ b/tree_test.go @@ -1878,6 +1878,41 @@ func TestReferenceRoot(t *testing.T) { // check the root of version 2 is the leaf node of key2 require.Equal(t, tree.root.GetKey(), (&NodeKey{version: 1, nonce: 3}).GetKey()) require.Equal(t, tree.root.key, []byte("key2")) + + // test the reference root when pruning + db = dbm.NewMemDB() + require.NoError(t, err) + tree = NewMutableTree(db, 0, false, NewNopLogger()) + + _, err = tree.Set([]byte("key1"), []byte("value1")) + require.NoError(t, err) + + _, _, err = tree.SaveVersion() + require.NoError(t, err) + + _, _, err = tree.SaveVersion() // empty version + require.NoError(t, err) + + require.NoError(t, tree.DeleteVersionsTo(1)) + _, _, err = tree.SaveVersion() // empty version + require.NoError(t, err) + + // load the tree from disk + tree = NewMutableTree(db, 0, false, NewNopLogger()) + _, err = tree.Load() + require.NoError(t, err) + + _, err = tree.Set([]byte("key2"), []byte("value2")) + require.NoError(t, err) + _, _, err = tree.SaveVersion() + require.NoError(t, err) + + // load the tree from disk to check if the reference root is loaded correctly + tree = NewMutableTree(db, 0, false, NewNopLogger()) + _, err = tree.Load() + require.NoError(t, err) + _, err = tree.Set([]byte("key1"), []byte("value2")) + require.NoError(t, err) } func TestWorkingHashWithInitialVersion(t *testing.T) {