From 6a91c8f63bd0faa35382ec7c2e0178a2ea4f5272 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sun, 5 Nov 2023 09:16:26 -0800 Subject: [PATCH] set explicit executors to netty client/server builders --- .../client/GorgoneionClientTest.java | 2 +- .../apollo/gorgoneion/GorgoneionTest.java | 2 +- .../apollo/demesnes/DemesneSmoke.java | 81 +++---- .../apollo/demesnes/FireFliesTrace.java | 2 +- .../apollo/archipelago/Enclave.java | 2 +- .../salesforce/apollo/archipelago/Portal.java | 42 ++-- .../apollo/archipeligo/DemultiplexerTest.java | 127 +++++----- .../apollo/archipeligo/EnclaveTest.java | 219 +++++++++--------- .../com/salesforce/apollo/model/Domain.java | 138 ++++++----- .../apollo/model/ProcessDomain.java | 14 +- .../apollo/model/demesnes/DemesneImpl.java | 5 +- .../apollo/model/FireFliesTest.java | 2 +- .../apollo/model/StoredProceduresTest.java | 2 +- .../model/delphinius/ShardedOracleTest.java | 2 +- .../apollo/model/demesnes/DemesneTest.java | 119 +++++----- .../model/stereotomy/ShardedKERLTest.java | 4 +- ...ConcurrencyLimitClientInterceptorTest.java | 2 +- .../limiter/LifoBlockingLimiterTest.java | 2 +- .../apollo/state/AbstractLifecycleTest.java | 2 +- .../apollo/state/EmulationTest.java | 2 +- 20 files changed, 374 insertions(+), 397 deletions(-) diff --git a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java index 7f4dcbe8e..fdecd4bbc 100644 --- a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java +++ b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java @@ -71,7 +71,7 @@ public void clientSmoke() throws Exception { final var parameters = Parameters.newBuilder().setKerl(kerl).build(); @SuppressWarnings("unused") var gorgon = new Gorgoneion(parameters, member, context, observer, gorgonRouter, - Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()), null); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), null); // The registering client var client = new ControlledIdentifierMember(stereotomy.newIdentifier()); diff --git a/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java b/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java index cd06581ea..52e153bdb 100644 --- a/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java +++ b/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java @@ -61,7 +61,7 @@ public void smokin() throws Exception { @SuppressWarnings("unused") var gorgon = new Gorgoneion(Parameters.newBuilder().setKerl(kerl).build(), member, context, observer, gorgonRouter, - Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()), null); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), null); // The registering client var client = new ControlledIdentifierMember(stereotomy.newIdentifier()); diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java index d98dc3e73..3c6975782 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java @@ -55,6 +55,8 @@ import java.util.Collections; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static com.salesforce.apollo.comm.grpc.DomainSockets.*; @@ -65,9 +67,10 @@ */ public class DemesneSmoke { - private final static Class clientChannelType = getChannelType(); + private final static Class clientChannelType = getChannelType(); private static final Class serverChannelType = getServerDomainSocketChannelClass(); - private EventLoopGroup eventLoopGroup; + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private EventLoopGroup eventLoopGroup; public static ClientInterceptor clientInterceptor(Digest ctx) { return new ClientInterceptor() { @@ -118,15 +121,16 @@ public void smokin() throws Exception { Member serverMember = new ControlledIdentifierMember(identifier); final var portalAddress = UUID.randomUUID().toString(); final var portalEndpoint = new DomainSocketAddress(commDirectory.resolve(portalAddress).toFile()); - final var router = new RouterImpl(serverMember, - NettyServerBuilder.forAddress(portalEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(serverChannelType) - .workerEventLoopGroup(eventLoopGroup) - .bossEventLoopGroup(eventLoopGroup) - .intercept(new DomainSocketServerInterceptor()), - ServerConnectionCache.newBuilder().setFactory(to -> handler(portalEndpoint)), - null); + final var router = new RouterImpl(serverMember, NettyServerBuilder.forAddress(portalEndpoint) + .protocolNegotiator( + new DomainSocketNegotiator()) + .channelType(serverChannelType) + .workerEventLoopGroup(eventLoopGroup) + .bossEventLoopGroup(eventLoopGroup) + .intercept( + new DomainSocketServerInterceptor()), + ServerConnectionCache.newBuilder().setFactory(to -> handler(portalEndpoint)), + null); router.start(); final var registered = new TreeSet(); @@ -149,30 +153,30 @@ public void register(SubContext context) { final var kerlServer = new DemesneKERLServer(new ProtoKERLAdapter(kerl), null); final var outerService = new OuterContextServer(service, null); final var outerContextService = NettyServerBuilder.forAddress(parentEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(getServerDomainSocketChannelClass()) - .addService(kerlServer) - .addService(outerService) - .workerEventLoopGroup(getEventLoopGroup()) - .bossEventLoopGroup(getEventLoopGroup()) - .intercept(new DomainSocketServerInterceptor()) - .build(); + .protocolNegotiator(new DomainSocketNegotiator()) + .channelType(getServerDomainSocketChannelClass()) + .addService(kerlServer) + .addService(outerService) + .workerEventLoopGroup(getEventLoopGroup()) + .bossEventLoopGroup(getEventLoopGroup()) + .intercept(new DomainSocketServerInterceptor()) + .build(); outerContextService.start(); final var parameters = DemesneParameters.newBuilder() - .setContext(context.toDigeste()) - .setPortal(portalAddress) - .setParent(parentAddress) - .setCommDirectory(commDirectory.toString()) - .setMaxTransfer(100) - .setFalsePositiveRate(.125) - .build(); + .setContext(context.toDigeste()) + .setPortal(portalAddress) + .setParent(parentAddress) + .setCommDirectory(commDirectory.toString()) + .setMaxTransfer(100) + .setFalsePositiveRate(.125) + .build(); final var demesne = new DemesneImpl(parameters); Builder specification = IdentifierSpecification.newBuilder(); final var incp = demesne.inception(identifier.getIdentifier().toIdent(), specification); final var seal = Seal.EventSeal.construct(incp.getIdentifier(), incp.hash(controller.digestAlgorithm()), - incp.getSequenceNumber().longValue()); + incp.getSequenceNumber().longValue()); final var builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal)); @@ -186,11 +190,12 @@ public void register(SubContext context) { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) - .eventLoopGroup(eventLoopGroup) - .channelType(clientChannelType) - .keepAliveTime(1, TimeUnit.SECONDS) - .usePlaintext() - .build(); + .executor(executor) + .eventLoopGroup(eventLoopGroup) + .channelType(clientChannelType) + .keepAliveTime(1, TimeUnit.SECONDS) + .usePlaintext() + .build(); } public static interface TestIt { @@ -215,7 +220,7 @@ public void ping(Any request, StreamObserver responseObserver) { } public static class TestItClient implements TestItService { - private final TestItBlockingStub client; + private final TestItBlockingStub client; private final ManagedServerChannel connection; public TestItClient(ManagedServerChannel c) { @@ -242,9 +247,8 @@ public Any ping(Any request) { public class ServerA implements TestIt { @Override public void ping(Any request, StreamObserver responseObserver) { - responseObserver.onNext(Any.pack(ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Hello Server A")) - .build())); + responseObserver.onNext( + Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server A")).build())); responseObserver.onCompleted(); } } @@ -252,9 +256,8 @@ public void ping(Any request, StreamObserver responseObserver) { public class ServerB implements TestIt { @Override public void ping(Any request, StreamObserver responseObserver) { - responseObserver.onNext(Any.pack(ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Hello Server B")) - .build())); + responseObserver.onNext( + Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server B")).build())); responseObserver.onCompleted(); } } diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java index c235ef247..4e6cbd3d9 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java @@ -201,7 +201,7 @@ public void before() throws Exception { identities.keySet().forEach(d -> foundation.addMembership(d.toDigeste())); var sealed = FoundationSeal.newBuilder().setFoundation(foundation).build(); TransactionConfiguration txnConfig = new TransactionConfiguration( - Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory())); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); identities.forEach((digest, id) -> { var context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3); final var member = new ControlledIdentifierMember(id); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index 2cf277808..e94d5828b 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -122,10 +122,10 @@ public void start(Listener responseListener, Metadata headers) { } }; final var builder = NettyChannelBuilder.forAddress(bridge) + .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .usePlaintext() - .executor(executor) .intercept(clientInterceptor); return builder.build(); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java index 9a3ba810f..294f80ab7 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java @@ -6,48 +6,36 @@ */ package com.salesforce.apollo.archipelago; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getChannelType; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getEventLoopGroup; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getServerDomainSocketChannelClass; - -import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.QualifiedBase64; import com.salesforce.apollo.membership.Member; - -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; +import io.grpc.*; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; -import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.ServerBuilder; import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static com.salesforce.apollo.comm.grpc.DomainSockets.*; + /** - * Local "service mesh" for in process Isolate Enclaves. The Portal provides the - * externally visible GRPC endpoint that all enclaves are multiplexed through. - * The Portal also serves as the exit point from the process that all Isolate + * Local "service mesh" for in process Isolate Enclaves. The Portal provides the externally visible GRPC endpoint that + * all enclaves are multiplexed through. The Portal also serves as the exit point from the process that all Isolate * Enclaves use to talk to each other and Enclaves in other processes * * @author hal.hildebrand - * */ public class Portal { - private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); private final static Class channelType = getChannelType(); private final String agent; @@ -57,8 +45,7 @@ public class Portal { private final Demultiplexer outbound; public Portal(Digest agent, ServerBuilder inbound, Function outbound, - DomainSocketAddress bridge, Duration keepAlive, - Function router) { + DomainSocketAddress bridge, Duration keepAlive, Function router) { this.inbound = new Demultiplexer(inbound, Router.METADATA_CONTEXT_KEY, d -> handler(router.apply(d))); this.outbound = new Demultiplexer(NettyServerBuilder.forAddress(bridge) .executor(executor) @@ -98,6 +85,7 @@ public void start(Listener responseListener, Metadata headers) { } }; return NettyChannelBuilder.forAddress(address) + .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .keepAliveTime(keepAlive.toNanos(), TimeUnit.NANOSECONDS) diff --git a/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java b/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java index a42a796e7..29ef6bebb 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java @@ -6,30 +6,6 @@ */ package com.salesforce.apollo.archipeligo; -import static com.salesforce.apollo.archipelago.RouterImpl.clientInterceptor; -import static com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor.PEER_CREDENTIALS_CONTEXT_KEY; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getChannelType; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getEventLoopGroup; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getServerDomainSocketChannelClass; -import static com.salesforce.apollo.crypto.QualifiedBase64.qb64; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; - import com.google.common.primitives.Ints; import com.google.protobuf.Any; import com.google.protobuf.ByteString; @@ -41,7 +17,6 @@ import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor; import com.salesforce.apollo.crypto.DigestAlgorithm; - import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.Status; @@ -54,52 +29,40 @@ import io.grpc.stub.StreamObserver; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static com.salesforce.apollo.archipelago.RouterImpl.clientInterceptor; +import static com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor.PEER_CREDENTIALS_CONTEXT_KEY; +import static com.salesforce.apollo.comm.grpc.DomainSockets.*; +import static com.salesforce.apollo.crypto.QualifiedBase64.qb64; +import static org.junit.jupiter.api.Assertions.*; /** * @author hal.hildebrand - * */ public class DemultiplexerTest { - public static class ServerA extends TestItImplBase { - @Override - public void ping(Any request, StreamObserver responseObserver) { - final var credentials = PEER_CREDENTIALS_CONTEXT_KEY.get(); - if (credentials == null) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("No credentials available"))); - return; - } - responseObserver.onNext(Any.pack(PeerCreds.newBuilder() - .setPid(credentials.pid()) - .setUid(credentials.uid()) - .addAllGids(Ints.asList(credentials.gids())) - .build())); - responseObserver.onCompleted(); - } - } - - public static class ServerB extends TestItImplBase { - @Override - public void ping(Any request, StreamObserver responseObserver) { - final var credentials = PEER_CREDENTIALS_CONTEXT_KEY.get(); - if (credentials == null) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("No credentials available"))); - return; - } - responseObserver.onNext(Any.pack(ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Hello Server")) - .build())); - responseObserver.onCompleted(); - } - } - - private static final Class channelType = getChannelType(); - - private final EventLoopGroup eventLoopGroup = getEventLoopGroup(); - private final List opened = new ArrayList<>(); - private Server serverA; - private Server serverB; - private Demultiplexer terminus; + private static final Class channelType = getChannelType(); + private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final EventLoopGroup eventLoopGroup = getEventLoopGroup(); + private final List opened = new ArrayList<>(); + private Server serverA; + private Server serverB; + private Demultiplexer terminus; @AfterEach public void after() throws InterruptedException { @@ -152,6 +115,7 @@ public void smokin() throws Exception { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) + .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .keepAliveTime(1, TimeUnit.SECONDS) @@ -194,4 +158,37 @@ private DomainSocketAddress serverB() throws IOException { serverB.start(); return address; } + + public static class ServerA extends TestItImplBase { + @Override + public void ping(Any request, StreamObserver responseObserver) { + final var credentials = PEER_CREDENTIALS_CONTEXT_KEY.get(); + if (credentials == null) { + responseObserver.onError( + new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("No credentials available"))); + return; + } + responseObserver.onNext(Any.pack(PeerCreds.newBuilder() + .setPid(credentials.pid()) + .setUid(credentials.uid()) + .addAllGids(Ints.asList(credentials.gids())) + .build())); + responseObserver.onCompleted(); + } + } + + public static class ServerB extends TestItImplBase { + @Override + public void ping(Any request, StreamObserver responseObserver) { + final var credentials = PEER_CREDENTIALS_CONTEXT_KEY.get(); + if (credentials == null) { + responseObserver.onError( + new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("No credentials available"))); + return; + } + responseObserver.onNext( + Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server")).build())); + responseObserver.onCompleted(); + } + } } diff --git a/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java b/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java index ab683de83..5ca709ce5 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java @@ -6,44 +6,19 @@ */ package com.salesforce.apollo.archipeligo; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getChannelType; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getEventLoopGroup; -import static com.salesforce.apollo.comm.grpc.DomainSockets.getServerDomainSocketChannelClass; -import static com.salesforce.apollo.crypto.QualifiedBase64.*; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import java.io.IOException; -import java.nio.file.Path; -import java.time.Duration; -import java.util.HashMap; -import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.salesfoce.apollo.test.proto.ByteMessage; import com.salesfoce.apollo.test.proto.TestItGrpc; import com.salesfoce.apollo.test.proto.TestItGrpc.TestItBlockingStub; import com.salesfoce.apollo.test.proto.TestItGrpc.TestItImplBase; -import com.salesforce.apollo.archipelago.Enclave; -import com.salesforce.apollo.archipelago.Link; -import com.salesforce.apollo.archipelago.ManagedServerChannel; -import com.salesforce.apollo.archipelago.Portal; -import com.salesforce.apollo.archipelago.RoutableService; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.impl.SigningMemberImpl; import com.salesforce.apollo.utils.Utils; - import io.grpc.ManagedChannel; import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator; import io.grpc.netty.NettyChannelBuilder; @@ -51,97 +26,48 @@ import io.grpc.stub.StreamObserver; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static com.salesforce.apollo.comm.grpc.DomainSockets.*; +import static com.salesforce.apollo.crypto.QualifiedBase64.qb64; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; /** * @author hal.hildebrand - * */ public class EnclaveTest { - public static class Server extends TestItImplBase { - private final RoutableService router; - - public Server(RoutableService router) { - this.router = router; - } - - @Override - public void ping(Any request, StreamObserver responseObserver) { - router.evaluate(responseObserver, t -> t.ping(request, responseObserver)); - } - } - - public class ServerA implements TestIt { - @Override - public void ping(Any request, StreamObserver responseObserver) { - responseObserver.onNext(Any.pack(ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Hello Server A")) - .build())); - responseObserver.onCompleted(); - } - } - - public class ServerB implements TestIt { - @Override - public void ping(Any request, StreamObserver responseObserver) { - responseObserver.onNext(Any.pack(ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Hello Server B")) - .build())); - responseObserver.onCompleted(); - } - } - - public static interface TestIt { - void ping(Any request, StreamObserver responseObserver); - } - - public static class TestItClient implements TestItService { - private final TestItBlockingStub client; - private final ManagedServerChannel connection; - - public TestItClient(ManagedServerChannel c) { - this.connection = c; - client = TestItGrpc.newBlockingStub(c); - } + private final static Class channelType = getChannelType(); + private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final TestItService local = new TestItService() { @Override public void close() throws IOException { - connection.release(); } @Override public Member getMember() { - return connection.getMember(); + return null; } @Override public Any ping(Any request) { - return client.ping(request); + return null; } - } - - public static interface TestItService extends Link { - Any ping(Any request); - } - - private final static Class channelType = getChannelType(); - - private EventLoopGroup eventLoopGroup; - private final TestItService local = new TestItService() { - - @Override - public void close() throws IOException { - } - - @Override - public Member getMember() { - return null; - } - - @Override - public Any ping(Any request) { - return null; - } - }; + }; + private EventLoopGroup eventLoopGroup; @AfterEach public void after() throws Exception { @@ -168,33 +94,31 @@ public void smokin() throws Exception { final var routes = new HashMap(); final Function router = s -> routes.get(s); - final var portalEndpoint = new DomainSocketAddress(Path.of("target") - .resolve(UUID.randomUUID().toString()) - .toFile()); + final var portalEndpoint = new DomainSocketAddress( + Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); final var agent = DigestAlgorithm.DEFAULT.getLast(); - final var portal = new Portal<>(agent, - NettyServerBuilder.forAddress(portalEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(getServerDomainSocketChannelClass()) - .workerEventLoopGroup(getEventLoopGroup()) - .bossEventLoopGroup(getEventLoopGroup()) - .intercept(new DomainSocketServerInterceptor()), - s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), router); + final var portal = new Portal<>(agent, NettyServerBuilder.forAddress(portalEndpoint) + .protocolNegotiator(new DomainSocketNegotiator()) + .channelType(getServerDomainSocketChannelClass()) + .workerEventLoopGroup(getEventLoopGroup()) + .bossEventLoopGroup(getEventLoopGroup()) + .intercept(new DomainSocketServerInterceptor()), + s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), router); final var endpoint1 = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); - var enclave1 = new Enclave(serverMember1, endpoint1, bridge, d -> { + var enclave1 = new Enclave(serverMember1, endpoint1, bridge, d -> { routes.put(qb64(d), endpoint1); }); - var router1 = enclave1.router( ); + var router1 = enclave1.router(); CommonCommunications commsA = router1.create(serverMember1, ctxA, new ServerA(), "A", r -> new Server(r), c -> new TestItClient(c), local); final var endpoint2 = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); - var enclave2 = new Enclave(serverMember2, endpoint2, bridge, d -> { + var enclave2 = new Enclave(serverMember2, endpoint2, bridge, d -> { routes.put(qb64(d), endpoint2); }); - var router2 = enclave2.router( ); + var router2 = enclave2.router(); CommonCommunications commsB = router2.create(serverMember2, ctxB, new ServerB(), "A", r -> new Server(r), c -> new TestItClient(c), local); @@ -223,10 +147,75 @@ public void smokin() throws Exception { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) + .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .keepAliveTime(1, TimeUnit.SECONDS) .usePlaintext() .build(); } + + public static interface TestIt { + void ping(Any request, StreamObserver responseObserver); + } + + public static interface TestItService extends Link { + Any ping(Any request); + } + + public static class Server extends TestItImplBase { + private final RoutableService router; + + public Server(RoutableService router) { + this.router = router; + } + + @Override + public void ping(Any request, StreamObserver responseObserver) { + router.evaluate(responseObserver, t -> t.ping(request, responseObserver)); + } + } + + public static class TestItClient implements TestItService { + private final TestItBlockingStub client; + private final ManagedServerChannel connection; + + public TestItClient(ManagedServerChannel c) { + this.connection = c; + client = TestItGrpc.newBlockingStub(c); + } + + @Override + public void close() throws IOException { + connection.release(); + } + + @Override + public Member getMember() { + return connection.getMember(); + } + + @Override + public Any ping(Any request) { + return client.ping(request); + } + } + + public class ServerA implements TestIt { + @Override + public void ping(Any request, StreamObserver responseObserver) { + responseObserver.onNext( + Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server A")).build())); + responseObserver.onCompleted(); + } + } + + public class ServerB implements TestIt { + @Override + public void ping(Any request, StreamObserver responseObserver) { + responseObserver.onNext( + Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server B")).build())); + responseObserver.onCompleted(); + } + } } diff --git a/model/src/main/java/com/salesforce/apollo/model/Domain.java b/model/src/main/java/com/salesforce/apollo/model/Domain.java index 0eec6e8e0..ed6b5c575 100644 --- a/model/src/main/java/com/salesforce/apollo/model/Domain.java +++ b/model/src/main/java/com/salesforce/apollo/model/Domain.java @@ -53,6 +53,7 @@ import java.sql.JDBCType; import java.util.*; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static com.salesforce.apollo.crypto.QualifiedBase64.qb64; @@ -61,22 +62,23 @@ import static java.nio.file.Path.of; /** - * An abstract sharded domain, top level, or sub domain. A domain minimally - * consists of a managed KERL, ReBAC Oracle and the defined membership + * An abstract sharded domain, top level, or sub domain. A domain minimally consists of a managed KERL, ReBAC Oracle and + * the defined membership * * @author hal.hildebrand */ abstract public class Domain { + protected static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private static final Logger log = LoggerFactory.getLogger(Domain.class); + protected final CHOAM choam; + protected final KERL commonKERL; + protected final ControlledIdentifierMember member; + protected final Mutator mutator; + protected final Oracle oracle; + protected final Parameters params; + protected final SqlStateMachine sqlStateMachine; + protected final Connection stateConnection; - private static final Logger log = LoggerFactory.getLogger(Domain.class); - protected final CHOAM choam; - protected final KERL commonKERL; - protected final ControlledIdentifierMember member; - protected final Mutator mutator; - protected final Oracle oracle; - protected final Parameters params; - protected final SqlStateMachine sqlStateMachine; - protected final Connection stateConnection; public Domain(ControlledIdentifierMember member, Parameters.Builder params, String dbURL, Path checkpointBaseDir, RuntimeParameters.Builder runtime, TransactionConfiguration txnConfig) { var paramsClone = params.clone(); @@ -96,30 +98,30 @@ public Domain(ControlledIdentifierMember member, Parameters.Builder params, Stri paramsClone.getProducer().ethereal().setSigner(member); this.params = paramsClone.build(runtimeClone.setCheckpointer(sqlStateMachine.getCheckpointer()) - .setProcessor(sqlStateMachine.getExecutor()) - .setMember(member) - .setRestorer(sqlStateMachine.getBootstrapper()) - .setKerl(() -> kerl()) - .setGenesisData(members -> genesisOf(members)) - .build()); + .setProcessor(sqlStateMachine.getExecutor()) + .setMember(member) + .setRestorer(sqlStateMachine.getBootstrapper()) + .setKerl(() -> kerl()) + .setGenesisData(members -> genesisOf(members)) + .build()); choam = new CHOAM(this.params); mutator = sqlStateMachine.getMutator(choam.getSession()); stateConnection = sqlStateMachine.newConnection(); - this.oracle = new ShardedOracle(stateConnection, mutator, txnConfig.scheduler(), params.getSubmitTimeout() ); + this.oracle = new ShardedOracle(stateConnection, mutator, txnConfig.scheduler(), params.getSubmitTimeout()); this.commonKERL = new ShardedKERL(stateConnection, mutator, txnConfig.scheduler(), params.getSubmitTimeout(), - params.getDigestAlgorithm() ); + params.getDigestAlgorithm()); log.info("Domain: {} member: {} db URL: {} checkpoint base dir: {}", this.params.context().getId(), - member.getId(), dbURL, checkpointBaseDir); + member.getId(), dbURL, checkpointBaseDir); } public static void addMembers(Connection connection, List members, String state) { var context = DSL.using(connection, SQLDialect.H2); for (var m : members) { var id = context.insertInto(IDENTIFIER, IDENTIFIER.PREFIX) - .values(m) - .onDuplicateKeyIgnore() - .returning(IDENTIFIER.ID) - .fetchOne(); + .values(m) + .onDuplicateKeyIgnore() + .returning(IDENTIFIER.ID) + .fetchOne(); if (id != null) { context.insertInto(MEMBER).set(MEMBER.IDENTIFIER, id.value1()).onConflictDoNothing().execute(); } @@ -138,19 +140,18 @@ public static Txn boostrapMigration() { resources.put(of("/model/model.xml"), res("/model/model.xml")); return Txn.newBuilder() - .setMigration(Migration.newBuilder() - .setUpdate(Mutator.changeLog(resources, "/initialize.xml")) - .build()) - .build(); + .setMigration( + Migration.newBuilder().setUpdate(Mutator.changeLog(resources, "/initialize.xml")).build()) + .build(); } public static boolean isMember(DSLContext context, SelfAddressingIdentifier id) { final var idTable = com.salesforce.apollo.stereotomy.schema.tables.Identifier.IDENTIFIER; return context.fetchExists(context.select(MEMBER.IDENTIFIER) - .from(MEMBER) - .join(idTable) - .on(idTable.ID.eq(MEMBER.IDENTIFIER)) - .and(idTable.PREFIX.eq(id.getDigest().getBytes()))); + .from(MEMBER) + .join(idTable) + .on(idTable.ID.eq(MEMBER.IDENTIFIER)) + .and(idTable.PREFIX.eq(id.getDigest().getBytes()))); } public static Path tempDirOf(ControlledIdentifier id) { @@ -171,12 +172,12 @@ private static URL res(String resource) { public boolean activate(Member m) { if (!active()) { return params.runtime() - .foundation() - .getFoundation() - .getMembershipList() - .stream() - .map(d -> Digest.from(d)) - .anyMatch(d -> m.getId().equals(d)); + .foundation() + .getFoundation() + .getMembershipList() + .stream() + .map(d -> Digest.from(d)) + .anyMatch(d -> m.getId().equals(d)); } final var context = DSL.using(stateConnection, SQLDialect.H2); final var activeMember = isMember(context, new SelfAddressingIdentifier(m.getId())); @@ -207,8 +208,7 @@ public Identifier getIdentifier() { } /** - * @return the adapter that provides raw Protobuf access to the underlying KERI - * resolution + * @return the adapter that provides raw Protobuf access to the underlying KERI resolution */ public ProtoKERLAdapter getKERLService() { return new ProtoKERLAdapter(commonKERL); @@ -244,27 +244,22 @@ private List genesisOf(Map members) { // Schemas transactions.add(transactionOf(boostrapMigration())); sorted.stream() - .map(e -> manifest(members.get(e))) - .filter(t -> t != null) - .flatMap(l -> l.stream()) - .forEach(t -> transactions.add(t)); - transactions.add(initalMembership(params.runtime() - .foundation() - .getFoundation() - .getMembershipList() - .stream() - .map(d -> Digest.from(d)) - .toList())); + .map(e -> manifest(members.get(e))) + .filter(t -> t != null) + .flatMap(l -> l.stream()) + .forEach(t -> transactions.add(t)); + transactions.add(initalMembership( + params.runtime().foundation().getFoundation().getMembershipList().stream().map(d -> Digest.from(d)).toList())); return transactions; } private Transaction initalMembership(List digests) { - var call = mutator.call("{ call apollo_kernel.add_members(?, ?) }", - digests.stream() - .map(d -> new SelfAddressingIdentifier(d)) - .map(id -> id.toIdent().toByteArray()) - .toList(), - "active"); + var call = mutator.call("{ call apollo_kernel.add_members(?, ?) }", digests.stream() + .map( + d -> new SelfAddressingIdentifier(d)) + .map( + id -> id.toIdent().toByteArray()) + .toList(), "active"); return transactionOf(Txn.newBuilder().setCall(call).build()); } @@ -295,16 +290,17 @@ private Transaction transactionOf(KeyEventWithAttachments ke) { default -> throw new IllegalArgumentException("Unexpected value: " + ke.getEventCase()); }; var batch = mutator.batch(); - batch.execute(mutator.call("{ ? = call stereotomy.append(?, ?, ?) }", - Collections.singletonList(JDBCType.BINARY), event.getBytes(), event.getIlk(), - DigestAlgorithm.DEFAULT.digestCode())); + batch.execute( + mutator.call("{ ? = call stereotomy.append(?, ?, ?) }", Collections.singletonList(JDBCType.BINARY), + event.getBytes(), event.getIlk(), DigestAlgorithm.DEFAULT.digestCode())); if (!ke.getAttachment().equals(Attachment.getDefaultInstance())) { var attach = AttachmentEvent.newBuilder() - .setCoordinates(event.getCoordinates().toEventCoords()) - .setAttachment(ke.getAttachment()) - .build(); - batch.execute(mutator.call("{ ? = call stereotomy.appendAttachment(?) }", - Collections.singletonList(JDBCType.BINARY), attach.toByteArray())); + .setCoordinates(event.getCoordinates().toEventCoords()) + .setAttachment(ke.getAttachment()) + .build(); + batch.execute( + mutator.call("{ ? = call stereotomy.appendAttachment(?) }", Collections.singletonList(JDBCType.BINARY), + attach.toByteArray())); } return transactionOf(Txn.newBuilder().setBatched(batch.build()).build()); } @@ -316,14 +312,14 @@ private Transaction transactionOf(Message message) { var signer = new Signer.MockSigner(params.viewSigAlgorithm()); var digeste = params.digestAlgorithm().getOrigin().toDigeste(); var sig = signer.sign(digeste.toByteString().asReadOnlyByteBuffer(), buff, - message.toByteString().asReadOnlyByteBuffer()); + message.toByteString().asReadOnlyByteBuffer()); return Transaction.newBuilder() - .setSource(digeste) - .setContent(message.toByteString()) - .setSignature(sig.toSig()) - .build(); + .setSource(digeste) + .setContent(message.toByteString()) + .setSignature(sig.toSig()) + .build(); } - public record TransactionConfiguration( ScheduledExecutorService scheduler) { + public record TransactionConfiguration(ScheduledExecutorService scheduler) { } } diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index 4040d9c9e..d0f12a715 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -54,10 +54,7 @@ import java.time.Duration; import java.time.Instant; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import static com.salesforce.apollo.comm.grpc.DomainSockets.*; @@ -100,7 +97,7 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu InetSocketAddress endpoint, Path commDirectory, com.salesforce.apollo.fireflies.Parameters.Builder ff, TransactionConfiguration txnConfig, EventValidation eventValidation, - IdentifierSpecification.Builder subDomainSpecification ) { + IdentifierSpecification.Builder subDomainSpecification) { super(member, builder, dbURL, checkpointBaseDir, runtime, txnConfig); communicationsDirectory = commDirectory; var base = Context.newBuilder() @@ -113,8 +110,7 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu JdbcConnectionPool connectionPool = JdbcConnectionPool.create(url, "", ""); connectionPool.setMaxConnections(10); dht = new KerlDHT(Duration.ofMillis(10), foundation.getContext(), member, connectionPool, - params.digestAlgorithm(), params.communications(), Duration.ofSeconds(1), 0.00125, - null); + params.digestAlgorithm(), params.communications(), Duration.ofSeconds(1), 0.00125, null); listener = foundation.register(listener()); bridge = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); portalEndpoint = new DomainSocketAddress( @@ -125,8 +121,7 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu .workerEventLoopGroup(portalEventLoopGroup) .bossEventLoopGroup(portalEventLoopGroup) .intercept(new DomainSocketServerInterceptor()), - s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), - s -> routes.get(s)); + s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), s -> routes.get(s)); outerContextEndpoint = new DomainSocketAddress( communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint) @@ -234,6 +229,7 @@ public void stop() { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) + .executor(executor) .eventLoopGroup(clientEventLoopGroup) .channelType(channelType) .keepAliveTime(1, TimeUnit.SECONDS) diff --git a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java index 10c94b3c3..aaac9308e 100644 --- a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java +++ b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java @@ -57,6 +57,7 @@ import java.security.SecureRandom; import java.time.Duration; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -77,6 +78,7 @@ public class DemesneImpl implements Demesne { private static final int DEFAULT_VIRTUAL_THREADS = 5; private static final EventLoopGroup eventLoopGroup = getEventLoopGroup(); private static final Logger log = LoggerFactory.getLogger(DemesneImpl.class); + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); private final KERL kerl; private final OuterContextClient outer; private final DemesneParameters parameters; @@ -196,11 +198,11 @@ private CachingKERL kerlFrom(File address) { Digest kerlContext = context.getId(); final var serverAddress = new DomainSocketAddress(address); log.info("Kerl context: {} address: {}", kerlContext, serverAddress); - NettyChannelBuilder.forAddress(serverAddress); return new CachingKERL(f -> { ManagedChannel channel = null; try { channel = NettyChannelBuilder.forAddress(serverAddress) + .executor(executor) .intercept(clientInterceptor(kerlContext)) .eventLoopGroup(eventLoopGroup) .channelType(channelType) @@ -221,6 +223,7 @@ private CachingKERL kerlFrom(File address) { private OuterContextClient outerFrom(File address) { return new OuterContextClient(NettyChannelBuilder.forAddress(new DomainSocketAddress(address)) + .executor(executor) .intercept(clientInterceptor(context.getId())) .eventLoopGroup(eventLoopGroup) .channelType(channelType) diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 371f232d4..c071205f0 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -94,7 +94,7 @@ public void before() throws Exception { identities.keySet().forEach(d -> foundation.addMembership(d.toDigeste())); var sealed = FoundationSeal.newBuilder().setFoundation(foundation).build(); TransactionConfiguration txnConfig = new TransactionConfiguration( - Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory())); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); identities.forEach((digest, id) -> { var context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3); final var member = new ControlledIdentifierMember(id); diff --git a/model/src/test/java/com/salesforce/apollo/model/StoredProceduresTest.java b/model/src/test/java/com/salesforce/apollo/model/StoredProceduresTest.java index 9a2f98c99..5e24a352e 100644 --- a/model/src/test/java/com/salesforce/apollo/model/StoredProceduresTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/StoredProceduresTest.java @@ -33,7 +33,7 @@ public class StoredProceduresTest { @Test public void membership() throws Exception { var entropy = new Random(0x1638); - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); Duration timeout = Duration.ofSeconds(100); Executor exec = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory()); Emulator emmy = new Emulator(); diff --git a/model/src/test/java/com/salesforce/apollo/model/delphinius/ShardedOracleTest.java b/model/src/test/java/com/salesforce/apollo/model/delphinius/ShardedOracleTest.java index 249f5cd83..fe529c509 100644 --- a/model/src/test/java/com/salesforce/apollo/model/delphinius/ShardedOracleTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/delphinius/ShardedOracleTest.java @@ -26,7 +26,7 @@ public class ShardedOracleTest { @Test public void func() throws Exception { - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); Duration timeout = Duration.ofSeconds(1); Emulator emmy = new Emulator(); diff --git a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java index 297b13b9f..56e7adf5d 100644 --- a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java @@ -61,6 +61,7 @@ import java.util.HashMap; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -72,9 +73,10 @@ * @author hal.hildebrand */ public class DemesneTest { - private final static Class clientChannelType = getChannelType(); + private final static Class clientChannelType = getChannelType(); private static final Class serverChannelType = getServerDomainSocketChannelClass(); - private final TestItService local = new TestItService() { + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final TestItService local = new TestItService() { @Override public void close() throws IOException { @@ -90,7 +92,7 @@ public Any ping(Any request) { return null; } }; - private EventLoopGroup eventLoopGroup; + private EventLoopGroup eventLoopGroup; public static ClientInterceptor clientInterceptor(Digest ctx) { return new ClientInterceptor() { @@ -132,33 +134,35 @@ public void portal() throws Exception { final var bridge = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); - final var portalEndpoint = new DomainSocketAddress(Path.of("target") - .resolve(UUID.randomUUID().toString()) - .toFile()); + final var portalEndpoint = new DomainSocketAddress( + Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); final var routes = new HashMap(); - final var portal = new Portal<>(serverMember1.getId(), - NettyServerBuilder.forAddress(portalEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(getServerDomainSocketChannelClass()) - .workerEventLoopGroup(getEventLoopGroup()) - .bossEventLoopGroup(getEventLoopGroup()) - .intercept(new DomainSocketServerInterceptor()), - s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), - s -> routes.get(s)); + final var portal = new Portal<>(serverMember1.getId(), NettyServerBuilder.forAddress(portalEndpoint) + .protocolNegotiator( + new DomainSocketNegotiator()) + .channelType( + getServerDomainSocketChannelClass()) + .workerEventLoopGroup( + getEventLoopGroup()) + .bossEventLoopGroup( + getEventLoopGroup()) + .intercept( + new DomainSocketServerInterceptor()), + s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), s -> routes.get(s)); final var endpoint1 = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); var enclave1 = new Enclave(serverMember1, endpoint1, bridge, d -> routes.put(qb64(d), endpoint1)); var router1 = enclave1.router(); CommonCommunications commsA = router1.create(serverMember1, ctxA, new ServerA(), "A", - r -> new Server(r), - c -> new TestItClient(c), local); + r -> new Server(r), + c -> new TestItClient(c), local); final var endpoint2 = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); var enclave2 = new Enclave(serverMember2, endpoint2, bridge, d -> routes.put(qb64(d), endpoint2)); var router2 = enclave2.router(); CommonCommunications commsB = router2.create(serverMember2, ctxB, new ServerB(), "B", - r -> new Server(r), - c -> new TestItClient(c), local); + r -> new Server(r), + c -> new TestItClient(c), local); portal.start(); router1.start(); @@ -193,15 +197,16 @@ public void smokin() throws Exception { Member serverMember = new ControlledIdentifierMember(identifier); final var portalAddress = UUID.randomUUID().toString(); final var portalEndpoint = new DomainSocketAddress(commDirectory.resolve(portalAddress).toFile()); - final var router = new RouterImpl(serverMember, - NettyServerBuilder.forAddress(portalEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(serverChannelType) - .workerEventLoopGroup(eventLoopGroup) - .bossEventLoopGroup(eventLoopGroup) - .intercept(new DomainSocketServerInterceptor()), - ServerConnectionCache.newBuilder().setFactory(to -> handler(portalEndpoint)), - null); + final var router = new RouterImpl(serverMember, NettyServerBuilder.forAddress(portalEndpoint) + .protocolNegotiator( + new DomainSocketNegotiator()) + .channelType(serverChannelType) + .workerEventLoopGroup(eventLoopGroup) + .bossEventLoopGroup(eventLoopGroup) + .intercept( + new DomainSocketServerInterceptor()), + ServerConnectionCache.newBuilder().setFactory(to -> handler(portalEndpoint)), + null); router.start(); final var registered = new TreeSet(); @@ -224,30 +229,30 @@ public void register(SubContext context) { final var kerlServer = new DemesneKERLServer(new ProtoKERLAdapter(kerl), null); final var outerService = new OuterContextServer(service, null); final var outerContextService = NettyServerBuilder.forAddress(parentEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(getServerDomainSocketChannelClass()) - .addService(kerlServer) - .addService(outerService) - .workerEventLoopGroup(getEventLoopGroup()) - .bossEventLoopGroup(getEventLoopGroup()) - .intercept(new DomainSocketServerInterceptor()) - .build(); + .protocolNegotiator(new DomainSocketNegotiator()) + .channelType(getServerDomainSocketChannelClass()) + .addService(kerlServer) + .addService(outerService) + .workerEventLoopGroup(getEventLoopGroup()) + .bossEventLoopGroup(getEventLoopGroup()) + .intercept(new DomainSocketServerInterceptor()) + .build(); outerContextService.start(); final var parameters = DemesneParameters.newBuilder() - .setContext(context.toDigeste()) - .setPortal(portalAddress) - .setParent(parentAddress) - .setCommDirectory(commDirectory.toString()) - .setMaxTransfer(100) - .setFalsePositiveRate(.125) - .build(); + .setContext(context.toDigeste()) + .setPortal(portalAddress) + .setParent(parentAddress) + .setCommDirectory(commDirectory.toString()) + .setMaxTransfer(100) + .setFalsePositiveRate(.125) + .build(); final var demesne = new DemesneImpl(parameters); Builder specification = IdentifierSpecification.newBuilder(); final var incp = demesne.inception(identifier.getIdentifier().toIdent(), specification); final var seal = Seal.EventSeal.construct(incp.getIdentifier(), incp.hash(controller.digestAlgorithm()), - incp.getSequenceNumber().longValue()); + incp.getSequenceNumber().longValue()); final var builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal)); @@ -268,17 +273,19 @@ public void register(SubContext context) { assertEquals(1, attached.seals().size()); final var extracted = attached.seals().get(0); assertTrue(extracted instanceof Seal.DigestSeal); -// assertEquals(1, attached.endorsements().size()); + // assertEquals(1, attached.endorsements().size()); } private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) - .eventLoopGroup(eventLoopGroup) - .channelType(clientChannelType) - .keepAliveTime(1, TimeUnit.SECONDS) - .usePlaintext() - .build(); + .executor(executor) + .eventLoopGroup(eventLoopGroup) + .channelType(clientChannelType) + .keepAliveTime(1, TimeUnit.SECONDS) + .usePlaintext() + .build(); } + public static interface TestIt { void ping(Any request, StreamObserver responseObserver); } @@ -301,7 +308,7 @@ public void ping(Any request, StreamObserver responseObserver) { } public static class TestItClient implements TestItService { - private final TestItBlockingStub client; + private final TestItBlockingStub client; private final ManagedServerChannel connection; public TestItClient(ManagedServerChannel c) { @@ -328,9 +335,8 @@ public Any ping(Any request) { public class ServerA implements TestIt { @Override public void ping(Any request, StreamObserver responseObserver) { - responseObserver.onNext(Any.pack(ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Hello Server A")) - .build())); + responseObserver.onNext( + Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server A")).build())); responseObserver.onCompleted(); } } @@ -338,9 +344,8 @@ public void ping(Any request, StreamObserver responseObserver) { public class ServerB implements TestIt { @Override public void ping(Any request, StreamObserver responseObserver) { - responseObserver.onNext(Any.pack(ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Hello Server B")) - .build())); + responseObserver.onNext( + Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server B")).build())); responseObserver.onCompleted(); } } diff --git a/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java b/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java index a7402257a..75050d93b 100644 --- a/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/stereotomy/ShardedKERLTest.java @@ -51,7 +51,7 @@ public void before() throws Exception { @Test public void delegated() throws Exception { - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); Duration timeout = Duration.ofSeconds(1000); Emulator emmy = new Emulator(); emmy.start(Domain.boostrapMigration()); @@ -131,7 +131,7 @@ public void delegated() throws Exception { @Test public void direct() throws Exception { - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); Duration timeout = Duration.ofSeconds(1); Emulator emmy = new Emulator(); emmy.start(Domain.boostrapMigration()); diff --git a/protocols/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java b/protocols/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java index e4284ddd0..347b415e7 100644 --- a/protocols/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java +++ b/protocols/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java @@ -61,7 +61,7 @@ public void simulation() throws IOException { .build(); AtomicLong counter = new AtomicLong(); - Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()).scheduleAtFixedRate(() -> { + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()).scheduleAtFixedRate(() -> { System.out.println(" " + counter.getAndSet(0) + " : " + limiter.toString()); }, 1, 1, TimeUnit.SECONDS); diff --git a/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java b/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java index c688c5f80..24679c422 100644 --- a/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java +++ b/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java @@ -105,7 +105,7 @@ public void unblockWhenFullBeforeTimeout() { List> listeners = acquireN(blockingLimiter, 4); // Schedule one to release in 250 msec - Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()) + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) .schedule(() -> listeners.get(0).get().onSuccess(), 250, TimeUnit.MILLISECONDS); // Next acquire will block for 1 second diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index 7153f9ad9..26d2f2623 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -298,7 +298,7 @@ protected void pre() throws Exception { var mutator = txneer.getMutator(choams.get(members.get(0).getId()).getSession()); transactioneers.add( new Transactioneer(() -> update(entropy, mutator), mutator, timeout, 1, txExecutor, countdown, - Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()))); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()))); System.out.println("Transaction member: " + members.get(0).getId()); System.out.println("Starting txns"); transactioneers.stream().forEach(e -> e.start()); diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/EmulationTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/EmulationTest.java index 0cc56482c..0fcbc57a2 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/EmulationTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/EmulationTest.java @@ -24,7 +24,7 @@ public class EmulationTest { @Test public void functional() throws Exception { // Resources to manage - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); // How long to wait until timing out ;) Duration timeout = Duration.ofSeconds(3);