Skip to content

Commit

Permalink
Merge branch 'main' into stretch-timeout-when-low-peer-count
Browse files Browse the repository at this point in the history
  • Loading branch information
macfarla authored Nov 28, 2023
2 parents 0ab5306 + 8f5a444 commit 0766382
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,18 @@ public RequestManager.ResponseStream send(
throws PeerNotConnected {
if (connectionToUse.getAgreedCapabilities().stream()
.noneMatch(capability -> capability.getName().equalsIgnoreCase(protocolName))) {
LOG.debug("Protocol {} unavailable for this peer {}", protocolName, this.getShortNodeId());
LOG.atDebug()
.setMessage("Protocol {} unavailable for this peer {}...")
.addArgument(protocolName)
.addArgument(this.getShortNodeId())
.log();
return null;
}
if (permissioningProviders.stream()
.anyMatch(
p -> !p.isMessagePermitted(connectionToUse.getRemoteEnode(), messageData.getCode()))) {
LOG.info(
"Permissioning blocked sending of message code {} to {}",
"Permissioning blocked sending of message code {} to {}...",
messageData.getCode(),
this.getShortNodeId());
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -455,7 +459,7 @@ public PeerReputation getReputation() {
}

void handleDisconnect() {
LOG.debug("handleDisconnect - EthPeer {}", this);
LOG.trace("handleDisconnect - EthPeer {}", this);

requestManagers.forEach(
(protocolName, map) -> map.forEach((code, requestManager) -> requestManager.close()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,14 @@ public void handleDisconnect(
final DisconnectReason reason,
final boolean initiatedByPeer) {
if (ethPeers.registerDisconnect(connection)) {
LOG.debug(
"Disconnect - {} - {} - {}... - {} peers left\n{}",
initiatedByPeer ? "Inbound" : "Outbound",
reason,
connection.getPeer().getId().slice(0, 8),
ethPeers.peerCount(),
ethPeers);
LOG.atDebug()
.setMessage("Disconnect - {} - {} - {}... - {} peers left")
.addArgument(initiatedByPeer ? "Inbound" : "Outbound")
.addArgument(reason)
.addArgument(connection.getPeer().getId().slice(0, 8))
.addArgument(ethPeers.peerCount())
.log();
LOG.trace("{}", ethPeers);
}
}

Expand All @@ -410,7 +411,16 @@ private void handleStatusMessage(final EthPeer peer, final Message message) {
peer.getConnection().getPeer().setForkId(forkId);
try {
if (!status.networkId().equals(networkId)) {
LOG.debug("Mismatched network id: {}, EthPeer {}", status.networkId(), peer);
LOG.atDebug()
.setMessage("Mismatched network id: {}, EthPeer {}...")
.addArgument(status.networkId())
.addArgument(peer.getShortNodeId())
.log();
LOG.atTrace()
.setMessage("Mismatched network id: {}, EthPeer {}")
.addArgument(status.networkId())
.addArgument(peer)
.log();
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (!forkIdManager.peerCheck(forkId) && status.protocolVersion() > 63) {
LOG.debug(
Expand All @@ -428,7 +438,10 @@ private void handleStatusMessage(final EthPeer peer, final Message message) {
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} else if (mergePeerFilter.isPresent()
&& mergePeerFilter.get().disconnectIfPoW(status, peer)) {
LOG.debug("Post-merge disconnect: peer still PoW {}", peer);
LOG.atDebug()
.setMessage("Post-merge disconnect: peer still PoW {}")
.addArgument(peer.getShortNodeId())
.log();
handleDisconnect(peer.getConnection(), DisconnectReason.SUBPROTOCOL_TRIGGERED, false);
} else {
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,19 @@ protected Optional<List<BlockHeader>> processResponse(
updatePeerChainState(peer, header);
}

LOG.debug(
"Received {} of {} headers requested from peer {}",
headersList.size(),
count,
peer.getShortNodeId());
LOG.atDebug()
.setMessage("Received {} of {} headers requested from peer {}...")
.addArgument(headersList.size())
.addArgument(count)
.addArgument(peer.getShortNodeId())
.log();
return Optional.of(headersList);
}

private void updatePeerChainState(final EthPeer peer, final BlockHeader blockHeader) {
if (blockHeader.getNumber() > peer.chainState().getEstimatedHeight()) {
LOG.atTrace()
.setMessage("Updating chain state for peer {} to block header {}")
.setMessage("Updating chain state for peer {}... to block header {}")
.addArgument(peer::getShortNodeId)
.addArgument(blockHeader::toLogString)
.log();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ public static AbstractGetHeadersFromPeerTask forSingleHash(
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(
peer -> {
LOG.debug(
"Requesting {} headers (hash {}...) from peer {}.",
count,
referenceHash.slice(0, 6),
peer.getShortNodeId());
LOG.atDebug()
.setMessage("Requesting {} headers (hash {}...) from peer {}...")
.addArgument(count)
.addArgument(referenceHash.slice(0, 6))
.addArgument(peer.getShortNodeId())
.log();
return peer.getHeadersByHash(referenceHash, count, skip, reverse);
},
minimumRequiredBlockNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ public class RecursivePeerRefreshState {
}

void start(final List<DiscoveryPeer> initialPeers, final Bytes target) {
// TODO check this flag earlier
if (iterativeSearchInProgress) {
LOG.debug("Skip peer search because previous search is still in progress.");
LOG.debug(
"Skip peer search because previous search ({}) is still in progress.", currentRound);
return;
}
LOG.debug("Start peer search.");
Expand All @@ -93,7 +95,7 @@ private boolean reachedMaximumNumberOfRounds() {
}

private void addInitialPeers(final List<DiscoveryPeer> initialPeers) {
LOG.debug("INITIAL PEERS: {}", initialPeers);
LOG.debug("{} INITIAL PEERS: {}", initialPeers.size(), initialPeers);
this.initialPeers = initialPeers;
for (final DiscoveryPeer peer : initialPeers) {
final MetadataPeer iterationParticipant =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ protected AbstractPeerConnection(
this.inboundInitiated = inboundInitiated;
this.initiatedAt = System.currentTimeMillis();

LOG.debug("New PeerConnection ({}) established with peer {}", this, peer.getId());
LOG.atDebug()
.setMessage("New PeerConnection ({}) established with peer {}...")
.addArgument(this)
.addArgument(peer.getId().slice(0, 16))
.log();
}

@Override
Expand Down Expand Up @@ -164,7 +168,11 @@ public void terminateConnection(final DisconnectReason reason, final boolean pee
// Always ensure the context gets closed immediately even if we previously sent a disconnect
// message and are waiting to close.
closeConnectionImmediately();
LOG.debug("Terminating connection {}, reason {}", this, reason);
LOG.atTrace()
.setMessage("Terminating connection {}, reason {}")
.addArgument(this)
.addArgument(reason)
.log();
}

protected abstract void closeConnectionImmediately();
Expand All @@ -176,11 +184,12 @@ public void disconnect(final DisconnectReason reason) {
if (disconnected.compareAndSet(false, true)) {
connectionEventDispatcher.dispatchDisconnect(this, reason, false);
doSend(null, DisconnectMessage.create(reason));
LOG.debug(
"Disconnecting connection {}, peer {}... reason {}",
System.identityHashCode(this),
peer.getId().slice(0, 16),
reason);
LOG.atDebug()
.setMessage("Disconnecting connection {}, peer {}... reason {}")
.addArgument(this.hashCode())
.addArgument(peer.getId().slice(0, 16))
.addArgument(reason)
.log();
closeConnection();
}
}
Expand Down

0 comments on commit 0766382

Please sign in to comment.