diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 9fa49f0ba..91f74ffa4 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -15,10 +15,6 @@ jobs: key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- - - name: Set up Maven - uses: stCarolas/setup-maven@v4.5 - with: - maven-version: 3.9.4 - uses: actions/checkout@v3 - uses: graalvm/setup-graalvm@v1 with: @@ -27,4 +23,4 @@ jobs: cache: 'maven' github-token: ${{ secrets.GITHUB_TOKEN }} - name: Build with Maven - run: mvn -batch-mode clean install -Ppre --file pom.xml + run: ./mvnw -batch-mode clean install -Ppre --file pom.xml diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 09fde0e52..03c431f71 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -87,10 +87,12 @@ public class CHOAM { private final Combine.Transitions transitions; private final TransSubmission txnSubmission = new TransSubmission(); private final AtomicReference view = new AtomicReference<>(); + private final AtomicReference diadem = new AtomicReference<>(); public CHOAM(Parameters params) { this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build()); this.params = params; + diadem.set(params.digestAlgorithm().getLast()); executions = Executors.newVirtualThreadPerTaskExecutor(); nextView(); @@ -173,10 +175,10 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen return cp; } - public static Block genesis(Digest id, Map joins, HashedBlock head, Context context, - HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint, - Iterable initialization) { - var reconfigure = reconfigure(id, joins, context, params, params.checkpointBlockDelta()); + public static Block genesis(Digest id, Digest diadem, Map joins, HashedBlock head, + Context context, HashedBlock lastViewChange, Parameters params, + HashedBlock lastCheckpoint, Iterable initialization) { + var reconfigure = reconfigure(id, diadem, joins, context, params, params.checkpointBlockDelta()); return Block.newBuilder() .setHeader(buildHeader(params.digestAlgorithm(), reconfigure, head.hash, ULong.valueOf(0), lastCheckpoint.height(), lastCheckpoint.hash, lastViewChange.height(), @@ -194,9 +196,12 @@ public static String print(Join join, DigestAlgorithm da) { + "certifications: " + join.getEndorsementsList().stream().map(c -> ViewContext.print(c, da)).toList() + "]"; } - public static Reconfigure reconfigure(Digest nextViewId, Map joins, Context context, - Parameters params, int checkpointTarget) { - var builder = Reconfigure.newBuilder().setCheckpointTarget(checkpointTarget).setId(nextViewId.toDigeste()); + public static Reconfigure reconfigure(Digest nextViewId, Digest diadem, Map joins, + Context context, Parameters params, int checkpointTarget) { + var builder = Reconfigure.newBuilder() + .setCheckpointTarget(checkpointTarget) + .setId(nextViewId.toDigeste()) + .setView(diadem.toDigeste()); // Canonical labeling of the view members for Ethereal var remapped = rosterMap(context, joins.keySet()); @@ -207,14 +212,14 @@ public static Reconfigure reconfigure(Digest nextViewId, Map joins return reconfigure; } - public static Block reconfigure(Digest nextViewId, Map joins, HashedBlock head, + public static Block reconfigure(Digest nextViewId, Digest diadem, Map joins, HashedBlock head, Context context, HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint) { final Block lvc = lastViewChange.block; int lastTarget = lvc.hasGenesis() ? lvc.getGenesis().getInitialView().getCheckpointTarget() : lvc.getReconfigure().getCheckpointTarget(); int checkpointTarget = lastTarget == 0 ? params.checkpointBlockDelta() : lastTarget - 1; - var reconfigure = reconfigure(nextViewId, joins, context, params, checkpointTarget); + var reconfigure = reconfigure(nextViewId, diadem, joins, context, params, checkpointTarget); return Block.newBuilder() .setHeader(buildHeader(params.digestAlgorithm(), reconfigure, head.hash, head.height().add(1), lastCheckpoint.height(), lastCheckpoint.hash, lastViewChange.height(), @@ -294,11 +299,17 @@ public String logState() { params.member().getId()); } + public void setDiadem(Digest diadem) { + log.info("Set diadem: {} on: {}", diadem, params.member().getId()); + this.diadem.set(diadem); + } + public void start() { if (!started.compareAndSet(false, true)) { return; } - log.info("CHOAM startup, majority: {} on: {}", params.majority(), params.member().getId()); + log.info("CHOAM startup: {} diadem: {}, majority: {} on: {}", params.context().getId(), diadem.get(), + params.majority(), params.member().getId()); combine.start(params.producer().gossipDuration()); transitions.fsm().enterStartState(); transitions.start(); @@ -463,33 +474,40 @@ public Block checkpoint() { return CHOAM.this.checkpoint(); } + @Override + public Digest diadem() { + return diadem.get(); + } + @Override public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { final HashedCertifiedBlock cp = checkpoint.get(); final HashedCertifiedBlock v = view.get(); - return CHOAM.genesis(nextViewId, joining, previous, params.context(), v, params, cp, + var current = diadem.get(); + log.info("Create genesis: {}", diadem.get()); + return CHOAM.genesis(nextViewId, current, joining, previous, params.context(), v, params, cp, params.genesisData().apply(joining)); } @Override - public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { + public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) { final HashedCertifiedBlock v = view.get(); return Block.newBuilder() .setHeader( - buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(), + buildHeader(params.digestAlgorithm(), executions, prev, height, checkpoint.height(), checkpoint.hash, v.height(), v.hash)) - .setAssemble(assemble) + .setExecutions(executions) .build(); } @Override - public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) { + public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) { final HashedCertifiedBlock v = view.get(); return Block.newBuilder() .setHeader( - buildHeader(params.digestAlgorithm(), executions, prev, height, checkpoint.height(), + buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(), checkpoint.hash, v.height(), v.hash)) - .setExecutions(executions) + .setAssemble(assemble) .build(); } @@ -504,7 +522,8 @@ public void publish(CertifiedBlock cb) { public Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint) { final HashedCertifiedBlock v = view.get(); - return CHOAM.reconfigure(nextViewId, joining, previous, params.context(), v, params, checkpoint); + return CHOAM.reconfigure(nextViewId, diadem.get(), joining, previous, params.context(), v, params, + checkpoint); } }; } @@ -612,9 +631,10 @@ private void nextView() { log.error("Unable to generate and sign consensus key on: {}", params.member().getId()); return; } - log.trace("Generated next view consensus key: {} sig: {} on: {}", + var current = diadem.get(); + log.trace("Generated next view consensus key: {} sig: {} diadem: {} on: {}", params.digestAlgorithm().digest(pubKey.getEncoded()), - params.digestAlgorithm().digest(signed.toSig().toByteString()), params.member().getId()); + params.digestAlgorithm().digest(signed.toSig().toByteString()), current, params.member().getId()); next.set(new nextView(ViewMember.newBuilder() .setId(params.member().getId().toDigeste()) .setConsensusKey(pubKey) @@ -631,7 +651,13 @@ private void process() { case ASSEMBLE: { params.processor().beginBlock(h.height(), h.hash); nextViewId.set(Digest.from(h.block.getAssemble().getNextView())); - log.info("Next view id: {} on: {}", nextViewId.get(), params.member().getId()); + var ass = Digest.from(h.block.getAssemble().getDiadem()); + var current = diadem.getAndSet(ass); + if (!current.equals(ass)) { + log.info("Next view id: {} diadem: {} does match current: {} on: {}", nextViewId.get(), ass, current, + params.member().getId()); + } + log.info("Next view id: {} diadem: {} on: {}", nextViewId.get(), diadem.get(), params.member().getId()); c.assembled(); break; } @@ -935,12 +961,14 @@ private void synchronizedProcess(CertifiedBlock certifiedBlock) { public interface BlockProducer { Block checkpoint(); - Block genesis(Map joining, Digest nextViewId, HashedBlock previous); + Digest diadem(); - Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint); + Block genesis(Map joining, Digest nextViewId, HashedBlock previous); Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint); + Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint); + void publish(CertifiedBlock cb); Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint); @@ -1129,7 +1157,7 @@ public ViewMember join(Digest nextView, Digest from) { log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from, ViewContext.print(c.member, params.digestAlgorithm()), params.member().getId()); } - return c.member; + return ViewMember.newBuilder(c.member).setDiadem(diadem.get().toDigeste()).build(); } @Override @@ -1144,6 +1172,14 @@ public Parameters params() { @Override public SubmitResult submitTxn(Transaction transaction) { + if (!servers.hasNext()) { + log.trace("Failed submitting txn: {} no servers available in: {} on: {}", + hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId()); + return SubmitResult.newBuilder() + .setResult(Result.ERROR_SUBMITTING) + .setErrorMsg("no servers available") + .build(); + } Member target = servers.next(); try (var link = submissionComm.connect(target)) { if (link == null) { @@ -1268,11 +1304,12 @@ public ViewMember join(Digest nextView, Digest from) { return ViewMember.getDefaultInstance(); } final var c = next.get(); + var cd = diadem.get(); if (log.isDebugEnabled()) { - log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from, + log.debug("Joining view: {} diadem: {} from: {} view member: {} on: {}", nextView, cd, from, ViewContext.print(c.member, params.digestAlgorithm()), params.member().getId()); } - return c.member; + return ViewMember.newBuilder(c.member).setDiadem(cd.toDigeste()).build(); } @Override diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index b918b40c8..c6f121cc8 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -243,10 +243,14 @@ private void produceAssemble() { final var vlb = previousBlock.get(); nextViewId = vlb.hash; nextAssembly.addAll(Committee.viewMembersOf(nextViewId, params().context())); + var diadem = view.diadem(); + log.debug("Assembling: {} diadem: {} on: {}", nextViewId, diadem, params().member().getId()); final var assemble = new HashedBlock(params().digestAlgorithm(), view.produce(vlb.height().add(1), vlb.hash, Assemble.newBuilder() .setNextView( vlb.hash.toDigeste()) + .setDiadem( + diadem.toDigeste()) .build(), checkpoint.get())); previousBlock.set(assemble); @@ -255,8 +259,9 @@ private void produceAssemble() { pending.put(assemble.hash, p); p.witnesses.put(params().member(), validation); ds.offer(validation); - log.debug("View assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash, - assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId()); + log.debug("View assembly: {} diadem: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, diadem, + assemble.hash, assemble.height(), assemble.block.getBodyCase(), getViewId(), + params().member().getId()); } private void publish(PendingBlock p) { @@ -368,13 +373,14 @@ public void produceAssemble() { @Override public void reconfigure() { - log.debug("Starting view reconfiguration for: {} on: {}", nextViewId, params().member().getId()); + log.debug("Starting view reconfiguration: {} diadem: {} on: {}", nextViewId, view.diadem(), + params().member().getId()); assembly.set(new ViewAssembly(nextViewId, view, Producer.this::addReassemble, comms) { @Override public void complete() { super.complete(); - log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(), - params().member().getId()); + log.debug("View reconfiguration: {} diadem: {} gathered: {} complete on: {}", nextViewId, + view.diadem(), getSlate().size(), params().member().getId()); assembled.set(true); Producer.this.transitions.viewComplete(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index c4bf544c8..1b6040bdc 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -198,7 +198,13 @@ private Reassemble join(ViewMember vm) { } return null; } - + final var hex = Digest.from(vm.getDiadem()); + var diadem = view.diadem(); + if (!diadem.equals(hex)) { + log.warn("Invalid diadem: {} not equivalent to: {} vm: {} on: {}", hex, diadem, + ViewContext.print(vm, params().digestAlgorithm()), params().member().getId()); + return null; + } PubKey encoded = vm.getConsensusKey(); if (!m.verify(signature(vm.getSignature()), encoded.toByteString())) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index 8800c1d72..7022345d3 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -74,6 +74,10 @@ public Context context() { return context; } + public Digest diadem() { + return blockProducer == null ? DigestAlgorithm.DEFAULT.getLast() : blockProducer.diadem(); + } + public Validate generateValidation(HashedBlock block) { log.trace("Signing block: {} height: {} on: {}", block.hash, block.height(), params.member().getId()); JohnHancock signature = signer.sign(block.block.getHeader().toByteString()); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index d43b06e32..d7a70d594 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -140,7 +140,8 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { if (!cps.validate(diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()))) { throw new IllegalStateException("Cannot validate checkpoint: " + checkpoint.height()); } - log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem, params.member().getId()); + log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem.compact(), + params.member().getId()); checkpointState = cps; }); // reconstruct chain to genesis diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 95d681d14..304f6341e 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -6,8 +6,6 @@ */ package com.salesforce.apollo.choam; -import com.salesforce.apollo.choam.proto.*; -import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; @@ -18,10 +16,12 @@ import com.salesforce.apollo.choam.comm.Terminal; import com.salesforce.apollo.choam.comm.TerminalClient; import com.salesforce.apollo.choam.comm.TerminalServer; +import com.salesforce.apollo.choam.proto.*; import com.salesforce.apollo.choam.support.HashedBlock; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.Signer; +import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.ContextImpl; import com.salesforce.apollo.membership.Member; @@ -136,10 +136,15 @@ public Block checkpoint() { return null; } + @Override + public Digest diadem() { + return DigestAlgorithm.DEFAULT.getLast(); + } + @Override public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { - return CHOAM.genesis(viewId, joining, previous, committee, previous, built, previous, - Collections.emptyList()); + return CHOAM.genesis(viewId, DigestAlgorithm.DEFAULT.getLast(), joining, previous, committee, + previous, built, previous, Collections.emptyList()); } @Override diff --git a/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java index 82e3d0bff..a4926251f 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java @@ -2,9 +2,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import com.salesforce.apollo.choam.proto.Reassemble; -import com.salesforce.apollo.choam.proto.ViewMember; -import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.archipelago.LocalServer; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; @@ -14,7 +11,10 @@ import com.salesforce.apollo.choam.comm.Terminal; import com.salesforce.apollo.choam.comm.TerminalClient; import com.salesforce.apollo.choam.comm.TerminalServer; +import com.salesforce.apollo.choam.proto.Reassemble; +import com.salesforce.apollo.choam.proto.ViewMember; import com.salesforce.apollo.cryptography.*; +import com.salesforce.apollo.cryptography.proto.PubKey; import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.ethereal.DataSource; import com.salesforce.apollo.ethereal.Ethereal; @@ -53,16 +53,16 @@ public class ViewAssemblyTest { - private static short CARDINALITY = 4; - private Map assemblies = new HashMap<>(); - private Map communications = new HashMap<>(); - private CountDownLatch complete; - private Context context; - private List controllers = new ArrayList<>(); - private Map dataSources; - private List gossipers = new ArrayList<>(); - private List members; - private Digest nextViewId; + private static final short CARDINALITY = 4; + private final Map assemblies = new HashMap<>(); + private final Map communications = new HashMap<>(); + private final List controllers = new ArrayList<>(); + private final List gossipers = new ArrayList<>(); + private CountDownLatch complete; + private Context context; + private Map dataSources; + private List members; + private Digest nextViewId; @AfterEach public void after() { @@ -130,6 +130,7 @@ private void buildAssemblies() { public ViewMember answer(InvocationOnMock invocation) throws Throwable { return ViewMember.newBuilder() .setId(m.getId().toDigeste()) + .setDiadem(DigestAlgorithm.DEFAULT.getLast().toDigeste()) .setConsensusKey(consensus) .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) .build(); @@ -213,7 +214,7 @@ private List process(List preblock, Boolean last) { } private static class VDataSource implements DataSource { - private BlockingQueue outbound = new ArrayBlockingQueue<>(100); + private final BlockingQueue outbound = new ArrayBlockingQueue<>(100); @Override public ByteString getData() { diff --git a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java index 620812665..d452b0774 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java +++ b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java @@ -6,8 +6,8 @@ */ package com.salesforce.apollo.bloomFilters; -import com.salesforce.apollo.cryptography.proto.Biff; import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.cryptography.proto.Biff; import org.joou.ULong; import java.util.BitSet; @@ -116,7 +116,8 @@ public boolean contains(T element) { } public boolean equivalent(BloomFilter other) { - return h.equivalent(other.h) && bits.equals(other.bits); + var equiv = h.equivalent(other.h) && bits.equals(other.bits); + return equiv; } public double fpp(int n) { diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java index 1149fdeeb..90714cf0d 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java @@ -6,9 +6,9 @@ */ package com.salesforce.apollo.cryptography; -import com.salesforce.apollo.cryptography.proto.HexBloome; import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.bloomFilters.Primes; +import com.salesforce.apollo.cryptography.proto.HexBloome; import java.util.Arrays; import java.util.Collections; @@ -71,6 +71,14 @@ public HexBloom(int cardinality, List crowns, BloomFilter member this.cardinality = cardinality; } + public HexBloom() { + this(DigestAlgorithm.DEFAULT.getLast(), 1); + } + + public HexBloom(Digest initial) { + this(initial, 1); + } + /** * Construct a HexBloom from the supplied parameters, using default hash functions and fpr * @@ -236,6 +244,7 @@ public static HexBloom construct(List currentMembership, List ad } public static HexBloom from(HexBloome hb) { + assert !HexBloome.getDefaultInstance().equals(hb); return new HexBloom(hb); } @@ -280,6 +289,9 @@ public static List> hashWraps(int crowns) { } public Digest compact() { + if (crowns.length == 1) { + return crowns[0]; + } return Arrays.asList(crowns).stream().reduce(crowns[0].getAlgorithm().getOrigin(), (a, b) -> a.xor(b)); } diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java index d07c0a86b..4b2b3fd0f 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Adder.java @@ -6,38 +6,26 @@ */ package com.salesforce.apollo.ethereal; -import static com.salesforce.apollo.ethereal.Creator.parentsOnPreviousLevel; -import static com.salesforce.apollo.ethereal.PreUnit.id; +import com.salesforce.apollo.bloomFilters.BloomFilter; +import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; +import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.cryptography.JohnHancock; +import com.salesforce.apollo.cryptography.Signer; +import com.salesforce.apollo.cryptography.proto.Biff; +import com.salesforce.apollo.ethereal.proto.*; +import com.salesforce.apollo.utils.Entropy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.salesforce.apollo.ethereal.proto.Commit; -import com.salesforce.apollo.ethereal.proto.Have; -import com.salesforce.apollo.ethereal.proto.Missing; -import com.salesforce.apollo.ethereal.proto.PreUnit_s; -import com.salesforce.apollo.ethereal.proto.PreVote; -import com.salesforce.apollo.ethereal.proto.SignedCommit; -import com.salesforce.apollo.ethereal.proto.SignedPreVote; -import com.salesforce.apollo.cryptography.proto.Biff; -import com.salesforce.apollo.cryptography.Digest; -import com.salesforce.apollo.cryptography.DigestAlgorithm; -import com.salesforce.apollo.cryptography.JohnHancock; -import com.salesforce.apollo.cryptography.Signer; -import com.salesforce.apollo.utils.Entropy; -import com.salesforce.apollo.bloomFilters.BloomFilter; -import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; +import static com.salesforce.apollo.ethereal.Creator.parentsOnPreviousLevel; +import static com.salesforce.apollo.ethereal.PreUnit.id; /** * Implements the chain Reliable Broadcast of Aleph. @@ -48,23 +36,24 @@ */ public class Adder { - private static final Logger log = LoggerFactory.getLogger(Adder.class); - private final Map> commits = new TreeMap<>(); - private final Config conf; - private final Dag dag; - private final int epoch; - private final Set failed; - private final ReentrantLock lock = new ReentrantLock(true); - private final int maxSize; - private final Map> missing = new TreeMap<>(); - private final Map> prevotes = new TreeMap<>(); - private final Map signedCommits = new TreeMap<>(); - private final Map signedPrevotes = new TreeMap<>(); - private final int threshold; - private final Map waiting = new TreeMap<>(); - private final Map waitingById = new TreeMap<>(); - private final Map waitingForRound = new TreeMap<>(); - private volatile int round = 0; + private static final Logger log = LoggerFactory.getLogger(Adder.class); + private final Map> commits = new TreeMap<>(); + private final Config conf; + private final Dag dag; + private final int epoch; + private final Set failed; + private final ReentrantLock lock = new ReentrantLock(true); + private final int maxSize; + private final Map> missing = new TreeMap<>(); + private final Map> prevotes = new TreeMap<>(); + private final Map signedCommits = new TreeMap<>(); + private final Map signedPrevotes = new TreeMap<>(); + private final int threshold; + private final Map waiting = new TreeMap<>(); + private final Map waitingById = new TreeMap<>(); + private final Map waitingForRound = new TreeMap<>(); + private volatile int round = 0; + public Adder(int epoch, Dag dag, int maxSize, Config conf, Set failed) { this.epoch = epoch; this.dag = dag; @@ -574,7 +563,7 @@ private boolean decodeParents(Waiting wp) { case DUPLICATE_UNIT: case UNKNOWN_PARENTS: return false; - case ABIGUOUS_PARENTS: + case AMBIGUOUS_PARENTS: case COMPLIANCE_ERROR: case DATA_ERROR: removeFailed(wp, decoded); @@ -589,7 +578,7 @@ private boolean decodeParents(Waiting wp) { return false; } var parents = decoded.parents(); - var digests = Stream.of(parents).map(e -> e == null ? (Digest) null : e.hash()).map(e -> e).toList(); + var digests = Stream.of(parents).map(e -> e == null ? null : e.hash()).map(e -> e).toList(); Digest calculated = Digest.combine(conf.digestAlgorithm(), digests.toArray(new Digest[digests.size()])); if (!calculated.equals(wp.pu().view().controlHash())) { removeFailed(wp); @@ -807,7 +796,7 @@ private boolean validateParents(Waiting wp) { * FAILED can occur at each state transition */ public enum State { - COMMITTED, FAILED, OUTPUT, PREVOTED, PROPOSED, WAITING_FOR_PARENTS, WAITING_ON_ROUND; + COMMITTED, FAILED, OUTPUT, PREVOTED, PROPOSED, WAITING_FOR_PARENTS, WAITING_ON_ROUND } public record Signed(Digest hash, T signed) { diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Correctness.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Correctness.java index 83a87a206..22c49d30c 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Correctness.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Correctness.java @@ -8,5 +8,5 @@ package com.salesforce.apollo.ethereal; public enum Correctness { - ABIGUOUS_PARENTS, COMPLIANCE_ERROR, CORRECT, DATA_ERROR, DUPLICATE_PRE_UNIT, DUPLICATE_UNIT, UNKNOWN_PARENTS; -} \ No newline at end of file + AMBIGUOUS_PARENTS, COMPLIANCE_ERROR, CORRECT, DATA_ERROR, DUPLICATE_PRE_UNIT, DUPLICATE_UNIT, UNKNOWN_PARENTS +} diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Dag.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Dag.java index 02356644e..956191664 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Dag.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Dag.java @@ -6,14 +6,16 @@ */ package com.salesforce.apollo.ethereal; -import static com.salesforce.apollo.ethereal.PreUnit.decode; +import com.salesforce.apollo.bloomFilters.BloomFilter; +import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; +import com.salesforce.apollo.cryptography.Digest; +import com.salesforce.apollo.ethereal.PreUnit.DecodedId; +import com.salesforce.apollo.ethereal.proto.PreUnit_s; +import com.salesforce.apollo.membership.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -24,21 +26,13 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.salesforce.apollo.ethereal.proto.PreUnit_s; -import com.salesforce.apollo.cryptography.Digest; -import com.salesforce.apollo.ethereal.PreUnit.DecodedId; -import com.salesforce.apollo.membership.Context; -import com.salesforce.apollo.bloomFilters.BloomFilter; -import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; +import static com.salesforce.apollo.ethereal.PreUnit.decode; /** * @author hal.hildebrand */ public interface Dag { - static final Logger log = LoggerFactory.getLogger(Dag.class); + Logger log = LoggerFactory.getLogger(Dag.class); static short threshold(int np) { var nProcesses = (double) np; @@ -113,7 +107,7 @@ static boolean validate(int nProc) { void write(Runnable r); - public interface Decoded { + interface Decoded { default Correctness classification() { return Correctness.CORRECT; } @@ -127,7 +121,7 @@ default Unit[] parents() { } } - public class DagImpl implements Dag { + class DagImpl implements Dag { private final List> checks = new ArrayList<>(); private final Config config; @@ -449,7 +443,7 @@ private void updateMaximal(Unit u) { } } - public record DecodedR(Unit[] parents) implements Decoded { + record DecodedR(Unit[] parents) implements Decoded { @Override public boolean inError() { return false; @@ -462,6 +456,7 @@ record DagInfo(int epoch, int[] heights) { class fiberMap { private final List content = new ArrayList<>(); private final short width; + fiberMap(short width) { this.width = width; } @@ -578,7 +573,7 @@ record AmbiguousParents(List units) implements Decoded { @Override public Correctness classification() { - return Correctness.ABIGUOUS_PARENTS; + return Correctness.AMBIGUOUS_PARENTS; } } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 6f64e7ac0..edafc6106 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -54,6 +54,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -89,7 +90,9 @@ public class View { private final DigestAlgorithm digestAlgo; private final RingCommunications gossiper; private final AtomicBoolean introduced = new AtomicBoolean(); - private final Map lifecycleListeners = new HashMap<>(); + private final List lifecycleListeners = new CopyOnWriteArrayList<>(); + private final Executor viewNotificationQueue = Executors.newSingleThreadExecutor( + Thread.ofVirtual().factory()); private final FireflyMetrics metrics; private final Node node; private final Map observations = new ConcurrentSkipListMap<>(); @@ -160,12 +163,10 @@ public static boolean isValidMask(BitSet mask, Context context) { } /** - * Deregister the listener with the supplied id - * - * @param listenerId + * Deregister the listener */ - public void deregister(UUID listenerId) { - lifecycleListeners.remove(listenerId); + public void deregister(ViewLifecycleListener listener) { + lifecycleListeners.remove(listener); } /** @@ -183,15 +184,12 @@ public Digest getNodeId() { } /** - * Register a listener to receive view change events + * Register the listener to receive view change events * * @param listener - the ViewChangeListener to receive events - * @return the UUID identifying this listener */ - public UUID register(ViewLifecycleListener listener) { - final var id = UUID.randomUUID(); - lifecycleListeners.put(id, listener); - return id; + public void register(ViewLifecycleListener listener) { + lifecycleListeners.add(listener); } /** @@ -410,15 +408,18 @@ void introduced() { void notifyListeners(List joining, List leaving) { final var current = currentView(); - lifecycleListeners.forEach((id, listener) -> { - try { - log.trace("Notifying view change: {} listener: {} cardinality: {} joins: {} leaves: {} on: {} ", - currentView(), id, context.totalCount(), joining.size(), leaving.size(), node.getId()); - listener.viewChange(context, current, joining, leaving); - } catch (Throwable e) { - log.error("error in view change listener: {} on: {} ", id, node.getId(), e); - } - }); + viewNotificationQueue.execute(Utils.wrapped(() -> { + lifecycleListeners.forEach(listener -> { + try { + log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", listener, + currentView(), context.totalCount(), joining.size(), leaving.size(), node.getId()); + listener.viewChange(i -> context.getMember(i.getDigest()), current, viewManagement.cardinality(), + joining, leaving); + } catch (Throwable e) { + log.error("error in view change listener: {} on: {} ", listener, node.getId(), e); + } + }); + }, log)); } /** @@ -1476,13 +1477,14 @@ public interface ViewLifecycleListener { /** * Notification of a view change event * - * @param context - the context for which the view change has occurred - * @param viewId - the Digest identity of the new view - * @param joins - the list of joining member's id - * @param leaves - the list of leaving member's id + * @param members - the source of Members for supplied identifiers + * @param viewId - the compact Digest identifying the new view + * @param cardinality - the cardinality of the new view + * @param joins - the list of joining member's id + * @param leaves - the list of leaving member's id */ - void viewChange(Context context, Digest viewId, List joins, - List leaves); + void viewChange(Function members, Digest viewId, int cardinality, + List joins, List leaves); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index bb7117334..61811b4a2 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -49,6 +49,7 @@ public class ViewManagement { private static final Logger log = LoggerFactory.getLogger( ViewManagement.class); + final AtomicReference diadem = new AtomicReference<>(); private final AtomicInteger attempt = new AtomicInteger(); private final Digest bootstrapView; private final Context context; @@ -62,7 +63,6 @@ public class ViewManagement { private final AtomicReference vote = new AtomicReference<>(); private final Lock joinLock = new ReentrantLock(); private final AtomicReference currentView = new AtomicReference<>(); - private final AtomicReference diadem = new AtomicReference<>(); private boolean bootstrap; private CompletableFuture onJoined; diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto index 76f9aae79..c811abae1 100644 --- a/grpc/src/main/proto/choam.proto +++ b/grpc/src/main/proto/choam.proto @@ -69,8 +69,9 @@ message Genesis { message Reconfigure { crypto.Digeste id = 1; - int32 checkpointTarget = 2; - repeated Join joins = 3; + crypto.Digeste view = 2; + int32 checkpointTarget = 3; + repeated Join joins = 4; } message Checkpoint { @@ -86,6 +87,7 @@ message Executions { message Assemble { crypto.Digeste nextView = 1; + crypto.Digeste diadem = 2; } message FoundationSeal { @@ -115,8 +117,9 @@ message Join { message ViewMember { crypto.Digeste id = 1; - crypto.PubKey consensusKey = 2; - crypto.Sig signature = 3; + crypto.Digeste diadem = 2; + crypto.PubKey consensusKey = 3; + crypto.Sig signature = 4; } message Certification { diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java index 469cce981..a0b09094a 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/FireFliesTrace.java @@ -19,9 +19,7 @@ import com.salesforce.apollo.delphinius.Oracle; import com.salesforce.apollo.delphinius.Oracle.Assertion; import com.salesforce.apollo.fireflies.View; -import com.salesforce.apollo.fireflies.View.Participant; import com.salesforce.apollo.fireflies.View.Seed; -import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.ContextImpl; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.model.ProcessContainerDomain; @@ -43,6 +41,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -228,14 +227,14 @@ public void smokin() throws Exception { var listener = new View.ViewLifecycleListener() { @Override - public void viewChange(Context context, Digest viewId, - List joins, List leaves) { - if (context.totalCount() == CARDINALITY) { - System.out.printf("Full view: %s members: %s on: %s%n", viewId, context.totalCount(), + public void viewChange(Function context, Digest viewId, + int cardinality, List joins, List leaves) { + if (cardinality == CARDINALITY) { + System.out.printf("Full view: %s members: %s on: %s%n", viewId, cardinality, d.getMember().getId()); countdown.countDown(); } else { - System.out.printf("Members joining: %s members: %s on: %s%n", viewId, context.totalCount(), + System.out.printf("Members joining: %s members: %s on: %s%n", viewId, cardinality, d.getMember().getId()); } } @@ -245,7 +244,9 @@ public void viewChange(Context context, Digest viewId, // start seed final var started = new AtomicReference<>(new CountDownLatch(1)); - domains.get(0).getFoundation().start(() -> started.get().countDown(), gossipDuration, Collections.emptyList()); + domains.getFirst() + .getFoundation() + .start(() -> started.get().countDown(), gossipDuration, Collections.emptyList()); if (!started.get().await(10, TimeUnit.SECONDS)) { throw new IllegalStateException("Cannot start up kernel"); } diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/GroupIterator.java b/memberships/src/main/java/com/salesforce/apollo/membership/GroupIterator.java index cb580cdd6..a94bc9b6b 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/GroupIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/GroupIterator.java @@ -6,29 +6,31 @@ */ package com.salesforce.apollo.membership; +import com.salesforce.apollo.utils.Entropy; + import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.Semaphore; -import com.salesforce.apollo.utils.Entropy; - /** - * Simple iterator on a group of members, randomly shuffling the membership list - * after each complete iteration - * - * @author hal.hildebrand + * Simple iterator on a group of members, randomly shuffling the membership list after each complete iteration * + * @author hal.hildebrand */ public class GroupIterator { - private volatile int current = 0; - private final List group; - private final Semaphore exclusive = new Semaphore(1); + private final List group; + private final Semaphore exclusive = new Semaphore(1); + private volatile int current = 0; public GroupIterator(Collection group) { this.group = new ArrayList<>(group); } + public boolean hasNext() { + return !group.isEmpty(); + } + public Member next() { exclusive.acquireUninterruptibly(); try { diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java index 6f04e7061..710750db8 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java @@ -146,8 +146,8 @@ public SelfAddressingIdentifier spawn(DemesneParameters.Builder prototype) { @Override protected View.ViewLifecycleListener listener() { var delegate = super.listener(); - return (context, id, join, leaving) -> { - delegate.viewChange(context, id, join, leaving); + return (context, diadem, id, join, leaving) -> { + delegate.viewChange(context, diadem, id, join, leaving); log.info("View change: {} for: {} joining: {} leaving: {} on: {}", id, params.context().getId(), join.size(), leaving.size(), params.member().getId()); }; diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index 829810dda..3048247c0 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -29,7 +29,6 @@ import java.nio.file.Path; import java.time.Duration; import java.time.Instant; -import java.util.UUID; import java.util.concurrent.RejectedExecutionException; /** @@ -44,10 +43,10 @@ public class ProcessDomain extends Domain { private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class); protected final KerlDHT dht; protected final View foundation; - private final UUID listener; - private final EventValidation.DelegatedValidation validation; + private final EventValidation.DelegatedValidation validations; private final Verifiers.DelegatedVerifiers verifiers; private final ProcessDomainParameters parameters; + private final ViewLifecycleListener listener = listener(); public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams, Builder builder, Parameters.RuntimeParameters.Builder runtime, InetSocketAddress endpoint, @@ -64,11 +63,11 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDom dht = new KerlDHT(parameters.dhtOpsFrequency, params.context(), member, connectionPool, params.digestAlgorithm(), params.communications(), parameters.dhtOperationsTimeout, parameters.dhtFpr, stereotomyMetrics); - validation = new EventValidation.DelegatedValidation(EventValidation.NONE); + validations = new EventValidation.DelegatedValidation(EventValidation.NONE); verifiers = new Verifiers.DelegatedVerifiers(Verifiers.NONE); - this.foundation = new View(base, getMember(), endpoint, validation, verifiers, params.communications(), + this.foundation = new View(base, getMember(), endpoint, validations, verifiers, params.communications(), ff.build(), DigestAlgorithm.DEFAULT, null); - listener = foundation.register(listener()); + foundation.register(listener); } public KerlDHT getDht() { @@ -83,24 +82,16 @@ public CertificateWithPrivateKey provision(Duration duration, SignatureAlgorithm return member.getIdentifier().provision(Instant.now(), duration, signatureAlgorithm); } - public void setAniValidation() { - validation.setDelegate(dht.getAni().eventValidation(parameters.dhtEventValidTO)); + public void setAniValidations() { + validations.setDelegate(dht.getAni().eventValidation(parameters.dhtEventValidTO)); } public void setDhtVerifiers() { verifiers.setDelegate(dht.getVerifiers()); } - public void setValidation(EventValidation delegate) { - validation.setDelegate(delegate); - } - - public void setValidationNONE() { - validation.setDelegate(EventValidation.NONE); - } - - public void setVerifiers(Verifiers v) { - verifiers.setDelegate(v); + public void setValidationsNONE() { + validations.setDelegate(EventValidation.NONE); } public void setVerifiersNONE() { @@ -125,13 +116,14 @@ public void stop() { } protected ViewLifecycleListener listener() { - return (context, id, join, leaving) -> { + return (context, id, cardinality, join, leaving) -> { for (var d : join) { - params.context().activate(context.getMember(d.getDigest())); + params.context().activate(context.apply(d)); } for (var d : leaving) { params.context().remove(d); } + choam.setDiadem(id); log.info("View change: {} for: {} joining: {} leaving: {} on: {}", id, params.context().getId(), join.size(), leaving.size(), params.member().getId()); diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 3bdf3b80c..c4ea05526 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -18,9 +18,7 @@ import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.delphinius.Oracle; import com.salesforce.apollo.fireflies.View; -import com.salesforce.apollo.fireflies.View.Participant; import com.salesforce.apollo.fireflies.View.Seed; -import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.ContextImpl; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.stereotomy.StereotomyImpl; @@ -41,6 +39,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -114,14 +113,14 @@ public void smokin() throws Exception { var listener = new View.ViewLifecycleListener() { @Override - public void viewChange(Context context, Digest viewId, - List joins, List leaves) { - if (context.totalCount() == CARDINALITY) { - System.out.printf("Full view: %s members: %s on: %s%n", viewId, context.totalCount(), + public void viewChange(Function context, Digest viewId, + int cardinality, List joins, List leaves) { + if (cardinality == CARDINALITY) { + System.out.printf("Full view: %s members: %s on: %s%n", viewId, cardinality, d.getMember().getId()); countdown.countDown(); } else { - System.out.printf("Members joining: %s members: %s on: %s%n", viewId, context.totalCount(), + System.out.printf("Members joining: %s members: %s on: %s%n", viewId, cardinality, d.getMember().getId()); } }