Skip to content

Commit

Permalink
gate view change in KerlDHT
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 11, 2024
1 parent 889e7d5 commit 1533216
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void stop() {

protected BiConsumer<Context, Digest> listener() {
return (context, diadem) -> {
dht.nextView(context, diadem);
choam.rotateViewKeys(context, diadem);

log.info("View change: {} for: {} cardinality: {} on: {}", diadem, params.context().getId(), context.size(),
Expand Down
24 changes: 17 additions & 7 deletions thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.archipelago.server.FernetServerInterceptor;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DelegatedContext;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.Verifier;
Expand Down Expand Up @@ -96,7 +98,7 @@ public class KerlDHT implements ProtoKERLService {
private final Ani ani;
private final CachingKERL cache;
private final JdbcConnectionPool connectionPool;
private final Context<Member> context;
private final DelegatedContext<Member> context;
private final CommonCommunications<DhtService, ProtoKERLService> dhtComms;
private final double fpr;
private final Duration operationsFrequency;
Expand All @@ -116,9 +118,7 @@ public KerlDHT(Duration operationsFrequency, Context<? extends Member> context,
BiFunction<KerlDHT, KERL.AppendKERL, KERL.AppendKERL> wrap, JdbcConnectionPool connectionPool,
DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount operationTimeout,
double falsePositiveRate, StereotomyMetrics metrics) {
@SuppressWarnings("unchecked")
final var casting = (Context<Member>) context;
this.context = casting;
this.context = new DelegatedContext<Member>((Context<Member>) new StaticContext<>(context));
this.member = member;
this.operationTimeout = operationTimeout;
this.fpr = falsePositiveRate;
Expand Down Expand Up @@ -743,6 +743,11 @@ public int maxCount(HashMultiset<?> gathered) {
return max.map(Entry::getCount).orElse(0);
}

public void nextView(Context c, Digest diadem) {
log.info("Next view: {} context: {} on: {}", diadem, context.getId(), member.getId());
context.setContext(c);
}

public void start(Duration duration) {
start(duration, null);
}
Expand Down Expand Up @@ -788,17 +793,19 @@ private <T> void completeIt(CompletableFuture<T> result, HashMultiset<T> gathere
}
return;
}
} else {
log.warn("Unable to achieve majority, max agree: 0 required: {}", majority + " on: {}", member.getId());
}
result.completeExceptionally(new CompletionException(
"Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + majority + " on: "
+ member.getId()));
}

private boolean failedMajority(CompletableFuture<?> result, int maxAgree, String operation) {
log.debug("Unable to achieve majority read: {}, max: {} required: {} on: {}", operation, maxAgree,
log.debug("Unable to achieve majority read: {}, max agree: {} required: {} on: {}", operation, maxAgree,
context.toleranceLevel() + 1, member.getId());
return result.completeExceptionally(new CompletionException(
"Unable to achieve majority read: " + operation + ", max: " + maxAgree + " required: "
"Unable to achieve majority read: " + operation + ", max agree: " + maxAgree + " required: "
+ context.toleranceLevel() + 1 + " on: " + member.getId()));
}

Expand Down Expand Up @@ -857,7 +864,7 @@ private <T> boolean mutate(HashMultiset<T> gathered, Optional<T> futureSailor, D
.stream()
.max(Ordering.natural().onResultOf(Entry::getCount))
.ifPresent(max -> tally.set(max.getCount()));
log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(),
log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(),
member.getId());
return !isTimedOut.get();
}
Expand All @@ -884,6 +891,9 @@ private <T> boolean read(CompletableFuture<T> result, HashMultiset<T> gathered,
log.debug("Majority: {} achieved: {}: {} tally: {} on: {}", max.getCount(), action, identifier,
tally.get(), member.getId());
return false;
} else {
log.info("Majority: {} required: {} not achieved: {}: {} tally: {} on: {}", max.getCount(), ctxMajority,
action, identifier, tally.get(), member.getId());
}
}
return !isTimedOut.get();
Expand Down

0 comments on commit 1533216

Please sign in to comment.