Skip to content

Commit

Permalink
[config change] Improve BlocksReExecutor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ganeshvanahalli committed Oct 1, 2024
1 parent 6b37f8a commit 9968d4c
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 47 deletions.
153 changes: 109 additions & 44 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,27 @@ import (
"strings"

"github.com/ethereum/go-ethereum/arbitrum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
)

type Config struct {
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
BlocksPerThread uint64 `koanf:"blocks-per-thread"`
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
MinBlocksPerThread uint64 `koanf:"min-blocks-per-thread"`
}

func (c *Config) Validate() error {
Expand All @@ -48,10 +53,10 @@ var DefaultConfig = Config{
}

var TestConfig = Config{
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
BlocksPerThread: 10,
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
MinBlocksPerThread: 10,
}

func ConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -60,22 +65,26 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".start-block", DefaultConfig.StartBlock, "first block number of the block range for re-execution")
f.Uint64(prefix+".end-block", DefaultConfig.EndBlock, "last block number of the block range for re-execution")
f.Int(prefix+".room", DefaultConfig.Room, "number of threads to parallelize blocks re-execution")
f.Uint64(prefix+".blocks-per-thread", DefaultConfig.BlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
f.Uint64(prefix+".min-blocks-per-thread", DefaultConfig.MinBlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
}

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
config *Config
db state.Database
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
minBlocksPerThread uint64
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
func New(c *Config, blockchain *core.BlockChain, ethDb ethdb.Database, fatalErrChan chan error) (*BlocksReExecutor, error) {
if blockchain.TrieDB().Scheme() == rawdb.PathScheme {
return nil, errors.New("blocksReExecutor not supported on pathdb")
}
start := c.StartBlock
end := c.EndBlock
chainStart := blockchain.Config().ArbitrumChainParams.GenesisBlockNum
Expand All @@ -92,13 +101,13 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
minBlocksPerThread := uint64(10000)
if c.MinBlocksPerThread != 0 {
minBlocksPerThread = c.MinBlocksPerThread
}
if c.Mode == "random" && end != start {
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := blocksPerThread
// Reexecute a range of 10000 or (non-zero) c.MinBlocksPerThread number of blocks between start to end picked randomly
rng := minBlocksPerThread
if rng > end-start {
rng = end - start
}
Expand All @@ -111,32 +120,41 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
// Divide work equally among available threads when MinBlocksPerThread is zero
if c.MinBlocksPerThread == 0 {
// #nosec G115
work := (end - start) / uint64(c.Room)
if work > 0 {
blocksPerThread = work
minBlocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
},
trieConfig := triedb.Config{
Preimages: false,
HashDB: hashdb.Defaults,
}
blocksReExecutor := &BlocksReExecutor{
config: c,
db: state.NewDatabaseWithConfig(ethDb, &trieConfig),
blockchain: blockchain,
currentBlock: end,
startBlock: start,
minBlocksPerThread: minBlocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
}
blocksReExecutor.stateFor = func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
sdb, err := state.NewDeterministic(header.Root, blocksReExecutor.db)
if err == nil {
_ = blocksReExecutor.db.TrieDB().Reference(header.Root, common.Hash{}) // Will be dereferenced later in advanceStateUpToBlock
}
return sdb, arbitrum.NoopStateRelease, err
}
return blocksReExecutor, nil
}

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state
// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.MinBlocksPerThread, currentBlock] to the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.minBlocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
Expand All @@ -149,8 +167,8 @@ func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentB
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
if err != nil {
log.Info("Starting reexecution of blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
if err := s.advanceStateUpToBlock(ctx, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader); err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor errored advancing state from block %d to block %d, err: %w", start, currentBlock, err)
} else {
log.Info("Successfully reexecuted blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
Expand Down Expand Up @@ -199,3 +217,50 @@ func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
func (s *BlocksReExecutor) StopAndWait() {
s.StopWaiter.StopAndWait()
}

func (s *BlocksReExecutor) commitStateAndVerify(statedb *state.StateDB, expected common.Hash, blockNumber uint64) (*state.StateDB, error) {
result, err := statedb.Commit(blockNumber, true)
if err != nil {
return nil, err
}
if result != expected {
return nil, fmt.Errorf("bad root hash expected: %v got: %v", expected, result)
}
_ = s.db.TrieDB().Reference(result, common.Hash{})
return state.New(result, statedb.Database(), nil)
}

func (s *BlocksReExecutor) advanceStateUpToBlock(ctx context.Context, state *state.StateDB, targetHeader *types.Header, lastAvailableHeader *types.Header) error {
targetBlockNumber := targetHeader.Number.Uint64()
blockToRecreate := lastAvailableHeader.Number.Uint64() + 1
prevHash := lastAvailableHeader.Hash()
lastRoot := lastAvailableHeader.Root
defer func() {
if (lastRoot != common.Hash{}) {
_ = s.db.TrieDB().Dereference(lastRoot)
}
}()
var block *types.Block
var err error
for ctx.Err() == nil {
state, block, err = arbitrum.AdvanceStateByBlock(ctx, s.blockchain, state, targetHeader, blockToRecreate, prevHash, nil)
if err != nil {
return err
}
prevHash = block.Hash()
state, err = s.commitStateAndVerify(state, block.Root(), block.NumberU64())
if err != nil {
return fmt.Errorf("failed committing state for block %d : %w", blockToRecreate, err)
}
_ = s.db.TrieDB().Dereference(lastRoot)
lastRoot = block.Root()
if blockToRecreate >= targetBlockNumber {
if block.Hash() != targetHeader.Hash() {
return fmt.Errorf("blockHash doesn't match when recreating number: %d expected: %v got: %v", blockToRecreate, targetHeader.Hash(), block.Hash())
}
return nil
}
blockToRecreate++
}
return ctx.Err()
}
6 changes: 5 additions & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,11 @@ func mainImpl() int {

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor, err = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, chainDb, fatalErrChan)
if err != nil {
log.Error("error initializing blocksReExecutor", "err", err)
return 1
}
if nodeConfig.Init.ThenQuit {
if err := gethexec.PopulateStylusTargetCache(&nodeConfig.Execution.StylusTarget); err != nil {
log.Error("error populating stylus target cache", "err", err)
Expand Down
8 changes: 6 additions & 2 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
blocksreexecutor "github.com/offchainlabs/nitro/blocks_reexecutor"
)

Expand All @@ -13,6 +14,7 @@ func TestBlocksReExecutorModes(t *testing.T) {
defer cancel()

builder := NewNodeBuilder(ctx).DefaultConfig(t, false)
builder.execConfig.Caching.StateScheme = rawdb.HashScheme
cleanup := builder.Build(t)
defer cleanup()

Expand All @@ -37,7 +39,8 @@ func TestBlocksReExecutorModes(t *testing.T) {

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull, err := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorFull.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand All @@ -49,7 +52,8 @@ func TestBlocksReExecutorModes(t *testing.T) {
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom, err := blocksreexecutor.New(c, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorRandom.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand Down

0 comments on commit 9968d4c

Please sign in to comment.