Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus] Add MinBlockTime to delay mempool reaping #924

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8608bce
[WIP] Add MinBlockTime
red-0ne Jul 21, 2023
4a25b62
[Consensus] Feat: Configurable min block production time
red-0ne Jul 25, 2023
1028ae9
[Consensus] Refactor: decouple timer registration from subscription
red-0ne Jul 26, 2023
a7fd743
address review comments
red-0ne Jul 31, 2023
867e841
[Docs] Update development docs to warn to not use the changelog hook …
h5law Jul 24, 2023
0d448c9
[IBC] chore: Rename FlushAllEntries => FlushCachesToStore (#934)
h5law Jul 24, 2023
accccfc
[Utility] Feat: add client-side session cache (#888)
adshmh Jul 25, 2023
3165b8d
[IBC] Clone `cosmos/ics23` protobuf definitions into IBC repo (#922)
h5law Jul 26, 2023
990321e
[CLI] Consistent config/flag parsing & common helpers (#891)
bryanchriswhite Jul 26, 2023
21d7024
[IBC] Change Events to not have a Height field and use uint64 in quer…
h5law Jul 26, 2023
c67fa14
[IBC] Add ICS-02 Client Interfaces (#932)
h5law Jul 26, 2023
db8d8d6
[Persistence] Adds `node` subcommand to CLI (#935)
dylanlott Jul 26, 2023
74a5816
[IBC] chore: enable IBC module in k8s validators (#941)
h5law Jul 27, 2023
950ccc3
[Utility] Use TreeStore as source of truth (#937)
h5law Jul 27, 2023
d3bf0ad
[IBC] Enable validators and thus IBC host creation in K8s (#942)
h5law Jul 28, 2023
c903ca1
[Utility] Create trustless_relay_validation.md (#938)
adshmh Jul 31, 2023
298b08f
[Persistence] Adds atomic Update for TreeStore (#861)
dylanlott Jul 31, 2023
a68af5c
[chore] Replaces multierr usage with go native errors package (#939)
dylanlott Jul 31, 2023
0941549
hack: 😴 sleep enough for cli debug subcommands to broadcast (#954)
0xBigBoss Jul 31, 2023
50f8846
DevLog 12 (#957)
Olshansk Aug 1, 2023
e0e9fd4
[Utility] servicer signs relays (#952)
adshmh Aug 1, 2023
2a226cc
[LocalNet] Fix metrics scraping (#940)
okdas Aug 1, 2023
6c7599e
prevent sending to closed channels
red-0ne Aug 2, 2023
92ece19
disable block preparation delay when manual mode is on
red-0ne Aug 4, 2023
fef4217
[E2E Test] Utilities for State Sync Test (#874)
Olshansk Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions consensus/e2e_tests/pacemaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ func forcePacemakerTimeout(t *testing.T, clockMock *clock.Mock, paceMakerTimeout
advanceTime(t, clockMock, paceMakerTimeout+10*time.Millisecond)
}

func TestPacemaker_MinBlockTime(t *testing.T) {
func TestPacemaker_MinBlockTime_DelayBlockPrep(t *testing.T) {
// Test preparation
clockMock := clock.NewMock()
timeReminder(t, clockMock, time.Second)

// UnitTestNet configs
paceMakerTimeoutMsec := uint64(300000)
consensusMessageTimeout := time.Duration(paceMakerTimeoutMsec / 5) // Must be smaller than pacemaker timeout because we expect a deterministic number of consensus messages.
paceMakerMinBlockTimeMsec := uint64(5000) // Make sure it is larger than the consensus message timeout
paceMakerMinBlockTimeMsec := uint64(paceMakerTimeoutMsec / 6)
runtimeMgrs := GenerateNodeRuntimeMgrs(t, numValidators, clockMock)
for _, runtimeConfig := range runtimeMgrs {
consCfg := runtimeConfig.GetConfig().Consensus.PacemakerConfig
Expand All @@ -195,22 +195,15 @@ func TestPacemaker_MinBlockTime(t *testing.T) {
err := StartAllTestPocketNodes(t, pocketNodes)
require.NoError(t, err)

replicas := IdToNodeMapping{}
// First round ever has leaderId=2 ((height+round+step-1)%numValidators)+1
// See: consensus/leader_election/module.go#electNextLeaderDeterministicRoundRobin
leaderId := typesCons.NodeId(2)
leader := IdToNodeMapping{}
numReplicas := len(pocketNodes) - 1

// Debug message to start consensus by triggering next view
// Get leader out of replica set
for id, pocketNode := range pocketNodes {
for _, pocketNode := range pocketNodes {
TriggerNextView(t, pocketNode)
if id == leaderId {
leader[id] = pocketNode
} else {
replicas[id] = pocketNode
}

// Right after triggering the next view
// Consensus started and all nodes are at NewRound step
Expand All @@ -226,16 +219,16 @@ func TestPacemaker_MinBlockTime(t *testing.T) {
require.NoError(t, err)

// Broadcast new round messages to leader to build a block
broadcastMessages(t, newRoundMessages, leader)
broadcastMessages(t, newRoundMessages, IdToNodeMapping{leaderId: pocketNodes[leaderId]})

var step typesCons.HotstuffStep
var pivotTime = 1 * time.Millisecond // Min time it takes to switch from NewRound to Prepare step
minTimeIncrement := 1 * time.Millisecond // Min time it takes to switch from NewRound to Prepare step

// Give go routines time to trigger
advanceTime(t, clockMock, 0)

// We get consensus module from leader to get its POV
leaderConsensusModule := leader[leaderId].GetBus().GetConsensusModule()
leaderConsensusModule := pocketNodes[leaderId].GetBus().GetConsensusModule()

// Make sure all nodes are aligned to the same leader
for _, pocketNode := range pocketNodes {
Expand All @@ -248,14 +241,15 @@ func TestPacemaker_MinBlockTime(t *testing.T) {
require.Equal(t, consensus.NewRound, step)

// Advance time right before minBlockTime triggers
advanceTime(t, clockMock, time.Duration(paceMakerMinBlockTimeMsec*uint64(time.Millisecond))-pivotTime)
paceMakerPreActivationTime := time.Duration(paceMakerMinBlockTimeMsec*uint64(time.Millisecond)) - minTimeIncrement
advanceTime(t, clockMock, paceMakerPreActivationTime)

// Should still be blocking proposal step
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())
require.Equal(t, consensus.NewRound, step)

// Advance time just enough to trigger minBlockTime
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
advanceTime(t, clockMock, pivotTime)
advanceTime(t, clockMock, minTimeIncrement)
step = typesCons.HotstuffStep(leaderConsensusModule.CurrentStep())

// Time advanced by minBlockTime
Expand All @@ -275,12 +269,13 @@ func TestPacemaker_MinBlockTime_AllowOnlyLatestBlockPrep(t *testing.T) {
}

// TODO: Mempool reaped is the one present at minBlockTime or later.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// Since leader could just reap an earlier mempool and just blocks broadcasting the proposal
func TestPacemaker_MinBlockTime_DelayReapMempool(t *testing.T) {
t.Skip()
}

// TODO: Successive blocks timings are at least minBlockTime apart.
func TestPacemaker_MinBlockTime_BehaviorAcrossMultipleBlocks(t *testing.T) {
func TestPacemaker_MinBlockTime_ConsecutiveBlocksAtLeastMinBlockTimeApart(t *testing.T) {
t.Skip()
}

Expand Down
2 changes: 0 additions & 2 deletions consensus/e2e_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,8 +840,6 @@ func advanceTime(t *testing.T, clck *clock.Mock, duration time.Duration) {
clck.Add(duration)
t.Logf("[⌚ CLOCK ⏩] advanced by %v", duration)
logTime(t, clck)
// Give goroutines a chance to run
clck.Add(0)
}

// sleep pauses the goroutine for the given duration on the mock clock and logs what just happened.
Expand Down
5 changes: 2 additions & 3 deletions consensus/hotstuff_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
// TODO: Add test to make sure same block is not applied twice if round is interrupted after being 'Applied'.
// TODO: Add more unit tests for these checks...
if m.shouldPrepareNewBlock(highPrepareQC) {
// Place this where we want to start the timer
// Placed here, timer starts when we receive enough NewRound messages
m.paceMaker.RegisterMinBlockTimeDelay()
// Leader should prepare a new block. Introducing a delay based on configurations.
m.paceMaker.StartMinBlockTimeDelay()

// This function delays block preparation and returns false if a concurrent preparation request with higher QC is available
if shouldPrepareBlock := m.paceMaker.DelayBlockPreparation(); !shouldPrepareBlock {
Expand Down
67 changes: 36 additions & 31 deletions consensus/pacemaker/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Pacemaker interface {
PacemakerDebug

ShouldHandleMessage(message *typesCons.HotstuffMessage) (bool, error)
RegisterMinBlockTimeDelay()
StartMinBlockTimeDelay()
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
DelayBlockPreparation() bool
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

RestartTimer()
Expand All @@ -64,14 +64,15 @@ type pacemaker struct {
logPrefix string
}

// Structure to handle delaying block preparation (reaping the block mempool)
// by adding a delay before the next prepare request
// prepareStepDelayer delays block preparation (mempool reaping)
// by adding a delay before the next prepare request to prevent block creation
type prepareStepDelayer struct {
m sync.Mutex // Mutex locking access to this structure and prevent inconsistent state
ch chan bool // Whenever there is a block proposal request arriving before the timeout, this channel is used to signal to it to whether build the block or not (if better candidate request with higher QC)
cancelFunc context.CancelFunc // This is used to cancel an ongoing timeout. It should not happen, but future code changes may no longer preserve this guarantee, so this feature maintain its own cancellation logic
blockProposed bool
deadlinePassed bool
m sync.Mutex // mutex locking access to this structure and prevent inconsistent state
ch chan bool // whenever there is a block proposal request arriving before the timeout, this channel is used to signal to it to whether build the block or not (if better candidate request with higher QC)
cancelFunc context.CancelFunc // cancels an ongoing timeout. It should not happen, but future code changes may no longer preserve this guarantee, so this feature maintain its own cancellation logic
shouldProposeBlock bool // a flag to capture whether a block was proposed, so later calls will skip building the block
delayExhausted bool // the delay for this step/round has already passed, so should create the block ASAP
minBlockTime time.Duration // the minimum time to wait before trying to propose a block
}

func CreatePacemaker(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
Expand Down Expand Up @@ -100,11 +101,12 @@ func (*pacemaker) Create(bus modules.Bus, options ...modules.ModuleOption) (modu
quorumCertificate: nil,
}
m.prepareStepDelayer = prepareStepDelayer{
m: sync.Mutex{},
ch: nil,
cancelFunc: nil,
blockProposed: false,
deadlinePassed: false,
m: sync.Mutex{},
ch: nil,
cancelFunc: nil,
shouldProposeBlock: false,
delayExhausted: false,
minBlockTime: time.Duration(m.pacemakerCfg.MinBlockTimeMsec * uint64(time.Millisecond)),
}

return m, nil
Expand Down Expand Up @@ -192,7 +194,6 @@ func (m *pacemaker) ShouldHandleMessage(msg *typesCons.HotstuffMessage) (bool, e

func (m *pacemaker) RestartTimer() {
// NOTE: Not deferring a cancel call because this function is asynchronous.
// DISCUSS: Should we have a lock to manipulate m.roundCancelFunc?
if m.roundCancelFunc != nil {
m.roundCancelFunc()
}
Expand Down Expand Up @@ -265,69 +266,73 @@ func (m *pacemaker) NewHeight() {
)
}

// This is called each time the system wants a delayed signal to build a block
func (m *pacemaker) RegisterMinBlockTimeDelay() {
// Discard any previous timer
// DISCUSS: This should not happen, Identify cases where an active timer has to be discarded, remove cancellation logic if none
// StartMinBlockTimeDelay should be called when a delay should be introduced into proposing a new block
func (m *pacemaker) StartMinBlockTimeDelay() {
// Discard any previous timer if one exists
if m.prepareStepDelayer.cancelFunc != nil {
m.logger.Warn().Msg("RegisterMinBlockTimeDelay has an existing timer which should not happen. Releasing for now...")
m.prepareStepDelayer.cancelFunc()
}

m.prepareStepDelayer.blockProposed = false
m.prepareStepDelayer.deadlinePassed = false
m.prepareStepDelayer.shouldProposeBlock = false
m.prepareStepDelayer.delayExhausted = false

minBlockTime := time.Duration(m.pacemakerCfg.MinBlockTimeMsec * uint64(time.Millisecond))
ctx, cancel := context.WithCancel(context.TODO())
m.prepareStepDelayer.cancelFunc = cancel

// Start a timer to wait for the MinBlockTimeMsec delay
// If a channel is provided, signal when the timer expires to it
go func() {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
select {
// only called if the delay timer is explicitly cancelled
case <-ctx.Done():
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
return
case <-m.GetBus().GetRuntimeMgr().GetClock().After(minBlockTime):
// called after the minimum block delay is exhausted meaning it is time to propose a new block
case <-m.GetBus().GetRuntimeMgr().GetClock().After(m.prepareStepDelayer.minBlockTime):
m.prepareStepDelayer.m.Lock()
defer m.prepareStepDelayer.m.Unlock()

// After the timeout, if there was any candidate request waiting for a signal, tell it to build the block
// After the timeout, if there was any `HotstuffLeaderMessageHandler.HandleNewRoundMessage()` call delayed to propose a block, unblock it by emitting true
if m.prepareStepDelayer.ch != nil {
m.prepareStepDelayer.ch <- true
close(m.prepareStepDelayer.ch)
m.prepareStepDelayer.blockProposed = true
m.prepareStepDelayer.shouldProposeBlock = true
}

// From now on, build the block ASAP
m.prepareStepDelayer.deadlinePassed = true
m.prepareStepDelayer.delayExhausted = true

// No need to cancel the context anymore
m.prepareStepDelayer.cancelFunc = nil
}
}()
}

// This is called when conditions are met by the leader to build a block but still needs to wait for the MinBlockTimeMsec delay before reaping the mempool
// DelayBlockPreparation is called when conditions are met by the leader to build a block but still needs to wait for the MinBlockTimeMsec delay before reaping the mempool.
// With MinBlockTimeMsec delay, multiple concurrent calls may happen
// DelayBlockPreparation is a synchronous blocking function that waits for channel to emit whether to propose a block or not given multiple HotstuffLeaderMessageHandler.HandleNewRoundMessage calling this function concurrently.
// It makes sure that:
// - Block proposal is made by only one of the possible `HotstuffLeaderMessageHandler.HandleNewRoundMessage()` concurrent (because delayed) calls
// - If the timer expires, the first call to this method will trigger the block proposal
// - If a late message is received AFTER the a block is marked as proposed by another call, the late message is discarded
// - Reads and affectations to pacemaker.DelayBlockPreparation state are protected by a mutex
// - Reads and assignments to pacemaker.prepareStepDelayer state are protected by a mutex
func (m *pacemaker) DelayBlockPreparation() bool {
m.prepareStepDelayer.m.Lock()

red-0ne marked this conversation as resolved.
Show resolved Hide resolved
if m.prepareStepDelayer.blockProposed {
if m.prepareStepDelayer.shouldProposeBlock {
m.prepareStepDelayer.m.Unlock()
return false
}

// If there already is a channel signaling block proposal, make sure it gives up
// If there already is a channel signaling block proposal, make sure it does not propose a block
if m.prepareStepDelayer.ch != nil {
m.prepareStepDelayer.ch <- false
close(m.prepareStepDelayer.ch)
}

// Deadline has passed, no need to have a channel, propose a block now
if m.prepareStepDelayer.deadlinePassed {
m.prepareStepDelayer.blockProposed = true
if m.prepareStepDelayer.delayExhausted {
m.prepareStepDelayer.shouldProposeBlock = true
m.prepareStepDelayer.m.Unlock()
return true
}
Expand Down
4 changes: 3 additions & 1 deletion runtime/configs/proto/consensus_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ message PacemakerConfig {
uint64 timeout_msec = 1;
bool manual = 2;
uint64 debug_time_between_steps_msec = 3;
uint64 min_block_time_msec = 4; // consenus protocol could produce blocks as soon as quorum is reached. This option allows to set min time between blocks and give more time to the mempool to fill up
// consenus could produce blocks as soon as a quorum is reached; responsivness per the Hotstuff whitepaper.
// This option allows to set min time between blocks and gives more time to the mempool to fill up; similar to timeout_propose in Tendermint.
uint64 min_block_time_msec = 4;
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}