Skip to content

Commit

Permalink
use HexBloom for checkpoint validation, no hash list.
Browse files Browse the repository at this point in the history
Removed the list of segment hashes for the checkpoint.  Use the HexBloom crown for the validation of the segments.
  • Loading branch information
Hellblazer committed Nov 19, 2023
1 parent 023ea06 commit 7e84b9c
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 192 deletions.
27 changes: 17 additions & 10 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,31 +135,38 @@ public CHOAM(Parameters params) {
session = new Session(params, service());
}

public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize, Digest initialCrown) {
public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize, Digest initial) {
assert segmentSize > 0 : "segment size must be > 0 : " + segmentSize;
long length = 0;
if (state != null) {
length = state.length();
}
Checkpoint.Builder builder = Checkpoint.newBuilder().setByteSize(length).setSegmentSize(segmentSize);
int count = (int) (length / segmentSize);
if (length != 0 && count * segmentSize < length) {
count++;
}
var accumulator = new HexBloom.Accumulator(count, 2, initial);
Checkpoint.Builder builder = Checkpoint.newBuilder()
.setCount(count)
.setByteSize(length)
.setSegmentSize(segmentSize);

if (state != null) {
byte[] buff = new byte[segmentSize];
try (FileInputStream fis = new FileInputStream(state)) {
for (int read = fis.read(buff); read > 0; read = fis.read(buff)) {
ByteString segment = ByteString.copyFrom(buff, 0, read);
builder.addSegments(algo.digest(segment).toDigeste());
accumulator.add(algo.digest(segment));
}
} catch (IOException e) {
log.error("Invalid checkpoint!", e);
return null;
}
}

var crown = HexBloom.construct(builder.getSegmentsCount(),
builder.getSegmentsList().stream().map(d -> Digest.from(d)), initialCrown, 2)
.compactWrapped();
log.info("Checkpoint length: {} segment size: {} count: {} crown: {}", length, segmentSize,
builder.getSegmentsCount(), crown);
return builder.setCrown(crown.toDigeste()).build();
var crown = accumulator.build();
log.info("Checkpoint length: {} segment size: {} count: {} crown: {} initial: {}", length, segmentSize,
builder.getCount(), crown, initial);
return builder.setCrown(crown.toHexBloome()).build();
}

public static Block genesis(Digest id, Map<Member, Join> joins, HashedBlock head, Context<Member> context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {
store.put(checkpointView);
assert !checkpointView.height()
.equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis";
log.info("Assembling from checkpoint: {}:{} on: {}", checkpoint.height(), checkpoint.hash,
params.member().getId());
log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash,
HexBloom.from(checkpoint.block.getCheckpoint().getCrown()),
Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId());

CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(),
checkpoint.block.getCheckpoint(), params.member(),
Expand All @@ -135,7 +136,6 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {

// assemble the checkpoint
checkpointAssembled = assembler.assemble(scheduler, params.gossipDuration()).whenComplete((cps, t) -> {
validate(cps);
log.info("Restored checkpoint: {} on: {}", checkpoint.height(), params.member().getId());
checkpointState = cps;
});
Expand All @@ -150,22 +150,6 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {
scheduleViewChainCompletion(new AtomicReference<>(checkpointView.height()), ULong.valueOf(0));
}

private void validate(CheckpointState cps) {
var crown = HexBloom.construct(cps.checkpoint.getSegmentsCount(),
cps.checkpoint.getSegmentsList().stream().map(d -> Digest.from(d)),
Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()),
params.crowns());
var have = crown.compactWrapped();
var expected = Digest.from(cps.checkpoint.getCrown());
if (!have.equals(expected)) {
log.error("Invalid crown for checkpointed state have: {} expected: {} on: {}", have, expected,
params.member());
throw new IllegalStateException(
"Invalid crown for checkpointed state have: %s expected: %s on: %s".formatted(have, expected,
params.member()));
}
}

private boolean completeAnchor(Optional<Blocks> futureSailor, AtomicReference<ULong> start, ULong end,
RingCommunications.Destination<Member, Terminal> destination) {
if (sync.isDone() || anchorSynchronized.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.salesforce.apollo.choam.comm.Terminal;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.crypto.HexBloom;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
Expand All @@ -26,8 +27,6 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -49,10 +48,10 @@ public class CheckpointAssembler {
private final DigestAlgorithm digestAlgorithm;
private final double fpr;
private final Duration frequency;
private final List<Digest> hashes = new ArrayList<>();
private final ULong height;
private final SigningMember member;
private final MVMap<Integer, byte[]> state;
private final HexBloom diadem;

public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoint, SigningMember member,
Store store, CommonCommunications<Terminal, Concierge> comms, Context<Member> context,
Expand All @@ -66,13 +65,13 @@ public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoi
this.digestAlgorithm = digestAlgorithm;
this.frequency = frequency;
state = store.createCheckpoint(height);
checkpoint.getSegmentsList().stream().map(bs -> new Digest(bs)).forEach(hash -> hashes.add(hash));
diadem = HexBloom.from(checkpoint.getCrown());
}

public CompletableFuture<CheckpointState> assemble(ScheduledExecutorService scheduler, Duration duration) {
if (checkpoint.getSegmentsCount() == 0) {
log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getSegmentsCount(),
Digest.from(checkpoint.getCrown()), member.getId());
if (checkpoint.getCount() == 0) {
log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem,
member.getId());
assembled.complete(new CheckpointState(checkpoint, state));
} else {
gossip(scheduler, duration);
Expand All @@ -82,8 +81,8 @@ public CompletableFuture<CheckpointState> assemble(ScheduledExecutorService sche

private CheckpointReplication buildRequest() {
long seed = Entropy.nextBitsStreamLong();
BloomFilter<Integer> segmentsBff = new BloomFilter.IntBloomFilter(seed, checkpoint.getSegmentsCount(), fpr);
IntStream.range(0, checkpoint.getSegmentsCount()).filter(i -> state.containsKey(i)).forEach(i -> {
BloomFilter<Integer> segmentsBff = new BloomFilter.IntBloomFilter(seed, checkpoint.getCount(), fpr);
IntStream.range(0, checkpoint.getCount()).filter(i -> state.containsKey(i)).forEach(i -> {
segmentsBff.add(i);
});
return CheckpointReplication.newBuilder()
Expand All @@ -98,8 +97,8 @@ private boolean gossip(Optional<CheckpointSegments> futureSailor) {
}
if (process(futureSailor.get())) {
CheckpointState cs = new CheckpointState(checkpoint, state);
log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getSegmentsCount(),
Digest.from(checkpoint.getCrown()), member.getId());
log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem,
member.getId());
assembled.complete(cs);
return false;
}
Expand All @@ -110,8 +109,8 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) {
if (assembled.isDone()) {
return;
}
log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getSegmentsCount(),
Digest.from(checkpoint.getCrown()), member.getId());
log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem,
member.getId());
var ringer = new RingIterator<>(frequency, context, member, comms, true, scheduler);
ringer.iterate(randomCut(digestAlgorithm), (link, ring) -> gossip(link),
(tally, result, destination) -> gossip(result),
Expand All @@ -132,13 +131,10 @@ private CheckpointSegments gossip(Terminal link) {
private boolean process(CheckpointSegments segments) {
segments.getSegmentsList().forEach(segment -> {
Digest hash = digestAlgorithm.digest(segment.getBlock());
int index = segment.getIndex();
if (index >= 0 && index < hashes.size()) {
if (hash.equals(hashes.get(index))) {
state.computeIfAbsent(index, i -> segment.getBlock().toByteArray());
}
if (diadem.contains(hash)) {
state.computeIfAbsent(segment.getIndex(), i -> segment.getBlock().toByteArray());
}
});
return state.size() == hashes.size();
return state.size() == checkpoint.getCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void close() {

public List<Slice> fetchSegments(BloomFilter<Integer> bff, int maxSegments) {
List<Slice> slices = new ArrayList<>();
for (int i = 0; i < checkpoint.getSegmentsCount(); i++) {
for (int i = 0; i < checkpoint.getCount(); i++) {
if (!bff.contains(i)) {
slices.add(Slice.newBuilder().setIndex(i).setBlock(ByteString.copyFrom(state.get(i))).build());
if (slices.size() >= maxSegments) {
Expand Down
58 changes: 24 additions & 34 deletions choam/src/main/java/com/salesforce/apollo/choam/support/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,32 @@
*/
package com.salesforce.apollo.choam.support;

import static com.salesforce.apollo.choam.support.HashedBlock.height;
import com.google.protobuf.InvalidProtocolBufferException;
import com.salesfoce.apollo.choam.proto.*;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.joou.ULong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.joou.ULong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;
import com.salesfoce.apollo.choam.proto.Block;
import com.salesfoce.apollo.choam.proto.Blocks;
import com.salesfoce.apollo.choam.proto.Certification;
import com.salesfoce.apollo.choam.proto.Certifications;
import com.salesfoce.apollo.choam.proto.CertifiedBlock;
import com.salesfoce.apollo.choam.proto.Checkpoint;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import static com.salesforce.apollo.choam.support.HashedBlock.height;

/**
* Kind of a DAO for "nosql" block storage with MVStore from H2
*
* @author hal.hildebrand
*
*/
public class Store {

Expand Down Expand Up @@ -85,7 +72,7 @@ public byte[] block(ULong height) {
public Iterator<ULong> blocksFrom(ULong from, ULong to, int max) {
return new Iterator<>() {
ULong next;
int remaining = max;
int remaining = max;

{
next = from;
Expand Down Expand Up @@ -155,8 +142,8 @@ public MVMap<Integer, byte[]> createCheckpoint(ULong blockHeight) {
return blocks.store.openMap(String.format(CHECKPOINT_TEMPLATE, blockHeight));
}

public void fetchBlocks(BloomFilter<ULong> blocksBff, Blocks.Builder replication, int max, ULong from,
ULong to) throws IllegalStateException {
public void fetchBlocks(BloomFilter<ULong> blocksBff, Blocks.Builder replication, int max, ULong from, ULong to)
throws IllegalStateException {
StreamSupport.stream(((Iterable<ULong>) () -> blocksFrom(from, to, max)).spliterator(), false)
.filter(s -> !blocksBff.contains(s))
.map(height -> getCertifiedBlock(height))
Expand Down Expand Up @@ -291,8 +278,8 @@ public MVMap<Integer, byte[]> putCheckpoint(ULong blockHeight, File state, Check
} catch (IOException e) {
throw new IllegalStateException("Error storing checkpoint " + blockHeight, e);
}
assert cp.size() == checkpoint.getSegmentsCount() : "Invalid number of segments: " + cp.size()
+ " should be: " + checkpoint.getSegmentsCount();
assert cp.size() == checkpoint.getCount() : "Invalid number of segments: " + cp.size() + " should be: "
+ checkpoint.getCount();
checkpoints.put(blockHeight, cp);
return cp;
});
Expand Down Expand Up @@ -321,8 +308,9 @@ public void validate(ULong from, ULong to) throws IllegalStateException {
} else {
Digest pointer = new Digest(current.block.getHeader().getPrevious());
if (!prevHash.get().equals(pointer)) {
throw new IllegalStateException(String.format("Invalid chain (%s, %s) block: %s has invalid previous hash: %s, expected: %s",
from, to, l, pointer, prevHash.get()));
throw new IllegalStateException(
String.format("Invalid chain (%s, %s) block: %s has invalid previous hash: %s, expected: %s",
from, to, l, pointer, prevHash.get()));
} else {
prevHash.set(current.hash);
}
Expand All @@ -349,8 +337,9 @@ public void validateViewChain(ULong from) throws IllegalStateException {
next = ULong.valueOf(current.block.getHeader().getLastReconfig());
current = getBlock(next);
} else {
throw new IllegalStateException(String.format("Invalid view chain (%s, %s) invalid: %s expected: %s have: %s",
from, 0, current.height(), pointer, current.hash));
throw new IllegalStateException(
String.format("Invalid view chain (%s, %s) invalid: %s expected: %s have: %s", from, 0,
current.height(), pointer, current.hash));
}
}
}
Expand All @@ -362,6 +351,7 @@ public long version() {
public Iterator<ULong> viewChainFrom(ULong from, ULong to) {
return new Iterator<>() {
ULong next;

{
next = viewChain.get(from);
if (!viewChain.containsKey(next)) {
Expand Down
10 changes: 7 additions & 3 deletions choam/src/test/java/com/salesforce/apollo/choam/TestChain.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.salesfoce.apollo.choam.proto.*;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.Store;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import org.slf4j.LoggerFactory;

/**
* @author hal.hildebrand
Expand All @@ -32,6 +32,7 @@ public TestChain(Store store) {

public TestChain anchor() {
anchor = lastBlock;
LoggerFactory.getLogger(TestChain.class).debug("Anchor: {}", lastBlock.hash);
return this;
}

Expand Down Expand Up @@ -126,11 +127,12 @@ private HashedCertifiedBlock checkpointBlock() {
.setCheckpoint(
CHOAM.checkpoint(
DigestAlgorithm.DEFAULT,
null, 0,
DigestAlgorithm.DEFAULT.getOrigin()))
null, 1,
checkpoint.hash))
.build())
.build());
store.put(lastBlock);
LoggerFactory.getLogger(TestChain.class).debug("Checkpoint: {}", lastBlock.hash);
return lastBlock;
}

Expand Down Expand Up @@ -161,6 +163,7 @@ private HashedCertifiedBlock reconfigureBlock() {
.build())
.build());
store.put(lastBlock);
LoggerFactory.getLogger(TestChain.class).debug("Reconfigure: {}", lastBlock.hash);
return lastBlock;
}

Expand Down Expand Up @@ -193,6 +196,7 @@ private HashedCertifiedBlock userBlock() {
.build());
store.put(block);
lastBlock = block;
LoggerFactory.getLogger(TestChain.class).debug("Executions: {}", lastBlock.hash);
return lastBlock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.salesfoce.apollo.choam.proto.Blocks;
import com.salesfoce.apollo.choam.proto.Initial;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
import com.salesforce.apollo.choam.TestChain;
Expand All @@ -25,7 +26,6 @@
import com.salesforce.apollo.stereotomy.StereotomyImpl;
import com.salesforce.apollo.stereotomy.mem.MemKERL;
import com.salesforce.apollo.stereotomy.mem.MemKeyStore;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import org.h2.mvstore.MVStore;
import org.joou.ULong;
import org.junit.jupiter.api.Test;
Expand Down
Loading

0 comments on commit 7e84b9c

Please sign in to comment.