From 1d540a6485c33044f20a4f4500f08f342eccf1fb Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Wed, 5 Jun 2024 08:08:25 -0700 Subject: [PATCH] cleaner committee join, revert ReliableBroadcaster fpr changes The bloom window needs a very low FPR --- .../com/salesforce/apollo/choam/CHOAM.java | 47 +++++++++---------- .../com/salesforce/apollo/choam/Producer.java | 6 +-- choam/src/test/resources/logback-test.xml | 2 +- .../messaging/rbc/ReliableBroadcaster.java | 6 +-- pom.xml | 4 +- 5 files changed, 30 insertions(+), 35 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index dd2c43281..449305faf 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -35,7 +35,6 @@ import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter; import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg; import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder; -import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; import io.grpc.StatusRuntimeException; import org.h2.mvstore.MVMap; @@ -47,7 +46,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.security.KeyPair; -import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -99,7 +97,6 @@ public class CHOAM { private final TransSubmission txnSubmission = new TransSubmission(); private final AtomicReference view = new AtomicReference<>(); private final PendingViews pendingViews = new PendingViews(); - private final AtomicReference> join = new AtomicReference<>(); public CHOAM(Parameters params) { this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); @@ -711,10 +708,6 @@ private void process() { private void reconfigure(Digest hash, Reconfigure reconfigure) { log.info("Setting next view id: {} on: {}", hash, params.member().getId()); - var j = join.getAndSet(null); - if (j != null) { - j.cancel(true); - } nextViewId.set(hash); var pv = pendingViews.advance(); if (pv != null) { @@ -1282,10 +1275,10 @@ public Initial sync(Synchronize request, Digest from) { /** abstract class to maintain the common state */ private abstract class Administration implements Committee { - protected final Digest viewId; - - private final GroupIterator servers; - private final Map validators; + protected final Digest viewId; + private final GroupIterator servers; + private final Map validators; + private volatile JoinState ongoingJoin; public Administration(Map validators, Digest viewId) { this.validators = validators; @@ -1295,6 +1288,12 @@ public Administration(Map validators, Digest viewId) { @Override public void accept(HashedCertifiedBlock hb) { + final var oj = ongoingJoin; + ongoingJoin = null; + if (oj != null) { + oj.halt.set(true); + oj.joining.interrupt(); + } process(); } @@ -1384,30 +1383,25 @@ public boolean validate(HashedCertifiedBlock hb) { } private void join(View view) { - var joining = new CompletableFuture(); - if (!join.compareAndSet(null, joining)) { - log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()), - params.member().getId()); - transitions.fail(); - return; + if (ongoingJoin != null) { + throw new IllegalStateException("Ongoing join should have been cancelled"); } log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), params.member().getId()); var servers = new ConcurrentSkipListSet<>(validators.keySet()); - - var delay = Duration.ofMillis(Entropy.nextSecureInt(5)); var joined = new AtomicInteger(); + var halt = new AtomicBoolean(false); - Thread.ofPlatform().start(() -> { + ongoingJoin = new JoinState(halt, Thread.ofVirtual().start(Utils.wrapped(() -> { log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), params.member().getId()); - while (!joining.isDone() && joined.get() < view.getMajority()) { + while (!halt.get() & joined.get() < view.getMajority()) { join(view, servers, joined); } log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), params.member().getId()); - joining.complete(null); - }); + ongoingJoin = null; + }, log()))); } private void join(View view, Collection servers, AtomicInteger joined) { @@ -1426,7 +1420,7 @@ private void join(View view, Collection servers, AtomicInteger joined) { var countdown = new CountDownLatch(servers.size()); servers.stream().map(comm::connect).filter(Objects::nonNull).forEach(t -> { - Thread.ofVirtual().start(() -> { + Thread.ofVirtual().start(Utils.wrapped(() -> { try { t.join(svm); servers.remove(t.getMember()); @@ -1442,7 +1436,7 @@ private void join(View view, Collection servers, AtomicInteger joined) { log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", t.getMember().getId(), nextViewId, Digest.from(view.getDiadem()), params.member().getId(), throwable); } - }); + }, log())); }); try { countdown.await(2, TimeUnit.SECONDS); @@ -1450,6 +1444,9 @@ private void join(View view, Collection servers, AtomicInteger joined) { Thread.currentThread().interrupt(); } } + + private record JoinState(AtomicBoolean halt, Thread joining) { + } } /** a member of the current committee */ diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 2f1623a08..19bc5580f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -234,7 +234,7 @@ private Parameters params() { private void processAssemblies(List aggregate) { var aggs = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList(); - log.trace("Consuming {} assemblies from {} units on: {}", aggs.size(), aggregate.size(), + log.trace("Consuming: {} assemblies from: {} units on: {}", aggs.size(), aggregate.size(), params().member().getId()); assembly.assemble(aggs); } @@ -313,14 +313,14 @@ private void publish(PendingBlock p) { } private void publish(PendingBlock p, boolean beacon) { - assert p.witnesses.size() >= params().majority() : "Publishing non majority block"; + assert p.witnesses.size() >= params().majority() : "Attempt to publish non majority block"; var publish = p.published.compareAndSet(false, true); if (!publish && !beacon) { log.trace("Already published: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(), p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId()); return; } - log.trace("Publishing {}pending: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "", + log.trace("Publishing {}: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "(pending)", p.block.block.getBodyCase(), p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId()); final var cb = CertifiedBlock.newBuilder() diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index d5be6a052..9f1825d70 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -33,7 +33,7 @@ - + diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index ba02e114e..290f5299a 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -457,8 +457,7 @@ private class Buffer { private Buffer(int maxAge) { this.maxAge = maxAge; highWaterMark = (params.bufferSize - (int) (params.bufferSize + ((params.bufferSize) * 0.1))); - delivered = BloomWindow.create(params.dedupBufferSize, 1.0 / ((double) params.dedupBufferSize * 2.0), - Biff.Type.DIGEST); + delivered = BloomWindow.create(params.dedupBufferSize, params.dedupFpr, Biff.Type.DIGEST); } public void clear() { @@ -466,8 +465,7 @@ public void clear() { } public BloomFilter forReconcilliation() { - var biff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), params.bufferSize, - 1.0 / ((double) params.bufferSize * 2.0)); + var biff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), params.bufferSize, params.falsePositiveRate); state.keySet().forEach(k -> biff.add(k)); return biff; } diff --git a/pom.xml b/pom.xml index c4879bd2b..9e386ff13 100644 --- a/pom.xml +++ b/pom.xml @@ -783,8 +783,8 @@ 3.2.5 ${forks} - true - -Xmx10G -Xms4G + false + -Xmx10G -Xms100M -Djdk.tracePinnedThreads=full