Skip to content

Commit

Permalink
Merge pull request #2951 from jorgemmsilva/feat/mempool-ttl
Browse files Browse the repository at this point in the history
Feat: mempool TTL
  • Loading branch information
fijter authored Oct 17, 2023
2 parents 027bd0c + c581358 commit 510015e
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 61 deletions.
1 change: 1 addition & 0 deletions components/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func provide(c *dig.Container) error {
deps.NodeIdentityProvider,
deps.ConsensusStateRegistry,
deps.ChainListener,
ParamsChains.MempoolTTL,
shutdown.NewCoordinator("chains", Component.Logger().Named("Shutdown")),
deps.ChainMetricsProvider,
),
Expand Down
1 change: 1 addition & 0 deletions components/chains/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ParametersChains struct {
PrintStatusPeriod time.Duration `default:"3s" usage:"the period to print consensus instance status."`
ConsensusInstsInAdvance int `default:"3" usage:""`
AwaitReceiptCleanupEvery int `default:"100" usage:"for every this number AwaitReceipt will be cleaned up"`
MempoolTTL time.Duration `default:"24h" usage:"Time that requests are allowed to sit in the mempool without being processed"`
}

type ParametersWAL struct {
Expand Down
17 changes: 15 additions & 2 deletions packages/chain/cons/cons_gr/gr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"go.uber.org/atomic"

"github.com/iotaledger/hive.go/logger"
iotago "github.com/iotaledger/iota.go/v3"
"github.com/iotaledger/wasp/packages/chain/cmt_log"
"github.com/iotaledger/wasp/packages/chain/cons"
"github.com/iotaledger/wasp/packages/cryptolib"
"github.com/iotaledger/wasp/packages/gpa"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/kv/codec"
"github.com/iotaledger/wasp/packages/metrics"
"github.com/iotaledger/wasp/packages/peering"
"github.com/iotaledger/wasp/packages/state"
Expand All @@ -35,8 +37,17 @@ const (
////////////////////////////////////////////////////////////////////////////////
// Interfaces required from other components (MP, SM)

type ConsensusID [iotago.Ed25519AddressBytesLength + 4]byte

func NewConsensusID(cmtAddr *iotago.Ed25519Address, logIndex *cmt_log.LogIndex) ConsensusID {
ret := ConsensusID{}
copy(ret[:], isc.AddressToBytes(cmtAddr)[1:]) // remove the byte kind prefix
copy(ret[iotago.Ed25519AddressBytesLength:], codec.EncodeUint32(logIndex.AsUint32()))
return ret
}

type Mempool interface {
ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef
ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID, consensusID ConsensusID) <-chan []*isc.RequestRef
ConsensusRequestsAsync(ctx context.Context, requestRefs []*isc.RequestRef) <-chan []isc.Request
}

Expand Down Expand Up @@ -117,6 +128,7 @@ type ConsGr struct {
netPeerPubs map[gpa.NodeID]*cryptolib.PublicKey
netDisconnect context.CancelFunc
net peering.NetworkProvider
consensusID ConsensusID
ctx context.Context
pipeMetrics *metrics.ChainPipeMetrics
log *logger.Logger
Expand Down Expand Up @@ -165,6 +177,7 @@ func New(
netPeerPubs: netPeerPubs,
netDisconnect: nil, // Set bellow.
net: net,
consensusID: NewConsensusID(cmtPubKey.AsEd25519Address(), logIndex),
ctx: ctx,
pipeMetrics: pipeMetrics,
log: log,
Expand Down Expand Up @@ -357,7 +370,7 @@ func (cgr *ConsGr) tryHandleOutput() { //nolint:gocyclo
}
output := outputUntyped.(*cons.Output)
if output.NeedMempoolProposal != nil && !cgr.mempoolProposalsAsked {
cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalAsync(cgr.ctx, output.NeedMempoolProposal)
cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalAsync(cgr.ctx, output.NeedMempoolProposal, cgr.consensusID)
cgr.mempoolProposalsAsked = true
}
if output.NeedMempoolRequests != nil && !cgr.mempoolRequestsAsked {
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/cons/cons_gr/gr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (tmp *testMempool) tryRespondRequestQueries() {
tmp.qRequests = remaining
}

func (tmp *testMempool) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
func (tmp *testMempool) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID, consensusID consGR.ConsensusID) <-chan []*isc.RequestRef {
tmp.lock.Lock()
defer tmp.lock.Unlock()
outputID := aliasOutput.OutputID()
Expand Down
84 changes: 63 additions & 21 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type Mempool interface {
// These nodes should be used to disseminate the off-ledger requests.
ServerNodesUpdated(committeePubKeys []*cryptolib.PublicKey, serverNodePubKeys []*cryptolib.PublicKey)
AccessNodesUpdated(committeePubKeys []*cryptolib.PublicKey, accessNodePubKeys []*cryptolib.PublicKey)
ConsensusInstancesUpdated(activeConsensusInstances []consGR.ConsensusID)

GetContents() io.Reader
}

Expand All @@ -116,6 +118,7 @@ type RequestPool[V isc.Request] interface {
Remove(request V)
// this removes requests from the pool if predicate returns false
Filter(predicate func(request V, ts time.Time) bool)
Iterate(f func(e *typedPoolEntry[V]))
StatusString() string
WriteContent(io.Writer)
}
Expand All @@ -140,6 +143,8 @@ type mempoolImpl struct {
accessNodesUpdatedPipe pipe.Pipe[*reqAccessNodesUpdated]
accessNodes []*cryptolib.PublicKey
committeeNodes []*cryptolib.PublicKey
consensusInstancesUpdatedPipe pipe.Pipe[*reqConsensusInstancesUpdated]
consensusInstances []consGR.ConsensusID
waitReq WaitReq
waitChainHead []*reqConsensusProposal
reqConsensusProposalPipe pipe.Pipe[*reqConsensusProposal]
Expand All @@ -152,6 +157,8 @@ type mempoolImpl struct {
netPeeringID peering.PeeringID
netPeerPubs map[gpa.NodeID]*cryptolib.PublicKey
net peering.NetworkProvider
activeConsensusInstances []consGR.ConsensusID
ttl time.Duration // time to live (how much time requests are allowed to sit in the pool without being processed)
log *logger.Logger
metrics *metrics.ChainMempoolMetrics
listener ChainListener
Expand All @@ -173,9 +180,14 @@ type reqAccessNodesUpdated struct {
accessNodePubKeys []*cryptolib.PublicKey
}

type reqConsensusInstancesUpdated struct {
activeConsensusInstances []consGR.ConsensusID
}

type reqConsensusProposal struct {
ctx context.Context
aliasOutput *isc.AliasOutputWithID
consensusID consGR.ConsensusID
responseCh chan<- []*isc.RequestRef
}

Expand Down Expand Up @@ -208,6 +220,7 @@ func New(
metrics *metrics.ChainMempoolMetrics,
pipeMetrics *metrics.ChainPipeMetrics,
listener ChainListener,
ttl time.Duration,
) Mempool {
netPeeringID := peering.HashPeeringIDFromBytes(chainID.Bytes(), []byte("Mempool")) // ChainID × Mempool
waitReq := NewWaitReq(waitRequestCleanupEvery)
Expand Down Expand Up @@ -235,6 +248,9 @@ func New(
netPeeringID: netPeeringID,
netPeerPubs: map[gpa.NodeID]*cryptolib.PublicKey{},
net: net,
consensusInstancesUpdatedPipe: pipe.NewInfinitePipe[*reqConsensusInstancesUpdated](),
activeConsensusInstances: []consGR.ConsensusID{},
ttl: ttl,
log: log,
metrics: metrics,
listener: listener,
Expand Down Expand Up @@ -306,23 +322,18 @@ func (mpi *mempoolImpl) AccessNodesUpdated(committeePubKeys, accessNodePubKeys [
}
}

func (mpi *mempoolImpl) writeContentAndClose(pw *io.PipeWriter) {
defer pw.Close()
mpi.onLedgerPool.WriteContent(pw)
mpi.offLedgerPool.WriteContent(pw)
}

func (mpi *mempoolImpl) GetContents() io.Reader {
pr, pw := io.Pipe()
go mpi.writeContentAndClose(pw)
return pr
func (mpi *mempoolImpl) ConsensusInstancesUpdated(activeConsensusInstances []consGR.ConsensusID) {
mpi.consensusInstancesUpdatedPipe.In() <- &reqConsensusInstancesUpdated{
activeConsensusInstances: activeConsensusInstances,
}
}

func (mpi *mempoolImpl) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
func (mpi *mempoolImpl) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID, consensusID consGR.ConsensusID) <-chan []*isc.RequestRef {
res := make(chan []*isc.RequestRef, 1)
req := &reqConsensusProposal{
ctx: ctx,
aliasOutput: aliasOutput,
consensusID: consensusID,
responseCh: res,
}
mpi.reqConsensusProposalPipe.In() <- req
Expand All @@ -340,9 +351,22 @@ func (mpi *mempoolImpl) ConsensusRequestsAsync(ctx context.Context, requestRefs
return res
}

func (mpi *mempoolImpl) writeContentAndClose(pw *io.PipeWriter) {
defer pw.Close()
mpi.onLedgerPool.WriteContent(pw)
mpi.offLedgerPool.WriteContent(pw)
}

func (mpi *mempoolImpl) GetContents() io.Reader {
pr, pw := io.Pipe()
go mpi.writeContentAndClose(pw)
return pr
}

func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc) { //nolint:gocyclo
serverNodesUpdatedPipeOutCh := mpi.serverNodesUpdatedPipe.Out()
accessNodesUpdatedPipeOutCh := mpi.accessNodesUpdatedPipe.Out()
consensusInstancesUpdatedPipeOutCh := mpi.consensusInstancesUpdatedPipe.Out()
reqConsensusProposalPipeOutCh := mpi.reqConsensusProposalPipe.Out()
reqConsensusRequestsPipeOutCh := mpi.reqConsensusRequestsPipe.Out()
reqReceiveOnLedgerRequestPipeOutCh := mpi.reqReceiveOnLedgerRequestPipe.Out()
Expand Down Expand Up @@ -403,6 +427,11 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc)
break
}
mpi.handleTrackNewChainHead(recv)
case recv, ok := <-consensusInstancesUpdatedPipeOutCh:
if !ok {
break
}
mpi.activeConsensusInstances = recv.activeConsensusInstances
case recv, ok := <-netRecvPipeOutCh:
if !ok {
netRecvPipeOutCh = nil
Expand Down Expand Up @@ -544,7 +573,7 @@ func (mpi *mempoolImpl) handleConsensusProposal(recv *reqConsensusProposal) {
mpi.handleConsensusProposalForChainHead(recv)
}

func (mpi *mempoolImpl) refsToPropose() []*isc.RequestRef {
func (mpi *mempoolImpl) refsToPropose(consensusID consGR.ConsensusID) []*isc.RequestRef {
//
// The case for matching ChainHeadAO and request BaseAO
reqRefs := []*isc.RequestRef{}
Expand All @@ -567,22 +596,36 @@ func (mpi *mempoolImpl) refsToPropose() []*isc.RequestRef {
}
accountNonce := mpi.nonce(agentID)
for _, e := range entries {
reqNonce := e.req.Nonce()
if reqNonce < accountNonce {
// nonce too old, delete
mpi.log.Debugf("refsToPropose, account: %s, removing request (%s) with old nonce (%d) from the pool", account, e.req.ID(), e.req.Nonce())
mpi.offLedgerPool.Remove(e.req)
if time.Since(e.ts) > mpi.ttl { // stop proposing after TTL
if !lo.Some(mpi.consensusInstances, e.proposedFor) {
// request not used in active consensus anymore, remove it
mpi.log.Debugf("refsToPropose, request TTL expired, removing: %s", e.req.ID().String())
mpi.offLedgerPool.Remove(e.req)
continue
}
mpi.log.Debugf("refsToPropose, request TTL expired, skipping: %s", e.req.ID().String())
continue
}

if e.old {
// this request was marked as "old", do not propose it
mpi.log.Debugf("refsToPropose, account: %s, skipping old request: %s", account, e.req.ID().String())
continue
}

reqNonce := e.req.Nonce()
if reqNonce < accountNonce {
// nonce too old, delete
mpi.log.Debugf("refsToPropose, account: %s, removing request (%s) with old nonce (%d) from the pool", account, e.req.ID(), e.req.Nonce())
mpi.offLedgerPool.Remove(e.req)
continue
}

if reqNonce == accountNonce {
// expected nonce, add it to the list to propose
mpi.log.Debugf("refsToPropose, account: %s, proposing reqID %s with nonce: %d", account, e.req.ID().String(), e.req.Nonce())
reqRefs = append(reqRefs, isc.RequestRefFromRequest(e.req))
e.proposedFor = append(e.proposedFor, consensusID)
accountNonce++ // increment the account nonce to match the next valid request
}
if reqNonce > accountNonce {
Expand All @@ -596,7 +639,7 @@ func (mpi *mempoolImpl) refsToPropose() []*isc.RequestRef {
}

func (mpi *mempoolImpl) handleConsensusProposalForChainHead(recv *reqConsensusProposal) {
refs := mpi.refsToPropose()
refs := mpi.refsToPropose(recv.consensusID)
if len(refs) > 0 {
recv.Respond(refs)
return
Expand Down Expand Up @@ -730,9 +773,8 @@ func (mpi *mempoolImpl) handleTangleTimeUpdated(tangleTime time.Time) {
//
// Notify existing on-ledger requests if that's first time update.
if oldTangleTime.IsZero() {
mpi.onLedgerPool.Filter(func(request isc.OnLedgerRequest, ts time.Time) bool {
mpi.waitReq.MarkAvailable(request)
return true
mpi.onLedgerPool.Iterate(func(e *typedPoolEntry[isc.OnLedgerRequest]) {
mpi.waitReq.MarkAvailable(e.req)
})
}
}
Expand Down
Loading

0 comments on commit 510015e

Please sign in to comment.