From 975d644b48169f3e551c8c534e98144e4b939ee7 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Mon, 27 May 2024 14:46:15 -0700 Subject: [PATCH] better logging. bootstrap sampling via successor cuts of ring 0 iteration. checkpoint assembly gossip of reconfiguration committee members only. Some general tidying up --- .../com/salesforce/apollo/choam/CHOAM.java | 14 +- .../com/salesforce/apollo/choam/Producer.java | 10 +- .../salesforce/apollo/choam/ViewAssembly.java | 18 +-- .../salesforce/apollo/choam/ViewContext.java | 8 +- .../apollo/choam/support/Bootstrapper.java | 66 ++++++---- .../choam/support/CheckpointAssembler.java | 30 +++-- .../support/CheckpointAssemblerTest.java | 8 +- choam/src/test/resources/logback-test.xml | 6 +- .../salesforce/apollo/fireflies/Binding.java | 24 ++-- .../com/salesforce/apollo/fireflies/View.java | 61 ++------- .../apollo/fireflies/ViewManagement.java | 23 ++-- fireflies/src/test/resources/logback-test.xml | 2 +- .../salesforce/apollo/context/Context.java | 7 + .../apollo/context/DelegatedContext.java | 5 + .../apollo/context/DynamicContextImpl.java | 7 + .../com/salesforce/apollo/context/Search.java | 122 ++++++++++++++++++ .../apollo/context/StaticContext.java | 19 ++- .../apollo/context/StaticSearch.java | 120 +++++++++++++++++ .../salesforce/apollo/ring/SliceIterator.java | 8 +- 19 files changed, 415 insertions(+), 143 deletions(-) create mode 100644 memberships/src/main/java/com/salesforce/apollo/context/Search.java create mode 100644 memberships/src/main/java/com/salesforce/apollo/context/StaticSearch.java diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index d0bf95adf..605c9314d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -880,9 +880,6 @@ private SubmitResult submit(Transaction request, Digest from) { } private Initial sync(Synchronize request, Digest from) { - if (from == null) { - return Initial.getDefaultInstance(); - } final HashedCertifiedBlock g = genesis.get(); if (g != null) { Initial.Builder initial = Initial.newBuilder(); @@ -897,13 +894,10 @@ private Initial sync(Synchronize request, Digest from) { } final ULong lastReconfig = ULong.valueOf(cp.block.getHeader().getLastReconfig()); HashedCertifiedBlock lastView = null; - if (lastReconfig.equals(ULong.valueOf(0))) { - lastView = cp; - } else { - var stored = store.getCertifiedBlock(lastReconfig); - if (stored != null) { - lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored); - } + + var stored = store.getCertifiedBlock(lastReconfig); + if (stored != null) { + lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored); } if (lastView == null) { lastView = g; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 28519e234..ff1497af1 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -79,8 +79,8 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash producerParams.batchInterval(), producerParams.maxBatchCount(), params().drainPolicy().build()); - log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch, - params.member().getId()); + log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch, + params.member().getId()); var fsm = Fsm.construct(new DriveIn(), Transitions.class, Earner.INITIAL, true); fsm.setName("Producer%s on: %s".formatted(getViewId(), params.member().getId())); @@ -343,9 +343,9 @@ private void reconfigure() { pending.put(reconfiguration.hash, p); p.witnesses.put(params().member(), validation); ds.offer(validation); - log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(), - reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(), - params().member().getId()); + log.trace("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(), + reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(), + params().member().getId()); processPendingValidations(reconfiguration, p); log.trace("Draining on: {}", params().member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 33d536491..baf5250ee 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -107,7 +107,7 @@ void assemble(List asses) { .toList(); var views = asses.stream().flatMap(a -> a.getViewsList().stream()).filter(SignedViews::hasViews).toList(); - log.info("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId()); + log.debug("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId()); joins.forEach(sj -> join(sj.getJoin(), false)); if (selected != null) { @@ -124,7 +124,7 @@ void assemble(List asses) { Digest.from(svs.getViews().getMember()), params().member().getId()); viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews()); } else { - log.info("Invalid views: {} from: {} on: {}", + log.warn("Invalid views: {} from: {} on: {}", svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), Digest.from(svs.getViews().getMember()), params().member().getId()); } @@ -140,14 +140,14 @@ void assemble(List asses) { boolean complete() { if (selected == null) { - log.info("Cannot complete view assembly: {} as selected is null on: {}", nextViewId, - params().member().getId()); + log.error("Cannot complete view assembly: {} as selected is null on: {}", nextViewId, + params().member().getId()); transitions.failed(); return false; } if (proposals.size() < selected.majority) { - log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId, - proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId()); + log.error("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId, + proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId()); transitions.failed(); return false; } @@ -290,9 +290,9 @@ private void propose() { .setMember(params().member().getId().toDigeste()) .setVid(nextViewId.toDigeste()) .build(); - log.info("Proposing for: {} views: {} on: {}", nextViewId, - views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), - params().member().getId()); + log.debug("Proposing for: {} views: {} on: {}", nextViewId, + views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + params().member().getId()); publisher.accept(Assemblies.newBuilder() .addViews( SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig())) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index a9678206d..a88d3e692 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -244,16 +244,16 @@ public boolean validate(SignedViews sv) { Verifier v = verifierOf(sv); if (v == null) { if (log.isDebugEnabled()) { - log.debug("no verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()), - params.member().getId()); + log.warn("No verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()), + params.member().getId()); } return false; } var validated = v.verify(JohnHancock.from(sv.getSignature()), sv.getViews().toByteString()); if (!validated) { if (log.isTraceEnabled()) { - log.trace("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), - params().member().getId()); + log.warn("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), + params().member().getId()); } } else if (log.isTraceEnabled()) { log.trace("Validated views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index 60ccc78d3..4e989392f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -36,6 +36,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -43,22 +44,23 @@ * @author hal.hildebrand */ public class Bootstrapper { - private static final Logger log = LoggerFactory.getLogger( - Bootstrapper.class); - private final HashedCertifiedBlock anchor; - private final CompletableFuture anchorSynchronized = new CompletableFuture<>(); - private final CommonCommunications comms; - private final ULong lastCheckpoint; - private final Parameters params; - private final Store store; - private final CompletableFuture sync = new CompletableFuture<>(); - private final CompletableFuture viewChainSynchronized = new CompletableFuture<>(); - private final ScheduledExecutorService scheduler; - private volatile HashedCertifiedBlock checkpoint; - private volatile CompletableFuture checkpointAssembled; - private volatile CheckpointState checkpointState; - private volatile HashedCertifiedBlock checkpointView; - private volatile HashedCertifiedBlock genesis; + private static final Logger log = LoggerFactory.getLogger(Bootstrapper.class); + + private final HashedCertifiedBlock anchor; + private final CompletableFuture anchorSynchronized = new CompletableFuture<>(); + private final CommonCommunications comms; + private final ULong lastCheckpoint; + private final Parameters params; + private final Store store; + private final CompletableFuture sync = new CompletableFuture<>(); + private final CompletableFuture viewChainSynchronized = new CompletableFuture<>(); + private final ScheduledExecutorService scheduler; + private final AtomicInteger sampleIndex = new AtomicInteger(); + private volatile HashedCertifiedBlock checkpoint; + private volatile CompletableFuture checkpointAssembled; + private volatile CheckpointState checkpointState; + private volatile HashedCertifiedBlock checkpointView; + private volatile HashedCertifiedBlock genesis; public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store, CommonCommunications bootstrapComm) { @@ -122,8 +124,16 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash, crown.compactWrapped(), Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId()); - - CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(), + var committee = checkpointView.certifiedBlock.getBlock() + .getReconfigure() + .getJoinsList() + .stream() + .map(j -> j.getMember().getVm().getId()) + .map(Digest::from) + .map(d -> params.context().getMember(d)) + .filter(m -> m != null) + .toList(); + CheckpointAssembler assembler = new CheckpointAssembler(committee, params.gossipDuration(), checkpoint.height(), checkpoint.block.getCheckpoint(), params.member(), store, comms, params.context(), threshold, params.digestAlgorithm()); @@ -366,11 +376,21 @@ private void sample() { } HashMap votes = new HashMap<>(); Synchronize s = Synchronize.newBuilder().setHeight(anchor.height().longValue()).build(); - final var randomCut = params.digestAlgorithm().random(); - new RingIterator<>(params.gossipDuration(), params.context(), params.member(), comms, true, scheduler).iterate( - randomCut, (link, ring) -> synchronize(s, link), - (tally, futureSailor, destination) -> synchronize(futureSailor, votes, destination), - t -> computeGenesis(votes)); + var si = sampleIndex.getAndIncrement(); + var member = params.context().getMember(si, Entropy.nextBitsStreamInt(params.context().getRingCount())); + if (member == null) { + log.warn("No members: {} to sample on: {}", params.context().size(), params.member().getId()); + computeGenesis(votes); + return; + } + var randomCut = member.getId(); + log.info("Random cut: {} on: {}", randomCut, params.member().getId()); + var iterator = new RingIterator(params.gossipDuration(), params.context(), params.member(), + comms, true, scheduler); + iterator.allowDuplicates(); + iterator.iterate(randomCut, (link, _) -> synchronize(s, link), + (_, futureSailor, destination) -> synchronize(futureSailor, votes, destination), + t -> computeGenesis(votes)); } private void scheduleAnchorCompletion(AtomicReference start, ULong anchorTo) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java index 809d764fc..e382e8f5b 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java @@ -19,7 +19,7 @@ import com.salesforce.apollo.cryptography.HexBloom; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; -import com.salesforce.apollo.ring.RingIterator; +import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; import org.h2.mvstore.MVMap; @@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -51,10 +53,12 @@ public class CheckpointAssembler { private final SigningMember member; private final MVMap state; private final HexBloom diadem; + private final List committee; - public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoint, SigningMember member, - Store store, CommonCommunications comms, Context context, - double falsePositiveRate, DigestAlgorithm digestAlgorithm) { + public CheckpointAssembler(List committee, Duration frequency, ULong height, Checkpoint checkpoint, + SigningMember member, Store store, CommonCommunications comms, + Context context, double falsePositiveRate, DigestAlgorithm digestAlgorithm) { + this.committee = new ArrayList<>(committee); this.height = height; this.member = member; this.checkpoint = checkpoint; @@ -112,11 +116,19 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) { } log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem.compactWrapped(), member.getId()); - var ringer = new RingIterator<>(frequency, context, member, comms, true, scheduler); - ringer.iterate(digestAlgorithm.random(), (link, ring) -> gossip(link), - (tally, result, destination) -> gossip(result), t -> scheduler.schedule( - () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(scheduler, duration), log)), duration.toMillis(), - TimeUnit.MILLISECONDS)); + + var ringer = new SliceIterator<>("Assembly[%s:%s]".formatted(diadem.compactWrapped(), member.getId()), member, + committee, comms); + ringer.iterate((link, m) -> { + log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId()); + return gossip(link); + }, (result, link, m) -> gossip(result), () -> { + if (!assembled.isDone()) { + scheduler.schedule( + () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(scheduler, duration), log)), + duration.toMillis(), TimeUnit.MILLISECONDS); + } + }, duration); } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/support/CheckpointAssemblerTest.java b/choam/src/test/java/com/salesforce/apollo/choam/support/CheckpointAssemblerTest.java index 6a0e2199e..8a1f95dcf 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/support/CheckpointAssemblerTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/support/CheckpointAssemblerTest.java @@ -125,12 +125,18 @@ public CheckpointSegments answer(InvocationOnMock invocation) throws Throwable { return CheckpointSegments.newBuilder().addAllSegments(fetched).build(); } }); + when(client.getMember()).then(new Answer<>() { + @Override + public Member answer(InvocationOnMock invocation) { + return members.get(1); + } + }); @SuppressWarnings("unchecked") CommonCommunications comm = mock(CommonCommunications.class); when(comm.connect(any())).thenReturn(client); Store store2 = new Store(DigestAlgorithm.DEFAULT, new MVStore.Builder().open()); - CheckpointAssembler boot = new CheckpointAssembler(Duration.ofMillis(10), ULong.valueOf(0), checkpoint, + CheckpointAssembler boot = new CheckpointAssembler(members, Duration.ofMillis(10), ULong.valueOf(0), checkpoint, bootstrapping, store2, comm, context, 0.00125, DigestAlgorithm.DEFAULT); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index 830a6923d..d5be6a052 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -25,6 +25,10 @@ + + + + @@ -33,7 +37,7 @@ - + diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 4e107d8a9..33201391b 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -183,13 +183,13 @@ private boolean completeGateway(Participant member, CompletableFuture gat private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, AtomicInteger abandon) { switch (sre.getStatus().getCode()) { case OUT_OF_RANGE -> { - log.info("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + log.debug("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); abandon.incrementAndGet(); } case FAILED_PRECONDITION -> { - log.info("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + log.trace("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); abandon.incrementAndGet(); } case PERMISSION_DENIED -> { @@ -198,8 +198,8 @@ private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, Ato abandon.incrementAndGet(); } case RESOURCE_EXHAUSTED -> { - log.info("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), - node.getId()); + log.debug("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), + node.getId()); abandon.incrementAndGet(); } default -> log.info("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(), @@ -224,8 +224,8 @@ private Join join(Digest v) { Thread.ofVirtual().start(Utils.wrapped(() -> { var view = Digest.from(r.getView()); - log.info("Rebalancing to cardinality: {} (validate) for: {} context: {} on: {}", r.getCardinality(), - view, context.getId(), node.getId()); + log.debug("Rebalancing to cardinality: {} (validate) for: {} context: {} on: {}", r.getCardinality(), + view, context.getId(), node.getId()); this.context.rebalance(r.getCardinality()); node.nextNote(view); @@ -259,8 +259,8 @@ private void join(Redirect redirect, Digest v, Duration duration) { final var cardinality = redirect.getCardinality(); - log.info("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", cardinality, v, context.getId(), - node.getId()); + log.debug("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", cardinality, v, context.getId(), + node.getId()); this.context.rebalance(cardinality); node.nextNote(v); @@ -281,7 +281,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { try { var g = link.join(join, params.seedingTimeout()); if (g == null || g.equals(Gateway.getDefaultInstance())) { - log.info("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId()); + log.debug("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId()); abandon.incrementAndGet(); return null; } @@ -301,7 +301,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { return; } if (abandon.get() >= majority) { - log.info("Abandoning Gateway view: {} reseeding on: {}", v, node.getId()); + log.debug("Abandoning Gateway view: {} reseeding on: {}", v, node.getId()); seeding(); } else { abandon.set(0); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index af5f58d1b..825e5442b 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -616,7 +616,6 @@ protected Gossip gossip(Fireflies link, int ring) { .setRing(ring) .setGossip(commonDigests()) .build()); - log.trace("gossiping with: {} on: {}", link.getMember().getId(), node.getId()); try { return link.gossip(gossip); } catch (Throwable e) { @@ -1301,44 +1300,6 @@ private void recover(Participant member) { } } - /** - * Redirect the member to the successor from this view's perspective - * - * @param member - * @param ring - * @param successor - * @param digests - * @return the Gossip containing the successor's Identity and Note from this view - */ - private Gossip redirectTo(Participant member, int ring, Participant successor, Digests digests) { - assert member != null; - assert successor != null; - if (successor.getNote() == null) { - log.debug("Cannot redirect: {} to: {} on ring: {} as note is null on: {}", member.getId(), - successor.getId(), ring, node.getId()); - return Gossip.getDefaultInstance(); - } - - var identity = successor.getNote(); - if (identity == null) { - log.debug("Cannot redirect: {} to: {} on ring: {} as note is null on: {}", member.getId(), - successor.getId(), ring, node.getId()); - return Gossip.getDefaultInstance(); - } - var gossip = Gossip.newBuilder() - .setRedirect(successor.getNote().getWrapped()) - .setNotes(processNotes(BloomFilter.from(digests.getNoteBff()))) - .setAccusations(processAccusations(BloomFilter.from(digests.getAccusationBff()))) - .setObservations(processObservations(BloomFilter.from(digests.getObservationBff()))) - .setJoins(viewManagement.processJoins(BloomFilter.from(digests.getJoinBiff()))) - .build(); - log.trace("Redirect: {} to: {} on ring: {} notes: {} acc: {} obv: {} joins: {} on: {}", member.getId(), - successor.getId(), ring, gossip.getNotes().getUpdatesCount(), - gossip.getAccusations().getUpdatesCount(), gossip.getObservations().getUpdatesCount(), - gossip.getJoins().getUpdatesCount(), node.getId()); - return gossip; - } - /** * Process the gossip response, providing the updates requested by the the other member and processing the updates * provided by the other member @@ -1900,13 +1861,13 @@ public Gossip rumors(SayWhat request, Digest from) { "Not introduced!, ring: %s from: %s on: %s".formatted(request.getRing(), from, node.getId()))); } return stable(() -> { - validate(from, request); final var ring = request.getRing(); if (!context.validRing(ring)) { log.debug("invalid ring: {} from: {} on: {}", ring, from, node.getId()); throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription( "invalid ring: %s from: %s on: %s".formatted(ring, from, node.getId()))); } + validate(from, request); Participant member = context.getActiveMember(from); if (member == null) { @@ -1927,21 +1888,19 @@ public Gossip rumors(SayWhat request, Digest from) { } Gossip g; + var builder = Gossip.newBuilder(); final var digests = request.getGossip(); if (!successor.equals(node)) { - g = redirectTo(member, ring, successor, digests); + builder.setRedirect(successor.getNote().getWrapped()); log.debug("Redirected: {} on: {}", member.getId(), node.getId()); - } else { - g = Gossip.newBuilder() - .setNotes(processNotes(from, BloomFilter.from(digests.getNoteBff()), params.fpr())) - .setAccusations( - processAccusations(BloomFilter.from(digests.getAccusationBff()), params.fpr())) - .setObservations( - processObservations(BloomFilter.from(digests.getObservationBff()), params.fpr())) - .setJoins( - viewManagement.processJoins(BloomFilter.from(digests.getJoinBiff()), params.fpr())) - .build(); } + g = builder.setNotes(processNotes(from, BloomFilter.from(digests.getNoteBff()), params.fpr())) + .setAccusations( + processAccusations(BloomFilter.from(digests.getAccusationBff()), params.fpr())) + .setObservations( + processObservations(BloomFilter.from(digests.getObservationBff()), params.fpr())) + .setJoins(viewManagement.processJoins(BloomFilter.from(digests.getJoinBiff()), params.fpr())) + .build(); if (g.getNotes().getUpdatesCount() + g.getAccusations().getUpdatesCount() + g.getObservations() .getUpdatesCount() + g.getJoins().getUpdatesCount() != 0) { diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 5a62ea8d0..e65d036d1 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -250,8 +250,8 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time } var note = new NoteWrapper(join.getNote(), digestAlgo); if (!from.equals(note.getId())) { - log.info("Ignored join of view: {} from: {} does not match: {} on: {}", joinView, from, note.getId(), - node.getId()); + log.debug("Ignored join of view: {} from: {} does not match: {} on: {}", joinView, from, note.getId(), + node.getId()); responseObserver.onError( new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Member does not match note"))); return; @@ -259,7 +259,7 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time if (!view.validate(note.getIdentifier())) { responseObserver.onError( new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Invalid identifier"))); - log.info("Ignored join of view: {} from: {} invalid identifier on: {}", joinView, from, node.getId()); + log.debug("Ignored join of view: {} from: {} invalid identifier on: {}", joinView, from, node.getId()); return; } view.stable(() -> { @@ -312,8 +312,8 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time view.viewChange(() -> { final var hex = bound.view(); - log.info("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", hex.getCardinality(), - hex.compactWrapped(), context.getId(), node.getId()); + log.debug("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", hex.getCardinality(), + hex.compactWrapped(), context.getId(), node.getId()); context.rebalance(hex.getCardinality()); context.activate(node); diadem.set(hex); @@ -333,8 +333,9 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time } view.introduced(); - log.info("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", currentView.get(), - bound.successors().size(), cardinality(), context.totalCount(), node.getId()); + log.debug("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", + currentView.get(), bound.successors().size(), cardinality(), context.totalCount(), + node.getId()); if (context.totalCount() == cardinality()) { join(); } else { @@ -466,8 +467,8 @@ Redirect seed(Registration registration, Digest from) { var newMember = view.new Participant(note.getId()); final var sample = context.sample(params.maximumTxfr(), Entropy.bitsStream(), (Digest) null); - log.info("Member seeding: {} view: {} context: {} sample: {} on: {}", newMember.getId(), currentView(), - context.getId(), sample.size(), node.getId()); + log.debug("Member seeding: {} view: {} context: {} sample: {} on: {}", newMember.getId(), currentView(), + context.getId(), sample.size(), node.getId()); return Redirect.newBuilder() .setView(currentView().toDigeste()) .addAllSample( @@ -518,8 +519,8 @@ private void initiateViewChange() { .setSignature(signature.toSig()) .build(); view.initiate(viewChange); - log.info("View change vote: {} joins: {} leaves: {} on: {}", currentView(), change.getJoinsCount(), - change.getLeavesCount(), node.getId()); + log.debug("View change vote: {} joins: {} leaves: {} on: {}", currentView(), change.getJoinsCount(), + change.getLeavesCount(), node.getId()); }); } diff --git a/fireflies/src/test/resources/logback-test.xml b/fireflies/src/test/resources/logback-test.xml index 603d038bc..f86c9c940 100644 --- a/fireflies/src/test/resources/logback-test.xml +++ b/fireflies/src/test/resources/logback-test.xml @@ -14,7 +14,7 @@ - + diff --git a/memberships/src/main/java/com/salesforce/apollo/context/Context.java b/memberships/src/main/java/com/salesforce/apollo/context/Context.java index ed43d7fa0..c93f70024 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/Context.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/Context.java @@ -208,6 +208,13 @@ default int diameter() { */ T getMember(Digest memberID); + /** + * @param i + * @param ring + * @return the i'th Member in Ring 0 of the receiver + */ + T getMember(int i, int ring); + /** * Answer the probability {0, 1} that any given member is byzantine */ diff --git a/memberships/src/main/java/com/salesforce/apollo/context/DelegatedContext.java b/memberships/src/main/java/com/salesforce/apollo/context/DelegatedContext.java index d0265f05c..53195c04c 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/DelegatedContext.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/DelegatedContext.java @@ -94,6 +94,11 @@ public T getMember(Digest memberID) { return delegate.getMember(memberID); } + @Override + public T getMember(int i, int ring) { + return delegate.getMember(i, ring); + } + @Override public double getProbabilityByzantine() { return delegate.getProbabilityByzantine(); diff --git a/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java b/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java index dce34e277..953900b2b 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java @@ -258,6 +258,13 @@ public T getMember(Digest memberID) { return tracked == null ? null : tracked.member(); } + @Override + public T getMember(int i, int r) { + i = i % size(); + var ring = ring(i); + return (T) ring.get(i); + } + @Override public Collection getOffline() { return members.values().stream().filter(e -> !e.isActive()).map(Tracked::member).toList(); diff --git a/memberships/src/main/java/com/salesforce/apollo/context/Search.java b/memberships/src/main/java/com/salesforce/apollo/context/Search.java new file mode 100644 index 000000000..11c3ec672 --- /dev/null +++ b/memberships/src/main/java/com/salesforce/apollo/context/Search.java @@ -0,0 +1,122 @@ +package com.salesforce.apollo.context; + +import com.salesforce.apollo.utils.Pair; + +import java.util.Comparator; +import java.util.List; + +/** + * Search used to search in an ordered collection of Vector of type T comparisons are done using K which can be + * extracted from T + * + * @param vector type + * @param the key used for sorting to extract from T and to compare + * @author lindenb + */ +public abstract class Search { + private final Comparator comparator; + + /** + * Search + * + * @param comparator used to compare two keys of type K + */ + public Search(Comparator comparator) { + this.comparator = comparator; + } + + /** C+ equals range */ + public Pair equal_range(List dataVector, K select) { + return equal_range(dataVector, 0, dataVector.size(), select); + } + + /** + * C+ equals range + * + * @param bounds array of two integers [begin; end] + * @param select the value to search + */ + public Pair equal_range(List dataVector, Pair bounds, K select) { + return equal_range(dataVector, bounds.a(), bounds.b(), select); + } + + /** C+ equals range */ + public Pair equal_range(List dataVector, int first, int last, K subject) { + int left = lower_bound(dataVector, first, last, subject); + int right = upper_bound(dataVector, left, last, subject); + return new Pair(left, right); + } + + /** @return the internal comparator */ + public Comparator getComparator() { + return this.comparator; + } + + /** method used to extract the key(K) from an object (T) */ + public abstract K getKey(T value); + + /** @return True if the vector is sorted */ + public boolean isSorted(List dataVector) { + return isSorted(dataVector, 0, dataVector.size()); + } + + /** @return True if the vector is sorted between begin and end */ + public boolean isSorted(List dataVector, int begin, int end) { + while (begin + 1 < end) { + if (this.comparator.compare(getKey(dataVector.get(begin)), getKey(dataVector.get(begin + 1))) > 0) { + return false; + } + ++begin; + } + return true; + } + + /** C+ lower_bound */ + public int lower_bound(List dataVector, K select) { + return lower_bound(dataVector, 0, dataVector.size(), select); + } + + /** C+ lower_bound */ + public int lower_bound(List dataVector, int first, int last, K select) { + int len = last - first; + while (len > 0) { + int half = len / 2; + int middle = first + half; + T x = dataVector.get(middle); + if (this.comparator.compare(getKey(x), select) < 0) { + first = middle + 1; + len -= half + 1; + } else { + len = half; + } + } + return first; + } + + @Override + public String toString() { + return "Algorithm(" + this.comparator + ")"; + } + + /** C+ upper_bound */ + public int upper_bound(List dataVector, K select) { + return upper_bound(dataVector, 0, dataVector.size(), select); + } + + /** C+ upper_bound */ + public int upper_bound(List dataVector, int first, int last, K select) { + int len = last - first; + while (len > 0) { + int half = len / 2; + int middle = first + half; + T x = dataVector.get(middle); + if (this.comparator.compare(select, getKey(x)) < 0) { + len = half; + } else { + first = middle + 1; + len -= half + 1; + } + } + return first; + } +} diff --git a/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java b/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java index b88083b54..533ab59bc 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/StaticContext.java @@ -140,6 +140,13 @@ public T getMember(Digest memberID) { return index >= 0 && (members[index].member.getId().equals(memberID)) ? members[index].member : null; } + @Override + public T getMember(int i, int r) { + i = i % size(); + var ring = new StaticRing(r); + return ring.get(i); + } + @Override public double getProbabilityByzantine() { return pByz; @@ -461,7 +468,7 @@ private Digest[] hashesFor(T m) { } private void initialize(Collection members) { - record ringMapping(Tracked m, short i) { + record ringMapping(Tracked m, int i) { } { var i = 0; @@ -472,7 +479,7 @@ record ringMapping(Tracked m, short i) { Arrays.sort(this.members, Comparator.comparing(t -> t.member)); for (int j = 0; j < rings.length; j++) { var mapped = new TreeMap>(); - for (short i = 0; i < this.members.length; i++) { + for (var i = 0; i < this.members.length; i++) { var m = this.members[i]; mapped.put(Context.hashFor(id, j, m.member.getId()), new ringMapping<>(m, i)); } @@ -580,6 +587,10 @@ public T findSuccessor(T m, Function predicate) { return succ(hashFor(m), predicate); } + public T get(int i) { + return ring().get(i, members).member; + } + public Digest hashFor(Digest d) { return Context.hashFor(id, index, d); } @@ -692,8 +703,8 @@ public Stream stream() { @Override public Iterator iterator() { return new Iterator() { - private final Ring ring = ring(); - private int current = 0; + private final Ring ring = ring(); + private int current = 0; @Override public boolean hasNext() { diff --git a/memberships/src/main/java/com/salesforce/apollo/context/StaticSearch.java b/memberships/src/main/java/com/salesforce/apollo/context/StaticSearch.java new file mode 100644 index 000000000..de23a1568 --- /dev/null +++ b/memberships/src/main/java/com/salesforce/apollo/context/StaticSearch.java @@ -0,0 +1,120 @@ +package com.salesforce.apollo.context; + +import com.salesforce.apollo.utils.Pair; + +import java.util.Comparator; + +/** + * StaticSearch is used to search in an Array of type T comparisons are done using K which can be extracted from T + * + * @param vector type + * @param the key used for sorting to extract from T and to compare + * @author lindenb + */ +public abstract class StaticSearch { + private final Comparator comparator; + + /** + * Search + * + * @param comparator used to compare two keys of type K + */ + public StaticSearch(Comparator comparator) { + this.comparator = comparator; + } + + /** C+ equals range */ + public Pair equal_range(T[] dataVector, K select) { + return equal_range(dataVector, 0, dataVector.length, select); + } + + /** + * C+ equals range + * + * @param bounds array of two integers [begin; end] + * @param select the value to search + */ + public Pair equal_range(T[] dataVector, Pair bounds, K select) { + return equal_range(dataVector, bounds.a(), bounds.b(), select); + } + + /** C+ equals range */ + public Pair equal_range(T[] dataVector, int first, int last, K subject) { + int left = lower_bound(dataVector, first, last, subject); + int right = upper_bound(dataVector, left, last, subject); + return new Pair(left, right); + } + + /** @return the internal comparator */ + public Comparator getComparator() { + return this.comparator; + } + + /** method used to extract the key(K) from an object (T) */ + public abstract K getKey(T value); + + /** @return True if the vector is sorted */ + public boolean isSorted(T[] dataVector) { + return isSorted(dataVector, 0, dataVector.length); + } + + /** @return True if the vector is sorted between begin and end */ + public boolean isSorted(T[] dataVector, int begin, int end) { + while (begin + 1 < end) { + if (this.comparator.compare(getKey(dataVector[begin]), getKey(dataVector[begin + 1])) > 0) { + return false; + } + ++begin; + } + return true; + } + + /** C+ lower_bound */ + public int lower_bound(T[] dataVector, K select) { + return lower_bound(dataVector, 0, dataVector.length, select); + } + + /** C+ lower_bound */ + public int lower_bound(T[] dataVector, int first, int last, K select) { + int len = last - first; + while (len > 0) { + int half = len / 2; + int middle = first + half; + T x = dataVector[middle]; + if (this.comparator.compare(getKey(x), select) < 0) { + first = middle + 1; + len -= half + 1; + } else { + len = half; + } + } + return first; + } + + @Override + public String toString() { + return "Algorithm(" + this.comparator + ")"; + } + + /** C+ upper_bound */ + public int upper_bound(T[] dataVector, K select) { + return upper_bound(dataVector, 0, dataVector.length, select); + } + + /** C+ upper_bound */ + public int upper_bound(T[] dataVector, int first, int last, K select) { + int len = last - first; + while (len > 0) { + int half = len / 2; + int middle = first + half; + T x = dataVector[middle]; + if (this.comparator.compare(select, getKey(x)) < 0) { + len = half; + } else { + first = middle + 1; + len -= half + 1; + } + } + return first; + } +} diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java index a64b3b5a2..f4ebf020b 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java @@ -71,9 +71,10 @@ private void internalIterate(BiFunction round, SlicePredica Consumer allowed = allow -> proceed(allow, proceed, onComplete, frequency); try (Comm link = next()) { - if (link == null) { + if (link == null || link.getMember() == null) { log.trace("No link for iteration of: <{}> on: {}", label, member.getId()); - allowed.accept(handler.handle(Optional.empty(), link, slice.get(slice.size() - 1))); + allowed.accept( + handler.handle(Optional.empty(), link, slice.isEmpty() ? null : slice.get(slice.size() - 1))); return; } log.trace("Iteration of: <{}> to: {} on: {}", label, link.getMember().getId(), member.getId()); @@ -108,6 +109,9 @@ private Comm next() { Entropy.secureShuffle(slice); currentIteration = slice.iterator(); } + if (!currentIteration.hasNext()) { + return null; + } current = currentIteration.next(); return linkFor(current); }