From 672fdc508eda7a38b7bcb83ca6bac91736c4770b Mon Sep 17 00:00:00 2001 From: Karolis Petrauskas Date: Wed, 26 Apr 2023 09:45:21 +0300 Subject: [PATCH] ConsensusDelay option introduced. --- Makefile | 3 ++ components/chains/component.go | 1 + components/chains/params.go | 1 + config.json | 3 +- config_defaults.json | 6 ++- documentation/docs/configuration.md | 8 +++- packages/chain/chainmanager/chain_manager.go | 17 ++++++++ .../chain/chainmanager/input_can_propose.go | 18 ++++++++ packages/chain/cmt_log/cmt_log.go | 43 +++++++++++++++---- packages/chain/cmt_log/input_can_propose.go | 17 ++++++++ packages/chain/node.go | 10 +++++ packages/chain/node_test.go | 1 + .../chain/statemanager/state_manager_test.go | 2 +- packages/chains/chains.go | 4 ++ tools/cluster/templates/waspconfig.go | 5 ++- wasp.nomad | 3 +- 16 files changed, 125 insertions(+), 17 deletions(-) create mode 100644 packages/chain/chainmanager/input_can_propose.go create mode 100644 packages/chain/cmt_log/input_can_propose.go diff --git a/Makefile b/Makefile index 5312bed533..67a489f66d 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,9 @@ build: compile-solidity build-cli build-lint: build lint +gendoc: + ./scripts/gendoc.sh + test-full: install go test -tags $(BUILD_TAGS),runheavy ./... --timeout 60m --count 1 -failfast diff --git a/components/chains/component.go b/components/chains/component.go index e4a81be9ee..f74708eb8d 100644 --- a/components/chains/component.go +++ b/components/chains/component.go @@ -96,6 +96,7 @@ func provide(c *dig.Container) error { ParamsChains.PullMissingRequestsFromCommittee, ParamsChains.DeriveAliasOutputByQuorum, ParamsChains.PipeliningLimit, + ParamsChains.ConsensusDelay, deps.NetworkProvider, deps.TrustedNetworkManager, deps.ChainStateDatabaseManager.ChainStateKVStore, diff --git a/components/chains/params.go b/components/chains/params.go index c4e6328ed6..9266305813 100644 --- a/components/chains/params.go +++ b/components/chains/params.go @@ -13,6 +13,7 @@ type ParametersChains struct { PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"` DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."` PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."` + ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."` } type ParametersWAL struct { diff --git a/config.json b/config.json index f1d39d3160..b9abab082a 100644 --- a/config.json +++ b/config.json @@ -68,7 +68,8 @@ "apiCacheTTL": "5m", "pullMissingRequestsFromCommittee": true, "deriveAliasOutputByQuorum": true, - "pipeliningLimit": -1 + "pipeliningLimit": -1, + "consensusDelay": "500ms" }, "wal": { "enabled": true, diff --git a/config_defaults.json b/config_defaults.json index b072e4603d..c8e63ca251 100755 --- a/config_defaults.json +++ b/config_defaults.json @@ -68,7 +68,8 @@ "apiCacheTTL": "5m", "pullMissingRequestsFromCommittee": true, "deriveAliasOutputByQuorum": true, - "pipeliningLimit": -1 + "pipeliningLimit": -1, + "consensusDelay": "500ms" }, "wal": { "enabled": true, @@ -122,6 +123,7 @@ "restAPIMetrics": true, "goMetrics": true, "processMetrics": true, - "promhttpMetrics": true + "promhttpMetrics": true, + "webAPIMetrics": true } } diff --git a/documentation/docs/configuration.md b/documentation/docs/configuration.md index d4c2e527ba..d1894f9530 100755 --- a/documentation/docs/configuration.md +++ b/documentation/docs/configuration.md @@ -277,6 +277,7 @@ Example: | pullMissingRequestsFromCommittee | Whether or not to pull missing requests from other committee members | boolean | true | | deriveAliasOutputByQuorum | False means we propose own AliasOutput, true - by majority vote. | boolean | true | | pipeliningLimit | -1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1. | int | -1 | +| consensusDelay | Minimal delay between consensus runs. | string | "500ms" | Example: @@ -288,7 +289,8 @@ Example: "apiCacheTTL": "5m", "pullMissingRequestsFromCommittee": true, "deriveAliasOutputByQuorum": true, - "pipeliningLimit": -1 + "pipeliningLimit": -1, + "consensusDelay": "500ms" } } ``` @@ -445,6 +447,7 @@ Example: | goMetrics | Whether to include go metrics | boolean | true | | processMetrics | Whether to include process metrics | boolean | true | | promhttpMetrics | Whether to include promhttp metrics | boolean | true | +| webAPIMetrics | Whether to include webapi metrics | boolean | true | Example: @@ -464,7 +467,8 @@ Example: "restAPIMetrics": true, "goMetrics": true, "processMetrics": true, - "promhttpMetrics": true + "promhttpMetrics": true, + "webAPIMetrics": true } } ``` diff --git a/packages/chain/chainmanager/chain_manager.go b/packages/chain/chainmanager/chain_manager.go index bee0dd5b86..47a8b1714c 100644 --- a/packages/chain/chainmanager/chain_manager.go +++ b/packages/chain/chainmanager/chain_manager.go @@ -244,6 +244,8 @@ func (cmi *chainMgrImpl) Input(input gpa.Input) gpa.OutMessages { return cmi.handleInputConsensusOutputSkip(input) case *inputConsensusTimeout: return cmi.handleInputConsensusTimeout(input) + case *inputCanPropose: + return cmi.handleInputCanPropose() } panic(fmt.Errorf("unexpected input %T: %+v", input, input)) } @@ -394,6 +396,13 @@ func (cmi *chainMgrImpl) handleInputConsensusTimeout(input *inputConsensusTimeou }) } +func (cmi *chainMgrImpl) handleInputCanPropose() gpa.OutMessages { + cmi.log.Debugf("handleInputCanPropose") + return cmi.withAllCmtLogs(func(cl gpa.GPA) gpa.OutMessages { + return cl.Input(cmt_log.NewInputCanPropose()) + }) +} + // > UPON Reception of CmtLog.NextLI message: // > Forward it to the corresponding CmtLog; HandleCmtLogOutput. func (cmi *chainMgrImpl) handleMsgCmtLog(msg *msgCmtLog) gpa.OutMessages { @@ -542,6 +551,14 @@ func (cmi *chainMgrImpl) withCmtLog(committeeAddr iotago.Ed25519Address, handler return gpa.NoMessages().AddAll(cmi.handleCmtLogOutput(cli, handler(cli.gpaInstance))) } +func (cmi *chainMgrImpl) withAllCmtLogs(handler func(cl gpa.GPA) gpa.OutMessages) gpa.OutMessages { + msgs := gpa.NoMessages() + for _, cli := range cmi.cmtLogs { + msgs.AddAll(cmi.handleCmtLogOutput(cli, handler(cli.gpaInstance))) + } + return msgs +} + // NOTE: ErrNotInCommittee func (cmi *chainMgrImpl) ensureCmtLog(committeeAddr iotago.Ed25519Address) (*cmtLogInst, error) { if cli, ok := cmi.cmtLogs[committeeAddr]; ok { diff --git a/packages/chain/chainmanager/input_can_propose.go b/packages/chain/chainmanager/input_can_propose.go new file mode 100644 index 0000000000..a4de26d6f5 --- /dev/null +++ b/packages/chain/chainmanager/input_can_propose.go @@ -0,0 +1,18 @@ +// Copyright 2020 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +package chainmanager + +import ( + "github.com/iotaledger/wasp/packages/gpa" +) + +type inputCanPropose struct{} + +func NewInputCanPropose() gpa.Input { + return &inputCanPropose{} +} + +func (inp *inputCanPropose) String() string { + return "{chainMgr.inputCanPropose}" +} diff --git a/packages/chain/cmt_log/cmt_log.go b/packages/chain/cmt_log/cmt_log.go index 2ecf8e2027..ce21bc53c2 100644 --- a/packages/chain/cmt_log/cmt_log.go +++ b/packages/chain/cmt_log/cmt_log.go @@ -158,7 +158,9 @@ type cmtLogImpl struct { varLogIndex VarLogIndex // Calculates the current log index. varLocalView VarLocalView // Tracks the pending alias outputs. varRunning VarRunning // Tracks the latest LI. - output *Output // The current request for a consensus. + outputCandidate *Output // We are about to propose this consensus, but we have to wait for time notification. + outputCanPropose bool // True, if the next proposal can be made without waiting more time notifications. + outputProposed *Output // The current request for a consensus. asGPA gpa.GPA // This object, just with all the needed wrappers. log *logger.Logger } @@ -228,6 +230,9 @@ func New( varLogIndex: NewVarLogIndex(nodeIDs, n, f, prevLI, func(li LogIndex, ao *isc.AliasOutputWithID) {}, deriveAOByQuorum, log.Named("VLI")), varLocalView: NewVarLocalView(pipeliningLimit, log.Named("VLV")), varRunning: NewVarRunning(log.Named("VR")), + outputCandidate: nil, + outputCanPropose: true, + outputProposed: nil, log: log, } cl.asGPA = gpa.NewOwnHandler(me, cl) @@ -255,6 +260,8 @@ func (cl *cmtLogImpl) Input(input gpa.Input) gpa.OutMessages { return cl.handleInputConsensusOutputRejected(input) case *inputConsensusTimeout: return cl.handleInputConsensusTimeout(input) + case *inputCanPropose: + return cl.handleInputCanPropose() case *inputSuspend: return cl.handleInputSuspend() } @@ -331,12 +338,25 @@ func (cl *cmtLogImpl) handleInputConsensusTimeout(input *inputConsensusTimeout) return nil } +func (cl *cmtLogImpl) handleInputCanPropose() gpa.OutMessages { + if cl.outputProposed == nil && cl.outputCandidate != nil { + // Proposal is already pending, so we output it. + // Then we already used this allowance, thus keep the can_propose false. + cl.outputProposed = cl.outputCandidate + cl.outputCanPropose = false + return nil + } + cl.outputCanPropose = true + return nil +} + // > ON Suspend: // > ... func (cl *cmtLogImpl) handleInputSuspend() gpa.OutMessages { cl.log.Infof("Committee suspended.") cl.suspended = true - cl.output = nil + cl.outputCandidate = nil + cl.outputProposed = nil return cl.tryProposeConsensus(nil) } @@ -349,16 +369,16 @@ func (cl *cmtLogImpl) handleMsgNextLogIndex(msg *msgNextLogIndex) gpa.OutMessage // Implements the gpa.GPA interface. func (cl *cmtLogImpl) Output() gpa.Output { - if cl.output == nil { + if cl.outputProposed == nil { return nil // Untyped nil! } - return cl.output + return cl.outputProposed } // Implements the gpa.GPA interface. func (cl *cmtLogImpl) StatusString() string { vliLI, _ := cl.varLogIndex.Value() - return fmt.Sprintf("{cmtLogImpl, LogIndex=%v, output=%+v, %v, %v}", vliLI, cl.output, cl.varLocalView.StatusString(), cl.varLogIndex.StatusString()) + return fmt.Sprintf("{cmtLogImpl, LogIndex=%v, output=%+v, %v, %v}", vliLI, cl.outputProposed, cl.varLocalView.StatusString(), cl.varLogIndex.StatusString()) } // > PROCEDURE TryProposeConsensus: @@ -380,7 +400,7 @@ func (cl *cmtLogImpl) tryProposeConsensus(msgs gpa.OutMessages) gpa.OutMessages } // // Check, maybe it is already started. - if cl.output != nil && cl.output.logIndex == logIndex { + if cl.outputCandidate != nil && cl.outputCandidate.logIndex == logIndex { // Already started, keep it as is. return msgs } @@ -405,12 +425,19 @@ func (cl *cmtLogImpl) tryProposeConsensus(msgs gpa.OutMessages) gpa.OutMessages // // Start the consensus (ask the upper layer to start it). cl.consensusLI = logIndex - cl.output = makeOutput(cl.consensusLI, baseAO) + cl.outputCandidate = makeOutput(cl.consensusLI, baseAO) + if cl.outputCanPropose { + cl.outputProposed = cl.outputCandidate + cl.outputCanPropose = false + } else { + cl.outputProposed = nil + } cl.varRunning.ConsensusProposed(logIndex) } else { // > ELSE // > Don't propose any consensus. - cl.output = nil // Outdated, clear it away. + cl.outputCandidate = nil // Outdated, clear it away. + cl.outputProposed = nil } return msgs } diff --git a/packages/chain/cmt_log/input_can_propose.go b/packages/chain/cmt_log/input_can_propose.go new file mode 100644 index 0000000000..87f31ecd73 --- /dev/null +++ b/packages/chain/cmt_log/input_can_propose.go @@ -0,0 +1,17 @@ +package cmt_log + +import "github.com/iotaledger/wasp/packages/gpa" + +// This event is introduced to avoid too-often consensus runs. +// They can produce more blocks than the PoV allows to confirm them. +// With this event the consensus will be proposed / started with +// a maximal rate defined by these events. +type inputCanPropose struct{} + +func NewInputCanPropose() gpa.Input { + return &inputCanPropose{} +} + +func (inp *inputCanPropose) String() string { + return "{cmtLog.inputCanPropose}" +} diff --git a/packages/chain/node.go b/packages/chain/node.go index 11533851b3..e820cbec1c 100644 --- a/packages/chain/node.go +++ b/packages/chain/node.go @@ -165,6 +165,9 @@ type chainNodeImpl struct { stateTrackerCnf StateTracker blockWAL sm_gpa_utils.BlockWAL // + // Configuration values. + consensusDelay time.Duration + // // Information for other components. listener ChainListener // Object expecting event notifications. accessLock *sync.RWMutex // Mutex for accessing informative fields from other threads. @@ -267,6 +270,7 @@ func New( onChainDisconnect func(), deriveAliasOutputByQuorum bool, pipeliningLimit int, + consensusDelay time.Duration, ) (Chain, error) { log.Debugf("Starting the chain, chainID=%v", chainID) if listener == nil { @@ -297,6 +301,7 @@ func New( stateTrackerAct: nil, // Set bellow. stateTrackerCnf: nil, // Set bellow. blockWAL: blockWAL, + consensusDelay: consensusDelay, listener: listener, accessLock: &sync.RWMutex{}, activeCommitteeDKShare: nil, @@ -492,6 +497,7 @@ func (cni *chainNodeImpl) run(ctx context.Context, cleanupFunc context.CancelFun consRecoverPipeOutCh := cni.consRecoverPipe.Out() serversUpdatedPipeOutCh := cni.serversUpdatedPipe.Out() redeliveryPeriodTicker := time.NewTicker(redeliveryPeriod) + consensusDelayTicker := time.NewTicker(cni.consensusDelay) for { if ctx.Err() != nil { if cni.shutdownCoordinator == nil { @@ -571,8 +577,12 @@ func (cni *chainNodeImpl) run(ctx context.Context, cleanupFunc context.CancelFun if ok { cni.stateTrackerCnf.ChainNodeStateMgrResponse(resp) } + case <-consensusDelayTicker.C: + cni.sendMessages(cni.chainMgr.Input(chainmanager.NewInputCanPropose())) + cni.handleChainMgrOutput(ctx, cni.chainMgr.Output()) case t := <-redeliveryPeriodTicker.C: cni.sendMessages(cni.chainMgr.Input(cni.chainMgr.MakeTickInput(t))) + cni.handleChainMgrOutput(ctx, cni.chainMgr.Output()) case <-ctx.Done(): continue } diff --git a/packages/chain/node_test.go b/packages/chain/node_test.go index 4b2849ac31..7089f89020 100644 --- a/packages/chain/node_test.go +++ b/packages/chain/node_test.go @@ -458,6 +458,7 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv { nil, true, -1, + 10*time.Millisecond, ) require.NoError(t, err) te.nodes[i].ServersUpdated(te.peerPubKeys) diff --git a/packages/chain/statemanager/state_manager_test.go b/packages/chain/statemanager/state_manager_test.go index ef0037dc24..7c8d593ffc 100644 --- a/packages/chain/statemanager/state_manager_test.go +++ b/packages/chain/statemanager/state_manager_test.go @@ -177,7 +177,7 @@ func TestCruelWorld(t *testing.T) { } func getRandomProducedBlockAIndex(blockProduced []*atomic.Bool) int { - //nolint:revive // we ingore the empty-block here because we wait for blockProduced 0 to become true + //nolint:revive // we ignore the empty-block here because we wait for blockProduced 0 to become true for !blockProduced[0].Load() { } var maxIndex int diff --git a/packages/chains/chains.go b/packages/chains/chains.go index 5fb0e15ff1..24b89bbad9 100644 --- a/packages/chains/chains.go +++ b/packages/chains/chains.go @@ -50,6 +50,7 @@ type Chains struct { pullMissingRequestsFromCommittee bool deriveAliasOutputByQuorum bool pipeliningLimit int + consensusDelay time.Duration networkProvider peering.NetworkProvider trustedNetworkManager peering.TrustedNetworkManager @@ -88,6 +89,7 @@ func New( pullMissingRequestsFromCommittee bool, // TODO: Unused for now. deriveAliasOutputByQuorum bool, pipeliningLimit int, + consensusDelay time.Duration, networkProvider peering.NetworkProvider, trustedNetworkManager peering.TrustedNetworkManager, chainStateStoreProvider database.ChainStateKVStoreProvider, @@ -111,6 +113,7 @@ func New( pullMissingRequestsFromCommittee: pullMissingRequestsFromCommittee, deriveAliasOutputByQuorum: deriveAliasOutputByQuorum, pipeliningLimit: pipeliningLimit, + consensusDelay: consensusDelay, networkProvider: networkProvider, trustedNetworkManager: trustedNetworkManager, chainStateStoreProvider: chainStateStoreProvider, @@ -267,6 +270,7 @@ func (c *Chains) activateWithoutLocking(chainID isc.ChainID) error { func() { c.chainMetricsProvider.UnregisterChain(chainID) }, c.deriveAliasOutputByQuorum, c.pipeliningLimit, + c.consensusDelay, ) if err != nil { chainCancel() diff --git a/tools/cluster/templates/waspconfig.go b/tools/cluster/templates/waspconfig.go index 431da84a43..f3bc7067c5 100644 --- a/tools/cluster/templates/waspconfig.go +++ b/tools/cluster/templates/waspconfig.go @@ -87,7 +87,8 @@ var WaspConfig = ` "apiCacheTTL": "5m", "pullMissingRequestsFromCommittee": true, "deriveAliasOutputByQuorum": true, - "pipeliningLimit": -1 + "pipeliningLimit": -1, + "consensusDelay": "50ms" }, "rawBlocks": { "enabled": false, @@ -99,7 +100,7 @@ var WaspConfig = ` }, "profilingRecorder": { "enabled": false - }, + }, "prometheus": { "enabled": true, "bindAddress": "0.0.0.0:{{.MetricsPort}}", diff --git a/wasp.nomad b/wasp.nomad index 92c1be5840..7d40b18526 100644 --- a/wasp.nomad +++ b/wasp.nomad @@ -217,7 +217,8 @@ EOH "apiCacheTTL": "5m", "pullMissingRequestsFromCommittee": true, "deriveAliasOutputByQuorum": true, - "pipeliningLimit": -1 + "pipeliningLimit": -1, + "consensusDelay": "500ms" }, "rawBlocks": { "enabled": false,