Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Hand over one complete message to the ClientHandler implementation at…
Browse files Browse the repository at this point in the history
… a time so that it doesn't have to inspect the headers and calculate how many messages are present (#201)

Process full RPC responses from the server before handing it over to the ClientHandler.
  • Loading branch information
Happy0 authored and atoulme committed Mar 28, 2019
1 parent bcf7819 commit e3e6768
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 17 deletions.
2 changes: 2 additions & 0 deletions scuttlebutt-handshake/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ dependencies {
testCompile 'org.logl:logl-logl'
testCompile 'org.logl:logl-vertx'

testCompile project(':scuttlebutt-rpc')

testRuntime 'org.junit.jupiter:junit-jupiter-engine'
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ private class NetSocketClientHandler {
private SecureScuttlebuttStreamClient client;
private ClientHandler handler;

private Bytes messageBuffer = Bytes.EMPTY;

NetSocketClientHandler(
Logger logger,
NetSocket socket,
Expand Down Expand Up @@ -94,15 +96,38 @@ void handle(Buffer buffer) {
handshakeCounter++;
} else {
Bytes message = client.readFromServer(Bytes.wrapBuffer(buffer));
if (message == null) {
return;
}
if (SecureScuttlebuttStreamServer.isGoodbye(message)) {
logger.debug("Goodbye received from remote peer");
socket.close();
} else {
handler.receivedMessage(message);
messageBuffer = Bytes.concatenate(messageBuffer, message);

int headerSize = 9;

// Process any whole RPC message repsonses we have, and leave any partial ones at the end in the buffer
// We may have 1 or more whole messages, or 1 and a half, etc..
while (messageBuffer.size() >= headerSize) {

Bytes header = messageBuffer.slice(0, 9);
int bodyLength = getBodyLength(header);

if ((messageBuffer.size() - headerSize) >= (bodyLength)) {

int headerAndBodyLength = bodyLength + headerSize;
Bytes wholeMessage = messageBuffer.slice(0, headerAndBodyLength);

if (SecureScuttlebuttStreamServer.isGoodbye(wholeMessage)) {
logger.debug("Goodbye received from remote peer");
socket.close();
} else {
handler.receivedMessage(wholeMessage);
}

// We've removed 1 RPC message from the message buffer, leave the remaining messages / part of a message
// in the buffer to be processed in the next iteration
messageBuffer = messageBuffer.slice(headerAndBodyLength);
} else {
// We don't have a full RPC message, leave the bytes in the buffer for when more arrive
break;
}
}

}
} catch (HandshakeException | StreamException e) {
completionHandle.completeExceptionally(e);
Expand All @@ -119,6 +144,11 @@ void handle(Buffer buffer) {
}
}

private int getBodyLength(Bytes rpcHeader) {
Bytes size = rpcHeader.slice(1, 4);
return size.toInt();
}

private final LoggerProvider loggerProvider;
private final Vertx vertx;
private final Signature.KeyPair keyPair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ private final class NetSocketHandler {
SecureScuttlebuttHandshakeServer handshakeServer =
SecureScuttlebuttHandshakeServer.create(keyPair, networkIdentifier);

private Bytes messageBuffer = Bytes.EMPTY;

void handle(NetSocket netSocket) {
this.netSocket = netSocket;
netSocket.closeHandler(res -> {
Expand Down Expand Up @@ -79,10 +81,35 @@ private void handleMessage(Buffer buffer) {
});
} else {
Bytes message = streamServer.readFromClient(Bytes.wrapBuffer(buffer));
if (SecureScuttlebuttStreamServer.isGoodbye(message)) {
netSocket.close();
} else {
handler.receivedMessage(message);
messageBuffer = Bytes.concatenate(messageBuffer, message);

int headerSize = 9;

// Process any whole RPC message repsonses we have, and leave any partial ones at the end in the buffer
// We may have 1 or more whole messages, or 1 and a half, etc..
while (messageBuffer.size() >= headerSize) {

Bytes header = messageBuffer.slice(0, 9);
int bodyLength = getBodyLength(header);

if ((messageBuffer.size() - headerSize) >= (bodyLength)) {

int headerAndBodyLength = bodyLength + headerSize;
Bytes wholeMessage = messageBuffer.slice(0, headerAndBodyLength);

if (SecureScuttlebuttStreamServer.isGoodbye(wholeMessage)) {
netSocket.close();
} else {
handler.receivedMessage(wholeMessage);
}

// We've removed 1 RPC message from the message buffer, leave the remaining messages / part of a message
// in the buffer to be processed in the next iteration
messageBuffer = messageBuffer.slice(headerAndBodyLength);
} else {
// We don't have a full RPC message, leave the bytes in the buffer for when more arrive
break;
}
}
}
} catch (HandshakeException | StreamException e) {
Expand All @@ -92,6 +119,11 @@ private void handleMessage(Buffer buffer) {
}
}

private int getBodyLength(Bytes rpcHeader) {
Bytes size = rpcHeader.slice(1, 4);
return size.toInt();
}

private final Vertx vertx;
private final InetSocketAddress addr;
private final Signature.KeyPair keyPair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import net.consensys.cava.crypto.sodium.Signature;
import net.consensys.cava.junit.VertxExtension;
import net.consensys.cava.junit.VertxInstance;
import net.consensys.cava.scuttlebutt.rpc.RPCCodec;
import net.consensys.cava.scuttlebutt.rpc.RPCFlag;

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -114,10 +116,21 @@ void connectToServer(@VertxInstance Vertx vertx) throws Exception {

Thread.sleep(1000);
assertNotNull(handler);
handler.sendMessage(Bytes.fromHexString("deadbeef"));

String rpcRequestBody = "{\"name\": [\"whoami\"],\"type\": \"async\",\"args\":[]}";
Bytes rpcRequest = RPCCodec.encodeRequest(rpcRequestBody, RPCFlag.BodyType.JSON);

handler.sendMessage(rpcRequest);

Thread.sleep(1000);
MyServerHandler serverHandler = serverHandlerRef.get();
assertEquals(Bytes.fromHexString("deadbeef"), serverHandler.received);

Bytes receivedBytes = serverHandler.received;
Bytes receivedBody = receivedBytes.slice(9);

Bytes requestBody = rpcRequest.slice(9);

assertEquals(requestBody, receivedBody);

handler.closeStream();
Thread.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ public MyClientHandler(Consumer<Bytes> sender, Runnable terminationFn) {

@Override
public void receivedMessage(Bytes message) {
System.out.println("We received a message?");
System.out.println(new String(message.toArrayUnsafe(), UTF_8));

RPCMessage rpcMessage = new RPCMessage(message);

System.out.println(rpcMessage.asString());


}

@Override
Expand Down Expand Up @@ -99,7 +103,6 @@ void runWithPatchWork(@VertxInstance Vertx vertx) throws Exception {
new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
LoglLogDelegateFactory.setProvider(loggerProvider);


Optional<String> ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir"));
Optional<String> homePath =
Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb");
Expand Down

0 comments on commit e3e6768

Please sign in to comment.