Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stable #208

Merged
merged 13 commits into from
Jun 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,

@Override
public void certify() {
if (slate.size() != nextAssembly.size()) {
log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(),
slate.keySet().stream().sorted().toList(), params().member().getId());
if (slate.size() < params().majority()) {
log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(),
params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId());
return;
}
assert slate.size() == nextAssembly.size() : "Expected: %s members, slate: %s".formatted(nextAssembly.size(),
assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(),
slate.size());
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
Expand Down Expand Up @@ -187,12 +187,12 @@ public void publish() {
log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId());
return;
}
if (witnesses.size() < nextAssembly.size()) {
if (witnesses.size() < params().majority()) {
log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(),
params().member().getId());
return;
}
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) {
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) {
log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash,
reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId());
return;
Expand Down
22 changes: 20 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private boolean checkAssembly() {
if (selected == null) {
return false;
}
if (proposals.size() == selected.majority) {
if (proposals.size() == selected.assembly.size()) {
transitions.certified();
return true;
}
Expand Down Expand Up @@ -401,7 +401,7 @@ public String toString() {
private class Recon implements Reconfiguration {
@Override
public void certify() {
if (proposals.size() == selected.majority) {
if (proposals.size() == selected.assembly.size()) {
log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
transitions.certified();
Expand All @@ -424,11 +424,13 @@ public void checkAssembly() {
}

public void checkViews() {
countdown.set(-1);
vote();
}

@Override
public void chill() {
countdown.set(-1);
if (ViewAssembly.this.checkAssembly()) {
transitions.certified();
} else {
Expand All @@ -441,6 +443,15 @@ public void complete() {
ViewAssembly.this.complete();
}

@Override
public void convened() {
if (viewProposals.size() == params().context().getRingCount()) {
transitions.proposed();
} else {
countdown.set(2);
}
}

@Override
public void failed() {
view.onFailure();
Expand All @@ -456,5 +467,12 @@ public void finish() {
public void publishViews() {
propose();
}

@Override
public void vibeCheck() {
if (ViewAssembly.this.checkAssembly()) {
transitions.certified();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ public interface Reconfiguration {

void complete();

void convened();

void failed();

void finish();

void publishViews();

void vibeCheck();

enum Reconfigure implements Transitions {
AWAIT_ASSEMBLY {
// Publish the Views of this node
Expand All @@ -37,11 +41,27 @@ public void publish() {
context().publishViews();
}

// We have a majority of members submitting view proposals
// We have a >= majority submitting view proposals
@Override
public Transitions proposed() {
return CONVIENE;
}
}, CONVIENE {
@Override
public Transitions countdownCompleted() {
return proposed();
}

// We have a >= majority of members submitting view proposals
@Override
public Transitions proposed() {
return VIEW_AGREEMENT;
}

@Entry
public void conviene() {
context().convened();
}
}, CERTIFICATION {
// We have a full complement of the committee view proposals
@Override
Expand Down Expand Up @@ -96,6 +116,17 @@ public Transitions certified() {
return CERTIFICATION;
}

@Override
public Transitions checkAssembly() {
context().checkAssembly();
return null;
}

@Entry
public void vibin() {
context().vibeCheck();
}

// Check to see if we already have a full complement of committee Joins
@Entry
public void chillin() {
Expand Down Expand Up @@ -149,12 +180,13 @@ public Transitions viewAcquired() {
return GATHER;
}

// no op+
// no op
@Override
public Transitions proposed() {
return null;
}
}

}

interface Transitions extends FsmExecutor<Reconfiguration, Transitions> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -66,8 +66,9 @@ public class MtlsTest {
CARDINALITY = LARGE_TESTS ? 20 : 10;
}

private final List<Router> communications = new ArrayList<>();
private List<View> views;
private final List<Router> communications = new ArrayList<>();
private List<View> views;
private ExecutorService executor;

@BeforeAll
public static void beforeClass() throws Exception {
Expand Down Expand Up @@ -98,10 +99,14 @@ public void after() {
communications.forEach(e -> e.close(Duration.ofSeconds(1)));
communications.clear();
}
if (executor != null) {
executor.shutdown();
}
}

@Test
public void smoke() throws Exception {
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
var parameters = Parameters.newBuilder().setMaximumTxfr(20).build();
final Duration duration = Duration.ofMillis(50);
var registry = new MetricRegistry();
Expand All @@ -128,7 +133,7 @@ public void smoke() throws Exception {
builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry));
CertificateWithPrivateKey certWithKey = certs.get(node.getId());
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router(
builder, Executors.newVirtualThreadPerTaskExecutor());
builder, executor);
communications.add(comms);
return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms,
parameters, DigestAlgorithm.DEFAULT, metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,14 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* @author hal.hildebrand
*/
public interface RouterSupplier {
static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return newCachedThreadPool(corePoolSize, threadFactory, true);
}

static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) {
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
if (preStart) {
threadPoolExecutor.prestartAllCoreThreads();
}
return threadPoolExecutor;
}

default Router router() {
return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class UnsafeExecutors {

public static ExecutorService newVirtualThreadPerTaskExecutor() {
var executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.prestartAllCoreThreads();
return virtualThreadExecutor(executor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import com.codahale.metrics.MetricRegistry;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
Expand All @@ -39,6 +36,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -64,17 +62,22 @@ public class RbcTest {
private final List<Router> communications = new ArrayList<>();
private final AtomicInteger totalReceived = new AtomicInteger(0);
private List<ReliableBroadcaster> messengers;
private ExecutorService executor;

@AfterEach
public void after() {
if (messengers != null) {
messengers.forEach(e -> e.stop());
}
communications.forEach(e -> e.close(Duration.ofMillis(0)));
if (executor != null) {
executor.shutdown();
}
}

@Test
public void broadcast() throws Exception {
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
MetricRegistry registry = new MetricRegistry();

var entropy = SecureRandom.getInstance("SHA1PRNG");
Expand All @@ -96,11 +99,9 @@ public void broadcast() throws Exception {
final var prefix = UUID.randomUUID().toString();
final var authentication = ReliableBroadcaster.defaultMessageAdapter(context, DigestAlgorithm.DEFAULT);
messengers = members.stream().map(node -> {
var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder()
.setTarget(30)
.setMetrics(
new ServerConnectionCacheMetricsImpl(
registry)));
var comms = new LocalServer(prefix, node).router(
ServerConnectionCache.newBuilder().setTarget(30).setMetrics(new ServerConnectionCacheMetricsImpl(registry)),
executor);
communications.add(comms);
comms.start();
return new ReliableBroadcaster(context, node, parameters.build(), comms, metrics, authentication);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P
.protocolNegotiator(
new DomainSocketNegotiatorHandler.DomainSocketNegotiator(
IMPL))
.executor(Executors.newVirtualThreadPerTaskExecutor())
.withChildOption(ChannelOption.TCP_NODELAY, true)
.channelType(IMPL.getServerDomainSocketChannelClass())
.workerEventLoopGroup(portalEventLoopGroup)
Expand All @@ -93,6 +94,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P
outerContextEndpoint = new DomainSocketAddress(
communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile());
outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint)
.executor(Executors.newVirtualThreadPerTaskExecutor())
.protocolNegotiator(
new DomainSocketNegotiatorHandler.DomainSocketNegotiator(IMPL))
.withChildOption(ChannelOption.TCP_NODELAY, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public void before() throws Exception {
members.stream().filter(s -> s != testSubject).forEach(s -> context.activate(s));
final var prefix = UUID.randomUUID().toString();
routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30));
var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
return localRouter;
}));
routers.put(testSubject.getId(),
Expand Down
Loading