Skip to content

Commit

Permalink
set explicit executors to netty client/server builders
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Nov 5, 2023
1 parent e105b6f commit 6a91c8f
Show file tree
Hide file tree
Showing 20 changed files with 374 additions and 397 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void clientSmoke() throws Exception {
final var parameters = Parameters.newBuilder().setKerl(kerl).build();
@SuppressWarnings("unused")
var gorgon = new Gorgoneion(parameters, member, context, observer, gorgonRouter,
Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()), null);
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), null);

// The registering client
var client = new ControlledIdentifierMember(stereotomy.newIdentifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void smokin() throws Exception {
@SuppressWarnings("unused")
var gorgon = new Gorgoneion(Parameters.newBuilder().setKerl(kerl).build(), member, context, observer,
gorgonRouter,
Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()), null);
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), null);

// The registering client
var client = new ControlledIdentifierMember(stereotomy.newIdentifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.Collections;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static com.salesforce.apollo.comm.grpc.DomainSockets.*;
Expand All @@ -65,9 +67,10 @@
*/
public class DemesneSmoke {

private final static Class<? extends io.netty.channel.Channel> clientChannelType = getChannelType();
private final static Class<? extends io.netty.channel.Channel> clientChannelType = getChannelType();
private static final Class<? extends ServerDomainSocketChannel> serverChannelType = getServerDomainSocketChannelClass();
private EventLoopGroup eventLoopGroup;
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private EventLoopGroup eventLoopGroup;

public static ClientInterceptor clientInterceptor(Digest ctx) {
return new ClientInterceptor() {
Expand Down Expand Up @@ -118,15 +121,16 @@ public void smokin() throws Exception {
Member serverMember = new ControlledIdentifierMember(identifier);
final var portalAddress = UUID.randomUUID().toString();
final var portalEndpoint = new DomainSocketAddress(commDirectory.resolve(portalAddress).toFile());
final var router = new RouterImpl(serverMember,
NettyServerBuilder.forAddress(portalEndpoint)
.protocolNegotiator(new DomainSocketNegotiator())
.channelType(serverChannelType)
.workerEventLoopGroup(eventLoopGroup)
.bossEventLoopGroup(eventLoopGroup)
.intercept(new DomainSocketServerInterceptor()),
ServerConnectionCache.newBuilder().setFactory(to -> handler(portalEndpoint)),
null);
final var router = new RouterImpl(serverMember, NettyServerBuilder.forAddress(portalEndpoint)
.protocolNegotiator(
new DomainSocketNegotiator())
.channelType(serverChannelType)
.workerEventLoopGroup(eventLoopGroup)
.bossEventLoopGroup(eventLoopGroup)
.intercept(
new DomainSocketServerInterceptor()),
ServerConnectionCache.newBuilder().setFactory(to -> handler(portalEndpoint)),
null);
router.start();

final var registered = new TreeSet<Digest>();
Expand All @@ -149,30 +153,30 @@ public void register(SubContext context) {
final var kerlServer = new DemesneKERLServer(new ProtoKERLAdapter(kerl), null);
final var outerService = new OuterContextServer(service, null);
final var outerContextService = NettyServerBuilder.forAddress(parentEndpoint)
.protocolNegotiator(new DomainSocketNegotiator())
.channelType(getServerDomainSocketChannelClass())
.addService(kerlServer)
.addService(outerService)
.workerEventLoopGroup(getEventLoopGroup())
.bossEventLoopGroup(getEventLoopGroup())
.intercept(new DomainSocketServerInterceptor())
.build();
.protocolNegotiator(new DomainSocketNegotiator())
.channelType(getServerDomainSocketChannelClass())
.addService(kerlServer)
.addService(outerService)
.workerEventLoopGroup(getEventLoopGroup())
.bossEventLoopGroup(getEventLoopGroup())
.intercept(new DomainSocketServerInterceptor())
.build();
outerContextService.start();

final var parameters = DemesneParameters.newBuilder()
.setContext(context.toDigeste())
.setPortal(portalAddress)
.setParent(parentAddress)
.setCommDirectory(commDirectory.toString())
.setMaxTransfer(100)
.setFalsePositiveRate(.125)
.build();
.setContext(context.toDigeste())
.setPortal(portalAddress)
.setParent(parentAddress)
.setCommDirectory(commDirectory.toString())
.setMaxTransfer(100)
.setFalsePositiveRate(.125)
.build();
final var demesne = new DemesneImpl(parameters);
Builder<SelfAddressingIdentifier> specification = IdentifierSpecification.newBuilder();
final var incp = demesne.inception(identifier.getIdentifier().toIdent(), specification);

final var seal = Seal.EventSeal.construct(incp.getIdentifier(), incp.hash(controller.digestAlgorithm()),
incp.getSequenceNumber().longValue());
incp.getSequenceNumber().longValue());

final var builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal));

Expand All @@ -186,11 +190,12 @@ public void register(SubContext context) {

private ManagedChannel handler(DomainSocketAddress address) {
return NettyChannelBuilder.forAddress(address)
.eventLoopGroup(eventLoopGroup)
.channelType(clientChannelType)
.keepAliveTime(1, TimeUnit.SECONDS)
.usePlaintext()
.build();
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(clientChannelType)
.keepAliveTime(1, TimeUnit.SECONDS)
.usePlaintext()
.build();
}

public static interface TestIt {
Expand All @@ -215,7 +220,7 @@ public void ping(Any request, StreamObserver<Any> responseObserver) {
}

public static class TestItClient implements TestItService {
private final TestItBlockingStub client;
private final TestItBlockingStub client;
private final ManagedServerChannel connection;

public TestItClient(ManagedServerChannel c) {
Expand All @@ -242,19 +247,17 @@ public Any ping(Any request) {
public class ServerA implements TestIt {
@Override
public void ping(Any request, StreamObserver<Any> responseObserver) {
responseObserver.onNext(Any.pack(ByteMessage.newBuilder()
.setContents(ByteString.copyFromUtf8("Hello Server A"))
.build()));
responseObserver.onNext(
Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server A")).build()));
responseObserver.onCompleted();
}
}

public class ServerB implements TestIt {
@Override
public void ping(Any request, StreamObserver<Any> responseObserver) {
responseObserver.onNext(Any.pack(ByteMessage.newBuilder()
.setContents(ByteString.copyFromUtf8("Hello Server B"))
.build()));
responseObserver.onNext(
Any.pack(ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8("Hello Server B")).build()));
responseObserver.onCompleted();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void before() throws Exception {
identities.keySet().forEach(d -> foundation.addMembership(d.toDigeste()));
var sealed = FoundationSeal.newBuilder().setFoundation(foundation).build();
TransactionConfiguration txnConfig = new TransactionConfiguration(
Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()));
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()));
identities.forEach((digest, id) -> {
var context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3);
final var member = new ControlledIdentifierMember(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
}
};
final var builder = NettyChannelBuilder.forAddress(bridge)
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
.usePlaintext()
.executor(executor)
.intercept(clientInterceptor);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,36 @@
*/
package com.salesforce.apollo.archipelago;

import static com.salesforce.apollo.comm.grpc.DomainSockets.getChannelType;
import static com.salesforce.apollo.comm.grpc.DomainSockets.getEventLoopGroup;
import static com.salesforce.apollo.comm.grpc.DomainSockets.getServerDomainSocketChannelClass;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.QualifiedBase64;
import com.salesforce.apollo.membership.Member;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.*;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerBuilder;
import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.salesforce.apollo.comm.grpc.DomainSockets.*;

/**
* Local "service mesh" for in process Isolate Enclaves. The Portal provides the
* externally visible GRPC endpoint that all enclaves are multiplexed through.
* The Portal also serves as the exit point from the process that all Isolate
* Local "service mesh" for in process Isolate Enclaves. The Portal provides the externally visible GRPC endpoint that
* all enclaves are multiplexed through. The Portal also serves as the exit point from the process that all Isolate
* Enclaves use to talk to each other and Enclaves in other processes
*
* @author hal.hildebrand
*
*/
public class Portal<To extends Member> {
private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static Class<? extends io.netty.channel.Channel> channelType = getChannelType();

private final String agent;
Expand All @@ -57,8 +45,7 @@ public class Portal<To extends Member> {
private final Demultiplexer outbound;

public Portal(Digest agent, ServerBuilder<?> inbound, Function<String, ManagedChannel> outbound,
DomainSocketAddress bridge, Duration keepAlive,
Function<String, DomainSocketAddress> router) {
DomainSocketAddress bridge, Duration keepAlive, Function<String, DomainSocketAddress> router) {
this.inbound = new Demultiplexer(inbound, Router.METADATA_CONTEXT_KEY, d -> handler(router.apply(d)));
this.outbound = new Demultiplexer(NettyServerBuilder.forAddress(bridge)
.executor(executor)
Expand Down Expand Up @@ -98,6 +85,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
}
};
return NettyChannelBuilder.forAddress(address)
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
.keepAliveTime(keepAlive.toNanos(), TimeUnit.NANOSECONDS)
Expand Down
Loading

0 comments on commit 6a91c8f

Please sign in to comment.