Skip to content

Commit

Permalink
Fix failing test, some peer edits
Browse files Browse the repository at this point in the history
  • Loading branch information
mikera committed May 2, 2024
1 parent 95e8b2d commit 187e177
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 59 deletions.
26 changes: 23 additions & 3 deletions convex-core/src/main/java/convex/core/Peer.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ public static Peer create(AKeyPair peerKP, State genesis) {
/**
* Create a Peer instance from a remotely acquired Belief
* @param peerKP Peer KeyPair
* @param initialState Initial genesis State of the Network
* @param genesisState Initial genesis State of the Network
* @param remoteBelief Remote belief to sync with
* @return New Peer instance
*/
public static Peer create(AKeyPair peerKP, State initialState, Belief remoteBelief) {
Peer peer=create(peerKP,initialState);
public static Peer create(AKeyPair peerKP, State genesisState, Belief remoteBelief) {
Peer peer=create(peerKP,genesisState);
peer=peer.updateTimestamp(Utils.getCurrentTimestamp());
try {
peer=peer.mergeBeliefs(remoteBelief);
Expand All @@ -183,6 +183,26 @@ public static Peer create(AKeyPair peerKP, State initialState, Belief remoteBeli
throw Utils.sneakyThrow(e);
}
}

/**
* Create a Peer instance from a remotely acquired State and Order
* @param peerKP Peer KeyPair
* @param genesisState Initial genesis State of the Network
* @param remoteBelief Remote belief to sync with
* @return New Peer instance
*/
// public static Peer create(AKeyPair peerKP, State genesisState, State consensusState, SignedData<Order> order) {
// SignedData<Order> myOrder=peerKP.signData(order.getValue());
// Belief b=Belief.create(order,myOrder); // two orders in Belief at least....
// Peer peer=create(peerKP,genesisState,consensusState,b);
// peer=peer.updateTimestamp(Utils.getCurrentTimestamp());
// try {
// peer=peer.mergeBeliefs(remoteBelief);
// return peer;
// } catch (Throwable e) {
// throw Utils.sneakyThrow(e);
// }
// }

/**
* Like {@link #restorePeer(AStore, AKeyPair, ACell)} but uses a null root key.
Expand Down
2 changes: 1 addition & 1 deletion convex-core/src/test/java/convex/core/lang/CoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,7 @@ public void testMap() {
assertEquals(Sets.of(4,5,6),eval("(map (fn [a b] b) #{1 2 3} [4 5 6])"));

// Map over an index
assertEquals(Vectors.of(4,5,6),eval("(map second (into (index) {0x 4 0x01 5 0x0001 6}))"));
assertEquals(Vectors.of(4,5,6),eval("(mapv second (into (index) {0x 4 0x01 5 0x0101 6}))"));


// CAST error if any following arguments are not a data structure
Expand Down
134 changes: 79 additions & 55 deletions convex-peer/src/main/java/convex/peer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import convex.core.data.Keywords;
import convex.core.data.Maps;
import convex.core.data.Ref;
import convex.core.data.SignedData;
import convex.core.data.Strings;
import convex.core.data.Vectors;
import convex.core.data.prim.CVMLong;
Expand Down Expand Up @@ -127,15 +128,6 @@ public class Server implements Closeable {

private final ACell rootKey;

/**
* Flag for a running server. Setting to false will terminate server threads.
*/
private volatile boolean isRunning = false;

/**
* Flag for a live server. Live Server has synced with at least one other peer
*/
private volatile boolean isLive = false;

/**
* NIO Server instance
Expand Down Expand Up @@ -205,41 +197,9 @@ private Peer establishPeer() throws TimeoutException, IOException {
// TODO: should probably move acquisition to launch phase?
Object source=getConfig().get(Keywords.SOURCE);
if (Utils.bool(source)) {
// Peer sync case
InetSocketAddress sourceAddr=Utils.toInetSocketAddress(source);
Convex convex=Convex.connect(sourceAddr);
log.info("Attempting Peer Sync with: "+sourceAddr);
long timeout = establishTimeout();

// Sync status and genesis state
Result result = convex.requestStatusSync(timeout);
AVector<ACell> status = result.getValue();
if (status == null || status.count()!=Config.STATUS_COUNT) {
throw new Error("Bad status message from remote Peer");
}
Hash beliefHash=RT.ensureHash(status.get(0));
Hash networkID=RT.ensureHash(status.get(2));
log.info("Attempting to sync genesis state with network: "+networkID);
State genF=(State) convex.acquire(networkID).get(timeout,TimeUnit.MILLISECONDS);
log.info("Retrieved Genesis State: "+networkID);

// Belief acquisition
log.info("Attempting to obtain peer Belief: "+beliefHash);
Belief belF=null;
long timeElapsed=0;
while (belF==null) {
try {
belF=(Belief) convex.acquire(beliefHash).get(timeout,TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
timeElapsed+=timeout;
log.info("Still waiting for Belief sync after "+timeElapsed/1000+"s");
}
}
log.info("Retrieved Peer Belief: "+beliefHash+ " with memory size: "+belF.getMemorySize());

convex.close();
Peer peer=Peer.create(keyPair, genF, belF);
return peer;
if (sourceAddr==null) throw new IllegalStateException("Bad SOURCE for peer sync, should be an internet socket address: "+source);
return syncPeer(keyPair,sourceAddr);

} else if (Utils.bool(getConfig().get(Keywords.RESTORE))) {
// Restore from storage case
Expand All @@ -265,6 +225,56 @@ private Peer establishPeer() throws TimeoutException, IOException {
log.debug("Created new genesis state: "+genesisState.getHash()+ " with initial peer: "+peerKey);
}
return Peer.createGenesisPeer(keyPair,genesisState);
} finally {

}
}

private Peer syncPeer(AKeyPair keyPair, InetSocketAddress sourceAddr) throws IOException, TimeoutException {
// Peer sync case
try {
Convex convex = Convex.connect(sourceAddr);
log.info("Attempting Peer Sync with: "+sourceAddr);
long timeout = establishTimeout();

// Sync status and genesis state
Result result = convex.requestStatusSync(timeout);
AVector<ACell> status = result.getValue();
if (status == null || status.count()!=Config.STATUS_COUNT) {
throw new Error("Bad status message from remote Peer");
}
Hash beliefHash=RT.ensureHash(status.get(0));
AccountKey remoteKey=RT.ensureAccountKey(status.get(3));
Hash genesisHash=RT.ensureHash(status.get(2));
Hash stateHash=RT.ensureHash(status.get(4));
log.info("Attempting to sync remote state: "+stateHash + " on network: "+genesisHash);
State genF=(State) convex.acquire(genesisHash).get(timeout,TimeUnit.MILLISECONDS);
log.info("Retrieved Consensus State: "+genesisHash);

// Belief acquisition
log.info("Attempting to obtain peer Belief: "+beliefHash);
Belief belF=null;
long timeElapsed=0;
while (belF==null) {
try {
belF=(Belief) convex.acquire(beliefHash).get(timeout,TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
timeElapsed+=timeout;
log.info("Still waiting for Belief sync after "+timeElapsed/1000+"s");
}
}
log.info("Retrieved Peer Belief: "+beliefHash+ " with memory size: "+belF.getMemorySize());

convex.close();
SignedData<Order> peerOrder=belF.getOrders().get(remoteKey);
if (peerOrder!=null) {
SignedData<Order> newOrder=keyPair.signData(peerOrder.getValue());
belF=belF.withOrders(belF.getOrders().assoc(keyPair.getAccountKey(),newOrder));
} else {
log.warn("Remote peer Belief missing it's own Order?");
}
Peer peer=Peer.create(keyPair, genF, belF);
return peer;
} catch (ExecutionException|InterruptedException e) {
throw Utils.sneakyThrow(e);
}
Expand Down Expand Up @@ -343,22 +353,19 @@ public void launch() {
nio.launch((String)config.get(Keywords.BIND_ADDRESS), port);
port = nio.getPort(); // Get the actual port (may be auto-allocated)

// set running status now, so that loops don't terminate
// set running status now, so that loops don't immediately terminate
isRunning = true;

// Start connection manager loop

// Close server on shutdown, should be before Etch stores in priority
Shutdown.addHook(Shutdown.SERVER, ()->close());

// Start threaded components
manager.start();

queryHandler.start();

propagator.start();

transactionHandler.start();

executor.start();

// Close server on shutdown, should be before Etch stores in priority
Shutdown.addHook(Shutdown.SERVER, ()->close());

// Connect to source peer if specified
if (getConfig().containsKey(Keywords.SOURCE)) {
Expand Down Expand Up @@ -395,9 +402,7 @@ private void goLive() {
*
* SECURITY: Should anticipate malicious messages
*
* If the message is partial, will be queued pending delivery of missing data.
*
* Runs on receiver thread
* Runs on receiver thread, so we want to offload to a queue ASAP
*
* @param m
*/
Expand Down Expand Up @@ -766,10 +771,29 @@ public void setHostname(String string) {
config.put(Keywords.URL, string);
}


/**
* Flag for a running server. Setting to false will terminate server threads.
*/
private volatile boolean isRunning = true;

/**
* Flag for a live server. Live Server has synced with at least one other peer
*/
private volatile boolean isLive = false;

/**
* Checks is the server is Live, i.e. currently syncing successfully with network
* @return
*/
public boolean isLive() {
return isLive;
}

/**
* Checks is the Server threads are running
* @return
*/
public boolean isRunning() {
return isRunning;
}
Expand Down

0 comments on commit 187e177

Please sign in to comment.