Skip to content

Commit

Permalink
Just a rename.
Browse files Browse the repository at this point in the history
  • Loading branch information
kape1395 committed Apr 25, 2023
1 parent 4bb86d5 commit f4d0a96
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 36 deletions.
4 changes: 2 additions & 2 deletions packages/chain/cons/cons_gr/gr.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// Interfaces required from other components (MP, SM)

type Mempool interface {
ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef
ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef
ConsensusRequestsAsync(ctx context.Context, requestRefs []*isc.RequestRef) <-chan []isc.Request
}

Expand Down Expand Up @@ -337,7 +337,7 @@ func (cgr *ConsGr) tryHandleOutput() { //nolint:gocyclo
}
output := outputUntyped.(*cons.Output)
if output.NeedMempoolProposal != nil && !cgr.mempoolProposalsAsked {
cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalsAsync(cgr.ctx, output.NeedMempoolProposal)
cgr.mempoolProposalsRespCh = cgr.mempool.ConsensusProposalAsync(cgr.ctx, output.NeedMempoolProposal)
cgr.mempoolProposalsAsked = true
}
if output.NeedMempoolRequests != nil && !cgr.mempoolRequestsAsked {
Expand Down
2 changes: 1 addition & 1 deletion packages/chain/cons/cons_gr/gr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (tmp *testMempool) tryRespondRequestQueries() {
tmp.qRequests = remaining
}

func (tmp *testMempool) ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
func (tmp *testMempool) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
tmp.lock.Lock()
defer tmp.lock.Unlock()
outputID := aliasOutput.OutputID()
Expand Down
53 changes: 28 additions & 25 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// the latest state for which it has provided the proposal. Let's say the mempool
// has provided proposals for PrevAO (AO≡AliasOutput).
//
// Upon reception of the proposal query (ConsensusProposalsAsync) for NextAO
// Upon reception of the proposal query (ConsensusProposalAsync) for NextAO
// from the consensus, it asks the StateMgr for the virtual state VS(NextAO)
// corresponding to the NextAO and a list of blocks that has to be reverted.
// The state manager collects this information by finding a common ancestor of
Expand Down Expand Up @@ -134,8 +134,8 @@ type mempoolImpl struct {
accessNodes []*cryptolib.PublicKey
committeeNodes []*cryptolib.PublicKey
waitReq WaitReq
waitChainHead []*reqConsensusProposals
reqConsensusProposalsPipe pipe.Pipe[*reqConsensusProposals]
waitChainHead []*reqConsensusProposal
reqConsensusProposalPipe pipe.Pipe[*reqConsensusProposal]
reqConsensusRequestsPipe pipe.Pipe[*reqConsensusRequests]
reqReceiveOnLedgerRequestPipe pipe.Pipe[isc.OnLedgerRequest]
reqReceiveOffLedgerRequestPipe pipe.Pipe[isc.OffLedgerRequest]
Expand Down Expand Up @@ -166,12 +166,17 @@ type reqAccessNodesUpdated struct {
accessNodePubKeys []*cryptolib.PublicKey
}

type reqConsensusProposals struct {
type reqConsensusProposal struct {
ctx context.Context
aliasOutput *isc.AliasOutputWithID
responseCh chan<- []*isc.RequestRef
}

func (r *reqConsensusProposal) Respond(reqRefs []*isc.RequestRef) {
r.responseCh <- reqRefs
close(r.responseCh)
}

type reqConsensusRequests struct {
ctx context.Context
requestRefs []*isc.RequestRef
Expand Down Expand Up @@ -210,8 +215,8 @@ func New(
accessNodes: []*cryptolib.PublicKey{},
committeeNodes: []*cryptolib.PublicKey{},
waitReq: waitReq,
waitChainHead: []*reqConsensusProposals{},
reqConsensusProposalsPipe: pipe.NewInfinitePipe[*reqConsensusProposals](),
waitChainHead: []*reqConsensusProposal{},
reqConsensusProposalPipe: pipe.NewInfinitePipe[*reqConsensusProposal](),
reqConsensusRequestsPipe: pipe.NewInfinitePipe[*reqConsensusRequests](),
reqReceiveOnLedgerRequestPipe: pipe.NewInfinitePipe[isc.OnLedgerRequest](),
reqReceiveOffLedgerRequestPipe: pipe.NewInfinitePipe[isc.OffLedgerRequest](),
Expand Down Expand Up @@ -275,14 +280,14 @@ func (mpi *mempoolImpl) AccessNodesUpdated(committeePubKeys, accessNodePubKeys [
}
}

func (mpi *mempoolImpl) ConsensusProposalsAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
func (mpi *mempoolImpl) ConsensusProposalAsync(ctx context.Context, aliasOutput *isc.AliasOutputWithID) <-chan []*isc.RequestRef {
res := make(chan []*isc.RequestRef, 1)
req := &reqConsensusProposals{
req := &reqConsensusProposal{
ctx: ctx,
aliasOutput: aliasOutput,
responseCh: res,
}
mpi.reqConsensusProposalsPipe.In() <- req
mpi.reqConsensusProposalPipe.In() <- req
return res
}

Expand All @@ -300,7 +305,7 @@ func (mpi *mempoolImpl) ConsensusRequestsAsync(ctx context.Context, requestRefs
func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc) { //nolint:gocyclo
serverNodesUpdatedPipeOutCh := mpi.serverNodesUpdatedPipe.Out()
accessNodesUpdatedPipeOutCh := mpi.accessNodesUpdatedPipe.Out()
reqConsensusProposalsPipeOutCh := mpi.reqConsensusProposalsPipe.Out()
reqConsensusProposalPipeOutCh := mpi.reqConsensusProposalPipe.Out()
reqConsensusRequestsPipeOutCh := mpi.reqConsensusRequestsPipe.Out()
reqReceiveOnLedgerRequestPipeOutCh := mpi.reqReceiveOnLedgerRequestPipe.Out()
reqReceiveOffLedgerRequestPipeOutCh := mpi.reqReceiveOffLedgerRequestPipe.Out()
Expand All @@ -324,12 +329,12 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc)
break
}
mpi.handleAccessNodesUpdated(recv)
case recv, ok := <-reqConsensusProposalsPipeOutCh:
case recv, ok := <-reqConsensusProposalPipeOutCh:
if !ok {
reqConsensusProposalsPipeOutCh = nil
reqConsensusProposalPipeOutCh = nil
break
}
mpi.handleConsensusProposals(recv)
mpi.handleConsensusProposal(recv)
case recv, ok := <-reqConsensusRequestsPipeOutCh:
if !ok {
reqConsensusRequestsPipeOutCh = nil
Expand Down Expand Up @@ -375,7 +380,7 @@ func (mpi *mempoolImpl) run(ctx context.Context, cleanupFunc context.CancelFunc)
case <-ctx.Done():
// mpi.serverNodesUpdatedPipe.Close() // TODO: Causes panic: send on closed channel
// mpi.accessNodesUpdatedPipe.Close()
// mpi.reqConsensusProposalsPipe.Close()
// mpi.reqConsensusProposalPipe.Close()
// mpi.reqConsensusRequestsPipe.Close()
// mpi.reqReceiveOnLedgerRequestPipe.Close()
// mpi.reqReceiveOffLedgerRequestPipe.Close()
Expand Down Expand Up @@ -468,17 +473,17 @@ func (mpi *mempoolImpl) handleAccessNodesUpdated(recv *reqAccessNodesUpdated) {

// This implementation only tracks a single branch. So, we will only respond
// to the request matching the TrackNewChainHead call.
func (mpi *mempoolImpl) handleConsensusProposals(recv *reqConsensusProposals) {
func (mpi *mempoolImpl) handleConsensusProposal(recv *reqConsensusProposal) {
if mpi.chainHeadAO == nil || !recv.aliasOutput.Equals(mpi.chainHeadAO) {
mpi.log.Debugf("handleConsensusProposals, have to wait for chain head to become %v", recv.aliasOutput)
mpi.log.Debugf("handleConsensusProposal, have to wait for chain head to become %v", recv.aliasOutput)
mpi.waitChainHead = append(mpi.waitChainHead, recv)
return
}
mpi.log.Debugf("handleConsensusProposals, already have the chain head %v", recv.aliasOutput)
mpi.handleConsensusProposalsForChainHead(recv)
mpi.log.Debugf("handleConsensusProposal, already have the chain head %v", recv.aliasOutput)
mpi.handleConsensusProposalForChainHead(recv)
}

func (mpi *mempoolImpl) handleConsensusProposalsForChainHead(recv *reqConsensusProposals) {
func (mpi *mempoolImpl) handleConsensusProposalForChainHead(recv *reqConsensusProposal) {
//
// The case for matching ChainHeadAO and request BaseAO
reqRefs := []*isc.RequestRef{}
Expand All @@ -498,15 +503,13 @@ func (mpi *mempoolImpl) handleConsensusProposalsForChainHead(recv *reqConsensusP
return true // Keep them for now
})
if len(reqRefs) > 0 {
recv.responseCh <- reqRefs
close(recv.responseCh)
recv.Respond(reqRefs)
return
}
//
// Wait for any request.
mpi.waitReq.WaitAny(recv.ctx, func(req isc.Request) {
recv.responseCh <- []*isc.RequestRef{isc.RequestRefFromRequest(req)}
close(recv.responseCh)
recv.Respond([]*isc.RequestRef{isc.RequestRefFromRequest(req)})
})
}

Expand Down Expand Up @@ -682,13 +685,13 @@ func (mpi *mempoolImpl) handleTrackNewChainHead(req *reqTrackNewChainHead) {
//
// Process the pending consensus proposal requests if any.
if len(mpi.waitChainHead) != 0 {
newWaitChainHead := []*reqConsensusProposals{}
newWaitChainHead := []*reqConsensusProposal{}
for i, waiting := range mpi.waitChainHead {
if waiting.ctx.Err() != nil {
continue // Drop it.
}
if waiting.aliasOutput.Equals(mpi.chainHeadAO) {
mpi.handleConsensusProposalsForChainHead(waiting)
mpi.handleConsensusProposalForChainHead(waiting)
continue // Drop it from wait queue.
}
newWaitChainHead = append(newWaitChainHead, mpi.waitChainHead[i])
Expand Down
16 changes: 8 additions & 8 deletions packages/chain/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func testMempoolBasic(t *testing.T, n, f int, reliable bool) {
t.Log("Ask for proposals")
proposals := make([]<-chan []*isc.RequestRef, len(te.mempools))
for i, node := range te.mempools {
proposals[i] = node.ConsensusProposalsAsync(te.ctx, te.originAO)
proposals[i] = node.ConsensusProposalAsync(te.ctx, te.originAO)
}
t.Log("Wait for proposals and ask for decided requests")
decided := make([]<-chan []isc.Request, len(te.mempools))
Expand Down Expand Up @@ -161,7 +161,7 @@ func testMempoolBasic(t *testing.T, n, f int, reliable bool) {
// Ask proposals for the next
proposals = make([]<-chan []*isc.RequestRef, len(te.mempools))
for i := range te.mempools {
proposals[i] = te.mempools[i].ConsensusProposalsAsync(te.ctx, nextAO) // Intentionally invalid order (vs TrackNewChainHead).
proposals[i] = te.mempools[i].ConsensusProposalAsync(te.ctx, nextAO) // Intentionally invalid order (vs TrackNewChainHead).
te.mempools[i].TrackNewChainHead(chainState, te.originAO, nextAO, []state.Block{block}, []state.Block{})
}
//
Expand Down Expand Up @@ -261,7 +261,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo
// Check, if requests are proposed.
time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed.
for _, mp := range te.mempools {
reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO)
reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO)
require.Len(t, reqs, 3)
require.Contains(t, reqs, reqRefs[0])
require.Contains(t, reqs, reqRefs[1])
Expand All @@ -276,7 +276,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo
}
time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed.
for _, mp := range te.mempools {
reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO)
reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO)
require.Len(t, reqs, 3)
require.Contains(t, reqs, reqRefs[0])
require.Contains(t, reqs, reqRefs[1])
Expand All @@ -289,7 +289,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo
}
time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed.
for _, mp := range te.mempools {
reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO)
reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO)
require.Len(t, reqs, 4)
require.Contains(t, reqs, reqRefs[0])
require.Contains(t, reqs, reqRefs[1])
Expand All @@ -303,7 +303,7 @@ func testTimeLock(t *testing.T, n, f int, reliable bool) { //nolint:gocyclo
}
time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed.
for _, mp := range te.mempools {
reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO)
reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO)
require.Len(t, reqs, 5)
require.Contains(t, reqs, reqRefs[0])
require.Contains(t, reqs, reqRefs[1])
Expand Down Expand Up @@ -376,7 +376,7 @@ func testExpiration(t *testing.T, n, f int, reliable bool) {
// Check, if requests are proposed.
time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed.
for _, mp := range te.mempools {
reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO)
reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO)
require.Len(t, reqs, 2)
require.Contains(t, reqs, reqRefs[0])
require.Contains(t, reqs, reqRefs[3])
Expand All @@ -388,7 +388,7 @@ func testExpiration(t *testing.T, n, f int, reliable bool) {
}
time.Sleep(100 * time.Millisecond) // Just to make sure all the events have been consumed.
for _, mp := range te.mempools {
reqs := <-mp.ConsensusProposalsAsync(te.ctx, te.originAO)
reqs := <-mp.ConsensusProposalAsync(te.ctx, te.originAO)
require.Len(t, reqs, 1)
require.Contains(t, reqs, reqRefs[0])
}
Expand Down

0 comments on commit f4d0a96

Please sign in to comment.