Skip to content

Commit

Permalink
Saner resource management. Don't set exec on clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 4, 2024
1 parent 04c6275 commit 5a926c5
Show file tree
Hide file tree
Showing 44 changed files with 321 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void tearDown() throws Exception {
choams = null;
}
if (routers != null) {
routers.values().forEach(e -> e.close(Duration.ofSeconds(1)));
routers.values().forEach(e -> e.close(Duration.ofSeconds(0)));
routers = null;
}
members = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public Block reconfigure(Map<Digest, Join> joining, Digest nextViewId, HashedBlo
genii.values().forEach(GenesisAssembly::start);
complete.await(15, TimeUnit.SECONDS);
} finally {
communications.values().forEach(r -> r.close(Duration.ofSeconds(1)));
communications.values().forEach(r -> r.close(Duration.ofSeconds(0)));
genii.values().forEach(GenesisAssembly::stop);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private void shutdown() {
choams = null;
}
if (routers != null) {
routers.values().forEach(e -> e.close(Duration.ofSeconds(1)));
routers.values().forEach(e -> e.close(Duration.ofSeconds(0)));
routers = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class TestCHOAM {
@AfterEach
public void after() throws Exception {
if (routers != null) {
routers.values().forEach(e -> e.close(Duration.ofSeconds(1)));
routers.values().forEach(e -> e.close(Duration.ofSeconds(0)));
routers = null;
}
if (choams != null) {
Expand Down Expand Up @@ -197,7 +197,7 @@ public void submitMultiplTxn() throws Exception {
.filter(i -> i < max)
.count());
} finally {
routers.values().forEach(e -> e.close(Duration.ofSeconds(1)));
routers.values().forEach(e -> e.close(Duration.ofSeconds(0)));
choams.values().forEach(e -> e.stop());

System.out.println();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void unbounded() throws NoSuchAlgorithmException, InterruptedException, I
} finally {
controllers.forEach(Ethereal::stop);
gossipers.forEach(ChRbcGossip::stop);
comms.forEach(e -> e.close(Duration.ofSeconds(1)));
comms.forEach(e -> e.close(Duration.ofSeconds(0)));
}

final var expected = expectedEpochs * (EPOCH_LENGTH - 1);
Expand Down Expand Up @@ -287,7 +287,7 @@ private void one(int iteration)
controllers.forEach(c -> System.out.println(c.dump()));
controllers.forEach(Ethereal::stop);
gossipers.forEach(ChRbcGossip::stop);
comms.forEach(e -> e.close(Duration.ofSeconds(1)));
comms.forEach(e -> e.close(Duration.ofSeconds(0)));
}

final var expected = NUM_EPOCHS * (EPOCH_LENGTH - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public void after() {
views.clear();
}

communications.forEach(e -> e.close(Duration.ofSeconds(1)));
communications.forEach(e -> e.close(Duration.ofSeconds(0)));
communications.clear();

gateways.forEach(e -> e.close(Duration.ofSeconds(1)));
gateways.forEach(e -> e.close(Duration.ofSeconds(0)));
gateways.clear();
}

Expand Down Expand Up @@ -210,8 +210,8 @@ public void churn() throws Exception {
for (int j = c.size() - 1; j >= c.size() - delta; j--) {
final var view = c.get(j);
view.stop();
r.get(j).close(Duration.ofSeconds(1));
g.get(j).close(Duration.ofSeconds(1));
r.get(j).close(Duration.ofSeconds(0));
g.get(j).close(Duration.ofSeconds(0));
removed.add(view.getNode().getId());
}
c = c.subList(0, c.size() - delta);
Expand Down Expand Up @@ -239,7 +239,7 @@ public void churn() throws Exception {
}

views.forEach(e -> e.stop());
communications.forEach(e -> e.close(Duration.ofSeconds(1)));
communications.forEach(e -> e.close(Duration.ofSeconds(0)));

System.out.println();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public void after() {
views.clear();
}

communications.forEach(e -> e.close(Duration.ofSeconds(1)));
communications.forEach(e -> e.close(Duration.ofSeconds(0)));
communications.clear();

gateways.forEach(e -> e.close(Duration.ofSeconds(1)));
gateways.forEach(e -> e.close(Duration.ofSeconds(0)));
gateways.clear();
}

Expand Down Expand Up @@ -209,7 +209,7 @@ private void initialize() {
}

private void post() {
communications.forEach(e -> e.close(Duration.ofSeconds(1)));
communications.forEach(e -> e.close(Duration.ofSeconds(0)));
views.forEach(view -> view.stop());
System.out.println("Node 0 metrics");
ConsoleReporter.forRegistry(node0Registry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public void clientSmoke() throws Exception {

var invitation = gorgoneionClient.apply(Duration.ofSeconds(60));

gorgonRouter.close(Duration.ofSeconds(1));
clientRouter.close(Duration.ofSeconds(1));
gorgonRouter.close(Duration.ofSeconds(0));
clientRouter.close(Duration.ofSeconds(0));

assertNotNull(invitation);
assertNotEquals(Validations.getDefaultInstance(), invitation);
Expand All @@ -113,10 +113,10 @@ public void clientSmoke() throws Exception {
@AfterEach
public void closeRouters() {
if (gorgonRouter != null) {
gorgonRouter.close(Duration.ofSeconds(3));
gorgonRouter.close(Duration.ofSeconds(0));
}
if (clientRouter != null) {
clientRouter.close(Duration.ofSeconds(3));
clientRouter.close(Duration.ofSeconds(0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public void smokin() throws Exception {
.build())
.setNonce(fs)
.build(), Duration.ofSeconds(1));
gorgonRouter.close(Duration.ofSeconds(1));
clientRouter.close(Duration.ofSeconds(1));
gorgonRouter.close(Duration.ofSeconds(0));
clientRouter.close(Duration.ofSeconds(0));
assertNotNull(invitation);
assertNotEquals(Validations.getDefaultInstance(), invitation);
assertEquals(1, invitation.getValidationsCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static void smoke(Oracle oracle) throws Exception {
public void after() {
domains.forEach(n -> n.stop());
domains.clear();
routers.values().forEach(r -> r.close(Duration.ofSeconds(1)));
routers.values().forEach(r -> r.close(Duration.ofSeconds(0)));
routers.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class LeydenJarTest {

@AfterEach
public void after() {
routers.values().forEach(r -> r.close(Duration.ofSeconds(2)));
routers.values().forEach(r -> r.close(Duration.ofSeconds(0)));
routers.clear();
dhts.values().forEach(t -> t.stop());
dhts.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand All @@ -46,7 +46,6 @@ public class Enclave implements RouterSupplier {
private final static Class<? extends io.netty.channel.Channel> channelType = IMPL.getChannelType();
private static final Logger log = LoggerFactory.getLogger(Enclave.class);

private final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final DomainSocketAddress bridge;
private final Consumer<Digest> contextRegistration;
private final DomainSocketAddress endpoint;
Expand All @@ -63,10 +62,6 @@ public Enclave(Member from, DomainSocketAddress endpoint, DomainSocketAddress br
this.fromString = qb64(from.getId());
}

public void close() {
eventLoopGroup.shutdownGracefully();
}

/**
* @return the DomainSocketAddress for this Enclave
*/
Expand All @@ -77,7 +72,10 @@ public DomainSocketAddress getEndpoint() {
@Override
public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Limit> serverLimit,
LimitsRegistry limitsRegistry, List<ServerInterceptor> interceptors,
Predicate<FernetServerInterceptor.HashedToken> validator) {
Predicate<FernetServerInterceptor.HashedToken> validator, ExecutorService executor) {
if (executor == null) {
executor = Executors.newVirtualThreadPerTaskExecutor();
}
var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get());
if (limitsRegistry != null) {
limitsBuilder.metricRegistry(limitsRegistry);
Expand Down Expand Up @@ -111,7 +109,7 @@ public Digest getAgent() {
public Digest getFrom() {
return Constants.SERVER_CLIENT_ID_KEY.get();
}
}, contextRegistration, validator);
}, contextRegistration, validator, executor);
}

private ManagedChannel connectTo(Member to) {
Expand All @@ -132,7 +130,6 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
};
final var builder = NettyChannelBuilder.forAddress(bridge)
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
.usePlaintext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand All @@ -39,7 +40,6 @@ public class LocalServer implements RouterSupplier {
private static final Logger log = LoggerFactory.getLogger(LocalServer.class);
private static final String NAME_TEMPLATE = "%s-%s";

private final Executor executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
private final ClientInterceptor clientInterceptor;
private final Member from;
private final String prefix;
Expand Down Expand Up @@ -70,15 +70,17 @@ public Member getFrom() {
@Override
public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Limit> serverLimit,
LimitsRegistry limitsRegistry, List<ServerInterceptor> interceptors,
Predicate<FernetServerInterceptor.HashedToken> validator) {
Predicate<FernetServerInterceptor.HashedToken> validator, ExecutorService executor) {
if (executor == null) {
executor = Executors.newVirtualThreadPerTaskExecutor();
}
String name = String.format(NAME_TEMPLATE, prefix, qb64(from.getId()));
var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get());
if (limitsRegistry != null) {
limitsBuilder.metricRegistry(limitsRegistry);
}
ServerBuilder<?> serverBuilder = InProcessServerBuilder.forName(name)
.executor(
UnsafeExecutors.newVirtualThreadPerTaskExecutor())
.executor(executor)
.intercept(ConcurrencyLimitServerInterceptor.newBuilder(
limitsBuilder.build())
.statusSupplier(
Expand All @@ -95,13 +97,12 @@ public Digest getFrom() {
return Constants.SERVER_CLIENT_ID_KEY.get();
}
}, d -> {
}, validator);
}, validator, executor);
}

private ManagedChannel connectTo(Member to) {
final var name = String.format(NAME_TEMPLATE, prefix, qb64(to.getId()));
final InProcessChannelBuilder builder = InProcessChannelBuilder.forName(name)
.executor(executor)
.usePlaintext()
.intercept(clientInterceptor);
disableTrash(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.netty.handler.ssl.ClientAuth;

import java.net.SocketAddress;
import java.util.concurrent.Executor;

/**
* @author hal.hildebrand
Expand All @@ -29,11 +28,10 @@ public class MtlsClient {
private final ManagedChannel channel;

public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, ClientContextSupplier supplier,
CertificateValidator validator, Executor executor) {
CertificateValidator validator) {

Limiter<GrpcClientRequestContext> limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build();
channel = NettyChannelBuilder.forAddress(address)
.executor(executor)
.withOption(ChannelOption.TCP_NODELAY, true)
.sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3))
.intercept(new ConcurrencyLimitClientInterceptor(limiter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -63,15 +63,13 @@ public class MtlsServer implements RouterSupplier {
private final Member from;
private final Context.Key<SSLSession> sslSessionContext = Context.key("SSLSession");
private final ServerContextSupplier supplier;
private final Executor executor;

public MtlsServer(Member from, EndpointProvider epProvider, Function<Member, ClientContextSupplier> contextSupplier,
ServerContextSupplier supplier) {
this.from = from;
this.epProvider = epProvider;
this.contextSupplier = contextSupplier;
this.supplier = supplier;
this.executor = Executors.newVirtualThreadPerTaskExecutor();
cachedMembership = CacheBuilder.newBuilder().build(new CacheLoader<X509Certificate, Digest>() {
@Override
public Digest load(X509Certificate key) throws Exception {
Expand Down Expand Up @@ -142,7 +140,10 @@ public static SslContext forServer(ClientAuth clientAuth, String alias, X509Cert
@Override
public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Limit> serverLimit,
LimitsRegistry limitsRegistry, List<ServerInterceptor> interceptors,
Predicate<FernetServerInterceptor.HashedToken> validator) {
Predicate<FernetServerInterceptor.HashedToken> validator, ExecutorService executor) {
if (executor == null) {
executor = Executors.newVirtualThreadPerTaskExecutor();
}
var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get());
if (limitsRegistry != null) {
limitsBuilder.metricRegistry(limitsRegistry);
Expand Down Expand Up @@ -174,14 +175,14 @@ public Digest getFrom() {
}
};
return new RouterImpl(from, serverBuilder, cacheBuilder.setFactory(t -> connectTo(t)), identity, c -> {
}, validator);
}, validator, executor);
}

private ManagedChannel connectTo(Member to) {
var address = epProvider.addressFor(to);
log.debug("Connecting to: {} address: {} on: {}", to.getId(), address, from.getId());
return new MtlsClient(address, epProvider.getClientAuth(), epProvider.getAlias(), contextSupplier.apply(from),
epProvider.getValidator(), executor).getChannel();
epProvider.getValidator()).getChannel();
}

private X509Certificate getCert() {
Expand Down
Loading

0 comments on commit 5a926c5

Please sign in to comment.