diff --git a/packages/chain/mempool/distsync/dist_sync.go b/packages/chain/mempool/distsync/dist_sync.go index e3df80f5c9..b7d09b1860 100644 --- a/packages/chain/mempool/distsync/dist_sync.go +++ b/packages/chain/mempool/distsync/dist_sync.go @@ -39,7 +39,7 @@ type distSyncImpl struct { accessNodes []gpa.NodeID // Maybe is not needed? Lets keep it until the redesign. committeeNodes []gpa.NodeID // Subset of serverNodes and accessNodes. requestNeededCB func(*isc.RequestRef) isc.Request - requestReceivedCB func(isc.Request) + requestReceivedCB func(isc.Request) bool nodeCountToShare int // Number of nodes to share a request per iteration. maxMsgsPerTick int needed *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *distSyncReqNeeded] @@ -58,7 +58,7 @@ type distSyncReqNeeded struct { func New( me gpa.NodeID, requestNeededCB func(*isc.RequestRef) isc.Request, - requestReceivedCB func(isc.Request), + requestReceivedCB func(isc.Request) bool, maxMsgsPerTick int, missingReqsMetric func(count int), log *logger.Logger, @@ -153,24 +153,29 @@ func (dsi *distSyncImpl) handleCommitteeNodes(committeeNodes []gpa.NodeID) { // In the current algorithm, for sharing a message: // - Just send a message to all the committee nodes (or server nodes, if committee is not known). func (dsi *distSyncImpl) handleInputPublishRequest(input *inputPublishRequest) gpa.OutMessages { + msgs := dsi.propagateRequest(input.request) + // + // Delete the it from the "needed" list, if any. + // This node has the request, if it tries to publish it. + reqRef := isc.RequestRefFromRequest(input.request) + if dsi.needed.Delete(reqRef.AsKey()) { + dsi.missingReqsMetric(dsi.needed.Size()) + } + return msgs +} + +func (dsi *distSyncImpl) propagateRequest(request isc.Request) gpa.OutMessages { msgs := gpa.NoMessages() var publishToNodes []gpa.NodeID if len(dsi.committeeNodes) > 0 { publishToNodes = dsi.committeeNodes - dsi.log.Debugf("Forwarding request %v to committee nodes: %v", input.request.ID(), dsi.committeeNodes) + dsi.log.Debugf("Forwarding request %v to committee nodes: %v", request.ID(), dsi.committeeNodes) } else { - dsi.log.Debugf("Forwarding request %v to server nodes: %v", input.request.ID(), dsi.serverNodes) + dsi.log.Debugf("Forwarding request %v to server nodes: %v", request.ID(), dsi.serverNodes) publishToNodes = dsi.serverNodes } for i := range publishToNodes { - msgs.Add(newMsgShareRequest(input.request, 0, publishToNodes[i])) - } - // - // Delete the it from the "needed" list, if any. - // This node has the request, if it tries to publish it. - reqRef := isc.RequestRefFromRequest(input.request) - if dsi.needed.Delete(reqRef.AsKey()) { - dsi.missingReqsMetric(dsi.needed.Size()) + msgs.Add(newMsgShareRequest(request, 0, publishToNodes[i])) } return msgs } @@ -244,22 +249,30 @@ func (dsi *distSyncImpl) handleMsgMissingRequest(msg *msgMissingRequest) gpa.Out } func (dsi *distSyncImpl) handleMsgShareRequest(msg *msgShareRequest) gpa.OutMessages { + msgs := gpa.NoMessages() reqRefKey := isc.RequestRefFromRequest(msg.request).AsKey() - dsi.requestReceivedCB(msg.request) + added := dsi.requestReceivedCB(msg.request) if dsi.needed.Delete(reqRefKey) { dsi.missingReqsMetric(dsi.needed.Size()) } + // + // Propagate the message, if it was new to us. + // Follow the logic as if the message is received via the API. + if added { + msgs.AddAll(dsi.propagateRequest(msg.request)) + } + // + // The following is de-factor unused, as TTL is always 0 currently. if msg.ttl > 0 { ttl := msg.ttl if ttl > maxTTL { ttl = maxTTL } - msgs := gpa.NoMessages() perm := dsi.rnd.Perm(len(dsi.committeeNodes)) for i := 0; i < dsi.nodeCountToShare; i++ { msgs.Add(newMsgShareRequest(msg.request, ttl-1, dsi.committeeNodes[perm[i]])) } return msgs } - return nil + return msgs } diff --git a/packages/chain/mempool/distsync/dist_sync_test.go b/packages/chain/mempool/distsync/dist_sync_test.go index e7e18f67c3..2e5e2ca575 100644 --- a/packages/chain/mempool/distsync/dist_sync_test.go +++ b/packages/chain/mempool/distsync/dist_sync_test.go @@ -38,8 +38,10 @@ func testBasic(t *testing.T, n, cmtN, cmtF int) { } return nil } - requestReceivedCB := func(req isc.Request) { + requestReceivedCB := func(req isc.Request) bool { + _, have := recv[thisNodeID] recv[thisNodeID] = req + return !have } nodes[nid] = distsync.New(thisNodeID, requestNeededCB, requestReceivedCB, 100, func(count int) {}, log) } diff --git a/packages/chain/mempool/mempool.go b/packages/chain/mempool/mempool.go index bf7c5d04e5..184d243165 100644 --- a/packages/chain/mempool/mempool.go +++ b/packages/chain/mempool/mempool.go @@ -493,15 +493,17 @@ func (mpi *mempoolImpl) distSyncRequestNeededCB(requestRef *isc.RequestRef) isc. } // A callback for distSync. -func (mpi *mempoolImpl) distSyncRequestReceivedCB(request isc.Request) { +func (mpi *mempoolImpl) distSyncRequestReceivedCB(request isc.Request) bool { offLedgerReq, ok := request.(isc.OffLedgerRequest) if !ok { mpi.log.Warn("Dropping non-OffLedger request form dist %T: %+v", request, request) - return + return false } if err := mpi.shouldAddOffledgerRequest(offLedgerReq); err == nil { mpi.addOffledger(offLedgerReq) + return true } + return false } func (mpi *mempoolImpl) nonce(account isc.AgentID) uint64 {