From 6d9b04245657e23166e1ba400b133b3f2e8b9fd1 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 09:25:40 -0700 Subject: [PATCH] revert --- .../com/salesforce/apollo/choam/DynamicTest.java | 9 ++++++++- .../com/salesforce/apollo/choam/TestCHOAM.java | 14 ++++++++------ .../com/salesforce/apollo/fireflies/MtlsTest.java | 3 ++- .../apollo/archipelago/RouterSupplier.java | 15 +-------------- .../apollo/model/ContainmentDomainTest.java | 14 +++++++++----- .../com/salesforce/apollo/model/DomainTest.java | 14 +++++++++----- .../salesforce/apollo/model/FireFliesTest.java | 14 +++++++++----- .../com/salesforce/apollo/state/CHOAMTest.java | 9 ++++++++- 8 files changed, 54 insertions(+), 38 deletions(-) diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index 9789f07fe..7cb99d6d1 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -3,6 +3,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -41,6 +43,7 @@ public class DynamicTest { private Map routers; private Map choams; private Map> contexts; + private ExecutorService executor; @BeforeEach public void setUp() throws Exception { @@ -61,10 +64,11 @@ public void setUp() throws Exception { .map(ControlledIdentifierMember::new) .map(e -> (Member) e) .toList(); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var prefix = UUID.randomUUID().toString(); routers = members.stream() .collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router( - ServerConnectionCache.newBuilder().setTarget(cardinality * 2)))); + ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor))); var template = Parameters.newBuilder() .setGenerateGenesis(true) @@ -215,6 +219,9 @@ public void tearDown() throws Exception { routers = null; } members = null; + if (executor != null) { + executor.shutdown(); + } } private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, Context context) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 7d5e0235a..64be54d3c 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -8,10 +8,7 @@ import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -71,6 +68,7 @@ public class TestCHOAM { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; + private ExecutorService executor; @AfterEach public void after() throws Exception { @@ -85,6 +83,9 @@ public void after() throws Exception { if (scheduler != null) { scheduler.shutdown(); } + if (executor != null) { + executor.shutdown(); + } members = null; registry = null; } @@ -92,6 +93,7 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var origin = DigestAlgorithm.DEFAULT.getOrigin(); registry = new MetricRegistry(); var metrics = new ChoamMetricsImpl(origin, registry); @@ -101,7 +103,7 @@ public void before() throws Exception { var params = Parameters.newBuilder() .setGenerateGenesis(true) - .setGenesisViewId(origin.prefix("Slack")) + .setGenesisViewId(origin.prefix(entropy.nextLong())) .setGossipDuration(Duration.ofMillis(20)) .setProducer(ProducerParameters.newBuilder() .setMaxBatchCount(15_000) @@ -128,7 +130,7 @@ public void before() throws Exception { .collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router( ServerConnectionCache.newBuilder() .setMetrics(new ServerConnectionCacheMetricsImpl(registry)) - .setTarget(CARDINALITY)))); + .setTarget(CARDINALITY), executor))); choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { var recording = new AtomicInteger(); blocks.put(m.getId(), recording); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index bc805d703..36a616dd3 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -39,6 +39,7 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -127,7 +128,7 @@ public void smoke() throws Exception { builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry)); CertificateWithPrivateKey certWithKey = certs.get(node.getId()); Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router( - builder); + builder, Executors.newVirtualThreadPerTaskExecutor()); communications.add(comms); return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms, parameters, DigestAlgorithm.DEFAULT, metrics); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java index 8277eb526..a07eafb56 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java @@ -13,7 +13,7 @@ import java.util.Collections; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import java.util.function.Supplier; @@ -21,19 +21,6 @@ * @author hal.hildebrand */ public interface RouterSupplier { - static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) { - return newCachedThreadPool(corePoolSize, threadFactory, true); - } - - static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) { - var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - new SynchronousQueue(), threadFactory); - if (preStart) { - threadPoolExecutor.prestartAllCoreThreads(); - } - return threadPoolExecutor; - } - default Router router() { return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null); } diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index b90d4f59c..641c211c8 100644 --- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -36,6 +33,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -50,6 +48,7 @@ public class ContainmentDomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); + private ExecutorService executor; @AfterEach public void after() { @@ -57,10 +56,14 @@ public void after() { domains.clear(); routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var commsDirectory = Path.of("target/comms"); commsDirectory.toFile().mkdirs(); @@ -83,7 +86,8 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index d5ab77ea9..122956188 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -38,6 +35,7 @@ import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -55,6 +53,7 @@ public class DomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); + private ExecutorService executor; public static void smoke(Oracle oracle) throws Exception { // Namespace @@ -217,10 +216,14 @@ public void after() { domains.clear(); routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -240,7 +243,8 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 5df4b74a3..867d8d4d0 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -37,6 +34,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -55,6 +53,7 @@ public class FireFliesTest { private final List domains = new ArrayList<>(); private final Map routers = new HashMap<>(); + private ExecutorService executor; @AfterEach public void after() { @@ -62,10 +61,14 @@ public void after() { domains.clear(); routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -84,7 +87,8 @@ public void before() throws Exception { identities.forEach((digest, id) -> { var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3); final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5), "jdbc:h2:mem:%s-state".formatted(digest), diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index efa4cc461..1ec5449d1 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -11,6 +11,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters; @@ -82,6 +83,7 @@ public class CHOAMTest { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; + private ExecutorService executor; private static Txn initialInsert() { return Txn.newBuilder() @@ -107,6 +109,9 @@ public void after() throws Exception { scheduler.shutdownNow(); scheduler = null; } + if (executor != null) { + executor.shutdown(); + } updaters.values().forEach(up -> up.close()); updaters.clear(); members = null; @@ -123,6 +128,7 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); registry = new MetricRegistry(); checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong()); Utils.clean(checkpointDirBase); @@ -155,7 +161,8 @@ public void before() throws Exception { members.forEach(m -> context.activate(m)); final var prefix = UUID.randomUUID().toString(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); return localRouter; })); choams = members.stream()