From 187e17724b8a84f31c44ffbb0af8e160574d1dbe Mon Sep 17 00:00:00 2001 From: mikera Date: Thu, 2 May 2024 23:21:15 +0100 Subject: [PATCH] Fix failing test, some peer edits --- .../src/main/java/convex/core/Peer.java | 26 +++- .../test/java/convex/core/lang/CoreTest.java | 2 +- .../src/main/java/convex/peer/Server.java | 134 +++++++++++------- 3 files changed, 103 insertions(+), 59 deletions(-) diff --git a/convex-core/src/main/java/convex/core/Peer.java b/convex-core/src/main/java/convex/core/Peer.java index a9a3864ba..351763657 100644 --- a/convex-core/src/main/java/convex/core/Peer.java +++ b/convex-core/src/main/java/convex/core/Peer.java @@ -169,12 +169,12 @@ public static Peer create(AKeyPair peerKP, State genesis) { /** * Create a Peer instance from a remotely acquired Belief * @param peerKP Peer KeyPair - * @param initialState Initial genesis State of the Network + * @param genesisState Initial genesis State of the Network * @param remoteBelief Remote belief to sync with * @return New Peer instance */ - public static Peer create(AKeyPair peerKP, State initialState, Belief remoteBelief) { - Peer peer=create(peerKP,initialState); + public static Peer create(AKeyPair peerKP, State genesisState, Belief remoteBelief) { + Peer peer=create(peerKP,genesisState); peer=peer.updateTimestamp(Utils.getCurrentTimestamp()); try { peer=peer.mergeBeliefs(remoteBelief); @@ -183,6 +183,26 @@ public static Peer create(AKeyPair peerKP, State initialState, Belief remoteBeli throw Utils.sneakyThrow(e); } } + + /** + * Create a Peer instance from a remotely acquired State and Order + * @param peerKP Peer KeyPair + * @param genesisState Initial genesis State of the Network + * @param remoteBelief Remote belief to sync with + * @return New Peer instance + */ +// public static Peer create(AKeyPair peerKP, State genesisState, State consensusState, SignedData order) { +// SignedData myOrder=peerKP.signData(order.getValue()); +// Belief b=Belief.create(order,myOrder); // two orders in Belief at least.... +// Peer peer=create(peerKP,genesisState,consensusState,b); +// peer=peer.updateTimestamp(Utils.getCurrentTimestamp()); +// try { +// peer=peer.mergeBeliefs(remoteBelief); +// return peer; +// } catch (Throwable e) { +// throw Utils.sneakyThrow(e); +// } +// } /** * Like {@link #restorePeer(AStore, AKeyPair, ACell)} but uses a null root key. diff --git a/convex-core/src/test/java/convex/core/lang/CoreTest.java b/convex-core/src/test/java/convex/core/lang/CoreTest.java index 4a1c0b59f..f71fe2523 100644 --- a/convex-core/src/test/java/convex/core/lang/CoreTest.java +++ b/convex-core/src/test/java/convex/core/lang/CoreTest.java @@ -1920,7 +1920,7 @@ public void testMap() { assertEquals(Sets.of(4,5,6),eval("(map (fn [a b] b) #{1 2 3} [4 5 6])")); // Map over an index - assertEquals(Vectors.of(4,5,6),eval("(map second (into (index) {0x 4 0x01 5 0x0001 6}))")); + assertEquals(Vectors.of(4,5,6),eval("(mapv second (into (index) {0x 4 0x01 5 0x0101 6}))")); // CAST error if any following arguments are not a data structure diff --git a/convex-peer/src/main/java/convex/peer/Server.java b/convex-peer/src/main/java/convex/peer/Server.java index a3d2c4f5e..d02f96f6f 100644 --- a/convex-peer/src/main/java/convex/peer/Server.java +++ b/convex-peer/src/main/java/convex/peer/Server.java @@ -33,6 +33,7 @@ import convex.core.data.Keywords; import convex.core.data.Maps; import convex.core.data.Ref; +import convex.core.data.SignedData; import convex.core.data.Strings; import convex.core.data.Vectors; import convex.core.data.prim.CVMLong; @@ -127,15 +128,6 @@ public class Server implements Closeable { private final ACell rootKey; - /** - * Flag for a running server. Setting to false will terminate server threads. - */ - private volatile boolean isRunning = false; - - /** - * Flag for a live server. Live Server has synced with at least one other peer - */ - private volatile boolean isLive = false; /** * NIO Server instance @@ -205,41 +197,9 @@ private Peer establishPeer() throws TimeoutException, IOException { // TODO: should probably move acquisition to launch phase? Object source=getConfig().get(Keywords.SOURCE); if (Utils.bool(source)) { - // Peer sync case InetSocketAddress sourceAddr=Utils.toInetSocketAddress(source); - Convex convex=Convex.connect(sourceAddr); - log.info("Attempting Peer Sync with: "+sourceAddr); - long timeout = establishTimeout(); - - // Sync status and genesis state - Result result = convex.requestStatusSync(timeout); - AVector status = result.getValue(); - if (status == null || status.count()!=Config.STATUS_COUNT) { - throw new Error("Bad status message from remote Peer"); - } - Hash beliefHash=RT.ensureHash(status.get(0)); - Hash networkID=RT.ensureHash(status.get(2)); - log.info("Attempting to sync genesis state with network: "+networkID); - State genF=(State) convex.acquire(networkID).get(timeout,TimeUnit.MILLISECONDS); - log.info("Retrieved Genesis State: "+networkID); - - // Belief acquisition - log.info("Attempting to obtain peer Belief: "+beliefHash); - Belief belF=null; - long timeElapsed=0; - while (belF==null) { - try { - belF=(Belief) convex.acquire(beliefHash).get(timeout,TimeUnit.MILLISECONDS); - } catch (TimeoutException te) { - timeElapsed+=timeout; - log.info("Still waiting for Belief sync after "+timeElapsed/1000+"s"); - } - } - log.info("Retrieved Peer Belief: "+beliefHash+ " with memory size: "+belF.getMemorySize()); - - convex.close(); - Peer peer=Peer.create(keyPair, genF, belF); - return peer; + if (sourceAddr==null) throw new IllegalStateException("Bad SOURCE for peer sync, should be an internet socket address: "+source); + return syncPeer(keyPair,sourceAddr); } else if (Utils.bool(getConfig().get(Keywords.RESTORE))) { // Restore from storage case @@ -265,6 +225,56 @@ private Peer establishPeer() throws TimeoutException, IOException { log.debug("Created new genesis state: "+genesisState.getHash()+ " with initial peer: "+peerKey); } return Peer.createGenesisPeer(keyPair,genesisState); + } finally { + + } + } + + private Peer syncPeer(AKeyPair keyPair, InetSocketAddress sourceAddr) throws IOException, TimeoutException { + // Peer sync case + try { + Convex convex = Convex.connect(sourceAddr); + log.info("Attempting Peer Sync with: "+sourceAddr); + long timeout = establishTimeout(); + + // Sync status and genesis state + Result result = convex.requestStatusSync(timeout); + AVector status = result.getValue(); + if (status == null || status.count()!=Config.STATUS_COUNT) { + throw new Error("Bad status message from remote Peer"); + } + Hash beliefHash=RT.ensureHash(status.get(0)); + AccountKey remoteKey=RT.ensureAccountKey(status.get(3)); + Hash genesisHash=RT.ensureHash(status.get(2)); + Hash stateHash=RT.ensureHash(status.get(4)); + log.info("Attempting to sync remote state: "+stateHash + " on network: "+genesisHash); + State genF=(State) convex.acquire(genesisHash).get(timeout,TimeUnit.MILLISECONDS); + log.info("Retrieved Consensus State: "+genesisHash); + + // Belief acquisition + log.info("Attempting to obtain peer Belief: "+beliefHash); + Belief belF=null; + long timeElapsed=0; + while (belF==null) { + try { + belF=(Belief) convex.acquire(beliefHash).get(timeout,TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + timeElapsed+=timeout; + log.info("Still waiting for Belief sync after "+timeElapsed/1000+"s"); + } + } + log.info("Retrieved Peer Belief: "+beliefHash+ " with memory size: "+belF.getMemorySize()); + + convex.close(); + SignedData peerOrder=belF.getOrders().get(remoteKey); + if (peerOrder!=null) { + SignedData newOrder=keyPair.signData(peerOrder.getValue()); + belF=belF.withOrders(belF.getOrders().assoc(keyPair.getAccountKey(),newOrder)); + } else { + log.warn("Remote peer Belief missing it's own Order?"); + } + Peer peer=Peer.create(keyPair, genF, belF); + return peer; } catch (ExecutionException|InterruptedException e) { throw Utils.sneakyThrow(e); } @@ -343,22 +353,19 @@ public void launch() { nio.launch((String)config.get(Keywords.BIND_ADDRESS), port); port = nio.getPort(); // Get the actual port (may be auto-allocated) - // set running status now, so that loops don't terminate + // set running status now, so that loops don't immediately terminate isRunning = true; - - // Start connection manager loop + + // Close server on shutdown, should be before Etch stores in priority + Shutdown.addHook(Shutdown.SERVER, ()->close()); + + // Start threaded components manager.start(); - queryHandler.start(); - propagator.start(); - transactionHandler.start(); - executor.start(); - // Close server on shutdown, should be before Etch stores in priority - Shutdown.addHook(Shutdown.SERVER, ()->close()); // Connect to source peer if specified if (getConfig().containsKey(Keywords.SOURCE)) { @@ -395,9 +402,7 @@ private void goLive() { * * SECURITY: Should anticipate malicious messages * - * If the message is partial, will be queued pending delivery of missing data. - * - * Runs on receiver thread + * Runs on receiver thread, so we want to offload to a queue ASAP * * @param m */ @@ -766,10 +771,29 @@ public void setHostname(String string) { config.put(Keywords.URL, string); } + + /** + * Flag for a running server. Setting to false will terminate server threads. + */ + private volatile boolean isRunning = true; + + /** + * Flag for a live server. Live Server has synced with at least one other peer + */ + private volatile boolean isLive = false; + + /** + * Checks is the server is Live, i.e. currently syncing successfully with network + * @return + */ public boolean isLive() { return isLive; } + /** + * Checks is the Server threads are running + * @return + */ public boolean isRunning() { return isRunning; }