Skip to content

Commit

Permalink
stable (#208)
Browse files Browse the repository at this point in the history
* use default virtual threading caching pools

* correct use of majority in ViewAssembly

* logging

* revert

* revert

* set executor

* one executor

* forgot to make use of it ;)

* add vibe check to CHILLIN

* use majority certs for Reconfiguration.  Add new CONVENE state for view slate completion

* use UE for RBC testing

* moar threads for the tests

* normal logging
  • Loading branch information
Hellblazer authored Jun 9, 2024
1 parent 76ac944 commit 865b6bf
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,

@Override
public void certify() {
if (slate.size() != nextAssembly.size()) {
log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(),
slate.keySet().stream().sorted().toList(), params().member().getId());
if (slate.size() < params().majority()) {
log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(),
params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId());
return;
}
assert slate.size() == nextAssembly.size() : "Expected: %s members, slate: %s".formatted(nextAssembly.size(),
assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(),
slate.size());
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
Expand Down Expand Up @@ -187,12 +187,12 @@ public void publish() {
log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId());
return;
}
if (witnesses.size() < nextAssembly.size()) {
if (witnesses.size() < params().majority()) {
log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(),
params().member().getId());
return;
}
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) {
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) {
log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash,
reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId());
return;
Expand Down
22 changes: 20 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private boolean checkAssembly() {
if (selected == null) {
return false;
}
if (proposals.size() == selected.majority) {
if (proposals.size() == selected.assembly.size()) {
transitions.certified();
return true;
}
Expand Down Expand Up @@ -401,7 +401,7 @@ public String toString() {
private class Recon implements Reconfiguration {
@Override
public void certify() {
if (proposals.size() == selected.majority) {
if (proposals.size() == selected.assembly.size()) {
log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
transitions.certified();
Expand All @@ -424,11 +424,13 @@ public void checkAssembly() {
}

public void checkViews() {
countdown.set(-1);
vote();
}

@Override
public void chill() {
countdown.set(-1);
if (ViewAssembly.this.checkAssembly()) {
transitions.certified();
} else {
Expand All @@ -441,6 +443,15 @@ public void complete() {
ViewAssembly.this.complete();
}

@Override
public void convened() {
if (viewProposals.size() == params().context().getRingCount()) {
transitions.proposed();
} else {
countdown.set(2);
}
}

@Override
public void failed() {
view.onFailure();
Expand All @@ -456,5 +467,12 @@ public void finish() {
public void publishViews() {
propose();
}

@Override
public void vibeCheck() {
if (ViewAssembly.this.checkAssembly()) {
transitions.certified();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ public interface Reconfiguration {

void complete();

void convened();

void failed();

void finish();

void publishViews();

void vibeCheck();

enum Reconfigure implements Transitions {
AWAIT_ASSEMBLY {
// Publish the Views of this node
Expand All @@ -37,11 +41,27 @@ public void publish() {
context().publishViews();
}

// We have a majority of members submitting view proposals
// We have a >= majority submitting view proposals
@Override
public Transitions proposed() {
return CONVIENE;
}
}, CONVIENE {
@Override
public Transitions countdownCompleted() {
return proposed();
}

// We have a >= majority of members submitting view proposals
@Override
public Transitions proposed() {
return VIEW_AGREEMENT;
}

@Entry
public void conviene() {
context().convened();
}
}, CERTIFICATION {
// We have a full complement of the committee view proposals
@Override
Expand Down Expand Up @@ -96,6 +116,17 @@ public Transitions certified() {
return CERTIFICATION;
}

@Override
public Transitions checkAssembly() {
context().checkAssembly();
return null;
}

@Entry
public void vibin() {
context().vibeCheck();
}

// Check to see if we already have a full complement of committee Joins
@Entry
public void chillin() {
Expand Down Expand Up @@ -149,12 +180,13 @@ public Transitions viewAcquired() {
return GATHER;
}

// no op+
// no op
@Override
public Transitions proposed() {
return null;
}
}

}

interface Transitions extends FsmExecutor<Reconfiguration, Transitions> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -66,8 +66,9 @@ public class MtlsTest {
CARDINALITY = LARGE_TESTS ? 20 : 10;
}

private final List<Router> communications = new ArrayList<>();
private List<View> views;
private final List<Router> communications = new ArrayList<>();
private List<View> views;
private ExecutorService executor;

@BeforeAll
public static void beforeClass() throws Exception {
Expand Down Expand Up @@ -98,10 +99,14 @@ public void after() {
communications.forEach(e -> e.close(Duration.ofSeconds(1)));
communications.clear();
}
if (executor != null) {
executor.shutdown();
}
}

@Test
public void smoke() throws Exception {
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
var parameters = Parameters.newBuilder().setMaximumTxfr(20).build();
final Duration duration = Duration.ofMillis(50);
var registry = new MetricRegistry();
Expand All @@ -128,7 +133,7 @@ public void smoke() throws Exception {
builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry));
CertificateWithPrivateKey certWithKey = certs.get(node.getId());
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router(
builder, Executors.newVirtualThreadPerTaskExecutor());
builder, executor);
communications.add(comms);
return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms,
parameters, DigestAlgorithm.DEFAULT, metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,14 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* @author hal.hildebrand
*/
public interface RouterSupplier {
static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return newCachedThreadPool(corePoolSize, threadFactory, true);
}

static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) {
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
if (preStart) {
threadPoolExecutor.prestartAllCoreThreads();
}
return threadPoolExecutor;
}

default Router router() {
return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class UnsafeExecutors {

public static ExecutorService newVirtualThreadPerTaskExecutor() {
var executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.prestartAllCoreThreads();
return virtualThreadExecutor(executor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import com.codahale.metrics.MetricRegistry;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
Expand All @@ -39,6 +36,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -64,17 +62,22 @@ public class RbcTest {
private final List<Router> communications = new ArrayList<>();
private final AtomicInteger totalReceived = new AtomicInteger(0);
private List<ReliableBroadcaster> messengers;
private ExecutorService executor;

@AfterEach
public void after() {
if (messengers != null) {
messengers.forEach(e -> e.stop());
}
communications.forEach(e -> e.close(Duration.ofMillis(0)));
if (executor != null) {
executor.shutdown();
}
}

@Test
public void broadcast() throws Exception {
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
MetricRegistry registry = new MetricRegistry();

var entropy = SecureRandom.getInstance("SHA1PRNG");
Expand All @@ -96,11 +99,9 @@ public void broadcast() throws Exception {
final var prefix = UUID.randomUUID().toString();
final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT);
messengers = members.stream().map(node -> {
var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder()
.setTarget(30)
.setMetrics(
new ServerConnectionCacheMetricsImpl(
registry)));
var comms = new LocalServer(prefix, node).router(
ServerConnectionCache.newBuilder().setTarget(30).setMetrics(new ServerConnectionCacheMetricsImpl(registry)),
executor);
communications.add(comms);
comms.start();
return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P
.protocolNegotiator(
new DomainSocketNegotiatorHandler.DomainSocketNegotiator(
IMPL))
.executor(Executors.newVirtualThreadPerTaskExecutor())
.withChildOption(ChannelOption.TCP_NODELAY, true)
.channelType(IMPL.getServerDomainSocketChannelClass())
.workerEventLoopGroup(portalEventLoopGroup)
Expand All @@ -93,6 +94,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P
outerContextEndpoint = new DomainSocketAddress(
communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile());
outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint)
.executor(Executors.newVirtualThreadPerTaskExecutor())
.protocolNegotiator(
new DomainSocketNegotiatorHandler.DomainSocketNegotiator(IMPL))
.withChildOption(ChannelOption.TCP_NODELAY, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public void before() throws Exception {
members.stream().filter(s -> s != testSubject).forEach(s -> context.activate(s));
final var prefix = UUID.randomUUID().toString();
routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30));
var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
return localRouter;
}));
routers.put(testSubject.getId(),
Expand Down

0 comments on commit 865b6bf

Please sign in to comment.