From deacf1a6fd5c6ce3af816a12f101262d2e844e6f Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sun, 2 Jun 2024 11:42:17 -0700 Subject: [PATCH] use SliceIterator for Ethereal.simplify RingCommunications. --- .../apollo/choam/GenesisAssembly.java | 3 +- .../com/salesforce/apollo/choam/Producer.java | 8 +- .../salesforce/apollo/choam/ViewContext.java | 5 + .../apollo/choam/support/Bootstrapper.java | 4 +- .../salesforce/apollo/ethereal/Ethereal.java | 4 +- .../salesforce/apollo/ethereal/Processor.java | 5 +- .../ethereal/memberships/ChRbcGossip.java | 175 ++++++++---------- .../apollo/ethereal/EtherealTest.java | 23 ++- grpc/src/main/proto/ethereal.proto | 5 +- .../salesforce/apollo/leyden/LeydenJar.java | 11 +- .../salesforce/apollo/context/Context.java | 37 +++- .../apollo/ring/RingCommunications.java | 78 +------- .../salesforce/apollo/ring/RingIterator.java | 12 -- .../apollo/ring/RingCommunicationsTest.java | 26 ++- .../apollo/ring/RingIteratorTest.java | 4 +- .../com/salesforce/apollo/thoth/KerlDHT.java | 28 +-- 16 files changed, 179 insertions(+), 249 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index 44633efdc..f8cdbee1b 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -97,7 +97,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId()); controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(), transitions::process, transitions::nextEpoch, label); - coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(), + coordinator = new ChRbcGossip(reContext.getId(), params().member(), nextAssembly.values(), + controller.processor(), params().communications(), params().metrics() == null ? null : params().metrics().getGensisMetrics()); log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(), reContext.getId(), nextAssembly.keySet(), params().member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index ff1497af1..8db4483ca 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -99,10 +99,10 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash config.setLabel("Producer" + getViewId() + " on: " + params().member().getId()); var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics(); - controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, - (preblock, last) -> serial(preblock, last), this::newEpoch, label); - coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(), - params().communications(), producerMetrics); + controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial, + this::newEpoch, label); + coordinator = new ChRbcGossip(view.context().getId(), params().member(), view.membership(), + controller.processor(), params().communications(), producerMetrics); log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId()); var onConsensus = new CompletableFuture(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index feaca8bf6..c204030db 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey; @@ -135,6 +136,10 @@ public Signer getSigner() { return signer; } + public Set membership() { + return validators.keySet(); + } + /** * The process has failed */ diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index 1e2f24f96..8c59d987d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -386,8 +386,8 @@ private void sample() { } var randomCut = member.getId(); log.info("Random cut: {} on: {}", randomCut, params.member().getId()); - var iterator = new RingIterator(params.gossipDuration(), params.context(), params.member(), - comms, true, scheduler); + var iterator = new RingIterator<>(params.gossipDuration(), params.context(), params.member(), comms, true, + scheduler); iterator.allowDuplicates(); iterator.iterate(randomCut, (link, _) -> synchronize(s, link), (_, futureSailor, destination) -> synchronize(futureSailor, votes, destination), diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java index f11310a96..6cac3b2bb 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java @@ -141,8 +141,8 @@ public String dump() { public Processor processor() { return new Processor() { @Override - public Gossip gossip(Digest context, int ring) { - final var builder = Gossip.newBuilder().setRing(ring); + public Gossip gossip(Digest context) { + final var builder = Gossip.newBuilder(); final var current = currentEpoch.get(); epochs.entrySet() .stream() diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Processor.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Processor.java index e663c30aa..8601d01dd 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Processor.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Processor.java @@ -7,9 +7,9 @@ package com.salesforce.apollo.ethereal; +import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.ethereal.proto.Gossip; import com.salesforce.apollo.ethereal.proto.Update; -import com.salesforce.apollo.cryptography.Digest; /** * @author hal.hildebrand @@ -20,10 +20,9 @@ public interface Processor { * First phase request. Answer the gossip for the current state of the receiver * * @param context - the digest id of the context for routing - * @param ring - the ring we're gossiping on * @return the Gossip */ - Gossip gossip(Digest context, int ring); + Gossip gossip(Digest context); /** * First phase reply. Answer the Update from the receiver's state, based on the suppled Have diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java index 69b7a0685..829b7e2de 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java @@ -6,11 +6,9 @@ */ package com.salesforce.apollo.ethereal.memberships; -import com.codahale.metrics.Timer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.archipelago.server.FernetServerInterceptor; -import com.salesforce.apollo.context.Context; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.ethereal.Processor; import com.salesforce.apollo.ethereal.memberships.comm.EtherealMetrics; @@ -22,7 +20,7 @@ import com.salesforce.apollo.ethereal.proto.Update; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; -import com.salesforce.apollo.ring.RingCommunications; +import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; import io.grpc.StatusRuntimeException; @@ -30,6 +28,9 @@ import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -51,29 +52,28 @@ public class ChRbcGossip { private static final Logger log = LoggerFactory.getLogger( ChRbcGossip.class); private final CommonCommunications comm; - private final Context context; + private final Digest id; private final SigningMember member; private final EtherealMetrics metrics; private final Processor processor; - private final RingCommunications ring; + private final SliceIterator ring; private final AtomicBoolean started = new AtomicBoolean(); private final Terminal terminal = new Terminal(); + private final List membership; private volatile ScheduledFuture scheduled; - public ChRbcGossip(Context context, SigningMember member, Processor processor, Router communications, - EtherealMetrics m) { + public ChRbcGossip(Digest id, SigningMember member, Collection membership, Processor processor, + Router communications, EtherealMetrics m) { this.processor = processor; - this.context = context; this.member = member; this.metrics = m; - comm = communications.create(member, context.getId(), terminal, getClass().getCanonicalName(), + this.id = id; + this.membership = new ArrayList<>(membership); + comm = communications.create(member, id, terminal, getClass().getCanonicalName(), r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r), getCreate(metrics), Gossiper.getLocalLoopback(member)); - ring = new RingCommunications<>(context, member, this.comm); - } - - public Context getContext() { - return context; + ring = new SliceIterator("ChRbcGossip[%s on: %s]".formatted(id, member.getId()), member, membership, + comm); } /** @@ -91,16 +91,20 @@ public void start(Duration duration, Predicate Thread.ofVirtual().start(Utils.wrapped(() -> { - try { - oneRound(duration, scheduler); - } catch (Throwable e) { - log.error("Error in gossip on: {}", member.getId(), e); - } - }, log)), initialDelay.toMillis(), TimeUnit.MILLISECONDS); + log.trace("Starting GossipService[{}] on: {}", id, member.getId()); + comm.register(id, terminal, validator); + try { + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> { + try { + gossip(duration, scheduler); + } catch (Throwable e) { + log.error("Error in gossip on: {}", member.getId(), e); + } + }, log)), initialDelay.toMillis(), TimeUnit.MILLISECONDS); + } catch (Throwable e) { + log.error("Error in gossip on: {}", member.getId(), e); + } } /** @@ -110,8 +114,8 @@ public void stop() { if (!started.compareAndSet(true, false)) { return; } - log.trace("Stopping GossipService [{}] for {}", context.getId(), member.getId()); - comm.deregister(context.getId()); + log.trace("Stopping GossipService [{}] for {}", id, member.getId()); + comm.deregister(id); final var current = scheduled; scheduled = null; if (current != null) { @@ -119,23 +123,42 @@ public void stop() { } } + private void gossip(Duration frequency, ScheduledExecutorService scheduler) { + if (!started.get()) { + return; + } + var timer = metrics == null ? null : metrics.gossipRoundDuration().time(); + ring.iterate((link, _) -> gossipRound(link), (result, link, _) -> { + handle(result, link); + return true; + }, () -> { + if (timer != null) { + timer.stop(); + } + if (started.get()) { + scheduled = scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(frequency, scheduler), log)), + frequency.toNanos(), TimeUnit.NANOSECONDS); + } + }, frequency); + } + /** * Perform the first phase of the gossip. Send our partner the Have state of the receiver */ - private Update gossipRound(Gossiper link, int ring) { + private Update gossipRound(Gossiper link) { if (!started.get()) { return null; } - log.trace("gossiping[{}] with {} ring: {} on {}", context.getId(), link.getMember(), ring, member); + log.trace("gossiping[{}] with {} on {}", id, link.getMember(), member); try { - return link.gossip(processor.gossip(context.getId(), ring)); + return link.gossip(processor.gossip(id)); } catch (StatusRuntimeException e) { - log.debug("gossiping[{}] failed: {} with: {} with {} ring: {} on: {}", context.getId(), e.getMessage(), - member.getId(), ring, link.getMember().getId(), member.getId()); + log.debug("gossiping[{}] failed: {} with: {} with {} on: {}", id, e.getMessage(), member.getId(), + link.getMember().getId(), member.getId()); return null; } catch (Throwable e) { - log.warn("gossiping[{}] failed: {} from {} with {} ring: {} on: {}", context.getId(), member.getId(), ring, - link.getMember().getId(), ring, member.getId(), e); + log.warn("gossiping:{} with: {} failed on: {}", id, link.getMember().getId(), member.getId(), e); return null; } } @@ -143,61 +166,33 @@ private Update gossipRound(Gossiper link, int ring) { /** * The second phase of the gossip. Handle the update from our gossip partner */ - private void handle(Optional result, RingCommunications.Destination destination, - Duration duration, ScheduledExecutorService scheduler, Timer.Context timer) { - if (!started.get() || destination == null || destination.link() == null) { - if (timer != null) { - timer.stop(); - } + private void handle(Optional result, Gossiper link) { + if (!started.get()) { return; } - try { - if (result.isEmpty()) { - if (timer != null) { - timer.stop(); - } - return; - } - Update update = result.get(); - if (update.equals(Update.getDefaultInstance())) { - return; - } - try { - var u = processor.update(update); - if (!Update.getDefaultInstance().equals(u)) { - log.trace("Gossip update with: {} on: {}", destination.member().getId(), member.getId()); - destination.link() - .update(ContextUpdate.newBuilder().setRing(destination.ring()).setUpdate(u).build()); - } - } catch (StatusRuntimeException e) { - log.debug("gossiping[{}] failed: {} with: {} with {} ring: {} on: {}", context.getId(), e.getMessage(), - member.getId(), ring, destination.member().getId(), member.getId()); - } catch (Throwable e) { - log.warn("gossiping[{}] failed: {} with: {} with {} ring: {} on: {}", context.getId(), e.getMessage(), - member.getId(), ring, destination.member().getId(), member.getId(), e); - } - } finally { - if (timer != null) { - timer.stop(); - } - if (started.get()) { - scheduled = scheduler.schedule( - () -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)), - duration.toMillis(), TimeUnit.MILLISECONDS); - } + if (link == null) { + return; } - } - - /** - * Perform one round of gossip - */ - private void oneRound(Duration duration, ScheduledExecutorService scheduler) { - if (!started.get()) { + if (result.isEmpty()) { return; } - var timer = metrics == null ? null : metrics.gossipRoundDuration().time(); - ring.execute(this::gossipRound, - (result, destination) -> handle(result, destination, duration, scheduler, timer)); + Update update = result.get(); + if (update.equals(Update.getDefaultInstance())) { + return; + } + try { + var u = processor.update(update); + if (!Update.getDefaultInstance().equals(u)) { + log.trace("Gossip update with: {} on: {}", link.getMember().getId(), member.getId()); + link.update(ContextUpdate.newBuilder().setUpdate(u).build()); + } + } catch (StatusRuntimeException e) { + log.debug("gossiping[{}] failed: {} with: {} with {} on: {}", id, e.getMessage(), member.getId(), + link.getMember().getId(), member.getId()); + } catch (Throwable e) { + log.warn("gossiping[{}] failed: {} with: {} with {} on: {}", id, e.getMessage(), member.getId(), + link.getMember().getId(), member.getId(), e); + } } /** @@ -206,13 +201,6 @@ private void oneRound(Duration duration, ScheduledExecutorService scheduler) { private class Terminal implements GossiperService, Router.ServiceRouting { @Override public Update gossip(Gossip request, Digest from) { - Member predecessor = context.predecessor(request.getRing(), member); - if (predecessor == null || !from.equals(predecessor.getId())) { - log.debug("Invalid inbound gossip context: {} from: {} on ring: {} - not predecessor: {} on: {}", - context.getId(), from, request.getRing(), - predecessor == null ? "" : predecessor.getId(), member.getId()); - return Update.getDefaultInstance(); - } final var update = processor.gossip(request); log.trace("GossipService received from: {} missing: {} on: {}", from, update.getMissingCount(), member.getId()); @@ -221,13 +209,6 @@ public Update gossip(Gossip request, Digest from) { @Override public void update(ContextUpdate request, Digest from) { - Member predecessor = context.predecessor(request.getRing(), member); - if (predecessor == null || !from.equals(predecessor.getId())) { - log.debug("Invalid inbound update context:{} from: {} on ring: {} - not predecessor: {} on: {}", - context.getId(), from, request.getRing(), - predecessor == null ? "" : predecessor.getId(), member.getId()); - return; - } log.trace("gossip update with {} on: {}", from, member.getId()); processor.updateFrom(request.getUpdate()); } diff --git a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java index 250e02f11..c6f38106f 100644 --- a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java +++ b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java @@ -13,7 +13,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.context.StaticContext; +import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.Signer; import com.salesforce.apollo.ethereal.memberships.ChRbcGossip; @@ -97,8 +97,12 @@ public void unbounded() throws NoSuchAlgorithmException, InterruptedException, I .map(ControlledIdentifierMember::new) .map(e -> (Member) e) .toList(); - - StaticContext context = new StaticContext<>(DigestAlgorithm.DEFAULT.getOrigin(), 0.1, members, 3); + DynamicContext context = DynamicContext.newBuilder() + .setBias(3) + .setpByz(0.1) + .setId(DigestAlgorithm.DEFAULT.getOrigin()) + .build(); + context.activate(members); var metrics = new EtherealMetricsImpl(context.getId(), "test", registry); var builder = Config.newBuilder().setnProc((short) NPROC).setNumberOfEpochs(-1).setEpochLength(EPOCH_LENGTH); @@ -132,7 +136,8 @@ public void unbounded() throws NoSuchAlgorithmException, InterruptedException, I } }, "Test: " + i); - var gossiper = new ChRbcGossip(context, (SigningMember) member, controller.processor(), com, metrics); + var gossiper = new ChRbcGossip(context.getId(), (SigningMember) member, members, controller.processor(), + com, metrics); gossipers.add(gossiper); dataSources.add(ds); controllers.add(controller); @@ -219,7 +224,12 @@ private void one(int iteration) .map(e -> (Member) e) .toList(); - StaticContext context = new StaticContext<>(DigestAlgorithm.DEFAULT.getOrigin(), 0.1, members, 3); + DynamicContext context = DynamicContext.newBuilder() + .setBias(3) + .setpByz(0.1) + .setId(DigestAlgorithm.DEFAULT.getOrigin()) + .build(); + context.activate(members); var metrics = new EtherealMetricsImpl(context.getId(), "test", registry); var builder = Config.newBuilder() .setnProc((short) NPROC) @@ -254,7 +264,8 @@ private void one(int iteration) } }, "Test: " + i); - var gossiper = new ChRbcGossip(context, (SigningMember) member, controller.processor(), com, metrics); + var gossiper = new ChRbcGossip(context.getId(), (SigningMember) member, members, controller.processor(), + com, metrics); gossipers.add(gossiper); dataSources.add(ds); controllers.add(controller); diff --git a/grpc/src/main/proto/ethereal.proto b/grpc/src/main/proto/ethereal.proto index da9f52823..91620e271 100644 --- a/grpc/src/main/proto/ethereal.proto +++ b/grpc/src/main/proto/ethereal.proto @@ -15,9 +15,8 @@ service Gossiper { } message Gossip { - int32 ring = 1; - crypto.Biff have = 2; - repeated Have haves = 3; + crypto.Biff have = 1; + repeated Have haves = 2; } message Have { diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java index 2ad10c645..a945046d6 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java @@ -105,8 +105,7 @@ public void bind(Binding bound) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); var gathered = HashMultiset.create(); - var iterate = new RingIterator(operationsFrequency, context, member, scheduler, - binderComms); + var iterate = new RingIterator<>(operationsFrequency, context, member, scheduler, binderComms); iterate.iterate(hash, null, (link, r) -> { link.bind(bound); return ""; @@ -133,8 +132,7 @@ public Bound get(Key keyAndToken) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); var gathered = HashMultiset.create(); - var iterate = new RingIterator(operationsFrequency, context, member, scheduler, - binderComms); + var iterate = new RingIterator<>(operationsFrequency, context, member, scheduler, binderComms); iterate.iterate(hash, null, (link, r) -> { var bound = link.get(keyAndToken); log.debug("Get {}: bound: <{}:{}> from: {} on: {}", hash, bound.getKey().toStringUtf8(), @@ -188,8 +186,7 @@ public void unbind(Key keyAndToken) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); var gathered = HashMultiset.create(); - var iterate = new RingIterator(operationsFrequency, context, member, scheduler, - binderComms); + var iterate = new RingIterator<>(operationsFrequency, context, member, scheduler, binderComms); iterate.iterate(hash, null, (link, r) -> { link.unbind(keyAndToken); return ""; @@ -221,7 +218,7 @@ private void add(Digest hash, Bound bound, Digest digest) { private Stream bindingsIn(KeyInterval i) { Iterator it = new Iterator() { private final Iterator iterate = bottled.keyIterator(i.getBegin()); - private Digest next; + private Digest next; { if (iterate.hasNext()) { diff --git a/memberships/src/main/java/com/salesforce/apollo/context/Context.java b/memberships/src/main/java/com/salesforce/apollo/context/Context.java index 194a71ae4..b1038e4db 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/Context.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/Context.java @@ -10,11 +10,10 @@ import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.Util; +import com.salesforce.apollo.ring.RingCommunications; import org.apache.commons.math3.random.BitsStreamGenerator; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; @@ -114,7 +113,7 @@ static int minMajority(int bias, double pByz, int cardinality) { * @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's * rings */ - default Set bftSubset(Digest hash) { + default LinkedHashSet bftSubset(Digest hash) { return bftSubset(hash, m -> true); } @@ -123,8 +122,8 @@ default Set bftSubset(Digest hash) { * @param filter - the filter to apply to successors * @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's */ - default Set bftSubset(Digest hash, Predicate filter) { - var collector = new HashSet(); + default LinkedHashSet bftSubset(Digest hash, Predicate filter) { + var collector = new LinkedHashSet(); uniqueSuccessors(hash, filter, collector); return collector; } @@ -464,6 +463,32 @@ default int majority() { Iterable successors(int ring, Digest location); + default List> successors(Digest digest, T ignore, boolean noDuplicates, T member) { + var traversal = new ArrayList>(); + var traversed = new TreeSet(); + for (int ring = 0; ring < getRingCount(); ring++) { + if (size() == 1) { + traversal.add(new RingCommunications.iteration<>(member, ring)); + continue; + } + T successor = findSuccessor(ring, digest, m -> { + if (ignore != null && ignore.equals(m)) { + return Context.IterateResult.CONTINUE; + } + if (noDuplicates) { + if (traversed.add(m)) { + return Context.IterateResult.SUCCESS; + } else { + return Context.IterateResult.CONTINUE; + } + } + return Context.IterateResult.SUCCESS; + }); + traversal.add(new RingCommunications.iteration<>(successor, ring)); + } + return traversal; + } + /** * @param ring * @param predicate diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java index 91286415a..572b41827 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java @@ -20,11 +20,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.TreeSet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; -import java.util.function.Function; /** * @author hal.hildebrand @@ -35,7 +33,6 @@ public class RingCommunications { final Context context; final SigningMember member; private final CommonCommunications comm; - private final Direction direction; private final Lock lock = new ReentrantLock(); private final List> traversalOrder = new ArrayList<>(); protected boolean noDuplicates = true; @@ -48,13 +45,6 @@ public RingCommunications(Context context, SigningMember member, CommonCommun public RingCommunications(Context context, SigningMember member, CommonCommunications comm, boolean ignoreSelf) { - this(Direction.SUCCESSOR, context, member, comm, ignoreSelf); - } - - public RingCommunications(Direction direction, Context context, SigningMember member, - CommonCommunications comm, boolean ignoreSelf) { - assert direction != null && context != null && member != null && comm != null; - this.direction = direction; this.context = context; this.member = member; this.comm = comm; @@ -105,43 +95,15 @@ public String toString() { return "RingCommunications [" + context.getId() + ":" + member.getId() + ":" + currentIndex + "]"; } - @SuppressWarnings("unchecked") - List> calculateTraversal(Digest digest) { - var traversal = new ArrayList>(); - var traversed = new TreeSet(); - for (int ring = 0; ring < context.getRingCount(); ring++) { - if (context.size() == 1) { - traversal.add(new iteration<>((T) member, ring)); - continue; - } - T successor = direction.retrieve(context, ring, digest, m -> { - if (ignoreSelf && m.equals(member)) { - return Context.IterateResult.CONTINUE; - } - if (noDuplicates) { - if (traversed.add(m)) { - return Context.IterateResult.SUCCESS; - } else { - return Context.IterateResult.CONTINUE; - } - } - return Context.IterateResult.SUCCESS; - }); - if (successor != null) { - traversal.add(new iteration<>(successor == null ? (T) member : successor, ring)); - } - } - return traversal; - } - final RingCommunications.Destination next(Digest digest) { lock.lock(); try { final var current = currentIndex; final var count = traversalOrder.size(); if (count == 0 || current == count - 1) { + var successors = context.successors(digest, ignoreSelf ? (T) member : null, noDuplicates, (T) member); traversalOrder.clear(); - traversalOrder.addAll(calculateTraversal(digest)); + traversalOrder.addAll(successors); Entropy.secureShuffle(traversalOrder); log.trace("New traversal order: {}:{} on: {}", context.getRingCount(), traversalOrder, member.getId()); } @@ -194,44 +156,10 @@ private Destination linkFor(Digest digest) { } } - public enum Direction { - PREDECESSOR { - @Override - public T retrieve(Context context, int ring, Digest hash, - Function test) { - return context.findPredecessor(ring, hash, test); - } - - @Override - public T retrieve(Context context, int ring, T member, - Function test) { - return context.findPredecessor(ring, member, test); - } - }, SUCCESSOR { - @Override - public T retrieve(Context context, int ring, Digest hash, - Function test) { - return context.findSuccessor(ring, hash, test); - } - - @Override - public T retrieve(Context context, int ring, T member, - Function test) { - return context.findSuccessor(ring, member, test); - } - }; - - public abstract T retrieve(Context context, int ring, Digest hash, - Function test); - - public abstract T retrieve(Context context, int ring, T member, - Function test); - } - public record Destination(M member, Q link, int ring) { } - private record iteration(T m, int ring) { + public record iteration(T m, int ring) { @Override public String toString() { diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java index 338564441..c582c09e3 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java @@ -51,18 +51,6 @@ public RingIterator(Duration frequency, Context context, SigningMember member this(frequency, context, member, comm, false, scheduler); } - public RingIterator(Duration frequency, Direction direction, Context context, SigningMember member, - CommonCommunications comm, boolean ignoreSelf, ScheduledExecutorService scheduler) { - super(direction, context, member, comm, ignoreSelf); - this.scheduler = scheduler; - this.frequency = frequency; - } - - public RingIterator(Duration frequency, Direction direction, Context context, SigningMember member, - ScheduledExecutorService scheduler, CommonCommunications comm) { - this(frequency, direction, context, member, comm, false, scheduler); - } - @Override public RingIterator allowDuplicates() { super.allowDuplicates(); diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java index 8998468d0..3c2f6b81a 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java @@ -28,8 +28,8 @@ public class RingCommunicationsTest { @Test public void smokin() throws Exception { - var serverMember1 = new SigningMemberImpl(Utils.getMember(0), ULong.MIN); - var serverMember2 = new SigningMemberImpl(Utils.getMember(1), ULong.MIN); + var serverMember1 = new SigningMemberImpl(Utils.getMember(1), ULong.MIN); + var serverMember2 = new SigningMemberImpl(Utils.getMember(2), ULong.MIN); var pinged1 = new AtomicBoolean(); var pinged2 = new AtomicBoolean(); @@ -77,24 +77,20 @@ public Any ping(Any request) { .setFactory(to -> InProcessChannelBuilder.forName(name).build()); var router = new RouterImpl(serverMember1, serverBuilder, cacheBuilder, null); try { - RouterImpl.CommonCommunications commsA = router.create(serverMember1, - context.getId(), - new ServiceImpl(local1, "A"), - "A", ServerImpl::new, - TestItClient::new, local1); + var commsA = router.create(serverMember1, context.getId(), new ServiceImpl(local1, "A"), "A", + ServerImpl::new, TestItClient::new, local1); - RouterImpl.CommonCommunications commsB = router.create(serverMember2, - context.getId(), - new ServiceImpl(local2, "B"), - "B", ServerImpl::new, - TestItClient::new, local2); + router.create(serverMember2, context.getId(), new ServiceImpl(local2, "B"), "B", ServerImpl::new, + TestItClient::new, local2); router.start(); var sync = new RingCommunications(context, serverMember1, commsA); - sync.allowDuplicates(); + sync.noDuplicates(); var countdown = new CountDownLatch(1); - sync.execute((link, round) -> link.ping(Any.getDefaultInstance()), - (result, destination) -> countdown.countDown()); + for (var i = 0; i < context.getRingCount(); i++) { + sync.execute((link, round) -> link.ping(Any.getDefaultInstance()), + (result, destination) -> countdown.countDown()); + } assertTrue(countdown.await(1, TimeUnit.SECONDS), "Completed: " + countdown.getCount()); assertFalse(pinged1.get()); assertTrue(pinged2.get()); diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java index 15857b392..bf9d9bf67 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java @@ -93,8 +93,8 @@ public Any ping(Any request) { router.start(); var frequency = Duration.ofMillis(1); var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - var sync = new RingIterator(frequency, context, serverMember1, scheduler, commsA); - sync.allowDuplicates(); + var sync = new RingIterator<>(frequency, context, serverMember1, scheduler, commsA); + sync.noDuplicates(); var countdown = new CountDownLatch(3); sync.iterate(context.getId(), (link, round) -> link.ping(Any.getDefaultInstance()), (round, result, link) -> { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index d26b94557..ff748b5c3 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -199,7 +199,7 @@ public KeyState_ append(AttachmentEvent event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); iterator.iterate(identifier, null, (link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null, @@ -235,7 +235,7 @@ public List append(KERL_ kerl) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); iterator.iterate(identifier, null, (link, r) -> link.append(kerl), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, @@ -264,7 +264,7 @@ public KeyState_ append(KeyEvent_ event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); iterator.iterate(identifier, null, (link, r) -> link.append(Collections.singletonList(event)), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, @@ -321,7 +321,7 @@ public Empty appendAttachments(List events) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); iterator.iterate(identifier, null, (link, r) -> link.appendAttachments(events), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, @@ -355,7 +355,7 @@ public Empty appendValidations(Validations validations) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); iterator.iterate(identifier, null, (link, r) -> link.appendValidations(validations), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, @@ -408,7 +408,7 @@ public Attachment getAttachment(EventCoords coordinates) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var operation = "getAttachment(%s)".formatted(EventCoordinates.from(coordinates)); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(identifier, null, (link, r) -> link.getAttachment(coordinates), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier, @@ -443,7 +443,7 @@ public KERL_ getKERL(Ident identifier) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var operation = "getKerl(%s)".formatted(Identifier.from(identifier)); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(digest, null, (link, r) -> link.getKERL(identifier), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, @@ -482,7 +482,7 @@ public KeyEvent_ getKeyEvent(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(digest, null, (link, r) -> link.getKeyEvent(coordinates), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, @@ -518,7 +518,7 @@ public KeyState_ getKeyState(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(digest, null, (link, r) -> link.getKeyState(coordinates), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, @@ -555,7 +555,7 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(digest, null, (link, r) -> link.getKeyState(identAndSeq), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, @@ -591,7 +591,7 @@ public KeyState_ getKeyState(Ident identifier) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(digest, null, (link, r) -> link.getKeyState(identifier), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, @@ -627,7 +627,7 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithAttachments(coordinates), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, @@ -663,7 +663,7 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithEndorsementsAndValidations(coordinates), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, @@ -699,7 +699,7 @@ public Validations getValidations(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.iterate(identifier, null, (link, r) -> link.getValidations(coordinates), () -> failedMajority(result, maxCount(gathered), operation), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier,