Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fluffy: Portal subnetwork peer ban list #3007

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -109,6 +109,7 @@ proc getContent(
continue

validateRetrieval(key, contentValue).isOkOr:
n.portalProtocol.contentLookupFailedValidation(lookupRes.receivedFrom.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think when code reading it is clearer what this does when it would just be p.banPeer(nodeId, PeerBanDurationContentLookupFailedValidation). I feel the templates (or their names) mask to much which action is done here.

error "Validation of retrieved state content failed"
continue

Expand Down Expand Up @@ -243,6 +244,8 @@ proc processContentLoop(n: StateNetwork) {.async: (raises: []).} =
debug "Received offered content validated successfully",
srcNodeId, contentKeyBytes
else:
if srcNodeId.isSome():
n.portalProtocol.offerFailedValidation(srcNodeId.get())
state_network_offers_failed.inc(labelValues = [$n.portalProtocol.protocolId])
error "Received offered content failed validation",
srcNodeId, contentKeyBytes, error = offerRes.error()
Expand Down
80 changes: 63 additions & 17 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -126,6 +126,10 @@ const
## value in milliseconds
initialLookups = 1 ## Amount of lookups done when populating the routing table

# Ban durations for the banned peers table
PeerBanDurationContentLookupFailedValidation = 30.minutes
PeerBanDurationOfferFailedValidation = 60.minutes
Comment on lines +129 to +131
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These could probably be the same duration always? Similar type of protocol violation?


type
ToContentIdHandler* =
proc(contentKey: ContentKeyByteList): results.Opt[ContentId] {.raises: [], gcsafe.}
Expand Down Expand Up @@ -166,9 +170,12 @@ type
of Database:
contentKeys: ContentKeysList

BanTimeout = chronos.Moment

PortalProtocol* = ref object of TalkProtocol
protocolId*: PortalProtocolId
routingTable*: RoutingTable
bannedPeers: Table[NodeId, BanTimeout]
baseProtocol*: protocol.Protocol
toContentId*: ToContentIdHandler
contentCache: ContentCache
Expand Down Expand Up @@ -284,6 +291,36 @@ func getProtocolId*(
of PortalSubnetwork.transactionGossip:
[portalPrefix, 0x4F]

proc banPeer(p: PortalProtocol, nodeId: NodeId, period: chronos.Duration) =
let banTimeout = now(chronos.Moment) + period

if p.bannedPeers.contains(nodeId):
let existingTimeout = p.bannedPeers.getOrDefault(nodeId)
if existingTimeout < banTimeout:
p.bannedPeers[nodeId] = banTimeout
else:
p.bannedPeers[nodeId] = banTimeout

proc isBanned(p: PortalProtocol, nodeId: NodeId): bool =
if not p.bannedPeers.contains(nodeId):
return false

let
currentTime = now(chronos.Moment)
banTimeout = p.bannedPeers.getOrDefault(nodeId)
if currentTime < banTimeout:
return true

# Peer is in bannedPeers table but the time period has expired
p.bannedPeers.del(nodeId)
false

template contentLookupFailedValidation*(p: PortalProtocol, nodeId: NodeId) =
p.banPeer(nodeId, PeerBanDurationContentLookupFailedValidation)

template offerFailedValidation*(p: PortalProtocol, nodeId: NodeId) =
p.banPeer(nodeId, PeerBanDurationOfferFailedValidation)

func `$`(id: PortalProtocolId): string =
id.toHex()

Expand All @@ -299,8 +336,11 @@ func getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
func localNode*(p: PortalProtocol): Node =
p.baseProtocol.localNode

func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
proc neighbours*(
p: PortalProtocol, id: NodeId, k: int = BUCKET_SIZE, seenOnly = false
): seq[Node] =
# TODO: Maybe banned peers should be in the routing table and filtered out at that level
p.routingTable.neighbours(id, k, seenOnly).filterIt(not p.isBanned(it.id))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kdeme Any thoughts on whether it would be a good idea to add the ban list into the routing table code in nim-eth? I believe the same routing table is used by the discv5 code and so it might be a good way to apply something similar at the discv5 level.

Copy link
Contributor

@kdeme kdeme Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be either in the routing table object or in PortalProtocol and DiscoveryProtocol. The latter would require a few more wrappers like you did now for neighbours.

What is more important I think is that banned nodes do not stay in the routing table. They should be removed from the routing table and not be allowed to re-added. That way the routing table gives space to valid nodes and a neighbours call gives better results.

I think achieving this might be slightly simpler when the ban list is part of routing table, not sure.

But yes, the ban list would also be useful for discv5 I think.


func distance(p: PortalProtocol, a, b: NodeId): UInt256 =
p.routingTable.distance(a, b)
Expand Down Expand Up @@ -437,7 +477,7 @@ proc handleFindContent(
# Node does not have the content, or content is not even in radius,
# send closest neighbours to the requested content id.
let
closestNodes = p.routingTable.neighbours(NodeId(contentId), seenOnly = true)
closestNodes = p.neighbours(NodeId(contentId), seenOnly = true)
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
portal_content_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId])

Expand Down Expand Up @@ -514,6 +554,12 @@ proc messageHandler(

let p = PortalProtocol(protocol)

if p.isBanned(srcId):
# The sender of the message is in the temporary node ban list
# so we don't process the message
debug "Ignoring message from banned node", srcId, srcUdpAddress
return @[]

let decoded = decodeMessage(request)
if decoded.isOk():
let message = decoded.get()
Expand Down Expand Up @@ -1031,7 +1077,7 @@ proc lookup*(
## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation.
var closestNodes = p.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
var closestNodes = p.neighbours(target, BUCKET_SIZE, seenOnly = false)

var asked, seen = HashSet[NodeId]()
asked.incl(p.localNode.id) # No need to ask our own node
Expand Down Expand Up @@ -1076,7 +1122,7 @@ proc lookup*(
let nodes = await query
# TODO: Remove node on timed-out query?
for n in nodes:
if not seen.containsOrIncl(n.id):
if not seen.containsOrIncl(n.id) and not p.isBanned(n.id):
# If it wasn't seen before, insert node while remaining sorted
closestNodes.insert(
n,
Expand Down Expand Up @@ -1133,7 +1179,7 @@ proc contentLookup*(
## target.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation.
var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false)

# Shuffling the order of the nodes in order to not always hit the same node
# first for the same request.
Expand Down Expand Up @@ -1206,14 +1252,15 @@ proc contentLookup*(
of Nodes:
let maybeRadius = p.radiusCache.get(content.src.id)
if maybeRadius.isSome() and
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId) and
not p.isBanned(content.src.id):
# Only return nodes which may be interested in content.
# No need to check for duplicates in nodesWithoutContent
# as requests are never made two times to the same node.
nodesWithoutContent.add(content.src)

for n in content.nodes:
if not seen.containsOrIncl(n.id):
if not seen.containsOrIncl(n.id) and not p.isBanned(n.id):
discard p.addNode(n)
# If it wasn't seen before, insert node while remaining sorted
closestNodes.insert(
Expand Down Expand Up @@ -1259,7 +1306,7 @@ proc traceContentLookup*(
# Need to use a system clock and not the mono clock for this.
let startedAtMs = int64(times.epochTime() * 1000)

var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
# Shuffling the order of the nodes in order to not always hit the same node
# first for the same request.
p.baseProtocol.rng[].shuffle(closestNodes)
Expand Down Expand Up @@ -1350,7 +1397,8 @@ proc traceContentLookup*(

let maybeRadius = p.radiusCache.get(content.src.id)
if maybeRadius.isSome() and
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId) and
not p.isBanned(content.src.id):
# Only return nodes which may be interested in content.
# No need to check for duplicates in nodesWithoutContent
# as requests are never made two times to the same node.
Expand All @@ -1364,7 +1412,7 @@ proc traceContentLookup*(
metadata["0x" & $n.id] = NodeMetadata(enr: n.record, distance: dist)
respondedWith.add(n.id)

if not seen.containsOrIncl(n.id):
if not seen.containsOrIncl(n.id) and not p.isBanned(n.id):
discard p.addNode(n)
# If it wasn't seen before, insert node while remaining sorted
closestNodes.insert(
Expand Down Expand Up @@ -1457,7 +1505,7 @@ proc query*(
## This will take k nodes from the routing table closest to target and
## query them for nodes closest to target. If there are less than k nodes in
## the routing table, nodes returned by the first queries will be used.
var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false)
var queryBuffer = p.neighbours(target, k, seenOnly = false)

var asked, seen = HashSet[NodeId]()
asked.incl(p.localNode.id) # No need to ask our own node
Expand Down Expand Up @@ -1512,8 +1560,7 @@ proc queryRandom*(
proc getNClosestNodesWithRadius*(
p: PortalProtocol, targetId: NodeId, n: int, seenOnly: bool = false
): seq[(Node, UInt256)] =
let closestLocalNodes =
p.routingTable.neighbours(targetId, k = n, seenOnly = seenOnly)
let closestLocalNodes = p.neighbours(targetId, k = n, seenOnly = seenOnly)

var nodesWithRadiuses: seq[(Node, UInt256)]
for node in closestLocalNodes:
Expand Down Expand Up @@ -1558,8 +1605,7 @@ proc neighborhoodGossip*(
# table, but at the same time avoid unnecessary node lookups.
# It might still cause issues in data getting propagated in a wider id range.

let closestLocalNodes =
p.routingTable.neighbours(NodeId(contentId), k = 16, seenOnly = true)
let closestLocalNodes = p.neighbours(NodeId(contentId), BUCKET_SIZE, seenOnly = true)

var gossipNodes: seq[Node]
for node in closestLocalNodes:
Expand Down
4 changes: 4 additions & 0 deletions fluffy/rpc/rpc_portal_state_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
of Content:
let valueBytes = foundContentResult.content
validateRetrieval(key, valueBytes).isOkOr:
p.contentLookupFailedValidation(node.id)
raise invalidValueErr()

let res = ContentInfo(
Expand Down Expand Up @@ -97,6 +98,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
valueBytes = contentLookupResult.content

validateRetrieval(key, valueBytes).isOkOr:
p.contentLookupFailedValidation(contentLookupResult.receivedFrom.id)
raise invalidValueErr()
p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true)

Expand Down Expand Up @@ -124,6 +126,8 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
raise contentNotFoundErrWithTrace(data)

validateRetrieval(key, valueBytes).isOkOr:
if res.trace.receivedFrom.isSome():
p.contentLookupFailedValidation(res.trace.receivedFrom.get())
raise invalidValueErr()
p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true)

Expand Down
Loading