diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java index 62b5cbafe..803dcfa44 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java @@ -43,7 +43,7 @@ public static class Builder { /** * Maximum pending joins */ - private int maxPending = 15; + private int maxPending = 200; /** * Minimum cardinality for bloom filters */ diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 7c9b702c8..7f48dfd1c 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -343,14 +343,15 @@ void finalizeViewChange() { return; } viewChange(() -> { - final var cardinality = context.memberCount(); - final var superMajority = cardinality - ((cardinality - 1) / 4); - if (observations.size() < superMajority) { - log.trace("Do not have super majority: {} required: {} for: {} on: {}", observations.size(), - superMajority, currentView(), node.getId()); + final var majority = context.size() == 1 ? 1 : context.majority(); + if (observations.size() < majority) { + log.trace("Do not have majority: {} required: {} observers: {} for: {} on: {}", observations.size(), + majority, viewManagement.observers.stream().toList(), currentView(), node.getId()); scheduleFinalizeViewChange(2); return; } + log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), majority, + viewManagement.observers.stream().toList(), currentView(), node.getId()); HashMultiset ballots = HashMultiset.create(); observations.values().forEach(vc -> { final var leaving = new ArrayList<>( @@ -365,16 +366,16 @@ void finalizeViewChange() { .stream() .max(Ordering.natural().onResultOf(Multiset.Entry::getCount)) .orElse(null); - if (max != null && max.getCount() >= superMajority) { + if (max != null && max.getCount() >= majority) { log.info("Fast path consensus successful: {} required: {} cardinality: {} for: {} on: {}", max, - superMajority, viewManagement.cardinality(), currentView(), node.getId()); + majority, viewManagement.cardinality(), currentView(), node.getId()); viewManagement.install(max.getElement()); observations.clear(); } else { @SuppressWarnings("unchecked") final var reversed = Comparator.comparing(e -> ((Entry) e).getCount()).reversed(); log.info("Fast path consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}", - observations.size(), superMajority, viewManagement.cardinality(), + observations.size(), majority, viewManagement.cardinality(), ballots.entrySet().stream().sorted(reversed).limit(1).toList(), currentView(), node.getId()); observations.clear(); } @@ -395,7 +396,7 @@ Node getNode() { } boolean hasPendingRebuttals() { - return pendingRebuttals.isEmpty(); + return !pendingRebuttals.isEmpty(); } void initiate(SignedViewChange viewChange) { @@ -823,6 +824,10 @@ private boolean add(NoteWrapper note) { */ private boolean add(SignedViewChange observation) { final Digest observer = Digest.from(observation.getChange().getObserver()); + if (!viewManagement.observers.contains(observer)) { + log.trace("Invalid observer: {} current: {} on: {}", observer, currentView(), node.getId()); + return false; + } final var inView = Digest.from(observation.getChange().getCurrent()); if (!currentView().equals(inView)) { log.trace("Invalid view change: {} current: {} from {} on: {}", inView, currentView(), observer, diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 9129c1ac8..7124e4fd3 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -19,6 +19,7 @@ import com.salesforce.apollo.fireflies.comm.gossip.Fireflies; import com.salesforce.apollo.fireflies.proto.*; import com.salesforce.apollo.fireflies.proto.Update.Builder; +import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.ReservoirSampler; import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; @@ -47,24 +48,25 @@ * @author hal.hildebrand */ public class ViewManagement { - private static final Logger log = LoggerFactory.getLogger( - ViewManagement.class); - final AtomicReference diadem = new AtomicReference<>(); - private final AtomicInteger attempt = new AtomicInteger(); - private final Digest bootstrapView; - private final DynamicContext context; - private final DigestAlgorithm digestAlgo; - private final ConcurrentMap joins = new ConcurrentSkipListMap<>(); - private final FireflyMetrics metrics; - private final Node node; - private final Parameters params; - private final Map>> pendingJoins = new ConcurrentSkipListMap<>(); - private final View view; - private final AtomicReference vote = new AtomicReference<>(); - private final Lock joinLock = new ReentrantLock(); - private final AtomicReference currentView = new AtomicReference<>(); - private boolean bootstrap; - private CompletableFuture onJoined; + private static final Logger log = LoggerFactory.getLogger(ViewManagement.class); + + final AtomicReference diadem = new AtomicReference<>(); + final Set observers = new TreeSet<>(); + private final AtomicInteger attempt = new AtomicInteger(); + private final Digest bootstrapView; + private final DynamicContext context; + private final DigestAlgorithm digestAlgo; + private final ConcurrentMap joins = new ConcurrentSkipListMap<>(); + private final FireflyMetrics metrics; + private final Node node; + private final Parameters params; + private final Map>> pendingJoins = new ConcurrentSkipListMap<>(); + private final View view; + private final AtomicReference vote = new AtomicReference<>(); + private final Lock joinLock = new ReentrantLock(); + private final AtomicReference currentView = new AtomicReference<>(); + private volatile boolean bootstrap; + private volatile CompletableFuture onJoined; ViewManagement(View view, DynamicContext context, Parameters params, FireflyMetrics metrics, Node node, DigestAlgorithm digestAlgo) { @@ -356,11 +358,21 @@ boolean joined() { } /** - * start a view change if there's any offline members or joining members + * start a view change if there are any offline members or joining members */ void maybeViewChange() { - if (context.offlineCount() > 0 || !joins.isEmpty()) { - initiateViewChange(); + if ((context.offlineCount() > 0 || !joins.isEmpty())) { + if (isObserver()) { + initiateViewChange(); + } else { + // Use pending rebuttals as a proxy for stability + if (view.hasPendingRebuttals()) { + log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId()); + view.scheduleViewChange(1); // 1 TTL round to check again + } else { + view.scheduleFinalizeViewChange(); + } + } } else { view.scheduleViewChange(); } @@ -476,18 +488,21 @@ void start(CompletableFuture onJoin, boolean bootstrap) { * Initiate the view change */ private void initiateViewChange() { + assert isObserver() : "Not observer: " + node.getId(); view.stable(() -> { if (vote.get() != null) { log.trace("Vote already cast for: {} on: {}", currentView(), node.getId()); return; } // Use pending rebuttals as a proxy for stability - if (!view.hasPendingRebuttals()) { + if (view.hasPendingRebuttals()) { log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId()); view.scheduleViewChange(1); // 1 TTL round to check again return; } view.scheduleFinalizeViewChange(); + log.trace("Initiating view change vote: {} joins: {} leaves: {} on: {}", currentView(), joins.size(), + view.streamShunned().count(), node.getId()); final var builder = ViewChange.newBuilder() .setObserver(node.getId().toDigeste()) .setCurrent(currentView().toDigeste()) @@ -508,6 +523,13 @@ private void initiateViewChange() { }); } + /** + * @return true if the receiver is part of the BFT Observers of this group + */ + private boolean isObserver() { + return observers.contains(node.getId()); + } + private void joined(Collection seedSet, Digest from, StreamObserver responseObserver, Timer.Context timer) { var unique = new HashSet<>(seedSet); @@ -546,6 +568,10 @@ private void joined(Collection seedSet, Digest from, StreamObserver< private void setDiadem(final HexBloom hex) { diadem.set(hex); currentView.set(diadem.get().compactWrapped()); + observers.clear(); + context.bftSubset(hex.compact()).stream().map(Member::getId).forEach(observers::add); + log.info("View: {} set diadem: {} observers: {} on: {}", context.getId(), diadem.get().compactWrapped(), + observers.stream().toList(), node.getId()); } record Ballot(Digest view, List leaving, List joining, int hash) { diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java index 1c94d5fa2..73f69cbc7 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java @@ -168,7 +168,7 @@ public void smokin() throws Exception { } private void initialize() { - var parameters = Parameters.newBuilder().setMaxPending(5).setMaximumTxfr(5).build(); + var parameters = Parameters.newBuilder().setMaxPending(20).setMaximumTxfr(5).build(); registry = new MetricRegistry(); node0Registry = new MetricRegistry(); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index d3577be59..11d8c6440 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -197,7 +197,7 @@ public void swarm() throws Exception { private void initialize() { var parameters = Parameters.newBuilder() - .setMaxPending(10) + .setMaxPending(50) .setMaximumTxfr(20) .setJoinRetries(30) .setFpr(0.00000125) diff --git a/memberships/src/main/java/com/salesforce/apollo/context/Context.java b/memberships/src/main/java/com/salesforce/apollo/context/Context.java index 391f61bcc..ed43d7fa0 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/Context.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/Context.java @@ -122,6 +122,22 @@ static int minMajority(int bias, double pByz, int cardinality) { */ Iterable betweenSuccessor(int ring, T start, T stop); + /** + * @param hash - the point on the rings to determine successors + * @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's + * rings + */ + default Set bftSubset(Digest hash) { + Set successors = new HashSet<>(); + successors(hash, m -> { + if (successors.size() == getRingCount()) { + return false; + } + return successors.add(m); + }); + return successors; + } + /** * Maximum cardinality of this context */