From 2bdb1b7db87518aa72a92673554f82be5fd24ff5 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Thu, 4 Jul 2024 08:46:09 -0700 Subject: [PATCH] refactor bft subset. simplify TxDataSource --- .../apollo/choam/support/TxDataSource.java | 31 ++++++++++++---- .../salesforce/apollo/context/Context.java | 36 ++++++++++--------- .../apollo/context/DynamicContextImpl.java | 23 +++++------- .../apollo/context/StaticContext.java | 14 ++++---- .../apollo/ring/RingCommunications.java | 24 +++++-------- 5 files changed, 69 insertions(+), 59 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java index 670f98ac3..cc29baf4e 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java @@ -69,13 +69,6 @@ public void drain() { @Override public ByteString getData() { var builder = UnitData.newBuilder(); - var r = new ArrayList(); - assemblies.drainTo(r); - builder.addAllAssemblies(r); - - var v = new ArrayList(); - validations.drainTo(v); - builder.addAllValidations(v); if (!draining.get()) { if (processing.size() > 0 || (validations.isEmpty() || assemblies.isEmpty())) { try { @@ -89,6 +82,30 @@ public ByteString getData() { } } } + var r = new ArrayList(); + assemblies.drainTo(r); + builder.addAllAssemblies(r); + + var v = new ArrayList(); + validations.drainTo(v); + builder.addAllValidations(v); + if (draining.get() && r.isEmpty() && v.isEmpty()) { + var target = System.currentTimeMillis() + batchInterval.toMillis(); + while (System.currentTimeMillis() < target) { + assemblies.drainTo(r); + validations.drainTo(v); + builder.addAllAssemblies(r); + builder.addAllValidations(v); + if (!v.isEmpty() || !r.isEmpty()) { + break; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } ByteString bs = builder.build().toByteString(); if (metrics != null) { diff --git a/memberships/src/main/java/com/salesforce/apollo/context/Context.java b/memberships/src/main/java/com/salesforce/apollo/context/Context.java index ec82cd7dd..384bbd3a6 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/Context.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/Context.java @@ -10,7 +10,6 @@ import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.Util; -import com.salesforce.apollo.ring.RingCommunications; import org.apache.commons.math3.random.BitsStreamGenerator; import java.util.*; @@ -106,19 +105,19 @@ static int minMajority(int bias, double pByz, int cardinality) { /** * @param hash - the point on the rings to determine successors - * @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's + * @return the Set of Members constructed from the successors of the supplied hash on each of the receiver Context's * rings */ - default LinkedHashSet bftSubset(Digest hash) { + default SequencedSet bftSubset(Digest hash) { return bftSubset(hash, m -> true); } /** * @param hash - the point on the rings to determine successors * @param filter - the filter to apply to successors - * @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's + * @return the Set of Members constructed from the successors of the supplied hash on each of the receiver Context's */ - default LinkedHashSet bftSubset(Digest hash, Predicate filter) { + default SequencedSet bftSubset(Digest hash, Predicate filter) { var collector = new LinkedHashSet(); uniqueSuccessors(hash, filter, collector); return collector; @@ -437,12 +436,12 @@ default int majority() { Iterable successors(int ring, Digest location); - default List> successors(Digest digest, T ignore, boolean noDuplicates, T member) { - var traversal = new ArrayList>(); + default List> successors(Digest digest, T ignore, boolean noDuplicates, T member) { + var traversal = new ArrayList>(); var traversed = new TreeSet(); for (int ring = 0; ring < getRingCount(); ring++) { if (size() == 1) { - traversal.add(new RingCommunications.iteration<>(member, ring)); + traversal.add(new iteration<>(member, ring)); continue; } T successor = findSuccessor(ring, digest, m -> { @@ -458,7 +457,7 @@ default List> successors(Digest digest, T ignore } return Context.IterateResult.SUCCESS; }); - traversal.add(new RingCommunications.iteration<>(successor, ring)); + traversal.add(new iteration<>(successor, ring)); } return traversal; } @@ -502,16 +501,12 @@ default int toleranceLevel() { */ void uniqueSuccessors(Digest key, Predicate test, Set collector); - default Set uniqueSuccessors(Digest digest) { - var collected = new HashSet(); - uniqueSuccessors(digest, collected); - return collected; - } - /** * collect the list of successors to the key on each ring, providing a unique member per ring if possible. */ - void uniqueSuccessors(Digest key, Set collector); + default void uniqueSuccessors(Digest key, Set collector) { + uniqueSuccessors(key, t -> true, collector); + } boolean validRing(int ring); @@ -522,6 +517,15 @@ enum IterateResult { CONTINUE, FAIL, SUCCESS } + record iteration(T m, int ring) { + + @Override + public String toString() { + return String.format("[%s,%s]", m == null ? "" : m.getId(), ring); + } + + } + /** * @author hal.hildebrand **/ diff --git a/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java b/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java index 58fc888b7..43cb35a3d 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java @@ -726,23 +726,24 @@ public Iterable traverse(int ring, T member) { } /** - * @return the list of successor to the key on each ring that pass the provided predicate test + * @return the list of successor to the key on each ring that passes the provided predicate test */ @Override public void uniqueSuccessors(Digest key, Predicate test, Set collector) { + var delegate = ring(0).successor(key, test); + if (delegate == null) { + return; + } for (Ring ring : rings) { - T successor = ring.successor(key, m -> !collector.contains(m) && test.test(m)); + T successor = ring.successor(hashFor(delegate, ring.index), m -> !collector.contains(m) && test.test(m)); if (successor != null) { collector.add(successor); + } else { + collector.add(delegate); } } } - @Override - public void uniqueSuccessors(Digest key, Set collector) { - uniqueSuccessors(key, t -> true, collector); - } - @Override public boolean validRing(int ring) { return ring >= 0 && ring < rings.size(); @@ -1150,7 +1151,6 @@ public Stream stream() { } /** - * @param start * @param predicate * @return a Stream of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. @@ -1160,7 +1160,6 @@ public Stream streamPredecessors(Digest location, Predicate predicate) { } /** - * @param start * @param predicate * @return a list of all items counter-clock wise in the ring from (but excluding) start item to (but excluding) * the first item where predicate(item) evaluates to True. @@ -1170,7 +1169,6 @@ public Stream streamPredecessors(T m, Predicate predicate) { } /** - * @param start * @param predicate * @return a Stream of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. @@ -1180,7 +1178,6 @@ public Stream streamSuccessors(Digest location, Predicate predicate) { } /** - * @param start * @param predicate * @return a Stream of all items counter-clock wise in the ring from (but excluding) start item to (but * excluding) the first item where predicate(item) evaluates to True. @@ -1190,8 +1187,6 @@ public Stream streamSuccessors(T m, Predicate predicate) { } /** - * @param start - * @param predicate * @return a iterable of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. */ @@ -1219,7 +1214,7 @@ public T successor(T m) { /** * @param m - the member * @param predicate - the test predicate - * @return the first successor of m for which predicate evaluates to True. m is never evaluated.. + * @return the first successor of m for which predicate evaluates to True. m is never evaluated. */ public T successor(T m, Predicate predicate) { return succ(hash(m), predicate); diff --git a/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java b/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java index d91662176..7c78c165c 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java @@ -449,19 +449,21 @@ public Iterable traverse(int ring, T member) { @Override public void uniqueSuccessors(Digest key, Predicate test, Set collector) { + var delegate = ring(0).successor(key, test); + if (delegate == null) { + return; + } for (int ring = 0; ring < rings.length; ring++) { - T successor = ring(ring).successor(key, m -> !collector.contains(m) && test.test(m)); + StaticRing r = ring(ring); + T successor = r.successor(hashFor(delegate, r.index), m -> !collector.contains(m) && test.test(m)); if (successor != null) { collector.add(successor); + } else { + collector.add(delegate); } } } - @Override - public void uniqueSuccessors(Digest key, Set collector) { - uniqueSuccessors(key, t -> true, collector); - } - @Override public boolean validRing(int ring) { return ring >= 0 && ring < rings.length; diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java index 572b41827..fe1cb61b8 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java @@ -34,7 +34,7 @@ public class RingCommunications { final SigningMember member; private final CommonCommunications comm; private final Lock lock = new ReentrantLock(); - private final List> traversalOrder = new ArrayList<>(); + private final List> traversalOrder = new ArrayList<>(); protected boolean noDuplicates = true; volatile int currentIndex = -1; private boolean ignoreSelf; @@ -140,31 +140,23 @@ private Destination linkFor(Digest digest) { return null; } final var current = currentIndex; - iteration successor = null; + Context.iteration successor = null; try { successor = traversalOrder.get(current); - final Comm link = comm.connect(successor.m); + final Comm link = comm.connect(successor.m()); if (link == null) { - log.trace("No connection to {} on: {}", successor.m == null ? "" : successor.m.getId(), + log.trace("No connection to {} on: {}", successor.m() == null ? "" : successor.m().getId(), member.getId()); } - return new Destination<>(successor.m, link, successor.ring); + return new Destination<>(successor.m(), link, successor.ring()); } catch (Throwable e) { - log.trace("error opening connection to {}: {} on: {}", successor.m == null ? "" : successor.m.getId(), + log.trace("error opening connection to {}: {} on: {}", + successor.m() == null ? "" : successor.m().getId(), (e.getCause() != null ? e.getCause() : e).getMessage(), member.getId()); - return new Destination<>(successor.m, null, successor.ring); + return new Destination<>(successor.m(), null, successor.ring()); } } public record Destination(M member, Q link, int ring) { } - - public record iteration(T m, int ring) { - - @Override - public String toString() { - return String.format("[%s,%s]", m == null ? "" : m.getId(), ring); - } - - } }