Improve pruning: performance, config and progress logging #328

Merged: 5 commits, Jun 14, 2024
69 changes: 50 additions & 19 deletions core/state/pruner/pruner.go
@@ -24,8 +24,8 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
)

const (
@@ -56,8 +57,10 @@ const (

// Config includes all the configurations for pruning.
type Config struct {
Datadir string // The directory of the state database
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
Datadir string // The directory of the state database
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
Threads int // The maximum number of threads spawned in dumpRawTrieDescendants and removeOtherRoots
CleanCacheSize int // The Megabytes of clean cache size used in dumpRawTrieDescendants
}

// Pruner is an offline tool to prune the stale state with the
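For callers, a minimal sketch of how the extended Config might be filled in (a hypothetical helper; the concrete numbers are illustrative defaults, not values required by this PR):

```go
package example

import (
	"runtime"

	"github.com/ethereum/go-ethereum/core/state/pruner"
)

// buildPrunerConfig shows where the two new fields (Threads, CleanCacheSize)
// slot into the existing Config. NewPruner clamps Threads to at least 1.
func buildPrunerConfig(datadir string) pruner.Config {
	return pruner.Config{
		Datadir:        datadir,          // state database directory
		BloomSize:      2048,             // MB allocated to the bloom filter
		Threads:        runtime.NumCPU(), // workers for dumpRawTrieDescendants / removeOtherRoots
		CleanCacheSize: 600,              // MB of hashdb clean cache used while traversing tries
	}
}
```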
@@ -107,6 +110,10 @@ func NewPruner(db ethdb.Database, config Config) (*Pruner, error) {
if err != nil {
return nil, err
}
// sanitize threads number, if set too low
if config.Threads <= 0 {
config.Threads = 1
}
return &Pruner{
config: config,
chainHeader: headBlock.Header(),
@@ -124,7 +131,7 @@ func readStoredChainConfig(db ethdb.Database) *params.ChainConfig {
return rawdb.ReadChainConfig(db, block0Hash)
}

func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *stateBloom) error {
func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *stateBloom, threads int) error {
chainConfig := readStoredChainConfig(db)
var genesisBlockNum uint64
if chainConfig != nil {
@@ -139,7 +146,6 @@ func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *st
return errors.New("failed to load head block")
}
blockRange := headBlock.NumberU64() - genesisBlockNum
threads := runtime.NumCPU()
var wg sync.WaitGroup
errors := make(chan error, threads)
for thread := 0; thread < threads; thread++ {
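This hunk drops the hard-coded runtime.NumCPU() and instead starts the configured number of workers over the block range. One plausible shape for that fan-out, shown in isolation (an illustrative sketch only; the exact partitioning lives in the collapsed part of the diff):

```go
package example

import "sync"

// forEachBlockParallel strides a block range across `threads` workers and
// reports the first error, mirroring the WaitGroup + error-channel setup above.
func forEachBlockParallel(threads int, first, last uint64, handle func(num uint64) error) error {
	var wg sync.WaitGroup
	errs := make(chan error, threads)
	for thread := 0; thread < threads; thread++ {
		wg.Add(1)
		go func(offset uint64) {
			defer wg.Done()
			for num := first + offset; num <= last; num += uint64(threads) {
				if err := handle(num); err != nil {
					errs <- err
					return
				}
			}
		}(uint64(thread))
	}
	wg.Wait()
	close(errs)
	return <-errs // nil when no worker failed
}
```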
@@ -207,7 +213,7 @@ func removeOtherRoots(db ethdb.Database, rootsList []common.Hash, stateBloom *st
}

// Arbitrum: snaptree and root are for the final snapshot kept
func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, start time.Time) error {
func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, start time.Time, threads int) error {
// Delete all stale trie nodes in the disk. With the help of state bloom
// the trie nodes(and codes) belong to the active state will be filtered
// out. A very small part of stale tries will also be filtered because of
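The comment above is the heart of the offline pruner: iterate the database and delete everything the bloom filter does not recognise as part of the retained state. Stripped of batching, iteration ranges and metrics, the idea looks roughly like this (the `Contains` interface is a simplified stand-in, not the actual stateBloom API in this file):

```go
package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
)

// deleteStale sketches bloom-guided pruning: hash-scheme trie-node keys that
// the filter does not contain are deleted. Because bloom filters give false
// positives, a small fraction of stale nodes survives, as the comment notes.
func deleteStale(it ethdb.Iterator, bloom interface{ Contains(key []byte) bool }, batch ethdb.Batch) error {
	defer it.Release()
	for it.Next() {
		key := it.Key()
		if len(key) != common.HashLength {
			continue // not a hash-scheme trie-node key
		}
		if bloom.Contains(key) {
			continue // probably referenced by one of the kept roots
		}
		if err := batch.Delete(key); err != nil {
			return err
		}
	}
	return it.Error()
}
```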
@@ -297,7 +303,7 @@ func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Databas
}

// Clean up any false positives that are top-level state roots.
err := removeOtherRoots(maindb, allRoots, stateBloom)
err := removeOtherRoots(maindb, allRoots, stateBloom, threads)
if err != nil {
return err
}
@@ -333,8 +339,16 @@ func prune(snaptree *snapshot.Tree, allRoots []common.Hash, maindb ethdb.Databas
}

// We assume state blooms do not need the value, only the key
func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBloom) error {
sdb := state.NewDatabase(db)
func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBloom, config *Config) error {
// Offline pruning is only supported in legacy hash based scheme.
hashConfig := *hashdb.Defaults
hashConfig.CleanCacheSize = config.CleanCacheSize * 1024 * 1024
trieConfig := &trie.Config{
Preimages: false,
HashDB: &hashConfig,
}
sdb := state.NewDatabaseWithConfig(db, trieConfig)
defer sdb.TrieDB().Close()
tr, err := sdb.OpenTrie(root)
if err != nil {
return err
@@ -350,11 +364,12 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
// To do so, we create a semaphore out of a channel's buffer.
// Before launching a new goroutine, we acquire the semaphore by taking an entry from this channel.
// This channel doubles as a mechanism for the background goroutine to report an error on release.
threads := runtime.NumCPU()
threads := config.Threads
results := make(chan error, threads)
for i := 0; i < threads; i++ {
results <- nil
}
var threadsRunning atomic.Int32

for accountIt.Next(true) {
accountTrieHash := accountIt.Hash()
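The comment above describes a buffered channel doubling as a semaphore and an error pipe. In isolation the pattern looks like this (a self-contained sketch with made-up work items, not the trie-traversal code itself):

```go
package main

import "fmt"

// runLimited illustrates the pattern from the comment above: a buffered
// channel pre-filled with nil acts as a semaphore, and the same slot is
// used to hand back an error when a goroutine releases it.
func runLimited(threads int, jobs []func() error) error {
	results := make(chan error, threads)
	for i := 0; i < threads; i++ {
		results <- nil // one free slot per worker
	}
	for _, job := range jobs {
		// Acquire a slot; if a previous goroutine failed, stop early.
		if err := <-results; err != nil {
			return err
		}
		job := job
		go func() {
			var err error
			defer func() { results <- err }() // release the slot, reporting any error
			err = job()
		}()
	}
	// Drain all slots so every goroutine has finished before returning.
	var firstErr error
	for i := 0; i < threads; i++ {
		if err := <-results; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() { // hypothetical usage
	jobs := []func() error{
		func() error { fmt.Println("job 1"); return nil },
		func() error { fmt.Println("job 2"); return nil },
	}
	if err := runLimited(2, jobs); err != nil {
		fmt.Println("error:", err)
	}
}
```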
@@ -385,7 +400,10 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
output.Put(data.CodeHash, nil)
}
if data.Root != (common.Hash{}) {
storageTr, err := trie.NewStateTrie(trie.StorageTrieID(root, key, data.Root), sdb.TrieDB())
// note: we are passing data.Root as stateRoot here, to skip the check for stateRoot existence in trie.newTrieReader,
// we already check that when opening state trie and reading the account node
trieID := trie.StorageTrieID(data.Root, key, data.Root)
storageTr, err := trie.NewStateTrie(trieID, sdb.TrieDB())
if err != nil {
return err
}
@@ -394,14 +412,20 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
return err
}
go func() {
threadsRunning.Add(1)
defer threadsRunning.Add(-1)
var err error
defer func() {
results <- err
}()
threadStartedAt := time.Now()
threadLastLog := time.Now()

storageIt, err := storageTr.NodeIterator(nil)
if err != nil {
return
}
var processedNodes uint64
for storageIt.Next(true) {
storageTrieHash := storageIt.Hash()
if storageTrieHash != (common.Hash{}) {
Expand All @@ -411,6 +435,13 @@ func dumpRawTrieDescendants(db ethdb.Database, root common.Hash, output *stateBl
return
}
}
processedNodes++
if time.Since(threadLastLog) > 5*time.Minute {
elapsedTotal := time.Since(startedAt)
elapsedThread := time.Since(threadStartedAt)
log.Info("traversing trie database - traversing storage trie taking long", "key", key, "elapsedTotal", elapsedTotal, "elapsedThread", elapsedThread, "processedNodes", processedNodes, "threadsRunning", threadsRunning.Load())
threadLastLog = time.Now()
}
}
err = storageIt.Error()
if err != nil {
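The new logging in this hunk throttles itself to one line per five minutes per worker, reporting wall-clock time, processed node count and the number of running threads. The same throttling can be factored into a tiny helper (an illustrative refactoring, not code from this PR):

```go
package example

import (
	"time"

	"github.com/ethereum/go-ethereum/log"
)

// progressLogger emits at most one log line per interval while counting
// processed items, mirroring the threadLastLog / processedNodes bookkeeping above.
type progressLogger struct {
	startedAt time.Time
	lastLog   time.Time
	interval  time.Duration
	processed uint64
}

func newProgressLogger(interval time.Duration) *progressLogger {
	now := time.Now()
	return &progressLogger{startedAt: now, lastLog: now, interval: interval}
}

func (p *progressLogger) tick(msg string, threadsRunning int32) {
	p.processed++
	if time.Since(p.lastLog) < p.interval {
		return
	}
	log.Info(msg,
		"elapsed", time.Since(p.startedAt),
		"processedNodes", p.processed,
		"threadsRunning", threadsRunning)
	p.lastLog = time.Now()
}
```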
@@ -445,7 +476,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
return err
}
if bloomExists {
return RecoverPruning(p.config.Datadir, p.db)
return RecoverPruning(p.config.Datadir, p.db, p.config.Threads)
}
// Retrieve all snapshot layers from the current HEAD.
// In theory there are 128 difflayers + 1 disk layer present,
@@ -511,14 +542,14 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
return err
}
} else {
if err := dumpRawTrieDescendants(p.db, root, p.stateBloom); err != nil {
if err := dumpRawTrieDescendants(p.db, root, p.stateBloom, &p.config); err != nil {
return err
}
}
}
// Traverse the genesis, put all genesis state entries into the
// bloom filter too.
if err := extractGenesis(p.db, p.stateBloom); err != nil {
if err := extractGenesis(p.db, p.stateBloom, &p.config); err != nil {
return err
}

@@ -529,7 +560,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
return err
}
log.Info("State bloom filter committed", "name", filterName, "roots", roots)
return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start)
return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start, p.config.Threads)
}

// RecoverPruning will resume the pruning procedure during the system restart.
@@ -539,7 +570,7 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
// pruning can be resumed. What's more if the bloom filter is constructed, the
// pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left
// in the disk.
func RecoverPruning(datadir string, db ethdb.Database) error {
func RecoverPruning(datadir string, db ethdb.Database, threads int) error {
exists, err := bloomFilterExists(datadir)
if err != nil {
return err
@@ -578,12 +609,12 @@ func RecoverPruning(datadir string, db ethdb.Database) error {
}
log.Info("Loaded state bloom filter", "path", stateBloomPath, "roots", stateBloomRoots)

return prune(snaptree, stateBloomRoots, db, stateBloom, stateBloomPath, time.Now())
return prune(snaptree, stateBloomRoots, db, stateBloom, stateBloomPath, time.Now(), threads)
}

// extractGenesis loads the genesis state and commits all the state entries
// into the given bloomfilter.
func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
func extractGenesis(db ethdb.Database, stateBloom *stateBloom, config *Config) error {
genesisHash := rawdb.ReadCanonicalHash(db, 0)
if genesisHash == (common.Hash{}) {
return errors.New("missing genesis hash")
@@ -593,7 +624,7 @@ func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
return errors.New("missing genesis block")
}

return dumpRawTrieDescendants(db, genesis.Root(), stateBloom)
return dumpRawTrieDescendants(db, genesis.Root(), stateBloom, config)
}

func bloomFilterPath(datadir string) string {
2 changes: 1 addition & 1 deletion eth/backend.go
@@ -140,7 +140,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
// Try to recover offline state pruning only in hash-based.
if scheme == rawdb.HashScheme {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, 1); err != nil {
log.Error("Failed to recover state", "error", err)
}
}
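At node startup the recovery path is deliberately single-threaded (the literal 1 above). A caller that has a pruning configuration available, for example an offline prune command, would forward its own thread count instead; a hypothetical call site (only the RecoverPruning signature is taken from this PR):

```go
package example

import (
	"github.com/ethereum/go-ethereum/core/state/pruner"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)

// recoverWithConfig is a hypothetical wrapper: unlike the startup path above,
// it forwards the configured thread count to RecoverPruning.
func recoverWithConfig(datadir string, db ethdb.Database, cfg pruner.Config) error {
	if err := pruner.RecoverPruning(datadir, db, cfg.Threads); err != nil {
		log.Error("Failed to recover state", "error", err)
		return err
	}
	return nil
}
```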