Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc #197

Merged
merged 4 commits into from
May 13, 2024
Merged

misc #197

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {
store.put(checkpointView);
assert !checkpointView.height()
.equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis";
var diadem = HexBloom.from(checkpoint.block.getCheckpoint().getCrown());
var crown = HexBloom.from(checkpoint.block.getCheckpoint().getCrown());
log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash,
diadem.compactWrapped(), Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()),
crown.compactWrapped(), Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()),
params.member().getId());

CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(),
Expand All @@ -130,10 +130,10 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {

// assemble the checkpoint
checkpointAssembled = assembler.assemble(scheduler, params.gossipDuration()).whenComplete((cps, t) -> {
if (!cps.validate(diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()))) {
if (!cps.validate(crown, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()))) {
throw new IllegalStateException("Cannot validate checkpoint: " + checkpoint.height());
}
log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem.compactWrapped(),
log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), crown.compactWrapped(),
params.member().getId());
checkpointState = cps;
});
Expand Down
3 changes: 1 addition & 2 deletions fireflies/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ the necessity of a separate monitoring protocol.

## Current Limitations

The system is mostly complete. Currently, only the supermajority fast path consensus is implemented. Fall back consensus
to appear at a later date. Full bootstrap and join integration with Thoth is now complete and tested.
The Fireflies module is functionally complete. Full bootstrap and join integration with Thoth is complete and tested.

## Status

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class Builder {
/**
* Maximum pending joins
*/
private int maxPending = 15;
private int maxPending = 200;
/**
* Minimum cardinality for bloom filters
*/
Expand Down
27 changes: 16 additions & 11 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,15 @@ void finalizeViewChange() {
return;
}
viewChange(() -> {
final var cardinality = context.memberCount();
final var superMajority = cardinality - ((cardinality - 1) / 4);
if (observations.size() < superMajority) {
log.trace("Do not have super majority: {} required: {} for: {} on: {}", observations.size(),
superMajority, currentView(), node.getId());
final var majority = context.size() == 1 ? 1 : context.majority();
if (observations.size() < majority) {
log.trace("Do not have majority: {} required: {} observers: {} for: {} on: {}", observations.size(),
majority, viewManagement.observers.stream().toList(), currentView(), node.getId());
scheduleFinalizeViewChange(2);
return;
}
log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), majority,
viewManagement.observers.stream().toList(), currentView(), node.getId());
HashMultiset<Ballot> ballots = HashMultiset.create();
observations.values().forEach(vc -> {
final var leaving = new ArrayList<>(
Expand All @@ -365,16 +366,16 @@ void finalizeViewChange() {
.stream()
.max(Ordering.natural().onResultOf(Multiset.Entry::getCount))
.orElse(null);
if (max != null && max.getCount() >= superMajority) {
log.info("Fast path consensus successful: {} required: {} cardinality: {} for: {} on: {}", max,
superMajority, viewManagement.cardinality(), currentView(), node.getId());
if (max != null && max.getCount() >= majority) {
log.info("View consensus successful: {} required: {} cardinality: {} for: {} on: {}", max, majority,
viewManagement.cardinality(), currentView(), node.getId());
viewManagement.install(max.getElement());
observations.clear();
} else {
@SuppressWarnings("unchecked")
final var reversed = Comparator.comparing(e -> ((Entry<Ballot>) e).getCount()).reversed();
log.info("Fast path consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}",
observations.size(), superMajority, viewManagement.cardinality(),
log.info("View consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}",
observations.size(), majority, viewManagement.cardinality(),
ballots.entrySet().stream().sorted(reversed).limit(1).toList(), currentView(), node.getId());
observations.clear();
}
Expand All @@ -395,7 +396,7 @@ Node getNode() {
}

boolean hasPendingRebuttals() {
return pendingRebuttals.isEmpty();
return !pendingRebuttals.isEmpty();
}

void initiate(SignedViewChange viewChange) {
Expand Down Expand Up @@ -823,6 +824,10 @@ private boolean add(NoteWrapper note) {
*/
private boolean add(SignedViewChange observation) {
final Digest observer = Digest.from(observation.getChange().getObserver());
if (!viewManagement.observers.contains(observer)) {
log.trace("Invalid observer: {} current: {} on: {}", observer, currentView(), node.getId());
return false;
}
final var inView = Digest.from(observation.getChange().getCurrent());
if (!currentView().equals(inView)) {
log.trace("Invalid view change: {} current: {} from {} on: {}", inView, currentView(), observer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.salesforce.apollo.fireflies.comm.gossip.Fireflies;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.fireflies.proto.Update.Builder;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.ReservoirSampler;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier;
Expand Down Expand Up @@ -47,24 +48,25 @@
* @author hal.hildebrand
*/
public class ViewManagement {
private static final Logger log = LoggerFactory.getLogger(
ViewManagement.class);
final AtomicReference<HexBloom> diadem = new AtomicReference<>();
private final AtomicInteger attempt = new AtomicInteger();
private final Digest bootstrapView;
private final DynamicContext<Participant> context;
private final DigestAlgorithm digestAlgo;
private final ConcurrentMap<Digest, NoteWrapper> joins = new ConcurrentSkipListMap<>();
private final FireflyMetrics metrics;
private final Node node;
private final Parameters params;
private final Map<Digest, Consumer<Collection<SignedNote>>> pendingJoins = new ConcurrentSkipListMap<>();
private final View view;
private final AtomicReference<ViewChange> vote = new AtomicReference<>();
private final Lock joinLock = new ReentrantLock();
private final AtomicReference<Digest> currentView = new AtomicReference<>();
private boolean bootstrap;
private CompletableFuture<Void> onJoined;
private static final Logger log = LoggerFactory.getLogger(ViewManagement.class);

final AtomicReference<HexBloom> diadem = new AtomicReference<>();
final Set<Digest> observers = new TreeSet<>();
private final AtomicInteger attempt = new AtomicInteger();
private final Digest bootstrapView;
private final DynamicContext<Participant> context;
private final DigestAlgorithm digestAlgo;
private final ConcurrentMap<Digest, NoteWrapper> joins = new ConcurrentSkipListMap<>();
private final FireflyMetrics metrics;
private final Node node;
private final Parameters params;
private final Map<Digest, Consumer<Collection<SignedNote>>> pendingJoins = new ConcurrentSkipListMap<>();
private final View view;
private final AtomicReference<ViewChange> vote = new AtomicReference<>();
private final Lock joinLock = new ReentrantLock();
private final AtomicReference<Digest> currentView = new AtomicReference<>();
private volatile boolean bootstrap;
private volatile CompletableFuture<Void> onJoined;

ViewManagement(View view, DynamicContext<Participant> context, Parameters params, FireflyMetrics metrics, Node node,
DigestAlgorithm digestAlgo) {
Expand Down Expand Up @@ -356,11 +358,21 @@ boolean joined() {
}

/**
* start a view change if there's any offline members or joining members
* start a view change if there are any offline members or joining members
*/
void maybeViewChange() {
if (context.offlineCount() > 0 || !joins.isEmpty()) {
initiateViewChange();
if ((context.offlineCount() > 0 || !joins.isEmpty())) {
if (isObserver()) {
initiateViewChange();
} else {
// Use pending rebuttals as a proxy for stability
if (view.hasPendingRebuttals()) {
log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId());
view.scheduleViewChange(1); // 1 TTL round to check again
} else {
view.scheduleFinalizeViewChange();
}
}
} else {
view.scheduleViewChange();
}
Expand Down Expand Up @@ -476,18 +488,21 @@ void start(CompletableFuture<Void> onJoin, boolean bootstrap) {
* Initiate the view change
*/
private void initiateViewChange() {
assert isObserver() : "Not observer: " + node.getId();
view.stable(() -> {
if (vote.get() != null) {
log.trace("Vote already cast for: {} on: {}", currentView(), node.getId());
return;
}
// Use pending rebuttals as a proxy for stability
if (!view.hasPendingRebuttals()) {
if (view.hasPendingRebuttals()) {
log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId());
view.scheduleViewChange(1); // 1 TTL round to check again
return;
}
view.scheduleFinalizeViewChange();
log.trace("Initiating view change vote: {} joins: {} leaves: {} on: {}", currentView(), joins.size(),
view.streamShunned().count(), node.getId());
final var builder = ViewChange.newBuilder()
.setObserver(node.getId().toDigeste())
.setCurrent(currentView().toDigeste())
Expand All @@ -508,6 +523,13 @@ private void initiateViewChange() {
});
}

/**
* @return true if the receiver is part of the BFT Observers of this group
*/
private boolean isObserver() {
return observers.contains(node.getId());
}

private void joined(Collection<SignedNote> seedSet, Digest from, StreamObserver<Gateway> responseObserver,
Timer.Context timer) {
var unique = new HashSet<>(seedSet);
Expand Down Expand Up @@ -546,6 +568,10 @@ private void joined(Collection<SignedNote> seedSet, Digest from, StreamObserver<
private void setDiadem(final HexBloom hex) {
diadem.set(hex);
currentView.set(diadem.get().compactWrapped());
observers.clear();
context.bftSubset(hex.compact()).stream().map(Member::getId).forEach(observers::add);
log.info("View: {} set diadem: {} observers: {} on: {}", context.getId(), diadem.get().compactWrapped(),
observers.stream().toList(), node.getId());
}

record Ballot(Digest view, List<Digest> leaving, List<Digest> joining, int hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void smokin() throws Exception {
}

private void initialize() {
var parameters = Parameters.newBuilder().setMaxPending(5).setMaximumTxfr(5).build();
var parameters = Parameters.newBuilder().setMaxPending(20).setMaximumTxfr(5).build();
registry = new MetricRegistry();
node0Registry = new MetricRegistry();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void swarm() throws Exception {

private void initialize() {
var parameters = Parameters.newBuilder()
.setMaxPending(10)
.setMaxPending(50)
.setMaximumTxfr(20)
.setJoinRetries(30)
.setFpr(0.00000125)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ static int minMajority(int bias, double pByz, int cardinality) {
*/
Iterable<T> betweenSuccessor(int ring, T start, T stop);

/**
* @param hash - the point on the rings to determine successors
* @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's
* rings
*/
default Set<Member> bftSubset(Digest hash) {
Set<Member> successors = new HashSet<>();
successors(hash, m -> {
if (successors.size() == getRingCount()) {
return false;
}
return successors.add(m);
});
return successors;
}

/**
* Maximum cardinality of this context
*/
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<dropwizard.version>4.0.5</dropwizard.version>
<h2.version>2.2.220</h2.version>
<jooq.version>3.17.2</jooq.version>
<bc.version>1.74</bc.version>
<bc.version>1.78</bc.version>
<logback.version>1.4.12</logback.version>
<grpc.version>1.62.2</grpc.version>
<protobuf.version>3.25.3</protobuf.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,14 @@ public CertificateWithPrivateKey provision(Instant validFrom, Duration valid, Li

var signature = signer.sign(qb64(new BasicIdentifier(keyPair.getPublic())));

var dn = new BcX500NameDnImpl(String.format("UID=%s, DC=%s", Base64.getUrlEncoder()
.encodeToString(
(getState().getIdentifier()
.toIdent()
.toByteArray())),
Base64.getUrlEncoder()
.encodeToString(signature.toSig().toByteArray())));
var formatted = String.format("UID=%s, DC=%s", Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(
(getState().getIdentifier().toIdent().toByteArray())),
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(signature.toSig().toByteArray()));
var dn = new BcX500NameDnImpl(formatted);

return new CertificateWithPrivateKey(
Certificates.selfSign(false, dn, keyPair, validFrom, validFrom.plus(valid), extensions),
Expand Down
Loading