Skip to content

Commit

Permalink
refactor(manager): cleanup proposer, full node and rotation flows (#1183
Browse files Browse the repository at this point in the history
)

Co-authored-by: Sergi Rene <[email protected]>
  • Loading branch information
omritoptix and srene authored Oct 31, 2024
1 parent efbf726 commit 58b9caf
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 233 deletions.
18 changes: 10 additions & 8 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("save proposer: %w", err)
}

// 2. Update the proposer in the state in case of rotation.
switchRole := m.Executor.UpdateProposerFromBlock(m.State, m.Sequencers, block)
// 2. Update the proposer in the state in case of rotation happened on the rollapp level (not necessarily on the hub yet).
isProposerUpdated := m.Executor.UpdateProposerFromBlock(m.State, m.Sequencers, block)

// 3. Save the state to the store (independently of the height). Here the proposer might differ from (1).
batch, err = m.Store.SaveState(m.State, batch)
Expand All @@ -172,18 +172,20 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

m.blockCache.Delete(block.Header.Height)

if switchRole {
// TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008)
m.logger.Info("Node changing to proposer role")
panic("sequencer is no longer the proposer")
}

// validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution
err = m.ValidateConfigWithRollappParams()
if err != nil {
return err
}

// Check if there was an Update for the proposer and if I am the new proposer.
// If so, restart so I can start as the proposer.
// For current proposer, we don't want to restart because we still need to send the last batch.
// This will be done as part of the `rotate` function.
if isProposerUpdated && m.AmIProposerOnRollapp() {
panic("I'm the new Proposer now. restarting as a proposer")
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func (m *Manager) RunInitChain(ctx context.Context) error {
// get the proposer's consensus pubkey
// FIXME: We want to get the initial proposer and not current one
proposer := m.SLClient.GetProposer()
if proposer == nil {
return errors.New("failed to get proposer")
Expand Down
120 changes: 24 additions & 96 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/dymensionxyz/dymint/indexers/txindex"
"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
"github.com/dymensionxyz/dymint/version"

"github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -192,17 +191,29 @@ func (m *Manager) Start(ctx context.Context) error {
}
}

// Check if the chain is halted
err := m.isChainHalted()
if err != nil {
return err
// Check if a proposer on the rollapp is set. In case no proposer is set on the Rollapp, fallback to the hub proposer (If such exists).
// No proposer on the rollapp means that at some point there was no available proposer.
// In case there is also no proposer on the hub to our current height, it means that the chain is halted.
// FIXME: In case we are syncing we would like to get the proposer from the hub relevant to the current height.
if m.State.GetProposer() == nil {
m.logger.Info("No proposer on the rollapp, fallback to the hub proposer, if available")
SLProposer := m.SLClient.GetProposer()
if SLProposer == nil {
return fmt.Errorf("no proposer available. chain is halted")
}
m.State.SetProposer(SLProposer)
}

isProposer := m.IsProposer()
m.logger.Info("starting block manager", "proposer", isProposer)
// checks if the the current node is the proposer either on rollapp or on the hub.
// In case of sequencer rotation, there's a phase where proposer rotated on Rollapp but hasn't yet rotated on hub.
// for this case, 2 nodes will get `true` for `AmIProposer` so the l2 proposer can produce blocks and the hub proposer can submit his last batch.
// The hub proposer, after sending the last state update, will panic and restart as full node.
amIProposer := m.AmIProposerOnSL() || m.AmIProposerOnRollapp()

m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[amIProposer])

// update local state from latest state in settlement
err = m.updateFromLastSettlementState()
err := m.updateFromLastSettlementState()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
Expand All @@ -224,100 +235,17 @@ func (m *Manager) Start(ctx context.Context) error {
return m.SettlementSyncLoop(ctx)
})

// Monitor sequencer set updates
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorSequencerSetUpdates(ctx)
})

/* ----------------------------- full node mode ----------------------------- */
if !isProposer {

// update latest finalized height
err = m.updateLastFinalizedHeightFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}

// Start the settlement validation loop in the background
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SettlementValidateLoop(ctx)
})

// Subscribe to new (or finalized) state updates events.
go uevent.MustSubscribe(ctx, m.Pubsub, "syncLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "validateLoop", settlement.EventQueryNewSettlementBatchFinalized, m.onNewStateUpdateFinalized, m.logger)

// Subscribe to P2P received blocks events (used for P2P syncing).
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.OnReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)

return nil
}

/* ----------------------------- sequencer mode ----------------------------- */
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)

// Sequencer must wait till the DA light client is synced. Otherwise it will fail when submitting blocks.
// Full-nodes does not need to wait, but if it tries to fetch blocks from DA heights previous to the DA light client height it will fail, and it will retry till it reaches the height.
m.DAClient.WaitForSyncing()

// Sequencer must wait till node is synced till last submittedHeight, in case it is not
m.waitForSettlementSyncing()

// check if sequencer in the middle of rotation
nextSeqAddr, missing, err := m.MissingLastBatch()
if err != nil {
return fmt.Errorf("checking if missing last batch: %w", err)
}
// if sequencer is in the middle of rotation, complete rotation instead of running the main loop
if missing {
m.handleRotationReq(ctx, nextSeqAddr)
return nil
// run based on the node role
if !amIProposer {
return m.runAsFullNode(ctx, eg)
}

// populate the bytes produced channel
bytesProducedC := make(chan int)

// channel to signal sequencer rotation started
rotateSequencerC := make(chan string, 1)

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run
return m.ProduceBlockLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorSequencerRotation(ctx, rotateSequencerC)
})

go func() {
_ = eg.Wait()
// Check if exited due to sequencer rotation signal
select {
case nextSeqAddr := <-rotateSequencerC:
m.handleRotationReq(ctx, nextSeqAddr)
default:
m.logger.Info("Block manager err group finished.")
}
}()

return nil
}

func (m *Manager) isChainHalted() error {
if m.State.GetProposerPubKey() == nil {
// if no proposer set in state, try to update it from the hub
err := m.UpdateProposerFromSL()
if err != nil {
return fmt.Errorf("update proposer: %w", err)
}
if m.State.GetProposerPubKey() == nil {
return fmt.Errorf("no proposer pubkey found. chain is halted")
}
}
return nil
return m.runAsProposer(ctx, eg)
}

func (m *Manager) NextHeightToSubmit() uint64 {
Expand Down
73 changes: 73 additions & 0 deletions block/modes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package block

import (
"context"
"fmt"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
"golang.org/x/sync/errgroup"
)

// setFraudHandler sets the fraud handler for the block manager.
func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
// update latest finalized height
err := m.updateLastFinalizedHeightFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}

// Start the settlement validation loop in the background
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SettlementValidateLoop(ctx)
})

// Subscribe to new (or finalized) state updates events.
go uevent.MustSubscribe(ctx, m.Pubsub, "syncLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "validateLoop", settlement.EventQueryNewSettlementBatchFinalized, m.onNewStateUpdateFinalized, m.logger)

// Subscribe to P2P received blocks events (used for P2P syncing).
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.OnReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)

return nil
}

func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)

// Sequencer must wait till the DA light client is synced. Otherwise it will fail when submitting blocks.
// Full-nodes does not need to wait, but if it tries to fetch blocks from DA heights previous to the DA light client height it will fail, and it will retry till it reaches the height.
m.DAClient.WaitForSyncing()

// Sequencer must wait till node is synced till last submittedHeight, in case it is not
m.waitForSettlementSyncing()

// check if we should rotate
shouldRotate, err := m.ShouldRotate()
if err != nil {
return fmt.Errorf("checking should rotate: %w", err)
}
if shouldRotate {
m.rotate(ctx)
}

// populate the bytes produced channel
bytesProducedC := make(chan int)

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run
return m.ProduceBlockLoop(ctx, bytesProducedC)
})

// Monitor and handling of the rotation
go m.MonitorProposerRotation(ctx)

return nil
}
5 changes: 5 additions & 0 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
case <-ctx.Done():
return nil
case <-ticker.C:
// Only produce if I'm the current rollapp proposer.
if !m.AmIProposerOnRollapp() {
continue
}

// if empty blocks are configured to be enabled, and one is scheduled...
produceEmptyBlock := firstBlock || m.Conf.MaxIdleTime == 0 || nextEmptyBlock.Before(time.Now())
Expand Down Expand Up @@ -107,6 +111,7 @@ func (m *Manager) ProduceApplyGossipBlock(ctx context.Context, allowEmpty bool)
}

func (m *Manager) produceApplyGossip(ctx context.Context, allowEmpty bool, nextProposerHash *[32]byte) (block *types.Block, commit *types.Commit, err error) {
// If I'm not the current rollapp proposer, I should not produce a blocks.
block, commit, err = m.produceBlock(allowEmpty, nextProposerHash)
if err != nil {
return nil, nil, fmt.Errorf("produce block: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
case retainHeight := <-m.pruningC:

var pruningHeight uint64
if m.IsProposer() { // do not delete anything that we might submit in future
if m.AmIProposerOnSL() || m.AmIProposerOnRollapp() { // do not delete anything that we might submit in future
pruningHeight = min(m.NextHeightToSubmit(), uint64(retainHeight))
} else { // do not delete anything that is not validated yet
pruningHeight = min(m.SettlementValidator.NextValidationHeight(), uint64(retainHeight))
Expand Down
Loading

0 comments on commit 58b9caf

Please sign in to comment.