Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Feb 26, 2024
1 parent 2c84219 commit 8e8ca11
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 23 deletions.
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, nil)
tracker, err := NewInboxTracker(db, streamer, nil, nil, 0)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
20 changes: 11 additions & 9 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,15 +521,17 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc != beforeAcc {
reorgingDelayed = true
}
for len(delayedMessages) > 0 {
message := delayedMessages[0]
beforeCount, err := message.Message.Header.SeqNum()
if err != nil {
return err
}
if beforeCount < r.config().FirstBatch {
delayedMessages = delayedMessages[1:]
}
}
for len(delayedMessages) > 0 {
message := delayedMessages[0]
beforeCount, err := message.Message.Header.SeqNum()
if err != nil {
return err
}
if beforeCount < r.config().FirstBatch {
delayedMessages = delayedMessages[1:]
} else {
break
}
}
} else if missingDelayed && to.Cmp(currentHeight) >= 0 {
Expand Down
14 changes: 8 additions & 6 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ type InboxTracker struct {
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
blobReader arbstate.BlobReader
firstBatch uint64

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader, firstBatch uint64) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
Expand All @@ -54,6 +55,7 @@ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arb
txStreamer: txStreamer,
das: das,
blobReader: blobReader,
firstBatch: firstBatch,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
Expand Down Expand Up @@ -355,7 +357,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR
}

var nextAcc common.Hash
if pos > 0 {
if pos > t.firstBatch {
var err error
nextAcc, err = t.GetDelayedAcc(pos - 1)
if err != nil {
Expand All @@ -377,7 +379,7 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR
return fmt.Errorf("unexpected delayed sequence number %v, expected %v", seqNum, pos)
}

if nextAcc != message.BeforeInboxAcc {
if seqNum > t.firstBatch && nextAcc != message.BeforeInboxAcc {
return fmt.Errorf("previous delayed accumulator mismatch for message %v", seqNum)
}
nextAcc = message.AfterInboxAcc()
Expand Down Expand Up @@ -553,7 +555,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
startPos := pos
var nextAcc common.Hash
var prevbatchmeta BatchMetadata
if pos > 0 {
if pos > t.firstBatch {
var err error
prevbatchmeta, err = t.GetBatchMetadata(pos - 1)
nextAcc = prevbatchmeta.Accumulator
Expand All @@ -574,11 +576,11 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
if batch.SequenceNumber != pos {
return errors.New("unexpected batch sequence number")
}
if nextAcc != batch.BeforeInboxAcc {
if pos > t.firstBatch && nextAcc != batch.BeforeInboxAcc {
return errors.New("previous batch accumulator mismatch")
}

if batch.AfterDelayedCount > 0 {
if batch.AfterDelayedCount > t.firstBatch {
haveDelayedAcc, err := t.GetDelayedAcc(batch.AfterDelayedCount - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
// We somehow missed a referenced delayed message; go back and look for it
Expand Down
2 changes: 1 addition & 1 deletion arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func createNodeImpl(
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader, configFetcher.Get().InboxReader.FirstBatch)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil, 0)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions system_tests/a_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ func TestA(t *testing.T) {
Require(t, err)
firstRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block))
Require(t, err)
batchCount, err := testClientB.ExecNode.HeadMessageNumber()
batchCount, err := testClientB.ConsensusNode.InboxTracker.GetBatchCount()
Require(t, err)
if uint64(len(firstRoundBatches)) > uint64(batchCount) {
//time.Sleep(1 * time.Second)
if uint64(len(firstRoundBatches)) > batchCount {
time.Sleep(1 * time.Second)
} else {
break
}
Expand Down
3 changes: 1 addition & 2 deletions system_tests/staker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ import (
)

func makeBackgroundTxs(ctx context.Context, builder *NodeBuilder) error {
for i := uint64(0); ctx.Err() == nil; i++ {
builder.L2Info.Accounts["BackgroundUser"].Nonce = i
for ctx.Err() == nil {
tx := builder.L2Info.PrepareTx("BackgroundUser", "BackgroundUser", builder.L2Info.TransferGas, common.Big0, nil)
err := builder.L2.Client.SendTransaction(ctx, tx)
if err != nil {
Expand Down

0 comments on commit 8e8ca11

Please sign in to comment.