Skip to content

Commit

Permalink
BFT alignment (#204)
Browse files Browse the repository at this point in the history
* better logging. bootstrap sampling via successor cuts of ring 0 iteration.

checkpoint assembly gossip of reconfiguration committee members only.

Some general tidying up

* better logging

* set maven opts

* set surefire jvm mem opts

* fix logging, back out some exec/sched changes in routing

* cleanup

* anchor at first checkpoint correctly, filter unknown members

* allow providing the scheduler

* ignore self in gossip, better logging

* no aux populating

* ignore

* rename

* use super majority of observers. some clean up, crisp fpr defaults, etc

Fails on churn test @30 members due to knocking out > 1/4 of the observers.

* fix BFT logic, plus clean up

* double T/O

* use bftSlice for gorgoneion/maat successors

* use SliceIterator for Ethereal.simplify RingCommunications.

* squelch observer logging

* clean up, slice iterator traversal now copy on write array, cleaner Bootstrapper
  • Loading branch information
Hellblazer authored Jun 2, 2024
1 parent 04a0afa commit aa700a0
Show file tree
Hide file tree
Showing 55 changed files with 1,192 additions and 978 deletions.
1 change: 0 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on: [ push ]

jobs:
build:

runs-on: ubuntu-latest

steps:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
**/.classpath
**/.DS_Store
**/bin/
**/.run/
/bin/
*.csv
**/dependency-reduced-pom.xml
Expand Down
27 changes: 12 additions & 15 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,6 @@ private SubmitResult submit(Transaction request, Digest from) {
}

private Initial sync(Synchronize request, Digest from) {
if (from == null) {
return Initial.getDefaultInstance();
}
final HashedCertifiedBlock g = genesis.get();
if (g != null) {
Initial.Builder initial = Initial.newBuilder();
Expand All @@ -897,13 +894,10 @@ private Initial sync(Synchronize request, Digest from) {
}
final ULong lastReconfig = ULong.valueOf(cp.block.getHeader().getLastReconfig());
HashedCertifiedBlock lastView = null;
if (lastReconfig.equals(ULong.valueOf(0))) {
lastView = cp;
} else {
var stored = store.getCertifiedBlock(lastReconfig);
if (stored != null) {
lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored);
}

var stored = store.getCertifiedBlock(lastReconfig);
if (stored != null) {
lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored);
}
if (lastView == null) {
lastView = g;
Expand Down Expand Up @@ -1120,7 +1114,8 @@ public record PendingView(Digest diadem, Context<Member> context) {
*/
public View getView(Digest hash) {
var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority());
Committee.viewMembersOf(hash, context).forEach(d -> builder.addCommittee(d.getId().toDigeste()));
((Context<? super Member>) context).bftSubset(hash).forEach(
d -> builder.addCommittee(d.getId().toDigeste()));
return builder.build();
}
}
Expand All @@ -1134,8 +1129,8 @@ public class Combiner implements Combine {
public void anchor() {
HashedCertifiedBlock anchor = pending.poll();
var pending = pendingViews.last().context;
if (anchor != null && pending.totalCount() >= pending.majority()) {
log.info("Synchronizing from anchor: {} cardinality: {} on: {}", anchor.hash, pending.totalCount(),
if (anchor != null && pending.size() >= pending.majority()) {
log.info("Synchronizing from anchor: {} cardinality: {} on: {}", anchor.hash, pending.size(),
params.member().getId());
transitions.bootstrap(anchor);
}
Expand Down Expand Up @@ -1181,9 +1176,10 @@ public void awaitSynchronization() {
synchronizationFailed();
} catch (IllegalStateException e) {
final var c = current.get();
Context<Member> memberContext = context();
log.debug(
"Synchronization quorum formation failed: {}, members: {} desired: {} required: {}, no anchor to recover from: {} on: {}",
e.getMessage(), context().totalCount(), context().getRingCount(), params.majority(),
e.getMessage(), memberContext.size(), context().getRingCount(), params.majority(),
c == null ? "<no formation>" : c.getClass().getSimpleName(), params.member().getId());
awaitSynchronization();
}
Expand Down Expand Up @@ -1227,7 +1223,8 @@ public void rotateViewKeys() {

private void synchronizationFailed() {
cancelSynchronization();
var activeCount = context().totalCount();
Context<Member> memberContext = context();
var activeCount = memberContext.size();
var majority = params.majority();
if (params.generateGenesis() && activeCount >= majority) {
if (current.get() == null && current.compareAndSet(null, new Formation())) {
Expand Down
14 changes: 1 addition & 13 deletions choam/src/main/java/com/salesforce/apollo/choam/Committee.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -63,23 +62,12 @@ static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Membe
* Create a view based on the cut of the supplied hash across the rings of the base context
*/
static Context<Member> viewFor(Digest hash, Context<? super Member> baseContext) {
Set<Member> successors = viewMembersOf(hash, baseContext);
Set<Member> successors = (Set<Member>) baseContext.bftSubset(hash);
var newView = new StaticContext<>(hash, baseContext.getProbabilityByzantine(), 3, successors,
baseContext.getEpsilon(), successors.size());
return newView;
}

static Set<Member> viewMembersOf(Digest hash, Context<? super Member> baseContext) {
Set<Member> successors = new HashSet<>();
baseContext.successors(hash, m -> {
if (successors.size() == baseContext.getRingCount()) {
return false;
}
return successors.add(m);
});
return successors;
}

void accept(HashedCertifiedBlock next);

default void assemble(Assemble assemble) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock.NullBlock;
import com.salesforce.apollo.choam.support.OneShot;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.proto.PubKey;
Expand Down Expand Up @@ -64,9 +65,9 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
String label) {
view = vc;
ds = new OneShot();
nextAssembly = Committee.viewMembersOf(view.context().getId(), view.pendingViews().last().context())
.stream()
.collect(Collectors.toMap(Member::getId, m -> m));
Digest hash = view.context().getId();
nextAssembly = ((Set<Member>) ((Context<? super Member>) view.pendingViews().last().context()).bftSubset(
hash)).stream().collect(Collectors.toMap(Member::getId, m -> m));
if (!Dag.validate(nextAssembly.size())) {
throw new IllegalStateException("Invalid BFT cardinality: " + nextAssembly.size());
}
Expand Down Expand Up @@ -96,7 +97,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
transitions::process, transitions::nextEpoch, label);
coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(),
coordinator = new ChRbcGossip(reContext.getId(), params().member(), nextAssembly.values(),
controller.processor(), params().communications(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
reContext.getId(), nextAssembly.keySet(), params().member().getId());
Expand Down
18 changes: 9 additions & 9 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
producerParams.batchInterval(), producerParams.maxBatchCount(),
params().drainPolicy().build());

log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());
log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());

var fsm = Fsm.construct(new DriveIn(), Transitions.class, Earner.INITIAL, true);
fsm.setName("Producer%s on: %s".formatted(getViewId(), params.member().getId()));
Expand All @@ -99,10 +99,10 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash

config.setLabel("Producer" + getViewId() + " on: " + params().member().getId());
var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics();
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds,
(preblock, last) -> serial(preblock, last), this::newEpoch, label);
coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(),
params().communications(), producerMetrics);
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial,
this::newEpoch, label);
coordinator = new ChRbcGossip(view.context().getId(), params().member(), view.membership(),
controller.processor(), params().communications(), producerMetrics);
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());

var onConsensus = new CompletableFuture<ViewAssembly.Vue>();
Expand Down Expand Up @@ -343,9 +343,9 @@ private void reconfigure() {
pending.put(reconfiguration.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(),
reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(),
params().member().getId());
log.trace("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(),
reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(),
params().member().getId());
processPendingValidations(reconfiguration, p);

log.trace("Draining on: {}", params().member().getId());
Expand Down
4 changes: 2 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public static <T> CompletableFuture<T> retryNesting(Supplier<CompletableFuture<T
CompletableFuture<T> cf = supplier.get();
for (int i = 0; i < maxRetries; i++) {
final var attempt = i;
cf = cf.thenApply(CompletableFuture::completedFuture).exceptionally(__ -> {
log.trace("resubmitting after attempt: {}", attempt + 1);
cf = cf.thenApply(CompletableFuture::completedFuture).exceptionally(e -> {
log.info("resubmitting after attempt: {} exception: {}", attempt + 1, e.toString());
return supplier.get();
}).thenCompose(java.util.function.Function.identity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void assemble(List<Assemblies> asses) {
.toList();
var views = asses.stream().flatMap(a -> a.getViewsList().stream()).filter(SignedViews::hasViews).toList();

log.info("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId());
log.debug("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId());

joins.forEach(sj -> join(sj.getJoin(), false));
if (selected != null) {
Expand All @@ -124,7 +124,7 @@ void assemble(List<Assemblies> asses) {
Digest.from(svs.getViews().getMember()), params().member().getId());
viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews());
} else {
log.info("Invalid views: {} from: {} on: {}",
log.warn("Invalid views: {} from: {} on: {}",
svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
Digest.from(svs.getViews().getMember()), params().member().getId());
}
Expand All @@ -140,14 +140,14 @@ void assemble(List<Assemblies> asses) {

boolean complete() {
if (selected == null) {
log.info("Cannot complete view assembly: {} as selected is null on: {}", nextViewId,
params().member().getId());
log.error("Cannot complete view assembly: {} as selected is null on: {}", nextViewId,
params().member().getId());
transitions.failed();
return false;
}
if (proposals.size() < selected.majority) {
log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId,
proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId());
log.error("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId,
proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId());
transitions.failed();
return false;
}
Expand Down Expand Up @@ -290,9 +290,9 @@ private void propose() {
.setMember(params().member().getId().toDigeste())
.setVid(nextViewId.toDigeste())
.build();
log.info("Proposing for: {} views: {} on: {}", nextViewId,
views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
params().member().getId());
log.debug("Proposing for: {} views: {} on: {}", nextViewId,
views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
params().member().getId());
publisher.accept(Assemblies.newBuilder()
.addViews(
SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey;
Expand Down Expand Up @@ -135,6 +136,10 @@ public Signer getSigner() {
return signer;
}

public Set<Member> membership() {
return validators.keySet();
}

/**
* The process has failed
*/
Expand Down Expand Up @@ -244,7 +249,7 @@ public boolean validate(SignedViews sv) {
Verifier v = verifierOf(sv);
if (v == null) {
if (log.isDebugEnabled()) {
log.debug("no verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()),
log.debug("No verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()),
params.member().getId());
}
return false;
Expand Down
Loading

0 comments on commit aa700a0

Please sign in to comment.