From 61ceb50c42094226c47baa077d38543ae7eb9c82 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 07:00:48 -0700 Subject: [PATCH 01/13] use default virtual threading caching pools --- .../com/salesforce/apollo/choam/DynamicTest.java | 9 +-------- .../com/salesforce/apollo/choam/TestCHOAM.java | 12 +++++------- .../com/salesforce/apollo/fireflies/ChurnTest.java | 13 +++---------- .../com/salesforce/apollo/fireflies/MtlsTest.java | 3 +-- .../com/salesforce/apollo/fireflies/SwarmTest.java | 13 +++---------- .../apollo/model/ContainmentDomainTest.java | 14 +++++--------- .../com/salesforce/apollo/model/DomainTest.java | 14 +++++--------- .../com/salesforce/apollo/model/FireFliesTest.java | 14 +++++--------- .../com/salesforce/apollo/state/CHOAMTest.java | 9 +-------- 9 files changed, 29 insertions(+), 72 deletions(-) diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index 7cb99d6d1..9789f07fe 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -3,7 +3,6 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; @@ -26,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -43,7 +41,6 @@ public class DynamicTest { private Map routers; private Map choams; private Map> contexts; - private ExecutorService executor; @BeforeEach public void setUp() throws Exception { @@ -64,11 +61,10 @@ public void setUp() throws Exception { .map(ControlledIdentifierMember::new) .map(e -> (Member) e) .toList(); - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var prefix = UUID.randomUUID().toString(); routers = members.stream() .collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router( - ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor))); + ServerConnectionCache.newBuilder().setTarget(cardinality * 2)))); var template = Parameters.newBuilder() .setGenerateGenesis(true) @@ -219,9 +215,6 @@ public void tearDown() throws Exception { routers = null; } members = null; - if (executor != null) { - executor.shutdown(); - } } private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, Context context) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 64be54d3c..0ca7fd638 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -8,7 +8,10 @@ import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -68,7 +71,6 @@ public class TestCHOAM { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; - private ExecutorService executor; @AfterEach public void after() throws Exception { @@ -83,9 +85,6 @@ public void after() throws Exception { if (scheduler != null) { scheduler.shutdown(); } - if (executor != null) { - executor.shutdown(); - } members = null; registry = null; } @@ -93,7 +92,6 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var origin = DigestAlgorithm.DEFAULT.getOrigin(); registry = new MetricRegistry(); var metrics = new ChoamMetricsImpl(origin, registry); @@ -130,7 +128,7 @@ public void before() throws Exception { .collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router( ServerConnectionCache.newBuilder() .setMetrics(new ServerConnectionCacheMetricsImpl(registry)) - .setTarget(CARDINALITY), executor))); + .setTarget(CARDINALITY)))); choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { var recording = new AtomicInteger(); blocks.put(m.getId(), recording); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index fab24a1bd..9825e45f6 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -29,7 +29,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -53,7 +52,6 @@ public class ChurnTest { private MetricRegistry node0Registry; private MetricRegistry registry; private List views; - private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -81,9 +79,6 @@ public void after() { gateways.forEach(e -> e.close(Duration.ofSeconds(0))); gateways.clear(); - if (executor != null) { - executor.shutdown(); - } } @Test @@ -265,7 +260,6 @@ public void churn() throws Exception { } private void initialize() { - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder().setMaximumTxfr(20).build(); registry = new MetricRegistry(); node0Registry = new MetricRegistry(); @@ -288,15 +282,14 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) ? node0Registry - : registry)), - executor); + : registry))); var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder() .setTarget(200) .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry : registry)), - executor); + ? node0Registry + : registry))); comms.start(); communications.add(comms); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index 36a616dd3..bc805d703 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -39,7 +39,6 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -128,7 +127,7 @@ public void smoke() throws Exception { builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry)); CertificateWithPrivateKey certWithKey = certs.get(node.getId()); Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router( - builder, Executors.newVirtualThreadPerTaskExecutor()); + builder); communications.add(comms); return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms, parameters, DigestAlgorithm.DEFAULT, metrics); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index 632cc0613..0e4f93dc6 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -29,7 +29,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -63,7 +62,6 @@ public class SwarmTest { private MetricRegistry node0Registry; private MetricRegistry registry; private List views; - private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -91,9 +89,6 @@ public void after() { gateways.forEach(e -> e.close(Duration.ofSeconds(1))); gateways.clear(); - if (executor != null) { - executor.shutdown(); - } } @Test @@ -209,7 +204,6 @@ public void swarm() throws Exception { } private void initialize() { - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder() .setMaxPending(50) .setMaximumTxfr(20) @@ -241,15 +235,14 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) ? node0Registry - : registry)), - executor); + : registry))); var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder() .setTarget(200) .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry : registry)), - executor); + ? node0Registry + : registry))); comms.start(); communications.add(comms); diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index 641c211c8..b90d4f59c 100644 --- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -6,7 +6,10 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.EndpointProvider; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -33,7 +36,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -48,7 +50,6 @@ public class ContainmentDomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); - private ExecutorService executor; @AfterEach public void after() { @@ -56,14 +57,10 @@ public void after() { domains.clear(); routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); - if (executor != null) { - executor.shutdown(); - } } @BeforeEach public void before() throws Exception { - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var commsDirectory = Path.of("target/comms"); commsDirectory.toFile().mkdirs(); @@ -86,8 +83,7 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 122956188..d5ab77ea9 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -6,7 +6,10 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.EndpointProvider; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -35,7 +38,6 @@ import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -53,7 +55,6 @@ public class DomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); - private ExecutorService executor; public static void smoke(Oracle oracle) throws Exception { // Namespace @@ -216,14 +217,10 @@ public void after() { domains.clear(); routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); - if (executor != null) { - executor.shutdown(); - } } @BeforeEach public void before() throws Exception { - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -243,8 +240,7 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 867d8d4d0..5df4b74a3 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -6,7 +6,10 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.EndpointProvider; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -34,7 +37,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -53,7 +55,6 @@ public class FireFliesTest { private final List domains = new ArrayList<>(); private final Map routers = new HashMap<>(); - private ExecutorService executor; @AfterEach public void after() { @@ -61,14 +62,10 @@ public void after() { domains.clear(); routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); - if (executor != null) { - executor.shutdown(); - } } @BeforeEach public void before() throws Exception { - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -87,8 +84,7 @@ public void before() throws Exception { identities.forEach((digest, id) -> { var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3); final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5), "jdbc:h2:mem:%s-state".formatted(digest), diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index 1ec5449d1..efa4cc461 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -11,7 +11,6 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters; @@ -83,7 +82,6 @@ public class CHOAMTest { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; - private ExecutorService executor; private static Txn initialInsert() { return Txn.newBuilder() @@ -109,9 +107,6 @@ public void after() throws Exception { scheduler.shutdownNow(); scheduler = null; } - if (executor != null) { - executor.shutdown(); - } updaters.values().forEach(up -> up.close()); updaters.clear(); members = null; @@ -128,7 +123,6 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); - executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); registry = new MetricRegistry(); checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong()); Utils.clean(checkpointDirBase); @@ -161,8 +155,7 @@ public void before() throws Exception { members.forEach(m -> context.activate(m)); final var prefix = UUID.randomUUID().toString(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); return localRouter; })); choams = members.stream() From 44fad517d912bc571bbed4ff2878eb5a38a63fc6 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 07:20:23 -0700 Subject: [PATCH 02/13] correct use of majority in ViewAssembly --- .../java/com/salesforce/apollo/choam/GenesisAssembly.java | 4 ++-- .../main/java/com/salesforce/apollo/choam/ViewAssembly.java | 4 ++-- .../src/test/java/com/salesforce/apollo/choam/TestCHOAM.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index f8cdbee1b..cdcb732dd 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -107,8 +107,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, @Override public void certify() { if (slate.size() != nextAssembly.size()) { - log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(), - slate.keySet().stream().sorted().toList(), params().member().getId()); + log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(), + nextAssembly.size(), slate.keySet().stream().sorted().toList(), params().member().getId()); return; } assert slate.size() == nextAssembly.size() : "Expected: %s members, slate: %s".formatted(nextAssembly.size(), diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 2e1d5cb11..b68893c8b 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -238,7 +238,7 @@ private boolean checkAssembly() { if (selected == null) { return false; } - if (proposals.size() == selected.majority) { + if (proposals.size() == selected.assembly.size()) { transitions.certified(); return true; } @@ -401,7 +401,7 @@ public String toString() { private class Recon implements Reconfiguration { @Override public void certify() { - if (proposals.size() == selected.majority) { + if (proposals.size() == selected.assembly.size()) { log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.certified(); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 0ca7fd638..7d5e0235a 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -101,7 +101,7 @@ public void before() throws Exception { var params = Parameters.newBuilder() .setGenerateGenesis(true) - .setGenesisViewId(origin.prefix(entropy.nextLong())) + .setGenesisViewId(origin.prefix("Slack")) .setGossipDuration(Duration.ofMillis(20)) .setProducer(ProducerParameters.newBuilder() .setMaxBatchCount(15_000) From 5b6bd61ff403f041a4bf49fefb9da859f858f89f Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 07:41:58 -0700 Subject: [PATCH 03/13] logging --- choam/src/test/resources/logback-test.xml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index d5be6a052..63b2b14e6 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -33,7 +33,7 @@ - + @@ -41,6 +41,10 @@ + + + + From a453f4fb0b5224e8736f948a564084553bfdfbf4 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 07:43:42 -0700 Subject: [PATCH 04/13] revert --- .../com/salesforce/apollo/fireflies/ChurnTest.java | 13 ++++++++++--- .../com/salesforce/apollo/fireflies/SwarmTest.java | 13 ++++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index 9825e45f6..fab24a1bd 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -29,6 +29,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -52,6 +53,7 @@ public class ChurnTest { private MetricRegistry node0Registry; private MetricRegistry registry; private List views; + private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -79,6 +81,9 @@ public void after() { gateways.forEach(e -> e.close(Duration.ofSeconds(0))); gateways.clear(); + if (executor != null) { + executor.shutdown(); + } } @Test @@ -260,6 +265,7 @@ public void churn() throws Exception { } private void initialize() { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder().setMaximumTxfr(20).build(); registry = new MetricRegistry(); node0Registry = new MetricRegistry(); @@ -282,14 +288,15 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) ? node0Registry - : registry))); + : registry)), + executor); var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder() .setTarget(200) .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry - : registry))); + ? node0Registry : registry)), + executor); comms.start(); communications.add(comms); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index 0e4f93dc6..632cc0613 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -29,6 +29,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +63,7 @@ public class SwarmTest { private MetricRegistry node0Registry; private MetricRegistry registry; private List views; + private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -89,6 +91,9 @@ public void after() { gateways.forEach(e -> e.close(Duration.ofSeconds(1))); gateways.clear(); + if (executor != null) { + executor.shutdown(); + } } @Test @@ -204,6 +209,7 @@ public void swarm() throws Exception { } private void initialize() { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder() .setMaxPending(50) .setMaximumTxfr(20) @@ -235,14 +241,15 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) ? node0Registry - : registry))); + : registry)), + executor); var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder() .setTarget(200) .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry - : registry))); + ? node0Registry : registry)), + executor); comms.start(); communications.add(comms); From 6d9b04245657e23166e1ba400b133b3f2e8b9fd1 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 09:25:40 -0700 Subject: [PATCH 05/13] revert --- .../com/salesforce/apollo/choam/DynamicTest.java | 9 ++++++++- .../com/salesforce/apollo/choam/TestCHOAM.java | 14 ++++++++------ .../com/salesforce/apollo/fireflies/MtlsTest.java | 3 ++- .../apollo/archipelago/RouterSupplier.java | 15 +-------------- .../apollo/model/ContainmentDomainTest.java | 14 +++++++++----- .../com/salesforce/apollo/model/DomainTest.java | 14 +++++++++----- .../salesforce/apollo/model/FireFliesTest.java | 14 +++++++++----- .../com/salesforce/apollo/state/CHOAMTest.java | 9 ++++++++- 8 files changed, 54 insertions(+), 38 deletions(-) diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index 9789f07fe..7cb99d6d1 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -3,6 +3,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -41,6 +43,7 @@ public class DynamicTest { private Map routers; private Map choams; private Map> contexts; + private ExecutorService executor; @BeforeEach public void setUp() throws Exception { @@ -61,10 +64,11 @@ public void setUp() throws Exception { .map(ControlledIdentifierMember::new) .map(e -> (Member) e) .toList(); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var prefix = UUID.randomUUID().toString(); routers = members.stream() .collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router( - ServerConnectionCache.newBuilder().setTarget(cardinality * 2)))); + ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor))); var template = Parameters.newBuilder() .setGenerateGenesis(true) @@ -215,6 +219,9 @@ public void tearDown() throws Exception { routers = null; } members = null; + if (executor != null) { + executor.shutdown(); + } } private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, Context context) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 7d5e0235a..64be54d3c 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -8,10 +8,7 @@ import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -71,6 +68,7 @@ public class TestCHOAM { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; + private ExecutorService executor; @AfterEach public void after() throws Exception { @@ -85,6 +83,9 @@ public void after() throws Exception { if (scheduler != null) { scheduler.shutdown(); } + if (executor != null) { + executor.shutdown(); + } members = null; registry = null; } @@ -92,6 +93,7 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var origin = DigestAlgorithm.DEFAULT.getOrigin(); registry = new MetricRegistry(); var metrics = new ChoamMetricsImpl(origin, registry); @@ -101,7 +103,7 @@ public void before() throws Exception { var params = Parameters.newBuilder() .setGenerateGenesis(true) - .setGenesisViewId(origin.prefix("Slack")) + .setGenesisViewId(origin.prefix(entropy.nextLong())) .setGossipDuration(Duration.ofMillis(20)) .setProducer(ProducerParameters.newBuilder() .setMaxBatchCount(15_000) @@ -128,7 +130,7 @@ public void before() throws Exception { .collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router( ServerConnectionCache.newBuilder() .setMetrics(new ServerConnectionCacheMetricsImpl(registry)) - .setTarget(CARDINALITY)))); + .setTarget(CARDINALITY), executor))); choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { var recording = new AtomicInteger(); blocks.put(m.getId(), recording); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index bc805d703..36a616dd3 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -39,6 +39,7 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -127,7 +128,7 @@ public void smoke() throws Exception { builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry)); CertificateWithPrivateKey certWithKey = certs.get(node.getId()); Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router( - builder); + builder, Executors.newVirtualThreadPerTaskExecutor()); communications.add(comms); return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms, parameters, DigestAlgorithm.DEFAULT, metrics); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java index 8277eb526..a07eafb56 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java @@ -13,7 +13,7 @@ import java.util.Collections; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import java.util.function.Supplier; @@ -21,19 +21,6 @@ * @author hal.hildebrand */ public interface RouterSupplier { - static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) { - return newCachedThreadPool(corePoolSize, threadFactory, true); - } - - static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) { - var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - new SynchronousQueue(), threadFactory); - if (preStart) { - threadPoolExecutor.prestartAllCoreThreads(); - } - return threadPoolExecutor; - } - default Router router() { return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null); } diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index b90d4f59c..641c211c8 100644 --- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -36,6 +33,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -50,6 +48,7 @@ public class ContainmentDomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); + private ExecutorService executor; @AfterEach public void after() { @@ -57,10 +56,14 @@ public void after() { domains.clear(); routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var commsDirectory = Path.of("target/comms"); commsDirectory.toFile().mkdirs(); @@ -83,7 +86,8 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index d5ab77ea9..122956188 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -38,6 +35,7 @@ import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -55,6 +53,7 @@ public class DomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); + private ExecutorService executor; public static void smoke(Oracle oracle) throws Exception { // Namespace @@ -217,10 +216,14 @@ public void after() { domains.clear(); routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -240,7 +243,8 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 5df4b74a3..867d8d4d0 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -37,6 +34,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -55,6 +53,7 @@ public class FireFliesTest { private final List domains = new ArrayList<>(); private final Map routers = new HashMap<>(); + private ExecutorService executor; @AfterEach public void after() { @@ -62,10 +61,14 @@ public void after() { domains.clear(); routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -84,7 +87,8 @@ public void before() throws Exception { identities.forEach((digest, id) -> { var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3); final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5), "jdbc:h2:mem:%s-state".formatted(digest), diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index efa4cc461..1ec5449d1 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -11,6 +11,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters; @@ -82,6 +83,7 @@ public class CHOAMTest { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; + private ExecutorService executor; private static Txn initialInsert() { return Txn.newBuilder() @@ -107,6 +109,9 @@ public void after() throws Exception { scheduler.shutdownNow(); scheduler = null; } + if (executor != null) { + executor.shutdown(); + } updaters.values().forEach(up -> up.close()); updaters.clear(); members = null; @@ -123,6 +128,7 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); registry = new MetricRegistry(); checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong()); Utils.clean(checkpointDirBase); @@ -155,7 +161,8 @@ public void before() throws Exception { members.forEach(m -> context.activate(m)); final var prefix = UUID.randomUUID().toString(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); return localRouter; })); choams = members.stream() From e430e17dc6d34138114b6bfdd099acb0f9a509e4 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 09:38:10 -0700 Subject: [PATCH 06/13] set executor --- .../com/salesforce/apollo/model/ProcessContainerDomain.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java index 602b910a8..c486f72fe 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java @@ -84,6 +84,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P .protocolNegotiator( new DomainSocketNegotiatorHandler.DomainSocketNegotiator( IMPL)) + .executor(Executors.newVirtualThreadPerTaskExecutor()) .withChildOption(ChannelOption.TCP_NODELAY, true) .channelType(IMPL.getServerDomainSocketChannelClass()) .workerEventLoopGroup(portalEventLoopGroup) @@ -93,6 +94,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P outerContextEndpoint = new DomainSocketAddress( communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint) + .executor(Executors.newVirtualThreadPerTaskExecutor()) .protocolNegotiator( new DomainSocketNegotiatorHandler.DomainSocketNegotiator(IMPL)) .withChildOption(ChannelOption.TCP_NODELAY, true) From 8bd4a9274d0a0a126ebcbd0d34b924c223a08597 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 09:47:26 -0700 Subject: [PATCH 07/13] one executor --- .../com/salesforce/apollo/fireflies/MtlsTest.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index 36a616dd3..9a1176cb7 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -39,7 +39,7 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -66,8 +66,9 @@ public class MtlsTest { CARDINALITY = LARGE_TESTS ? 20 : 10; } - private final List communications = new ArrayList<>(); - private List views; + private final List communications = new ArrayList<>(); + private List views; + private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -98,10 +99,14 @@ public void after() { communications.forEach(e -> e.close(Duration.ofSeconds(1))); communications.clear(); } + if (executor != null) { + executor.shutdown(); + } } @Test public void smoke() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder().setMaximumTxfr(20).build(); final Duration duration = Duration.ofMillis(50); var registry = new MetricRegistry(); @@ -128,7 +133,7 @@ public void smoke() throws Exception { builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry)); CertificateWithPrivateKey certWithKey = certs.get(node.getId()); Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router( - builder, Executors.newVirtualThreadPerTaskExecutor()); + builder, executor); communications.add(comms); return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms, parameters, DigestAlgorithm.DEFAULT, metrics); From dfa40221d6f07eb1800157f6a7b08be191900fd9 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 09:57:26 -0700 Subject: [PATCH 08/13] forgot to make use of it ;) --- .../com/salesforce/apollo/state/AbstractLifecycleTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index 2ababf109..de2960e19 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -161,7 +161,8 @@ public void before() throws Exception { members.stream().filter(s -> s != testSubject).forEach(s -> context.activate(s)); final var prefix = UUID.randomUUID().toString(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); return localRouter; })); routers.put(testSubject.getId(), From 06849279f1c1ec9466b57be1a8d1bc81bfe679a6 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 10:10:49 -0700 Subject: [PATCH 09/13] add vibe check to CHILLIN --- .../java/com/salesforce/apollo/choam/ViewAssembly.java | 8 ++++++++ .../com/salesforce/apollo/choam/fsm/Reconfiguration.java | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index b68893c8b..238f92f46 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -429,6 +429,7 @@ public void checkViews() { @Override public void chill() { + countdown.set(-1); if (ViewAssembly.this.checkAssembly()) { transitions.certified(); } else { @@ -456,5 +457,12 @@ public void finish() { public void publishViews() { propose(); } + + @Override + public void vibeCheck() { + if (ViewAssembly.this.checkAssembly()) { + transitions.certified(); + } + } } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index 246daa51a..c76a2492d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -29,6 +29,8 @@ public interface Reconfiguration { void publishViews(); + void vibeCheck(); + enum Reconfigure implements Transitions { AWAIT_ASSEMBLY { // Publish the Views of this node @@ -96,6 +98,12 @@ public Transitions certified() { return CERTIFICATION; } + @Override + public Transitions checkAssembly() { + context().vibeCheck(); + return null; + } + // Check to see if we already have a full complement of committee Joins @Entry public void chillin() { From 29c83b60b1a1e89f2e0fc1726d93a913ff530f52 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 11:11:08 -0700 Subject: [PATCH 10/13] use majority certs for Reconfiguration. Add new CONVENE state for view slate completion --- .../apollo/choam/GenesisAssembly.java | 10 +++---- .../salesforce/apollo/choam/ViewAssembly.java | 10 +++++++ .../apollo/choam/fsm/Reconfiguration.java | 30 +++++++++++++++++-- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index cdcb732dd..a5ee566f6 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -106,12 +106,12 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, @Override public void certify() { - if (slate.size() != nextAssembly.size()) { + if (slate.size() < params().majority()) { log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(), - nextAssembly.size(), slate.keySet().stream().sorted().toList(), params().member().getId()); + params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId()); return; } - assert slate.size() == nextAssembly.size() : "Expected: %s members, slate: %s".formatted(nextAssembly.size(), + assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(), slate.size()); reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(), new NullBlock( @@ -187,12 +187,12 @@ public void publish() { log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId()); return; } - if (witnesses.size() < nextAssembly.size()) { + if (witnesses.size() < params().majority()) { log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(), params().member().getId()); return; } - if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) { + if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) { log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash, reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId()); return; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 238f92f46..c6b854426 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -424,6 +424,7 @@ public void checkAssembly() { } public void checkViews() { + countdown.set(-1); vote(); } @@ -442,6 +443,15 @@ public void complete() { ViewAssembly.this.complete(); } + @Override + public void convened() { + if (viewProposals.size() == params().context().getRingCount()) { + transitions.proposed(); + } else { + countdown.set(2); + } + } + @Override public void failed() { view.onFailure(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index c76a2492d..41532259b 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -23,6 +23,8 @@ public interface Reconfiguration { void complete(); + void convened(); + void failed(); void finish(); @@ -39,11 +41,27 @@ public void publish() { context().publishViews(); } - // We have a majority of members submitting view proposals + // We have a >= majority submitting view proposals + @Override + public Transitions proposed() { + return CONVIENE; + } + }, CONVIENE { + @Override + public Transitions countdownCompleted() { + return proposed(); + } + + // We have a >= majority of members submitting view proposals @Override public Transitions proposed() { return VIEW_AGREEMENT; } + + @Entry + public void conviene() { + context().convened(); + } }, CERTIFICATION { // We have a full complement of the committee view proposals @Override @@ -100,10 +118,15 @@ public Transitions certified() { @Override public Transitions checkAssembly() { - context().vibeCheck(); + context().checkAssembly(); return null; } + @Entry + public void vibin() { + context().vibeCheck(); + } + // Check to see if we already have a full complement of committee Joins @Entry public void chillin() { @@ -157,12 +180,13 @@ public Transitions viewAcquired() { return GATHER; } - // no op+ + // no op @Override public Transitions proposed() { return null; } } + } interface Transitions extends FsmExecutor { From abe662e386f16814e3ef8bbc106a526747617ba8 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 11:14:59 -0700 Subject: [PATCH 11/13] use UE for RBC testing --- .../apollo/messaging/rbc/RbcTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java index b9fc84dfb..2d6eb50c2 100644 --- a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java @@ -10,10 +10,7 @@ import com.codahale.metrics.MetricRegistry; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; @@ -39,6 +36,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +62,7 @@ public class RbcTest { private final List communications = new ArrayList<>(); private final AtomicInteger totalReceived = new AtomicInteger(0); private List messengers; + private ExecutorService executor; @AfterEach public void after() { @@ -71,10 +70,14 @@ public void after() { messengers.forEach(e -> e.stop()); } communications.forEach(e -> e.close(Duration.ofMillis(0))); + if (executor != null) { + executor.shutdown(); + } } @Test public void broadcast() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); MetricRegistry registry = new MetricRegistry(); var entropy = SecureRandom.getInstance("SHA1PRNG"); @@ -96,11 +99,9 @@ public void broadcast() throws Exception { final var prefix = UUID.randomUUID().toString(); final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT); messengers = members.stream().map(node -> { - var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder() - .setTarget(30) - .setMetrics( - new ServerConnectionCacheMetricsImpl( - registry))); + var comms = new LocalServer(prefix, node).router( + ServerConnectionCache.newBuilder().setTarget(30).setMetrics(new ServerConnectionCacheMetricsImpl(registry)), + executor); communications.add(comms); comms.start(); return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication); From cd287c2c3eaaaab4c29cc5e744e34a9f3610c8ed Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 14:08:44 -0700 Subject: [PATCH 12/13] moar threads for the tests --- .../java/com/salesforce/apollo/archipelago/UnsafeExecutors.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java index 5d4c19057..cd65bd4fe 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java @@ -40,7 +40,7 @@ public class UnsafeExecutors { public static ExecutorService newVirtualThreadPerTaskExecutor() { var executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); executor.prestartAllCoreThreads(); return virtualThreadExecutor(executor); } From 77dfbc9caec1e6c57b9b988040e4801fa2454ecc Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 14:15:43 -0700 Subject: [PATCH 13/13] normal logging --- choam/src/test/resources/logback-test.xml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index 63b2b14e6..d5be6a052 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -33,7 +33,7 @@ - + @@ -41,10 +41,6 @@ - - - -