diff --git a/choam/pom.xml b/choam/pom.xml
index 6ee71890c..d4c07c80e 100644
--- a/choam/pom.xml
+++ b/choam/pom.xml
@@ -53,15 +53,4 @@
test
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
- false
-
-
-
-
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
index 449305faf..c50ed31b1 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
@@ -7,6 +7,7 @@
package com.salesforce.apollo.choam;
import com.chiralbehaviors.tron.Fsm;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -37,6 +38,7 @@
import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder;
import com.salesforce.apollo.utils.Utils;
import io.grpc.StatusRuntimeException;
+import io.netty.util.concurrent.ImmediateExecutor;
import org.h2.mvstore.MVMap;
import org.joou.ULong;
import org.slf4j.Logger;
@@ -73,30 +75,31 @@
public class CHOAM {
private static final Logger log = LoggerFactory.getLogger(CHOAM.class);
- private final Map cachedCheckpoints = new ConcurrentHashMap<>();
- private final AtomicReference checkpoint = new AtomicReference<>();
- private final ReliableBroadcaster combine;
- private final CommonCommunications comm;
- private final AtomicReference current = new AtomicReference<>();
- private final ExecutorService executions;
- private final AtomicReference> futureBootstrap = new AtomicReference<>();
- private final AtomicReference> futureSynchronization = new AtomicReference<>();
- private final AtomicReference genesis = new AtomicReference<>();
- private final AtomicReference head = new AtomicReference<>();
- private final ExecutorService linear;
- private final AtomicReference next = new AtomicReference<>();
- private final AtomicReference nextViewId = new AtomicReference<>();
- private final Parameters params;
- private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>();
- private final RoundScheduler roundScheduler;
- private final Session session;
- private final AtomicBoolean started = new AtomicBoolean();
- private final Store store;
- private final CommonCommunications submissionComm;
- private final Combine.Transitions transitions;
- private final TransSubmission txnSubmission = new TransSubmission();
- private final AtomicReference view = new AtomicReference<>();
- private final PendingViews pendingViews = new PendingViews();
+ private final Map cachedCheckpoints = new ConcurrentHashMap<>();
+ private final AtomicReference checkpoint = new AtomicReference<>();
+ private final ReliableBroadcaster combine;
+ private final CommonCommunications comm;
+ private final AtomicReference current = new AtomicReference<>();
+ private final ExecutorService executions;
+ private final AtomicReference> futureBootstrap = new AtomicReference<>();
+ private final AtomicReference> futureSynchronization = new AtomicReference<>();
+ private final AtomicReference genesis = new AtomicReference<>();
+ private final AtomicReference head = new AtomicReference<>();
+ private final ExecutorService linear;
+ private final AtomicReference next = new AtomicReference<>();
+ private final AtomicReference nextViewId = new AtomicReference<>();
+ private final Parameters params;
+ private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>();
+ private final RoundScheduler roundScheduler;
+ private final Session session;
+ private final AtomicBoolean started = new AtomicBoolean();
+ private final Store store;
+ private final CommonCommunications submissionComm;
+ private final Combine.Transitions transitions;
+ private final TransSubmission txnSubmission = new TransSubmission();
+ private final AtomicReference view = new AtomicReference<>();
+ private final PendingViews pendingViews = new PendingViews();
+ private volatile AtomicBoolean ongoingJoin;
public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
@@ -367,9 +370,15 @@ public void stop() {
}
final var c = current.get();
if (c != null) {
- c.complete();
+ try {
+ c.complete();
+ } catch (Throwable e) {
+ }
+ }
+ try {
+ combine.stop();
+ } catch (Throwable e) {
}
- combine.stop();
}
private void accept(HashedCertifiedBlock next) {
@@ -733,6 +742,12 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) {
} else {
current.set(new Client(validators, getViewId()));
}
+ final var oj = ongoingJoin;
+ ongoingJoin = null;
+ if (oj != null) {
+ log.trace("Halting ongoing join on: {}", params.member().getId());
+ oj.set(true);
+ }
log.info("Reconfigured to view: {} committee: {} validators: {} on: {}", new Digest(reconfigure.getId()),
current.get().getClass().getSimpleName(), validators.entrySet()
.stream()
@@ -1263,6 +1278,7 @@ public Blocks fetchViewChain(BlockReplication request, Digest from) {
@Override
public Empty join(SignedViewMember nextView, Digest from) {
+ log.trace("Member: {} joining on: {}", from, params.member().getId());
CHOAM.this.join(nextView, from);
return Empty.getDefaultInstance();
}
@@ -1275,10 +1291,9 @@ public Initial sync(Synchronize request, Digest from) {
/** abstract class to maintain the common state */
private abstract class Administration implements Committee {
- protected final Digest viewId;
- private final GroupIterator servers;
- private final Map validators;
- private volatile JoinState ongoingJoin;
+ protected final Digest viewId;
+ private final GroupIterator servers;
+ private final Map validators;
public Administration(Map validators, Digest viewId) {
this.validators = validators;
@@ -1288,12 +1303,6 @@ public Administration(Map validators, Digest viewId) {
@Override
public void accept(HashedCertifiedBlock hb) {
- final var oj = ongoingJoin;
- ongoingJoin = null;
- if (oj != null) {
- oj.halt.set(true);
- oj.joining.interrupt();
- }
process();
}
@@ -1386,28 +1395,44 @@ private void join(View view) {
if (ongoingJoin != null) {
throw new IllegalStateException("Ongoing join should have been cancelled");
}
- log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
- params.member().getId());
+ log.trace("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
+ params.member().getId());
var servers = new ConcurrentSkipListSet<>(validators.keySet());
var joined = new AtomicInteger();
var halt = new AtomicBoolean(false);
-
- ongoingJoin = new JoinState(halt, Thread.ofVirtual().start(Utils.wrapped(() -> {
- log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
+ ongoingJoin = halt;
+ Thread.ofVirtual().start(Utils.wrapped(() -> {
+ log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
- while (!halt.get() & joined.get() < view.getMajority()) {
- join(view, servers, joined);
- }
- log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
- params.member().getId());
- ongoingJoin = null;
- }, log())));
- }
- private void join(View view, Collection servers, AtomicInteger joined) {
+ var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory());
+ AtomicReference action = new AtomicReference<>();
+ var attempts = new AtomicInteger();
+ action.set(() -> {
+ log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(),
+ halt.get(), joined.get(), view.getMajority(), params.member().getId());
+ if (!halt.get() & joined.get() < view.getMajority()) {
+ join(view, servers, joined);
+ if (joined.get() >= view.getMajority()) {
+ ongoingJoin = null;
+ log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
+ Digest.from(view.getDiadem()), joined.get(), params.member().getId());
+ } else if (!halt.get()) {
+ log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
+ Digest.from(view.getDiadem()), joined.get(), params.member().getId());
+ scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS);
+ }
+ }
+ });
+ scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS);
+ }, log()));
+ }
+ private void join(View view, Collection members, AtomicInteger joined) {
+ var sampled = new ArrayList<>(members);
+ Collections.shuffle(sampled);
log.trace("Joining view: {} diadem: {} servers: {} on: {}", viewId, Digest.from(view.getDiadem()),
- servers.stream().map(Member::getId).toList(), params.member().getId());
+ sampled.stream().map(Member::getId).toList(), params.member().getId());
final var c = next.get();
var inView = ViewMember.newBuilder(c.member)
.setDiadem(view.getDiadem())
@@ -1417,34 +1442,69 @@ private void join(View view, Collection servers, AtomicInteger joined) {
.setVm(inView)
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
- var countdown = new CountDownLatch(servers.size());
-
- servers.stream().map(comm::connect).filter(Objects::nonNull).forEach(t -> {
- Thread.ofVirtual().start(Utils.wrapped(() -> {
- try {
- t.join(svm);
- servers.remove(t.getMember());
- joined.incrementAndGet();
- countdown.countDown();
- log.trace("Joined with: {} view: {} diadem: {} on: {}", t.getMember().getId(), viewId,
- Digest.from(view.getDiadem()), params.member().getId());
- } catch (StatusRuntimeException sre) {
- log.trace("Failed join attempt: {} with: {} view: {} diadem: {} on: {}", sre.getStatus(),
- t.getMember().getId(), nextViewId, Digest.from(view.getDiadem()),
- params.member().getId(), sre);
- } catch (Throwable throwable) {
- log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", t.getMember().getId(),
- nextViewId, Digest.from(view.getDiadem()), params.member().getId(), throwable);
- }
- }, log()));
+ var countdown = new CountDownLatch(sampled.size());
+ sampled.stream().map(m -> {
+ var connection = comm.connect(m);
+ log.trace("connect to: {} is: {} on: {}", m.getId(), connection, params.member().getId());
+ return connection;
+ }).map(t -> t == null ? null : join(view, t, svm)).forEach(t -> {
+ if (t == null) {
+ countdown.countDown();
+ } else {
+ t.fs.addListener(() -> {
+ try {
+ t.fs.get();
+ members.remove(t.m);
+ joined.incrementAndGet();
+ log.trace("Joined with: {} view: {} diadem: {} on: {}", t.m.getId(),
+ Digest.from(inView.getId()), Digest.from(view.getDiadem()),
+ params.member().getId());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Failed to join with: {} view: {} diadem: {} on: {}", t.m.getId(), viewId,
+ Digest.from(view.getDiadem()), params.member().getId(), e.getCause());
+ } catch (Throwable e) {
+ log.error("Failed to join with: {} view: {} diadem: {} on: {}", t.m.getId(), viewId,
+ Digest.from(view.getDiadem()), params.member().getId(), e);
+ } finally {
+ countdown.countDown();
+ }
+ }, ImmediateExecutor.INSTANCE);
+ }
});
try {
- countdown.await(2, TimeUnit.SECONDS);
+ countdown.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
+ private Attempt join(View view, Terminal t, SignedViewMember svm) {
+ try {
+ log.trace("Attempting to join with: {} context: {} diadem: {} on: {}", t.getMember().getId(),
+ context().getId(), Digest.from(view.getDiadem()), params.member().getId());
+ return new Attempt(t.getMember(), t.join(svm));
+ } catch (StatusRuntimeException sre) {
+ log.trace("Failed join attempt: {} with: {} view: {} diadem: {} on: {}", sre.getStatus(),
+ t.getMember().getId(), nextViewId, Digest.from(view.getDiadem()), params.member().getId(),
+ sre);
+ } catch (Throwable throwable) {
+ log.error("Failed join attempt with: {} view: {} diadem: {} on: {}", t.getMember().getId(), nextViewId,
+ Digest.from(view.getDiadem()), params.member().getId(), throwable);
+ } finally {
+ try {
+ t.close();
+ } catch (IOException e) {
+ // ignored
+ }
+ }
+ return null;
+ }
+
+ record Attempt(Member m, ListenableFuture fs) {
+ }
+
private record JoinState(AtomicBoolean halt, Thread joining) {
}
}
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
index d922991fb..2e1d5cb11 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
@@ -83,9 +83,6 @@ public Map getSlate() {
public void joined(SignedViewMember viewMember) {
final var mid = Digest.from(viewMember.getVm().getId());
- if (!validate(mid, viewMember)) {
- return;
- }
joins.put(mid, SignedJoin.newBuilder()
.setMember(params().member().getId().toDigeste())
.setJoin(viewMember)
@@ -219,7 +216,7 @@ void join(List joins) {
proposals.put(mid, svm);
}
}
- checkAssembly();
+ transitions.checkAssembly();
}
void newEpoch() {
@@ -237,15 +234,15 @@ private Map assemblyOf(List committee) {
.collect(Collectors.toMap(Member::getId, m -> m));
}
- private void checkAssembly() {
+ private boolean checkAssembly() {
if (selected == null) {
- return;
+ return false;
}
if (proposals.size() == selected.majority) {
transitions.certified();
- } else if (proposals.size() >= selected.majority) {
- transitions.gathered();
+ return true;
}
+ return false;
}
private Parameters params() {
@@ -405,24 +402,40 @@ private class Recon implements Reconfiguration {
@Override
public void certify() {
if (proposals.size() == selected.majority) {
- log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
- nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
+ log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
+ nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
transitions.certified();
} else {
- countdown.set(3);
- log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority,
- proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId());
+ countdown.set(4);
+ log.info("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority,
+ proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId());
}
}
public void checkAssembly() {
- ViewAssembly.this.checkAssembly();
+ if (ViewAssembly.this.checkAssembly()) {
+ return;
+ }
+ if (proposals.size() >= selected.majority) {
+ transitions.chill();
+ } else {
+ log.info("Check assembly: {} on: {}", proposals.size(), params().member().getId());
+ }
}
public void checkViews() {
vote();
}
+ @Override
+ public void chill() {
+ if (ViewAssembly.this.checkAssembly()) {
+ transitions.certified();
+ } else {
+ countdown.set(2);
+ }
+ }
+
@Override
public void complete() {
ViewAssembly.this.complete();
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java
index 394f4664d..080aea7c8 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java
@@ -6,6 +6,8 @@
*/
package com.salesforce.apollo.choam.comm;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Empty;
import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.choam.proto.*;
@@ -47,8 +49,11 @@ public Member getMember() {
}
@Override
- public Empty join(SignedViewMember join) {
- return service.join(join, member.getId());
+ public ListenableFuture join(SignedViewMember join) {
+ var j = service.join(join, member.getId());
+ SettableFuture sf = SettableFuture.create();
+ sf.set(j);
+ return sf;
}
@Override
@@ -64,7 +69,7 @@ public Initial sync(Synchronize sync) {
Blocks fetchViewChain(BlockReplication replication);
- Empty join(SignedViewMember join);
+ ListenableFuture join(SignedViewMember join);
Initial sync(Synchronize sync);
}
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java
index 26ee81aa0..48fe85f76 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java
@@ -6,6 +6,7 @@
*/
package com.salesforce.apollo.choam.comm;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
@@ -20,12 +21,14 @@ public class TerminalClient implements Terminal {
private final ManagedServerChannel channel;
private final TerminalGrpc.TerminalBlockingStub client;
+ private final TerminalGrpc.TerminalFutureStub asyncClient;
@SuppressWarnings("unused")
private final ChoamMetrics metrics;
public TerminalClient(ManagedServerChannel channel, ChoamMetrics metrics) {
this.channel = channel;
this.client = channel.wrap(TerminalGrpc.newBlockingStub(channel));
+ this.asyncClient = channel.wrap(TerminalGrpc.newFutureStub(channel));
this.metrics = metrics;
}
@@ -60,8 +63,8 @@ public Member getMember() {
}
@Override
- public Empty join(SignedViewMember vm) {
- return client.join(vm);
+ public ListenableFuture join(SignedViewMember vm) {
+ return asyncClient.join(vm);
}
public void release() {
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java
index 01146befd..246daa51a 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java
@@ -19,6 +19,8 @@ public interface Reconfiguration {
void checkViews();
+ void chill();
+
void complete();
void failed();
@@ -60,10 +62,9 @@ public void certify() {
context().certify();
}
}, GATHER {
- // We have a majority of the new committee Joins
@Override
- public Transitions gathered() {
- return CERTIFICATION;
+ public Transitions chill() {
+ return CHILLIN;
}
// We have a full complement of the new committee Joins
@@ -77,6 +78,29 @@ public Transitions certified() {
public void gather() {
context().checkAssembly();
}
+
+ @Override
+ public Transitions checkAssembly() {
+ context().checkAssembly();
+ return null;
+ }
+ }, CHILLIN {
+ @Override
+ public Transitions countdownCompleted() {
+ return certified();
+ }
+
+ // We have what we have
+ @Override
+ public Transitions certified() {
+ return CERTIFICATION;
+ }
+
+ // Check to see if we already have a full complement of committee Joins
+ @Entry
+ public void chillin() {
+ context().chill();
+ }
}, PROTOCOL_FAILURE {
@Override
public Transitions certified() {
@@ -139,6 +163,14 @@ default Transitions certified() {
throw fsm().invalidTransitionOn();
}
+ default Transitions checkAssembly() {
+ throw fsm().invalidTransitionOn();
+ }
+
+ default Transitions chill() {
+ throw fsm().invalidTransitionOn();
+ }
+
default Transitions complete() {
throw fsm().invalidTransitionOn();
}
@@ -151,10 +183,6 @@ default Transitions failed() {
return Reconfigure.PROTOCOL_FAILURE;
}
- default Transitions gathered() {
- throw fsm().invalidTransitionOn();
- }
-
default Transitions proposed() {
throw fsm().invalidTransitionOn();
}
diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java
index ebd508af6..fe29a7e45 100644
--- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java
+++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java
@@ -3,7 +3,6 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
-import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContext;
@@ -64,10 +63,9 @@ public void setUp() throws Exception {
.toList();
final var prefix = UUID.randomUUID().toString();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router(
- ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor)));
+ ServerConnectionCache.newBuilder().setTarget(cardinality * 2))));
var template = Parameters.newBuilder()
.setGenerateGenesis(true)
diff --git a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java
index febe8f415..effd854f5 100644
--- a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java
+++ b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java
@@ -9,7 +9,6 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
-import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters.BootstrapParameters;
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
@@ -171,10 +170,9 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws
SigningMember testSubject = new ControlledIdentifierMember(stereotomy.newIdentifier());
final var prefix = UUID.randomUUID().toString();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(Member::getId, m -> new LocalServer(prefix, m).router(
- ServerConnectionCache.newBuilder().setTarget(cardinality), executor)));
+ ServerConnectionCache.newBuilder().setTarget(cardinality))));
routers.put(testSubject.getId(), new LocalServer(prefix, testSubject).router(
ServerConnectionCache.newBuilder().setTarget(cardinality)));
choams = new HashMap<>();
diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java
index 66d941748..391706bf2 100644
--- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java
+++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java
@@ -8,7 +8,10 @@
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
-import com.salesforce.apollo.archipelago.*;
+import com.salesforce.apollo.archipelago.LocalServer;
+import com.salesforce.apollo.archipelago.Router;
+import com.salesforce.apollo.archipelago.ServerConnectionCache;
+import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
@@ -121,12 +124,11 @@ public void before() throws Exception {
.toList();
var context = new StaticContext<>(origin, 0.2, members, 3);
final var prefix = UUID.randomUUID().toString();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder()
.setMetrics(new ServerConnectionCacheMetricsImpl(registry))
- .setTarget(CARDINALITY), executor)));
+ .setTarget(CARDINALITY))));
choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
var recording = new AtomicInteger();
blocks.put(m.getId(), recording);
diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml
index 9f1825d70..d5be6a052 100644
--- a/choam/src/test/resources/logback-test.xml
+++ b/choam/src/test/resources/logback-test.xml
@@ -33,7 +33,7 @@
-
+
diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java
index dcfc99b4f..9825e45f6 100644
--- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java
+++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java
@@ -273,7 +273,6 @@ private void initialize() {
AtomicBoolean frist = new AtomicBoolean(true);
final var prefix = UUID.randomUUID().toString();
final var gatewayPrefix = UUID.randomUUID().toString();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
views = members.values().stream().map(node -> {
DynamicContext context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
@@ -289,8 +288,8 @@ private void initialize() {
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false)
- ? node0Registry : registry)),
- executor);
+ ? node0Registry
+ : registry)));
comms.start();
communications.add(comms);
diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java
index b18c32a3e..0e4f93dc6 100644
--- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java
+++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java
@@ -226,7 +226,6 @@ private void initialize() {
AtomicBoolean frist = new AtomicBoolean(true);
final var prefix = UUID.randomUUID().toString();
final var gatewayPrefix = UUID.randomUUID().toString();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
views = members.values().stream().map(node -> {
DynamicContext context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
@@ -242,8 +241,8 @@ private void initialize() {
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false)
- ? node0Registry : registry)),
- executor);
+ ? node0Registry
+ : registry)));
comms.start();
communications.add(comms);
diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java
index 290f5299a..a4b9f149f 100644
--- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java
+++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java
@@ -328,10 +328,10 @@ public static Parameters.Builder newBuilder() {
public static class Builder implements Cloneable {
private int bufferSize = 1500;
private int dedupBufferSize = 100;
- private double dedupFpr = Math.pow(10, -9);
+ private double dedupFpr = Math.pow(10, -6);
private int deliveredCacheSize = 100;
private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT;
- private double falsePositiveRate = 0.00125;
+ private double falsePositiveRate = 0.0000125;
private int maxMessages = 500;
public Parameters build() {
diff --git a/model/pom.xml b/model/pom.xml
index 2db95ab73..3b9acd54a 100644
--- a/model/pom.xml
+++ b/model/pom.xml
@@ -76,13 +76,6 @@
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
- false
-
-
org.apache.maven.plugins
maven-antrun-plugin
diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java
index 8df39f7d0..6494cbecb 100644
--- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java
+++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java
@@ -6,7 +6,10 @@
*/
package com.salesforce.apollo.model;
-import com.salesforce.apollo.archipelago.*;
+import com.salesforce.apollo.archipelago.EndpointProvider;
+import com.salesforce.apollo.archipelago.LocalServer;
+import com.salesforce.apollo.archipelago.Router;
+import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
@@ -79,11 +82,9 @@ public void before() throws Exception {
var sealed = FoundationSeal.newBuilder().build();
final var group = DigestAlgorithm.DEFAULT.getOrigin();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
identities.forEach((d, id) -> {
final var member = new ControlledIdentifierMember(id);
- var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
- executor);
+ var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
routers.add(localRouter);
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1),
diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java
index 3cb29dd42..dd5f0503f 100644
--- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java
+++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java
@@ -6,7 +6,10 @@
*/
package com.salesforce.apollo.model;
-import com.salesforce.apollo.archipelago.*;
+import com.salesforce.apollo.archipelago.EndpointProvider;
+import com.salesforce.apollo.archipelago.LocalServer;
+import com.salesforce.apollo.archipelago.Router;
+import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
@@ -235,11 +238,9 @@ public void before() throws Exception {
var sealed = FoundationSeal.newBuilder().build();
final var group = DigestAlgorithm.DEFAULT.getOrigin();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
identities.forEach((d, id) -> {
final var member = new ControlledIdentifierMember(id);
- var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
- executor);
+ var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
routers.add(localRouter);
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1),
diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java
index c6a48aa41..beb936b33 100644
--- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java
+++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java
@@ -6,7 +6,10 @@
*/
package com.salesforce.apollo.model;
-import com.salesforce.apollo.archipelago.*;
+import com.salesforce.apollo.archipelago.EndpointProvider;
+import com.salesforce.apollo.archipelago.LocalServer;
+import com.salesforce.apollo.archipelago.Router;
+import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
@@ -78,12 +81,10 @@ public void before() throws Exception {
Digest group = DigestAlgorithm.DEFAULT.getOrigin();
var sealed = FoundationSeal.newBuilder().build();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
identities.forEach((digest, id) -> {
var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3);
final var member = new ControlledIdentifierMember(id);
- var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
- executor);
+ var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5),
"jdbc:h2:mem:%s-state".formatted(digest),
diff --git a/pom.xml b/pom.xml
index 9e386ff13..416caaf06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -783,7 +783,7 @@
3.2.5
${forks}
- false
+ true
-Xmx10G -Xms100M
-Djdk.tracePinnedThreads=full
diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java
index cc2d78819..028f4b67b 100644
--- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java
+++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java
@@ -9,7 +9,6 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
-import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.CHOAM;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.BootstrapParameters;
@@ -158,10 +157,8 @@ public void before() throws Exception {
members.stream().filter(s -> s != testSubject).forEach(s -> context.activate(s));
final var prefix = UUID.randomUUID().toString();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
- var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30),
- executor);
+ var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30));
return localRouter;
}));
routers.put(testSubject.getId(),
diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java
index e3a526a70..e8cb71761 100644
--- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java
+++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java
@@ -11,7 +11,6 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
-import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.CHOAM;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters;
@@ -155,10 +154,8 @@ public void before() throws Exception {
}).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (SigningMember) e).toList();
members.forEach(m -> context.activate(m));
final var prefix = UUID.randomUUID().toString();
- var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
- var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30),
- executor);
+ var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30));
return localRouter;
}));
choams = members.stream()