Skip to content

Commit

Permalink
handle bootstrapping context
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed May 23, 2024
1 parent ce84f28 commit cb47d0a
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ public void setUp() throws Exception {

@Test
public void smokin() throws Exception {

var bootstrap = members.subList(0, 4);
var kernel = bootstrap.get(0);
contexts.get(kernel).activate(kernel);
routers.get(kernel).start();
choams.get(kernel).start();

bootstrap.forEach(member -> bootstrap.forEach(m -> contexts.get(member).activate(m)));

bootstrap.forEach(member -> bootstrap.forEach(m -> contexts.get(member).activate(m)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public void enroll(Notarization request, Digest from) {
log.warn("Invalid notarization for: {} from: {} on: {}", identifier, from, member.getId());
throw new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid notarization"));
}
log.info("Enrolling notorization for: {} from: {} on: {}", identifier, from, member.getId());
log.info("Enrolling notarization for: {} from: {} on: {}", identifier, from, member.getId());
Gorgoneion.this.enroll(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public class RingCommunications<T extends Member, Comm extends Link> {
final SigningMember member;
private final CommonCommunications<Comm, ?> comm;
private final Direction direction;
private final boolean ignoreSelf;
private final Lock lock = new ReentrantLock();
private final List<iteration<T>> traversalOrder = new ArrayList<>();
protected boolean noDuplicates = true;
volatile int currentIndex = -1;
private boolean ignoreSelf;

public RingCommunications(Context<T> context, SigningMember member, CommonCommunications<Comm, ?> comm) {
this(context, member, comm, false);
Expand All @@ -66,6 +66,10 @@ public RingCommunications<T, Comm> allowDuplicates() {
return this;
}

public void dontIgnoreSelf() {
this.ignoreSelf = false;
}

public <Q> void execute(BiFunction<Comm, Integer, Q> round, SyncHandler<T, Q, Comm> handler) {
final var next = next(member.getId());
if (next == null || next.member == null) {
Expand All @@ -81,13 +85,17 @@ public <Q> void execute(BiFunction<Comm, Integer, Q> round, SyncHandler<T, Q, Co
}
}

public void ignoreSelf() {
this.ignoreSelf = true;
}

public RingCommunications<T, Comm> noDuplicates() {
noDuplicates = true;
return this;
}

public void reset() {
currentIndex = 0;
currentIndex = -1;
traversalOrder.clear();
log.trace("Reset on: {}", member.getId());
}
Expand All @@ -102,6 +110,10 @@ List<iteration<T>> calculateTraversal(Digest digest) {
var traversal = new ArrayList<iteration<T>>();
var traversed = new TreeSet<T>();
for (int ring = 0; ring < context.getRingCount(); ring++) {
if (context.size() == 1) {
traversal.add(new iteration<>((T) member, ring));
continue;
}
T successor = direction.retrieve(context, ring, digest, m -> {
if (ignoreSelf && m.equals(member)) {
return Context.IterateResult.CONTINUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private <Q> void internalIterate(Digest digest, Runnable onMajority, BiFunction<

var next = next(digest);
log.trace("Iteration: {} tally: {} for digest: {} on: {} ring: {} complete: false on: {}", iteration(),
tally.get(), digest, context.getId(), next.ring(), member.getId());
tally.get(), digest, context.getId(), next == null ? "<null>" : next.ring(), member.getId());
if (next == null || next.link() == null) {
log.trace("No successor found for digest: {} on: {} iteration: {} traversed: {} ring: {} on: {}", digest,
context.getId(), iteration(), traversed, currentIndex, member.getId());
Expand All @@ -125,8 +125,8 @@ private <Q> void internalIterate(Digest digest, Runnable onMajority, BiFunction<
digest, context.getId(), tally.get(), member.getId());
schedule(proceed);
} else {
log.trace("Completed on iteration: {} on: {} for digest: {} for: {} tally: {} on: {}", iteration(),
digest, context.getId(), tally.get(), member.getId());
log.trace("Completed on iteration: {} for digest: {} for: {} tally: {} on: {}", iteration(), digest,
context.getId(), tally.get(), member.getId());
}
return;
}
Expand Down
111 changes: 47 additions & 64 deletions thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,13 @@ public KeyState_ append(AttachmentEvent event) {
Supplier<Boolean> isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture<KeyStates>();
HashMultiset<KeyStates> gathered = HashMultiset.create();
new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null,
(link, r) -> link.append(
Collections.emptyList(),
Collections.singletonList(
event)), null,
(tally, futureSailor, destination) -> mutate(
gathered, futureSailor,
identifier, isTimedOut,
tally, destination,
"append events"),
t -> completeIt(result,
gathered));
var iterator = new RingIterator<Member, DhtService>(operationsFrequency, context, member, scheduler, dhtComms);
iterator.dontIgnoreSelf();
iterator.iterate(identifier, null,
(link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null,
(tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut,
tally, destination, "append events"),
t -> completeIt(result, gathered));
try {
List<KeyState_> s = result.get().getKeyStatesList();
return s.isEmpty() ? null : s.getFirst();
Expand Down Expand Up @@ -239,16 +234,12 @@ public List<KeyState_> append(KERL_ kerl) {
Supplier<Boolean> isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture<KeyStates>();
HashMultiset<KeyStates> gathered = HashMultiset.create();
new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null,
(link, r) -> link.append(
kerl), null,
(tally, futureSailor, destination) -> mutate(
gathered, futureSailor,
identifier, isTimedOut,
tally, destination,
"append kerl"),
t -> completeIt(result,
gathered));
var iterator = new RingIterator<Member, DhtService>(operationsFrequency, context, member, scheduler, dhtComms);
iterator.dontIgnoreSelf();
iterator.iterate(identifier, null, (link, r) -> link.append(kerl), null,
(tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut,
tally, destination, "append kerl"),
t -> completeIt(result, gathered));
try {
return result.get().getKeyStatesList();
} catch (InterruptedException e) {
Expand All @@ -272,17 +263,12 @@ public KeyState_ append(KeyEvent_ event) {
Supplier<Boolean> isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture<KeyStates>();
HashMultiset<KeyStates> gathered = HashMultiset.create();
new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null,
(link, r) -> link.append(
Collections.singletonList(
event)), null,
(tally, futureSailor, destination) -> mutate(
gathered, futureSailor,
identifier, isTimedOut,
tally, destination,
"append events"),
t -> completeIt(result,
gathered));
var iterator = new RingIterator<Member, DhtService>(operationsFrequency, context, member, scheduler, dhtComms);
iterator.dontIgnoreSelf();
iterator.iterate(identifier, null, (link, r) -> link.append(Collections.singletonList(event)), null,
(tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut,
tally, destination, "append event"),
t -> completeIt(result, gathered));
try {
var ks = result.get();
return ks.getKeyStatesCount() == 0 ? KeyState_.getDefaultInstance() : ks.getKeyStatesList().getFirst();
Expand Down Expand Up @@ -334,16 +320,12 @@ public Empty appendAttachments(List<AttachmentEvent> events) {
Supplier<Boolean> isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture<Empty>();
HashMultiset<Empty> gathered = HashMultiset.create();
new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null,
(link, r) -> link.appendAttachments(
events), null,
(tally, futureSailor, destination) -> mutate(
gathered, futureSailor,
identifier, isTimedOut,
tally, destination,
"append attachments"),
t -> completeIt(result,
gathered));
var iterator = new RingIterator<Member, DhtService>(operationsFrequency, context, member, scheduler, dhtComms);
iterator.dontIgnoreSelf();
iterator.iterate(identifier, null, (link, r) -> link.appendAttachments(events), null,
(tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut,
tally, destination, "append attachments"),
t -> completeIt(result, gathered));

try {
return result.get();
Expand Down Expand Up @@ -372,16 +354,12 @@ public Empty appendValidations(Validations validations) {
Supplier<Boolean> isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture<Empty>();
HashMultiset<Empty> gathered = HashMultiset.create();
new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null,
(link, r) -> link.appendValidations(
validations), null,
(tally, futureSailor, destination) -> mutate(
gathered, futureSailor,
identifier, isTimedOut,
tally, destination,
"append validations"),
t -> completeIt(result,
gathered));
var iterator = new RingIterator<Member, DhtService>(operationsFrequency, context, member, scheduler, dhtComms);
iterator.dontIgnoreSelf();
iterator.iterate(identifier, null, (link, r) -> link.appendValidations(validations), null,
(tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut,
tally, destination, "append validations"),
t -> completeIt(result, gathered));
try {
return result.get();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -799,8 +777,9 @@ private <T> void completeIt(CompletableFuture<T> result, HashMultiset<T> gathere
.stream()
.max(Ordering.natural().onResultOf(Multiset.Entry::getCount))
.orElse(null);
var majority = context.size() == 1 ? 1 : context.majority();
if (max != null) {
if (max.getCount() >= context.majority()) {
if (max.getCount() >= majority) {
try {
result.complete(max.getElement());
} catch (Throwable t) {
Expand All @@ -810,8 +789,8 @@ private <T> void completeIt(CompletableFuture<T> result, HashMultiset<T> gathere
}
}
result.completeExceptionally(new CompletionException(
"Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + context.majority()
+ " on: " + member.getId()));
"Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + majority + " on: "
+ member.getId()));
}

private boolean failedMajority(CompletableFuture<?> result, int maxAgree, String operation) {
Expand Down Expand Up @@ -867,38 +846,42 @@ private <T> boolean mutate(HashMultiset<T> gathered, Optional<T> futureSailor, D
Supplier<Boolean> isTimedOut, AtomicInteger tally,
RingCommunications.Destination<Member, DhtService> destination, String action) {
if (futureSailor.isEmpty()) {
log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(),
member.getId());
log.debug("Failed {}: {} tally: {} from: {} on: {}", action, identifier, tally.get(),
destination.member().getId(), member.getId());
return !isTimedOut.get();
}
T content = futureSailor.get();
log.trace("{}: {} from: {} on: {}", action, identifier, destination.member().getId(), member.getId());
gathered.add(content);
gathered.entrySet()
.stream()
.max(Ordering.natural().onResultOf(Entry::getCount))
.ifPresent(max -> tally.set(max.getCount()));
log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(),
member.getId());
return !isTimedOut.get();
}

private <T> boolean read(CompletableFuture<T> result, HashMultiset<T> gathered, AtomicInteger tally,
Optional<T> futureSailor, Digest identifier, Supplier<Boolean> isTimedOut,
RingCommunications.Destination<Member, DhtService> destination, String action, T empty) {
if (futureSailor.isEmpty()) {
log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(),
member.getId());
log.debug("Failed {}: {} tally: {} from: {} on: {}", action, identifier, tally,
destination.member().getId(), member.getId());
return !isTimedOut.get();
}
T content = futureSailor.get();
log.trace("{}: {} from: {} on: {}", action, identifier, destination.member().getId(), member.getId());
log.trace("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(),
member.getId());
gathered.add(content);
var max = max(gathered);
if (max != null) {
tally.set(max.getCount());
final var majority = tally.get() >= context.majority();
var ctxMajority = context.size() == 1 ? 1 : context.majority();
final var majority = tally.get() >= ctxMajority;
if (majority) {
result.complete(max.getElement());
log.debug("Majority: {} achieved: {}: {} on: {}", max.getCount(), action, identifier, member.getId());
log.debug("Majority: {} achieved: {}: {} tally: {} on: {}", max.getCount(), action, identifier,
tally.get(), member.getId());
return false;
}
}
Expand Down
Loading

0 comments on commit cb47d0a

Please sign in to comment.