From 8de73a2ec1af789e8baf4fbe412f4d21ecefa381 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Mon, 1 Jan 2024 08:21:58 -0800 Subject: [PATCH] Moar cleanup wrt scheduling Also remove delegated verifier/validation as I don't believe that will be necessary --- .../com/salesforce/apollo/choam/Session.java | 5 ++-- .../salesforce/apollo/choam/ViewAssembly.java | 8 +++--- .../apollo/choam/support/Bootstrapper.java | 15 +++++------ .../choam/support/CheckpointAssembler.java | 13 +++++----- .../ethereal/memberships/ChRbcGossip.java | 16 ++++++------ .../salesforce/apollo/fireflies/Binding.java | 6 ++--- .../apollo/fireflies/Bootstrapper.java | 4 +-- .../com/salesforce/apollo/fireflies/View.java | 25 +++++++++++-------- .../apollo/fireflies/ViewManagement.java | 3 ++- .../salesforce/apollo/leyden/LeydenJar.java | 4 ++- .../messaging/rbc/ReliableBroadcaster.java | 12 ++++++--- .../salesforce/apollo/model/SubDomain.java | 7 ++++-- .../com/salesforce/apollo/thoth/KerlDHT.java | 4 ++- 13 files changed, 71 insertions(+), 51 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Session.java b/choam/src/main/java/com/salesforce/apollo/choam/Session.java index 8a9723132..99273d129 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Session.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Session.java @@ -21,6 +21,7 @@ import com.salesforce.apollo.cryptography.JohnHancock; import com.salesforce.apollo.cryptography.Signer; import com.salesforce.apollo.cryptography.Verifier; +import com.salesforce.apollo.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,7 +170,7 @@ public CompletableFuture submit(Message transaction, Duration timeout) th if (params.metrics() != null) { params.metrics().transactionSubmittedSuccess(); } - var futureTimeout = scheduler.schedule(() -> { + var futureTimeout = scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> { if (result.isDone()) { return; } @@ -179,7 +180,7 @@ public CompletableFuture submit(Message transaction, Duration timeout) th if (params.metrics() != null) { params.metrics().transactionComplete(to); } - }, timeout.toMillis(), TimeUnit.MILLISECONDS); + }, log)), timeout.toMillis(), TimeUnit.MILLISECONDS); return result.whenComplete((r, t) -> { futureTimeout.cancel(true); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index eeffa3991..c7cf17ad9 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -7,16 +7,17 @@ package com.salesforce.apollo.choam; import com.chiralbehaviors.tron.Fsm; -import com.salesforce.apollo.choam.proto.*; -import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.choam.comm.Terminal; import com.salesforce.apollo.choam.fsm.Reconfiguration; import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure; import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions; +import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.ring.SliceIterator; +import com.salesforce.apollo.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,7 +151,8 @@ private void completeSlice(AtomicReference retryDelay, AtomicReference proposals.keySet().stream().toList(), nextAssembly.size(), delay, params().member().getId()); if (!cancelSlice.get()) { Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) - .schedule(() -> reiterate.get().run(), delay.toMillis(), TimeUnit.MILLISECONDS); + .schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reiterate.get(), log)), delay.toMillis(), + TimeUnit.MILLISECONDS); } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index 672158dd7..67e260ed6 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -8,13 +8,13 @@ import com.google.common.collect.Multiset; import com.google.common.collect.TreeMultiset; -import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.bloomFilters.BloomFilter.ULongBloomFilter; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.comm.Concierge; import com.salesforce.apollo.choam.comm.Terminal; +import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.HexBloom; @@ -23,6 +23,7 @@ import com.salesforce.apollo.ring.RingIterator; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Pair; +import com.salesforce.apollo.utils.Utils; import org.joou.ULong; import org.joou.Unsigned; import org.slf4j.Logger; @@ -377,14 +378,14 @@ private void scheduleAnchorCompletion(AtomicReference start, ULong anchor } log.info("Scheduling Anchor completion ({} to {}) duration: {} on: {}", start, anchorTo, params.gossipDuration(), params.member().getId()); - scheduler.schedule(() -> { + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> { try { anchor(start, anchorTo); } catch (Throwable e) { log.error("Cannot execute completeViewChain on: {}", params.member().getId()); sync.completeExceptionally(e); } - }, params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS); + }, log)), params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS); } private void scheduleSample() { @@ -392,7 +393,7 @@ private void scheduleSample() { return; } log.info("Scheduling state sample on: {}", params.member().getId()); - scheduler.schedule(() -> { + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> { final HashedCertifiedBlock established = genesis; if (sync.isDone() || established != null) { log.trace("Synchronization isDone: {} genesis: {} on: {}", sync.isDone(), @@ -406,7 +407,7 @@ private void scheduleSample() { sync.completeExceptionally(e); e.printStackTrace(); } - }, params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS); + }, log)), params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS); } private void scheduleViewChainCompletion(AtomicReference start, ULong to) { @@ -418,14 +419,14 @@ private void scheduleViewChainCompletion(AtomicReference start, ULong to) } log.info("Scheduling view chain completion ({} to {}) duration: {} on: {}", start, to, params.gossipDuration(), params.member().getId()); - scheduler.schedule(() -> { + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> { try { completeViewChain(start, to); } catch (Throwable e) { log.error("Cannot execute completeViewChain on: {}", params.member().getId()); sync.completeExceptionally(e); } - }, params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS); + }, log)), params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS); } private boolean synchronize(Optional futureSailor, HashMap votes, diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java index c12b616b7..ece562b48 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java @@ -6,13 +6,13 @@ */ package com.salesforce.apollo.choam.support; -import com.salesforce.apollo.choam.proto.Checkpoint; -import com.salesforce.apollo.choam.proto.CheckpointReplication; -import com.salesforce.apollo.choam.proto.CheckpointSegments; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.choam.comm.Concierge; import com.salesforce.apollo.choam.comm.Terminal; +import com.salesforce.apollo.choam.proto.Checkpoint; +import com.salesforce.apollo.choam.proto.CheckpointReplication; +import com.salesforce.apollo.choam.proto.CheckpointSegments; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.HexBloom; @@ -21,6 +21,7 @@ import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.ring.RingIterator; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Utils; import org.h2.mvstore.MVMap; import org.joou.ULong; import org.slf4j.Logger; @@ -115,9 +116,9 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) { member.getId()); var ringer = new RingIterator<>(frequency, context, member, comms, true, scheduler); ringer.iterate(randomCut(digestAlgorithm), (link, ring) -> gossip(link), - (tally, result, destination) -> gossip(result), - t -> scheduler.schedule(() -> gossip(scheduler, duration), duration.toMillis(), - TimeUnit.MILLISECONDS)); + (tally, result, destination) -> gossip(result), t -> scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(scheduler, duration), log)), duration.toMillis(), + TimeUnit.MILLISECONDS)); } diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java index 4e7ebd52b..effcb308f 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java @@ -7,9 +7,6 @@ package com.salesforce.apollo.ethereal.memberships; import com.codahale.metrics.Timer; -import com.salesforce.apollo.ethereal.proto.ContextUpdate; -import com.salesforce.apollo.ethereal.proto.Gossip; -import com.salesforce.apollo.ethereal.proto.Update; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.cryptography.Digest; @@ -18,11 +15,15 @@ import com.salesforce.apollo.ethereal.memberships.comm.Gossiper; import com.salesforce.apollo.ethereal.memberships.comm.GossiperServer; import com.salesforce.apollo.ethereal.memberships.comm.GossiperService; +import com.salesforce.apollo.ethereal.proto.ContextUpdate; +import com.salesforce.apollo.ethereal.proto.Gossip; +import com.salesforce.apollo.ethereal.proto.Update; import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.ring.RingCommunications; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Utils; import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,13 +85,13 @@ public void start(Duration duration) { log.trace("Starting GossipService[{}] on: {}", context.getId(), member.getId()); comm.register(context.getId(), new Terminal()); var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - scheduler.schedule(() -> { + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> { try { oneRound(duration, scheduler); } catch (Throwable e) { log.error("Error in gossip on: {}", member.getId(), e); } - }, initialDelay.toMillis(), TimeUnit.MILLISECONDS); + }, log)), initialDelay.toMillis(), TimeUnit.MILLISECONDS); } /** @@ -175,8 +176,9 @@ private void handle(Optional result, RingCommunications.Destination oneRound(duration, scheduler), duration.toMillis(), - TimeUnit.MILLISECONDS); + scheduled = scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)), + duration.toMillis(), TimeUnit.MILLISECONDS); } } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 202613d6b..161164c5a 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -100,8 +100,8 @@ void seeding() { return link.seed(registration); }, (futureSailor, link, m) -> complete(redirect, futureSailor, m), () -> { if (!redirect.isDone()) { - scheduler.schedule(Utils.wrapped(reseed.get(), log), params.retryDelay().toNanos(), - TimeUnit.NANOSECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reseed.get(), log)), + params.retryDelay().toNanos(), TimeUnit.NANOSECONDS); } }, scheduler, params.retryDelay()); }); @@ -299,7 +299,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { params.joinRetries(), node.getId()); trusts.clear(); initialSeedSet.clear(); - scheduler.schedule(Utils.wrapped(regate.get(), log), + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)), Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS); } else { diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java index 55e1fea22..579cf8a68 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java @@ -32,9 +32,7 @@ import java.util.concurrent.Executors; /** - * Verifiers that delegate to the joining member's successors in the full context for key state retrieval - *

- * This is used to bootstrap the node via delegated key state resolution of the joined group + * Verifiers that delegate to a majority of the sample for event validation and verification *

* * @author hal.hildebrand diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 40e1e83fb..97dc711ee 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -108,8 +108,8 @@ public class View { private final ReadWriteLock viewChange = new ReentrantReadWriteLock( true); private final ViewManagement viewManagement; - private final EventValidation.DelegatedEventValidation validation; - private final Verifiers.DelegatedVerifiers verifiers; + private final EventValidation validation; + private final Verifiers verifiers; private volatile ScheduledFuture futureGossip; public View(Context context, ControlledIdentifierMember member, InetSocketAddress endpoint, @@ -138,8 +138,8 @@ public View(Context context, ControlledIdentifierMember member, Ine r -> new EntranceServer(gateway.getClientIdentityProvider(), r, metrics), EntranceClient.getCreate(metrics), Entrance.getLocalLoopback(node, service)); gossiper = new RingCommunications<>(context, node, comm); - this.validation = new EventValidation.DelegatedEventValidation(validation); - this.verifiers = new Verifiers.DelegatedVerifiers(verifiers); + this.validation = validation; + this.verifiers = verifiers; } /** @@ -222,9 +222,10 @@ public void start(CompletableFuture onJoin, Duration d, List seedpod var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); var initial = Entropy.nextBitsStreamLong(d.toNanos()); - scheduler.schedule(Utils.wrapped( - () -> new Binding(this, seeds, d, context, approaches, node, params, metrics, digestAlgo).seeding(), log), - initial, TimeUnit.NANOSECONDS); + scheduler.schedule(() -> Thread.ofVirtual() + .start(Utils.wrapped( + () -> new Binding(this, seeds, d, context, approaches, node, params, metrics, + digestAlgo).seeding(), log)), initial, TimeUnit.NANOSECONDS); log.info("{} started on: {}", context.getId(), node.getId()); } @@ -495,8 +496,9 @@ void resetBootstrapView() { void schedule(final Duration duration) { var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - futureGossip = scheduler.schedule(Utils.wrapped(() -> gossip(duration, scheduler), log), - Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS); + futureGossip = scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration, scheduler), log)), + Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS); } void scheduleFinalizeViewChange() { @@ -1083,8 +1085,9 @@ private void gossip(Optional result, RingCommunications.Destination gossip(duration, scheduler), log), duration.toNanos(), - TimeUnit.NANOSECONDS); + futureGossip = scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration, scheduler), log)), duration.toNanos(), + TimeUnit.NANOSECONDS); } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 564ee2f8e..09bb5878f 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -380,7 +380,8 @@ void populate(List sample) { return !joined(); }, () -> { if (!joined()) { - scheduler.schedule(Utils.wrapped(() -> repopulate.get(), log), 500, TimeUnit.MILLISECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(repopulate.get(), log)), 500, + TimeUnit.MILLISECONDS); } }, scheduler, Duration.ofMillis(500)); }); diff --git a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java index fd8806900..a37cb1210 100644 --- a/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java +++ b/leyden/src/main/java/com/salesforce/apollo/leyden/LeydenJar.java @@ -20,6 +20,7 @@ import com.salesforce.apollo.ring.RingIterator; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Hex; +import com.salesforce.apollo.utils.Utils; import io.grpc.Status; import io.grpc.StatusRuntimeException; import org.h2.mvstore.MVMap; @@ -380,7 +381,8 @@ private void reconcile(Optional result, member.getId()); } if (started.get()) { - scheduler.schedule(() -> reconcile(scheduler, duration), duration.toNanos(), TimeUnit.NANOSECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> reconcile(scheduler, duration), log)), + duration.toNanos(), TimeUnit.NANOSECONDS); } } diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 638ec77d5..de5ae9bb4 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -10,8 +10,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; -import com.salesforce.apollo.cryptography.proto.Biff; -import com.salesforce.apollo.messaging.proto.*; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.bloomFilters.BloomFilter; @@ -20,13 +18,16 @@ import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.JohnHancock; +import com.salesforce.apollo.cryptography.proto.Biff; import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.messaging.rbc.comms.RbcServer; import com.salesforce.apollo.membership.messaging.rbc.comms.ReliableBroadcast; +import com.salesforce.apollo.messaging.proto.*; import com.salesforce.apollo.ring.RingCommunications; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -187,7 +188,8 @@ public void start(Duration duration) { log.info("Starting Reliable Broadcaster[{}] for {}", context.getId(), member.getId()); comm.register(context.getId(), new Service()); var scheduler = Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()); - scheduler.schedule(() -> oneRound(duration, scheduler), initialDelay, TimeUnit.MILLISECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)), + initialDelay, TimeUnit.MILLISECONDS); } public void stop() { @@ -251,7 +253,9 @@ private void handle(Optional result, } if (started.get()) { try { - scheduler.schedule(() -> oneRound(duration, scheduler), duration.toMillis(), TimeUnit.MILLISECONDS); + scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)), + duration.toMillis(), TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { return; } diff --git a/model/src/main/java/com/salesforce/apollo/model/SubDomain.java b/model/src/main/java/com/salesforce/apollo/model/SubDomain.java index deee6f1f3..cb0c1aae6 100644 --- a/model/src/main/java/com/salesforce/apollo/model/SubDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/SubDomain.java @@ -25,6 +25,7 @@ import com.salesforce.apollo.model.comms.DelegationService; import com.salesforce.apollo.ring.RingCommunications; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Utils; import org.h2.mvstore.MVMap; import org.h2.mvstore.MVStore; import org.slf4j.Logger; @@ -106,7 +107,8 @@ public void start() { super.start(); Duration initialDelay = gossipInterval.plusMillis(Entropy.nextBitsStreamLong(gossipInterval.toMillis())); log.trace("Starting SubDomain[{}:{}]", params.context().getId(), member.getId()); - scheduler.schedule(() -> oneRound(), initialDelay.toMillis(), TimeUnit.MILLISECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(), log)), + initialDelay.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -171,7 +173,8 @@ private void handle(Optional result, timer.stop(); } if (started.get()) { - scheduler.schedule(() -> oneRound(), gossipInterval.toMillis(), TimeUnit.MILLISECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(), log)), + gossipInterval.toMillis(), TimeUnit.MILLISECONDS); } } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 0db065182..45521e8e4 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -47,6 +47,7 @@ import com.salesforce.apollo.thoth.proto.Update; import com.salesforce.apollo.thoth.proto.Updating; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Utils; import liquibase.Liquibase; import liquibase.Scope; import liquibase.Scope.Attr; @@ -948,7 +949,8 @@ private void reconcile(Optional result, } } if (started.get()) { - scheduler.schedule(() -> reconcile(scheduler, duration), duration.toMillis(), TimeUnit.MILLISECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> reconcile(scheduler, duration), log)), + duration.toMillis(), TimeUnit.MILLISECONDS); } }