diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 6f2733b6f..14365b314 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -135,31 +135,38 @@ public CHOAM(Parameters params) { session = new Session(params, service()); } - public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize, Digest initialCrown) { + public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize, Digest initial) { + assert segmentSize > 0 : "segment size must be > 0 : " + segmentSize; long length = 0; if (state != null) { length = state.length(); } - Checkpoint.Builder builder = Checkpoint.newBuilder().setByteSize(length).setSegmentSize(segmentSize); + int count = (int) (length / segmentSize); + if (length != 0 && count * segmentSize < length) { + count++; + } + var accumulator = new HexBloom.Accumulator(count, 2, initial); + Checkpoint.Builder builder = Checkpoint.newBuilder() + .setCount(count) + .setByteSize(length) + .setSegmentSize(segmentSize); + if (state != null) { byte[] buff = new byte[segmentSize]; try (FileInputStream fis = new FileInputStream(state)) { for (int read = fis.read(buff); read > 0; read = fis.read(buff)) { ByteString segment = ByteString.copyFrom(buff, 0, read); - builder.addSegments(algo.digest(segment).toDigeste()); + accumulator.add(algo.digest(segment)); } } catch (IOException e) { log.error("Invalid checkpoint!", e); return null; } } - - var crown = HexBloom.construct(builder.getSegmentsCount(), - builder.getSegmentsList().stream().map(d -> Digest.from(d)), initialCrown, 2) - .compactWrapped(); - log.info("Checkpoint length: {} segment size: {} count: {} crown: {}", length, segmentSize, - builder.getSegmentsCount(), crown); - return builder.setCrown(crown.toDigeste()).build(); + var crown = accumulator.build(); + log.info("Checkpoint length: {} segment size: {} count: {} crown: {} initial: {}", length, segmentSize, + builder.getCount(), crown, initial); + return builder.setCrown(crown.toHexBloome()).build(); } public static Block genesis(Digest id, Map joins, HashedBlock head, Context context, diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index 9bf983bf3..add36dbc9 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -125,8 +125,9 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { store.put(checkpointView); assert !checkpointView.height() .equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis"; - log.info("Assembling from checkpoint: {}:{} on: {}", checkpoint.height(), checkpoint.hash, - params.member().getId()); + log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash, + HexBloom.from(checkpoint.block.getCheckpoint().getCrown()), + Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId()); CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(), checkpoint.block.getCheckpoint(), params.member(), @@ -135,7 +136,6 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { // assemble the checkpoint checkpointAssembled = assembler.assemble(scheduler, params.gossipDuration()).whenComplete((cps, t) -> { - validate(cps); log.info("Restored checkpoint: {} on: {}", checkpoint.height(), params.member().getId()); checkpointState = cps; }); @@ -150,22 +150,6 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { scheduleViewChainCompletion(new AtomicReference<>(checkpointView.height()), ULong.valueOf(0)); } - private void validate(CheckpointState cps) { - var crown = HexBloom.construct(cps.checkpoint.getSegmentsCount(), - cps.checkpoint.getSegmentsList().stream().map(d -> Digest.from(d)), - Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), - params.crowns()); - var have = crown.compactWrapped(); - var expected = Digest.from(cps.checkpoint.getCrown()); - if (!have.equals(expected)) { - log.error("Invalid crown for checkpointed state have: {} expected: {} on: {}", have, expected, - params.member()); - throw new IllegalStateException( - "Invalid crown for checkpointed state have: %s expected: %s on: %s".formatted(have, expected, - params.member())); - } - } - private boolean completeAnchor(Optional futureSailor, AtomicReference start, ULong end, RingCommunications.Destination destination) { if (sync.isDone() || anchorSynchronized.isDone()) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java index 1ec6633c8..deb601ca1 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java @@ -15,6 +15,7 @@ import com.salesforce.apollo.choam.comm.Terminal; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; +import com.salesforce.apollo.crypto.HexBloom; import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; @@ -26,8 +27,6 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -49,10 +48,10 @@ public class CheckpointAssembler { private final DigestAlgorithm digestAlgorithm; private final double fpr; private final Duration frequency; - private final List hashes = new ArrayList<>(); private final ULong height; private final SigningMember member; private final MVMap state; + private final HexBloom diadem; public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoint, SigningMember member, Store store, CommonCommunications comms, Context context, @@ -66,13 +65,13 @@ public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoi this.digestAlgorithm = digestAlgorithm; this.frequency = frequency; state = store.createCheckpoint(height); - checkpoint.getSegmentsList().stream().map(bs -> new Digest(bs)).forEach(hash -> hashes.add(hash)); + diadem = HexBloom.from(checkpoint.getCrown()); } public CompletableFuture assemble(ScheduledExecutorService scheduler, Duration duration) { - if (checkpoint.getSegmentsCount() == 0) { - log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getSegmentsCount(), - Digest.from(checkpoint.getCrown()), member.getId()); + if (checkpoint.getCount() == 0) { + log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, + member.getId()); assembled.complete(new CheckpointState(checkpoint, state)); } else { gossip(scheduler, duration); @@ -82,8 +81,8 @@ public CompletableFuture assemble(ScheduledExecutorService sche private CheckpointReplication buildRequest() { long seed = Entropy.nextBitsStreamLong(); - BloomFilter segmentsBff = new BloomFilter.IntBloomFilter(seed, checkpoint.getSegmentsCount(), fpr); - IntStream.range(0, checkpoint.getSegmentsCount()).filter(i -> state.containsKey(i)).forEach(i -> { + BloomFilter segmentsBff = new BloomFilter.IntBloomFilter(seed, checkpoint.getCount(), fpr); + IntStream.range(0, checkpoint.getCount()).filter(i -> state.containsKey(i)).forEach(i -> { segmentsBff.add(i); }); return CheckpointReplication.newBuilder() @@ -98,8 +97,8 @@ private boolean gossip(Optional futureSailor) { } if (process(futureSailor.get())) { CheckpointState cs = new CheckpointState(checkpoint, state); - log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getSegmentsCount(), - Digest.from(checkpoint.getCrown()), member.getId()); + log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, + member.getId()); assembled.complete(cs); return false; } @@ -110,8 +109,8 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) { if (assembled.isDone()) { return; } - log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getSegmentsCount(), - Digest.from(checkpoint.getCrown()), member.getId()); + log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, + member.getId()); var ringer = new RingIterator<>(frequency, context, member, comms, true, scheduler); ringer.iterate(randomCut(digestAlgorithm), (link, ring) -> gossip(link), (tally, result, destination) -> gossip(result), @@ -132,13 +131,10 @@ private CheckpointSegments gossip(Terminal link) { private boolean process(CheckpointSegments segments) { segments.getSegmentsList().forEach(segment -> { Digest hash = digestAlgorithm.digest(segment.getBlock()); - int index = segment.getIndex(); - if (index >= 0 && index < hashes.size()) { - if (hash.equals(hashes.get(index))) { - state.computeIfAbsent(index, i -> segment.getBlock().toByteArray()); - } + if (diadem.contains(hash)) { + state.computeIfAbsent(segment.getIndex(), i -> segment.getBlock().toByteArray()); } }); - return state.size() == hashes.size(); + return state.size() == checkpoint.getCount(); } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java index 57285e623..4bb2a7a51 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java @@ -67,7 +67,7 @@ public void close() { public List fetchSegments(BloomFilter bff, int maxSegments) { List slices = new ArrayList<>(); - for (int i = 0; i < checkpoint.getSegmentsCount(); i++) { + for (int i = 0; i < checkpoint.getCount(); i++) { if (!bff.contains(i)) { slices.add(Slice.newBuilder().setIndex(i).setBlock(ByteString.copyFrom(state.get(i))).build()); if (slices.size() >= maxSegments) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java index 1e4f6d977..65ec6292f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java @@ -6,45 +6,32 @@ */ package com.salesforce.apollo.choam.support; -import static com.salesforce.apollo.choam.support.HashedBlock.height; +import com.google.protobuf.InvalidProtocolBufferException; +import com.salesfoce.apollo.choam.proto.*; +import com.salesforce.apollo.bloomFilters.BloomFilter; +import com.salesforce.apollo.crypto.Digest; +import com.salesforce.apollo.crypto.DigestAlgorithm; +import org.h2.mvstore.MVMap; +import org.h2.mvstore.MVStore; +import org.joou.ULong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.StreamSupport; -import org.h2.mvstore.MVMap; -import org.h2.mvstore.MVStore; -import org.joou.ULong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.salesfoce.apollo.choam.proto.Block; -import com.salesfoce.apollo.choam.proto.Blocks; -import com.salesfoce.apollo.choam.proto.Certification; -import com.salesfoce.apollo.choam.proto.Certifications; -import com.salesfoce.apollo.choam.proto.CertifiedBlock; -import com.salesfoce.apollo.choam.proto.Checkpoint; -import com.salesforce.apollo.crypto.Digest; -import com.salesforce.apollo.crypto.DigestAlgorithm; -import com.salesforce.apollo.bloomFilters.BloomFilter; +import static com.salesforce.apollo.choam.support.HashedBlock.height; /** * Kind of a DAO for "nosql" block storage with MVStore from H2 * * @author hal.hildebrand - * */ public class Store { @@ -85,7 +72,7 @@ public byte[] block(ULong height) { public Iterator blocksFrom(ULong from, ULong to, int max) { return new Iterator<>() { ULong next; - int remaining = max; + int remaining = max; { next = from; @@ -155,8 +142,8 @@ public MVMap createCheckpoint(ULong blockHeight) { return blocks.store.openMap(String.format(CHECKPOINT_TEMPLATE, blockHeight)); } - public void fetchBlocks(BloomFilter blocksBff, Blocks.Builder replication, int max, ULong from, - ULong to) throws IllegalStateException { + public void fetchBlocks(BloomFilter blocksBff, Blocks.Builder replication, int max, ULong from, ULong to) + throws IllegalStateException { StreamSupport.stream(((Iterable) () -> blocksFrom(from, to, max)).spliterator(), false) .filter(s -> !blocksBff.contains(s)) .map(height -> getCertifiedBlock(height)) @@ -291,8 +278,8 @@ public MVMap putCheckpoint(ULong blockHeight, File state, Check } catch (IOException e) { throw new IllegalStateException("Error storing checkpoint " + blockHeight, e); } - assert cp.size() == checkpoint.getSegmentsCount() : "Invalid number of segments: " + cp.size() - + " should be: " + checkpoint.getSegmentsCount(); + assert cp.size() == checkpoint.getCount() : "Invalid number of segments: " + cp.size() + " should be: " + + checkpoint.getCount(); checkpoints.put(blockHeight, cp); return cp; }); @@ -321,8 +308,9 @@ public void validate(ULong from, ULong to) throws IllegalStateException { } else { Digest pointer = new Digest(current.block.getHeader().getPrevious()); if (!prevHash.get().equals(pointer)) { - throw new IllegalStateException(String.format("Invalid chain (%s, %s) block: %s has invalid previous hash: %s, expected: %s", - from, to, l, pointer, prevHash.get())); + throw new IllegalStateException( + String.format("Invalid chain (%s, %s) block: %s has invalid previous hash: %s, expected: %s", + from, to, l, pointer, prevHash.get())); } else { prevHash.set(current.hash); } @@ -349,8 +337,9 @@ public void validateViewChain(ULong from) throws IllegalStateException { next = ULong.valueOf(current.block.getHeader().getLastReconfig()); current = getBlock(next); } else { - throw new IllegalStateException(String.format("Invalid view chain (%s, %s) invalid: %s expected: %s have: %s", - from, 0, current.height(), pointer, current.hash)); + throw new IllegalStateException( + String.format("Invalid view chain (%s, %s) invalid: %s expected: %s have: %s", from, 0, + current.height(), pointer, current.hash)); } } } @@ -362,6 +351,7 @@ public long version() { public Iterator viewChainFrom(ULong from, ULong to) { return new Iterator<>() { ULong next; + { next = viewChain.get(from); if (!viewChain.containsKey(next)) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestChain.java b/choam/src/test/java/com/salesforce/apollo/choam/TestChain.java index 909dd2631..3d93b02ad 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestChain.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestChain.java @@ -9,8 +9,8 @@ import com.salesfoce.apollo.choam.proto.*; import com.salesforce.apollo.choam.support.HashedCertifiedBlock; import com.salesforce.apollo.choam.support.Store; -import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; +import org.slf4j.LoggerFactory; /** * @author hal.hildebrand @@ -32,6 +32,7 @@ public TestChain(Store store) { public TestChain anchor() { anchor = lastBlock; + LoggerFactory.getLogger(TestChain.class).debug("Anchor: {}", lastBlock.hash); return this; } @@ -126,11 +127,12 @@ private HashedCertifiedBlock checkpointBlock() { .setCheckpoint( CHOAM.checkpoint( DigestAlgorithm.DEFAULT, - null, 0, - DigestAlgorithm.DEFAULT.getOrigin())) + null, 1, + checkpoint.hash)) .build()) .build()); store.put(lastBlock); + LoggerFactory.getLogger(TestChain.class).debug("Checkpoint: {}", lastBlock.hash); return lastBlock; } @@ -161,6 +163,7 @@ private HashedCertifiedBlock reconfigureBlock() { .build()) .build()); store.put(lastBlock); + LoggerFactory.getLogger(TestChain.class).debug("Reconfigure: {}", lastBlock.hash); return lastBlock; } @@ -193,6 +196,7 @@ private HashedCertifiedBlock userBlock() { .build()); store.put(block); lastBlock = block; + LoggerFactory.getLogger(TestChain.class).debug("Executions: {}", lastBlock.hash); return lastBlock; } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java index 962850763..754732bff 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java @@ -10,6 +10,7 @@ import com.salesfoce.apollo.choam.proto.Blocks; import com.salesfoce.apollo.choam.proto.Initial; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; +import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.TestChain; @@ -25,7 +26,6 @@ import com.salesforce.apollo.stereotomy.StereotomyImpl; import com.salesforce.apollo.stereotomy.mem.MemKERL; import com.salesforce.apollo.stereotomy.mem.MemKeyStore; -import com.salesforce.apollo.bloomFilters.BloomFilter; import org.h2.mvstore.MVStore; import org.joou.ULong; import org.junit.jupiter.api.Test; diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index de375794e..6b78912b1 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -8,7 +8,7 @@ %d{mm:ss.SSS} [%thread] %-5level %logger{0} - - %msg%n + - %msg%n @@ -24,6 +24,10 @@ + + + + diff --git a/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java b/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java index 81d6aa005..31b173690 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java +++ b/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java @@ -26,11 +26,11 @@ * @author hal.hildebrand */ public class HexBloom { - public static final double DEFAULT_FPR = 0.0001; public static final long DEFAULT_SEED = Primes.PRIMES[666]; private static final Function IDENTITY = d -> d; private static int MINIMUM_BFF_CARD = 100; + private final int cardinality; private final Digest[] crowns; private BloomFilter membership; @@ -107,8 +107,6 @@ public static HexBloom construct(int currentCount, Stream currentMembers * @param added - digests added that are not present in the currentMembership list * @param crowns - the current crown state corresponding to the currentMembership * @param removed - digests removed that are present in the currentMembership list - * @param hashes - the list of functions for computing the hash of a digest for a given crown - * @param fpr - desired false positive rate for membership bloomfilter * @return the HexBloom representing the new state */ public static HexBloom construct(int currentCount, Stream currentMembership, List added, @@ -489,4 +487,55 @@ public List wrappedCrowns() { public List wrappedCrowns(List> wrapingHash) { return IntStream.range(0, crowns.length).mapToObj(i -> wrapingHash.get(i).apply(crowns[i])).toList(); } + + public static class Accumulator { + private final List> accumulators; + private final int cardinality; + private final BloomFilter membership; + private final List> hashes; + private int currentCount = 0; + + public Accumulator(int cardinality, int crowns, Digest initial, double fpr) { + this(cardinality, hashes(crowns), initial, fpr); + } + + public Accumulator(int cardinality, List> crownHashes, Digest initial, double fpr) { + if (cardinality < 0) { + throw new IllegalArgumentException(("Cardinality must be >= 0")); + } + if (crownHashes == null || crownHashes.isEmpty()) { + throw new IllegalArgumentException("Crown hashes must not be null or empty"); + } + if (fpr <= 0) { + throw new IllegalArgumentException("False positive rate must be > 0"); + } + this.cardinality = cardinality; + this.hashes = crownHashes; + membership = new BloomFilter.DigestBloomFilter(DEFAULT_SEED, Math.max(MINIMUM_BFF_CARD, cardinality), fpr); + accumulators = IntStream.range(0, hashes.size()) + .mapToObj(i -> hashes.get(i).apply(initial)) + .map(d -> new AtomicReference<>(d)) + .toList(); + } + + public Accumulator(int cardinality, int crowns, Digest initial) { + this(cardinality, crowns, initial, DEFAULT_FPR); + } + + public void add(Digest digest) { + if (currentCount == cardinality) { + throw new IllegalArgumentException("Current count already equal to cardinality: " + cardinality); + } + currentCount++; + for (int i = 0; i < accumulators.size(); i++) { + accumulators.get(i).accumulateAndGet(hashes.get(i).apply(digest), (a, b) -> a.xor(b)); + } + membership.add(digest); + } + + public HexBloom build() { + assert currentCount == cardinality : "Did not add all members, missing: " + (cardinality - currentCount); + return new HexBloom(cardinality, accumulators.stream().map(ar -> ar.get()).toList(), membership); + } + } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 1e3011aee..a9acb415c 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -6,37 +6,11 @@ */ package com.salesforce.apollo.fireflies; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.codahale.metrics.Timer; import com.google.common.base.Objects; -import com.salesfoce.apollo.fireflies.proto.Gateway; -import com.salesfoce.apollo.fireflies.proto.Join; -import com.salesfoce.apollo.fireflies.proto.JoinGossip; -import com.salesfoce.apollo.fireflies.proto.Redirect; -import com.salesfoce.apollo.fireflies.proto.Registration; -import com.salesfoce.apollo.fireflies.proto.SignedNote; -import com.salesfoce.apollo.fireflies.proto.SignedViewChange; +import com.salesfoce.apollo.fireflies.proto.*; import com.salesfoce.apollo.fireflies.proto.Update.Builder; -import com.salesfoce.apollo.fireflies.proto.ViewChange; +import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.crypto.HexBloom; @@ -47,70 +21,46 @@ import com.salesforce.apollo.membership.ReservoirSampler; import com.salesforce.apollo.stereotomy.EventCoordinates; import com.salesforce.apollo.utils.Entropy; -import com.salesforce.apollo.bloomFilters.BloomFilter; - import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; /** - * * Management of the view state logic * * @author hal.hildebrand - * */ public class ViewManagement { - record Ballot(Digest view, List leaving, List joining, int hash) { - - Ballot(Digest view, List leaving, List joining, DigestAlgorithm algo) { - this(view, leaving, joining, - view.xor(joining.stream().reduce((a, b) -> a.xor(b)).orElse(algo.getOrigin())) - .xor(leaving.stream() - .reduce((a, b) -> a.xor(b)) - .orElse(algo.getOrigin()) - .xor(joining.stream().reduce((a, b) -> a.xor(b)).orElse(algo.getOrigin()))) - .hashCode()); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof Ballot b) { - return Objects.equal(view, b.view) && Objects.equal(leaving, b.leaving) && - Objects.equal(joining, b.joining); - } - return false; - } - - @Override - public int hashCode() { - return hash; - } - - @Override - public String toString() { - return String.format("{h: %s, j: %s, l: %s}", hash, joining.size(), leaving.size()); - } - } - private static final Logger log = LoggerFactory.getLogger(ViewManagement.class); - private final AtomicInteger attempt = new AtomicInteger(); - private boolean bootstrap; private final Digest bootstrapView; private final Context context; - private AtomicReference currentView = new AtomicReference<>(); - private AtomicReference diadem = new AtomicReference<>(); private final DigestAlgorithm digestAlgo; private final ConcurrentMap joins = new ConcurrentSkipListMap<>(); private final FireflyMetrics metrics; private final Node node; - private CompletableFuture onJoined; private final Parameters params; private final Map>> pendingJoins = new ConcurrentSkipListMap<>(); private final View view; private final AtomicReference vote = new AtomicReference<>(); - + private boolean bootstrap; + private AtomicReference currentView = new AtomicReference<>(); + private AtomicReference diadem = new AtomicReference<>(); + private CompletableFuture onJoined; ViewManagement(View view, Context context, Parameters params, FireflyMetrics metrics, Node node, DigestAlgorithm digestAlgo) { this.node = node; @@ -136,8 +86,8 @@ void bootstrap(NoteWrapper nw, final ScheduledExecutorService sched, final Durat context.activate(node); resetBootstrapView(); - view.viewChange(() -> install(new Ballot(currentView(), Collections.emptyList(), - Collections.singletonList(node.getId()), digestAlgo))); + view.viewChange(() -> install( + new Ballot(currentView(), Collections.emptyList(), Collections.singletonList(node.getId()), digestAlgo))); view.scheduleViewChange(); view.schedule(dur, sched); @@ -175,16 +125,15 @@ Digest currentView() { */ BloomFilter getJoinsBff(long seed, double p) { BloomFilter bff = new BloomFilter.DigestBloomFilter(seed, Math.max(params.minimumBiffCardinality(), - joins.size() * 2), - p); + joins.size() * 2), p); joins.keySet().forEach(d -> bff.add(d)); return bff; } /** * Install the new view - * - * @param view + * + * @param ballot */ void install(Ballot ballot) { // The circle of life @@ -214,8 +163,9 @@ void install(Ballot ballot) { .filter(p -> p != null) .toList(); - setDiadem(HexBloom.construct(context.memberCount(), context.allMembers().map(p -> p.getId()), - view.bootstrapView(), params.crowns())); + setDiadem( + HexBloom.construct(context.memberCount(), context.allMembers().map(p -> p.getId()), view.bootstrapView(), + params.crowns())); view.reset(); var seedSet = new ArrayList(); @@ -236,10 +186,10 @@ void install(Ballot ballot) { metrics.viewChanges().mark(); } - log.info("Installed view: {} from: {} crown: {} for context: {} cardinality: {} count: {} pending: {} leaving: {} joining: {} on: {}", - currentView.get(), previousView, diadem.get(), context.getId(), context.cardinality(), - context.allMembers().count(), pending.size(), ballot.leaving.size(), ballot.joining.size(), - node.getId()); + log.info( + "Installed view: {} from: {} crown: {} for context: {} cardinality: {} count: {} pending: {} leaving: {} joining: {} on: {}", + currentView.get(), previousView, diadem.get(), context.getId(), context.cardinality(), + context.allMembers().count(), pending.size(), ballot.leaving.size(), ballot.joining.size(), node.getId()); view.notifyListeners(joining, ballot.leaving); } @@ -249,8 +199,7 @@ boolean isJoined() { } /** - * Formally join the view. Calculate the HEX-BLOOM crown and view, fail and stop - * if does not match currentView + * Formally join the view. Calculate the HEX-BLOOM crown and view, fail and stop if does not match currentView */ synchronized void join() { assert context.totalCount() == context.cardinality(); @@ -293,7 +242,8 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time var thisView = currentView(); var note = new NoteWrapper(join.getNote(), digestAlgo); if (!from.equals(note.getId())) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Member not match note"))); + responseObserver.onError( + new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Member not match note"))); return; } log.debug("Join requested from: {} view: {} context: {} cardinality: {} on: {}", from, thisView, @@ -305,17 +255,19 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time return; } if (!thisView.equals(joinView)) { - responseObserver.onError(new StatusRuntimeException(Status.OUT_OF_RANGE.withDescription("View: " - + joinView + " does not match: " + thisView))); + responseObserver.onError(new StatusRuntimeException( + Status.OUT_OF_RANGE.withDescription("View: " + joinView + " does not match: " + thisView))); return; } if (!View.isValidMask(note.getMask(), context)) { - log.warn("Invalid join mask: {} majority: {} from member: {} view: {} context: {} cardinality: {} on: {}", - note.getMask(), context.majority(), from, thisView, context.getId(), context.cardinality(), - node.getId()); + log.warn( + "Invalid join mask: {} majority: {} from member: {} view: {} context: {} cardinality: {} on: {}", + note.getMask(), context.majority(), from, thisView, context.getId(), context.cardinality(), + node.getId()); } if (pendingJoins.size() >= params.maxPending()) { - responseObserver.onError(new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("No room at the inn"))); + responseObserver.onError( + new StatusRuntimeException(Status.RESOURCE_EXHAUSTED.withDescription("No room at the inn"))); return; } pendingJoins.put(from, seeds -> { @@ -408,9 +360,8 @@ JoinGossip.Builder processJoins(BloomFilter bff) { } /** - * Process the inbound joins from the gossip. Reconcile the differences between - * the view's state and the digests of the gossip. Update the reply with the - * list of digests the view requires, as well as proposed updates based on the + * Process the inbound joins from the gossip. Reconcile the differences between the view's state and the digests of + * the gossip. Update the reply with the list of digests the view requires, as well as proposed updates based on the * inbound digests that the view has more recent information * * @param p @@ -451,8 +402,7 @@ Redirect seed(Registration registration, Digest from) { return Redirect.getDefaultInstance(); } return view.stable(() -> { - var newMember = view.new Participant( - note.getId()); + var newMember = view.new Participant(note.getId()); final var successors = new TreeSet(context.successors(newMember, m -> context.isActive(m))); log.debug("Member seeding: {} view: {} context: {} successors: {} on: {}", newMember.getId(), currentView(), @@ -492,9 +442,8 @@ private void initiateViewChange() { .setObserver(node.getId().toDigeste()) .setCurrent(currentView().toDigeste()) .setAttempt(attempt.getAndIncrement()) - .addAllLeaves(view.streamShunned() - .map(id -> id.toDigeste()) - .collect(Collectors.toSet())) + .addAllLeaves( + view.streamShunned().map(id -> id.toDigeste()).collect(Collectors.toSet())) .addAllJoins(joins.keySet().stream().map(id -> id.toDigeste()).toList()); ViewChange change = builder.build(); vote.set(change); @@ -529,4 +478,37 @@ private void setDiadem(final HexBloom hex) { diadem.set(hex); currentView.set(diadem.get().compactWrapped()); } + + record Ballot(Digest view, List leaving, List joining, int hash) { + + Ballot(Digest view, List leaving, List joining, DigestAlgorithm algo) { + this(view, leaving, joining, view.xor(joining.stream().reduce((a, b) -> a.xor(b)).orElse(algo.getOrigin())) + .xor(leaving.stream() + .reduce((a, b) -> a.xor(b)) + .orElse(algo.getOrigin()) + .xor(joining.stream() + .reduce((a, b) -> a.xor(b)) + .orElse(algo.getOrigin()))) + .hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof Ballot b) { + return Objects.equal(view, b.view) && Objects.equal(leaving, b.leaving) && Objects.equal(joining, + b.joining); + } + return false; + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public String toString() { + return String.format("{h: %s, j: %s, l: %s}", hash, joining.size(), leaving.size()); + } + } } diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto index f3619bbd8..18d489be0 100644 --- a/grpc/src/main/proto/choam.proto +++ b/grpc/src/main/proto/choam.proto @@ -76,8 +76,8 @@ message Reconfigure { message Checkpoint { int64 byteSize = 1; int32 segmentSize = 2; - crypto.Digeste crown = 3; - repeated crypto.Digeste segments = 4; + int32 count = 3; + crypto.HexBloome crown = 4; } message Executions {