Skip to content

Commit

Permalink
feat: idontwant on publish (#1230)
Browse files Browse the repository at this point in the history
Closes #1224 

The following changes were done:
1. Once a message is broadcasted, if it exceeds the threeshold to
consider it a large message it will send an IDONTWANT message before
publishing the message

https://github.com/vacp2p/nim-libp2p/blob/4f9c21f6999a58dff0fde2d4f031086bc846c9f7/libp2p/protocols/pubsub/gossipsub.nim#L800-L801
2. Extracts the logic to broadcast an IDONTWANT message and to verify
the size of a message into separate functions
3. Adds a template to filter values of a hashset without modifying the
original
  • Loading branch information
richard-ramos authored Jan 14, 2025
1 parent d215bb2 commit 483e1d9
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 21 deletions.
62 changes: 41 additions & 21 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ proc init*(
overheadRateLimit = Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit = false,
maxNumElementsInNonPriorityQueue = DefaultMaxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish = false,
): GossipSubParams =
GossipSubParams(
explicit: true,
Expand Down Expand Up @@ -139,6 +140,7 @@ proc init*(
overheadRateLimit: overheadRateLimit,
disconnectPeerAboveRateLimit: disconnectPeerAboveRateLimit,
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish: sendIDontWantOnPublish,
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
Expand Down Expand Up @@ -381,6 +383,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
trace "sending iwant reply messages", peer
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)

proc sendIDontWant(
g: GossipSub,
msg: Message,
msgId: MessageId,
peersToSendIDontWant: HashSet[PubSubPeer],
) =
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).

# IDONTWANT is only supported by >= GossipSubCodec_12
let peers = peersToSendIDontWant.filterIt(
it.codec != GossipSubCodec_10 and it.codec != GossipSubCodec_11
)

g.broadcast(
peers,
RPCMsg(
control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)

const iDontWantMessageSizeThreshold* = 512

proc isLargeMessage(msg: Message, msgId: MessageId): bool =
msg.data.len > max(iDontWantMessageSizeThreshold, msgId.len * 10)

proc validateAndRelay(
g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer
) {.async: (raises: []).} =
Expand All @@ -397,29 +433,10 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
toSendPeers.excl(peer)

if msg.data.len > max(512, msgId.len * 10):
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
if isLargeMessage(msg, msgId):
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
g.sendIDontWant(msg, msgId, peersToSendIDontWant)

let validation = await g.validate(msg)

Expand Down Expand Up @@ -788,6 +805,9 @@ method publish*(

g.mcache.put(msgId, msg)

if g.parameters.sendIDontWantOnPublish and isLargeMessage(msg, msgId):
g.sendIDontWant(msg, msgId, peers)

g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

if g.knownTopics.contains(topic):
Expand Down
3 changes: 3 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type
# Max number of elements allowed in the non-priority queue. When this limit has been reached, the peer will be disconnected.
maxNumElementsInNonPriorityQueue*: int

# Broadcast an IDONTWANT message automatically when the message exceeds the IDONTWANT message size threshold
sendIDontWantOnPublish*: bool

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]]

Expand Down
8 changes: 8 additions & 0 deletions libp2p/utility.nim
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) =
if condition:
toExcl.incl(it)
set.excl(toExcl)

template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] =
var filtered = HashSet[T]()
if set.len != 0:
for it {.inject.} in set:
if condition:
filtered.incl(it)
filtered
38 changes: 38 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,44 @@ suite "GossipSub":

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - iDontWant is broadcasted on publish":
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let
nodes = generateNodes(
2,
gossip = true,
msgIdProvider = dumbMsgIdProvider,
sendIDontWantOnPublish = true,
)

nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())

await nodes[0].switch.connect(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
)

proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} =
discard

proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} =
discard

nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")

var gossip2: GossipSub = GossipSub(nodes[1])

tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1

checkUntilTimeout:
gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1)

await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - iDontWant is sent only for 1.2":
# 3 nodes: A <=> B <=> C
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
Expand Down
2 changes: 2 additions & 0 deletions tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ proc generateNodes*(
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
Opt.none(tuple[bytes: int, interval: Duration]),
gossipSubVersion: string = "",
sendIDontWantOnPublish: bool = false,
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
Expand All @@ -97,6 +98,7 @@ proc generateNodes*(
p.unsubscribeBackoff = unsubscribeBackoff
p.enablePX = enablePX
p.overheadRateLimit = overheadRateLimit
p.sendIDontWantOnPublish = sendIDontWantOnPublish
p
),
)
Expand Down

0 comments on commit 483e1d9

Please sign in to comment.