From 153321668079ffa7d2928e8ecad777fd73129b5a Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Tue, 11 Jun 2024 09:37:47 -0700 Subject: [PATCH] gate view change in KerlDHT --- .../apollo/model/ProcessDomain.java | 1 + .../com/salesforce/apollo/thoth/KerlDHT.java | 24 +++++++++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index 51645eded..cb3f48ded 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -120,6 +120,7 @@ public void stop() { protected BiConsumer listener() { return (context, diadem) -> { + dht.nextView(context, diadem); choam.rotateViewKeys(context, diadem); log.info("View change: {} for: {} cardinality: {} on: {}", diadem, params.context().getId(), context.size(), diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index d816b0838..90c3ba5c6 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -16,6 +16,8 @@ import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.archipelago.server.FernetServerInterceptor; import com.salesforce.apollo.context.Context; +import com.salesforce.apollo.context.DelegatedContext; +import com.salesforce.apollo.context.StaticContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.Verifier; @@ -96,7 +98,7 @@ public class KerlDHT implements ProtoKERLService { private final Ani ani; private final CachingKERL cache; private final JdbcConnectionPool connectionPool; - private final Context context; + private final DelegatedContext context; private final CommonCommunications dhtComms; private final double fpr; private final Duration operationsFrequency; @@ -116,9 +118,7 @@ public KerlDHT(Duration operationsFrequency, Context context, BiFunction wrap, JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { - @SuppressWarnings("unchecked") - final var casting = (Context) context; - this.context = casting; + this.context = new DelegatedContext((Context) new StaticContext<>(context)); this.member = member; this.operationTimeout = operationTimeout; this.fpr = falsePositiveRate; @@ -743,6 +743,11 @@ public int maxCount(HashMultiset gathered) { return max.map(Entry::getCount).orElse(0); } + public void nextView(Context c, Digest diadem) { + log.info("Next view: {} context: {} on: {}", diadem, context.getId(), member.getId()); + context.setContext(c); + } + public void start(Duration duration) { start(duration, null); } @@ -788,6 +793,8 @@ private void completeIt(CompletableFuture result, HashMultiset gathere } return; } + } else { + log.warn("Unable to achieve majority, max agree: 0 required: {}", majority + " on: {}", member.getId()); } result.completeExceptionally(new CompletionException( "Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + majority + " on: " @@ -795,10 +802,10 @@ private void completeIt(CompletableFuture result, HashMultiset gathere } private boolean failedMajority(CompletableFuture result, int maxAgree, String operation) { - log.debug("Unable to achieve majority read: {}, max: {} required: {} on: {}", operation, maxAgree, + log.debug("Unable to achieve majority read: {}, max agree: {} required: {} on: {}", operation, maxAgree, context.toleranceLevel() + 1, member.getId()); return result.completeExceptionally(new CompletionException( - "Unable to achieve majority read: " + operation + ", max: " + maxAgree + " required: " + "Unable to achieve majority read: " + operation + ", max agree: " + maxAgree + " required: " + context.toleranceLevel() + 1 + " on: " + member.getId())); } @@ -857,7 +864,7 @@ private boolean mutate(HashMultiset gathered, Optional futureSailor, D .stream() .max(Ordering.natural().onResultOf(Entry::getCount)) .ifPresent(max -> tally.set(max.getCount())); - log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(), + log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(), member.getId()); return !isTimedOut.get(); } @@ -884,6 +891,9 @@ private boolean read(CompletableFuture result, HashMultiset gathered, log.debug("Majority: {} achieved: {}: {} tally: {} on: {}", max.getCount(), action, identifier, tally.get(), member.getId()); return false; + } else { + log.info("Majority: {} required: {} not achieved: {}: {} tally: {} on: {}", max.getCount(), ctxMajority, + action, identifier, tally.get(), member.getId()); } } return !isTimedOut.get();