Skip to content

Commit

Permalink
cleaner committee join, revert ReliableBroadcaster fpr changes
Browse files Browse the repository at this point in the history
The bloom window needs a very low false positive rate (FPR)
  • Loading branch information
Hellblazer committed Jun 5, 2024
1 parent 310321b commit 1d540a6
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 35 deletions.
47 changes: 22 additions & 25 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter;
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg;
import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import io.grpc.StatusRuntimeException;
import org.h2.mvstore.MVMap;
Expand All @@ -47,7 +46,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyPair;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -99,7 +97,6 @@ public class CHOAM {
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final AtomicReference<CompletableFuture<Void>> join = new AtomicReference<>();

public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
Expand Down Expand Up @@ -711,10 +708,6 @@ private void process() {

private void reconfigure(Digest hash, Reconfigure reconfigure) {
log.info("Setting next view id: {} on: {}", hash, params.member().getId());
var j = join.getAndSet(null);
if (j != null) {
j.cancel(true);
}
nextViewId.set(hash);
var pv = pendingViews.advance();
if (pv != null) {
Expand Down Expand Up @@ -1282,10 +1275,10 @@ public Initial sync(Synchronize request, Digest from) {

/** abstract class to maintain the common state */
private abstract class Administration implements Committee {
protected final Digest viewId;

private final GroupIterator servers;
private final Map<Member, Verifier> validators;
protected final Digest viewId;
private final GroupIterator servers;
private final Map<Member, Verifier> validators;
private volatile JoinState ongoingJoin;

public Administration(Map<Member, Verifier> validators, Digest viewId) {
this.validators = validators;
Expand All @@ -1295,6 +1288,12 @@ public Administration(Map<Member, Verifier> validators, Digest viewId) {

/**
 * Handles an accepted block: signals any join attempt still recorded in {@code ongoingJoin} to
 * stop, then runs {@code process()}.
 *
 * @param hb the accepted block; this method does not read it directly
 */
@Override
public void accept(HashedCertifiedBlock hb) {
    // Snapshot the volatile field into a local and clear it before signalling the attempt.
    final var pending = ongoingJoin;
    ongoingJoin = null;
    if (pending == null) {
        // No join attempt recorded; nothing to stop.
        process();
        return;
    }
    // Raise the halt flag first, then interrupt the thread running the attempt.
    pending.halt().set(true);
    pending.joining().interrupt();
    process();
}

Expand Down Expand Up @@ -1384,30 +1383,25 @@ public boolean validate(HashedCertifiedBlock hb) {
}

/**
 * Starts a background attempt to join the supplied view, repeating join rounds against the
 * validator members until a majority of joins has been counted or the attempt is stopped.
 *
 * NOTE(review): this span comes from a commit diff rendered without +/- markers, so statements
 * from two implementations appear interleaved: one tracking the attempt with a CompletableFuture
 * held in {@code join}, the other with a {@code JoinState} held in {@code ongoingJoin}. Confirm
 * against the repository which lines belong to the current revision before treating this as
 * compilable code.
 *
 * @param view the view to join; its majority bounds the retry loop and its diadem is logged
 */
private void join(View view) {
var joining = new CompletableFuture<Void>();
if (!join.compareAndSet(null, joining)) {
log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()),
params.member().getId());
transitions.fail();
return;
// Guard: any earlier attempt must already have been cleared from ongoingJoin.
if (ongoingJoin != null) {
throw new IllegalStateException("Ongoing join should have been cancelled");
}
log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
// Concurrent copy of the validator members; handed to every join round below.
var servers = new ConcurrentSkipListSet<>(validators.keySet());

var delay = Duration.ofMillis(Entropy.nextSecureInt(5));
var joined = new AtomicInteger();
var halt = new AtomicBoolean(false);

Thread.ofPlatform().start(() -> {
// NOTE(review): the thread is started before ongoingJoin is assigned, and the thread clears
// ongoingJoin itself when it finishes; if it finishes first, this assignment would publish an
// already-finished JoinState. Confirm whether that ordering can occur in practice.
ongoingJoin = new JoinState(halt, Thread.ofVirtual().start(Utils.wrapped(() -> {
// NOTE(review): logged at error level although the message reads as routine progress.
log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
while (!joining.isDone() && joined.get() < view.getMajority()) {
// NOTE(review): '&' is the non-short-circuit AND, so both operands are evaluated on every
// iteration; '&&' would be the idiomatic form here.
while (!halt.get() & joined.get() < view.getMajority()) {
join(view, servers, joined);
}
log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
joining.complete(null);
});
// The attempt is over: clear the recorded state.
ongoingJoin = null;
}, log())));
}

private void join(View view, Collection<Member> servers, AtomicInteger joined) {
Expand All @@ -1426,7 +1420,7 @@ private void join(View view, Collection<Member> servers, AtomicInteger joined) {
var countdown = new CountDownLatch(servers.size());

servers.stream().map(comm::connect).filter(Objects::nonNull).forEach(t -> {
Thread.ofVirtual().start(() -> {
Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
t.join(svm);
servers.remove(t.getMember());
Expand All @@ -1442,14 +1436,17 @@ private void join(View view, Collection<Member> servers, AtomicInteger joined) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", t.getMember().getId(),
nextViewId, Digest.from(view.getDiadem()), params.member().getId(), throwable);
}
});
}, log()));
});
try {
countdown.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
 * State of a join attempt: the flag used to ask it to stop and the thread running it.
 *
 * @param halt    set to {@code true} to request that the join loop stop
 * @param joining the thread executing the join loop
 */
private record JoinState(AtomicBoolean halt, Thread joining) {}
}

/** a member of the current committee */
Expand Down
6 changes: 3 additions & 3 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private Parameters params() {

/**
 * Flattens the assemblies carried by the supplied units into a single list, logs the counts at
 * trace level, and passes the list to {@code assembly.assemble}.
 *
 * NOTE(review): two {@code log.trace(} openings appear back to back below and share a single
 * argument continuation line. This looks like the before/after pair of a diff rendered without
 * +/- markers; confirm which message is current.
 *
 * @param aggregate the units whose assemblies are consumed
 */
private void processAssemblies(List<UnitData> aggregate) {
var aggs = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList();
log.trace("Consuming {} assemblies from {} units on: {}", aggs.size(), aggregate.size(),
log.trace("Consuming: {} assemblies from: {} units on: {}", aggs.size(), aggregate.size(),
params().member().getId());
assembly.assemble(aggs);
}
Expand Down Expand Up @@ -313,14 +313,14 @@ private void publish(PendingBlock p) {
}

private void publish(PendingBlock p, boolean beacon) {
assert p.witnesses.size() >= params().majority() : "Publishing non majority block";
assert p.witnesses.size() >= params().majority() : "Attempt to publish non majority block";
var publish = p.published.compareAndSet(false, true);
if (!publish && !beacon) {
log.trace("Already published: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId());
return;
}
log.trace("Publishing {}pending: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "",
log.trace("Publishing {}: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "(pending)",
p.block.block.getBodyCase(), p.block.hash, p.block.height(), p.witnesses.values().size(),
params().member().getId());
final var cb = CertifiedBlock.newBuilder()
Expand Down
2 changes: 1 addition & 1 deletion choam/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.chiralbehaviors.tron" level="warn" additivity="false">
<logger name="com.chiralbehaviors.tron" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,17 +457,15 @@ private class Buffer {
/**
 * Creates the buffer: records {@code maxAge}, derives {@code highWaterMark} from
 * {@code params.bufferSize}, and creates the {@code delivered} bloom window sized by
 * {@code params.dedupBufferSize}.
 *
 * NOTE(review): {@code delivered} is assigned twice below, once with a false positive rate
 * computed as 1 / (2 * dedupBufferSize) and once with {@code params.dedupFpr}. This looks like the
 * before/after pair of a diff rendered without +/- markers; confirm which is current.
 *
 * @param maxAge stored in {@code this.maxAge}
 */
private Buffer(int maxAge) {
this.maxAge = maxAge;
// NOTE(review): as written this is bufferSize - (int) (bufferSize * 1.1), which is never
// positive (e.g. 1000 - 1100 = -100). Confirm whether bufferSize - (int) (bufferSize * 0.1)
// was intended.
highWaterMark = (params.bufferSize - (int) (params.bufferSize + ((params.bufferSize) * 0.1)));
delivered = BloomWindow.create(params.dedupBufferSize, 1.0 / ((double) params.dedupBufferSize * 2.0),
Biff.Type.DIGEST);
delivered = BloomWindow.create(params.dedupBufferSize, params.dedupFpr, Biff.Type.DIGEST);
}

/**
 * Empties the buffer by clearing {@code state}.
 */
public void clear() {
state.clear();
}

/**
 * Builds a bloom filter containing every key currently in {@code state}.
 *
 * The filter is a new {@code DigestBloomFilter} constructed with a value from
 * {@code Entropy.nextBitsStreamLong()} (presumably the hash seed; verify against the
 * constructor) and sized by {@code params.bufferSize}.
 *
 * NOTE(review): {@code biff} is declared twice below, once with a false positive rate computed as
 * 1 / (2 * bufferSize) and once with {@code params.falsePositiveRate}. This looks like the
 * before/after pair of a diff rendered without +/- markers; confirm which is current.
 *
 * @return the populated bloom filter
 */
public BloomFilter<Digest> forReconcilliation() {
var biff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), params.bufferSize,
1.0 / ((double) params.bufferSize * 2.0));
var biff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), params.bufferSize, params.falsePositiveRate);
// Add each key of the current state to the filter.
state.keySet().forEach(k -> biff.add(k));
return biff;
}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,8 @@
<version>3.2.5</version>
<configuration>
<!-- NOTE(review): the reuseForks and -Xmx/-Xms argLine entries each appear twice; this looks like
     the before/after pairs of a diff rendered without +/- markers. Separately, the -Xmx/-Xms
     argLine and the -Djdk.tracePinnedThreads argLine are distinct <argLine> elements in one
     configuration. argLine is presumably a single-valued parameter, so only one of them may take
     effect; confirm, and consider merging the JVM options into one element. -->
<forkCount>${forks}</forkCount>
<reuseForks>true</reuseForks>
<argLine>-Xmx10G -Xms4G</argLine>
<reuseForks>false</reuseForks>
<argLine>-Xmx10G -Xms100M</argLine>
<argLine>-Djdk.tracePinnedThreads=full</argLine>
</configuration>
</plugin>
Expand Down

0 comments on commit 1d540a6

Please sign in to comment.