From ace496ad27c6a5be00fece8eef66b73815752323 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Fri, 26 Apr 2024 17:24:02 -0700 Subject: [PATCH] interim. moar --- .../com/salesforce/apollo/choam/CHOAM.java | 108 ++++--- .../salesforce/apollo/choam/Committee.java | 3 + .../apollo/choam/GenesisAssembly.java | 1 + .../com/salesforce/apollo/choam/Producer.java | 46 ++- .../salesforce/apollo/choam/ViewAssembly.java | 266 +++++++++++------- .../salesforce/apollo/choam/fsm/Genesis.java | 5 + .../apollo/choam/fsm/Reconfiguration.java | 49 ++-- .../apollo/choam/support/TxDataSource.java | 10 +- .../salesforce/apollo/choam/DynamicTest.java | 8 +- .../salesforce/apollo/choam/TestCHOAM.java | 11 +- .../apollo/choam/Transactioneer.java | 3 +- .../apollo/cryptography/Verifier.java | 8 +- .../apollo/state/AbstractLifecycleTest.java | 30 +- .../apollo/state/CheckpointBootstrapTest.java | 9 +- 14 files changed, 341 insertions(+), 216 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index ad08aff39..e861285c1 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -36,6 +36,7 @@ import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter; import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg; import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder; +import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; import io.grpc.StatusRuntimeException; import org.h2.mvstore.MVMap; @@ -47,6 +48,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.security.KeyPair; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -1105,7 +1107,7 @@ public record PendingView(Digest diadem, Context context) { * * @param hash - the "cut" 
across the rings of the context, determining the successors and thus the committee * members of the view - * @return the View determined by this Context and the supplied hash value + * @return the Vue determined by this Context and the supplied hash value */ public View getView(Digest hash) { var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority()); @@ -1326,6 +1328,11 @@ public Parameters params() { @Override public SubmitResult submitTxn(Transaction transaction) { + if (!started.get()) { + log.trace("Failed submitting txn: {} no servers available in: {} on: {}", + hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId()); + return SubmitResult.newBuilder().setResult(Result.ERROR_SUBMITTING).setErrorMsg("Shutdown").build(); + } if (!servers.hasNext()) { log.trace("Failed submitting txn: {} no servers available in: {} on: {}", hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId()); @@ -1365,6 +1372,8 @@ public boolean validate(HashedCertifiedBlock hb) { } private void join(View view) { + log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); var joining = new CompletableFuture(); if (!join.compareAndSet(null, joining)) { log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()), @@ -1374,44 +1383,71 @@ private void join(View view) { } var servers = new GroupIterator(validators.keySet()); var joined = new HashSet(); - Thread.ofVirtual().start(Utils.wrapped(() -> { - while (!joining.isDone() && joined.size() < view.getMajority() && servers.hasNext()) { - Member target = servers.next(); - try (var link = comm.connect(target)) { - if (link == null) { - log.debug("No link for: {} for joining: {} on: {}", target.getId(), - Digest.from(view.getDiadem()), params.member().getId()); - continue; - } - log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), - 
params.member().getId()); - final var c = next.get(); - var inView = ViewMember.newBuilder(c.member) - .setDiadem(view.getDiadem()) - .setView(nextViewId.get().toDigeste()) - .build(); - var svm = SignedViewMember.newBuilder() - .setVm(inView) - .setSignature(params.member().sign(inView.toByteString()).toSig()) - .build(); - try { - link.join(svm); - joined.add(target); - } catch (Throwable t) { - log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId, - Digest.from(view.getDiadem()), params.member().getId(), t); - } - } catch (StatusRuntimeException e) { - log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target, - nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId()); - } catch (Throwable e) { - log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId, - Digest.from(view.getDiadem()), params.member().getId(), e); + + var delay = Duration.ofMillis(Entropy.nextSecureInt(100)); + + Thread.ofVirtual().start(() -> { + log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); + while (!joining.isDone() && joined.size() < view.getMajority()) { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } + join(view, servers, joined); } + log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); joining.complete(null); - log.info("Finishing join of: {} on: {}", Digest.from(view.getDiadem()), params.member().getId()); - }, log)); + }); + } + + private void join(View view, GroupIterator servers, HashSet joined) { + Member target = servers.next(); + if (joined.contains(target)) { + log.trace("Already joined with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId.get(), + Digest.from(view.getDiadem()), params.member().getId()); + return; + } 
+ try (var link = comm.connect(target)) { + join(view, link, target, joined); + } catch (StatusRuntimeException e) { + log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target.getId(), + nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId()); + } catch (Throwable e) { + log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), e); + } + } + + private void join(View view, Terminal link, Member target, HashSet joined) { + if (link == null) { + log.debug("No link for: {} for joining: {} on: {}", target.getId(), Digest.from(view.getDiadem()), + params.member().getId()); + return; + } + log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), + params.member().getId()); + final var c = next.get(); + var inView = ViewMember.newBuilder(c.member) + .setDiadem(view.getDiadem()) + .setView(nextViewId.get().toDigeste()) + .build(); + var svm = SignedViewMember.newBuilder() + .setVm(inView) + .setSignature(params.member().sign(inView.toByteString()).toSig()) + .build(); + try { + link.join(svm); + joined.add(target); + log.trace("Joined with: {} view: {} diadem: {} on: {}", target.getId(), viewId, + Digest.from(view.getDiadem()), params.member().getId()); + } catch (Throwable t) { + log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), t); + } } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java index cd204d819..bbb2713ec 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java @@ -90,6 +90,9 @@ default void assemble(Assemble assemble) { boolean isMember(); default void join(SignedViewMember nextView, Digest from) { + 
log().trace("Error joining by: {} view: {} diadem: {} invalid committee: {} on: {}", from, + Digest.from(nextView.getVm().getView()), Digest.from(nextView.getVm().getDiadem()), + this.getClass().getSimpleName(), params().member().getId()); throw new StatusRuntimeException(ABORTED); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index 865a954b5..c8d4e41a4 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -237,6 +237,7 @@ public void stop() { private void certify(Validate v) { if (reconfiguration == null) { pendingValidations.add(v); + return; } log.trace("Validating reconfiguration block: {} height: {} on: {}", reconfiguration.hash, reconfiguration.height(), params().member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 90a1da30f..c3412ccb8 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -27,7 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -48,7 +51,6 @@ public class Producer { private final ChRbcGossip coordinator; private final TxDataSource ds; private final Map pending = new ConcurrentSkipListMap<>(); - private final List pendingAssemblies = new CopyOnWriteArrayList<>(); private final Map> pendingValidations = new ConcurrentSkipListMap<>(); private final AtomicReference previousBlock = new AtomicReference<>(); private final AtomicBoolean started = new
AtomicBoolean(false); @@ -105,7 +107,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash params().communications(), producerMetrics); log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId()); - var onConsensus = new CompletableFuture(); + var onConsensus = new CompletableFuture(); onConsensus.whenComplete((v, throwable) -> { if (throwable == null) { produceAssemble(v); @@ -115,11 +117,14 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash }); assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, onConsensus) { @Override - public void complete() { - super.complete(); - log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(), - params().member().getId()); - assembled = true; + public boolean complete() { + if (super.complete()) { + log.debug("Vue reconfiguration: {} gathered: {} complete on: {}", nextViewId, + getSlate().keySet().stream().sorted().toList(), params().member().getId()); + assembled = true; + return true; + } + return false; } }; } @@ -212,10 +217,13 @@ private Digest getViewId() { private void newEpoch(Integer epoch) { log.trace("new epoch: {} on: {}", epoch, params().member().getId()); + assembly.newEpoch(); var last = epoch >= maxEpoch && assembled; if (last) { controller.completeIt(); Producer.this.transitions.viewComplete(); + } else { + ds.reset(); } transitions.newEpoch(epoch, last); } @@ -225,17 +233,10 @@ private Parameters params() { } private void processAssemblies(List aggregate) { - var joins = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList(); - final var ass = assembly; - if (ass != null) { - log.trace("Consuming {} units, {} assemblies on: {}", aggregate.size(), joins.size(), - params().member().getId()); - ass.inbound().accept(joins); - } else { - log.trace("Pending {} units, {} assemblies on: {}", aggregate.size(), joins.size(), - 
params().member().getId()); - pendingAssemblies.addAll(joins); - } + var aggs = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList(); + log.trace("Consuming {} assemblies from {} units on: {}", aggs.size(), aggregate.size(), + params().member().getId()); + assembly.assemble(aggs); } private void processPendingValidations(HashedBlock block, PendingBlock p) { @@ -277,7 +278,7 @@ private void processTransactions(boolean last, List aggregate) { } } - private void produceAssemble(ViewAssembly.View v) { + private void produceAssemble(ViewAssembly.Vue v) { final var vlb = previousBlock.get(); var ass = Assemble.newBuilder() .setView(View.newBuilder() @@ -294,7 +295,7 @@ private void produceAssemble(ViewAssembly.View v) { pending.put(assemble.hash, p); p.witnesses.put(params().member(), validation); ds.offer(validation); - log.debug("View assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash, + log.debug("Vue assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash, assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId()); transitions.assembled(); } @@ -385,10 +386,7 @@ private class DriveIn implements Driven { public void assemble() { log.debug("Starting view diadem consensus for: {} on: {}", nextViewId, params().member().getId()); startProduction(); - var joins = new ArrayList<>(pendingAssemblies); - pendingAssemblies.clear(); assembly.start(); - assembly.inbound().accept(joins); } @Override diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 045e41af2..47d7cad78 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -19,7 +19,6 @@ import com.salesforce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.cryptography.proto.PubKey; import
com.salesforce.apollo.membership.Member; -import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,13 +29,15 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import static com.salesforce.apollo.choam.ViewContext.print; import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey; import static com.salesforce.apollo.cryptography.QualifiedBase64.signature; -import static io.grpc.Status.ABORTED; /** * View reconfiguration. Attempts to create a new view reconfiguration. The protocol comes to an agreement on the @@ -57,11 +58,14 @@ public class ViewAssembly { private final Consumer publisher; private final Map slate = new HashMap<>(); private final ViewContext view; - private final CompletableFuture onConsensus; - private volatile View selected; + private final CompletableFuture onConsensus; + private final AtomicInteger countdown = new AtomicInteger(); + private final List pendingJoins = new CopyOnWriteArrayList<>(); + private final AtomicBoolean started = new AtomicBoolean(false); + private volatile Vue selected; public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher, - CompletableFuture onConsensus) { + CompletableFuture onConsensus) { view = vc; this.nextViewId = nextViewId; this.publisher = publisher; @@ -80,48 +84,116 @@ public Map getSlate() { } public void start() { + if (!started.compareAndSet(false, true)) { + return; + } transitions.fsm().enterStartState(); - transitions.assembled(); } - void complete() { + void assemble(List asses) { + if (!started.get()) { + return; + } + + if (asses.isEmpty()) { + return; + } + + var joins = asses.stream() + .flatMap(a -> a.getJoinsList().stream()) + 
.filter(view -> !proposals.containsKey(Digest.from(view.getJoin().getVm().getId()))) + .filter(signedJoin -> !SignedJoin.getDefaultInstance().equals(signedJoin)) + .filter(view::validate) + .toList(); + var views = asses.stream().flatMap(a -> a.getViewsList().stream()).filter(SignedViews::hasViews).toList(); + + log.info("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId()); + + joins.forEach(sj -> join(sj.getJoin(), false)); + if (selected != null) { + if (!views.isEmpty()) { + log.trace("Already selected: {}, ignoring views: {} on: {}", selected.diadem, views.size(), + params().member().getId()); + } + return; + } + views.forEach(svs -> { + if (view.validate(svs)) { + log.info("Adding views: {} from: {} on: {}", + svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + Digest.from(svs.getViews().getMember()), params().member().getId()); + viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews()); + if (viewProposals.size() == params().context().getRingCount()) { + transitions.certified(); + countdown.set(-1); + } + } else { + log.info("Invalid views: {} from: {} on: {}", + svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + Digest.from(svs.getViews().getMember()), params().member().getId()); + } + }); + } + + boolean complete() { if (selected == null) { log.info("Cannot complete view assembly: {} as selected is null on: {}", nextViewId, params().member().getId()); transitions.failed(); - return; + return false; } if (proposals.size() < selected.majority) { log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId, proposals.keySet().stream().toList(), selected.majority, params().member().getId()); transitions.failed(); - return; + return false; } + proposals.forEach((d, svm) -> slate.put(d, Join.newBuilder().setMember(svm).build())); + // Fill out the proposals with the unreachable members of the next 
assembly - Sets.difference(selected.assembly.keySet(), proposals.keySet()) - .forEach(m -> proposals.put(m, SignedViewMember.newBuilder() - .setVm(ViewMember.newBuilder() - .setId(m.toDigeste()) - .setView(nextViewId.toDigeste())) - .build())); + var missing = Sets.difference(selected.assembly.keySet(), proposals.keySet()); + if (!missing.isEmpty()) { + log.info("Missing proposals: {} on: {}", missing.stream().sorted().toList(), params().member().getId()); + missing.forEach(m -> slate.put(m, Join.newBuilder() + .setMember(SignedViewMember.newBuilder() + .setVm(ViewMember.newBuilder() + .setId(m.toDigeste()) + .setView( + nextViewId.toDigeste()))) + .build())); + } + assert slate.size() == selected.assembly.size() : "Invalid slate: " + slate.size() + " expected: " + + selected.assembly.size(); log.debug("View Assembly: {} completed with: {} members on: {}", nextViewId, slate.size(), params().member().getId()); transitions.complete(); + return true; } - Consumer> inbound() { - return lre -> lre.forEach(this::assemble); - } + void join(SignedViewMember svm, boolean direct) { + if (!started.get()) { + return; + } - boolean join(SignedViewMember svm, boolean direct) { final var mid = Digest.from(svm.getVm().getId()); + if (proposals.containsKey(mid)) { + log.trace("Redundant join from: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); + return; + } + if (selected == null) { + pendingJoins.add(svm); + log.trace("Pending join from: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); + return; + } final var m = selected.assembly.get(mid); if (m == null) { if (log.isTraceEnabled()) { log.trace("Invalid view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - throw new StatusRuntimeException(ABORTED); + return; } var viewId = Digest.from(svm.getVm().getView()); if (!nextViewId.equals(viewId)) { @@ -129,7 +201,7 @@ boolean join(SignedViewMember svm, boolean direct) { 
log.trace("Invalid view id for member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return false; + return; } if (log.isDebugEnabled()) { log.debug("Join of: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); @@ -140,7 +212,7 @@ boolean join(SignedViewMember svm, boolean direct) { log.trace("Invalid signature for view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return false; + return; } PubKey encoded = svm.getVm().getConsensusKey(); @@ -150,7 +222,7 @@ boolean join(SignedViewMember svm, boolean direct) { log.trace("Could not verify consensus key from view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return false; + return; } PublicKey consensusKey = publicKey(encoded); @@ -159,54 +231,35 @@ boolean join(SignedViewMember svm, boolean direct) { log.trace("Could not deserialize consensus key from view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return false; + return; } - if (proposals.putIfAbsent(mid, svm) == null) { - if (direct) { - var signature = view.sign(svm); - publisher.accept(Assemblies.newBuilder() - .addJoins(SignedJoin.newBuilder() - .setJoin(svm) - .setMember(params().member().getId().toDigeste()) - .setSignature(signature.toSig()) - .build()) - .build()); - if (log.isTraceEnabled()) { - log.trace("Publishing view member: {} sig: {} on: {}", print(svm, params().digestAlgorithm()), - params().digestAlgorithm().digest(signature.toSig().toByteString()), - params().member().getId()); - } - } else if (log.isTraceEnabled()) { - log.trace("Adding discovered view member: {} on: {}", print(svm, params().digestAlgorithm()), + + if (direct) { + var signature = view.sign(svm); + publisher.accept(Assemblies.newBuilder() + .addJoins(SignedJoin.newBuilder() + .setJoin(svm) + .setMember(params().member().getId().toDigeste()) + .setSignature(signature.toSig()) + 
.build()) + .build()); + if (log.isTraceEnabled()) { + log.trace("Publishing view member: {} sig: {} on: {}", print(svm, params().digestAlgorithm()), + params().digestAlgorithm().digest(signature.toSig().toByteString()), params().member().getId()); } + } else if (proposals.putIfAbsent(mid, svm) == null) { + log.trace("Adding discovered view member: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); } - if (proposals.size() == selected.assembly.size()) { - Thread.ofVirtual().start(transitions::gathered); - } - return true; + checkAssembly(); } - private void assemble(Assemblies ass) { - log.info("Assembling {} joins and {} views on: {}", ass.getJoinsCount(), ass.getViewsCount(), - params().member().getId()); - ass.getJoinsList().stream().filter(view::validate).forEach(sj -> join(sj.getJoin(), false)); - if (selected != null) { - return; + void newEpoch() { + var current = countdown.decrementAndGet(); + if (current == 0) { + transitions.certified(); } - for (SignedViews svs : ass.getViewsList()) { - if (view.validate(svs)) { - log.info("Adding views: {} from: {} on: {}", - svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), - Digest.from(svs.getViews().getMember()), params().member().getId()); - viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews()); - } else { - log.info("Invalid views: {} from: {} on: {}", - svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), - Digest.from(svs.getViews().getMember()), params().member().getId()); - } - } - vote(); } private Map assemblyOf(List committee) { @@ -216,24 +269,37 @@ private Map assemblyOf(List committee) { .collect(Collectors.toMap(Member::getId, m -> m)); } - private void castVote() { + private void checkAssembly() { + if (proposals.size() >= selected.majority) { + countdown.set(-1); + transitions.certified(); + } + } + + private Parameters params() { + return view.params(); + } + + private void 
propose() { var views = view.pendingViews() .getViews(nextViewId) .setMember(params().member().getId().toDigeste()) .setVid(nextViewId.toDigeste()) .build(); - log.info("Voting for: {} on: {}", nextViewId, params().member().getId()); + log.info("Proposing for: {} views: {} on: {}", nextViewId, + views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + params().member().getId()); publisher.accept(Assemblies.newBuilder() .addViews( SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig())) .build()); + countdown.set(2); } - private void castVote(Views vs, List majorities, - Multiset consensus) { + private void propose(Views vs, List majorities, Multiset consensus) { var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(); var lastIndex = -1; - com.salesforce.apollo.choam.proto.View last = null; + View last = null; for (var v : majorities) { var i = ordered.indexOf(Digest.from(v.getDiadem())); if (i != -1) { @@ -248,12 +314,8 @@ private void castVote(Views vs, List maj } } - private Parameters params() { - return view.params(); - } - private void vote() { - Multiset candidates = HashMultiset.create(); + Multiset candidates = HashMultiset.create(); viewProposals.values().forEach(v -> candidates.addAll(v.getViewsList())); var majority = params().majority(); var majorities = candidates.entrySet() @@ -273,8 +335,8 @@ private void vote() { log.trace("Majority views: {} on: {}", majorities.stream().map(v -> Digest.from(v.getDiadem())).toList(), params().member().getId()); } - Multiset consensus = HashMultiset.create(); - viewProposals.values().forEach(vs -> castVote(vs, majorities, consensus)); + Multiset consensus = HashMultiset.create(); + viewProposals.values().forEach(vs -> propose(vs, majorities, consensus)); var ratification = consensus.entrySet() .stream() .filter(e -> e.getCount() >= majority) @@ -289,16 +351,18 @@ private void vote() { params().member().getId()); } var winner = 
ratification.getFirst(); - selected = new View(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList()), - winner.getMajority()); + selected = new Vue(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList()), + winner.getMajority()); if (log.isDebugEnabled()) { log.debug("Selected: {} on: {}", selected, params().member().getId()); } onConsensus.complete(selected); - transitions.viewDetermined(); + transitions.certified(); + pendingJoins.forEach(svm -> join(svm, false)); + pendingJoins.clear(); } - record View(Digest diadem, Map assembly, int majority) { + public record Vue(Digest diadem, Map assembly, int majority) { @Override public String toString() { return "View{" + "diadem=" + diadem + ", assembly=" + assembly.keySet().stream().sorted().toList() + '}'; @@ -308,22 +372,29 @@ public String toString() { private class Recon implements Reconfiguration { @Override public void certify() { - if (proposals.size() >= selected.majority) { + if (proposals.size() == selected.majority) { log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); - proposals.forEach((key, value) -> { - slate.put(key, joinOf(value)); - }); - log.debug("Electing view: {} slate: {} on: {}", nextViewId, slate.keySet().stream().sorted().toList(), - params().member().getId()); + proposals.forEach((key, value) -> slate.put(key, joinOf(value))); transitions.certified(); } else { + countdown.set(2); log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, - proposals.entrySet().stream().sorted().toList(), nextViewId, params().member().getId()); + proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); } } + public void checkAssembly() { + countdown.set(2); + ViewAssembly.this.checkAssembly(); + } + + public void checkViews() { + countdown.set(2); + vote(); + } + @Override public 
void complete() { ViewAssembly.this.complete(); @@ -331,14 +402,14 @@ public void complete() { @Override public void elect() { - if (selected != null && proposals.size() == selected.assembly().size()) { - proposals.forEach((key, value) -> slate.put(key, joinOf(value))); - log.debug("Electing view: {} slate: {} on: {}", nextViewId, slate.keySet().stream().sorted().toList(), - params().member().getId()); + if (selected != null && proposals.size() >= selected.majority) { + log.debug("Electing view: {} required: {} proposed: {} on: {}", nextViewId, selected.majority, + proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.complete(); } else { - log.error("Failed election; selected: {} proposed: {} of: {} on: {}", selected != null, - proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); + log.error("Failed election selected: {} required: {} proposed: {} of: {} on: {}", selected != null, + selected != null ? -1 : selected.majority, proposals.keySet().stream().sorted().toList(), + nextViewId, params().member().getId()); transitions.failed(); } } @@ -350,8 +421,13 @@ public void failed() { } @Override - public void viewAgreement() { - ViewAssembly.this.castVote(); + public void finish() { + started.set(false); + } + + @Override + public void publishViews() { + propose(); } private Join joinOf(SignedViewMember vm) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java index 0cdf3ac47..e917bc716 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java @@ -41,6 +41,11 @@ public Transitions process(List preblock, boolean last) { context().certify(preblock, last); return last ? 
PUBLISH : null; } + + @Override + public Transitions nextEpoch(Integer epoch) { + return null; + } }, FAIL { }, INITIAL { @Entry diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index 5ebef559b..7740e9032 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -15,18 +15,29 @@ public interface Reconfiguration { void certify(); + void checkAssembly(); + + void checkViews(); + void complete(); void elect(); void failed(); - void viewAgreement(); + void finish(); + + void publishViews(); enum Reconfigure implements Transitions { AWAIT_ASSEMBLY { + @Entry + public void publish() { + context().publishViews(); + } + @Override - public Transitions assembled() { + public Transitions certified() { return VIEW_AGREEMENT; } }, CERTIFICATION { @@ -39,27 +50,17 @@ public Transitions certified() { public void certify() { context().certify(); } - - @Override - public Transitions gathered() { - return CERTIFICATION; - } }, GATHER { @Override - public Transitions gathered() { + public Transitions certified() { return CERTIFICATION; } - @Override - public Transitions viewDetermined() { - return null; + @Entry + public void gather() { + context().checkAssembly(); } }, PROTOCOL_FAILURE { - @Override - public Transitions assembled() { - return null; - } - @Override public Transitions certified() { return null; @@ -92,6 +93,7 @@ public void elect() { }, RECONFIGURED { @Override public Transitions complete() { + context().finish(); return null; } @@ -102,20 +104,17 @@ public void completion() { }, VIEW_AGREEMENT { @Entry public void viewConsensus() { - context().viewAgreement(); + context().checkViews(); } @Override - public Transitions viewDetermined() { + public Transitions certified() { return GATHER; } } } interface Transitions extends FsmExecutor { - default 
Transitions assembled() { - throw fsm().invalidTransitionOn(); - } default Transitions certified() { throw fsm().invalidTransitionOn(); @@ -128,13 +127,5 @@ default Transitions complete() { default Transitions failed() { return Reconfigure.PROTOCOL_FAILURE; } - - default Transitions gathered() { - return null; - } - - default Transitions viewDetermined() { - throw fsm().invalidTransitionOn(); - } } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java index 350908e90..8053d4878 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java @@ -65,13 +65,13 @@ public void close() { if (metrics != null) { metrics.dropped(processing.size(), validations.size(), assemblies.size()); } - log.trace("Closing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(), + log.debug("Closing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(), processing.added(), processing.taken(), validations.size(), assemblies.size(), member.getId()); } public void drain() { draining.set(true); - log.trace("Draining with remaining txns: {}({}:{}) on: {}", processing.size(), processing.added(), + log.debug("Draining with remaining txns: {}({}:{}) on: {}", processing.size(), processing.added(), processing.taken(), member.getId()); } @@ -171,4 +171,10 @@ public boolean offer(Transaction txn) { public void offer(Validate generateValidation) { validations.offer(generateValidation); } + + public void reset() { + log.debug("Clearing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(), + processing.added(), processing.taken(), validations.size(), assemblies.size(), member.getId()); + processing.clear(); + } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java 
b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index 5e64bf6e5..1465f1868 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -70,18 +70,18 @@ public void setUp() throws Exception { var template = Parameters.newBuilder() .setGenerateGenesis(true) .setBootstrap(Parameters.BootstrapParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(20)) + .setGossipDuration(Duration.ofMillis(5)) .build()) .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) .setGossipDuration(Duration.ofMillis(10)) .setProducer(Parameters.ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(20)) - .setBatchInterval(Duration.ofMillis(10)) + .setGossipDuration(Duration.ofMillis(5)) + .setBatchInterval(Duration.ofMillis(5)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(10_000) .setEthereal(Config.newBuilder() .setNumberOfEpochs(3) - .setEpochLength(20)) + .setEpochLength(7)) .build()) .setCheckpointBlockDelta(checkpointBlockSize) .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index d29eea582..232bad3e9 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -20,6 +20,7 @@ import com.salesforce.apollo.context.StaticContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.stereotomy.StereotomyImpl; @@ -102,11 +103,11 @@ public void before() throws Exception { .setMaxBatchByteSize(200 * 1024 * 1024) .setGossipDuration(Duration.ofMillis(10)) 
.setBatchInterval(Duration.ofMillis(50)) + .setEthereal(Config.newBuilder() + .setNumberOfEpochs(3) + .setEpochLength(7)) .build()) - .setCheckpointBlockDelta(1); - if (LARGE_TESTS) { - params.getProducer().ethereal().setNumberOfEpochs(5).setEpochLength(60); - } + .setCheckpointBlockDelta(3); checkpointOccurred = new CompletableFuture<>(); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); @@ -202,7 +203,7 @@ public void submitMultiplTxn() throws Exception { .build() .report(); } - assertTrue(checkpointOccurred.get()); + assertTrue(checkpointOccurred.get(5, TimeUnit.SECONDS)); } private Function wrap(Function checkpointer) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java index eaa4fdc9c..3ce6f88b4 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java @@ -25,8 +25,7 @@ class Transactioneer { private final static Random entropy = new Random(); private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); - private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual() - .factory()); + private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final AtomicInteger completed = new AtomicInteger(); private final CountDownLatch countdown; private final List> inFlight = new CopyOnWriteArrayList<>(); diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java index 181a5d52a..48e94f796 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java +++ 
b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java @@ -155,8 +155,8 @@ public Filtered filtered(SigningThreshold threshold, JohnHancock signature, Inpu } @Override - public boolean verify(JohnHancock signature, InputStream message) { - return false; + public String toString() { + return ""; } @Override @@ -164,6 +164,10 @@ public boolean verify(SigningThreshold threshold, JohnHancock signature, InputSt return false; } + @Override + public boolean verify(JohnHancock signature, InputStream message) { + return false; + } } class MockVerifier implements Verifier { diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index dded314b5..8a6820c5c 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -20,6 +20,7 @@ import com.salesforce.apollo.context.DynamicContextImpl; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; @@ -55,11 +56,11 @@ * @author hal.hildebrand */ abstract public class AbstractLifecycleTest { - protected static final int CARDINALITY = 5; - private static final Digest GENESIS_VIEW_ID = DigestAlgorithm.DEFAULT.digest( + protected static final int CARDINALITY = 5; + private static final Digest GENESIS_VIEW_ID = DigestAlgorithm.DEFAULT.digest( "Give me food or give me slack or kill me".getBytes()); - protected final AtomicReference checkpointHeight = new AtomicReference<>(); - protected final Map updaters = new HashMap<>(); + protected final AtomicReference checkpointHeight = new AtomicReference<>(); + 
protected final Map updaters = new HashMap<>(); // static { // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Session.class)).setLevel(Level.TRACE); // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CHOAM.class)).setLevel(Level.TRACE); @@ -70,15 +71,15 @@ abstract public class AbstractLifecycleTest { // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Fsm.class)).setLevel(Level.TRACE); // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(TxDataSource.class)).setLevel(Level.TRACE); // } - private final List GENESIS_DATA; - private final Map parameters = new HashMap<>(); - protected SecureRandom entropy; - protected CountDownLatch checkpointOccurred; - protected Map choams; - protected List members; - protected Map routers; - protected SigningMember testSubject; - protected int toleranceLevel; + private final List GENESIS_DATA; + private final Map parameters = new HashMap<>(); + protected SecureRandom entropy; + protected CountDownLatch checkpointOccurred; + protected Map choams; + protected List members; + protected Map routers; + protected SigningMember testSubject; + protected int toleranceLevel; DynamicContextImpl context; private File baseDir; private File checkpointDirBase; @@ -356,6 +357,9 @@ private Builder parameters(Context context) { .setBatchInterval(Duration.ofMillis(10)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(3000) + .setEthereal(Config.newBuilder() + .setEpochLength(7) + .setNumberOfEpochs(3)) .build()) .setGossipDuration(Duration.ofMillis(10)) .setCheckpointBlockDelta(checkpointBlockSize()) diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java index 6b00a6385..d15827def 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java @@ -7,6 +7,7 @@ package 
com.salesforce.apollo.state; import com.salesforce.apollo.context.DynamicContext; +import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.utils.Utils; import org.joou.ULong; import org.junit.jupiter.api.Test; @@ -37,7 +38,7 @@ public void checkpointBootstrap() throws Exception { checkpointOccurred.await(30, TimeUnit.SECONDS); ULong chkptHeight = checkpointHeight.get(); - assertNotNull(chkptHeight, "Null checkpoint height!"); + assertNotNull(chkptHeight, "No checkpoint"); System.out.println("Checkpoint at height: " + chkptHeight); var processed = Utils.waitForCondition(10_000, 1_000, () -> { @@ -64,11 +65,11 @@ public void checkpointBootstrap() throws Exception { System.out.println("Starting late joining node"); var choam = choams.get(testSubject.getId()); - ((DynamicContext) choam.context().delegate()).activate(testSubject); + ((DynamicContext) choam.context().delegate()).activate(testSubject); choam.start(); routers.get(testSubject.getId()).start(); - assertTrue(Utils.waitForCondition(30_000, 1_000, () -> choam.active()), + assertTrue(Utils.waitForCondition(30_000, 1_000, choam::active), "Test subject did not become active: " + choam.logState()); members.add(testSubject); post(); @@ -76,7 +77,7 @@ public void checkpointBootstrap() throws Exception { @Override protected int checkpointBlockSize() { - return 3; + return 2; } @Override