Skip to content

Commit

Permalink
interim
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 22, 2024
1 parent eae25e7 commit 321724d
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,14 @@ private boolean complete(CompletableFuture<Redirect> redirect, Optional<Redirect
}

private boolean completeGateway(Participant member, CompletableFuture<Bound> gateway,
Optional<Gateway> futureSailor, HashMultiset<BootstrapTrust> trusts,
Optional<Gateway> futureSailor, HashMultiset<Bootstrapping> trusts,
Set<SignedNote> initialSeedSet, Digest v, int majority) {
if (futureSailor.isEmpty()) {
log.warn("No gateway returned from: {} on: {}", member.getId(), node.getId());
return true;
}
if (gateway.isDone()) {
log.warn("gateway is complete, ignoring from: {} on: {}", member.getId(), node.getId());
return false;
}

Expand All @@ -161,7 +163,7 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
log.trace("Empty bootstrap trust in join returned from: {} on: {}", member.getId(), node.getId());
return true;
}
trusts.add(g.getTrust());
trusts.add(new Bootstrapping(g.getTrust()));
initialSeedSet.addAll(g.getInitialSeedSetList());
log.trace("Initial seed set count: {} view: {} from: {} on: {}", g.getInitialSeedSetCount(), v, member.getId(),
node.getId());
Expand All @@ -175,8 +177,14 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
if (trust != null) {
validate(trust, gateway, initialSeedSet);
} else {
log.debug("Gateway received, trust count: {} majority: {} from: {} view: {} context: {} on: {}",
trusts.size(), majority, member.getId(), v, this.context.getId(), node.getId());
log.debug("Gateway received, trust count: {} majority: {} from: {} trusts: {} view: {} context: {} on: {}",
trusts.size(), majority, member.getId(), v, trusts.entrySet()
.stream()
.sorted()
.map(
e -> "%s x %s".formatted(e.getElement().diadem,
e.getCount()))
.toList(), this.context.getId(), node.getId());
}
return true;
}
Expand Down Expand Up @@ -255,7 +263,7 @@ private void join(Redirect redirect, Digest v, Duration duration) {
var regate = new AtomicReference<Runnable>();
var retries = new AtomicInteger();

HashMultiset<BootstrapTrust> trusts = HashMultiset.create();
HashMultiset<Bootstrapping> trusts = HashMultiset.create();
HashSet<SignedNote> initialSeedSet = new HashSet<>();

final var cardinality = redirect.getCardinality();
Expand Down Expand Up @@ -302,7 +310,8 @@ private void join(Redirect redirect, Digest v, Duration duration) {
return;
}
if (abandon.get() >= majority) {
log.debug("Abandoning Gateway view: {} reseeding on: {}", v, node.getId());
log.debug("Abandoning Gateway view: {} abandons: {} majority: {} reseeding on: {}", v,
abandon.get(), majority, node.getId());
seeding();
} else {
abandon.set(0);
Expand Down Expand Up @@ -345,13 +354,37 @@ private NoteWrapper seedFor(Seed seed) {
return new NoteWrapper(seedNote, digestAlgo);
}

private void validate(BootstrapTrust trust, CompletableFuture<Bound> gateway, Set<SignedNote> initialSeedSet) {
final var hexBloom = new HexBloom(trust.getDiadem());
private void validate(Bootstrapping trust, CompletableFuture<Bound> gateway, Set<SignedNote> initialSeedSet) {
if (gateway.complete(
new Bound(hexBloom, trust.getSuccessorsList().stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList(),
new Bound(trust.crown, trust.successors.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList(),
initialSeedSet.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList()))) {
log.info("Gateway acquired: {} context: {} on: {}", hexBloom.compactWrapped(), this.context.getId(),
node.getId());
log.info("Gateway acquired: {} context: {} on: {}", trust.diadem, this.context.getId(), node.getId());
}
}

private record Bootstrapping(Digest diadem, HexBloom crown, Set<SignedNote> successors) {
public Bootstrapping(BootstrapTrust trust) {
this(HexBloom.from(trust.getDiadem()), new HashSet<>(trust.getSuccessorsList()));
}

public Bootstrapping(HexBloom crown, Set<SignedNote> successors) {
this(crown.compact(), crown, new HashSet<>(successors));
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;

Bootstrapping that = (Bootstrapping) o;
return diadem.equals(that.diadem);
}

@Override
public int hashCode() {
return diadem.hashCode();
}
}

Expand Down
64 changes: 36 additions & 28 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -346,30 +346,42 @@ void finalizeViewChange() {
viewChange(() -> {
final var supermajority = context.getRingCount() * 3 / 4;
final var majority = context.size() == 1 ? 1 : supermajority;
if (observations.size() < majority) {
log.trace("Do not have majority: {} required: {} observers: {} for: {} on: {}", observations.size(),
majority, viewManagement.observersList(), currentView(), node.getId());
final var valid = observations.values()
.stream()
.filter(svc -> viewManagement.observers.contains(
Digest.from(svc.getChange().getObserver())))
.toList();
log.info("Finalize view change, observations: {} valid: {} observers: {} on: {}",
observations.values().stream().map(sv -> Digest.from(sv.getChange().getObserver())).toList(),
valid.size(), viewManagement.observersList(), node.getId());
observations.clear();
if (valid.size() < majority) {
log.info("Do not have majority: {} required: {} observers: {} for: {} on: {}", valid.size(), majority,
viewManagement.observersList(), currentView(), node.getId());
scheduleFinalizeViewChange(2);
return;
}
log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), majority,
viewManagement.observersList(), currentView(), node.getId());
HashMultiset<Ballot> ballots = HashMultiset.create();
final var current = currentView();
observations.values()
.stream()
.filter(vc -> current.equals(Digest.from(vc.getChange().getCurrent())))
.forEach(vc -> {
final var leaving = new ArrayList<>(
vc.getChange().getLeavesList().stream().map(Digest::from).collect(Collectors.toSet()));
final var joining = new ArrayList<>(
vc.getChange().getJoinsList().stream().map(Digest::from).collect(Collectors.toSet()));
leaving.sort(Ordering.natural());
joining.sort(Ordering.natural());
ballots.add(
new Ballot(Digest.from(vc.getChange().getCurrent()), leaving, joining, digestAlgo));
});
observations.clear();
valid.stream().filter(vc -> current.equals(Digest.from(vc.getChange().getCurrent()))).forEach(vc -> {
final var leaving = vc.getChange()
.getLeavesList()
.stream()
.map(Digest::from)
.distinct()
.collect(Collectors.toCollection(ArrayList::new));
final var joining = vc.getChange()
.getJoinsList()
.stream()
.map(Digest::from)
.distinct()
.collect(Collectors.toCollection(ArrayList::new));
leaving.sort(Ordering.natural());
joining.sort(Ordering.natural());
ballots.add(new Ballot(Digest.from(vc.getChange().getCurrent()), leaving, joining, digestAlgo));
});
var max = ballots.entrySet()
.stream()
.max(Ordering.natural().onResultOf(Multiset.Entry::getCount))
Expand All @@ -382,7 +394,7 @@ void finalizeViewChange() {
@SuppressWarnings("unchecked")
final var reversed = Comparator.comparing(e -> ((Entry<Ballot>) e).getCount()).reversed();
log.info("View consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}",
observations.size(), majority, viewManagement.cardinality(),
max == null ? 0 : max.getCount(), majority, viewManagement.cardinality(),
ballots.entrySet().stream().sorted(reversed).toList(), currentView(), node.getId());
}

Expand Down Expand Up @@ -845,13 +857,11 @@ private boolean add(SignedViewChange observation) {
observation.getChange().getAttempt(), currentObservation.getChange().getAttempt(), inView,
currentView(), observer, node.getId());
return false;
} else if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) {
return false;
}
}
final var member = context.getActiveMember(observer);
if (member == null) {
log.trace("Cannot validate view change: {} current: {} offline: {} on: {}", inView, currentView(), observer,
log.trace("Cannot validate view change: {} current: {} from: {} on: {}", inView, currentView(), observer,
node.getId());
return false;
}
Expand All @@ -860,6 +870,8 @@ private boolean add(SignedViewChange observation) {
if (!member.verify(signature, observation.getChange().toByteString())) {
return null;
}
log.trace("Observation: {} current: {} view change: {} from: {} on: {}",
observation.getChange().getAttempt(), inView, currentView(), observer, node.getId());
return observation;
}) != null;
}
Expand Down Expand Up @@ -1216,7 +1228,7 @@ private NoteGossip.Builder processNotes(BloomFilter<Digest> bff) {
.filter(m -> !bff.contains(m.getNote().getHash()))
.collect(new ReservoirSampler<>(params.maximumTxfr(), Entropy.bitsStream()))
.stream()
.map(m -> m.getNote())
.map(Participant::getNote)
.forEach(n -> builder.addUpdates(n.getWrapped()));
return builder;
}
Expand All @@ -1225,10 +1237,6 @@ private NoteGossip.Builder processNotes(BloomFilter<Digest> bff) {
* Process the inbound notes from the gossip. Reconcile the differences between the view's state and the digests of
* the gossip. Update the reply with the list of digests the view requires, as well as proposed updates based on the
* inbound digests that the view has more recent information
*
* @param from
* @param p
* @param bff
*/
private NoteGossip processNotes(Digest from, BloomFilter<Digest> bff, double p) {
NoteGossip.Builder builder = processNotes(bff);
Expand Down Expand Up @@ -1868,13 +1876,13 @@ public void join(Join join, Digest from, StreamObserver<Gateway> responseObserve
@Override
public Gossip rumors(SayWhat request, Digest from) {
if (!introduced.get()) {
log.trace("Not introduced; ring: {} from: {}, on: {}", request.getRing(), from, node.getId());
// log.trace("Not introduced; ring: {} from: {}, on: {}", request.getRing(), from, node.getId());
throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription("Not introduced"));
}
return stable(() -> {
final var ring = request.getRing();
if (!context.validRing(ring)) {
log.debug("invalid gossip ring: {} from: {} on: {}", ring, from, node.getId());
// log.debug("invalid gossip ring: {} from: {} on: {}", ring, from, node.getId());
throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription("invalid ring"));
}
validate(from, request);
Expand Down
Loading

0 comments on commit 321724d

Please sign in to comment.