diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index f0970605d..c50ed31b1 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -7,6 +7,7 @@ package com.salesforce.apollo.choam; import com.chiralbehaviors.tron.Fsm; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.InvalidProtocolBufferException; @@ -35,9 +36,9 @@ import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter; import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg; import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder; -import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; import io.grpc.StatusRuntimeException; +import io.netty.util.concurrent.ImmediateExecutor; import org.h2.mvstore.MVMap; import org.joou.ULong; import org.slf4j.Logger; @@ -47,7 +48,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.security.KeyPair; -import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -75,31 +75,31 @@ public class CHOAM { private static final Logger log = LoggerFactory.getLogger(CHOAM.class); - private final Map cachedCheckpoints = new ConcurrentHashMap<>(); - private final AtomicReference checkpoint = new AtomicReference<>(); - private final ReliableBroadcaster combine; - private final CommonCommunications comm; - private final AtomicReference current = new AtomicReference<>(); - private final ExecutorService executions; - private final AtomicReference> futureBootstrap = new AtomicReference<>(); - private final AtomicReference> futureSynchronization = new AtomicReference<>(); - private final AtomicReference genesis = new AtomicReference<>(); - private final AtomicReference head = new AtomicReference<>(); - private final ExecutorService linear; - private final AtomicReference next = new AtomicReference<>(); - private final AtomicReference nextViewId = new AtomicReference<>(); - private final Parameters params; - private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); - private final RoundScheduler roundScheduler; - private final Session session; - private final AtomicBoolean started = new AtomicBoolean(); - private final Store store; - private final CommonCommunications submissionComm; - private final Combine.Transitions transitions; - private final TransSubmission txnSubmission = new TransSubmission(); - private final AtomicReference view = new AtomicReference<>(); - private final PendingViews pendingViews = new PendingViews(); - private final AtomicReference> join = new AtomicReference<>(); + private final Map cachedCheckpoints = new ConcurrentHashMap<>(); + private final AtomicReference checkpoint = new AtomicReference<>(); + private final ReliableBroadcaster combine; + private final CommonCommunications comm; + private final AtomicReference current = new AtomicReference<>(); + private final ExecutorService executions; + private final AtomicReference> futureBootstrap = new AtomicReference<>(); + private final AtomicReference> futureSynchronization = new AtomicReference<>(); + private final AtomicReference genesis = new AtomicReference<>(); + private final AtomicReference head = new AtomicReference<>(); + private final ExecutorService linear; + private final AtomicReference next = new AtomicReference<>(); + private final AtomicReference nextViewId = new AtomicReference<>(); + private final Parameters params; + private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); + private final RoundScheduler roundScheduler; + private final Session session; + private final AtomicBoolean started = new AtomicBoolean(); + private final Store store; + private final CommonCommunications submissionComm; + private final Combine.Transitions transitions; + private final TransSubmission txnSubmission = new TransSubmission(); + private final AtomicReference view = new AtomicReference<>(); + private final PendingViews pendingViews = new PendingViews(); + private volatile AtomicBoolean ongoingJoin; public CHOAM(Parameters params) { this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); @@ -360,13 +360,25 @@ public void stop() { return; } session.cancelAll(); - linear.shutdown(); - executions.shutdown(); + try { + linear.shutdown(); + } catch (Throwable e) { + } + try { + executions.shutdown(); + } catch (Throwable e) { + } final var c = current.get(); if (c != null) { - c.complete(); + try { + c.complete(); + } catch (Throwable e) { + } + } + try { + combine.stop(); + } catch (Throwable e) { } - combine.stop(); } private void accept(HashedCertifiedBlock next) { @@ -705,10 +717,6 @@ private void process() { private void reconfigure(Digest hash, Reconfigure reconfigure) { log.info("Setting next view id: {} on: {}", hash, params.member().getId()); - var j = join.getAndSet(null); - if (j != null) { - j.cancel(true); - } nextViewId.set(hash); var pv = pendingViews.advance(); if (pv != null) { @@ -734,6 +742,12 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) { } else { current.set(new Client(validators, getViewId())); } + final var oj = ongoingJoin; + ongoingJoin = null; + if (oj != null) { + log.trace("Halting ongoing join on: {}", params.member().getId()); + oj.set(true); + } log.info("Reconfigured to view: {} committee: {} validators: {} on: {}", new Digest(reconfigure.getId()), current.get().getClass().getSimpleName(), validators.entrySet() .stream() @@ -1264,6 +1278,7 @@ public Blocks fetchViewChain(BlockReplication request, Digest from) { @Override public Empty join(SignedViewMember nextView, Digest from) { + log.trace("Member: {} joining on: {}", from, params.member().getId()); CHOAM.this.join(nextView, from); return Empty.getDefaultInstance(); } @@ -1276,10 +1291,9 @@ public Initial sync(Synchronize request, Digest from) { /** abstract class to maintain the common state */ private abstract class Administration implements Committee { - protected final Digest viewId; - - private final GroupIterator servers; - private final Map validators; + protected final Digest viewId; + private final GroupIterator servers; + private final Map validators; public Administration(Map validators, Digest viewId) { this.validators = validators; @@ -1378,64 +1392,47 @@ public boolean validate(HashedCertifiedBlock hb) { } private void join(View view) { - var joining = new CompletableFuture(); - if (!join.compareAndSet(null, joining)) { - log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()), - params.member().getId()); - transitions.fail(); - return; + if (ongoingJoin != null) { + throw new IllegalStateException("Ongoing join should have been cancelled"); } - log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), - params.member().getId()); - var servers = new GroupIterator(validators.keySet()); - var joined = new HashSet(); - - var delay = Duration.ofMillis(Entropy.nextSecureInt(5)); - - Thread.ofPlatform().start(() -> { - log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + log.trace("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); + var servers = new ConcurrentSkipListSet<>(validators.keySet()); + var joined = new AtomicInteger(); + var halt = new AtomicBoolean(false); + ongoingJoin = halt; + Thread.ofVirtual().start(Utils.wrapped(() -> { + log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), params.member().getId()); - while (!joining.isDone() && joined.size() < view.getMajority()) { - try { - Thread.sleep(delay.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - join(view, servers, joined); - } - log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), - params.member().getId()); - joining.complete(null); - }); - } - private void join(View view, GroupIterator servers, HashSet joined) { - Member target = servers.next(); - if (joined.contains(target)) { - log.trace("Already joined with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId.get(), - Digest.from(view.getDiadem()), params.member().getId()); - return; - } - try (var link = comm.connect(target)) { - join(view, link, target, joined); - } catch (StatusRuntimeException e) { - log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target.getId(), - nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId()); - } catch (Throwable e) { - log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId, - Digest.from(view.getDiadem()), params.member().getId(), e); - } + var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); + AtomicReference action = new AtomicReference<>(); + var attempts = new AtomicInteger(); + action.set(() -> { + log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(), + halt.get(), joined.get(), view.getMajority(), params.member().getId()); + if (!halt.get() & joined.get() < view.getMajority()) { + join(view, servers, joined); + if (joined.get() >= view.getMajority()) { + ongoingJoin = null; + log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), + Digest.from(view.getDiadem()), joined.get(), params.member().getId()); + } else if (!halt.get()) { + log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), + Digest.from(view.getDiadem()), joined.get(), params.member().getId()); + scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); + } + } + }); + scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); + }, log())); } - private void join(View view, Terminal link, Member target, HashSet joined) { - if (link == null) { - log.debug("No link for: {} for joining: {} on: {}", target.getId(), Digest.from(view.getDiadem()), - params.member().getId()); - return; - } - log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), - params.member().getId()); + private void join(View view, Collection members, AtomicInteger joined) { + var sampled = new ArrayList<>(members); + Collections.shuffle(sampled); + log.trace("Joining view: {} diadem: {} servers: {} on: {}", viewId, Digest.from(view.getDiadem()), + sampled.stream().map(Member::getId).toList(), params.member().getId()); final var c = next.get(); var inView = ViewMember.newBuilder(c.member) .setDiadem(view.getDiadem()) @@ -1445,18 +1442,70 @@ private void join(View view, Terminal link, Member target, HashSet joine .setVm(inView) .setSignature(params.member().sign(inView.toByteString()).toSig()) .build(); + var countdown = new CountDownLatch(sampled.size()); + sampled.stream().map(m -> { + var connection = comm.connect(m); + log.trace("connect to: {} is: {} on: {}", m.getId(), connection, params.member().getId()); + return connection; + }).map(t -> t == null ? null : join(view, t, svm)).forEach(t -> { + if (t == null) { + countdown.countDown(); + } else { + t.fs.addListener(() -> { + try { + t.fs.get(); + members.remove(t.m); + joined.incrementAndGet(); + log.trace("Joined with: {} view: {} diadem: {} on: {}", t.m.getId(), + Digest.from(inView.getId()), Digest.from(view.getDiadem()), + params.member().getId()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Failed to join with: {} view: {} diadem: {} on: {}", t.m.getId(), viewId, + Digest.from(view.getDiadem()), params.member().getId(), e.getCause()); + } catch (Throwable e) { + log.error("Failed to join with: {} view: {} diadem: {} on: {}", t.m.getId(), viewId, + Digest.from(view.getDiadem()), params.member().getId(), e); + } finally { + countdown.countDown(); + } + }, ImmediateExecutor.INSTANCE); + } + }); try { - link.join(svm); - joined.add(target); - log.trace("Joined with: {} view: {} diadem: {} on: {}", target.getId(), viewId, - Digest.from(view.getDiadem()), params.member().getId()); + countdown.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private Attempt join(View view, Terminal t, SignedViewMember svm) { + try { + log.trace("Attempting to join with: {} context: {} diadem: {} on: {}", t.getMember().getId(), + context().getId(), Digest.from(view.getDiadem()), params.member().getId()); + return new Attempt(t.getMember(), t.join(svm)); } catch (StatusRuntimeException sre) { log.trace("Failed join attempt: {} with: {} view: {} diadem: {} on: {}", sre.getStatus(), - target.getId(), nextViewId, Digest.from(view.getDiadem()), params.member().getId(), sre); - } catch (Throwable t) { - log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId, - Digest.from(view.getDiadem()), params.member().getId(), t); + t.getMember().getId(), nextViewId, Digest.from(view.getDiadem()), params.member().getId(), + sre); + } catch (Throwable throwable) { + log.error("Failed join attempt with: {} view: {} diadem: {} on: {}", t.getMember().getId(), nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), throwable); + } finally { + try { + t.close(); + } catch (IOException e) { + // ignored + } } + return null; + } + + record Attempt(Member m, ListenableFuture fs) { + } + + private record JoinState(AtomicBoolean halt, Thread joining) { } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 8db4483ca..19bc5580f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -128,7 +128,7 @@ public boolean complete() { } public void join(SignedViewMember viewMember) { - assembly.join(viewMember, true); + assembly.joined(viewMember); } public void start() { @@ -234,7 +234,7 @@ private Parameters params() { private void processAssemblies(List aggregate) { var aggs = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList(); - log.trace("Consuming {} assemblies from {} units on: {}", aggs.size(), aggregate.size(), + log.trace("Consuming: {} assemblies from: {} units on: {}", aggs.size(), aggregate.size(), params().member().getId()); assembly.assemble(aggs); } @@ -313,14 +313,14 @@ private void publish(PendingBlock p) { } private void publish(PendingBlock p, boolean beacon) { - assert p.witnesses.size() >= params().majority() : "Publishing non majority block"; + assert p.witnesses.size() >= params().majority() : "Attempt to publish non majority block"; var publish = p.published.compareAndSet(false, true); if (!publish && !beacon) { log.trace("Already published: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(), p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId()); return; } - log.trace("Publishing {}pending: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "", + log.trace("Publishing {}: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "(pending)", p.block.block.getBodyCase(), p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId()); final var cb = CertifiedBlock.newBuilder() @@ -357,21 +357,22 @@ private void reconfigure() { } private void serial(List preblock, Boolean last) { + try { + serialize.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } Thread.ofVirtual().start(() -> { try { - serialize.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - try { - transitions.create(preblock, last); + create(preblock, last); } catch (Throwable t) { log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t); } finally { serialize.release(); } }); + } private PendingBlock validate(Validate v) { @@ -436,11 +437,6 @@ public void complete() { stop(); } - @Override - public void create(List preblock, boolean last) { - Producer.this.create(preblock, last); - } - @Override public void fail() { stop(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 96d5cbd37..2e1d5cb11 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -59,6 +59,7 @@ public class ViewAssembly { private final AtomicInteger countdown = new AtomicInteger(); private final List pendingJoins = new CopyOnWriteArrayList<>(); private final AtomicBoolean started = new AtomicBoolean(false); + private final Map joins = new ConcurrentHashMap<>(); private volatile Vue selected; public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher, @@ -80,6 +81,23 @@ public Map getSlate() { return slate; } + public void joined(SignedViewMember viewMember) { + final var mid = Digest.from(viewMember.getVm().getId()); + joins.put(mid, SignedJoin.newBuilder() + .setMember(params().member().getId().toDigeste()) + .setJoin(viewMember) + .setSignature(view.sign(viewMember).toSig()) + .build()); + if (selected != null && joins.size() >= selected.majority) { + publishJoins(); + } else if (selected == null) { + log.trace("Awaiting view selection to publish joins: {} on: {}", joins.size(), params().member().getId()); + } else { + log.trace("Awaiting required majority: {} of: {} to publish joins: {} on: {}", selected.diadem, + selected.majority, joins.size(), params().member().getId()); + } + } + public void start() { if (!started.compareAndSet(false, true)) { return; @@ -95,23 +113,31 @@ void assemble(List asses) { if (asses.isEmpty()) { return; } + var viewz = asses.stream().flatMap(a -> a.getViewsList().stream()).toList(); + var joinz = asses.stream().flatMap(a -> a.getJoinsList().stream()).toList(); + log.debug("Assemblies: {} joins: {} views: {} on: {}", asses.size(), joinz.size(), viewz.size(), + params().member().getId()); - var joins = asses.stream() - .flatMap(a -> a.getJoinsList().stream()) + var joins = joinz.stream() + .filter(SignedJoin::hasJoin) .filter(view -> !proposals.containsKey(Digest.from(view.getJoin().getVm().getId()))) .filter(signedJoin -> !SignedJoin.getDefaultInstance().equals(signedJoin)) .filter(view::validate) .toList(); - var views = asses.stream().flatMap(a -> a.getViewsList().stream()).filter(SignedViews::hasViews).toList(); + if (!joins.isEmpty()) { + log.debug("Assembling joins: {} on: {}", joins.size(), params().member().getId()); + join(joins.stream().map(SignedJoin::getJoin).toList()); + } - log.debug("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId()); + var views = viewz.stream().filter(SignedViews::hasViews).toList(); + if (views.isEmpty()) { + return; + } + log.debug("Assembling views: {} on: {}", views.size(), params().member().getId()); - joins.forEach(sj -> join(sj.getJoin(), false)); if (selected != null) { - if (!views.isEmpty()) { - log.trace("Already selected: {}, ignoring views: {} on: {}", selected.diadem, views.size(), - params().member().getId()); - } + log.trace("Already selected: {}, ignoring views: {} on: {}", selected.diadem, views.size(), + params().member().getId()); return; } views.forEach(svs -> { @@ -170,89 +196,27 @@ boolean complete() { return true; } - void join(SignedViewMember svm, boolean direct) { + void join(List joins) { if (!started.get()) { return; } - - final var mid = Digest.from(svm.getVm().getId()); - if (proposals.containsKey(mid)) { - log.trace("Redundant join from: {} on: {}", print(svm, params().digestAlgorithm()), - params().member().getId()); - return; - } if (selected == null) { - pendingJoins.add(svm); - log.trace("Pending join from: {} on: {}", print(svm, params().digestAlgorithm()), - params().member().getId()); + pendingJoins.addAll(joins); + log.trace("Pending joins: {} on: {}", joins.size(), params().member().getId()); return; } - final var m = selected.assembly.get(mid); - if (m == null) { - if (log.isTraceEnabled()) { - log.trace("Invalid view member: {} on: {}", print(svm, params().digestAlgorithm()), - params().member().getId()); - } - return; - } - var viewId = Digest.from(svm.getVm().getView()); - if (!nextViewId.equals(viewId)) { - if (log.isTraceEnabled()) { - log.trace("Invalid view id for member: {} on: {}", print(svm, params().digestAlgorithm()), - params().member().getId()); - } - return; - } - if (log.isDebugEnabled()) { - log.debug("Join of: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); - } - - if (!m.verify(JohnHancock.from(svm.getSignature()), svm.getVm().toByteString())) { - if (log.isTraceEnabled()) { - log.trace("Invalid signature for view member: {} on: {}", print(svm, params().digestAlgorithm()), + for (var svm : joins) { + final var mid = Digest.from(svm.getVm().getId()); + if (proposals.containsKey(mid)) { + log.trace("Redundant join from: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); + continue; } - return; - } - - PubKey encoded = svm.getVm().getConsensusKey(); - - if (!m.verify(signature(svm.getVm().getSignature()), encoded.toByteString())) { - if (log.isTraceEnabled()) { - log.trace("Could not verify consensus key from view member: {} on: {}", - print(svm, params().digestAlgorithm()), params().member().getId()); - } - return; - } - - PublicKey consensusKey = publicKey(encoded); - if (consensusKey == null) { - if (log.isTraceEnabled()) { - log.trace("Could not deserialize consensus key from view member: {} on: {}", - print(svm, params().digestAlgorithm()), params().member().getId()); + if (validate(mid, svm)) { + proposals.put(mid, svm); } - return; } - - if (direct) { - var signature = view.sign(svm); - publisher.accept(Assemblies.newBuilder() - .addJoins(SignedJoin.newBuilder() - .setJoin(svm) - .setMember(params().member().getId().toDigeste()) - .setSignature(signature.toSig()) - .build()) - .build()); - if (log.isTraceEnabled()) { - log.trace("Publishing view member: {} sig: {} on: {}", print(svm, params().digestAlgorithm()), - params().digestAlgorithm().digest(signature.toSig().toByteString()), - params().member().getId()); - } - } else if (proposals.putIfAbsent(mid, svm) == null) { - log.trace("Adding discovered view member: {} on: {}", print(svm, params().digestAlgorithm()), - params().member().getId()); - } - checkAssembly(); + transitions.checkAssembly(); } void newEpoch() { @@ -270,18 +234,39 @@ private Map assemblyOf(List committee) { .collect(Collectors.toMap(Member::getId, m -> m)); } - private void checkAssembly() { + private boolean checkAssembly() { + if (selected == null) { + return false; + } if (proposals.size() == selected.majority) { transitions.certified(); - } else if (proposals.size() >= selected.majority) { - transitions.gathered(); + return true; } + return false; } private Parameters params() { return view.params(); } + private void propose(Views vs, List majorities, Multiset consensus) { + var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(); + var lastIndex = -1; + View last = null; + for (var v : majorities) { + var i = ordered.indexOf(Digest.from(v.getDiadem())); + if (i != -1) { + if (i > lastIndex) { + last = v; + lastIndex = i; + } + } + } + if (last != null) { + consensus.add(last); + } + } + private void propose() { var views = view.pendingViews() .getViews(nextViewId) @@ -297,22 +282,62 @@ private void propose() { .build()); } - private void propose(Views vs, List majorities, Multiset consensus) { - var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(); - var lastIndex = -1; - View last = null; - for (var v : majorities) { - var i = ordered.indexOf(Digest.from(v.getDiadem())); - if (i != -1) { - if (i > lastIndex) { - last = v; - lastIndex = i; - } + private void publishJoins() { + log.trace("Publish joins: {} on: {}", joins.size(), params().member().getId()); + var b = Assemblies.newBuilder(); + joins.values().forEach(b::addJoins); + publisher.accept(b.build()); + } + + private boolean validate(Digest mid, SignedViewMember svm) { + final var m = selected.assembly.get(mid); + if (m == null) { + if (log.isTraceEnabled()) { + log.trace("Invalid view member: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); } + return false; } - if (last != null) { - consensus.add(last); + var viewId = Digest.from(svm.getVm().getView()); + if (!nextViewId.equals(viewId)) { + if (log.isTraceEnabled()) { + log.trace("Invalid view id for member: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); + } + return false; + } + if (log.isDebugEnabled()) { + log.debug("Join of: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); + } + + if (!m.verify(JohnHancock.from(svm.getSignature()), svm.getVm().toByteString())) { + if (log.isTraceEnabled()) { + log.trace("Invalid signature for view member: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); + } + return false; + } + + PubKey encoded = svm.getVm().getConsensusKey(); + + if (!m.verify(signature(svm.getVm().getSignature()), encoded.toByteString())) { + if (log.isTraceEnabled()) { + log.trace("Could not verify consensus key from view member: {} on: {}", + print(svm, params().digestAlgorithm()), params().member().getId()); + } + return false; } + + PublicKey consensusKey = publicKey(encoded); + if (consensusKey == null) { + if (log.isTraceEnabled()) { + log.trace("Could not deserialize consensus key from view member: {} on: {}", + print(svm, params().digestAlgorithm()), params().member().getId()); + } + return false; + } + log.trace("Validated svm: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); + return true; } private void vote() { @@ -359,7 +384,10 @@ private void vote() { } onConsensus.complete(selected); transitions.viewAcquired(); - pendingJoins.forEach(svm -> join(svm, false)); + if (joins.size() >= selected.majority) { + publishJoins(); + } + join(pendingJoins); pendingJoins.clear(); } @@ -374,24 +402,40 @@ private class Recon implements Reconfiguration { @Override public void certify() { if (proposals.size() == selected.majority) { - log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, - nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); + log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, + nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.certified(); } else { - countdown.set(3); - log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, - proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); + countdown.set(4); + log.info("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, + proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); } } public void checkAssembly() { - ViewAssembly.this.checkAssembly(); + if (ViewAssembly.this.checkAssembly()) { + return; + } + if (proposals.size() >= selected.majority) { + transitions.chill(); + } else { + log.info("Check assembly: {} on: {}", proposals.size(), params().member().getId()); + } } public void checkViews() { vote(); } + @Override + public void chill() { + if (ViewAssembly.this.checkAssembly()) { + transitions.certified(); + } else { + countdown.set(2); + } + } + @Override public void complete() { ViewAssembly.this.complete(); @@ -412,9 +456,5 @@ public void finish() { public void publishViews() { propose(); } - - private Join joinOf(SignedViewMember vm) { - return Join.newBuilder().setMember(vm).build(); - } } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index c204030db..46a0c9d71 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -248,19 +248,15 @@ public boolean validate(SignedJoin join) { public boolean validate(SignedViews sv) { Verifier v = verifierOf(sv); if (v == null) { - if (log.isDebugEnabled()) { - log.debug("No verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()), - params.member().getId()); - } + log.debug("No verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()), + params.member().getId()); return false; } var validated = v.verify(JohnHancock.from(sv.getSignature()), sv.getViews().toByteString()); if (!validated) { - if (log.isTraceEnabled()) { - log.trace("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), - params().member().getId()); - } - } else if (log.isTraceEnabled()) { + log.trace("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), + params().member().getId()); + } else { log.trace("Validated views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), params().member().getId()); } @@ -282,8 +278,7 @@ protected Verifier verifierOf(SignedViews sv) { private Verifier getVerifier(Member m) { if (m == null) { if (log.isDebugEnabled()) { - log.debug("Unable to get verifier by non existent member: {} on: {}", m.getId(), - params.member().getId()); + log.debug("Unable to get verifier by non existent member on: {}", params.member().getId()); } return null; } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java index 394f4664d..080aea7c8 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java @@ -6,6 +6,8 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.Link; import com.salesforce.apollo.choam.proto.*; @@ -47,8 +49,11 @@ public Member getMember() { } @Override - public Empty join(SignedViewMember join) { - return service.join(join, member.getId()); + public ListenableFuture join(SignedViewMember join) { + var j = service.join(join, member.getId()); + SettableFuture sf = SettableFuture.create(); + sf.set(j); + return sf; } @Override @@ -64,7 +69,7 @@ public Initial sync(Synchronize sync) { Blocks fetchViewChain(BlockReplication replication); - Empty join(SignedViewMember join); + ListenableFuture join(SignedViewMember join); Initial sync(Synchronize sync); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java index 26ee81aa0..48fe85f76 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java @@ -6,6 +6,7 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; @@ -20,12 +21,14 @@ public class TerminalClient implements Terminal { private final ManagedServerChannel channel; private final TerminalGrpc.TerminalBlockingStub client; + private final TerminalGrpc.TerminalFutureStub asyncClient; @SuppressWarnings("unused") private final ChoamMetrics metrics; public TerminalClient(ManagedServerChannel channel, ChoamMetrics metrics) { this.channel = channel; this.client = channel.wrap(TerminalGrpc.newBlockingStub(channel)); + this.asyncClient = channel.wrap(TerminalGrpc.newFutureStub(channel)); this.metrics = metrics; } @@ -60,8 +63,8 @@ public Member getMember() { } @Override - public Empty join(SignedViewMember vm) { - return client.join(vm); + public ListenableFuture join(SignedViewMember vm) { + return asyncClient.join(vm); } public void release() { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java index 9b290ed26..2cf86b242 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java @@ -8,12 +8,9 @@ import com.chiralbehaviors.tron.Entry; import com.chiralbehaviors.tron.FsmExecutor; -import com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * Leaf action interface for the Producer FSM * @@ -27,8 +24,6 @@ public interface Driven { void complete(); - void create(List preblock, boolean last); - void fail(); void reconfigure(); @@ -149,11 +144,6 @@ default Transitions checkpointed() { throw fsm().invalidTransitionOn(); } - default Transitions create(List preblock, boolean last) { - context().create(preblock, last); - return null; - } - default Transitions establish() { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index 01146befd..246daa51a 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -19,6 +19,8 @@ public interface Reconfiguration { void checkViews(); + void chill(); + void complete(); void failed(); @@ -60,10 +62,9 @@ public void certify() { context().certify(); } }, GATHER { - // We have a majority of the new committee Joins @Override - public Transitions gathered() { - return CERTIFICATION; + public Transitions chill() { + return CHILLIN; } // We have a full complement of the new committee Joins @@ -77,6 +78,29 @@ public Transitions certified() { public void gather() { context().checkAssembly(); } + + @Override + public Transitions checkAssembly() { + context().checkAssembly(); + return null; + } + }, CHILLIN { + @Override + public Transitions countdownCompleted() { + return certified(); + } + + // We have what we have + @Override + public Transitions certified() { + return CERTIFICATION; + } + + // Check to see if we already have a full complement of committee Joins + @Entry + public void chillin() { + context().chill(); + } }, PROTOCOL_FAILURE { @Override public Transitions certified() { @@ -139,6 +163,14 @@ default Transitions certified() { throw fsm().invalidTransitionOn(); } + default Transitions checkAssembly() { + throw fsm().invalidTransitionOn(); + } + + default Transitions chill() { + throw fsm().invalidTransitionOn(); + } + default Transitions complete() { throw fsm().invalidTransitionOn(); } @@ -151,10 +183,6 @@ default Transitions failed() { return Reconfigure.PROTOCOL_FAILURE; } - default Transitions gathered() { - throw fsm().invalidTransitionOn(); - } - default Transitions proposed() { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index d3d583f35..7cb99d6d1 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -3,6 +3,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -41,6 +43,7 @@ public class DynamicTest { private Map routers; private Map choams; private Map> contexts; + private ExecutorService executor; @BeforeEach public void setUp() throws Exception { @@ -61,21 +64,21 @@ public void setUp() throws Exception { .map(ControlledIdentifierMember::new) .map(e -> (Member) e) .toList(); - + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var prefix = UUID.randomUUID().toString(); routers = members.stream() .collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router( - ServerConnectionCache.newBuilder().setTarget(cardinality * 2)))); + ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor))); var template = Parameters.newBuilder() .setGenerateGenesis(true) .setBootstrap(Parameters.BootstrapParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .build()) .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setProducer(Parameters.ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setBatchInterval(Duration.ofMillis(50)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(10_000) @@ -212,10 +215,13 @@ public void tearDown() throws Exception { choams = null; } if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } members = null; + if (executor != null) { + executor.shutdown(); + } } private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, Context context) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 079dbe402..b113b13c8 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -183,7 +183,7 @@ public Block reconfigure(Map joining, Digest nextViewId, HashedBlo genii.values().forEach(GenesisAssembly::start); complete.await(15, TimeUnit.SECONDS); } finally { - communications.values().forEach(r -> r.close(Duration.ofSeconds(1))); + communications.values().forEach(r -> r.close(Duration.ofSeconds(0))); genii.values().forEach(GenesisAssembly::stop); } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java index f56f69436..effd854f5 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java @@ -207,7 +207,7 @@ private void shutdown() { choams = null; } if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 8f0aff039..64be54d3c 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -8,10 +8,7 @@ import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; -import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -71,11 +68,12 @@ public class TestCHOAM { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; + private ExecutorService executor; @AfterEach public void after() throws Exception { if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } if (choams != null) { @@ -85,13 +83,17 @@ public void after() throws Exception { if (scheduler != null) { scheduler.shutdown(); } + if (executor != null) { + executor.shutdown(); + } members = null; registry = null; } @BeforeEach public void before() throws Exception { - scheduler = Executors.newScheduledThreadPool(10); + scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var origin = DigestAlgorithm.DEFAULT.getOrigin(); registry = new MetricRegistry(); var metrics = new ChoamMetricsImpl(origin, registry); @@ -102,15 +104,15 @@ public void before() throws Exception { var params = Parameters.newBuilder() .setGenerateGenesis(true) .setGenesisViewId(origin.prefix(entropy.nextLong())) - .setGossipDuration(Duration.ofMillis(30)) + .setGossipDuration(Duration.ofMillis(20)) .setProducer(ProducerParameters.newBuilder() .setMaxBatchCount(15_000) .setMaxBatchByteSize(200 * 1024 * 1024) - .setGossipDuration(Duration.ofMillis(30)) + .setGossipDuration(Duration.ofMillis(20)) .setBatchInterval(Duration.ofMillis(50)) .setEthereal(Config.newBuilder() - .setNumberOfEpochs(3) - .setEpochLength(7)) + .setNumberOfEpochs(12) + .setEpochLength(15)) .build()) .setCheckpointBlockDelta(3); @@ -128,7 +130,7 @@ public void before() throws Exception { .collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router( ServerConnectionCache.newBuilder() .setMetrics(new ServerConnectionCacheMetricsImpl(registry)) - .setTarget(CARDINALITY)))); + .setTarget(CARDINALITY), executor))); choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { var recording = new AtomicInteger(); blocks.put(m.getId(), recording); @@ -171,7 +173,7 @@ public void submitMultiplTxn() throws Exception { final var transactioneers = new ArrayList(); final var clientCount = LARGE_TESTS ? 1_500 : 5; - final var max = LARGE_TESTS ? 100 : 10; + final var max = LARGE_TESTS ? 100 : 5; final var countdown = new CountDownLatch(clientCount * choams.size()); choams.values().forEach(c -> { for (int i = 0; i < clientCount; i++) { @@ -197,7 +199,7 @@ public void submitMultiplTxn() throws Exception { .filter(i -> i < max) .count()); } finally { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); choams.values().forEach(e -> e.stop()); System.out.println(); diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java index de61d2eb4..2c1f7761d 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Digest.java @@ -124,7 +124,7 @@ public int compareTo(Digest id) { return 0; } if (hash.length != id.hash.length) { - throw new IllegalArgumentException("hash length incorrect for algorithm"); + return -1; } for (int i = 0; i < hash.length; i++) { int compare = Long.compareUnsigned(hash[i], id.hash[i]); diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java index 0d040d191..b920398de 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java @@ -600,8 +600,8 @@ private boolean decodeParents(Waiting wp) { * Answer the bloom filter with the commits the receiver has */ private Biff haveCommits() { - var bff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), conf.epochLength() * 2 * conf.nProc() * 2, - conf.fpr()); + var n = conf.epochLength() * conf.nProc() * 4; + var bff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), n, 1.0 / ((double) n * 2.0)); signedCommits.keySet().forEach(d -> bff.add(d)); return bff.toBff(); } @@ -610,8 +610,8 @@ private Biff haveCommits() { * Answer the bloom filter with the prevotes the receiver has */ private Biff havePreVotes() { - var bff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), conf.epochLength() * 2 * conf.nProc() * 2, - conf.fpr()); + var n = conf.epochLength() * conf.nProc() * 4; + var bff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), n, 1.0 / ((double) n * 2)); signedPrevotes.keySet().forEach(d -> bff.add(d)); return bff.toBff(); } @@ -620,8 +620,8 @@ private Biff havePreVotes() { * Answer the bloom filter with the units the receiver has */ private Biff haveUnits() { - var bff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), conf.epochLength() * 2 * conf.nProc() * 2, - conf.fpr()); + var n = conf.epochLength() * conf.nProc() * 4; + var bff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), n, 1.0 / ((double) n * 2)); waiting.keySet().forEach(d -> bff.add(d)); dag.have(bff); return bff.toBff(); diff --git a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java index c6f38106f..7f8443c27 100644 --- a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java +++ b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java @@ -160,7 +160,7 @@ public void unbounded() throws NoSuchAlgorithmException, InterruptedException, I } finally { controllers.forEach(Ethereal::stop); gossipers.forEach(ChRbcGossip::stop); - comms.forEach(e -> e.close(Duration.ofSeconds(1))); + comms.forEach(e -> e.close(Duration.ofSeconds(0))); } final var expected = expectedEpochs * (EPOCH_LENGTH - 1); @@ -287,7 +287,7 @@ private void one(int iteration) controllers.forEach(c -> System.out.println(c.dump())); controllers.forEach(Ethereal::stop); gossipers.forEach(ChRbcGossip::stop); - comms.forEach(e -> e.close(Duration.ofSeconds(1))); + comms.forEach(e -> e.close(Duration.ofSeconds(0))); } final var expected = NUM_EPOCHS * (EPOCH_LENGTH - 1); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java index d209c439b..fab24a1bd 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/ChurnTest.java @@ -29,6 +29,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -52,6 +53,7 @@ public class ChurnTest { private MetricRegistry node0Registry; private MetricRegistry registry; private List views; + private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -74,11 +76,14 @@ public void after() { views.clear(); } - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); communications.clear(); - gateways.forEach(e -> e.close(Duration.ofSeconds(1))); + gateways.forEach(e -> e.close(Duration.ofSeconds(0))); gateways.clear(); + if (executor != null) { + executor.shutdown(); + } } @Test @@ -210,8 +215,8 @@ public void churn() throws Exception { for (int j = c.size() - 1; j >= c.size() - delta; j--) { final var view = c.get(j); view.stop(); - r.get(j).close(Duration.ofSeconds(1)); - g.get(j).close(Duration.ofSeconds(1)); + r.get(j).close(Duration.ofSeconds(0)); + g.get(j).close(Duration.ofSeconds(0)); removed.add(view.getNode().getId()); } c = c.subList(0, c.size() - delta); @@ -239,7 +244,7 @@ public void churn() throws Exception { } views.forEach(e -> e.stop()); - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); System.out.println(); @@ -260,6 +265,7 @@ public void churn() throws Exception { } private void initialize() { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder().setMaximumTxfr(20).build(); registry = new MetricRegistry(); node0Registry = new MetricRegistry(); @@ -282,14 +288,15 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) ? node0Registry - : registry))); + : registry)), + executor); var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder() .setTarget(200) .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry - : registry))); + ? node0Registry : registry)), + executor); comms.start(); communications.add(comms); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java index a6e58f479..1300fe554 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java @@ -83,10 +83,10 @@ public void after() { views.clear(); } - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); communications.clear(); - gateways.forEach(e -> e.close(Duration.ofSeconds(1))); + gateways.forEach(e -> e.close(Duration.ofSeconds(0))); gateways.clear(); } @@ -209,7 +209,7 @@ private void initialize() { } private void post() { - communications.forEach(e -> e.close(Duration.ofSeconds(1))); + communications.forEach(e -> e.close(Duration.ofSeconds(0))); views.forEach(view -> view.stop()); System.out.println("Node 0 metrics"); ConsoleReporter.forRegistry(node0Registry) diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java index bc805d703..36a616dd3 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/MtlsTest.java @@ -39,6 +39,7 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -127,7 +128,7 @@ public void smoke() throws Exception { builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry)); CertificateWithPrivateKey certWithKey = certs.get(node.getId()); Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router( - builder); + builder, Executors.newVirtualThreadPerTaskExecutor()); communications.add(comms); return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms, parameters, DigestAlgorithm.DEFAULT, metrics); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index 0e4f93dc6..632cc0613 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -29,6 +29,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +63,7 @@ public class SwarmTest { private MetricRegistry node0Registry; private MetricRegistry registry; private List views; + private ExecutorService executor; @BeforeAll public static void beforeClass() throws Exception { @@ -89,6 +91,9 @@ public void after() { gateways.forEach(e -> e.close(Duration.ofSeconds(1))); gateways.clear(); + if (executor != null) { + executor.shutdown(); + } } @Test @@ -204,6 +209,7 @@ public void swarm() throws Exception { } private void initialize() { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var parameters = Parameters.newBuilder() .setMaxPending(50) .setMaximumTxfr(20) @@ -235,14 +241,15 @@ private void initialize() { .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) ? node0Registry - : registry))); + : registry)), + executor); var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder() .setTarget(200) .setMetrics( new ServerConnectionCacheMetricsImpl( frist.getAndSet(false) - ? node0Registry - : registry))); + ? node0Registry : registry)), + executor); comms.start(); communications.add(comms); diff --git a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java index c84e85eac..5b3f962c4 100644 --- a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java +++ b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java @@ -97,8 +97,8 @@ public void clientSmoke() throws Exception { var invitation = gorgoneionClient.apply(Duration.ofSeconds(60)); - gorgonRouter.close(Duration.ofSeconds(1)); - clientRouter.close(Duration.ofSeconds(1)); + gorgonRouter.close(Duration.ofSeconds(0)); + clientRouter.close(Duration.ofSeconds(0)); assertNotNull(invitation); assertNotEquals(Validations.getDefaultInstance(), invitation); @@ -113,10 +113,10 @@ public void clientSmoke() throws Exception { @AfterEach public void closeRouters() { if (gorgonRouter != null) { - gorgonRouter.close(Duration.ofSeconds(3)); + gorgonRouter.close(Duration.ofSeconds(0)); } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(3)); + clientRouter.close(Duration.ofSeconds(0)); } } diff --git a/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java b/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java index 57386081a..b31f69259 100644 --- a/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java +++ b/gorgoneion/src/test/java/com/salesforce/apollo/gorgoneion/GorgoneionTest.java @@ -109,8 +109,8 @@ public void smokin() throws Exception { .build()) .setNonce(fs) .build(), Duration.ofSeconds(1)); - gorgonRouter.close(Duration.ofSeconds(1)); - clientRouter.close(Duration.ofSeconds(1)); + gorgonRouter.close(Duration.ofSeconds(0)); + clientRouter.close(Duration.ofSeconds(0)); assertNotNull(invitation); assertNotEquals(Validations.getDefaultInstance(), invitation); assertEquals(1, invitation.getValidationsCount()); diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java index 8e2a95111..0bf42a2cc 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java @@ -169,7 +169,7 @@ public static void smoke(Oracle oracle) throws Exception { public void after() { domains.forEach(n -> n.stop()); domains.clear(); - routers.values().forEach(r -> r.close(Duration.ofSeconds(1))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); } diff --git a/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java index 1e5ffce2e..1944a54cc 100644 --- a/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java +++ b/leyden/src/test/java/com/salesforce/apollo/leyden/LeydenJarTest.java @@ -46,7 +46,7 @@ public class LeydenJarTest { @AfterEach public void after() { - routers.values().forEach(r -> r.close(Duration.ofSeconds(2))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); dhts.values().forEach(t -> t.stop()); dhts.clear(); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index 5ae364d6c..a7a9c6548 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Predicate; @@ -46,7 +46,6 @@ public class Enclave implements RouterSupplier { private final static Class channelType = IMPL.getChannelType(); private static final Logger log = LoggerFactory.getLogger(Enclave.class); - private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); private final DomainSocketAddress bridge; private final Consumer contextRegistration; private final DomainSocketAddress endpoint; @@ -63,10 +62,6 @@ public Enclave(Member from, DomainSocketAddress endpoint, DomainSocketAddress br this.fromString = qb64(from.getId()); } - public void close() { - eventLoopGroup.shutdownGracefully(); - } - /** * @return the DomainSocketAddress for this Enclave */ @@ -77,7 +72,10 @@ public DomainSocketAddress getEndpoint() { @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator) { + Predicate validator, ExecutorService executor) { + if (executor == null) { + executor = Executors.newVirtualThreadPerTaskExecutor(); + } var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); @@ -132,7 +130,6 @@ public void start(Listener responseListener, Metadata headers) { }; final var builder = NettyChannelBuilder.forAddress(bridge) .withOption(ChannelOption.TCP_NODELAY, true) - .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .usePlaintext() diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java index fcee2a5af..530f0e90a 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java @@ -25,7 +25,8 @@ import java.lang.reflect.Method; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Predicate; import java.util.function.Supplier; @@ -39,7 +40,6 @@ public class LocalServer implements RouterSupplier { private static final Logger log = LoggerFactory.getLogger(LocalServer.class); private static final String NAME_TEMPLATE = "%s-%s"; - private final Executor executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); private final ClientInterceptor clientInterceptor; private final Member from; private final String prefix; @@ -67,18 +67,19 @@ public Member getFrom() { return from; } - @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator) { + Predicate validator, ExecutorService executor) { + if (executor == null) { + executor = Executors.newVirtualThreadPerTaskExecutor(); + } String name = String.format(NAME_TEMPLATE, prefix, qb64(from.getId())); var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); } ServerBuilder serverBuilder = InProcessServerBuilder.forName(name) - .executor( - UnsafeExecutors.newVirtualThreadPerTaskExecutor()) + .executor(executor) .intercept(ConcurrencyLimitServerInterceptor.newBuilder( limitsBuilder.build()) .statusSupplier( @@ -101,7 +102,6 @@ public Digest getFrom() { private ManagedChannel connectTo(Member to) { final var name = String.format(NAME_TEMPLATE, prefix, qb64(to.getId())); final InProcessChannelBuilder builder = InProcessChannelBuilder.forName(name) - .executor(executor) .usePlaintext() .intercept(clientInterceptor); disableTrash(builder); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index 58db9ec8e..8f5b2a7be 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -19,7 +19,6 @@ import io.netty.handler.ssl.ClientAuth; import java.net.SocketAddress; -import java.util.concurrent.Executor; /** * @author hal.hildebrand @@ -29,11 +28,10 @@ public class MtlsClient { private final ManagedChannel channel; public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, ClientContextSupplier supplier, - CertificateValidator validator, Executor executor) { + CertificateValidator validator) { Limiter limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build(); channel = NettyChannelBuilder.forAddress(address) - .executor(executor) .withOption(ChannelOption.TCP_NODELAY, true) .sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3)) .intercept(new ConcurrencyLimitClientInterceptor(limiter, diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java index 0a800fa92..4cf97b97a 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java @@ -43,8 +43,7 @@ import java.security.cert.X509Certificate; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -63,7 +62,6 @@ public class MtlsServer implements RouterSupplier { private final Member from; private final Context.Key sslSessionContext = Context.key("SSLSession"); private final ServerContextSupplier supplier; - private final Executor executor; public MtlsServer(Member from, EndpointProvider epProvider, Function contextSupplier, ServerContextSupplier supplier) { @@ -71,7 +69,6 @@ public MtlsServer(Member from, EndpointProvider epProvider, Function() { @Override public Digest load(X509Certificate key) throws Exception { @@ -142,7 +139,10 @@ public static SslContext forServer(ClientAuth clientAuth, String alias, X509Cert @Override public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator) { + Predicate validator, ExecutorService executor) { + if (executor == null) { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); + } var limitsBuilder = new GrpcServerLimiterBuilder().limit(serverLimit.get()); if (limitsRegistry != null) { limitsBuilder.metricRegistry(limitsRegistry); @@ -181,7 +181,7 @@ private ManagedChannel connectTo(Member to) { var address = epProvider.addressFor(to); log.debug("Connecting to: {} address: {} on: {}", to.getId(), address, from.getId()); return new MtlsClient(address, epProvider.getClientAuth(), epProvider.getAlias(), contextSupplier.apply(from), - epProvider.getValidator(), executor).getChannel(); + epProvider.getValidator()).getChannel(); } private X509Certificate getCert() { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java index f0a04955a..4c17fb968 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.time.Duration; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -38,12 +38,12 @@ public class Portal { private final static Class channelType = IMPL.getChannelType(); - private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); - private final String agent; - private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); - private final Demultiplexer inbound; - private final Duration keepAlive; - private final Demultiplexer outbound; + private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + private final String agent; + private final EventLoopGroup eventLoopGroup = IMPL.getEventLoopGroup(); + private final Demultiplexer inbound; + private final Duration keepAlive; + private final Demultiplexer outbound; public Portal(Digest agent, ServerBuilder inbound, Function outbound, DomainSocketAddress bridge, Duration keepAlive, Function router) { @@ -63,6 +63,7 @@ public Portal(Digest agent, ServerBuilder inbound, Function contextRegistration; - private final Member from; - private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); - private final Server server; - private final Map> services = new ConcurrentHashMap<>(); - private final AtomicBoolean started = new AtomicBoolean(); - private final Predicate validator; + private final static Logger log = LoggerFactory.getLogger(RouterImpl.class); + + private final ServerConnectionCache cache; + private final ClientIdentity clientIdentityProvider; + private final Consumer contextRegistration; + private final Member from; + private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); + private final Server server; + private final Map> services = new ConcurrentHashMap<>(); + private final AtomicBoolean started = new AtomicBoolean(); + private final Predicate validator; public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, ClientIdentity clientIdentityProvider) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java index 5a8d71df2..8277eb526 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterSupplier.java @@ -13,6 +13,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.*; import java.util.function.Predicate; import java.util.function.Supplier; @@ -20,6 +21,18 @@ * @author hal.hildebrand */ public interface RouterSupplier { + static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) { + return newCachedThreadPool(corePoolSize, threadFactory, true); + } + + static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) { + var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue(), threadFactory); + if (preStart) { + threadPoolExecutor.prestartAllCoreThreads(); + } + return threadPoolExecutor; + } default Router router() { return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null); @@ -29,6 +42,10 @@ default Router router(ServerConnectionCache.Builder cacheBuilder) { return router(cacheBuilder, RouterImpl::defaultServerLimit, null); } + default Router router(ServerConnectionCache.Builder cacheBuilder, ExecutorService executor) { + return router(cacheBuilder, RouterImpl::defaultServerLimit, null, Collections.emptyList(), null, executor); + } + default Router router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry) { return router(cacheBuilder, serverLimit, limitsRegistry, Collections.emptyList()); @@ -39,8 +56,14 @@ default Router router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, + LimitsRegistry limitsRegistry, List interceptors, + Predicate validator) { + return router(cacheBuilder, serverLimit, limitsRegistry, interceptors, validator, null); + + } + Router router(ServerConnectionCache.Builder cacheBuilder, Supplier serverLimit, LimitsRegistry limitsRegistry, List interceptors, - Predicate validator); - + Predicate validator, ExecutorService executor); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java index fca126e8d..d568bd361 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/ServerConnectionCache.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -44,18 +45,19 @@ */ public class ServerConnectionCache { - private final static Logger log = LoggerFactory.getLogger( - ServerConnectionCache.class); - private final Map cache = new HashMap<>(); - private final Clock clock; - private final ServerConnectionFactory factory; - private final ReentrantLock lock = new ReentrantLock(true); - private final ServerConnectionCacheMetrics metrics; - private final Duration minIdle; - private final PriorityQueue queue = new PriorityQueue<>(); - private final int target; - private final Digest member; - private final CallCredentials credentials; + private final static Logger log = LoggerFactory.getLogger(ServerConnectionCache.class); + + private final Map cache = new HashMap<>(); + private final Clock clock; + private final ServerConnectionFactory factory; + private final ReentrantLock lock = new ReentrantLock(true); + private final ServerConnectionCacheMetrics metrics; + private final Duration minIdle; + private final PriorityQueue queue = new PriorityQueue<>(); + private final int target; + private final Digest member; + private final CallCredentials credentials; + private final AtomicBoolean open = new AtomicBoolean(true); public ServerConnectionCache(Digest member, CallCredentials credentials, ServerConnectionFactory factory, int target, Duration minIdle, Clock clock, ServerConnectionCacheMetrics metrics) { @@ -74,6 +76,9 @@ public static Builder newBuilder() { } public ManagedServerChannel borrow(Digest context, Member to) { + if (!open.get()) { + throw new IllegalStateException("not open on: " + member); + } return lock(() -> { if (cache.size() >= target) { log.debug("Cache target open connections exceeded: {}, opening to: {} on: {}", target, to.getId(), @@ -110,15 +115,21 @@ public ManagedServerChannel borrow(Digest context, Member to) { } public T borrow(Digest context, Member to, CreateClientCommunications createFunction) { + if (!open.get()) { + throw new IllegalStateException("not open on: " + member); + } return createFunction.create(borrow(context, to)); } public void close() { + if (!open.compareAndSet(true, false)) { + return; + } lock(() -> { log.info("Closing connection cache on: {}", member); for (ReleasableManagedChannel conn : new ArrayList<>(cache.values())) { try { - conn.channel.shutdownNow(); + conn.channel.shutdown(); if (metrics != null) { metrics.channelOpenDuration().update(Duration.between(conn.created, Instant.now(clock))); metrics.openConnections().dec(); @@ -134,6 +145,9 @@ public void close() { } public void release(ReleasableManagedChannel connection) { + if (!open.get()) { + return; + } lock(() -> { if (connection.decrementBorrow()) { log.debug("Releasing connection to: {} on: {}", connection.member.getId(), member); @@ -150,7 +164,7 @@ public void release(ReleasableManagedChannel connection) { private boolean close(ReleasableManagedChannel connection) { if (connection.isCloseable()) { try { - connection.channel.shutdownNow(); + connection.channel.shutdown(); } catch (Throwable t) { log.debug("Error closing connection to: {} on: {}", connection.member.getId(), connection.member); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java index 1b0e35cb6..5d4c19057 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java @@ -39,7 +39,10 @@ public class UnsafeExecutors { } public static ExecutorService newVirtualThreadPerTaskExecutor() { - return virtualThreadExecutor(new ForkJoinPool()); + var executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.prestartAllCoreThreads(); + return virtualThreadExecutor(executor); } public static B configureBuilderExecutor(B builder, Executor executor) { @@ -62,6 +65,94 @@ private static void setExecutor(Object builder, Object executor) { } } + public static ThreadPoolExecutor newCachedThreadPool(int corePoolSize) { + return newCachedThreadPool(corePoolSize, true); + } + + public static ThreadPoolExecutor newCachedThreadPool(int corePoolSize, boolean prestart) { + var executorService = newCachedThreadPool(corePoolSize, new ForkJoinPool()); + if (prestart) { + executorService.prestartAllCoreThreads(); + } + return executorService; + } + + public static ThreadPoolExecutor newCachedThreadPool(int corePoolSize, ExecutorService executor) { + ThreadFactory factory = r -> { + var builder = Thread.ofVirtual(); + setExecutor(builder, executor); + return builder.unstarted(r); + }; + return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue(), factory) { + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated() && super.isTerminated(); + } + + @Override + public void shutdown() { + executor.shutdown(); + super.shutdown(); + } + + @Override + public List shutdownNow() { + var returned = executor.shutdownNow(); + super.shutdownNow(); + return returned; + } + }; + } + + public static ExecutorService newFixedThreadPool(int nThreads, ExecutorService executor) { + ThreadFactory factory = r -> { + var builder = Thread.ofVirtual(); + setExecutor(builder, executor); + return builder.unstarted(r); + }; + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), factory) { + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated() && super.isTerminated(); + } + + @Override + public void shutdown() { + executor.shutdown(); + super.shutdown(); + } + + @Override + public List shutdownNow() { + var returned = executor.shutdownNow(); + super.shutdownNow(); + return returned; + } + }; + } + private static class BTB { private int characteristics; private long counter; diff --git a/memberships/src/main/java/com/salesforce/apollo/context/Context.java b/memberships/src/main/java/com/salesforce/apollo/context/Context.java index b1038e4db..ec82cd7dd 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/Context.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/Context.java @@ -95,15 +95,11 @@ static int minMajority(int bias, double pByz, int cardinality) { Stream allMembers(); /** - * @param start - * @param stop * @return Return all counter-clockwise items between (but not including) start and stop */ Iterable betweenPredecessors(int ring, T start, T stop); /** - * @param start - * @param stop * @return all clockwise items between (but not including) start item and stop item. */ Iterable betweenSuccessor(int ring, T start, T stop); @@ -199,8 +195,6 @@ default int diameter() { T getMember(Digest memberID); /** - * @param i - * @param ring * @return the i'th Member in Ring 0 of the receiver */ T getMember(int i, int ring); @@ -253,10 +247,6 @@ default Digest hashFor(Digest d, int ring) { /** * Answer true if the member is a successor of the supplied digest on any ring - * - * @param m - * @param digest - * @return */ boolean isSuccessorOf(T m, Digest digest); @@ -317,8 +307,6 @@ default int majority() { Iterable predecessors(int ring, Digest location); /** - * @param location - * @param predicate * @return an Iterable of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. */ @@ -327,8 +315,6 @@ default int majority() { Iterable predecessors(int ring, T start); /** - * @param start - * @param predicate * @return an Iterable of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. */ @@ -376,46 +362,34 @@ default int majority() { /** * Stream the members of the ring in hashed order - * - * @param ring - * @return */ Stream stream(int ring); /** - * @param ring - * @param predicate * @return a Stream of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. */ Stream streamPredecessors(int ring, Digest location, Predicate predicate); /** - * @param ring - * @param predicate * @return a list of all items counter-clock wise in the ring from (but excluding) start item to (but excluding) the * first item where predicate(item) evaluates to True. */ Stream streamPredecessors(int ring, T m, Predicate predicate); /** - * @param ring - * @param predicate * @return a Stream of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. */ Stream streamSuccessors(int ring, Digest location, Predicate predicate); /** - * @param ring - * @param predicate * @return a Stream of all items counter-clock wise in the ring from (but excluding) start item to (but excluding) * the first item where predicate(item) evaluates to True. */ Stream streamSuccessors(int ring, T m, Predicate predicate); /** - * @param ring * @return a iterable of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item */ @@ -490,16 +464,12 @@ default List> successors(Digest digest, T ignore } /** - * @param ring - * @param predicate * @return an Iterable of all items counter-clock wise in the ring from (but excluding) start location to (but * excluding) the first item where predicate(item) evaluates to True. */ Iterable successors(int ring, Digest location, Predicate predicate); /** - * @param ring - * @param predicate * @return an Iterable of all items counter-clock wise in the ring from (but excluding) start item to (but * excluding) the first item where predicate(item) evaluates to True. */ @@ -522,8 +492,7 @@ default int toleranceLevel() { } /** - * @param member - * @return the iteratator to traverse the ring starting at the member + * @return the iterator to traverse the ring starting at the member */ Iterable traverse(int ring, T member); diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 8de6acb5d..a4b9f149f 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -77,6 +77,7 @@ public ReliableBroadcaster(Context context, SigningMember member, Parame r -> new RbcServer(communications.getClientIdentityProvider(), metrics, r), getCreate(metrics), ReliableBroadcast.getLocalLoopback(member)); gossiper = new RingCommunications<>(context, member, this.comm); + gossiper.ignoreSelf(); this.adapter = adapter; } @@ -230,18 +231,18 @@ private Reconcile gossipRound(ReliableBroadcast link, int ring) { if (!started.get()) { return null; } - log.trace("rbc gossiping[{}] with: {} ring: {} on: {}", buffer.round(), member.getId(), + log.trace("rbc gossiping[{}:{}] with: {} ring: {} on: {}", context.getId(), buffer.round(), link.getMember().getId(), ring, member.getId()); try { return link.gossip( MessageBff.newBuilder().setRing(ring).setDigests(buffer.forReconcilliation().toBff()).build()); } catch (StatusRuntimeException sre) { - log.trace("rbc gossiping[{}] failed: {} with: {} ring: {} on: {}", buffer.round(), sre.getStatus(), - link.getMember().getId(), ring, member.getId()); + log.trace("rbc gossiping[{}:{}] failed: {} with: {} ring: {} on: {}", context.getId(), buffer.round(), + sre.getStatus(), link.getMember().getId(), ring, member.getId()); return null; } catch (Throwable e) { - log.trace("rbc gossiping[{}] failed with: {} ring: {} on: {}", buffer.round(), link.getMember().getId(), - ring, member.getId(), e); + log.trace("rbc gossiping[{}:{}] failed with: {} ring: {} on: {}", context.getId(), buffer.round(), + link.getMember().getId(), ring, member.getId(), e); return null; } } @@ -327,10 +328,10 @@ public static Parameters.Builder newBuilder() { public static class Builder implements Cloneable { private int bufferSize = 1500; private int dedupBufferSize = 100; - private double dedupFpr = Math.pow(10, -9); + private double dedupFpr = Math.pow(10, -6); private int deliveredCacheSize = 100; private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT; - private double falsePositiveRate = 0.00125; + private double falsePositiveRate = 0.0000125; private int maxMessages = 500; public Parameters build() { diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java index 137db0a44..ba283f25f 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java @@ -32,13 +32,13 @@ public class SliceIterator { private static final Logger log = LoggerFactory.getLogger(SliceIterator.class); - private final CommonCommunications comm; - private final String label; - private final SigningMember member; - private final List slice; - private final ScheduledExecutorService scheduler; - private Member current; - private Iterator currentIteration; + private final CommonCommunications comm; + private final String label; + private final SigningMember member; + private final List slice; + private final ScheduledExecutorService scheduler; + private volatile Member current; + private volatile Iterator currentIteration; public SliceIterator(String label, SigningMember member, Collection slice, CommonCommunications comm) { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java index 55e8c78f2..85ce08693 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java @@ -142,9 +142,9 @@ public void smokin() throws Exception { msg = resultB.unpack(ByteMessage.class); assertEquals("Hello Server B", msg.getContents().toStringUtf8()); - portal.close(Duration.ofSeconds(1)); - router1.close(Duration.ofSeconds(1)); - router2.close(Duration.ofSeconds(1)); + portal.close(Duration.ofSeconds(0)); + router1.close(Duration.ofSeconds(0)); + router2.close(Duration.ofSeconds(0)); } private ManagedChannel handler(DomainSocketAddress address) { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java index 615a4cf43..612a34e13 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/FernetTest.java @@ -115,8 +115,8 @@ public void smokin() throws Exception { assertNotNull(resultB); assertEquals("Hello Server A", resultB.unpack(ByteMessage.class).getContents().toStringUtf8()); - routerA.close(Duration.ofSeconds(1)); - routerB.close(Duration.ofSeconds(1)); + routerA.close(Duration.ofSeconds(0)); + routerB.close(Duration.ofSeconds(0)); } public interface TestIt { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java index 3099ac332..3c7cf715d 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/LocalServerTest.java @@ -84,8 +84,8 @@ public void smokin() throws Exception { assertNotNull(resultB); assertEquals("Hello Server A", resultB.unpack(ByteMessage.class).getContents().toStringUtf8()); - routerA.close(Duration.ofSeconds(1)); - routerB.close(Duration.ofSeconds(1)); + routerA.close(Duration.ofSeconds(0)); + routerB.close(Duration.ofSeconds(0)); } public interface TestIt { diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java index 919c8aaaa..884142ded 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/RouterTest.java @@ -83,7 +83,7 @@ public Any ping(Any request) { msg = resultB.unpack(ByteMessage.class); assertEquals("Hello Server B", msg.getContents().toStringUtf8()); - router.close(Duration.ofSeconds(1)); + router.close(Duration.ofSeconds(0)); } public interface TestIt { diff --git a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java index c7cf320ce..b9fc84dfb 100644 --- a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java @@ -70,7 +70,7 @@ public void after() { if (messengers != null) { messengers.forEach(e -> e.stop()); } - communications.forEach(e -> e.close(Duration.ofMillis(1))); + communications.forEach(e -> e.close(Duration.ofMillis(0))); } @Test @@ -140,7 +140,7 @@ public void broadcast() throws Exception { receiver.reset(); } } - communications.forEach(e -> e.close(Duration.ofMillis(1))); + communications.forEach(e -> e.close(Duration.ofMillis(0))); System.out.println(); diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java index 3c2f6b81a..fc0ac9be5 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/RingCommunicationsTest.java @@ -95,7 +95,7 @@ public Any ping(Any request) { assertFalse(pinged1.get()); assertTrue(pinged2.get()); } finally { - router.close(Duration.ofSeconds(5)); + router.close(Duration.ofSeconds(0)); } } } diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java index bf9d9bf67..00db46be2 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/RingIteratorTest.java @@ -105,7 +105,7 @@ public Any ping(Any request) { assertFalse(pinged1.get()); assertTrue(pinged2.get()); } finally { - router.close(Duration.ofSeconds(2)); + router.close(Duration.ofSeconds(0)); } } } diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java index 500fd165f..b0abd0790 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java @@ -103,7 +103,7 @@ public Any ping(Any request) { assertTrue(pinged1.get()); assertTrue(pinged2.get()); } finally { - router.close(Duration.ofSeconds(2)); + router.close(Duration.ofSeconds(0)); } } } diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index c316dd9d7..641c211c8 100644 --- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -36,6 +33,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -50,18 +48,22 @@ public class ContainmentDomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); + private ExecutorService executor; @AfterEach public void after() { domains.forEach(Domain::stop); domains.clear(); - routers.forEach(r -> r.close(Duration.ofSeconds(100))); + routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { - + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); final var commsDirectory = Path.of("target/comms"); commsDirectory.toFile().mkdirs(); @@ -84,7 +86,8 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), @@ -123,18 +126,18 @@ private Builder params() { .setGenerateGenesis(true) .setGenesisViewId(GENESIS_VIEW_ID) .setBootstrap(Parameters.BootstrapParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .build()) .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setProducer(Parameters.ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setBatchInterval(Duration.ofMillis(50)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(10_000) .setEthereal(Config.newBuilder() - .setNumberOfEpochs(3) - .setEpochLength(7)) + .setNumberOfEpochs(12) + .setEpochLength(33)) .build()) .setCheckpointBlockDelta(200) .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 9b915ed9a..122956188 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -38,6 +35,7 @@ import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -55,6 +53,7 @@ public class DomainTest { "Give me food or give me slack or kill me".getBytes()); private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); + private ExecutorService executor; public static void smoke(Oracle oracle) throws Exception { // Namespace @@ -215,12 +214,16 @@ public static void smoke(Oracle oracle) throws Exception { public void after() { domains.forEach(Domain::stop); domains.clear(); - routers.forEach(r -> r.close(Duration.ofSeconds(1))); + routers.forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -240,7 +243,8 @@ public void before() throws Exception { final var group = DigestAlgorithm.DEFAULT.getOrigin(); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); routers.add(localRouter); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1), @@ -280,18 +284,18 @@ private Builder params() { .setGenerateGenesis(true) .setGenesisViewId(GENESIS_VIEW_ID) .setBootstrap(Parameters.BootstrapParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .build()) .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setProducer(Parameters.ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setBatchInterval(Duration.ofMillis(50)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(10_000) .setEthereal(Config.newBuilder() - .setNumberOfEpochs(3) - .setEpochLength(7)) + .setNumberOfEpochs(12) + .setEpochLength(33)) .build()) .setCheckpointBlockDelta(200) .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index cb9b466d1..867d8d4d0 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -6,10 +6,7 @@ */ package com.salesforce.apollo.model; -import com.salesforce.apollo.archipelago.EndpointProvider; -import com.salesforce.apollo.archipelago.LocalServer; -import com.salesforce.apollo.archipelago.Router; -import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.*; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; @@ -37,6 +34,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -55,17 +53,22 @@ public class FireFliesTest { private final List domains = new ArrayList<>(); private final Map routers = new HashMap<>(); + private ExecutorService executor; @AfterEach public void after() { domains.forEach(n -> n.stop()); domains.clear(); - routers.values().forEach(r -> r.close(Duration.ofSeconds(1))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); + if (executor != null) { + executor.shutdown(); + } } @BeforeEach public void before() throws Exception { + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[] { 6, 6, 6 }); @@ -84,7 +87,8 @@ public void before() throws Exception { identities.forEach((digest, id) -> { var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3); final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID()); var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5), "jdbc:h2:mem:%s-state".formatted(digest), @@ -175,18 +179,18 @@ private Builder params() { .setGenerateGenesis(true) .setGenesisViewId(GENESIS_VIEW_ID) .setBootstrap(Parameters.BootstrapParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .build()) .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setProducer(Parameters.ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) + .setGossipDuration(Duration.ofMillis(20)) .setBatchInterval(Duration.ofMillis(50)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(10_000) .setEthereal(Config.newBuilder() - .setNumberOfEpochs(3) - .setEpochLength(7)) + .setNumberOfEpochs(12) + .setEpochLength(33)) .build()) .setCheckpointBlockDelta(200) .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() diff --git a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java index 877e7661b..202d4c4a6 100644 --- a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java @@ -186,9 +186,9 @@ public void portal() throws Exception { msg = resultB.unpack(ByteMessage.class); assertEquals("Hello Server B", msg.getContents().toStringUtf8()); - portal.close(Duration.ofSeconds(1)); - router1.close(Duration.ofSeconds(1)); - router2.close(Duration.ofSeconds(1)); + portal.close(Duration.ofSeconds(0)); + router1.close(Duration.ofSeconds(0)); + router2.close(Duration.ofSeconds(0)); } @Test diff --git a/pom.xml b/pom.xml index c4879bd2b..da01491d9 100644 --- a/pom.xml +++ b/pom.xml @@ -783,9 +783,7 @@ 3.2.5 ${forks} - true - -Xmx10G -Xms4G - -Djdk.tracePinnedThreads=full + false diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index 3c8802818..2ababf109 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -9,6 +9,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.BootstrapParameters; @@ -41,10 +42,7 @@ import java.sql.Statement; import java.time.Duration; import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -75,6 +73,7 @@ abstract public class AbstractLifecycleTest { // } private final List GENESIS_DATA; private final Map parameters = new HashMap<>(); + protected ExecutorService executor; protected SecureRandom entropy; protected CountDownLatch checkpointOccurred; protected Map choams; @@ -111,7 +110,7 @@ private static Txn initialInsert() { @AfterEach public void after() throws Exception { if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } if (choams != null) { @@ -122,6 +121,9 @@ public void after() throws Exception { scheduler.shutdownNow(); scheduler = null; } + if (executor != null) { + executor.shutdown(); + } updaters.values().forEach(up -> up.close()); updaters.clear(); parameters.clear(); @@ -132,7 +134,8 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { - scheduler = Executors.newScheduledThreadPool(10); + scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); checkpointOccurred = new CountDownLatch(CARDINALITY); checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong()); Utils.clean(checkpointDirBase); @@ -209,7 +212,7 @@ protected void post() throws Exception { .toList()); choams.values().forEach(e -> e.stop()); - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); final ULong target = updaters.values() .stream() .map(ssm -> ssm.getCurrentBlock()) diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index 433ef9acb..1ec5449d1 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -11,6 +11,7 @@ import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.archipelago.UnsafeExecutors; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters; @@ -82,6 +83,7 @@ public class CHOAMTest { private MetricRegistry registry; private Map routers; private ScheduledExecutorService scheduler; + private ExecutorService executor; private static Txn initialInsert() { return Txn.newBuilder() @@ -96,7 +98,7 @@ private static Txn initialInsert() { @AfterEach public void after() throws Exception { if (routers != null) { - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); routers = null; } if (choams != null) { @@ -107,6 +109,9 @@ public void after() throws Exception { scheduler.shutdownNow(); scheduler = null; } + if (executor != null) { + executor.shutdown(); + } updaters.values().forEach(up -> up.close()); updaters.clear(); members = null; @@ -122,7 +127,8 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { - scheduler = Executors.newScheduledThreadPool(10); + scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory()); + executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); registry = new MetricRegistry(); checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong()); Utils.clean(checkpointDirBase); @@ -155,7 +161,8 @@ public void before() throws Exception { members.forEach(m -> context.activate(m)); final var prefix = UUID.randomUUID().toString(); routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> { - var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30), + executor); return localRouter; })); choams = members.stream() @@ -230,7 +237,7 @@ public void submitMultiplTxn() throws Exception { .toList()); } finally { choams.values().forEach(e -> e.stop()); - routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers.values().forEach(e -> e.close(Duration.ofSeconds(0))); System.out.println("Final block height: " + members.stream() .map(m -> updaters.get(m)) diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java index 3a0926d20..18233c2bf 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestBinder.java @@ -42,11 +42,11 @@ public class TestBinder { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofMillis(1)); + serverRouter.close(Duration.ofMillis(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofMillis(1)); + clientRouter.close(Duration.ofMillis(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java index 9ba52e7b9..eab2a64a1 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventObserver.java @@ -44,11 +44,11 @@ public class TestEventObserver { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java index 0ac4b5ac1..3b69cf798 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestEventValidation.java @@ -39,11 +39,11 @@ public class TestEventValidation { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java index 1c4721d54..e7d81f368 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestKerlService.java @@ -51,11 +51,11 @@ public class TestKerlService { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java index 19da10932..705e23e77 100644 --- a/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java +++ b/stereotomy-services/src/test/java/com/salesforce/apollo/stereotomy/services/grpc/TestResolver.java @@ -40,11 +40,11 @@ public class TestResolver { @AfterEach public void after() { if (serverRouter != null) { - serverRouter.close(Duration.ofSeconds(1)); + serverRouter.close(Duration.ofSeconds(0)); serverRouter = null; } if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); clientRouter = null; } } diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java index fd22fde20..a761af4d6 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java @@ -100,7 +100,7 @@ public static RotationEvent rotation(KeyPair prevNext, final Digest prevDigest, @AfterEach public void after() { - routers.values().forEach(r -> r.close(Duration.ofSeconds(2))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); dhts.values().forEach(t -> t.stop()); dhts.clear(); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java index 208189545..e1021f3be 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java @@ -50,7 +50,7 @@ public class BootstrappingTest extends AbstractDhtTest { @AfterEach public void closeClient() throws Exception { if (clientRouter != null) { - clientRouter.close(Duration.ofSeconds(3)); + clientRouter.close(Duration.ofSeconds(0)); } } diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java index 71fb3662a..62fa3dbaf 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java @@ -54,7 +54,7 @@ public class DhtRebalanceTest { @AfterEach public void afterIt() throws Exception { - routers.values().forEach(r -> r.close(Duration.ofSeconds(1))); + routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); dhts.clear(); contexts.clear(); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java index 485fe03ac..f74821219 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/PublisherTest.java @@ -79,8 +79,8 @@ public void smokin() throws Exception { client.publish(KERL_.getDefaultInstance(), Collections.emptyList()); client.publishEvents(Collections.emptyList(), Collections.emptyList()); } finally { - clientRouter.close(Duration.ofSeconds(1)); - serverRouter.close(Duration.ofSeconds(1)); + clientRouter.close(Duration.ofSeconds(0)); + serverRouter.close(Duration.ofSeconds(0)); } }