diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 6c30b9780..35e6a1491 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -25,6 +25,7 @@ import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DelegatedContext; import com.salesforce.apollo.context.StaticContext; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.*; import com.salesforce.apollo.cryptography.Signer.SignerImpl; import com.salesforce.apollo.cryptography.proto.PubKey; @@ -322,15 +323,14 @@ public String logState() { /** * A view change has occurred - * - * @param context - the new membership context - * @param diadem - the compact HexBloom of the context view */ - public void rotateViewKeys(Context context, Digest diadem) { + public void rotateViewKeys(ViewChange viewChange) { + var context = viewChange.context(); + var diadem = viewChange.diadem(); ((DelegatedContext) combine.getContext()).setContext(context); var c = current.get(); if (c != null) { - c.nextView(diadem, context); + c.nextView(viewChange.diadem(), context); } else { log.info("Acquiring new view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(), params.member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index 3b68bc7a8..a8f66e6bf 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -152,7 +152,7 @@ public void gather(List preblock, boolean last) { try { return Join.parseFrom(bs); } catch (InvalidProtocolBufferException e) { - log.warn("error parsing join: {} on: {}", bs, params().member().getId(), e); + log.trace("error parsing join: {} on: {}", bs, params().member().getId(), e); return null; } }) diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 6c1e411e2..77795a803 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -16,9 +16,9 @@ import com.salesforce.apollo.archipelago.Router.ServiceRouting; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.bloomFilters.BloomFilter; -import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.context.DynamicContextImpl; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.*; import com.salesforce.apollo.cryptography.proto.Biff; import com.salesforce.apollo.fireflies.Binding.Bound; @@ -58,6 +58,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -87,25 +88,23 @@ public class View { private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change"; final CommonCommunications comm; - final AtomicBoolean started = new AtomicBoolean(); + final AtomicBoolean started = new AtomicBoolean(); private final CommonCommunications approaches; private final DynamicContext context; private final DigestAlgorithm digestAlgo; private final RingCommunications gossiper; - private final AtomicBoolean introduced = new AtomicBoolean(); - private final List> viewChangeListeners = new CopyOnWriteArrayList<>(); - private final Executor viewNotificationQueue = Executors.newSingleThreadExecutor( - Thread.ofVirtual().factory()); + private final AtomicBoolean introduced = new AtomicBoolean(); + private final List> viewChangeListeners = new CopyOnWriteArrayList<>(); + private final Executor viewNotificationQueue; private final FireflyMetrics metrics; private final Node node; - private final Map observations = new ConcurrentSkipListMap<>(); + private final Map observations = new ConcurrentSkipListMap<>(); private final Parameters params; - private final ConcurrentMap pendingRebuttals = new ConcurrentSkipListMap<>(); + private final ConcurrentMap pendingRebuttals = new ConcurrentSkipListMap<>(); private final RoundScheduler roundTimers; - private final Set shunned = new ConcurrentSkipListSet<>(); - private final Map timers = new HashMap<>(); - private final ReadWriteLock viewChange = new ReentrantReadWriteLock( - true); + private final Set shunned = new ConcurrentSkipListSet<>(); + private final Map timers = new HashMap<>(); + private final ReadWriteLock viewChange; private final ViewManagement viewManagement; private final EventValidation validation; private final Verifiers verifiers; @@ -141,6 +140,8 @@ public View(DynamicContext context, ControlledIdentifierMember memb gossiper.ignoreSelf(); this.validation = validation; this.verifiers = verifiers; + viewNotificationQueue = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory()); + viewChange = new ReentrantReadWriteLock(true); } /** @@ -170,7 +171,7 @@ public static boolean isValidMask(BitSet mask, DynamicContext context) { /** * Deregister the listener */ - public void deregister(BiConsumer listener) { + public void deregister(Consumer listener) { viewChangeListeners.remove(listener); } @@ -191,7 +192,7 @@ public Digest getNodeId() { /** * Register the listener to receive view changes */ - public void register(BiConsumer listener) { + public void register(Consumer listener) { viewChangeListeners.add(listener); } @@ -416,14 +417,15 @@ void introduced() { } void notifyListeners(List joining, List leaving) { - final var current = currentView(); - var sc = context.asStatic(); + final var viewChange = new ViewChange(context.asStatic(), currentView(), + joining.stream().map(SelfAddressingIdentifier::getDigest).toList(), + Collections.unmodifiableList(leaving)); viewNotificationQueue.execute(Utils.wrapped(() -> { viewChangeListeners.forEach(listener -> { try { log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", listener, currentView(), context.size(), joining.size(), leaving.size(), node.getId()); - listener.accept(sc, current); + listener.accept(viewChange); } catch (Throwable e) { log.error("error in view change listener: {} on: {} ", listener, node.getId(), e); } diff --git a/memberships/src/main/java/com/salesforce/apollo/context/ViewChange.java b/memberships/src/main/java/com/salesforce/apollo/context/ViewChange.java new file mode 100644 index 000000000..b1cdc4d8c --- /dev/null +++ b/memberships/src/main/java/com/salesforce/apollo/context/ViewChange.java @@ -0,0 +1,8 @@ +package com.salesforce.apollo.context; + +import com.salesforce.apollo.cryptography.Digest; + +import java.util.Collection; + +public record ViewChange(Context context, Digest diadem, Collection joining, Collection leaving) { +} diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index cb3f48ded..8f961b1d2 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -10,6 +10,7 @@ import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.SignatureAlgorithm; @@ -32,6 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.RejectedExecutionException; import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * The logical domain of the current "Process" - OS and Simulation defined, 'natch. @@ -51,7 +53,7 @@ public class ProcessDomain extends Domain { private final Verifiers.DelegatedVerifiers verifiers; private final ProcessDomainParameters parameters; private final List> lifecycleListeners = new CopyOnWriteArrayList<>(); - private final BiConsumer listener = listener(); + private final Consumer listener = listener(); public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams, Builder builder, Parameters.RuntimeParameters.Builder runtime, String endpoint, @@ -118,13 +120,13 @@ public void stop() { } } - protected BiConsumer listener() { - return (context, diadem) -> { - dht.nextView(context, diadem); - choam.rotateViewKeys(context, diadem); + protected Consumer listener() { + return (viewChange) -> { + dht.nextView(viewChange); + choam.rotateViewKeys(viewChange); - log.info("View change: {} for: {} cardinality: {} on: {}", diadem, params.context().getId(), context.size(), - params.member().getId()); + log.info("View change: {} for: {} cardinality: {} on: {}", viewChange.diadem(), params.context().getId(), + viewChange.context().size(), params.member().getId()); }; } diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 0b3105f54..248c7b3bf 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -11,8 +11,8 @@ import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.FoundationSeal; -import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContextImpl; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.delphinius.Oracle; @@ -36,7 +36,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -115,14 +115,14 @@ public void smokin() throws Exception { final var seeds = Collections.singletonList( new Seed(domains.getFirst().getMember().getIdentifier().getIdentifier(), EndpointProvider.allocatePort())); domains.forEach(d -> { - BiConsumer c = (context, viewId) -> { - if (context.cardinality() == CARDINALITY) { - System.out.printf("Full view: %s members: %s on: %s%n", viewId, context.cardinality(), - d.getMember().getId()); + Consumer c = viewChange -> { + if (viewChange.context().cardinality() == CARDINALITY) { + System.out.printf("Full view: %s members: %s on: %s%n", viewChange.diadem(), + viewChange.context().cardinality(), d.getMember().getId()); countdown.countDown(); } else { - System.out.printf("Members joining: %s members: %s on: %s%n", viewId, context.cardinality(), - d.getMember().getId()); + System.out.printf("Members joining: %s members: %s on: %s%n", viewChange.diadem(), + viewChange.context().cardinality(), d.getMember().getId()); } }; d.foundation.register(c); diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 90c3ba5c6..e2aa9646d 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -18,6 +18,7 @@ import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DelegatedContext; import com.salesforce.apollo.context.StaticContext; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.Verifier; @@ -72,11 +73,12 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; -import java.time.temporal.TemporalAmount; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -108,17 +110,18 @@ public class KerlDHT implements ProtoKERLService { private final SigningMember member; private final RingCommunications reconcile; private final CommonCommunications reconcileComms; - private final Reconcile reconciliation = new Reconcile(); + private final Reconcile reconciliation = new Reconcile(); private final ScheduledExecutorService scheduler; - private final Service service = new Service(); - private final AtomicBoolean started = new AtomicBoolean(); - private final TemporalAmount operationTimeout; + private final Service service = new Service(); + private final AtomicBoolean started = new AtomicBoolean(); + private final Duration operationTimeout; + private final ReadWriteLock viewSynchronous = new ReentrantReadWriteLock(); public KerlDHT(Duration operationsFrequency, Context context, SigningMember member, BiFunction wrap, JdbcConnectionPool connectionPool, - DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount operationTimeout, + DigestAlgorithm digestAlgorithm, Router communications, Duration operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { - this.context = new DelegatedContext((Context) new StaticContext<>(context)); + this.context = new DelegatedContext<>((Context) new StaticContext<>(context)); this.member = member; this.operationTimeout = operationTimeout; this.fpr = falsePositiveRate; @@ -162,7 +165,7 @@ public KerlDHT(Duration operationsFrequency, Context context, public KerlDHT(Duration operationsFrequency, Context context, SigningMember member, JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications, - TemporalAmount operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { + Duration operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { this(operationsFrequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, operationTimeout, falsePositiveRate, metrics); } @@ -201,12 +204,23 @@ public KeyState_ append(AttachmentEvent event) { HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, - (link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append events"), - t -> completeIt(result, gathered)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(identifier, null, + (link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, + isTimedOut, tally, destination, + "append events"), + t -> completeIt(result, gathered)); List s = result.get().getKeyStatesList(); return s.isEmpty() ? null : s.getFirst(); } catch (InterruptedException e) { @@ -218,6 +232,8 @@ public KeyState_ append(AttachmentEvent event) { return KeyState_.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -237,11 +253,22 @@ public List append(KERL_ kerl) { HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.append(kerl), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append kerl"), - t -> completeIt(result, gathered)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(identifier, null, (link, r) -> link.append(kerl), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, + isTimedOut, tally, destination, + "append kerl"), + t -> completeIt(result, gathered)); return result.get().getKeyStatesList(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -252,6 +279,8 @@ public List append(KERL_ kerl) { return Collections.emptyList(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -266,11 +295,22 @@ public KeyState_ append(KeyEvent_ event) { HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.append(Collections.singletonList(event)), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append event"), - t -> completeIt(result, gathered)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(identifier, null, (link, r) -> link.append(Collections.singletonList(event)), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, + isTimedOut, tally, destination, + "append event"), + t -> completeIt(result, gathered)); var ks = result.get(); return ks.getKeyStatesCount() == 0 ? KeyState_.getDefaultInstance() : ks.getKeyStatesList().getFirst(); } catch (InterruptedException e) { @@ -282,6 +322,8 @@ public KeyState_ append(KeyEvent_ event) { return KeyState_.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -290,9 +332,23 @@ public List append(List events) { if (events.isEmpty()) { return Collections.emptyList(); } - List states = new ArrayList<>(); - events.stream().map(this::append).forEach(states::add); - return states; + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + List states = new ArrayList<>(); + events.stream().map(this::append).forEach(states::add); + return states; + } finally { + lock.lock(); + } } @Override @@ -300,11 +356,25 @@ public List append(List events, List atta if (events.isEmpty()) { return Collections.emptyList(); } - List states = new ArrayList<>(); - events.stream().map(this::append).forEach(states::add); + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + List states = new ArrayList<>(); + events.stream().map(this::append).forEach(states::add); - attachments.forEach(this::append); - return states; + attachments.forEach(this::append); + return states; + } finally { + lock.lock(); + } } @Override @@ -323,12 +393,23 @@ public Empty appendAttachments(List events) { HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.appendAttachments(events), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append attachments"), - t -> completeIt(result, gathered)); + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } try { + iterator.iterate(identifier, null, (link, r) -> link.appendAttachments(events), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, + isTimedOut, tally, destination, + "append attachments"), + t -> completeIt(result, gathered)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -339,6 +420,8 @@ public Empty appendAttachments(List events) { return Empty.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -357,11 +440,22 @@ public Empty appendValidations(Validations validations) { HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.appendValidations(validations), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append validations"), - t -> completeIt(result, gathered)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(identifier, null, (link, r) -> link.appendValidations(validations), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, + isTimedOut, tally, destination, + "append validations"), + t -> completeIt(result, gathered)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -372,6 +466,8 @@ public Empty appendValidations(Validations validations) { return Empty.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -409,13 +505,24 @@ public Attachment getAttachment(EventCoords coordinates) { HashMultiset gathered = HashMultiset.create(); var operation = "getAttachment(%s)".formatted(EventCoordinates.from(coordinates)); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(identifier, null, (link, r) -> link.getAttachment(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier, - isTimedOut, destination, "get attachment", - Attachment.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } try { + iterator.iterate(identifier, null, (link, r) -> link.getAttachment(coordinates), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, + identifier, isTimedOut, destination, + "get attachment", + Attachment.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -426,6 +533,8 @@ public Attachment getAttachment(EventCoords coordinates) { return null; } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -444,13 +553,23 @@ public KERL_ getKERL(Ident identifier) { HashMultiset gathered = HashMultiset.create(); var operation = "getKerl(%s)".formatted(Identifier.from(identifier)); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKERL(identifier), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KERL_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } try { + iterator.iterate(digest, null, (link, r) -> link.getKERL(identifier), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation, + KERL_.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -461,6 +580,8 @@ public KERL_ getKERL(Ident identifier) { return KERL_.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -483,13 +604,23 @@ public KeyEvent_ getKeyEvent(EventCoords coordinates) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyEvent(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyEvent_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(digest, null, (link, r) -> link.getKeyEvent(coordinates), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation, + KeyEvent_.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -500,6 +631,8 @@ public KeyEvent_ getKeyEvent(EventCoords coordinates) { return KeyEvent_.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -519,13 +652,23 @@ public KeyState_ getKeyState(EventCoords coordinates) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyState(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyState_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(digest, null, (link, r) -> link.getKeyState(coordinates), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation, + KeyState_.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -536,6 +679,8 @@ public KeyState_ getKeyState(EventCoords coordinates) { return KeyState_.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -556,13 +701,23 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyState(identAndSeq), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyState_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(digest, null, (link, r) -> link.getKeyState(identAndSeq), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation, + KeyState_.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -573,6 +728,8 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) { return KeyState_.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -592,13 +749,23 @@ public KeyState_ getKeyState(Ident identifier) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyState(identifier), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyState_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } try { + iterator.iterate(digest, null, (link, r) -> link.getKeyState(identifier), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation, + KeyState_.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -609,6 +776,8 @@ public KeyState_ getKeyState(Ident identifier) { return KeyState_.getDefaultInstance(); } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -628,13 +797,23 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithAttachments(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyStateWithAttachments_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } try { + iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithAttachments(coordinates), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation, + KeyStateWithAttachments_.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -645,6 +824,8 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat return null; } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -664,13 +845,23 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithEndorsementsAndValidations(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyStateWithEndorsementsAndValidations_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + try { + iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithEndorsementsAndValidations(coordinates), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation, + KeyStateWithEndorsementsAndValidations_.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -681,6 +872,8 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal return null; } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -700,13 +893,23 @@ public Validations getValidations(EventCoords coordinates) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(identifier, null, (link, r) -> link.getValidations(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier, - isTimedOut, destination, operation, - Validations.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + final var lock = viewSynchronous.readLock(); + try { + if (!lock.tryLock(operationTimeout.toNanos(), TimeUnit.NANOSECONDS)) { + log.info("Could not acquire view synchronous lock on: {}", member.getId()); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } try { + iterator.iterate(identifier, null, (link, r) -> link.getValidations(coordinates), + () -> failedMajority(result, maxCount(gathered), operation), + (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, + identifier, isTimedOut, destination, operation, + Validations.getDefaultInstance()), + t -> failedMajority(result, maxCount(gathered), operation)); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -717,6 +920,8 @@ public Validations getValidations(EventCoords coordinates) { return null; } throw new IllegalStateException(e.getCause()); + } finally { + lock.unlock(); } } @@ -734,18 +939,16 @@ public Optional verifierFor(Identifier identifier) { }; } - public Entry max(HashMultiset gathered) { - return gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null); - } - - public int maxCount(HashMultiset gathered) { - final var max = gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)); - return max.map(Entry::getCount).orElse(0); - } - - public void nextView(Context c, Digest diadem) { - log.info("Next view: {} context: {} on: {}", diadem, context.getId(), member.getId()); - context.setContext(c); + public void nextView(ViewChange viewChange) { + final var lock = viewSynchronous.writeLock(); + lock.lock(); + try { + log.info("Next view: {} context: {} on: {}", viewChange.diadem(), viewChange.context().getId(), + member.getId()); + context.setContext(viewChange.context()); + } finally { + lock.unlock(); + } } public void start(Duration duration) { @@ -850,6 +1053,15 @@ private CombinedIntervals keyIntervals() { return new CombinedIntervals(intervals); } + private Entry max(HashMultiset gathered) { + return gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null); + } + + private int maxCount(HashMultiset gathered) { + final var max = gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)); + return max.map(Entry::getCount).orElse(0); + } + private boolean mutate(HashMultiset gathered, Optional futureSailor, Digest identifier, Supplier isTimedOut, AtomicInteger tally, RingCommunications.Destination destination, String action) {