Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consider peer reputation score when deciding to disconnect #6187

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private boolean registerDisconnect(
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
peer.handleDisconnect();
abortPendingRequestsAssignedToDisconnectedPeers();
LOG.debug("Disconnected EthPeer {}", peer.getShortNodeId());
LOG.debug("Disconnected EthPeer {}...", peer.getShortNodeId());
LOG.trace("Disconnected EthPeer {}", peer);
}
}
Expand Down Expand Up @@ -391,7 +391,7 @@ public void disconnectWorstUselessPeer() {
peer -> {
LOG.atDebug()
.setMessage(
"disconnecting peer {}. Waiting for better peers. Current {} of max {}")
"disconnecting peer {}... Waiting for better peers. Current {} of max {}")
.addArgument(peer::getShortNodeId)
.addArgument(this::peerCount)
.addArgument(this::getMaxPeers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void handleDisconnect(
"Disconnect - {} - {} - {}... - {} peers left\n{}",
initiatedByPeer ? "Inbound" : "Outbound",
reason,
connection.getPeer().getId().slice(0, 16),
connection.getPeer().getId().slice(0, 8),
ethPeers.peerCount(),
ethPeers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,23 @@
public class PeerReputation implements Comparable<PeerReputation> {
static final long USELESS_RESPONSE_WINDOW_IN_MILLIS =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
static final int DEFAULT_MAX_SCORE = 150;
static final int DEFAULT_MAX_SCORE = 200;
// how much above the initial score you need to be to not get disconnected for timeouts/useless
// responses
static final int HAS_BEEN_USEFUL_SCORE_INCREASE = 10;
macfarla marked this conversation as resolved.
Show resolved Hide resolved
static final int DEFAULT_INITIAL_SCORE = 100;
private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class);
private static final int TIMEOUT_THRESHOLD = 3;
private static final int TIMEOUT_THRESHOLD = 5;
private static final int USELESS_RESPONSE_THRESHOLD = 5;

private final ConcurrentMap<Integer, AtomicInteger> timeoutCountByRequestType =
new ConcurrentHashMap<>();
private final Queue<Long> uselessResponseTimes = new ConcurrentLinkedQueue<>();

private static final int SMALL_ADJUSTMENT = 1;
private static final int LARGE_ADJUSTMENT = 10;
private static final int LARGE_ADJUSTMENT = 5;

private final int initialScore;
private int score;

private final int maxScore;
Expand All @@ -59,22 +63,37 @@ public PeerReputation(final int initialScore, final int maxScore) {
checkArgument(
initialScore <= maxScore, "Initial score must be less than or equal to max score");
this.maxScore = maxScore;
this.initialScore = initialScore;
this.score = initialScore;
}

public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}",
newTimeoutCount,
requestCode);
score -= LARGE_ADJUSTMENT;
return Optional.of(DisconnectReason.TIMEOUT);
// don't trigger disconnect if this peer has a sufficiently high reputation score
if (peerHasNotBeenUseful()) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}, peer score {}",
newTimeoutCount,
requestCode,
score);
return Optional.of(DisconnectReason.TIMEOUT);
}

LOG.trace(
"Not triggering disconnect for {} repeated timeouts for requestCode {} because peer has high score {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we adjust the score in this case? I might be misreading, but if a previously useful peer becomes useless, would we never disconnect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The score still does get decremented on line 73.

e.g.

2023-11-20 15:13:42.657+10:00 | nioEventLoopGroup-3-4 | TRACE | PeerReputation | Not triggering disconnect for exceeding useless response threshold because peer has high score 167
2023-11-20 15:13:42.716+10:00 | nioEventLoopGroup-3-2 | TRACE | PeerReputation | Not triggering disconnect for exceeding useless response threshold because peer has high score 116
2023-11-20 15:13:43.492+10:00 | nioEventLoopGroup-3-2 | DEBUG | PeerReputation | Disconnection triggered by exceeding useless response threshold, score 109
2023-11-20 14:28:19.456+10:00 | nioEventLoopGroup-3-5 | TRACE | PeerReputation | Not triggering disconnect for exceeding useless response threshold because peer has high score 117
2023-11-20 14:28:19.582+10:00 | nioEventLoopGroup-3-5 | TRACE | PeerReputation | Not triggering disconnect for exceeding useless response threshold because peer has high score 111
2023-11-20 14:28:22.063+10:00 | EthScheduler-Timer-0 | DEBUG | PeerReputation | Disconnection triggered by 13 repeated timeouts for requestCode 15, peer score 107

newTimeoutCount,
requestCode,
score);
} else {
score -= SMALL_ADJUSTMENT;
return Optional.empty();
}
return Optional.empty();
}

/**
 * Reports whether this peer's score has risen by less than
 * {@code HAS_BEEN_USEFUL_SCORE_INCREASE} relative to its initial score.
 *
 * @return {@code true} when the gain over the initial score is below the threshold
 */
private boolean peerHasNotBeenUseful() {
  final int scoreGain = score - initialScore;
  return scoreGain < HAS_BEEN_USEFUL_SCORE_INCREASE;
}

public void resetTimeoutCount(final int requestCode) {
Expand All @@ -96,12 +115,19 @@ public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
}
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
score -= LARGE_ADJUSTMENT;
LOG.debug("Disconnection triggered by exceeding useless response threshold");
return Optional.of(DisconnectReason.USELESS_PEER);
// don't trigger disconnect if this peer has a sufficiently high reputation score
if (peerHasNotBeenUseful()) {
LOG.debug(
"Disconnection triggered by exceeding useless response threshold, score {}", score);
return Optional.of(DisconnectReason.USELESS_PEER);
}
LOG.trace(
"Not triggering disconnect for exceeding useless response threshold because peer has high score {}",
score);
} else {
score -= SMALL_ADJUSTMENT;
return Optional.empty();
}
return Optional.empty();
}

public void recordUsefulResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public void shouldThrowOnInvalidInitialScore() {

@Test
public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
Expand All @@ -45,6 +47,11 @@ public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
public void shouldTrackTimeoutsSeparatelyForDifferentRequestTypes() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();

Expand All @@ -57,6 +64,8 @@ public void shouldResetTimeoutCountForRequestType() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();

Expand Down
Loading