Skip to content

Commit

Permalink
Moar view stuff
Browse files Browse the repository at this point in the history
Spawn a ViewManagement.populate using the supplied initial seed set to help ensure population coverage and prevent successor loops with other members who just happened to joined in the same view.
  • Loading branch information
Hellblazer committed Dec 30, 2023
1 parent b3d94ae commit b3e9be5
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,26 +181,25 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat

private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, AtomicInteger abandon) {
if (sre.getStatus().getCode().equals(Status.OUT_OF_RANGE.getCode())) {
log.debug("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
log.info("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.FAILED_PRECONDITION.getCode())) {
log.debug("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
log.info("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.PERMISSION_DENIED.getCode())) {
log.debug("Gateway view: {} permission denied: {} from: {} on: {}", v, sre.getMessage(),
link.getMember().getId(), node.getId());
log.info("Gateway view: {} permission denied: {} from: {} on: {}", v, sre.getMessage(),
link.getMember().getId(), node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.RESOURCE_EXHAUSTED.getCode())) {
log.debug("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
log.info("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else {
log.debug("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
log.info("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
}
;
}

private Join join(Digest v) {
Expand All @@ -224,8 +223,8 @@ private Join join(Digest v) {
this.context.rebalance(r.getCardinality());
node.nextNote(view);

log.debug("Completing redirect to view: {} context: {} successors: {} on: {}", view, this.context.getId(),
r.getSuccessorsCount(), node.getId());
log.debug("Completing redirect to view: {} context: {} sample: {} on: {}", view, this.context.getId(),
r.getSampleCount(), node.getId());
if (timer != null) {
timer.close();
}
Expand All @@ -234,12 +233,12 @@ private Join join(Digest v) {
}

private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecutorService scheduler) {
var successors = redirect.getSuccessorsList()
.stream()
.map(sn -> new NoteWrapper(sn.getNote(), digestAlgo))
.map(nw -> view.new Participant(nw))
.collect(Collectors.toList());
log.info("Redirecting to: {} context: {} successors: {} on: {}", v, this.context.getId(), successors.size(),
var sample = redirect.getSampleList()
.stream()
.map(sn -> new NoteWrapper(sn.getNote(), digestAlgo))
.map(nw -> view.new Participant(nw))
.collect(Collectors.toList());
log.info("Redirecting to: {} context: {} sample: {} on: {}", v, this.context.getId(), sample.size(),
node.getId());
var gateway = new CompletableFuture<Bound>();
var timer = metrics == null ? null : metrics.joinDuration().time();
Expand All @@ -258,7 +257,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu
this.context.rebalance(cardinality);
node.nextNote(v);

final var redirecting = new SliceIterator<>("Gateways", node, successors, approaches);
final var redirecting = new SliceIterator<>("Gateways", node, sample, approaches);
var majority = redirect.getBootstrap() ? 1 : Context.minimalQuorum(redirect.getRings(), this.context.getBias());
final var join = join(v);
final var abandon = new AtomicInteger();
Expand All @@ -267,36 +266,41 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
try {
var g = link.join(join, params.seedingTimeout());
if (g.equals(Gateway.getDefaultInstance())) {
log.debug("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId());
if (g == null || g.equals(Gateway.getDefaultInstance())) {
log.info("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId());
abandon.incrementAndGet();
return null;
}
return g;
} catch (StatusRuntimeException sre) {
gatewaySRE(v, link, sre, abandon);
return null;
} catch (Throwable t) {
log.info("Gateway view: {} error: {} from: {} on: {}", v, t.toString(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
return null;
}
}, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts,
initialSeedSet, v, majority), () -> {
if (gateway.isDone()) {
return;
}
if (abandon.get() >= majority) {
log.debug("Abandoning Gateway view: {} reseeding on: {}", v, node.getId());
log.info("Abandoning Gateway view: {} reseeding on: {}", v, node.getId());
seeding();
} else {
abandon.set(0);
if (retries.get() < params.joinRetries()) {
log.debug("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(),
params.joinRetries(), node.getId());
log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(),
params.joinRetries(), node.getId());
trusts.clear();
initialSeedSet.clear();
scheduler.schedule(Utils.wrapped(regate.get(), log),
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()),
TimeUnit.NANOSECONDS);
} else {
log.error("Failed to join view: {} cannot obtain majority on: {}", view, node.getId());
log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId());
view.stop();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

/**
* @author hal.hildebrand
*
*/
public record Parameters(int joinRetries, int minimumBiffCardinality, int rebuttalTimeout, int viewChangeRounds,
int finalizeViewRounds, double fpr, int maximumTxfr, Duration retryDelay, int maxPending,
Expand Down Expand Up @@ -40,10 +39,9 @@ public static class Builder {
/**
* Maximum number of elements to transfer per type per update
*/
private int maximumTxfr = 10;
private int maximumTxfr = 1024;
/**
* Maximum pending joins
*
*/
private int maxPending = 15;
/**
Expand Down Expand Up @@ -81,105 +79,105 @@ public int getCrowns() {
return crowns;
}

public Builder setCrowns(int crowns) {
this.crowns = crowns;
return this;
}

public int getFinalizeViewRounds() {
return finalizeViewRounds;
}

public Builder setFinalizeViewRounds(int finalizeViewRounds) {
this.finalizeViewRounds = finalizeViewRounds;
return this;
}

public double getFpr() {
return fpr;
}

public Builder setFpr(double fpr) {
this.fpr = fpr;
return this;
}

public int getJoinRetries() {
return joinRetries;
}

public int getMaximumTxfr() {
return maximumTxfr;
public Builder setJoinRetries(int joinRetries) {
this.joinRetries = joinRetries;
return this;
}

public int getMaxPending() {
return maxPending;
}

public int getMinimumBiffCardinality() {
return minimumBiffCardinality;
}

public int getRebuttalTimeout() {
return rebuttalTimeout;
}

public Duration getRetryDelay() {
return retryDelay;
}

public Duration getSeedingTimout() {
return seedingTimout;
}

public int getValidationRetries() {
return validationRetries;
}

public int getViewChangeRounds() {
return viewChangeRounds;
}

public Builder setCrowns(int crowns) {
this.crowns = crowns;
return this;
}

public Builder setFinalizeViewRounds(int finalizeViewRounds) {
this.finalizeViewRounds = finalizeViewRounds;
return this;
}

public Builder setFpr(double fpr) {
this.fpr = fpr;
public Builder setMaxPending(int maxPending) {
this.maxPending = maxPending;
return this;
}

public Builder setJoinRetries(int joinRetries) {
this.joinRetries = joinRetries;
return this;
public int getMaximumTxfr() {
return maximumTxfr;
}

public Builder setMaximumTxfr(int maximumTxfr) {
this.maximumTxfr = maximumTxfr;
return this;
}

public Builder setMaxPending(int maxPending) {
this.maxPending = maxPending;
return this;
public int getMinimumBiffCardinality() {
return minimumBiffCardinality;
}

public Builder setMinimumBiffCardinality(int minimumBiffCardinality) {
this.minimumBiffCardinality = minimumBiffCardinality;
return this;
}

public int getRebuttalTimeout() {
return rebuttalTimeout;
}

public Builder setRebuttalTimeout(int rebuttalTimeout) {
this.rebuttalTimeout = rebuttalTimeout;
return this;
}

public Duration getRetryDelay() {
return retryDelay;
}

public Builder setRetryDelay(Duration retryDelay) {
this.retryDelay = retryDelay;
return this;
}

public Duration getSeedingTimout() {
return seedingTimout;
}

public Builder setSeedingTimout(Duration seedingTimout) {
this.seedingTimout = seedingTimout;
return this;
}

public int getValidationRetries() {
return validationRetries;
}

public Builder setValidationRetries(int validationRetries) {
this.validationRetries = validationRetries;
return this;
}

public int getViewChangeRounds() {
return viewChangeRounds;
}

public Builder setViewChangeRounds(int viewChangeRounds) {
this.viewChangeRounds = viewChangeRounds;
return this;
Expand Down
Loading

0 comments on commit b3e9be5

Please sign in to comment.