Skip to content

Commit

Permalink
interim
Browse files Browse the repository at this point in the history
cannot seem to connect to entrance server for get key state.  Weirdly intermittent connect (and subsequent fail on the retrieval) if debug/single stepping is happening.
  • Loading branch information
Hellblazer committed Dec 28, 2023
1 parent b984a14 commit 3b05df3
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 295 deletions.
170 changes: 53 additions & 117 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,33 @@

import com.codahale.metrics.Timer;
import com.google.common.collect.HashMultiset;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.salesforce.apollo.cryptography.proto.HexBloome;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.HexBloom;
import com.salesforce.apollo.cryptography.SignatureAlgorithm;
import com.salesforce.apollo.cryptography.proto.HexBloome;
import com.salesforce.apollo.fireflies.View.Node;
import com.salesforce.apollo.fireflies.View.Participant;
import com.salesforce.apollo.fireflies.View.Seed;
import com.salesforce.apollo.fireflies.View.Service;
import com.salesforce.apollo.fireflies.comm.entrance.Entrance;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import io.grpc.StatusRuntimeException;
import org.joou.ULong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -86,11 +86,13 @@ void seeding() {
var timer = metrics == null ? null : metrics.seedDuration().time();
seeding.whenComplete(join(duration, scheduler, timer));

var seedlings = new SliceIterator<>("Seedlings", node, seeds.stream()
.map(s -> seedFor(s))
.map(nw -> view.new Participant(nw))
.filter(p -> !node.getId().equals(p.getId()))
.collect(Collectors.toList()), approaches);
var bootstrappers = seeds.stream()
.map(this::seedFor)
.map(nw -> view.new Participant(nw))
.filter(p -> !node.getId().equals(p.getId()))
.collect(Collectors.toList());
view.phase1Validation(bootstrappers); // Phase 1 can only rely upon the kindness of our initial configuration
var seedlings = new SliceIterator<>("Seedlings", node, bootstrappers, approaches);
AtomicReference<Runnable> reseed = new AtomicReference<>();
reseed.set(() -> {
final var registration = registration();
Expand All @@ -111,46 +113,27 @@ private void bootstrap() {
log.info("Bootstrapping seed node view: {} context: {} on: {}", view.currentView(), this.context.getId(),
node.getId());
var nw = node.getNote();
final var sched = scheduler;
final var dur = duration;

view.bootstrap(nw, sched, dur);
view.bootstrap(nw, scheduler, duration);
}

private boolean complete(CompletableFuture<Redirect> redirect, Optional<ListenableFuture<Redirect>> futureSailor,
Member m) {
private boolean complete(CompletableFuture<Redirect> redirect, Optional<Redirect> futureSailor, Member m) {
if (futureSailor.isEmpty()) {
return true;
}
try {
final var r = futureSailor.get().get();
if (redirect.complete(r)) {
log.info("Redirect to view: {} context: {} from: {} on: {}", Digest.from(r.getView()),
this.context.getId(), m.getId(), node.getId());
}
return false;
} catch (ExecutionException ex) {
if (ex.getCause() instanceof StatusRuntimeException sre) {
switch (sre.getStatus().getCode()) {
case RESOURCE_EXHAUSTED:
log.trace("SRE in redirect: {} on: {}", sre.getStatus(), node.getId());
break;
default:
log.trace("SRE in redirect: {} on: {}", sre.getStatus(), node.getId());
}
} else {
log.error("Error in redirect: {} on: {}", ex.getCause(), node.getId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException e) {
// noop
if (redirect.isDone()) {
return true;
}
return true;
final var r = futureSailor.get();
if (redirect.complete(r)) {
log.info("Redirect to view: {} context: {} from: {} on: {}", Digest.from(r.getView()), this.context.getId(),
m.getId(), node.getId());
}
return false;
}

private boolean completeGateway(Participant member, CompletableFuture<Bound> gateway,
Optional<ListenableFuture<Gateway>> futureSailor, HashMultiset<HexBloome> diadems,
Optional<Gateway> futureSailor, HashMultiset<BootstrapTrust> trusts,
Set<SignedNote> initialSeedSet, Digest v, int majority) {
if (futureSailor.isEmpty()) {
return true;
Expand All @@ -159,45 +142,7 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
return false;
}

Gateway g;
try {
g = futureSailor.get().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof StatusRuntimeException sre) {
switch (sre.getStatus().getCode()) {
case RESOURCE_EXHAUSTED:
log.trace("Resource exhausted in join: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(),
node.getId());
break;
case OUT_OF_RANGE:
log.debug("View change in join: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(),
node.getId());
view.resetBootstrapView();
node.reset();
Thread.ofVirtual().factory().newThread(Utils.wrapped(() -> seeding(), log)).start();
return false;
case DEADLINE_EXCEEDED:
log.trace("Join timeout for view: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(),
node.getId());
break;
case UNAUTHENTICATED:
log.trace("Join unauthenticated for view: {} with: {} : {} on: {}", v, member.getId(),
sre.getStatus(), node.getId());
break;
default:
log.warn("Failure in join: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(),
node.getId());
}
} else {
log.error("Failure in join: {} with: {} on: {}", v, member.getId(), node.getId(), e.getCause());
}
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (CancellationException e) {
return true;
}
Gateway g = futureSailor.get();

if (g.equals(Gateway.getDefaultInstance())) {
return true;
Expand All @@ -207,28 +152,29 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
return true;
}

if (g.getDiadem().equals(HexBloome.getDefaultInstance())) {
log.trace("Empty view in join returned from: {} on: {}", member.getId(), node.getId());
if (g.getTrust().equals(BootstrapTrust.getDefaultInstance()) || g.getTrust()
.getDiadem()
.equals(HexBloome.getDefaultInstance())) {
log.trace("Empty bootstrap trust in join returned from: {} on: {}", member.getId(), node.getId());
return true;
}
diadems.add(g.getDiadem());
trusts.add(g.getTrust());
initialSeedSet.addAll(g.getInitialSeedSetList());
log.trace("Initial seed set count: {} view: {} from: {} on: {}", g.getInitialSeedSetCount(), v, member.getId(),
node.getId());

var vs = diadems.entrySet()
.stream()
.filter(e -> e.getCount() >= majority)
.map(e -> e.getElement())
.findFirst()
.orElse(null);
if (vs != null) {
if (validate(v, g, gateway, diadems, initialSeedSet, majority)) {
return false;
}
var trust = trusts.entrySet()
.stream()
.filter(e -> e.getCount() >= majority)
.map(e -> e.getElement())
.findFirst()
.orElse(null);
if (trust != null) {
validate(trust, gateway, initialSeedSet);
} else {
log.debug("Gateway received, trust count: {} majority: {} from: {} view: {} context: {} on: {}",
trusts.size(), majority, member.getId(), v, this.context.getId(), node.getId());
}
log.debug("Gateway received, view count: {} majority: {} from: {} view: {} context: {} on: {}", diadems.size(),
majority, member.getId(), v, this.context.getId(), node.getId());
return true;
}

Expand Down Expand Up @@ -281,7 +227,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu
var regate = new AtomicReference<Runnable>();
var retries = new AtomicInteger();

HashMultiset<HexBloome> diadems = HashMultiset.create();
HashMultiset<BootstrapTrust> trusts = HashMultiset.create();
HashSet<SignedNote> initialSeedSet = new HashSet<>();

final var cardinality = redirect.getCardinality();
Expand All @@ -298,12 +244,15 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu
redirecting.iterate((link, m) -> {
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
return link.join(join, params.seedingTimeout());
}, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, diadems,
}, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts,
initialSeedSet, v, majority), () -> {
if (gateway.isDone()) {
return;
}
if (retries.get() < params.joinRetries()) {
log.debug("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(),
params.joinRetries(), node.getId());
diadems.clear();
trusts.clear();
initialSeedSet.clear();
scheduler.schedule(exec(() -> regate.get().run()),
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -338,28 +287,15 @@ private NoteWrapper seedFor(Seed seed) {
return new NoteWrapper(seedNote, digestAlgo);
}

private boolean validate(Digest v, Gateway g, CompletableFuture<Bound> gateway, HashMultiset<HexBloome> hexes,
Set<SignedNote> successors, int majority) {
final var max = hexes.entrySet()
.stream()
.filter(e -> e.getCount() >= majority)
.map(e -> e.getElement())
.findFirst();
var hex = max.orElse(null);
if (hex != null) {
final var hexBloom = new HexBloom(hex);
if (gateway.complete(
new Bound(hexBloom, successors.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList()))) {
log.info("Gateway acquired: {} context: {} on: {}", hexBloom.compact(), this.context.getId(),
node.getId());
}
return true;
private void validate(BootstrapTrust trust, CompletableFuture<Bound> gateway, Set<SignedNote> initialSeedSet) {
final var hexBloom = new HexBloom(trust.getDiadem());
if (gateway.complete(
new Bound(hexBloom, trust.getSuccessorsList().stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList(),
initialSeedSet.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList()))) {
log.info("Gateway acquired: {} context: {} on: {}", hexBloom.compact(), this.context.getId(), node.getId());
}
log.info("Gateway: {} majority not achieved: {} context: {} on: {}", v, majority, this.context.getId(),
node.getId());
return false;
}

record Bound(HexBloom view, List<NoteWrapper> successors) {
record Bound(HexBloom view, List<NoteWrapper> successors, List<NoteWrapper> initialSeedSet) {
}
}
Loading

0 comments on commit 3b05df3

Please sign in to comment.