Skip to content

Commit

Permalink
global platform threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 6, 2024
1 parent e14371d commit 890d549
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Digest getAgent() {
public Digest getFrom() {
return Constants.SERVER_CLIENT_ID_KEY.get();
}
}, contextRegistration, validator, executor);
}, contextRegistration, validator);
}

private ManagedChannel connectTo(Member to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand All @@ -37,8 +38,16 @@
* @author hal.hildebrand
*/
public class LocalServer implements RouterSupplier {
private static final Logger log = LoggerFactory.getLogger(LocalServer.class);
private static final String NAME_TEMPLATE = "%s-%s";
private static final Logger log = LoggerFactory.getLogger(LocalServer.class);
private static final String NAME_TEMPLATE = "%s-%s";
private static final ExecutorService PLATFORM;

static {
PLATFORM = Executors.newCachedThreadPool();
var platform = (ThreadPoolExecutor) PLATFORM;
platform.setCorePoolSize(Runtime.getRuntime().availableProcessors());
platform.prestartAllCoreThreads();
}

private final ClientInterceptor clientInterceptor;
private final Member from;
Expand Down Expand Up @@ -67,12 +76,11 @@ public Member getFrom() {
return from;
}

@Override
public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Limit> serverLimit,
LimitsRegistry limitsRegistry, List<ServerInterceptor> interceptors,
Predicate<FernetServerInterceptor.HashedToken> validator, ExecutorService executor) {
if (executor == null) {
executor = Executors.newVirtualThreadPerTaskExecutor();
executor = PLATFORM;
}
String name = String.format(NAME_TEMPLATE, prefix, qb64(from.getId()));
var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get());
Expand All @@ -97,7 +105,7 @@ public Digest getFrom() {
return Constants.SERVER_CLIENT_ID_KEY.get();
}
}, d -> {
}, validator, executor);
}, validator);
}

private ManagedChannel connectTo(Member to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Digest getFrom() {
}
};
return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)), identity, c -> {
}, validator, executor);
}, validator);
}

private ManagedChannel connectTo(Member to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand Down Expand Up @@ -52,36 +50,27 @@ public class RouterImpl implements Router {
private final Map<String, RoutableService<?>> services = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Predicate<FernetServerInterceptor.HashedToken> validator;
private final ExecutorService executor;

public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider) {
this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> {
}, Executors.newVirtualThreadPerTaskExecutor());
}

public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider, ExecutorService executor) {
this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> {
}, executor);
});
}

public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider, Consumer<Digest> contextRegistration,
ExecutorService executor) {
this(from, serverBuilder, cacheBuilder, clientIdentityProvider, contextRegistration, null, executor);
ClientIdentity clientIdentityProvider, Consumer<Digest> contextRegistration) {
this(from, serverBuilder, cacheBuilder, clientIdentityProvider, contextRegistration, null);
}

public RouterImpl(Member from, ServerBuilder<?> serverBuilder, ServerConnectionCache.Builder cacheBuilder,
ClientIdentity clientIdentityProvider, Consumer<Digest> contextRegistration,
Predicate<FernetServerInterceptor.HashedToken> validator, ExecutorService executor) {
Predicate<FernetServerInterceptor.HashedToken> validator) {
this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build();
this.cache = cacheBuilder.clone().setMember(from.getId()).build();
this.clientIdentityProvider = clientIdentityProvider;
this.contextRegistration = contextRegistration;
this.from = from;
this.validator = validator;
this.executor = executor;
}

public static ClientInterceptor clientInterceptor(Digest ctx) {
Expand Down Expand Up @@ -135,7 +124,6 @@ public void close(Duration await) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}

@Override
Expand Down

0 comments on commit 890d549

Please sign in to comment.