Skip to content

Commit

Permalink
Use BFT subset based on diadem.compact for voting/majority
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed May 12, 2024
1 parent c10a491 commit 92b62ca
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class Builder {
/**
* Maximum pending joins
*/
private int maxPending = 15;
private int maxPending = 200;
/**
* Minimum cardinality for bloom filters
*/
Expand Down
23 changes: 14 additions & 9 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,15 @@ void finalizeViewChange() {
return;
}
viewChange(() -> {
final var cardinality = context.memberCount();
final var superMajority = cardinality - ((cardinality - 1) / 4);
if (observations.size() < superMajority) {
log.trace("Do not have super majority: {} required: {} for: {} on: {}", observations.size(),
superMajority, currentView(), node.getId());
final var majority = context.size() == 1 ? 1 : context.majority();
if (observations.size() < majority) {
log.trace("Do not have majority: {} required: {} observers: {} for: {} on: {}", observations.size(),
majority, viewManagement.observers.stream().toList(), currentView(), node.getId());
scheduleFinalizeViewChange(2);
return;
}
log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), majority,
viewManagement.observers.stream().toList(), currentView(), node.getId());
HashMultiset<Ballot> ballots = HashMultiset.create();
observations.values().forEach(vc -> {
final var leaving = new ArrayList<>(
Expand All @@ -365,16 +366,16 @@ void finalizeViewChange() {
.stream()
.max(Ordering.natural().onResultOf(Multiset.Entry::getCount))
.orElse(null);
if (max != null && max.getCount() >= superMajority) {
if (max != null && max.getCount() >= majority) {
log.info("Fast path consensus successful: {} required: {} cardinality: {} for: {} on: {}", max,
superMajority, viewManagement.cardinality(), currentView(), node.getId());
majority, viewManagement.cardinality(), currentView(), node.getId());
viewManagement.install(max.getElement());
observations.clear();
} else {
@SuppressWarnings("unchecked")
final var reversed = Comparator.comparing(e -> ((Entry<Ballot>) e).getCount()).reversed();
log.info("Fast path consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}",
observations.size(), superMajority, viewManagement.cardinality(),
observations.size(), majority, viewManagement.cardinality(),
ballots.entrySet().stream().sorted(reversed).limit(1).toList(), currentView(), node.getId());
observations.clear();
}
Expand All @@ -395,7 +396,7 @@ Node getNode() {
}

boolean hasPendingRebuttals() {
return pendingRebuttals.isEmpty();
return !pendingRebuttals.isEmpty();
}

void initiate(SignedViewChange viewChange) {
Expand Down Expand Up @@ -823,6 +824,10 @@ private boolean add(NoteWrapper note) {
*/
private boolean add(SignedViewChange observation) {
final Digest observer = Digest.from(observation.getChange().getObserver());
if (!viewManagement.observers.contains(observer)) {
log.trace("Invalid observer: {} current: {} on: {}", observer, currentView(), node.getId());
return false;
}
final var inView = Digest.from(observation.getChange().getCurrent());
if (!currentView().equals(inView)) {
log.trace("Invalid view change: {} current: {} from {} on: {}", inView, currentView(), observer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.salesforce.apollo.fireflies.comm.gossip.Fireflies;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.fireflies.proto.Update.Builder;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.ReservoirSampler;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier;
Expand Down Expand Up @@ -47,24 +48,25 @@
* @author hal.hildebrand
*/
public class ViewManagement {
private static final Logger log = LoggerFactory.getLogger(
ViewManagement.class);
final AtomicReference<HexBloom> diadem = new AtomicReference<>();
private final AtomicInteger attempt = new AtomicInteger();
private final Digest bootstrapView;
private final DynamicContext<Participant> context;
private final DigestAlgorithm digestAlgo;
private final ConcurrentMap<Digest, NoteWrapper> joins = new ConcurrentSkipListMap<>();
private final FireflyMetrics metrics;
private final Node node;
private final Parameters params;
private final Map<Digest, Consumer<Collection<SignedNote>>> pendingJoins = new ConcurrentSkipListMap<>();
private final View view;
private final AtomicReference<ViewChange> vote = new AtomicReference<>();
private final Lock joinLock = new ReentrantLock();
private final AtomicReference<Digest> currentView = new AtomicReference<>();
private boolean bootstrap;
private CompletableFuture<Void> onJoined;
private static final Logger log = LoggerFactory.getLogger(ViewManagement.class);

final AtomicReference<HexBloom> diadem = new AtomicReference<>();
final Set<Digest> observers = new TreeSet<>();
private final AtomicInteger attempt = new AtomicInteger();
private final Digest bootstrapView;
private final DynamicContext<Participant> context;
private final DigestAlgorithm digestAlgo;
private final ConcurrentMap<Digest, NoteWrapper> joins = new ConcurrentSkipListMap<>();
private final FireflyMetrics metrics;
private final Node node;
private final Parameters params;
private final Map<Digest, Consumer<Collection<SignedNote>>> pendingJoins = new ConcurrentSkipListMap<>();
private final View view;
private final AtomicReference<ViewChange> vote = new AtomicReference<>();
private final Lock joinLock = new ReentrantLock();
private final AtomicReference<Digest> currentView = new AtomicReference<>();
private volatile boolean bootstrap;
private volatile CompletableFuture<Void> onJoined;

ViewManagement(View view, DynamicContext<Participant> context, Parameters params, FireflyMetrics metrics, Node node,
DigestAlgorithm digestAlgo) {
Expand Down Expand Up @@ -356,11 +358,21 @@ boolean joined() {
}

/**
* start a view change if there's any offline members or joining members
* start a view change if there are any offline members or joining members
*/
void maybeViewChange() {
if (context.offlineCount() > 0 || !joins.isEmpty()) {
initiateViewChange();
if ((context.offlineCount() > 0 || !joins.isEmpty())) {
if (isObserver()) {
initiateViewChange();
} else {
// Use pending rebuttals as a proxy for stability
if (view.hasPendingRebuttals()) {
log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId());
view.scheduleViewChange(1); // 1 TTL round to check again
} else {
view.scheduleFinalizeViewChange();
}
}
} else {
view.scheduleViewChange();
}
Expand Down Expand Up @@ -476,18 +488,21 @@ void start(CompletableFuture<Void> onJoin, boolean bootstrap) {
* Initiate the view change
*/
private void initiateViewChange() {
assert isObserver() : "Not observer: " + node.getId();
view.stable(() -> {
if (vote.get() != null) {
log.trace("Vote already cast for: {} on: {}", currentView(), node.getId());
return;
}
// Use pending rebuttals as a proxy for stability
if (!view.hasPendingRebuttals()) {
if (view.hasPendingRebuttals()) {
log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId());
view.scheduleViewChange(1); // 1 TTL round to check again
return;
}
view.scheduleFinalizeViewChange();
log.trace("Initiating view change vote: {} joins: {} leaves: {} on: {}", currentView(), joins.size(),
view.streamShunned().count(), node.getId());
final var builder = ViewChange.newBuilder()
.setObserver(node.getId().toDigeste())
.setCurrent(currentView().toDigeste())
Expand All @@ -508,6 +523,13 @@ private void initiateViewChange() {
});
}

/**
* @return true if the receiver is part of the BFT Observers of this group
*/
private boolean isObserver() {
return observers.contains(node.getId());
}

private void joined(Collection<SignedNote> seedSet, Digest from, StreamObserver<Gateway> responseObserver,
Timer.Context timer) {
var unique = new HashSet<>(seedSet);
Expand Down Expand Up @@ -546,6 +568,10 @@ private void joined(Collection<SignedNote> seedSet, Digest from, StreamObserver<
private void setDiadem(final HexBloom hex) {
diadem.set(hex);
currentView.set(diadem.get().compactWrapped());
observers.clear();
context.bftSubset(hex.compact()).stream().map(Member::getId).forEach(observers::add);
log.info("View: {} set diadem: {} observers: {} on: {}", context.getId(), diadem.get().compactWrapped(),
observers.stream().toList(), node.getId());
}

record Ballot(Digest view, List<Digest> leaving, List<Digest> joining, int hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void smokin() throws Exception {
}

private void initialize() {
var parameters = Parameters.newBuilder().setMaxPending(5).setMaximumTxfr(5).build();
var parameters = Parameters.newBuilder().setMaxPending(20).setMaximumTxfr(5).build();
registry = new MetricRegistry();
node0Registry = new MetricRegistry();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void swarm() throws Exception {

private void initialize() {
var parameters = Parameters.newBuilder()
.setMaxPending(10)
.setMaxPending(50)
.setMaximumTxfr(20)
.setJoinRetries(30)
.setFpr(0.00000125)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ static int minMajority(int bias, double pByz, int cardinality) {
*/
Iterable<T> betweenSuccessor(int ring, T start, T stop);

/**
* @param hash - the point on the rings to determine successors
* @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's
* rings
*/
default Set<Member> bftSubset(Digest hash) {
Set<Member> successors = new HashSet<>();
successors(hash, m -> {
if (successors.size() == getRingCount()) {
return false;
}
return successors.add(m);
});
return successors;
}

/**
* Maximum cardinality of this context
*/
Expand Down

0 comments on commit 92b62ca

Please sign in to comment.