From abe662e386f16814e3ef8bbc106a526747617ba8 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 8 Jun 2024 11:14:59 -0700 Subject: [PATCH] use UE for RBC testing --- .../apollo/messaging/rbc/RbcTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java index b9fc84dfb..2d6eb50c2 100644 --- a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java @@ -10,10 +10,7 @@ import com.codahale.metrics.MetricRegistry; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; @@ -39,6 +36,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +62,7 @@ public class RbcTest { private final List communications = new ArrayList<>(); private final AtomicInteger totalReceived = new AtomicInteger(0); private List messengers; + private ExecutorService executor; @AfterEach public void after() { @@ -71,10 +70,14 @@ public void after() { messengers.forEach(e -> e.stop()); } communications.forEach(e -> e.close(Duration.ofMillis(0))); + if (executor != null) { + executor.shutdown(); + } } @Test public void broadcast() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); MetricRegistry registry = new MetricRegistry(); var entropy = SecureRandom.getInstance("SHA1PRNG"); @@ -96,11 +99,9 @@ public void broadcast() throws Exception { final var prefix = UUID.randomUUID().toString(); final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT); messengers = members.stream().map(node -> { - var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder() - .setTarget(30) - .setMetrics( - new ServerConnectionCacheMetricsImpl( - registry))); + var comms = new LocalServer(prefix, node).router( + ServerConnectionCache.newBuilder().setTarget(30).setMetrics(new ServerConnectionCacheMetricsImpl(registry)), + executor); communications.add(comms); comms.start(); return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication);