diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d6e97804c8..af6309a1a0 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -102,6 +102,7 @@ proc init*( overheadRateLimit = Opt.none(tuple[bytes: int, interval: Duration]), disconnectPeerAboveRateLimit = false, maxNumElementsInNonPriorityQueue = DefaultMaxNumElementsInNonPriorityQueue, + sendIDontWantOnPublish = false, ): GossipSubParams = GossipSubParams( explicit: true, @@ -139,6 +140,7 @@ proc init*( overheadRateLimit: overheadRateLimit, disconnectPeerAboveRateLimit: disconnectPeerAboveRateLimit, maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue, + sendIDontWantOnPublish: sendIDontWantOnPublish, ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -381,6 +383,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = trace "sending iwant reply messages", peer g.send(peer, RPCMsg(messages: messages), isHighPriority = false) +proc sendIDontWant( + g: GossipSub, + msg: Message, + msgId: MessageId, + peersToSendIDontWant: HashSet[PubSubPeer], +) = + # If the message is "large enough", let the mesh know that we do not want + # any more copies of it, regardless if it is valid or not. + # + # In the case that it is not valid, this leads to some redundancy + # (since the other peer should not send us an invalid message regardless), + # but the expectation is that this is rare (due to such peers getting + # descored) and that the savings from honest peers are greater than the + # cost a dishonest peer can incur in short time (since the IDONTWANT is + # small). + + # IDONTWANT is only supported by >= GossipSubCodec_12 + let peers = peersToSendIDontWant.filterIt( + it.codec != GossipSubCodec_10 and it.codec != GossipSubCodec_11 + ) + + g.broadcast( + peers, + RPCMsg( + control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) + ), + isHighPriority = true, + ) + +const iDontWantMessageSizeThreshold* = 512 + +proc isLargeMessage(msg: Message, msgId: MessageId): bool = + msg.data.len > max(iDontWantMessageSizeThreshold, msgId.len * 10) + proc validateAndRelay( g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer ) {.async: (raises: []).} = @@ -397,29 +433,10 @@ proc validateAndRelay( toSendPeers.incl(peers[]) toSendPeers.excl(peer) - if msg.data.len > max(512, msgId.len * 10): - # If the message is "large enough", let the mesh know that we do not want - # any more copies of it, regardless if it is valid or not. - # - # In the case that it is not valid, this leads to some redundancy - # (since the other peer should not send us an invalid message regardless), - # but the expectation is that this is rare (due to such peers getting - # descored) and that the savings from honest peers are greater than the - # cost a dishonest peer can incur in short time (since the IDONTWANT is - # small). + if isLargeMessage(msg, msgId): var peersToSendIDontWant = HashSet[PubSubPeer]() addToSendPeers(peersToSendIDontWant) - peersToSendIDontWant.exclIfIt( - it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11 - ) - g.broadcast( - peersToSendIDontWant, - RPCMsg( - control: - some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) - ), - isHighPriority = true, - ) + g.sendIDontWant(msg, msgId, peersToSendIDontWant) let validation = await g.validate(msg) @@ -788,6 +805,9 @@ method publish*( g.mcache.put(msgId, msg) + if g.parameters.sendIDontWantOnPublish and isLargeMessage(msg, msgId): + g.sendIDontWant(msg, msgId, peers) + g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true) if g.knownTopics.contains(topic): diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index d50e098240..0044be9d50 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -154,6 +154,9 @@ type # Max number of elements allowed in the non-priority queue. When this limit has been reached, the peer will be disconnected. maxNumElementsInNonPriorityQueue*: int + # Broadcast an IDONTWANT message automatically when the message exceeds the IDONTWANT message size threshold + sendIDontWantOnPublish*: bool + BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]] diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 62fbe97bc6..cdfd19ca5d 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -149,3 +149,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) = if condition: toExcl.incl(it) set.excl(toExcl) + +template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] = + var filtered = HashSet[T]() + if set.len != 0: + for it {.inject.} in set: + if condition: + filtered.incl(it) + filtered diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 6b17ae9ad6..52d5e66f22 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -936,6 +936,44 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - iDontWant is broadcasted on publish": + func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] = + ok(newSeq[byte](10)) + let + nodes = generateNodes( + 2, + gossip = true, + msgIdProvider = dumbMsgIdProvider, + sendIDontWantOnPublish = true, + ) + + nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) + + await nodes[0].switch.connect( + nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs + ) + + proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} = + discard + + proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} = + discard + + nodes[0].subscribe("foobar", handlerA) + nodes[1].subscribe("foobar", handlerB) + await waitSubGraph(nodes, "foobar") + + var gossip2: GossipSub = GossipSub(nodes[1]) + + tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1 + + checkUntilTimeout: + gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1) + + await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - iDontWant is sent only for 1.2": # 3 nodes: A <=> B <=> C # (A & C are NOT connected). We pre-emptively send a dontwant from C to B, diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index ecd0e7b9a6..be58ac064d 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -74,6 +74,7 @@ proc generateNodes*( overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = Opt.none(tuple[bytes: int, interval: Duration]), gossipSubVersion: string = "", + sendIDontWantOnPublish: bool = false, ): seq[PubSub] = for i in 0 ..< num: let switch = newStandardSwitch( @@ -97,6 +98,7 @@ proc generateNodes*( p.unsubscribeBackoff = unsubscribeBackoff p.enablePX = enablePX p.overheadRateLimit = overheadRateLimit + p.sendIDontWantOnPublish = sendIDontWantOnPublish p ), )