From 5a926c55f663c7a92f6fd38270beb3bc30b94502 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Mon, 3 Jun 2024 17:02:13 -0700 Subject: [PATCH] Saner resource management. Don't set exec on clients. --- .../salesforce/apollo/choam/DynamicTest.java | 2 +- .../apollo/choam/GenesisAssemblyTest.java | 2 +- .../apollo/choam/MembershipTests.java | 2 +- .../salesforce/apollo/choam/TestCHOAM.java | 4 +- .../apollo/ethereal/EtherealTest.java | 4 +- .../apollo/fireflies/ChurnTest.java | 10 +-- .../salesforce/apollo/fireflies/E2ETest.java | 6 +- .../client/GorgoneionClientTest.java | 8 +- .../apollo/gorgoneion/GorgoneionTest.java | 4 +- .../apollo/demesnes/FireFliesTrace.java | 2 +- .../apollo/leyden/LeydenJarTest.java | 2 +- .../apollo/archipelago/Enclave.java | 15 ++-- .../apollo/archipelago/LocalServer.java | 15 ++-- .../apollo/archipelago/MtlsClient.java | 4 +- .../apollo/archipelago/MtlsServer.java | 13 +-- .../apollo/archipelago/RouterImpl.java | 42 +++++---- .../apollo/archipelago/RouterSupplier.java | 23 ++++- .../archipelago/ServerConnectionCache.java | 42 ++++++--- .../apollo/archipelago/UnsafeExecutors.java | 88 +++++++++++++++++++ .../messaging/rbc/ReliableBroadcaster.java | 11 +-- .../apollo/archipelago/EnclaveTest.java | 6 +- .../apollo/archipelago/FernetTest.java | 4 +- .../apollo/archipelago/LocalServerTest.java | 4 +- .../apollo/archipelago/RouterTest.java | 2 +- .../apollo/messaging/rbc/RbcTest.java | 4 +- .../apollo/ring/RingCommunicationsTest.java | 2 +- .../apollo/ring/RingIteratorTest.java | 2 +- .../apollo/ring/SliceIteratorTest.java | 2 +- .../apollo/model/ContainmentDomainTest.java | 2 +- .../salesforce/apollo/model/DomainTest.java | 2 +- .../apollo/model/FireFliesTest.java | 2 +- .../apollo/model/demesnes/DemesneTest.java | 6 +- .../comm/grpc/ForwardingManagedChannel.java | 68 ++++++++++++++ .../apollo/state/AbstractLifecycleTest.java | 4 +- .../salesforce/apollo/state/CHOAMTest.java | 4 +- .../stereotomy/services/grpc/TestBinder.java | 4 +- .../services/grpc/TestEventObserver.java | 4 +- .../services/grpc/TestEventValidation.java | 4 +- .../services/grpc/TestKerlService.java | 4 +- .../services/grpc/TestResolver.java | 4 +- .../apollo/thoth/AbstractDhtTest.java | 2 +- .../apollo/thoth/BootstrappingTest.java | 2 +- .../apollo/thoth/DhtRebalanceTest.java | 2 +- .../apollo/thoth/PublisherTest.java | 4 +- 44 files changed, 321 insertions(+), 122 deletions(-) create mode 100644 protocols/src/main/java/com/salesforce/apollo/comm/grpc/ForwardingManagedChannel.java diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index d3d583f35..fe29a7e45 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -212,7 +212,7 @@ public void tearDown() throws Exception { choams = null; } if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } members = null; diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 079dbe402..b113b13c8 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -183,7 +183,7 @@ public Block reconfigure(Map joining, Digest nextViewId, HashedBlo genii.values().forEach(GenesisAssembly::start); complete.await(15, TimeUnit.SECONDS); } finally { - communications.values().forEach(r -> r.close(Duration.ofSeconds(1))); + communications.values().forEach(r -> r.close(Duration.ofSeconds(0))); genii.values().forEach(GenesisAssembly::stop); } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java index f56f69436..effd854f5 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java @@ -207,7 +207,7 @@ private void shutdown() { choams = null; } if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 8f0aff039..391706bf2 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -75,7 +75,7 @@ public class TestCHOAM { @AfterEach public void after() throws Exception { if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } if (choams != null) { @@ -197,7 +197,7 @@ public void submitMultiplTxn() throws Exception { .filter(i -> i < max) .count()); } finally { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); choams.values().forEach(e -> e.stop()); System.out.println(); diff --git a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java index c6f38106f..7f8443c27 100644 --- a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java +++ b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java @@ -160,7 +160,7 @@ public void unbounded() throws NoSuchAlgorithmException, InterruptedException, I } finally { controllers.forEach(Ethereal::stop); gossipers.forEach(ChRbcGossip::stop); - comms.forEach(e -> e.close(Duration.ofSeconds(1))); + comms.forEach(e -> e.close(Duration.ofSeconds(0))); } final var expected = expectedEpochs * (EPOCH_LENGTH - 1); @@ -287,7 +287,7 @@ private void one(int iteration) controllers.forEach(c -> System.out.println(c.dump())); controllers.forEach(Ethereal::stop); gossipers.forEach(ChRbcGossip::stop); - comms.forEach(e -> e.close(Duration.ofSeconds(1))); + comms.forEach(e -> e.close(Duration.ofSeconds(0))); } final var expected = NUM_EPOCHS * (EPOCH_LENGTH - 1); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index d209c439b..9825e45f6 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -74,10 +74,10 @@ public void after() { views.clear(); } - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); communications.clear(); - gateways.forEach(e -> e.close(Duration.ofSeconds(1))); + gateways.forEach(e -> e.close(Duration.ofSeconds(0))); gateways.clear(); } @@ -210,8 +210,8 @@ public void churn() throws Exception { for (int j = c.size() - 1; j >= c.size() - delta; j--) { final var view = c.get(j); view.stop(); - r.get(j).close(Duration.ofSeconds(1)); - g.get(j).close(Duration.ofSeconds(1)); + r.get(j).close(Duration.ofSeconds(0)); + g.get(j).close(Duration.ofSeconds(0)); removed.add(view.getNode().getId()); } c = c.subList(0, c.size() - delta); @@ -239,7 +239,7 @@ public void churn() throws Exception { } views.forEach(e -> e.stop()); - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); System.out.println(); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java index a6e58f479..1300fe554 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java @@ -83,10 +83,10 @@ public void after() { views.clear(); } - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); communications.clear(); - gateways.forEach(e -> e.close(Duration.ofSeconds(1))); + gateways.forEach(e -> e.close(Duration.ofSeconds(0))); gateways.clear(); } @@ -209,7 +209,7 @@ private void initialize() { } private void post() { - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); views.forEach(view -> view.stop()); System.out.println("Node 0 metrics"); ConsoleReporter.forRegistry(node0Registry) diff --git a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java index c84e85eac..5b3f962c4 100644 --- a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java +++ b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java @@ -97,8 +97,8 @@ public void clientSmoke() throws Exception { var invitation = gorgoneionClient.apply(Duration.ofSeconds(60)); - gorgonRouter.close(Duration.ofSeconds(1)); - clientRouter.close(Duration.ofSeconds(1)); + gorgonRouter.close(Duration.ofSeconds(0)); + clientRouter.close(Duration.ofSeconds(0)); assertNotNull(invitation); assertNotEquals(Validations.getDefaultInstance(), invitation); @@ -113,10 +113,10 @@ public void clientSmoke() throws Exception { @AfterEach public void closeRouters() { if (gorgonRouter != null) { - gorgonRouter.close(Duration.ofSeconds(3)); + gorgonRouter.close(Duration.ofSeconds(0)); } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(3)); + clientRouter.close(Duration.ofSeconds(0)); } } diff --git a/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java b/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java index 57386081a..b31f69259 100644 --- a/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java +++ b/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java @@ -109,8 +109,8 @@ public void smokin() throws Exception { .build()) .setNonce(fs) .build(), Duration.ofSeconds(1)); - gorgonRouter.close(Duration.ofSeconds(1)); - clientRouter.close(Duration.ofSeconds(1)); + gorgonRouter.close(Duration.ofSeconds(0)); + clientRouter.close(Duration.ofSeconds(0)); assertNotNull(invitation); assertNotEquals(Validations.getDefaultInstance(), invitation); assertEquals(1, invitation.getValidationsCount()); diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java index 8e2a95111..0bf42a2cc 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java @@ -169,7 +169,7 @@ public static void smoke(Oracle oracle) throws Exception { public void after() { domains.forEach(n -> n.stop()); domains.clear(); - routers.values().forEach(r -> r.close(Duration.ofSeconds(1))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); } diff --git a/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java index 1e5ffce2e..1944a54cc 100644 --- a/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java +++ b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java @@ -46,7 +46,7 @@ public class LeydenJarTest { @AfterEach public void after() { - routers.values().forEach(r -> r.close(Duration.ofSeconds(2))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); dhts.values().forEach(t -> t.stop()); dhts.clear(); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index 5ae364d6c..08a830894 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Predicate; @@ -46,7 +46,6 @@ public class Enclave implements RouterSupplier { private final static Class channelType = IMPL.getChannelType(); private static final Logger log = LoggerFactory.getLogger(Enclave.class); - private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); private final DomainSocketAddress bridge; private final Consumer contextRegistration; private final DomainSocketAddress endpoint; @@ -63,10 +62,6 @@ public Enclave(Member from, DomainSocketAddress endpoint, DomainSocketAddress br this.fromString = qb64(from.getId()); } - public void close() { - eventLoopGroup.shutdownGracefully(); - } - /** * @return the DomainSocketAddress for this Enclave */ @@ -77,7 +72,10 @@ public DomainSocketAddress getEndpoint() { @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator) { + Predicate validator, ExecutorService executor) { + if (executor == null) { + executor = Executors.newVirtualThreadPerTaskExecutor(); + } var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); @@ -111,7 +109,7 @@ public Digest getAgent() { public Digest getFrom() { return Constants.SERVER_CLIENT_ID_KEY.get(); } - }, contextRegistration, validator); + }, contextRegistration, validator, executor); } private ManagedChannel connectTo(Member to) { @@ -132,7 +130,6 @@ public void start(Listener responseListener, Metadata headers) { }; final var builder = NettyChannelBuilder.forAddress(bridge) .withOption(ChannelOption.TCP_NODELAY, true) - .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .usePlaintext() diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java index fcee2a5af..781217edb 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java @@ -25,7 +25,8 @@ import java.lang.reflect.Method; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Predicate; import java.util.function.Supplier; @@ -39,7 +40,6 @@ public class LocalServer implements RouterSupplier { private static final Logger log = LoggerFactory.getLogger(LocalServer.class); private static final String NAME_TEMPLATE = "%s-%s"; - private final Executor executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); private final ClientInterceptor clientInterceptor; private final Member from; private final String prefix; @@ -70,15 +70,17 @@ public Member getFrom() { @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator) { + Predicate validator, ExecutorService executor) { + if (executor == null) { + executor = Executors.newVirtualThreadPerTaskExecutor(); + } String name = String.format(NAME_TEMPLATE, prefix, qb64(from.getId())); var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); } ServerBuilder serverBuilder = InProcessServerBuilder.forName(name) - .executor( - UnsafeExecutors.newVirtualThreadPerTaskExecutor()) + .executor(executor) .intercept(ConcurrencyLimitServerInterceptor.newBuilder( limitsBuilder.build()) .statusSupplier( @@ -95,13 +97,12 @@ public Digest getFrom() { return Constants.SERVER_CLIENT_ID_KEY.get(); } }, d -> { - }, validator); + }, validator, executor); } private ManagedChannel connectTo(Member to) { final var name = String.format(NAME_TEMPLATE, prefix, qb64(to.getId())); final InProcessChannelBuilder builder = InProcessChannelBuilder.forName(name) - .executor(executor) .usePlaintext() .intercept(clientInterceptor); disableTrash(builder); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index 58db9ec8e..8f5b2a7be 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -19,7 +19,6 @@ import io.netty.handler.ssl.ClientAuth; import java.net.SocketAddress; -import java.util.concurrent.Executor; /** * @author hal.hildebrand @@ -29,11 +28,10 @@ public class MtlsClient { private final ManagedChannel channel; public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, ClientContextSupplier supplier, - CertificateValidator validator, Executor executor) { + CertificateValidator validator) { Limiter limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build(); channel = NettyChannelBuilder.forAddress(address) - .executor(executor) .withOption(ChannelOption.TCP_NODELAY, true) .sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3)) .intercept(new ConcurrencyLimitClientInterceptor(limiter, diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java index 0a800fa92..540d41119 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java @@ -43,7 +43,7 @@ import java.security.cert.X509Certificate; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; import java.util.function.Predicate; @@ -63,7 +63,6 @@ public class MtlsServer implements RouterSupplier { private final Member from; private final Context.Key sslSessionContext = Context.key("SSLSession"); private final ServerContextSupplier supplier; - private final Executor executor; public MtlsServer(Member from, EndpointProvider epProvider, Function contextSupplier, ServerContextSupplier supplier) { @@ -71,7 +70,6 @@ public MtlsServer(Member from, EndpointProvider epProvider, Function() { @Override public Digest load(X509Certificate key) throws Exception { @@ -142,7 +140,10 @@ public static SslContext forServer(ClientAuth clientAuth, String alias, X509Cert @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator) { + Predicate validator, ExecutorService executor) { + if (executor == null) { + executor = Executors.newVirtualThreadPerTaskExecutor(); + } var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); @@ -174,14 +175,14 @@ public Digest getFrom() { } }; return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)), identity, c -> { - }, validator); + }, validator, executor); } private ManagedChannel connectTo(Member to) { var address = epProvider.addressFor(to); log.debug("Connecting to: {} address: {} on: {}", to.getId(), address, from.getId()); return new MtlsClient(address, epProvider.getClientAuth(), epProvider.getAlias(), contextSupplier.apply(from), - epProvider.getValidator(), executor).getChannel(); + epProvider.getValidator()).getChannel(); } private X509Certificate getCert() { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java index e7506c4e9..57705f081 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java @@ -23,6 +23,8 @@ import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -39,38 +41,47 @@ */ public class RouterImpl implements Router { - private final static Logger log = LoggerFactory.getLogger( - RouterImpl.class); - private final ServerConnectionCache cache; - private final ClientIdentity clientIdentityProvider; - private final Consumer contextRegistration; - private final Member from; - private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); - private final Server server; - private final Map> services = new ConcurrentHashMap<>(); - private final AtomicBoolean started = new AtomicBoolean(); - private final Predicate validator; + private final static Logger log = LoggerFactory.getLogger(RouterImpl.class); + + private final ServerConnectionCache cache; + private final ClientIdentity clientIdentityProvider; + private final Consumer contextRegistration; + private final Member from; + private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); + private final Server server; + private final Map> services = new ConcurrentHashMap<>(); + private final AtomicBoolean started = new AtomicBoolean(); + private final Predicate validator; + private final ExecutorService executor; public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, ClientIdentity clientIdentityProvider) { this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> { - }); + }, Executors.newVirtualThreadPerTaskExecutor()); } public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, - ClientIdentity clientIdentityProvider, Consumer contextRegistration) { - this(from, serverBuilder, cacheBuilder, clientIdentityProvider, contextRegistration, null); + ClientIdentity clientIdentityProvider, ExecutorService executor) { + this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> { + }, executor); + } + + public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, + ClientIdentity clientIdentityProvider, Consumer contextRegistration, + ExecutorService executor) { + this(from, serverBuilder, cacheBuilder, clientIdentityProvider, contextRegistration, null, executor); } public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, ClientIdentity clientIdentityProvider, Consumer contextRegistration, - Predicate validator) { + Predicate validator, ExecutorService executor) { this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build(); this.cache = cacheBuilder.clone().setMember(from.getId()).build(); this.clientIdentityProvider = clientIdentityProvider; this.contextRegistration = contextRegistration; this.from = from; this.validator = validator; + this.executor = executor; } public static ClientInterceptor clientInterceptor(Digest ctx) { @@ -124,6 +135,7 @@ public void close(Duration await) { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + executor.shutdown(); } @Override diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java index 5a8d71df2..a88882f09 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java @@ -13,6 +13,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.*; import java.util.function.Predicate; import java.util.function.Supplier; @@ -20,6 +21,18 @@ * @author hal.hildebrand */ public interface RouterSupplier { + static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) { + return newCachedThreadPool(corePoolSize, threadFactory, true); + } + + static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) { + var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue(), threadFactory); + if (preStart) { + threadPoolExecutor.prestartAllCoreThreads(); + } + return threadPoolExecutor; + } default Router router() { return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null); @@ -39,8 +52,14 @@ default Router router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, + LimitsRegistry limitsRegistry, List interceptors, + Predicate validator) { + return router(cacheBuilder, serverLimit, limitsRegistry, interceptors, validator, null); + + } + Router router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator); - + Predicate validator, ExecutorService executor); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java index fca126e8d..d568bd361 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -44,18 +45,19 @@ */ public class ServerConnectionCache { - private final static Logger log = LoggerFactory.getLogger( - ServerConnectionCache.class); - private final Map cache = new HashMap<>(); - private final Clock clock; - private final ServerConnectionFactory factory; - private final ReentrantLock lock = new ReentrantLock(true); - private final ServerConnectionCacheMetrics metrics; - private final Duration minIdle; - private final PriorityQueue queue = new PriorityQueue<>(); - private final int target; - private final Digest member; - private final CallCredentials credentials; + private final static Logger log = LoggerFactory.getLogger(ServerConnectionCache.class); + + private final Map cache = new HashMap<>(); + private final Clock clock; + private final ServerConnectionFactory factory; + private final ReentrantLock lock = new ReentrantLock(true); + private final ServerConnectionCacheMetrics metrics; + private final Duration minIdle; + private final PriorityQueue queue = new PriorityQueue<>(); + private final int target; + private final Digest member; + private final CallCredentials credentials; + private final AtomicBoolean open = new AtomicBoolean(true); public ServerConnectionCache(Digest member, CallCredentials credentials, ServerConnectionFactory factory, int target, Duration minIdle, Clock clock, ServerConnectionCacheMetrics metrics) { @@ -74,6 +76,9 @@ public static Builder newBuilder() { } public ManagedServerChannel borrow(Digest context, Member to) { + if (!open.get()) { + throw new IllegalStateException("not open on: " + member); + } return lock(() -> { if (cache.size() >= target) { log.debug("Cache target open connections exceeded: {}, opening to: {} on: {}", target, to.getId(), @@ -110,15 +115,21 @@ public ManagedServerChannel borrow(Digest context, Member to) { } public T borrow(Digest context, Member to, CreateClientCommunications createFunction) { + if (!open.get()) { + throw new IllegalStateException("not open on: " + member); + } return createFunction.create(borrow(context, to)); } public void close() { + if (!open.compareAndSet(true, false)) { + return; + } lock(() -> { log.info("Closing connection cache on: {}", member); for (ReleasableManagedChannel conn : new ArrayList<>(cache.values())) { try { - conn.channel.shutdownNow(); + conn.channel.shutdown(); if (metrics != null) { metrics.channelOpenDuration().update(Duration.between(conn.created, Instant.now(clock))); metrics.openConnections().dec(); @@ -134,6 +145,9 @@ public void close() { } public void release(ReleasableManagedChannel connection) { + if (!open.get()) { + return; + } lock(() -> { if (connection.decrementBorrow()) { log.debug("Releasing connection to: {} on: {}", connection.member.getId(), member); @@ -150,7 +164,7 @@ public void release(ReleasableManagedChannel connection) { private boolean close(ReleasableManagedChannel connection) { if (connection.isCloseable()) { try { - connection.channel.shutdownNow(); + connection.channel.shutdown(); } catch (Throwable t) { log.debug("Error closing connection to: {} on: {}", connection.member.getId(), connection.member); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java index 1b0e35cb6..eec5c0926 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java @@ -62,6 +62,94 @@ private static void setExecutor(Object builder, Object executor) { } } + public static ThreadPoolExecutor newCachedThreadPool(int corePoolSize) { + return newCachedThreadPool(corePoolSize, true); + } + + public static ThreadPoolExecutor newCachedThreadPool(int corePoolSize, boolean prestart) { + var executorService = newCachedThreadPool(corePoolSize, new ForkJoinPool()); + if (prestart) { + executorService.prestartAllCoreThreads(); + } + return executorService; + } + + public static ThreadPoolExecutor newCachedThreadPool(int corePoolSize, ExecutorService executor) { + ThreadFactory factory = r -> { + var builder = Thread.ofVirtual(); + setExecutor(builder, executor); + return builder.unstarted(r); + }; + return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue(), factory) { + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated() && super.isTerminated(); + } + + @Override + public void shutdown() { + executor.shutdown(); + super.shutdown(); + } + + @Override + public List shutdownNow() { + var returned = executor.shutdownNow(); + super.shutdownNow(); + return returned; + } + }; + } + + public static ExecutorService newFixedThreadPool(int nThreads, ExecutorService executor) { + ThreadFactory factory = r -> { + var builder = Thread.ofVirtual(); + setExecutor(builder, executor); + return builder.unstarted(r); + }; + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), factory) { + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated() && super.isTerminated(); + } + + @Override + public void shutdown() { + executor.shutdown(); + super.shutdown(); + } + + @Override + public List shutdownNow() { + var returned = executor.shutdownNow(); + super.shutdownNow(); + return returned; + } + }; + } + private static class BTB { private int characteristics; private long counter; diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 8de6acb5d..290f5299a 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -77,6 +77,7 @@ public ReliableBroadcaster(Context context, SigningMember member, Parame r -> new RbcServer(communications.getClientIdentityProvider(), metrics, r), getCreate(metrics), ReliableBroadcast.getLocalLoopback(member)); gossiper = new RingCommunications<>(context, member, this.comm); + gossiper.ignoreSelf(); this.adapter = adapter; } @@ -230,18 +231,18 @@ private Reconcile gossipRound(ReliableBroadcast link, int ring) { if (!started.get()) { return null; } - log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(), + log.trace("rbc gossiping[{}:{}] with: {} ring: {} on: {}", context.getId(), buffer.round(), link.getMember().getId(), ring, member.getId()); try { return link.gossip( MessageBff.newBuilder().setRing(ring).setDigests(buffer.forReconcilliation().toBff()).build()); } catch (StatusRuntimeException sre) { - log.trace("rbc gossiping[{}] failed: {} with: {} ring: {} on: {}", buffer.round(), sre.getStatus(), - link.getMember().getId(), ring, member.getId()); + log.trace("rbc gossiping[{}:{}] failed: {} with: {} ring: {} on: {}", context.getId(), buffer.round(), + sre.getStatus(), link.getMember().getId(), ring, member.getId()); return null; } catch (Throwable e) { - log.trace("rbc gossiping[{}] failed with: {} ring: {} on: {}", buffer.round(), link.getMember().getId(), - ring, member.getId(), e); + log.trace("rbc gossiping[{}:{}] failed with: {} ring: {} on: {}", context.getId(), buffer.round(), + link.getMember().getId(), ring, member.getId(), e); return null; } } diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java index 55e8c78f2..85ce08693 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java @@ -142,9 +142,9 @@ public void smokin() throws Exception { msg = resultB.unpack(ByteMessage.class); assertEquals("Hello Server B", msg.getContents().toStringUtf8()); - portal.close(Duration.ofSeconds(1)); - router1.close(Duration.ofSeconds(1)); - router2.close(Duration.ofSeconds(1)); + portal.close(Duration.ofSeconds(0)); + router1.close(Duration.ofSeconds(0)); + router2.close(Duration.ofSeconds(0)); } private ManagedChannel handler(DomainSocketAddress address) { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java index 615a4cf43..612a34e13 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java @@ -115,8 +115,8 @@ public void smokin() throws Exception { assertNotNull(resultB); assertEquals("Hello Server A", resultB.unpack(ByteMessage.class).getContents().toStringUtf8()); - routerA.close(Duration.ofSeconds(1)); - routerB.close(Duration.ofSeconds(1)); + routerA.close(Duration.ofSeconds(0)); + routerB.close(Duration.ofSeconds(0)); } public interface TestIt { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java index 3099ac332..3c7cf715d 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java @@ -84,8 +84,8 @@ public void smokin() throws Exception { assertNotNull(resultB); assertEquals("Hello Server A", resultB.unpack(ByteMessage.class).getContents().toStringUtf8()); - routerA.close(Duration.ofSeconds(1)); - routerB.close(Duration.ofSeconds(1)); + routerA.close(Duration.ofSeconds(0)); + routerB.close(Duration.ofSeconds(0)); } public interface TestIt { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java index 919c8aaaa..884142ded 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java @@ -83,7 +83,7 @@ public Any ping(Any request) { msg = resultB.unpack(ByteMessage.class); assertEquals("Hello Server B", msg.getContents().toStringUtf8()); - router.close(Duration.ofSeconds(1)); + router.close(Duration.ofSeconds(0)); } public interface TestIt { diff --git a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java index c7cf320ce..b9fc84dfb 100644 --- a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java @@ -70,7 +70,7 @@ public void after() { if (messengers != null) { messengers.forEach(e -> e.stop()); } - communications.forEach(e -> e.close(Duration.ofMillis(1))); + communications.forEach(e -> e.close(Duration.ofMillis(0))); } @Test @@ -140,7 +140,7 @@ public void broadcast() throws Exception { receiver.reset(); } } - communications.forEach(e -> e.close(Duration.ofMillis(1))); + communications.forEach(e -> e.close(Duration.ofMillis(0))); System.out.println(); diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java index 3c2f6b81a..fc0ac9be5 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java @@ -95,7 +95,7 @@ public Any ping(Any request) { assertFalse(pinged1.get()); assertTrue(pinged2.get()); } finally { - router.close(Duration.ofSeconds(5)); + router.close(Duration.ofSeconds(0)); } } } diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java index bf9d9bf67..00db46be2 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java @@ -105,7 +105,7 @@ public Any ping(Any request) { assertFalse(pinged1.get()); assertTrue(pinged2.get()); } finally { - router.close(Duration.ofSeconds(2)); + router.close(Duration.ofSeconds(0)); } } } diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java index 500fd165f..b0abd0790 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java @@ -103,7 +103,7 @@ public Any ping(Any request) { assertTrue(pinged1.get()); assertTrue(pinged2.get()); } finally { - router.close(Duration.ofSeconds(2)); + router.close(Duration.ofSeconds(0)); } } } diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index c316dd9d7..6494cbecb 100644 --- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -55,7 +55,7 @@ public class ContainmentDomainTest { public void after() { domains.forEach(Domain::stop); domains.clear(); - routers.forEach(r -> r.close(Duration.ofSeconds(100))); + routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); } diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 9b915ed9a..dd5f0503f 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -215,7 +215,7 @@ public static void smoke(Oracle oracle) throws Exception { public void after() { domains.forEach(Domain::stop); domains.clear(); - routers.forEach(r -> r.close(Duration.ofSeconds(1))); + routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); } diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index cb9b466d1..beb936b33 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -60,7 +60,7 @@ public class FireFliesTest { public void after() { domains.forEach(n -> n.stop()); domains.clear(); - routers.values().forEach(r -> r.close(Duration.ofSeconds(1))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); } diff --git a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java index 877e7661b..202d4c4a6 100644 --- a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java @@ -186,9 +186,9 @@ public void portal() throws Exception { msg = resultB.unpack(ByteMessage.class); assertEquals("Hello Server B", msg.getContents().toStringUtf8()); - portal.close(Duration.ofSeconds(1)); - router1.close(Duration.ofSeconds(1)); - router2.close(Duration.ofSeconds(1)); + portal.close(Duration.ofSeconds(0)); + router1.close(Duration.ofSeconds(0)); + router2.close(Duration.ofSeconds(0)); } @Test diff --git a/protocols/src/main/java/com/salesforce/apollo/comm/grpc/ForwardingManagedChannel.java b/protocols/src/main/java/com/salesforce/apollo/comm/grpc/ForwardingManagedChannel.java new file mode 100644 index 000000000..550eb705a --- /dev/null +++ b/protocols/src/main/java/com/salesforce/apollo/comm/grpc/ForwardingManagedChannel.java @@ -0,0 +1,68 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by FernFlower decompiler) +// + +package com.salesforce.apollo.comm.grpc; + +import com.google.common.base.MoreObjects; +import io.grpc.*; + +import java.util.concurrent.TimeUnit; + +public class ForwardingManagedChannel extends ManagedChannel { + private final ManagedChannel delegate; + + ForwardingManagedChannel(ManagedChannel delegate) { + this.delegate = delegate; + } + + public String authority() { + return this.delegate.authority(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return this.delegate.awaitTermination(timeout, unit); + } + + public void enterIdle() { + this.delegate.enterIdle(); + } + + public ConnectivityState getState(boolean requestConnection) { + return this.delegate.getState(requestConnection); + } + + public boolean isShutdown() { + return this.delegate.isShutdown(); + } + + public boolean isTerminated() { + return this.delegate.isTerminated(); + } + + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return this.delegate.newCall(methodDescriptor, callOptions); + } + + public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) { + this.delegate.notifyWhenStateChanged(source, callback); + } + + public void resetConnectBackoff() { + this.delegate.resetConnectBackoff(); + } + + public ManagedChannel shutdown() { + return this.delegate.shutdown(); + } + + public ManagedChannel shutdownNow() { + return this.delegate.shutdownNow(); + } + + public String toString() { + return MoreObjects.toStringHelper(this).add("delegate", this.delegate).toString(); + } +} diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index 3c8802818..028f4b67b 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -111,7 +111,7 @@ private static Txn initialInsert() { @AfterEach public void after() throws Exception { if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } if (choams != null) { @@ -209,7 +209,7 @@ protected void post() throws Exception { .toList()); choams.values().forEach(e -> e.stop()); - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); final ULong target = updaters.values() .stream() .map(ssm -> ssm.getCurrentBlock()) diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index 433ef9acb..e8cb71761 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -96,7 +96,7 @@ private static Txn initialInsert() { @AfterEach public void after() throws Exception { if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } if (choams != null) { @@ -230,7 +230,7 @@ public void submitMultiplTxn() throws Exception { .toList()); } finally { choams.values().forEach(e -> e.stop()); - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); System.out.println("Final block height: " + members.stream() .map(m -> updaters.get(m)) diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java index 3a0926d20..18233c2bf 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java @@ -42,11 +42,11 @@ public class TestBinder { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofMillis(1)); + serverRouter.close(Duration.ofMillis(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofMillis(1)); + clientRouter.close(Duration.ofMillis(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java index 9ba52e7b9..eab2a64a1 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java @@ -44,11 +44,11 @@ public class TestEventObserver { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java index 0ac4b5ac1..3b69cf798 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java @@ -39,11 +39,11 @@ public class TestEventValidation { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java index 1c4721d54..e7d81f368 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java @@ -51,11 +51,11 @@ public class TestKerlService { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java index 19da10932..705e23e77 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java @@ -40,11 +40,11 @@ public class TestResolver { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java index fd22fde20..a761af4d6 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java @@ -100,7 +100,7 @@ public static RotationEvent rotation(KeyPair prevNext, final Digest prevDigest, @AfterEach public void after() { - routers.values().forEach(r -> r.close(Duration.ofSeconds(2))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); dhts.values().forEach(t -> t.stop()); dhts.clear(); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java index 208189545..e1021f3be 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java @@ -50,7 +50,7 @@ public class BootstrappingTest extends AbstractDhtTest { @AfterEach public void closeClient() throws Exception { if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(3)); + clientRouter.close(Duration.ofSeconds(0)); } } diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java index 71fb3662a..62fa3dbaf 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java @@ -54,7 +54,7 @@ public class DhtRebalanceTest { @AfterEach public void afterIt() throws Exception { - routers.values().forEach(r -> r.close(Duration.ofSeconds(1))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); dhts.clear(); contexts.clear(); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java index 485fe03ac..f74821219 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java @@ -79,8 +79,8 @@ public void smokin() throws Exception { client.publish(KERL_.getDefaultInstance(), Collections.emptyList()); client.publishEvents(Collections.emptyList(), Collections.emptyList()); } finally { - clientRouter.close(Duration.ofSeconds(1)); - serverRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); + serverRouter.close(Duration.ofSeconds(0)); } }