diff --git a/packages/chain/cons/cons_gr/gr.go b/packages/chain/cons/cons_gr/gr.go index 8856b152d7..04a99167fd 100644 --- a/packages/chain/cons/cons_gr/gr.go +++ b/packages/chain/cons/cons_gr/gr.go @@ -36,7 +36,7 @@ const ( // Interfaces required from other components (MP, SM) type Mempool interface { - ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef + ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef ConsensusRequestsAsync(ctx context.Context, requestRefs []*isc.RequestRef) <-chan []isc.Request } @@ -337,7 +337,7 @@ func (cgr *ConsGr) tryHandleOutput() { //nolint:gocyclo } output := outputUntyped.(*cons.Output) if output.NeedMempoolProposal != nil && !cgr.mempoolProposalsAsked { - cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalsAsync(cgr.ctx, output.NeedMempoolProposal) + cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalAsync(cgr.ctx, output.NeedMempoolProposal) cgr.mempoolProposalsAsked = true } if output.NeedMempoolRequests != nil && !cgr.mempoolRequestsAsked { diff --git a/packages/chain/cons/cons_gr/gr_test.go b/packages/chain/cons/cons_gr/gr_test.go index c479f27445..0c8677eb73 100644 --- a/packages/chain/cons/cons_gr/gr_test.go +++ b/packages/chain/cons/cons_gr/gr_test.go @@ -232,7 +232,7 @@ func (tmp *testMempool) tryRespondRequestQueries() { tmp.qRequests = remaining } -func (tmp *testMempool) ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef { +func (tmp *testMempool) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef { tmp.lock.Lock() defer tmp.lock.Unlock() outputID := aliasOutput.OutputID() diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index 16ae598136..e5aa033585 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -12,7 +12,7 @@ // the latest state for which it has provided the proposal. Let's say the mempool // has provided proposals for PrevAO (AO≡AliasOutput). // -// Upon reception of the proposal query (ConsensusProposalsAsync) for NextAO +// Upon reception of the proposal query (ConsensusProposalAsync) for NextAO // from the consensus, it asks the StateMgr for the virtual state VS(NextAO) // corresponding to the NextAO and a list of blocks that has to be reverted. // The state manager collects this information by finding a common ancestor of @@ -134,8 +134,8 @@ type mempoolImpl struct { accessNodes []*cryptolib.PublicKey committeeNodes []*cryptolib.PublicKey waitReq WaitReq - waitChainHead []*reqConsensusProposals - reqConsensusProposalsPipe pipe.Pipe[*reqConsensusProposals] + waitChainHead []*reqConsensusProposal + reqConsensusProposalPipe pipe.Pipe[*reqConsensusProposal] reqConsensusRequestsPipe pipe.Pipe[*reqConsensusRequests] reqReceiveOnLedgerRequestPipe pipe.Pipe[isc.OnLedgerRequest] reqReceiveOffLedgerRequestPipe pipe.Pipe[isc.OffLedgerRequest] @@ -166,12 +166,17 @@ type reqAccessNodesUpdated struct { accessNodePubKeys []*cryptolib.PublicKey } -type reqConsensusProposals struct { +type reqConsensusProposal struct { ctx context.Context aliasOutput *isc.AliasOutputWithID responseCh chan<- []*isc.RequestRef } +func (r *reqConsensusProposal) Respond(reqRefs []*isc.RequestRef) { + r.responseCh <- reqRefs + close(r.responseCh) +} + type reqConsensusRequests struct { ctx context.Context requestRefs []*isc.RequestRef @@ -210,8 +215,8 @@ func New( accessNodes: []*cryptolib.PublicKey{}, committeeNodes: []*cryptolib.PublicKey{}, waitReq: waitReq, - waitChainHead: []*reqConsensusProposals{}, - reqConsensusProposalsPipe: pipe.NewInfinitePipe[*reqConsensusProposals](), + waitChainHead: []*reqConsensusProposal{}, + reqConsensusProposalPipe: pipe.NewInfinitePipe[*reqConsensusProposal](), reqConsensusRequestsPipe: pipe.NewInfinitePipe[*reqConsensusRequests](), reqReceiveOnLedgerRequestPipe: pipe.NewInfinitePipe[isc.OnLedgerRequest](), reqReceiveOffLedgerRequestPipe: pipe.NewInfinitePipe[isc.OffLedgerRequest](), @@ -275,14 +280,14 @@ func (mpi *mempoolImpl) AccessNodesUpdated(committeePubKeys, accessNodePubKeys [ } } -func (mpi *mempoolImpl) ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef { +func (mpi *mempoolImpl) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef { res := make(chan []*isc.RequestRef, 1) - req := &reqConsensusProposals{ + req := &reqConsensusProposal{ ctx: ctx, aliasOutput: aliasOutput, responseCh: res, } - mpi.reqConsensusProposalsPipe.In() <- req + mpi.reqConsensusProposalPipe.In() <- req return res } @@ -300,7 +305,7 @@ func (mpi *mempoolImpl) ConsensusRequestsAsync(ctx context.Context, requestRefs func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc) { //nolint:gocyclo serverNodesUpdatedPipeOutCh := mpi.serverNodesUpdatedPipe.Out() accessNodesUpdatedPipeOutCh := mpi.accessNodesUpdatedPipe.Out() - reqConsensusProposalsPipeOutCh := mpi.reqConsensusProposalsPipe.Out() + reqConsensusProposalPipeOutCh := mpi.reqConsensusProposalPipe.Out() reqConsensusRequestsPipeOutCh := mpi.reqConsensusRequestsPipe.Out() reqReceiveOnLedgerRequestPipeOutCh := mpi.reqReceiveOnLedgerRequestPipe.Out() reqReceiveOffLedgerRequestPipeOutCh := mpi.reqReceiveOffLedgerRequestPipe.Out() @@ -324,12 +329,12 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc) break } mpi.handleAccessNodesUpdated(recv) - case recv, ok := <-reqConsensusProposalsPipeOutCh: + case recv, ok := <-reqConsensusProposalPipeOutCh: if !ok { - reqConsensusProposalsPipeOutCh = nil + reqConsensusProposalPipeOutCh = nil break } - mpi.handleConsensusProposals(recv) + mpi.handleConsensusProposal(recv) case recv, ok := <-reqConsensusRequestsPipeOutCh: if !ok { reqConsensusRequestsPipeOutCh = nil @@ -375,7 +380,7 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc) case <-ctx.Done(): // mpi.serverNodesUpdatedPipe.Close() // TODO: Causes panic: send on closed channel // mpi.accessNodesUpdatedPipe.Close() - // mpi.reqConsensusProposalsPipe.Close() + // mpi.reqConsensusProposalPipe.Close() // mpi.reqConsensusRequestsPipe.Close() // mpi.reqReceiveOnLedgerRequestPipe.Close() // mpi.reqReceiveOffLedgerRequestPipe.Close() @@ -468,17 +473,17 @@ func (mpi *mempoolImpl) handleAccessNodesUpdated(recv *reqAccessNodesUpdated) { // This implementation only tracks a single branch. So, we will only respond // to the request matching the TrackNewChainHead call. -func (mpi *mempoolImpl) handleConsensusProposals(recv *reqConsensusProposals) { +func (mpi *mempoolImpl) handleConsensusProposal(recv *reqConsensusProposal) { if mpi.chainHeadAO == nil || !recv.aliasOutput.Equals(mpi.chainHeadAO) { - mpi.log.Debugf("handleConsensusProposals, have to wait for chain head to become %v", recv.aliasOutput) + mpi.log.Debugf("handleConsensusProposal, have to wait for chain head to become %v", recv.aliasOutput) mpi.waitChainHead = append(mpi.waitChainHead, recv) return } - mpi.log.Debugf("handleConsensusProposals, already have the chain head %v", recv.aliasOutput) - mpi.handleConsensusProposalsForChainHead(recv) + mpi.log.Debugf("handleConsensusProposal, already have the chain head %v", recv.aliasOutput) + mpi.handleConsensusProposalForChainHead(recv) } -func (mpi *mempoolImpl) handleConsensusProposalsForChainHead(recv *reqConsensusProposals) { +func (mpi *mempoolImpl) handleConsensusProposalForChainHead(recv *reqConsensusProposal) { // // The case for matching ChainHeadAO and request BaseAO reqRefs := []*isc.RequestRef{} @@ -498,15 +503,13 @@ func (mpi *mempoolImpl) handleConsensusProposalsForChainHead(recv *reqConsensusP return true // Keep them for now }) if len(reqRefs) > 0 { - recv.responseCh <- reqRefs - close(recv.responseCh) + recv.Respond(reqRefs) return } // // Wait for any request. mpi.waitReq.WaitAny(recv.ctx, func(req isc.Request) { - recv.responseCh <- []*isc.RequestRef{isc.RequestRefFromRequest(req)} - close(recv.responseCh) + recv.Respond([]*isc.RequestRef{isc.RequestRefFromRequest(req)}) }) } @@ -682,13 +685,13 @@ func (mpi *mempoolImpl) handleTrackNewChainHead(req *reqTrackNewChainHead) { // // Process the pending consensus proposal requests if any. if len(mpi.waitChainHead) != 0 { - newWaitChainHead := []*reqConsensusProposals{} + newWaitChainHead := []*reqConsensusProposal{} for i, waiting := range mpi.waitChainHead { if waiting.ctx.Err() != nil { continue // Drop it. } if waiting.aliasOutput.Equals(mpi.chainHeadAO) { - mpi.handleConsensusProposalsForChainHead(waiting) + mpi.handleConsensusProposalForChainHead(waiting) continue // Drop it from wait queue. } newWaitChainHead = append(newWaitChainHead, mpi.waitChainHead[i]) diff --git a/packages/chain/mempool/mempool_test.go b/packages/chain/mempool/mempool_test.go index a97b7e864b..4031e3662e 100644 --- a/packages/chain/mempool/mempool_test.go +++ b/packages/chain/mempool/mempool_test.go @@ -111,7 +111,7 @@ func testMempoolBasic(t *testing.T, n, f int, reliable bool) { t.Log("Ask for proposals") proposals := make([]<-chan []*isc.RequestRef, len(te.mempools)) for i, node := range te.mempools { - proposals[i] = node.ConsensusProposalsAsync(te.ctx, te.originAO) + proposals[i] = node.ConsensusProposalAsync(te.ctx, te.originAO) } t.Log("Wait for proposals and ask for decided requests") decided := make([]<-chan []isc.Request, len(te.mempools)) @@ -161,7 +161,7 @@ func testMempoolBasic(t *testing.T, n, f int, reliable bool) { // Ask proposals for the next proposals = make([]<-chan []*isc.RequestRef, len(te.mempools)) for i := range te.mempools { - proposals[i] = te.mempools[i].ConsensusProposalsAsync(te.ctx, nextAO) // Intentionally invalid order (vs TrackNewChainHead). + proposals[i] = te.mempools[i].ConsensusProposalAsync(te.ctx, nextAO) // Intentionally invalid order (vs TrackNewChainHead). te.mempools[i].TrackNewChainHead(chainState, te.originAO, nextAO, []state.Block{block}, []state.Block{}) } // @@ -261,7 +261,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo // Check, if requests are proposed. time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed. for _, mp := range te.mempools { - reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO) + reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO) require.Len(t, reqs, 3) require.Contains(t, reqs, reqRefs[0]) require.Contains(t, reqs, reqRefs[1]) @@ -276,7 +276,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo } time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed. for _, mp := range te.mempools { - reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO) + reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO) require.Len(t, reqs, 3) require.Contains(t, reqs, reqRefs[0]) require.Contains(t, reqs, reqRefs[1]) @@ -289,7 +289,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo } time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed. for _, mp := range te.mempools { - reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO) + reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO) require.Len(t, reqs, 4) require.Contains(t, reqs, reqRefs[0]) require.Contains(t, reqs, reqRefs[1]) @@ -303,7 +303,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo } time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed. for _, mp := range te.mempools { - reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO) + reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO) require.Len(t, reqs, 5) require.Contains(t, reqs, reqRefs[0]) require.Contains(t, reqs, reqRefs[1]) @@ -376,7 +376,7 @@ func testExpiration(t *testing.T, n, f int, reliable bool) { // Check, if requests are proposed. time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed. for _, mp := range te.mempools { - reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO) + reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO) require.Len(t, reqs, 2) require.Contains(t, reqs, reqRefs[0]) require.Contains(t, reqs, reqRefs[3]) @@ -388,7 +388,7 @@ func testExpiration(t *testing.T, n, f int, reliable bool) { } time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed. for _, mp := range te.mempools { - reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO) + reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO) require.Len(t, reqs, 1) require.Contains(t, reqs, reqRefs[0]) }