Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Handle handshake response collated with hello message in buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jan 16, 2019
1 parent 655281f commit 0de027c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
import org.bouncycastle.crypto.engines.AESEngine;
import org.bouncycastle.crypto.macs.HMac;
import org.bouncycastle.crypto.modes.SICBlockCipher;
import org.bouncycastle.crypto.params.*;
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.bouncycastle.crypto.params.IESWithCipherParameters;
import org.bouncycastle.crypto.params.KDFParameters;
import org.bouncycastle.crypto.params.ParametersWithIV;
import org.bouncycastle.util.BigIntegers;

/**
Expand Down Expand Up @@ -262,6 +266,18 @@ private static EthereumIESEncryptionEngine forEncryption(
return engine;
}

/**
* Identify the size of a handshake message based on elements of the common MAC.
*
* @param msgBytes the bytes of the message
* @return the size of the message, including MAC, key and IV
*/
public static int messageSize(Bytes msgBytes) {
Bytes commonMac = msgBytes.slice(0, 2);
int size = (commonMac.get(1) & 0xFF) + ((commonMac.get(0) & 0xFF) << 8);
return size + 2;
}

static Bytes decryptMessage(Bytes msgBytes, SecretKey ourKey) {
Bytes commonMac = msgBytes.slice(0, 2);
int size = (commonMac.get(1) & 0xFF) + ((commonMac.get(0) & 0xFF) << 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import net.consensys.cava.crypto.SECP256K1.KeyPair;
import net.consensys.cava.crypto.SECP256K1.PublicKey;
import net.consensys.cava.rlpx.HandshakeMessage;
import net.consensys.cava.rlpx.InvalidMACException;
import net.consensys.cava.rlpx.MemoryWireConnectionsRepository;
import net.consensys.cava.rlpx.RLPxConnection;
import net.consensys.cava.rlpx.RLPxConnectionFactory;
Expand Down Expand Up @@ -299,8 +298,14 @@ public AsyncCompletion connectTo(PublicKey peerPublicKey, InetSocketAddress peer
@Override
public void handle(Buffer buffer) {
try {
Bytes messageBytes = Bytes.wrapBuffer(buffer);
if (conn == null) {
Bytes responseBytes = Bytes.wrapBuffer(buffer);
int messageSize = RLPxConnectionFactory.messageSize(messageBytes);
Bytes responseBytes = messageBytes;
if (messageBytes.size() > messageSize) {
responseBytes = responseBytes.slice(0, messageSize);
}
messageBytes = messageBytes.slice(messageSize);
HandshakeMessage responseMessage =
RLPxConnectionFactory.readResponse(responseBytes, keyPair.secretKey());
conn = RLPxConnectionFactory.createConnection(
Expand All @@ -315,12 +320,15 @@ public void handle(Buffer buffer) {
peerPublicKey);

this.wireConnection = createConnection(conn, netSocket);
wireConnection.handleConnectionStart();
connected.complete();
} else {
conn.stream(Bytes.wrapBuffer(buffer), wireConnection::messageReceived);
if (messageBytes.isEmpty()) {
return;
}
}
} catch (InvalidMACException e) {
if (conn != null) {
conn.stream(messageBytes, wireConnection::messageReceived);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
connected.completeExceptionally(e);
netSocket.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ public WireConnection(
this.p2pVersion = p2pVersion;
this.clientId = clientId;
this.advertisedPort = advertisedPort;
logger.debug("New wire connection created");
}

public void messageReceived(RLPxMessage message) {
if (message.messageId() == 0) {
peerHelloMessage = HelloMessage.read(message.content());
logger.debug("Received peer Hello message {}", peerHelloMessage);
afterHandshakeListener.accept(peerHelloMessage);
initSupportedRange(peerHelloMessage.capabilities());

if (peerHelloMessage.nodeId() == null || peerHelloMessage.nodeId().isEmpty()) {
disconnect(DisconnectReason.NULL_NODE_IDENTITY_RECEIVED);
Expand All @@ -157,10 +158,12 @@ public void messageReceived(RLPxMessage message) {
return;
}

initSupportedRange(peerHelloMessage.capabilities());
if (myHelloMessage == null) {
sendHello();
}

afterHandshakeListener.accept(peerHelloMessage);

for (SubProtocol subProtocol : subprotocolRangeMap.asMapOfRanges().values()) {
subprotocols.get(subProtocol).newPeerConnection(this);
}
Expand All @@ -172,6 +175,7 @@ public void messageReceived(RLPxMessage message) {
}

if (peerHelloMessage == null || myHelloMessage == null) {
logger.debug("Message sent before hello exchanged {}", message.messageId());
disconnect(DisconnectReason.PROTOCOL_BREACH);
}

Expand Down Expand Up @@ -248,7 +252,7 @@ void sendHello() {
clientId,
subprotocols.keySet().stream().map(sp -> new Capability(sp.id().name(), sp.id().version())).collect(
Collectors.toList()));
logger.debug("Sending a hello message {}", myHelloMessage);
logger.debug("Sending hello message {}", myHelloMessage);
writer.accept(new RLPxMessage(0, myHelloMessage.toBytes()));
}

Expand All @@ -257,6 +261,7 @@ public String id() {
}

public void sendMessage(WireSubProtocolMessage message) {
logger.debug("Sending sub-protocol message {}", message);
Integer offset = null;
for (Map.Entry<Range<Integer>, SubProtocol> entry : subprotocolRangeMap.asMapOfRanges().entrySet()) {
if (entry.getValue().supports(message.subProtocolIdentifier())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.logl.Level;
import org.logl.Logger;
import org.logl.LoggerProvider;
import org.logl.logl.SimpleLogger;

Expand Down Expand Up @@ -151,8 +152,8 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOther(@VertxInstance
SECP256K1.KeyPair secondKp = SECP256K1.KeyPair.random();
MyCustomSubProtocol sp = new MyCustomSubProtocol(1);
MyCustomSubProtocol secondSp = new MyCustomSubProtocol(2);
LoggerProvider logProvider =
SimpleLogger.toPrintWriter(new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.err, UTF_8))));
LoggerProvider logProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.err, UTF_8))));
MemoryWireConnectionsRepository repository = new MemoryWireConnectionsRepository();
VertxRLPxService service = new VertxRLPxService(
vertx,
Expand Down Expand Up @@ -272,14 +273,17 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOtherSimultaneously(
@Test
@Disabled
void connectToPeer(@VertxInstance Vertx vertx) throws Exception {
LoggerProvider logProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.err, UTF_8))));
Logger logger = logProvider.getLogger("test");

SECP256K1.KeyPair kp = SECP256K1.KeyPair.fromSecretKey(
SECP256K1.SecretKey
.fromBytes(Bytes32.fromHexString("0x2CADB9DDEA3E675CC5349A1AF053CF2E144AF657016A6155DF4AD767F561F18E")));
System.out.println(kp.secretKey().bytes().toHexString());
logger.debug(kp.secretKey().bytes().toHexString());

logger.debug("enode://" + kp.publicKey().toHexString() + "@127.0.0.1:36000");

System.out.println("enode://" + kp.publicKey().toHexString() + "@127.0.0.1:36000");
LoggerProvider logProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.err, UTF_8))));
MemoryWireConnectionsRepository repository = new MemoryWireConnectionsRepository();

VertxRLPxService service = new VertxRLPxService(
Expand All @@ -295,12 +299,12 @@ public SubProtocolIdentifier id() {
return new SubProtocolIdentifier() {
@Override
public String name() {
return "shh";
return "eth";
}

@Override
public int version() {
return 6;
return 63;
}
};
}
Expand Down

0 comments on commit 0de027c

Please sign in to comment.