Skip to content

Commit

Permalink
Believe the view Reconfiguration is now fixed.
Browse files Browse the repository at this point in the history
Significant work on join protocol.  Added new Chillin' state to ensure we get full membership through slow joiners.
  • Loading branch information
Hellblazer committed Jun 6, 2024
1 parent 1d540a6 commit 6a19047
Show file tree
Hide file tree
Showing 20 changed files with 239 additions and 155 deletions.
11 changes: 0 additions & 11 deletions choam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,4 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
204 changes: 132 additions & 72 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java

Large diffs are not rendered by default.

41 changes: 27 additions & 14 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ public Map<Digest, Join> getSlate() {

public void joined(SignedViewMember viewMember) {
final var mid = Digest.from(viewMember.getVm().getId());
if (!validate(mid, viewMember)) {
return;
}
joins.put(mid, SignedJoin.newBuilder()
.setMember(params().member().getId().toDigeste())
.setJoin(viewMember)
Expand Down Expand Up @@ -219,7 +216,7 @@ void join(List<SignedViewMember> joins) {
proposals.put(mid, svm);
}
}
checkAssembly();
transitions.checkAssembly();
}

void newEpoch() {
Expand All @@ -237,15 +234,15 @@ private Map<Digest, Member> assemblyOf(List<Digeste> committee) {
.collect(Collectors.toMap(Member::getId, m -> m));
}

private void checkAssembly() {
private boolean checkAssembly() {
if (selected == null) {
return;
return false;
}
if (proposals.size() == selected.majority) {
transitions.certified();
} else if (proposals.size() >= selected.majority) {
transitions.gathered();
return true;
}
return false;
}

private Parameters params() {
Expand Down Expand Up @@ -405,24 +402,40 @@ private class Recon implements Reconfiguration {
@Override
public void certify() {
if (proposals.size() == selected.majority) {
log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
transitions.certified();
} else {
countdown.set(3);
log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority,
proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId());
countdown.set(4);
log.info("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority,
proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId());
}
}

public void checkAssembly() {
ViewAssembly.this.checkAssembly();
if (ViewAssembly.this.checkAssembly()) {
return;
}
if (proposals.size() >= selected.majority) {
transitions.chill();
} else {
log.info("Check assembly: {} on: {}", proposals.size(), params().member().getId());
}
}

public void checkViews() {
vote();
}

@Override
public void chill() {
if (ViewAssembly.this.checkAssembly()) {
transitions.certified();
} else {
countdown.set(2);
}
}

@Override
public void complete() {
ViewAssembly.this.complete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
package com.salesforce.apollo.choam.comm;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Empty;
import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.choam.proto.*;
Expand Down Expand Up @@ -47,8 +49,11 @@ public Member getMember() {
}

@Override
public Empty join(SignedViewMember join) {
return service.join(join, member.getId());
public ListenableFuture<Empty> join(SignedViewMember join) {
var j = service.join(join, member.getId());
SettableFuture<Empty> sf = SettableFuture.create();
sf.set(j);
return sf;
}

@Override
Expand All @@ -64,7 +69,7 @@ public Initial sync(Synchronize sync) {

Blocks fetchViewChain(BlockReplication replication);

Empty join(SignedViewMember join);
ListenableFuture<Empty> join(SignedViewMember join);

Initial sync(Synchronize sync);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package com.salesforce.apollo.choam.comm;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
Expand All @@ -20,12 +21,14 @@ public class TerminalClient implements Terminal {

private final ManagedServerChannel channel;
private final TerminalGrpc.TerminalBlockingStub client;
private final TerminalGrpc.TerminalFutureStub asyncClient;
@SuppressWarnings("unused")
private final ChoamMetrics metrics;

public TerminalClient(ManagedServerChannel channel, ChoamMetrics metrics) {
this.channel = channel;
this.client = channel.wrap(TerminalGrpc.newBlockingStub(channel));
this.asyncClient = channel.wrap(TerminalGrpc.newFutureStub(channel));
this.metrics = metrics;
}

Expand Down Expand Up @@ -60,8 +63,8 @@ public Member getMember() {
}

@Override
public Empty join(SignedViewMember vm) {
return client.join(vm);
public ListenableFuture<Empty> join(SignedViewMember vm) {
return asyncClient.join(vm);
}

public void release() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface Reconfiguration {

void checkViews();

void chill();

void complete();

void failed();
Expand Down Expand Up @@ -60,10 +62,9 @@ public void certify() {
context().certify();
}
}, GATHER {
// We have a majority of the new committee Joins
@Override
public Transitions gathered() {
return CERTIFICATION;
public Transitions chill() {
return CHILLIN;
}

// We have a full complement of the new committee Joins
Expand All @@ -77,6 +78,29 @@ public Transitions certified() {
public void gather() {
context().checkAssembly();
}

@Override
public Transitions checkAssembly() {
context().checkAssembly();
return null;
}
}, CHILLIN {
@Override
public Transitions countdownCompleted() {
return certified();
}

// We have what we have
@Override
public Transitions certified() {
return CERTIFICATION;
}

// Check to see if we already have a full complement of committee Joins
@Entry
public void chillin() {
context().chill();
}
}, PROTOCOL_FAILURE {
@Override
public Transitions certified() {
Expand Down Expand Up @@ -139,6 +163,14 @@ default Transitions certified() {
throw fsm().invalidTransitionOn();
}

default Transitions checkAssembly() {
throw fsm().invalidTransitionOn();
}

default Transitions chill() {
throw fsm().invalidTransitionOn();
}

default Transitions complete() {
throw fsm().invalidTransitionOn();
}
Expand All @@ -151,10 +183,6 @@ default Transitions failed() {
return Reconfigure.PROTOCOL_FAILURE;
}

default Transitions gathered() {
throw fsm().invalidTransitionOn();
}

default Transitions proposed() {
throw fsm().invalidTransitionOn();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContext;
Expand Down Expand Up @@ -64,10 +63,9 @@ public void setUp() throws Exception {
.toList();

final var prefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor)));
ServerConnectionCache.newBuilder().setTarget(cardinality * 2))));

var template = Parameters.newBuilder()
.setGenerateGenesis(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters.BootstrapParameters;
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
Expand Down Expand Up @@ -171,10 +170,9 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws
SigningMember testSubject = new ControlledIdentifierMember(stereotomy.newIdentifier());

final var prefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(Member::getId, m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder().setTarget(cardinality), executor)));
ServerConnectionCache.newBuilder().setTarget(cardinality))));
routers.put(testSubject.getId(), new LocalServer(prefix, testSubject).router(
ServerConnectionCache.newBuilder().setTarget(cardinality)));
choams = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand Down Expand Up @@ -121,12 +124,11 @@ public void before() throws Exception {
.toList();
var context = new StaticContext<>(origin, 0.2, members, 3);
final var prefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder()
.setMetrics(new ServerConnectionCacheMetricsImpl(registry))
.setTarget(CARDINALITY), executor)));
.setTarget(CARDINALITY))));
choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
var recording = new AtomicInteger();
blocks.put(m.getId(), recording);
Expand Down
2 changes: 1 addition & 1 deletion choam/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.chiralbehaviors.tron" level="info" additivity="false">
<logger name="com.chiralbehaviors.tron" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ private void initialize() {
AtomicBoolean frist = new AtomicBoolean(true);
final var prefix = UUID.randomUUID().toString();
final var gatewayPrefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
views = members.values().stream().map(node -> {
DynamicContext<Participant> context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
Expand All @@ -289,8 +288,8 @@ private void initialize() {
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false)
? node0Registry : registry)),
executor);
? node0Registry
: registry)));
comms.start();
communications.add(comms);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ private void initialize() {
AtomicBoolean frist = new AtomicBoolean(true);
final var prefix = UUID.randomUUID().toString();
final var gatewayPrefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
views = members.values().stream().map(node -> {
DynamicContext<Participant> context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
Expand All @@ -242,8 +241,8 @@ private void initialize() {
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false)
? node0Registry : registry)),
executor);
? node0Registry
: registry)));
comms.start();
communications.add(comms);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,10 @@ public static Parameters.Builder newBuilder() {
public static class Builder implements Cloneable {
private int bufferSize = 1500;
private int dedupBufferSize = 100;
private double dedupFpr = Math.pow(10, -9);
private double dedupFpr = Math.pow(10, -6);
private int deliveredCacheSize = 100;
private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT;
private double falsePositiveRate = 0.00125;
private double falsePositiveRate = 0.0000125;
private int maxMessages = 500;

public Parameters build() {
Expand Down
7 changes: 0 additions & 7 deletions model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
Expand Down
Loading

0 comments on commit 6a19047

Please sign in to comment.