diff --git a/.golangci.yml b/.golangci.yml index 119a19445c..f4bf8be97a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -145,7 +145,6 @@ linters: - errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases - exportloopref # Checks for pointers to enclosing loop variables. - funlen # Tool for detection of long functions. - - goconst # Finds repeated strings that could be replaced by a constant. - gocritic # Provides many diagnostics that check for bugs, performance and style issues. - gocyclo # Computes and checks the cyclomatic complexity of functions. - goerr113 # Golang linter to check the errors handling expressions. @@ -186,6 +185,7 @@ linters: - wastedassign # wastedassign finds wasted assignment statements. [fast: true, auto-fix: false] + # - goconst # Finds repeated strings that could be replaced by a constant. # - depguard # Go linter that checks if package imports are in a list of acceptable packages [fast: true, auto-fix: false] # nlreturn # nlreturn checks for a new line before return and branch statements to increase code clarity [fast: true, auto-fix: false] # don't enable: diff --git a/components/chains/component.go b/components/chains/component.go index 7b4cd968ee..7adb476387 100644 --- a/components/chains/component.go +++ b/components/chains/component.go @@ -11,6 +11,7 @@ import ( hiveshutdown "github.com/iotaledger/hive.go/app/shutdown" "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/chain/cmt_log" + "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chains" "github.com/iotaledger/wasp/packages/daemon" "github.com/iotaledger/wasp/packages/database" @@ -128,7 +129,15 @@ func provide(c *dig.Container) error { deps.NodeIdentityProvider, deps.ConsensusStateRegistry, deps.ChainListener, - ParamsChains.MempoolTTL, + mempool.Settings{ + TTL: ParamsChains.MempoolTTL, + OnLedgerRefreshMinInterval: ParamsChains.MempoolOnLedgerRefreshMinInterval, + MaxOffledgerInPool: ParamsChains.MempoolMaxOffledgerInPool, + MaxOnledgerInPool: ParamsChains.MempoolMaxOnledgerInPool, + MaxTimedInPool: ParamsChains.MempoolMaxTimedInPool, + MaxOnledgerToPropose: ParamsChains.MempoolMaxOnledgerToPropose, + MaxOffledgerToPropose: ParamsChains.MempoolMaxOffledgerToPropose, + }, ParamsChains.BroadcastInterval, shutdown.NewCoordinator("chains", Component.Logger().Named("Shutdown")), deps.ChainMetricsProvider, diff --git a/components/chains/params.go b/components/chains/params.go index a74125dac9..3e325128bd 100644 --- a/components/chains/params.go +++ b/components/chains/params.go @@ -7,20 +7,26 @@ import ( ) type ParametersChains struct { - BroadcastUpToNPeers int `default:"2" usage:"number of peers an offledger request is broadcasted to"` - BroadcastInterval time.Duration `default:"0s" usage:"time between re-broadcast of offledger requests; 0 value means that re-broadcasting is disabled"` - APICacheTTL time.Duration `default:"300s" usage:"time to keep processed offledger requests in api cache"` - PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"` - DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."` - PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."` - PostponeRecoveryMilestones int `default:"3" usage:"number 
of milestones to wait until a chain transition is considered as rejected"` - ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."` - RecoveryTimeout time.Duration `default:"20s" usage:"Time after which another consensus attempt is made."` - RedeliveryPeriod time.Duration `default:"2s" usage:"the resend period for msg."` - PrintStatusPeriod time.Duration `default:"3s" usage:"the period to print consensus instance status."` - ConsensusInstsInAdvance int `default:"3" usage:""` - AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"` - MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"` + BroadcastUpToNPeers int `default:"2" usage:"number of peers an offledger request is broadcasted to"` + BroadcastInterval time.Duration `default:"0s" usage:"time between re-broadcast of offledger requests; 0 value means that re-broadcasting is disabled"` + APICacheTTL time.Duration `default:"300s" usage:"time to keep processed offledger requests in api cache"` + PullMissingRequestsFromCommittee bool `default:"true" usage:"whether or not to pull missing requests from other committee members"` + DeriveAliasOutputByQuorum bool `default:"true" usage:"false means we propose own AliasOutput, true - by majority vote."` + PipeliningLimit int `default:"-1" usage:"-1 -- infinite, 0 -- disabled, X -- build the chain if there is up to X transactions unconfirmed by L1."` + PostponeRecoveryMilestones int `default:"3" usage:"number of milestones to wait until a chain transition is considered as rejected"` + ConsensusDelay time.Duration `default:"500ms" usage:"Minimal delay between consensus runs."` + RecoveryTimeout time.Duration `default:"20s" usage:"Time after which another consensus attempt is made."` + RedeliveryPeriod time.Duration `default:"2s" usage:"the resend period for msg."` + PrintStatusPeriod time.Duration `default:"3s" usage:"the period to print consensus instance status."` + ConsensusInstsInAdvance int `default:"3" usage:""` + AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"` + MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"` + MempoolMaxOffledgerInPool int `default:"2000" usage:"Maximum number of off-ledger requests kept in the mempool"` + MempoolMaxOnledgerInPool int `default:"1000" usage:"Maximum number of on-ledger requests kept in the mempool"` + MempoolMaxTimedInPool int `default:"100" usage:"Maximum number of timed on-ledger requests kept in the mempool"` + MempoolMaxOffledgerToPropose int `default:"500" usage:"Maximum number of off-ledger requests to propose for the next block"` + MempoolMaxOnledgerToPropose int `default:"100" usage:"Maximum number of on-ledger requests to propose for the next block (includes timed requests)"` + MempoolOnLedgerRefreshMinInterval time.Duration `default:"10m" usage:"Minimum interval to try to refresh the list of on-ledger requests after some have been dropped from the pool (this interval is introduced to avoid dropping/refreshing cycle if there are too many requests on L1 to process)"` } type ParametersWAL struct { diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index ba31817b92..53d684e477 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -46,6 +46,7 @@ import ( "context" "fmt" "io" + "slices" "time" 
"github.com/samber/lo" @@ -113,16 +114,14 @@ type Mempool interface { GetContents() io.Reader } -type RequestPool[V isc.Request] interface { - Has(reqRef *isc.RequestRef) bool - Get(reqRef *isc.RequestRef) V - Add(request V) - Remove(request V) - // this removes requests from the pool if predicate returns false - Filter(predicate func(request V, ts time.Time) bool) - Iterate(f func(e *typedPoolEntry[V])) - StatusString() string - WriteContent(io.Writer) +type Settings struct { + TTL time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed) + OnLedgerRefreshMinInterval time.Duration + MaxOffledgerInPool int + MaxOnledgerInPool int + MaxTimedInPool int + MaxOnledgerToPropose int // (including timed-requests) + MaxOffledgerToPropose int } // This implementation tracks single branch of the chain only. I.e. all the consensus @@ -134,9 +133,9 @@ type RequestPool[V isc.Request] interface { type mempoolImpl struct { chainID isc.ChainID tangleTime time.Time - timePool TimePool - onLedgerPool RequestPool[isc.OnLedgerRequest] - offLedgerPool *TypedPoolByNonce[isc.OffLedgerRequest] + timePool TimePool // TODO limit this pool + onLedgerPool RequestPool[isc.OnLedgerRequest] // TODO limit this pool + offLedgerPool *OffLedgerPool distSync gpa.GPA chainHeadAO *isc.AliasOutputWithID chainHeadState state.State @@ -160,11 +159,13 @@ type mempoolImpl struct { netPeerPubs map[gpa.NodeID]*cryptolib.PublicKey net peering.NetworkProvider activeConsensusInstances []consGR.ConsensusID - ttl time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed) + settings Settings broadcastInterval time.Duration // how often requests should be rebroadcasted log *logger.Logger metrics *metrics.ChainMempoolMetrics listener ChainListener + refreshOnLedgerRequests func() + lastRefreshTimestamp time.Time } var _ Mempool = &mempoolImpl{} @@ -223,17 +224,18 @@ func New( metrics *metrics.ChainMempoolMetrics, pipeMetrics *metrics.ChainPipeMetrics, listener ChainListener, - ttl time.Duration, + settings Settings, broadcastInterval time.Duration, + refreshOnLedgerRequests func(), ) Mempool { netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool waitReq := NewWaitReq(waitRequestCleanupEvery) mpi := &mempoolImpl{ chainID: chainID, tangleTime: time.Time{}, - timePool: NewTimePool(metrics.SetTimePoolSize, log.Named("TIM")), - onLedgerPool: NewTypedPool[isc.OnLedgerRequest](waitReq, metrics.SetOnLedgerPoolSize, metrics.SetOnLedgerReqTime, log.Named("ONL")), - offLedgerPool: NewTypedPoolByNonce[isc.OffLedgerRequest](waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), + timePool: NewTimePool(settings.MaxTimedInPool, metrics.SetTimePoolSize, log.Named("TIM")), + onLedgerPool: NewTypedPool[isc.OnLedgerRequest](settings.MaxOnledgerInPool, waitReq, metrics.SetOnLedgerPoolSize, metrics.SetOnLedgerReqTime, log.Named("ONL")), + offLedgerPool: NewOffledgerPool(settings.MaxOffledgerInPool, waitReq, metrics.SetOffLedgerPoolSize, metrics.SetOffLedgerReqTime, log.Named("OFF")), chainHeadAO: nil, serverNodesUpdatedPipe: pipe.NewInfinitePipe[*reqServerNodesUpdated](), serverNodes: []*cryptolib.PublicKey{}, @@ -254,11 +256,13 @@ func New( net: net, consensusInstancesUpdatedPipe: pipe.NewInfinitePipe[*reqConsensusInstancesUpdated](), activeConsensusInstances: []consGR.ConsensusID{}, - ttl: ttl, + settings: settings, broadcastInterval: broadcastInterval, log: log, metrics: 
metrics, listener: listener, + refreshOnLedgerRequests: refreshOnLedgerRequests, + lastRefreshTimestamp: time.Now(), } pipeMetrics.TrackPipeLen("mp-serverNodesUpdatedPipe", mpi.serverNodesUpdatedPipe.Len) @@ -382,7 +386,7 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc) debugTicker := time.NewTicker(distShareDebugTick) timeTicker := time.NewTicker(distShareTimeTick) rePublishTicker := time.NewTicker(distShareRePublishTick) - forceCleanMempoolTicker := time.NewTicker(forceCleanMempoolTick) + forceCleanMempoolTicker := time.NewTicker(forceCleanMempoolTick) // this exists to force mempool cleanup on access nodes // thought: maybe access nodes shouldn't have a mempool at all for { select { case recv, ok := <-serverNodesUpdatedPipeOutCh: @@ -545,6 +549,12 @@ func (mpi *mempoolImpl) shouldAddOffledgerRequest(req isc.OffLedgerRequest) erro return fmt.Errorf("no funds on chain") } } + + // reject txs with gas price too low + if gp := req.GasPrice(); gp != nil && gp.Cmp(mpi.offLedgerPool.minGasPrice) == -1 { + return fmt.Errorf("gas price too low. Must be at least %s", mpi.offLedgerPool.minGasPrice.String()) + } + return nil } @@ -584,30 +594,43 @@ func (mpi *mempoolImpl) handleConsensusProposal(recv *reqConsensusProposal) { mpi.handleConsensusProposalForChainHead(recv) } +//nolint:gocyclo func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.RequestRef { // // The case for matching ChainHeadAO and request BaseAO reqRefs := []*isc.RequestRef{} if !mpi.tangleTime.IsZero() { // Wait for tangle-time to process the on ledger requests. - mpi.onLedgerPool.Filter(func(request isc.OnLedgerRequest, _ time.Time) bool { - if isc.RequestIsExpired(request, mpi.tangleTime) { - return false // Drop it from the mempool + mpi.onLedgerPool.Iterate(func(e *typedPoolEntry[isc.OnLedgerRequest]) bool { + if isc.RequestIsExpired(e.req, mpi.tangleTime) { + mpi.onLedgerPool.Remove(e.req) // Drop it from the mempool + return true } - if isc.RequestIsUnlockable(request, mpi.chainID.AsAddress(), mpi.tangleTime) { - reqRefs = append(reqRefs, isc.RequestRefFromRequest(request)) + if isc.RequestIsUnlockable(e.req, mpi.chainID.AsAddress(), mpi.tangleTime) { + reqRefs = append(reqRefs, isc.RequestRefFromRequest(e.req)) + e.proposedFor = append(e.proposedFor, consensusID) } - return true // Keep them for now + if len(reqRefs) >= mpi.settings.MaxOnledgerToPropose { + return false + } + return true }) } - mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry[isc.OffLedgerRequest]) { - agentID, err := isc.AgentIDFromString(account) - if err != nil { - panic(fmt.Errorf("invalid agentID string: %s", err.Error())) - } - accountNonce := mpi.nonce(agentID) - for _, e := range entries { - if time.Since(e.ts) > mpi.ttl { // stop proposing after TTL + // + // iterate the ordered txs and add the first valid ones (respect nonce) to propose + // stop iterating when either: got MaxOffledgerToPropose, or no requests were added during last iteration (there are gaps in nonces) + accNonces := make(map[string]uint64) // cache of account nonces so we don't propose gaps + orderedList := slices.Clone(mpi.offLedgerPool.orderedByGasPrice) // clone the ordered list of references to requests, so we can alter it safely + for { + added := 0 + addedThisCycle := false + for i, e := range orderedList { + if e == nil { + continue + } + // + // drop tx with expired TTL + if time.Since(e.ts) > mpi.settings.TTL { // stop proposing after TTL if !lo.Some(mpi.consensusInstances, 
e.proposedFor) { // request not used in active consensus anymore, remove it mpi.log.Debugf("refsToPropose, request TTL expired, removing: %s", e.req.ID().String()) @@ -620,31 +643,52 @@ func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.Req if e.old { // this request was marked as "old", do not propose it - mpi.log.Debugf("refsToPropose, account: %s, skipping old request: %s", account, e.req.ID().String()) + mpi.log.Debugf("refsToPropose, skipping old request: %s", e.req.ID().String()) continue } + reqAccount := e.req.SenderAccount() + reqAccountKey := reqAccount.String() + accountNonce, ok := accNonces[reqAccountKey] + if !ok { + accountNonce = mpi.nonce(reqAccount) + accNonces[reqAccountKey] = accountNonce + } + reqNonce := e.req.Nonce() if reqNonce < accountNonce { // nonce too old, delete - mpi.log.Debugf("refsToPropose, account: %s, removing request (%s) with old nonce (%d) from the pool", account, e.req.ID(), e.req.Nonce()) + mpi.log.Debugf("refsToPropose, account: %s, removing request (%s) with old nonce (%d) from the pool", reqAccount, e.req.ID(), e.req.Nonce()) mpi.offLedgerPool.Remove(e.req) continue } if reqNonce == accountNonce { // expected nonce, add it to the list to propose - mpi.log.Debugf("refsToPropose, account: %s, proposing reqID %s with nonce: %d", account, e.req.ID().String(), e.req.Nonce()) + mpi.log.Debugf("refsToPropose, account: %s, proposing reqID %s with nonce: %d", reqAccount, e.req.ID().String(), e.req.Nonce()) reqRefs = append(reqRefs, isc.RequestRefFromRequest(e.req)) e.proposedFor = append(e.proposedFor, consensusID) + addedThisCycle = true + added++ accountNonce++ // increment the account nonce to match the next valid request + accNonces[reqAccountKey] = accountNonce + // delete from this list + orderedList[i] = nil + } + + if added >= mpi.settings.MaxOffledgerToPropose { + break // got enough requests } + if reqNonce > accountNonce { - mpi.log.Debugf("refsToPropose, account: %s, req %s has a nonce %d which is too high (expected %d), won't be proposed", account, e.req.ID().String(), e.req.Nonce(), accountNonce) - return // no more valid nonces for this account, continue to the next account + mpi.log.Debugf("refsToPropose, account: %s, req %s has a nonce %d which is too high (expected %d), won't be proposed", reqAccount, e.req.ID().String(), e.req.Nonce(), accountNonce) + continue // skip request } } - }) + if !addedThisCycle || (added >= mpi.settings.MaxOffledgerToPropose) { + break + } + } return reqRefs } @@ -769,23 +813,16 @@ func (mpi *mempoolImpl) handleTangleTimeUpdated(tangleTime time.Time) { // // Add requests from time locked pool. reqs := mpi.timePool.TakeTill(tangleTime) - for i := range reqs { - switch req := reqs[i].(type) { - case isc.OnLedgerRequest: - mpi.onLedgerPool.Add(req) - mpi.metrics.IncRequestsReceived(req) - case isc.OffLedgerRequest: - mpi.offLedgerPool.Add(req) - mpi.metrics.IncRequestsReceived(req) - default: - panic(fmt.Errorf("unexpected request type: %T, %+v", req, req)) - } + for _, req := range reqs { + mpi.onLedgerPool.Add(req) + mpi.metrics.IncRequestsReceived(req) } // // Notify existing on-ledger requests if that's first time update. 
if oldTangleTime.IsZero() { - mpi.onLedgerPool.Iterate(func(e *typedPoolEntry[isc.OnLedgerRequest]) { + mpi.onLedgerPool.Iterate(func(e *typedPoolEntry[isc.OnLedgerRequest]) bool { mpi.waitReq.MarkAvailable(e.req) + return true }) } } @@ -797,6 +834,7 @@ func (mpi *mempoolImpl) handleTangleTimeUpdated(tangleTime time.Time) { func (mpi *mempoolImpl) handleTrackNewChainHead(req *reqTrackNewChainHead) { defer close(req.responseCh) mpi.log.Debugf("handleTrackNewChainHead, %v from %v, current=%v", req.till, req.from, mpi.chainHeadAO) + if len(req.removed) != 0 { mpi.log.Infof("Reorg detected, removing %v blocks, adding %v blocks", len(req.removed), len(req.added)) // TODO: For IOTA 2.0: Maybe re-read the state from L1 (when reorgs will become possible). @@ -864,6 +902,9 @@ func (mpi *mempoolImpl) handleTrackNewChainHead(req *reqTrackNewChainHead) { } mpi.waitChainHead = newWaitChainHead } + + // update defaultGasPrice for offLedger requests + mpi.offLedgerPool.SetMinGasPrice(governance.NewStateAccess(mpi.chainHeadState).DefaultGasPrice()) } func (mpi *mempoolImpl) handleNetMessage(recv *peering.PeerMessageIn) { @@ -897,18 +938,26 @@ func (mpi *mempoolImpl) handleRePublishTimeTick() { return // re-broadcasting is disabled } retryOlder := time.Now().Add(-mpi.broadcastInterval) - mpi.offLedgerPool.Filter(func(request isc.OffLedgerRequest, ts time.Time) bool { + mpi.offLedgerPool.Cleanup(func(request isc.OffLedgerRequest, ts time.Time) bool { if ts.Before(retryOlder) { mpi.sendMessages(mpi.distSync.Input(distsync.NewInputPublishRequest(request))) } return true }) + + // periodically try to refresh On-ledger requests that might have been dropped + if time.Since(mpi.lastRefreshTimestamp) > mpi.settings.OnLedgerRefreshMinInterval { + if mpi.onLedgerPool.ShouldRefreshRequests() || mpi.timePool.ShouldRefreshRequests() { + mpi.refreshOnLedgerRequests() + mpi.lastRefreshTimestamp = time.Now() + } + } } func (mpi *mempoolImpl) handleForceCleanMempool() { - mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry[isc.OffLedgerRequest]) { + mpi.offLedgerPool.Iterate(func(account string, entries []*OrderedPoolEntry) { for _, e := range entries { - if time.Since(e.ts) > mpi.ttl && !lo.Some(mpi.consensusInstances, e.proposedFor) { + if time.Since(e.ts) > mpi.settings.TTL && !lo.Some(mpi.consensusInstances, e.proposedFor) { mpi.log.Debugf("handleForceCleanMempool, request TTL expired, removing: %s", e.req.ID().String()) mpi.offLedgerPool.Remove(e.req) } @@ -948,9 +997,9 @@ func (mpi *mempoolImpl) tryRemoveRequest(req isc.Request) { } func (mpi *mempoolImpl) tryCleanupProcessed(chainState state.State) { - mpi.onLedgerPool.Filter(unprocessedPredicate[isc.OnLedgerRequest](chainState, mpi.log)) - mpi.offLedgerPool.Filter(unprocessedPredicate[isc.OffLedgerRequest](chainState, mpi.log)) - mpi.timePool.Filter(unprocessedPredicate[isc.Request](chainState, mpi.log)) + mpi.onLedgerPool.Cleanup(unprocessedPredicate[isc.OnLedgerRequest](chainState, mpi.log)) + mpi.offLedgerPool.Cleanup(unprocessedPredicate[isc.OffLedgerRequest](chainState, mpi.log)) + mpi.timePool.Cleanup(unprocessedPredicate[isc.OnLedgerRequest](chainState, mpi.log)) } func (mpi *mempoolImpl) sendMessages(outMsgs gpa.OutMessages) { diff --git a/packages/chain/mempool/mempool_test.go b/packages/chain/mempool/mempool_test.go index fd92460a3c..06bcc10a43 100644 --- a/packages/chain/mempool/mempool_test.go +++ b/packages/chain/mempool/mempool_test.go @@ -516,14 +516,14 @@ func TestMempoolsNonceGaps(t *testing.T) { askProposalExpectReqs := 
func(ao *isc.AliasOutputWithID, reqs ...isc.Request) *isc.AliasOutputWithID { t.Log("Ask for proposals") - proposals := make([]<-chan []*isc.RequestRef, len(te.mempools)) + proposalCh := make([]<-chan []*isc.RequestRef, len(te.mempools)) for i, node := range te.mempools { - proposals[i] = node.ConsensusProposalAsync(te.ctx, ao, consGR.ConsensusID{}) + proposalCh[i] = node.ConsensusProposalAsync(te.ctx, ao, consGR.ConsensusID{}) } t.Log("Wait for proposals and ask for decided requests") decided := make([]<-chan []isc.Request, len(te.mempools)) for i, node := range te.mempools { - proposal := <-proposals[i] + proposal := <-proposalCh[i] require.Len(t, proposal, len(reqs)) decided[i] = node.ConsensusRequestsAsync(te.ctx, proposal) } @@ -679,8 +679,16 @@ func TestTTL(t *testing.T) { chainMetrics.Mempool, chainMetrics.Pipe, chain.NewEmptyChainListener(), - 200*time.Millisecond, // 200ms TTL + mempool.Settings{ + TTL: 200 * time.Millisecond, + MaxOffledgerInPool: 1000, + MaxOnledgerInPool: 1000, + MaxTimedInPool: 1000, + MaxOnledgerToPropose: 1000, + MaxOffledgerToPropose: 1000, + }, 1*time.Second, + func() {}, ) defer te.close() start := time.Now() @@ -804,8 +812,16 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv { chainMetrics.Mempool, chainMetrics.Pipe, chain.NewEmptyChainListener(), - 24*time.Hour, + mempool.Settings{ + TTL: 24 * time.Hour, + MaxOffledgerInPool: 1000, + MaxOnledgerInPool: 1000, + MaxTimedInPool: 1000, + MaxOnledgerToPropose: 1000, + MaxOffledgerToPropose: 1000, + }, 1*time.Second, + func() {}, ) } return te diff --git a/packages/chain/mempool/offledger_pool.go b/packages/chain/mempool/offledger_pool.go new file mode 100644 index 0000000000..ef06097874 --- /dev/null +++ b/packages/chain/mempool/offledger_pool.go @@ -0,0 +1,304 @@ +// Copyright 2020 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +package mempool + +import ( + "fmt" + "io" + "math/big" + "slices" + "time" + + "github.com/samber/lo" + + "github.com/iotaledger/hive.go/ds/shrinkingmap" + "github.com/iotaledger/hive.go/logger" + consGR "github.com/iotaledger/wasp/packages/chain/cons/cons_gr" + "github.com/iotaledger/wasp/packages/isc" + "github.com/iotaledger/wasp/packages/kv/codec" +) + +// keeps a map of requests ordered by nonce for each account +type OffLedgerPool struct { + waitReq WaitReq + refLUT *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *OrderedPoolEntry] + // reqsByAcountOrdered keeps an ordered map of reqsByAcountOrdered for each account by nonce + reqsByAcountOrdered *shrinkingmap.ShrinkingMap[string, []*OrderedPoolEntry] // string is isc.AgentID.String() + // orderedByGasPrice keeps a list ordered by the highest gas price + orderedByGasPrice []*OrderedPoolEntry // TODO use a better data structure instead!!! 
(probably RedBlackTree)
+	minGasPrice       *big.Int
+	maxPoolSize       int
+	sizeMetric        func(int)
+	timeMetric        func(time.Duration)
+	log               *logger.Logger
+}
+
+func NewOffledgerPool(maxPoolSize int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) *OffLedgerPool {
+	return &OffLedgerPool{
+		waitReq:             waitReq,
+		refLUT:              shrinkingmap.New[isc.RequestRefKey, *OrderedPoolEntry](),
+		reqsByAcountOrdered: shrinkingmap.New[string, []*OrderedPoolEntry](),
+		orderedByGasPrice:   []*OrderedPoolEntry{},
+		minGasPrice:         big.NewInt(1),
+		maxPoolSize:         maxPoolSize,
+		sizeMetric:          sizeMetric,
+		timeMetric:          timeMetric,
+		log:                 log,
+	}
+}
+
+type OrderedPoolEntry struct {
+	req         isc.OffLedgerRequest
+	old         bool
+	ts          time.Time
+	proposedFor []consGR.ConsensusID
+}
+
+func (p *OffLedgerPool) Has(reqRef *isc.RequestRef) bool {
+	return p.refLUT.Has(reqRef.AsKey())
+}
+
+func (p *OffLedgerPool) Get(reqRef *isc.RequestRef) isc.OffLedgerRequest {
+	entry, exists := p.refLUT.Get(reqRef.AsKey())
+	if !exists {
+		return isc.OffLedgerRequest(nil)
+	}
+	return entry.req
+}
+
+func (p *OffLedgerPool) Add(request isc.OffLedgerRequest) {
+	ref := isc.RequestRefFromRequest(request)
+	entry := &OrderedPoolEntry{req: request, ts: time.Now()}
+	account := request.SenderAccount().String()
+
+	//
+	// add the request to the "request ref" Lookup Table
+	if !p.refLUT.Set(ref.AsKey(), entry) {
+		p.log.Debugf("NOT ADDED, already exists. reqID: %v as key=%v, senderAccount: %s", request.ID(), ref, account)
+		return // not added, already exists
+	}
+
+	//
+	// add to the account requests, keep the slice ordered
+	{
+		reqsForAcount, exists := p.reqsByAcountOrdered.Get(account)
+		if !exists {
+			// no other requests for this account
+			p.reqsByAcountOrdered.Set(account, []*OrderedPoolEntry{entry})
+		} else {
+			// find the index where the new entry should be added
+			index, exists := slices.BinarySearchFunc(reqsForAcount, entry,
+				func(a, b *OrderedPoolEntry) int {
+					aNonce := a.req.Nonce()
+					bNonce := b.req.Nonce()
+					if aNonce == bNonce {
+						return 0
+					}
+					if aNonce > bNonce {
+						return 1
+					}
+					return -1
+				},
+			)
+			if exists {
+				// same nonce, mark the existing request with overlapping nonce as "old", place the new one
+				// NOTE: do not delete the request here, as it might already be part of an on-going consensus round
+				reqsForAcount[index].old = true
+			}
+
+			reqsForAcount = append(reqsForAcount, entry) // add to the end of the list (thus extending the array)
+
+			// make room if target position is not at the end
+			if index != len(reqsForAcount)-1 {
+				copy(reqsForAcount[index+1:], reqsForAcount[index:])
+				reqsForAcount[index] = entry
+			}
+			p.reqsByAcountOrdered.Set(account, reqsForAcount)
+		}
+	}
+
+	//
+	// add the request to the list ordered by gas price
+	{
+		index, _ := slices.BinarySearchFunc(p.orderedByGasPrice, entry, p.reqSort)
+		p.orderedByGasPrice = append(p.orderedByGasPrice, entry)
+		// make room if target position is not at the end
+		if index != len(p.orderedByGasPrice)-1 {
+			copy(p.orderedByGasPrice[index+1:], p.orderedByGasPrice[index:])
+			p.orderedByGasPrice[index] = entry
+		}
+	}
+
+	// keep the pool size in check
+	deleted := p.LimitPoolSize()
+	if lo.Contains(deleted, entry) {
+		// this exact request was deleted from the pool, do not update metrics, or mark available
+		return
+	}
+
+	//
+	// update metrics and signal that the request is available
+	p.log.Debugf("ADD %v as key=%v, senderAccount: %s", request.ID(), ref, account)
+	p.sizeMetric(p.refLUT.Size())
+	p.waitReq.MarkAvailable(request)
+}
+
+// LimitPoolSize drops the txs with the lowest price if the total number of requests is too big
+func (p *OffLedgerPool) LimitPoolSize() []*OrderedPoolEntry {
+	if len(p.orderedByGasPrice) <= p.maxPoolSize {
+		return nil // nothing to do
+	}
+
+	totalToDelete := len(p.orderedByGasPrice) - p.maxPoolSize
+	reqsToDelete := make([]*OrderedPoolEntry, 0, totalToDelete)
+	for i := 0; i < len(p.orderedByGasPrice); i++ {
+		if len(p.orderedByGasPrice[i].proposedFor) > 0 {
+			continue // we cannot drop requests that are being used in current consensus instances
+		}
+		reqsToDelete = append(reqsToDelete, p.orderedByGasPrice[i])
+		if len(reqsToDelete) >= totalToDelete {
+			break
+		}
+	}
+
+	for _, r := range reqsToDelete {
+		p.log.Debugf("LimitPoolSize dropping request: %v", r.req.ID())
+		p.Remove(r.req)
+	}
+	return reqsToDelete
+}
+
+func (p *OffLedgerPool) GasPrice(e *OrderedPoolEntry) *big.Int {
+	price := e.req.GasPrice()
+	if price != nil {
+		return price
+	}
+	// requests without a price specified are assigned the minimum gas price
+	return p.minGasPrice
+}
+
+func (p *OffLedgerPool) SetMinGasPrice(newPrice *big.Int) {
+	if p.minGasPrice.Cmp(newPrice) == 0 {
+		// no change
+		return
+	}
+	// update the price and re-order the transactions
+	p.minGasPrice = newPrice
+	slices.SortFunc(p.orderedByGasPrice, p.reqSort)
+}
+
+func (p *OffLedgerPool) reqSort(a, b *OrderedPoolEntry) int {
+	cmp := p.GasPrice(a).Cmp(p.GasPrice(b))
+	if cmp != 0 {
+		return cmp
+	}
+	// use requestID as a fallback in case of matching gas price (it's random and should give roughly the same order between nodes)
+	aID := a.req.ID()
+	bID := b.req.ID()
+	for i := range aID {
+		if aID[i] == bID[i] {
+			continue
+		}
+		if aID[i] > bID[i] {
+			return 1
+		}
+		return -1
+	}
+	return 0
+}
+
+func (p *OffLedgerPool) Remove(request isc.OffLedgerRequest) {
+	refKey := isc.RequestRefFromRequest(request).AsKey()
+	entry, exists := p.refLUT.Get(refKey)
+	if !exists {
+		return // does not exist
+	}
+	defer func() {
+		p.sizeMetric(p.refLUT.Size())
+		p.timeMetric(time.Since(entry.ts))
+	}()
+
+	//
+	// delete from the "requests reference" LookupTable
+	if p.refLUT.Delete(refKey) {
+		p.log.Debugf("DEL %v as key=%v", request.ID(), refKey)
+	}
+
+	//
+	// find the request in the accounts map and delete it
+	{
+		account := entry.req.SenderAccount().String()
+		reqsByAccount, exists := p.reqsByAcountOrdered.Get(account)
+		if !exists {
+			p.log.Errorf("inconsistency trying to DEL %v as key=%v, no request list for account %s", request.ID(), refKey, account)
+			return
+		}
+		indexToDel := slices.IndexFunc(reqsByAccount, func(e *OrderedPoolEntry) bool {
+			return refKey == isc.RequestRefFromRequest(e.req).AsKey()
+		})
+		if indexToDel == -1 {
+			p.log.Errorf("inconsistency trying to DEL %v as key=%v, request not found in list for account %s", request.ID(), refKey, account)
+			return
+		}
+		if len(reqsByAccount) == 1 { // just remove the entire array for the account
+			p.reqsByAcountOrdered.Delete(account)
+		} else {
+			reqsByAccount[indexToDel] = nil // remove the pointer reference to allow GC of the entry object
+			reqsByAccount = slices.Delete(reqsByAccount, indexToDel, indexToDel+1)
+			p.reqsByAcountOrdered.Set(account, reqsByAccount)
+		}
+	}
+
+	//
+	// find and delete the request from the gas price ordered list
+	{
+		indexToDel := slices.IndexFunc(p.orderedByGasPrice, func(e *OrderedPoolEntry) bool {
+			return refKey == isc.RequestRefFromRequest(e.req).AsKey()
+		})
+		p.orderedByGasPrice[indexToDel] = nil // remove the pointer reference to allow GC of the entry object
+		p.orderedByGasPrice =
slices.Delete(p.orderedByGasPrice, indexToDel, indexToDel+1) + } +} + +func (p *OffLedgerPool) Iterate(f func(account string, requests []*OrderedPoolEntry)) { + p.reqsByAcountOrdered.ForEach(func(acc string, entries []*OrderedPoolEntry) bool { + f(acc, slices.Clone(entries)) + return true + }) +} + +func (p *OffLedgerPool) Cleanup(predicate func(request isc.OffLedgerRequest, ts time.Time) bool) { + p.refLUT.ForEach(func(refKey isc.RequestRefKey, entry *OrderedPoolEntry) bool { + if !predicate(entry.req, entry.ts) { + p.Remove(entry.req) + } + return true + }) + p.sizeMetric(p.refLUT.Size()) +} + +func (p *OffLedgerPool) StatusString() string { + return fmt.Sprintf("{|req|=%d}", p.refLUT.Size()) +} + +func (p *OffLedgerPool) WriteContent(w io.Writer) { + p.reqsByAcountOrdered.ForEach(func(_ string, list []*OrderedPoolEntry) bool { + for _, entry := range list { + jsonData, err := isc.RequestToJSON(entry.req) + if err != nil { + return false // stop iteration + } + _, err = w.Write(codec.EncodeUint32(uint32(len(jsonData)))) + if err != nil { + return false // stop iteration + } + _, err = w.Write(jsonData) + if err != nil { + return false // stop iteration + } + } + return true + }) +} diff --git a/packages/chain/mempool/offledger_pool_test.go b/packages/chain/mempool/offledger_pool_test.go new file mode 100644 index 0000000000..e21d00535b --- /dev/null +++ b/packages/chain/mempool/offledger_pool_test.go @@ -0,0 +1,99 @@ +package mempool + +import ( + "math/big" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/require" + + "github.com/iotaledger/wasp/packages/isc" + "github.com/iotaledger/wasp/packages/testutil" + "github.com/iotaledger/wasp/packages/testutil/testkey" + "github.com/iotaledger/wasp/packages/testutil/testlogger" +) + +func TestOffledgerMempoolAccountNonce(t *testing.T) { + waitReq := NewWaitReq(waitRequestCleanupEvery) + pool := NewOffledgerPool(100, waitReq, func(int) {}, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) + + // generate a bunch of requests for the same account + kp, addr := testkey.GenKeyAddr() + agentID := isc.NewAgentID(addr) + + req0 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 0, kp) + req1 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 1, kp) + req2 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) + req2new := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) + pool.Add(req0) + pool.Add(req1) + pool.Add(req1) // try to add the same request many times + pool.Add(req2) + pool.Add(req1) + require.EqualValues(t, 3, pool.refLUT.Size()) + require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) + reqsInPoolForAccount, _ := pool.reqsByAcountOrdered.Get(agentID.String()) + require.Len(t, reqsInPoolForAccount, 3) + pool.Add(req2new) + pool.Add(req2new) + require.EqualValues(t, 4, pool.refLUT.Size()) + require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) + reqsInPoolForAccount, _ = pool.reqsByAcountOrdered.Get(agentID.String()) + require.Len(t, reqsInPoolForAccount, 4) + + // try to remove everything during iteration + pool.Iterate(func(account string, entries []*OrderedPoolEntry) { + for _, e := range entries { + pool.Remove(e.req) + } + }) + require.EqualValues(t, 0, pool.refLUT.Size()) + require.EqualValues(t, 0, pool.reqsByAcountOrdered.Size()) +} + +func TestOffledgerMempoolLimit(t *testing.T) { + waitReq := NewWaitReq(waitRequestCleanupEvery) + poolSizeLimit := 3 + pool := NewOffledgerPool(poolSizeLimit, waitReq, func(int) {}, 
func(time.Duration) {}, testlogger.NewSilentLogger("", true)) + + // create requests with different gas prices + req0 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(1)) + req1 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(2)) + req2 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(3)) + pool.Add(req0) + pool.Add(req1) + pool.Add(req2) + + assertPoolSize := func() { + require.EqualValues(t, 3, pool.refLUT.Size()) + require.Len(t, pool.orderedByGasPrice, 3) + require.EqualValues(t, 3, pool.reqsByAcountOrdered.Size()) + } + contains := func(reqs ...isc.OffLedgerRequest) { + for _, req := range reqs { + lo.ContainsBy(pool.orderedByGasPrice, func(e *OrderedPoolEntry) bool { + return e.req.ID().Equals(req.ID()) + }) + } + } + + assertPoolSize() + + // add a request with high + req3 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(3)) + pool.Add(req3) + assertPoolSize() + contains(req1, req2, req3) // assert req3 was added and req0 was removed + + req4 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(1)) + pool.Add(req4) + assertPoolSize() + contains(req1, req2, req3) // assert req4 is not added + + req5 := testutil.DummyEVMRequest(isc.RandomChainID(), big.NewInt(4)) + pool.Add(req5) + assertPoolSize() + + contains(req2, req3, req5) // assert req5 was added and req1 was removed +} diff --git a/packages/chain/mempool/time_pool.go b/packages/chain/mempool/time_pool.go index f6fbc74c21..0d277ec307 100644 --- a/packages/chain/mempool/time_pool.go +++ b/packages/chain/mempool/time_pool.go @@ -14,26 +14,29 @@ import ( // Maintains a pool of requests that have to be postponed until specified timestamp. type TimePool interface { - AddRequest(timestamp time.Time, request isc.Request) - TakeTill(timestamp time.Time) []isc.Request + AddRequest(timestamp time.Time, request isc.OnLedgerRequest) + TakeTill(timestamp time.Time) []isc.OnLedgerRequest Has(reqID *isc.RequestRef) bool - Filter(predicate func(request isc.Request, ts time.Time) bool) + Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool) + ShouldRefreshRequests() bool } // Here we implement TimePool. We maintain the request in a list ordered by a timestamp. // The list is organized in slots. Each slot contains a list of requests that fit to the // slot boundaries. type timePoolImpl struct { - requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.Request] // All the requests in this pool. - slots *timeSlot // Structure to fetch them fast by their time. - sizeMetric func(int) - log *logger.Logger + requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, isc.OnLedgerRequest] // All the requests in this pool. + slots *timeSlot // Structure to fetch them fast by their time. 
+ hasDroppedRequests bool + maxPoolSize int + sizeMetric func(int) + log *logger.Logger } type timeSlot struct { from time.Time till time.Time - reqs *shrinkingmap.ShrinkingMap[time.Time, []isc.Request] + reqs *shrinkingmap.ShrinkingMap[time.Time, []isc.OnLedgerRequest] next *timeSlot } @@ -41,33 +44,34 @@ const slotPrecision = time.Minute var _ TimePool = &timePoolImpl{} -func NewTimePool(sizeMetric func(int), log *logger.Logger) TimePool { +func NewTimePool(maxTimedInPool int, sizeMetric func(int), log *logger.Logger) TimePool { return &timePoolImpl{ - requests: shrinkingmap.New[isc.RequestRefKey, isc.Request](), - slots: nil, - sizeMetric: sizeMetric, - log: log, + requests: shrinkingmap.New[isc.RequestRefKey, isc.OnLedgerRequest](), + slots: nil, + hasDroppedRequests: false, + maxPoolSize: maxTimedInPool, + sizeMetric: sizeMetric, + log: log, } } -func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.Request) { +func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.OnLedgerRequest) { reqRefKey := isc.RequestRefFromRequest(request).AsKey() if tpi.requests.Has(reqRefKey) { return } - if tpi.requests.Set(reqRefKey, request) { - tpi.log.Debugf("ADD %v as key=%v", request.ID(), reqRefKey) + if !tpi.requests.Set(reqRefKey, request) { + return } - tpi.sizeMetric(tpi.requests.Size()) reqFrom, reqTill := tpi.timestampSlotBounds(timestamp) prevNext := &tpi.slots for slot := tpi.slots; ; { if slot == nil || slot.from.After(reqFrom) { // Add new slot (append or insert). - newRequests := shrinkingmap.New[time.Time, []isc.Request]() - newRequests.Set(timestamp, []isc.Request{request}) + newRequests := shrinkingmap.New[time.Time, []isc.OnLedgerRequest]() + newRequests.Set(timestamp, []isc.OnLedgerRequest{request}) newSlot := &timeSlot{ from: reqFrom, @@ -76,25 +80,73 @@ func (tpi *timePoolImpl) AddRequest(timestamp time.Time, request isc.Request) { next: slot, } *prevNext = newSlot - return + break } if slot.from == reqFrom { // Add to existing slot. 
- requests, _ := slot.reqs.GetOrCreate(timestamp, func() []isc.Request { return make([]isc.Request, 0, 1) }) + requests, _ := slot.reqs.GetOrCreate(timestamp, func() []isc.OnLedgerRequest { return make([]isc.OnLedgerRequest, 0, 1) }) slot.reqs.Set(timestamp, append(requests, request)) - return + break } prevNext = &slot.next slot = slot.next } + + // + // keep the size of this pool limited + if tpi.requests.Size() > tpi.maxPoolSize { + // remove the slot most far out in the future + var prev *timeSlot + lastSlot := tpi.slots + for { + if lastSlot.next == nil { + break + } + prev = lastSlot + lastSlot = lastSlot.next + } + + // remove the link to the lastSlot + if prev == nil { + tpi.slots = nil + } else { + prev.next = nil + } + + // delete the requests included in the last slot + reqsToDelete := lastSlot.reqs.Values() + for _, reqs := range reqsToDelete { + for _, req := range reqs { + rKey := isc.RequestRefFromRequest(req).AsKey() + tpi.requests.Delete(rKey) + } + } + tpi.hasDroppedRequests = true + } + + // log and update metrics + tpi.log.Debugf("ADD %v as key=%v", request.ID(), reqRefKey) + tpi.sizeMetric(tpi.requests.Size()) +} + +func (tpi *timePoolImpl) ShouldRefreshRequests() bool { + if !tpi.hasDroppedRequests { + return false + } + if tpi.requests.Size() > 0 { + return false // wait until pool is empty to refresh + } + // assume after this function returns true, the requests will be refreshed + tpi.hasDroppedRequests = false + return true } -func (tpi *timePoolImpl) TakeTill(timestamp time.Time) []isc.Request { - resp := []isc.Request{} +func (tpi *timePoolImpl) TakeTill(timestamp time.Time) []isc.OnLedgerRequest { + resp := []isc.OnLedgerRequest{} for slot := tpi.slots; slot != nil; slot = slot.next { if slot.from.After(timestamp) { break } - slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.Request) bool { + slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.OnLedgerRequest) bool { if ts == timestamp || ts.Before(timestamp) { resp = append(resp, tsReqs...) 
for _, req := range tsReqs { @@ -121,10 +173,10 @@ func (tpi *timePoolImpl) Has(reqRef *isc.RequestRef) bool { return tpi.requests.Has(reqRef.AsKey()) } -func (tpi *timePoolImpl) Filter(predicate func(request isc.Request, ts time.Time) bool) { +func (tpi *timePoolImpl) Cleanup(predicate func(request isc.OnLedgerRequest, ts time.Time) bool) { prevNext := &tpi.slots for slot := tpi.slots; slot != nil; slot = slot.next { - slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.Request) bool { + slot.reqs.ForEach(func(ts time.Time, tsReqs []isc.OnLedgerRequest) bool { requests := tsReqs for i, req := range requests { if !predicate(req, ts) { diff --git a/packages/chain/mempool/time_pool_test.go b/packages/chain/mempool/time_pool_test.go index a5982e9bc4..ecd55cf4f7 100644 --- a/packages/chain/mempool/time_pool_test.go +++ b/packages/chain/mempool/time_pool_test.go @@ -10,26 +10,31 @@ import ( "github.com/stretchr/testify/require" "pgregory.net/rapid" + iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/tpkg" "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/testutil/testlogger" - "github.com/iotaledger/wasp/packages/vm/core/governance" - "github.com/iotaledger/wasp/packages/vm/gas" ) func TestTimePoolBasic(t *testing.T) { log := testlogger.NewLogger(t) - kp := cryptolib.NewKeyPair() - tp := mempool.NewTimePool(func(i int) {}, log) + tp := mempool.NewTimePool(1000, func(i int) {}, log) t0 := time.Now() t1 := t0.Add(17 * time.Nanosecond) t2 := t0.Add(17 * time.Minute) t3 := t0.Add(17 * time.Hour) - r0 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 0, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) - r1 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 1, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) - r2 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 2, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) - r3 := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 3, gas.LimitsDefault.MaxGasPerRequest).Sign(kp) + + r0, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(0)) + require.NoError(t, err) + r1, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(1)) + require.NoError(t, err) + r2, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(2)) + require.NoError(t, err) + r3, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) + require.False(t, tp.Has(isc.RequestRefFromRequest(r0))) require.False(t, tp.Has(isc.RequestRefFromRequest(r1))) require.False(t, tp.Has(isc.RequestRefFromRequest(r2))) @@ -43,7 +48,7 @@ func TestTimePoolBasic(t *testing.T) { require.True(t, tp.Has(isc.RequestRefFromRequest(r2))) require.True(t, tp.Has(isc.RequestRefFromRequest(r3))) - var taken []isc.Request + var taken []isc.OnLedgerRequest taken = tp.TakeTill(t0) require.Len(t, taken, 1) @@ -93,7 +98,7 @@ var _ rapid.StateMachine = &timePoolSM{} func newtimePoolSM(t *rapid.T) *timePoolSM { sm := new(timePoolSM) log := testlogger.NewLogger(t) - sm.tp = mempool.NewTimePool(func(i int) {}, log) + sm.tp = mempool.NewTimePool(1000, func(i int) {}, log) sm.kp = cryptolib.NewKeyPair() sm.added = 0 
sm.taken = 0 @@ -106,7 +111,8 @@ func (sm *timePoolSM) Check(t *rapid.T) { func (sm *timePoolSM) AddRequest(t *rapid.T) { ts := time.Unix(rapid.Int64().Draw(t, "req.ts"), 0) - req := isc.NewOffLedgerRequest(isc.RandomChainID(), governance.Contract.Hname(), governance.FuncAddCandidateNode.Hname(), nil, 0, gas.LimitsDefault.MaxGasPerRequest).Sign(sm.kp) + req, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) sm.tp.AddRequest(ts, req) sm.added++ } @@ -116,3 +122,35 @@ func (sm *timePoolSM) TakeTill(t *rapid.T) { res := sm.tp.TakeTill(ts) sm.taken += len(res) } + +func TestTimePoolLimit(t *testing.T) { + log := testlogger.NewLogger(t) + size := 0 + tp := mempool.NewTimePool(3, func(newSize int) { size = newSize }, log) + t0 := time.Now().Add(4 * time.Hour) + t1 := time.Now().Add(3 * time.Hour) + t2 := time.Now().Add(2 * time.Hour) + t3 := time.Now().Add(1 * time.Hour) + + r0, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(0)) + require.NoError(t, err) + r1, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(1)) + require.NoError(t, err) + r2, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(2)) + require.NoError(t, err) + r3, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) + + tp.AddRequest(t0, r0) + tp.AddRequest(t1, r1) + tp.AddRequest(t2, r2) + tp.AddRequest(t3, r3) + + require.Equal(t, 3, size) + + // assert t0 was removed (the request scheduled to the latest time in the future) + require.False(t, tp.Has(isc.RequestRefFromRequest(r0))) + require.True(t, tp.Has(isc.RequestRefFromRequest(r1))) + require.True(t, tp.Has(isc.RequestRefFromRequest(r2))) + require.True(t, tp.Has(isc.RequestRefFromRequest(r3))) +} diff --git a/packages/chain/mempool/typed_pool.go b/packages/chain/mempool/typed_pool.go index 4398c9dc3e..3dc2a3186d 100644 --- a/packages/chain/mempool/typed_pool.go +++ b/packages/chain/mempool/typed_pool.go @@ -6,36 +6,62 @@ package mempool import ( "fmt" "io" + "slices" "time" + "github.com/samber/lo" + "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/logger" + consGR "github.com/iotaledger/wasp/packages/chain/cons/cons_gr" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/kv/codec" ) +type RequestPool[V isc.Request] interface { + Has(reqRef *isc.RequestRef) bool + Get(reqRef *isc.RequestRef) V + Add(request V) + Remove(request V) + // this removes requests from the pool if predicate returns false + Cleanup(predicate func(request V, ts time.Time) bool) + Iterate(f func(e *typedPoolEntry[V]) bool) + StatusString() string + WriteContent(io.Writer) + ShouldRefreshRequests() bool +} + +// TODO add gas price to on-ledger requests +// TODO this list needs to be periodically re-filled from L1 once the activity is lower type typedPool[V isc.Request] struct { - waitReq WaitReq - requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]] - sizeMetric func(int) - timeMetric func(time.Duration) - log *logger.Logger + waitReq WaitReq + requests *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *typedPoolEntry[V]] + ordered []*typedPoolEntry[V] // TODO use a better data structure instead!!! 
(probably RedBlackTree) + hasDroppedRequests bool + maxPoolSize int + sizeMetric func(int) + timeMetric func(time.Duration) + log *logger.Logger } type typedPoolEntry[V isc.Request] struct { - req V - ts time.Time + req V + proposedFor []consGR.ConsensusID + ts time.Time } var _ RequestPool[isc.OffLedgerRequest] = &typedPool[isc.OffLedgerRequest]{} -func NewTypedPool[V isc.Request](waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V] { +func NewTypedPool[V isc.Request](maxOnledgerInPool int, waitReq WaitReq, sizeMetric func(int), timeMetric func(time.Duration), log *logger.Logger) RequestPool[V] { return &typedPool[V]{ - waitReq: waitReq, - requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](), - sizeMetric: sizeMetric, - timeMetric: timeMetric, - log: log, + waitReq: waitReq, + requests: shrinkingmap.New[isc.RequestRefKey, *typedPoolEntry[V]](), + ordered: []*typedPoolEntry[V]{}, + hasDroppedRequests: false, + maxPoolSize: maxOnledgerInPool, + sizeMetric: sizeMetric, + timeMetric: timeMetric, + log: log, } } @@ -53,42 +79,141 @@ func (olp *typedPool[V]) Get(reqRef *isc.RequestRef) V { func (olp *typedPool[V]) Add(request V) { refKey := isc.RequestRefFromRequest(request).AsKey() - if olp.requests.Set(refKey, &typedPoolEntry[V]{req: request, ts: time.Now()}) { - olp.log.Debugf("ADD %v as key=%v", request.ID(), refKey) - olp.sizeMetric(olp.requests.Size()) + entry := &typedPoolEntry[V]{ + req: request, + ts: time.Now(), + proposedFor: []consGR.ConsensusID{}, } + if !olp.requests.Set(refKey, entry) { + return // already in pool + } + + // + // add the to the ordered list of requests + { + index, _ := slices.BinarySearchFunc(olp.ordered, entry, olp.reqSort) + olp.ordered = append(olp.ordered, entry) + // make room if target position is not at the end + if index != len(olp.ordered)-1 { + copy(olp.ordered[index+1:], olp.ordered[index:]) + olp.ordered[index] = entry + } + } + + // keep the pool size in check + deleted := olp.LimitPoolSize() + if lo.Contains(deleted, entry) { + // this exact request was deleted from the pool, do not update metrics, or mark available + return + } + + // + // update metrics and signal that the request is available + olp.log.Debugf("ADD %v as key=%v", request.ID(), refKey) + olp.sizeMetric(olp.requests.Size()) olp.waitReq.MarkAvailable(request) } +// LimitPoolSize drops the txs with the lowest price if the total number of requests is too big +func (olp *typedPool[V]) LimitPoolSize() []*typedPoolEntry[V] { + if len(olp.ordered) <= olp.maxPoolSize { + return nil // nothing to do + } + + totalToDelete := len(olp.ordered) - olp.maxPoolSize + reqsToDelete := make([]*typedPoolEntry[V], totalToDelete) + j := 0 + for i := 0; i < len(olp.ordered); i++ { + if len(olp.ordered[i].proposedFor) > 0 { + continue // we cannot drop requests that are being used in current consensus instances + } + reqsToDelete[j] = olp.ordered[i] + if j >= totalToDelete-1 { + break + } + } + + for _, r := range reqsToDelete { + olp.log.Debugf("LimitPoolSize dropping request: %v", r.req.ID()) + olp.Remove(r.req) + } + olp.hasDroppedRequests = true + return reqsToDelete +} + +func (olp *typedPool[V]) reqSort(a, b *typedPoolEntry[V]) int { + // TODO use gas price to sort here, once on-ledger requests have a gas price field + // use requestID as a fallback in case of matching gas price (it's random and should give roughly the same order between nodes) + aID := a.req.ID() + bID := b.req.ID() + for i := range aID { + if aID[i] == bID[i] { + 
continue + } + if aID[i] > bID[i] { + return 1 + } + return -1 + } + return 0 +} + func (olp *typedPool[V]) Remove(request V) { refKey := isc.RequestRefFromRequest(request).AsKey() - if entry, ok := olp.requests.Get(refKey); ok { - if olp.requests.Delete(refKey) { - olp.log.Debugf("DEL %v as key=%v", request.ID(), refKey) - } - olp.sizeMetric(olp.requests.Size()) - olp.timeMetric(time.Since(entry.ts)) + entry, ok := olp.requests.Get(refKey) + if !ok { + return + } + if !olp.requests.Delete(refKey) { + return + } + + // + // find and delete the request from the ordered list + { + indexToDel := slices.IndexFunc(olp.ordered, func(e *typedPoolEntry[V]) bool { + return refKey == isc.RequestRefFromRequest(e.req).AsKey() + }) + olp.ordered[indexToDel] = nil // remove the pointer reference to allow GC of the entry object + olp.ordered = slices.Delete(olp.ordered, indexToDel, indexToDel+1) + } + + // log and update metrics + olp.log.Debugf("DEL %v as key=%v", request.ID(), refKey) + olp.sizeMetric(olp.requests.Size()) + olp.timeMetric(time.Since(entry.ts)) +} + +func (olp *typedPool[V]) ShouldRefreshRequests() bool { + if !olp.hasDroppedRequests { + return false } + if olp.requests.Size() > 0 { + return false // wait until pool is empty to refresh + } + // assume after this function returns true, the requests will be refreshed + olp.hasDroppedRequests = false + return true } -func (olp *typedPool[V]) Filter(predicate func(request V, ts time.Time) bool) { +func (olp *typedPool[V]) Cleanup(predicate func(request V, ts time.Time) bool) { olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool { if !predicate(entry.req, entry.ts) { - if olp.requests.Delete(refKey) { - olp.log.Debugf("DEL %v as key=%v", entry.req.ID(), refKey) - olp.timeMetric(time.Since(entry.ts)) - } + olp.Remove(entry.req) } return true }) olp.sizeMetric(olp.requests.Size()) } -func (olp *typedPool[V]) Iterate(f func(e *typedPoolEntry[V])) { - olp.requests.ForEach(func(refKey isc.RequestRefKey, entry *typedPoolEntry[V]) bool { - f(entry) - return true - }) +func (olp *typedPool[V]) Iterate(f func(e *typedPoolEntry[V]) bool) { + orderedCopy := slices.Clone(olp.ordered) + for _, entry := range orderedCopy { + if !f(entry) { + break + } + } + olp.sizeMetric(olp.requests.Size()) } diff --git a/packages/chain/mempool/typed_pool_by_nonce.go b/packages/chain/mempool/typed_pool_by_nonce.go deleted file mode 100644 index d5b7dca220..0000000000 --- a/packages/chain/mempool/typed_pool_by_nonce.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2020 IOTA Stiftung -// SPDX-License-Identifier: Apache-2.0 - -package mempool - -import ( - "fmt" - "io" - "slices" - "time" - - "github.com/iotaledger/hive.go/ds/shrinkingmap" - "github.com/iotaledger/hive.go/logger" - consGR "github.com/iotaledger/wasp/packages/chain/cons/cons_gr" - "github.com/iotaledger/wasp/packages/isc" - "github.com/iotaledger/wasp/packages/kv/codec" -) - -// keeps a map of requests ordered by nonce for each account -type TypedPoolByNonce[V isc.OffLedgerRequest] struct { - waitReq WaitReq - refLUT *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *OrderedPoolEntry[V]] - // reqsByAcountOrdered keeps an ordered map of reqsByAcountOrdered for each account by nonce - reqsByAcountOrdered *shrinkingmap.ShrinkingMap[string, []*OrderedPoolEntry[V]] // string is isc.AgentID.String() - sizeMetric func(int) - timeMetric func(time.Duration) - log *logger.Logger -} - -func NewTypedPoolByNonce[V isc.OffLedgerRequest](waitReq WaitReq, sizeMetric func(int), timeMetric 
func(time.Duration), log *logger.Logger) *TypedPoolByNonce[V] { - return &TypedPoolByNonce[V]{ - waitReq: waitReq, - reqsByAcountOrdered: shrinkingmap.New[string, []*OrderedPoolEntry[V]](), - refLUT: shrinkingmap.New[isc.RequestRefKey, *OrderedPoolEntry[V]](), - sizeMetric: sizeMetric, - timeMetric: timeMetric, - log: log, - } -} - -type OrderedPoolEntry[V isc.OffLedgerRequest] struct { - req V - old bool - ts time.Time - proposedFor []consGR.ConsensusID -} - -func (p *TypedPoolByNonce[V]) Has(reqRef *isc.RequestRef) bool { - return p.refLUT.Has(reqRef.AsKey()) -} - -func (p *TypedPoolByNonce[V]) Get(reqRef *isc.RequestRef) V { - entry, exists := p.refLUT.Get(reqRef.AsKey()) - if !exists { - return *new(V) - } - return entry.req -} - -func (p *TypedPoolByNonce[V]) Add(request V) { - ref := isc.RequestRefFromRequest(request) - entry := &OrderedPoolEntry[V]{req: request, ts: time.Now()} - account := request.SenderAccount().String() - - if !p.refLUT.Set(ref.AsKey(), entry) { - p.log.Debugf("NOT ADDED, already exists. reqID: %v as key=%v, senderAccount: ", request.ID(), ref, account) - return // not added already exists - } - - defer func() { - p.log.Debugf("ADD %v as key=%v, senderAccount: %s", request.ID(), ref, account) - p.sizeMetric(p.refLUT.Size()) - p.waitReq.MarkAvailable(request) - }() - - reqsForAcount, exists := p.reqsByAcountOrdered.Get(account) - if !exists { - // no other requests for this account - p.reqsByAcountOrdered.Set(account, []*OrderedPoolEntry[V]{entry}) - return - } - - // add to the account requests, keep the slice ordered - - // find the index where the new entry should be added - index, exists := slices.BinarySearchFunc(reqsForAcount, entry, - func(a, b *OrderedPoolEntry[V]) int { - aNonce := a.req.Nonce() - bNonce := b.req.Nonce() - if aNonce == bNonce { - return 0 - } - if aNonce > bNonce { - return 1 - } - return -1 - }, - ) - if exists { - // same nonce, mark the existing request with overlapping nonce as "old", place the new one - // NOTE: do not delete the request here, as it might already be part of an on-going consensus round - reqsForAcount[index].old = true - } - - reqsForAcount = append(reqsForAcount, entry) // add to the end of the list (thus extending the array) - - // make room if target position is not at the end - if index != len(reqsForAcount)+1 { - copy(reqsForAcount[index+1:], reqsForAcount[index:]) - reqsForAcount[index] = entry - } - p.reqsByAcountOrdered.Set(account, reqsForAcount) -} - -func (p *TypedPoolByNonce[V]) Remove(request V) { - refKey := isc.RequestRefFromRequest(request).AsKey() - entry, exists := p.refLUT.Get(refKey) - if !exists { - return // does not exist - } - defer func() { - p.sizeMetric(p.refLUT.Size()) - p.timeMetric(time.Since(entry.ts)) - }() - if p.refLUT.Delete(refKey) { - p.log.Debugf("DEL %v as key=%v", request.ID(), refKey) - } - account := entry.req.SenderAccount().String() - reqsByAccount, exists := p.reqsByAcountOrdered.Get(account) - if !exists { - p.log.Error("inconsistency trying to DEL %v as key=%v, no request list for account %s", request.ID(), refKey, account) - return - } - // find the request in the accounts map - indexToDel := slices.IndexFunc(reqsByAccount, func(e *OrderedPoolEntry[V]) bool { - return refKey == isc.RequestRefFromRequest(e.req).AsKey() - }) - if indexToDel == -1 { - p.log.Error("inconsistency trying to DEL %v as key=%v, request not found in list for account %s", request.ID(), refKey, account) - return - } - if len(reqsByAccount) == 1 { // just remove the entire array for the account - 
p.reqsByAcountOrdered.Delete(account) - return - } - reqsByAccount[indexToDel] = nil // remove the pointer reference to allow GC of the entry object - reqsByAccount = slices.Delete(reqsByAccount, indexToDel, indexToDel+1) - p.reqsByAcountOrdered.Set(account, reqsByAccount) -} - -func (p *TypedPoolByNonce[V]) Iterate(f func(account string, requests []*OrderedPoolEntry[V])) { - p.reqsByAcountOrdered.ForEach(func(acc string, entries []*OrderedPoolEntry[V]) bool { - f(acc, slices.Clone(entries)) - return true - }) -} - -func (p *TypedPoolByNonce[V]) Filter(predicate func(request V, ts time.Time) bool) { - p.refLUT.ForEach(func(refKey isc.RequestRefKey, entry *OrderedPoolEntry[V]) bool { - if !predicate(entry.req, entry.ts) { - p.Remove(entry.req) - } - return true - }) - p.sizeMetric(p.refLUT.Size()) -} - -func (p *TypedPoolByNonce[V]) StatusString() string { - return fmt.Sprintf("{|req|=%d}", p.refLUT.Size()) -} - -func (p *TypedPoolByNonce[V]) WriteContent(w io.Writer) { - p.reqsByAcountOrdered.ForEach(func(_ string, list []*OrderedPoolEntry[V]) bool { - for _, entry := range list { - jsonData, err := isc.RequestToJSON(entry.req) - if err != nil { - return false // stop iteration - } - _, err = w.Write(codec.EncodeUint32(uint32(len(jsonData)))) - if err != nil { - return false // stop iteration - } - _, err = w.Write(jsonData) - if err != nil { - return false // stop iteration - } - } - return true - }) -} diff --git a/packages/chain/mempool/typed_pool_by_nonce_test.go b/packages/chain/mempool/typed_pool_by_nonce_test.go deleted file mode 100644 index 85ef62afd8..0000000000 --- a/packages/chain/mempool/typed_pool_by_nonce_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package mempool - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/iotaledger/wasp/packages/isc" - "github.com/iotaledger/wasp/packages/testutil" - "github.com/iotaledger/wasp/packages/testutil/testkey" - "github.com/iotaledger/wasp/packages/testutil/testlogger" -) - -func TestSomething(t *testing.T) { - waitReq := NewWaitReq(waitRequestCleanupEvery) - pool := NewTypedPoolByNonce[isc.OffLedgerRequest](waitReq, func(int) {}, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) - - // generate a bunch of requests for the same account - kp, addr := testkey.GenKeyAddr() - agentID := isc.NewAgentID(addr) - - req0 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 0, kp) - req1 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 1, kp) - req2 := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) - req2new := testutil.DummyOffledgerRequestForAccount(isc.RandomChainID(), 2, kp) - pool.Add(req0) - pool.Add(req1) - pool.Add(req1) // try to add the same request many times - pool.Add(req2) - pool.Add(req1) - require.EqualValues(t, 3, pool.refLUT.Size()) - require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) - reqsInPoolForAccount, _ := pool.reqsByAcountOrdered.Get(agentID.String()) - require.Len(t, reqsInPoolForAccount, 3) - pool.Add(req2new) - pool.Add(req2new) - require.EqualValues(t, 4, pool.refLUT.Size()) - require.EqualValues(t, 1, pool.reqsByAcountOrdered.Size()) - reqsInPoolForAccount, _ = pool.reqsByAcountOrdered.Get(agentID.String()) - require.Len(t, reqsInPoolForAccount, 4) - - // try to remove everything during iteration - pool.Iterate(func(account string, entries []*OrderedPoolEntry[isc.OffLedgerRequest]) { - for _, e := range entries { - pool.Remove(e.req) - } - }) - require.EqualValues(t, 0, pool.refLUT.Size()) - 
require.EqualValues(t, 0, pool.reqsByAcountOrdered.Size()) -} diff --git a/packages/chain/mempool/typed_pool_test.go b/packages/chain/mempool/typed_pool_test.go new file mode 100644 index 0000000000..a12a1ae8f2 --- /dev/null +++ b/packages/chain/mempool/typed_pool_test.go @@ -0,0 +1,39 @@ +// Copyright 2020 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +package mempool + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + iotago "github.com/iotaledger/iota.go/v3" + "github.com/iotaledger/iota.go/v3/tpkg" + "github.com/iotaledger/wasp/packages/isc" + "github.com/iotaledger/wasp/packages/testutil/testlogger" +) + +func TestTypedMempoolPoolLimit(t *testing.T) { + waitReq := NewWaitReq(waitRequestCleanupEvery) + poolSizeLimit := 3 + size := 0 + pool := NewTypedPool[isc.OnLedgerRequest](poolSizeLimit, waitReq, func(newSize int) { size = newSize }, func(time.Duration) {}, testlogger.NewSilentLogger("", true)) + + r0, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(0)) + require.NoError(t, err) + r1, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(1)) + require.NoError(t, err) + r2, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(2)) + require.NoError(t, err) + r3, err := isc.OnLedgerFromUTXO(&iotago.BasicOutput{}, tpkg.RandOutputID(3)) + require.NoError(t, err) + + pool.Add(r0) + pool.Add(r1) + pool.Add(r2) + pool.Add(r3) + + require.Equal(t, 3, size) +} diff --git a/packages/chain/node.go b/packages/chain/node.go index 6549d3d976..1848a2822e 100644 --- a/packages/chain/node.go +++ b/packages/chain/node.go @@ -141,6 +141,8 @@ type ChainNodeConn interface { onChainConnect func(), onChainDisconnect func(), ) + // called when the mempool has dropped some requests during congestion, and the congestion has now stopped + RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID) } type chainNodeImpl struct { @@ -283,7 +285,7 @@ func New( recoveryTimeout time.Duration, validatorAgentID isc.AgentID, smParameters sm_gpa.StateManagerParameters, - mempoolTTL time.Duration, + mempoolSettings mempool.Settings, mempoolBroadcastInterval time.Duration, ) (Chain, error) { log.Debugf("Starting the chain, chainID=%v", chainID) @@ -433,9 +435,11 @@ func New( chainMetrics.Mempool, chainMetrics.Pipe, cni.listener, - mempoolTTL, + mempoolSettings, mempoolBroadcastInterval, + func() { nodeConn.RefreshOnLedgerRequests(ctx, chainID) }, ) + cni.chainMgr = gpa.NewAckHandler(cni.me, chainMgr.AsGPA(), RedeliveryPeriod) cni.stateMgr = stateMgr cni.mempool = mempool diff --git a/packages/chain/node_test.go b/packages/chain/node_test.go index 8249e5dcd6..96e39b21a3 100644 --- a/packages/chain/node_test.go +++ b/packages/chain/node_test.go @@ -19,6 +19,7 @@ import ( iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/contracts/native/inccounter" "github.com/iotaledger/wasp/packages/chain" + "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa/sm_gpa_utils" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_snapshots" @@ -281,6 +282,8 @@ type testNodeConn struct { attachWG *sync.WaitGroup } +var _ chain.NodeConnection = &testNodeConn{} + func newTestNodeConn(t *testing.T) *testNodeConn { tnc := &testNodeConn{ t: t, @@ -371,6 +374,11 @@ func (tnc *testNodeConn) GetL1ProtocolParams() *iotago.ProtocolParameters { return testparameters.GetL1ProtocolParamsForTesting() } +//
RefreshOnLedgerRequests implements chain.NodeConnection. +func (tnc *testNodeConn) RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID) { + // noop +} + //////////////////////////////////////////////////////////////////////////////// // testEnv @@ -469,7 +477,14 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv { 10*time.Second, accounts.CommonAccount(), sm_gpa.NewStateManagerParameters(), - 24*time.Hour, + mempool.Settings{ + TTL: 24 * time.Hour, + MaxOffledgerInPool: 1000, + MaxOnledgerInPool: 1000, + MaxTimedInPool: 1000, + MaxOnledgerToPropose: 1000, + MaxOffledgerToPropose: 1000, + }, 1*time.Second, ) require.NoError(t, err) diff --git a/packages/chains/chains.go b/packages/chains/chains.go index 6b68686a56..3bbfa85a79 100644 --- a/packages/chains/chains.go +++ b/packages/chains/chains.go @@ -17,6 +17,7 @@ import ( iotago "github.com/iotaledger/iota.go/v3" "github.com/iotaledger/wasp/packages/chain" "github.com/iotaledger/wasp/packages/chain/cmt_log" + "github.com/iotaledger/wasp/packages/chain/mempool" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa/sm_gpa_utils" "github.com/iotaledger/wasp/packages/chain/statemanager/sm_snapshots" @@ -93,7 +94,7 @@ type Chains struct { validatorFeeAddr iotago.Address - mempoolTTL time.Duration + mempoolSettings mempool.Settings mempoolBroadcastInterval time.Duration } @@ -138,7 +139,7 @@ func New( nodeIdentityProvider registry.NodeIdentityProvider, consensusStateRegistry cmt_log.ConsensusStateRegistry, chainListener chain.ChainListener, - mempoolTTL time.Duration, + mempoolSettings mempool.Settings, mempoolBroadcastInterval time.Duration, shutdownCoordinator *shutdown.Coordinator, chainMetricsProvider *metrics.ChainMetricsProvider, @@ -188,7 +189,7 @@ func New( dkShareRegistryProvider: dkShareRegistryProvider, nodeIdentityProvider: nodeIdentityProvider, chainListener: nil, // See bellow. 
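The mempool.Settings threaded through Chains.New here (see the limits used in node_test.go above: MaxOffledgerInPool, MaxOnledgerInPool, MaxTimedInPool and the corresponding *ToPropose caps) means on-ledger requests may now be dropped from the pool under congestion, and the new chain.NodeConnection.RefreshOnLedgerRequests hook is how a chain re-fetches them from L1 once the congestion is over. The call site that connects typedPool.ShouldRefreshRequests to that hook lives in the mempool implementation, which is not part of this excerpt; the helper below is only an illustrative sketch of the intended flow, and maybeRefreshOnLedger is a hypothetical name, not code from this patch.

package mempool

import "github.com/iotaledger/wasp/packages/isc"

// maybeRefreshOnLedger is a hypothetical helper (it does not exist in this patch);
// it only illustrates how ShouldRefreshRequests is meant to gate the refresh callback.
func maybeRefreshOnLedger(pool *typedPool[isc.OnLedgerRequest], refreshOnLedgerCb func()) {
	// ShouldRefreshRequests reports true only after the pool has dropped requests
	// because of its size limit and has since drained completely; it also resets
	// the internal flag, so the refresh is requested once per congestion episode.
	if pool.ShouldRefreshRequests() {
		// In node.go the callback is wired as
		//   func() { nodeConn.RefreshOnLedgerRequests(ctx, chainID) }
		// which re-reads the chain's owned outputs from the L1 indexer
		// (ncChain.refreshOwnedOutputs).
		refreshOnLedgerCb()
	}
}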
- mempoolTTL: mempoolTTL, + mempoolSettings: mempoolSettings, mempoolBroadcastInterval: mempoolBroadcastInterval, consensusStateRegistry: consensusStateRegistry, shutdownCoordinator: shutdownCoordinator, @@ -424,7 +425,7 @@ func (c *Chains) activateWithoutLocking(chainID isc.ChainID) error { //nolint:fu c.recoveryTimeout, validatorAgentID, stateManagerParameters, - c.mempoolTTL, + c.mempoolSettings, c.mempoolBroadcastInterval, ) if err != nil { diff --git a/packages/evm/jsonrpc/jsonrpctest/env.go b/packages/evm/jsonrpc/jsonrpctest/env.go index 17d6111dd7..ca4a97e4a7 100644 --- a/packages/evm/jsonrpc/jsonrpctest/env.go +++ b/packages/evm/jsonrpc/jsonrpctest/env.go @@ -378,6 +378,19 @@ func (e *Env) TestRPCGasLimitTooLow() { func (e *Env) TestGasPrice() { gasPrice := e.MustGetGasPrice() require.NotZero(e.T, gasPrice.Uint64()) + + // assert sending txs with lower than set gas is not allowed + from, _ := e.NewAccountWithL2Funds() + tx, err := types.SignTx( + types.NewTransaction(0, common.Address{}, big.NewInt(123), math.MaxUint64, new(big.Int).Sub(gasPrice, big.NewInt(1)), nil), + e.Signer(), + from, + ) + require.NoError(e.T, err) + + _, err = e.SendTransactionAndWait(tx) + require.Error(e.T, err) + require.Regexp(e.T, `insufficient gas price: got \d+, minimum is \d+`, err.Error()) } func (e *Env) TestRPCAccessHistoricalState() { diff --git a/packages/evm/solidity/abi.go b/packages/evm/solidity/abi.go index 2d0074f5b8..7d13b6e1b2 100644 --- a/packages/evm/solidity/abi.go +++ b/packages/evm/solidity/abi.go @@ -42,7 +42,7 @@ func StorageEncodeString(slotNumber uint8, s string) (ret map[common.Hash]common ret[mainSlot] = common.BigToHash(big.NewInt(int64(len(s)*2) + 1)) i := 0 - for len(s) > 0 { + for s != "" { var chunk common.Hash copy(chunk[:], s) diff --git a/packages/isc/request.go b/packages/isc/request.go index ed4fde83ec..c4a049db3c 100644 --- a/packages/isc/request.go +++ b/packages/isc/request.go @@ -3,6 +3,7 @@ package isc import ( "fmt" "io" + "math/big" "time" "github.com/ethereum/go-ethereum" @@ -80,6 +81,7 @@ type OffLedgerRequest interface { ChainID() ChainID Nonce() uint64 VerifySignature() error + GasPrice() *big.Int } type OnLedgerRequest interface { diff --git a/packages/isc/request_evmcall.go b/packages/isc/request_evmcall.go index 67f6b51027..aacc288067 100644 --- a/packages/isc/request_evmcall.go +++ b/packages/isc/request_evmcall.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "math/big" "github.com/ethereum/go-ethereum" @@ -126,3 +127,7 @@ func (req *evmOffLedgerCallRequest) VerifySignature() error { func (req *evmOffLedgerCallRequest) EVMCallMsg() *ethereum.CallMsg { return &req.callMsg } + +func (req *evmOffLedgerCallRequest) GasPrice() *big.Int { + return req.callMsg.GasPrice +} diff --git a/packages/isc/request_evmtx.go b/packages/isc/request_evmtx.go index 7a094d400d..6ccfdade73 100644 --- a/packages/isc/request_evmtx.go +++ b/packages/isc/request_evmtx.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math/big" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" @@ -153,3 +154,11 @@ func (req *evmOffLedgerTxRequest) VerifySignature() error { func (req *evmOffLedgerTxRequest) EVMCallMsg() *ethereum.CallMsg { return EVMCallDataFromTx(req.tx) } + +func (req *evmOffLedgerTxRequest) TxValue() *big.Int { + return req.tx.Value() +} + +func (req *evmOffLedgerTxRequest) GasPrice() *big.Int { + return req.tx.GasPrice() +} diff --git a/packages/isc/request_offledger.go b/packages/isc/request_offledger.go index faaf29efe0..ccd778c0ac 
100644 --- a/packages/isc/request_offledger.go +++ b/packages/isc/request_offledger.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "math/big" "time" "github.com/ethereum/go-ethereum" @@ -271,3 +272,7 @@ func (req *OffLedgerRequestData) WithSender(sender *cryptolib.PublicKey) Unsigne } return req } + +func (req *OffLedgerRequestData) GasPrice() *big.Int { + return nil +} diff --git a/packages/nodeconn/nc_chain.go b/packages/nodeconn/nc_chain.go index 1783e8cebe..7f080aedf4 100644 --- a/packages/nodeconn/nc_chain.go +++ b/packages/nodeconn/nc_chain.go @@ -56,7 +56,7 @@ type ncChain struct { ctx context.Context nodeConn *nodeConnection chainID isc.ChainID - requestOutputHandler func(iotago.MilestoneIndex, *isc.OutputInfo) + requestOutputHandler func(*isc.OutputInfo) aliasOutputHandler func(iotago.MilestoneIndex, *isc.OutputInfo) milestoneHandler func(iotago.MilestoneIndex, time.Time) @@ -100,8 +100,8 @@ func newNCChain( lastPendingTx: nil, } - chain.requestOutputHandler = func(milestoneIndex iotago.MilestoneIndex, outputInfo *isc.OutputInfo) { - chain.LogDebugf("applying request output: outputID: %s, milestoneIndex: %d, chainID: %s", outputInfo.OutputID.ToHex(), milestoneIndex, chainID) + chain.requestOutputHandler = func(outputInfo *isc.OutputInfo) { + chain.LogDebugf("applying request output: outputID: %s, chainID: %s", outputInfo.OutputID.ToHex(), chainID) requestOutputHandler(outputInfo) } @@ -168,7 +168,7 @@ func (ncc *ncChain) applyPendingLedgerUpdates(ledgerIndex iotago.MilestoneIndex) switch update.Type { case pendingLedgerUpdateTypeRequest: - ncc.requestOutputHandler(update.LedgerIndex, update.Update.(*isc.OutputInfo)) + ncc.requestOutputHandler(update.Update.(*isc.OutputInfo)) case pendingLedgerUpdateTypeAlias: ncc.aliasOutputHandler(update.LedgerIndex, update.Update.(*isc.OutputInfo)) case pendingLedgerUpdateTypeMilestone: @@ -198,7 +198,7 @@ func (ncc *ncChain) HandleRequestOutput(ledgerIndex iotago.MilestoneIndex, outpu return } - ncc.requestOutputHandler(ledgerIndex, outputInfo) + ncc.requestOutputHandler(outputInfo) } func (ncc *ncChain) HandleAliasOutput(ledgerIndex iotago.MilestoneIndex, outputInfo *isc.OutputInfo) { @@ -669,6 +669,19 @@ func (ncc *ncChain) SyncChainStateWithL1(ctx context.Context) error { ncc.milestoneHandler(ledgerIndex, milestoneTimestamp) ncc.aliasOutputHandler(ledgerIndex, aliasOutput) + if err := ncc.refreshOwnedOutputs(ctx); err != nil { + return err + } + + if err := ncc.applyPendingLedgerUpdates(ledgerIndex); err != nil { + return err + } + + ncc.LogInfof("Synchronizing chain state and owned outputs for %s... done. (LedgerIndex: %d)", ncc.chainID, ledgerIndex) + return nil +} + +func (ncc *ncChain) refreshOwnedOutputs(ctx context.Context) error { // the indexer returns the outputs in sorted order by timestampBooked, // so we don't miss newly added outputs if the ledgerIndex increases during the query. // HINT: requests might be applied twice, if they are part of a pendingLedgerUpdate that overlaps with @@ -686,14 +699,8 @@ func (ncc *ncChain) SyncChainStateWithL1(ctx context.Context) error { } ncc.LogDebugf("received output, chainID: %s, outputID: %s", ncc.chainID, outputID.ToHex()) - ncc.requestOutputHandler(ledgerIndex, isc.NewOutputInfo(outputID, output, iotago.TransactionID{})) - } - - if err := ncc.applyPendingLedgerUpdates(ledgerIndex); err != nil { - return err + ncc.requestOutputHandler(isc.NewOutputInfo(outputID, output, iotago.TransactionID{})) } - - ncc.LogInfof("Synchronizing chain state and owned outputs for %s... done.
(LedgerIndex: %d)", ncc.chainID, ledgerIndex) return nil } diff --git a/packages/nodeconn/nodeconn.go b/packages/nodeconn/nodeconn.go index 73fdee5258..84cb957348 100644 --- a/packages/nodeconn/nodeconn.go +++ b/packages/nodeconn/nodeconn.go @@ -81,6 +81,17 @@ type nodeConnection struct { shutdownHandler *shutdown.ShutdownHandler } +// RefreshOnLedgerRequests implements chain.NodeConnection. +func (nc *nodeConnection) RefreshOnLedgerRequests(ctx context.Context, chainID isc.ChainID) { + ncChain, ok := nc.chainsMap.Get(chainID) + if !ok { + panic("unexpected chainID") + } + if err := ncChain.refreshOwnedOutputs(ctx); err != nil { + nc.LogError(fmt.Sprintf("error refreshing outputs: %s", err.Error())) + } +} + func New( ctx context.Context, log *logger.Logger, diff --git a/packages/testutil/dummyrequest.go b/packages/testutil/dummyrequest.go index 697e13b83b..a0c9b49201 100644 --- a/packages/testutil/dummyrequest.go +++ b/packages/testutil/dummyrequest.go @@ -1,6 +1,12 @@ package testutil import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/kv/dict" @@ -24,3 +30,26 @@ func DummyOffledgerRequestForAccount(chainID isc.ChainID, nonce uint64, kp *cryp req := isc.NewOffLedgerRequest(chainID, contract, entrypoint, args, nonce, gas.LimitsDefault.MaxGasPerRequest) return req.Sign(kp) } + +func DummyEVMRequest(chainID isc.ChainID, gasPrice *big.Int) isc.OffLedgerRequest { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + + tx := types.MustSignNewTx(key, types.NewEIP155Signer(big.NewInt(0)), + &types.LegacyTx{ + Nonce: 0, + To: &common.MaxAddress, + Value: big.NewInt(123), + Gas: 10000, + GasPrice: gasPrice, + Data: []byte{}, + }) + + req, err := isc.NewEVMOffLedgerTxRequest(chainID, tx) + if err != nil { + panic(err) + } + return req +} diff --git a/packages/trie/test/proof_test.go b/packages/trie/test/proof_test.go index dc8846b87e..e903817271 100644 --- a/packages/trie/test/proof_test.go +++ b/packages/trie/test/proof_test.go @@ -30,7 +30,7 @@ func TestProofScenariosBlake2b(t *testing.T) { p := trr.MerkleProof([]byte(k)) err = p.Validate(root.Bytes()) require.NoError(t, err) - if len(v) > 0 { + if v != "" { cID := trie.CommitToData([]byte(v)) err = p.ValidateWithTerminal(root.Bytes(), cID.Bytes()) require.NoError(t, err) diff --git a/packages/trie/test/trie_test.go b/packages/trie/test/trie_test.go index a45db0c03d..115600e8f0 100644 --- a/packages/trie/test/trie_test.go +++ b/packages/trie/test/trie_test.go @@ -299,7 +299,7 @@ func runUpdateScenario(trieUpdatable *trie.TrieUpdatable, store trie.KVStore, sc continue // key must not be empty } key = []byte(before) - if len(after) > 0 { + if after != "" { value = []byte(after) } } else { @@ -333,11 +333,11 @@ func checkResult(t *testing.T, trie *trie.TrieUpdatable, checklist map[string]st for key, expectedValue := range checklist { v := trie.GetStr(key) if traceScenarios { - if len(v) > 0 { + if v != "" { fmt.Printf("FOUND '%s': '%s' (expected '%s')\n", key, v, expectedValue) } else { fmt.Printf("NOT FOUND '%s' (expected '%s')\n", key, func() string { - if len(expectedValue) > 0 { + if expectedValue != "" { return "FOUND" } return "NOT FOUND" diff --git a/packages/util/pipe/queue_test.go b/packages/util/pipe/queue_test.go index 98884d76ac..45863a0ad9 100644 --- a/packages/util/pipe/queue_test.go +++ 
b/packages/util/pipe/queue_test.go @@ -170,12 +170,11 @@ func TestLimitPriorityLimitedPriorityHashQueueTwice(t *testing.T) { return 3*index/2 - 20 } return (3*index - 41) / 2 - } else { - if index%2 == 1 { - return (3*index - 139) / 2 - } - return 3*index/2 - 70 } + if index%2 == 1 { + return (3*index - 139) / 2 + } + return 3*index/2 - 70 } testQueueTwice(NewSimpleNothashableFactory(), q, elementsToAddSingle, alwaysTrueFun, limit, resultFun, t) } @@ -262,12 +261,11 @@ func testPriorityQueueTwice[E IntConvertible](factory Factory[E], makePriorityQu return 3*index/2 - 50 } return (3*index - 101) / 2 - } else { - if index%2 == 1 { - return (3*index - 199) / 2 - } - return 3*index/2 - 100 } + if index%2 == 1 { + return (3*index - 199) / 2 + } + return 3*index/2 - 100 } testQueueTwice(factory, q, elementsToAddSingle, alwaysTrueFun, 2*elementsToAddSingle, resultFun, t) } @@ -354,12 +352,11 @@ func TestLimitPriorityHashLimitedPriorityHashQueueDuplicates(t *testing.T) { return 3*index - 40 } return 3*index - 41 - } else { - if index%2 == 0 { - return 3*index - 139 - } - return 3*index - 140 } + if index%2 == 0 { + return 3*index - 139 + } + return 3*index - 140 } testQueueBasicAddLengthPeekRemove(NewSimpleHashableFactory(), q, 3*elementsToAddFirstIteration, addFun, addResultFun, limit, resultFun, t) } diff --git a/packages/vm/core/governance/governanceimpl/chaininfo.go b/packages/vm/core/governance/governanceimpl/chaininfo.go index 91416cf704..5215d26ec4 100644 --- a/packages/vm/core/governance/governanceimpl/chaininfo.go +++ b/packages/vm/core/governance/governanceimpl/chaininfo.go @@ -19,7 +19,7 @@ func getChainInfo(ctx isc.SandboxView) dict.Dict { ret.Set(governance.VarGasFeePolicyBytes, info.GasFeePolicy.Bytes()) ret.Set(governance.VarGasLimitsBytes, info.GasLimits.Bytes()) - if len(info.PublicURL) > 0 { + if info.PublicURL != "" { ret.Set(governance.VarPublicURL, codec.EncodeString(info.PublicURL)) } diff --git a/packages/vm/core/governance/stateaccess.go b/packages/vm/core/governance/stateaccess.go index dee372fde2..43cea3e763 100644 --- a/packages/vm/core/governance/stateaccess.go +++ b/packages/vm/core/governance/stateaccess.go @@ -4,11 +4,14 @@ package governance import ( + "math/big" + "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/isc" "github.com/iotaledger/wasp/packages/kv" "github.com/iotaledger/wasp/packages/kv/codec" "github.com/iotaledger/wasp/packages/kv/subrealm" + "github.com/iotaledger/wasp/packages/parameters" ) type StateAccess struct { @@ -65,3 +68,7 @@ func (sa *StateAccess) ChainOwnerID() isc.AgentID { func (sa *StateAccess) GetBlockKeepAmount() int32 { return GetBlockKeepAmount(sa.state) } + +func (sa *StateAccess) DefaultGasPrice() *big.Int { + return MustGetGasFeePolicy(sa.state).DefaultGasPriceFullDecimals(parameters.L1().BaseToken.Decimals) +} diff --git a/packages/webapi/controllers/chain/waitforrequest.go b/packages/webapi/controllers/chain/waitforrequest.go index ef34da6332..783bdf07a9 100644 --- a/packages/webapi/controllers/chain/waitforrequest.go +++ b/packages/webapi/controllers/chain/waitforrequest.go @@ -32,7 +32,7 @@ func (c *Controller) waitForRequestToFinish(e echo.Context) error { timeout := defaultTimeoutSeconds * time.Second timeoutInSeconds := e.QueryParam("timeoutSeconds") - if len(timeoutInSeconds) > 0 { + if timeoutInSeconds != "" { parsedTimeout, _ := strconv.Atoi(timeoutInSeconds) if err != nil { diff --git a/tools/schema/model/yaml/convert.go b/tools/schema/model/yaml/convert.go index 8c310abb21..0632ed4eb9 
100644 --- a/tools/schema/model/yaml/convert.go +++ b/tools/schema/model/yaml/convert.go @@ -85,10 +85,10 @@ func (n *Node) toStringElt() model.DefElt { func (n *Node) toDefElt() *model.DefElt { comment := "" - if len(n.HeadComment) > 0 { + if n.HeadComment != "" { // remove trailing '\n' and space comment = strings.TrimSpace(n.HeadComment) - } else if len(n.LineComment) > 0 { + } else if n.LineComment != "" { // remove trailing '\n' and space comment = strings.TrimSpace(n.LineComment) } @@ -126,9 +126,9 @@ func (n *Node) toDefMapMap() model.DefMapMap { continue } comment := "" - if len(yamlKey.HeadComment) > 0 { + if yamlKey.HeadComment != "" { comment = strings.TrimSpace(yamlKey.HeadComment) // remove trailing '\n' - } else if len(yamlKey.LineComment) > 0 { + } else if yamlKey.LineComment != "" { comment = strings.TrimSpace(yamlKey.LineComment) // remove trailing '\n' } @@ -146,9 +146,9 @@ func (n *Node) toDefMapMap() model.DefMapMap { func (n *Node) toFuncDef() model.FuncDef { def := model.FuncDef{} def.Line = n.Line - if len(n.HeadComment) > 0 { + if n.HeadComment != "" { def.Comment = strings.TrimSpace(n.HeadComment) // remove trailing '\n' - } else if len(n.LineComment) > 0 { + } else if n.LineComment != "" { def.Comment = strings.TrimSpace(n.LineComment) // remove trailing '\n' } @@ -164,9 +164,9 @@ func (n *Node) toFuncDef() model.FuncDef { return model.FuncDef{} } def.Access = *yamlKey.Contents[0].toDefElt() - if len(yamlKey.HeadComment) > 0 { + if yamlKey.HeadComment != "" { def.Access.Comment = strings.TrimSpace(yamlKey.HeadComment) // remove trailing '\n' - } else if len(yamlKey.LineComment) > 0 { + } else if yamlKey.LineComment != "" { def.Access.Comment = strings.TrimSpace(yamlKey.LineComment) // remove trailing '\n' } case KeyParams: @@ -188,9 +188,9 @@ func (n *Node) toFuncDefMap() model.FuncDefMap { continue } comment := "" - if len(yamlKey.HeadComment) > 0 { + if yamlKey.HeadComment != "" { comment = strings.TrimSpace(yamlKey.HeadComment) // remove trailing '\n' - } else if len(yamlKey.LineComment) > 0 { + } else if yamlKey.LineComment != "" { comment = strings.TrimSpace(yamlKey.LineComment) // remove trailing '\n' } key := model.DefElt{
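Several pieces of this patch point at the same gas-price validation: OffLedgerRequest now exposes GasPrice() (nil for plain ISC off-ledger requests, the EVM gas price for evmOffLedgerTxRequest and evmOffLedgerCallRequest), governance.StateAccess gains DefaultGasPrice(), and TestGasPrice asserts that a transaction priced below the node's gas price is rejected with "insufficient gas price: got ..., minimum is ...". The enforcement point itself is outside this excerpt, so the function below is only a sketch of the kind of check these pieces enable; checkGasPrice, its placement, and its exact error text are assumptions, not the actual implementation.

package example // hypothetical package, for illustration only

import (
	"fmt"
	"math/big"

	"github.com/iotaledger/wasp/packages/isc"
)

// checkGasPrice sketches the validation enabled by this patch: reject EVM
// off-ledger requests that offer less than the chain's minimum gas price.
// minGasPrice would come from something like governance.StateAccess.DefaultGasPrice().
func checkGasPrice(req isc.OffLedgerRequest, minGasPrice *big.Int) error {
	gasPrice := req.GasPrice()
	if gasPrice == nil {
		// Plain ISC off-ledger requests return nil (OffLedgerRequestData.GasPrice)
		// and are not subject to the EVM gas-price check.
		return nil
	}
	if gasPrice.Cmp(minGasPrice) < 0 {
		return fmt.Errorf("insufficient gas price: got %s, minimum is %s", gasPrice, minGasPrice)
	}
	return nil
}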