Skip to content

Commit

Permalink
Code tidying
Browse files Browse the repository at this point in the history
  • Loading branch information
mikera committed Aug 24, 2024
1 parent 9dd12f3 commit 063927d
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 17 deletions.
12 changes: 6 additions & 6 deletions convex-peer/src/main/java/convex/api/ConvexRemote.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ public synchronized CompletableFuture<Result> transact(SignedData<ATransaction>
CompletableFuture<Result> cf;
long id = -1;
long wait=1;
// loop until request is queued

// loop until request is queued. We need this for backpressure
while (true) {
if (connection.isClosed()) throw new IOException("Connection closed");
synchronized (awaiting) {
Expand All @@ -137,21 +137,20 @@ public synchronized CompletableFuture<Result> transact(SignedData<ATransaction>
Thread.sleep(wait);
wait+=1; // linear backoff
} catch (InterruptedException e) {
throw Utils.sneakyThrow(e);
Thread.currentThread().interrupt();
throw new IOException("Transaction sending interrupted",e);
}
}

log.trace("Sent transaction with message ID: {} awaiting count = {}", id, awaiting.size());
return cf;
}



@Override
public CompletableFuture<Result> query(ACell query, Address address) throws IOException {
long wait=1;

// loop until request is queued
// loop until request is queued. We need this for backpressure
while (true) {
synchronized (awaiting) {
long id = connection.sendQuery(query, address);
Expand All @@ -166,7 +165,8 @@ public CompletableFuture<Result> query(ACell query, Address address) throws IOEx
Thread.sleep(wait);
wait+=wait; // exponential backoff
} catch (InterruptedException e) {
throw new IOException("Transaction sending interrupted",e);
Thread.currentThread().interrupt();
throw new IOException("Query sending interrupted",e);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions convex-peer/src/main/java/convex/net/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,7 @@ public boolean sendMessage(Message msg) {
}

/**
* Sends a full payload for the given message type. Should be called on the thread
* that responds to missing data messages from the destination.
* Sends a message with full payload for the given message type.
*
* @param type Type of message
* @param payload Payload value for message
Expand Down
4 changes: 2 additions & 2 deletions convex-peer/src/main/java/convex/net/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public static Message createGoodBye() {
@SuppressWarnings("unchecked")
public <T extends ACell> T getPayload() throws BadFormatException {
if (payload!=null) return (T) payload;
if (messageData==null) return null;
if (messageData==null) return null; // no message data, so must actually be null

// actual null payload :-)
// detect actual message data for null payload :-)
if ((messageData.count()==1)&&(messageData.byteAt(0)==Tag.NULL)) return null;

switch(type) {
Expand Down
16 changes: 9 additions & 7 deletions convex-peer/src/main/java/convex/peer/QueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import convex.core.data.AVector;
import convex.core.data.Address;
import convex.core.data.prim.CVMLong;
import convex.core.exceptions.BadFormatException;
import convex.core.lang.RT;
import convex.core.util.LoadMonitor;
import convex.net.Message;
Expand Down Expand Up @@ -64,28 +65,29 @@ protected void loop() throws InterruptedException {
private void handleQuery(Message m) {
try {
// query is a vector [id , form, address?]
AVector<ACell> v = m.getPayload();
AVector<ACell> v= m.getPayload();
CVMLong id = (CVMLong) v.get(0);
ACell form = v.get(1);

// extract the Address, might be null
Address address = RT.ensureAddress(v.get(2));

log.debug( "Processing query: {} with address: {}" , form, address);
// log.log(LEVEL_MESSAGE, "Processing query: " + form + " with address: " +
// address);
ResultContext resultContext = server.getPeer().executeQuery(form, address);

// Report result back to message sender
boolean resultReturned= m.returnResult(Result.fromContext(id, resultContext));

if (!resultReturned) {
log.warn("Failed to send query result back to client with ID: {}", id);
}

} catch (Throwable t) {
log.warn("Query Error: {}", t);
} catch (BadFormatException e) {
log.debug("Terminated client due to bad query format");
m.closeConnection();
}

}

@Override
Expand Down
10 changes: 10 additions & 0 deletions convex-peer/src/test/java/convex/api/ConvexRemoteTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import convex.core.crypto.AKeyPair;
import convex.core.crypto.Ed25519Signature;
import convex.core.data.Address;
import convex.core.data.Blobs;
import convex.core.data.Ref;
import convex.core.data.SignedData;
import convex.core.data.prim.CVMLong;
Expand All @@ -31,6 +32,8 @@
import convex.core.transactions.Invoke;
import convex.core.util.Utils;
import convex.net.Connection;
import convex.net.Message;
import convex.net.MessageType;
import convex.peer.TestNetwork;

/**
Expand Down Expand Up @@ -70,6 +73,13 @@ public void testConnection() throws IOException, TimeoutException {
assertTrue(convex.isConnected());
}
}

@Test
public void testBadQueryMessage() throws IOException, TimeoutException {
ConvexRemote convex = Convex.connect(network.SERVER.getHostAddress());
Connection conn=convex.connection;
conn.sendMessage(Message.create(MessageType.QUERY, Blobs.empty()));
}

@Test
public void testBadConnect() throws IOException, TimeoutException {
Expand Down

0 comments on commit 063927d

Please sign in to comment.