From 1f7869be8c51f1e9e9055a118db7c535796548df Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:19:29 +0800 Subject: [PATCH 1/3] Implement ban list data structure, banPeer and isBanned. --- fluffy/network/wire/portal_protocol.nim | 29 ++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index ccade317cd..9671019b38 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -1,5 +1,5 @@ # Fluffy -# Copyright (c) 2021-2024 Status Research & Development GmbH +# Copyright (c) 2021-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -166,9 +166,12 @@ type of Database: contentKeys: ContentKeysList + BanTimeout = chronos.Moment + PortalProtocol* = ref object of TalkProtocol protocolId*: PortalProtocolId routingTable*: RoutingTable + bannedPeers: Table[NodeId, BanTimeout] baseProtocol*: protocol.Protocol toContentId*: ToContentIdHandler contentCache: ContentCache @@ -334,6 +337,30 @@ func truncateEnrs( enrs +proc banPeer(p: PortalProtocol, nodeId: NodeId, period: chronos.Duration) = + let banTimeout = now(chronos.Moment) + period + + if p.bannedPeers.contains(nodeId): + let existingTimeout = p.bannedPeers.getOrDefault(nodeId) + if existingTimeout < banTimeout: + p.bannedPeers[nodeId] = banTimeout + else: + p.bannedPeers[nodeId] = banTimeout + +proc isBanned(p: PortalProtocol, nodeId: NodeId): bool = + if not p.bannedPeers.contains(nodeId): + return false + + let + currentTime = now(chronos.Moment) + banTimeout = p.bannedPeers.getOrDefault(nodeId) + if currentTime < banTimeout: + return true + + # Peer is in bannedPeers table but the time period has expired + p.bannedPeers.del(nodeId) + false + proc handlePing(p: PortalProtocol, ping: PingMessage, srcId: NodeId): seq[byte] = # TODO: This should become custom per Portal Network # TODO: Need to think about the effect of malicious actor sending lots of From 000e2bbca06dcd1bef1333a8463ae33466256608 Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Fri, 17 Jan 2025 11:23:32 +0800 Subject: [PATCH 2/3] Ban nodes when state network content lookups and offers fail validation. --- fluffy/network/state/state_network.nim | 5 ++++- fluffy/network/wire/portal_protocol.nim | 10 ++++++++++ fluffy/rpc/rpc_portal_state_api.nim | 4 ++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/fluffy/network/state/state_network.nim b/fluffy/network/state/state_network.nim index fe4f82e403..75d0ad8f67 100644 --- a/fluffy/network/state/state_network.nim +++ b/fluffy/network/state/state_network.nim @@ -1,5 +1,5 @@ # Fluffy -# Copyright (c) 2021-2024 Status Research & Development GmbH +# Copyright (c) 2021-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -109,6 +109,7 @@ proc getContent( continue validateRetrieval(key, contentValue).isOkOr: + n.portalProtocol.contentLookupFailedValidation(lookupRes.receivedFrom.id) error "Validation of retrieved state content failed" continue @@ -243,6 +244,8 @@ proc processContentLoop(n: StateNetwork) {.async: (raises: []).} = debug "Received offered content validated successfully", srcNodeId, contentKeyBytes else: + if srcNodeId.isSome(): + n.portalProtocol.offerFailedValidation(srcNodeId.get()) state_network_offers_failed.inc(labelValues = [$n.portalProtocol.protocolId]) error "Received offered content failed validation", srcNodeId, contentKeyBytes, error = offerRes.error() diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 9671019b38..7cc0179aa3 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -126,6 +126,10 @@ const ## value in milliseconds initialLookups = 1 ## Amount of lookups done when populating the routing table + # Ban durations for the banned peers table + PeerBanDurationContentLookupFailedValidation = 30.minutes + PeerBanDurationOfferFailedValidation = 60.minutes + type ToContentIdHandler* = proc(contentKey: ContentKeyByteList): results.Opt[ContentId] {.raises: [], gcsafe.} @@ -361,6 +365,12 @@ proc isBanned(p: PortalProtocol, nodeId: NodeId): bool = p.bannedPeers.del(nodeId) false +template contentLookupFailedValidation*(p: PortalProtocol, nodeId: NodeId) = + p.banPeer(nodeId, PeerBanDurationContentLookupFailedValidation) + +template offerFailedValidation*(p: PortalProtocol, nodeId: NodeId) = + p.banPeer(nodeId, PeerBanDurationOfferFailedValidation) + proc handlePing(p: PortalProtocol, ping: PingMessage, srcId: NodeId): seq[byte] = # TODO: This should become custom per Portal Network # TODO: Need to think about the effect of malicious actor sending lots of diff --git a/fluffy/rpc/rpc_portal_state_api.nim b/fluffy/rpc/rpc_portal_state_api.nim index f4d809e7c0..888d349a91 100644 --- a/fluffy/rpc/rpc_portal_state_api.nim +++ b/fluffy/rpc/rpc_portal_state_api.nim @@ -46,6 +46,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = of Content: let valueBytes = foundContentResult.content validateRetrieval(key, valueBytes).isOkOr: + p.contentLookupFailedValidation(node.id) raise invalidValueErr() let res = ContentInfo( @@ -97,6 +98,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = valueBytes = contentLookupResult.content validateRetrieval(key, valueBytes).isOkOr: + p.contentLookupFailedValidation(contentLookupResult.receivedFrom.id) raise invalidValueErr() p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true) @@ -124,6 +126,8 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) = raise contentNotFoundErrWithTrace(data) validateRetrieval(key, valueBytes).isOkOr: + if res.trace.receivedFrom.isSome(): + p.contentLookupFailedValidation(res.trace.receivedFrom.get()) raise invalidValueErr() p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true) From db11fb16c99c152e398bd43bc5131618bb8375dc Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Fri, 17 Jan 2025 13:45:01 +0800 Subject: [PATCH 3/3] Filter out and ignore banned peers in portal protocol. --- fluffy/network/wire/portal_protocol.nim | 101 +++++++++++++----------- 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 7cc0179aa3..71aa69c140 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -291,6 +291,36 @@ func getProtocolId*( of PortalSubnetwork.transactionGossip: [portalPrefix, 0x4F] +proc banPeer(p: PortalProtocol, nodeId: NodeId, period: chronos.Duration) = + let banTimeout = now(chronos.Moment) + period + + if p.bannedPeers.contains(nodeId): + let existingTimeout = p.bannedPeers.getOrDefault(nodeId) + if existingTimeout < banTimeout: + p.bannedPeers[nodeId] = banTimeout + else: + p.bannedPeers[nodeId] = banTimeout + +proc isBanned(p: PortalProtocol, nodeId: NodeId): bool = + if not p.bannedPeers.contains(nodeId): + return false + + let + currentTime = now(chronos.Moment) + banTimeout = p.bannedPeers.getOrDefault(nodeId) + if currentTime < banTimeout: + return true + + # Peer is in bannedPeers table but the time period has expired + p.bannedPeers.del(nodeId) + false + +template contentLookupFailedValidation*(p: PortalProtocol, nodeId: NodeId) = + p.banPeer(nodeId, PeerBanDurationContentLookupFailedValidation) + +template offerFailedValidation*(p: PortalProtocol, nodeId: NodeId) = + p.banPeer(nodeId, PeerBanDurationOfferFailedValidation) + func `$`(id: PortalProtocolId): string = id.toHex() @@ -306,8 +336,11 @@ func getNode*(p: PortalProtocol, id: NodeId): Opt[Node] = func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode -func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] = - p.routingTable.neighbours(id = id, seenOnly = seenOnly) +proc neighbours*( + p: PortalProtocol, id: NodeId, k: int = BUCKET_SIZE, seenOnly = false +): seq[Node] = + # TODO: Maybe banned peers should be in the routing table and filtered out at that level + p.routingTable.neighbours(id, k, seenOnly).filterIt(not p.isBanned(it.id)) func distance(p: PortalProtocol, a, b: NodeId): UInt256 = p.routingTable.distance(a, b) @@ -341,36 +374,6 @@ func truncateEnrs( enrs -proc banPeer(p: PortalProtocol, nodeId: NodeId, period: chronos.Duration) = - let banTimeout = now(chronos.Moment) + period - - if p.bannedPeers.contains(nodeId): - let existingTimeout = p.bannedPeers.getOrDefault(nodeId) - if existingTimeout < banTimeout: - p.bannedPeers[nodeId] = banTimeout - else: - p.bannedPeers[nodeId] = banTimeout - -proc isBanned(p: PortalProtocol, nodeId: NodeId): bool = - if not p.bannedPeers.contains(nodeId): - return false - - let - currentTime = now(chronos.Moment) - banTimeout = p.bannedPeers.getOrDefault(nodeId) - if currentTime < banTimeout: - return true - - # Peer is in bannedPeers table but the time period has expired - p.bannedPeers.del(nodeId) - false - -template contentLookupFailedValidation*(p: PortalProtocol, nodeId: NodeId) = - p.banPeer(nodeId, PeerBanDurationContentLookupFailedValidation) - -template offerFailedValidation*(p: PortalProtocol, nodeId: NodeId) = - p.banPeer(nodeId, PeerBanDurationOfferFailedValidation) - proc handlePing(p: PortalProtocol, ping: PingMessage, srcId: NodeId): seq[byte] = # TODO: This should become custom per Portal Network # TODO: Need to think about the effect of malicious actor sending lots of @@ -474,7 +477,7 @@ proc handleFindContent( # Node does not have the content, or content is not even in radius, # send closest neighbours to the requested content id. let - closestNodes = p.routingTable.neighbours(NodeId(contentId), seenOnly = true) + closestNodes = p.neighbours(NodeId(contentId), seenOnly = true) enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead) portal_content_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId]) @@ -551,6 +554,12 @@ proc messageHandler( let p = PortalProtocol(protocol) + if p.isBanned(srcId): + # The sender of the message is in the temporary node ban list + # so we don't process the message + debug "Ignoring message from banned node", srcId, srcUdpAddress + return @[] + let decoded = decodeMessage(request) if decoded.isOk(): let message = decoded.get() @@ -1068,7 +1077,7 @@ proc lookup*( ## target. Maximum value for n is `BUCKET_SIZE`. # `closestNodes` holds the k closest nodes to target found, sorted by distance # Unvalidated nodes are used for requests as a form of validation. - var closestNodes = p.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false) + var closestNodes = p.neighbours(target, BUCKET_SIZE, seenOnly = false) var asked, seen = HashSet[NodeId]() asked.incl(p.localNode.id) # No need to ask our own node @@ -1113,7 +1122,7 @@ proc lookup*( let nodes = await query # TODO: Remove node on timed-out query? for n in nodes: - if not seen.containsOrIncl(n.id): + if not seen.containsOrIncl(n.id) and not p.isBanned(n.id): # If it wasn't seen before, insert node while remaining sorted closestNodes.insert( n, @@ -1170,7 +1179,7 @@ proc contentLookup*( ## target. # `closestNodes` holds the k closest nodes to target found, sorted by distance # Unvalidated nodes are used for requests as a form of validation. - var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false) + var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false) # Shuffling the order of the nodes in order to not always hit the same node # first for the same request. @@ -1243,14 +1252,15 @@ proc contentLookup*( of Nodes: let maybeRadius = p.radiusCache.get(content.src.id) if maybeRadius.isSome() and - p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId): + p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId) and + not p.isBanned(content.src.id): # Only return nodes which may be interested in content. # No need to check for duplicates in nodesWithoutContent # as requests are never made two times to the same node. nodesWithoutContent.add(content.src) for n in content.nodes: - if not seen.containsOrIncl(n.id): + if not seen.containsOrIncl(n.id) and not p.isBanned(n.id): discard p.addNode(n) # If it wasn't seen before, insert node while remaining sorted closestNodes.insert( @@ -1296,7 +1306,7 @@ proc traceContentLookup*( # Need to use a system clock and not the mono clock for this. let startedAtMs = int64(times.epochTime() * 1000) - var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false) + var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false) # Shuffling the order of the nodes in order to not always hit the same node # first for the same request. p.baseProtocol.rng[].shuffle(closestNodes) @@ -1387,7 +1397,8 @@ proc traceContentLookup*( let maybeRadius = p.radiusCache.get(content.src.id) if maybeRadius.isSome() and - p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId): + p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId) and + not p.isBanned(content.src.id): # Only return nodes which may be interested in content. # No need to check for duplicates in nodesWithoutContent # as requests are never made two times to the same node. @@ -1401,7 +1412,7 @@ proc traceContentLookup*( metadata["0x" & $n.id] = NodeMetadata(enr: n.record, distance: dist) respondedWith.add(n.id) - if not seen.containsOrIncl(n.id): + if not seen.containsOrIncl(n.id) and not p.isBanned(n.id): discard p.addNode(n) # If it wasn't seen before, insert node while remaining sorted closestNodes.insert( @@ -1494,7 +1505,7 @@ proc query*( ## This will take k nodes from the routing table closest to target and ## query them for nodes closest to target. If there are less than k nodes in ## the routing table, nodes returned by the first queries will be used. - var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false) + var queryBuffer = p.neighbours(target, k, seenOnly = false) var asked, seen = HashSet[NodeId]() asked.incl(p.localNode.id) # No need to ask our own node @@ -1549,8 +1560,7 @@ proc queryRandom*( proc getNClosestNodesWithRadius*( p: PortalProtocol, targetId: NodeId, n: int, seenOnly: bool = false ): seq[(Node, UInt256)] = - let closestLocalNodes = - p.routingTable.neighbours(targetId, k = n, seenOnly = seenOnly) + let closestLocalNodes = p.neighbours(targetId, k = n, seenOnly = seenOnly) var nodesWithRadiuses: seq[(Node, UInt256)] for node in closestLocalNodes: @@ -1595,8 +1605,7 @@ proc neighborhoodGossip*( # table, but at the same time avoid unnecessary node lookups. # It might still cause issues in data getting propagated in a wider id range. - let closestLocalNodes = - p.routingTable.neighbours(NodeId(contentId), k = 16, seenOnly = true) + let closestLocalNodes = p.neighbours(NodeId(contentId), BUCKET_SIZE, seenOnly = true) var gossipNodes: seq[Node] for node in closestLocalNodes: