Skip to content

Commit

Permalink
Test
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Feb 26, 2024
1 parent 52da827 commit 2c84219
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 26 deletions.
6 changes: 3 additions & 3 deletions arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
},
},
}
err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed}, false, 0)
err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed}, false)
Require(t, err)

serializedInitMsgBatch := make([]byte, 40)
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
bridgeAddress: [20]byte{},
serialized: serializedUserMsgBatch,
}
err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch}, 0)
err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch})
Require(t, err)

// Reorg out the user delayed message
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
bridgeAddress: [20]byte{},
serialized: serializedInitMsgBatch,
}
err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch}, 0)
err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch})
Require(t, err)

msgCount, err = streamer.GetMessageCount()
Expand Down
33 changes: 22 additions & 11 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
FirstBatch: 100,
FirstBatch: 0,
}

var TestInboxReaderConfig = InboxReaderConfig{
Expand All @@ -82,7 +82,7 @@ var TestInboxReaderConfig = InboxReaderConfig{
TargetMessagesRead: 500,
MaxBlocksToRead: 2000,
ReadMode: "latest",
FirstBatch: 100,
FirstBatch: 0,
}

type InboxReader struct {
Expand Down Expand Up @@ -344,7 +344,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
}
if checkingDelayedCount > 0 && checkingDelayedCount > r.config().FirstBatch {
if checkingDelayedCount > 0 {
checkingDelayedSeqNum := checkingDelayedCount - 1
l1DelayedAcc, err := r.delayedBridge.GetAccumulator(ctx, checkingDelayedSeqNum, currentHeight, common.Hash{})
if err != nil {
Expand Down Expand Up @@ -380,7 +380,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
}
if checkingBatchCount > 0 && checkingBatchCount > r.config().FirstBatch {
if checkingBatchCount > 0 {
checkingBatchSeqNum := checkingBatchCount - 1
l1BatchAcc, err := r.sequencerInbox.GetAccumulator(ctx, checkingBatchSeqNum, currentHeight)
if err != nil {
Expand Down Expand Up @@ -462,7 +462,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
missingSequencer = false
reorgingSequencer = false
firstBatch := sequencerBatches[0]
if firstBatch.SequenceNumber > 0 {
if firstBatch.SequenceNumber > r.config().FirstBatch {
haveAcc, err := r.tracker.GetBatchAcc(firstBatch.SequenceNumber - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
reorgingSequencer = true
Expand All @@ -476,6 +476,10 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
// Skip any batches we already have in the database
for len(sequencerBatches) > 0 {
batch := sequencerBatches[0]
if batch.SequenceNumber < r.config().FirstBatch {
sequencerBatches = sequencerBatches[1:]
continue
}
haveAcc, err := r.tracker.GetBatchAcc(batch.SequenceNumber)
if errors.Is(err, AccumulatorNotFoundErr) {
// This batch is new
Expand Down Expand Up @@ -508,7 +512,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if err != nil {
return err
}
if beforeCount > 0 {
if beforeCount > r.config().FirstBatch {
haveAcc, err := r.tracker.GetDelayedAcc(beforeCount - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
reorgingDelayed = true
Expand All @@ -517,6 +521,16 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc != beforeAcc {
reorgingDelayed = true
}
for len(delayedMessages) > 0 {
message := delayedMessages[0]
beforeCount, err := message.Message.Header.SeqNum()
if err != nil {
return err
}
if beforeCount < r.config().FirstBatch {
delayedMessages = delayedMessages[1:]
}
}
}
} else if missingDelayed && to.Cmp(currentHeight) >= 0 {
// We were missing delayed messages but didn't find any.
Expand Down Expand Up @@ -575,11 +589,11 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}

func (r *InboxReader) addMessages(ctx context.Context, sequencerBatches []*SequencerInboxBatch, delayedMessages []*DelayedInboxMessage) (bool, error) {
err := r.tracker.AddDelayedMessages(delayedMessages, r.config().HardReorg, r.config().FirstBatch)
err := r.tracker.AddDelayedMessages(delayedMessages, r.config().HardReorg)
if err != nil {
return false, err
}
err = r.tracker.AddSequencerBatches(ctx, r.client, sequencerBatches, r.config().FirstBatch)
err = r.tracker.AddSequencerBatches(ctx, r.client, sequencerBatches)
if errors.Is(err, delayedMessagesMismatch) {
return true, nil
} else if err != nil {
Expand Down Expand Up @@ -607,9 +621,6 @@ func (r *InboxReader) getNextBlockToRead() (*big.Int, error) {
if delayedCount == 0 {
return new(big.Int).Set(r.firstMessageBlock), nil
}
if r.config().FirstBatch+1 >= delayedCount {
delayedCount = r.config().FirstBatch + 1
}
_, _, parentChainBlockNumber, err := r.tracker.GetDelayedMessageAccumulatorAndParentChainBlockNumber(delayedCount - 1)
if err != nil {
return nil, err
Expand Down
15 changes: 4 additions & 11 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) {
return msg.Serialize()
}

func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool, firstBatchAllowed uint64) error {
func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error {
if len(messages) == 0 {
return nil
}
Expand Down Expand Up @@ -381,11 +381,6 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR
return fmt.Errorf("previous delayed accumulator mismatch for message %v", seqNum)
}
nextAcc = message.AfterInboxAcc()
pos++
// Skip adding the message if it's before the first batch allowed.
if seqNum != 0 && seqNum < firstBatchAllowed {
continue
}

delayedMsgKey := dbKey(rlpDelayedMessagePrefix, seqNum)

Expand All @@ -409,6 +404,8 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR
return err
}
}

pos++
}

return t.setDelayedCountReorgAndWriteBatch(batch, pos, true)
Expand Down Expand Up @@ -545,7 +542,7 @@ func (b *multiplexerBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1Inco

var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missing or different")

func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch, firstBatchAllowed uint64) error {
func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch) error {
if len(batches) == 0 {
return nil
}
Expand Down Expand Up @@ -636,10 +633,6 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
lastBatchMeta := prevbatchmeta
batchMetas := make(map[uint64]BatchMetadata, len(batches))
for _, batch := range batches {
// Skip adding the batch if it's before the first batch allowed.
if batch.SequenceNumber != 0 && batch.SequenceNumber < firstBatchAllowed {
continue
}
meta := BatchMetadata{
Accumulator: batch.AfterInboxAcc,
DelayedMessageCount: batch.AfterDelayedCount,
Expand Down
132 changes: 132 additions & 0 deletions system_tests/a_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package arbtest

import (
"context"
"errors"
"math/big"
"os"
"testing"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/offchainlabs/nitro/arbnode"
)

func TestA(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Setup Node A with sequencer
builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
cleanup := builder.Build(t)
defer cleanup()
startL1Block, err := builder.L1.Client.BlockNumber(ctx)
Require(t, err)

// Setup Node B without sequencer
nodeBStackConfig := builder.l2StackConfig
// Set the data dir manually, since we want to reuse the same data dir for node C
nodeBStackConfig.DataDir = t.TempDir()
testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{
stackConfig: nodeBStackConfig,
})
balance := big.NewInt(params.Ether)
balance.Mul(balance, big.NewInt(100))
builder.L2Info.GenerateAccount("BackgroundUser")
tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, balance, nil)
err = builder.L2.Client.SendTransaction(ctx, tx)
Require(t, err)
_, err = builder.L2.EnsureTxSucceeded(tx)
Require(t, err)

// Make a bunch of L2 transactions, till we have at least 10 batches
cleanupFirstRoundBackgroundTx := startBackgroundTxs(ctx, builder)
var firstRoundBatches []*arbnode.SequencerInboxBatch
for ctx.Err() == nil && len(firstRoundBatches) < 10 {
endL1Block, err := builder.L1.Client.BlockNumber(ctx)
Require(t, err)
seqInbox, err := arbnode.NewSequencerInbox(builder.L1.Client, builder.L2.ConsensusNode.DeployInfo.SequencerInbox, 0)
Require(t, err)
firstRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block))
Require(t, err)
}
// Stop making background transactions
cleanupFirstRoundBackgroundTx()

// Make sure the sequencer has processed all the transactions
time.Sleep(1 * time.Second)

// Make sure node B has synced up to the sequencer
for ctx.Err() == nil {
endL1Block, err := builder.L1.Client.BlockNumber(ctx)
Require(t, err)
seqInbox, err := arbnode.NewSequencerInbox(builder.L1.Client, builder.L2.ConsensusNode.DeployInfo.SequencerInbox, 0)
Require(t, err)
firstRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block))
Require(t, err)
batchCount, err := testClientB.ExecNode.HeadMessageNumber()
Require(t, err)
if uint64(len(firstRoundBatches)) > uint64(batchCount) {
//time.Sleep(1 * time.Second)

Check failure on line 71 in system_tests/a_test.go

View workflow job for this annotation

GitHub Actions / Go Tests (challenge)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 71 in system_tests/a_test.go

View workflow job for this annotation

GitHub Actions / Go Tests (race)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 71 in system_tests/a_test.go

View workflow job for this annotation

GitHub Actions / Go Tests (defaults)

commentFormatting: put a space between `//` and comment text (gocritic)
} else {
break
}
}
// Stop node B
cleanupB()

// Make a bunch of L2 transactions, till we have at least 10 more batches, since we stopped node B
cleanupSecondRoundBackgroundTx := startBackgroundTxs(ctx, builder)
var secondRoundBatches []*arbnode.SequencerInboxBatch
for ctx.Err() == nil && len(secondRoundBatches) < len(firstRoundBatches)+10 {
endL1Block, err := builder.L1.Client.BlockNumber(ctx)
Require(t, err)
seqInbox, err := arbnode.NewSequencerInbox(builder.L1.Client, builder.L2.ConsensusNode.DeployInfo.SequencerInbox, 0)
Require(t, err)
secondRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block))
Require(t, err)
}
// Stop making background transactions
cleanupSecondRoundBackgroundTx()

// Setup Node B without sequencer, while using the same data dir as node B
nodeCConfig := arbnode.ConfigDefaultL1NonSequencerTest()
// Start node C from the first batch till which node B has synced
nodeCConfig.InboxReader.FirstBatch = uint64(len(firstRoundBatches))
// Remove arbitrumdata dir to simulate a fresh start
err = os.RemoveAll(nodeBStackConfig.ResolvePath("arbitrumdata"))
Require(t, err)
testClientC, cleanupC := builder.Build2ndNode(t, &SecondNodeParams{
nodeConfig: nodeCConfig,
stackConfig: nodeBStackConfig,
})
defer cleanupC()
for {
batchCount, err := testClientC.ConsensusNode.InboxTracker.GetBatchCount()
Require(t, err)
if uint64(len(secondRoundBatches)) > batchCount {
time.Sleep(1 * time.Second)
} else {
break
}
}
}

func startBackgroundTxs(ctx context.Context, builder *NodeBuilder) func() {
// Continually make L2 transactions in a background thread
backgroundTxsCtx, cancelBackgroundTxs := context.WithCancel(ctx)
backgroundTxsShutdownChan := make(chan struct{})
cleanup := func() {
cancelBackgroundTxs()
<-backgroundTxsShutdownChan
}
go (func() {
defer close(backgroundTxsShutdownChan)
err := makeBackgroundTxs(backgroundTxsCtx, builder)
if !errors.Is(err, context.Canceled) {
log.Warn("error making background txs", "err", err)
}
})()
return cleanup
}
2 changes: 1 addition & 1 deletion system_tests/full_challenge_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func makeBatch(t *testing.T, l2Node *arbnode.Node, l2Info *BlockchainTestInfo, b
if len(batches) == 0 {
Fatal(t, "batch not found after AddSequencerL2BatchFromOrigin")
}
err = l2Node.InboxTracker.AddSequencerBatches(ctx, backend, batches, 0)
err = l2Node.InboxTracker.AddSequencerBatches(ctx, backend, batches)
Require(t, err)
_, err = l2Node.InboxTracker.GetBatchMetadata(0)
Require(t, err, "failed to get batch metadata after adding batch:")
Expand Down

0 comments on commit 2c84219

Please sign in to comment.