diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 0a1a4bef9..3d8ae208c 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -181,26 +181,25 @@ private boolean completeGateway(Participant member, CompletableFuture gat private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, AtomicInteger abandon) { if (sre.getStatus().getCode().equals(Status.OUT_OF_RANGE.getCode())) { - log.debug("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + log.info("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); abandon.incrementAndGet(); } else if (sre.getStatus().getCode().equals(Status.FAILED_PRECONDITION.getCode())) { - log.debug("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + log.info("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); abandon.incrementAndGet(); } else if (sre.getStatus().getCode().equals(Status.PERMISSION_DENIED.getCode())) { - log.debug("Gateway view: {} permission denied: {} from: {} on: {}", v, sre.getMessage(), - link.getMember().getId(), node.getId()); + log.info("Gateway view: {} permission denied: {} from: {} on: {}", v, sre.getMessage(), + link.getMember().getId(), node.getId()); abandon.incrementAndGet(); } else if (sre.getStatus().getCode().equals(Status.RESOURCE_EXHAUSTED.getCode())) { - log.debug("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + log.info("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); abandon.incrementAndGet(); } else { - 
log.debug("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + log.info("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); } - ; } private Join join(Digest v) { @@ -224,8 +223,8 @@ private Join join(Digest v) { this.context.rebalance(r.getCardinality()); node.nextNote(view); - log.debug("Completing redirect to view: {} context: {} successors: {} on: {}", view, this.context.getId(), - r.getSuccessorsCount(), node.getId()); + log.debug("Completing redirect to view: {} context: {} sample: {} on: {}", view, this.context.getId(), + r.getSampleCount(), node.getId()); if (timer != null) { timer.close(); } @@ -234,12 +233,12 @@ private Join join(Digest v) { } private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecutorService scheduler) { - var successors = redirect.getSuccessorsList() - .stream() - .map(sn -> new NoteWrapper(sn.getNote(), digestAlgo)) - .map(nw -> view.new Participant(nw)) - .collect(Collectors.toList()); - log.info("Redirecting to: {} context: {} successors: {} on: {}", v, this.context.getId(), successors.size(), + var sample = redirect.getSampleList() + .stream() + .map(sn -> new NoteWrapper(sn.getNote(), digestAlgo)) + .map(nw -> view.new Participant(nw)) + .collect(Collectors.toList()); + log.info("Redirecting to: {} context: {} sample: {} on: {}", v, this.context.getId(), sample.size(), node.getId()); var gateway = new CompletableFuture(); var timer = metrics == null ? null : metrics.joinDuration().time(); @@ -258,7 +257,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu this.context.rebalance(cardinality); node.nextNote(v); - final var redirecting = new SliceIterator<>("Gateways", node, successors, approaches); + final var redirecting = new SliceIterator<>("Gateways", node, sample, approaches); var majority = redirect.getBootstrap() ? 
1 : Context.minimalQuorum(redirect.getRings(), this.context.getBias()); final var join = join(v); final var abandon = new AtomicInteger(); @@ -267,8 +266,8 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId()); try { var g = link.join(join, params.seedingTimeout()); - if (g.equals(Gateway.getDefaultInstance())) { - log.debug("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId()); + if (g == null || g.equals(Gateway.getDefaultInstance())) { + log.info("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId()); abandon.incrementAndGet(); return null; } @@ -276,6 +275,11 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu } catch (StatusRuntimeException sre) { gatewaySRE(v, link, sre, abandon); return null; + } catch (Throwable t) { + log.info("Gateway view: {} error: {} from: {} on: {}", v, t.toString(), link.getMember().getId(), + node.getId()); + abandon.incrementAndGet(); + return null; } }, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts, initialSeedSet, v, majority), () -> { @@ -283,20 +287,20 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu return; } if (abandon.get() >= majority) { - log.debug("Abandoning Gateway view: {} reseeding on: {}", v, node.getId()); + log.info("Abandoning Gateway view: {} reseeding on: {}", v, node.getId()); seeding(); } else { abandon.set(0); if (retries.get() < params.joinRetries()) { - log.debug("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(), - params.joinRetries(), node.getId()); + log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(), + params.joinRetries(), node.getId()); trusts.clear(); initialSeedSet.clear(); scheduler.schedule(Utils.wrapped(regate.get(), log), 
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS); } else { - log.error("Failed to join view: {} cannot obtain majority on: {}", view, node.getId()); + log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId()); view.stop(); } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java index cbcde66d6..fe9622696 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java @@ -10,7 +10,6 @@ /** * @author hal.hildebrand - * */ public record Parameters(int joinRetries, int minimumBiffCardinality, int rebuttalTimeout, int viewChangeRounds, int finalizeViewRounds, double fpr, int maximumTxfr, Duration retryDelay, int maxPending, @@ -40,10 +39,9 @@ public static class Builder { /** * Maximum number of elements to transfer per type per update */ - private int maximumTxfr = 10; + private int maximumTxfr = 1024; /** * Maximum pending joins - * */ private int maxPending = 15; /** @@ -81,68 +79,49 @@ public int getCrowns() { return crowns; } + public Builder setCrowns(int crowns) { + this.crowns = crowns; + return this; + } + public int getFinalizeViewRounds() { return finalizeViewRounds; } + public Builder setFinalizeViewRounds(int finalizeViewRounds) { + this.finalizeViewRounds = finalizeViewRounds; + return this; + } + public double getFpr() { return fpr; } + public Builder setFpr(double fpr) { + this.fpr = fpr; + return this; + } + public int getJoinRetries() { return joinRetries; } - public int getMaximumTxfr() { - return maximumTxfr; + public Builder setJoinRetries(int joinRetries) { + this.joinRetries = joinRetries; + return this; } public int getMaxPending() { return maxPending; } - public int getMinimumBiffCardinality() { - return minimumBiffCardinality; - } - - public int getRebuttalTimeout() { - 
return rebuttalTimeout; - } - - public Duration getRetryDelay() { - return retryDelay; - } - - public Duration getSeedingTimout() { - return seedingTimout; - } - - public int getValidationRetries() { - return validationRetries; - } - - public int getViewChangeRounds() { - return viewChangeRounds; - } - - public Builder setCrowns(int crowns) { - this.crowns = crowns; - return this; - } - - public Builder setFinalizeViewRounds(int finalizeViewRounds) { - this.finalizeViewRounds = finalizeViewRounds; - return this; - } - - public Builder setFpr(double fpr) { - this.fpr = fpr; + public Builder setMaxPending(int maxPending) { + this.maxPending = maxPending; return this; } - public Builder setJoinRetries(int joinRetries) { - this.joinRetries = joinRetries; - return this; + public int getMaximumTxfr() { + return maximumTxfr; } public Builder setMaximumTxfr(int maximumTxfr) { @@ -150,9 +129,8 @@ public Builder setMaximumTxfr(int maximumTxfr) { return this; } - public Builder setMaxPending(int maxPending) { - this.maxPending = maxPending; - return this; + public int getMinimumBiffCardinality() { + return minimumBiffCardinality; } public Builder setMinimumBiffCardinality(int minimumBiffCardinality) { @@ -160,26 +138,46 @@ public Builder setMinimumBiffCardinality(int minimumBiffCardinality) { return this; } + public int getRebuttalTimeout() { + return rebuttalTimeout; + } + public Builder setRebuttalTimeout(int rebuttalTimeout) { this.rebuttalTimeout = rebuttalTimeout; return this; } + public Duration getRetryDelay() { + return retryDelay; + } + public Builder setRetryDelay(Duration retryDelay) { this.retryDelay = retryDelay; return this; } + public Duration getSeedingTimout() { + return seedingTimout; + } + public Builder setSeedingTimout(Duration seedingTimout) { this.seedingTimout = seedingTimout; return this; } + public int getValidationRetries() { + return validationRetries; + } + public Builder setValidationRetries(int validationRetries) { this.validationRetries = 
validationRetries; return this; } + public int getViewChangeRounds() { + return viewChangeRounds; + } + public Builder setViewChangeRounds(int viewChangeRounds) { this.viewChangeRounds = viewChangeRounds; return this; diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 1b5fc2080..cb1265229 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -85,8 +85,8 @@ public class View { private static final Logger log = LoggerFactory.getLogger( View.class); private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change"; + final CommonCommunications comm; private final CommonCommunications approaches; - private final CommonCommunications comm; private final Context context; private final DigestAlgorithm digestAlgo; private final RingCommunications gossiper; @@ -315,7 +315,7 @@ boolean addToView(NoteWrapper note) { if (accused) { checkInvalidations(member); } - if (!viewManagement.isJoined() && context.totalCount() == context.cardinality()) { + if (!viewManagement.joined() && context.totalCount() == context.cardinality()) { assert context.totalCount() == context.cardinality(); viewManagement.join(); } else { @@ -423,6 +423,37 @@ void notifyListeners(List joining, List leaving) { }); } + /** + * Process the updates of the supplied juicy gossip. 
+ * + * @param gossip + */ + void processUpdates(Gossip gossip) { + processUpdates(gossip.getNotes().getUpdatesList(), gossip.getAccusations().getUpdatesList(), + gossip.getObservations().getUpdatesList(), gossip.getJoins().getUpdatesList()); + } + + /** + * Redirect the receiver to the correct ring, processing any new accusations + * + * @param member + * @param gossip + * @param ring + */ + boolean redirect(Participant member, Gossip gossip, int ring) { + if (!gossip.hasRedirect()) { + log.warn("Redirect from: {} on ring: {} did not contain redirect member note on: {}", member.getId(), ring, + node.getId()); + return false; + } + final var redirect = new NoteWrapper(gossip.getRedirect(), digestAlgo); + add(redirect); + processUpdates(gossip); + log.debug("Redirected from: {} to: {} on ring: {} on: {}", member.getId(), redirect.getId(), ring, + node.getId()); + return true; + } + /** * Remove the participant from the context * @@ -529,6 +560,10 @@ Stream streamShunned() { return shunned.stream(); } + void tick() { + roundTimers.tick(); + } + void viewChange(Runnable r) { // log.error("Enter view change on: {}", node.getId()); final var lock = viewChange.writeLock(); @@ -541,6 +576,74 @@ void viewChange(Runnable r) { } } + /** + * Gossip with the member + * + * @param ring - the index of the gossip ring the gossip is originating from in this view + * @param link - the outbound communications to the paired member + * @param ring + * @throws Exception + */ + protected Gossip gossip(Fireflies link, int ring) { + tick(); + if (shunned.contains(link.getMember().getId())) { + log.trace("Shunning gossip view: {} with: {} on: {}", currentView(), link.getMember().getId(), + node.getId()); + if (metrics != null) { + metrics.shunnedGossip().mark(); + } + return null; + } + + final SayWhat gossip = stable(() -> SayWhat.newBuilder() + .setView(currentView().toDigeste()) + .setNote(node.getNote().getWrapped()) + .setRing(ring) + .setGossip(commonDigests()) + .build()); + try { + 
return link.gossip(gossip); + } catch (Throwable e) { + final var p = (Participant) link.getMember(); + if (!viewManagement.joined()) { + log.debug("Exception: {} bootstrap gossiping with:S {} view: {} on: {}", e.getMessage(), p.getId(), + currentView(), node.getId()); + return null; + } + if (e instanceof StatusRuntimeException sre) { + switch (sre.getStatus().getCode()) { + case PERMISSION_DENIED: + log.trace("Rejected gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), + node.getId()); + break; + case FAILED_PRECONDITION: + log.trace("Failed gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), + node.getId()); + break; + case RESOURCE_EXHAUSTED: + log.trace("Unavailable for gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), + p.getId(), node.getId()); + break; + case CANCELLED: + log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(), + node.getId()); + break; + default: + /* Any other status: log at debug (view first, then member, matching the format) and accuse the member on this ring */ + log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), + node.getId()); + accuse(p, ring, sre); + break; + } + } else { + log.debug("Exception gossiping with: {} view: {} on: {}", p.getId(), currentView(), node.getId(), e); + accuse(p, ring, e); + } + return null; + } + + } + /** * Accuse the member on the ring * @@ -908,81 +1011,12 @@ private void gossip(Duration duration, ScheduledExecutorService scheduler) { } if (context.activeCount() == 1) { - roundTimers.tick(); + tick(); } gossiper.execute((link, ring) -> gossip(link, ring), (result, destination) -> gossip(result, destination, duration, scheduler)); } - /** - * Gossip with the member - * - * @param ring - the index of the gossip ring the gossip is originating from in this view - * @param link - the outbound communications to the paired member - * @param ring - * @throws Exception - */ - private Gossip gossip(Fireflies link, int ring) { - roundTimers.tick(); - if 
(shunned.contains(link.getMember().getId())) { - log.trace("Shunning gossip view: {} with: {} on: {}", currentView(), link.getMember().getId(), - node.getId()); - if (metrics != null) { - metrics.shunnedGossip().mark(); - } - return null; - } - - final SayWhat gossip = stable(() -> SayWhat.newBuilder() - .setView(currentView().toDigeste()) - .setNote(node.getNote().getWrapped()) - .setRing(ring) - .setGossip(commonDigests()) - .build()); - try { - return link.gossip(gossip); - } catch (Throwable e) { - final var p = (Participant) link.getMember(); - if (!viewManagement.joined()) { - log.debug("Exception: {} bootstrap gossiping with:S {} view: {} on: {}", e.getMessage(), p.getId(), - currentView(), node.getId()); - return null; - } - if (e instanceof StatusRuntimeException sre) { - switch (sre.getStatus().getCode()) { - case PERMISSION_DENIED: - log.trace("Rejected gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), - node.getId()); - break; - case FAILED_PRECONDITION: - log.trace("Failed gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), - node.getId()); - break; - case RESOURCE_EXHAUSTED: - log.trace("Unavailable for gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), - p.getId(), node.getId()); - break; - case CANCELLED: - log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(), - node.getId()); - break; - default: - log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), p.getId(), currentView(), - node.getId()); - accuse(p, ring, sre); - break; - - } - return null; - } else { - log.debug("Exception gossiping with {} view: {} on: {}", p.getId(), currentView(), node.getId(), e); - accuse(p, ring, e); - return null; - } - } - - } - /** * Handle the gossip response from the destination * @@ -993,49 +1027,48 @@ private Gossip gossip(Fireflies link, int ring) { */ private void gossip(Optional result, RingCommunications.Destination 
destination, Duration duration, ScheduledExecutorService scheduler) { - final var member = destination.member(); try { - if (result.isEmpty()) { - return; - } - - try { - Gossip gossip = result.get(); - if (gossip.hasRedirect()) { - stable(() -> redirect(member, gossip, destination.ring())); - } else if (viewManagement.joined()) { - try { - Update update = stable(() -> response(gossip)); - if (update != null && !update.equals(Update.getDefaultInstance())) { - log.trace("Update for: {} notes: {} accusations: {} joins: {} observations: {} on: {}", - destination.link().getMember().getId(), update.getNotesCount(), - update.getAccusationsCount(), update.getJoinsCount(), - update.getObservationsCount(), node.getId()); - destination.link() - .update(State.newBuilder() - .setView(currentView().toDigeste()) - .setRing(destination.ring()) - .setUpdate(update) - .build()); + if (result.isPresent()) { + final var member = destination.member(); + try { + Gossip gossip = result.get(); + if (gossip.hasRedirect()) { + stable(() -> redirect(member, gossip, destination.ring())); + } else if (viewManagement.joined()) { + try { + Update update = stable(() -> response(gossip)); + if (update != null && !update.equals(Update.getDefaultInstance())) { + log.trace("Update for: {} notes: {} accusations: {} joins: {} observations: {} on: {}", + destination.member().getId(), update.getNotesCount(), + update.getAccusationsCount(), update.getJoinsCount(), + update.getObservationsCount(), node.getId()); + destination.link() + .update(State.newBuilder() + .setView(currentView().toDigeste()) + .setRing(destination.ring()) + .setUpdate(update) + .build()); + } + } catch (StatusRuntimeException e) { + handleSRE("update", destination, member, e); + } + } else { + stable(() -> processUpdates(gossip)); + } + } catch (NoSuchElementException e) { + if (!viewManagement.joined()) { + log.debug("Null bootstrap gossiping with: {} view: {} on: {}", member.getId(), currentView(), + node.getId()); + } else { + 
if (e.getCause() instanceof StatusRuntimeException sre) { + handleSRE("gossip", destination, member, sre); + } else { + accuse(member, destination.ring(), e); } - } catch (StatusRuntimeException e) { - handleSRE("update", destination, member, e); } - } else { - stable(() -> processUpdates(gossip)); - } - } catch (NoSuchElementException e) { - if (!viewManagement.joined()) { - log.debug("Null bootstrap gossiping with: {} view: {} on: {}", member.getId(), currentView(), - node.getId()); - return; - } - if (e.getCause() instanceof StatusRuntimeException sre) { - handleSRE("gossip", destination, member, sre); - } else { - accuse(member, destination.ring(), e); } } + } finally { futureGossip = scheduler.schedule(Utils.wrapped(() -> gossip(duration, scheduler), log), duration.toNanos(), TimeUnit.NANOSECONDS); @@ -1196,16 +1229,6 @@ private ViewChangeGossip processObservations(BloomFilter bff, double p) return builder.build(); } - /** - * Process the updates of the supplied juicy gossip. - * - * @param gossip - */ - private void processUpdates(Gossip gossip) { - processUpdates(gossip.getNotes().getUpdatesList(), gossip.getAccusations().getUpdatesList(), - gossip.getObservations().getUpdatesList(), gossip.getJoins().getUpdatesList()); - } - /** * Process the updates of the supplied juicy gossip. 
* @@ -1247,27 +1270,6 @@ private void recover(Participant member) { } } - /** - * Redirect the receiver to the correct ring, processing any new accusations - * - * @param member - * @param gossip - * @param ring - */ - private boolean redirect(Participant member, Gossip gossip, int ring) { - if (!gossip.hasRedirect()) { - log.warn("Redirect from: {} on ring: {} did not contain redirect member note on: {}", member.getId(), ring, - node.getId()); - return false; - } - final var redirect = new NoteWrapper(gossip.getRedirect(), digestAlgo); - add(redirect); - processUpdates(gossip); - log.debug("Redirected from: {} to: {} on ring: {} on: {}", member.getId(), redirect.getId(), ring, - node.getId()); - return true; - } - /** * Redirect the member to the successor from this view's perspective * @@ -1281,24 +1283,29 @@ private Gossip redirectTo(Participant member, int ring, Participant successor, D assert member != null; assert successor != null; if (successor.getNote() == null) { - log.debug("Cannot redirect from: {} to: {} on ring: {} as note is null on: {}", node, successor, ring, - node.getId()); + log.debug("Cannot redirect: {} to: {} on ring: {} as note is null on: {}", member.getId(), + successor.getId(), ring, node.getId()); return Gossip.getDefaultInstance(); } var identity = successor.getNote(); if (identity == null) { - log.debug("Cannot redirect from: {} to: {} on ring: {} as note is null on: {}", node, successor, ring, - node.getId()); + log.debug("Cannot redirect: {} to: {} on ring: {} as note is null on: {}", member.getId(), + successor.getId(), ring, node.getId()); return Gossip.getDefaultInstance(); } - return Gossip.newBuilder() - .setRedirect(successor.getNote().getWrapped()) - .setNotes(processNotes(BloomFilter.from(digests.getNoteBff()))) - .setAccusations(processAccusations(BloomFilter.from(digests.getAccusationBff()))) - .setObservations(processObservations(BloomFilter.from(digests.getObservationBff()))) - 
.setJoins(viewManagement.processJoins(BloomFilter.from(digests.getJoinBiff()))) - .build(); + var gossip = Gossip.newBuilder() + .setRedirect(successor.getNote().getWrapped()) + .setNotes(processNotes(BloomFilter.from(digests.getNoteBff()))) + .setAccusations(processAccusations(BloomFilter.from(digests.getAccusationBff()))) + .setObservations(processObservations(BloomFilter.from(digests.getObservationBff()))) + .setJoins(viewManagement.processJoins(BloomFilter.from(digests.getJoinBiff()))) + .build(); + log.info("Redirecting: {} to: {} on ring: {} notes: {} acc: {} obv: {} joins: {} on: {}", member.getId(), + successor.getId(), ring, gossip.getNotes().getUpdatesCount(), + gossip.getAccusations().getUpdatesCount(), gossip.getObservations().getUpdatesCount(), + gossip.getJoins().getUpdatesCount(), node.getId()); + return gossip; } /** @@ -1990,7 +1997,7 @@ public void update(State request, Digest from) { @Override public Validation validateCoords(EventCoords request, Digest from) { var coordinates = EventCoordinates.from(request); - if (!viewManagement.isJoined()) { + if (!viewManagement.joined()) { log.info("Not yet joined!, ignoring validation request: {} from: {} on: {}", from, coordinates, node.getId()); return Validation.newBuilder().setResult(false).build(); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 25c36eb9f..2aa75331d 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -15,12 +15,15 @@ import com.salesforce.apollo.fireflies.Binding.Bound; import com.salesforce.apollo.fireflies.View.Node; import com.salesforce.apollo.fireflies.View.Participant; +import com.salesforce.apollo.fireflies.comm.gossip.Fireflies; import com.salesforce.apollo.fireflies.proto.*; import 
com.salesforce.apollo.fireflies.proto.Update.Builder; import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.ReservoirSampler; +import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.stereotomy.event.EstablishmentEvent; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Utils; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -57,7 +60,6 @@ public class ViewManagement { private final Map>> pendingJoins = new ConcurrentSkipListMap<>(); private final View view; private final AtomicReference vote = new AtomicReference<>(); - private final List joinSeedSet = new CopyOnWriteArrayList<>(); private final Lock joinLock = new ReentrantLock(); private boolean bootstrap; private AtomicReference currentView = new AtomicReference<>(); @@ -149,7 +151,14 @@ void install(Ballot ballot) { ballot.leaving.stream().filter(d -> !node.getId().equals(d)).forEach(p -> view.remove(p)); - context.rebalance(context.totalCount() + ballot.joining.size()); + final var seedSet = context.sample(params.maximumTxfr(), Entropy.bitsStream(), node.getId()) + .stream() + .map(p -> p.note.getWrapped()) + .toList(); + + var cardinality = context.totalCount() + ballot.joining.size(); + + context.rebalance(cardinality); var joining = new ArrayList(); var pending = ballot.joining() .stream() @@ -165,19 +174,13 @@ void install(Ballot ballot) { .map(nw -> pendingJoins.remove(nw.getId())) .filter(p -> p != null) .toList(); + pendingJoins.clear(); setDiadem( HexBloom.construct(context.memberCount(), context.allMembers().map(p -> p.getId()), view.bootstrapView(), params.crowns())); view.reset(); - - var seedSet = new ArrayList(); // complete all pending joins - context.ring(Entropy.nextBitsStreamInt(context.getRingCount())) - .stream() - .limit(params.maximumTxfr()) - .map(p -> p.getNote().getWrapped()) - .forEach(sn -> seedSet.add(sn)); pending.forEach(r -> { try { 
r.accept(seedSet); @@ -197,10 +200,6 @@ void install(Ballot ballot) { view.notifyListeners(joining, ballot.leaving); } - boolean isJoined() { - return joined(); - } - /** * Formally join the view. Calculate the HEX-BLOOM crown and view, fail and stop if does not match currentView */ @@ -212,6 +211,8 @@ void join() { return; } var current = currentView(); + log.info("Joining view: {} cardinality: {} count: {} on: {}", current, context.cardinality(), + context.totalCount(), node.getId()); var calculated = HexBloom.construct(context.totalCount(), context.allMembers().map(p -> p.getId()), view.bootstrapView(), params.crowns()); @@ -252,7 +253,7 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time var note = new NoteWrapper(join.getNote(), digestAlgo); if (!from.equals(note.getId())) { responseObserver.onError( - new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Member not match note"))); + new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Member does not match note"))); return; } log.debug("Join requested from: {} view: {} context: {} cardinality: {} on: {}", from, thisView, @@ -260,7 +261,11 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time if (contains(from)) { log.debug("Already a member: {} view: {} context: {} cardinality: {} on: {}", from, thisView, context.getId(), context.cardinality(), node.getId()); - joined(Collections.emptySet(), from, responseObserver, timer); + pendingJoins.remove(from); + joined(context.sample(params.maximumTxfr(), Entropy.bitsStream(), node.getId()) + .stream() + .map(p -> p.note.getWrapped()) + .toList(), from, responseObserver, timer); return; } if (!thisView.equals(joinView)) { @@ -322,14 +327,16 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time if (timer != null) { timer.stop(); } - view.introduced(); log.info("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", currentView.get(), bound.successors().size(), 
context.cardinality(), context.totalCount(), node.getId()); + if (context.totalCount() == context.cardinality()) { + join(); + } else { + var sample = new ArrayList<>(context.activeMembers()); + populate(sample, scheduler); + } }); - if (context.totalCount() == context.cardinality()) { - join(); - } }; } @@ -356,6 +363,37 @@ void maybeViewChange() { } } + /** Gossip with the supplied sample on ring 0, re-scheduling the pass after params.retryDelay() while joined() is false. */ + void populate(List sample, ScheduledExecutorService scheduler) { + var populate = new SliceIterator("Populate: " + context.getId(), node, sample, view.comm); + var repopulate = new AtomicReference(); + repopulate.set(() -> { + populate.iterate((link, m) -> { + log.debug("Populating: {} contacting: {} on: {}", context.getId(), link.getMember().getId(), + node.getId()); + view.tick(); + return view.gossip(link, 0); + }, (futureSailor, link, m) -> { + futureSailor.ifPresent(g -> { + if (g.hasRedirect()) { + final Participant member = (Participant) link.getMember(); + view.stable(() -> view.redirect(member, g, 0)); + } else { + view.stable(() -> view.processUpdates(g)); + } + }); + return !joined(); + }, () -> { + if (!joined()) { + /* Schedule the stored pass itself: a lambda that only calls repopulate.get() fetches the Runnable without running it */ + scheduler.schedule(Utils.wrapped(repopulate.get(), log), params.retryDelay().toNanos(), + TimeUnit.NANOSECONDS); + } + }, scheduler, params.retryDelay()); + }); + repopulate.get().run(); + } + JoinGossip.Builder processJoins(BloomFilter bff) { JoinGossip.Builder builder = JoinGossip.newBuilder(); @@ -412,13 +450,13 @@ Redirect seed(Registration registration, Digest from) { } return view.stable(() -> { var newMember = view.new Participant(note.getId()); - final var successors = new TreeSet(context.successors(newMember, m -> context.isActive(m))); + final var sample = context.sample(params.maximumTxfr(), Entropy.bitsStream(), (Digest) null); - log.debug("Member seeding: {} view: {} context: {} successors: {} on: {}", newMember.getId(), currentView(), - context.getId(), successors.size(), node.getId()); + log.info("Member seeding: {} view: {} context: {} 
sample: {} on: {}", newMember.getId(), currentView(), + context.getId(), sample.size(), node.getId()); return Redirect.newBuilder() .setView(currentView().toDigeste()) - .addAllSuccessors(successors.stream().filter(p -> p != null).map(p -> p.getSeed()).toList()) + .addAllSample(sample.stream().filter(p -> p != null).map(p -> p.getSeed()).toList()) .setCardinality(context.cardinality()) .setBootstrap(bootstrap) .setRings(context.getRingCount()) diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index dcdf9111f..31b0cc159 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -259,7 +259,7 @@ public void churn() throws Exception { } private void initialize() { - var parameters = Parameters.newBuilder().build(); + var parameters = Parameters.newBuilder().setMaximumTxfr(20).build(); registry = new MetricRegistry(); node0Registry = new MetricRegistry(); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java index 074437799..8b28f38ef 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java @@ -173,10 +173,7 @@ public void smokin() throws Exception { } private void initialize() { - var parameters = Parameters.newBuilder() - .setMaxPending(largeTests ? 10 : 10) - .setMaximumTxfr(largeTests ? 100 : 20) - .build(); + var parameters = Parameters.newBuilder().setMaxPending(10).setMaximumTxfr(largeTests ? 
100 : 20).build(); registry = new MetricRegistry(); node0Registry = new MetricRegistry(); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index 8774cc226..f43b99bc4 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -69,7 +69,7 @@ public class MtlsTest { private static Map> identities; static { - CARDINALITY = LARGE_TESTS ? 100 : 10; + CARDINALITY = LARGE_TESTS ? 20 : 10; } private List communications = new ArrayList<>(); @@ -106,7 +106,7 @@ public void after() { @Test public void smoke() throws Exception { - var parameters = Parameters.newBuilder().build(); + var parameters = Parameters.newBuilder().setMaximumTxfr(20).build(); final Duration duration = Duration.ofMillis(50); var registry = new MetricRegistry(); var node0Registry = new MetricRegistry(); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index 8b0db5fca..f38cfab6c 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -58,7 +58,7 @@ public class SwarmTest { "large_tests"); static { - CARDINALITY = largeTests ? 500 : 50; + CARDINALITY = largeTests ? 
100 : 50; } private List communications = new ArrayList<>(); diff --git a/grpc/src/main/proto/fireflies.proto b/grpc/src/main/proto/fireflies.proto index 7d63d1310..24d44a294 100644 --- a/grpc/src/main/proto/fireflies.proto +++ b/grpc/src/main/proto/fireflies.proto @@ -134,7 +134,7 @@ message Redirect { int32 cardinality = 2; int32 rings = 3; bool bootstrap = 4; - repeated Seed_ successors = 6; + repeated Seed_ sample = 6; } message Seed_ { diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/Context.java b/memberships/src/main/java/com/salesforce/apollo/membership/Context.java index 9253301fc..9887c7895 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/Context.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/Context.java @@ -365,6 +365,16 @@ static List uniqueSuccessors(final Context context, Digest diges */ Stream> rings(); + /** + * Answer a random sample of at least range size from the active members of the context + * + * @param range - the desired range + * @param entropy - source o randomness + * @param exc - the member to exclude from sample + * @return a random sample set of the view's live members. May be limited by the number of active members. + */ + List sample(int range, BitsStreamGenerator entropy, Digest exc); + /** * Answer a random sample of at least range size from the active members of the context * @@ -373,7 +383,7 @@ static List uniqueSuccessors(final Context context, Digest diges * @param excluded - the member to exclude from sample * @return a random sample set of the view's live members. May be limited by the number of active members. 
*/ - List sample(int range, BitsStreamGenerator entropy, Digest exc); + List sample(int range, BitsStreamGenerator entropy, Predicate excluded); /** * Answer the total count of active and offline members of this context diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/ContextImpl.java b/memberships/src/main/java/com/salesforce/apollo/membership/ContextImpl.java index 990117ec7..acdfbc37f 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/ContextImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/ContextImpl.java @@ -525,12 +525,27 @@ public Stream> rings() { * * @param range - the desired range * @param entropy - source o randomness - * @param excluded - the member to exclude from sample + * @param excluded - predicate to test for exclusion + * @return a random sample set of the view's live members. May be limited by the number of active members. + */ + @Override + public List sample(int range, BitsStreamGenerator entropy, Predicate excluded) { + return rings.get(entropy.nextInt(rings.size())) + .stream() + .collect(new ReservoirSampler(excluded, range, entropy)); + } + + /** + * Answer a random sample of at least range size from the active members of the context + * + * @param range - the desired range + * @param entropy - source o randomness + * @param exc - the member to exclude from sample * @return a random sample set of the view's live members. May be limited by the number of active members. */ @Override public List sample(int range, BitsStreamGenerator entropy, Digest exc) { - Member excluded = getMember(exc); + Member excluded = exc == null ? 
null : getMember(exc); return rings.get(entropy.nextInt(rings.size())) .stream() .collect(new ReservoirSampler(excluded, range, entropy)); diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/ReservoirSampler.java b/memberships/src/main/java/com/salesforce/apollo/membership/ReservoirSampler.java index feb122fc7..d62dde510 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/ReservoirSampler.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/ReservoirSampler.java @@ -6,31 +6,32 @@ */ package com.salesforce.apollo.membership; +import org.apache.commons.math3.random.BitsStreamGenerator; + import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Supplier; +import java.util.function.*; import java.util.stream.Collector; -import org.apache.commons.math3.random.BitsStreamGenerator; - public class ReservoirSampler implements Collector, List> { - private AtomicInteger c = new AtomicInteger(); - private final Object exclude; + private final Predicate exclude; private final BitsStreamGenerator rand; private final int sz; + private AtomicInteger c = new AtomicInteger(); public ReservoirSampler(int size, BitsStreamGenerator entropy) { this(null, size, entropy); } public ReservoirSampler(Object excluded, int size, BitsStreamGenerator entropy) { + this(t -> excluded == null ? 
false : excluded.equals(t), size, entropy); + } + + public ReservoirSampler(Predicate excluded, int size, BitsStreamGenerator entropy) { assert size >= 0; this.exclude = excluded; this.sz = size; @@ -66,7 +67,7 @@ public Supplier> supplier() { } private void addIt(final List in, T s) { - if (exclude != null && exclude.equals(s)) { + if (exclude != null && exclude.test(s)) { return; } if (in.size() < sz) { diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java index 96cacb1f5..0ce70b617 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java @@ -130,7 +130,7 @@ public void before() throws Exception { } protected int getCardinality() { - return LARGE_TESTS ? 100 : 5; + return LARGE_TESTS ? 10 : 5; } protected void instantiate(SigningMember member, Context context,