diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 1341ac442..722583487 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -8,33 +8,33 @@ import com.codahale.metrics.Timer; import com.google.common.collect.HashMultiset; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; -import com.salesforce.apollo.cryptography.proto.HexBloome; -import com.salesforce.apollo.fireflies.proto.*; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.HexBloom; import com.salesforce.apollo.cryptography.SignatureAlgorithm; +import com.salesforce.apollo.cryptography.proto.HexBloome; import com.salesforce.apollo.fireflies.View.Node; import com.salesforce.apollo.fireflies.View.Participant; import com.salesforce.apollo.fireflies.View.Seed; import com.salesforce.apollo.fireflies.View.Service; import com.salesforce.apollo.fireflies.comm.entrance.Entrance; +import com.salesforce.apollo.fireflies.proto.*; import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; -import io.grpc.StatusRuntimeException; import org.joou.ULong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -86,11 +86,13 @@ void seeding() { var timer = metrics == null ? null : metrics.seedDuration().time(); seeding.whenComplete(join(duration, scheduler, timer)); - var seedlings = new SliceIterator<>("Seedlings", node, seeds.stream() - .map(s -> seedFor(s)) - .map(nw -> view.new Participant(nw)) - .filter(p -> !node.getId().equals(p.getId())) - .collect(Collectors.toList()), approaches); + var bootstrappers = seeds.stream() + .map(this::seedFor) + .map(nw -> view.new Participant(nw)) + .filter(p -> !node.getId().equals(p.getId())) + .collect(Collectors.toList()); + view.phase1Validation(bootstrappers); // Phase 1 can only rely upon the kindness of our initial configuration + var seedlings = new SliceIterator<>("Seedlings", node, bootstrappers, approaches); AtomicReference reseed = new AtomicReference<>(); reseed.set(() -> { final var registration = registration(); @@ -111,46 +113,27 @@ private void bootstrap() { log.info("Bootstrapping seed node view: {} context: {} on: {}", view.currentView(), this.context.getId(), node.getId()); var nw = node.getNote(); - final var sched = scheduler; - final var dur = duration; - view.bootstrap(nw, sched, dur); + view.bootstrap(nw, scheduler, duration); } - private boolean complete(CompletableFuture redirect, Optional> futureSailor, - Member m) { + private boolean complete(CompletableFuture redirect, Optional futureSailor, Member m) { if (futureSailor.isEmpty()) { return true; } - try { - final var r = futureSailor.get().get(); - if (redirect.complete(r)) { - log.info("Redirect to view: {} context: {} from: {} on: {}", Digest.from(r.getView()), - this.context.getId(), m.getId(), node.getId()); - } - return false; - } catch (ExecutionException ex) { - if (ex.getCause() instanceof StatusRuntimeException sre) { - switch (sre.getStatus().getCode()) { - case RESOURCE_EXHAUSTED: - log.trace("SRE in redirect: {} on: {}", sre.getStatus(), node.getId()); - break; - default: - log.trace("SRE in redirect: {} on: {}", sre.getStatus(), node.getId()); - } - } else { - log.error("Error in redirect: {} on: {}", ex.getCause(), node.getId()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (CancellationException e) { - // noop + if (redirect.isDone()) { + return true; } - return true; + final var r = futureSailor.get(); + if (redirect.complete(r)) { + log.info("Redirect to view: {} context: {} from: {} on: {}", Digest.from(r.getView()), this.context.getId(), + m.getId(), node.getId()); + } + return false; } private boolean completeGateway(Participant member, CompletableFuture gateway, - Optional> futureSailor, HashMultiset diadems, + Optional futureSailor, HashMultiset trusts, Set initialSeedSet, Digest v, int majority) { if (futureSailor.isEmpty()) { return true; @@ -159,45 +142,7 @@ private boolean completeGateway(Participant member, CompletableFuture gat return false; } - Gateway g; - try { - g = futureSailor.get().get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof StatusRuntimeException sre) { - switch (sre.getStatus().getCode()) { - case RESOURCE_EXHAUSTED: - log.trace("Resource exhausted in join: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(), - node.getId()); - break; - case OUT_OF_RANGE: - log.debug("View change in join: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(), - node.getId()); - view.resetBootstrapView(); - node.reset(); - Thread.ofVirtual().factory().newThread(Utils.wrapped(() -> seeding(), log)).start(); - return false; - case DEADLINE_EXCEEDED: - log.trace("Join timeout for view: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(), - node.getId()); - break; - case UNAUTHENTICATED: - log.trace("Join unauthenticated for view: {} with: {} : {} on: {}", v, member.getId(), - sre.getStatus(), node.getId()); - break; - default: - log.warn("Failure in join: {} with: {} : {} on: {}", v, member.getId(), sre.getStatus(), - node.getId()); - } - } else { - log.error("Failure in join: {} with: {} on: {}", v, member.getId(), node.getId(), e.getCause()); - } - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } catch (CancellationException e) { - return true; - } + Gateway g = futureSailor.get(); if (g.equals(Gateway.getDefaultInstance())) { return true; @@ -207,28 +152,29 @@ private boolean completeGateway(Participant member, CompletableFuture gat return true; } - if (g.getDiadem().equals(HexBloome.getDefaultInstance())) { - log.trace("Empty view in join returned from: {} on: {}", member.getId(), node.getId()); + if (g.getTrust().equals(BootstrapTrust.getDefaultInstance()) || g.getTrust() + .getDiadem() + .equals(HexBloome.getDefaultInstance())) { + log.trace("Empty bootstrap trust in join returned from: {} on: {}", member.getId(), node.getId()); return true; } - diadems.add(g.getDiadem()); + trusts.add(g.getTrust()); initialSeedSet.addAll(g.getInitialSeedSetList()); log.trace("Initial seed set count: {} view: {} from: {} on: {}", g.getInitialSeedSetCount(), v, member.getId(), node.getId()); - var vs = diadems.entrySet() - .stream() - .filter(e -> e.getCount() >= majority) - .map(e -> e.getElement()) - .findFirst() - .orElse(null); - if (vs != null) { - if (validate(v, g, gateway, diadems, initialSeedSet, majority)) { - return false; - } + var trust = trusts.entrySet() + .stream() + .filter(e -> e.getCount() >= majority) + .map(e -> e.getElement()) + .findFirst() + .orElse(null); + if (trust != null) { + validate(trust, gateway, initialSeedSet); + } else { + log.debug("Gateway received, trust count: {} majority: {} from: {} view: {} context: {} on: {}", + trusts.size(), majority, member.getId(), v, this.context.getId(), node.getId()); } - log.debug("Gateway received, view count: {} majority: {} from: {} view: {} context: {} on: {}", diadems.size(), - majority, member.getId(), v, this.context.getId(), node.getId()); return true; } @@ -281,7 +227,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu var regate = new AtomicReference(); var retries = new AtomicInteger(); - HashMultiset diadems = HashMultiset.create(); + HashMultiset trusts = HashMultiset.create(); HashSet initialSeedSet = new HashSet<>(); final var cardinality = redirect.getCardinality(); @@ -298,12 +244,15 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu redirecting.iterate((link, m) -> { log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId()); return link.join(join, params.seedingTimeout()); - }, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, diadems, + }, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts, initialSeedSet, v, majority), () -> { + if (gateway.isDone()) { + return; + } if (retries.get() < params.joinRetries()) { log.debug("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(), params.joinRetries(), node.getId()); - diadems.clear(); + trusts.clear(); initialSeedSet.clear(); scheduler.schedule(exec(() -> regate.get().run()), Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS); @@ -338,28 +287,15 @@ private NoteWrapper seedFor(Seed seed) { return new NoteWrapper(seedNote, digestAlgo); } - private boolean validate(Digest v, Gateway g, CompletableFuture gateway, HashMultiset hexes, - Set successors, int majority) { - final var max = hexes.entrySet() - .stream() - .filter(e -> e.getCount() >= majority) - .map(e -> e.getElement()) - .findFirst(); - var hex = max.orElse(null); - if (hex != null) { - final var hexBloom = new HexBloom(hex); - if (gateway.complete( - new Bound(hexBloom, successors.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList()))) { - log.info("Gateway acquired: {} context: {} on: {}", hexBloom.compact(), this.context.getId(), - node.getId()); - } - return true; + private void validate(BootstrapTrust trust, CompletableFuture gateway, Set initialSeedSet) { + final var hexBloom = new HexBloom(trust.getDiadem()); + if (gateway.complete( + new Bound(hexBloom, trust.getSuccessorsList().stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList(), + initialSeedSet.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList()))) { + log.info("Gateway acquired: {} context: {} on: {}", hexBloom.compact(), this.context.getId(), node.getId()); } - log.info("Gateway: {} majority not achieved: {} context: {} on: {}", v, majority, this.context.getId(), - node.getId()); - return false; } - record Bound(HexBloom view, List successors) { + record Bound(HexBloom view, List successors, List initialSeedSet) { } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java index 303cc1f79..da8b65eec 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Bootstrapper.java @@ -5,7 +5,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalCause; import com.google.common.collect.HashMultiset; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; import com.salesforce.apollo.archipelago.RouterImpl; import com.salesforce.apollo.cryptography.JohnHancock; import com.salesforce.apollo.cryptography.SigningThreshold; @@ -21,7 +21,6 @@ import com.salesforce.apollo.stereotomy.event.proto.KeyState_; import com.salesforce.apollo.stereotomy.event.protobuf.KeyStateImpl; import com.salesforce.apollo.stereotomy.identifier.Identifier; -import io.grpc.StatusRuntimeException; import org.checkerframework.checker.nullness.qual.Nullable; import org.joou.ULong; import org.slf4j.Logger; @@ -29,12 +28,13 @@ import java.io.InputStream; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; /** * Verifiers that delegate to the joining member's successors in the full context for key state retrieval @@ -45,7 +45,8 @@ * @author hal.hildebrand **/ public class Bootstrapper implements Verifiers { - private final static Logger log = LoggerFactory.getLogger(Bootstrapper.class); + private final static Logger log = LoggerFactory.getLogger( + Bootstrapper.class); private final List successors; private final SigningMember member; private final int majority; @@ -54,13 +55,16 @@ public class Bootstrapper implements Verifiers { private final RouterImpl.CommonCommunications communications; private final Duration operationTimeout; private final Duration operationsFrequency; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, + Thread.ofVirtual() + .factory()); public Bootstrapper(S member, Duration operationTimeout, List successors, int majority, Duration operationsFrequency, RouterImpl.CommonCommunications communications) { this.member = member; - this.successors = successors; + this.successors = new ArrayList<>(successors); this.majority = majority; this.communications = communications; this.operationTimeout = operationTimeout; @@ -109,13 +113,13 @@ public Verifier.Filtered filtered(EventCoordinates coordinates, SigningThreshold @Override public Optional getKeyState(EventCoordinates coordinates) { log.trace("Get key state: {} on: {}", coordinates, member); - return getKeyState(coordinates); + return Optional.of(Bootstrapper.this.getKeyState(coordinates)); } @Override public Optional getKeyState(Identifier identifier, ULong seqNum) { log.trace("Get key state: {}:{} on: {}", identifier, seqNum, member); - return getKeyState(identifier, seqNum); + return Optional.of(Bootstrapper.this.getKeyState(identifier, seqNum)); } @Override @@ -155,26 +159,20 @@ public Optional verifierFor(EventCoordinates coordinates) { return Optional.of(new BootstrapVerifier(coordinates.getIdentifier())); } - private boolean complete(CompletableFuture ksFuture, - Optional> futureSailor, HashMultiset keystates, - Member m) { + protected KeyState getKeyState(Identifier identifier, ULong sequenceNumber) { + return ksSeq.get(new IdentifierSequence(identifier, sequenceNumber)); + } + + private boolean complete(CompletableFuture ksFuture, Optional futureSailor, + HashMultiset keystates, Member m) { if (futureSailor.isEmpty()) { return true; } - try { - final var ks = futureSailor.get().get(); - keystates.add(ks); - } catch (ExecutionException ex) { - if (ex.getCause() instanceof StatusRuntimeException sre) { - log.trace("SRE in get key state: {} on: {}", sre.getStatus(), member.getId()); - } else { - log.error("Error in key state: {} on: {}", ex.getCause(), member.getId()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (CancellationException e) { - // noop + if (ksFuture.isDone()) { + return true; } + final var ks = futureSailor.get(); + keystates.add(ks); var vs = keystates.entrySet() .stream() @@ -192,26 +190,16 @@ private boolean complete(CompletableFuture ksFuture, return true; } - private boolean completeValidation(CompletableFuture valid, - Optional> futureSailor, + private boolean completeValidation(CompletableFuture valid, Optional futureSailor, HashMultiset validations, Member m) { if (futureSailor.isEmpty()) { return true; } - try { - final var v = futureSailor.get().get(); - validations.add(v); - } catch (ExecutionException ex) { - if (ex.getCause() instanceof StatusRuntimeException sre) { - log.trace("SRE in validate: {} on: {}", sre.getStatus(), member.getId()); - } else { - log.error("Error in validate: {} on: {}", ex.getCause(), member.getId()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (CancellationException e) { - // noop + if (valid.isDone()) { + return true; } + final var v = futureSailor.get(); + validations.add(v); var validation = validations.entrySet() .stream() @@ -228,18 +216,20 @@ private boolean completeValidation(CompletableFuture valid, } private KeyState delegate(EventCoordinates coordinates) { + log.info("Get key state: {} from slice on: {}", coordinates, member); var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications); final var coords = coordinates.toEventCoords(); var ks = new CompletableFuture(); HashMultiset keystates = HashMultiset.create(); iterator.iterate((link, m) -> { - log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId()); + log.debug("Requesting Key State from: {} on: {}", link.getMember().getId(), member.getId()); return link.getKeyState(coords); }, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> { if (!ks.isDone()) { + log.warn("Failed to retrieve key state: {} from slice on: {}", coordinates, member); ks.complete(null); } - }, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10)); + }, scheduler, operationsFrequency); try { return ks.get(); } catch (InterruptedException e) { @@ -251,18 +241,20 @@ private KeyState delegate(EventCoordinates coordinates) { } private KeyState delegate(IdentifierSequence idSeq) { + log.info("Get key state: {} from slice on: {}", idSeq, member); var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications); final var identifierSeq = idSeq.toIdSeq(); var ks = new CompletableFuture(); HashMultiset keystates = HashMultiset.create(); iterator.iterate((link, m) -> { - log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId()); + log.debug("Requesting Key State from: {} on: {}", link.getMember().getId(), member.getId()); return link.getKeyState(identifierSeq); }, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> { if (!ks.isDone()) { + log.warn("Failed to retrieve key state: {} from slice on: {}", idSeq, member); ks.complete(null); } - }, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10)); + }, scheduler, operationsFrequency); try { return ks.get(); } catch (InterruptedException e) { @@ -273,18 +265,29 @@ private KeyState delegate(IdentifierSequence idSeq) { return null; } + private KeyState getKeyState(EventCoordinates coordinates) { + return ksCoords.get(coordinates); + } + private boolean validate(EventCoordinates coordinates) { + log.info("Validate event: {} from slice on: {}", coordinates, member); + var succ = successors.stream().filter(m -> coordinates.getIdentifier().equals(m.getId())).findFirst(); + if (succ.isPresent()) { + return true; + } var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications); var valid = new CompletableFuture(); HashMultiset validations = HashMultiset.create(); iterator.iterate((link, m) -> { - log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId()); + log.debug("Requesting Validation: {} from: {} on: {}", coordinates, link.getMember().getId(), + member.getId()); return link.validate(coordinates.toEventCoords()); }, (futureSailor, link, m) -> completeValidation(valid, futureSailor, validations, m), () -> { if (!valid.isDone()) { + log.warn("Failed to validate: {} from slice on: {}", coordinates, member); valid.complete(Validation.newBuilder().setResult(false).build()); } - }, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10)); + }, scheduler, operationsFrequency); try { return valid.get().getResult(); } catch (InterruptedException e) { @@ -295,6 +298,60 @@ private boolean validate(EventCoordinates coordinates) { return false; } + public static class ViewEventValidation implements EventValidation { + private EventValidation delegate; + + public ViewEventValidation(EventValidation delegate) { + this.delegate = delegate; + } + + @Override + public Verifier.Filtered filtered(EventCoordinates coordinates, SigningThreshold threshold, + JohnHancock signature, InputStream message) { + return delegate.filtered(coordinates, threshold, signature, message); + } + + @Override + public Optional getKeyState(EventCoordinates coordinates) { + return delegate.getKeyState(coordinates); + } + + @Override + public Optional getKeyState(Identifier identifier, ULong seqNum) { + return delegate.getKeyState(identifier, seqNum); + } + + @Override + public boolean validate(EstablishmentEvent event) { + return delegate.validate(event); + } + + @Override + public boolean validate(EventCoordinates coordinates) { + return delegate.validate(coordinates); + } + + @Override + public boolean verify(EventCoordinates coordinates, SigningThreshold threshold, JohnHancock signature, + InputStream message) { + return delegate.verify(coordinates, threshold, signature, message); + } + + @Override + public boolean verify(EventCoordinates coordinates, JohnHancock signature, ByteString byteString) { + return delegate.verify(coordinates, signature, byteString); + } + + @Override + public boolean verify(EventCoordinates coordinates, JohnHancock signature, InputStream message) { + return delegate.verify(coordinates, signature, message); + } + + void setDelegate(EventValidation delegate) { + this.delegate = delegate; + } + } + private record IdentifierSequence(Identifier identifier, ULong seqNum) { public IdentAndSeq toIdSeq() { return IdentAndSeq.newBuilder() @@ -317,11 +374,14 @@ public BootstrapVerifier(Identifier identifier) { @Override protected KeyState getKeyState(ULong sequenceNumber) { - return ksSeq.get(new IdentifierSequence(identifier, sequenceNumber)); + var key = new IdentifierSequence(identifier, sequenceNumber); + log.info("Get key state: {} on: {}", key, member); + return ksSeq.get(key); } @Override protected KeyState getKeyState(EventCoordinates coordinates) { + log.info("Get key state: {} on: {}", coordinates, member); return ksCoords.get(coordinates); } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 39a723352..7eed6cf0e 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -36,7 +36,6 @@ import com.salesforce.apollo.stereotomy.EventValidation; import com.salesforce.apollo.stereotomy.KeyState; import com.salesforce.apollo.stereotomy.event.proto.EventCoords; -import com.salesforce.apollo.stereotomy.event.proto.KERL_; import com.salesforce.apollo.stereotomy.event.proto.KeyState_; import com.salesforce.apollo.stereotomy.identifier.Identifier; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; @@ -103,10 +102,11 @@ public class View { private final Set shunned = new ConcurrentSkipListSet<>(); private final AtomicBoolean started = new AtomicBoolean(); private final Map timers = new HashMap<>(); - private final EventValidation validation; private final ReadWriteLock viewChange = new ReentrantReadWriteLock( true); private final ViewManagement viewManagement; + private final EventValidation viewValidation; + private volatile EventValidation validation; private volatile ScheduledFuture futureGossip; public View(Context context, ControlledIdentifierMember member, InetSocketAddress endpoint, @@ -119,10 +119,10 @@ public View(Context context, ControlledIdentifierMember member, Ine EventValidation validation, Router communications, Parameters params, Router gateway, DigestAlgorithm digestAlgo, FireflyMetrics metrics) { this.metrics = metrics; - this.validation = validation; this.params = params; this.digestAlgo = digestAlgo; this.context = context; + this.viewValidation = validation; this.roundTimers = new RoundScheduler(String.format("Timers for: %s", context.getId()), context.timeToLive()); this.node = new Node(member, endpoint); viewManagement = new ViewManagement(this, context, params, metrics, node, digestAlgo); @@ -133,15 +133,16 @@ public View(Context context, ControlledIdentifierMember member, Ine this.approaches = gateway.create(node, context.getId(), service, service.getClass().getCanonicalName() + ":approach", r -> new EntranceServer(gateway.getClientIdentityProvider(), r, metrics), - EntranceClient.getCreate(metrics), Entrance.getLocalLoopback(node)); + EntranceClient.getCreate(metrics), Entrance.getLocalLoopback(node, service)); gossiper = new RingCommunications<>(context, node, comm); + this.validation = EventValidation.NONE; } /** * Check the validity of a mask. A mask is valid if the following conditions are satisfied: * *
-     * - The mask is of length 2t+1
+     * - The mask is of length bias*t+1
      * - the mask has exactly t + 1 enabled elements.
      * 
* @@ -349,7 +350,7 @@ void finalizeViewChange() { final var cardinality = context.memberCount(); final var superMajority = cardinality - ((cardinality - 1) / 4); if (observations.size() < superMajority) { - log.trace("Do not have supermajority: {} required: {} for: {} on: {}", observations.size(), + log.trace("Do not have super majority: {} required: {} for: {} on: {}", observations.size(), superMajority, currentView(), node.getId()); scheduleFinalizeViewChange(2); return; @@ -387,6 +388,11 @@ void finalizeViewChange() { }); } + void finalizeViewValidation() { + validation = viewValidation; + log.info("Finalized view validation on: {}", node.getId()); + } + /** * Test accessible * @@ -426,6 +432,18 @@ void notifyListeners(List joining, List leaving) { }); } + void phase1Validation(List seeds) { + validation = new Bootstrapper(node, Duration.ofSeconds(5), seeds, 1, Duration.ofMillis(10), + approaches).getValidator(); + log.info("Phase 1 validation: {} on: {}", seeds.size(), node.getId()); + } + + void phase2Validation(List successors) { + validation = new Bootstrapper(node, Duration.ofSeconds(5), successors, context.majority(), + Duration.ofMillis(10), approaches).getValidator(); + log.info("Phase 2 validation on: {}", node.getId()); + } + /** * Remove the participant from the context * @@ -787,7 +805,7 @@ private boolean addToCurrentView(NoteWrapper note) { /** * If we monitor the target and haven't issued an alert, do so * - * @param sa + * @param target */ private void amplify(Participant target) { context.rings() @@ -974,7 +992,7 @@ private Gossip gossip(Fireflies link, int ring) { /** * Handle the gossip response from the destination * - * @param futureSailor + * @param result * @param destination * @param duration * @param scheduler @@ -1100,7 +1118,7 @@ private AccusationGossip.Builder processAccusations(BloomFilter bff) { * members * * @param p - * @param digests + * @param bff * @return */ private AccusationGossip processAccusations(BloomFilter bff, double p) { @@ -1137,7 +1155,7 @@ private NoteGossip.Builder processNotes(BloomFilter bff) { * * @param from * @param p - * @param digests + * @param bff */ private NoteGossip processNotes(Digest from, BloomFilter bff, double p) { NoteGossip.Builder builder = processNotes(bff); @@ -1170,8 +1188,7 @@ private ViewChangeGossip.Builder processObservations(BloomFilter bff) { * the inbound digests that the view has more recent information * * @param p - * @param from - * @param digests + * @param bff */ private ViewChangeGossip processObservations(BloomFilter bff, double p) { ViewChangeGossip.Builder builder = processObservations(bff); @@ -1488,10 +1505,6 @@ public Seed_ getSeed() { .build(); } - public KERL_ kerl() { - return wrapped.kerl(); - } - public JohnHancock sign(byte[] message) { return wrapped.sign(message); } @@ -1761,15 +1774,15 @@ void addAccusation(AccusationWrapper accusation) { return; } if (n.getEpoch() != accusation.getEpoch()) { - log.trace("Invalid epoch discarding accusation from {} on {} ring {} on: {}", accusation.getAccuser(), - getId(), ringNumber, node.getId()); + log.trace("Invalid epoch discarding accusation from: {} context: {} ring {} on: {}", + accusation.getAccuser(), getId(), ringNumber, node.getId()); return; } if (n.getMask().get(ringNumber)) { validAccusations[ringNumber] = accusation; if (log.isDebugEnabled()) { - log.debug("Member {} is accusing {} ring: {} on: {}", accusation.getAccuser(), getId(), ringNumber, - node.getId()); + log.debug("Member: {} is accusing: {} context: {} ring: {} on: {}", accusation.getAccuser(), + accusation.getAccused(), getId(), ringNumber, node.getId()); } } } @@ -1780,7 +1793,8 @@ void addAccusation(AccusationWrapper accusation) { void clearAccusations() { for (var acc : validAccusations) { if (acc != null) { - log.trace("Clearing accusations for: {} on: {}", getId(), node.getId()); + log.trace("Clearing accusations for: {} context: {} on: {}", acc.getAccused(), getId(), + node.getId()); break; } } @@ -1810,7 +1824,7 @@ NoteWrapper getNote() { void invalidateAccusationOnRing(int index) { validAccusations[index] = null; - log.trace("Invalidating accusations of: {} ring: {} on: {}", getId(), index, node.getId()); + log.trace("Invalidating accusations context: {} ring: {} on: {}", getId(), index, node.getId()); } boolean isAccused() { @@ -1847,21 +1861,29 @@ public class Service implements EntranceService, FFService, ServiceRouting { @Override public KeyState getKeyState(Identifier identifier, ULong seqNum, Digest from) { - if (!introduced.get()) { - log.trace("Not introduced!, ignoring key state request from: {} on: {}", from, node.getId()); + if (!viewManagement.isJoined()) { + log.trace("Not yet joined!, ignoring key state request for: {}:{} request from: {} on: {}", identifier, + seqNum, from, node.getId()); return null; } + log.trace("Retrieving key state: {}:{} for: {} on: {}", identifier, seqNum, from, node.getId()); var keyState = validation.getKeyState(identifier, seqNum); + log.trace("Returning key state: {}:{} -> {} to: {} on: {}", identifier, seqNum, keyState.isPresent(), from, + node.getId()); return keyState.isEmpty() ? null : keyState.get(); } @Override public KeyState getKeyState(EventCoordinates coordinates, Digest from) { - if (!introduced.get()) { - log.trace("Not introduced!, ignoring key state request from: {} on: {}", from, node.getId()); + if (!viewManagement.isJoined()) { + log.trace("Not yet joined!, ignoring key state request for: {} request from: {} on: {}", coordinates, + from, node.getId()); return null; } + log.trace("Retrieving key state: {} for: {} on: {}", coordinates, from, node.getId()); var keyState = validation.getKeyState(coordinates); + log.trace("Returning key state: {} -> {} to: {} on: {}", coordinates, keyState.isPresent(), from, + node.getId()); return keyState.isEmpty() ? null : keyState.get(); } @@ -1989,7 +2011,16 @@ public void update(State request, Digest from) { @Override public Validation validateCoords(EventCoords request, Digest from) { - return Validation.newBuilder().setResult(validation.validate(EventCoordinates.from(request))).build(); + var coordinates = EventCoordinates.from(request); + if (!viewManagement.isJoined()) { + log.trace("Not yet joined!, ignoring validation request: {} from: {} on: {}", from, coordinates, + node.getId()); + return Validation.newBuilder().setResult(false).build(); + } + log.trace("Validating event: {} for: {} on: {}", request, from, node.getId()); + var validate = validation.validate(coordinates); + log.trace("Returning validate: {}:{} to: {} on: {}", coordinates, validate, from, node.getId()); + return Validation.newBuilder().setResult(validate).build(); } } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index fd4952bec..127ff65d4 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -88,8 +88,11 @@ void bootstrap(NoteWrapper nw, final ScheduledExecutorService sched, final Durat context.activate(node); resetBootstrapView(); - view.viewChange(() -> install( - new Ballot(currentView(), Collections.emptyList(), Collections.singletonList(node.getId()), digestAlgo))); + view.viewChange(() -> { + view.finalizeViewValidation(); + install( + new Ballot(currentView(), Collections.emptyList(), Collections.singletonList(node.getId()), digestAlgo)); + }); view.scheduleViewChange(); view.schedule(dur, sched); @@ -219,6 +222,7 @@ synchronized void join() { view.stop(); throw new IllegalStateException("Invalid crown"); } + view.finalizeViewValidation(); setDiadem(calculated); view.notifyListeners(context.allMembers().map(p -> p.note.getCoordinates()).toList(), Collections.emptyList()); onJoined.complete(null); @@ -302,9 +306,13 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time currentView.set(hex.compact()); bound.successors().forEach(nw -> view.addToView(nw)); + bound.initialSeedSet().forEach(nw -> view.addToView(nw)); view.reset(); + // Phase 2. We have formally started joining the view, but haven't filled out membership + view.phase2Validation(bound.successors().stream().map(nw -> context.getMember(nw.getId())).toList()); + context.allMembers().forEach(p -> p.clearAccusations()); view.introduced(); @@ -462,10 +470,24 @@ private void initiateViewChange() { private void joined(Collection seedSet, Digest from, StreamObserver responseObserver, Timer.Context timer) { var unique = new HashSet(seedSet); - context.successors(from, m -> context.isActive(m)).forEach(p -> unique.add(p.getNote().getWrapped())); - final var builder = Gateway.newBuilder().addAllInitialSeedSet(unique).setDiadem(diadem.get().toHexBloome()); - log.trace("Gateway initial seeding: {} for: {} on: {}", builder.getInitialSeedSetCount(), from, node.getId()); - var gateway = builder.build(); + final var initialSeeds = new ArrayList(seedSet); + final var successors = new HashSet(); + + context.successors(from, m -> context.isActive(m)).forEach(p -> { + var sn = p.getNote().getWrapped(); + if (unique.add(sn)) { + initialSeeds.add(sn); + } + successors.add(sn); + }); + var gateway = Gateway.newBuilder() + .addAllInitialSeedSet(initialSeeds) + .setTrust(BootstrapTrust.newBuilder() + .addAllSuccessors(successors) + .setDiadem(diadem.get().toHexBloome())) + .build(); + log.trace("Gateway initial seeding: {} successors: {} for: {} on: {}", gateway.getInitialSeedSetCount(), + successors.size(), from, node.getId()); responseObserver.onNext(gateway); responseObserver.onCompleted(); if (timer != null) { diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/Entrance.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/Entrance.java index 1664808b4..b8aab0a76 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/Entrance.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/Entrance.java @@ -6,14 +6,16 @@ */ package com.salesforce.apollo.fireflies.comm.entrance; -import com.google.common.util.concurrent.ListenableFuture; import com.salesforce.apollo.archipelago.Link; import com.salesforce.apollo.fireflies.View.Node; import com.salesforce.apollo.fireflies.proto.*; import com.salesforce.apollo.membership.Member; +import com.salesforce.apollo.stereotomy.EventCoordinates; import com.salesforce.apollo.stereotomy.event.proto.EventCoords; import com.salesforce.apollo.stereotomy.event.proto.IdentAndSeq; import com.salesforce.apollo.stereotomy.event.proto.KeyState_; +import com.salesforce.apollo.stereotomy.identifier.Identifier; +import org.joou.ULong; import java.io.IOException; import java.time.Duration; @@ -23,7 +25,7 @@ */ public interface Entrance extends Link { - static Entrance getLocalLoopback(Node node) { + static Entrance getLocalLoopback(Node node, EntranceService service) { return new Entrance() { @Override @@ -31,13 +33,15 @@ public void close() throws IOException { } @Override - public ListenableFuture getKeyState(IdentAndSeq idSeq) { - return null; + public KeyState_ getKeyState(IdentAndSeq idSeq) { + + return service.getKeyState(Identifier.from(idSeq.getIdentifier()), + ULong.valueOf(idSeq.getSequenceNumber()), getMember().getId()).toKeyState_(); } @Override - public ListenableFuture getKeyState(EventCoords coords) { - return null; + public KeyState_ getKeyState(EventCoords coords) { + return service.getKeyState(EventCoordinates.from(coords), getMember().getId()).toKeyState_(); } @Override @@ -46,29 +50,29 @@ public Member getMember() { } @Override - public ListenableFuture join(Join join, Duration timeout) { + public Gateway join(Join join, Duration timeout) { return null; } @Override - public ListenableFuture seed(Registration registration) { + public Redirect seed(Registration registration) { return null; } @Override - public ListenableFuture validate(EventCoords coords) { - return null; + public Validation validate(EventCoords coords) { + return service.validateCoords(coords, getMember().getId()); } }; } - ListenableFuture getKeyState(IdentAndSeq idSeq); + KeyState_ getKeyState(IdentAndSeq idSeq); - ListenableFuture getKeyState(EventCoords coords); + KeyState_ getKeyState(EventCoords coords); - ListenableFuture join(Join join, Duration timeout); + Gateway join(Join join, Duration timeout); - ListenableFuture seed(Registration registration); + Redirect seed(Registration registration); - ListenableFuture validate(EventCoords coords); + Validation validate(EventCoords coords); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceClient.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceClient.java index 6297da18b..2a226d0a5 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceClient.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/comm/entrance/EntranceClient.java @@ -6,12 +6,10 @@ */ package com.salesforce.apollo.fireflies.comm.entrance; -import com.google.common.util.concurrent.ListenableFuture; import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; import com.salesforce.apollo.fireflies.FireflyMetrics; import com.salesforce.apollo.fireflies.proto.*; -import com.salesforce.apollo.fireflies.proto.EntranceGrpc.EntranceFutureStub; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.stereotomy.event.proto.EventCoords; import com.salesforce.apollo.stereotomy.event.proto.IdentAndSeq; @@ -25,13 +23,13 @@ */ public class EntranceClient implements Entrance { - private final ManagedServerChannel channel; - private final EntranceFutureStub client; - private final FireflyMetrics metrics; + private final ManagedServerChannel channel; + private final EntranceGrpc.EntranceBlockingStub client; + private final FireflyMetrics metrics; public EntranceClient(ManagedServerChannel channel, FireflyMetrics metrics) { this.channel = channel; - this.client = EntranceGrpc.newFutureStub(channel).withCompression("gzip"); + this.client = EntranceGrpc.newBlockingStub(channel).withCompression("gzip"); this.metrics = metrics; } @@ -46,12 +44,12 @@ public void close() { } @Override - public ListenableFuture getKeyState(IdentAndSeq idSeq) { + public KeyState_ getKeyState(IdentAndSeq idSeq) { return client.getKeyStateIdentifier(idSeq); } @Override - public ListenableFuture getKeyState(EventCoords coords) { + public KeyState_ getKeyState(EventCoords coords) { return client.getKeyStateCoords(coords); } @@ -61,52 +59,48 @@ public Member getMember() { } @Override - public ListenableFuture join(Join join, Duration timeout) { + public Gateway join(Join join, Duration timeout) { if (metrics != null) { var serializedSize = join.getSerializedSize(); metrics.outboundBandwidth().mark(serializedSize); metrics.outboundJoin().update(serializedSize); } - ListenableFuture result = client.withDeadlineAfter(timeout.toNanos(), TimeUnit.NANOSECONDS).join(join); - result.addListener(() -> { - if (metrics != null) { - try { - var serializedSize = result.get().getSerializedSize(); - metrics.inboundBandwidth().mark(serializedSize); - metrics.inboundGateway().update(serializedSize); - } catch (Throwable e) { - // nothing - } + Gateway result = client.withDeadlineAfter(timeout.toNanos(), TimeUnit.NANOSECONDS).join(join); + if (metrics != null) { + try { + var serializedSize = result.getSerializedSize(); + metrics.inboundBandwidth().mark(serializedSize); + metrics.inboundGateway().update(serializedSize); + } catch (Throwable e) { + // nothing } - }, r -> r.run()); + } return result; } @Override - public ListenableFuture seed(Registration registration) { + public Redirect seed(Registration registration) { if (metrics != null) { var serializedSize = registration.getSerializedSize(); metrics.outboundBandwidth().mark(serializedSize); metrics.outboundSeed().update(serializedSize); } - ListenableFuture result = client.seed(registration); - result.addListener(() -> { - if (metrics != null) { - try { - var serializedSize = result.get().getSerializedSize(); - metrics.inboundBandwidth().mark(serializedSize); - metrics.inboundRedirect().update(serializedSize); - } catch (Throwable e) { - // nothing - } + Redirect result = client.seed(registration); + if (metrics != null) { + try { + var serializedSize = result.getSerializedSize(); + metrics.inboundBandwidth().mark(serializedSize); + metrics.inboundRedirect().update(serializedSize); + } catch (Throwable e) { + // nothing } - }, r -> r.run()); + } return result; } @Override - public ListenableFuture validate(EventCoords coords) { + public Validation validate(EventCoords coords) { return client.validate(coords); } } diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/BootstrapVerifiersTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/BootstrapVerifiersTest.java index 93fd7fb2e..4ed9eb952 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/BootstrapVerifiersTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/BootstrapVerifiersTest.java @@ -1,7 +1,5 @@ package com.salesforce.apollo.fireflies; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.salesforce.apollo.archipelago.RouterImpl; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.JohnHancock; @@ -56,24 +54,23 @@ public void smokin() throws Exception { Digeste testDigest = DigestAlgorithm.DEFAULT.getLast().toDigeste(); Entrance client = mock(Entrance.class); - SettableFuture ks = SettableFuture.create(); - ks.set(KeyState_.newBuilder().setDigest(testDigest).build()); + KeyState_ ks = KeyState_.newBuilder().setDigest(testDigest).build(); when(client.getKeyState(any(EventCoords.class))).then(new Answer<>() { @Override - public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { + public KeyState_ answer(InvocationOnMock invocation) throws Throwable { return ks; } }); when(client.getKeyState(any(IdentAndSeq.class))).then(new Answer<>() { @Override - public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { + public KeyState_ answer(InvocationOnMock invocation) throws Throwable { return ks; } }); when(client.getMember()).then(new Answer<>() { @Override public Member answer(InvocationOnMock invocation) throws Throwable { - return member; + return members.getLast(); } }); diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java index 5c87e3c7e..9bf3fb6fd 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/E2ETest.java @@ -57,7 +57,7 @@ public class E2ETest { "large_tests"); static { - CARDINALITY = largeTests ? 30 : 10; + CARDINALITY = largeTests ? 30 : 11; } private List communications = new ArrayList<>(); @@ -104,7 +104,7 @@ public void smokin() throws Exception { final var seeds = members.values() .stream() .map(m -> new Seed(m.getEvent().getCoordinates(), new InetSocketAddress(0))) - .limit(largeTests ? 100 : 10) + .limit(largeTests ? 10 : 1) .toList(); final var bootstrapSeed = seeds.subList(0, 1); diff --git a/fireflies/src/test/resources/logback-test.xml b/fireflies/src/test/resources/logback-test.xml index 4791e0131..e546542d6 100644 --- a/fireflies/src/test/resources/logback-test.xml +++ b/fireflies/src/test/resources/logback-test.xml @@ -2,11 +2,11 @@ - + - - %msg%n + %d{mm:ss.SSS} [%thread] %-5level %logger{0} - %msg%n @@ -14,39 +14,37 @@ ff.log false - - %d{mm:ss.SSS} - %msg%n - + %d{mm:ss.SSS} [%thread] %-5level %logger{0} - %msg%n - - - + + + - - - - + + + + - - - + + + - - - + + + - - - + + + - - - + + + - - + + diff --git a/grpc/src/main/proto/fireflies.proto b/grpc/src/main/proto/fireflies.proto index 8333b6bf4..a25827e1b 100644 --- a/grpc/src/main/proto/fireflies.proto +++ b/grpc/src/main/proto/fireflies.proto @@ -150,7 +150,11 @@ message Join { } message Gateway { + BootstrapTrust trust = 1; + repeated SignedNote initialSeedSet = 2; +} + +message BootstrapTrust { crypto.HexBloome diadem = 1; repeated SignedNote successors = 2; - repeated SignedNote initialSeedSet = 3; } diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java index a6a7ce0e0..2069dd696 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java @@ -47,15 +47,13 @@ public SliceIterator(String label, SigningMember member, List this.comm = comm; Entropy.secureShuffle(slice); this.currentIteration = slice.iterator(); - log.debug("Slice: {}", slice.stream().map(m -> m.getId()).toList()); + log.debug("Slice for: <{}> is: {} on: {}", label, slice.stream().map(m -> m.getId()).toList(), member.getId()); } public void iterate(BiFunction round, SlicePredicateHandler handler, Runnable onComplete, ScheduledExecutorService scheduler, Duration frequency) { - Thread.ofVirtual() - .factory() - .newThread(Utils.wrapped(() -> internalIterate(round, handler, onComplete, scheduler, frequency), log)) - .start(); + log.trace("Starting iteration of: <{}> on: {}", label, member.getId()); + internalIterate(round, handler, onComplete, scheduler, frequency); } public void iterate(BiFunction round, SlicePredicateHandler handler, @@ -70,16 +68,17 @@ private void internalIterate(BiFunction round, SlicePredica Consumer allowed = allow -> proceed(allow, proceed, onComplete, scheduler, frequency); try (Comm link = next()) { if (link == null) { + log.trace("No link for iteration of: <{}> on: {}", label, member.getId()); allowed.accept(handler.handle(Optional.empty(), link, slice.get(slice.size() - 1))); return; } - log.trace("Iteration on: {} index: {} to: {} on: {}", label, current.getId(), link.getMember(), - member.getId()); + log.trace("Iteration of: <{}> to: {} on: {}", label, link.getMember().getId(), member.getId()); T result = null; try { result = round.apply(link, link.getMember()); } catch (StatusRuntimeException e) { - log.trace("Error applying round", e); + log.trace("Error applying: <{}> slice to: {} on: {}", label, link.getMember().getId(), member.getId(), + e); } allowed.accept(handler.handle(Optional.ofNullable(result), link, link.getMember())); } catch (IOException e) { @@ -91,8 +90,8 @@ private Comm linkFor(Member m) { try { return comm.connect(m); } catch (Throwable e) { - log.error("error opening connection to {}: {}", m.getId(), - (e.getCause() != null ? e.getCause() : e).getMessage()); + log.error("error opening connection of: <{}> to {}: {} on: {}", label, m.getId(), + (e.getCause() != null ? e.getCause() : e).getMessage(), member.getId()); } return null; } @@ -108,19 +107,19 @@ private Comm next() { private void proceed(final boolean allow, Runnable proceed, Runnable onComplete, ScheduledExecutorService scheduler, Duration frequency) { - log.trace("Determining continuation for: {} final itr: {} allow: {} on: {}", label, !currentIteration.hasNext(), - allow, member.getId()); + log.trace("Determining continuation for: <{}> final itr: {} allow: {} on: {}", label, + !currentIteration.hasNext(), allow, member.getId()); if (!currentIteration.hasNext() && allow) { - log.trace("Final iteration of: {} on: {}", label, member.getId()); + log.trace("Final iteration of: <{}> on: {}", label, member.getId()); if (onComplete != null) { log.trace("Completing iteration for: {} on: {}", label, member.getId()); onComplete.run(); } } else if (allow) { - log.trace("Proceeding for: {} on: {}", label, member.getId()); + log.trace("Proceeding for: <{}> on: {}", label, member.getId()); scheduler.schedule(Utils.wrapped(proceed, log), frequency.toNanos(), TimeUnit.NANOSECONDS); } else { - log.trace("Termination for: {} on: {}", label, member.getId()); + log.trace("Termination for: <{}> on: {}", label, member.getId()); } } diff --git a/model/src/test/resources/logback-test.xml b/model/src/test/resources/logback-test.xml index 6c69bdf98..c760efb9f 100644 --- a/model/src/test/resources/logback-test.xml +++ b/model/src/test/resources/logback-test.xml @@ -45,7 +45,7 @@ - + diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java index e3429aa62..7929e4e25 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java @@ -64,6 +64,45 @@ public boolean verify(EventCoordinates coordinates, SigningThreshold threshold, } }; + EventValidation NO_VALIDATION = new EventValidation() { + @Override + public Filtered filtered(EventCoordinates coordinates, SigningThreshold threshold, JohnHancock signature, + InputStream message) { + return null; + } + + @Override + public Optional getKeyState(EventCoordinates coordinates) { + return Optional.empty(); + } + + @Override + public Optional getKeyState(Identifier identifier, ULong seqNum) { + return Optional.empty(); + } + + @Override + public boolean validate(EstablishmentEvent event) { + return false; + } + + @Override + public boolean validate(EventCoordinates coordinates) { + return false; + } + + @Override + public boolean verify(EventCoordinates coordinates, JohnHancock signature, InputStream message) { + return false; + } + + @Override + public boolean verify(EventCoordinates coordinates, SigningThreshold threshold, JohnHancock signature, + InputStream message) { + return false; + } + }; + Filtered filtered(EventCoordinates coordinates, SigningThreshold threshold, JohnHancock signature, InputStream message);