Skip to content

Commit

Permalink
Merge pull request #3113 from iotaledger/fix-access-access
Browse files Browse the repository at this point in the history
Forward requests via access nodes.
  • Loading branch information
kape1395 authored Nov 27, 2023
2 parents 9e5f066 + 6d13e95 commit ea396b2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
43 changes: 28 additions & 15 deletions packages/chain/mempool/distsync/dist_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type distSyncImpl struct {
accessNodes []gpa.NodeID // Maybe is not needed? Lets keep it until the redesign.
committeeNodes []gpa.NodeID // Subset of serverNodes and accessNodes.
requestNeededCB func(*isc.RequestRef) isc.Request
requestReceivedCB func(isc.Request)
requestReceivedCB func(isc.Request) bool
nodeCountToShare int // Number of nodes to share a request per iteration.
maxMsgsPerTick int
needed *shrinkingmap.ShrinkingMap[isc.RequestRefKey, *distSyncReqNeeded]
Expand All @@ -58,7 +58,7 @@ type distSyncReqNeeded struct {
func New(
me gpa.NodeID,
requestNeededCB func(*isc.RequestRef) isc.Request,
requestReceivedCB func(isc.Request),
requestReceivedCB func(isc.Request) bool,
maxMsgsPerTick int,
missingReqsMetric func(count int),
log *logger.Logger,
Expand Down Expand Up @@ -153,24 +153,29 @@ func (dsi *distSyncImpl) handleCommitteeNodes(committeeNodes []gpa.NodeID) {
// In the current algorithm, for sharing a message:
// - Just send a message to all the committee nodes (or server nodes, if committee is not known).
func (dsi *distSyncImpl) handleInputPublishRequest(input *inputPublishRequest) gpa.OutMessages {
msgs := dsi.propagateRequest(input.request)
//
// Delete the it from the "needed" list, if any.
// This node has the request, if it tries to publish it.
reqRef := isc.RequestRefFromRequest(input.request)
if dsi.needed.Delete(reqRef.AsKey()) {
dsi.missingReqsMetric(dsi.needed.Size())
}
return msgs
}

func (dsi *distSyncImpl) propagateRequest(request isc.Request) gpa.OutMessages {
msgs := gpa.NoMessages()
var publishToNodes []gpa.NodeID
if len(dsi.committeeNodes) > 0 {
publishToNodes = dsi.committeeNodes
dsi.log.Debugf("Forwarding request %v to committee nodes: %v", input.request.ID(), dsi.committeeNodes)
dsi.log.Debugf("Forwarding request %v to committee nodes: %v", request.ID(), dsi.committeeNodes)
} else {
dsi.log.Debugf("Forwarding request %v to server nodes: %v", input.request.ID(), dsi.serverNodes)
dsi.log.Debugf("Forwarding request %v to server nodes: %v", request.ID(), dsi.serverNodes)
publishToNodes = dsi.serverNodes
}
for i := range publishToNodes {
msgs.Add(newMsgShareRequest(input.request, 0, publishToNodes[i]))
}
//
// Delete the it from the "needed" list, if any.
// This node has the request, if it tries to publish it.
reqRef := isc.RequestRefFromRequest(input.request)
if dsi.needed.Delete(reqRef.AsKey()) {
dsi.missingReqsMetric(dsi.needed.Size())
msgs.Add(newMsgShareRequest(request, 0, publishToNodes[i]))
}
return msgs
}
Expand Down Expand Up @@ -244,22 +249,30 @@ func (dsi *distSyncImpl) handleMsgMissingRequest(msg *msgMissingRequest) gpa.Out
}

func (dsi *distSyncImpl) handleMsgShareRequest(msg *msgShareRequest) gpa.OutMessages {
msgs := gpa.NoMessages()
reqRefKey := isc.RequestRefFromRequest(msg.request).AsKey()
dsi.requestReceivedCB(msg.request)
added := dsi.requestReceivedCB(msg.request)
if dsi.needed.Delete(reqRefKey) {
dsi.missingReqsMetric(dsi.needed.Size())
}
//
// Propagate the message, if it was new to us.
// Follow the logic as if the message is received via the API.
if added {
msgs.AddAll(dsi.propagateRequest(msg.request))
}
//
// The following is de-factor unused, as TTL is always 0 currently.
if msg.ttl > 0 {
ttl := msg.ttl
if ttl > maxTTL {
ttl = maxTTL
}
msgs := gpa.NoMessages()
perm := dsi.rnd.Perm(len(dsi.committeeNodes))
for i := 0; i < dsi.nodeCountToShare; i++ {
msgs.Add(newMsgShareRequest(msg.request, ttl-1, dsi.committeeNodes[perm[i]]))
}
return msgs
}
return nil
return msgs
}
4 changes: 3 additions & 1 deletion packages/chain/mempool/distsync/dist_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ func testBasic(t *testing.T, n, cmtN, cmtF int) {
}
return nil
}
requestReceivedCB := func(req isc.Request) {
requestReceivedCB := func(req isc.Request) bool {
_, have := recv[thisNodeID]
recv[thisNodeID] = req
return !have
}
nodes[nid] = distsync.New(thisNodeID, requestNeededCB, requestReceivedCB, 100, func(count int) {}, log)
}
Expand Down
6 changes: 4 additions & 2 deletions packages/chain/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,17 @@ func (mpi *mempoolImpl) distSyncRequestNeededCB(requestRef *isc.RequestRef) isc.
}

// A callback for distSync.
func (mpi *mempoolImpl) distSyncRequestReceivedCB(request isc.Request) {
func (mpi *mempoolImpl) distSyncRequestReceivedCB(request isc.Request) bool {
offLedgerReq, ok := request.(isc.OffLedgerRequest)
if !ok {
mpi.log.Warn("Dropping non-OffLedger request form dist %T: %+v", request, request)
return
return false
}
if err := mpi.shouldAddOffledgerRequest(offLedgerReq); err == nil {
mpi.addOffledger(offLedgerReq)
return true
}
return false
}

func (mpi *mempoolImpl) nonce(account isc.AgentID) uint64 {
Expand Down

0 comments on commit ea396b2

Please sign in to comment.