diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index f8cdbee1b..a5ee566f6 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -106,12 +106,12 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, @Override public void certify() { - if (slate.size() != nextAssembly.size()) { - log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(), - slate.keySet().stream().sorted().toList(), params().member().getId()); + if (slate.size() < params().majority()) { + log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(), + params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId()); return; } - assert slate.size() == nextAssembly.size() : "Expected: %s members, slate: %s".formatted(nextAssembly.size(), + assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(), slate.size()); reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(), new NullBlock( @@ -187,12 +187,12 @@ public void publish() { log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId()); return; } - if (witnesses.size() < nextAssembly.size()) { + if (witnesses.size() < params().majority()) { log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(), params().member().getId()); return; } - if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) { + if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) { log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash, reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId()); return; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 2e1d5cb11..c6b854426 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -238,7 +238,7 @@ private boolean checkAssembly() { if (selected == null) { return false; } - if (proposals.size() == selected.majority) { + if (proposals.size() == selected.assembly.size()) { transitions.certified(); return true; } @@ -401,7 +401,7 @@ public String toString() { private class Recon implements Reconfiguration { @Override public void certify() { - if (proposals.size() == selected.majority) { + if (proposals.size() == selected.assembly.size()) { log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.certified(); @@ -424,11 +424,13 @@ public void checkAssembly() { } public void checkViews() { + countdown.set(-1); vote(); } @Override public void chill() { + countdown.set(-1); if (ViewAssembly.this.checkAssembly()) { transitions.certified(); } else { @@ -441,6 +443,15 @@ public void complete() { ViewAssembly.this.complete(); } + @Override + public void convened() { + if (viewProposals.size() == params().context().getRingCount()) { + transitions.proposed(); + } else { + countdown.set(2); + } + } + @Override public void failed() { view.onFailure(); @@ -456,5 +467,12 @@ public void finish() { public void publishViews() { propose(); } + + @Override + public void vibeCheck() { + if (ViewAssembly.this.checkAssembly()) { + transitions.certified(); + } + } } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index 246daa51a..41532259b 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -23,12 +23,16 @@ public interface Reconfiguration { void complete(); + void convened(); + void failed(); void finish(); void publishViews(); + void vibeCheck(); + enum Reconfigure implements Transitions { AWAIT_ASSEMBLY { // Publish the Views of this node @@ -37,11 +41,27 @@ public void publish() { context().publishViews(); } - // We have a majority of members submitting view proposals + // We have a >= majority submitting view proposals + @Override + public Transitions proposed() { + return CONVIENE; + } + }, CONVIENE { + @Override + public Transitions countdownCompleted() { + return proposed(); + } + + // We have a >= majority of members submitting view proposals @Override public Transitions proposed() { return VIEW_AGREEMENT; } + + @Entry + public void conviene() { + context().convened(); + } }, CERTIFICATION { // We have a full complement of the committee view proposals @Override @@ -96,6 +116,17 @@ public Transitions certified() { return CERTIFICATION; } + @Override + public Transitions checkAssembly() { + context().checkAssembly(); + return null; + } + + @Entry + public void vibin() { + context().vibeCheck(); + } + // Check to see if we already have a full complement of committee Joins @Entry public void chillin() { @@ -149,12 +180,13 @@ public Transitions viewAcquired() { return GATHER; } - // no op+ + // no op @Override public Transitions proposed() { return null; } } + } interface Transitions extends FsmExecutor { diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index 36a616dd3..9a1176cb7 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -39,7 +39,7 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -66,8 +66,9 @@ public class MtlsTest { CARDINALITY = LARGE_TESTS ? 20 : 10; } - private final List communications = new ArrayList<>(); - private List views; + private final List communications = new ArrayList<>(); + private List views; + private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -98,10 +99,14 @@ public void after() { communications.forEach(e -> e.close(Duration.ofSeconds(1))); communications.clear(); } + if (executor != null) { + executor.shutdown(); + } } @Test public void smoke() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder().setMaximumTxfr(20).build(); final Duration duration = Duration.ofMillis(50); var registry = new MetricRegistry(); @@ -128,7 +133,7 @@ public void smoke() throws Exception { builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry)); CertificateWithPrivateKey certWithKey = certs.get(node.getId()); Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router( - builder, Executors.newVirtualThreadPerTaskExecutor()); + builder, executor); communications.add(comms); return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms, parameters, DigestAlgorithm.DEFAULT, metrics); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java index 8277eb526..a07eafb56 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java @@ -13,7 +13,7 @@ import java.util.Collections; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import java.util.function.Supplier; @@ -21,19 +21,6 @@ * @author hal.hildebrand */ public interface RouterSupplier { - static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) { - return newCachedThreadPool(corePoolSize, threadFactory, true); - } - - static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) { - var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, - new SynchronousQueue(), threadFactory); - if (preStart) { - threadPoolExecutor.prestartAllCoreThreads(); - } - return threadPoolExecutor; - } - default Router router() { return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java index 5d4c19057..cd65bd4fe 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java @@ -40,7 +40,7 @@ public class UnsafeExecutors { public static ExecutorService newVirtualThreadPerTaskExecutor() { var executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); executor.prestartAllCoreThreads(); return virtualThreadExecutor(executor); } diff --git a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java index b9fc84dfb..2d6eb50c2 100644 --- a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java @@ -10,10 +10,7 @@ import com.codahale.metrics.MetricRegistry; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; @@ -39,6 +36,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +62,7 @@ public class RbcTest { private final List communications = new ArrayList<>(); private final AtomicInteger totalReceived = new AtomicInteger(0); private List messengers; + private ExecutorService executor; @AfterEach public void after() { @@ -71,10 +70,14 @@ public void after() { messengers.forEach(e -> e.stop()); } communications.forEach(e -> e.close(Duration.ofMillis(0))); + if (executor != null) { + executor.shutdown(); + } } @Test public void broadcast() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); MetricRegistry registry = new MetricRegistry(); var entropy = SecureRandom.getInstance("SHA1PRNG"); @@ -96,11 +99,9 @@ public void broadcast() throws Exception { final var prefix = UUID.randomUUID().toString(); final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT); messengers = members.stream().map(node -> { - var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder() - .setTarget(30) - .setMetrics( - new ServerConnectionCacheMetricsImpl( - registry))); + var comms = new LocalServer(prefix, node).router( + ServerConnectionCache.newBuilder().setTarget(30).setMetrics(new ServerConnectionCacheMetricsImpl(registry)), + executor); communications.add(comms); comms.start(); return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication); diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java index 602b910a8..c486f72fe 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java @@ -84,6 +84,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P .protocolNegotiator( new DomainSocketNegotiatorHandler.DomainSocketNegotiator( IMPL)) + .executor(Executors.newVirtualThreadPerTaskExecutor()) .withChildOption(ChannelOption.TCP_NODELAY, true) .channelType(IMPL.getServerDomainSocketChannelClass()) .workerEventLoopGroup(portalEventLoopGroup) @@ -93,6 +94,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P outerContextEndpoint = new DomainSocketAddress( communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint) + .executor(Executors.newVirtualThreadPerTaskExecutor()) .protocolNegotiator( new DomainSocketNegotiatorHandler.DomainSocketNegotiator(IMPL)) .withChildOption(ChannelOption.TCP_NODELAY, true) diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index 2ababf109..de2960e19 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -161,7 +161,8 @@ public void before() throws Exception { members.stream().filter(s -> s != testSubject).forEach(s -> context.activate(s)); final var prefix = UUID.randomUUID().toString(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); return localRouter; })); routers.put(testSubject.getId(),