diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index f90ad6ca0..f65003a97 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -76,31 +76,31 @@ public class CHOAM { private static final Logger log = LoggerFactory.getLogger(CHOAM.class); - private final Map cachedCheckpoints = new ConcurrentHashMap<>(); - private final AtomicReference checkpoint = new AtomicReference<>(); - private final ReliableBroadcaster combine; - private final CommonCommunications comm; - private final AtomicReference current = new AtomicReference<>(); - private final AtomicReference> futureBootstrap = new AtomicReference<>(); - private final AtomicReference> futureSynchronization = new AtomicReference<>(); - private final AtomicReference genesis = new AtomicReference<>(); - private final AtomicReference head = new AtomicReference<>(); - private final AtomicReference next = new AtomicReference<>(); - private final AtomicReference nextViewId = new AtomicReference<>(); - private final Parameters params; - private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); - private final RoundScheduler roundScheduler; - private final Session session; - private final AtomicBoolean started = new AtomicBoolean(); - private final Store store; - private final CommonCommunications submissionComm; - private final Combine.Transitions transitions; - private final TransSubmission txnSubmission = new TransSubmission(); - private final AtomicReference view = new AtomicReference<>(); - private final PendingViews pendingViews = new PendingViews(); - private final ScheduledExecutorService scheduler; - private final Semaphore linear = new Semaphore(1); - private volatile AtomicBoolean ongoingJoin; + private final Map cachedCheckpoints = new ConcurrentHashMap<>(); + private final AtomicReference checkpoint = new AtomicReference<>(); + private final ReliableBroadcaster combine; + private final CommonCommunications comm; + private final AtomicReference current = new AtomicReference<>(); + private final AtomicReference> futureBootstrap = new AtomicReference<>(); + private final AtomicReference> futureSynchronization = new AtomicReference<>(); + private final AtomicReference genesis = new AtomicReference<>(); + private final AtomicReference head = new AtomicReference<>(); + private final AtomicReference next = new AtomicReference<>(); + private final AtomicReference nextViewId = new AtomicReference<>(); + private final Parameters params; + private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); + private final RoundScheduler roundScheduler; + private final Session session; + private final AtomicBoolean started = new AtomicBoolean(); + private final Store store; + private final CommonCommunications submissionComm; + private final Combine.Transitions transitions; + private final TransSubmission txnSubmission = new TransSubmission(); + private final AtomicReference view = new AtomicReference<>(); + private final PendingViews pendingViews = new PendingViews(); + private final ScheduledExecutorService scheduler; + private final Semaphore linear = new Semaphore(1); + private final AtomicBoolean ongoingJoin = new AtomicBoolean(); public CHOAM(Parameters params) { scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); @@ -734,11 +734,8 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) { } else { current.set(new Client(validators, getViewId())); } - final var oj = ongoingJoin; - ongoingJoin = null; - if (oj != null) { + if (ongoingJoin.compareAndSet(true, false)) { log.trace("Halting ongoing join on: {}", params.member().getId()); - oj.set(true); } log.info("Reconfigured to view: {} committee: {} validators: {} on: {}", new Digest(reconfigure.getId()), current.get().getClass().getSimpleName(), validators.entrySet() @@ -1402,30 +1399,28 @@ public boolean validate(HashedCertifiedBlock hb) { } private void join(View view) { - if (ongoingJoin != null) { + if (!ongoingJoin.compareAndSet(false, true)) { throw new IllegalStateException("Ongoing join should have been cancelled"); } log.trace("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), params.member().getId()); var servers = new ConcurrentSkipListSet<>(validators.keySet()); var joined = new AtomicInteger(); - var halt = new AtomicBoolean(false); - ongoingJoin = halt; log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), params.member().getId()); var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); AtomicReference action = new AtomicReference<>(); var attempts = new AtomicInteger(); action.set(() -> { - log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(), - halt.get(), joined.get(), view.getMajority(), params.member().getId()); - if (!halt.get() & joined.get() < view.getMajority()) { + log.trace("Join attempt: {} ongoing: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(), + ongoingJoin.get(), joined.get(), view.getMajority(), params.member().getId()); + if (ongoingJoin.get() & joined.get() < view.getMajority()) { join(view, servers, joined); if (joined.get() >= view.getMajority()) { - ongoingJoin = null; + ongoingJoin.set(false); log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), joined.get(), params.member().getId()); - } else if (!halt.get()) { + } else if (ongoingJoin.get()) { log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), joined.get(), params.member().getId()); scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS); @@ -1450,7 +1445,7 @@ private void join(View view, Collection members, AtomicInteger joined) { .setSignature(params.member().sign(inView.toByteString()).toSig()) .build(); var countdown = new CountDownLatch(sampled.size()); - sampled.stream().map(m -> { + sampled.parallelStream().map(m -> { var connection = comm.connect(m); log.trace("connect to: {} is: {} on: {}", m.getId(), connection, params.member().getId()); return connection; diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java index 67ea81f4e..cbead8839 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java @@ -429,6 +429,5 @@ public int compareTo(UnitTask o) { public void run() { consumer.accept(unit); } - } }