Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Merge pull request #122 from atoulme/rlpx_prod_fixes
Browse files Browse the repository at this point in the history
Enable snappy compression of RLPx when p2pVersion is equal or more than 5
  • Loading branch information
atoulme authored Jan 16, 2019
2 parents 9c9b650 + 8a5bab8 commit 5300bcf
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:

environment:
TERM: dumb
JAVA_TOOL_OPTIONS: -Xmx1024m
JAVA_TOOL_OPTIONS: -Xmx768m
GRADLE_OPTS: -Dorg.gradle.daemon=false -Dorg.gradle.workers.max=2
GRADLE_MAX_TEST_FORKS: 2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import net.consensys.cava.bytes.MutableBytes;
import net.consensys.cava.crypto.SECP256K1;
import net.consensys.cava.rlp.RLP;
import net.consensys.cava.rlpx.wire.HelloMessage;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -70,10 +71,10 @@ private static Bytes32 snapshot(KeccakDigest digest) {
private final KeccakDigest ingressMac = new KeccakDigest(Bytes32.SIZE * 8);
private final SECP256K1.PublicKey publicKey;
private final SECP256K1.PublicKey peerPublicKey;

private final AESEngine macEncryptionEngine;

private boolean applySnappyCompression = false;
private Bytes buffer = Bytes.EMPTY;

RLPxConnection(
Bytes32 aesSecret,
Expand Down Expand Up @@ -113,7 +114,9 @@ public SECP256K1.PublicKey peerPublicKey() {
return peerPublicKey;
}

private Bytes buffer = Bytes.EMPTY;
public void configureAfterHandshake(HelloMessage helloMessage) {
this.applySnappyCompression = helloMessage.p2pVersion() >= 5;
}

public synchronized void stream(Bytes newBytes, Consumer<RLPxMessage> messageConsumer) {
buffer = Bytes.concatenate(buffer, newBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
import org.bouncycastle.crypto.engines.AESEngine;
import org.bouncycastle.crypto.macs.HMac;
import org.bouncycastle.crypto.modes.SICBlockCipher;
import org.bouncycastle.crypto.params.*;
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.bouncycastle.crypto.params.IESWithCipherParameters;
import org.bouncycastle.crypto.params.KDFParameters;
import org.bouncycastle.crypto.params.ParametersWithIV;
import org.bouncycastle.util.BigIntegers;

/**
Expand Down Expand Up @@ -262,6 +266,18 @@ private static EthereumIESEncryptionEngine forEncryption(
return engine;
}

/**
* Identify the size of a handshake message based on elements of the common MAC.
*
* @param msgBytes the bytes of the message
* @return the size of the message, including MAC, key and IV
*/
public static int messageSize(Bytes msgBytes) {
Bytes commonMac = msgBytes.slice(0, 2);
int size = (commonMac.get(1) & 0xFF) + ((commonMac.get(0) & 0xFF) << 8);
return size + 2;
}

static Bytes decryptMessage(Bytes msgBytes, SecretKey ourKey) {
Bytes commonMac = msgBytes.slice(0, 2);
int size = (commonMac.get(1) & 0xFF) + ((commonMac.get(0) & 0xFF) << 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import net.consensys.cava.crypto.SECP256K1.KeyPair;
import net.consensys.cava.crypto.SECP256K1.PublicKey;
import net.consensys.cava.rlpx.HandshakeMessage;
import net.consensys.cava.rlpx.InvalidMACException;
import net.consensys.cava.rlpx.MemoryWireConnectionsRepository;
import net.consensys.cava.rlpx.RLPxConnection;
import net.consensys.cava.rlpx.RLPxConnectionFactory;
Expand Down Expand Up @@ -299,8 +298,14 @@ public AsyncCompletion connectTo(PublicKey peerPublicKey, InetSocketAddress peer
@Override
public void handle(Buffer buffer) {
try {
Bytes messageBytes = Bytes.wrapBuffer(buffer);
if (conn == null) {
Bytes responseBytes = Bytes.wrapBuffer(buffer);
int messageSize = RLPxConnectionFactory.messageSize(messageBytes);
Bytes responseBytes = messageBytes;
if (messageBytes.size() > messageSize) {
responseBytes = responseBytes.slice(0, messageSize);
}
messageBytes = messageBytes.slice(messageSize);
HandshakeMessage responseMessage =
RLPxConnectionFactory.readResponse(responseBytes, keyPair.secretKey());
conn = RLPxConnectionFactory.createConnection(
Expand All @@ -315,12 +320,15 @@ public void handle(Buffer buffer) {
peerPublicKey);

this.wireConnection = createConnection(conn, netSocket);
wireConnection.handleConnectionStart();
connected.complete();
} else {
conn.stream(Bytes.wrapBuffer(buffer), wireConnection::messageReceived);
if (messageBytes.isEmpty()) {
return;
}
}
} catch (InvalidMACException e) {
if (conn != null) {
conn.stream(messageBytes, wireConnection::messageReceived);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
connected.completeExceptionally(e);
netSocket.close();
Expand All @@ -345,6 +353,7 @@ private WireConnection createConnection(RLPxConnection conn, NetSocket netSocket
vertx.eventBus().send(netSocket.writeHandlerID(), Buffer.buffer(bytes.toArrayUnsafe()));
}
},
conn::configureAfterHandshake,
netSocket::end,
handlers,
DEVP2P_VERSION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ final class Capability {

private final String name;

private final String version;
private final int version;


Capability(String name, String version) {
Capability(String name, int version) {
this.name = name;
this.version = version;
}
Expand All @@ -30,7 +30,7 @@ String name() {
return name;
}

String version() {
int version() {
return version;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
final class DefaultSubProtocolIdentifier implements SubProtocolIdentifier {

private final String name;
private final String version;
private final int version;

public DefaultSubProtocolIdentifier(String name, String version) {
public DefaultSubProtocolIdentifier(String name, int version) {
this.name = name;
this.version = version;
}
Expand All @@ -31,7 +31,7 @@ public String name() {
}

@Override
public String version() {
public int version() {
return version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.List;
import java.util.Objects;

final class HelloMessage implements WireProtocolMessage {
public final class HelloMessage implements WireProtocolMessage {

private final Bytes nodeId;
private final int listenPort;
Expand Down Expand Up @@ -53,7 +53,7 @@ static HelloMessage read(Bytes data) {
while (!capabilitiesReader.isComplete()) {
caps.add(
capabilitiesReader.readList(
capabilityReader -> new Capability(capabilityReader.readString(), capabilityReader.readString())));
capabilityReader -> new Capability(capabilityReader.readString(), capabilityReader.readInt())));
}
return caps;
});
Expand All @@ -72,7 +72,7 @@ public Bytes toBytes() {
for (Capability cap : capabilities) {
capabilitiesWriter.writeList(capabilityWriter -> {
capabilityWriter.writeString(cap.name());
capabilityWriter.writeString(cap.version());
capabilityWriter.writeInt(cap.version());
});
}
});
Expand All @@ -94,7 +94,7 @@ List<Capability> capabilities() {
return capabilities;
}

int p2pVersion() {
public int p2pVersion() {
return p2pVersion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface SubProtocol {
* @param version the version of the subprotocol to associate with the range
* @return the length of the range of message types supported by the subprotocol for a given version
*/
int versionRange(String version);
int versionRange(int version);

/**
* Creates a new handler for the subprotocol.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
public interface SubProtocolIdentifier {

static SubProtocolIdentifier of(String name, String version) {
static SubProtocolIdentifier of(String name, int version) {
return new DefaultSubProtocolIdentifier(name, version);
}

Expand All @@ -31,5 +31,5 @@ static SubProtocolIdentifier of(String name, String version) {
*
* @return the version of the subprotocol
*/
String version();
int version();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public String connectionId() {
private final Logger logger;
private final String id;
private final Consumer<RLPxMessage> writer;
private final Consumer<HelloMessage> afterHandshakeListener;
private final Runnable disconnectHandler;
private final LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols;
private final int p2pVersion;
Expand All @@ -98,6 +99,7 @@ public String connectionId() {
* @param peerNodeId the node id of the peer
* @param logger a logger
* @param writer the message writer
* @param afterHandshakeListener a listener called after the handshake is complete with the peer hello message.
* @param disconnectHandler the handler to run upon receiving a disconnect message
* @param subprotocols the subprotocols supported by this connection
* @param p2pVersion the version of the devp2p protocol supported by this client
Expand All @@ -110,6 +112,7 @@ public WireConnection(
Bytes peerNodeId,
Logger logger,
Consumer<RLPxMessage> writer,
Consumer<HelloMessage> afterHandshakeListener,
Runnable disconnectHandler,
LinkedHashMap<SubProtocol, SubProtocolHandler> subprotocols,
int p2pVersion,
Expand All @@ -120,17 +123,20 @@ public WireConnection(
this.peerNodeId = peerNodeId;
this.logger = logger;
this.writer = writer;
this.afterHandshakeListener = afterHandshakeListener;
this.disconnectHandler = disconnectHandler;
this.subprotocols = subprotocols;
this.p2pVersion = p2pVersion;
this.clientId = clientId;
this.advertisedPort = advertisedPort;
logger.debug("New wire connection created");
}

public void messageReceived(RLPxMessage message) {
if (message.messageId() == 0) {
peerHelloMessage = HelloMessage.read(message.content());
logger.debug("Received peer Hello message {}", peerHelloMessage);
initSupportedRange(peerHelloMessage.capabilities());

if (peerHelloMessage.nodeId() == null || peerHelloMessage.nodeId().isEmpty()) {
disconnect(DisconnectReason.NULL_NODE_IDENTITY_RECEIVED);
Expand All @@ -152,10 +158,12 @@ public void messageReceived(RLPxMessage message) {
return;
}

initSupportedRange(peerHelloMessage.capabilities());
if (myHelloMessage == null) {
sendHello();
}

afterHandshakeListener.accept(peerHelloMessage);

for (SubProtocol subProtocol : subprotocolRangeMap.asMapOfRanges().values()) {
subprotocols.get(subProtocol).newPeerConnection(this);
}
Expand All @@ -167,6 +175,7 @@ public void messageReceived(RLPxMessage message) {
}

if (peerHelloMessage == null || myHelloMessage == null) {
logger.debug("Message sent before hello exchanged {}", message.messageId());
disconnect(DisconnectReason.PROTOCOL_BREACH);
}

Expand Down Expand Up @@ -243,7 +252,7 @@ void sendHello() {
clientId,
subprotocols.keySet().stream().map(sp -> new Capability(sp.id().name(), sp.id().version())).collect(
Collectors.toList()));
logger.debug("Sending a hello message {}", myHelloMessage);
logger.debug("Sending hello message {}", myHelloMessage);
writer.accept(new RLPxMessage(0, myHelloMessage.toBytes()));
}

Expand All @@ -252,6 +261,7 @@ public String id() {
}

public void sendMessage(WireSubProtocolMessage message) {
logger.debug("Sending sub-protocol message {}", message);
Integer offset = null;
for (Map.Entry<Range<Integer>, SubProtocol> entry : subprotocolRangeMap.asMapOfRanges().entrySet()) {
if (entry.getValue().supports(message.subProtocolIdentifier())) {
Expand Down
Loading

0 comments on commit 5300bcf

Please sign in to comment.