diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 3ea01d60b..d004baddf 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -91,7 +91,7 @@ public class CHOAM { public CHOAM(Parameters params) { this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); this.params = params; - executions = Executors.newCachedThreadPool(Thread.ofVirtual().factory()); + executions = Executors.newVirtualThreadPerTaskExecutor(); nextView(); combine = new ReliableBroadcaster(params.context(), params.member(), params.combine(), params.communications(), diff --git a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java index b3a396e60..7625aa1c2 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java @@ -13,13 +13,12 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.Block; import com.salesforce.apollo.choam.proto.CertifiedBlock; import com.salesforce.apollo.choam.proto.Header; import com.salesforce.apollo.choam.proto.SubmitResult; import com.salesforce.apollo.choam.proto.SubmitResult.Result; -import com.salesforce.apollo.test.proto.ByteMessage; -import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.support.HashedCertifiedBlock; import com.salesforce.apollo.choam.support.InvalidTransaction; import com.salesforce.apollo.choam.support.SubmittedTransaction; @@ -31,6 +30,7 @@ import com.salesforce.apollo.stereotomy.StereotomyImpl; import com.salesforce.apollo.stereotomy.mem.MemKERL; import com.salesforce.apollo.stereotomy.mem.MemKeyStore; +import com.salesforce.apollo.test.proto.ByteMessage; import io.grpc.StatusRuntimeException; import org.junit.jupiter.api.Test; import org.slf4j.LoggerFactory; @@ -103,7 +103,7 @@ public void func() throws Exception { @Test public void scalingTest() throws Exception { - var exec = Executors.newCachedThreadPool(Thread.ofVirtual().factory()); + var exec = Executors.newVirtualThreadPerTaskExecutor(); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); Context context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin(), 9, 0.2, 3); var entropy = SecureRandom.getInstance("SHA1PRNG"); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java index c1a943ef5..eaa4fdc9c 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java @@ -7,8 +7,8 @@ package com.salesforce.apollo.choam; import com.google.protobuf.ByteString; -import com.salesforce.apollo.test.proto.ByteMessage; import com.salesforce.apollo.choam.support.InvalidTransaction; +import com.salesforce.apollo.test.proto.ByteMessage; import com.salesforce.apollo.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,8 +24,7 @@ class Transactioneer { private final static Random entropy = new Random(); private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); - private final static Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual() .factory()); private final AtomicInteger completed = new AtomicInteger(); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 3d8ae208c..fa4525150 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -26,7 +26,6 @@ import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; -import io.grpc.Status; import io.grpc.StatusRuntimeException; import org.joou.ULong; import org.slf4j.Logger; @@ -35,7 +34,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -56,17 +55,15 @@ class Binding { private final FireflyMetrics metrics; private final Node node; private final Parameters params; - private final ScheduledExecutorService scheduler; private final List seeds; private final View view; - public Binding(View view, List seeds, Duration duration, ScheduledExecutorService scheduler, - Context context, CommonCommunications approaches, Node node, - Parameters params, FireflyMetrics metrics, DigestAlgorithm digestAlgo) { + public Binding(View view, List seeds, Duration duration, Context context, + CommonCommunications approaches, Node node, Parameters params, + FireflyMetrics metrics, DigestAlgorithm digestAlgo) { this.view = view; this.duration = duration; this.seeds = new ArrayList<>(seeds); - this.scheduler = scheduler; this.context = context; this.node = node; this.params = params; @@ -84,9 +81,9 @@ void seeding() { log.info("Seeding view: {} context: {} with seeds: {} started on: {}", view.currentView(), this.context.getId(), seeds.size(), node.getId()); - var seeding = new CompletableFuture(); + var redirect = new CompletableFuture(); var timer = metrics == null ? null : metrics.seedDuration().time(); - seeding.whenComplete(join(duration, scheduler, timer)); + redirect.whenComplete(join(duration, timer)); var bootstrappers = seeds.stream() .map(this::seedFor) @@ -95,13 +92,14 @@ void seeding() { .collect(Collectors.toList()); var seedlings = new SliceIterator<>("Seedlings", node, bootstrappers, approaches); AtomicReference reseed = new AtomicReference<>(); + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); reseed.set(() -> { final var registration = registration(); seedlings.iterate((link, m) -> { log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), node.getId()); return link.seed(registration); - }, (futureSailor, link, m) -> complete(seeding, futureSailor, m), () -> { - if (!seeding.isDone()) { + }, (futureSailor, link, m) -> complete(redirect, futureSailor, m), () -> { + if (!redirect.isDone()) { scheduler.schedule(Utils.wrapped(reseed.get(), log), params.retryDelay().toNanos(), TimeUnit.NANOSECONDS); } @@ -115,7 +113,7 @@ private void bootstrap() { node.getId()); var nw = node.getNote(); - view.bootstrap(nw, scheduler, duration); + view.bootstrap(nw, duration); } private boolean complete(CompletableFuture redirect, Optional futureSailor, Member m) { @@ -123,14 +121,15 @@ private boolean complete(CompletableFuture redirect, Optional gateway, @@ -180,25 +179,29 @@ private boolean completeGateway(Participant member, CompletableFuture gat } private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, AtomicInteger abandon) { - if (sre.getStatus().getCode().equals(Status.OUT_OF_RANGE.getCode())) { + switch (sre.getStatus().getCode()) { + case OUT_OF_RANGE -> { log.info("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), node.getId()); abandon.incrementAndGet(); - } else if (sre.getStatus().getCode().equals(Status.FAILED_PRECONDITION.getCode())) { + } + case FAILED_PRECONDITION -> { log.info("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), node.getId()); abandon.incrementAndGet(); - } else if (sre.getStatus().getCode().equals(Status.PERMISSION_DENIED.getCode())) { + } + case PERMISSION_DENIED -> { log.info("Gateway view: {} permission denied: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), node.getId()); abandon.incrementAndGet(); - } else if (sre.getStatus().getCode().equals(Status.RESOURCE_EXHAUSTED.getCode())) { + } + case RESOURCE_EXHAUSTED -> { log.info("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), node.getId()); abandon.incrementAndGet(); - } else { - log.info("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + } + default -> log.info("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); } } @@ -206,8 +209,7 @@ private Join join(Digest v) { return Join.newBuilder().setView(v.toDigeste()).setNote(node.getNote().getWrapped()).build(); } - private BiConsumer join(Duration duration, ScheduledExecutorService scheduler, - Timer.Context timer) { + private BiConsumer join(Duration duration, Timer.Context timer) { return (r, t) -> { if (t != null) { log.error("Failed seeding on: {}", node.getId(), t); @@ -228,11 +230,11 @@ private Join join(Digest v) { if (timer != null) { timer.close(); } - join(r, view, duration, scheduler); + join(r, view, duration); }; } - private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecutorService scheduler) { + private void join(Redirect redirect, Digest v, Duration duration) { var sample = redirect.getSampleList() .stream() .map(sn -> new NoteWrapper(sn.getNote(), digestAlgo)) @@ -242,7 +244,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu node.getId()); var gateway = new CompletableFuture(); var timer = metrics == null ? null : metrics.joinDuration().time(); - gateway.whenComplete(view.join(scheduler, duration, timer)); + gateway.whenComplete(view.join(duration, timer)); var regate = new AtomicReference(); var retries = new AtomicInteger(); @@ -261,6 +263,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu var majority = redirect.getBootstrap() ? 1 : Context.minimalQuorum(redirect.getRings(), this.context.getBias()); final var join = join(v); final var abandon = new AtomicInteger(); + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); regate.set(() -> { redirecting.iterate((link, m) -> { log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId()); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index cb1265229..20245de62 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -197,8 +197,7 @@ public UUID register(ViewLifecycleListener listener) { /** * Start the View */ - public void start(CompletableFuture onJoin, Duration d, List seedpods, - ScheduledExecutorService scheduler) { + public void start(CompletableFuture onJoin, Duration d, List seedpods) { Objects.requireNonNull(onJoin, "Join completion must not be null"); if (!started.compareAndSet(false, true)) { return; @@ -214,10 +213,11 @@ public void start(CompletableFuture onJoin, Duration d, List seedpod context.clear(); node.reset(); + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); var initial = Entropy.nextBitsStreamLong(d.toNanos()); scheduler.schedule(Utils.wrapped( - () -> new Binding(this, seeds, d, scheduler, context, approaches, node, params, metrics, digestAlgo).seeding(), - log), initial, TimeUnit.NANOSECONDS); + () -> new Binding(this, seeds, d, context, approaches, node, params, metrics, digestAlgo).seeding(), log), + initial, TimeUnit.NANOSECONDS); log.info("{} started on: {}", context.getId(), node.getId()); } @@ -225,12 +225,12 @@ public void start(CompletableFuture onJoin, Duration d, List seedpod /** * Start the View */ - public void start(Runnable onJoin, Duration d, List seedpods, ScheduledExecutorService scheduler) { + public void start(Runnable onJoin, Duration d, List seedpods) { final var futureSailor = new CompletableFuture(); futureSailor.whenComplete((v, t) -> { onJoin.run(); }); - start(futureSailor, d, seedpods, scheduler); + start(futureSailor, d, seedpods); } /** @@ -326,8 +326,8 @@ boolean addToView(NoteWrapper note) { return true; } - void bootstrap(NoteWrapper nw, ScheduledExecutorService sched, Duration dur) { - viewManagement.bootstrap(nw, sched, dur); + void bootstrap(NoteWrapper nw, Duration dur) { + viewManagement.bootstrap(nw, dur); } Digest bootstrapView() { @@ -405,9 +405,8 @@ void introduced() { introduced.set(true); } - BiConsumer join(ScheduledExecutorService scheduler, Duration duration, - com.codahale.metrics.Timer.Context timer) { - return viewManagement.join(scheduler, duration, timer); + BiConsumer join(Duration duration, com.codahale.metrics.Timer.Context timer) { + return viewManagement.join(duration, timer); } void notifyListeners(List joining, List leaving) { @@ -490,7 +489,8 @@ void resetBootstrapView() { viewManagement.resetBootstrapView(); } - void schedule(final Duration duration, final ScheduledExecutorService scheduler) { + void schedule(final Duration duration) { + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); futureGossip = scheduler.schedule(Utils.wrapped(() -> gossip(duration, scheduler), log), Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 2aa75331d..ce264a656 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -86,7 +86,7 @@ boolean addJoin(Digest id, NoteWrapper note) { return joins.put(id, note) == null; } - void bootstrap(NoteWrapper nw, final ScheduledExecutorService sched, final Duration dur) { + void bootstrap(NoteWrapper nw, final Duration dur) { joins.put(nw.getId(), nw); context.activate(node); @@ -95,7 +95,7 @@ void bootstrap(NoteWrapper nw, final ScheduledExecutorService sched, final Durat new Ballot(currentView(), Collections.emptyList(), Collections.singletonList(node.getId()), digestAlgo))); view.scheduleViewChange(); - view.schedule(dur, sched); + view.schedule(dur); log.info("Bootstrapped view: {} cardinality: {} count: {} context: {} on: {}", currentView(), context.cardinality(), context.activeCount(), context.getId(), node.getId()); @@ -154,11 +154,9 @@ void install(Ballot ballot) { final var seedSet = context.sample(params.maximumTxfr(), Entropy.bitsStream(), node.getId()) .stream() .map(p -> p.note.getWrapped()) - .toList(); + .collect(Collectors.toSet()); - var cardinality = context.totalCount() + ballot.joining.size(); - - context.rebalance(cardinality); + context.rebalance(context.totalCount() + ballot.joining.size()); var joining = new ArrayList(); var pending = ballot.joining() .stream() @@ -174,7 +172,6 @@ void install(Ballot ballot) { .map(nw -> pendingJoins.remove(nw.getId())) .filter(p -> p != null) .toList(); - pendingJoins.clear(); setDiadem( HexBloom.construct(context.memberCount(), context.allMembers().map(p -> p.getId()), view.bootstrapView(), @@ -261,7 +258,6 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time if (contains(from)) { log.debug("Already a member: {} view: {} context: {} cardinality: {} on: {}", from, thisView, context.getId(), context.cardinality(), node.getId()); - pendingJoins.remove(from); joined(context.sample(params.maximumTxfr(), Entropy.bitsStream(), node.getId()) .stream() .map(p -> p.note.getWrapped()) @@ -284,7 +280,7 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("No room at the inn"))); return; } - pendingJoins.put(from, seeds -> { + pendingJoins.computeIfAbsent(from, d -> seeds -> { log.info("Gateway established for: {} view: {} context: {} cardinality: {} on: {}", from, currentView(), context.getId(), context.cardinality(), node.getId()); joined(seeds, from, responseObserver, timer); @@ -295,8 +291,7 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time }); } - BiConsumer join(ScheduledExecutorService scheduler, Duration duration, - Timer.Context timer) { + BiConsumer join(Duration duration, Timer.Context timer) { return (bound, t) -> { view.viewChange(() -> { final var hex = bound.view(); @@ -320,21 +315,19 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time context.allMembers().forEach(p -> p.clearAccusations()); - view.introduced(); - - view.schedule(duration, scheduler); + view.schedule(duration); if (timer != null) { timer.stop(); } + view.introduced(); log.info("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", currentView.get(), bound.successors().size(), context.cardinality(), context.totalCount(), node.getId()); if (context.totalCount() == context.cardinality()) { join(); } else { - var sample = new ArrayList<>(context.activeMembers()); - populate(sample, scheduler); + populate(new ArrayList(context.activeMembers())); } }); }; @@ -363,9 +356,10 @@ void maybeViewChange() { } } - void populate(List sample, ScheduledExecutorService scheduler) { + void populate(List sample) { var populate = new SliceIterator("Populate: " + context.getId(), node, sample, view.comm); var repopulate = new AtomicReference(); + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); repopulate.set(() -> { populate.iterate((link, m) -> { log.debug("Populating: {} contacting: {} on: {}", context.getId(), link.getMember().getId(), @@ -386,10 +380,9 @@ void populate(List sample, ScheduledExecutorService scheduler) { return !joined(); }, () -> { if (!joined()) { - scheduler.schedule(Utils.wrapped(() -> repopulate.get(), log), params.retryDelay().toNanos(), - TimeUnit.NANOSECONDS); + scheduler.schedule(Utils.wrapped(() -> repopulate.get(), log), 500, TimeUnit.MILLISECONDS); } - }, scheduler, params.retryDelay()); + }, scheduler, Duration.ofMillis(500)); }); repopulate.get().run(); } @@ -524,8 +517,8 @@ private void joined(Collection seedSet, Digest from, StreamObserver< .addAllSuccessors(successors) .setDiadem(diadem.get().toHexBloome())) .build(); - log.trace("Gateway initial seeding: {} successors: {} for: {} on: {}", gateway.getInitialSeedSetCount(), - successors.size(), from, node.getId()); + log.info("Gateway initial seeding: {} successors: {} for: {} on: {}", gateway.getInitialSeedSetCount(), + successors.size(), from, node.getId()); responseObserver.onNext(gateway); responseObserver.onCompleted(); if (timer != null) { diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceServer.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceServer.java index 91efd8922..ee8be6913 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceServer.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceServer.java @@ -46,8 +46,11 @@ public void join(Join request, StreamObserver responseObserver) { return; } router.evaluate(responseObserver, s -> { - // async handling - s.join(request, from, responseObserver, timer); + try { + s.join(request, from, responseObserver, timer); + } catch (Throwable t) { + responseObserver.onError(t); + } }); } @@ -65,7 +68,13 @@ public void seed(Registration request, StreamObserver responseObserver return; } router.evaluate(responseObserver, s -> { - var r = s.seed(request, from); + Redirect r; + try { + r = s.seed(request, from); + } catch (Throwable t) { + responseObserver.onError(t); + return; + } responseObserver.onNext(r); responseObserver.onCompleted(); if (timer != null) { @@ -85,7 +94,13 @@ public void validate(EventCoords request, StreamObserver responseObs return; } router.evaluate(responseObserver, s -> { - var r = s.validateCoords(request, from); + Validation r; + try { + r = s.validateCoords(request, from); + } catch (Throwable t) { + responseObserver.onError(t); + return; + } responseObserver.onNext(r); responseObserver.onCompleted(); }); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index 31b0cc159..becea6bd7 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -34,7 +34,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -109,9 +108,7 @@ public void churn() throws Exception { var countdown = new AtomicReference<>(new CountDownLatch(1)); long then = System.currentTimeMillis(); - views.get(0) - .start(() -> countdown.get().countDown(), gossipDuration, Collections.emptyList(), - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); + views.get(0).start(() -> countdown.get().countDown(), gossipDuration, Collections.emptyList()); assertTrue(countdown.get().await(30, TimeUnit.SECONDS), "Kernel did not bootstrap"); @@ -120,8 +117,7 @@ public void churn() throws Exception { var bootstrappers = views.subList(1, seeds.size()); countdown.set(new CountDownLatch(bootstrappers.size())); - bootstrappers.forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, bootstrapSeed, - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()))); + bootstrappers.forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, bootstrapSeed)); // Test that all seeds up var success = countdown.get().await(30, TimeUnit.SECONDS); @@ -149,8 +145,7 @@ public void churn() throws Exception { then = System.currentTimeMillis(); countdown.set(new CountDownLatch(toStart.size())); - toStart.forEach(view -> view.start(() -> countdown.get().countDown(), gossipDuration, seeds, - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()))); + toStart.forEach(view -> view.start(() -> countdown.get().countDown(), gossipDuration, seeds)); success = countdown.get().await(30, TimeUnit.SECONDS); failed = testViews.stream() diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java index 917fffe16..42289d102 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java @@ -34,7 +34,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -111,17 +110,14 @@ public void smokin() throws Exception { final var gossipDuration = Duration.ofMillis(largeTests ? 70 : 5); var countdown = new AtomicReference<>(new CountDownLatch(1)); - views.get(0) - .start(() -> countdown.get().countDown(), gossipDuration, Collections.emptyList(), - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())); + views.get(0).start(() -> countdown.get().countDown(), gossipDuration, Collections.emptyList()); assertTrue(countdown.get().await(largeTests ? 2400 : 30, TimeUnit.SECONDS), "Kernel did not bootstrap"); var bootstrappers = views.subList(0, seeds.size()); countdown.set(new CountDownLatch(seeds.size() - 1)); bootstrappers.subList(1, bootstrappers.size()) - .forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, bootstrapSeed, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + .forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, bootstrapSeed)); // Test that all bootstrappers up var success = countdown.get().await(largeTests ? 2400 : 30, TimeUnit.SECONDS); @@ -134,8 +130,7 @@ public void smokin() throws Exception { // Start remaining views countdown.set(new CountDownLatch(views.size() - seeds.size())); - views.forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, seeds, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + views.forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, seeds)); success = countdown.get().await(largeTests ? 2400 : 30, TimeUnit.SECONDS); @@ -173,7 +168,7 @@ public void smokin() throws Exception { } private void initialize() { - var parameters = Parameters.newBuilder().build(); + var parameters = Parameters.newBuilder().setMaxPending(5).setMaximumTxfr(5).build(); registry = new MetricRegistry(); node0Registry = new MetricRegistry(); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index f43b99bc4..67a85ba71 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -45,7 +45,6 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -144,9 +143,7 @@ public void smoke() throws Exception { var countdown = new AtomicReference<>(new CountDownLatch(1)); - views.get(0) - .start(() -> countdown.get().countDown(), duration, Collections.emptyList(), - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())); + views.get(0).start(() -> countdown.get().countDown(), duration, Collections.emptyList()); assertTrue(countdown.get().await(30, TimeUnit.SECONDS), "KERNEL did not stabilize"); @@ -155,14 +152,12 @@ public void smoke() throws Exception { countdown.set(new CountDownLatch(seedlings.size())); - seedlings.forEach(view -> view.start(() -> countdown.get().countDown(), duration, kernel, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + seedlings.forEach(view -> view.start(() -> countdown.get().countDown(), duration, kernel)); assertTrue(countdown.get().await(30, TimeUnit.SECONDS), "Seeds did not stabilize"); countdown.set(new CountDownLatch(views.size() - seeds.size())); - views.forEach(view -> view.start(() -> countdown.get().countDown(), duration, seeds, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + views.forEach(view -> view.start(() -> countdown.get().countDown(), duration, seeds)); assertTrue(Utils.waitForCondition(120_000, 1_000, () -> { return views.stream() diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index f38cfab6c..9b31182fa 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -34,7 +34,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -112,17 +111,14 @@ public void swarm() throws Exception { final var gossipDuration = Duration.ofMillis(largeTests ? 150 : 5); var countdown = new AtomicReference<>(new CountDownLatch(1)); - views.get(0) - .start(() -> countdown.get().countDown(), gossipDuration, Collections.emptyList(), - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory())); + views.get(0).start(() -> countdown.get().countDown(), gossipDuration, Collections.emptyList()); assertTrue(countdown.get().await(60, TimeUnit.SECONDS), "Kernel did not bootstrap"); var bootstrappers = views.subList(0, seeds.size()); countdown.set(new CountDownLatch(seeds.size() - 1)); bootstrappers.subList(1, bootstrappers.size()) - .forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, bootstrapSeed, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + .forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, bootstrapSeed)); // Test that all bootstrappers up var success = countdown.get().await(largeTests ? 2400 : 60, TimeUnit.SECONDS); @@ -135,8 +131,7 @@ public void swarm() throws Exception { // Start remaining views countdown.set(new CountDownLatch(views.size() - seeds.size())); - views.forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, seeds, - Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()))); + views.forEach(v -> v.start(() -> countdown.get().countDown(), gossipDuration, seeds)); success = countdown.get().await(largeTests ? 2400 : 120, TimeUnit.SECONDS); diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java index 1b0ca2eb8..d5dc7ecb3 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java @@ -8,17 +8,13 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.salesforce.apollo.cryptography.proto.Digeste; -import com.salesforce.apollo.demesne.proto.DemesneParameters; -import com.salesforce.apollo.demesne.proto.SubContext; -import com.salesforce.apollo.test.proto.ByteMessage; -import com.salesforce.apollo.test.proto.TestItGrpc; -import com.salesforce.apollo.test.proto.TestItGrpc.TestItBlockingStub; -import com.salesforce.apollo.test.proto.TestItGrpc.TestItImplBase; import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.comm.grpc.DomainSocketServerInterceptor; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.cryptography.proto.Digeste; +import com.salesforce.apollo.demesne.proto.DemesneParameters; +import com.salesforce.apollo.demesne.proto.SubContext; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.model.demesnes.DemesneImpl; @@ -37,6 +33,10 @@ import com.salesforce.apollo.stereotomy.mem.MemKERL; import com.salesforce.apollo.stereotomy.mem.MemKeyStore; import com.salesforce.apollo.stereotomy.services.proto.ProtoKERLAdapter; +import com.salesforce.apollo.test.proto.ByteMessage; +import com.salesforce.apollo.test.proto.TestItGrpc; +import com.salesforce.apollo.test.proto.TestItGrpc.TestItBlockingStub; +import com.salesforce.apollo.test.proto.TestItGrpc.TestItImplBase; import io.grpc.*; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator; @@ -69,9 +69,9 @@ public class DemesneSmoke { private final static Class clientChannelType = IMPL.getChannelType(); private static final Class serverChannelType = IMPL.getServerDomainSocketChannelClass(); - private final static Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private EventLoopGroup eventLoopGroup; + + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private EventLoopGroup eventLoopGroup; public static ClientInterceptor clientInterceptor(Digest ctx) { return new ClientInterceptor() { diff --git a/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java index a492e9a5f..11c2f2fd5 100644 --- a/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java +++ b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java @@ -125,7 +125,7 @@ protected void instantiate(SigningMember member, Context context) { final var url = String.format("jdbc:h2:mem:%s-%s;DB_CLOSE_ON_EXIT=FALSE", member.getId(), prefix); JdbcConnectionPool connectionPool = JdbcConnectionPool.create(url, "", ""); connectionPool.setMaxConnections(10); - var exec = Executors.newCachedThreadPool(Thread.ofVirtual().factory()); + var exec = Executors.newVirtualThreadPerTaskExecutor(); var router = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(2)); routers.put(member, router); dhts.put(member, diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index afb34c044..242cf2279 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -39,17 +39,16 @@ * @author hal.hildebrand */ public class Enclave implements RouterSupplier { - private final static Class channelType = IMPL.getChannelType(); - private static final Logger log = LoggerFactory.getLogger( - Enclave.class); - private final Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private final DomainSocketAddress bridge; - private final Consumer contextRegistration; - private final DomainSocketAddress endpoint; - private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); - private final Member from; - private final String fromString; + private final static Class channelType = IMPL.getChannelType(); + private static final Logger log = LoggerFactory.getLogger(Enclave.class); + + private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final DomainSocketAddress bridge; + private final Consumer contextRegistration; + private final DomainSocketAddress endpoint; + private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); + private final Member from; + private final String fromString; public Enclave(Member from, DomainSocketAddress endpoint, DomainSocketAddress bridge, Consumer contextRegistration) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java index 0c09a6d14..7047f2039 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java @@ -34,12 +34,13 @@ * @author hal.hildebrand */ public class LocalServer implements RouterSupplier { - private static final Logger log = LoggerFactory.getLogger(LocalServer.class); - private static final String NAME_TEMPLATE = "%s-%s"; - private final Executor clientExecutor = Executors.newCachedThreadPool(Thread.ofVirtual().factory()); - private final ClientInterceptor clientInterceptor; - private final Member from; - private final String prefix; + private static final Logger log = LoggerFactory.getLogger(LocalServer.class); + private static final String NAME_TEMPLATE = "%s-%s"; + private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + + private final ClientInterceptor clientInterceptor; + private final Member from; + private final String prefix; public LocalServer(String prefix, Member member) { this.from = member; @@ -73,7 +74,7 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • serverBuilder = InProcessServerBuilder.forName(name) - .executor(Executors.newCachedThreadPool()) + .executor(Executors.newVirtualThreadPerTaskExecutor()) .intercept(ConcurrencyLimitServerInterceptor.newBuilder( limitsBuilder.build()) .statusSupplier( @@ -93,7 +94,7 @@ public Digest getFrom() { private ManagedChannel connectTo(Member to) { final var name = String.format(NAME_TEMPLATE, prefix, qb64(to.getId())); final InProcessChannelBuilder builder = InProcessChannelBuilder.forName(name) - .executor(clientExecutor) + .executor(executor) .usePlaintext() .intercept(clientInterceptor); disableTrash(builder); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index 6ad3a5cdc..8d1eeff41 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -25,7 +25,7 @@ * @author hal.hildebrand */ public class MtlsClient { - private final Executor exec = Executors.newCachedThreadPool(Thread.ofVirtual().factory()); + private final Executor exec = Executors.newVirtualThreadPerTaskExecutor(); private final ManagedChannel channel; diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java index 8a2a42a5f..7ae6f4661 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java @@ -137,7 +137,7 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • { - private final static Class channelType = IMPL.getChannelType(); - private final Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private final String agent; - private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); - private final Demultiplexer inbound; - private final Duration keepAlive; - private final Demultiplexer outbound; + private final static Class channelType = IMPL.getChannelType(); + + private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final String agent; + private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); + private final Demultiplexer inbound; + private final Duration keepAlive; + private final Demultiplexer outbound; public Portal(Digest agent, ServerBuilder inbound, Function outbound, DomainSocketAddress bridge, Duration keepAlive, Function router) { diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java index b7da14d0c..6bbeb99e7 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java @@ -76,10 +76,8 @@ public void iterate(Digest digest, Runnable onMajority, BiFunction(); Thread.ofVirtual() - .factory() - .newThread( - () -> internalIterate(digest, onMajority, round, failedMajority, handler, onComplete, tally, traversed)) - .start(); + .start( + () -> internalIterate(digest, onMajority, round, failedMajority, handler, onComplete, tally, traversed)); } @@ -208,6 +206,7 @@ private void proceed(Digest key, final boolean allow, Runnable onMajority, Runna } private void schedule(Runnable proceed) { - scheduler.schedule(Utils.wrapped(proceed, log), frequency.toNanos(), TimeUnit.NANOSECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(proceed, log)), frequency.toNanos(), + TimeUnit.NANOSECONDS); } } diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java index 7869345a9..9d3d98749 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java @@ -53,7 +53,8 @@ public SliceIterator(String label, SigningMember member, List public void iterate(BiFunction round, SlicePredicateHandler handler, Runnable onComplete, ScheduledExecutorService scheduler, Duration frequency) { log.trace("Starting iteration of: <{}> on: {}", label, member.getId()); - internalIterate(round, handler, onComplete, scheduler, frequency); + Thread.ofVirtual() + .start(Utils.wrapped(() -> internalIterate(round, handler, onComplete, scheduler, frequency), log)); } public void iterate(BiFunction round, SlicePredicateHandler handler, @@ -120,7 +121,8 @@ private void proceed(final boolean allow, Runnable proceed, Runnable onComplete, } } else if (allow) { log.trace("Proceeding for: <{}> on: {}", label, member.getId()); - scheduler.schedule(Utils.wrapped(proceed, log), frequency.toNanos(), TimeUnit.NANOSECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(proceed, log)), frequency.toNanos(), + TimeUnit.NANOSECONDS); } else { log.trace("Termination for: <{}> on: {}", label, member.getId()); } diff --git a/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java b/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java index 017cff0c6..3d061fad3 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipeligo/DemultiplexerTest.java @@ -56,14 +56,14 @@ */ public class DemultiplexerTest { - private static final Class channelType = IMPL.getChannelType(); - private static final Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); - private final List opened = new ArrayList<>(); - private Server serverA; - private Server serverB; - private Demultiplexer terminus; + private static final Class channelType = IMPL.getChannelType(); + private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + + private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); + private final List opened = new ArrayList<>(); + private Server serverA; + private Server serverB; + private Demultiplexer terminus; @AfterEach public void after() throws InterruptedException { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java b/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java index 17bdf5de3..6aa17a496 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipeligo/EnclaveTest.java @@ -51,9 +51,9 @@ */ public class EnclaveTest { private final static Class channelType = IMPL.getChannelType(); - private static final Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private final TestItService local = new TestItService() { + private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + + private final TestItService local = new TestItService() { @Override public void close() throws IOException { @@ -69,7 +69,7 @@ public Any ping(Any request) { return null; } }; - private EventLoopGroup eventLoopGroup; + private EventLoopGroup eventLoopGroup; @AfterEach public void after() throws Exception { diff --git a/model/src/main/java/com/salesforce/apollo/model/Domain.java b/model/src/main/java/com/salesforce/apollo/model/Domain.java index a067676ac..87112adb7 100644 --- a/model/src/main/java/com/salesforce/apollo/model/Domain.java +++ b/model/src/main/java/com/salesforce/apollo/model/Domain.java @@ -60,16 +60,16 @@ * @author hal.hildebrand */ abstract public class Domain { - private static final Logger log = LoggerFactory.getLogger(Domain.class); - protected final Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - protected final CHOAM choam; - protected final ControlledIdentifierMember member; - protected final Mutator mutator; - protected final Oracle oracle; - protected final Parameters params; - protected final SqlStateMachine sqlStateMachine; - protected final Connection stateConnection; + private static final Logger log = LoggerFactory.getLogger(Domain.class); + + protected final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + protected final CHOAM choam; + protected final ControlledIdentifierMember member; + protected final Mutator mutator; + protected final Oracle oracle; + protected final Parameters params; + protected final SqlStateMachine sqlStateMachine; + protected final Connection stateConnection; public Domain(ControlledIdentifierMember member, Parameters.Builder params, String dbURL, Path checkpointBaseDir, RuntimeParameters.Builder runtime) { diff --git a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java index a9dde6296..d0583365e 100644 --- a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java +++ b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java @@ -70,16 +70,16 @@ public class DemesneImpl implements Demesne { private static final Duration DEFAULT_GOSSIP_INTERVAL = Duration.ofMillis(5); private static final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); private static final Logger log = LoggerFactory.getLogger(DemesneImpl.class); - private final static Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private final KERL kerl; - private final OuterContextClient outer; - private final DemesneParameters parameters; - private final AtomicBoolean started = new AtomicBoolean(); - private final Thoth thoth; - private final Context context; - private volatile SubDomain domain; - private volatile Enclave enclave; + + private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final KERL kerl; + private final OuterContextClient outer; + private final DemesneParameters parameters; + private final AtomicBoolean started = new AtomicBoolean(); + private final Thoth thoth; + private final Context context; + private volatile SubDomain domain; + private volatile Enclave enclave; public DemesneImpl(DemesneParameters parameters) throws GeneralSecurityException, IOException { assert parameters.hasContext() : "Must define context id"; diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index d8dbe0201..41eb9388e 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -39,7 +39,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -135,17 +134,14 @@ public void viewChange(Context context, Digest viewId, List started.get().countDown(), gossipDuration, Collections.emptyList(), - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); + .start(() -> started.get().countDown(), gossipDuration, Collections.emptyList()); assertTrue(started.get().await(10, TimeUnit.SECONDS), "Cannot start up kernel"); started.set(new CountDownLatch(CARDINALITY - 1)); domains.subList(1, domains.size()) .forEach(d -> Thread.ofVirtual() .start(() -> d.getFoundation() - .start(() -> started.get().countDown(), gossipDuration, seeds, - Executors.newScheduledThreadPool(1, Thread.ofVirtual() - .factory())))); + .start(() -> started.get().countDown(), gossipDuration, seeds))); assertTrue(started.get().await(30, TimeUnit.SECONDS), "could not start views"); assertTrue(countdown.await(30, TimeUnit.SECONDS), "Could not join all members in all views"); diff --git a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java index 1d289fc14..db0f5712d 100644 --- a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java @@ -76,9 +76,9 @@ public class DemesneTest { private final static Class clientChannelType = IMPL.getChannelType(); private static final Class serverChannelType = IMPL.getServerDomainSocketChannelClass(); - private final static Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private final TestItService local = new TestItService() { + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + + private final TestItService local = new TestItService() { @Override public void close() throws IOException { @@ -94,7 +94,7 @@ public Any ping(Any request) { return null; } }; - private EventLoopGroup eventLoopGroup; + private EventLoopGroup eventLoopGroup; public static ClientInterceptor clientInterceptor(Digest ctx) { return new ClientInterceptor() { diff --git a/model/src/test/resources/logback-test.xml b/model/src/test/resources/logback-test.xml index c760efb9f..ce3bc4518 100644 --- a/model/src/test/resources/logback-test.xml +++ b/model/src/test/resources/logback-test.xml @@ -57,7 +57,7 @@ - + diff --git a/pom.xml b/pom.xml index 5a84d083f..4ff5f455c 100644 --- a/pom.xml +++ b/pom.xml @@ -640,10 +640,6 @@ org.apache.maven.plugins maven-surefire-plugin - - 1 - false - org.apache.maven.plugins diff --git a/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java b/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java index 522449db1..157b2ef24 100644 --- a/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java +++ b/protocols/src/test/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiterTest.java @@ -16,7 +16,7 @@ public class LifoBlockingLimiterTest { - private final Executor executor = Executors.newCachedThreadPool(Thread.ofVirtual().factory()); + private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); private LifoBlockingLimiter blockingLimiter; private SettableLimit limit; private SimpleLimiter simpleLimiter; diff --git a/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsClient.java b/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsClient.java index 8489843d2..d3bc3f599 100644 --- a/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsClient.java +++ b/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsClient.java @@ -28,7 +28,7 @@ * @author hal.hildebrand */ public class MtlsClient { - private static final Executor exec = Executors.newCachedThreadPool(Thread.ofVirtual().factory()); + private static final Executor exec = Executors.newVirtualThreadPerTaskExecutor(); private final ManagedChannel channel; public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, ClientContextSupplier supplier, diff --git a/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsServer.java b/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsServer.java index 33f6174e7..dd6900baf 100644 --- a/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsServer.java +++ b/protocols/src/test/java/com/salesforce/apollo/comm/grpc/MtlsServer.java @@ -67,7 +67,7 @@ public Digest load(X509Certificate key) throws Exception { .withChildOption(ChannelOption.TCP_NODELAY, true) .intercept(interceptor) .intercept(EnableCompressionInterceptor.SINGLETON); - builder.executor(Executors.newCachedThreadPool(Thread.ofVirtual().factory())); + builder.executor(Executors.newVirtualThreadPerTaskExecutor()); server = builder.build(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java b/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java index 14f66760e..da88d2fd1 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java @@ -21,21 +21,20 @@ import java.util.function.Supplier; class Transactioneer { - private final static Random entropy = new Random(); - private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); - private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, - Thread.ofVirtual() - .factory()); - private final Executor executor = Executors.newCachedThreadPool( - Thread.ofVirtual().factory()); - private final AtomicInteger completed = new AtomicInteger(); - private final CountDownLatch countdown; - private final AtomicReference inFlight = new AtomicReference<>(); - private final int max; - private final Mutator mutator; - private final Duration timeout; - private final Supplier update; - private final AtomicBoolean finished = new AtomicBoolean(); + private final static Random entropy = new Random(); + private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); + private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual() + .factory()); + + private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final AtomicInteger completed = new AtomicInteger(); + private final CountDownLatch countdown; + private final AtomicReference inFlight = new AtomicReference<>(); + private final int max; + private final Mutator mutator; + private final Duration timeout; + private final Supplier update; + private final AtomicBoolean finished = new AtomicBoolean(); public Transactioneer(Supplier update, Mutator mutator, Duration timeout, int max, CountDownLatch countdown) { this.update = update;