Skip to content

Commit

Permalink
d'oh. observers now concurrent skip list. reject joins if not observe…
Browse files Browse the repository at this point in the history
…r. better logging
  • Loading branch information
Hellblazer committed Jun 18, 2024
1 parent 4a47caa commit 1f0a47f
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@

import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand All @@ -51,7 +48,7 @@ public class ViewManagement {
private static final Logger log = LoggerFactory.getLogger(ViewManagement.class);

final AtomicReference<HexBloom> diadem = new AtomicReference<>();
final Set<Digest> observers = new TreeSet<>();
final Set<Digest> observers = new ConcurrentSkipListSet<>();
private final AtomicInteger attempt = new AtomicInteger();
private final Digest bootstrapView;
private final DynamicContext<Participant> context;
Expand Down Expand Up @@ -288,6 +285,11 @@ void join(Join join, Digest from, StreamObserver<Gateway> responseObserver, Time
.toList(), from, responseObserver, timer);
return;
}
if (!observers.contains(node.getId())) {
log.warn("Join not observer! from: {} observers: {} on: {}", from, observers, node.getId());
responseObserver.onNext(Gateway.getDefaultInstance());
return;
}
if (!thisView.equals(joinView)) {
responseObserver.onError(new StatusRuntimeException(
Status.OUT_OF_RANGE.withDescription("View: " + joinView + " does not match: " + thisView)));
Expand Down Expand Up @@ -376,8 +378,8 @@ boolean joined() {
*/
void maybeViewChange() {
if (context.size() == 1 && joins.size() < context.getRingCount() - 1) {
log.info("Do not have minimum cluster size: {} required: {} for: {} on: {}", joins.size() + context.size(),
4, currentView(), node.getId());
log.info("Cannot form cluster: {} with: {} members, required:4 on: {}", currentView(),
joins.size() + context.size(), node.getId());
view.scheduleViewChange();
return;
}
Expand All @@ -390,7 +392,7 @@ void maybeViewChange() {
// Use pending rebuttals as a proxy for stability
if (view.hasPendingRebuttals()) {
log.debug("Pending rebuttals in view: {} on: {}", currentView(), node.getId());
view.scheduleViewChange(1); // 1 TTL round to check again
view.scheduleViewChange(2); // 2 TTL round2 to check again
} else {
view.scheduleFinalizeViewChange();
}
Expand Down Expand Up @@ -474,8 +476,8 @@ Redirect seed(Registration registration, Digest from) {

final var introductions = observers.stream().map(context::getMember).toList();

log.debug("Member seeding: {} view: {} context: {} introductions: {} on: {}", newMember.getId(),
currentView(), context.getId(), introductions.size(), node.getId());
log.info("Member seeding: {} view: {} context: {} introductions: {} on: {}", newMember.getId(),
currentView(), context.getId(), introductions.stream().map(p -> p.getId()).toList(), node.getId());
return Redirect.newBuilder()
.setView(currentView().toDigeste())
.addAllIntroductions(introductions.stream()
Expand Down Expand Up @@ -599,7 +601,7 @@ private void setDiadem(final HexBloom hex) {
diadem.set(hex);
currentView.set(diadem.get().compactWrapped());
resetObservers();
log.debug("View: {} set diadem: {} observers: {} view: {} context: {} size: {} on: {}", context.getId(),
log.trace("View: {} set diadem: {} observers: {} view: {} context: {} size: {} on: {}", context.getId(),
diadem.get().compactWrapped(), observers.stream().toList(), currentView(), context.getId(),
context.size(), node.getId());
}
Expand Down

0 comments on commit 1f0a47f

Please sign in to comment.