Skip to content

Commit

Permalink
ConsensusDelay option introduced.
Browse files Browse the repository at this point in the history
  • Loading branch information
kape1395 committed Apr 26, 2023
1 parent f4d0a96 commit 672fdc5
Show file tree
Hide file tree
Showing 16 changed files with 125 additions and 17 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ build: compile-solidity build-cli

build-lint: build lint

gendoc:
./scripts/gendoc.sh

test-full: install
go test -tags $(BUILD_TAGS),runheavy ./... --timeout 60m --count 1 -failfast

Expand Down
1 change: 1 addition & 0 deletions components/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func provide(c *dig.Container) error {
ParamsChains.PullMissingRequestsFromCommittee,
ParamsChains.DeriveAliasOutputByQuorum,
ParamsChains.PipeliningLimit,
ParamsChains.ConsensusDelay,
deps.NetworkProvider,
deps.TrustedNetworkManager,
deps.ChainStateDatabaseManager.ChainStateKVStore,
Expand Down
1 change: 1 addition & 0 deletions components/chains/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ParametersChains struct {
PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"`
DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."`
PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."`
ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."`
}

type ParametersWAL struct {
Expand Down
3 changes: 2 additions & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "500ms"
},
"wal": {
"enabled": true,
Expand Down
6 changes: 4 additions & 2 deletions config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "500ms"
},
"wal": {
"enabled": true,
Expand Down Expand Up @@ -122,6 +123,7 @@
"restAPIMetrics": true,
"goMetrics": true,
"processMetrics": true,
"promhttpMetrics": true
"promhttpMetrics": true,
"webAPIMetrics": true
}
}
8 changes: 6 additions & 2 deletions documentation/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ Example:
| pullMissingRequestsFromCommittee | Whether or not to pull missing requests from other committee members | boolean | true |
| deriveAliasOutputByQuorum | False means we propose own AliasOutput, true - by majority vote. | boolean | true |
| pipeliningLimit | -1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1. | int | -1 |
| consensusDelay | Minimal delay between consensus runs. | string | "500ms" |

Example:

Expand All @@ -288,7 +289,8 @@ Example:
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "500ms"
}
}
```
Expand Down Expand Up @@ -445,6 +447,7 @@ Example:
| goMetrics | Whether to include go metrics | boolean | true |
| processMetrics | Whether to include process metrics | boolean | true |
| promhttpMetrics | Whether to include promhttp metrics | boolean | true |
| webAPIMetrics | Whether to include webapi metrics | boolean | true |

Example:

Expand All @@ -464,7 +467,8 @@ Example:
"restAPIMetrics": true,
"goMetrics": true,
"processMetrics": true,
"promhttpMetrics": true
"promhttpMetrics": true,
"webAPIMetrics": true
}
}
```
Expand Down
17 changes: 17 additions & 0 deletions packages/chain/chainmanager/chain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ func (cmi *chainMgrImpl) Input(input gpa.Input) gpa.OutMessages {
return cmi.handleInputConsensusOutputSkip(input)
case *inputConsensusTimeout:
return cmi.handleInputConsensusTimeout(input)
case *inputCanPropose:
return cmi.handleInputCanPropose()
}
panic(fmt.Errorf("unexpected input %T: %+v", input, input))
}
Expand Down Expand Up @@ -394,6 +396,13 @@ func (cmi *chainMgrImpl) handleInputConsensusTimeout(input *inputConsensusTimeou
})
}

func (cmi *chainMgrImpl) handleInputCanPropose() gpa.OutMessages {
cmi.log.Debugf("handleInputCanPropose")
return cmi.withAllCmtLogs(func(cl gpa.GPA) gpa.OutMessages {
return cl.Input(cmt_log.NewInputCanPropose())
})
}

// > UPON Reception of CmtLog.NextLI message:
// > Forward it to the corresponding CmtLog; HandleCmtLogOutput.
func (cmi *chainMgrImpl) handleMsgCmtLog(msg *msgCmtLog) gpa.OutMessages {
Expand Down Expand Up @@ -542,6 +551,14 @@ func (cmi *chainMgrImpl) withCmtLog(committeeAddr iotago.Ed25519Address, handler
return gpa.NoMessages().AddAll(cmi.handleCmtLogOutput(cli, handler(cli.gpaInstance)))
}

func (cmi *chainMgrImpl) withAllCmtLogs(handler func(cl gpa.GPA) gpa.OutMessages) gpa.OutMessages {
msgs := gpa.NoMessages()
for _, cli := range cmi.cmtLogs {
msgs.AddAll(cmi.handleCmtLogOutput(cli, handler(cli.gpaInstance)))
}
return msgs
}

// NOTE: ErrNotInCommittee
func (cmi *chainMgrImpl) ensureCmtLog(committeeAddr iotago.Ed25519Address) (*cmtLogInst, error) {
if cli, ok := cmi.cmtLogs[committeeAddr]; ok {
Expand Down
18 changes: 18 additions & 0 deletions packages/chain/chainmanager/input_can_propose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2020 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

package chainmanager

import (
"github.com/iotaledger/wasp/packages/gpa"
)

type inputCanPropose struct{}

func NewInputCanPropose() gpa.Input {
return &inputCanPropose{}
}

func (inp *inputCanPropose) String() string {
return "{chainMgr.inputCanPropose}"
}
43 changes: 35 additions & 8 deletions packages/chain/cmt_log/cmt_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ type cmtLogImpl struct {
varLogIndex VarLogIndex // Calculates the current log index.
varLocalView VarLocalView // Tracks the pending alias outputs.
varRunning VarRunning // Tracks the latest LI.
output *Output // The current request for a consensus.
outputCandidate *Output // We are about to propose this consensus, but we have to wait for time notification.
outputCanPropose bool // True, if the next proposal can be made without waiting more time notifications.
outputProposed *Output // The current request for a consensus.
asGPA gpa.GPA // This object, just with all the needed wrappers.
log *logger.Logger
}
Expand Down Expand Up @@ -228,6 +230,9 @@ func New(
varLogIndex: NewVarLogIndex(nodeIDs, n, f, prevLI, func(li LogIndex, ao *isc.AliasOutputWithID) {}, deriveAOByQuorum, log.Named("VLI")),
varLocalView: NewVarLocalView(pipeliningLimit, log.Named("VLV")),
varRunning: NewVarRunning(log.Named("VR")),
outputCandidate: nil,
outputCanPropose: true,
outputProposed: nil,
log: log,
}
cl.asGPA = gpa.NewOwnHandler(me, cl)
Expand Down Expand Up @@ -255,6 +260,8 @@ func (cl *cmtLogImpl) Input(input gpa.Input) gpa.OutMessages {
return cl.handleInputConsensusOutputRejected(input)
case *inputConsensusTimeout:
return cl.handleInputConsensusTimeout(input)
case *inputCanPropose:
return cl.handleInputCanPropose()
case *inputSuspend:
return cl.handleInputSuspend()
}
Expand Down Expand Up @@ -331,12 +338,25 @@ func (cl *cmtLogImpl) handleInputConsensusTimeout(input *inputConsensusTimeout)
return nil
}

func (cl *cmtLogImpl) handleInputCanPropose() gpa.OutMessages {
if cl.outputProposed == nil && cl.outputCandidate != nil {
// Proposal is already pending, so we output it.
// Then we already used this allowance, thus keep the can_propose false.
cl.outputProposed = cl.outputCandidate
cl.outputCanPropose = false
return nil
}
cl.outputCanPropose = true
return nil
}

// > ON Suspend:
// > ...
func (cl *cmtLogImpl) handleInputSuspend() gpa.OutMessages {
cl.log.Infof("Committee suspended.")
cl.suspended = true
cl.output = nil
cl.outputCandidate = nil
cl.outputProposed = nil
return cl.tryProposeConsensus(nil)
}

Expand All @@ -349,16 +369,16 @@ func (cl *cmtLogImpl) handleMsgNextLogIndex(msg *msgNextLogIndex) gpa.OutMessage

// Implements the gpa.GPA interface.
func (cl *cmtLogImpl) Output() gpa.Output {
if cl.output == nil {
if cl.outputProposed == nil {
return nil // Untyped nil!
}
return cl.output
return cl.outputProposed
}

// Implements the gpa.GPA interface.
func (cl *cmtLogImpl) StatusString() string {
vliLI, _ := cl.varLogIndex.Value()
return fmt.Sprintf("{cmtLogImpl, LogIndex=%v, output=%+v, %v, %v}", vliLI, cl.output, cl.varLocalView.StatusString(), cl.varLogIndex.StatusString())
return fmt.Sprintf("{cmtLogImpl, LogIndex=%v, output=%+v, %v, %v}", vliLI, cl.outputProposed, cl.varLocalView.StatusString(), cl.varLogIndex.StatusString())
}

// > PROCEDURE TryProposeConsensus:
Expand All @@ -380,7 +400,7 @@ func (cl *cmtLogImpl) tryProposeConsensus(msgs gpa.OutMessages) gpa.OutMessages
}
//
// Check, maybe it is already started.
if cl.output != nil && cl.output.logIndex == logIndex {
if cl.outputCandidate != nil && cl.outputCandidate.logIndex == logIndex {
// Already started, keep it as is.
return msgs
}
Expand All @@ -405,12 +425,19 @@ func (cl *cmtLogImpl) tryProposeConsensus(msgs gpa.OutMessages) gpa.OutMessages
//
// Start the consensus (ask the upper layer to start it).
cl.consensusLI = logIndex
cl.output = makeOutput(cl.consensusLI, baseAO)
cl.outputCandidate = makeOutput(cl.consensusLI, baseAO)
if cl.outputCanPropose {
cl.outputProposed = cl.outputCandidate
cl.outputCanPropose = false
} else {
cl.outputProposed = nil
}
cl.varRunning.ConsensusProposed(logIndex)
} else {
// > ELSE
// > Don't propose any consensus.
cl.output = nil // Outdated, clear it away.
cl.outputCandidate = nil // Outdated, clear it away.
cl.outputProposed = nil
}
return msgs
}
17 changes: 17 additions & 0 deletions packages/chain/cmt_log/input_can_propose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package cmt_log

import "github.com/iotaledger/wasp/packages/gpa"

// This event is introduced to avoid too-often consensus runs.
// They can produce more blocks than the PoV allows to confirm them.
// With this event the consensus will be proposed / started with
// a maximal rate defined by these events.
type inputCanPropose struct{}

func NewInputCanPropose() gpa.Input {
return &inputCanPropose{}
}

func (inp *inputCanPropose) String() string {
return "{cmtLog.inputCanPropose}"
}
10 changes: 10 additions & 0 deletions packages/chain/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ type chainNodeImpl struct {
stateTrackerCnf StateTracker
blockWAL sm_gpa_utils.BlockWAL
//
// Configuration values.
consensusDelay time.Duration
//
// Information for other components.
listener ChainListener // Object expecting event notifications.
accessLock *sync.RWMutex // Mutex for accessing informative fields from other threads.
Expand Down Expand Up @@ -267,6 +270,7 @@ func New(
onChainDisconnect func(),
deriveAliasOutputByQuorum bool,
pipeliningLimit int,
consensusDelay time.Duration,
) (Chain, error) {
log.Debugf("Starting the chain, chainID=%v", chainID)
if listener == nil {
Expand Down Expand Up @@ -297,6 +301,7 @@ func New(
stateTrackerAct: nil, // Set bellow.
stateTrackerCnf: nil, // Set bellow.
blockWAL: blockWAL,
consensusDelay: consensusDelay,
listener: listener,
accessLock: &sync.RWMutex{},
activeCommitteeDKShare: nil,
Expand Down Expand Up @@ -492,6 +497,7 @@ func (cni *chainNodeImpl) run(ctx context.Context, cleanupFunc context.CancelFun
consRecoverPipeOutCh := cni.consRecoverPipe.Out()
serversUpdatedPipeOutCh := cni.serversUpdatedPipe.Out()
redeliveryPeriodTicker := time.NewTicker(redeliveryPeriod)
consensusDelayTicker := time.NewTicker(cni.consensusDelay)
for {
if ctx.Err() != nil {
if cni.shutdownCoordinator == nil {
Expand Down Expand Up @@ -571,8 +577,12 @@ func (cni *chainNodeImpl) run(ctx context.Context, cleanupFunc context.CancelFun
if ok {
cni.stateTrackerCnf.ChainNodeStateMgrResponse(resp)
}
case <-consensusDelayTicker.C:
cni.sendMessages(cni.chainMgr.Input(chainmanager.NewInputCanPropose()))
cni.handleChainMgrOutput(ctx, cni.chainMgr.Output())
case t := <-redeliveryPeriodTicker.C:
cni.sendMessages(cni.chainMgr.Input(cni.chainMgr.MakeTickInput(t)))
cni.handleChainMgrOutput(ctx, cni.chainMgr.Output())
case <-ctx.Done():
continue
}
Expand Down
1 change: 1 addition & 0 deletions packages/chain/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
nil,
true,
-1,
10*time.Millisecond,
)
require.NoError(t, err)
te.nodes[i].ServersUpdated(te.peerPubKeys)
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/statemanager/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestCruelWorld(t *testing.T) {
}

func getRandomProducedBlockAIndex(blockProduced []*atomic.Bool) int {
//nolint:revive // we ingore the empty-block here because we wait for blockProduced 0 to become true
//nolint:revive // we ignore the empty-block here because we wait for blockProduced 0 to become true
for !blockProduced[0].Load() {
}
var maxIndex int
Expand Down
4 changes: 4 additions & 0 deletions packages/chains/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Chains struct {
pullMissingRequestsFromCommittee bool
deriveAliasOutputByQuorum bool
pipeliningLimit int
consensusDelay time.Duration

networkProvider peering.NetworkProvider
trustedNetworkManager peering.TrustedNetworkManager
Expand Down Expand Up @@ -88,6 +89,7 @@ func New(
pullMissingRequestsFromCommittee bool, // TODO: Unused for now.
deriveAliasOutputByQuorum bool,
pipeliningLimit int,
consensusDelay time.Duration,
networkProvider peering.NetworkProvider,
trustedNetworkManager peering.TrustedNetworkManager,
chainStateStoreProvider database.ChainStateKVStoreProvider,
Expand All @@ -111,6 +113,7 @@ func New(
pullMissingRequestsFromCommittee: pullMissingRequestsFromCommittee,
deriveAliasOutputByQuorum: deriveAliasOutputByQuorum,
pipeliningLimit: pipeliningLimit,
consensusDelay: consensusDelay,
networkProvider: networkProvider,
trustedNetworkManager: trustedNetworkManager,
chainStateStoreProvider: chainStateStoreProvider,
Expand Down Expand Up @@ -267,6 +270,7 @@ func (c *Chains) activateWithoutLocking(chainID isc.ChainID) error {
func() { c.chainMetricsProvider.UnregisterChain(chainID) },
c.deriveAliasOutputByQuorum,
c.pipeliningLimit,
c.consensusDelay,
)
if err != nil {
chainCancel()
Expand Down
5 changes: 3 additions & 2 deletions tools/cluster/templates/waspconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ var WaspConfig = `
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "50ms"
},
"rawBlocks": {
"enabled": false,
Expand All @@ -99,7 +100,7 @@ var WaspConfig = `
},
"profilingRecorder": {
"enabled": false
},
},
"prometheus": {
"enabled": true,
"bindAddress": "0.0.0.0:{{.MetricsPort}}",
Expand Down
3 changes: 2 additions & 1 deletion wasp.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ EOH
"apiCacheTTL": "5m",
"pullMissingRequestsFromCommittee": true,
"deriveAliasOutputByQuorum": true,
"pipeliningLimit": -1
"pipeliningLimit": -1,
"consensusDelay": "500ms"
},
"rawBlocks": {
"enabled": false,
Expand Down

0 comments on commit 672fdc5

Please sign in to comment.