diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 2f8889e90..877c4a896 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -9,6 +9,7 @@ import com.chiralbehaviors.tron.Fsm; import com.google.common.base.Function; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; @@ -35,6 +36,7 @@ import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter; import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg; import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder; +import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; import io.grpc.StatusRuntimeException; import org.h2.mvstore.MVMap; @@ -46,6 +48,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.security.KeyPair; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -61,6 +64,8 @@ import static com.salesforce.apollo.choam.support.HashedBlock.height; import static com.salesforce.apollo.cryptography.QualifiedBase64.bs; import static com.salesforce.apollo.cryptography.QualifiedBase64.digest; +import static io.grpc.Status.FAILED_PRECONDITION; +import static io.grpc.Status.INVALID_ARGUMENT; /** * Combine Honnete Ober Advancer Mercantiles. @@ -94,6 +99,7 @@ public class CHOAM { private final TransSubmission txnSubmission = new TransSubmission(); private final AtomicReference view = new AtomicReference<>(); private final PendingViews pendingViews = new PendingViews(); + private final AtomicReference> join = new AtomicReference<>(); public CHOAM(Parameters params) { this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); @@ -101,11 +107,11 @@ public CHOAM(Parameters params) { executions = Executors.newVirtualThreadPerTaskExecutor(); pendingViews.add(params.context().getId(), params.context().delegate()); - nextView(); + rotateViewKeys(); var bContext = new DelegatedContext<>(params.context()); - var adapter = new MessageAdapter(any -> true, (Function) this::signatureHash, - (Function>) any -> Collections.emptyList(), - (m, any) -> any, + var adapter = new MessageAdapter(_ -> true, (Function) this::signatureHash, + (Function>) _ -> Collections.emptyList(), + (_, any) -> any, (Function) AgedMessageOrBuilder::getContent); combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(), @@ -141,7 +147,7 @@ public CHOAM(Parameters params) { transitions = fsm.getTransitions(); roundScheduler = new RoundScheduler("CHOAM" + params.member().getId() + params.context().getId(), params.context().timeToLive()); - combine.register(i -> roundScheduler.tick()); + combine.register(_ -> roundScheduler.tick()); session = new Session(params, service()); } @@ -176,11 +182,11 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen } var crown = accumulator.build(); log.info("Checkpoint length: {} segment size: {} count: {} crown: {} initial: {} on: {}", length, segmentSize, - builder.getCount(), crown, initial, id); + builder.getCount(), crown.compactWrapped(), initial, id); var cp = builder.setCrown(crown.toHexBloome()).build(); var deserialized = HexBloom.from(cp.getCrown()); - log.info("Deserialized checkpoint crown: {} initial: {} on: {}", deserialized, initial, id); + log.info("Deserialized checkpoint crown: {} initial: {} on: {}", deserialized.compactWrapped(), initial, id); return cp; } @@ -228,7 +234,7 @@ public static Block reconfigure(Digest nextViewId, Map joins, Hash } public static Map rosterMap(Context baseContext, Collection members) { - return members.stream().collect(Collectors.toMap(m -> m, m -> baseContext.getMember(m))); + return members.stream().collect(Collectors.toMap(m -> m, baseContext::getMember)); } public static List toGenesisData(List initializationData) { @@ -246,6 +252,17 @@ public static List toGenesisData(List initializa .toList(); } + private static Block assembly(AtomicReference nextViewId, View view, HashedBlock head, + HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint) { + var body = Assemble.newBuilder().setView(view).build(); + return Block.newBuilder() + .setHeader( + buildHeader(params.digestAlgorithm(), body, head.hash, ULong.valueOf(0), lastCheckpoint.height(), + lastCheckpoint.hash, lastViewChange.height(), lastViewChange.hash)) + .setAssemble(body) + .build(); + } + public boolean active() { final var c = current.get(); HashedCertifiedBlock h = head.get(); @@ -308,7 +325,7 @@ public String logState() { * @param context - the new membership context * @param diadem - the compact HexBloom of the context view */ - public void nextView(Context context, Digest diadem) { + public void rotateViewKeys(Context context, Digest diadem) { ((DelegatedContext) combine.getContext()).setContext(context); var c = current.get(); if (c != null) { @@ -376,26 +393,6 @@ private void cancelSynchronization() { } } - private boolean checkJoin(Digest nextView, Digest from) { - Member source = params.context().getMember(from); - if (source == null) { - log.debug("Request to join from non member: {} on: {}", from, params.member().getId()); - return false; - } - final var nextId = nextViewId.get(); - if (nextId == null) { - log.debug("Cannot join view: {} from: {}, next view has not been defined on: {}", nextView, source.getId(), - params.member().getId()); - return false; - } - if (!nextId.equals(nextView)) { - log.debug("Request to join incorrect view: {} expected: {} from: {} on: {}", nextView, nextId, - source.getId(), params.member().getId()); - return false; - } - return true; - } - private Block checkpoint() { transitions.beginCheckpoint(); HashedBlock lb = head.get(); @@ -484,7 +481,6 @@ private void combine(Msg m) { private BlockProducer constructBlock() { return new BlockProducer() { - @Override public Block checkpoint() { return CHOAM.this.checkpoint(); @@ -494,6 +490,8 @@ public Block checkpoint() { public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { final HashedCertifiedBlock cp = checkpoint.get(); final HashedCertifiedBlock v = view.get(); + log.trace("Genesis cp: {} view: {} previous: {} on: {}", cp.hash, v.hash, previous.hash, + params.member().getId()); var g = CHOAM.genesis(nextViewId, joining, previous, v, params, cp, params.genesisData() .apply(joining.keySet() .stream() @@ -502,7 +500,7 @@ public Block genesis(Map joining, Digest nextViewId, HashedBlock p .getMember( m)) .filter( - m -> m != null) + Objects::nonNull) .collect( Collectors.toMap( m -> m, @@ -517,6 +515,17 @@ public void onFailure() { transitions.fail(); } + @Override + public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { + final HashedCertifiedBlock v = view.get(); + return Block.newBuilder() + .setHeader( + buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(), + checkpoint.hash, v.height(), v.hash)) + .setAssemble(assemble) + .build(); + } + @Override public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) { final HashedCertifiedBlock v = view.get(); @@ -533,9 +542,9 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo @Override public void publish(Digest hash, CertifiedBlock cb) { - log.info("Publishing: {} hash: {} height: {} certifications: {} on: {}", cb.getBlock().getBodyCase(), - hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()), cb.getCertificationsCount(), - params.member().getId()); + log.trace("Publishing: {} hash: {} height: {} certifications: {} on: {}", cb.getBlock().getBodyCase(), + hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()), cb.getCertificationsCount(), + params.member().getId()); combine.publish(cb, true); } @@ -629,38 +638,20 @@ private boolean isNext(HashedBlock next) { return isNext; } - private SignedViewMember join(Digest nextView, Digest from) { - final var c = current.get(); + private Empty join(SignedViewMember nextView, Digest from) { + var c = current.get(); if (c == null) { - return SignedViewMember.getDefaultInstance(); - } - return c.join(nextView, from); - } - - private void nextView() { - KeyPair keyPair = params.viewSigAlgorithm().generateKeyPair(); - PubKey pubKey = bs(keyPair.getPublic()); - JohnHancock signed = params.member().sign(pubKey.toByteString()); - if (signed == null) { - log.error("Unable to generate and sign consensus key on: {}", params.member().getId()); - return; + log.trace("No committee for: {} to join: {} diadem: {} on: {}", from, + Digest.from(nextView.getVm().getView()), Digest.from(nextView.getVm().getDiadem()), + params.member().getId()); + throw new StatusRuntimeException(FAILED_PRECONDITION); } - var committee = current.get(); - log.trace("Generated next view consensus key: {} sig: {} committee: {} on: {}", - params.digestAlgorithm().digest(pubKey.getEncoded()), - params.digestAlgorithm().digest(signed.toSig().toByteString()), - committee == null ? "" : committee.getClass().getSimpleName(), params.member().getId()); - next.set(new nextView(ViewMember.newBuilder() - .setId(params.member().getId().toDigeste()) - .setConsensusKey(pubKey) - .setSignature(signed.toSig()) - .build(), keyPair)); + c.join(nextView, from); + return Empty.getDefaultInstance(); } private Supplier pendingViews() { - return () -> { - return pendingViews; - }; + return () -> pendingViews; } private void process() { @@ -677,11 +668,15 @@ private void process() { case GENESIS: { cancelSynchronization(); cancelBootstrap(); - transitions.regenerated(); genesisInitialization(h, h.block.getGenesis().getInitializeList()); reconfigure(h.hash, h.block.getGenesis().getInitialView()); break; } + case ASSEMBLE: { + params.processor().beginBlock(h.height(), h.hash); + c.assemble(h.block.getAssemble()); + break; + } case EXECUTIONS: { params.processor().beginBlock(h.height(), h.hash); execute(h.block.getExecutions().getExecutionsList()); @@ -703,17 +698,20 @@ private void process() { private void reconfigure(Digest hash, Reconfigure reconfigure) { log.info("Setting next view id: {} on: {}", hash, params.member().getId()); + var j = join.getAndSet(null); + if (j != null) { + j.cancel(true); + } nextViewId.set(hash); var pv = pendingViews.advance(); if (pv != null) { - // always advance view. params.context().setContext(pv.context); } final Committee c = current.get(); c.complete(); - var validators = validatorsOf(reconfigure, params.context()); + var validators = validatorsOf(reconfigure, params.context(), params.member().getId(), log); final var currentView = next.get(); - nextView(); + transitions.rotateViewKeys(); final HashedCertifiedBlock h = head.get(); view.set(h); session.setView(h); @@ -721,8 +719,9 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) { if (Dag.validate(validators.size())) { current.set(new Associate(h, validators, currentView)); } else { - log.warn("Reconfiguration to associate failed: {} in view: {} on:{}", validators.size(), - new Digest(reconfigure.getId()), params.member().getId()); + log.warn("Reconfiguration to associate failed: {} committee: {} in view: {} on:{}", validators.size(), + new Digest(reconfigure.getId()), current.get().getClass().getSimpleName(), + params.member().getId()); transitions.fail(); } } else { @@ -781,10 +780,10 @@ private void restore() throws IllegalStateException { Reconfigure reconfigure = lastView.block.hasGenesis() ? lastView.block.getGenesis().getInitialView() : lastView.block.getReconfigure(); view.set(lastView); - var validators = validatorsOf(reconfigure, params.context()); + var validators = validatorsOf(reconfigure, params.context(), params.member().getId(), log); current.set(new Synchronizer(validators)); - log.info("Reconfigured to checkpoint view: {} on: {}", new Digest(reconfigure.getId()), - params.member().getId()); + log.info("Reconfigured to checkpoint view: {} committee: {} on: {}", new Digest(reconfigure.getId()), + current.get().getClass().getSimpleName(), params.member().getId()); } log.info("Restored to: {} lastView: {} lastCheckpoint: {} lastBlock: {} on: {}", geni.hash, view.get().hash, @@ -797,6 +796,29 @@ private void restoreFrom(HashedCertifiedBlock block, CheckpointState checkpoint) restore(); } + private void rotateViewKeys() { + // if (current.get() != null && !(current.get() instanceof Associate)) { + // log.info("rotate view calls on: {}", params.member().getId(), new Exception("Rotate view keys")); + // } + KeyPair keyPair = params.viewSigAlgorithm().generateKeyPair(); + PubKey pubKey = bs(keyPair.getPublic()); + JohnHancock signed = params.member().sign(pubKey.toByteString()); + if (signed == null) { + log.error("Unable to generate and sign consensus key on: {}", params.member().getId()); + return; + } + var committee = current.get(); + log.trace("Generated next view consensus key: {} sig: {} committee: {} on: {}", + params.digestAlgorithm().digest(pubKey.getEncoded()), + params.digestAlgorithm().digest(signed.toSig().toByteString()), + committee == null ? "" : committee.getClass().getSimpleName(), params.member().getId()); + next.set(new nextView(ViewMember.newBuilder() + .setId(params.member().getId().toDigeste()) + .setConsensusKey(pubKey) + .setSignature(signed.toSig()) + .build(), keyPair)); + } + private Function service() { return stx -> { // log.trace("Submitting transaction: {} in service() on: {}", stx.hash(), params.member()); @@ -832,7 +854,7 @@ private Digest signatureHash(ByteString any) { /** * Submit a transaction from a client * - * @return + * @return the SubmitResult describing the outcome */ private SubmitResult submit(Transaction request, Digest from) { if (from == null) { @@ -914,7 +936,7 @@ private void synchronize(SynchronizedState state) { params.member().getId()); try { linear.execute(Utils.wrapped(() -> { - transitions.regenerated(); + transitions.synchd(); transitions.combine(); }, log)); } catch (RejectedExecutionException e) { @@ -995,6 +1017,8 @@ public interface BlockProducer { void onFailure(); + Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint); + Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint); void publish(Digest hash, CertifiedBlock cb); @@ -1063,7 +1087,7 @@ public PendingView get(Digest diadem) { public Views.Builder getViews(Digest hash) { var builder = Views.newBuilder(); - views.values().stream().map(pv -> pv.getView(hash)).forEach(v -> builder.addViews(v)); + views.values().stream().map(pv -> pv.getView(hash)).forEach(builder::addViews); return builder; } @@ -1077,17 +1101,6 @@ public PendingView last() { l.unlock(); } } - - public PendingView pop() { - final var l = lock.writeLock(); - try { - l.lock(); - var v = views.pollFirstEntry(); - return v == null ? null : v.getValue(); - } finally { - l.unlock(); - } - } } public record PendingView(Digest diadem, Context context) { @@ -1096,10 +1109,10 @@ public record PendingView(Digest diadem, Context context) { * * @param hash - the "cut" across the rings of the context, determining the successors and thus the committee * members of the view - * @return the View determined by this Context and the supplied hash value + * @return the Vue determined by this Context and the supplied hash value */ public View getView(Digest hash) { - var builder = View.newBuilder().setDiadem(diadem.toDigeste()); + var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority()); Committee.viewMembersOf(hash, context).forEach(d -> builder.addCommittee(d.getId().toDigeste())); return builder.build(); } @@ -1189,9 +1202,9 @@ public void fail() { @Override public void recover(HashedCertifiedBlock anchor) { - log.info("Anchor discovered: {} hash: {} height: {} on: {}", anchor.block.getBodyCase(), anchor.hash, - anchor.height(), params.member().getId()); current.set(new Formation()); + log.info("Anchor discovered: {} hash: {} height: {} committee: {} on: {}", anchor.block.getBodyCase(), + anchor.hash, anchor.height(), current.get().getClass().getSimpleName(), params.member().getId()); CHOAM.this.recover(anchor); } @@ -1200,6 +1213,11 @@ public void regenerate() { current.get().regenerate(); } + @Override + public void rotateViewKeys() { + CHOAM.this.rotateViewKeys(); + } + private void synchronizationFailed() { cancelSynchronization(); var activeCount = context().totalCount(); @@ -1241,8 +1259,9 @@ public Blocks fetchViewChain(BlockReplication request, Digest from) { } @Override - public SignedViewMember join(Digest nextView, Digest from) { - return CHOAM.this.join(nextView, from); + public Empty join(SignedViewMember nextView, Digest from) { + CHOAM.this.join(nextView, from); + return Empty.getDefaultInstance(); } @Override @@ -1270,32 +1289,25 @@ public void accept(HashedCertifiedBlock hb) { } @Override - public void complete() { + public void assemble(Assemble assemble) { + var mid = params.member().getId(); + var view = assemble.getView(); + if (view.getCommitteeList().stream().map(Digest::from).noneMatch(mid::equals)) { + log.info("Assemble view: {}; Not associate: {} in diadem: {} on: {}", viewId, + getClass().getSimpleName(), Digest.from(view.getDiadem()), mid); + return; + } + log.info("Assemble view: {}; Associate in diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), mid); + join(view); } @Override - public boolean isMember() { - return validators.containsKey(params.member()); + public void complete() { } @Override - public SignedViewMember join(Digest nextView, Digest from) { - if (!checkJoin(nextView, from)) { - log.debug("Join requested for invalid view: {} from: {} on: {}", nextView, from, - params.member().getId()); - return SignedViewMember.getDefaultInstance(); - } - final var c = next.get(); - var inView = ViewMember.newBuilder(c.member).setView(nextView.toDigeste()).build(); - - if (log.isDebugEnabled()) { - log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from, - ViewContext.print(inView, params.digestAlgorithm()), params.member().getId()); - } - return SignedViewMember.newBuilder() - .setVm(inView) - .setSignature(params.member().sign(inView.toByteString()).toSig()) - .build(); + public boolean isMember() { + return validators.containsKey(params.member()); } @Override @@ -1318,6 +1330,11 @@ public Parameters params() { @Override public SubmitResult submitTxn(Transaction transaction) { + if (!started.get()) { + log.trace("Failed submitting txn: {} no servers available in: {} on: {}", + hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId()); + return SubmitResult.newBuilder().setResult(Result.ERROR_SUBMITTING).setErrorMsg("Shutdown").build(); + } if (!servers.hasNext()) { log.trace("Failed submitting txn: {} no servers available in: {} on: {}", hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId()); @@ -1355,6 +1372,88 @@ public SubmitResult submitTxn(Transaction transaction) { public boolean validate(HashedCertifiedBlock hb) { return validate(hb, validators); } + + private void join(View view) { + var joining = new CompletableFuture(); + if (!join.compareAndSet(null, joining)) { + log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()), + params.member().getId()); + transitions.fail(); + return; + } + log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); + var servers = new GroupIterator(validators.keySet()); + var joined = new HashSet(); + + var delay = Duration.ofMillis(Entropy.nextSecureInt(5)); + + Thread.ofPlatform().start(() -> { + log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); + while (!joining.isDone() && joined.size() < view.getMajority()) { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + join(view, servers, joined); + } + log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()), + params.member().getId()); + joining.complete(null); + }); + } + + private void join(View view, GroupIterator servers, HashSet joined) { + Member target = servers.next(); + if (joined.contains(target)) { + log.trace("Already joined with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId.get(), + Digest.from(view.getDiadem()), params.member().getId()); + return; + } + try (var link = comm.connect(target)) { + join(view, link, target, joined); + } catch (StatusRuntimeException e) { + log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target.getId(), + nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId()); + } catch (Throwable e) { + log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), e); + } + } + + private void join(View view, Terminal link, Member target, HashSet joined) { + if (link == null) { + log.debug("No link for: {} for joining: {} on: {}", target.getId(), Digest.from(view.getDiadem()), + params.member().getId()); + return; + } + log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), + params.member().getId()); + final var c = next.get(); + var inView = ViewMember.newBuilder(c.member) + .setDiadem(view.getDiadem()) + .setView(nextViewId.get().toDigeste()) + .build(); + var svm = SignedViewMember.newBuilder() + .setVm(inView) + .setSignature(params.member().sign(inView.toByteString()).toSig()) + .build(); + try { + link.join(svm); + joined.add(target); + log.trace("Joined with: {} view: {} diadem: {} on: {}", target.getId(), viewId, + Digest.from(view.getDiadem()), params.member().getId()); + } catch (StatusRuntimeException sre) { + log.trace("Failed join attempt: {} with: {} view: {} diadem: {} on: {}", sre.getStatus(), + target.getId(), nextViewId, Digest.from(view.getDiadem()), params.member().getId(), sre); + } catch (Throwable t) { + log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId, + Digest.from(view.getDiadem()), params.member().getId(), t); + } + } } /** a member of the current committee */ @@ -1376,7 +1475,7 @@ private class Associate extends Administration { var pv = pendingViews(); producer = new Producer(nextViewId.get(), new ViewContext(context, params, pv, signer, validators, constructBlock()), - head.get(), checkpoint.get(), comm, getLabel()); + head.get(), checkpoint.get(), getLabel()); producer.start(); } @@ -1385,6 +1484,17 @@ public void complete() { producer.stop(); } + @Override + public void join(SignedViewMember nextView, Digest from) { + if (!from.equals(Digest.from(nextView.getVm().getId()))) { + log.trace("Join from: {} does not match {} from join: {} diadem: {} on: {}", from, + Digest.from(nextView.getVm().getId()), Digest.from(nextView.getVm().getView()), + Digest.from(nextView.getVm().getDiadem()), params.member().getId()); + throw new StatusRuntimeException(INVALID_ARGUMENT); + } + producer.join(nextView); + } + @Override public SubmitResult submit(Transaction request) { return producer.submit(request); @@ -1451,24 +1561,6 @@ public boolean isMember() { return formation.isMember(params.member()); } - @Override - public SignedViewMember join(Digest nextView, Digest from) { - if (!checkJoin(nextView, from)) { - return SignedViewMember.getDefaultInstance(); - } - final var c = next.get(); - var inView = ViewMember.newBuilder(c.member).setView(nextView.toDigeste()).build(); - - if (log.isDebugEnabled()) { - log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from, - ViewContext.print(inView, params.digestAlgorithm()), params.member().getId()); - } - return SignedViewMember.newBuilder() - .setVm(inView) - .setSignature(params.member().sign(inView.toByteString()).toSig()) - .build(); - } - @Override public Logger log() { return log; @@ -1530,11 +1622,6 @@ public boolean isMember() { return false; } - @Override - public SignedViewMember join(Digest nextView, Digest from) { - return SignedViewMember.getDefaultInstance(); - } - @Override public Logger log() { return log; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java index 21734f6ad..bbb2713ec 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java @@ -16,27 +16,45 @@ import com.salesforce.apollo.cryptography.Verifier; import com.salesforce.apollo.cryptography.Verifier.DefaultVerifier; import com.salesforce.apollo.membership.Member; +import com.salesforce.apollo.membership.MockMember; +import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey; +import static io.grpc.Status.ABORTED; /** * @author hal.hildebrand */ public interface Committee { - static Map validatorsOf(Reconfigure reconfigure, Context context) { - var validators = reconfigure.getJoinsList() - .stream() - .collect( - Collectors.toMap(e -> context.getMember(new Digest(e.getMember().getVm().getId())), - e -> (Verifier) new DefaultVerifier( - publicKey(e.getMember().getVm().getConsensusKey())))); + static Map validatorsOf(Reconfigure reconfigure, Context context, Digest member, + Logger log) { + var validators = reconfigure.getJoinsList().stream().collect(Collectors.toMap(e -> { + var id = new Digest(e.getMember().getVm().getId()); + var m = context.getMember(id); + if (m == null) { + log.info("No member for validator: {}, returning mock on: {}", id, member); + return new MockMember(id); + } else { + return m; + } + }, e -> { + var vm = e.getMember().getVm(); + if (vm.hasConsensusKey()) { + + return new DefaultVerifier(publicKey(vm.getConsensusKey())); + } else { + log.info("No member for validator: {}, returning mock on: {}", Digest.from(vm.getId()), member); + return Verifier.NO_VERIFIER; + } + })); assert !validators.isEmpty() : "No validators in this reconfiguration of: " + context.getId(); return validators; } @@ -64,11 +82,19 @@ static Set viewMembersOf(Digest hash, Context baseContex void accept(HashedCertifiedBlock next); + default void assemble(Assemble assemble) { + } + void complete(); boolean isMember(); - SignedViewMember join(Digest nextView, Digest from); + default void join(SignedViewMember nextView, Digest from) { + log().trace("Error joining by: {} view: {} diadem: {} invalid committee: {} on: {}", from, + Digest.from(nextView.getVm().getView()), Digest.from(nextView.getVm().getView()), + this.getClass().getSimpleName(), params().member().getId()); + throw new StatusRuntimeException(ABORTED); + } Logger log(); @@ -123,8 +149,8 @@ default boolean validate(HashedCertifiedBlock hb, Certification c, Map validators) { Parameters params = params(); - - log().trace("Validating block: {} height: {} certs: {} on: {}", hb.hash, hb.height(), + log().trace("Validating block: {} hash: {} height: {} certs: {} on: {}", hb.block.getBodyCase(), hb.hash, + hb.height(), hb.certifiedBlock.getCertificationsList().stream().map(c -> new Digest(c.getId())).toList(), params.member().getId()); int valid = 0; @@ -137,17 +163,18 @@ default boolean validate(HashedCertifiedBlock hb, Map validato } } final int toleranceLevel = params.context().toleranceLevel(); - log().trace("Validate: {} height: {} count: {} needed: {} on: {}}", hb.hash, hb.height(), valid, toleranceLevel, + log().trace("Validate: {} height: {} count: {} needed: {} on: {}", hb.hash, hb.height(), valid, toleranceLevel, params.member().getId()); return valid > toleranceLevel; } default boolean validateRegeneration(HashedCertifiedBlock hb) { - if (!hb.block.hasGenesis()) { + if (!Objects.requireNonNull(hb.block).hasGenesis()) { return false; } var reconfigure = hb.block.getGenesis().getInitialView(); - var validators = validatorsOf(reconfigure, params().context()); + var validators = validatorsOf(reconfigure, params().context(), params().member().getId(), log()); return !validators.isEmpty() && validate(hb, validators); } + } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index 865a954b5..c8d4e41a4 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -237,6 +237,7 @@ public void stop() { private void certify(Validate v) { if (reconfiguration == null) { pendingValidations.add(v); + return; } log.trace("Validating reconfiguration block: {} height: {} on: {}", reconfiguration.hash, reconfiguration.height(), params().member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java index 983d6414f..2a3449d64 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java @@ -570,7 +570,7 @@ public Builder setMaxGossipDelay(Duration maxGossipDelay) { } } - public static class LimiterBuilder { + public static class LimiterBuilder implements Cloneable { private Duration backlogDuration = Duration.ofSeconds(1); private int backlogSize = 1_000; private double backoffRatio = 0.5; @@ -597,6 +597,14 @@ public Limiter build(String name, MetricRegistry metrics) { .build(); } + public LimiterBuilder clone() { + try { + return (LimiterBuilder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } + public int getBacklogSize() { return backlogSize; } @@ -706,11 +714,20 @@ public Parameters build(RuntimeParameters runtime) { @Override public Builder clone() { + Builder clone; try { - return (Builder) super.clone(); + clone = (Builder) super.clone(); } catch (CloneNotSupportedException e) { throw new IllegalStateException("well, that was unexpected"); } + clone.setMvBuilder(mvBuilder.clone()); + clone.setProducer( + new ProducerParameters(producer.ethereal.clone(), producer.gossipDuration, producer.maxBatchByteSize(), + producer.batchInterval, producer.maxBatchCount(), producer.maxGossipDelay)); + clone.setTxnLimiterBuilder(txnLimiterBuilder.clone()); + clone.setSubmitPolicy(submitPolicy.clone()); + clone.setDrainPolicy(drainPolicy.clone()); + return clone; } public BootstrapParameters getBootstrap() { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 5366d85c3..c29d20d8c 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -9,8 +9,6 @@ import com.chiralbehaviors.tron.Fsm; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; -import com.salesforce.apollo.choam.comm.Terminal; import com.salesforce.apollo.choam.fsm.Driven; import com.salesforce.apollo.choam.fsm.Driven.Earner; import com.salesforce.apollo.choam.fsm.Driven.Transitions; @@ -30,8 +28,10 @@ import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -42,57 +42,51 @@ */ public class Producer { - private static final Logger log = LoggerFactory.getLogger(Producer.class); - private final AtomicBoolean assembled = new AtomicBoolean(); - private final AtomicReference checkpoint = new AtomicReference<>(); - private final CommonCommunications comms; - private final Ethereal controller; - private final ChRbcGossip coordinator; - private final TxDataSource ds; - private final int lastEpoch; - private final Map nextAssembly = new HashMap<>(); - private final Map pending = new ConcurrentSkipListMap<>(); - private final List pendingAssemblies = new CopyOnWriteArrayList<>(); - private final Map> pendingValidations = new ConcurrentSkipListMap<>(); - private final AtomicReference previousBlock = new AtomicReference<>(); - private final AtomicBoolean reconfigured = new AtomicBoolean(); - private final AtomicBoolean started = new AtomicBoolean(false); - private final Transitions transitions; - private final ViewContext view; - private final Digest nextViewId; - private ViewAssembly assembly; - - public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, - CommonCommunications comms, String label) { + private static final Logger log = LoggerFactory.getLogger(Producer.class); + private final AtomicReference checkpoint = new AtomicReference<>(); + private final Ethereal controller; + private final ChRbcGossip coordinator; + private final TxDataSource ds; + private final Map pending = new ConcurrentSkipListMap<>(); + private final Map> pendingValidations = new ConcurrentSkipListMap<>(); + private final AtomicReference previousBlock = new AtomicReference<>(); + private final AtomicBoolean started = new AtomicBoolean(false); + private final Transitions transitions; + private final ViewContext view; + private final Digest nextViewId; + private final Semaphore serialize = new Semaphore(1); + private final ViewAssembly assembly; + private final int maxEpoch; + private volatile int preblocks = 0; + private volatile boolean assembled = false; + + public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) { assert view != null; this.view = view; this.previousBlock.set(lastBlock); - this.comms = comms; this.checkpoint.set(checkpoint); this.nextViewId = nextViewId; final Parameters params = view.params(); final var producerParams = params.producer(); - final Builder ep = producerParams.ethereal(); - - lastEpoch = ep.getNumberOfEpochs() - 1; + final Builder ep = producerParams.ethereal().clone(); // Number of rounds we can provide data for final var blocks = ep.getEpochLength() - 2; - final int maxElements = blocks * lastEpoch; + maxEpoch = ep.getEpochLength(); - ds = new TxDataSource(params.member(), maxElements, params.metrics(), producerParams.maxBatchByteSize(), + ds = new TxDataSource(params.member(), blocks, params.metrics(), producerParams.maxBatchByteSize(), producerParams.batchInterval(), producerParams.maxBatchCount(), params().drainPolicy().build()); - log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", maxElements, lastEpoch, + log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch, params.member().getId()); var fsm = Fsm.construct(new DriveIn(), Transitions.class, Earner.INITIAL, true); fsm.setName("Producer%s on: %s".formatted(getViewId(), params.member().getId())); transitions = fsm.getTransitions(); - Config.Builder config = params().producer().ethereal().clone(); + Config.Builder config = ep.setNumberOfEpochs(-1); // Canonical assignment of members -> pid for Ethereal Short pid = view.roster().get(params().member().getId()); @@ -106,22 +100,48 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash config.setLabel("Producer" + getViewId() + " on: " + params().member().getId()); var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics(); controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, - transitions::create, this::newEpoch, label); + (preblock, last) -> serial(preblock, last), this::newEpoch, label); coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(), params().communications(), producerMetrics); log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId()); + + var onConsensus = new CompletableFuture(); + onConsensus.whenComplete((v, throwable) -> { + if (throwable == null) { + produceAssemble(v); + } else { + log.warn("Error in view consensus on: {}", params.member().getId(), throwable); + } + }); + assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, onConsensus) { + @Override + public boolean complete() { + if (super.complete()) { + log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, + getSlate().keySet().stream().sorted().toList(), params().member().getId()); + assembled = true; + return true; + } + return false; + } + }; + } + + public void join(SignedViewMember viewMember) { + assembly.join(viewMember, true); } public void start() { if (!started.compareAndSet(false, true)) { return; } - reconfigure(); final Block prev = previousBlock.get().block; // genesis block won't ever be 0 - if (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0) { + if (prev.hasGenesis() || (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0)) { transitions.checkpoint(); } else { + log.trace("Checkpoint target: {} for: {} on: {}", prev.getReconfigure().getCheckpointTarget(), + params().context().getId(), params().member().getId()); transitions.start(); } } @@ -133,10 +153,6 @@ public void stop() { log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId()); controller.stop(); coordinator.stop(); - final var c = assembly; - if (c != null) { - c.stop(); - } ds.close(); } @@ -153,9 +169,11 @@ public SubmitResult submit(Transaction transaction) { private void addAssembly(Assemblies assemblies) { if (ds.offer(assemblies)) { - log.trace("Adding on: {}", params().member().getId()); + log.trace("Adding {} joins, {} views on: {}", assemblies.getJoinsCount(), assemblies.getViewsCount(), + params().member().getId()); } else { - log.trace("Cannot add join on: {}", params().member().getId()); + log.trace("Cannot add {} joins, {} views on: {}", assemblies.getJoinsCount(), assemblies.getViewsCount(), + params().member().getId()); } } @@ -199,7 +217,15 @@ private Digest getViewId() { private void newEpoch(Integer epoch) { log.trace("new epoch: {} on: {}", epoch, params().member().getId()); - transitions.newEpoch(epoch, lastEpoch); + assembly.newEpoch(); + var last = epoch >= maxEpoch && assembled; + if (last) { + controller.completeIt(); + Producer.this.transitions.viewComplete(); + } else { + ds.reset(); + } + transitions.newEpoch(epoch, last); } private Parameters params() { @@ -207,17 +233,10 @@ private Parameters params() { } private void processAssemblies(List aggregate) { - var joins = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList(); - final var ass = assembly; - if (ass != null) { - log.trace("Consuming {} units, {} assemblies on: {}", aggregate.size(), joins.size(), - params().member().getId()); - ass.inbound().accept(joins); - } else { - log.trace("Pending {} units, {} assemblies on: {}", aggregate.size(), joins.size(), - params().member().getId()); - pendingAssemblies.addAll(joins); - } + var aggs = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList(); + log.trace("Consuming {} assemblies from {} units on: {}", aggs.size(), aggregate.size(), + params().member().getId()); + assembly.assemble(aggs); } private void processPendingValidations(HashedBlock block, PendingBlock p) { @@ -234,29 +253,56 @@ private void processTransactions(boolean last, List aggregate) { HashedBlock lb = previousBlock.get(); final var txns = aggregate.stream().flatMap(e -> e.getTransactionsList().stream()).toList(); - if (!txns.isEmpty()) { - log.trace("transactions: {} combined hash: {} height: {} on: {}", txns.size(), txns.stream() - .map(t -> CHOAM.hashOf(t, - params().digestAlgorithm())) - .reduce(Digest::xor) - .orElse(null), - lb.height().add(1), params().member().getId()); - var builder = Executions.newBuilder(); - txns.forEach(builder::addExecutions); - - var next = new HashedBlock(params().digestAlgorithm(), - view.produce(lb.height().add(1), lb.hash, builder.build(), checkpoint.get())); - previousBlock.set(next); - - final var validation = view.generateValidation(next); - ds.offer(validation); - final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean()); - pending.put(next.hash, p); - p.witnesses.put(params().member(), validation); - log.debug("Produced block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(), - next.hash, next.height(), lb.hash, last, params().member().getId()); - processPendingValidations(next, p); + if (txns.isEmpty()) { + if (preblocks % 5 == 0) { + pending.values() + .stream() + .filter(pb -> pb.published.get()) + .max(Comparator.comparing(pb -> pb.block.height())) + .ifPresent(this::publish); + } + return; } + log.trace("transactions: {} combined hash: {} height: {} on: {}", txns.size(), + txns.stream().map(t -> CHOAM.hashOf(t, params().digestAlgorithm())).reduce(Digest::xor).orElse(null), + lb.height().add(1), params().member().getId()); + var builder = Executions.newBuilder(); + txns.forEach(builder::addExecutions); + + var next = new HashedBlock(params().digestAlgorithm(), + view.produce(lb.height().add(1), lb.hash, builder.build(), checkpoint.get())); + previousBlock.set(next); + + final var validation = view.generateValidation(next); + ds.offer(validation); + final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean()); + pending.put(next.hash, p); + p.witnesses.put(params().member(), validation); + log.debug("Produced block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(), + next.hash, next.height(), lb.hash, last, params().member().getId()); + processPendingValidations(next, p); + } + + private void produceAssemble(ViewAssembly.Vue v) { + final var vlb = previousBlock.get(); + var ass = Assemble.newBuilder() + .setView(View.newBuilder() + .setDiadem(v.diadem().toDigeste()) + .setMajority(v.majority()) + .addAllCommittee( + v.assembly().keySet().stream().sorted().map(Digest::toDigeste).toList())) + .build(); + final var assemble = new HashedBlock(params().digestAlgorithm(), + view.produce(vlb.height().add(1), vlb.hash, ass, checkpoint.get())); + previousBlock.set(assemble); + final var validation = view.generateValidation(assemble); + final var p = new PendingBlock(assemble, new HashMap<>(), new AtomicBoolean()); + pending.put(assemble.hash, p); + p.witnesses.put(params().member(), validation); + ds.offer(validation); + log.debug("View assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash, + assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId()); + transitions.assembled(); } private void publish(PendingBlock p) { @@ -273,22 +319,47 @@ private void publish(PendingBlock p) { } private void reconfigure() { - log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId()); - assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, comms) { - @Override - public void complete() { - super.complete(); - log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(), - params().member().getId()); - assembled.set(true); - Producer.this.transitions.viewComplete(); + final var slate = assembly.getSlate(); + assert slate != null && !slate.isEmpty() : "Slate is incorrect: %s".formatted( + slate.keySet().stream().sorted().toList()); + var reconfiguration = new HashedBlock(params().digestAlgorithm(), + view.reconfigure(slate, nextViewId, previousBlock.get(), + checkpoint.get())); + var validation = view.generateValidation(reconfiguration); + final var p = new PendingBlock(reconfiguration, new HashMap<>(), new AtomicBoolean()); + pending.put(reconfiguration.hash, p); + p.witnesses.put(params().member(), validation); + ds.offer(validation); + log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(), + reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(), + params().member().getId()); + processPendingValidations(reconfiguration, p); + + log.trace("Draining on: {}", params().member().getId()); + ds.drain(); + final var dropped = ds.getRemainingTransactions(); + if (dropped != 0) { + log.warn("Dropped txns: {} on: {}", dropped, params().member().getId()); + } + } + + private void serial(List preblock, Boolean last) { + Thread.ofVirtual().start(() -> { + try { + serialize.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } - }; - assembly.start(); - assembly.assembled(); - var joins = new ArrayList<>(pendingAssemblies); - pendingAssemblies.clear(); - assembly.inbound().accept(joins); + try { + preblocks++; + transitions.create(preblock, last); + } catch (Throwable t) { + log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t); + } finally { + serialize.release(); + } + }); } private PendingBlock validate(Validate v) { @@ -302,12 +373,13 @@ private PendingBlock validate(Validate v) { } private PendingBlock validate(Validate v, PendingBlock p, Digest hash) { + var from = Digest.from(v.getWitness().getId()); if (!view.validate(p.block, v)) { - log.trace("Invalid validate for: {} hash: {} on: {}", p.block.block.getBodyCase(), hash, + log.trace("Invalid validate from: {} for: {} hash: {} on: {}", from, p.block.block.getBodyCase(), hash, params().member().getId()); return null; } - p.witnesses.put(view.context().getMember(Digest.from(v.getWitness().getId())), v); + p.witnesses.put(view.context().getMember(from), v); return p; } @@ -318,46 +390,10 @@ record PendingBlock(HashedBlock block, Map witnesses, AtomicBo private class DriveIn implements Driven { @Override - public void assembled() { - if (!reconfigured.compareAndSet(false, true)) { - log.debug("assembly already complete on: {}", params().member().getId()); - return; - } - final var slate = assembly.getSlate(); - var reconfiguration = new HashedBlock(params().digestAlgorithm(), - view.reconfigure(slate, nextViewId, previousBlock.get(), - checkpoint.get())); - var validation = view.generateValidation(reconfiguration); - final var p = new PendingBlock(reconfiguration, new HashMap<>(), new AtomicBoolean()); - pending.put(reconfiguration.hash, p); - p.witnesses.put(params().member(), validation); - ds.offer(validation); - // controller.completeIt(); - log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(), - reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(), - params().member().getId()); - processPendingValidations(reconfiguration, p); - } - - @Override - public void checkAssembly() { - ds.drain(); - final var dropped = ds.getRemainingTransactions(); - if (dropped != 0) { - log.warn("Dropped txns: {} on: {}", dropped, params().member().getId()); - } - final var viewAssembly = assembly; - if (viewAssembly == null) { - log.error("Assemble block never processed on: {}", params().member().getId()); - transitions.failed(); - return; - } - viewAssembly.finalElection(); - log.debug("Final view assembly election on: {}", params().member().getId()); - if (assembled.get()) { - assembled(); - controller.completeIt(); - } + public void assemble() { + log.debug("Starting view diadem consensus for: {} on: {}", nextViewId, params().member().getId()); + startProduction(); + assembly.start(); } @Override @@ -399,6 +435,11 @@ public void fail() { view.onFailure(); } + @Override + public void reconfigure() { + Producer.this.reconfigure(); + } + @Override public void startProduction() { log.debug("Starting production for: {} on: {}", getViewId(), params().member().getId()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 351e74ee7..33d536491 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -9,8 +9,7 @@ import com.chiralbehaviors.tron.Fsm; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; -import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; -import com.salesforce.apollo.choam.comm.Terminal; +import com.google.common.collect.Sets; import com.salesforce.apollo.choam.fsm.Reconfiguration; import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure; import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions; @@ -20,23 +19,23 @@ import com.salesforce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.membership.Member; -import com.salesforce.apollo.ring.SliceIterator; -import com.salesforce.apollo.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.security.PublicKey; -import java.time.Duration; -import java.util.*; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; +import static com.salesforce.apollo.choam.ViewContext.print; import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey; import static com.salesforce.apollo.cryptography.QualifiedBase64.signature; @@ -51,26 +50,26 @@ */ public class ViewAssembly { - private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class); - protected final Transitions transitions; - private final AtomicBoolean cancelSlice = new AtomicBoolean(); - private final Digest nextViewId; - private final Map viewProposals = new ConcurrentHashMap<>(); - private final Map proposals = new ConcurrentHashMap<>(); - private final Consumer publisher; - private final Map slate = new HashMap<>(); - private final ViewContext view; - private final CommonCommunications comms; - private final Set polled = Collections.newSetFromMap( - new ConcurrentSkipListMap<>()); - private volatile View selected; + private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class); + protected final Transitions transitions; + private final Digest nextViewId; + private final Map viewProposals = new ConcurrentHashMap<>(); + private final Map proposals = new ConcurrentHashMap<>(); + private final Consumer publisher; + private final Map slate = new HashMap<>(); + private final ViewContext view; + private final CompletableFuture onConsensus; + private final AtomicInteger countdown = new AtomicInteger(); + private final List pendingJoins = new CopyOnWriteArrayList<>(); + private final AtomicBoolean started = new AtomicBoolean(false); + private volatile Vue selected; public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher, - CommonCommunications comms) { + CompletableFuture onConsensus) { view = vc; this.nextViewId = nextViewId; this.publisher = publisher; - this.comms = comms; + this.onConsensus = onConsensus; final Fsm fsm = Fsm.construct(new Recon(), Transitions.class, Reconfigure.AWAIT_ASSEMBLY, true); @@ -85,139 +84,116 @@ public Map getSlate() { } public void start() { + if (!started.compareAndSet(false, true)) { + return; + } transitions.fsm().enterStartState(); } - public void stop() { - cancelSlice.set(true); - } - - void assembled() { - transitions.assembled(); - } + void assemble(List asses) { + if (!started.get()) { + return; + } - void complete() { - cancelSlice.set(true); - proposals.entrySet() - .stream() - .forEach(e -> slate.put(e.getKey(), Join.newBuilder().setMember(e.getValue()).build())); - log.debug("View Assembly: {} completed with: {} members on: {}", nextViewId, slate.size(), - params().member().getId()); - } + if (asses.isEmpty()) { + return; + } - void finalElection() { - transitions.complete(); - } + var joins = asses.stream() + .flatMap(a -> a.getJoinsList().stream()) + .filter(view -> !proposals.containsKey(Digest.from(view.getJoin().getVm().getId()))) + .filter(signedJoin -> !SignedJoin.getDefaultInstance().equals(signedJoin)) + .filter(view::validate) + .toList(); + var views = asses.stream().flatMap(a -> a.getViewsList().stream()).filter(SignedViews::hasViews).toList(); - Consumer> inbound() { - return lre -> lre.forEach(this::assemble); - } + log.info("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId()); - private void assemble(Assemblies ass) { - log.info("Assembling {} joins and {} views on: {}", ass.getJoinsCount(), ass.getViewsCount(), - params().member().getId()); - ass.getJoinsList().stream().filter(view::validate).forEach(sj -> join(sj.getJoin(), false)); + joins.forEach(sj -> join(sj.getJoin(), false)); if (selected != null) { - log.info("View already selected: {} on: {}", selected.diadem, params().member().getId()); + if (!views.isEmpty()) { + log.trace("Already selected: {}, ignoring views: {} on: {}", selected.diadem, views.size(), + params().member().getId()); + } return; } - for (SignedViews svs : ass.getViewsList()) { + views.forEach(svs -> { if (view.validate(svs)) { + log.info("Adding views: {} from: {} on: {}", + svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + Digest.from(svs.getViews().getMember()), params().member().getId()); viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews()); + } else { + log.info("Invalid views: {} from: {} on: {}", + svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + Digest.from(svs.getViews().getMember()), params().member().getId()); } + }); + if (viewProposals.size() >= params().majority()) { + transitions.proposed(); + } else { + log.trace("Incomplete view proposals: {} is less than majority: {} views: {} on: {}", viewProposals.size(), + params().majority(), viewProposals.keySet().stream().sorted().toList(), + params().member().getId()); } - vote(); - } - - private Map assemblyOf(List committee) { - var last = view.pendingViews().last(); - return committee.stream() - .map(d -> last.context().getMember(Digest.from(d))) - .collect(Collectors.toMap(m -> m.getId(), m -> m)); - } - - private void castVote() { - var views = view.pendingViews() - .getViews(nextViewId) - .setMember(params().member().getId().toDigeste()) - .setVid(nextViewId.toDigeste()) - .build(); - log.info("Voting for: {} on: {}", nextViewId, params().member().getId()); - publisher.accept(Assemblies.newBuilder() - .addViews( - SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig())) - .build()); } - private void castVote(Views vs, List majorities, - Multiset consensus) { - var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(); - var lastIndex = -1; - com.salesforce.apollo.choam.proto.View last = null; - for (var v : majorities) { - var i = ordered.indexOf(Digest.from(v.getDiadem())); - if (i != -1) { - if (i > lastIndex) { - last = v; - lastIndex = i; - } - } + boolean complete() { + if (selected == null) { + log.info("Cannot complete view assembly: {} as selected is null on: {}", nextViewId, + params().member().getId()); + transitions.failed(); + return false; } - if (last != null) { - consensus.add(last); + if (proposals.size() < selected.majority) { + log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId, + proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId()); + transitions.failed(); + return false; + } + proposals.forEach((d, svm) -> slate.put(d, Join.newBuilder().setMember(svm).build())); + + // Fill out the proposals with the unreachable members of the next assembly + var missing = Sets.difference(selected.assembly.keySet(), proposals.keySet()); + if (!missing.isEmpty()) { + log.info("Missing proposals: {} on: {}", missing.stream().sorted().toList(), params().member().getId()); + missing.forEach(m -> slate.put(m, Join.newBuilder() + .setMember(SignedViewMember.newBuilder() + .setVm(ViewMember.newBuilder() + .setId(m.toDigeste()) + .setView( + nextViewId.toDigeste()))) + .build())); } + assert slate.size() == selected.assembly.size() : "Invalid slate: " + slate.size() + " expected: " + + selected.assembly.size(); + log.debug("View Assembly: {} completed assembly: {} on: {}", nextViewId, + slate.keySet().stream().sorted().toList(), params().member().getId()); + transitions.complete(); + return true; } - private void completeSlice(Duration retryDelay, AtomicReference reiterate) { - if (gathered()) { + void join(SignedViewMember svm, boolean direct) { + if (!started.get()) { return; } - log.trace("Proposal incomplete of: {} polled: {}, total: {} retrying: {} on: {}", nextViewId, - polled.stream().sorted().toList(), selected.assembly.size(), retryDelay, params().member().getId()); - if (!cancelSlice.get()) { - Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) - .schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reiterate.get(), log)), - retryDelay.toNanos(), TimeUnit.NANOSECONDS); - } - } - - private boolean consider(Optional futureSailor, Terminal term, Member m) { - if (futureSailor.isEmpty()) { - return !gathered(); - } - SignedViewMember signedViewMember; - signedViewMember = futureSailor.get(); - if (signedViewMember.equals(SignedViewMember.getDefaultInstance())) { - log.debug("Empty join response from: {} on: {}", term.getMember().getId(), params().member().getId()); - return !gathered(); - } - var vm = new Digest(signedViewMember.getVm().getId()); - if (!m.getId().equals(vm)) { - log.debug("Invalid join response from: {} expected: {} on: {}", term.getMember().getId(), vm, + final var mid = Digest.from(svm.getVm().getId()); + if (proposals.containsKey(mid)) { + log.trace("Redundant join from: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); - return !gathered(); + return; } - log.debug("Join reply from: {} on: {}", term.getMember().getId(), params().member().getId()); - join(signedViewMember, true); - return !gathered(); - } - - private boolean gathered() { - if (polled.size() == selected.assembly.size()) { - log.trace("Polled all +"); - cancelSlice.set(true); - return true; + if (selected == null) { + pendingJoins.add(svm); + log.trace("Pending join from: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); + return; } - return false; - } - - private void join(SignedViewMember svm, boolean direct) { - final var mid = Digest.from(svm.getVm().getId()); final var m = selected.assembly.get(mid); if (m == null) { if (log.isTraceEnabled()) { - log.trace("Invalid view member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()), + log.trace("Invalid view member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } return; @@ -225,20 +201,19 @@ private void join(SignedViewMember svm, boolean direct) { var viewId = Digest.from(svm.getVm().getView()); if (!nextViewId.equals(viewId)) { if (log.isTraceEnabled()) { - log.trace("Invalid view id for member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()), + log.trace("Invalid view id for member: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } return; } if (log.isDebugEnabled()) { - log.debug("Join request from: {} vm: {} on: {}", m.getId(), - ViewContext.print(svm, params().digestAlgorithm()), params().member().getId()); + log.debug("Join of: {} on: {}", print(svm, params().digestAlgorithm()), params().member().getId()); } if (!m.verify(JohnHancock.from(svm.getSignature()), svm.getVm().toByteString())) { if (log.isTraceEnabled()) { - log.trace("Invalid signature for view member: {} on: {}", - ViewContext.print(svm, params().digestAlgorithm()), params().member().getId()); + log.trace("Invalid signature for view member: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); } return; } @@ -248,7 +223,7 @@ private void join(SignedViewMember svm, boolean direct) { if (!m.verify(signature(svm.getVm().getSignature()), encoded.toByteString())) { if (log.isTraceEnabled()) { log.trace("Could not verify consensus key from view member: {} on: {}", - ViewContext.print(svm, params().digestAlgorithm()), params().member().getId()); + print(svm, params().digestAlgorithm()), params().member().getId()); } return; } @@ -257,31 +232,51 @@ private void join(SignedViewMember svm, boolean direct) { if (consensusKey == null) { if (log.isTraceEnabled()) { log.trace("Could not deserialize consensus key from view member: {} on: {}", - ViewContext.print(svm, params().digestAlgorithm()), params().member().getId()); + print(svm, params().digestAlgorithm()), params().member().getId()); } return; } - polled.add(mid); - if (proposals.putIfAbsent(mid, svm) == null) { - if (direct) { - var signature = view.sign(svm); - publisher.accept(Assemblies.newBuilder() - .addJoins(SignedJoin.newBuilder() - .setJoin(svm) - .setMember(params().member().getId().toDigeste()) - .setSignature(signature.toSig()) - .build()) - .build()); - if (log.isTraceEnabled()) { - log.trace("Publishing view member: {} sig: {} on: {}", - ViewContext.print(svm, params().digestAlgorithm()), - params().digestAlgorithm().digest(signature.toSig().toByteString()), - params().member().getId()); - } - } else if (log.isTraceEnabled()) { - log.trace("Adding discovered view member: {} on: {}", - ViewContext.print(svm, params().digestAlgorithm()), params().member().getId()); + + if (direct) { + var signature = view.sign(svm); + publisher.accept(Assemblies.newBuilder() + .addJoins(SignedJoin.newBuilder() + .setJoin(svm) + .setMember(params().member().getId().toDigeste()) + .setSignature(signature.toSig()) + .build()) + .build()); + if (log.isTraceEnabled()) { + log.trace("Publishing view member: {} sig: {} on: {}", print(svm, params().digestAlgorithm()), + params().digestAlgorithm().digest(signature.toSig().toByteString()), + params().member().getId()); } + } else if (proposals.putIfAbsent(mid, svm) == null) { + log.trace("Adding discovered view member: {} on: {}", print(svm, params().digestAlgorithm()), + params().member().getId()); + } + checkAssembly(); + } + + void newEpoch() { + var current = countdown.decrementAndGet(); + if (current == 0) { + transitions.countdownCompleted(); + } + } + + private Map assemblyOf(List committee) { + var last = view.pendingViews().last(); + return committee.stream() + .map(d -> last.context().getMember(Digest.from(d))) + .collect(Collectors.toMap(Member::getId, m -> m)); + } + + private void checkAssembly() { + if (proposals.size() == selected.majority) { + transitions.certified(); + } else if (proposals.size() >= selected.majority) { + transitions.gathered(); } } @@ -289,130 +284,135 @@ private Parameters params() { return view.params(); } + private void propose() { + var views = view.pendingViews() + .getViews(nextViewId) + .setMember(params().member().getId().toDigeste()) + .setVid(nextViewId.toDigeste()) + .build(); + log.info("Proposing for: {} views: {} on: {}", nextViewId, + views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + params().member().getId()); + publisher.accept(Assemblies.newBuilder() + .addViews( + SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig())) + .build()); + } + + private void propose(Views vs, List majorities, Multiset consensus) { + var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(); + var lastIndex = -1; + View last = null; + for (var v : majorities) { + var i = ordered.indexOf(Digest.from(v.getDiadem())); + if (i != -1) { + if (i > lastIndex) { + last = v; + lastIndex = i; + } + } + } + if (last != null) { + consensus.add(last); + } + } + private void vote() { - Multiset candidates = HashMultiset.create(); + Multiset candidates = HashMultiset.create(); viewProposals.values().forEach(v -> candidates.addAll(v.getViewsList())); var majority = params().majority(); var majorities = candidates.entrySet() .stream() .filter(e -> e.getCount() >= majority) - .map(e -> e.getElement()) + .map(Multiset.Entry::getElement) .toList(); if (majorities.isEmpty()) { - log.info("No majority views on: {}", params().member().getId()); + log.info("No majority views: {} on: {}", candidates.entrySet() + .stream() + .map(e -> "%s:%s".formatted(e.getCount(), Digest.from( + e.getElement().getDiadem()))) + .toList(), params().member().getId()); return; } if (log.isTraceEnabled()) { - log.trace("Majority views: {} on: {}", majorities.stream().map(v -> Digest.from(v.getDiadem())), + log.trace("Majority views: {} on: {}", majorities.stream().map(v -> Digest.from(v.getDiadem())).toList(), params().member().getId()); } - Multiset consensus = HashMultiset.create(); - viewProposals.values().forEach(vs -> { - castVote(vs, majorities, consensus); - }); + Multiset consensus = HashMultiset.create(); + viewProposals.values().forEach(vs -> propose(vs, majorities, consensus)); var ratification = consensus.entrySet() .stream() .filter(e -> e.getCount() >= majority) - .map(e -> e.getElement()) - .collect(Collectors.toList()); + .map(Multiset.Entry::getElement) + .sorted(Comparator.comparing(view -> Digest.from(view.getDiadem()))) + .toList(); if (consensus.isEmpty()) { - log.info("No consensus views on: {}", params().member().getId()); + log.debug("No consensus views on: {}", params().member().getId()); return; } else if (log.isTraceEnabled()) { - log.trace("Consensus views: {} on: {}", ratification.stream().map(v -> Digest.from(v.getDiadem())), + log.debug("Consensus views: {} on: {}", ratification.stream().map(v -> Digest.from(v.getDiadem())).toList(), params().member().getId()); } - ratification.sort(Comparator.comparing(v -> Digest.from(v.getDiadem()))); var winner = ratification.getFirst(); - selected = new View(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList())); + selected = new Vue(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList()), + winner.getMajority()); if (log.isDebugEnabled()) { - log.debug("Selected view: {} on: {}", selected, params().member().getId()); + log.debug("Selected: {} on: {}", selected, params().member().getId()); } - transitions.viewDetermined(); + onConsensus.complete(selected); + transitions.viewAcquired(); + pendingJoins.forEach(svm -> join(svm, false)); + pendingJoins.clear(); } - private record View(Digest diadem, Map assembly) { - public View(CHOAM.PendingView view, Digest viewId) { - this(view.diadem(), Committee.viewMembersOf(viewId, view.context()) - .stream() - .collect(Collectors.toMap(m -> m.getId(), m -> m))); - + public record Vue(Digest diadem, Map assembly, int majority) { + @Override + public String toString() { + return "View{" + "diadem=" + diadem + ", assembly=" + assembly.keySet().stream().sorted().toList() + '}'; } } private class Recon implements Reconfiguration { - @Override public void certify() { - if (proposals.size() == selected.assembly.size()) { - cancelSlice.set(true); - log.debug("Certifying: {} required: {} of: {} slate: {} on: {}", proposals.size(), - selected.assembly.size(), nextViewId, proposals.keySet().stream().sorted().toList(), - params().member().getId()); + if (proposals.size() == selected.majority) { + log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, + nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.certified(); } else { - log.debug("Not certifying: {} required: {} slate: {} of: {} on: {}", proposals.size(), - selected.assembly.size(), proposals.entrySet().stream().sorted().toList(), nextViewId, - params().member().getId()); + countdown.set(3); + log.debug("Not certifying: {} majority: {} slate: {} of: {} on: {}", nextViewId, selected.majority, + proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); } } - @Override - public void complete() { - ViewAssembly.this.complete(); + public void checkAssembly() { + ViewAssembly.this.checkAssembly(); + } + + public void checkViews() { + vote(); } @Override - public void elect() { - proposals.entrySet().forEach(e -> slate.put(e.getKey(), joinOf(e.getValue()))); - if (slate.size() == view.context().getRingCount()) { - cancelSlice.set(true); - log.debug("Electing: {} of: {} slate: {} on: {}", slate.size(), nextViewId, - slate.keySet().stream().sorted().toList(), params().member().getId()); - transitions.complete(); - } else { - log.error("Failed election, required: {} slate: {} of: {} on: {}", view.context().getRingCount(), - proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); - } + public void complete() { + ViewAssembly.this.complete(); } @Override public void failed() { - stop(); view.onFailure(); log.debug("Failed view assembly for: {} on: {}", nextViewId, params().member().getId()); } @Override - public void gather() { - if (selected == null) { - selected = new View(view.pendingViews().last(), nextViewId); - } - log.trace("Gathering assembly for: {} on: {}", nextViewId, params().member().getId()); - AtomicReference reiterate = new AtomicReference<>(); - var retryDelay = Duration.ofMillis(10); - reiterate.set(() -> { - var slice = new ArrayList<>(selected.assembly.values()); - var committee = new SliceIterator<>("Committee for " + nextViewId, params().member(), slice, comms); - committee.iterate((term, m) -> { - if (polled.contains(m.getId())) { - return null; - } - log.trace("Requesting Join from: {} on: {}", term.getMember().getId(), params().member().getId()); - return term.join(nextViewId); - }, ViewAssembly.this::consider, () -> completeSlice(retryDelay, reiterate), params().gossipDuration()); - }); - reiterate.get().run(); - } - - @Override - public void nominate() { - transitions.nominated(); + public void finish() { + started.set(false); } @Override - public void viewAgreement() { - ViewAssembly.this.castVote(); + public void publishViews() { + propose(); } private Join joinOf(SignedViewMember vm) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index b00dd32ec..696933e4e 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -77,6 +77,10 @@ public Block checkpoint() { return blockProducer.checkpoint(); } + public int committeeSize() { + return validators.size(); + } + public Context context() { return context; } @@ -150,6 +154,10 @@ public Block produce(ULong l, Digest hash, Executions executions, HashedBlock ch return blockProducer.produce(l, hash, executions, checkpoint); } + public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { + return blockProducer.produce(height, prev, assemble, checkpoint); + } + public void publish(HashedCertifiedBlock block) { blockProducer.publish(block.hash, block.certifiedBlock); } @@ -172,7 +180,9 @@ public JohnHancock sign(SignedViewMember svm) { public JohnHancock sign(Views views) { if (log.isTraceEnabled()) { - log.trace("Signing views on: {}", params.member().getId()); + log.trace("Signing views: {} on: {}", + views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(), + params.member().getId()); } return signer.sign(views.toByteString()); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java index 9aea366ba..d8baf25c0 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/Concierge.java @@ -6,6 +6,7 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.cryptography.Digest; @@ -20,7 +21,7 @@ public interface Concierge { Blocks fetchViewChain(BlockReplication request, Digest from); - SignedViewMember join(Digest nextView, Digest from); + Empty join(SignedViewMember nextView, Digest from); Initial sync(Synchronize request, Digest from); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java index a54b4c675..394f4664d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/Terminal.java @@ -6,9 +6,9 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.Link; import com.salesforce.apollo.choam.proto.*; -import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; @@ -47,8 +47,8 @@ public Member getMember() { } @Override - public SignedViewMember join(Digest nextView) { - return service.join(nextView, member.getId()); + public Empty join(SignedViewMember join) { + return service.join(join, member.getId()); } @Override @@ -64,7 +64,7 @@ public Initial sync(Synchronize sync) { Blocks fetchViewChain(BlockReplication replication); - SignedViewMember join(Digest nextView); + Empty join(SignedViewMember join); Initial sync(Synchronize sync); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java index 0a3c237ff..26ee81aa0 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalClient.java @@ -6,11 +6,11 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.ManagedServerChannel; import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.choam.support.ChoamMetrics; -import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.membership.Member; /** @@ -60,8 +60,8 @@ public Member getMember() { } @Override - public SignedViewMember join(Digest nextView) { - return client.join(nextView.toDigeste()); + public Empty join(SignedViewMember vm) { + return client.join(vm); } public void release() { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java index 28b74430b..d31c7371e 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/comm/TerminalServer.java @@ -6,12 +6,12 @@ */ package com.salesforce.apollo.choam.comm; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.RoutableService; import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.choam.proto.TerminalGrpc.TerminalImplBase; import com.salesforce.apollo.choam.support.ChoamMetrics; import com.salesforce.apollo.cryptography.Digest; -import com.salesforce.apollo.cryptography.proto.Digeste; import com.salesforce.apollo.protocols.ClientIdentity; import io.grpc.stub.StreamObserver; @@ -70,14 +70,14 @@ public void fetchViewChain(BlockReplication request, StreamObserver resp } @Override - public void join(Digeste nextView, StreamObserver responseObserver) { + public void join(SignedViewMember request, StreamObserver responseObserver) { Digest from = identity.getFrom(); if (from == null) { responseObserver.onError(new IllegalStateException("Member has been removed")); return; } router.evaluate(responseObserver, s -> { - responseObserver.onNext(s.join(Digest.from(nextView), from)); + responseObserver.onNext(s.join(request, from)); responseObserver.onCompleted(); }); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java index 4f4235bac..93dcbe4a1 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java @@ -35,6 +35,8 @@ public interface Combine { void regenerate(); + void rotateViewKeys(); + enum Mercantile implements Transitions { AWAITING_REGENERATION { @Exit @@ -97,6 +99,12 @@ public Transitions combine() { return null; } + @Override + public Transitions rotateViewKeys() { + context().rotateViewKeys(); + return null; + } + }, PROTOCOL_FAILURE { @Entry public void failIt() { @@ -151,6 +159,17 @@ public Transitions nextView() { return RECOVERING; } + @Override + public Transitions regenerated() { + return OPERATIONAL; + } + + @Override + public Transitions rotateViewKeys() { + context().rotateViewKeys(); + return OPERATIONAL; + } + @Entry public void regenerateView() { context().regenerate(); @@ -160,11 +179,11 @@ public void regenerateView() { public Transitions combine() { return null; // Just queue up any blocks } - }; - @Override - public Transitions regenerated() { - return OPERATIONAL; + @Override + public Transitions synchd() { + return OPERATIONAL; + } } } @@ -201,10 +220,18 @@ default Transitions regenerated() { throw fsm().invalidTransitionOn(); } + default Transitions rotateViewKeys() { + return null; + } + default Transitions start() { throw fsm().invalidTransitionOn(); } + default Transitions synchd() { + throw fsm().invalidTransitionOn(); + } + default Transitions synchronizationFailed() { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java index 88ae2bada..9b290ed26 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java @@ -21,9 +21,7 @@ */ public interface Driven { - void assembled(); - - void checkAssembly(); + void assemble(); void checkpoint(); @@ -33,30 +31,25 @@ public interface Driven { void fail(); + void reconfigure(); + void startProduction(); enum Earner implements Driven.Transitions { - AWAIT_VIEW { + ASSEMBLE { @Entry - public void checkAssembly() { - context().checkAssembly(); + public void assemble() { + context().assemble(); } @Override - public Transitions create(List preblock, boolean last) { - context().checkAssembly(); - return super.create(preblock, last); - } - - @Override - public Transitions lastBlock() { - return COMPLETE; + public Transitions assembled() { + return SPICE; } @Override - public Transitions viewComplete() { - context().assembled(); - return null; + public Transitions newEpoch(int epoch, boolean lastEpoch) { + return lastEpoch ? PROTOCOL_FAILURE : null; } }, CHECKPOINTING { @Entry @@ -66,7 +59,7 @@ public void check() { @Override public Transitions checkpointed() { - return SPICE; + return ASSEMBLE; } }, COMPLETE { @Entry @@ -81,7 +74,20 @@ public Transitions checkpoint() { @Override public Transitions start() { - return SPICE; + return ASSEMBLE; + } + }, END_EPOCHS { + @Override + public Transitions newEpoch(int epoch, boolean lastEpoch) { + if (lastEpoch) { + context().reconfigure(); + } + return null; + } + + @Override + public Transitions lastBlock() { + return COMPLETE; } }, PROTOCOL_FAILURE { @Override @@ -116,18 +122,13 @@ public void terminate() { } }, SPICE { @Override - public Transitions newEpoch(int epoch, int lastEpoch) { - return (lastEpoch == epoch) ? AWAIT_VIEW : null; - } - - @Entry - public void startProduction() { - context().startProduction(); + public Transitions newEpoch(int epoch, boolean lastEpoch) { + return lastEpoch ? PROTOCOL_FAILURE : null; } @Override public Transitions viewComplete() { - return null; + return END_EPOCHS; } } } @@ -136,6 +137,10 @@ public Transitions viewComplete() { interface Transitions extends FsmExecutor { Logger log = LoggerFactory.getLogger(Transitions.class); + default Transitions assembled() { + throw fsm().invalidTransitionOn(); + } + default Transitions checkpoint() { throw fsm().invalidTransitionOn(); } @@ -161,7 +166,7 @@ default Transitions lastBlock() { throw fsm().invalidTransitionOn(); } - default Transitions newEpoch(int epoch, int lastEpoch) { + default Transitions newEpoch(int epoch, boolean lastEpoch) { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java index 0cdf3ac47..e917bc716 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java @@ -41,6 +41,11 @@ public Transitions process(List preblock, boolean last) { context().certify(preblock, last); return last ? PUBLISH : null; } + + @Override + public Transitions nextEpoch(Integer epoch) { + return null; + } }, FAIL { }, INITIAL { @Entry diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index d69d515a3..01146befd 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -15,75 +15,69 @@ public interface Reconfiguration { void certify(); - void complete(); + void checkAssembly(); - void elect(); + void checkViews(); - void failed(); + void complete(); - void gather(); + void failed(); - void nominate(); + void finish(); - void viewAgreement(); + void publishViews(); enum Reconfigure implements Transitions { AWAIT_ASSEMBLY { + // Publish the Views of this node + @Entry + public void publish() { + context().publishViews(); + } + + // We have a majority of members submitting view proposals @Override - public Transitions assembled() { + public Transitions proposed() { return VIEW_AGREEMENT; } }, CERTIFICATION { + // We have a full complement of the committee view proposals @Override public Transitions certified() { - return RECONFIGURE; + return RECONFIGURED; } + // We have waited to get a full compliment and have now finished waiting + @Override + public Transitions countdownCompleted() { + return RECONFIGURED; + } + + // See if we already have a full complement of Joins of the next committee + // if not set a deadline @Entry public void certify() { context().certify(); } - + }, GATHER { + // We have a majority of the new committee Joins @Override public Transitions gathered() { return CERTIFICATION; } + // We have a full complement of the new committee Joins @Override - public Transitions validation() { + public Transitions certified() { return CERTIFICATION; } - }, GATHER { - @Entry - public void assembly() { - context().gather(); - } - - @Override - public Transitions gathered() { - return NOMINATION; - } - @Override - public Transitions viewDetermined() { - return null; - } - }, NOMINATION { + // Check to see if we already have a full complement of committee Joins @Entry - public void nominate() { - context().nominate(); - } - - @Override - public Transitions nominated() { - return CERTIFICATION; + public void gather() { + context().checkAssembly(); } }, PROTOCOL_FAILURE { - @Override - public Transitions assembled() { - return null; - } - @Override public Transitions certified() { return null; @@ -99,69 +93,58 @@ public Transitions failed() { return null; } - @Override - public Transitions gathered() { - return null; - } - - @Override - public Transitions nominated() { - return null; - } - @Entry public void terminate() { context().failed(); } - - @Override - public Transitions validation() { - return null; - } - }, RECONFIGURE { - @Override - public Transitions complete() { - return RECONFIGURED; - } - - @Entry - public void elect() { - context().elect(); - } }, RECONFIGURED { + // Finish and close down the assembly @Override public Transitions complete() { + context().finish(); return null; } + // Complete the configuration protocol + // The slate of the ViewAssembly now contains + // the SignedViewMembers of the next committee @Entry public void completion() { context().complete(); } }, VIEW_AGREEMENT { + // Vote on the Views gathered @Entry public void viewConsensus() { - context().viewAgreement(); + context().checkViews(); } + // The View for the assembly has been selected @Override - public Transitions viewDetermined() { + public Transitions viewAcquired() { return GATHER; } + + // no op+ + @Override + public Transitions proposed() { + return null; + } } } interface Transitions extends FsmExecutor { - default Transitions assembled() { - throw fsm().invalidTransitionOn(); - } default Transitions certified() { throw fsm().invalidTransitionOn(); } default Transitions complete() { - return Reconfigure.RECONFIGURE; + throw fsm().invalidTransitionOn(); + } + + default Transitions countdownCompleted() { + throw fsm().invalidTransitionOn(); } default Transitions failed() { @@ -169,18 +152,14 @@ default Transitions failed() { } default Transitions gathered() { - return null; - } - - default Transitions nominated() { throw fsm().invalidTransitionOn(); } - default Transitions validation() { - return null; + default Transitions proposed() { + throw fsm().invalidTransitionOn(); } - default Transitions viewDetermined() { + default Transitions viewAcquired() { throw fsm().invalidTransitionOn(); } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index 1b15d8022..a47e4438d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -120,7 +120,8 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { .equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis"; var diadem = HexBloom.from(checkpoint.block.getCheckpoint().getCrown()); log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash, - diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), params.member().getId()); + diadem.compactWrapped(), Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()), + params.member().getId()); CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(), checkpoint.block.getCheckpoint(), params.member(), @@ -132,7 +133,7 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { if (!cps.validate(diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()))) { throw new IllegalStateException("Cannot validate checkpoint: " + checkpoint.height()); } - log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem.compact(), + log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem.compactWrapped(), params.member().getId()); checkpointState = cps; }); @@ -142,7 +143,13 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { .filter(cb -> cb.getBlock().hasReconfigure()) .map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb)) .forEach(reconfigure -> store.put(reconfigure)); - scheduleViewChainCompletion(new AtomicReference<>(checkpointView.height()), ULong.valueOf(0)); + var lastReconfig = ULong.valueOf(checkpointView.block.getHeader().getLastReconfig()); + var zero = ULong.valueOf(0); + if (lastReconfig.equals(zero)) { + viewChainSynchronized.complete(true); + } else { + scheduleViewChainCompletion(new AtomicReference<>(lastReconfig), zero); + } } private boolean completeAnchor(Optional futureSailor, AtomicReference start, ULong end, @@ -213,7 +220,9 @@ private Blocks completeViewChain(Terminal link, AtomicReference start, UL long seed = Entropy.nextBitsStreamLong(); ULongBloomFilter blocksBff = new BloomFilter.ULongBloomFilter(seed, params.bootstrap().maxViewBlocks() * 2, params.combine().falsePositiveRate()); - start.set(store.lastViewChainFrom(start.get())); + var lastViewChain = store.lastViewChainFrom(start.get()); + assert lastViewChain != null : "last view chain from: " + start.get() + " is null"; + start.set(lastViewChain); store.viewChainFrom(start.get(), end).forEachRemaining(h -> blocksBff.add(h)); BlockReplication replication = BlockReplication.newBuilder() .setBlocksBff(blocksBff.toBff()) @@ -467,7 +476,7 @@ private void validateAnchor() { try { store.validate(anchor.height(), to); anchorSynchronized.complete(true); - log.info("Anchor chain to checkpoint synchronized on: {}", params.member().getId()); + log.info("Anchor chain to checkpoint synchronized to: {} on: {}", to, params.member().getId()); } catch (Throwable e) { log.error("Anchor chain from: {} to: {} does not validate on: {}", anchor.height(), to, params.member().getId(), e); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java index 548179ab3..809d764fc 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java @@ -77,8 +77,8 @@ public CompletableFuture assemble(ScheduledExecutorService sche } private void assembled(CheckpointState cs) { - log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, - member.getId()); + log.info("Assembled checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), + diadem.compactWrapped(), member.getId()); assembled.complete(cs); } @@ -110,8 +110,8 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) { if (assembled.isDone()) { return; } - log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), diadem, - member.getId()); + log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(), + diadem.compactWrapped(), member.getId()); var ringer = new RingIterator<>(frequency, context, member, comms, true, scheduler); ringer.iterate(digestAlgorithm.random(), (link, ring) -> gossip(link), (tally, result, destination) -> gossip(result), t -> scheduler.schedule( diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/ExponentialBackoffPolicy.java b/choam/src/main/java/com/salesforce/apollo/choam/support/ExponentialBackoffPolicy.java index 2f959a38b..f514edff7 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/ExponentialBackoffPolicy.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/ExponentialBackoffPolicy.java @@ -58,7 +58,7 @@ private long uniformRandom(double low, double high) { return (long) (Entropy.nextBitsStreamDouble() * mag + low); } - public static class Builder { + public static class Builder implements Cloneable { private Duration initialBackoff = Duration.ofMillis(10); private double jitter = .2; private Duration maxBackoff = Duration.ofMillis(500); @@ -68,6 +68,14 @@ public ExponentialBackoffPolicy build() { return new ExponentialBackoffPolicy(initialBackoff, jitter, maxBackoff, multiplier); } + public Builder clone() { + try { + return (Builder) super.clone(); + } catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } + public Duration getInitialBackoff() { return initialBackoff; } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java index 11c0a07f5..e15df2bb2 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Store.java @@ -72,7 +72,7 @@ public byte[] block(ULong height) { public Iterator blocksFrom(ULong from, ULong to, int max) { return new Iterator<>() { ULong next; - int remaining = max; + int remaining = max; { next = from == null ? ULong.valueOf(0) : from; @@ -247,7 +247,7 @@ public ULong lastViewChainFrom(ULong height) { } next = viewChain.get(next); } - return last; + return next == null ? last : next; } public void put(HashedCertifiedBlock cb) { @@ -353,8 +353,8 @@ public Iterator viewChainFrom(ULong from, ULong to) { ULong next; { - next = viewChain.get(from); - if (!viewChain.containsKey(next)) { + next = from; + if (!viewChain.containsKey(from)) { next = null; } } @@ -412,10 +412,12 @@ private void put(Digest hash, Block block) { hashes.put(height, hash); hashToHeight.put(hash, height); if (block.hasReconfigure() || block.hasGenesis()) { + log.trace("insert view chain: {}:{}", height, hash); viewChain.put(ULong.valueOf(block.getHeader().getHeight()), ULong.valueOf(block.getHeader().getLastReconfig())); + } else { + log.trace("insert: {}:{}", height, hash); } - log.trace("insert: {}:{}", height, hash); } private T transactionally(Callable action) throws ExecutionException { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java index 350908e90..8053d4878 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java @@ -65,13 +65,13 @@ public void close() { if (metrics != null) { metrics.dropped(processing.size(), validations.size(), assemblies.size()); } - log.trace("Closing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(), + log.debug("Closing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(), processing.added(), processing.taken(), validations.size(), assemblies.size(), member.getId()); } public void drain() { draining.set(true); - log.trace("Draining with remaining txns: {}({}:{}) on: {}", processing.size(), processing.added(), + log.debug("Draining with remaining txns: {}({}:{}) on: {}", processing.size(), processing.added(), processing.taken(), member.getId()); } @@ -171,4 +171,10 @@ public boolean offer(Transaction txn) { public void offer(Validate generateValidation) { validations.offer(generateValidation); } + + public void reset() { + log.debug("Clearing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(), + processing.added(), processing.taken(), validations.size(), assemblies.size(), member.getId()); + processing.clear(); + } } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java new file mode 100644 index 000000000..cfd225050 --- /dev/null +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -0,0 +1,229 @@ +package com.salesforce.apollo.choam; + +import com.salesforce.apollo.archipelago.LocalServer; +import com.salesforce.apollo.archipelago.Router; +import com.salesforce.apollo.archipelago.ServerConnectionCache; +import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; +import com.salesforce.apollo.context.Context; +import com.salesforce.apollo.context.DynamicContext; +import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.ethereal.Config; +import com.salesforce.apollo.membership.Member; +import com.salesforce.apollo.membership.SigningMember; +import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; +import com.salesforce.apollo.stereotomy.StereotomyImpl; +import com.salesforce.apollo.stereotomy.mem.MemKERL; +import com.salesforce.apollo.stereotomy.mem.MemKeyStore; +import com.salesforce.apollo.utils.Utils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.security.SecureRandom; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * @author hal.hildebrand + **/ +public class DynamicTest { + private static final int cardinality = 10; + private static final int checkpointBlockSize = 10; + + private List members; + private Map routers; + private Map choams; + private Map> contexts; + + @BeforeEach + public void setUp() throws Exception { + choams = new HashMap<>(); + contexts = new HashMap<>(); + var contextBuilder = DynamicContext.newBuilder().setBias(3); + SecureRandom entropy = SecureRandom.getInstance("SHA1PRNG"); + entropy.setSeed(new byte[] { 6, 6, 6 }); + var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); + + members = IntStream.range(0, cardinality) + .mapToObj(_ -> stereotomy.newIdentifier()) + .map(ControlledIdentifierMember::new) + .map(e -> (Member) e) + .toList(); + members = IntStream.range(0, cardinality) + .mapToObj(_ -> stereotomy.newIdentifier()) + .map(ControlledIdentifierMember::new) + .map(e -> (Member) e) + .toList(); + + final var prefix = UUID.randomUUID().toString(); + routers = members.stream() + .collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router( + ServerConnectionCache.newBuilder().setTarget(cardinality * 2)))); + + var template = Parameters.newBuilder() + .setGenerateGenesis(true) + .setBootstrap(Parameters.BootstrapParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .build()) + .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) + .setGossipDuration(Duration.ofMillis(50)) + .setProducer(Parameters.ProducerParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .setBatchInterval(Duration.ofMillis(50)) + .setMaxBatchByteSize(1024 * 1024) + .setMaxBatchCount(10_000) + .setEthereal(Config.newBuilder() + .setNumberOfEpochs(3) + .setEpochLength(7)) + .build()) + .setCheckpointBlockDelta(checkpointBlockSize) + .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() + .setInitialBackoff(Duration.ofMillis(1)) + .setMaxBackoff(Duration.ofMillis(1))); + + members.subList(0, 4).forEach(m -> { + var context = (DynamicContext) contextBuilder.build(); + contexts.put(m, context); + + choams.put(m, constructCHOAM((SigningMember) m, template.clone().setGenerateGenesis(true), context)); + }); + members.subList(4, members.size()).forEach(m -> { + var context = (DynamicContext) contextBuilder.build(); + contexts.put(m, context); + choams.put(m, constructCHOAM((SigningMember) m, template.clone().setGenerateGenesis(false), context)); + }); + } + + @Test + public void smokin() throws Exception { + var bootstrap = members.subList(0, 4); + + bootstrap.forEach(member -> bootstrap.forEach(m -> contexts.get(member).activate(m))); + + bootstrap.parallelStream().forEach(member -> { + routers.get(member).start(); + choams.get(member).start(); + }); + boolean active = Utils.waitForCondition(10_000, 1_000, () -> bootstrap.stream() + .map(m -> choams.get(m)) + .allMatch(CHOAM::active)); + assertTrue(active, "Bootstrap did not become active, inactive: " + bootstrap.stream() + .map(m -> choams.get(m)) + .filter(c -> !c.active()) + .map(CHOAM::logState) + .toList()); + System.out.println("**"); + System.out.println("** Bootstrap active: " + bootstrap.stream().map(Member::getId).toList()); + System.out.println("**"); + + var next = members.subList(4, 7); + // Bootstrap group knows about the new members, but not vice versa + bootstrap.forEach(member -> next.forEach(m -> contexts.get(member).activate(m))); + // Next group just knows about itself, not the bootstrap group + next.forEach(member -> next.forEach(m -> contexts.get(member).activate(m))); + + Thread.sleep(2000); + + System.out.println("**"); + System.out.println("** Starting next 3: " + next.stream().map(Member::getId).toList()); + System.out.println("**"); + next.parallelStream().forEach(member -> { + routers.get(member).start(); + choams.get(member).start(); + }); + Thread.sleep(2000); + + System.out.println("**"); + System.out.println("** Next 3 joining: " + next.stream().map(Member::getId).toList()); + System.out.println("**"); + // now let the next members know about the bootstrap group + next.forEach(member -> bootstrap.forEach(m -> contexts.get(member).activate(m))); + + active = Utils.waitForCondition(30_000, 1_000, + () -> next.stream().map(m -> choams.get(m)).allMatch(CHOAM::active)); + assertTrue(active, "Next 3 did not become active, inactive: " + next.stream() + .map(m -> choams.get(m)) + .filter(c -> !c.active()) + .map(CHOAM::logState) + .toList()); + System.out.println("**"); + System.out.println("** Next 3 active: " + next.stream().map(Member::getId).toList()); + System.out.println("**"); + + var remaining = members.subList(7, members.size()); + // Bootstrap group knows about the new members, but not vice versa + bootstrap.forEach(member -> remaining.forEach(m -> contexts.get(member).activate(m))); + // the next group knows about the new members, but not vice versa + next.forEach(member -> remaining.forEach(m -> contexts.get(member).activate(m))); + + // the remaining group just knows about itself, not the bootstrap nor the next group + remaining.forEach(member -> remaining.forEach(m -> contexts.get(member).activate(m))); + + Thread.sleep(2000); + + System.out.println("**"); + System.out.println("** Starting: " + remaining.stream().map(Member::getId).toList()); + System.out.println("**"); + + remaining.parallelStream().forEach(member -> { + routers.get(member).start(); + choams.get(member).start(); + }); + Thread.sleep(2000); + + System.out.println("**"); + System.out.println("** Remaining group joining: " + remaining.stream().map(Member::getId).toList()); + System.out.println("**"); + // now let the remaining members know about the bootstrap group + remaining.forEach(member -> bootstrap.forEach(m -> contexts.get(member).activate(m))); + // and the next group + remaining.forEach(member -> next.forEach(m -> contexts.get(member).activate(m))); + + active = Utils.waitForCondition(30_000, 1_000, + () -> remaining.stream().map(m -> choams.get(m)).allMatch(CHOAM::active)); + assertTrue(active, "Remaining did not become active, inactive: " + remaining.stream() + .map(m -> choams.get(m)) + .filter(c -> !c.active()) + .map(CHOAM::logState) + .toList()); + System.out.println("**"); + System.out.println("** Remaining active: " + remaining.stream().map(Member::getId).toList()); + System.out.println("**"); + } + + @AfterEach + public void tearDown() throws Exception { + if (choams != null) { + choams.values().forEach(CHOAM::stop); + choams = null; + } + if (routers != null) { + routers.values().forEach(e -> e.close(Duration.ofSeconds(1))); + routers = null; + } + members = null; + } + + private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, Context context) { + final CHOAM.TransactionExecutor processor = (index, hash, t, f, executor) -> { + if (f != null) { + f.completeAsync(Object::new, executor); + } + }; + + params.getProducer().ethereal().setSigner(m); + return new CHOAM(params.build(Parameters.RuntimeParameters.newBuilder() + .setMember(m) + .setCommunications(routers.get(m)) + .setProcessor(processor) + .setContext(context) + .build())); + } +} diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 679ab9e5e..8e8f8ac2d 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -6,6 +6,7 @@ */ package com.salesforce.apollo.choam; +import com.google.protobuf.Empty; import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; @@ -31,7 +32,6 @@ import com.salesforce.apollo.stereotomy.mem.MemKeyStore; import org.joou.ULong; import org.junit.jupiter.api.Test; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.LoggerFactory; @@ -88,18 +88,10 @@ public void genesis() throws Exception { Map servers = members.stream().collect(Collectors.toMap(m -> m, m -> mock(Concierge.class))); servers.forEach((m, s) -> { - when(s.join(any(Digest.class), any(Digest.class))).then(new Answer() { - @Override - public ViewMember answer(InvocationOnMock invocation) throws Throwable { - KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair(); - final PubKey consensus = bs(keyPair.getPublic()); - return ViewMember.newBuilder() - .setId(m.getId().toDigeste()) - .setConsensusKey(consensus) - .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) - .build(); - - } + when(s.join(any(SignedViewMember.class), any(Digest.class))).then((Answer) invocation -> { + KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair(); + final PubKey consensus = bs(keyPair.getPublic()); + return Empty.getDefaultInstance(); }); }); @@ -146,6 +138,11 @@ public void onFailure() { // do nothing } + @Override + public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { + return null; + } + @Override public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) { return null; diff --git a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java index 07d4ae6c1..f56f69436 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java @@ -16,6 +16,7 @@ import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; @@ -32,6 +33,8 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -55,16 +58,18 @@ public class MembershipTests { // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Fsm.class)).setLevel(Level.TRACE); } - private Map choams; - private List members; - private Map routers; - private DynamicContext context; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); + private Map choams; + private List members; + private Map routers; + private DynamicContext context; @AfterEach public void after() throws Exception { shutdown(); members = null; context = null; + scheduler.shutdownNow(); } @Test @@ -91,7 +96,7 @@ public void genesisBootstrap() throws Exception { .filter( e -> !testSubject.getId().equals(e.getKey())) .map(Map.Entry::getValue) - .allMatch(c -> c.active())); + .allMatch(CHOAM::active)); assertTrue(active, "Group did not become active, test subject: " + testSubject.getId() + " txneer: " + txneer.getId() + " inactive: " + choams.entrySet() @@ -103,7 +108,7 @@ public void genesisBootstrap() throws Exception { .toList()); final var countdown = new CountDownLatch(1); - var transactioneer = new Transactioneer(txneer.getSession(), timeout, 1, countdown); + var transactioneer = new Transactioneer(scheduler, txneer.getSession(), timeout, 1, countdown); transactioneer.start(); assertTrue(countdown.await(30, TimeUnit.SECONDS), "Could not submit transaction"); @@ -141,6 +146,9 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws .setBatchInterval(Duration.ofMillis(10)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(10_000) + .setEthereal(Config.newBuilder() + .setEpochLength(7) + .setNumberOfEpochs(3)) .build()) .setGenerateGenesis(true) .setCheckpointBlockDelta(checkpointBlockSize); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index d29eea582..549a7087d 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -20,6 +20,7 @@ import com.salesforce.apollo.context.StaticContext; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.stereotomy.StereotomyImpl; @@ -69,6 +70,7 @@ public class TestCHOAM { private List members; private MetricRegistry registry; private Map routers; + private ScheduledExecutorService scheduler; @AfterEach public void after() throws Exception { @@ -80,12 +82,16 @@ public void after() throws Exception { choams.values().forEach(e -> e.stop()); choams = null; } + if (scheduler != null) { + scheduler.shutdown(); + } members = null; registry = null; } @BeforeEach public void before() throws Exception { + scheduler = Executors.newScheduledThreadPool(10); var origin = DigestAlgorithm.DEFAULT.getOrigin(); registry = new MetricRegistry(); var metrics = new ChoamMetricsImpl(origin, registry); @@ -102,11 +108,11 @@ public void before() throws Exception { .setMaxBatchByteSize(200 * 1024 * 1024) .setGossipDuration(Duration.ofMillis(10)) .setBatchInterval(Duration.ofMillis(50)) + .setEthereal(Config.newBuilder() + .setNumberOfEpochs(3) + .setEpochLength(7)) .build()) - .setCheckpointBlockDelta(1); - if (LARGE_TESTS) { - params.getProducer().ethereal().setNumberOfEpochs(5).setEpochLength(60); - } + .setCheckpointBlockDelta(3); checkpointOccurred = new CompletableFuture<>(); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); @@ -169,7 +175,7 @@ public void submitMultiplTxn() throws Exception { final var countdown = new CountDownLatch(clientCount * choams.size()); choams.values().forEach(c -> { for (int i = 0; i < clientCount; i++) { - transactioneers.add(new Transactioneer(c.getSession(), timeout, max, countdown)); + transactioneers.add(new Transactioneer(scheduler, c.getSession(), timeout, max, countdown)); } }); @@ -202,7 +208,7 @@ public void submitMultiplTxn() throws Exception { .build() .report(); } - assertTrue(checkpointOccurred.get()); + assertTrue(checkpointOccurred.get(5, TimeUnit.SECONDS)); } private Function wrap(Function checkpointer) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java index eaa4fdc9c..7466042b8 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java @@ -25,8 +25,7 @@ class Transactioneer { private final static Random entropy = new Random(); private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); - private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual() - .factory()); + private final ScheduledExecutorService scheduler; private final AtomicInteger completed = new AtomicInteger(); private final CountDownLatch countdown; private final List> inFlight = new CopyOnWriteArrayList<>(); @@ -39,7 +38,9 @@ class Transactioneer { .build(); private final AtomicBoolean finished = new AtomicBoolean(); - Transactioneer(Session session, Duration timeout, int max, CountDownLatch countdown) { + Transactioneer(ScheduledExecutorService scheduler, Session session, Duration timeout, int max, + CountDownLatch countdown) { + this.scheduler = scheduler; this.session = session; this.timeout = timeout; this.max = max; diff --git a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java index 0275f948c..e0a2b6233 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java @@ -63,6 +63,7 @@ public void smoke() throws Exception { var context = new StaticContext(DigestAlgorithm.DEFAULT.getOrigin(), 0.2, members, 3); TestChain testChain = new TestChain(bootstrapStore); testChain.genesis() + .checkpoint() .userBlocks(10) .viewChange() .userBlocks(10) diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml index c1cacf519..830a6923d 100644 --- a/choam/src/test/resources/logback-test.xml +++ b/choam/src/test/resources/logback-test.xml @@ -13,14 +13,23 @@ + + + + - + + - + + + + + @@ -28,23 +37,23 @@ - + - + - + - + - + diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java index 3a198af47..48e94f796 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/Verifier.java @@ -22,6 +22,8 @@ * @author hal.hildebrand */ public interface Verifier { + Verifier NO_VERIFIER = new NoVerifier(); + default Filtered filtered(SigningThreshold threshold, JohnHancock signature, byte[]... message) { return filtered(threshold, signature, BbBackedInputStream.aggregate(message)); } @@ -145,6 +147,29 @@ public boolean verify(SigningThreshold threshold, JohnHancock signature, InputSt } } + class NoVerifier implements Verifier { + + @Override + public Filtered filtered(SigningThreshold threshold, JohnHancock signature, InputStream message) { + return new Filtered(false, signature.signatureCount(), signature); + } + + @Override + public String toString() { + return ""; + } + + @Override + public boolean verify(SigningThreshold threshold, JohnHancock signature, InputStream message) { + return false; + } + + @Override + public boolean verify(JohnHancock signature, InputStream message) { + return false; + } + } + class MockVerifier implements Verifier { @Override diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 2f757a215..133bc29da 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -350,7 +350,8 @@ private void validate(BootstrapTrust trust, CompletableFuture gateway, Se if (gateway.complete( new Bound(hexBloom, trust.getSuccessorsList().stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList(), initialSeedSet.stream().map(sn -> new NoteWrapper(sn, digestAlgo)).toList()))) { - log.info("Gateway acquired: {} context: {} on: {}", hexBloom.compact(), this.context.getId(), node.getId()); + log.info("Gateway acquired: {} context: {} on: {}", hexBloom.compactWrapped(), this.context.getId(), + node.getId()); } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 6ce83fd74..9129c1ac8 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -192,8 +192,8 @@ void install(Ballot ballot) { log.info( "Installed view: {} -> {} crown: {} for context: {} cardinality: {} count: {} pending: {} leaving: {} joining: {} on: {}", - previousView, currentView.get(), diadem.get(), context.getId(), cardinality(), context.allMembers().count(), - pending.size(), ballot.leaving.size(), ballot.joining.size(), node.getId()); + previousView, currentView.get(), diadem.get().compactWrapped(), context.getId(), cardinality(), + context.allMembers().count(), pending.size(), ballot.leaving.size(), ballot.joining.size(), node.getId()); view.notifyListeners(joining, ballot.leaving); } @@ -311,7 +311,7 @@ void join(Join join, Digest from, StreamObserver responseObserver, Time final var hex = bound.view(); log.info("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", hex.getCardinality(), - hex.compact(), context.getId(), node.getId()); + hex.compactWrapped(), context.getId(), node.getId()); context.rebalance(hex.getCardinality()); context.activate(node); diadem.set(hex); diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto index 31c5af11d..4ff624da7 100644 --- a/grpc/src/main/proto/choam.proto +++ b/grpc/src/main/proto/choam.proto @@ -4,6 +4,9 @@ option java_multiple_files = true; option java_package = "com.salesforce.apollo.choam.proto"; option java_outer_classname = "ChoamProto"; option objc_class_prefix = "Chp"; + +import "google/protobuf/empty.proto"; + import "crypto.proto"; import "stereotomy.proto"; @@ -15,7 +18,7 @@ service TransactionSubmission { service Terminal { /* reconfiguration */ - rpc join (crypto.Digeste) returns (SignedViewMember) {} + rpc join (SignedViewMember) returns (google.protobuf.Empty) {} /* bootstrapping */ rpc sync(Synchronize) returns (Initial) {} @@ -47,6 +50,7 @@ message Block { Reconfigure reconfigure = 3; Checkpoint checkpoint = 4; Executions executions = 5; + Assemble assemble = 6; } } @@ -83,6 +87,10 @@ message Executions { repeated Transaction executions = 1; } +message Assemble { + View view = 1; +} + message FoundationSeal { stereotomy.KeyEvent_ foundation = 1; crypto.Sig signature = 2; @@ -130,7 +138,8 @@ message Views { message View { crypto.Digeste diadem = 1; - repeated crypto.Digeste committee = 2; + int32 majority = 2; + repeated crypto.Digeste committee = 3; } message SignedView { @@ -142,8 +151,9 @@ message SignedView { message ViewMember { crypto.Digeste id = 1; crypto.Digeste view = 2; - crypto.PubKey consensusKey = 3; - crypto.Sig signature = 4; + crypto.Digeste diadem = 3; + crypto.PubKey consensusKey = 4; + crypto.Sig signature = 5; } message SignedViewMember { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java index 9704c0b8d..375a4a179 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java @@ -57,6 +57,9 @@ public void evaluate(StreamObserver responseObserver, Consumer c) { } } c.accept(binding.service); + } catch (StatusRuntimeException sre) { + log.debug("Uncaught SRE in service evaluation: {} for context: {}", sre.getStatus(), context); + responseObserver.onError(sre); } catch (RejectedExecutionException e) { log.debug("Rejected execution context: {} on: {}", context, c); } catch (Throwable t) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java index e24521423..e7506c4e9 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java @@ -197,7 +197,7 @@ public void start() { } catch (IOException e) { throw new IllegalStateException("Cannot start server", e); } - log.info("Started router: {}", server.getListenSockets()); + log.info("Started router: {} on: {}", server.getListenSockets(), from.getId()); } private RoutableService getRoutableService(Member member, Digest context, Service service, @@ -213,7 +213,7 @@ private RoutableService getRoutableService(Member member, Dig }); routing.bind(context, service, validator); contextRegistration.accept(context); - log.info("Communications created for: " + member.getId()); + log.info("Communications created for: {} on: {}", member.getId(), from.getId()); return routing; } diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 91d3df275..86ef23faa 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -29,6 +29,7 @@ import com.salesforce.apollo.ring.RingCommunications; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; +import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -234,6 +235,10 @@ private Reconcile gossipRound(ReliableBroadcast link, int ring) { try { return link.gossip( MessageBff.newBuilder().setRing(ring).setDigests(buffer.forReconcilliation().toBff()).build()); + } catch (StatusRuntimeException sre) { + log.trace("rbc gossiping[{}] failed: {} with: {} ring: {} on: {}", buffer.round(), sre.getStatus(), + link.getMember().getId(), ring, member.getId()); + return null; } catch (Throwable e) { log.trace("rbc gossiping[{}] failed with: {} ring: {} on: {}", buffer.round(), link.getMember().getId(), ring, member.getId(), e); @@ -276,6 +281,8 @@ private void handle(Optional result, roundListeners.values().forEach(l -> { try { l.accept(gossipRound); + } catch (StatusRuntimeException e) { + log.error("error: {} sending round() to listener on: {}", e.getStatus(), member.getId(), e); } catch (Throwable e) { log.error("error sending round() to listener on: {}", member.getId(), e); } @@ -318,13 +325,12 @@ public static Parameters.Builder newBuilder() { } public static class Builder implements Cloneable { - private int bufferSize = 1500; - + private int bufferSize = 1500; private int dedupBufferSize = 100; private double dedupFpr = Math.pow(10, -9); private int deliveredCacheSize = 100; private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT; - private double falsePositiveRate = 0.00125; + private double falsePositiveRate = 0.000125; private int maxMessages = 500; public Parameters build() { diff --git a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java index 1623aa18c..c7cf320ce 100644 --- a/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/messaging/rbc/RbcTest.java @@ -55,7 +55,7 @@ public class RbcTest { private static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests"); private static final Parameters.Builder parameters = Parameters.newBuilder() .setMaxMessages(100) - .setFalsePositiveRate(0.0125) + .setFalsePositiveRate(0.00125) .setBufferSize(500) .setDedupBufferSize( LARGE_TESTS ? 100 * 100 : 50 * 50) diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java index 288d41173..2052db4f5 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java @@ -177,9 +177,9 @@ protected void stopServices() { Thread.currentThread().interrupt(); } hostedDomains.values().forEach(d -> d.stop()); - var portalELG = portalEventLoopGroup.shutdownGracefully(100, 1_000, TimeUnit.MILLISECONDS); - var serverELG = contextEventLoopGroup.shutdownGracefully(100, 1_000, TimeUnit.MILLISECONDS); - var clientELG = clientEventLoopGroup.shutdownGracefully(100, 1_000, TimeUnit.MILLISECONDS); + var portalELG = portalEventLoopGroup.shutdownGracefully(); + var serverELG = contextEventLoopGroup.shutdownGracefully(); + var clientELG = clientEventLoopGroup.shutdownGracefully(); try { if (clientELG.await(30, TimeUnit.SECONDS)) { log.info("Did not completely shutdown client event loop group for process: {}", member.getId()); diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index 26f8ce4f8..b3f1cd90b 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -121,7 +121,7 @@ public void stop() { protected BiConsumer listener() { return (context, diadem) -> { - choam.nextView(context, diadem); + choam.rotateViewKeys(context, diadem); log.info("View change: {} for: {} cardinality: {} on: {}", diadem, params.context().getId(), context.totalCount(), params.member().getId()); diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index 5dd3cbf23..b4e85a6a8 100644 --- a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -11,13 +11,14 @@ import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; -import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.FoundationSeal; +import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.DynamicContextImpl; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.delphinius.Oracle; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.stereotomy.StereotomyImpl; import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification; @@ -54,7 +55,7 @@ public class ContainmentDomainTest { public void after() { domains.forEach(Domain::stop); domains.clear(); - routers.forEach(r -> r.close(Duration.ofSeconds(1))); + routers.forEach(r -> r.close(Duration.ofSeconds(100))); routers.clear(); } @@ -90,13 +91,11 @@ public void before() throws Exception { "jdbc:h2:mem:%s-state".formatted(d), checkpointDirBase, Duration.ofMillis(10), 0.00125, Duration.ofMinutes(1), 3, 10, 0.1); - var domain = new ProcessContainerDomain(group, member, pdParams, params, RuntimeParameters.newBuilder() - .setFoundation( - sealed) - .setContext( - context) - .setCommunications( - localRouter), + var domain = new ProcessContainerDomain(group, member, pdParams, params.clone(), + RuntimeParameters.newBuilder() + .setFoundation(sealed) + .setContext(context) + .setCommunications(localRouter), new InetSocketAddress(0), commsDirectory, ffParams, IdentifierSpecification.newBuilder(), null); domains.add(domain); @@ -120,18 +119,27 @@ public void smoke() throws Exception { } private Builder params() { - var params = Parameters.newBuilder() - .setGenerateGenesis(true) - .setGenesisViewId(GENESIS_VIEW_ID) - .setGossipDuration(Duration.ofMillis(10)) - .setProducer(ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(20)) - .setBatchInterval(Duration.ofMillis(100)) - .setMaxBatchByteSize(1024 * 1024) - .setMaxBatchCount(3000) - .build()) - .setCheckpointBlockDelta(200); - params.getProducer().ethereal().setEpochLength(4).setNumberOfEpochs(4); - return params; + var template = Parameters.newBuilder() + .setGenerateGenesis(true) + .setGenesisViewId(GENESIS_VIEW_ID) + .setBootstrap(Parameters.BootstrapParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .build()) + .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) + .setGossipDuration(Duration.ofMillis(50)) + .setProducer(Parameters.ProducerParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .setBatchInterval(Duration.ofMillis(50)) + .setMaxBatchByteSize(1024 * 1024) + .setMaxBatchCount(10_000) + .setEthereal(Config.newBuilder() + .setNumberOfEpochs(3) + .setEpochLength(7)) + .build()) + .setCheckpointBlockDelta(200) + .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() + .setInitialBackoff(Duration.ofMillis(1)) + .setMaxBackoff(Duration.ofMillis(1))); + return template; } } diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 45a6e9e47..3450e04c7 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -11,14 +11,15 @@ import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; -import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.FoundationSeal; +import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.DynamicContextImpl; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.delphinius.Oracle; import com.salesforce.apollo.delphinius.Oracle.Assertion; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.stereotomy.StereotomyImpl; import com.salesforce.apollo.stereotomy.mem.MemKERL; @@ -220,11 +221,13 @@ public void before() throws Exception { "jdbc:h2:mem:%s-state;DB_CLOSE_DELAY=-1".formatted( d), checkpointDirBase, Duration.ofMillis(10), 0.00125, Duration.ofMinutes(1), 3, 10, 0.1); - var domain = new ProcessDomain(group, member, pdParams, params, RuntimeParameters.newBuilder() - .setFoundation(sealed) - .setContext(context) - .setCommunications( - localRouter), + var domain = new ProcessDomain(group, member, pdParams, params.clone(), RuntimeParameters.newBuilder() + .setFoundation( + sealed) + .setContext( + context) + .setCommunications( + localRouter), new InetSocketAddress(0), ffParams, null); domains.add(domain); localRouter.start(); @@ -247,18 +250,27 @@ public void smoke() throws Exception { } private Builder params() { - var params = Parameters.newBuilder() - .setGenerateGenesis(true) - .setGenesisViewId(GENESIS_VIEW_ID) - .setGossipDuration(Duration.ofMillis(10)) - .setProducer(ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(20)) - .setBatchInterval(Duration.ofMillis(100)) - .setMaxBatchByteSize(1024 * 1024) - .setMaxBatchCount(3000) - .build()) - .setCheckpointBlockDelta(200); - params.getProducer().ethereal().setEpochLength(7).setNumberOfEpochs(3); - return params; + var template = Parameters.newBuilder() + .setGenerateGenesis(true) + .setGenesisViewId(GENESIS_VIEW_ID) + .setBootstrap(Parameters.BootstrapParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .build()) + .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) + .setGossipDuration(Duration.ofMillis(50)) + .setProducer(Parameters.ProducerParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .setBatchInterval(Duration.ofMillis(50)) + .setMaxBatchByteSize(1024 * 1024) + .setMaxBatchCount(10_000) + .setEthereal(Config.newBuilder() + .setNumberOfEpochs(3) + .setEpochLength(7)) + .build()) + .setCheckpointBlockDelta(200) + .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() + .setInitialBackoff(Duration.ofMillis(1)) + .setMaxBackoff(Duration.ofMillis(1))); + return template; } } diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 509577801..f492fa0e0 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -11,14 +11,15 @@ import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.Builder; -import com.salesforce.apollo.choam.Parameters.ProducerParameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.FoundationSeal; +import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContextImpl; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.delphinius.Oracle; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.fireflies.View.Seed; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.stereotomy.StereotomyImpl; @@ -89,11 +90,12 @@ public void before() throws Exception { "jdbc:h2:mem:%s-state".formatted(digest), checkpointDirBase, Duration.ofMillis(10), 0.00125, Duration.ofSeconds(5), 3, 10, 0.1); - var node = new ProcessDomain(group, member, pdParams, params, RuntimeParameters.newBuilder() - .setFoundation(sealed) - .setContext(context) - .setCommunications( - localRouter), + var node = new ProcessDomain(group, member, pdParams, params.clone(), RuntimeParameters.newBuilder() + .setFoundation( + sealed) + .setContext(context) + .setCommunications( + localRouter), new InetSocketAddress(0), ffParams, null); domains.add(node); routers.put(node, localRouter); @@ -169,19 +171,27 @@ public void smokin() throws Exception { } private Builder params() { - var params = Parameters.newBuilder() - .setGenerateGenesis(true) - .setGenesisViewId(GENESIS_VIEW_ID) - .setGossipDuration(Duration.ofMillis(50)) - .setProducer(ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(50)) - .setBatchInterval(Duration.ofMillis(100)) - .setMaxBatchByteSize(1024 * 1024) - .setMaxBatchCount(3000) - .build()) - .setCheckpointBlockDelta(200); - - params.getProducer().ethereal().setEpochLength(7).setNumberOfEpochs(3); - return params; + var template = Parameters.newBuilder() + .setGenerateGenesis(true) + .setGenesisViewId(GENESIS_VIEW_ID) + .setBootstrap(Parameters.BootstrapParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .build()) + .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) + .setGossipDuration(Duration.ofMillis(50)) + .setProducer(Parameters.ProducerParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(50)) + .setBatchInterval(Duration.ofMillis(50)) + .setMaxBatchByteSize(1024 * 1024) + .setMaxBatchCount(10_000) + .setEthereal(Config.newBuilder() + .setNumberOfEpochs(3) + .setEpochLength(7)) + .build()) + .setCheckpointBlockDelta(200) + .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() + .setInitialBackoff(Duration.ofMillis(1)) + .setMaxBackoff(Duration.ofMillis(1))); + return template; } } diff --git a/sql-state/pom.xml b/sql-state/pom.xml index 1e73a795c..890ebc727 100644 --- a/sql-state/pom.xml +++ b/sql-state/pom.xml @@ -84,6 +84,14 @@ org.codehaus.mojo build-helper-maven-plugin + + org.apache.maven.plugins + maven-surefire-plugin + + 1 + false + + diff --git a/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java b/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java index 74eca970b..deb9aaa43 100644 --- a/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java +++ b/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java @@ -337,7 +337,7 @@ void initializeState() { } catch (LiquibaseException e) { throw new IllegalStateException("unable to initialize db state on: " + id, e); } - log.debug("Initialized state on: {}", url); + log.debug("Initialized state: {} on: {}", url, id); } private int[] acceptBatch(Batch batch) throws SQLException { @@ -408,7 +408,7 @@ private CallResult acceptCall(Call call) throws SQLException { exec.executeUpdate(); break; default: - log.debug("Invalid statement execution enum: {}", call.getExecution()); + log.debug("Invalid statement execution enum: {} on: {}", call.getExecution(), id); return new CallResult(out, results); } for (int j = 1; j <= call.getOutParametersCount(); j++) { @@ -483,7 +483,7 @@ private List acceptPreparedStatement(Statement statement) throws SQLE exec.executeUpdate(); break; default: - log.debug("Invalid statement execution enum: {}", statement.getExecution()); + log.debug("Invalid statement execution enum: {} on: {}", statement.getExecution(), id); return Collections.emptyList(); } CachedRowSet rowset = factory.createCachedRowSet(); @@ -562,7 +562,7 @@ private void beginBlock(ULong height, Digest hash) { begin(height, hash); withContext(() -> { updateCurrent(height, hash, -1, Digest.NONE); - log.debug("Begin block: {} hash: {} on: {}", height, hash, url); + log.debug("Begin block: {} hash: {} on: {}", height, hash, id); }); } @@ -635,7 +635,7 @@ private void exception(@SuppressWarnings("rawtypes") CompletableFuture onComplet private void execute(int index, Digest txnHash, Txn tx, @SuppressWarnings("rawtypes") CompletableFuture onCompletion, Executor executor) { - log.debug("executing: {}", tx.getExecutionCase()); + log.debug("executing: {} on: {}", tx.getExecutionCase(), id); var executing = executingBlock.get(); updateCurrent(executing.height, executing.blkHash, index, txnHash); @@ -702,7 +702,7 @@ private void initializeStatements() throws SQLException { deleteEvents = connection.prepareStatement(DELETE_FROM_APOLLO_INTERNAL_TRAMPOLINE); getEvents = connection.prepareStatement(SELECT_FROM_APOLLO_INTERNAL_TRAMPOLINE); updateCurrent = connection.prepareStatement(UPDATE_CURRENT); - log.info("Engine statements initialized"); + log.info("Engine statements initialized on: {}", id); } private baseAndAccessor liquibase(ChangeLog changeLog) throws IOException { @@ -995,7 +995,7 @@ public void execute(int index, Digest txnHash, Transaction tx, try { txn = Txn.parseFrom(tx.getContent()); } catch (InvalidProtocolBufferException e) { - log.warn("invalid txn: {}", tx, e); + log.warn("invalid txn: {} on: {}", tx, id, e); onComplete.completeExceptionally(e); return; } diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index dded314b5..45343e6de 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -20,6 +20,7 @@ import com.salesforce.apollo.context.DynamicContextImpl; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; @@ -41,6 +42,8 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -55,11 +58,11 @@ * @author hal.hildebrand */ abstract public class AbstractLifecycleTest { - protected static final int CARDINALITY = 5; - private static final Digest GENESIS_VIEW_ID = DigestAlgorithm.DEFAULT.digest( + protected static final int CARDINALITY = 5; + private static final Digest GENESIS_VIEW_ID = DigestAlgorithm.DEFAULT.digest( "Give me food or give me slack or kill me".getBytes()); - protected final AtomicReference checkpointHeight = new AtomicReference<>(); - protected final Map updaters = new HashMap<>(); + protected final AtomicReference checkpointHeight = new AtomicReference<>(); + protected final Map updaters = new HashMap<>(); // static { // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Session.class)).setLevel(Level.TRACE); // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CHOAM.class)).setLevel(Level.TRACE); @@ -70,19 +73,20 @@ abstract public class AbstractLifecycleTest { // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Fsm.class)).setLevel(Level.TRACE); // ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(TxDataSource.class)).setLevel(Level.TRACE); // } - private final List GENESIS_DATA; - private final Map parameters = new HashMap<>(); - protected SecureRandom entropy; - protected CountDownLatch checkpointOccurred; - protected Map choams; - protected List members; - protected Map routers; - protected SigningMember testSubject; - protected int toleranceLevel; + private final List GENESIS_DATA; + private final Map parameters = new HashMap<>(); + protected SecureRandom entropy; + protected CountDownLatch checkpointOccurred; + protected Map choams; + protected List members; + protected Map routers; + protected SigningMember testSubject; + protected int toleranceLevel; DynamicContextImpl context; - private File baseDir; - private File checkpointDirBase; - private List transactioneers; + private File baseDir; + private File checkpointDirBase; + private List transactioneers; + private ScheduledExecutorService scheduler; { var txns = MigrationTest.initializeBookSchema(); @@ -114,6 +118,10 @@ public void after() throws Exception { choams.values().forEach(e -> e.stop()); choams = null; } + if (scheduler != null) { + scheduler.shutdownNow(); + scheduler = null; + } updaters.values().forEach(up -> up.close()); updaters.clear(); parameters.clear(); @@ -124,6 +132,7 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { + scheduler = Executors.newScheduledThreadPool(10); checkpointOccurred = new CountDownLatch(CARDINALITY); checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong()); Utils.clean(checkpointDirBase); @@ -300,7 +309,8 @@ protected void pre() throws Exception { .toList())); var mutator = txneer.getMutator(choams.get(members.getLast().getId()).getSession()); - transactioneers.add(new Transactioneer(() -> update(entropy, mutator), mutator, timeout, 1, countdown)); + transactioneers.add( + new Transactioneer(scheduler, () -> update(entropy, mutator), mutator, timeout, 1, countdown)); System.out.println("Transaction member: " + members.getLast().getId()); System.out.println("Starting txns"); transactioneers.stream().forEach(e -> e.start()); @@ -356,6 +366,9 @@ private Builder parameters(Context context) { .setBatchInterval(Duration.ofMillis(10)) .setMaxBatchByteSize(1024 * 1024) .setMaxBatchCount(3000) + .setEthereal(Config.newBuilder() + .setEpochLength(7) + .setNumberOfEpochs(3)) .build()) .setGossipDuration(Duration.ofMillis(10)) .setCheckpointBlockDelta(checkpointBlockSize()) diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index 1e264a6f7..433ef9acb 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -81,6 +81,7 @@ public class CHOAMTest { private List members; private MetricRegistry registry; private Map routers; + private ScheduledExecutorService scheduler; private static Txn initialInsert() { return Txn.newBuilder() @@ -102,6 +103,10 @@ public void after() throws Exception { choams.values().forEach(e -> e.stop()); choams = null; } + if (scheduler != null) { + scheduler.shutdownNow(); + scheduler = null; + } updaters.values().forEach(up -> up.close()); updaters.clear(); members = null; @@ -117,6 +122,7 @@ public void after() throws Exception { @BeforeEach public void before() throws Exception { + scheduler = Executors.newScheduledThreadPool(10); registry = new MetricRegistry(); checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong()); Utils.clean(checkpointDirBase); @@ -184,7 +190,7 @@ public void submitMultiplTxn() throws Exception { var mutator = e.getValue().getMutator(choams.get(e.getKey().getId()).getSession()); for (int i = 0; i < clientCount; i++) { transactioneers.add( - new Transactioneer(() -> update(entropy, mutator), mutator, timeout, max, countdown)); + new Transactioneer(scheduler, () -> update(entropy, mutator), mutator, timeout, max, countdown)); } }); System.out.println("Starting txns"); diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java index 6b00a6385..fda84de71 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CheckpointBootstrapTest.java @@ -7,6 +7,7 @@ package com.salesforce.apollo.state; import com.salesforce.apollo.context.DynamicContext; +import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.utils.Utils; import org.joou.ULong; import org.junit.jupiter.api.Test; @@ -34,10 +35,10 @@ public class CheckpointBootstrapTest extends AbstractLifecycleTest { public void checkpointBootstrap() throws Exception { pre(); - checkpointOccurred.await(30, TimeUnit.SECONDS); + checkpointOccurred.await(10, TimeUnit.SECONDS); ULong chkptHeight = checkpointHeight.get(); - assertNotNull(chkptHeight, "Null checkpoint height!"); + assertNotNull(chkptHeight, "No checkpoint"); System.out.println("Checkpoint at height: " + chkptHeight); var processed = Utils.waitForCondition(10_000, 1_000, () -> { @@ -64,11 +65,11 @@ public void checkpointBootstrap() throws Exception { System.out.println("Starting late joining node"); var choam = choams.get(testSubject.getId()); - ((DynamicContext) choam.context().delegate()).activate(testSubject); + ((DynamicContext) choam.context().delegate()).activate(testSubject); choam.start(); routers.get(testSubject.getId()).start(); - assertTrue(Utils.waitForCondition(30_000, 1_000, () -> choam.active()), + assertTrue(Utils.waitForCondition(30_000, 1_000, choam::active), "Test subject did not become active: " + choam.logState()); members.add(testSubject); post(); diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java b/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java index 19c3ded10..8140fb295 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java @@ -21,12 +21,11 @@ import java.util.function.Supplier; class Transactioneer { - private final static Random entropy = new Random(); - private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); - private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual() - .factory()); + private final static Random entropy = new Random(); + private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); - private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final ScheduledExecutorService scheduler; private final AtomicInteger completed = new AtomicInteger(); private final CountDownLatch countdown; private final AtomicReference inFlight = new AtomicReference<>(); @@ -36,7 +35,9 @@ class Transactioneer { private final Supplier update; private final AtomicBoolean finished = new AtomicBoolean(); - public Transactioneer(Supplier update, Mutator mutator, Duration timeout, int max, CountDownLatch countdown) { + public Transactioneer(ScheduledExecutorService scheduler, Supplier update, Mutator mutator, Duration timeout, + int max, CountDownLatch countdown) { + this.scheduler = scheduler; this.update = update; this.timeout = timeout; this.max = max; diff --git a/sql-state/src/test/resources/logback-test.xml b/sql-state/src/test/resources/logback-test.xml index 02b9f6e38..798240567 100644 --- a/sql-state/src/test/resources/logback-test.xml +++ b/sql-state/src/test/resources/logback-test.xml @@ -24,7 +24,7 @@ - + @@ -36,11 +36,15 @@ + + + + - +