diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index beb2656e2b..8572673fba 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { defer cancel() exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{}) - tracker, err := NewInboxTracker(db, streamer, nil, nil) + tracker, err := NewInboxTracker(db, streamer, nil, nil, 0) Require(t, err) err = streamer.Start(ctx) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index ca5897c0c3..59ca658b34 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -521,15 +521,17 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } else if haveAcc != beforeAcc { reorgingDelayed = true } - for len(delayedMessages) > 0 { - message := delayedMessages[0] - beforeCount, err := message.Message.Header.SeqNum() - if err != nil { - return err - } - if beforeCount < r.config().FirstBatch { - delayedMessages = delayedMessages[1:] - } + } + for len(delayedMessages) > 0 { + message := delayedMessages[0] + beforeCount, err := message.Message.Header.SeqNum() + if err != nil { + return err + } + if beforeCount < r.config().FirstBatch { + delayedMessages = delayedMessages[1:] + } else { + break } } } else if missingDelayed && to.Cmp(currentHeight) >= 0 { diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index f98f93a3eb..fa82c5b675 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -39,12 +39,13 @@ type InboxTracker struct { validator *staker.BlockValidator das arbstate.DataAvailabilityReader blobReader arbstate.BlobReader + firstBatch uint64 batchMetaMutex sync.Mutex batchMeta *containers.LruCache[uint64, BatchMetadata] } -func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) { +func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader, firstBatch uint64) (*InboxTracker, error) { // We support a nil txStreamer for the pruning code if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil { return nil, errors.New("data availability service required but unconfigured") @@ -54,6 +55,7 @@ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arb txStreamer: txStreamer, das: das, blobReader: blobReader, + firstBatch: firstBatch, batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000), } return tracker, nil @@ -355,7 +357,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR } var nextAcc common.Hash - if pos > 0 { + if pos > t.firstBatch { var err error nextAcc, err = t.GetDelayedAcc(pos - 1) if err != nil { @@ -377,7 +379,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR return fmt.Errorf("unexpected delayed sequence number %v, expected %v", seqNum, pos) } - if nextAcc != message.BeforeInboxAcc { + if seqNum > t.firstBatch && nextAcc != message.BeforeInboxAcc { return fmt.Errorf("previous delayed accumulator mismatch for message %v", seqNum) } nextAcc = message.AfterInboxAcc() @@ -553,7 +555,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L startPos := pos var nextAcc common.Hash var prevbatchmeta BatchMetadata - if pos > 0 { + if pos > t.firstBatch { var err error prevbatchmeta, err = t.GetBatchMetadata(pos - 1) nextAcc = prevbatchmeta.Accumulator @@ -574,11 +576,11 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L if batch.SequenceNumber != pos { return errors.New("unexpected batch sequence number") } - if nextAcc != batch.BeforeInboxAcc { + if pos > t.firstBatch && nextAcc != batch.BeforeInboxAcc { return errors.New("previous batch accumulator mismatch") } - if batch.AfterDelayedCount > 0 { + if batch.AfterDelayedCount > t.firstBatch { haveDelayedAcc, err := t.GetDelayedAcc(batch.AfterDelayedCount - 1) if errors.Is(err, AccumulatorNotFoundErr) { // We somehow missed a referenced delayed message; go back and look for it diff --git a/arbnode/node.go b/arbnode/node.go index 9f66710623..83009c68db 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -535,7 +535,7 @@ func createNodeImpl( return nil, errors.New("a data availability service is required for this chain, but it was not configured") } - inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader) + inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader, configFetcher.Get().InboxReader.FirstBatch) if err != nil { return nil, err } diff --git a/cmd/pruning/pruning.go b/cmd/pruning/pruning.go index da015ac52c..c93479f087 100644 --- a/cmd/pruning/pruning.go +++ b/cmd/pruning/pruning.go @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node return nil, fmt.Errorf("failed to get finalized block: %w", err) } l1BlockNum := l1Block.NumberU64() - tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil) + tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil, 0) if err != nil { return nil, err } diff --git a/system_tests/a_test.go b/system_tests/a_test.go index cc14e64d68..2c2bf63195 100644 --- a/system_tests/a_test.go +++ b/system_tests/a_test.go @@ -65,10 +65,10 @@ func TestA(t *testing.T) { Require(t, err) firstRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block)) Require(t, err) - batchCount, err := testClientB.ExecNode.HeadMessageNumber() + batchCount, err := testClientB.ConsensusNode.InboxTracker.GetBatchCount() Require(t, err) - if uint64(len(firstRoundBatches)) > uint64(batchCount) { - //time.Sleep(1 * time.Second) + if uint64(len(firstRoundBatches)) > batchCount { + time.Sleep(1 * time.Second) } else { break } diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 62e89ff782..12e9aed7cc 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -42,8 +42,7 @@ import ( ) func makeBackgroundTxs(ctx context.Context, builder *NodeBuilder) error { - for i := uint64(0); ctx.Err() == nil; i++ { - builder.L2Info.Accounts["BackgroundUser"].Nonce = i + for ctx.Err() == nil { tx := builder.L2Info.PrepareTx("BackgroundUser", "BackgroundUser", builder.L2Info.TransferGas, common.Big0, nil) err := builder.L2.Client.SendTransaction(ctx, tx) if err != nil {