From 6a19047a5756d30d80675ef8fbde0d0461402754 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Thu, 6 Jun 2024 07:56:19 -0700 Subject: [PATCH] Believe the view Reconfiguration is now fixed. Significant work on join protocol. Added new Chillin' state to ensure we get full membership through slow joiners. --- choam/pom.xml | 11 - .../com/salesforce/apollo/choam/CHOAM.java | 204 +++++++++++------- .../salesforce/apollo/choam/ViewAssembly.java | 41 ++-- .../apollo/choam/comm/Terminal.java | 11 +- .../apollo/choam/comm/TerminalClient.java | 7 +- .../apollo/choam/fsm/Reconfiguration.java | 42 +++- .../salesforce/apollo/choam/DynamicTest.java | 4 +- .../apollo/choam/MembershipTests.java | 4 +- .../salesforce/apollo/choam/TestCHOAM.java | 8 +- choam/src/test/resources/logback-test.xml | 2 +- .../apollo/fireflies/ChurnTest.java | 5 +- .../apollo/fireflies/SwarmTest.java | 5 +- .../messaging/rbc/ReliableBroadcaster.java | 4 +- model/pom.xml | 7 - .../apollo/model/ContainmentDomainTest.java | 9 +- .../salesforce/apollo/model/DomainTest.java | 9 +- .../apollo/model/FireFliesTest.java | 9 +- pom.xml | 2 +- .../apollo/state/AbstractLifecycleTest.java | 5 +- .../salesforce/apollo/state/CHOAMTest.java | 5 +- 20 files changed, 239 insertions(+), 155 deletions(-) diff --git a/choam/pom.xml b/choam/pom.xml index 6ee71890c..d4c07c80e 100644 --- a/choam/pom.xml +++ b/choam/pom.xml @@ -53,15 +53,4 @@ test - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 449305faf..c50ed31b1 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -7,6 +7,7 @@ package com.salesforce.apollo.choam; import com.chiralbehaviors.tron.Fsm; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.InvalidProtocolBufferException; @@ -37,6 +38,7 @@ import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder; import com.salesforce.apollo.utils.Utils; import io.grpc.StatusRuntimeException; +import io.netty.util.concurrent.ImmediateExecutor; import org.h2.mvstore.MVMap; import org.joou.ULong; import org.slf4j.Logger; @@ -73,30 +75,31 @@ public class CHOAM { private static final Logger log = LoggerFactory.getLogger(CHOAM.class); - private final Map cachedCheckpoints = new ConcurrentHashMap<>(); - private final AtomicReference checkpoint = new AtomicReference<>(); - private final ReliableBroadcaster combine; - private final CommonCommunications comm; - private final AtomicReference current = new AtomicReference<>(); - private final ExecutorService executions; - private final AtomicReference> futureBootstrap = new AtomicReference<>(); - private final AtomicReference> futureSynchronization = new AtomicReference<>(); - private final AtomicReference genesis = new AtomicReference<>(); - private final AtomicReference head = new AtomicReference<>(); - private final ExecutorService linear; - private final AtomicReference next = new AtomicReference<>(); - private final AtomicReference nextViewId = new AtomicReference<>(); - private final Parameters params; - private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); - private final RoundScheduler roundScheduler; - private final Session session; - private final AtomicBoolean started = new AtomicBoolean(); - private final Store store; - private final CommonCommunications submissionComm; - private final Combine.Transitions transitions; - private final TransSubmission txnSubmission = new TransSubmission(); - private final AtomicReference view = new AtomicReference<>(); - private final PendingViews pendingViews = new PendingViews(); + private final Map cachedCheckpoints = new ConcurrentHashMap<>(); + private final AtomicReference checkpoint = new AtomicReference<>(); + private final ReliableBroadcaster combine; + private final CommonCommunications comm; + private final AtomicReference current = new AtomicReference<>(); + private final ExecutorService executions; + private final AtomicReference> futureBootstrap = new AtomicReference<>(); + private final AtomicReference> futureSynchronization = new AtomicReference<>(); + private final AtomicReference genesis = new AtomicReference<>(); + private final AtomicReference head = new AtomicReference<>(); + private final ExecutorService linear; + private final AtomicReference next = new AtomicReference<>(); + private final AtomicReference nextViewId = new AtomicReference<>(); + private final Parameters params; + private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); + private final RoundScheduler roundScheduler; + private final Session session; + private final AtomicBoolean started = new AtomicBoolean(); + private final Store store; + private final CommonCommunications submissionComm; + private final Combine.Transitions transitions; + private final TransSubmission txnSubmission = new TransSubmission(); + private final AtomicReference view = new AtomicReference<>(); + private final PendingViews pendingViews = new PendingViews(); + private volatile AtomicBoolean ongoingJoin; public CHOAM(Parameters params) { this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); @@ -367,9 +370,15 @@ public void stop() { } final var c = current.get(); if (c != null) { - c.complete(); + try { + c.complete(); + } catch (Throwable e) { + } + } + try { + combine.stop(); + } catch (Throwable e) { } - combine.stop(); } private void accept(HashedCertifiedBlock next) { @@ -733,6 +742,12 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) { } else { current.set(new Client(validators, getViewId())); } + final var oj = ongoingJoin; + ongoingJoin = null; + if (oj != null) { + log.trace("Halting ongoing join on: {}", params.member().getId()); + oj.set(true); + } log.info("Reconfigured to view: {} committee: {} validators: {} on: {}", new Digest(reconfigure.getId()), current.get().getClass().getSimpleName(), validators.entrySet() .stream() @@ -1263,6 +1278,7 @@ public Blocks fetchViewChain(BlockReplication request, Digest from) { @Override public Empty join(SignedViewMember nextView, Digest from) { + log.trace("Member: {} joining on: {}", from, params.member().getId()); CHOAM.this.join(nextView, from); return Empty.getDefaultInstance(); } @@ -1275,10 +1291,9 @@ public Initial sync(Synchronize request, Digest from) { /** abstract class to maintain the common state */ private abstract class Administration implements Committee { - protected final Digest viewId; - private final GroupIterator servers; - private final Map validators; - private volatile JoinState ongoingJoin; + protected final Digest viewId; + private final GroupIterator servers; + private final Map validators; public Administration(Map validators, Digest viewId) { this.validators = validators; @@ -1288,12 +1303,6 @@ public Administration(Map validators, Digest viewId) { @Override public void accept(HashedCertifiedBlock hb) { - final var oj = ongoingJoin; - ongoingJoin = null; - if (oj != null) { - oj.halt.set(true); - oj.joining.interrupt(); - } process(); } @@ -1386,28 +1395,44 @@ private void join(View view) { if (ongoingJoin != null) { throw new IllegalStateException("Ongoing join should have been cancelled"); } - log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), - params.member().getId()); + log.trace("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); var servers = new ConcurrentSkipListSet<>(validators.keySet()); var joined = new AtomicInteger(); var halt = new AtomicBoolean(false); - - ongoingJoin = new JoinState(halt, Thread.ofVirtual().start(Utils.wrapped(() -> { - log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + ongoingJoin = halt; + Thread.ofVirtual().start(Utils.wrapped(() -> { + log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), params.member().getId()); - while (!halt.get() & joined.get() < view.getMajority()) { - join(view, servers, joined); - } - log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), - params.member().getId()); - ongoingJoin = null; - }, log()))); - } - private void join(View view, Collection servers, AtomicInteger joined) { + var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + AtomicReference action = new AtomicReference<>(); + var attempts = new AtomicInteger(); + action.set(() -> { + log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(), + halt.get(), joined.get(), view.getMajority(), params.member().getId()); + if (!halt.get() & joined.get() < view.getMajority()) { + join(view, servers, joined); + if (joined.get() >= view.getMajority()) { + ongoingJoin = null; + log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), + Digest.from(view.getDiadem()), joined.get(), params.member().getId()); + } else if (!halt.get()) { + log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), + Digest.from(view.getDiadem()), joined.get(), params.member().getId()); + scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); + } + } + }); + scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); + }, log())); + } + private void join(View view, Collection members, AtomicInteger joined) { + var sampled = new ArrayList<>(members); + Collections.shuffle(sampled); log.trace("Joining view: {} diadem: {} servers: {} on: {}", viewId, Digest.from(view.getDiadem()), - servers.stream().map(Member::getId).toList(), params.member().getId()); + sampled.stream().map(Member::getId).toList(), params.member().getId()); final var c = next.get(); var inView = ViewMember.newBuilder(c.member) .setDiadem(view.getDiadem()) @@ -1417,34 +1442,69 @@ private void join(View view, Collection servers, AtomicInteger joined) { .setVm(inView) .setSignature(params.member().sign(inView.toByteString()).toSig()) .build(); - var countdown = new CountDownLatch(servers.size()); - - servers.stream().map(comm::connect).filter(Objects::nonNull).forEach(t -> { - Thread.ofVirtual().start(Utils.wrapped(() -> { - try { - t.join(svm); - servers.remove(t.getMember()); - joined.incrementAndGet(); - countdown.countDown(); - log.trace("Joined with: {} view: {} diadem: {} on: {}", t.getMember().getId(), viewId, - Digest.from(view.getDiadem()), params.member().getId()); - } catch (StatusRuntimeException sre) { - log.trace("Failed join attempt: {} with: {} view: {} diadem: {} on: {}", sre.getStatus(), - t.getMember().getId(), nextViewId, Digest.from(view.getDiadem()), - params.member().getId(), sre); - } catch (Throwable throwable) { - log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", t.getMember().getId(), - nextViewId, Digest.from(view.getDiadem()), params.member().getId(), throwable); - } - }, log())); + var countdown = new CountDownLatch(sampled.size()); + sampled.stream().map(m -> { + var connection = comm.connect(m); + log.trace("connect to: {} is: {} on: {}", m.getId(), connection, params.member().getId()); + return connection; + }).map(t -> t == null ? null : join(view, t, svm)).forEach(t -> { + if (t == null) { + countdown.countDown(); + } else { + t.fs.addListener(() -> { + try { + t.fs.get(); + members.remove(t.m); + joined.incrementAndGet(); + log.trace("Joined with: {} view: {} diadem: {} on: {}", t.m.getId(), + Digest.from(inView.getId()), Digest.from(view.getDiadem()), + params.member().getId()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Failed to join with: {} view: {} diadem: {} on: {}", t.m.getId(), viewId, + Digest.from(view.getDiadem()), params.member().getId(), e.getCause()); + } catch (Throwable e) { + log.error("Failed to join with: {} view: {} diadem: {} on: {}", t.m.getId(), viewId, + Digest.from(view.getDiadem()), params.member().getId(), e); + } finally { + countdown.countDown(); + } + }, ImmediateExecutor.INSTANCE); + } }); try { - countdown.await(2, TimeUnit.SECONDS); + countdown.await(5, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } + private Attempt join(View view, Terminal t, SignedViewMember svm) { + try { + log.trace("Attempting to join with: {} context: {} diadem: {} on: {}", t.getMember().getId(), + context().getId(), Digest.from(view.getDiadem()), params.member().getId()); + return new Attempt(t.getMember(), t.join(svm)); + } catch (StatusRuntimeException sre) { + log.trace("Failed join attempt: {} with: {} view: {} diadem: {} on: {}", sre.getStatus(), + t.getMember().getId(), nextViewId, Digest.from(view.getDiadem()), params.member().getId(), + sre); + } catch (Throwable throwable) { + log.error("Failed join attempt with: {} view: {} diadem: {} on: {}", t.getMember().getId(), nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), throwable); + } finally { + try { + t.close(); + } catch (IOException e) { + // ignored + } + } + return null; + } + + record Attempt(Member m, ListenableFuture fs) { + } + private record JoinState(AtomicBoolean halt, Thread joining) { } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index d922991fb..2e1d5cb11 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -83,9 +83,6 @@ public Map getSlate() { public void joined(SignedViewMember viewMember) { final var mid = Digest.from(viewMember.getVm().getId()); - if (!validate(mid, viewMember)) { - return; - } joins.put(mid, SignedJoin.newBuilder() .setMember(params().member().getId().toDigeste()) .setJoin(viewMember) @@ -219,7 +216,7 @@ void join(List joins) { proposals.put(mid, svm); } } - checkAssembly(); + transitions.checkAssembly(); } void newEpoch() { @@ -237,15 +234,15 @@ private Map assemblyOf(List committee) { .collect(Collectors.toMap(Member::getId, m -> m)); } - private void checkAssembly() { + private boolean checkAssembly() { if (selected == null) { - return; + return false; } if (proposals.size() == selected.majority) { transitions.certified(); - } else if (proposals.size() >= selected.majority) { - transitions.gathered(); + return true; } + return false; } private Parameters params() { @@ -405,24 +402,40 @@ private class Recon implements Reconfiguration { @Override public void certify() { if (proposals.size() == selected.majority) { - log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, - nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); + log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, + nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.certified(); } else { - countdown.set(3); - log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, - proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); + countdown.set(4); + log.info("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, + proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); } } public void checkAssembly() { - ViewAssembly.this.checkAssembly(); + if (ViewAssembly.this.checkAssembly()) { + return; + } + if (proposals.size() >= selected.majority) { + transitions.chill(); + } else { + log.info("Check assembly: {} on: {}", proposals.size(), params().member().getId()); + } } public void checkViews() { vote(); } + @Override + public void chill() { + if (ViewAssembly.this.checkAssembly()) { + transitions.certified(); + } else { + countdown.set(2); + } + } + @Override public void complete() { ViewAssembly.this.complete(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java index 394f4664d..080aea7c8 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java @@ -6,6 +6,8 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.Link; import com.salesforce.apollo.choam.proto.*; @@ -47,8 +49,11 @@ public Member getMember() { } @Override - public Empty join(SignedViewMember join) { - return service.join(join, member.getId()); + public ListenableFuture join(SignedViewMember join) { + var j = service.join(join, member.getId()); + SettableFuture sf = SettableFuture.create(); + sf.set(j); + return sf; } @Override @@ -64,7 +69,7 @@ public Initial sync(Synchronize sync) { Blocks fetchViewChain(BlockReplication replication); - Empty join(SignedViewMember join); + ListenableFuture join(SignedViewMember join); Initial sync(Synchronize sync); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java index 26ee81aa0..48fe85f76 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java @@ -6,6 +6,7 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; @@ -20,12 +21,14 @@ public class TerminalClient implements Terminal { private final ManagedServerChannel channel; private final TerminalGrpc.TerminalBlockingStub client; + private final TerminalGrpc.TerminalFutureStub asyncClient; @SuppressWarnings("unused") private final ChoamMetrics metrics; public TerminalClient(ManagedServerChannel channel, ChoamMetrics metrics) { this.channel = channel; this.client = channel.wrap(TerminalGrpc.newBlockingStub(channel)); + this.asyncClient = channel.wrap(TerminalGrpc.newFutureStub(channel)); this.metrics = metrics; } @@ -60,8 +63,8 @@ public Member getMember() { } @Override - public Empty join(SignedViewMember vm) { - return client.join(vm); + public ListenableFuture join(SignedViewMember vm) { + return asyncClient.join(vm); } public void release() { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index 01146befd..246daa51a 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -19,6 +19,8 @@ public interface Reconfiguration { void checkViews(); + void chill(); + void complete(); void failed(); @@ -60,10 +62,9 @@ public void certify() { context().certify(); } }, GATHER { - // We have a majority of the new committee Joins @Override - public Transitions gathered() { - return CERTIFICATION; + public Transitions chill() { + return CHILLIN; } // We have a full complement of the new committee Joins @@ -77,6 +78,29 @@ public Transitions certified() { public void gather() { context().checkAssembly(); } + + @Override + public Transitions checkAssembly() { + context().checkAssembly(); + return null; + } + }, CHILLIN { + @Override + public Transitions countdownCompleted() { + return certified(); + } + + // We have what we have + @Override + public Transitions certified() { + return CERTIFICATION; + } + + // Check to see if we already have a full complement of committee Joins + @Entry + public void chillin() { + context().chill(); + } }, PROTOCOL_FAILURE { @Override public Transitions certified() { @@ -139,6 +163,14 @@ default Transitions certified() { throw fsm().invalidTransitionOn(); } + default Transitions checkAssembly() { + throw fsm().invalidTransitionOn(); + } + + default Transitions chill() { + throw fsm().invalidTransitionOn(); + } + default Transitions complete() { throw fsm().invalidTransitionOn(); } @@ -151,10 +183,6 @@ default Transitions failed() { return Reconfigure.PROTOCOL_FAILURE; } - default Transitions gathered() { - throw fsm().invalidTransitionOn(); - } - default Transitions proposed() { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index ebd508af6..fe29a7e45 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -3,7 +3,6 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; @@ -64,10 +63,9 @@ public void setUp() throws Exception { .toList(); final var prefix = UUID.randomUUID().toString(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); routers = members.stream() .collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router( - ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor))); + ServerConnectionCache.newBuilder().setTarget(cardinality * 2)))); var template = Parameters.newBuilder() .setGenerateGenesis(true) diff --git a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java index febe8f415..effd854f5 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java @@ -9,7 +9,6 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters.BootstrapParameters; import com.salesforce.apollo.choam.Parameters.ProducerParameters; @@ -171,10 +170,9 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws SigningMember testSubject = new ControlledIdentifierMember(stereotomy.newIdentifier()); final var prefix = UUID.randomUUID().toString(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); routers = members.stream() .collect(Collectors.toMap(Member::getId, m -> new LocalServer(prefix, m).router( - ServerConnectionCache.newBuilder().setTarget(cardinality), executor))); + ServerConnectionCache.newBuilder().setTarget(cardinality)))); routers.put(testSubject.getId(), new LocalServer(prefix, testSubject).router( ServerConnectionCache.newBuilder().setTarget(cardinality))); choams = new HashMap<>(); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 66d941748..391706bf2 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -8,7 +8,10 @@ import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -121,12 +124,11 @@ public void before() throws Exception { .toList(); var context = new StaticContext<>(origin, 0.2, members, 3); final var prefix = UUID.randomUUID().toString(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); routers = members.stream() .collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router( ServerConnectionCache.newBuilder() .setMetrics(new ServerConnectionCacheMetricsImpl(registry)) - .setTarget(CARDINALITY), executor))); + .setTarget(CARDINALITY)))); choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { var recording = new AtomicInteger(); blocks.put(m.getId(), recording); diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index 9f1825d70..d5be6a052 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -33,7 +33,7 @@ - + diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index dcfc99b4f..9825e45f6 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -273,7 +273,6 @@ private void initialize() { AtomicBoolean frist = new AtomicBoolean(true); final var prefix = UUID.randomUUID().toString(); final var gatewayPrefix = UUID.randomUUID().toString(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); views = members.values().stream().map(node -> { DynamicContext context = ctxBuilder.build(); FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(), @@ -289,8 +288,8 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry : registry)), - executor); + ? node0Registry + : registry))); comms.start(); communications.add(comms); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index b18c32a3e..0e4f93dc6 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -226,7 +226,6 @@ private void initialize() { AtomicBoolean frist = new AtomicBoolean(true); final var prefix = UUID.randomUUID().toString(); final var gatewayPrefix = UUID.randomUUID().toString(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); views = members.values().stream().map(node -> { DynamicContext context = ctxBuilder.build(); FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(), @@ -242,8 +241,8 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry : registry)), - executor); + ? node0Registry + : registry))); comms.start(); communications.add(comms); diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 290f5299a..a4b9f149f 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -328,10 +328,10 @@ public static Parameters.Builder newBuilder() { public static class Builder implements Cloneable { private int bufferSize = 1500; private int dedupBufferSize = 100; - private double dedupFpr = Math.pow(10, -9); + private double dedupFpr = Math.pow(10, -6); private int deliveredCacheSize = 100; private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT; - private double falsePositiveRate = 0.00125; + private double falsePositiveRate = 0.0000125; private int maxMessages = 500; public Parameters build() { diff --git a/model/pom.xml b/model/pom.xml index 2db95ab73..3b9acd54a 100644 --- a/model/pom.xml +++ b/model/pom.xml @@ -76,13 +76,6 @@ - - org.apache.maven.plugins - maven-surefire-plugin - - false - - org.apache.maven.plugins maven-antrun-plugin diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index 8df39f7d0..6494cbecb 100644 --- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -6,7 +6,10 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.EndpointProvider; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -79,11 +82,9 @@ public void before() throws Exception { var sealed = FoundationSeal.newBuilder().build(); final var group = DigestAlgorithm.DEFAULT.getOrigin(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 3cb29dd42..dd5f0503f 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -6,7 +6,10 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.EndpointProvider; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -235,11 +238,9 @@ public void before() throws Exception { var sealed = FoundationSeal.newBuilder().build(); final var group = DigestAlgorithm.DEFAULT.getOrigin(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index c6a48aa41..beb936b33 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -6,7 +6,10 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.*; +import com.salesforce.apollo.archipelago.EndpointProvider; +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -78,12 +81,10 @@ public void before() throws Exception { Digest group = DigestAlgorithm.DEFAULT.getOrigin(); var sealed = FoundationSeal.newBuilder().build(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); identities.forEach((digest, id) -> { var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3); final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5), "jdbc:h2:mem:%s-state".formatted(digest), diff --git a/pom.xml b/pom.xml index 9e386ff13..416caaf06 100644 --- a/pom.xml +++ b/pom.xml @@ -783,7 +783,7 @@ 3.2.5 ${forks} - false + true -Xmx10G -Xms100M -Djdk.tracePinnedThreads=full diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index cc2d78819..028f4b67b 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -9,7 +9,6 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.BootstrapParameters; @@ -158,10 +157,8 @@ public void before() throws Exception { members.stream().filter(s -> s != testSubject).forEach(s -> context.activate(s)); final var prefix = UUID.randomUUID().toString(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); return localRouter; })); routers.put(testSubject.getId(), diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index e3a526a70..e8cb71761 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -11,7 +11,6 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters; @@ -155,10 +154,8 @@ public void before() throws Exception { }).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (SigningMember) e).toList(); members.forEach(m -> context.activate(m)); final var prefix = UUID.randomUUID().toString(); - var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), - executor); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); return localRouter; })); choams = members.stream()