diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 14365b314..f0ef6409f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -145,7 +145,7 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen if (length != 0 && count * segmentSize < length) { count++; } - var accumulator = new HexBloom.Accumulator(count, 2, initial); + var accumulator = new HexBloom.HexAccumulator(count, 2, initial); Checkpoint.Builder builder = Checkpoint.newBuilder() .setCount(count) .setByteSize(length) @@ -166,7 +166,11 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen var crown = accumulator.build(); log.info("Checkpoint length: {} segment size: {} count: {} crown: {} initial: {}", length, segmentSize, builder.getCount(), crown, initial); - return builder.setCrown(crown.toHexBloome()).build(); + var cp = builder.setCrown(crown.toHexBloome()).build(); + + var deserialized = HexBloom.from(cp.getCrown()); + log.info("Deserialized checkpoint crown: {} initial: {}", deserialized, initial); + return cp; } public static Block genesis(Digest id, Map joins, HashedBlock head, Context context, diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index add36dbc9..037f7b203 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -125,9 +125,9 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { store.put(checkpointView); assert !checkpointView.height() .equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis"; + var diadem = HexBloom.from(checkpoint.block.getCheckpoint().getCrown()); log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash, - HexBloom.from(checkpoint.block.getCheckpoint().getCrown()), - Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId()); + diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId()); CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(), checkpoint.block.getCheckpoint(), params.member(), @@ -136,7 +136,10 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { // assemble the checkpoint checkpointAssembled = assembler.assemble(scheduler, params.gossipDuration()).whenComplete((cps, t) -> { - log.info("Restored checkpoint: {} on: {}", checkpoint.height(), params.member().getId()); + if (!cps.validate(diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()))) { + throw new IllegalStateException("Cannot validate checkpoint: " + checkpoint.height()); + } + log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem, params.member().getId()); checkpointState = cps; }); // reconstruct chain to genesis @@ -144,9 +147,7 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { .stream() .filter(cb -> cb.getBlock().hasReconfigure()) .map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb)) - .forEach(reconfigure -> { - store.put(reconfigure); - }); + .forEach(reconfigure -> store.put(reconfigure)); scheduleViewChainCompletion(new AtomicReference<>(checkpointView.height()), ULong.valueOf(0)); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java index deb601ca1..67fc5c5fc 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java @@ -70,9 +70,7 @@ public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoi public CompletableFuture assemble(ScheduledExecutorService scheduler, Duration duration) { if (checkpoint.getCount() == 0) { - log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, - member.getId()); - assembled.complete(new CheckpointState(checkpoint, state)); + assembled(new CheckpointState(checkpoint, state)); } else { gossip(scheduler, duration); } @@ -97,14 +95,18 @@ private boolean gossip(Optional futureSailor) { } if (process(futureSailor.get())) { CheckpointState cs = new CheckpointState(checkpoint, state); - log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, - member.getId()); - assembled.complete(cs); + assembled(cs); return false; } return true; } + private void assembled(CheckpointState cs) { + log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, + member.getId()); + assembled.complete(cs); + } + private void gossip(ScheduledExecutorService scheduler, Duration duration) { if (assembled.isDone()) { return; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java index 4bb2a7a51..dfab2142e 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointState.java @@ -10,8 +10,11 @@ import com.salesfoce.apollo.choam.proto.Checkpoint; import com.salesfoce.apollo.choam.proto.Slice; import com.salesforce.apollo.bloomFilters.BloomFilter; +import com.salesforce.apollo.crypto.Digest; +import com.salesforce.apollo.crypto.HexBloom; import com.salesforce.apollo.utils.Utils; import org.h2.mvstore.MVMap; +import org.slf4j.LoggerFactory; import java.io.*; import java.util.ArrayList; @@ -77,4 +80,23 @@ public List fetchSegments(BloomFilter bff, int maxSegments) { } return slices; } + + public boolean validate(HexBloom diadem, Digest initial) { + var crowns = diadem.crowns(); + var algorithm = crowns.get(0).getAlgorithm(); + var accumulator = new HexBloom.Accumulator(diadem.getCardinality(), crowns.size(), initial); + state.keyIterator(0).forEachRemaining(i -> { + byte[] buf = state.get(i); + accumulator.add(algorithm.digest(buf)); + }); + for (int i = 0; i < crowns.size(); i++) { + var candidates = accumulator.wrappedCrowns(); + if (!crowns.get(i).equals(candidates.get(i))) { + LoggerFactory.getLogger(CheckpointState.class) + .warn("Crown[{}] expected: {} found: {}", i, crowns.get(i), candidates.get(i)); + return false; + } + } + return true; + } } diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index 6b78912b1..bdc4aacb5 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -24,7 +24,7 @@ - + diff --git a/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java b/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java index 31b173690..3a6e55ac4 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java +++ b/cryptography/src/main/java/com/salesforce/apollo/crypto/HexBloom.java @@ -489,11 +489,10 @@ public List wrappedCrowns(List> wrapingHash) { } public static class Accumulator { - private final List> accumulators; - private final int cardinality; - private final BloomFilter membership; - private final List> hashes; - private int currentCount = 0; + protected final List> accumulators; + protected final int cardinality; + protected final List> hashes; + protected int currentCount = 0; public Accumulator(int cardinality, int crowns, Digest initial, double fpr) { this(cardinality, hashes(crowns), initial, fpr); @@ -511,7 +510,6 @@ public Accumulator(int cardinality, List> crownHashes, } this.cardinality = cardinality; this.hashes = crownHashes; - membership = new BloomFilter.DigestBloomFilter(DEFAULT_SEED, Math.max(MINIMUM_BFF_CARD, cardinality), fpr); accumulators = IntStream.range(0, hashes.size()) .mapToObj(i -> hashes.get(i).apply(initial)) .map(d -> new AtomicReference<>(d)) @@ -522,6 +520,16 @@ public Accumulator(int cardinality, int crowns, Digest initial) { this(cardinality, crowns, initial, DEFAULT_FPR); } + public List wrappedCrowns() { + return wrappedCrowns(hashWraps(accumulators.size())); + } + + public List wrappedCrowns(List> wrapingHash) { + return IntStream.range(0, accumulators.size()) + .mapToObj(i -> wrapingHash.get(i).apply(accumulators.get(i).get())) + .toList(); + } + public void add(Digest digest) { if (currentCount == cardinality) { throw new IllegalArgumentException("Current count already equal to cardinality: " + cardinality); @@ -530,6 +538,36 @@ public void add(Digest digest) { for (int i = 0; i < accumulators.size(); i++) { accumulators.get(i).accumulateAndGet(hashes.get(i).apply(digest), (a, b) -> a.xor(b)); } + } + + public List crowns() { + return accumulators.stream().map(ar -> ar.get()).toList(); + } + + public List wrapped() { + return accumulators.stream().map(ar -> ar.get()).toList(); + } + } + + public static class HexAccumulator extends Accumulator { + private final BloomFilter membership; + + public HexAccumulator(int cardinality, int crowns, Digest initial, double fpr) { + this(cardinality, hashes(crowns), initial, fpr); + } + + public HexAccumulator(int cardinality, List> crownHashes, Digest initial, double fpr) { + super(cardinality, crownHashes, initial, fpr); + membership = new BloomFilter.DigestBloomFilter(DEFAULT_SEED, Math.max(MINIMUM_BFF_CARD, cardinality), fpr); + } + + public HexAccumulator(int cardinality, int crowns, Digest initial) { + this(cardinality, crowns, initial, DEFAULT_FPR); + } + + @Override + public void add(Digest digest) { + super.add(digest); membership.add(digest); } diff --git a/grpc/src/main/proto/crypto.proto b/grpc/src/main/proto/crypto.proto index 21c16b62b..d62f921ac 100644 --- a/grpc/src/main/proto/crypto.proto +++ b/grpc/src/main/proto/crypto.proto @@ -8,70 +8,70 @@ import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; package crypto; - + message Biff { - int32 m = 1; - int32 k = 2; - int64 seed = 3; - int32 type = 4; - repeated uint64 bits = 5; + int32 m = 1; + int32 k = 2; + int64 seed = 3; + int32 type = 4; + repeated uint64 bits = 5; } message Digeste { - int32 type = 1; - repeated uint64 hash = 2; + int32 type = 1; + repeated uint64 hash = 2; } message Sig { - int32 code = 1; - repeated bytes signatures = 2; + int32 code = 1; + repeated bytes signatures = 2; } message PubKey { - int32 code = 1; - bytes encoded = 2; + int32 code = 1; + bytes encoded = 2; } message Clock { - uint64 prefix = 1; - bytes counts = 2; + uint64 prefix = 1; + bytes counts = 2; } message StampedClock { - oneof stamp { - google.protobuf.Timestamp timestamp = 1; - uint32 int = 2; - uint64 long = 3; - } - Clock clock = 5; + oneof stamp { + google.protobuf.Timestamp timestamp = 1; + uint32 int = 2; + uint64 long = 3; + } + Clock clock = 5; } - + message BloomeClock { - uint64 prefix = 1; - int32 k = 2; - bytes counts = 3; + uint64 prefix = 1; + int32 k = 2; + bytes counts = 3; } message StampedBloomeClock { - BloomeClock clock = 1; - google.protobuf.Timestamp stamp = 2; + BloomeClock clock = 1; + google.protobuf.Timestamp stamp = 2; } message IntStampedBloomeClock { - BloomeClock clock = 1; - int32 stamp = 2; + BloomeClock clock = 1; + int32 stamp = 2; } message CausalMessage { - Digeste source = 1; - StampedClock clock = 2; - google.protobuf.Any content = 3; - repeated Digeste parents = 4; + Digeste source = 1; + StampedClock clock = 2; + google.protobuf.Any content = 3; + repeated Digeste parents = 4; } message HexBloome { - repeated Digeste crowns = 1; - Biff membership = 2; - int32 cardinality = 3; + int32 cardinality = 1; + repeated Digeste crowns = 2; + Biff membership = 3; }