Skip to content

Commit

Permalink
Merge pull request #881 from iotaledger/fix/ledger-state-tree-memleak
Browse files Browse the repository at this point in the history
Fix memleak in ledgerstate state tree
  • Loading branch information
alexsporn authored Mar 26, 2024
2 parents fde08f1 + f43047c commit 4568cfd
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 43 deletions.
5 changes: 3 additions & 2 deletions pkg/protocol/engine/ledger/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ func (l *Ledger) CommitSlot(slot iotago.SlotIndex) (stateRoot iotago.Identifier,

// Commit the changes
// Update the UTXO ledger
if err = l.utxoLedger.ApplyDiff(slot, outputs, spenders); err != nil {
stateTreeRoot, err := l.utxoLedger.ApplyDiff(slot, outputs, spenders)
if err != nil {
return iotago.Identifier{}, iotago.Identifier{}, iotago.Identifier{}, nil, nil, nil, ierrors.Wrapf(err, "failed to apply diff to UTXO ledger for slot %d", slot)
}

Expand Down Expand Up @@ -217,7 +218,7 @@ func (l *Ledger) CommitSlot(slot iotago.SlotIndex) (stateRoot iotago.Identifier,
// (due to removing the attachments) and then committed, which would result in a broken state of the transaction.
l.memPool.Evict(slot)

return l.utxoLedger.StateTreeRoot(), stateDiff.Mutations().Root(), l.accountsLedger.AccountsTreeRoot(), outputs, spenders, mutations, nil
return stateTreeRoot, stateDiff.Mutations().Root(), l.accountsLedger.AccountsTreeRoot(), outputs, spenders, mutations, nil
}

func (l *Ledger) AddAccount(output *utxoledger.Output, blockIssuanceCredits iotago.BlockIssuanceCredits) error {
Expand Down
5 changes: 3 additions & 2 deletions pkg/protocol/engine/utxoledger/iteration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/protocol/engine/utxoledger"
"github.com/iotaledger/iota-core/pkg/protocol/engine/utxoledger/tpkg"
iotago "github.com/iotaledger/iota.go/v4"
Expand Down Expand Up @@ -34,7 +35,7 @@ func TestUTXOComputeBalance(t *testing.T) {
tpkg.RandLedgerStateSpentWithOutput(initialOutput, index),
}

require.NoError(t, manager.ApplyDiffWithoutLocking(index, outputs, spents))
require.NoError(t, lo.Return2(manager.ApplyDiffWithoutLocking(index, outputs, spents)))

spent, err := manager.SpentOutputs()
require.NoError(t, err)
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestUTXOIteration(t *testing.T) {
tpkg.RandLedgerStateSpentWithOutput(outputs[9], index),
}

require.NoError(t, manager.ApplyDiffWithoutLocking(index, outputs, spents))
require.NoError(t, lo.Return2(manager.ApplyDiffWithoutLocking(index, outputs, spents)))

// Prepare values to check
outputByID := make(map[string]struct{})
Expand Down
77 changes: 52 additions & 25 deletions pkg/protocol/engine/utxoledger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,22 @@ type Manager struct {
store kvstore.KVStore
storeLock syncutils.RWMutex

stateTree ads.Map[iotago.Identifier, iotago.OutputID, *stateTreeMetadata]
stateTreeKVStore kvstore.KVStore
stateTree ads.Map[iotago.Identifier, iotago.OutputID, *stateTreeMetadata]

apiProvider iotago.APIProvider
}

func New(store kvstore.KVStore, apiProvider iotago.APIProvider) *Manager {
return &Manager{
store: store,
stateTree: ads.NewMap[iotago.Identifier](lo.PanicOnErr(store.WithExtendedRealm(kvstore.Realm{StoreKeyPrefixStateTree})),
iotago.Identifier.Bytes,
iotago.IdentifierFromBytes,
iotago.OutputID.Bytes,
iotago.OutputIDFromBytes,
(*stateTreeMetadata).Bytes,
stateMetadataFromBytes,
),
apiProvider: apiProvider,
m := &Manager{
store: store,
stateTreeKVStore: lo.PanicOnErr(store.WithExtendedRealm(kvstore.Realm{StoreKeyPrefixStateTree})),
apiProvider: apiProvider,
}

m.reInitStateTreeWithoutLocking()

return m
}

// KVStore returns the underlying KVStore.
Expand All @@ -58,6 +56,25 @@ func (m *Manager) ClearLedgerState() (err error) {
return m.store.Clear()
}

func (m *Manager) ReInitStateTreeWithLocking() {
m.WriteLockLedger()
defer m.WriteUnlockLedger()

m.reInitStateTreeWithoutLocking()
}

func (m *Manager) reInitStateTreeWithoutLocking() {
m.stateTree = ads.NewMap[iotago.Identifier](
m.stateTreeKVStore,
iotago.Identifier.Bytes,
iotago.IdentifierFromBytes,
iotago.OutputID.Bytes,
iotago.OutputIDFromBytes,
(*stateTreeMetadata).Bytes,
stateMetadataFromBytes,
)
}

func (m *Manager) ReadLockLedger() {
m.storeLock.RLock()
}
Expand Down Expand Up @@ -149,30 +166,30 @@ func (m *Manager) ReadLedgerSlot() (iotago.SlotIndex, error) {
return m.ReadLedgerIndexWithoutLocking()
}

func (m *Manager) ApplyDiffWithoutLocking(slot iotago.SlotIndex, newOutputs Outputs, newSpents Spents) error {
func (m *Manager) ApplyDiffWithoutLocking(slot iotago.SlotIndex, newOutputs Outputs, newSpents Spents) (newStateTreeRoot iotago.Identifier, err error) {
mutations, err := m.store.Batched()
if err != nil {
return err
return iotago.EmptyIdentifier, err
}

for _, output := range newOutputs {
if err = storeOutput(output, mutations); err != nil {
mutations.Cancel()

return err
return iotago.EmptyIdentifier, err
}
if err := markAsUnspent(output, mutations); err != nil {
mutations.Cancel()

return err
return iotago.EmptyIdentifier, err
}
}

for _, spent := range newSpents {
if err := storeSpentAndMarkOutputAsSpent(spent, mutations); err != nil {
mutations.Cancel()

return err
return iotago.EmptyIdentifier, err
}
}

Expand All @@ -185,38 +202,44 @@ func (m *Manager) ApplyDiffWithoutLocking(slot iotago.SlotIndex, newOutputs Outp
if err := storeDiff(slotDiff, mutations); err != nil {
mutations.Cancel()

return err
return iotago.EmptyIdentifier, err
}

if err := storeLedgerIndex(slot, mutations); err != nil {
mutations.Cancel()

return err
return iotago.EmptyIdentifier, err
}

if err := mutations.Commit(); err != nil {
return err
return iotago.EmptyIdentifier, err
}

for _, output := range newOutputs {
if err := m.stateTree.Set(output.OutputID(), newStateMetadata(output)); err != nil {
return ierrors.Wrapf(err, "failed to set new oputput in state tree, outputID: %s", output.OutputID().ToHex())
return iotago.EmptyIdentifier, ierrors.Wrapf(err, "failed to set new oputput in state tree, outputID: %s", output.OutputID().ToHex())
}
}
for _, spent := range newSpents {
if _, err := m.stateTree.Delete(spent.OutputID()); err != nil {
return ierrors.Wrapf(err, "failed to delete spent output from state tree, outputID: %s", spent.OutputID().ToHex())
return iotago.EmptyIdentifier, ierrors.Wrapf(err, "failed to delete spent output from state tree, outputID: %s", spent.OutputID().ToHex())
}
}

if err := m.stateTree.Commit(); err != nil {
return ierrors.Wrap(err, "failed to commit state tree")
return iotago.EmptyIdentifier, ierrors.Wrap(err, "failed to commit state tree")
}

return nil
root := m.StateTreeRoot()

// Re-Init the state tree to release memory from the underlying SMT of all the elements that were just added.
// This might increase runtime (as we need to access the disk) but it will reduce memory usage significantly for a big tree.
m.reInitStateTreeWithoutLocking()

return root, nil
}

func (m *Manager) ApplyDiff(slot iotago.SlotIndex, newOutputs Outputs, newSpents Spents) error {
func (m *Manager) ApplyDiff(slot iotago.SlotIndex, newOutputs Outputs, newSpents Spents) (newStateTreeRoot iotago.Identifier, err error) {
m.WriteLockLedger()
defer m.WriteUnlockLedger()

Expand Down Expand Up @@ -289,6 +312,8 @@ func (m *Manager) RollbackDiffWithoutLocking(slot iotago.SlotIndex, newOutputs O
return ierrors.Wrap(err, "failed to commit state tree")
}

m.reInitStateTreeWithoutLocking()

return nil
}

Expand Down Expand Up @@ -321,6 +346,8 @@ func (m *Manager) AddGenesisUnspentOutputWithoutLocking(unspentOutput *Output) e
return ierrors.Wrap(err, "failed to commit state tree")
}

m.reInitStateTreeWithoutLocking()

return nil
}

Expand Down
119 changes: 116 additions & 3 deletions pkg/protocol/engine/utxoledger/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@
package utxoledger_test

import (
"fmt"
"testing"
"time"

"github.com/fjl/memsize"
"github.com/stretchr/testify/require"

"github.com/iotaledger/hive.go/ads"
"github.com/iotaledger/hive.go/db"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/protocol/engine/utxoledger"
"github.com/iotaledger/iota-core/pkg/protocol/engine/utxoledger/tpkg"
"github.com/iotaledger/iota-core/pkg/storage/database"
iotago "github.com/iotaledger/iota.go/v4"
iotago_tpkg "github.com/iotaledger/iota.go/v4/tpkg"
)
Expand All @@ -34,7 +42,7 @@ func TestConfirmationApplyAndRollbackToEmptyLedger(t *testing.T) {
tpkg.RandLedgerStateSpentWithOutput(outputs[2], slot),
}

require.NoError(t, manager.ApplyDiffWithoutLocking(slot, outputs, spents))
require.NoError(t, lo.Return2(manager.ApplyDiffWithoutLocking(slot, outputs, spents)))

require.NotEqual(t, manager.StateTreeRoot(), iotago.Identifier{})
require.True(t, manager.CheckStateTree())
Expand Down Expand Up @@ -100,7 +108,7 @@ func TestConfirmationApplyAndRollbackToPreviousLedger(t *testing.T) {
previousSpents := utxoledger.Spents{
tpkg.RandLedgerStateSpentWithOutput(previousOutputs[1], previousMsIndex),
}
require.NoError(t, manager.ApplyDiffWithoutLocking(previousMsIndex, previousOutputs, previousSpents))
require.NoError(t, lo.Return2(manager.ApplyDiffWithoutLocking(previousMsIndex, previousOutputs, previousSpents)))

require.True(t, manager.CheckStateTree())

Expand All @@ -122,7 +130,7 @@ func TestConfirmationApplyAndRollbackToPreviousLedger(t *testing.T) {
tpkg.RandLedgerStateSpentWithOutput(previousOutputs[2], index),
tpkg.RandLedgerStateSpentWithOutput(outputs[2], index),
}
require.NoError(t, manager.ApplyDiffWithoutLocking(index, outputs, spents))
require.NoError(t, lo.Return2(manager.ApplyDiffWithoutLocking(index, outputs, spents)))

require.True(t, manager.CheckStateTree())

Expand Down Expand Up @@ -238,3 +246,108 @@ func TestConfirmationApplyAndRollbackToPreviousLedger(t *testing.T) {
}))
require.Empty(t, spentByOutputID)
}

func TestMemLeakStateTree(t *testing.T) {
t.Skip("This test is not meant to be run in CI, it's for local testing only")

dbConfig := database.Config{
Engine: db.EngineRocksDB,
Directory: t.TempDir(),
Version: 1,
PrefixHealth: []byte{2},
}
rocksDB := database.NewDBInstance(dbConfig, nil)
kvStore := rocksDB.KVStore()
stateTree := ads.NewMap[iotago.Identifier](kvStore,
iotago.Identifier.Bytes,
iotago.IdentifierFromBytes,
iotago.OutputID.Bytes,
iotago.OutputIDFromBytes,
(*stateTreeMetadata).Bytes,
stateMetadataFromBytes,
)

var totalOutputs int
var allOutputs []*utxoledger.Output
runTest := func(reInit bool, outputCount int) {
totalOutputs += outputCount
fmt.Println(">>> Running with", outputCount, "outputs, reInit:", reInit)
fmt.Println(">>> Total outputs:", totalOutputs)

start := time.Now()
if reInit {
stateTree = ads.NewMap[iotago.Identifier](kvStore,
iotago.Identifier.Bytes,
iotago.IdentifierFromBytes,
iotago.OutputID.Bytes,
iotago.OutputIDFromBytes,
(*stateTreeMetadata).Bytes,
stateMetadataFromBytes,
)
}

newOutputs := make([]*utxoledger.Output, outputCount)
{
for i := 0; i < outputCount; i++ {
newOutputs[i] = tpkg.RandLedgerStateOutputWithType(iotago.OutputBasic)
allOutputs = append(allOutputs, newOutputs[i])
}

for _, output := range newOutputs {
if err := stateTree.Set(output.OutputID(), newStateMetadata(output)); err != nil {
panic(ierrors.Wrapf(err, "failed to set new oputput in state tree, outputID: %s", output.OutputID().ToHex()))
}
}

if err := stateTree.Commit(); err != nil {
panic(ierrors.Wrap(err, "failed to commit state tree"))
}
}

memConsumptionEnd := memsize.Scan(stateTree)
fmt.Println(">>> Took: ", time.Since(start).String())
fmt.Println()
fmt.Println(">>> Memory:", memConsumptionEnd.Report())

// Check that all outputs are in the tree
for _, output := range allOutputs {
exists, err := stateTree.Has(output.OutputID())
require.NoError(t, err)
require.True(t, exists)
}

fmt.Printf("----------------------------------------------------------------\n\n")
}

runTest(false, 1000000)
runTest(false, 1000000)
runTest(false, 1000000)
runTest(false, 10000)
}

type stateTreeMetadata struct {
Slot iotago.SlotIndex
}

func newStateMetadata(output *utxoledger.Output) *stateTreeMetadata {
return &stateTreeMetadata{
Slot: output.SlotCreated(),
}
}

func stateMetadataFromBytes(b []byte) (*stateTreeMetadata, int, error) {
s := new(stateTreeMetadata)

var err error
var n int
s.Slot, n, err = iotago.SlotIndexFromBytes(b)
if err != nil {
return nil, 0, err
}

return s, n, nil
}

func (s *stateTreeMetadata) Bytes() ([]byte, error) {
return s.Slot.Bytes()
}
2 changes: 1 addition & 1 deletion pkg/protocol/engine/utxoledger/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func AssertOutputUnspentAndSpentTransitions(t *testing.T, output *utxoledger.Out
require.True(t, has)

// Spent it with a slot.
require.NoError(t, manager.ApplyDiff(spent.SlotSpent(), utxoledger.Outputs{}, utxoledger.Spents{spent}))
require.NoError(t, lo.Return2(manager.ApplyDiff(spent.SlotSpent(), utxoledger.Outputs{}, utxoledger.Spents{spent})))

// Read Spent from DB and compare
readSpent, err := manager.ReadSpentForOutputIDWithoutLocking(outputID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/utxoledger/slot_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestSlotDiffSerialization(t *testing.T) {
tpkg.RandLedgerStateSpentWithOutput(outputs[2], slot),
}

require.NoError(t, manager.ApplyDiffWithoutLocking(slot, outputs, spents))
require.NoError(t, lo.Return2(manager.ApplyDiffWithoutLocking(slot, outputs, spents)))

readDiff, err := manager.SlotDiffWithoutLocking(slot)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 4568cfd

Please sign in to comment.