Skip to content

Commit

Permalink
refactor bft subset. simplify TxDataSource
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jul 4, 2024
1 parent d743f34 commit 2bdb1b7
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ public void drain() {
@Override
public ByteString getData() {
var builder = UnitData.newBuilder();
var r = new ArrayList<Assemblies>();
assemblies.drainTo(r);
builder.addAllAssemblies(r);

var v = new ArrayList<Validate>();
validations.drainTo(v);
builder.addAllValidations(v);
if (!draining.get()) {
if (processing.size() > 0 || (validations.isEmpty() || assemblies.isEmpty())) {
try {
Expand All @@ -89,6 +82,30 @@ public ByteString getData() {
}
}
}
var r = new ArrayList<Assemblies>();
assemblies.drainTo(r);
builder.addAllAssemblies(r);

var v = new ArrayList<Validate>();
validations.drainTo(v);
builder.addAllValidations(v);
if (draining.get() && r.isEmpty() && v.isEmpty()) {
var target = System.currentTimeMillis() + batchInterval.toMillis();
while (System.currentTimeMillis() < target) {
assemblies.drainTo(r);
validations.drainTo(v);
builder.addAllAssemblies(r);
builder.addAllValidations(v);
if (!v.isEmpty() || !r.isEmpty()) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

ByteString bs = builder.build().toByteString();
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.Util;
import com.salesforce.apollo.ring.RingCommunications;
import org.apache.commons.math3.random.BitsStreamGenerator;

import java.util.*;
Expand Down Expand Up @@ -106,19 +105,19 @@ static int minMajority(int bias, double pByz, int cardinality) {

/**
* @param hash - the point on the rings to determine successors
* @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's
* @return the Set of Members constructed from the successors of the supplied hash on each of the receiver Context's
* rings
*/
default LinkedHashSet<T> bftSubset(Digest hash) {
default SequencedSet<T> bftSubset(Digest hash) {
return bftSubset(hash, m -> true);
}

/**
* @param hash - the point on the rings to determine successors
* @param filter - the filter to apply to successors
* @return the Set of Members constructed from the sucessors of the supplied hash on each of the receiver Context's
* @return the Set of Members constructed from the successors of the supplied hash on each of the receiver Context's
*/
default LinkedHashSet<T> bftSubset(Digest hash, Predicate<T> filter) {
default SequencedSet<T> bftSubset(Digest hash, Predicate<T> filter) {
var collector = new LinkedHashSet<T>();
uniqueSuccessors(hash, filter, collector);
return collector;
Expand Down Expand Up @@ -437,12 +436,12 @@ default int majority() {

Iterable<T> successors(int ring, Digest location);

default List<RingCommunications.iteration<T>> successors(Digest digest, T ignore, boolean noDuplicates, T member) {
var traversal = new ArrayList<RingCommunications.iteration<T>>();
default List<iteration<T>> successors(Digest digest, T ignore, boolean noDuplicates, T member) {
var traversal = new ArrayList<iteration<T>>();
var traversed = new TreeSet<T>();
for (int ring = 0; ring < getRingCount(); ring++) {
if (size() == 1) {
traversal.add(new RingCommunications.iteration<>(member, ring));
traversal.add(new iteration<>(member, ring));
continue;
}
T successor = findSuccessor(ring, digest, m -> {
Expand All @@ -458,7 +457,7 @@ default List<RingCommunications.iteration<T>> successors(Digest digest, T ignore
}
return Context.IterateResult.SUCCESS;
});
traversal.add(new RingCommunications.iteration<>(successor, ring));
traversal.add(new iteration<>(successor, ring));
}
return traversal;
}
Expand Down Expand Up @@ -502,16 +501,12 @@ default int toleranceLevel() {
*/
void uniqueSuccessors(Digest key, Predicate<T> test, Set<T> collector);

default Set<T> uniqueSuccessors(Digest digest) {
var collected = new HashSet<T>();
uniqueSuccessors(digest, collected);
return collected;
}

/**
* collect the list of successors to the key on each ring, providing a unique member per ring if possible.
*/
void uniqueSuccessors(Digest key, Set<T> collector);
default void uniqueSuccessors(Digest key, Set<T> collector) {
uniqueSuccessors(key, t -> true, collector);
}

boolean validRing(int ring);

Expand All @@ -522,6 +517,15 @@ enum IterateResult {
CONTINUE, FAIL, SUCCESS
}

record iteration<T extends Member>(T m, int ring) {

@Override
public String toString() {
return String.format("[%s,%s]", m == null ? "<null>" : m.getId(), ring);
}

}

/**
* @author hal.hildebrand
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,23 +726,24 @@ public Iterable<T> traverse(int ring, T member) {
}

/**
* @return the list of successor to the key on each ring that pass the provided predicate test
* @return the list of successor to the key on each ring that passes the provided predicate test
*/
@Override
public void uniqueSuccessors(Digest key, Predicate<T> test, Set<T> collector) {
var delegate = ring(0).successor(key, test);
if (delegate == null) {
return;
}
for (Ring<T> ring : rings) {
T successor = ring.successor(key, m -> !collector.contains(m) && test.test(m));
T successor = ring.successor(hashFor(delegate, ring.index), m -> !collector.contains(m) && test.test(m));
if (successor != null) {
collector.add(successor);
} else {
collector.add(delegate);
}
}
}

@Override
public void uniqueSuccessors(Digest key, Set<T> collector) {
uniqueSuccessors(key, t -> true, collector);
}

@Override
public boolean validRing(int ring) {
return ring >= 0 && ring < rings.size();
Expand Down Expand Up @@ -1150,7 +1151,6 @@ public Stream<T> stream() {
}

/**
* @param start
* @param predicate
* @return a Stream of all items counter-clock wise in the ring from (but excluding) start location to (but
* excluding) the first item where predicate(item) evaluates to True.
Expand All @@ -1160,7 +1160,6 @@ public Stream<T> streamPredecessors(Digest location, Predicate<T> predicate) {
}

/**
* @param start
* @param predicate
* @return a list of all items counter-clock wise in the ring from (but excluding) start item to (but excluding)
* the first item where predicate(item) evaluates to True.
Expand All @@ -1170,7 +1169,6 @@ public Stream<T> streamPredecessors(T m, Predicate<T> predicate) {
}

/**
* @param start
* @param predicate
* @return a Stream of all items counter-clock wise in the ring from (but excluding) start location to (but
* excluding) the first item where predicate(item) evaluates to True.
Expand All @@ -1180,7 +1178,6 @@ public Stream<T> streamSuccessors(Digest location, Predicate<T> predicate) {
}

/**
* @param start
* @param predicate
* @return a Stream of all items counter-clock wise in the ring from (but excluding) start item to (but
* excluding) the first item where predicate(item) evaluates to True.
Expand All @@ -1190,8 +1187,6 @@ public Stream<T> streamSuccessors(T m, Predicate<T> predicate) {
}

/**
* @param start
* @param predicate
* @return a iterable of all items counter-clock wise in the ring from (but excluding) start location to (but
* excluding) the first item where predicate(item) evaluates to True.
*/
Expand Down Expand Up @@ -1219,7 +1214,7 @@ public T successor(T m) {
/**
* @param m - the member
* @param predicate - the test predicate
* @return the first successor of m for which predicate evaluates to True. m is never evaluated..
* @return the first successor of m for which predicate evaluates to True. m is never evaluated.
*/
public T successor(T m, Predicate<T> predicate) {
return succ(hash(m), predicate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,19 +449,21 @@ public Iterable<T> traverse(int ring, T member) {

@Override
public void uniqueSuccessors(Digest key, Predicate<T> test, Set<T> collector) {
var delegate = ring(0).successor(key, test);
if (delegate == null) {
return;
}
for (int ring = 0; ring < rings.length; ring++) {
T successor = ring(ring).successor(key, m -> !collector.contains(m) && test.test(m));
StaticRing r = ring(ring);
T successor = r.successor(hashFor(delegate, r.index), m -> !collector.contains(m) && test.test(m));
if (successor != null) {
collector.add(successor);
} else {
collector.add(delegate);
}
}
}

@Override
public void uniqueSuccessors(Digest key, Set<T> collector) {
uniqueSuccessors(key, t -> true, collector);
}

@Override
public boolean validRing(int ring) {
return ring >= 0 && ring < rings.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class RingCommunications<T extends Member, Comm extends Link> {
final SigningMember member;
private final CommonCommunications<Comm, ?> comm;
private final Lock lock = new ReentrantLock();
private final List<iteration<T>> traversalOrder = new ArrayList<>();
private final List<Context.iteration<T>> traversalOrder = new ArrayList<>();
protected boolean noDuplicates = true;
volatile int currentIndex = -1;
private boolean ignoreSelf;
Expand Down Expand Up @@ -140,31 +140,23 @@ private Destination<T, Comm> linkFor(Digest digest) {
return null;
}
final var current = currentIndex;
iteration<T> successor = null;
Context.iteration<T> successor = null;
try {
successor = traversalOrder.get(current);
final Comm link = comm.connect(successor.m);
final Comm link = comm.connect(successor.m());
if (link == null) {
log.trace("No connection to {} on: {}", successor.m == null ? "<null>" : successor.m.getId(),
log.trace("No connection to {} on: {}", successor.m() == null ? "<null>" : successor.m().getId(),
member.getId());
}
return new Destination<>(successor.m, link, successor.ring);
return new Destination<>(successor.m(), link, successor.ring());
} catch (Throwable e) {
log.trace("error opening connection to {}: {} on: {}", successor.m == null ? "<null>" : successor.m.getId(),
log.trace("error opening connection to {}: {} on: {}",
successor.m() == null ? "<null>" : successor.m().getId(),
(e.getCause() != null ? e.getCause() : e).getMessage(), member.getId());
return new Destination<>(successor.m, null, successor.ring);
return new Destination<>(successor.m(), null, successor.ring());
}
}

public record Destination<M, Q>(M member, Q link, int ring) {
}

public record iteration<T extends Member>(T m, int ring) {

@Override
public String toString() {
return String.format("[%s,%s]", m == null ? "<null>" : m.getId(), ring);
}

}
}

0 comments on commit 2bdb1b7

Please sign in to comment.