From 8a13df52de590b424232e85acdce0f378d124b54 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 22 Oct 2024 20:17:24 +0530 Subject: [PATCH 1/8] Add a nitro option to stop syncing at a given block number --- arbnode/delayed_seq_reorg_test.go | 4 +-- arbnode/delayed_sequencer.go | 17 +++++++++--- arbnode/inbox_reader.go | 36 ++++++++++++++++++------- arbnode/inbox_test.go | 4 +-- arbnode/node.go | 15 ++++++----- arbnode/seq_coordinator.go | 30 +++++++++++++++++++-- arbnode/transaction_streamer.go | 29 ++++++++++++++++++-- broadcastclient/broadcastclient.go | 7 ++++- broadcastclient/broadcastclient_test.go | 20 +++++++------- broadcastclients/broadcastclients.go | 21 +++++++++------ execution/gethexec/executionengine.go | 9 +++++-- execution/gethexec/node.go | 4 +-- execution/interface.go | 2 +- relay/relay.go | 2 +- relay/relay_stress_test.go | 2 +- 15 files changed, 150 insertions(+), 52 deletions(-) diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 699eb3e8f6..87d93ac3d4 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -22,9 +22,9 @@ func TestSequencerReorgFromDelayed(t *testing.T) { tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig) Require(t, err) - err = streamer.Start(ctx) + err = streamer.Start(ctx, 0) Require(t, err) - exec.Start(ctx) + exec.Start(ctx, 0) init, err := streamer.GetMessage(0) Require(t, err) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index b29a66dd05..9e63d4ea03 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -211,11 +211,20 @@ func (d *DelayedSequencer) ForceSequenceDelayed(ctx context.Context) error { return d.sequenceWithoutLockout(ctx, lastBlockHeader) } -func (d *DelayedSequencer) run(ctx context.Context) { +func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) { headerChan, cancel := d.l1Reader.Subscribe(false) defer cancel() for { + delayedCount, err := d.inbox.GetDelayedCount() + if err != nil { + log.Warn("error reading delayed count", "err", err) + continue + } + if syncTillBlock > 0 && delayedCount >= syncTillBlock { + log.Info("stopping block creation in delayed sequencer", "syncTillBlock", syncTillBlock) + return + } select { case nextHeader, ok := <-headerChan: if !ok { @@ -232,7 +241,9 @@ func (d *DelayedSequencer) run(ctx context.Context) { } } -func (d *DelayedSequencer) Start(ctxIn context.Context) { +func (d *DelayedSequencer) Start(ctxIn context.Context, syncTillBlock uint64) { d.StopWaiter.Start(ctxIn, d) - d.LaunchThread(d.run) + d.LaunchThread(func(ctx context.Context) { + d.run(ctx, syncTillBlock) + }) } diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 98104b2ea7..811e780c40 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -119,18 +119,36 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h }, nil } -func (r *InboxReader) Start(ctxIn context.Context) error { +func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error { r.StopWaiter.Start(ctxIn, r) hadError := false - r.CallIteratively(func(ctx context.Context) time.Duration { - err := r.run(ctx, hadError) - if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") { - log.Warn("error reading inbox", "err", err) - hadError = true - } else { - hadError = false + r.LaunchThread(func(ctx context.Context) { + for { + delayedCount, err := r.tracker.GetDelayedCount() + if err != nil { + log.Warn("error reading delayed count", "err", err) + hadError = true + } + if syncTillBlock > 0 && delayedCount >= syncTillBlock { + log.Info("stopping block creation in inbox reader", "syncTillBlock", syncTillBlock) + return + } + err = r.run(ctx, hadError) + if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") { + log.Warn("error reading inbox", "err", err) + hadError = true + } else { + hadError = false + } + interval := time.Second + timer := time.NewTimer(interval) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } } - return time.Second }) // Ensure we read the init message before other things start up diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index e588ef399b..e75e9ee620 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -106,9 +106,9 @@ func TestTransactionStreamer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := inbox.Start(ctx) + err := inbox.Start(ctx, 0) Require(t, err) - exec.Start(ctx) + exec.Start(ctx, 0) maxExpectedGasCost := big.NewInt(l2pricing.InitialBaseFeeWei) maxExpectedGasCost.Mul(maxExpectedGasCost, big.NewInt(2100*2)) diff --git a/arbnode/node.go b/arbnode/node.go index c5b3bbe071..3e57dba13b 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -79,6 +79,7 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com type Config struct { Sequencer bool `koanf:"sequencer"` + SyncTillBlock uint64 `koanf:"sync-till-block"` ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` @@ -145,6 +146,7 @@ func (c *Config) ValidatorRequired() bool { func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) { f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer") + f.Uint64(prefix+".sync-till-block", ConfigDefault.SyncTillBlock, "sync till block") headerreader.AddOptions(prefix+".parent-chain-reader", f) InboxReaderConfigAddOptions(prefix+".inbox-reader", f) DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f) @@ -163,6 +165,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed var ConfigDefault = Config{ Sequencer: false, + SyncTillBlock: 0, ParentChainReader: headerreader.DefaultConfig, InboxReader: DefaultInboxReaderConfig, DelayedSequencer: DefaultDelayedSequencerConfig, @@ -839,7 +842,7 @@ func (n *Node) Start(ctx context.Context) error { if execClient != nil { execClient.SetConsensusClient(n) } - err = n.Execution.Start(ctx) + err = n.Execution.Start(ctx, n.configFetcher.Get().SyncTillBlock) if err != nil { return fmt.Errorf("error starting exec client: %w", err) } @@ -869,12 +872,12 @@ func (n *Node) Start(ctx context.Context) error { return fmt.Errorf("error populating feed backlog on startup: %w", err) } } - err = n.TxStreamer.Start(ctx) + err = n.TxStreamer.Start(ctx, n.configFetcher.Get().SyncTillBlock) if err != nil { return fmt.Errorf("error starting transaction streamer: %w", err) } if n.InboxReader != nil { - err = n.InboxReader.Start(ctx) + err = n.InboxReader.Start(ctx, n.configFetcher.Get().SyncTillBlock) if err != nil { return fmt.Errorf("error starting inbox reader: %w", err) } @@ -887,7 +890,7 @@ func (n *Node) Start(ctx context.Context) error { } } if n.SeqCoordinator != nil { - n.SeqCoordinator.Start(ctx) + n.SeqCoordinator.Start(ctx, n.configFetcher.Get().SyncTillBlock) } else { n.Execution.Activate() } @@ -895,7 +898,7 @@ func (n *Node) Start(ctx context.Context) error { n.MaintenanceRunner.Start(ctx) } if n.DelayedSequencer != nil { - n.DelayedSequencer.Start(ctx) + n.DelayedSequencer.Start(ctx, n.configFetcher.Get().SyncTillBlock) } if n.BatchPoster != nil { n.BatchPoster.Start(ctx) @@ -945,7 +948,7 @@ func (n *Node) Start(ctx context.Context) error { return } } - n.BroadcastClients.Start(ctx) + n.BroadcastClients.Start(ctx, n.configFetcher.Get().SyncTillBlock) }() } if n.configFetcher != nil { diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 5987801d5f..6563ffde9a 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -854,7 +854,7 @@ func (c *SeqCoordinator) launchHealthcheckServer(ctx context.Context) { } } -func (c *SeqCoordinator) Start(ctxIn context.Context) { +func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) { c.StopWaiter.Start(ctxIn, c) var newRedisCoordinator *redisutil.RedisCoordinator if c.config.NewRedisUrl != "" { @@ -865,7 +865,33 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) { err, "newRedisUrl", c.config.NewRedisUrl) } } - c.CallIteratively(func(ctx context.Context) time.Duration { return c.chooseRedisAndUpdate(ctx, newRedisCoordinator) }) + + c.LaunchThread(func(ctx context.Context) { + for { + count, err := c.streamer.GetMessageCount() + if err != nil { + log.Warn("failed to get message count", "err", err) + } + if syncTillBlock > 0 && uint64(count) >= syncTillBlock { + log.Info("stopping block creation in sequencer", "syncTillBlock", syncTillBlock) + return + } + interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator) + if ctx.Err() != nil { + return + } + if interval == time.Duration(0) { + continue + } + timer := time.NewTimer(interval) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } + } + }) if c.config.ChosenHealthcheckAddr != "" { c.StopWaiter.LaunchThread(c.launchHealthcheckServer) } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 38b1c003db..a279bb378c 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -1218,7 +1218,32 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc return s.config().ExecuteMessageLoopDelay } -func (s *TransactionStreamer) Start(ctxIn context.Context) error { +func (s *TransactionStreamer) Start(ctxIn context.Context, syncTillBlock uint64) error { s.StopWaiter.Start(ctxIn, s) - return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier) + return s.LaunchThreadSafe(func(ctx context.Context) { + var defaultVal struct{} + var val struct{} + for { + if syncTillBlock > 0 && uint64(s.execLastMsgCount) >= syncTillBlock { + log.Info("stopping block creation in transaction streamer", "syncTillBlock", syncTillBlock) + return + } + interval := s.executeMessages(ctx, val) + if ctx.Err() != nil { + return + } + val = defaultVal + if interval == time.Duration(0) { + continue + } + timer := time.NewTimer(interval) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + case val = <-s.newMessageNotifier: + } + } + }) } diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 4e97ca8cd0..a539c104cf 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -176,7 +176,7 @@ func NewBroadcastClient( }, err } -func (bc *BroadcastClient) Start(ctxIn context.Context) { +func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) { bc.StopWaiter.Start(ctxIn, bc) if bc.StopWaiter.Stopped() { log.Info("broadcast client has already been stopped, not starting") @@ -185,6 +185,11 @@ func (bc *BroadcastClient) Start(ctxIn context.Context) { bc.LaunchThread(func(ctx context.Context) { backoffDuration := bc.config().ReconnectInitialBackoff for { + + if syncTillBlock > 0 && uint64(bc.nextSeqNum) >= syncTillBlock { + log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock) + return + } earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum) if errors.Is(err, ErrMissingChainId) || errors.Is(err, ErrIncorrectChainId) || diff --git a/broadcastclient/broadcastclient_test.go b/broadcastclient/broadcastclient_test.go index a499628cd5..ecf5a88b3b 100644 --- a/broadcastclient/broadcastclient_test.go +++ b/broadcastclient/broadcastclient_test.go @@ -153,7 +153,7 @@ func TestInvalidSignature(t *testing.T) { &badSequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx) + broadcastClient.Start(ctx, 0) go func() { for i := 0; i < messageCount; i++ { @@ -227,7 +227,7 @@ func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Co sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx) + broadcastClient.Start(ctx, 0) messageCount := 0 wg.Add(1) @@ -315,7 +315,7 @@ func TestServerClientDisconnect(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx) + broadcastClient.Start(ctx, 0) t.Log("broadcasting seq 0 message") Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil)) @@ -386,7 +386,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx) + broadcastClient.Start(ctx, 0) t.Log("broadcasting seq 0 message") Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil)) @@ -458,7 +458,7 @@ func TestServerIncorrectChainId(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx) + badBroadcastClient.Start(ctx, 0) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -517,7 +517,7 @@ func TestServerMissingChainId(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx) + badBroadcastClient.Start(ctx, 0) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -574,7 +574,7 @@ func TestServerIncorrectFeedServerVersion(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx) + badBroadcastClient.Start(ctx, 0) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -633,7 +633,7 @@ func TestServerMissingFeedServerVersion(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx) + badBroadcastClient.Start(ctx, 0) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -684,7 +684,7 @@ func TestBroadcastClientReconnectsOnServerDisconnect(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx) + broadcastClient.Start(ctx, 0) defer broadcastClient.StopAndWait() // Client set to timeout connection at 200 milliseconds, and server set to send ping every 50 seconds, @@ -796,7 +796,7 @@ func connectAndGetCachedMessages(ctx context.Context, addr net.Addr, chainId uin sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx) + broadcastClient.Start(ctx, 0) go func() { defer wg.Done() diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 8cd124bfe0..a2c4fa13dd 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -131,12 +131,12 @@ func clearAndResetTicker(timer *time.Ticker, interval time.Duration) { timer.Reset(interval) } -func (bcs *BroadcastClients) Start(ctx context.Context) { +func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter) bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter) for _, client := range bcs.primaryClients { - client.Start(ctx) + client.Start(ctx, syncTillBlock) } var lastConfirmed arbutil.MessageIndex @@ -176,7 +176,12 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { } // Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers + var msg m.BroadcastFeedMessage for { + if syncTillBlock > 0 && uint64(msg.SequenceNumber) >= syncTillBlock { + log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock) + return + } select { // Cycle buckets to get rid of old entries case <-recentFeedItemsCleanup.C: @@ -192,7 +197,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { case <-ctx.Done(): return // Primary feeds - case msg := <-bcs.primaryRouter.messageChan: + case msg = <-bcs.primaryRouter.messageChan: if err := msgHandler(msg, bcs.primaryRouter); err != nil { log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } @@ -210,7 +215,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { case <-ctx.Done(): return // Secondary Feeds - case msg := <-bcs.secondaryRouter.messageChan: + case msg = <-bcs.secondaryRouter.messageChan: if err := msgHandler(msg, bcs.secondaryRouter); err != nil { log.Error("Error routing message from Secondary Sequencer Feeds", "err", err) } @@ -218,7 +223,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan: confSeqHandler(cs, bcs.secondaryRouter) clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME) - case msg := <-bcs.primaryRouter.messageChan: + case msg = <-bcs.primaryRouter.messageChan: if err := msgHandler(msg, bcs.primaryRouter); err != nil { log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } @@ -229,7 +234,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME) clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME) case <-startSecondaryFeedTimer.C: - bcs.startSecondaryFeed(ctx) + bcs.startSecondaryFeed(ctx, syncTillBlock) case <-primaryFeedIsDownTimer.C: clearAndResetTicker(stopSecondaryFeedTimer, PRIMARY_FEED_UPTIME) } @@ -238,7 +243,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { }) } -func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { +func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context, syncTillBlock uint64) { pos := len(bcs.secondaryClients) if pos < len(bcs.secondaryURL) { url := bcs.secondaryURL[pos] @@ -249,7 +254,7 @@ func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { return } bcs.secondaryClients = append(bcs.secondaryClients, client) - client.Start(ctx) + client.Start(ctx, syncTillBlock) log.Info("secondary feed started", "url", url) } else if len(bcs.secondaryURL) > 0 { log.Warn("failed to start a new secondary feed all available secondary feeds were started") diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 6571672b71..e7d639bf42 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -392,7 +392,8 @@ func (s *ExecutionEngine) resequenceReorgedMessages(messages []*arbostypes.Messa log.Info("not resequencing delayed message due to unexpected index", "expected", nextDelayedSeqNum, "found", delayedSeqNum) continue } - _, err := s.sequenceDelayedMessageWithBlockMutex(msg.Message, delayedSeqNum) + _, err := s. + sequenceDelayedMessageWithBlockMutex(msg.Message, delayedSeqNum) if err != nil { log.Error("failed to re-sequence old delayed message removed by reorg", "err", err) } @@ -942,10 +943,14 @@ func (s *ExecutionEngine) ArbOSVersionForMessageNumber(messageNum arbutil.Messag return extra.ArbOSFormatVersion, nil } -func (s *ExecutionEngine) Start(ctx_in context.Context) { +func (s *ExecutionEngine) Start(ctx_in context.Context, syncTillBlock uint64) { s.StopWaiter.Start(ctx_in, s) s.LaunchThread(func(ctx context.Context) { for { + if syncTillBlock > 0 && s.latestBlock.NumberU64() >= syncTillBlock { + log.Info("stopping block creation in execution engine", "syncTillBlock", syncTillBlock) + return + } select { case <-ctx.Done(): return diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 499a13164e..e2e3f7282f 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -339,7 +339,7 @@ func (n *ExecutionNode) Initialize(ctx context.Context) error { } // not thread safe -func (n *ExecutionNode) Start(ctx context.Context) error { +func (n *ExecutionNode) Start(ctx context.Context, syncTillBlock uint64) error { if n.started.Swap(true) { return errors.New("already started") } @@ -348,7 +348,7 @@ func (n *ExecutionNode) Start(ctx context.Context) error { // if err != nil { // return fmt.Errorf("error starting geth stack: %w", err) // } - n.ExecEngine.Start(ctx) + n.ExecEngine.Start(ctx, syncTillBlock) err := n.TxPublisher.Start(ctx) if err != nil { return fmt.Errorf("error starting transaction puiblisher: %w", err) diff --git a/execution/interface.go b/execution/interface.go index 2a3d79c697..428b79cf89 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -64,7 +64,7 @@ type FullExecutionClient interface { ExecutionRecorder ExecutionSequencer - Start(ctx context.Context) error + Start(ctx context.Context, syncTillBlock uint64) error StopAndWait() Maintenance() error diff --git a/relay/relay.go b/relay/relay.go index 89bb899f29..936e3e5b1a 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -86,7 +86,7 @@ func (r *Relay) Start(ctx context.Context) error { return errors.New("broadcast unable to start") } - r.broadcastClients.Start(ctx) + r.broadcastClients.Start(ctx, 0) r.LaunchThread(func(ctx context.Context) { for { diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go index 575a77ee6f..287d1a795b 100644 --- a/relay/relay_stress_test.go +++ b/relay/relay_stress_test.go @@ -142,7 +142,7 @@ func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize if err != nil { t.FailNow() } - client.Start(ctx) + client.Start(ctx, 0) defer client.StopOnly() } From 68c34bfa4d04bb1e59bc3b5c3591d0e00526f218 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 5 Nov 2024 20:56:20 +0530 Subject: [PATCH 2/8] Changes based on PR comments --- arbnode/delayed_seq_reorg_test.go | 4 ++-- arbnode/delayed_sequencer.go | 28 +++++++++++++------------ arbnode/inbox_reader.go | 10 +++++---- arbnode/inbox_test.go | 6 +++--- arbnode/node.go | 22 +++++++++---------- arbnode/seq_coordinator.go | 9 +++++--- arbnode/transaction_streamer.go | 10 ++++++--- broadcastclient/broadcastclient.go | 10 ++++++--- broadcastclient/broadcastclient_test.go | 22 +++++++++---------- broadcastclients/broadcastclients.go | 19 ++++++++++------- cmd/nitro/nitro.go | 1 + execution/gethexec/executionengine.go | 10 +++++---- execution/gethexec/node.go | 9 +++++--- execution/interface.go | 2 +- relay/relay.go | 3 ++- relay/relay_stress_test.go | 4 ++-- system_tests/common_test.go | 8 +++---- 17 files changed, 101 insertions(+), 76 deletions(-) diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 87d93ac3d4..699eb3e8f6 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -22,9 +22,9 @@ func TestSequencerReorgFromDelayed(t *testing.T) { tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig) Require(t, err) - err = streamer.Start(ctx, 0) + err = streamer.Start(ctx) Require(t, err) - exec.Start(ctx, 0) + exec.Start(ctx) init, err := streamer.GetMessage(0) Require(t, err) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index 9e63d4ea03..10e845a393 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -32,6 +32,7 @@ type DelayedSequencer struct { waitingForFinalizedBlock uint64 mutex sync.Mutex config DelayedSequencerConfigFetcher + syncTillBlock uint64 } type DelayedSequencerConfig struct { @@ -64,15 +65,16 @@ var TestDelayedSequencerConfig = DelayedSequencerConfig{ UseMergeFinality: false, } -func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher) (*DelayedSequencer, error) { +func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher, syncTillBlock uint64) (*DelayedSequencer, error) { d := &DelayedSequencer{ - l1Reader: l1Reader, - bridge: reader.DelayedBridge(), - inbox: reader.Tracker(), - reader: reader, - coordinator: coordinator, - exec: exec, - config: config, + l1Reader: l1Reader, + bridge: reader.DelayedBridge(), + inbox: reader.Tracker(), + reader: reader, + coordinator: coordinator, + exec: exec, + config: config, + syncTillBlock: syncTillBlock, } if coordinator != nil { coordinator.SetDelayedSequencer(d) @@ -211,7 +213,7 @@ func (d *DelayedSequencer) ForceSequenceDelayed(ctx context.Context) error { return d.sequenceWithoutLockout(ctx, lastBlockHeader) } -func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) { +func (d *DelayedSequencer) run(ctx context.Context) { headerChan, cancel := d.l1Reader.Subscribe(false) defer cancel() @@ -221,8 +223,8 @@ func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) { log.Warn("error reading delayed count", "err", err) continue } - if syncTillBlock > 0 && delayedCount >= syncTillBlock { - log.Info("stopping block creation in delayed sequencer", "syncTillBlock", syncTillBlock) + if d.syncTillBlock > 0 && delayedCount >= d.syncTillBlock { + log.Info("stopping block creation in delayed sequencer", "syncTillBlock", d.syncTillBlock) return } select { @@ -241,9 +243,9 @@ func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) { } } -func (d *DelayedSequencer) Start(ctxIn context.Context, syncTillBlock uint64) { +func (d *DelayedSequencer) Start(ctxIn context.Context) { d.StopWaiter.Start(ctxIn, d) d.LaunchThread(func(ctx context.Context) { - d.run(ctx, syncTillBlock) + d.run(ctx) }) } diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index c760a82202..9c432ed013 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -96,13 +96,14 @@ type InboxReader struct { caughtUpChan chan struct{} client *ethclient.Client l1Reader *headerreader.HeaderReader + syncTillBlock uint64 // Atomic lastSeenBatchCount atomic.Uint64 lastReadBatchCount atomic.Uint64 } -func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) { +func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher, syncTillBlock uint64) (*InboxReader, error) { err := config().Validate() if err != nil { return nil, err @@ -116,10 +117,11 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h firstMessageBlock: firstMessageBlock, caughtUpChan: make(chan struct{}), config: config, + syncTillBlock: syncTillBlock, }, nil } -func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error { +func (r *InboxReader) Start(ctxIn context.Context) error { r.StopWaiter.Start(ctxIn, r) hadError := false r.LaunchThread(func(ctx context.Context) { @@ -129,8 +131,8 @@ func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error { log.Warn("error reading delayed count", "err", err) hadError = true } - if syncTillBlock > 0 && delayedCount >= syncTillBlock { - log.Info("stopping block creation in inbox reader", "syncTillBlock", syncTillBlock) + if r.syncTillBlock > 0 && delayedCount >= r.syncTillBlock { + log.Info("stopping block creation in inbox reader", "syncTillBlock", r.syncTillBlock) return } err = r.run(ctx, hadError) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index e75e9ee620..a6a4984f38 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -68,7 +68,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (* } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &DefaultTransactionStreamerConfig } - execEngine, err := gethexec.NewExecutionEngine(bc) + execEngine, err := gethexec.NewExecutionEngine(bc, 0) if err != nil { Fail(t, err) } @@ -106,9 +106,9 @@ func TestTransactionStreamer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := inbox.Start(ctx, 0) + err := inbox.Start(ctx) Require(t, err) - exec.Start(ctx, 0) + exec.Start(ctx) maxExpectedGasCost := big.NewInt(l2pricing.InitialBaseFeeWei) maxExpectedGasCost.Mul(maxExpectedGasCost, big.NewInt(2100*2)) diff --git a/arbnode/node.go b/arbnode/node.go index 3e57dba13b..28935034c6 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -79,7 +79,6 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com type Config struct { Sequencer bool `koanf:"sequencer"` - SyncTillBlock uint64 `koanf:"sync-till-block"` ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` @@ -146,7 +145,6 @@ func (c *Config) ValidatorRequired() bool { func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) { f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer") - f.Uint64(prefix+".sync-till-block", ConfigDefault.SyncTillBlock, "sync till block") headerreader.AddOptions(prefix+".parent-chain-reader", f) InboxReaderConfigAddOptions(prefix+".inbox-reader", f) DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f) @@ -165,7 +163,6 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed var ConfigDefault = Config{ Sequencer: false, - SyncTillBlock: 0, ParentChainReader: headerreader.DefaultConfig, InboxReader: DefaultInboxReaderConfig, DelayedSequencer: DefaultDelayedSequencerConfig, @@ -473,7 +470,7 @@ func createNodeImpl( } if config.SeqCoordinator.Enable { - coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator) + coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator, config.TransactionStreamer.SyncTillBlock) if err != nil { return nil, err } @@ -501,6 +498,7 @@ func createNodeImpl( nil, fatalErrChan, bpVerifier, + config.TransactionStreamer.SyncTillBlock, ) if err != nil { return nil, err @@ -592,7 +590,7 @@ func createNodeImpl( if err != nil { return nil, err } - inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }) + inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }, configFetcher.Get().TransactionStreamer.SyncTillBlock) if err != nil { return nil, err } @@ -741,7 +739,7 @@ func createNodeImpl( } // always create DelayedSequencer, it won't do anything if it is disabled - delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer }) + delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer }, configFetcher.Get().TransactionStreamer.SyncTillBlock) if err != nil { return nil, err } @@ -842,7 +840,7 @@ func (n *Node) Start(ctx context.Context) error { if execClient != nil { execClient.SetConsensusClient(n) } - err = n.Execution.Start(ctx, n.configFetcher.Get().SyncTillBlock) + err = n.Execution.Start(ctx) if err != nil { return fmt.Errorf("error starting exec client: %w", err) } @@ -872,12 +870,12 @@ func (n *Node) Start(ctx context.Context) error { return fmt.Errorf("error populating feed backlog on startup: %w", err) } } - err = n.TxStreamer.Start(ctx, n.configFetcher.Get().SyncTillBlock) + err = n.TxStreamer.Start(ctx) if err != nil { return fmt.Errorf("error starting transaction streamer: %w", err) } if n.InboxReader != nil { - err = n.InboxReader.Start(ctx, n.configFetcher.Get().SyncTillBlock) + err = n.InboxReader.Start(ctx) if err != nil { return fmt.Errorf("error starting inbox reader: %w", err) } @@ -890,7 +888,7 @@ func (n *Node) Start(ctx context.Context) error { } } if n.SeqCoordinator != nil { - n.SeqCoordinator.Start(ctx, n.configFetcher.Get().SyncTillBlock) + n.SeqCoordinator.Start(ctx) } else { n.Execution.Activate() } @@ -898,7 +896,7 @@ func (n *Node) Start(ctx context.Context) error { n.MaintenanceRunner.Start(ctx) } if n.DelayedSequencer != nil { - n.DelayedSequencer.Start(ctx, n.configFetcher.Get().SyncTillBlock) + n.DelayedSequencer.Start(ctx) } if n.BatchPoster != nil { n.BatchPoster.Start(ctx) @@ -948,7 +946,7 @@ func (n *Node) Start(ctx context.Context) error { return } } - n.BroadcastClients.Start(ctx, n.configFetcher.Get().SyncTillBlock) + n.BroadcastClients.Start(ctx) }() } if n.configFetcher != nil { diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 6563ffde9a..a94bdc0ac1 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -51,6 +51,7 @@ type SeqCoordinator struct { prevChosenSequencer string reportedWantsLockout bool + syncTillBlock uint64 lockoutUntil atomic.Int64 // atomic @@ -150,6 +151,7 @@ func NewSeqCoordinator( sequencer execution.ExecutionSequencer, sync *SyncMonitor, config SeqCoordinatorConfig, + syncTillBlock uint64, ) (*SeqCoordinator, error) { redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl) if err != nil { @@ -166,6 +168,7 @@ func NewSeqCoordinator( sequencer: sequencer, config: config, signer: signer, + syncTillBlock: syncTillBlock, } streamer.SetSeqCoordinator(coordinator) return coordinator, nil @@ -854,7 +857,7 @@ func (c *SeqCoordinator) launchHealthcheckServer(ctx context.Context) { } } -func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) { +func (c *SeqCoordinator) Start(ctxIn context.Context) { c.StopWaiter.Start(ctxIn, c) var newRedisCoordinator *redisutil.RedisCoordinator if c.config.NewRedisUrl != "" { @@ -872,8 +875,8 @@ func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) { if err != nil { log.Warn("failed to get message count", "err", err) } - if syncTillBlock > 0 && uint64(count) >= syncTillBlock { - log.Info("stopping block creation in sequencer", "syncTillBlock", syncTillBlock) + if c.syncTillBlock > 0 && uint64(count) >= c.syncTillBlock { + log.Info("stopping block creation in sequencer", "syncTillBlock", c.syncTillBlock) return } interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index a279bb378c..a0607a166e 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -74,6 +74,7 @@ type TransactionStreamerConfig struct { MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"` MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"` ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"` + SyncTillBlock uint64 `koanf:"sync-till-block"` } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig @@ -82,18 +83,21 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 50_000, MaxReorgResequenceDepth: 1024, ExecuteMessageLoopDelay: time.Millisecond * 100, + SyncTillBlock: 0, } var TestTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 10_000, MaxReorgResequenceDepth: 128 * 1024, ExecuteMessageLoopDelay: time.Millisecond, + SyncTillBlock: 0, } func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") + f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block") } func NewTransactionStreamer( @@ -1218,14 +1222,14 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc return s.config().ExecuteMessageLoopDelay } -func (s *TransactionStreamer) Start(ctxIn context.Context, syncTillBlock uint64) error { +func (s *TransactionStreamer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) return s.LaunchThreadSafe(func(ctx context.Context) { var defaultVal struct{} var val struct{} for { - if syncTillBlock > 0 && uint64(s.execLastMsgCount) >= syncTillBlock { - log.Info("stopping block creation in transaction streamer", "syncTillBlock", syncTillBlock) + if s.config().SyncTillBlock > 0 && uint64(s.execLastMsgCount) >= s.config().SyncTillBlock { + log.Info("stopping block creation in transaction streamer", "syncTillBlock", s.config().SyncTillBlock) return } interval := s.executeMessages(ctx, val) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index a539c104cf..08b3df3518 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -135,6 +135,8 @@ type BroadcastClient struct { retryCount atomic.Int64 + syncTillBlock uint64 + retrying bool shuttingDown bool confirmedSequenceNumberListener chan arbutil.MessageIndex @@ -158,6 +160,7 @@ func NewBroadcastClient( fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, adjustCount func(int32), + syncTillBlock uint64, ) (*BroadcastClient, error) { sigVerifier, err := signature.NewVerifier(&config().Verify, addrVerifier) if err != nil { @@ -173,10 +176,11 @@ func NewBroadcastClient( fatalErrChan: fatalErrChan, sigVerifier: sigVerifier, adjustCount: adjustCount, + syncTillBlock: syncTillBlock, }, err } -func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) { +func (bc *BroadcastClient) Start(ctxIn context.Context) { bc.StopWaiter.Start(ctxIn, bc) if bc.StopWaiter.Stopped() { log.Info("broadcast client has already been stopped, not starting") @@ -186,8 +190,8 @@ func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) { backoffDuration := bc.config().ReconnectInitialBackoff for { - if syncTillBlock > 0 && uint64(bc.nextSeqNum) >= syncTillBlock { - log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock) + if bc.syncTillBlock > 0 && uint64(bc.nextSeqNum) >= bc.syncTillBlock { + log.Info("stopping block creation in broadcast client", "syncTillBlock", bc.syncTillBlock) return } earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum) diff --git a/broadcastclient/broadcastclient_test.go b/broadcastclient/broadcastclient_test.go index ecf5a88b3b..6ffa58e621 100644 --- a/broadcastclient/broadcastclient_test.go +++ b/broadcastclient/broadcastclient_test.go @@ -153,7 +153,7 @@ func TestInvalidSignature(t *testing.T) { &badSequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) go func() { for i := 0; i < messageCount; i++ { @@ -210,7 +210,7 @@ func newTestBroadcastClient(config Config, listenerAddress net.Addr, chainId uin } else { config.Verify.AcceptSequencer = false } - return NewBroadcastClient(func() *Config { return &config }, fmt.Sprintf("ws://127.0.0.1:%d/", port), chainId, currentMessageCount, txStreamer, confirmedSequenceNumberListener, feedErrChan, av, func(_ int32) {}) + return NewBroadcastClient(func() *Config { return &config }, fmt.Sprintf("ws://127.0.0.1:%d/", port), chainId, currentMessageCount, txStreamer, confirmedSequenceNumberListener, feedErrChan, av, func(_ int32) {}, 0) } func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Config, addr net.Addr, index int, expectedCount int, chainId uint64, wg *sync.WaitGroup, sequencerAddr *common.Address) { @@ -227,7 +227,7 @@ func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Co sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) messageCount := 0 wg.Add(1) @@ -315,7 +315,7 @@ func TestServerClientDisconnect(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) t.Log("broadcasting seq 0 message") Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil)) @@ -386,7 +386,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) t.Log("broadcasting seq 0 message") Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil)) @@ -458,7 +458,7 @@ func TestServerIncorrectChainId(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -517,7 +517,7 @@ func TestServerMissingChainId(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -574,7 +574,7 @@ func TestServerIncorrectFeedServerVersion(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -633,7 +633,7 @@ func TestServerMissingFeedServerVersion(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -684,7 +684,7 @@ func TestBroadcastClientReconnectsOnServerDisconnect(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) defer broadcastClient.StopAndWait() // Client set to timeout connection at 200 milliseconds, and server set to send ping every 50 seconds, @@ -796,7 +796,7 @@ func connectAndGetCachedMessages(ctx context.Context, addr net.Addr, chainId uin sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) go func() { defer wg.Done() diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index a2c4fa13dd..a72b6088e0 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -48,6 +48,8 @@ type BroadcastClients struct { primaryRouter *Router secondaryRouter *Router + syncTillBlock uint64 + // Use atomic access connected atomic.Int32 } @@ -60,6 +62,7 @@ func NewBroadcastClients( confirmedSequenceNumberListener chan arbutil.MessageIndex, fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, + syncTillBlock uint64, ) (*BroadcastClients, error) { config := configFetcher() if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { @@ -79,6 +82,7 @@ func NewBroadcastClients( primaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.URL)), secondaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.SecondaryURL)), secondaryURL: config.SecondaryURL, + syncTillBlock: syncTillBlock, } clients.makeClient = func(url string, router *Router) (*broadcastclient.BroadcastClient, error) { return broadcastclient.NewBroadcastClient( @@ -91,6 +95,7 @@ func NewBroadcastClients( fatalErrChan, addrVerifier, func(delta int32) { clients.adjustCount(delta) }, + syncTillBlock, ) } @@ -131,12 +136,12 @@ func clearAndResetTicker(timer *time.Ticker, interval time.Duration) { timer.Reset(interval) } -func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { +func (bcs *BroadcastClients) Start(ctx context.Context) { bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter) bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter) for _, client := range bcs.primaryClients { - client.Start(ctx, syncTillBlock) + client.Start(ctx) } var lastConfirmed arbutil.MessageIndex @@ -178,8 +183,8 @@ func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { // Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers var msg m.BroadcastFeedMessage for { - if syncTillBlock > 0 && uint64(msg.SequenceNumber) >= syncTillBlock { - log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock) + if bcs.syncTillBlock > 0 && uint64(msg.SequenceNumber) >= bcs.syncTillBlock { + log.Info("stopping block creation in broadcast client", "syncTillBlock", bcs.syncTillBlock) return } select { @@ -234,7 +239,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME) clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME) case <-startSecondaryFeedTimer.C: - bcs.startSecondaryFeed(ctx, syncTillBlock) + bcs.startSecondaryFeed(ctx) case <-primaryFeedIsDownTimer.C: clearAndResetTicker(stopSecondaryFeedTimer, PRIMARY_FEED_UPTIME) } @@ -243,7 +248,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { }) } -func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context, syncTillBlock uint64) { +func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { pos := len(bcs.secondaryClients) if pos < len(bcs.secondaryURL) { url := bcs.secondaryURL[pos] @@ -254,7 +259,7 @@ func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context, syncTillBlo return } bcs.secondaryClients = append(bcs.secondaryClients, client) - client.Start(ctx, syncTillBlock) + client.Start(ctx) log.Info("secondary feed started", "url", url) } else if len(bcs.secondaryURL) > 0 { log.Warn("failed to start a new secondary feed all available secondary feeds were started") diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 076ac66ec4..95048be325 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -527,6 +527,7 @@ func mainImpl() int { l2BlockChain, l1Client, func() *gethexec.Config { return &liveNodeConfig.Get().Execution }, + liveNodeConfig.Get().Node.TransactionStreamer.SyncTillBlock, ) if err != nil { log.Error("failed to create execution node", "err", err) diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index e7d639bf42..b8925cd5b1 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -92,6 +92,7 @@ type ExecutionEngine struct { prefetchBlock bool cachedL1PriceData *L1PriceData + syncTillBlock uint64 } func NewL1PriceData() *L1PriceData { @@ -100,12 +101,13 @@ func NewL1PriceData() *L1PriceData { } } -func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) { +func NewExecutionEngine(bc *core.BlockChain, syncTillBlock uint64) (*ExecutionEngine, error) { return &ExecutionEngine{ bc: bc, resequenceChan: make(chan []*arbostypes.MessageWithMetadata), newBlockNotifier: make(chan struct{}, 1), cachedL1PriceData: NewL1PriceData(), + syncTillBlock: syncTillBlock, }, nil } @@ -943,12 +945,12 @@ func (s *ExecutionEngine) ArbOSVersionForMessageNumber(messageNum arbutil.Messag return extra.ArbOSFormatVersion, nil } -func (s *ExecutionEngine) Start(ctx_in context.Context, syncTillBlock uint64) { +func (s *ExecutionEngine) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) s.LaunchThread(func(ctx context.Context) { for { - if syncTillBlock > 0 && s.latestBlock.NumberU64() >= syncTillBlock { - log.Info("stopping block creation in execution engine", "syncTillBlock", syncTillBlock) + if s.syncTillBlock > 0 && s.latestBlock.NumberU64() >= s.syncTillBlock { + log.Info("stopping block creation in execution engine", "syncTillBlock", s.syncTillBlock) return } select { diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index e2e3f7282f..3f743887be 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -173,6 +173,7 @@ type ExecutionNode struct { ParentChainReader *headerreader.HeaderReader ClassicOutbox *ClassicOutboxRetriever started atomic.Bool + syncTillBlock uint64 } func CreateExecutionNode( @@ -182,9 +183,10 @@ func CreateExecutionNode( l2BlockChain *core.BlockChain, l1client *ethclient.Client, configFetcher ConfigFetcher, + syncTillBlock uint64, ) (*ExecutionNode, error) { config := configFetcher() - execEngine, err := NewExecutionEngine(l2BlockChain) + execEngine, err := NewExecutionEngine(l2BlockChain, syncTillBlock) if config.EnablePrefetchBlock { execEngine.EnablePrefetchBlock() } @@ -308,6 +310,7 @@ func CreateExecutionNode( SyncMonitor: syncMon, ParentChainReader: parentChainReader, ClassicOutbox: classicOutbox, + syncTillBlock: syncTillBlock, }, nil } @@ -339,7 +342,7 @@ func (n *ExecutionNode) Initialize(ctx context.Context) error { } // not thread safe -func (n *ExecutionNode) Start(ctx context.Context, syncTillBlock uint64) error { +func (n *ExecutionNode) Start(ctx context.Context) error { if n.started.Swap(true) { return errors.New("already started") } @@ -348,7 +351,7 @@ func (n *ExecutionNode) Start(ctx context.Context, syncTillBlock uint64) error { // if err != nil { // return fmt.Errorf("error starting geth stack: %w", err) // } - n.ExecEngine.Start(ctx, syncTillBlock) + n.ExecEngine.Start(ctx) err := n.TxPublisher.Start(ctx) if err != nil { return fmt.Errorf("error starting transaction puiblisher: %w", err) diff --git a/execution/interface.go b/execution/interface.go index 428b79cf89..2a3d79c697 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -64,7 +64,7 @@ type FullExecutionClient interface { ExecutionRecorder ExecutionSequencer - Start(ctx context.Context, syncTillBlock uint64) error + Start(ctx context.Context) error StopAndWait() Maintenance() error diff --git a/relay/relay.go b/relay/relay.go index 936e3e5b1a..ac8906661d 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -56,6 +56,7 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) { confirmedSequenceNumberListener, feedErrChan, nil, + 0, ) if err != nil { return nil, err @@ -86,7 +87,7 @@ func (r *Relay) Start(ctx context.Context) error { return errors.New("broadcast unable to start") } - r.broadcastClients.Start(ctx, 0) + r.broadcastClients.Start(ctx) r.LaunchThread(func(ctx context.Context) { for { diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go index 287d1a795b..5d469e5f57 100644 --- a/relay/relay_stress_test.go +++ b/relay/relay_stress_test.go @@ -138,11 +138,11 @@ func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize for i := 0; i < numClients; i++ { ts := &dummyTxStreamer{id: i} streamers = append(streamers, ts) - client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, fatalErrChan, nil, func(_ int32) {}) + client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, fatalErrChan, nil, func(_ int32) {}, 0) if err != nil { t.FailNow() } - client.Start(ctx, 0) + client.Start(ctx) defer client.StopOnly() } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 027a41d875..e388441d3c 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -476,7 +476,7 @@ func buildOnParentChain( Require(t, execConfig.Validate()) execConfigToBeUsedInConfigFetcher := execConfig execConfigFetcher := func() *gethexec.Config { return execConfigToBeUsedInConfigFetcher } - execNode, err := gethexec.CreateExecutionNode(ctx, chainTestClient.Stack, chainDb, blockchain, parentChainTestClient.Client, execConfigFetcher) + execNode, err := gethexec.CreateExecutionNode(ctx, chainTestClient.Stack, chainDb, blockchain, parentChainTestClient.Client, execConfigFetcher, 0) Require(t, err) fatalErrChan := make(chan error, 10) @@ -596,7 +596,7 @@ func (b *NodeBuilder) BuildL2(t *testing.T) func() { Require(t, b.execConfig.Validate()) execConfig := b.execConfig execConfigFetcher := func() *gethexec.Config { return execConfig } - execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, chainDb, blockchain, nil, execConfigFetcher) + execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, chainDb, blockchain, nil, execConfigFetcher, 0) Require(t, err) fatalErrChan := make(chan error, 10) @@ -645,7 +645,7 @@ func (b *NodeBuilder) RestartL2Node(t *testing.T) { l2info, stack, chainDb, arbDb, blockchain := createNonL1BlockChainWithStackConfig(t, b.L2Info, b.dataDir, b.chainConfig, b.initMessage, b.l2StackConfig, b.execConfig, b.wasmCacheTag) execConfigFetcher := func() *gethexec.Config { return b.execConfig } - execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher) + execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher, 0) Require(t, err) feedErrChan := make(chan error, 10) @@ -1445,7 +1445,7 @@ func Create2ndNodeWithConfig( Require(t, execConfig.Validate()) Require(t, nodeConfig.Validate()) configFetcher := func() *gethexec.Config { return execConfig } - currentExec, err := gethexec.CreateExecutionNode(ctx, chainStack, chainDb, blockchain, parentChainClient, configFetcher) + currentExec, err := gethexec.CreateExecutionNode(ctx, chainStack, chainDb, blockchain, parentChainClient, configFetcher, 0) Require(t, err) currentNode, err := arbnode.CreateNode(ctx, chainStack, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil) From 3bdd14df9f67a3c2eca37f64d8da0f5ca5a57a93 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Wed, 6 Nov 2024 12:38:21 +0530 Subject: [PATCH 3/8] Changes based on PR comments --- arbnode/delayed_sequencer.go | 14 +++++--------- arbnode/inbox_reader.go | 15 ++++----------- arbnode/node.go | 4 ++-- arbnode/seq_coordinator.go | 11 ++--------- execution/gethexec/executionengine.go | 6 ++++++ execution/gethexec/node.go | 2 -- 6 files changed, 19 insertions(+), 33 deletions(-) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index 10e845a393..abdea7ce7e 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -17,6 +17,7 @@ import ( "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/execution" + "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/stopwaiter" ) @@ -218,15 +219,6 @@ func (d *DelayedSequencer) run(ctx context.Context) { defer cancel() for { - delayedCount, err := d.inbox.GetDelayedCount() - if err != nil { - log.Warn("error reading delayed count", "err", err) - continue - } - if d.syncTillBlock > 0 && delayedCount >= d.syncTillBlock { - log.Info("stopping block creation in delayed sequencer", "syncTillBlock", d.syncTillBlock) - return - } select { case nextHeader, ok := <-headerChan: if !ok { @@ -234,6 +226,10 @@ func (d *DelayedSequencer) run(ctx context.Context) { return } if err := d.trySequence(ctx, nextHeader); err != nil { + if errors.Is(err, gethexec.ExecutionEngineBlockCreationStopped) { + log.Info("stopping block creation in delayed sequencer", "syncTillBlock", d.syncTillBlock) + return + } log.Error("Delayed sequencer error", "err", err) } case <-ctx.Done(): diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 9c432ed013..ea48f015b6 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -96,14 +96,13 @@ type InboxReader struct { caughtUpChan chan struct{} client *ethclient.Client l1Reader *headerreader.HeaderReader - syncTillBlock uint64 // Atomic lastSeenBatchCount atomic.Uint64 lastReadBatchCount atomic.Uint64 } -func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher, syncTillBlock uint64) (*InboxReader, error) { +func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) { err := config().Validate() if err != nil { return nil, err @@ -117,7 +116,6 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h firstMessageBlock: firstMessageBlock, caughtUpChan: make(chan struct{}), config: config, - syncTillBlock: syncTillBlock, }, nil } @@ -126,16 +124,11 @@ func (r *InboxReader) Start(ctxIn context.Context) error { hadError := false r.LaunchThread(func(ctx context.Context) { for { - delayedCount, err := r.tracker.GetDelayedCount() - if err != nil { - log.Warn("error reading delayed count", "err", err) - hadError = true - } - if r.syncTillBlock > 0 && delayedCount >= r.syncTillBlock { - log.Info("stopping block creation in inbox reader", "syncTillBlock", r.syncTillBlock) + if r.tracker.txStreamer.Stopped() { + log.Info("stopping block creation in inbox reader because transaction streamer has stopped") return } - err = r.run(ctx, hadError) + err := r.run(ctx, hadError) if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") { log.Warn("error reading inbox", "err", err) hadError = true diff --git a/arbnode/node.go b/arbnode/node.go index 28935034c6..a0b4291963 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -470,7 +470,7 @@ func createNodeImpl( } if config.SeqCoordinator.Enable { - coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator, config.TransactionStreamer.SyncTillBlock) + coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator) if err != nil { return nil, err } @@ -590,7 +590,7 @@ func createNodeImpl( if err != nil { return nil, err } - inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }, configFetcher.Get().TransactionStreamer.SyncTillBlock) + inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }) if err != nil { return nil, err } diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index a94bdc0ac1..ebbd179c24 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -51,7 +51,6 @@ type SeqCoordinator struct { prevChosenSequencer string reportedWantsLockout bool - syncTillBlock uint64 lockoutUntil atomic.Int64 // atomic @@ -151,7 +150,6 @@ func NewSeqCoordinator( sequencer execution.ExecutionSequencer, sync *SyncMonitor, config SeqCoordinatorConfig, - syncTillBlock uint64, ) (*SeqCoordinator, error) { redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl) if err != nil { @@ -168,7 +166,6 @@ func NewSeqCoordinator( sequencer: sequencer, config: config, signer: signer, - syncTillBlock: syncTillBlock, } streamer.SetSeqCoordinator(coordinator) return coordinator, nil @@ -871,12 +868,8 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) { c.LaunchThread(func(ctx context.Context) { for { - count, err := c.streamer.GetMessageCount() - if err != nil { - log.Warn("failed to get message count", "err", err) - } - if c.syncTillBlock > 0 && uint64(count) >= c.syncTillBlock { - log.Info("stopping block creation in sequencer", "syncTillBlock", c.syncTillBlock) + if c.streamer.Stopped() { + log.Info("stopping block creation in sequencer because transaction streamer has stopped") return } interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator) diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index b8925cd5b1..790721af69 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -55,6 +55,8 @@ var ( gasUsedSinceStartupCounter = metrics.NewRegisteredCounter("arb/gas_used", nil) ) +var ExecutionEngineBlockCreationStopped = errors.New("block creation stopped in execution engine") + type L1PriceDataOfMsg struct { callDataUnits uint64 cummulativeCallDataUnits uint64 @@ -588,6 +590,10 @@ func (s *ExecutionEngine) SequenceDelayedMessage(message *arbostypes.L1IncomingM } func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) (*types.Block, error) { + if s.syncTillBlock > 0 && s.latestBlock.NumberU64() >= s.syncTillBlock { + return nil, ExecutionEngineBlockCreationStopped + } + currentHeader, err := s.getCurrentHeader() if err != nil { return nil, err diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 3f743887be..85b4213e8c 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -173,7 +173,6 @@ type ExecutionNode struct { ParentChainReader *headerreader.HeaderReader ClassicOutbox *ClassicOutboxRetriever started atomic.Bool - syncTillBlock uint64 } func CreateExecutionNode( @@ -310,7 +309,6 @@ func CreateExecutionNode( SyncMonitor: syncMon, ParentChainReader: parentChainReader, ClassicOutbox: classicOutbox, - syncTillBlock: syncTillBlock, }, nil } From 22393a9b156935cd40c52f691357332f7a46ae43 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Wed, 6 Nov 2024 13:07:55 +0530 Subject: [PATCH 4/8] Changes based on PR comments --- arbnode/delayed_sequencer.go | 20 +++++++------- arbnode/node.go | 3 +-- arbnode/transaction_streamer.go | 36 +++++++------------------ broadcastclient/broadcastclient.go | 15 +++++------ broadcastclient/broadcastclient_test.go | 2 +- broadcastclients/broadcastclients.go | 14 ++++------ relay/relay.go | 1 - relay/relay_stress_test.go | 2 +- 8 files changed, 33 insertions(+), 60 deletions(-) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index abdea7ce7e..7985f6d197 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -33,7 +33,6 @@ type DelayedSequencer struct { waitingForFinalizedBlock uint64 mutex sync.Mutex config DelayedSequencerConfigFetcher - syncTillBlock uint64 } type DelayedSequencerConfig struct { @@ -66,16 +65,15 @@ var TestDelayedSequencerConfig = DelayedSequencerConfig{ UseMergeFinality: false, } -func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher, syncTillBlock uint64) (*DelayedSequencer, error) { +func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher) (*DelayedSequencer, error) { d := &DelayedSequencer{ - l1Reader: l1Reader, - bridge: reader.DelayedBridge(), - inbox: reader.Tracker(), - reader: reader, - coordinator: coordinator, - exec: exec, - config: config, - syncTillBlock: syncTillBlock, + l1Reader: l1Reader, + bridge: reader.DelayedBridge(), + inbox: reader.Tracker(), + reader: reader, + coordinator: coordinator, + exec: exec, + config: config, } if coordinator != nil { coordinator.SetDelayedSequencer(d) @@ -227,7 +225,7 @@ func (d *DelayedSequencer) run(ctx context.Context) { } if err := d.trySequence(ctx, nextHeader); err != nil { if errors.Is(err, gethexec.ExecutionEngineBlockCreationStopped) { - log.Info("stopping block creation in delayed sequencer", "syncTillBlock", d.syncTillBlock) + log.Info("stopping block creation in delayed sequencer because execution engine is stopped") return } log.Error("Delayed sequencer error", "err", err) diff --git a/arbnode/node.go b/arbnode/node.go index a0b4291963..c5b3bbe071 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -498,7 +498,6 @@ func createNodeImpl( nil, fatalErrChan, bpVerifier, - config.TransactionStreamer.SyncTillBlock, ) if err != nil { return nil, err @@ -739,7 +738,7 @@ func createNodeImpl( } // always create DelayedSequencer, it won't do anything if it is disabled - delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer }, configFetcher.Get().TransactionStreamer.SyncTillBlock) + delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer }) if err != nil { return nil, err } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index a0607a166e..79b1d1e1da 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -29,6 +29,7 @@ import ( "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/broadcastclient" "github.com/offchainlabs/nitro/broadcaster" m "github.com/offchainlabs/nitro/broadcaster/message" "github.com/offchainlabs/nitro/execution" @@ -1048,6 +1049,10 @@ func (s *TransactionStreamer) broadcastMessages( // The mutex must be held, and pos must be the latest message count. // `batch` may be nil, which initializes a new batch. The batch is closed out in this function. func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { + if s.config().SyncTillBlock > 0 && uint64(pos) > s.config().SyncTillBlock { + return broadcastclient.TransactionStreamerBlockCreationStopped + } + if batch == nil { batch = s.db.NewBatch() } @@ -1216,6 +1221,10 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool { } func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struct{}) time.Duration { + if s.config().SyncTillBlock > 0 && uint64(s.execLastMsgCount) >= s.config().SyncTillBlock { + log.Info("stopping block creation in transaction streamer", "syncTillBlock", s.config().SyncTillBlock) + return s.config().ExecuteMessageLoopDelay + } if s.ExecuteNextMsg(ctx) { return 0 } @@ -1224,30 +1233,5 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc func (s *TransactionStreamer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) - return s.LaunchThreadSafe(func(ctx context.Context) { - var defaultVal struct{} - var val struct{} - for { - if s.config().SyncTillBlock > 0 && uint64(s.execLastMsgCount) >= s.config().SyncTillBlock { - log.Info("stopping block creation in transaction streamer", "syncTillBlock", s.config().SyncTillBlock) - return - } - interval := s.executeMessages(ctx, val) - if ctx.Err() != nil { - return - } - val = defaultVal - if interval == time.Duration(0) { - continue - } - timer := time.NewTimer(interval) - select { - case <-ctx.Done(): - timer.Stop() - return - case <-timer.C: - case val = <-s.newMessageNotifier: - } - } - }) + return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier) } diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 08b3df3518..e712db1236 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -38,6 +38,8 @@ var ( sourcesDisconnectedGauge = metrics.NewRegisteredGauge("arb/feed/sources/disconnected", nil) ) +var TransactionStreamerBlockCreationStopped = errors.New("block creation stopped in transaction streamer") + type FeedConfig struct { Output wsbroadcastserver.BroadcasterConfig `koanf:"output" reload:"hot"` Input Config `koanf:"input" reload:"hot"` @@ -135,8 +137,6 @@ type BroadcastClient struct { retryCount atomic.Int64 - syncTillBlock uint64 - retrying bool shuttingDown bool confirmedSequenceNumberListener chan arbutil.MessageIndex @@ -160,7 +160,6 @@ func NewBroadcastClient( fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, adjustCount func(int32), - syncTillBlock uint64, ) (*BroadcastClient, error) { sigVerifier, err := signature.NewVerifier(&config().Verify, addrVerifier) if err != nil { @@ -176,7 +175,6 @@ func NewBroadcastClient( fatalErrChan: fatalErrChan, sigVerifier: sigVerifier, adjustCount: adjustCount, - syncTillBlock: syncTillBlock, }, err } @@ -189,11 +187,6 @@ func (bc *BroadcastClient) Start(ctxIn context.Context) { bc.LaunchThread(func(ctx context.Context) { backoffDuration := bc.config().ReconnectInitialBackoff for { - - if bc.syncTillBlock > 0 && uint64(bc.nextSeqNum) >= bc.syncTillBlock { - log.Info("stopping block creation in broadcast client", "syncTillBlock", bc.syncTillBlock) - return - } earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum) if errors.Is(err, ErrMissingChainId) || errors.Is(err, ErrIncorrectChainId) || @@ -443,6 +436,10 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) { bc.nextSeqNum = message.SequenceNumber + 1 } if err := bc.txStreamer.AddBroadcastMessages(res.Messages); err != nil { + if errors.Is(err, TransactionStreamerBlockCreationStopped) { + log.Info("stopping block creation in broadcast client because transaction streamer is stopped") + return + } log.Error("Error adding message from Sequencer Feed", "err", err) } } diff --git a/broadcastclient/broadcastclient_test.go b/broadcastclient/broadcastclient_test.go index 6ffa58e621..a499628cd5 100644 --- a/broadcastclient/broadcastclient_test.go +++ b/broadcastclient/broadcastclient_test.go @@ -210,7 +210,7 @@ func newTestBroadcastClient(config Config, listenerAddress net.Addr, chainId uin } else { config.Verify.AcceptSequencer = false } - return NewBroadcastClient(func() *Config { return &config }, fmt.Sprintf("ws://127.0.0.1:%d/", port), chainId, currentMessageCount, txStreamer, confirmedSequenceNumberListener, feedErrChan, av, func(_ int32) {}, 0) + return NewBroadcastClient(func() *Config { return &config }, fmt.Sprintf("ws://127.0.0.1:%d/", port), chainId, currentMessageCount, txStreamer, confirmedSequenceNumberListener, feedErrChan, av, func(_ int32) {}) } func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Config, addr net.Addr, index int, expectedCount int, chainId uint64, wg *sync.WaitGroup, sequencerAddr *common.Address) { diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index a72b6088e0..5be326cfef 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -5,6 +5,7 @@ package broadcastclients import ( "context" + "errors" "sync/atomic" "time" @@ -48,8 +49,6 @@ type BroadcastClients struct { primaryRouter *Router secondaryRouter *Router - syncTillBlock uint64 - // Use atomic access connected atomic.Int32 } @@ -62,7 +61,6 @@ func NewBroadcastClients( confirmedSequenceNumberListener chan arbutil.MessageIndex, fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, - syncTillBlock uint64, ) (*BroadcastClients, error) { config := configFetcher() if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { @@ -82,7 +80,6 @@ func NewBroadcastClients( primaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.URL)), secondaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.SecondaryURL)), secondaryURL: config.SecondaryURL, - syncTillBlock: syncTillBlock, } clients.makeClient = func(url string, router *Router) (*broadcastclient.BroadcastClient, error) { return broadcastclient.NewBroadcastClient( @@ -95,7 +92,6 @@ func NewBroadcastClients( fatalErrChan, addrVerifier, func(delta int32) { clients.adjustCount(delta) }, - syncTillBlock, ) } @@ -183,10 +179,6 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { // Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers var msg m.BroadcastFeedMessage for { - if bcs.syncTillBlock > 0 && uint64(msg.SequenceNumber) >= bcs.syncTillBlock { - log.Info("stopping block creation in broadcast client", "syncTillBlock", bcs.syncTillBlock) - return - } select { // Cycle buckets to get rid of old entries case <-recentFeedItemsCleanup.C: @@ -204,6 +196,10 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { // Primary feeds case msg = <-bcs.primaryRouter.messageChan: if err := msgHandler(msg, bcs.primaryRouter); err != nil { + if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { + log.Info("stopping block creation in broadcast clients because transaction streamer is stopped") + return + } log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME) diff --git a/relay/relay.go b/relay/relay.go index ac8906661d..89bb899f29 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -56,7 +56,6 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) { confirmedSequenceNumberListener, feedErrChan, nil, - 0, ) if err != nil { return nil, err diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go index 5d469e5f57..575a77ee6f 100644 --- a/relay/relay_stress_test.go +++ b/relay/relay_stress_test.go @@ -138,7 +138,7 @@ func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize for i := 0; i < numClients; i++ { ts := &dummyTxStreamer{id: i} streamers = append(streamers, ts) - client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, fatalErrChan, nil, func(_ int32) {}, 0) + client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, fatalErrChan, nil, func(_ int32) {}) if err != nil { t.FailNow() } From 94e31c073666c6dee37658e11fd7c53b054d55f1 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Wed, 6 Nov 2024 13:10:52 +0530 Subject: [PATCH 5/8] minor fix --- arbnode/delayed_sequencer.go | 4 +--- broadcastclients/broadcastclients.go | 7 +++---- execution/gethexec/executionengine.go | 3 +-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index 7985f6d197..72b68b5e7b 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -239,7 +239,5 @@ func (d *DelayedSequencer) run(ctx context.Context) { func (d *DelayedSequencer) Start(ctxIn context.Context) { d.StopWaiter.Start(ctxIn, d) - d.LaunchThread(func(ctx context.Context) { - d.run(ctx) - }) + d.LaunchThread(d.run) } diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 5be326cfef..a6d81068b5 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -177,7 +177,6 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { } // Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers - var msg m.BroadcastFeedMessage for { select { // Cycle buckets to get rid of old entries @@ -194,7 +193,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { case <-ctx.Done(): return // Primary feeds - case msg = <-bcs.primaryRouter.messageChan: + case msg := <-bcs.primaryRouter.messageChan: if err := msgHandler(msg, bcs.primaryRouter); err != nil { if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { log.Info("stopping block creation in broadcast clients because transaction streamer is stopped") @@ -216,7 +215,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { case <-ctx.Done(): return // Secondary Feeds - case msg = <-bcs.secondaryRouter.messageChan: + case msg := <-bcs.secondaryRouter.messageChan: if err := msgHandler(msg, bcs.secondaryRouter); err != nil { log.Error("Error routing message from Secondary Sequencer Feeds", "err", err) } @@ -224,7 +223,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { case cs := <-bcs.secondaryRouter.confirmedSequenceNumberChan: confSeqHandler(cs, bcs.secondaryRouter) clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME) - case msg = <-bcs.primaryRouter.messageChan: + case msg := <-bcs.primaryRouter.messageChan: if err := msgHandler(msg, bcs.primaryRouter); err != nil { log.Error("Error routing message from Primary Sequencer Feeds", "err", err) } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 790721af69..bb10102018 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -396,8 +396,7 @@ func (s *ExecutionEngine) resequenceReorgedMessages(messages []*arbostypes.Messa log.Info("not resequencing delayed message due to unexpected index", "expected", nextDelayedSeqNum, "found", delayedSeqNum) continue } - _, err := s. - sequenceDelayedMessageWithBlockMutex(msg.Message, delayedSeqNum) + _, err := s.sequenceDelayedMessageWithBlockMutex(msg.Message, delayedSeqNum) if err != nil { log.Error("failed to re-sequence old delayed message removed by reorg", "err", err) } From e37c1380fd35678bbf6370278b1634f491a81b6c Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Wed, 6 Nov 2024 13:25:10 +0530 Subject: [PATCH 6/8] minor fix --- arbnode/delayed_sequencer.go | 2 +- arbnode/inbox_reader.go | 5 ++-- arbnode/seq_coordinator.go | 36 +++++++++++++++------------- broadcastclient/broadcastclient.go | 2 +- broadcastclients/broadcastclients.go | 2 +- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index 72b68b5e7b..c63bf8d7e0 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -225,7 +225,7 @@ func (d *DelayedSequencer) run(ctx context.Context) { } if err := d.trySequence(ctx, nextHeader); err != nil { if errors.Is(err, gethexec.ExecutionEngineBlockCreationStopped) { - log.Info("stopping block creation in delayed sequencer because execution engine is stopped") + log.Info("stopping block creation in delayed sequencer because execution engine has stopped") return } log.Error("Delayed sequencer error", "err", err) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index ea48f015b6..b1fa63462e 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -19,6 +19,7 @@ import ( flag "github.com/spf13/pflag" "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/broadcastclient" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -124,11 +125,11 @@ func (r *InboxReader) Start(ctxIn context.Context) error { hadError := false r.LaunchThread(func(ctx context.Context) { for { - if r.tracker.txStreamer.Stopped() { + err := r.run(ctx, hadError) + if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { log.Info("stopping block creation in inbox reader because transaction streamer has stopped") return } - err := r.run(ctx, hadError) if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") { log.Warn("error reading inbox", "err", err) hadError = true diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index ebbd179c24..f44619ec61 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -22,6 +22,7 @@ import ( "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/broadcastclient" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/contracts" @@ -593,14 +594,14 @@ func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, final return nil } -func (c *SeqCoordinator) update(ctx context.Context) time.Duration { +func (c *SeqCoordinator) update(ctx context.Context) (time.Duration, error) { chosenSeq, err := c.RedisCoordinator().RecommendSequencerWantingLockout(ctx) if err != nil { log.Warn("coordinator failed finding sequencer wanting lockout", "err", err) - return c.retryAfterRedisError() + return c.retryAfterRedisError(), nil } if c.prevChosenSequencer == c.config.Url() { - return c.updateWithLockout(ctx, chosenSeq) + return c.updateWithLockout(ctx, chosenSeq), nil } if chosenSeq != c.config.Url() && chosenSeq != c.prevChosenSequencer { var err error @@ -621,14 +622,14 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { localMsgCount, err := c.streamer.GetMessageCount() if err != nil { log.Error("cannot read message count", "err", err) - return c.config.UpdateInterval + return c.config.UpdateInterval, nil } // Cache the previous redis coordinator's message count if c.prevRedisCoordinator != nil && c.prevRedisMessageCount == 0 { prevRemoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.prevRedisCoordinator.Client) if err != nil { log.Warn("cannot get remote message count", "err", err) - return c.retryAfterRedisError() + return c.retryAfterRedisError(), nil } c.prevRedisMessageCount = prevRemoteMsgCount } @@ -643,7 +644,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { remoteMsgCount, err := c.GetRemoteMsgCount() if err != nil { log.Warn("cannot get remote message count", "err", err) - return c.retryAfterRedisError() + return c.retryAfterRedisError(), nil } readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount) client := c.RedisCoordinator().Client @@ -720,6 +721,9 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } if len(messages) > 0 { if err := c.streamer.AddMessages(localMsgCount, false, messages); err != nil { + if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { + return time.Duration(0), broadcastclient.TransactionStreamerBlockCreationStopped + } log.Warn("coordinator failed to add messages", "err", err, "pos", localMsgCount, "length", len(messages)) } else { localMsgCount = msgToRead @@ -727,7 +731,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } if c.config.Url() == redisutil.INVALID_URL { - return c.noRedisError() + return c.noRedisError(), nil } // Sequencer should want lockout if and only if- its synced, not avoiding lockout and execution processed every message that consensus had 1 second ago @@ -745,7 +749,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { if synced && localMsgCount >= remoteMsgCount && chosenSeq == c.config.Url() { if c.sequencer == nil { log.Error("myurl main sequencer, but no sequencer exists") - return c.noRedisError() + return c.noRedisError(), nil } processedMessages, err := c.streamer.GetProcessedMessageCount() if err != nil { @@ -765,7 +769,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { log.Warn("failed to update wants lockout key", "err", err) } c.prevChosenSequencer = "" - return c.retryAfterRedisError() + return c.retryAfterRedisError(), nil } log.Info("caught chosen-coordinator lock", "myUrl", c.config.Url()) if c.delayedSequencer != nil { @@ -782,7 +786,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } c.sequencer.Activate() c.prevChosenSequencer = c.config.Url() - return c.noRedisError() + return c.noRedisError(), nil } } @@ -798,9 +802,9 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { } if (wantsLockoutErr != nil) || (msgReadErr != nil) { - return c.retryAfterRedisError() + return c.retryAfterRedisError(), nil } - return c.noRedisError() + return c.noRedisError(), nil } // Warning: acquires the wantsLockoutMutex @@ -868,11 +872,11 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) { c.LaunchThread(func(ctx context.Context) { for { - if c.streamer.Stopped() { + interval, err := c.chooseRedisAndUpdate(ctx, newRedisCoordinator) + if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { log.Info("stopping block creation in sequencer because transaction streamer has stopped") return } - interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator) if ctx.Err() != nil { return } @@ -893,13 +897,13 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) { } } -func (c *SeqCoordinator) chooseRedisAndUpdate(ctx context.Context, newRedisCoordinator *redisutil.RedisCoordinator) time.Duration { +func (c *SeqCoordinator) chooseRedisAndUpdate(ctx context.Context, newRedisCoordinator *redisutil.RedisCoordinator) (time.Duration, error) { // If we have a new redis coordinator, and we haven't switched to it yet, try to switch. if c.config.NewRedisUrl != "" && c.prevRedisCoordinator == nil { // If we fail to try to switch, we'll retry soon. if err := c.trySwitchingRedis(ctx, newRedisCoordinator); err != nil { log.Warn("error while trying to switch redis coordinator", "err", err) - return c.retryAfterRedisError() + return c.retryAfterRedisError(), nil } } return c.update(ctx) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index e712db1236..362032921a 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -437,7 +437,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) { } if err := bc.txStreamer.AddBroadcastMessages(res.Messages); err != nil { if errors.Is(err, TransactionStreamerBlockCreationStopped) { - log.Info("stopping block creation in broadcast client because transaction streamer is stopped") + log.Info("stopping block creation in broadcast client because transaction streamer has stopped") return } log.Error("Error adding message from Sequencer Feed", "err", err) diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index a6d81068b5..5092549349 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -196,7 +196,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context) { case msg := <-bcs.primaryRouter.messageChan: if err := msgHandler(msg, bcs.primaryRouter); err != nil { if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { - log.Info("stopping block creation in broadcast clients because transaction streamer is stopped") + log.Info("stopping block creation in broadcast clients because transaction streamer has stopped") return } log.Error("Error routing message from Primary Sequencer Feeds", "err", err) From 3c2e99b2fce5d52d2d307a382b493cd0346527ee Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Wed, 6 Nov 2024 13:29:24 +0530 Subject: [PATCH 7/8] fix lint --- arbnode/seq_coordinator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index f44619ec61..41bef0c46a 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -802,6 +802,7 @@ func (c *SeqCoordinator) update(ctx context.Context) (time.Duration, error) { } if (wantsLockoutErr != nil) || (msgReadErr != nil) { + //lint:ignore nilerr we want to retry after redis error return c.retryAfterRedisError(), nil } return c.noRedisError(), nil From 40baf657b35cb7682d95308b4d516821e76c686f Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Thu, 21 Nov 2024 13:25:24 +0530 Subject: [PATCH 8/8] Changes based on PR comments --- arbnode/inbox_reader.go | 26 ++++++++++++-------------- arbnode/seq_coordinator.go | 31 ++++++++++++------------------- util/stopwaiter/stopwaiter.go | 6 +++++- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 49d392de35..51c753fe01 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -124,12 +124,14 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h func (r *InboxReader) Start(ctxIn context.Context) error { r.StopWaiter.Start(ctxIn, r) hadError := false - r.LaunchThread(func(ctx context.Context) { - for { + runChan := make(chan struct{}, 1) + err := stopwaiter.CallIterativelyWith[struct{}]( + &r.StopWaiterSafe, + func(ctx context.Context, ignored struct{}) time.Duration { err := r.run(ctx, hadError) if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { log.Info("stopping block creation in inbox reader because transaction streamer has stopped") - return + close(runChan) } if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") { log.Warn("error reading inbox", "err", err) @@ -137,17 +139,13 @@ func (r *InboxReader) Start(ctxIn context.Context) error { } else { hadError = false } - interval := time.Second - timer := time.NewTimer(interval) - select { - case <-ctx.Done(): - timer.Stop() - return - case <-timer.C: - } - } - }) - + return time.Second + }, + runChan, + ) + if err != nil { + return err + } // Ensure we read the init message before other things start up for i := 0; ; i++ { batchCount, err := r.tracker.GetBatchCount() diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 41bef0c46a..c9d4b0d699 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -870,29 +870,22 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) { err, "newRedisUrl", c.config.NewRedisUrl) } } - - c.LaunchThread(func(ctx context.Context) { - for { + chooseRedisAndUpdateChan := make(chan struct{}, 1) + err := stopwaiter.CallIterativelyWith[struct{}]( + &c.StopWaiterSafe, + func(ctx context.Context, ignored struct{}) time.Duration { interval, err := c.chooseRedisAndUpdate(ctx, newRedisCoordinator) if errors.Is(err, broadcastclient.TransactionStreamerBlockCreationStopped) { log.Info("stopping block creation in sequencer because transaction streamer has stopped") - return - } - if ctx.Err() != nil { - return - } - if interval == time.Duration(0) { - continue - } - timer := time.NewTimer(interval) - select { - case <-ctx.Done(): - timer.Stop() - return - case <-timer.C: + close(chooseRedisAndUpdateChan) } - } - }) + return interval + }, + chooseRedisAndUpdateChan, + ) + if err != nil { + log.Warn("error in starting iterative call to chooseRedisAndUpdate", "err", err) + } if c.config.ChosenHealthcheckAddr != "" { c.StopWaiter.LaunchThread(c.launchHealthcheckServer) } diff --git a/util/stopwaiter/stopwaiter.go b/util/stopwaiter/stopwaiter.go index 993768dd85..40b0d0d97a 100644 --- a/util/stopwaiter/stopwaiter.go +++ b/util/stopwaiter/stopwaiter.go @@ -231,6 +231,7 @@ func CallIterativelyWith[T any]( return s.LaunchThreadSafe(func(ctx context.Context) { var defaultVal T var val T + var ok bool for { interval := foo(ctx, val) if ctx.Err() != nil { @@ -246,7 +247,10 @@ func CallIterativelyWith[T any]( timer.Stop() return case <-timer.C: - case val = <-triggerChan: + case val, ok = <-triggerChan: + if !ok { + return + } } } })