diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go
index f96d51ce0e..7e9cf1dbad 100644
--- a/arbnode/transaction_streamer.go
+++ b/arbnode/transaction_streamer.go
@@ -968,8 +968,16 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
 		log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos)
 		return false
 	}
-	err = s.exec.DigestMessage(pos, msg)
-	if err != nil {
+	var msgForPrefetch *arbostypes.MessageWithMetadata
+	if pos+1 < msgCount {
+		msg, err := s.GetMessage(pos + 1)
+		if err != nil {
+			log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1)
+			return false
+		}
+		msgForPrefetch = msg
+	}
+	if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil {
 		logger := log.Warn
 		if prevMessageCount < msgCount {
 			logger = log.Debug
diff --git a/arbstate/inbox.go b/arbstate/inbox.go
index cf8f61e97a..fcb1c1ebcb 100644
--- a/arbstate/inbox.go
+++ b/arbstate/inbox.go
@@ -75,6 +75,9 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
 	}
 	payload := data[40:]
 
+	// Stage 1: Extract the payload from any data availability header.
+	// It's important that multiple DAS strategies can't both be invoked in the same batch,
+	// as these headers are validated by the sequencer inbox and not other DASs.
 	if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) {
 		if dasReader == nil {
 			log.Error("No DAS Reader configured, but sequencer message found with DAS header")
@@ -88,9 +91,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
 				return parsedMsg, nil
 			}
 		}
-	}
-
-	if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
+	} else if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
 		blobHashes := payload[1:]
 		if len(blobHashes)%len(common.Hash{}) != 0 {
 			return nil, fmt.Errorf("blob batch data is not a list of hashes as expected")
@@ -115,6 +116,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
 		}
 	}
 
+	// Stage 2: If enabled, decode the zero heavy payload (saves gas based on calldata charging).
 	if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) {
 		pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen)))
 		if err != nil {
@@ -124,6 +126,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
 		payload = pl
 	}
 
+	// Stage 3: Decompress the brotli payload and fill the parsedMsg.segments list.
 	if len(payload) > 0 && IsBrotliMessageHeaderByte(payload[0]) {
 		decompressed, err := arbcompress.Decompress(payload[1:], MaxDecompressedLen)
 		if err == nil {
diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go
index 20e9ca6f3b..003159589a 100644
--- a/execution/gethexec/executionengine.go
+++ b/execution/gethexec/executionengine.go
@@ -41,6 +41,8 @@ type ExecutionEngine struct {
 	nextScheduledVersionCheck time.Time // protected by the createBlocksMutex
 
 	reorgSequencing bool
+
+	prefetchBlock bool
 }
 
 func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) {
@@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() {
 	s.reorgSequencing = true
 }
 
+func (s *ExecutionEngine) EnablePrefetchBlock() {
+	if s.Started() {
+		panic("trying to enable prefetch block after start")
+	}
+	if s.prefetchBlock {
+		panic("trying to enable prefetch block when already set")
+	}
+	s.prefetchBlock = true
+}
+
 func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) {
 	if s.Started() {
 		panic("trying to set transaction streamer after start")
@@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
 		return err
 	}
 	for i := range newMessages {
-		err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i])
+		var msgForPrefetch *arbostypes.MessageWithMetadata
+		if i < len(newMessages)-1 {
+			msgForPrefetch = &newMessages[i+1]
+		}
+		err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch)
 		if err != nil {
 			return err
 		}
@@ -489,15 +505,20 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess
 	return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos)))
 }
 
-func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
+// DigestMessage creates a block by executing msg against the latest state and stores it.
+// While that block is being created, msgForPrefetch (msg+1), if provided, is executed
+// in parallel against the same latest state to create a second block,
+// but that second block is not stored.
+// This warms the caches, so that creating the next block is faster.
+func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
 	if !s.createBlocksMutex.TryLock() {
 		return errors.New("createBlock mutex held")
 	}
 	defer s.createBlocksMutex.Unlock()
-	return s.digestMessageWithBlockMutex(num, msg)
+	return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch)
 }
 
-func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
+func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
 	currentHeader, err := s.getCurrentHeader()
 	if err != nil {
 		return err
@@ -511,11 +532,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex,
 	}
 
 	startTime := time.Now()
+	var wg sync.WaitGroup
+	if s.prefetchBlock && msgForPrefetch != nil {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, _, _, err := s.createBlockFromNextMessage(msgForPrefetch)
+			if err != nil {
+				return
+			}
+		}()
+	}
+
 	block, statedb, receipts, err := s.createBlockFromNextMessage(msg)
 	if err != nil {
 		return err
 	}
-
+	wg.Wait()
 	err = s.appendBlock(block, statedb, receipts, time.Since(startTime))
 	if err != nil {
 		return err
diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go
index 00337cc355..1ad73febe7 100644
--- a/execution/gethexec/node.go
+++ b/execution/gethexec/node.go
@@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() {
 	// }
 }
 
-func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
-	return n.ExecEngine.DigestMessage(num, msg)
+func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
+	return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
 }
 func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error {
 	return n.ExecEngine.Reorg(count, newMessages, oldMessages)
diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go
index 5db38cbb4d..9bc6f4378d 100644
--- a/execution/gethexec/sequencer.go
+++ b/execution/gethexec/sequencer.go
@@ -66,6 +66,7 @@ type SequencerConfig struct {
 	MaxTxDataSize           int           `koanf:"max-tx-data-size" reload:"hot"`
 	NonceFailureCacheSize   int           `koanf:"nonce-failure-cache-size" reload:"hot"`
 	NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
+	EnablePrefetchBlock     bool          `koanf:"enable-prefetch-block"`
 }
 
 func (c *SequencerConfig) Validate() error {
@@ -97,6 +98,7 @@ var DefaultSequencerConfig = SequencerConfig{
 	MaxTxDataSize:           95000,
 	NonceFailureCacheSize:   1024,
 	NonceFailureCacheExpiry: time.Second,
+	EnablePrefetchBlock:     false,
 }
 
 var TestSequencerConfig = SequencerConfig{
@@ -112,6 +114,7 @@ var TestSequencerConfig = SequencerConfig{
 	MaxTxDataSize:           95000,
 	NonceFailureCacheSize:   1024,
 	NonceFailureCacheExpiry: time.Second,
+	EnablePrefetchBlock:     false,
 }
 
 func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
@@ -127,6 +130,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept")
 	f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor")
 	f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high")
+	f.Bool(prefix+".enable-prefetch-block", DefaultSequencerConfig.EnablePrefetchBlock, "enable prefetching of blocks")
 }
 
 type txQueueItem struct {
@@ -324,6 +328,9 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
 	}
 	s.Pause()
 	execEngine.EnableReorgSequencing()
+	if config.EnablePrefetchBlock {
+		execEngine.EnablePrefetchBlock()
+	}
 	return s, nil
 }
 
diff --git a/execution/interface.go b/execution/interface.go
index 5f7c01719e..6761011a77 100644
--- a/execution/interface.go
+++ b/execution/interface.go
@@ -28,7 +28,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")
 
 // always needed
 type ExecutionClient interface {
-	DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error
+	DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error
 	Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error
 	HeadMessageNumber() (arbutil.MessageIndex, error)
 	HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)
diff --git a/system_tests/pruning_test.go b/system_tests/pruning_test.go
index ef82c0466e..e9e99dffcc 100644
--- a/system_tests/pruning_test.go
+++ b/system_tests/pruning_test.go
@@ -3,12 +3,14 @@ package arbtest
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/node"
+	"github.com/ethereum/go-ethereum/trie"
 	"github.com/offchainlabs/nitro/cmd/conf"
 	"github.com/offchainlabs/nitro/cmd/pruning"
 	"github.com/offchainlabs/nitro/execution/gethexec"
@@ -32,35 +34,34 @@ func TestPruning(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	var dataDir string
-
-	func() {
-		builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
-		_ = builder.Build(t)
-		dataDir = builder.dataDir
-		l2cleanupDone := false
-		defer func() {
-			if !l2cleanupDone {
-				builder.L2.cleanup()
-			}
-			builder.L1.cleanup()
-		}()
-		builder.L2Info.GenerateAccount("User2")
-		var txs []*types.Transaction
-		for i := uint64(0); i < 200; i++ {
-			tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil)
-			txs = append(txs, tx)
-			err := builder.L2.Client.SendTransaction(ctx, tx)
-			Require(t, err)
-		}
-		for _, tx := range txs {
-			_, err := builder.L2.EnsureTxSucceeded(tx)
-			Require(t, err)
+	builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
+	_ = builder.Build(t)
+	l2cleanupDone := false
+	defer func() {
+		if !l2cleanupDone {
+			builder.L2.cleanup()
 		}
 
-		l2cleanupDone = true
-		builder.L2.cleanup()
-		t.Log("stopped l2 node")
+		builder.L1.cleanup()
+	}()
+	builder.L2Info.GenerateAccount("User2")
+	var txs []*types.Transaction
+	for i := uint64(0); i < 200; i++ {
+		tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil)
+		txs = append(txs, tx)
+		err := builder.L2.Client.SendTransaction(ctx, tx)
+		Require(t, err)
+	}
+	for _, tx := range txs {
+		_, err := builder.L2.EnsureTxSucceeded(tx)
+		Require(t, err)
+	}
+	lastBlock, err := builder.L2.Client.BlockNumber(ctx)
+	Require(t, err)
+	l2cleanupDone = true
+	builder.L2.cleanup()
+	t.Log("stopped l2 node")
+	func() {
 		stack, err := node.New(builder.l2StackConfig)
 		Require(t, err)
 		defer stack.Close()
@@ -105,15 +106,44 @@ func TestPruning(t *testing.T) {
 			Fatal(t, "The db doesn't have less entries after pruning then before. Before:", chainDbEntriesBeforePruning, "After:", chainDbEntriesAfterPruning)
 		}
 	}()
-	builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
-	builder.dataDir = dataDir
-	cancel = builder.Build(t)
-	defer cancel()
-	builder.L2Info.GenerateAccount("User2")
+	testClient, cleanup := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: builder.l2StackConfig})
+	defer cleanup()
+
+	currentBlock := uint64(0)
+	// wait for the chain to catch up
+	for currentBlock < lastBlock {
+		currentBlock, err = testClient.Client.BlockNumber(ctx)
+		Require(t, err)
+		time.Sleep(20 * time.Millisecond)
+	}
+
+	currentBlock, err = testClient.Client.BlockNumber(ctx)
+	Require(t, err)
+	bc := testClient.ExecNode.Backend.ArbInterface().BlockChain()
+	triedb := bc.StateCache().TrieDB()
+	var start uint64
+	if currentBlock+1 >= builder.execConfig.Caching.BlockCount {
+		start = currentBlock + 1 - builder.execConfig.Caching.BlockCount
+	} else {
+		start = 0
+	}
+	for i := start; i <= currentBlock; i++ {
+		header := bc.GetHeaderByNumber(i)
+		_, err := bc.StateAt(header.Root)
+		Require(t, err)
+		tr, err := trie.New(trie.TrieID(header.Root), triedb)
+		Require(t, err)
+		it, err := tr.NodeIterator(nil)
+		Require(t, err)
+		for it.Next(true) {
+		}
+		Require(t, it.Error())
+	}
+
 	tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil)
-	err := builder.L2.Client.SendTransaction(ctx, tx)
+	err = testClient.Client.SendTransaction(ctx, tx)
 	Require(t, err)
-	_, err = builder.L2.EnsureTxSucceeded(tx)
+	_, err = testClient.EnsureTxSucceeded(tx)
 	Require(t, err)
 }
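
For context on the prefetching pattern the executionengine.go changes introduce, here is a minimal, self-contained sketch. The names (message, execute, digest) are hypothetical stand-ins, not the nitro API: execute plays the role of createBlockFromNextMessage and is assumed, as in the diff above, to be safe to run concurrently against the same head state.

```go
package main

import (
	"fmt"
	"sync"
)

// message stands in for arbostypes.MessageWithMetadata.
type message struct{ id int }

// execute builds a block for m; in the real engine this walks state and
// thereby pulls accounts, storage slots and trie nodes into the caches.
func execute(m *message) (string, error) {
	return fmt.Sprintf("block for msg %d", m.id), nil
}

// digest commits msg. If prefetch is non-nil and prefetching is enabled,
// the next message is executed in parallel and its result thrown away,
// purely to warm the caches for the next digest call.
func digest(msg, prefetch *message, prefetchEnabled bool) error {
	var wg sync.WaitGroup
	if prefetchEnabled && prefetch != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = execute(prefetch) // result and error intentionally discarded
		}()
	}

	res, err := execute(msg)
	if err != nil {
		return err
	}
	wg.Wait() // finish the speculative run before committing, as the diff does before appendBlock
	fmt.Println("committed", res)
	return nil
}

func main() {
	msgs := []message{{1}, {2}, {3}}
	for i := range msgs {
		var next *message
		if i+1 < len(msgs) {
			next = &msgs[i+1]
		}
		if err := digest(&msgs[i], next, true); err != nil {
			panic(err)
		}
	}
}
```

The benefit comes from locality: consecutive blocks tend to touch overlapping accounts, storage slots and trie nodes, so the discarded speculative execution leaves that state warm in the caches for the real execution that follows. It also shows why a prefetch candidate only exists while a successor message is available, mirroring the pos+1 lookup in the streamer and the i < len(newMessages)-1 guard in Reorg.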