Skip to content

Commit

Permalink
refactor(rotation): refactor sequencer set (#1022)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Aug 15, 2024
1 parent e1a5140 commit a812760
Show file tree
Hide file tree
Showing 26 changed files with 774 additions and 409 deletions.
8 changes: 7 additions & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}

// check if the proposer needs to be changed
m.Executor.UpdateProposerFromBlock(m.State, block)
switchRole := m.Executor.UpdateProposerFromBlock(m.State, block)

// save sequencers to store to be queried over RPC
batch := m.Store.NewBatch()
Expand Down Expand Up @@ -98,6 +98,12 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
}

if switchRole {
// TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008)
m.logger.Info("Node changing to proposer role")
panic("sequencer is no longer the proposer")
}
return nil
}

Expand Down
12 changes: 9 additions & 3 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ func TestCreateBlock(t *testing.T) {

maxBytes := uint64(100)

// Create a valid proposer for the block
proposerKey := ed25519.GenPrivKey()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposerKey.PubKey())
require.NoError(err)

// Init state
state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
state.Sequencers = *types.NewSequencerSet()

// empty block
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash), state, maxBytes)
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash[:]), state, maxBytes)
require.NotNil(block)
assert.Empty(block.Data.Txs)
assert.Equal(uint64(1), block.Header.Height)
Expand Down Expand Up @@ -146,7 +152,7 @@ func TestApplyBlock(t *testing.T) {

// Init state
state := &types.State{}
state.Sequencers.SetProposer(tmtypes.NewValidator(tmPubKey, 1))
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.InitialHeight = 1
state.SetHeight(0)
maxBytes := uint64(100)
Expand Down
2 changes: 1 addition & 1 deletion block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
return err
}
// update the state with only the consensus pubkey
m.Executor.UpdateStateAfterInitChain(m.State, res, tmProposer)
m.Executor.UpdateStateAfterInitChain(m.State, res)
m.Executor.UpdateMempoolAfterInitChain(m.State)
if _, err := m.Store.SaveState(m.State, nil); err != nil {
return err
Expand Down
9 changes: 7 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/dymensionxyz/dymint/types"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/proxy"
Expand All @@ -39,8 +40,11 @@ func TestInitialState(t *testing.T) {
var err error
assert := assert.New(t)
genesis := testutil.GenerateGenesis(123)
sampleState := testutil.GenerateState(1, 128)
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
raw, _ := key.GetPublic().Raw()
pubkey := ed25519.PubKey(raw)
sampleState := testutil.GenerateStateWithSequencer(1, 128, pubkey)

conf := testutil.GetManagerConfig()
logger := log.TestingLogger()
pubsubServer := pubsub.NewServer()
Expand Down Expand Up @@ -152,7 +156,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
go func() {
errChan <- manager.Start(ctx)
err := <-errChan
assert.NoError(t, err)
require.NoError(t, err)
}()
<-ctx.Done()
assert.Equal(t, batch.EndHeight(), manager.LastSubmittedHeight.Load())
Expand Down Expand Up @@ -210,6 +214,7 @@ func TestProducePendingBlock(t *testing.T) {
require.NoError(t, err)
// Generate block and commit and save it to the store
block := testutil.GetRandomBlock(1, 3)
copy(block.Header.SequencerHash[:], manager.State.Sequencers.ProposerHash)
copy(block.Header.NextSequencersHash[:], manager.State.Sequencers.ProposerHash)

_, err = manager.Store.SaveBlock(block, &block.LastCommit, nil)
Expand Down
7 changes: 7 additions & 0 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
if err != nil {
return fmt.Errorf("process next DA batch: %w", err)
}

// if height havent been updated, we are stuck
if m.State.NextHeight() == currH {
return fmt.Errorf("stuck at height %d", currH)
}
m.logger.Info("Synced from DA", "store height", m.State.Height(), "target height", targetHeight)
}

Expand All @@ -74,6 +79,8 @@ func (m *Manager) syncFromDABatch() error {
return fmt.Errorf("retrieve batch: %w", err)
}

// FIXME: set correct proposer

m.logger.Info("Retrieved batch.", "state_index", stateIndex)

err = m.ProcessNextDABatch(settlementBatch.MetaData.DA)
Expand Down
52 changes: 19 additions & 33 deletions block/sequencers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
"github.com/google/uuid"
tmtypes "github.com/tendermint/tendermint/types"
)

func (m *Manager) MonitorSequencerRotation(ctx context.Context, rotateC chan string) error {
Expand Down Expand Up @@ -37,14 +36,12 @@ func (m *Manager) MonitorSequencerRotation(ctx context.Context, rotateC chan str
if next == nil {
continue
}
nextSeqAddr = next.Address
// for loop will break afterwards
nextSeqAddr = next.SettlementAddress
case event := <-subscription.Out():
eventData, _ := event.Data().(*settlement.EventDataRotationStarted)
nextSeqAddr = eventData.NextSeqAddr
// for loop will break afterwards
}
break
break // break out of the loop after getting the next sequencer address
}
// we get here once a sequencer rotation signal is received
m.logger.Info("Sequencer rotation started.", "next_seq", nextSeqAddr)
Expand All @@ -62,9 +59,17 @@ func (m *Manager) IsProposer() bool {
l2Proposer := m.GetProposerPubKey().Bytes()

var expectedHubProposer []byte
if m.SLClient.GetProposer() != nil {
expectedHubProposer = m.SLClient.GetProposer().PublicKey.Bytes()
hubProposer := m.SLClient.GetProposer()
if hubProposer != nil {
expectedHubProposer = hubProposer.PubKey().Bytes()
}

// check if recovering from halt
if l2Proposer == nil && hubProposer != nil {
m.State.Sequencers.SetProposer(hubProposer)
}

// we run sequencer flow if we're proposer on L2 or hub (can be different during rotation phase)
return bytes.Equal(l2Proposer, localProposerKey) || bytes.Equal(expectedHubProposer, localProposerKey)
}

Expand All @@ -81,8 +86,8 @@ func (m *Manager) MissingLastBatch() (string, bool, error) {
// rotation in progress,
// check if we're the old proposer and needs to complete rotation
curr := m.SLClient.GetProposer()
isProposer := bytes.Equal(curr.PublicKey.Bytes(), localProposerKey)
return next.Address, isProposer, nil
isProposer := bytes.Equal(curr.PubKey().Bytes(), localProposerKey)
return next.SettlementAddress, isProposer, nil
}

// handleRotationReq completes the rotation flow once a signal is received from the SL
Expand All @@ -107,11 +112,11 @@ func (m *Manager) CompleteRotation(ctx context.Context, nextSeqAddr string) erro
// validate nextSeq is in the bonded set
var nextSeqHash [32]byte
if nextSeqAddr != "" {
val := m.State.Sequencers.GetByAddress([]byte(nextSeqAddr))
if val == nil {
seq := m.State.Sequencers.GetByAddress(nextSeqAddr)
if seq == nil {
return types.ErrMissingProposerPubKey
}
copy(nextSeqHash[:], val.PubKey.Address().Bytes())
copy(nextSeqHash[:], seq.Hash())
}

err := m.CreateAndPostLastBatch(ctx, nextSeqHash)
Expand Down Expand Up @@ -164,32 +169,13 @@ func (m *Manager) UpdateSequencerSetFromSL() error {
if err != nil {
return err
}
newSeqList := make([]*tmtypes.Validator, 0, len(seqs))
for _, seq := range seqs {
tmSeq, err := seq.TMValidator()
if err != nil {
return err
}
newSeqList = append(newSeqList, tmSeq)
}
m.State.Sequencers.SetSequencers(newSeqList)
m.State.Sequencers.SetSequencers(seqs)
m.logger.Debug("Updated bonded sequencer set.", "newSet", m.State.Sequencers.String())
return nil
}

// updateProposer updates the proposer in the state
func (m *Manager) UpdateProposer() error {
var (
err error
p *tmtypes.Validator
)
proposer := m.SLClient.GetProposer()
if proposer != nil {
p, err = proposer.TMValidator()
if err != nil {
return err
}
}
m.State.Sequencers.SetProposer(p)
m.State.Sequencers.SetProposer(m.SLClient.GetProposer())
return nil
}
37 changes: 15 additions & 22 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,14 @@ func (m *Manager) UpdateStateFromApp() error {
return nil
}

func (e *Executor) UpdateStateAfterInitChain(s *types.State, res *abci.ResponseInitChain, proposer *tmtypes.Validator) {
func (e *Executor) UpdateStateAfterInitChain(s *types.State, res *abci.ResponseInitChain) {
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
if len(res.AppHash) > 0 {
copy(s.AppHash[:], res.AppHash)
}

if proposer == nil {
panic("proposer must be greater than zero on initChain")
}

if res.ConsensusParams != nil {
params := res.ConsensusParams
if params.Block != nil {
Expand All @@ -124,9 +120,6 @@ func (e *Executor) UpdateStateAfterInitChain(s *types.State, res *abci.ResponseI
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
copy(s.LastResultsHash[:], merkle.HashFromByteSlices(nil))

// Set the genesis sequencers in the state
s.Sequencers.SetProposer(proposer)
}

func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) {
Expand All @@ -145,17 +138,22 @@ func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResp
}

// UpdateProposerFromBlock updates the proposer from the block
// in case of proposer change, the existing proposer sets the nextProposerHash in the block header
func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) {
// The next proposer is defined in the block header (NextSequencersHash)
// In case of a node that a becomes the proposer, we return true to mark the role change
// currently the node will rebooted to apply the new role
// TODO: (https://github.com/dymensionxyz/dymint/issues/1008)
func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) bool {
// no sequencer change
if bytes.Equal(s.Sequencers.ProposerHash[:], block.Header.NextSequencersHash[:]) {
return
if bytes.Equal(block.Header.SequencerHash[:], block.Header.NextSequencersHash[:]) {
return false
}

if block.Header.NextSequencersHash == [32]byte{} {
// the chain will be halted until proposer is set
// TODO: recover from halt (https://github.com/dymensionxyz/dymint/issues/1021)
e.logger.Info("rollapp left with no proposer. chain is halted")
s.Sequencers.SetProposer(nil)
return
return false
}

// if hash changed, update the active sequencer
Expand All @@ -165,15 +163,10 @@ func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) {
panic(fmt.Sprintf("failed to update new proposer: %v", err))
}

val := s.Sequencers.GetByAddress(e.localAddress)
if val == nil {
e.logger.Error("local key not found in sequencer set")
panic("local key not found in sequencer set")
localSeq := s.Sequencers.GetByConsAddress(e.localAddress)
if localSeq != nil && bytes.Equal(localSeq.Hash(), block.Header.NextSequencersHash[:]) {
return true
}

// TODO: graceful fallback to full node (https://github.com/dymensionxyz/dymint/issues/1008)
if bytes.Equal(types.GetHash(val), block.Header.NextSequencersHash[:]) {
e.logger.Error("node changing to proposer role")
panic("node changing to proposer role")
}
return false
}
10 changes: 4 additions & 6 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (
"testing"
"time"

tmed25519 "github.com/tendermint/tendermint/crypto/ed25519"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/proxy"

cosmosed25519 "github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
"github.com/libp2p/go-libp2p/core/crypto"

"github.com/dymensionxyz/dymint/block"
"github.com/dymensionxyz/dymint/config"
slmocks "github.com/dymensionxyz/dymint/mocks/github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/testutil"
"github.com/dymensionxyz/dymint/types"
)
Expand Down Expand Up @@ -144,10 +144,8 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
lib2pPrivKey, err := crypto.UnmarshalEd25519PrivateKey(priv)
require.NoError(err)

cosmosPrivKey := cosmosed25519.PrivKey{Key: priv}
proposer := &settlement.Sequencer{
PublicKey: cosmosPrivKey.PubKey(),
}
proposerKey := tmed25519.PrivKey(priv)
proposer := *types.NewSequencer(proposerKey.PubKey(), "")

// Create a new mock ClientI
slmock := &slmocks.MockClientI{}
Expand Down
Loading

0 comments on commit a812760

Please sign in to comment.