Skip to content

Commit

Permalink
fixins. Reimplement ReservoirSampler. 5x5
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 22, 2024
1 parent 321724d commit 2ca8e1c
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 136 deletions.
200 changes: 130 additions & 70 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.codahale.metrics.Timer;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.context.Context;
Expand All @@ -35,9 +36,7 @@

import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -75,6 +74,12 @@ public Binding(View view, List<Seed> seeds, Duration duration, DynamicContext<Pa
this.digestAlgo = digestAlgo;
}

private static void dec(CompletableFuture<Boolean> complete, AtomicInteger remaining) {
if (remaining.decrementAndGet() <= 0) {
complete.complete(false);
}
}

void seeding() {
if (seeds.isEmpty()) {// This node is the bootstrap seed
bootstrap();
Expand Down Expand Up @@ -135,33 +140,40 @@ private boolean complete(CompletableFuture<Redirect> redirect, Optional<Redirect
return true;
}

private boolean completeGateway(Participant member, CompletableFuture<Bound> gateway,
Optional<Gateway> futureSailor, HashMultiset<Bootstrapping> trusts,
Set<SignedNote> initialSeedSet, Digest v, int majority) {
if (futureSailor.isEmpty()) {
log.warn("No gateway returned from: {} on: {}", member.getId(), node.getId());
return true;
private void complete(Member member, CompletableFuture<Bound> gateway, HashMultiset<Bootstrapping> trusts,
Set<SignedNote> initialSeedSet, Digest v, int majority, CompletableFuture<Boolean> complete,
AtomicInteger remaining, ListenableFuture<Gateway> futureSailor) {
if (complete.isDone()) {
return;
}
if (gateway.isDone()) {
log.warn("gateway is complete, ignoring from: {} on: {}", member.getId(), node.getId());
return false;
Gateway g = null;
try {
g = futureSailor.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.warn("Error retrieving Gateway from: {} on: {}", member.getId(), node.getId(), e.getCause());
dec(complete, remaining);
return;
}

Gateway g = futureSailor.get();

if (g.equals(Gateway.getDefaultInstance())) {
return true;
log.warn("Empty gateway returned from: {} on: {}", member.getId(), node.getId());
dec(complete, remaining);
return;
}
if (g.getInitialSeedSetCount() == 0) {
log.warn("No seeds in gateway returned from: {} on: {}", member.getId(), node.getId());
return true;
dec(complete, remaining);
return;
}

if (g.getTrust().equals(BootstrapTrust.getDefaultInstance()) || g.getTrust()
.getDiadem()
.equals(HexBloome.getDefaultInstance())) {
log.trace("Empty bootstrap trust in join returned from: {} on: {}", member.getId(), node.getId());
return true;
dec(complete, remaining);
return;
}
trusts.add(new Bootstrapping(g.getTrust()));
initialSeedSet.addAll(g.getInitialSeedSetList());
Expand All @@ -175,7 +187,13 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
.findFirst()
.orElse(null);
if (trust != null) {
validate(trust, gateway, initialSeedSet);
var bound = new Bound(trust.crown,
trust.successors.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList(),
initialSeedSet.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList());
if (gateway.complete(bound)) {
log.info("Gateway acquired: {} context: {} on: {}", trust.diadem, this.context.getId(), node.getId());
}
complete.complete(true);
} else {
log.debug("Gateway received, trust count: {} majority: {} from: {} trusts: {} view: {} context: {} on: {}",
trusts.size(), majority, member.getId(), v, trusts.entrySet()
Expand All @@ -185,8 +203,8 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
e -> "%s x %s".formatted(e.getElement().diadem,
e.getCount()))
.toList(), this.context.getId(), node.getId());
dec(complete, remaining);
}
return true;
}

private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, AtomicInteger abandon) {
Expand Down Expand Up @@ -216,6 +234,31 @@ private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, Ato
}
}

private boolean join(Member member, CompletableFuture<Bound> gateway, Optional<ListenableFuture<Gateway>> fs,
HashMultiset<Bootstrapping> trusts, Set<SignedNote> initialSeedSet, Digest v, int majority,
CompletableFuture<Boolean> complete, AtomicInteger remaining) {
if (complete.isDone()) {
log.trace("join round already completed for: {} on: {}", member.getId(), node.getId());
return false;
}
if (fs.isEmpty()) {
log.warn("No gateway returned from: {} on: {}", member.getId(), node.getId());
dec(complete, remaining);
return true;
}
if (gateway.isDone()) {
log.warn("gateway is complete, ignoring from: {} on: {}", member.getId(), node.getId());
complete.complete(true);
return false;
}
var futureSailor = fs.get();
futureSailor.addListener(
() -> complete(member, gateway, trusts, initialSeedSet, v, majority, complete, remaining, futureSailor),
r -> Thread.ofVirtual().start(r));

return true;
}

private Join join(Digest v) {
return Join.newBuilder().setView(v.toDigeste()).setNote(node.getNote().getWrapped()).build();
}
Expand Down Expand Up @@ -276,63 +319,88 @@ private void join(Redirect redirect, Digest v, Duration duration) {
final var redirecting = new SliceIterator<>("Gateways", node, sample, approaches);
var majority = redirect.getBootstrap() ? 1 : Context.minimalQuorum(redirect.getRings(), this.context.getBias());
final var join = join(v);
final var abandon = new AtomicInteger();
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
regate.set(() -> {
log.info("Round: {} formally joining view: {} on: {}", retries.get(), v, node.getId());
if (!view.started.get()) {
return;
}
redirecting.iterate((link) -> {
if (gateway.isDone() || !view.started.get()) {
return null;
}
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
try {
var g = link.join(join, params.seedingTimeout());
if (g == null || g.equals(Gateway.getDefaultInstance())) {
log.debug("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId());
abandon.incrementAndGet();
return null;
}
return g;
} catch (StatusRuntimeException sre) {
gatewaySRE(v, link, sre, abandon);
return null;
} catch (Throwable t) {
log.info("Gateway view: {} error: {} from: {} on: {}", v, t, link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
return null;
var complete = new CompletableFuture<Boolean>();
final var abandon = new AtomicInteger();
complete.whenComplete((success, error) -> {
if (error != null) {
log.info("Failed Join on: {}", node.getId(), error);
return;
}
}, (futureSailor, _, _, member) -> completeGateway((Participant) member, gateway, futureSailor, trusts,
initialSeedSet, v, majority), () -> {
if (!view.started.get() || gateway.isDone()) {
if (success) {
return;
}
if (abandon.get() >= majority) {
log.debug("Abandoning Gateway view: {} abandons: {} majority: {} reseeding on: {}", v,
abandon.get(), majority, node.getId());
seeding();
log.info("Join unsuccessful, abandoned: {} trusts: {} on: {}", abandon.get(), trusts.entrySet()
.stream()
.sorted()
.map(
e -> "%s x %s".formatted(
e.getElement().diadem,
e.getCount()))
.toList(),
node.getId());
abandon.set(0);
if (retries.get() < params.joinRetries()) {
log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(),
params.joinRetries(), node.getId());
trusts.clear();
initialSeedSet.clear();
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)),
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS);
} else {
abandon.set(0);
if (retries.get() < params.joinRetries()) {
log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(),
params.joinRetries(), node.getId());
trusts.clear();
initialSeedSet.clear();
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)),
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()),
TimeUnit.NANOSECONDS);
} else {
log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId());
view.stop();
}
log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId());
view.stop();
}
}, params.retryDelay());
});
var remaining = new AtomicInteger(sample.size());
redirecting.iterate((link) -> join(v, link, gateway, join, abandon, complete),
(futureSailor, _, _, member) -> join(member, gateway, futureSailor, trusts,
initialSeedSet, v, majority, complete, remaining),
() -> {
if (!view.started.get() || gateway.isDone()) {
return;
}
if (abandon.get() >= majority) {
log.debug(
"Abandoning Gateway view: {} abandons: {} majority: {} reseeding on: {}", v,
abandon.get(), majority, node.getId());
complete.completeExceptionally(new TimeoutException("Failed Join"));
seeding();
}
}, params.retryDelay());
});
regate.get().run();
}

private ListenableFuture<Gateway> join(Digest v, Entrance link, CompletableFuture<Bound> gateway, Join join,
AtomicInteger abandon, CompletableFuture<Boolean> complete) {
if (!view.started.get() || complete.isDone() || gateway.isDone()) {
return null;
}
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
try {
var g = link.join(join, params.seedingTimeout());
if (g == null || g.equals(Gateway.getDefaultInstance())) {
log.debug("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId());
abandon.incrementAndGet();
return null;
}
return g;
} catch (StatusRuntimeException sre) {
gatewaySRE(v, link, sre, abandon);
return null;
} catch (Throwable t) {
log.info("Gateway view: {} error: {} from: {} on: {}", v, t, link.getMember().getId(), node.getId());
abandon.incrementAndGet();
return null;
}
}

private Registration registration() {
return Registration.newBuilder()
.setView(view.currentView().toDigeste())
Expand All @@ -354,14 +422,6 @@ private NoteWrapper seedFor(Seed seed) {
return new NoteWrapper(seedNote, digestAlgo);
}

private void validate(Bootstrapping trust, CompletableFuture<Bound> gateway, Set<SignedNote> initialSeedSet) {
if (gateway.complete(
new Bound(trust.crown, trust.successors.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList(),
initialSeedSet.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList()))) {
log.info("Gateway acquired: {} context: {} on: {}", trust.diadem, this.context.getId(), node.getId());
}
}

private record Bootstrapping(Digest diadem, HexBloom crown, Set<SignedNote> successors) {
public Bootstrapping(BootstrapTrust trust) {
this(HexBloom.from(trust.getDiadem()), new HashSet<>(trust.getSuccessorsList()));
Expand Down
25 changes: 12 additions & 13 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,6 @@ void viewChange(Runnable r) {
*
* @param ring - the index of the gossip ring the gossip is originating from in this view
* @param link - the outbound communications to the paired member
* @param ring
*/
protected Gossip gossip(Fireflies link, int ring) {
tick();
Expand Down Expand Up @@ -656,8 +655,8 @@ protected Gossip gossip(Fireflies link, int ring) {
node.getId());
break;
case UNAVAILABLE:
log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(),
node.getId(), sre);
log.trace("Communication unavailable for gossip view: {} from: {} on: {}", currentView(), p.getId(),
node.getId());
accuse(p, ring, sre);
break;
default:
Expand Down Expand Up @@ -1017,8 +1016,8 @@ private void gc(Participant member) {
* @return the bloom filter containing the digests of known accusations
*/
private BloomFilter<Digest> getAccusationsBff(long seed, double p) {
BloomFilter<Digest> bff = new BloomFilter.DigestBloomFilter(seed, Math.max(params.minimumBiffCardinality(),
context.cardinality() * 2), p);
var n = Math.max(params.minimumBiffCardinality(), context.cardinality());
BloomFilter<Digest> bff = new BloomFilter.DigestBloomFilter(seed, n, 1.0 / (double) n);
context.allMembers()
.flatMap(Participant::getAccusations)
.filter(Objects::nonNull)
Expand All @@ -1033,9 +1032,9 @@ private BloomFilter<Digest> getAccusationsBff(long seed, double p) {
* @return the bloom filter containing the digests of known notes
*/
private BloomFilter<Digest> getNotesBff(long seed, double p) {
BloomFilter<Digest> bff = new BloomFilter.DigestBloomFilter(seed, Math.max(params.minimumBiffCardinality(),
context.cardinality() * 2), p);
context.allMembers().map(m -> m.getNote()).filter(e -> e != null).forEach(n -> bff.add(n.getHash()));
var n = Math.max(params.minimumBiffCardinality(), context.cardinality());
BloomFilter<Digest> bff = new BloomFilter.DigestBloomFilter(seed, n, 1.0 / (double) n);
context.allMembers().map(m -> m.getNote()).filter(e -> e != null).forEach(note -> bff.add(note.getHash()));
return bff;
}

Expand All @@ -1045,8 +1044,8 @@ private BloomFilter<Digest> getNotesBff(long seed, double p) {
* @return the bloom filter containing the digests of known observations
*/
private BloomFilter<Digest> getObservationsBff(long seed, double p) {
BloomFilter<Digest> bff = new BloomFilter.DigestBloomFilter(seed, Math.max(params.minimumBiffCardinality(),
context.cardinality() * 2), p);
var n = Math.max(params.minimumBiffCardinality(), observations.size());
BloomFilter<Digest> bff = new BloomFilter.DigestBloomFilter(seed, n, 1.0 / (double) n);
observations.keySet().stream().collect(Utils.toShuffledList()).forEach(bff::add);
return bff;
}
Expand Down Expand Up @@ -1226,8 +1225,9 @@ private NoteGossip.Builder processNotes(BloomFilter<Digest> bff) {
.filter(m -> current.equals(m.getNote().currentView()))
.filter(m -> !shunned.contains(m.getId()))
.filter(m -> !bff.contains(m.getNote().getHash()))
.collect(new ReservoirSampler<>(params.maximumTxfr(), Entropy.bitsStream()))
.collect(new ReservoirSampler<>(params.maximumTxfr()))
.stream()
.filter(sn -> sn != null)
.map(Participant::getNote)
.forEach(n -> builder.addUpdates(n.getWrapped()));
return builder;
Expand Down Expand Up @@ -1351,9 +1351,8 @@ private Update updatesForDigests(Gossip gossip) {
.filter(m -> m.getNote() != null)
.filter(m -> current.equals(m.getNote().currentView()))
.filter(m -> !notesBff.contains(m.getNote().getHash()))
.collect(new ReservoirSampler<>(params.maximumTxfr(), Entropy.bitsStream()))
.stream()
.map(m -> m.getNote().getWrapped())
.limit(params.maximumTxfr())
.forEach(builder::addNotes);
}

Expand Down
Loading

0 comments on commit 2ca8e1c

Please sign in to comment.