From 8b86e0e5016f01323ab870fa10971154d242965e Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Thu, 11 Jan 2024 16:17:37 +0000 Subject: [PATCH 1/6] fix starting second node in pruning system test, add more checks --- system_tests/pruning_test.go | 96 +++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 34 deletions(-) diff --git a/system_tests/pruning_test.go b/system_tests/pruning_test.go index ef82c0466e..bdb6be6a3c 100644 --- a/system_tests/pruning_test.go +++ b/system_tests/pruning_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/trie" "github.com/offchainlabs/nitro/cmd/conf" "github.com/offchainlabs/nitro/cmd/pruning" "github.com/offchainlabs/nitro/execution/gethexec" @@ -32,35 +33,34 @@ func TestPruning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var dataDir string - - func() { - builder := NewNodeBuilder(ctx).DefaultConfig(t, true) - _ = builder.Build(t) - dataDir = builder.dataDir - l2cleanupDone := false - defer func() { - if !l2cleanupDone { - builder.L2.cleanup() - } - builder.L1.cleanup() - }() - builder.L2Info.GenerateAccount("User2") - var txs []*types.Transaction - for i := uint64(0); i < 200; i++ { - tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil) - txs = append(txs, tx) - err := builder.L2.Client.SendTransaction(ctx, tx) - Require(t, err) - } - for _, tx := range txs { - _, err := builder.L2.EnsureTxSucceeded(tx) - Require(t, err) + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + _ = builder.Build(t) + l2cleanupDone := false + defer func() { + if !l2cleanupDone { + builder.L2.cleanup() } - l2cleanupDone = true - builder.L2.cleanup() - t.Log("stopped l2 node") + builder.L1.cleanup() + }() + builder.L2Info.GenerateAccount("User2") + var txs []*types.Transaction + for i := uint64(0); i < 200; i++ { + tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil) + txs = append(txs, tx) + err := builder.L2.Client.SendTransaction(ctx, tx) + Require(t, err) + } + for _, tx := range txs { + _, err := builder.L2.EnsureTxSucceeded(tx) + Require(t, err) + } + lastBlock, err := builder.L2.Client.BlockNumber(ctx) + Require(t, err) + l2cleanupDone = true + builder.L2.cleanup() + t.Log("stopped l2 node") + func() { stack, err := node.New(builder.l2StackConfig) Require(t, err) defer stack.Close() @@ -105,15 +105,43 @@ func TestPruning(t *testing.T) { Fatal(t, "The db doesn't have less entries after pruning then before. 
Before:", chainDbEntriesBeforePruning, "After:", chainDbEntriesAfterPruning) } }() - builder := NewNodeBuilder(ctx).DefaultConfig(t, true) - builder.dataDir = dataDir - cancel = builder.Build(t) - defer cancel() - builder.L2Info.GenerateAccount("User2") + testClient, cleanup := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: builder.l2StackConfig}) + defer cleanup() + + currentBlock := uint64(0) + // wait for the chain to catch up + for currentBlock < lastBlock { + currentBlock, err = testClient.Client.BlockNumber(ctx) + Require(t, err) + } + + currentBlock, err = testClient.Client.BlockNumber(ctx) + Require(t, err) + bc := testClient.ExecNode.Backend.ArbInterface().BlockChain() + triedb := bc.StateCache().TrieDB() + var start uint64 + if currentBlock+1 >= builder.execConfig.Caching.BlockCount { + start = currentBlock + 1 - builder.execConfig.Caching.BlockCount + } else { + start = 0 + } + for i := start; i <= currentBlock; i++ { + header := bc.GetHeaderByNumber(i) + _, err := bc.StateAt(header.Root) + Require(t, err) + tr, err := trie.New(trie.TrieID(header.Root), triedb) + Require(t, err) + it, err := tr.NodeIterator(nil) + Require(t, err) + for it.Next(true) { + } + Require(t, it.Error()) + } + tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil) - err := builder.L2.Client.SendTransaction(ctx, tx) + err = testClient.Client.SendTransaction(ctx, tx) Require(t, err) - _, err = builder.L2.EnsureTxSucceeded(tx) + _, err = testClient.EnsureTxSucceeded(tx) Require(t, err) } From 3424519b0521a7e47d5b06ce06e3d4eb64c5bcd9 Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Sat, 13 Jan 2024 00:36:58 +0000 Subject: [PATCH 2/6] add delay while polling block number in pruning test --- system_tests/pruning_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/system_tests/pruning_test.go b/system_tests/pruning_test.go index bdb6be6a3c..e9e99dffcc 100644 --- a/system_tests/pruning_test.go +++ b/system_tests/pruning_test.go @@ -3,6 +3,7 @@ package arbtest import ( "context" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -114,6 +115,7 @@ func TestPruning(t *testing.T) { for currentBlock < lastBlock { currentBlock, err = testClient.Client.BlockNumber(ctx) Require(t, err) + time.Sleep(20 * time.Millisecond) } currentBlock, err = testClient.Client.BlockNumber(ctx) From 02e2be04e5d3e0e415e2119f1fee5202b5490a3c Mon Sep 17 00:00:00 2001 From: amsanghi Date: Mon, 22 Jan 2024 20:25:30 +0530 Subject: [PATCH 3/6] Prefetch state needed for future block executions by executing them in parallel against old state --- arbnode/transaction_streamer.go | 10 ++++++- execution/gethexec/executionengine.go | 38 +++++++++++++++++++++++---- execution/gethexec/node.go | 4 +-- execution/gethexec/sequencer.go | 7 +++++ execution/interface.go | 2 +- 5 files changed, 52 insertions(+), 9 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 24ef2a7cc4..5491cbdbf2 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -968,7 +968,15 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos) return false } - err = s.exec.DigestMessage(pos, msg) + var msgForPrefetch *arbostypes.MessageWithMetadata + if pos+1 < msgCount { + msgForPrefetch, err = s.GetMessage(pos + 1) + if err != nil { + log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1) 
+ return false + } + } + err = s.exec.DigestMessage(pos, msg, msgForPrefetch) if err != nil { logger := log.Warn if prevMessageCount < msgCount { diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 58e91a197e..d376c59ba2 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -41,6 +41,8 @@ type ExecutionEngine struct { nextScheduledVersionCheck time.Time // protected by the createBlocksMutex reorgSequencing bool + + prefetchBlock bool } func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) { @@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() { s.reorgSequencing = true } +func (s *ExecutionEngine) EnablePrefetchBlock() { + if s.Started() { + panic("trying to enable prefetch block after start") + } + if s.prefetchBlock { + panic("trying to enable prefetch block when already set") + } + s.prefetchBlock = true +} + func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) { if s.Started() { panic("trying to set transaction streamer after start") @@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost return err } for i := range newMessages { - err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i]) + var msgForPrefetch *arbostypes.MessageWithMetadata + if i < len(newMessages)-1 { + msgForPrefetch = &newMessages[i] + } + err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch) if err != nil { return err } @@ -486,15 +502,15 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) } -func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { +func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { if !s.createBlocksMutex.TryLock() { return errors.New("createBlock mutex held") } defer s.createBlocksMutex.Unlock() - return s.digestMessageWithBlockMutex(num, msg) + return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch) } -func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error { +func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { currentHeader, err := s.getCurrentHeader() if err != nil { return err @@ -508,11 +524,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, } startTime := time.Now() + var wg sync.WaitGroup + if s.prefetchBlock && msgForPrefetch != nil { + wg.Add(1) + go func() { + defer wg.Done() + _, _, _, err := s.createBlockFromNextMessage(msgForPrefetch) + if err != nil { + return + } + }() + } + block, statedb, receipts, err := s.createBlockFromNextMessage(msg) if err != nil { return err } - + wg.Wait() err = s.appendBlock(block, statedb, receipts, time.Since(startTime)) if err != nil { return err diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 00337cc355..1ad73febe7 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() { // } } -func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) 
error { - return n.ExecEngine.DigestMessage(num, msg) +func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { + return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch) } func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error { return n.ExecEngine.Reorg(count, newMessages, oldMessages) diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 5db38cbb4d..9bc6f4378d 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -66,6 +66,7 @@ type SequencerConfig struct { MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"` NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"` NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"` + EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` } func (c *SequencerConfig) Validate() error { @@ -97,6 +98,7 @@ var DefaultSequencerConfig = SequencerConfig{ MaxTxDataSize: 95000, NonceFailureCacheSize: 1024, NonceFailureCacheExpiry: time.Second, + EnablePrefetchBlock: false, } var TestSequencerConfig = SequencerConfig{ @@ -112,6 +114,7 @@ var TestSequencerConfig = SequencerConfig{ MaxTxDataSize: 95000, NonceFailureCacheSize: 1024, NonceFailureCacheExpiry: time.Second, + EnablePrefetchBlock: false, } func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -127,6 +130,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept") f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor") f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high") + f.Bool(prefix+".enable-prefetch-block", DefaultSequencerConfig.EnablePrefetchBlock, "enable prefetching of blocks") } type txQueueItem struct { @@ -324,6 +328,9 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead } s.Pause() execEngine.EnableReorgSequencing() + if config.EnablePrefetchBlock { + execEngine.EnablePrefetchBlock() + } return s, nil } diff --git a/execution/interface.go b/execution/interface.go index ef9409b9c1..414c31f64e 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -28,7 +28,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken") // always needed type ExecutionClient interface { - DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error + DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) From 4bcb5ec449e6a5d286e7bb6f7ee7739aa8674bef Mon Sep 17 00:00:00 2001 From: amsanghi Date: Tue, 23 Jan 2024 16:29:18 +0530 Subject: [PATCH 4/6] Fix --- arbnode/transaction_streamer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 5491cbdbf2..f7ccae678c 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -970,14 +970,14 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution } var msgForPrefetch *arbostypes.MessageWithMetadata if pos+1 < msgCount { - msgForPrefetch, err = s.GetMessage(pos + 1) + msg, err := s.GetMessage(pos + 1) if err != nil { log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1) return false } + msgForPrefetch = msg } - err = s.exec.DigestMessage(pos, msg, msgForPrefetch) - if err != nil { + if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil { logger := log.Warn if prevMessageCount < msgCount { logger = log.Debug From 2694d8e70f5b0c932fcaabe9b9a5b3bff352dff5 Mon Sep 17 00:00:00 2001 From: amsanghi Date: Tue, 23 Jan 2024 17:00:18 +0530 Subject: [PATCH 5/6] Add comments --- execution/gethexec/executionengine.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index d376c59ba2..e941faf9f9 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -502,6 +502,11 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) } +// DigestMessage is used to create a block by executing msg against the latest state and storing it. +// Also, while creating a block by executing msg against the latest state, +// in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state +// but does not store the block. +// This helps in filling the cache, so that the next block creation is faster. func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error { if !s.createBlocksMutex.TryLock() { return errors.New("createBlock mutex held") From 5a54d22a7e80023c347b3ebfdb9a80b28d67e934 Mon Sep 17 00:00:00 2001 From: Lee Bousfield Date: Mon, 29 Jan 2024 15:36:56 -0700 Subject: [PATCH 6/6] Prevent a 4844 header from being used inside Anytrust data --- arbstate/inbox.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/arbstate/inbox.go b/arbstate/inbox.go index cf8f61e97a..fcb1c1ebcb 100644 --- a/arbstate/inbox.go +++ b/arbstate/inbox.go @@ -75,6 +75,9 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash } payload := data[40:] + // Stage 1: Extract the payload from any data availability header. + // It's important that multiple DAS strategies can't both be invoked in the same batch, + // as these headers are validated by the sequencer inbox and not other DASs. 
if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) { if dasReader == nil { log.Error("No DAS Reader configured, but sequencer message found with DAS header") @@ -88,9 +91,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash return parsedMsg, nil } } - } - - if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) { + } else if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) { blobHashes := payload[1:] if len(blobHashes)%len(common.Hash{}) != 0 { return nil, fmt.Errorf("blob batch data is not a list of hashes as expected") @@ -115,6 +116,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash } } + // Stage 2: If enabled, decode the zero heavy payload (saves gas based on calldata charging). if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) { pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen))) if err != nil { @@ -124,6 +126,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash payload = pl } + // Stage 3: Decompress the brotli payload and fill the parsedMsg.segments list. if len(payload) > 0 && IsBrotliMessageHeaderByte(payload[0]) { decompressed, err := arbcompress.Decompress(payload[1:], MaxDecompressedLen) if err == nil {
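
The final patch above makes the blob-hashes branch mutually exclusive with the DAS branch, so a payload recovered from an Anytrust DAS can no longer begin with an EIP-4844 blob-hashes header byte and trigger blob fetching. A minimal sketch of the resulting stage-1 dispatch, using simplified, hypothetical names rather than the actual parser internals in arbstate:

```go
package arbstatesketch

// Hypothetical header bytes standing in for the real constants checked by
// IsDASMessageHeaderByte and IsBlobHashesHeaderByte.
const (
	dasHeaderByte  byte = 0x80
	blobHeaderByte byte = 0x50
)

// extractPayload sketches the stage-1 dispatch after this patch: the DAS and
// blob-hash branches are chained with else-if, so a payload recovered from an
// Anytrust DAS is never reinterpreted as a list of EIP-4844 blob hashes.
// fromDAS and fromBlobs stand in for the respective recovery paths.
func extractPayload(payload []byte, fromDAS, fromBlobs func([]byte) []byte) []byte {
	if len(payload) > 0 && payload[0] == dasHeaderByte {
		payload = fromDAS(payload[1:]) // used as-is; no further header dispatch on the result
	} else if len(payload) > 0 && payload[0] == blobHeaderByte {
		payload = fromBlobs(payload[1:])
	}
	// Stages 2 and 3 (zeroheavy decoding, brotli decompression) would follow here.
	return payload
}
```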