diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 0d8b428c5..ad08aff39 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -9,6 +9,7 @@ import com.chiralbehaviors.tron.Fsm; import com.google.common.base.Function; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; @@ -61,6 +62,8 @@ import static com.salesforce.apollo.choam.support.HashedBlock.height; import static com.salesforce.apollo.cryptography.QualifiedBase64.bs; import static com.salesforce.apollo.cryptography.QualifiedBase64.digest; +import static io.grpc.Status.FAILED_PRECONDITION; +import static io.grpc.Status.INVALID_ARGUMENT; /** * Combine Honnete Ober Advancer Mercantiles. @@ -94,6 +97,7 @@ public class CHOAM { private final TransSubmission txnSubmission = new TransSubmission(); private final AtomicReference view = new AtomicReference<>(); private final PendingViews pendingViews = new PendingViews(); + private final AtomicReference> join = new AtomicReference<>(); public CHOAM(Parameters params) { this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); @@ -246,6 +250,17 @@ public static List toGenesisData(List initializa .toList(); } + private static Block assembly(AtomicReference nextViewId, View view, HashedBlock head, + HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint) { + var body = Assemble.newBuilder().setView(view).build(); + return Block.newBuilder() + .setHeader( + buildHeader(params.digestAlgorithm(), body, head.hash, ULong.valueOf(0), lastCheckpoint.height(), + lastCheckpoint.hash, lastViewChange.height(), lastViewChange.hash)) + .setAssemble(body) + .build(); + } + public boolean active() { final var c = current.get(); HashedCertifiedBlock h = head.get(); @@ -464,7 +479,6 @@ private void combine(Msg m) { private BlockProducer constructBlock() { return new BlockProducer() { - @Override public Block checkpoint() { return CHOAM.this.checkpoint(); @@ -497,6 +511,17 @@ public void onFailure() { transitions.fail(); } + @Override + public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { + final HashedCertifiedBlock v = view.get(); + return Block.newBuilder() + .setHeader( + buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(), + checkpoint.hash, v.height(), v.hash)) + .setAssemble(assemble) + .build(); + } + @Override public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) { final HashedCertifiedBlock v = view.get(); @@ -609,18 +634,16 @@ private boolean isNext(HashedBlock next) { return isNext; } - private SignedViewMember join(Digest nextView, Digest from) { - final var c = next.get(); - var inView = ViewMember.newBuilder(c.member).setView(nextView.toDigeste()).build(); - - if (log.isDebugEnabled()) { - log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from, - ViewContext.print(inView, params.digestAlgorithm()), params.member().getId()); + private Empty join(SignedViewMember nextView, Digest from) { + var c = current.get(); + if (c == null) { + log.trace("No committee for: {} to join: {} diadem: {} on: {}", from, + Digest.from(nextView.getVm().getView()), Digest.from(nextView.getVm().getDiadem()), + params.member().getId()); + throw new StatusRuntimeException(FAILED_PRECONDITION); } - return SignedViewMember.newBuilder() - .setVm(inView) - .setSignature(params.member().sign(inView.toByteString()).toSig()) - .build(); + c.join(nextView, from); + return Empty.getDefaultInstance(); } private Supplier pendingViews() { @@ -645,6 +668,11 @@ private void process() { reconfigure(h.hash, h.block.getGenesis().getInitialView()); break; } + case ASSEMBLE: { + params.processor().beginBlock(h.height(), h.hash); + c.assemble(h.block.getAssemble()); + break; + } case EXECUTIONS: { params.processor().beginBlock(h.height(), h.hash); execute(h.block.getExecutions().getExecutionsList()); @@ -666,6 +694,10 @@ private void process() { private void reconfigure(Digest hash, Reconfigure reconfigure) { log.info("Setting next view id: {} on: {}", hash, params.member().getId()); + var j = join.getAndSet(null); + if (j != null) { + j.cancel(true); + } nextViewId.set(hash); var pv = pendingViews.advance(); if (pv != null) { @@ -683,8 +715,9 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) { if (Dag.validate(validators.size())) { current.set(new Associate(h, validators, currentView)); } else { - log.warn("Reconfiguration to associate failed: {} in view: {} on:{}", validators.size(), - new Digest(reconfigure.getId()), params.member().getId()); + log.warn("Reconfiguration to associate failed: {} committee: {} in view: {} on:{}", validators.size(), + new Digest(reconfigure.getId()), current.get().getClass().getSimpleName(), + params.member().getId()); transitions.fail(); } } else { @@ -745,8 +778,8 @@ private void restore() throws IllegalStateException { view.set(lastView); var validators = validatorsOf(reconfigure, params.context(), params.member().getId(), log); current.set(new Synchronizer(validators)); - log.info("Reconfigured to checkpoint view: {} on: {}", new Digest(reconfigure.getId()), - params.member().getId()); + log.info("Reconfigured to checkpoint view: {} committee: {} on: {}", new Digest(reconfigure.getId()), + current.get().getClass().getSimpleName(), params.member().getId()); } log.info("Restored to: {} lastView: {} lastCheckpoint: {} lastBlock: {} on: {}", geni.hash, view.get().hash, @@ -980,6 +1013,8 @@ public interface BlockProducer { void onFailure(); + Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint); + Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint); void publish(Digest hash, CertifiedBlock cb); @@ -1073,7 +1108,7 @@ public record PendingView(Digest diadem, Context context) { * @return the View determined by this Context and the supplied hash value */ public View getView(Digest hash) { - var builder = View.newBuilder().setDiadem(diadem.toDigeste()); + var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority()); Committee.viewMembersOf(hash, context).forEach(d -> builder.addCommittee(d.getId().toDigeste())); return builder.build(); } @@ -1163,9 +1198,9 @@ public void fail() { @Override public void recover(HashedCertifiedBlock anchor) { - log.info("Anchor discovered: {} hash: {} height: {} on: {}", anchor.block.getBodyCase(), anchor.hash, - anchor.height(), params.member().getId()); current.set(new Formation()); + log.info("Anchor discovered: {} hash: {} height: {} committee: {} on: {}", anchor.block.getBodyCase(), + anchor.hash, anchor.height(), current.get().getClass().getSimpleName(), params.member().getId()); CHOAM.this.recover(anchor); } @@ -1220,8 +1255,9 @@ public Blocks fetchViewChain(BlockReplication request, Digest from) { } @Override - public SignedViewMember join(Digest nextView, Digest from) { - return CHOAM.this.join(nextView, from); + public Empty join(SignedViewMember nextView, Digest from) { + CHOAM.this.join(nextView, from); + return Empty.getDefaultInstance(); } @Override @@ -1248,6 +1284,19 @@ public void accept(HashedCertifiedBlock hb) { process(); } + @Override + public void assemble(Assemble assemble) { + var mid = params.member().getId(); + var view = assemble.getView(); + if (view.getCommitteeList().stream().map(Digest::from).noneMatch(mid::equals)) { + log.info("Assemble view: {}; Not associate: {} in diadem: {} on: {}", viewId, + getClass().getSimpleName(), Digest.from(view.getDiadem()), mid); + return; + } + log.info("Assemble view: {}; Associate in diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), mid); + join(view); + } + @Override public void complete() { } @@ -1314,6 +1363,56 @@ public SubmitResult submitTxn(Transaction transaction) { public boolean validate(HashedCertifiedBlock hb) { return validate(hb, validators); } + + private void join(View view) { + var joining = new CompletableFuture(); + if (!join.compareAndSet(null, joining)) { + log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()), + params.member().getId()); + transitions.fail(); + return; + } + var servers = new GroupIterator(validators.keySet()); + var joined = new HashSet(); + Thread.ofVirtual().start(Utils.wrapped(() -> { + while (!joining.isDone() && joined.size() < view.getMajority() && servers.hasNext()) { + Member target = servers.next(); + try (var link = comm.connect(target)) { + if (link == null) { + log.debug("No link for: {} for joining: {} on: {}", target.getId(), + Digest.from(view.getDiadem()), params.member().getId()); + continue; + } + log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), + params.member().getId()); + final var c = next.get(); + var inView = ViewMember.newBuilder(c.member) + .setDiadem(view.getDiadem()) + .setView(nextViewId.get().toDigeste()) + .build(); + var svm = SignedViewMember.newBuilder() + .setVm(inView) + .setSignature(params.member().sign(inView.toByteString()).toSig()) + .build(); + try { + link.join(svm); + joined.add(target); + } catch (Throwable t) { + log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), t); + } + } catch (StatusRuntimeException e) { + log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target, + nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId()); + } catch (Throwable e) { + log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), e); + } + } + joining.complete(null); + log.info("Finishing join of: {} on: {}", Digest.from(view.getDiadem()), params.member().getId()); + }, log)); + } } /** a member of the current committee */ @@ -1335,7 +1434,7 @@ private class Associate extends Administration { var pv = pendingViews(); producer = new Producer(nextViewId.get(), new ViewContext(context, params, pv, signer, validators, constructBlock()), - head.get(), checkpoint.get(), comm, getLabel()); + head.get(), checkpoint.get(), getLabel()); producer.start(); } @@ -1344,6 +1443,17 @@ public void complete() { producer.stop(); } + @Override + public void join(SignedViewMember nextView, Digest from) { + if (!from.equals(Digest.from(nextView.getVm().getId()))) { + log.trace("Join from: {} does not match {} from join: {} diadem: {} on: {}", from, + Digest.from(nextView.getVm().getId()), Digest.from(nextView.getVm().getView()), + Digest.from(nextView.getVm().getDiadem()), params.member().getId()); + throw new StatusRuntimeException(INVALID_ARGUMENT); + } + producer.join(nextView); + } + @Override public SubmitResult submit(Transaction request) { return producer.submit(request); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java index 144883eee..cd204d819 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java @@ -6,11 +6,8 @@ */ package com.salesforce.apollo.choam; -import com.salesforce.apollo.choam.proto.Certification; -import com.salesforce.apollo.choam.proto.Reconfigure; -import com.salesforce.apollo.choam.proto.SubmitResult; +import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.choam.proto.SubmitResult.Result; -import com.salesforce.apollo.choam.proto.Transaction; import com.salesforce.apollo.choam.support.HashedCertifiedBlock; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.StaticContext; @@ -20,14 +17,17 @@ import com.salesforce.apollo.cryptography.Verifier.DefaultVerifier; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.MockMember; +import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey; +import static io.grpc.Status.ABORTED; /** * @author hal.hildebrand @@ -82,10 +82,17 @@ static Set viewMembersOf(Digest hash, Context baseContex void accept(HashedCertifiedBlock next); + default void assemble(Assemble assemble) { + } + void complete(); boolean isMember(); + default void join(SignedViewMember nextView, Digest from) { + throw new StatusRuntimeException(ABORTED); + } + Logger log(); void nextView(Digest diadem, Context pendingView); @@ -153,17 +160,18 @@ default boolean validate(HashedCertifiedBlock hb, Map validato } } final int toleranceLevel = params.context().toleranceLevel(); - log().trace("Validate: {} height: {} count: {} needed: {} on: {}}", hb.hash, hb.height(), valid, toleranceLevel, + log().trace("Validate: {} height: {} count: {} needed: {} on: {}", hb.hash, hb.height(), valid, toleranceLevel, params.member().getId()); return valid > toleranceLevel; } default boolean validateRegeneration(HashedCertifiedBlock hb) { - if (!hb.block.hasGenesis()) { + if (!Objects.requireNonNull(hb.block).hasGenesis()) { return false; } var reconfigure = hb.block.getGenesis().getInitialView(); var validators = validatorsOf(reconfigure, params().context(), params().member().getId(), log()); return !validators.isEmpty() && validate(hb, validators); } + } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index afaf5c97b..90a1da30f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -9,8 +9,6 @@ import com.chiralbehaviors.tron.Fsm; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; -import com.salesforce.apollo.choam.comm.Terminal; import com.salesforce.apollo.choam.fsm.Driven; import com.salesforce.apollo.choam.fsm.Driven.Earner; import com.salesforce.apollo.choam.fsm.Driven.Transitions; @@ -30,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; @@ -43,58 +42,51 @@ */ public class Producer { - private static final Logger log = LoggerFactory.getLogger(Producer.class); - private final AtomicBoolean assembled = new AtomicBoolean(); - private final AtomicReference checkpoint = new AtomicReference<>(); - private final CommonCommunications comms; - private final Ethereal controller; - private final ChRbcGossip coordinator; - private final TxDataSource ds; - private final int lastEpoch; - private final Map nextAssembly = new HashMap<>(); - private final Map pending = new ConcurrentSkipListMap<>(); - private final List pendingAssemblies = new CopyOnWriteArrayList<>(); - private final Map> pendingValidations = new ConcurrentSkipListMap<>(); - private final AtomicReference previousBlock = new AtomicReference<>(); - private final AtomicBoolean reconfigured = new AtomicBoolean(); - private final AtomicBoolean started = new AtomicBoolean(false); - private final Transitions transitions; - private final ViewContext view; - private final Digest nextViewId; - private final Semaphore serialize = new Semaphore(1); - private ViewAssembly assembly; - - public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, - CommonCommunications comms, String label) { + private static final Logger log = LoggerFactory.getLogger(Producer.class); + private final AtomicReference checkpoint = new AtomicReference<>(); + private final Ethereal controller; + private final ChRbcGossip coordinator; + private final TxDataSource ds; + private final Map pending = new ConcurrentSkipListMap<>(); + private final List pendingAssemblies = new CopyOnWriteArrayList<>(); + private final Map> pendingValidations = new ConcurrentSkipListMap<>(); + private final AtomicReference previousBlock = new AtomicReference<>(); + private final AtomicBoolean started = new AtomicBoolean(false); + private final Transitions transitions; + private final ViewContext view; + private final Digest nextViewId; + private final Semaphore serialize = new Semaphore(1); + private final ViewAssembly assembly; + private final int maxEpoch; + private volatile boolean assembled = false; + + public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) { assert view != null; this.view = view; this.previousBlock.set(lastBlock); - this.comms = comms; this.checkpoint.set(checkpoint); this.nextViewId = nextViewId; final Parameters params = view.params(); final var producerParams = params.producer(); - final Builder ep = producerParams.ethereal(); - - lastEpoch = ep.getNumberOfEpochs() - 1; + final Builder ep = producerParams.ethereal().clone(); // Number of rounds we can provide data for final var blocks = ep.getEpochLength() - 2; - final int maxElements = blocks * lastEpoch; + maxEpoch = ep.getEpochLength(); - ds = new TxDataSource(params.member(), maxElements, params.metrics(), producerParams.maxBatchByteSize(), + ds = new TxDataSource(params.member(), blocks, params.metrics(), producerParams.maxBatchByteSize(), producerParams.batchInterval(), producerParams.maxBatchCount(), params().drainPolicy().build()); - log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", maxElements, lastEpoch, + log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch, params.member().getId()); var fsm = Fsm.construct(new DriveIn(), Transitions.class, Earner.INITIAL, true); fsm.setName("Producer%s on: %s".formatted(getViewId(), params.member().getId())); transitions = fsm.getTransitions(); - Config.Builder config = params().producer().ethereal().clone(); + Config.Builder config = ep.setNumberOfEpochs(-1); // Canonical assignment of members -> pid for Ethereal Short pid = view.roster().get(params().member().getId()); @@ -112,13 +104,34 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(), params().communications(), producerMetrics); log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId()); + + var onConsensus = new CompletableFuture(); + onConsensus.whenComplete((v, throwable) -> { + if (throwable == null) { + produceAssemble(v); + } else { + log.warn("Error in view consensus on: {}", params.member().getId(), throwable); + } + }); + assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, onConsensus) { + @Override + public void complete() { + super.complete(); + log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(), + params().member().getId()); + assembled = true; + } + }; + } + + public void join(SignedViewMember viewMember) { + assembly.join(viewMember, true); } public void start() { if (!started.compareAndSet(false, true)) { return; } - reconfigure(); final Block prev = previousBlock.get().block; // genesis block won't ever be 0 if (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0) { @@ -135,10 +148,6 @@ public void stop() { log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId()); controller.stop(); coordinator.stop(); - final var c = assembly; - if (c != null) { - c.stop(); - } ds.close(); } @@ -155,9 +164,11 @@ public SubmitResult submit(Transaction transaction) { private void addAssembly(Assemblies assemblies) { if (ds.offer(assemblies)) { - log.trace("Adding on: {}", params().member().getId()); + log.trace("Adding {} joins, {} views on: {}", assemblies.getJoinsCount(), assemblies.getViewsCount(), + params().member().getId()); } else { - log.trace("Cannot add join on: {}", params().member().getId()); + log.trace("Cannot add {} joins, {} views on: {}", assemblies.getJoinsCount(), assemblies.getViewsCount(), + params().member().getId()); } } @@ -201,7 +212,12 @@ private Digest getViewId() { private void newEpoch(Integer epoch) { log.trace("new epoch: {} on: {}", epoch, params().member().getId()); - transitions.newEpoch(epoch, lastEpoch); + var last = epoch >= maxEpoch && assembled; + if (last) { + controller.completeIt(); + Producer.this.transitions.viewComplete(); + } + transitions.newEpoch(epoch, last); } private Parameters params() { @@ -261,6 +277,28 @@ private void processTransactions(boolean last, List aggregate) { } } + private void produceAssemble(ViewAssembly.View v) { + final var vlb = previousBlock.get(); + var ass = Assemble.newBuilder() + .setView(View.newBuilder() + .setDiadem(v.diadem().toDigeste()) + .setMajority(v.majority()) + .addAllCommittee( + v.assembly().keySet().stream().sorted().map(Digest::toDigeste).toList())) + .build(); + final var assemble = new HashedBlock(params().digestAlgorithm(), + view.produce(vlb.height().add(1), vlb.hash, ass, checkpoint.get())); + previousBlock.set(assemble); + final var validation = view.generateValidation(assemble); + final var p = new PendingBlock(assemble, new HashMap<>(), new AtomicBoolean()); + pending.put(assemble.hash, p); + p.witnesses.put(params().member(), validation); + ds.offer(validation); + log.debug("View assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash, + assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId()); + transitions.assembled(); + } + private void publish(PendingBlock p) { assert p.witnesses.size() >= params().majority() : "Publishing non majority block"; log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(), @@ -275,22 +313,28 @@ private void publish(PendingBlock p) { } private void reconfigure() { - log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId()); - assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, comms) { - @Override - public void complete() { - super.complete(); - log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(), - params().member().getId()); - assembled.set(true); - Producer.this.transitions.viewComplete(); - } - }; - assembly.start(); - assembly.assembled(); - var joins = new ArrayList<>(pendingAssemblies); - pendingAssemblies.clear(); - assembly.inbound().accept(joins); + final var slate = assembly.getSlate(); + assert slate != null && !slate.isEmpty() : "Slate is incorrect: %s".formatted( + slate.keySet().stream().sorted().toList()); + var reconfiguration = new HashedBlock(params().digestAlgorithm(), + view.reconfigure(slate, nextViewId, previousBlock.get(), + checkpoint.get())); + var validation = view.generateValidation(reconfiguration); + final var p = new PendingBlock(reconfiguration, new HashMap<>(), new AtomicBoolean()); + pending.put(reconfiguration.hash, p); + p.witnesses.put(params().member(), validation); + ds.offer(validation); + log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(), + reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(), + params().member().getId()); + processPendingValidations(reconfiguration, p); + + log.trace("Draining on: {}", params().member().getId()); + ds.drain(); + final var dropped = ds.getRemainingTransactions(); + if (dropped != 0) { + log.warn("Dropped txns: {} on: {}", dropped, params().member().getId()); + } } private void serial(List preblock, Boolean last) { @@ -338,46 +382,13 @@ record PendingBlock(HashedBlock block, Map witnesses, AtomicBo private class DriveIn implements Driven { @Override - public void assembled() { - if (!reconfigured.compareAndSet(false, true)) { - log.debug("assembly already complete on: {}", params().member().getId()); - return; - } - final var slate = assembly.getSlate(); - var reconfiguration = new HashedBlock(params().digestAlgorithm(), - view.reconfigure(slate, nextViewId, previousBlock.get(), - checkpoint.get())); - var validation = view.generateValidation(reconfiguration); - final var p = new PendingBlock(reconfiguration, new HashMap<>(), new AtomicBoolean()); - pending.put(reconfiguration.hash, p); - p.witnesses.put(params().member(), validation); - ds.offer(validation); - // controller.completeIt(); - log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(), - reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(), - params().member().getId()); - processPendingValidations(reconfiguration, p); - } - - @Override - public void checkAssembly() { - ds.drain(); - final var dropped = ds.getRemainingTransactions(); - if (dropped != 0) { - log.warn("Dropped txns: {} on: {}", dropped, params().member().getId()); - } - final var viewAssembly = assembly; - if (viewAssembly == null) { - log.error("Assemble block never processed on: {}", params().member().getId()); - transitions.failed(); - return; - } - viewAssembly.finalElection(); - log.debug("Final view assembly election on: {}", params().member().getId()); - if (assembled.get()) { - assembled(); - controller.completeIt(); - } + public void assemble() { + log.debug("Starting view diadem consensus for: {} on: {}", nextViewId, params().member().getId()); + startProduction(); + var joins = new ArrayList<>(pendingAssemblies); + pendingAssemblies.clear(); + assembly.start(); + assembly.inbound().accept(joins); } @Override @@ -419,6 +430,11 @@ public void fail() { view.onFailure(); } + @Override + public void reconfigure() { + Producer.this.reconfigure(); + } + @Override public void startProduction() { log.debug("Starting production for: {} on: {}", getViewId(), params().member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index ec7b83dcc..045e41af2 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -10,8 +10,6 @@ import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; -import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; -import com.salesforce.apollo.choam.comm.Terminal; import com.salesforce.apollo.choam.fsm.Reconfiguration; import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure; import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions; @@ -21,26 +19,24 @@ import com.salesforce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.membership.Member; -import com.salesforce.apollo.ring.SliceIterator; -import com.salesforce.apollo.utils.Utils; +import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.security.PublicKey; -import java.time.Duration; -import java.util.*; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; import static com.salesforce.apollo.choam.ViewContext.print; import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey; import static com.salesforce.apollo.cryptography.QualifiedBase64.signature; +import static io.grpc.Status.ABORTED; /** * View reconfiguration. Attempts to create a new view reconfiguration. The protocol comes to an agreement on the @@ -53,26 +49,23 @@ */ public class ViewAssembly { - private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class); - protected final Transitions transitions; - private final AtomicBoolean cancelSlice = new AtomicBoolean(); - private final Digest nextViewId; - private final Map viewProposals = new ConcurrentHashMap<>(); - private final Map proposals = new ConcurrentHashMap<>(); - private final Consumer publisher; - private final Map slate = new HashMap<>(); - private final ViewContext view; - private final CommonCommunications comms; - private final Set polled = Collections.newSetFromMap( - new ConcurrentSkipListMap<>()); - private volatile View selected; + private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class); + protected final Transitions transitions; + private final Digest nextViewId; + private final Map viewProposals = new ConcurrentHashMap<>(); + private final Map proposals = new ConcurrentHashMap<>(); + private final Consumer publisher; + private final Map slate = new HashMap<>(); + private final ViewContext view; + private final CompletableFuture onConsensus; + private volatile View selected; public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher, - CommonCommunications comms) { + CompletableFuture onConsensus) { view = vc; this.nextViewId = nextViewId; this.publisher = publisher; - this.comms = comms; + this.onConsensus = onConsensus; final Fsm fsm = Fsm.construct(new Recon(), Transitions.class, Reconfigure.AWAIT_ASSEMBLY, true); @@ -88,13 +81,6 @@ public Map getSlate() { public void start() { transitions.fsm().enterStartState(); - } - - public void stop() { - cancelSlice.set(true); - } - - void assembled() { transitions.assembled(); } @@ -105,7 +91,12 @@ void complete() { transitions.failed(); return; } - cancelSlice.set(true); + if (proposals.size() < selected.majority) { + log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId, + proposals.keySet().stream().toList(), selected.majority, params().member().getId()); + transitions.failed(); + return; + } // Fill out the proposals with the unreachable members of the next assembly Sets.difference(selected.assembly.keySet(), proposals.keySet()) .forEach(m -> proposals.put(m, SignedViewMember.newBuilder() @@ -115,9 +106,6 @@ void complete() { .build())); log.debug("View Assembly: {} completed with: {} members on: {}", nextViewId, slate.size(), params().member().getId()); - } - - void finalElection() { transitions.complete(); } @@ -125,113 +113,7 @@ Consumer> inbound() { return lre -> lre.forEach(this::assemble); } - private void assemble(Assemblies ass) { - log.info("Assembling {} joins and {} views on: {}", ass.getJoinsCount(), ass.getViewsCount(), - params().member().getId()); - ass.getJoinsList().stream().filter(view::validate).forEach(sj -> join(sj.getJoin(), false)); - if (selected != null) { - return; - } - for (SignedViews svs : ass.getViewsList()) { - if (view.validate(svs)) { - log.info("Adding views: {} from: {}", - svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), - Digest.from(svs.getViews().getMember())); - viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews()); - } else { - log.info("Invalid views: {} from: {} on: {}", - svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), - Digest.from(svs.getViews().getMember()), params().member().getId()); - } - } - vote(); - } - - private Map assemblyOf(List committee) { - var last = view.pendingViews().last(); - return committee.stream() - .map(d -> last.context().getMember(Digest.from(d))) - .collect(Collectors.toMap(m -> m.getId(), m -> m)); - } - - private void castVote() { - var views = view.pendingViews() - .getViews(nextViewId) - .setMember(params().member().getId().toDigeste()) - .setVid(nextViewId.toDigeste()) - .build(); - log.info("Voting for: {} on: {}", nextViewId, params().member().getId()); - publisher.accept(Assemblies.newBuilder() - .addViews( - SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig())) - .build()); - } - - private void castVote(Views vs, List majorities, - Multiset consensus) { - var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(); - var lastIndex = -1; - com.salesforce.apollo.choam.proto.View last = null; - for (var v : majorities) { - var i = ordered.indexOf(Digest.from(v.getDiadem())); - if (i != -1) { - if (i > lastIndex) { - last = v; - lastIndex = i; - } - } - } - if (last != null) { - consensus.add(last); - } - } - - private void completeSlice(Duration retryDelay, AtomicReference reiterate) { - if (gathered()) { - return; - } - - log.trace("Proposal incomplete of: {} polled: {}, total: {} retrying: {} on: {}", nextViewId, - polled.stream().sorted().toList(), selected.assembly.size(), retryDelay, params().member().getId()); - if (!cancelSlice.get()) { - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) - .schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reiterate.get(), log)), - retryDelay.toNanos(), TimeUnit.NANOSECONDS); - } - } - - private boolean consider(Optional futureSailor, Terminal term, Member m) { - if (futureSailor.isEmpty()) { - return !gathered(); - } - SignedViewMember signedViewMember; - signedViewMember = futureSailor.get(); - if (signedViewMember.equals(SignedViewMember.getDefaultInstance())) { - log.debug("Empty join response from: {} on: {}", term.getMember().getId(), params().member().getId()); - return !gathered(); - } - var vm = new Digest(signedViewMember.getVm().getId()); - if (!m.getId().equals(vm)) { - log.debug("Invalid join response from: {} expected: {} on: {}", term.getMember().getId(), vm, - params().member().getId()); - return !gathered(); - } - log.debug("Join reply: {} on: {}", print(signedViewMember, params().digestAlgorithm()), - params().member().getId()); - join(signedViewMember, true); - return !gathered(); - } - - private boolean gathered() { - if (polled.size() == selected.assembly.size()) { - log.trace("Polled all +"); - cancelSlice.set(true); - return true; - } - return false; - } - - private void join(SignedViewMember svm, boolean direct) { + boolean join(SignedViewMember svm, boolean direct) { final var mid = Digest.from(svm.getVm().getId()); final var m = selected.assembly.get(mid); if (m == null) { @@ -239,7 +121,7 @@ private void join(SignedViewMember svm, boolean direct) { log.trace("Invalid view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return; + throw new StatusRuntimeException(ABORTED); } var viewId = Digest.from(svm.getVm().getView()); if (!nextViewId.equals(viewId)) { @@ -247,7 +129,7 @@ private void join(SignedViewMember svm, boolean direct) { log.trace("Invalid view id for member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return; + return false; } if (log.isDebugEnabled()) { log.debug("Join of: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); @@ -258,7 +140,7 @@ private void join(SignedViewMember svm, boolean direct) { log.trace("Invalid signature for view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return; + return false; } PubKey encoded = svm.getVm().getConsensusKey(); @@ -268,7 +150,7 @@ private void join(SignedViewMember svm, boolean direct) { log.trace("Could not verify consensus key from view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return; + return false; } PublicKey consensusKey = publicKey(encoded); @@ -277,9 +159,8 @@ private void join(SignedViewMember svm, boolean direct) { log.trace("Could not deserialize consensus key from view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } - return; + return false; } - polled.add(mid); if (proposals.putIfAbsent(mid, svm) == null) { if (direct) { var signature = view.sign(svm); @@ -300,6 +181,71 @@ private void join(SignedViewMember svm, boolean direct) { params().member().getId()); } } + if (proposals.size() == selected.assembly.size()) { + Thread.ofVirtual().start(transitions::gathered); + } + return true; + } + + private void assemble(Assemblies ass) { + log.info("Assembling {} joins and {} views on: {}", ass.getJoinsCount(), ass.getViewsCount(), + params().member().getId()); + ass.getJoinsList().stream().filter(view::validate).forEach(sj -> join(sj.getJoin(), false)); + if (selected != null) { + return; + } + for (SignedViews svs : ass.getViewsList()) { + if (view.validate(svs)) { + log.info("Adding views: {} from: {} on: {}", + svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + Digest.from(svs.getViews().getMember()), params().member().getId()); + viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews()); + } else { + log.info("Invalid views: {} from: {} on: {}", + svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + Digest.from(svs.getViews().getMember()), params().member().getId()); + } + } + vote(); + } + + private Map assemblyOf(List committee) { + var last = view.pendingViews().last(); + return committee.stream() + .map(d -> last.context().getMember(Digest.from(d))) + .collect(Collectors.toMap(Member::getId, m -> m)); + } + + private void castVote() { + var views = view.pendingViews() + .getViews(nextViewId) + .setMember(params().member().getId().toDigeste()) + .setVid(nextViewId.toDigeste()) + .build(); + log.info("Voting for: {} on: {}", nextViewId, params().member().getId()); + publisher.accept(Assemblies.newBuilder() + .addViews( + SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig())) + .build()); + } + + private void castVote(Views vs, List majorities, + Multiset consensus) { + var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(); + var lastIndex = -1; + com.salesforce.apollo.choam.proto.View last = null; + for (var v : majorities) { + var i = ordered.indexOf(Digest.from(v.getDiadem())); + if (i != -1) { + if (i > lastIndex) { + last = v; + lastIndex = i; + } + } + } + if (last != null) { + consensus.add(last); + } } private Parameters params() { @@ -313,7 +259,7 @@ private void vote() { var majorities = candidates.entrySet() .stream() .filter(e -> e.getCount() >= majority) - .map(e -> e.getElement()) + .map(Multiset.Entry::getElement) .toList(); if (majorities.isEmpty()) { log.info("No majority views: {} on: {}", candidates.entrySet() @@ -328,15 +274,13 @@ private void vote() { params().member().getId()); } Multiset consensus = HashMultiset.create(); - viewProposals.values().forEach(vs -> { - castVote(vs, majorities, consensus); - }); + viewProposals.values().forEach(vs -> castVote(vs, majorities, consensus)); var ratification = consensus.entrySet() .stream() .filter(e -> e.getCount() >= majority) .map(Multiset.Entry::getElement) .sorted(Comparator.comparing(view -> Digest.from(view.getDiadem()))) - .collect(Collectors.toList()); + .toList(); if (consensus.isEmpty()) { log.debug("No consensus views on: {}", params().member().getId()); return; @@ -345,21 +289,16 @@ private void vote() { params().member().getId()); } var winner = ratification.getFirst(); - selected = new View(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList())); + selected = new View(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList()), + winner.getMajority()); if (log.isDebugEnabled()) { log.debug("Selected: {} on: {}", selected, params().member().getId()); } + onConsensus.complete(selected); transitions.viewDetermined(); } - private record View(Digest diadem, Map assembly) { - public View(CHOAM.PendingView view, Digest viewId) { - this(view.diadem(), Committee.viewMembersOf(viewId, view.context()) - .stream() - .collect(Collectors.toMap(Member::getId, m -> m))); - - } - + record View(Digest diadem, Map assembly, int majority) { @Override public String toString() { return "View{" + "diadem=" + diadem + ", assembly=" + assembly.keySet().stream().sorted().toList() + '}'; @@ -367,18 +306,21 @@ public String toString() { } private class Recon implements Reconfiguration { - @Override public void certify() { - if (proposals.size() == selected.assembly.size()) { - cancelSlice.set(true); - log.debug("Certifying: {} required: {} of: {} slate: {} on: {}", nextViewId, selected.assembly.size(), + if (proposals.size() >= selected.majority) { + log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); + + proposals.forEach((key, value) -> { + slate.put(key, joinOf(value)); + }); + log.debug("Electing view: {} slate: {} on: {}", nextViewId, slate.keySet().stream().sorted().toList(), + params().member().getId()); transitions.certified(); } else { - log.debug("Not certifying: {} required: {} slate: {} of: {} on: {}", nextViewId, - selected.assembly.size(), proposals.entrySet().stream().sorted().toList(), nextViewId, - params().member().getId()); + log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, + proposals.entrySet().stream().sorted().toList(), nextViewId, params().member().getId()); } } @@ -390,51 +332,23 @@ public void complete() { @Override public void elect() { if (selected != null && proposals.size() == selected.assembly().size()) { - proposals.entrySet().forEach(e -> slate.put(e.getKey(), joinOf(e.getValue()))); - cancelSlice.set(true); + proposals.forEach((key, value) -> slate.put(key, joinOf(value))); log.debug("Electing view: {} slate: {} on: {}", nextViewId, slate.keySet().stream().sorted().toList(), params().member().getId()); transitions.complete(); } else { - log.error("Failed election, selected: {} required: {} slate: {} of: {} on: {}", selected != null, - view.context().getRingCount(), proposals.keySet().stream().sorted().toList(), nextViewId, - params().member().getId()); + log.error("Failed election; selected: {} proposed: {} of: {} on: {}", selected != null, + proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); transitions.failed(); } } @Override public void failed() { - stop(); view.onFailure(); log.debug("Failed view assembly for: {} on: {}", nextViewId, params().member().getId()); } - @Override - public void gather() { - assert selected != null : "Have not selected a view"; - log.trace("Gathering assembly for: {} on: {}", nextViewId, params().member().getId()); - AtomicReference reiterate = new AtomicReference<>(); - var retryDelay = Duration.ofMillis(10); - reiterate.set(() -> { - var slice = new ArrayList<>(selected.assembly.values()); - var committee = new SliceIterator<>("Committee for " + nextViewId, params().member(), slice, comms); - committee.iterate((term, m) -> { - if (polled.contains(m.getId())) { - return null; - } - log.trace("Requesting Join from: {} on: {}", term.getMember().getId(), params().member().getId()); - return term.join(nextViewId); - }, ViewAssembly.this::consider, () -> completeSlice(retryDelay, reiterate), params().gossipDuration()); - }); - reiterate.get().run(); - } - - @Override - public void nominate() { - transitions.nominated(); - } - @Override public void viewAgreement() { ViewAssembly.this.castVote(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index 938319992..fc5e3d4d2 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -150,6 +150,10 @@ public Block produce(ULong l, Digest hash, Executions executions, HashedBlock ch return blockProducer.produce(l, hash, executions, checkpoint); } + public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { + return blockProducer.produce(height, prev, assemble, checkpoint); + } + public void publish(HashedCertifiedBlock block) { blockProducer.publish(block.hash, block.certifiedBlock); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java index 9aea366ba..d8baf25c0 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java @@ -6,6 +6,7 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.cryptography.Digest; @@ -20,7 +21,7 @@ public interface Concierge { Blocks fetchViewChain(BlockReplication request, Digest from); - SignedViewMember join(Digest nextView, Digest from); + Empty join(SignedViewMember nextView, Digest from); Initial sync(Synchronize request, Digest from); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java index a54b4c675..394f4664d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java @@ -6,9 +6,9 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.Link; import com.salesforce.apollo.choam.proto.*; -import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; @@ -47,8 +47,8 @@ public Member getMember() { } @Override - public SignedViewMember join(Digest nextView) { - return service.join(nextView, member.getId()); + public Empty join(SignedViewMember join) { + return service.join(join, member.getId()); } @Override @@ -64,7 +64,7 @@ public Initial sync(Synchronize sync) { Blocks fetchViewChain(BlockReplication replication); - SignedViewMember join(Digest nextView); + Empty join(SignedViewMember join); Initial sync(Synchronize sync); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java index 0a3c237ff..26ee81aa0 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java @@ -6,11 +6,11 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.choam.support.ChoamMetrics; -import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.membership.Member; /** @@ -60,8 +60,8 @@ public Member getMember() { } @Override - public SignedViewMember join(Digest nextView) { - return client.join(nextView.toDigeste()); + public Empty join(SignedViewMember vm) { + return client.join(vm); } public void release() { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java index 28b74430b..d31c7371e 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java @@ -6,12 +6,12 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.RoutableService; import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.choam.proto.TerminalGrpc.TerminalImplBase; import com.salesforce.apollo.choam.support.ChoamMetrics; import com.salesforce.apollo.cryptography.Digest; -import com.salesforce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.protocols.ClientIdentity; import io.grpc.stub.StreamObserver; @@ -70,14 +70,14 @@ public void fetchViewChain(BlockReplication request, StreamObserver resp } @Override - public void join(Digeste nextView, StreamObserver responseObserver) { + public void join(SignedViewMember request, StreamObserver responseObserver) { Digest from = identity.getFrom(); if (from == null) { responseObserver.onError(new IllegalStateException("Member has been removed")); return; } router.evaluate(responseObserver, s -> { - responseObserver.onNext(s.join(Digest.from(nextView), from)); + responseObserver.onNext(s.join(request, from)); responseObserver.onCompleted(); }); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java index 88ae2bada..9b290ed26 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java @@ -21,9 +21,7 @@ */ public interface Driven { - void assembled(); - - void checkAssembly(); + void assemble(); void checkpoint(); @@ -33,30 +31,25 @@ public interface Driven { void fail(); + void reconfigure(); + void startProduction(); enum Earner implements Driven.Transitions { - AWAIT_VIEW { + ASSEMBLE { @Entry - public void checkAssembly() { - context().checkAssembly(); + public void assemble() { + context().assemble(); } @Override - public Transitions create(List preblock, boolean last) { - context().checkAssembly(); - return super.create(preblock, last); - } - - @Override - public Transitions lastBlock() { - return COMPLETE; + public Transitions assembled() { + return SPICE; } @Override - public Transitions viewComplete() { - context().assembled(); - return null; + public Transitions newEpoch(int epoch, boolean lastEpoch) { + return lastEpoch ? PROTOCOL_FAILURE : null; } }, CHECKPOINTING { @Entry @@ -66,7 +59,7 @@ public void check() { @Override public Transitions checkpointed() { - return SPICE; + return ASSEMBLE; } }, COMPLETE { @Entry @@ -81,7 +74,20 @@ public Transitions checkpoint() { @Override public Transitions start() { - return SPICE; + return ASSEMBLE; + } + }, END_EPOCHS { + @Override + public Transitions newEpoch(int epoch, boolean lastEpoch) { + if (lastEpoch) { + context().reconfigure(); + } + return null; + } + + @Override + public Transitions lastBlock() { + return COMPLETE; } }, PROTOCOL_FAILURE { @Override @@ -116,18 +122,13 @@ public void terminate() { } }, SPICE { @Override - public Transitions newEpoch(int epoch, int lastEpoch) { - return (lastEpoch == epoch) ? AWAIT_VIEW : null; - } - - @Entry - public void startProduction() { - context().startProduction(); + public Transitions newEpoch(int epoch, boolean lastEpoch) { + return lastEpoch ? PROTOCOL_FAILURE : null; } @Override public Transitions viewComplete() { - return null; + return END_EPOCHS; } } } @@ -136,6 +137,10 @@ public Transitions viewComplete() { interface Transitions extends FsmExecutor { Logger log = LoggerFactory.getLogger(Transitions.class); + default Transitions assembled() { + throw fsm().invalidTransitionOn(); + } + default Transitions checkpoint() { throw fsm().invalidTransitionOn(); } @@ -161,7 +166,7 @@ default Transitions lastBlock() { throw fsm().invalidTransitionOn(); } - default Transitions newEpoch(int epoch, int lastEpoch) { + default Transitions newEpoch(int epoch, boolean lastEpoch) { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index d69d515a3..5ebef559b 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -21,10 +21,6 @@ public interface Reconfiguration { void failed(); - void gather(); - - void nominate(); - void viewAgreement(); enum Reconfigure implements Transitions { @@ -48,36 +44,16 @@ public void certify() { public Transitions gathered() { return CERTIFICATION; } - - @Override - public Transitions validation() { - return CERTIFICATION; - } }, GATHER { - @Entry - public void assembly() { - context().gather(); - } - @Override public Transitions gathered() { - return NOMINATION; + return CERTIFICATION; } @Override public Transitions viewDetermined() { return null; } - }, NOMINATION { - @Entry - public void nominate() { - context().nominate(); - } - - @Override - public Transitions nominated() { - return CERTIFICATION; - } }, PROTOCOL_FAILURE { @Override public Transitions assembled() { @@ -99,25 +75,10 @@ public Transitions failed() { return null; } - @Override - public Transitions gathered() { - return null; - } - - @Override - public Transitions nominated() { - return null; - } - @Entry public void terminate() { context().failed(); } - - @Override - public Transitions validation() { - return null; - } }, RECONFIGURE { @Override public Transitions complete() { @@ -161,7 +122,7 @@ default Transitions certified() { } default Transitions complete() { - return Reconfigure.RECONFIGURE; + throw fsm().invalidTransitionOn(); } default Transitions failed() { @@ -172,14 +133,6 @@ default Transitions gathered() { return null; } - default Transitions nominated() { - throw fsm().invalidTransitionOn(); - } - - default Transitions validation() { - return null; - } - default Transitions viewDetermined() { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 679ab9e5e..8e8f8ac2d 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -6,6 +6,7 @@ */ package com.salesforce.apollo.choam; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; @@ -31,7 +32,6 @@ import com.salesforce.apollo.stereotomy.mem.MemKeyStore; import org.joou.ULong; import org.junit.jupiter.api.Test; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.LoggerFactory; @@ -88,18 +88,10 @@ public void genesis() throws Exception { Map servers = members.stream().collect(Collectors.toMap(m -> m, m -> mock(Concierge.class))); servers.forEach((m, s) -> { - when(s.join(any(Digest.class), any(Digest.class))).then(new Answer() { - @Override - public ViewMember answer(InvocationOnMock invocation) throws Throwable { - KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair(); - final PubKey consensus = bs(keyPair.getPublic()); - return ViewMember.newBuilder() - .setId(m.getId().toDigeste()) - .setConsensusKey(consensus) - .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) - .build(); - - } + when(s.join(any(SignedViewMember.class), any(Digest.class))).then((Answer) invocation -> { + KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair(); + final PubKey consensus = bs(keyPair.getPublic()); + return Empty.getDefaultInstance(); }); }); @@ -146,6 +138,11 @@ public void onFailure() { // do nothing } + @Override + public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { + return null; + } + @Override public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) { return null; diff --git a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java index 07d4ae6c1..bf582d180 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java @@ -16,6 +16,7 @@ import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; @@ -91,7 +92,7 @@ public void genesisBootstrap() throws Exception { .filter( e -> !testSubject.getId().equals(e.getKey())) .map(Map.Entry::getValue) - .allMatch(c -> c.active())); + .allMatch(CHOAM::active)); assertTrue(active, "Group did not become active, test subject: " + testSubject.getId() + " txneer: " + txneer.getId() + " inactive: " + choams.entrySet() @@ -141,6 +142,9 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws .setBatchInterval(Duration.ofMillis(10)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(10_000) + .setEthereal(Config.newBuilder() + .setEpochLength(7) + .setNumberOfEpochs(3)) .build()) .setGenerateGenesis(true) .setCheckpointBlockDelta(checkpointBlockSize); diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index 2e23bba65..70a265e18 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -20,15 +20,15 @@ - + - + - + diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto index 31c5af11d..4ff624da7 100644 --- a/grpc/src/main/proto/choam.proto +++ b/grpc/src/main/proto/choam.proto @@ -4,6 +4,9 @@ option java_multiple_files = true; option java_package = "com.salesforce.apollo.choam.proto"; option java_outer_classname = "ChoamProto"; option objc_class_prefix = "Chp"; + +import "google/protobuf/empty.proto"; + import "crypto.proto"; import "stereotomy.proto"; @@ -15,7 +18,7 @@ service TransactionSubmission { service Terminal { /* reconfiguration */ - rpc join (crypto.Digeste) returns (SignedViewMember) {} + rpc join (SignedViewMember) returns (google.protobuf.Empty) {} /* bootstrapping */ rpc sync(Synchronize) returns (Initial) {} @@ -47,6 +50,7 @@ message Block { Reconfigure reconfigure = 3; Checkpoint checkpoint = 4; Executions executions = 5; + Assemble assemble = 6; } } @@ -83,6 +87,10 @@ message Executions { repeated Transaction executions = 1; } +message Assemble { + View view = 1; +} + message FoundationSeal { stereotomy.KeyEvent_ foundation = 1; crypto.Sig signature = 2; @@ -130,7 +138,8 @@ message Views { message View { crypto.Digeste diadem = 1; - repeated crypto.Digeste committee = 2; + int32 majority = 2; + repeated crypto.Digeste committee = 3; } message SignedView { @@ -142,8 +151,9 @@ message SignedView { message ViewMember { crypto.Digeste id = 1; crypto.Digeste view = 2; - crypto.PubKey consensusKey = 3; - crypto.Sig signature = 4; + crypto.Digeste diadem = 3; + crypto.PubKey consensusKey = 4; + crypto.Sig signature = 5; } message SignedViewMember { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java index 9704c0b8d..375a4a179 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java @@ -57,6 +57,9 @@ public void evaluate(StreamObserver responseObserver, Consumer c) { } } c.accept(binding.service); + } catch (StatusRuntimeException sre) { + log.debug("Uncaught SRE in service evaluation: {} for context: {}", sre.getStatus(), context); + responseObserver.onError(sre); } catch (RejectedExecutionException e) { log.debug("Rejected execution context: {} on: {}", context, c); } catch (Throwable t) {