From 2c842194c3dee0e1e271a558b34bacc360d0eab0 Mon Sep 17 00:00:00 2001 From: amsanghi Date: Mon, 26 Feb 2024 19:39:43 +0530 Subject: [PATCH] Test --- arbnode/delayed_seq_reorg_test.go | 6 +- arbnode/inbox_reader.go | 33 ++++-- arbnode/inbox_tracker.go | 15 +-- system_tests/a_test.go | 132 +++++++++++++++++++++++ system_tests/full_challenge_impl_test.go | 2 +- 5 files changed, 162 insertions(+), 26 deletions(-) create mode 100644 system_tests/a_test.go diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index a1113dd8e9..beb2656e2b 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -48,7 +48,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { }, }, } - err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed}, false, 0) + err = tracker.AddDelayedMessages([]*DelayedInboxMessage{initMsgDelayed, userDelayed}, false) Require(t, err) serializedInitMsgBatch := make([]byte, 40) @@ -97,7 +97,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { bridgeAddress: [20]byte{}, serialized: serializedUserMsgBatch, } - err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch}, 0) + err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{initMsgBatch, userMsgBatch, emptyBatch}) Require(t, err) // Reorg out the user delayed message @@ -136,7 +136,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) { bridgeAddress: [20]byte{}, serialized: serializedInitMsgBatch, } - err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch}, 0) + err = tracker.AddSequencerBatches(ctx, nil, []*SequencerInboxBatch{emptyBatch}) Require(t, err) msgCount, err = streamer.GetMessageCount() diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 5400ecdd17..ca5897c0c3 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -70,7 +70,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{ TargetMessagesRead: 500, MaxBlocksToRead: 2000, ReadMode: "latest", - FirstBatch: 100, + FirstBatch: 0, } var TestInboxReaderConfig = InboxReaderConfig{ @@ -82,7 +82,7 @@ var TestInboxReaderConfig = InboxReaderConfig{ TargetMessagesRead: 500, MaxBlocksToRead: 2000, ReadMode: "latest", - FirstBatch: 100, + FirstBatch: 0, } type InboxReader struct { @@ -344,7 +344,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { return err } } - if checkingDelayedCount > 0 && checkingDelayedCount > r.config().FirstBatch { + if checkingDelayedCount > 0 { checkingDelayedSeqNum := checkingDelayedCount - 1 l1DelayedAcc, err := r.delayedBridge.GetAccumulator(ctx, checkingDelayedSeqNum, currentHeight, common.Hash{}) if err != nil { @@ -380,7 +380,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { return err } } - if checkingBatchCount > 0 && checkingBatchCount > r.config().FirstBatch { + if checkingBatchCount > 0 { checkingBatchSeqNum := checkingBatchCount - 1 l1BatchAcc, err := r.sequencerInbox.GetAccumulator(ctx, checkingBatchSeqNum, currentHeight) if err != nil { @@ -462,7 +462,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { missingSequencer = false reorgingSequencer = false firstBatch := sequencerBatches[0] - if firstBatch.SequenceNumber > 0 { + if firstBatch.SequenceNumber > r.config().FirstBatch { haveAcc, err := r.tracker.GetBatchAcc(firstBatch.SequenceNumber - 1) if errors.Is(err, AccumulatorNotFoundErr) { reorgingSequencer = true @@ -476,6 +476,10 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { // Skip any batches we already have in the database for len(sequencerBatches) > 0 { batch := sequencerBatches[0] + if batch.SequenceNumber < r.config().FirstBatch { + sequencerBatches = sequencerBatches[1:] + continue + } haveAcc, err := r.tracker.GetBatchAcc(batch.SequenceNumber) if errors.Is(err, AccumulatorNotFoundErr) { // This batch is new @@ -508,7 +512,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { if err != nil { return err } - if beforeCount > 0 { + if beforeCount > r.config().FirstBatch { haveAcc, err := r.tracker.GetDelayedAcc(beforeCount - 1) if errors.Is(err, AccumulatorNotFoundErr) { reorgingDelayed = true @@ -517,6 +521,16 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } else if haveAcc != beforeAcc { reorgingDelayed = true } + for len(delayedMessages) > 0 { + message := delayedMessages[0] + beforeCount, err := message.Message.Header.SeqNum() + if err != nil { + return err + } + if beforeCount < r.config().FirstBatch { + delayedMessages = delayedMessages[1:] + } + } } } else if missingDelayed && to.Cmp(currentHeight) >= 0 { // We were missing delayed messages but didn't find any. @@ -575,11 +589,11 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } func (r *InboxReader) addMessages(ctx context.Context, sequencerBatches []*SequencerInboxBatch, delayedMessages []*DelayedInboxMessage) (bool, error) { - err := r.tracker.AddDelayedMessages(delayedMessages, r.config().HardReorg, r.config().FirstBatch) + err := r.tracker.AddDelayedMessages(delayedMessages, r.config().HardReorg) if err != nil { return false, err } - err = r.tracker.AddSequencerBatches(ctx, r.client, sequencerBatches, r.config().FirstBatch) + err = r.tracker.AddSequencerBatches(ctx, r.client, sequencerBatches) if errors.Is(err, delayedMessagesMismatch) { return true, nil } else if err != nil { @@ -607,9 +621,6 @@ func (r *InboxReader) getNextBlockToRead() (*big.Int, error) { if delayedCount == 0 { return new(big.Int).Set(r.firstMessageBlock), nil } - if r.config().FirstBatch+1 >= delayedCount { - delayedCount = r.config().FirstBatch + 1 - } _, _, parentChainBlockNumber, err := r.tracker.GetDelayedMessageAccumulatorAndParentChainBlockNumber(delayedCount - 1) if err != nil { return nil, err diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 3275f3271d..f98f93a3eb 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -329,7 +329,7 @@ func (t *InboxTracker) GetDelayedMessageBytes(seqNum uint64) ([]byte, error) { return msg.Serialize() } -func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool, firstBatchAllowed uint64) error { +func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardReorg bool) error { if len(messages) == 0 { return nil } @@ -381,11 +381,6 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR return fmt.Errorf("previous delayed accumulator mismatch for message %v", seqNum) } nextAcc = message.AfterInboxAcc() - pos++ - // Skip adding the message if it's before the first batch allowed. - if seqNum != 0 && seqNum < firstBatchAllowed { - continue - } delayedMsgKey := dbKey(rlpDelayedMessagePrefix, seqNum) @@ -409,6 +404,8 @@ func (t *InboxTracker) AddDelayedMessages(messages []*DelayedInboxMessage, hardR return err } } + + pos++ } return t.setDelayedCountReorgAndWriteBatch(batch, pos, true) @@ -545,7 +542,7 @@ func (b *multiplexerBackend) ReadDelayedInbox(seqNum uint64) (*arbostypes.L1Inco var delayedMessagesMismatch = errors.New("sequencer batch delayed messages missing or different") -func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch, firstBatchAllowed uint64) error { +func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L1Interface, batches []*SequencerInboxBatch) error { if len(batches) == 0 { return nil } @@ -636,10 +633,6 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L lastBatchMeta := prevbatchmeta batchMetas := make(map[uint64]BatchMetadata, len(batches)) for _, batch := range batches { - // Skip adding the batch if it's before the first batch allowed. - if batch.SequenceNumber != 0 && batch.SequenceNumber < firstBatchAllowed { - continue - } meta := BatchMetadata{ Accumulator: batch.AfterInboxAcc, DelayedMessageCount: batch.AfterDelayedCount, diff --git a/system_tests/a_test.go b/system_tests/a_test.go new file mode 100644 index 0000000000..cc14e64d68 --- /dev/null +++ b/system_tests/a_test.go @@ -0,0 +1,132 @@ +package arbtest + +import ( + "context" + "errors" + "math/big" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "github.com/offchainlabs/nitro/arbnode" +) + +func TestA(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup Node A with sequencer + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + cleanup := builder.Build(t) + defer cleanup() + startL1Block, err := builder.L1.Client.BlockNumber(ctx) + Require(t, err) + + // Setup Node B without sequencer + nodeBStackConfig := builder.l2StackConfig + // Set the data dir manually, since we want to reuse the same data dir for node C + nodeBStackConfig.DataDir = t.TempDir() + testClientB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{ + stackConfig: nodeBStackConfig, + }) + balance := big.NewInt(params.Ether) + balance.Mul(balance, big.NewInt(100)) + builder.L2Info.GenerateAccount("BackgroundUser") + tx := builder.L2Info.PrepareTx("Faucet", "BackgroundUser", builder.L2Info.TransferGas, balance, nil) + err = builder.L2.Client.SendTransaction(ctx, tx) + Require(t, err) + _, err = builder.L2.EnsureTxSucceeded(tx) + Require(t, err) + + // Make a bunch of L2 transactions, till we have at least 10 batches + cleanupFirstRoundBackgroundTx := startBackgroundTxs(ctx, builder) + var firstRoundBatches []*arbnode.SequencerInboxBatch + for ctx.Err() == nil && len(firstRoundBatches) < 10 { + endL1Block, err := builder.L1.Client.BlockNumber(ctx) + Require(t, err) + seqInbox, err := arbnode.NewSequencerInbox(builder.L1.Client, builder.L2.ConsensusNode.DeployInfo.SequencerInbox, 0) + Require(t, err) + firstRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block)) + Require(t, err) + } + // Stop making background transactions + cleanupFirstRoundBackgroundTx() + + // Make sure the sequencer has processed all the transactions + time.Sleep(1 * time.Second) + + // Make sure node B has synced up to the sequencer + for ctx.Err() == nil { + endL1Block, err := builder.L1.Client.BlockNumber(ctx) + Require(t, err) + seqInbox, err := arbnode.NewSequencerInbox(builder.L1.Client, builder.L2.ConsensusNode.DeployInfo.SequencerInbox, 0) + Require(t, err) + firstRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block)) + Require(t, err) + batchCount, err := testClientB.ExecNode.HeadMessageNumber() + Require(t, err) + if uint64(len(firstRoundBatches)) > uint64(batchCount) { + //time.Sleep(1 * time.Second) + } else { + break + } + } + // Stop node B + cleanupB() + + // Make a bunch of L2 transactions, till we have at least 10 more batches, since we stopped node B + cleanupSecondRoundBackgroundTx := startBackgroundTxs(ctx, builder) + var secondRoundBatches []*arbnode.SequencerInboxBatch + for ctx.Err() == nil && len(secondRoundBatches) < len(firstRoundBatches)+10 { + endL1Block, err := builder.L1.Client.BlockNumber(ctx) + Require(t, err) + seqInbox, err := arbnode.NewSequencerInbox(builder.L1.Client, builder.L2.ConsensusNode.DeployInfo.SequencerInbox, 0) + Require(t, err) + secondRoundBatches, err = seqInbox.LookupBatchesInRange(ctx, new(big.Int).SetUint64(startL1Block), new(big.Int).SetUint64(endL1Block)) + Require(t, err) + } + // Stop making background transactions + cleanupSecondRoundBackgroundTx() + + // Setup Node B without sequencer, while using the same data dir as node B + nodeCConfig := arbnode.ConfigDefaultL1NonSequencerTest() + // Start node C from the first batch till which node B has synced + nodeCConfig.InboxReader.FirstBatch = uint64(len(firstRoundBatches)) + // Remove arbitrumdata dir to simulate a fresh start + err = os.RemoveAll(nodeBStackConfig.ResolvePath("arbitrumdata")) + Require(t, err) + testClientC, cleanupC := builder.Build2ndNode(t, &SecondNodeParams{ + nodeConfig: nodeCConfig, + stackConfig: nodeBStackConfig, + }) + defer cleanupC() + for { + batchCount, err := testClientC.ConsensusNode.InboxTracker.GetBatchCount() + Require(t, err) + if uint64(len(secondRoundBatches)) > batchCount { + time.Sleep(1 * time.Second) + } else { + break + } + } +} + +func startBackgroundTxs(ctx context.Context, builder *NodeBuilder) func() { + // Continually make L2 transactions in a background thread + backgroundTxsCtx, cancelBackgroundTxs := context.WithCancel(ctx) + backgroundTxsShutdownChan := make(chan struct{}) + cleanup := func() { + cancelBackgroundTxs() + <-backgroundTxsShutdownChan + } + go (func() { + defer close(backgroundTxsShutdownChan) + err := makeBackgroundTxs(backgroundTxsCtx, builder) + if !errors.Is(err, context.Canceled) { + log.Warn("error making background txs", "err", err) + } + })() + return cleanup +} diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index 524db8b08f..362b134806 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -177,7 +177,7 @@ func makeBatch(t *testing.T, l2Node *arbnode.Node, l2Info *BlockchainTestInfo, b if len(batches) == 0 { Fatal(t, "batch not found after AddSequencerL2BatchFromOrigin") } - err = l2Node.InboxTracker.AddSequencerBatches(ctx, backend, batches, 0) + err = l2Node.InboxTracker.AddSequencerBatches(ctx, backend, batches) Require(t, err) _, err = l2Node.InboxTracker.GetBatchMetadata(0) Require(t, err, "failed to get batch metadata after adding batch:")