use HexBloom for checkpoint validation, no hash list.
Removed the list of segment hashes from the checkpoint; the HexBloom crown is now used to validate the segments.
Hellblazer committed Nov 19, 2023
1 parent 7e84b9c commit c971683
Showing 7 changed files with 123 additions and 56 deletions.
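
The change in a nutshell, as a minimal sketch rather than code from this commit: the checkpoint producer folds every segment digest into a HexBloom accumulator and ships only the resulting crown, and the receiver re-accumulates the digests of the segments it fetched and compares the crowns. The HexBloom calls below mirror those visible in the diff; the CrownSketch class, the segments list, and the DigestAlgorithm import path are illustrative assumptions, not part of the commit.

// Illustrative sketch of the crown-based validation this commit introduces.
// The HexBloom/Digest calls mirror the diff; CrownSketch itself is hypothetical.
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.crypto.HexBloom;

import java.util.List;

class CrownSketch {

    // Producer side (see CHOAM.checkpoint): accumulate each segment digest,
    // publish only the resulting crown with the checkpoint.
    static HexBloom crown(DigestAlgorithm algo, List<byte[]> segments, Digest initial) {
        var accumulator = new HexBloom.HexAccumulator(segments.size(), 2, initial);
        segments.forEach(s -> accumulator.add(algo.digest(s)));
        return accumulator.build();
    }

    // Receiver side (see CheckpointState.validate): re-accumulate the fetched segments
    // with a plain Accumulator seeded the same way, then compare against the crowns
    // carried by the checkpoint.
    static boolean validate(HexBloom diadem, List<byte[]> segments, Digest initial) {
        var crowns = diadem.crowns();
        var algo = crowns.get(0).getAlgorithm();
        var accumulator = new HexBloom.Accumulator(diadem.getCardinality(), crowns.size(), initial);
        segments.forEach(s -> accumulator.add(algo.digest(s)));
        return crowns.equals(accumulator.wrappedCrowns());
    }
}

Both sides must seed their accumulators with the same initial digest (the last checkpoint hash, per the Bootstrapper change below) for the crowns to line up.
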
8 changes: 6 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
@@ -145,7 +145,7 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen
if (length != 0 && count * segmentSize < length) {
count++;
}
var accumulator = new HexBloom.Accumulator(count, 2, initial);
var accumulator = new HexBloom.HexAccumulator(count, 2, initial);
Checkpoint.Builder builder = Checkpoint.newBuilder()
.setCount(count)
.setByteSize(length)
@@ -166,7 +166,11 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen
var crown = accumulator.build();
log.info("Checkpoint length: {} segment size: {} count: {} crown: {} initial: {}", length, segmentSize,
builder.getCount(), crown, initial);
return builder.setCrown(crown.toHexBloome()).build();
var cp = builder.setCrown(crown.toHexBloome()).build();

var deserialized = HexBloom.from(cp.getCrown());
log.info("Deserialized checkpoint crown: {} initial: {}", deserialized, initial);
return cp;
}

public static Block genesis(Digest id, Map<Member, Join> joins, HashedBlock head, Context<Member> context,
@@ -125,9 +125,9 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {
store.put(checkpointView);
assert !checkpointView.height()
.equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis";
var diadem = HexBloom.from(checkpoint.block.getCheckpoint().getCrown());
log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash,
HexBloom.from(checkpoint.block.getCheckpoint().getCrown()),
Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId());
diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId());

CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(),
checkpoint.block.getCheckpoint(), params.member(),
@@ -136,17 +136,18 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {

// assemble the checkpoint
checkpointAssembled = assembler.assemble(scheduler, params.gossipDuration()).whenComplete((cps, t) -> {
log.info("Restored checkpoint: {} on: {}", checkpoint.height(), params.member().getId());
if (!cps.validate(diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()))) {
throw new IllegalStateException("Cannot validate checkpoint: " + checkpoint.height());
}
log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem, params.member().getId());
checkpointState = cps;
});
// reconstruct chain to genesis
mostRecent.getViewChainList()
.stream()
.filter(cb -> cb.getBlock().hasReconfigure())
.map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb))
.forEach(reconfigure -> {
store.put(reconfigure);
});
.forEach(reconfigure -> store.put(reconfigure));
scheduleViewChainCompletion(new AtomicReference<>(checkpointView.height()), ULong.valueOf(0));
}

@@ -70,9 +70,7 @@ public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoi

public CompletableFuture<CheckpointState> assemble(ScheduledExecutorService scheduler, Duration duration) {
if (checkpoint.getCount() == 0) {
log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem,
member.getId());
assembled.complete(new CheckpointState(checkpoint, state));
assembled(new CheckpointState(checkpoint, state));
} else {
gossip(scheduler, duration);
}
@@ -97,14 +95,18 @@ private boolean gossip(Optional<CheckpointSegments> futureSailor) {
}
if (process(futureSailor.get())) {
CheckpointState cs = new CheckpointState(checkpoint, state);
log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem,
member.getId());
assembled.complete(cs);
assembled(cs);
return false;
}
return true;
}

private void assembled(CheckpointState cs) {
log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem,
member.getId());
assembled.complete(cs);
}

private void gossip(ScheduledExecutorService scheduler, Duration duration) {
if (assembled.isDone()) {
return;
@@ -10,8 +10,11 @@
import com.salesfoce.apollo.choam.proto.Checkpoint;
import com.salesfoce.apollo.choam.proto.Slice;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.HexBloom;
import com.salesforce.apollo.utils.Utils;
import org.h2.mvstore.MVMap;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.ArrayList;
@@ -77,4 +80,23 @@ public List<Slice> fetchSegments(BloomFilter<Integer> bff, int maxSegments) {
}
return slices;
}

public boolean validate(HexBloom diadem, Digest initial) {
var crowns = diadem.crowns();
var algorithm = crowns.get(0).getAlgorithm();
var accumulator = new HexBloom.Accumulator(diadem.getCardinality(), crowns.size(), initial);
state.keyIterator(0).forEachRemaining(i -> {
byte[] buf = state.get(i);
accumulator.add(algorithm.digest(buf));
});
for (int i = 0; i < crowns.size(); i++) {
var candidates = accumulator.wrappedCrowns();
if (!crowns.get(i).equals(candidates.get(i))) {
LoggerFactory.getLogger(CheckpointState.class)
.warn("Crown[{}] expected: {} found: {}", i, crowns.get(i), candidates.get(i));
return false;
}
}
return true;
}
}
2 changes: 1 addition & 1 deletion choam/src/test/resources/logback-test.xml
@@ -24,7 +24,7 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam.support.Bootstrapper" level="trace" additivity="false">
<logger name="com.salesforce.apollo.choam.support.Bootstrapper" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

@@ -489,11 +489,10 @@ public List<Digest> wrappedCrowns(List<Function<Digest, Digest>> wrapingHash) {
}

public static class Accumulator {
private final List<AtomicReference<Digest>> accumulators;
private final int cardinality;
private final BloomFilter<Digest> membership;
private final List<Function<Digest, Digest>> hashes;
private int currentCount = 0;
protected final List<AtomicReference<Digest>> accumulators;
protected final int cardinality;
protected final List<Function<Digest, Digest>> hashes;
protected int currentCount = 0;

public Accumulator(int cardinality, int crowns, Digest initial, double fpr) {
this(cardinality, hashes(crowns), initial, fpr);
@@ -511,7 +510,6 @@ public Accumulator(int cardinality, List<Function<Digest, Digest>> crownHashes,
}
this.cardinality = cardinality;
this.hashes = crownHashes;
membership = new BloomFilter.DigestBloomFilter(DEFAULT_SEED, Math.max(MINIMUM_BFF_CARD, cardinality), fpr);
accumulators = IntStream.range(0, hashes.size())
.mapToObj(i -> hashes.get(i).apply(initial))
.map(d -> new AtomicReference<>(d))
@@ -522,6 +520,16 @@ public Accumulator(int cardinality, int crowns, Digest initial) {
this(cardinality, crowns, initial, DEFAULT_FPR);
}

public List<Digest> wrappedCrowns() {
return wrappedCrowns(hashWraps(accumulators.size()));
}

public List<Digest> wrappedCrowns(List<Function<Digest, Digest>> wrapingHash) {
return IntStream.range(0, accumulators.size())
.mapToObj(i -> wrapingHash.get(i).apply(accumulators.get(i).get()))
.toList();
}

public void add(Digest digest) {
if (currentCount == cardinality) {
throw new IllegalArgumentException("Current count already equal to cardinality: " + cardinality);
@@ -530,6 +538,36 @@ public void add(Digest digest) {
for (int i = 0; i < accumulators.size(); i++) {
accumulators.get(i).accumulateAndGet(hashes.get(i).apply(digest), (a, b) -> a.xor(b));
}
}

public List<Digest> crowns() {
return accumulators.stream().map(ar -> ar.get()).toList();
}

public List<Digest> wrapped() {
return accumulators.stream().map(ar -> ar.get()).toList();
}
}

public static class HexAccumulator extends Accumulator {
private final BloomFilter<Digest> membership;

public HexAccumulator(int cardinality, int crowns, Digest initial, double fpr) {
this(cardinality, hashes(crowns), initial, fpr);
}

public HexAccumulator(int cardinality, List<Function<Digest, Digest>> crownHashes, Digest initial, double fpr) {
super(cardinality, crownHashes, initial, fpr);
membership = new BloomFilter.DigestBloomFilter(DEFAULT_SEED, Math.max(MINIMUM_BFF_CARD, cardinality), fpr);
}

public HexAccumulator(int cardinality, int crowns, Digest initial) {
this(cardinality, crowns, initial, DEFAULT_FPR);
}

@Override
public void add(Digest digest) {
super.add(digest);
membership.add(digest);
}

70 changes: 35 additions & 35 deletions grpc/src/main/proto/crypto.proto
@@ -8,70 +8,70 @@ import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

package crypto;

message Biff {
int32 m = 1;
int32 k = 2;
int64 seed = 3;
int32 type = 4;
repeated uint64 bits = 5;
int32 m = 1;
int32 k = 2;
int64 seed = 3;
int32 type = 4;
repeated uint64 bits = 5;
}

message Digeste {
int32 type = 1;
repeated uint64 hash = 2;
int32 type = 1;
repeated uint64 hash = 2;
}

message Sig {
int32 code = 1;
repeated bytes signatures = 2;
int32 code = 1;
repeated bytes signatures = 2;
}

message PubKey {
int32 code = 1;
bytes encoded = 2;
int32 code = 1;
bytes encoded = 2;
}

message Clock {
uint64 prefix = 1;
bytes counts = 2;
uint64 prefix = 1;
bytes counts = 2;
}


message StampedClock {
oneof stamp {
google.protobuf.Timestamp timestamp = 1;
uint32 int = 2;
uint64 long = 3;
}
Clock clock = 5;
oneof stamp {
google.protobuf.Timestamp timestamp = 1;
uint32 int = 2;
uint64 long = 3;
}
Clock clock = 5;
}

message BloomeClock {
uint64 prefix = 1;
int32 k = 2;
bytes counts = 3;
uint64 prefix = 1;
int32 k = 2;
bytes counts = 3;
}

message StampedBloomeClock {
BloomeClock clock = 1;
google.protobuf.Timestamp stamp = 2;
BloomeClock clock = 1;
google.protobuf.Timestamp stamp = 2;
}

message IntStampedBloomeClock {
BloomeClock clock = 1;
int32 stamp = 2;
BloomeClock clock = 1;
int32 stamp = 2;
}

message CausalMessage {
Digeste source = 1;
StampedClock clock = 2;
google.protobuf.Any content = 3;
repeated Digeste parents = 4;
Digeste source = 1;
StampedClock clock = 2;
google.protobuf.Any content = 3;
repeated Digeste parents = 4;
}

message HexBloome {
repeated Digeste crowns = 1;
Biff membership = 2;
int32 cardinality = 3;
int32 cardinality = 1;
repeated Digeste crowns = 2;
Biff membership = 3;
}
