Skip to content

Commit

Permalink
Merge pull request #2396 from iotaledger/mempool-proposal-delay
Browse files Browse the repository at this point in the history
Minimal delay between consensus runs introduced.
  • Loading branch information
kape1395 authored Apr 26, 2023
2 parents 78fdaa5 + d86a501 commit 3a2113e
Show file tree
Hide file tree
Showing 20 changed files with 164 additions and 53 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ build: compile-solidity build-cli

build-lint: build lint

gendoc:
./scripts/gendoc.sh

test-full: install
go test -tags $(BUILD_TAGS),runheavy ./... --timeout 60m --count 1 -failfast

Expand Down
1 change: 1 addition & 0 deletions components/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func provide(c *dig.Container) error {
ParamsChains.PullMissingRequestsFromCommittee,
ParamsChains.DeriveAliasOutputByQuorum,
ParamsChains.PipeliningLimit,
ParamsChains.ConsensusDelay,
deps.NetworkProvider,
deps.TrustedNetworkManager,
deps.ChainStateDatabaseManager.ChainStateKVStore,
Expand Down
1 change: 1 addition & 0 deletions components/chains/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ParametersChains struct {
PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"`
DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."`
PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."`
ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."`
}

type ParametersWAL struct {
Expand Down
3 changes: 2 additions & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "500ms"
},
"wal": {
"enabled": true,
Expand Down
6 changes: 4 additions & 2 deletions config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "500ms"
},
"wal": {
"enabled": true,
Expand Down Expand Up @@ -122,6 +123,7 @@
"restAPIMetrics": true,
"goMetrics": true,
"processMetrics": true,
"promhttpMetrics": true
"promhttpMetrics": true,
"webAPIMetrics": true
}
}
8 changes: 6 additions & 2 deletions documentation/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ Example:
| pullMissingRequestsFromCommittee | Whether or not to pull missing requests from other committee members | boolean | true |
| deriveAliasOutputByQuorum | False means we propose own AliasOutput, true - by majority vote. | boolean | true |
| pipeliningLimit | -1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1. | int | -1 |
| consensusDelay | Minimal delay between consensus runs. | string | "500ms" |

Example:

Expand All @@ -288,7 +289,8 @@ Example:
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "500ms"
}
}
```
Expand Down Expand Up @@ -445,6 +447,7 @@ Example:
| goMetrics | Whether to include go metrics | boolean | true |
| processMetrics | Whether to include process metrics | boolean | true |
| promhttpMetrics | Whether to include promhttp metrics | boolean | true |
| webAPIMetrics | Whether to include webapi metrics | boolean | true |

Example:

Expand All @@ -464,7 +467,8 @@ Example:
"restAPIMetrics": true,
"goMetrics": true,
"processMetrics": true,
"promhttpMetrics": true
"promhttpMetrics": true,
"webAPIMetrics": true
}
}
```
Expand Down
17 changes: 17 additions & 0 deletions packages/chain/chainmanager/chain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ func (cmi *chainMgrImpl) Input(input gpa.Input) gpa.OutMessages {
return cmi.handleInputConsensusOutputSkip(input)
case *inputConsensusTimeout:
return cmi.handleInputConsensusTimeout(input)
case *inputCanPropose:
return cmi.handleInputCanPropose()
}
panic(fmt.Errorf("unexpected input %T: %+v", input, input))
}
Expand Down Expand Up @@ -394,6 +396,13 @@ func (cmi *chainMgrImpl) handleInputConsensusTimeout(input *inputConsensusTimeou
})
}

func (cmi *chainMgrImpl) handleInputCanPropose() gpa.OutMessages {
cmi.log.Debugf("handleInputCanPropose")
return cmi.withAllCmtLogs(func(cl gpa.GPA) gpa.OutMessages {
return cl.Input(cmt_log.NewInputCanPropose())
})
}

// > UPON Reception of CmtLog.NextLI message:
// > Forward it to the corresponding CmtLog; HandleCmtLogOutput.
func (cmi *chainMgrImpl) handleMsgCmtLog(msg *msgCmtLog) gpa.OutMessages {
Expand Down Expand Up @@ -542,6 +551,14 @@ func (cmi *chainMgrImpl) withCmtLog(committeeAddr iotago.Ed25519Address, handler
return gpa.NoMessages().AddAll(cmi.handleCmtLogOutput(cli, handler(cli.gpaInstance)))
}

func (cmi *chainMgrImpl) withAllCmtLogs(handler func(cl gpa.GPA) gpa.OutMessages) gpa.OutMessages {
msgs := gpa.NoMessages()
for _, cli := range cmi.cmtLogs {
msgs.AddAll(cmi.handleCmtLogOutput(cli, handler(cli.gpaInstance)))
}
return msgs
}

// NOTE: ErrNotInCommittee
func (cmi *chainMgrImpl) ensureCmtLog(committeeAddr iotago.Ed25519Address) (*cmtLogInst, error) {
if cli, ok := cmi.cmtLogs[committeeAddr]; ok {
Expand Down
18 changes: 18 additions & 0 deletions packages/chain/chainmanager/input_can_propose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2020 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

package chainmanager

import (
"github.com/iotaledger/wasp/packages/gpa"
)

type inputCanPropose struct{}

func NewInputCanPropose() gpa.Input {
return &inputCanPropose{}
}

func (inp *inputCanPropose) String() string {
return "{chainMgr.inputCanPropose}"
}
43 changes: 35 additions & 8 deletions packages/chain/cmt_log/cmt_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ type cmtLogImpl struct {
varLogIndex VarLogIndex // Calculates the current log index.
varLocalView VarLocalView // Tracks the pending alias outputs.
varRunning VarRunning // Tracks the latest LI.
output *Output // The current request for a consensus.
outputCandidate *Output // We are about to propose this consensus, but we have to wait for time notification.
outputCanPropose bool // True, if the next proposal can be made without waiting more time notifications.
outputProposed *Output // The current request for a consensus.
asGPA gpa.GPA // This object, just with all the needed wrappers.
log *logger.Logger
}
Expand Down Expand Up @@ -228,6 +230,9 @@ func New(
varLogIndex: NewVarLogIndex(nodeIDs, n, f, prevLI, func(li LogIndex, ao *isc.AliasOutputWithID) {}, deriveAOByQuorum, log.Named("VLI")),
varLocalView: NewVarLocalView(pipeliningLimit, log.Named("VLV")),
varRunning: NewVarRunning(log.Named("VR")),
outputCandidate: nil,
outputCanPropose: true,
outputProposed: nil,
log: log,
}
cl.asGPA = gpa.NewOwnHandler(me, cl)
Expand Down Expand Up @@ -255,6 +260,8 @@ func (cl *cmtLogImpl) Input(input gpa.Input) gpa.OutMessages {
return cl.handleInputConsensusOutputRejected(input)
case *inputConsensusTimeout:
return cl.handleInputConsensusTimeout(input)
case *inputCanPropose:
return cl.handleInputCanPropose()
case *inputSuspend:
return cl.handleInputSuspend()
}
Expand Down Expand Up @@ -331,12 +338,25 @@ func (cl *cmtLogImpl) handleInputConsensusTimeout(input *inputConsensusTimeout)
return nil
}

func (cl *cmtLogImpl) handleInputCanPropose() gpa.OutMessages {
if cl.outputProposed == nil && cl.outputCandidate != nil {
// Proposal is already pending, so we output it.
// Then we already used this allowance, thus keep the can_propose false.
cl.outputProposed = cl.outputCandidate
cl.outputCanPropose = false
return nil
}
cl.outputCanPropose = true
return nil
}

// > ON Suspend:
// > ...
func (cl *cmtLogImpl) handleInputSuspend() gpa.OutMessages {
cl.log.Infof("Committee suspended.")
cl.suspended = true
cl.output = nil
cl.outputCandidate = nil
cl.outputProposed = nil
return cl.tryProposeConsensus(nil)
}

Expand All @@ -349,16 +369,16 @@ func (cl *cmtLogImpl) handleMsgNextLogIndex(msg *msgNextLogIndex) gpa.OutMessage

// Implements the gpa.GPA interface.
func (cl *cmtLogImpl) Output() gpa.Output {
if cl.output == nil {
if cl.outputProposed == nil {
return nil // Untyped nil!
}
return cl.output
return cl.outputProposed
}

// Implements the gpa.GPA interface.
func (cl *cmtLogImpl) StatusString() string {
vliLI, _ := cl.varLogIndex.Value()
return fmt.Sprintf("{cmtLogImpl, LogIndex=%v, output=%+v, %v, %v}", vliLI, cl.output, cl.varLocalView.StatusString(), cl.varLogIndex.StatusString())
return fmt.Sprintf("{cmtLogImpl, LogIndex=%v, output=%+v, %v, %v}", vliLI, cl.outputProposed, cl.varLocalView.StatusString(), cl.varLogIndex.StatusString())
}

// > PROCEDURE TryProposeConsensus:
Expand All @@ -380,7 +400,7 @@ func (cl *cmtLogImpl) tryProposeConsensus(msgs gpa.OutMessages) gpa.OutMessages
}
//
// Check, maybe it is already started.
if cl.output != nil && cl.output.logIndex == logIndex {
if cl.outputCandidate != nil && cl.outputCandidate.logIndex == logIndex {
// Already started, keep it as is.
return msgs
}
Expand All @@ -405,12 +425,19 @@ func (cl *cmtLogImpl) tryProposeConsensus(msgs gpa.OutMessages) gpa.OutMessages
//
// Start the consensus (ask the upper layer to start it).
cl.consensusLI = logIndex
cl.output = makeOutput(cl.consensusLI, baseAO)
cl.outputCandidate = makeOutput(cl.consensusLI, baseAO)
if cl.outputCanPropose {
cl.outputProposed = cl.outputCandidate
cl.outputCanPropose = false
} else {
cl.outputProposed = nil
}
cl.varRunning.ConsensusProposed(logIndex)
} else {
// > ELSE
// > Don't propose any consensus.
cl.output = nil // Outdated, clear it away.
cl.outputCandidate = nil // Outdated, clear it away.
cl.outputProposed = nil
}
return msgs
}
17 changes: 17 additions & 0 deletions packages/chain/cmt_log/input_can_propose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package cmt_log

import "github.com/iotaledger/wasp/packages/gpa"

// This event is introduced to avoid too-often consensus runs.
// They can produce more blocks than the PoV allows to confirm them.
// With this event the consensus will be proposed / started with
// a maximal rate defined by these events.
type inputCanPropose struct{}

func NewInputCanPropose() gpa.Input {
return &inputCanPropose{}
}

func (inp *inputCanPropose) String() string {
return "{cmtLog.inputCanPropose}"
}
4 changes: 2 additions & 2 deletions packages/chain/cons/cons_gr/gr.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// Interfaces required from other components (MP, SM)

type Mempool interface {
ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef
ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef
ConsensusRequestsAsync(ctx context.Context, requestRefs []*isc.RequestRef) <-chan []isc.Request
}

Expand Down Expand Up @@ -337,7 +337,7 @@ func (cgr *ConsGr) tryHandleOutput() { //nolint:gocyclo
}
output := outputUntyped.(*cons.Output)
if output.NeedMempoolProposal != nil && !cgr.mempoolProposalsAsked {
cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalsAsync(cgr.ctx, output.NeedMempoolProposal)
cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalAsync(cgr.ctx, output.NeedMempoolProposal)
cgr.mempoolProposalsAsked = true
}
if output.NeedMempoolRequests != nil && !cgr.mempoolRequestsAsked {
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/cons/cons_gr/gr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (tmp *testMempool) tryRespondRequestQueries() {
tmp.qRequests = remaining
}

func (tmp *testMempool) ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
func (tmp *testMempool) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
tmp.lock.Lock()
defer tmp.lock.Unlock()
outputID := aliasOutput.OutputID()
Expand Down
Loading

0 comments on commit 3a2113e

Please sign in to comment.