diff --git a/scuttlebutt-handshake/build.gradle b/scuttlebutt-handshake/build.gradle index be3c1c5b..fb770acc 100644 --- a/scuttlebutt-handshake/build.gradle +++ b/scuttlebutt-handshake/build.gradle @@ -18,5 +18,7 @@ dependencies { testCompile 'org.logl:logl-logl' testCompile 'org.logl:logl-vertx' + testCompile project(':scuttlebutt-rpc') + testRuntime 'org.junit.jupiter:junit-jupiter-engine' } diff --git a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java index 5750adf0..71df3803 100644 --- a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java +++ b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java @@ -50,6 +50,8 @@ private class NetSocketClientHandler { private SecureScuttlebuttStreamClient client; private ClientHandler handler; + private Bytes messageBuffer = Bytes.EMPTY; + NetSocketClientHandler( Logger logger, NetSocket socket, @@ -94,15 +96,38 @@ void handle(Buffer buffer) { handshakeCounter++; } else { Bytes message = client.readFromServer(Bytes.wrapBuffer(buffer)); - if (message == null) { - return; - } - if (SecureScuttlebuttStreamServer.isGoodbye(message)) { - logger.debug("Goodbye received from remote peer"); - socket.close(); - } else { - handler.receivedMessage(message); + messageBuffer = Bytes.concatenate(messageBuffer, message); + + int headerSize = 9; + + // Process any whole RPC message repsonses we have, and leave any partial ones at the end in the buffer + // We may have 1 or more whole messages, or 1 and a half, etc.. + while (messageBuffer.size() >= headerSize) { + + Bytes header = messageBuffer.slice(0, 9); + int bodyLength = getBodyLength(header); + + if ((messageBuffer.size() - headerSize) >= (bodyLength)) { + + int headerAndBodyLength = bodyLength + headerSize; + Bytes wholeMessage = messageBuffer.slice(0, headerAndBodyLength); + + if (SecureScuttlebuttStreamServer.isGoodbye(wholeMessage)) { + logger.debug("Goodbye received from remote peer"); + socket.close(); + } else { + handler.receivedMessage(wholeMessage); + } + + // We've removed 1 RPC message from the message buffer, leave the remaining messages / part of a message + // in the buffer to be processed in the next iteration + messageBuffer = messageBuffer.slice(headerAndBodyLength); + } else { + // We don't have a full RPC message, leave the bytes in the buffer for when more arrive + break; + } } + } } catch (HandshakeException | StreamException e) { completionHandle.completeExceptionally(e); @@ -119,6 +144,11 @@ void handle(Buffer buffer) { } } + private int getBodyLength(Bytes rpcHeader) { + Bytes size = rpcHeader.slice(1, 4); + return size.toInt(); + } + private final LoggerProvider loggerProvider; private final Vertx vertx; private final Signature.KeyPair keyPair; diff --git a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java index f744bcb1..95249db8 100644 --- a/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java +++ b/scuttlebutt-handshake/src/main/java/net/consensys/cava/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java @@ -45,6 +45,8 @@ private final class NetSocketHandler { SecureScuttlebuttHandshakeServer handshakeServer = SecureScuttlebuttHandshakeServer.create(keyPair, networkIdentifier); + private Bytes messageBuffer = Bytes.EMPTY; + void handle(NetSocket netSocket) { this.netSocket = netSocket; netSocket.closeHandler(res -> { @@ -79,10 +81,35 @@ private void handleMessage(Buffer buffer) { }); } else { Bytes message = streamServer.readFromClient(Bytes.wrapBuffer(buffer)); - if (SecureScuttlebuttStreamServer.isGoodbye(message)) { - netSocket.close(); - } else { - handler.receivedMessage(message); + messageBuffer = Bytes.concatenate(messageBuffer, message); + + int headerSize = 9; + + // Process any whole RPC message repsonses we have, and leave any partial ones at the end in the buffer + // We may have 1 or more whole messages, or 1 and a half, etc.. + while (messageBuffer.size() >= headerSize) { + + Bytes header = messageBuffer.slice(0, 9); + int bodyLength = getBodyLength(header); + + if ((messageBuffer.size() - headerSize) >= (bodyLength)) { + + int headerAndBodyLength = bodyLength + headerSize; + Bytes wholeMessage = messageBuffer.slice(0, headerAndBodyLength); + + if (SecureScuttlebuttStreamServer.isGoodbye(wholeMessage)) { + netSocket.close(); + } else { + handler.receivedMessage(wholeMessage); + } + + // We've removed 1 RPC message from the message buffer, leave the remaining messages / part of a message + // in the buffer to be processed in the next iteration + messageBuffer = messageBuffer.slice(headerAndBodyLength); + } else { + // We don't have a full RPC message, leave the bytes in the buffer for when more arrive + break; + } } } } catch (HandshakeException | StreamException e) { @@ -92,6 +119,11 @@ private void handleMessage(Buffer buffer) { } } + private int getBodyLength(Bytes rpcHeader) { + Bytes size = rpcHeader.slice(1, 4); + return size.toInt(); + } + private final Vertx vertx; private final InetSocketAddress addr; private final Signature.KeyPair keyPair; diff --git a/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java b/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java index 583ef10e..b6eb21b1 100644 --- a/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java +++ b/scuttlebutt-handshake/src/test/java/net/consensys/cava/scuttlebutt/handshake/vertx/VertxIntegrationTest.java @@ -22,6 +22,8 @@ import net.consensys.cava.crypto.sodium.Signature; import net.consensys.cava.junit.VertxExtension; import net.consensys.cava.junit.VertxInstance; +import net.consensys.cava.scuttlebutt.rpc.RPCCodec; +import net.consensys.cava.scuttlebutt.rpc.RPCFlag; import java.io.BufferedWriter; import java.io.OutputStreamWriter; @@ -114,10 +116,21 @@ void connectToServer(@VertxInstance Vertx vertx) throws Exception { Thread.sleep(1000); assertNotNull(handler); - handler.sendMessage(Bytes.fromHexString("deadbeef")); + + String rpcRequestBody = "{\"name\": [\"whoami\"],\"type\": \"async\",\"args\":[]}"; + Bytes rpcRequest = RPCCodec.encodeRequest(rpcRequestBody, RPCFlag.BodyType.JSON); + + handler.sendMessage(rpcRequest); + Thread.sleep(1000); MyServerHandler serverHandler = serverHandlerRef.get(); - assertEquals(Bytes.fromHexString("deadbeef"), serverHandler.received); + + Bytes receivedBytes = serverHandler.received; + Bytes receivedBody = receivedBytes.slice(9); + + Bytes requestBody = rpcRequest.slice(9); + + assertEquals(requestBody, receivedBody); handler.closeStream(); Thread.sleep(1000); diff --git a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java index 3ffe91e2..60d61942 100644 --- a/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java +++ b/scuttlebutt-rpc/src/test/java/net/consensys/cava/scuttlebutt/rpc/PatchworkIntegrationTest.java @@ -69,8 +69,12 @@ public MyClientHandler(Consumer sender, Runnable terminationFn) { @Override public void receivedMessage(Bytes message) { - System.out.println("We received a message?"); - System.out.println(new String(message.toArrayUnsafe(), UTF_8)); + + RPCMessage rpcMessage = new RPCMessage(message); + + System.out.println(rpcMessage.asString()); + + } @Override @@ -99,7 +103,6 @@ void runWithPatchWork(@VertxInstance Vertx vertx) throws Exception { new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8)))); LoglLogDelegateFactory.setProvider(loggerProvider); - Optional ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir")); Optional homePath = Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb");