Skip to content

Commit

Permalink
prevent view replay on ViewMember in GenesisAssembly and ViewAssembly.
Browse files Browse the repository at this point in the history
Remove second thread from CI build - just 4 forks of testing now
  • Loading branch information
Hellblazer committed Apr 7, 2024
1 parent fed21df commit eb2f42b
Show file tree
Hide file tree
Showing 18 changed files with 255 additions and 173 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ jobs:
cache: 'maven'
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: ./mvnw -T 0.5C -batch-mode clean install -Ppre --file pom.xml -Dforks=4
run: ./mvnw -batch-mode clean install -Ppre --file pom.xml -Dforks=4
119 changes: 72 additions & 47 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions choam/src/main/java/com/salesforce/apollo/choam/Committee.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.cryptography.Verifier;
import com.salesforce.apollo.cryptography.Verifier.DefaultVerifier;
Expand All @@ -34,9 +33,10 @@ public interface Committee {
static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Member> context) {
var validators = reconfigure.getJoinsList()
.stream()
.collect(Collectors.toMap(e -> context.getMember(new Digest(e.getMember().getId())),
e -> (Verifier) new DefaultVerifier(
publicKey(e.getMember().getConsensusKey()))));
.collect(
Collectors.toMap(e -> context.getMember(new Digest(e.getMember().getVm().getId())),
e -> (Verifier) new DefaultVerifier(
publicKey(e.getMember().getVm().getConsensusKey()))));
assert !validators.isEmpty() : "No validators in this reconfiguration of: " + context.getId();
return validators;
}
Expand Down Expand Up @@ -71,7 +71,7 @@ default void assembled() {

boolean isMember();

ViewMember join(Digest nextView, Digest from);
SignedViewMember join(Digest nextView, Digest from);

Logger log();

Expand Down Expand Up @@ -115,11 +115,11 @@ default boolean validate(HashedCertifiedBlock hb, Certification c, Map<Member, V

final boolean verified = verify.verify(new JohnHancock(c.getSignature()), hb.block.getHeader().toByteString());
if (!verified) {
log().debug("Failed verification: false using: {} key: {} on: {}", witness.getId(),
DigestAlgorithm.DEFAULT.digest(verify.toString()), params.member().getId());
} else {
log().trace("Verified: true using: {} key: {} on: {}", witness.getId(),
DigestAlgorithm.DEFAULT.digest(verify.toString()), params.member().getId());
log().debug("Failed verification: {} hash: {} height: {} using: {} : {} on: {}", hb.block.getBodyCase(),
hb.hash, hb.height(), witness.getId(), verify, params.member().getId());
} else if (log().isTraceEnabled()) {
log().trace("Verified: {} hash: {} height: {} using: {} : {} on: {}", hb.block.getBodyCase(), hb.hash,
hb.height(), witness.getId(), verify, params.member().getId());
}
return verified;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class GenesisAssembly implements Genesis {
private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);
private final Ethereal controller;
private final ChRbcGossip coordinator;
private final ViewMember genesisMember;
private final SignedViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final Map<Digest, Proposed> proposals = new ConcurrentHashMap<>();
private final AtomicBoolean published = new AtomicBoolean();
Expand All @@ -60,7 +60,7 @@ public class GenesisAssembly implements Genesis {
private volatile Thread blockingThread;
private volatile HashedBlock reconfiguration;

public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms, ViewMember genesisMember,
public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms, SignedViewMember genesisMember,
String label) {
view = vc;
ds = new OneShot();
Expand Down Expand Up @@ -136,7 +136,6 @@ public void gather() {
var join = Join.newBuilder()
.setMember(genesisMember)
.addEndorsements(certification)
.setView(view.context().getId().toDigeste())
.setKerl(params().kerl().get())
.build();
var proposed = new Proposed(join, params().member());
Expand All @@ -160,8 +159,8 @@ public void gather(List<ByteString> preblock, boolean last) {
})
.filter(Objects::nonNull)
.filter(j -> !j.equals(Join.getDefaultInstance()))
.peek(
j -> log.info("Gathering: {} on: {}", Digest.from(j.getMember().getId()), params().member().getId()))
.peek(j -> log.info("Gathering: {} on: {}", Digest.from(j.getMember().getVm().getId()),
params().member().getId()))
.forEach(this::join);
}

Expand Down Expand Up @@ -196,6 +195,16 @@ public void nominations(List<ByteString> preblock, boolean last) {

@Override
public void publish() {
if (witnesses.size() < params().majority()) {
log.warn("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(),
params().member().getId());
return;
}
if (!published.compareAndSet(false, true)) {
log.debug("already published genesis: {} with {} witnesses on: {}", reconfiguration.hash, witnesses.size(),
params().member().getId());
return;
}
var b = CertifiedBlock.newBuilder().setBlock(reconfiguration.block);
witnesses.entrySet()
.stream()
Expand All @@ -204,8 +213,8 @@ public void publish() {
.forEach(v -> b.addCertifications(v.getWitness()));
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), b.build()));
// controller.completeIt();
log.info("Genesis block: {} published for: {} on: {}", reconfiguration.hash, view.context().getId(),
params().member().getId());
log.info("Genesis block: {} published with {} witnesses for: {} on: {}", reconfiguration.hash, witnesses.size(),
view.context().getId(), params().member().getId());
}

public void start() {
Expand Down Expand Up @@ -240,11 +249,7 @@ private void certify(Validate v) {
var member = view.context().getMember(Digest.from(v.getWitness().getId()));
if (member != null) {
witnesses.put(member, v);
if (witnesses.size() >= params().majority()) {
if (published.compareAndSet(false, true)) {
publish();
}
}
publish();
}
}

Expand All @@ -264,40 +269,46 @@ private DataSource dataSource() {
}

private void join(Join join) {
final var vm = join.getMember();
final var mid = Digest.from(vm.getId());
final var svm = join.getMember();
final var mid = Digest.from(svm.getVm().getId());
final var m = nextAssembly.get(mid);
if (m == null) {
if (log.isTraceEnabled()) {
log.trace("Invalid view member: {} on: {}", ViewContext.print(vm, params().digestAlgorithm()),
params().member().getId());
}
log.warn("Invalid view member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()),
params().member().getId());
return;
}
if (m.equals(params().member())) {
return; // Don't process ourselves
}
final var viewId = Digest.from(svm.getVm().getView());
if (!viewId.equals(params().genesisViewId())) {
log.warn("Invalid view id for member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()),
params().member().getId());
return;
}

PubKey encoded = vm.getConsensusKey();
if (!m.verify(signature(svm.getSignature()), svm.getVm().toByteString())) {
log.warn("Could not verify view member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()),
params().member().getId());
return;
}

if (!m.verify(signature(vm.getSignature()), encoded.toByteString())) {
if (log.isTraceEnabled()) {
log.trace("Could not verify consensus key from view member: {} on: {}",
ViewContext.print(vm, params().digestAlgorithm()), params().member().getId());
}
PubKey encoded = svm.getVm().getConsensusKey();

if (!m.verify(signature(svm.getVm().getSignature()), encoded.toByteString())) {
log.warn("Could not verify consensus key from view member: {} on: {}",
ViewContext.print(svm, params().digestAlgorithm()), params().member().getId());
return;
}

PublicKey consensusKey = publicKey(encoded);
if (consensusKey == null) {
if (log.isTraceEnabled()) {
log.trace("Could not deserialize consensus key from view member: {} on: {}",
ViewContext.print(vm, params().digestAlgorithm()), params().member().getId());
}
log.warn("Could not deserialize consensus key from view member: {} on: {}",
ViewContext.print(svm, params().digestAlgorithm()), params().member().getId());
return;
}
if (log.isTraceEnabled()) {
log.trace("Valid view member: {} on: {}", ViewContext.print(vm, params().digestAlgorithm()),
log.trace("Valid view member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()),
params().member().getId());
}
var proposed = proposals.computeIfAbsent(mid, k -> new Proposed(join, m));
Expand Down Expand Up @@ -326,29 +337,29 @@ private void validate(Validate v) {
log.warn("Unknown certifier: {} on: {}", cid, params().member().getId());
return; // do not have the join yet
}
final var hash = Digest.from(v.getHash());
final var member = nextAssembly.get(hash);
final var vid = Digest.from(v.getHash());
final var member = nextAssembly.get(vid);
if (member == null) {
return;
}
var proposed = proposals.get(hash);
var proposed = proposals.get(vid);
if (proposed == null) {
log.warn("Invalid certification, unknown view join: {} on: {}", hash, params().member().getId());
log.warn("Invalid certification, unknown view join: {} on: {}", vid, params().member().getId());
return; // do not have the join yet
}
if (!view.validate(proposed.join.getMember(), v)) {
log.warn("Invalid certification for view join: {} from: {} on: {}", hash,
log.warn("Invalid certification for view join: {} from: {} on: {}", vid,
Digest.from(v.getWitness().getId()), params().member().getId());
return;
}
var prev = proposed.certifications.put(certifier, v.getWitness());
if (prev == null) {
log.debug("New validation of view member: {} hash: {} using certifier: {} witnesses: {} on: {}",
member.getId(), hash, certifier.getId(), proposed.certifications.values().size(),
member.getId(), vid, certifier.getId(), proposed.certifications.values().size(),
params().member().getId());
} else {
log.debug("Redundant validation of view member: {} hash: {} using certifier: {} on: {}", member.getId(),
hash, certifier.getId(), params().member().getId());
vid, certifier.getId(), params().member().getId());
}
}

Expand Down
26 changes: 23 additions & 3 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -52,8 +53,9 @@ public class Producer {
private final TxDataSource ds;
private final int lastEpoch;
private final Set<Member> nextAssembly = new HashSet<>();
private final Map<Digest, PendingBlock> pending = new ConcurrentHashMap<>();
private final Map<Digest, PendingBlock> pending = new ConcurrentSkipListMap<>();
private final BlockingQueue<Reassemble> pendingReassembles = new LinkedBlockingQueue<>();
private final Map<Digest, List<Validate>> pendingValidations = new ConcurrentSkipListMap<>();
private final AtomicReference<HashedBlock> previousBlock = new AtomicReference<>();
private final AtomicBoolean reconfigured = new AtomicBoolean();
private final AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -218,6 +220,7 @@ private void create(List<ByteString> preblock, boolean last) {
p.witnesses.put(params().member(), validation);
log.debug("Created block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(),
next.hash, next.height(), lb.hash, last, params().member().getId());
processPendingValidations(next, p);
}
if (last) {
started.set(true);
Expand All @@ -238,6 +241,16 @@ private Parameters params() {
return view.params();
}

private void processPendingValidations(HashedBlock block, PendingBlock p) {
var pending = pendingValidations.remove(block.hash);
if (pending != null) {
pending.forEach(v -> validate(v, p, block.hash));
if (p.witnesses.size() >= params().majority()) {
publish(p);
}
}
}

private void produceAssemble() {
final var vlb = previousBlock.get();
nextViewId = vlb.hash;
Expand All @@ -257,13 +270,13 @@ private void produceAssemble() {
ds.offer(validation);
log.debug("Produced view assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId());
processPendingValidations(assemble, p);
}

private void publish(PendingBlock p) {
log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId());
p.published.set(true);
pending.remove(p.block.hash);
final var cb = CertifiedBlock.newBuilder()
.setBlock(p.block.block)
.addAllCertifications(
Expand All @@ -276,8 +289,13 @@ private PendingBlock validate(Validate v) {
Digest hash = Digest.from(v.getHash());
var p = pending.get(hash);
if (p == null) {
pendingValidations.computeIfAbsent(hash, h -> new CopyOnWriteArrayList<>()).add(v);
return null;
}
return validate(v, p, hash);
}

private PendingBlock validate(Validate v, PendingBlock p, Digest hash) {
if (!view.validate(p.block, v)) {
log.trace("Invalid validate for: {} hash: {} on: {}", p.block.block.getBodyCase(), hash,
params().member().getId());
Expand Down Expand Up @@ -312,6 +330,7 @@ public void assembled() {
log.info("Reconfiguration block: {} height: {} slate: {} produced on: {}", reconfiguration.hash,
reconfiguration.height(), slate.keySet().stream().map(m -> m.getId()).sorted().toList(),
params().member().getId());
processPendingValidations(reconfiguration, p);
}

@Override
Expand Down Expand Up @@ -353,6 +372,7 @@ public void checkpoint() {
p.witnesses.put(params().member(), validation);
log.info("Produced checkpoint: {} height: {} for: {} on: {}", next.hash, next.height(), getViewId(),
params().member().getId());
processPendingValidations(next, p);
transitions.checkpointed();
}

Expand Down
Loading

0 comments on commit eb2f42b

Please sign in to comment.