Skip to content

Commit

Permalink
firm up join
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 29, 2024
1 parent e40b25c commit a799b86
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 39 deletions.
71 changes: 33 additions & 38 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,31 +76,31 @@
public class CHOAM {
private static final Logger log = LoggerFactory.getLogger(CHOAM.class);

private final Map<ULong, CheckpointState> cachedCheckpoints = new ConcurrentHashMap<>();
private final AtomicReference<HashedCertifiedBlock> checkpoint = new AtomicReference<>();
private final ReliableBroadcaster combine;
private final CommonCommunications<Terminal, Concierge> comm;
private final AtomicReference<Committee> current = new AtomicReference<>();
private final AtomicReference<CompletableFuture<SynchronizedState>> futureBootstrap = new AtomicReference<>();
private final AtomicReference<ScheduledFuture<?>> futureSynchronization = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> genesis = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> head = new AtomicReference<>();
private final AtomicReference<nextView> next = new AtomicReference<>();
private final AtomicReference<Digest> nextViewId = new AtomicReference<>();
private final Parameters params;
private final PriorityBlockingQueue<HashedCertifiedBlock> pending = new PriorityBlockingQueue<>();
private final RoundScheduler roundScheduler;
private final Session session;
private final AtomicBoolean started = new AtomicBoolean();
private final Store store;
private final CommonCommunications<TxnSubmission, Submitter> submissionComm;
private final Combine.Transitions transitions;
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final ScheduledExecutorService scheduler;
private final Semaphore linear = new Semaphore(1);
private volatile AtomicBoolean ongoingJoin;
private final Map<ULong, CheckpointState> cachedCheckpoints = new ConcurrentHashMap<>();
private final AtomicReference<HashedCertifiedBlock> checkpoint = new AtomicReference<>();
private final ReliableBroadcaster combine;
private final CommonCommunications<Terminal, Concierge> comm;
private final AtomicReference<Committee> current = new AtomicReference<>();
private final AtomicReference<CompletableFuture<SynchronizedState>> futureBootstrap = new AtomicReference<>();
private final AtomicReference<ScheduledFuture<?>> futureSynchronization = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> genesis = new AtomicReference<>();
private final AtomicReference<HashedCertifiedBlock> head = new AtomicReference<>();
private final AtomicReference<nextView> next = new AtomicReference<>();
private final AtomicReference<Digest> nextViewId = new AtomicReference<>();
private final Parameters params;
private final PriorityBlockingQueue<HashedCertifiedBlock> pending = new PriorityBlockingQueue<>();
private final RoundScheduler roundScheduler;
private final Session session;
private final AtomicBoolean started = new AtomicBoolean();
private final Store store;
private final CommonCommunications<TxnSubmission, Submitter> submissionComm;
private final Combine.Transitions transitions;
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final ScheduledExecutorService scheduler;
private final Semaphore linear = new Semaphore(1);
private final AtomicBoolean ongoingJoin = new AtomicBoolean();

public CHOAM(Parameters params) {
scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
Expand Down Expand Up @@ -734,11 +734,8 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) {
} else {
current.set(new Client(validators, getViewId()));
}
final var oj = ongoingJoin;
ongoingJoin = null;
if (oj != null) {
if (ongoingJoin.compareAndSet(true, false)) {
log.trace("Halting ongoing join on: {}", params.member().getId());
oj.set(true);
}
log.info("Reconfigured to view: {} committee: {} validators: {} on: {}", new Digest(reconfigure.getId()),
current.get().getClass().getSimpleName(), validators.entrySet()
Expand Down Expand Up @@ -1402,30 +1399,28 @@ public boolean validate(HashedCertifiedBlock hb) {
}

private void join(View view) {
if (ongoingJoin != null) {
if (!ongoingJoin.compareAndSet(false, true)) {
throw new IllegalStateException("Ongoing join should have been cancelled");
}
log.trace("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
var servers = new ConcurrentSkipListSet<>(validators.keySet());
var joined = new AtomicInteger();
var halt = new AtomicBoolean(false);
ongoingJoin = halt;
log.trace("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
var scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory());
AtomicReference<Runnable> action = new AtomicReference<>();
var attempts = new AtomicInteger();
action.set(() -> {
log.trace("Join attempt: {} halt: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(),
halt.get(), joined.get(), view.getMajority(), params.member().getId());
if (!halt.get() & joined.get() < view.getMajority()) {
log.trace("Join attempt: {} ongoing: {} joined: {} majority: {} on: {}", attempts.incrementAndGet(),
ongoingJoin.get(), joined.get(), view.getMajority(), params.member().getId());
if (ongoingJoin.get() & joined.get() < view.getMajority()) {
join(view, servers, joined);
if (joined.get() >= view.getMajority()) {
ongoingJoin = null;
ongoingJoin.set(false);
log.trace("Finished join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
Digest.from(view.getDiadem()), joined.get(), params.member().getId());
} else if (!halt.get()) {
} else if (ongoingJoin.get()) {
log.trace("Rescheduling join of: {} diadem: {} joins: {} on: {}", nextViewId.get(),
Digest.from(view.getDiadem()), joined.get(), params.member().getId());
scheduler.schedule(action.get(), 50, TimeUnit.MILLISECONDS);
Expand All @@ -1450,7 +1445,7 @@ private void join(View view, Collection<Member> members, AtomicInteger joined) {
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
var countdown = new CountDownLatch(sampled.size());
sampled.stream().map(m -> {
sampled.parallelStream().map(m -> {
var connection = comm.connect(m);
log.trace("connect to: {} is: {} on: {}", m.getId(), connection, params.member().getId());
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,5 @@ public int compareTo(UnitTask o) {
public void run() {
consumer.accept(unit);
}

}
}

0 comments on commit a799b86

Please sign in to comment.