Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2845] Add a nitro option to stop syncing at a given block number #2749

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

err = streamer.Start(ctx)
err = streamer.Start(ctx, 0)
Require(t, err)
exec.Start(ctx)
exec.Start(ctx, 0)
init, err := streamer.GetMessage(0)
Require(t, err)

Expand Down
17 changes: 14 additions & 3 deletions arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,20 @@ func (d *DelayedSequencer) ForceSequenceDelayed(ctx context.Context) error {
return d.sequenceWithoutLockout(ctx, lastBlockHeader)
}

func (d *DelayedSequencer) run(ctx context.Context) {
func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) {
headerChan, cancel := d.l1Reader.Subscribe(false)
defer cancel()

for {
delayedCount, err := d.inbox.GetDelayedCount()
if err != nil {
log.Warn("error reading delayed count", "err", err)
continue
}
if syncTillBlock > 0 && delayedCount >= syncTillBlock {
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
log.Info("stopping block creation in delayed sequencer", "syncTillBlock", syncTillBlock)
return
}
select {
case nextHeader, ok := <-headerChan:
if !ok {
Expand All @@ -232,7 +241,9 @@ func (d *DelayedSequencer) run(ctx context.Context) {
}
}

func (d *DelayedSequencer) Start(ctxIn context.Context) {
func (d *DelayedSequencer) Start(ctxIn context.Context, syncTillBlock uint64) {
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
d.StopWaiter.Start(ctxIn, d)
d.LaunchThread(d.run)
d.LaunchThread(func(ctx context.Context) {
d.run(ctx, syncTillBlock)
})
}
36 changes: 27 additions & 9 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,36 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h
}, nil
}

func (r *InboxReader) Start(ctxIn context.Context) error {
func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error {
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
r.StopWaiter.Start(ctxIn, r)
hadError := false
r.CallIteratively(func(ctx context.Context) time.Duration {
err := r.run(ctx, hadError)
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
log.Warn("error reading inbox", "err", err)
hadError = true
} else {
hadError = false
r.LaunchThread(func(ctx context.Context) {
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
for {
delayedCount, err := r.tracker.GetDelayedCount()
if err != nil {
log.Warn("error reading delayed count", "err", err)
hadError = true
}
if syncTillBlock > 0 && delayedCount >= syncTillBlock {
log.Info("stopping block creation in inbox reader", "syncTillBlock", syncTillBlock)
return
}
err = r.run(ctx, hadError)
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "header not found") {
log.Warn("error reading inbox", "err", err)
hadError = true
} else {
hadError = false
}
interval := time.Second
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}
return time.Second
})

// Ensure we read the init message before other things start up
Expand Down
4 changes: 2 additions & 2 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ func TestTransactionStreamer(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := inbox.Start(ctx)
err := inbox.Start(ctx, 0)
Require(t, err)
exec.Start(ctx)
exec.Start(ctx, 0)

maxExpectedGasCost := big.NewInt(l2pricing.InitialBaseFeeWei)
maxExpectedGasCost.Mul(maxExpectedGasCost, big.NewInt(2100*2))
Expand Down
15 changes: 9 additions & 6 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com

type Config struct {
Sequencer bool `koanf:"sequencer"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
Expand Down Expand Up @@ -145,6 +146,7 @@ func (c *Config) ValidatorRequired() bool {

func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) {
f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer")
f.Uint64(prefix+".sync-till-block", ConfigDefault.SyncTillBlock, "sync till block")
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
headerreader.AddOptions(prefix+".parent-chain-reader", f)
InboxReaderConfigAddOptions(prefix+".inbox-reader", f)
DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f)
Expand All @@ -163,6 +165,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed

var ConfigDefault = Config{
Sequencer: false,
SyncTillBlock: 0,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
Expand Down Expand Up @@ -839,7 +842,7 @@ func (n *Node) Start(ctx context.Context) error {
if execClient != nil {
execClient.SetConsensusClient(n)
}
err = n.Execution.Start(ctx)
err = n.Execution.Start(ctx, n.configFetcher.Get().SyncTillBlock)
if err != nil {
return fmt.Errorf("error starting exec client: %w", err)
}
Expand Down Expand Up @@ -869,12 +872,12 @@ func (n *Node) Start(ctx context.Context) error {
return fmt.Errorf("error populating feed backlog on startup: %w", err)
}
}
err = n.TxStreamer.Start(ctx)
err = n.TxStreamer.Start(ctx, n.configFetcher.Get().SyncTillBlock)
if err != nil {
return fmt.Errorf("error starting transaction streamer: %w", err)
}
if n.InboxReader != nil {
err = n.InboxReader.Start(ctx)
err = n.InboxReader.Start(ctx, n.configFetcher.Get().SyncTillBlock)
if err != nil {
return fmt.Errorf("error starting inbox reader: %w", err)
}
Expand All @@ -887,15 +890,15 @@ func (n *Node) Start(ctx context.Context) error {
}
}
if n.SeqCoordinator != nil {
n.SeqCoordinator.Start(ctx)
n.SeqCoordinator.Start(ctx, n.configFetcher.Get().SyncTillBlock)
} else {
n.Execution.Activate()
}
if n.MaintenanceRunner != nil {
n.MaintenanceRunner.Start(ctx)
}
if n.DelayedSequencer != nil {
n.DelayedSequencer.Start(ctx)
n.DelayedSequencer.Start(ctx, n.configFetcher.Get().SyncTillBlock)
}
if n.BatchPoster != nil {
n.BatchPoster.Start(ctx)
Expand Down Expand Up @@ -945,7 +948,7 @@ func (n *Node) Start(ctx context.Context) error {
return
}
}
n.BroadcastClients.Start(ctx)
n.BroadcastClients.Start(ctx, n.configFetcher.Get().SyncTillBlock)
}()
}
if n.configFetcher != nil {
Expand Down
30 changes: 28 additions & 2 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ func (c *SeqCoordinator) launchHealthcheckServer(ctx context.Context) {
}
}

func (c *SeqCoordinator) Start(ctxIn context.Context) {
func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) {
c.StopWaiter.Start(ctxIn, c)
var newRedisCoordinator *redisutil.RedisCoordinator
if c.config.NewRedisUrl != "" {
Expand All @@ -865,7 +865,33 @@ func (c *SeqCoordinator) Start(ctxIn context.Context) {
err, "newRedisUrl", c.config.NewRedisUrl)
}
}
c.CallIteratively(func(ctx context.Context) time.Duration { return c.chooseRedisAndUpdate(ctx, newRedisCoordinator) })
amsanghi marked this conversation as resolved.
Show resolved Hide resolved

c.LaunchThread(func(ctx context.Context) {
for {
count, err := c.streamer.GetMessageCount()
if err != nil {
log.Warn("failed to get message count", "err", err)
}
if syncTillBlock > 0 && uint64(count) >= syncTillBlock {
log.Info("stopping block creation in sequencer", "syncTillBlock", syncTillBlock)
return
}
interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator)
if ctx.Err() != nil {
return
}
if interval == time.Duration(0) {
continue
}
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}
})
if c.config.ChosenHealthcheckAddr != "" {
c.StopWaiter.LaunchThread(c.launchHealthcheckServer)
}
Expand Down
29 changes: 27 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,32 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
return s.config().ExecuteMessageLoopDelay
}

func (s *TransactionStreamer) Start(ctxIn context.Context) error {
func (s *TransactionStreamer) Start(ctxIn context.Context, syncTillBlock uint64) error {
s.StopWaiter.Start(ctxIn, s)
return stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.executeMessages, s.newMessageNotifier)
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
return s.LaunchThreadSafe(func(ctx context.Context) {
var defaultVal struct{}
var val struct{}
for {
if syncTillBlock > 0 && uint64(s.execLastMsgCount) >= syncTillBlock {
log.Info("stopping block creation in transaction streamer", "syncTillBlock", syncTillBlock)
return
}
interval := s.executeMessages(ctx, val)
if ctx.Err() != nil {
return
}
val = defaultVal
if interval == time.Duration(0) {
continue
}
timer := time.NewTimer(interval)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
case val = <-s.newMessageNotifier:
}
}
})
}
7 changes: 6 additions & 1 deletion broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func NewBroadcastClient(
}, err
}

func (bc *BroadcastClient) Start(ctxIn context.Context) {
func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) {
bc.StopWaiter.Start(ctxIn, bc)
if bc.StopWaiter.Stopped() {
log.Info("broadcast client has already been stopped, not starting")
Expand All @@ -185,6 +185,11 @@ func (bc *BroadcastClient) Start(ctxIn context.Context) {
bc.LaunchThread(func(ctx context.Context) {
backoffDuration := bc.config().ReconnectInitialBackoff
for {

if syncTillBlock > 0 && uint64(bc.nextSeqNum) >= syncTillBlock {
log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock)
return
}
earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum)
if errors.Is(err, ErrMissingChainId) ||
errors.Is(err, ErrIncorrectChainId) ||
Expand Down
20 changes: 10 additions & 10 deletions broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestInvalidSignature(t *testing.T) {
&badSequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

go func() {
for i := 0; i < messageCount; i++ {
Expand Down Expand Up @@ -227,7 +227,7 @@ func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Co
sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)
messageCount := 0

wg.Add(1)
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestServerClientDisconnect(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil))
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestServerIncorrectChainId(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestServerMissingChainId(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -574,7 +574,7 @@ func TestServerIncorrectFeedServerVersion(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestServerMissingFeedServerVersion(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
badBroadcastClient.Start(ctx)
badBroadcastClient.Start(ctx, 0)
badTimer := time.NewTimer(5 * time.Second)
select {
case err := <-feedErrChan:
Expand Down Expand Up @@ -684,7 +684,7 @@ func TestBroadcastClientReconnectsOnServerDisconnect(t *testing.T) {
&sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)
defer broadcastClient.StopAndWait()

// Client set to timeout connection at 200 milliseconds, and server set to send ping every 50 seconds,
Expand Down Expand Up @@ -796,7 +796,7 @@ func connectAndGetCachedMessages(ctx context.Context, addr net.Addr, chainId uin
sequencerAddr,
)
Require(t, err)
broadcastClient.Start(ctx)
broadcastClient.Start(ctx, 0)

go func() {
defer wg.Done()
Expand Down
Loading
Loading