diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index 08a830894..a7a9c6548 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -109,7 +109,7 @@ public Digest getAgent() { public Digest getFrom() { return Constants.SERVER_CLIENT_ID_KEY.get(); } - }, contextRegistration, validator, executor); + }, contextRegistration, validator); } private ManagedChannel connectTo(Member to) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java index 781217edb..822b4c79f 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Predicate; import java.util.function.Supplier; @@ -37,8 +38,16 @@ * @author hal.hildebrand */ public class LocalServer implements RouterSupplier { - private static final Logger log = LoggerFactory.getLogger(LocalServer.class); - private static final String NAME_TEMPLATE = "%s-%s"; + private static final Logger log = LoggerFactory.getLogger(LocalServer.class); + private static final String NAME_TEMPLATE = "%s-%s"; + private static final ExecutorService PLATFORM; + + static { + PLATFORM = Executors.newCachedThreadPool(); + var platform = (ThreadPoolExecutor) PLATFORM; + platform.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + platform.prestartAllCoreThreads(); + } private final ClientInterceptor clientInterceptor; private final Member from; @@ -67,12 +76,11 @@ public Member getFrom() { return from; } - @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, Predicate validator, ExecutorService executor) { if (executor == null) { - executor = Executors.newVirtualThreadPerTaskExecutor(); + executor = PLATFORM; } String name = String.format(NAME_TEMPLATE, prefix, qb64(from.getId())); var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); @@ -97,7 +105,7 @@ public Digest getFrom() { return Constants.SERVER_CLIENT_ID_KEY.get(); } }, d -> { - }, validator, executor); + }, validator); } private ManagedChannel connectTo(Member to) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java index 540d41119..88e79fc34 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java @@ -175,7 +175,7 @@ public Digest getFrom() { } }; return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)), identity, c -> { - }, validator, executor); + }, validator); } private ManagedChannel connectTo(Member to) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java index 57705f081..b2f9d3734 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java @@ -23,8 +23,6 @@ import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -52,36 +50,27 @@ public class RouterImpl implements Router { private final Map> services = new ConcurrentHashMap<>(); private final AtomicBoolean started = new AtomicBoolean(); private final Predicate validator; - private final ExecutorService executor; public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, ClientIdentity clientIdentityProvider) { this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> { - }, Executors.newVirtualThreadPerTaskExecutor()); - } - - public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, - ClientIdentity clientIdentityProvider, ExecutorService executor) { - this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> { - }, executor); + }); } public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, - ClientIdentity clientIdentityProvider, Consumer contextRegistration, - ExecutorService executor) { - this(from, serverBuilder, cacheBuilder, clientIdentityProvider, contextRegistration, null, executor); + ClientIdentity clientIdentityProvider, Consumer contextRegistration) { + this(from, serverBuilder, cacheBuilder, clientIdentityProvider, contextRegistration, null); } public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, ClientIdentity clientIdentityProvider, Consumer contextRegistration, - Predicate validator, ExecutorService executor) { + Predicate validator) { this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build(); this.cache = cacheBuilder.clone().setMember(from.getId()).build(); this.clientIdentityProvider = clientIdentityProvider; this.contextRegistration = contextRegistration; this.from = from; this.validator = validator; - this.executor = executor; } public static ClientInterceptor clientInterceptor(Digest ctx) { @@ -135,7 +124,6 @@ public void close(Duration await) { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - executor.shutdown(); } @Override