From 77d99b5b876df6c3ec18bc00ab92f933f763657f Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sat, 13 Apr 2024 20:30:52 -0700 Subject: [PATCH] Simplify genesis and view reconfiguration remove unnecessary signatures n' such. --- .../com/salesforce/apollo/choam/CHOAM.java | 35 ++-- .../apollo/choam/GenesisAssembly.java | 111 ++--------- .../com/salesforce/apollo/choam/Producer.java | 66 ++++--- .../salesforce/apollo/choam/ViewAssembly.java | 179 ++++-------------- .../salesforce/apollo/choam/ViewContext.java | 6 +- .../salesforce/apollo/choam/fsm/Genesis.java | 20 +- .../apollo/choam/support/TxDataSource.java | 4 +- .../apollo/choam/GenesisAssemblyTest.java | 4 +- grpc/src/main/proto/choam.proto | 10 +- 9 files changed, 134 insertions(+), 301 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index a751be158..436bedeb3 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -184,7 +184,7 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen return cp; } - public static Block genesis(Digest id, Map joins, HashedBlock head, Context context, + public static Block genesis(Digest id, Map joins, HashedBlock head, Context context, HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint, Iterable initialization) { var reconfigure = reconfigure(id, joins, context, params.checkpointBlockDelta()); @@ -202,26 +202,23 @@ public static Digest hashOf(Transaction transaction, DigestAlgorithm digestAlgor public static String print(Join join, DigestAlgorithm da) { return "J[view: " + Digest.from(join.getMember().getVm().getView()) + " member: " + ViewContext.print( - join.getMember(), da) + "certifications: " + join.getEndorsementsList() - .stream() - .map(c -> ViewContext.print(c, da)) - .toList() + "]"; + join.getMember(), da) + "]"; } - public static Reconfigure reconfigure(Digest nextViewId, Map joins, Context context, + public static Reconfigure reconfigure(Digest nextViewId, Map joins, Context context, int checkpointTarget) { var builder = Reconfigure.newBuilder().setCheckpointTarget(checkpointTarget).setId(nextViewId.toDigeste()); // Canonical labeling of the view members for Ethereal var remapped = rosterMap(context, joins.keySet()); - remapped.keySet().stream().sorted().map(remapped::get).forEach(m -> builder.addJoins(joins.get(m))); + remapped.keySet().stream().sorted().map(remapped::get).forEach(m -> builder.addJoins(joins.get(m.getId()))); var reconfigure = builder.build(); return reconfigure; } - public static Block reconfigure(Digest nextViewId, Map joins, HashedBlock head, + public static Block reconfigure(Digest nextViewId, Map joins, HashedBlock head, Context context, HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint) { final Block lvc = lastViewChange.block; @@ -237,10 +234,8 @@ public static Block reconfigure(Digest nextViewId, Map joins, Hash .build(); } - public static Map rosterMap(Context baseContext, Collection members) { - - // Canonical labeling of the view members for Ethereal - return members.stream().collect(Collectors.toMap(Member::getId, m -> m)); + public static Map rosterMap(Context baseContext, Collection members) { + return members.stream().collect(Collectors.toMap(m -> m, m -> baseContext.getMember(m))); } public static List toGenesisData(List initializationData) { @@ -497,11 +492,17 @@ public Block checkpoint() { } @Override - public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { + public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { final HashedCertifiedBlock cp = checkpoint.get(); final HashedCertifiedBlock v = view.get(); var g = CHOAM.genesis(nextViewId, joining, previous, params.context(), v, params, cp, - params.genesisData().apply(joining)); + params.genesisData() + .apply(joining.keySet() + .stream() + .map(m -> params.context().getMember(m)) + .filter(m -> m != null) + .collect( + Collectors.toMap(m -> m, m -> joining.get(m.getId()))))); log.info("Create genesis: {} on: {}", nextViewId, params.member().getId()); return g; } @@ -548,7 +549,7 @@ public void publish(Digest hash, CertifiedBlock cb) { } @Override - public Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, + public Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint) { final HashedCertifiedBlock v = view.get(); var block = CHOAM.reconfigure(nextViewId, joining, previous, pendingView().get(), v, params, @@ -1008,7 +1009,7 @@ private void synchronizedProcess(CertifiedBlock certifiedBlock) { public interface BlockProducer { Block checkpoint(); - Block genesis(Map joining, Digest nextViewId, HashedBlock previous); + Block genesis(Map joining, Digest nextViewId, HashedBlock previous); void onFailure(); @@ -1018,7 +1019,7 @@ public interface BlockProducer { void publish(Digest hash, CertifiedBlock cb); - Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint); + Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint); } @FunctionalInterface diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index 9b004deb7..9ff7eaae3 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -44,19 +44,19 @@ * @author hal.hildebrand */ public class GenesisAssembly implements Genesis { - private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class); + private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class); private final Ethereal controller; private final ChRbcGossip coordinator; private final SignedViewMember genesisMember; private final Map nextAssembly; - private final Map proposals = new ConcurrentHashMap<>(); - private final AtomicBoolean published = new AtomicBoolean(); - private final Map slate = new ConcurrentHashMap<>(); - private final AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean published = new AtomicBoolean(); + private final Map slate = new ConcurrentHashMap<>(); + private final AtomicBoolean started = new AtomicBoolean(); private final Transitions transitions; private final ViewContext view; - private final Map witnesses = new ConcurrentHashMap<>(); + private final Map witnesses = new ConcurrentHashMap<>(); private final OneShot ds; + private final List pendingValidations = new ArrayList<>(); private volatile Thread blockingThread; private volatile HashedBlock reconfiguration; @@ -104,13 +104,9 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, @Override public void certify() { - proposals.values() - .stream() - .filter(p -> p.certifications.size() == nextAssembly.size()) - .forEach(p -> slate.put(p.member(), joinOf(p))); if (slate.size() != nextAssembly.size()) { log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(), - slate.keySet().stream().map(m -> m.getId()).toList(), params().member().getId()); + slate.keySet().stream().sorted().toList(), params().member().getId()); return; } assert slate.size() == nextAssembly.size() : "Expected: %s members, slate: %s".formatted(nextAssembly.size(), @@ -120,8 +116,10 @@ public void certify() { params().digestAlgorithm()))); var validate = view.generateValidation(reconfiguration); log.debug("Certifying genesis block: {} for: {} slate: {} on: {}", reconfiguration.hash, view.context().getId(), - slate.keySet().stream().map(m -> m.getId()).toList(), params().member().getId()); + slate.keySet().stream().sorted().toList(), params().member().getId()); ds.setValue(validate.toByteString()); + witnesses.put(params().member(), validate); + pendingValidations.forEach(v -> certify(v)); } @Override @@ -139,15 +137,8 @@ public void certify(List preblock, boolean last) { @Override public void gather() { log.info("Gathering next assembly on: {}", params().member().getId()); - var certification = view.generateValidation(genesisMember).getWitness(); - var join = Join.newBuilder() - .setMember(genesisMember) - .addEndorsements(certification) - .setKerl(params().kerl().get()) - .build(); - var proposed = new Proposed(join, params().member()); - proposed.certifications.put(params().member(), certification); - proposals.put(params().member().getId(), proposed); + var join = Join.newBuilder().setMember(genesisMember).setKerl(params().kerl().get()).build(); + slate.put(params().member().getId(), join); ds.setValue(join.toByteString()); coordinator.start(params().producer().gossipDuration()); @@ -172,19 +163,6 @@ public void gather(List preblock, boolean last) { .forEach(this::join); } - @Override - public void nominate() { - var validations = Validations.newBuilder(); - proposals.values() - .stream() - .filter(p -> !p.member.equals(params().member())) - .map(p -> view.generateValidation(p.join.getMember())) - .forEach(validations::addValidations); - ds.setValue(validations.build().toByteString()); - log.info("Nominations of: {} validations: {} on: {}", params().context().getId(), - validations.getValidationsCount(), params().member().getId()); - } - @Override public void nominations(List preblock, boolean last) { preblock.stream() @@ -198,8 +176,7 @@ public void nominations(List preblock, boolean last) { }) .filter(Objects::nonNull) .flatMap(vs -> vs.getValidationsList().stream()) - .filter(v -> !v.equals(Validate.getDefaultInstance())) - .forEach(this::validate); + .filter(v -> !v.equals(Validate.getDefaultInstance())); } @Override @@ -258,6 +235,9 @@ public void stop() { } private void certify(Validate v) { + if (reconfiguration == null) { + pendingValidations.add(v); + } log.trace("Validating reconfiguration block: {} height: {} on: {}", reconfiguration.hash, reconfiguration.height(), params().member().getId()); if (!view.validate(reconfiguration, v)) { @@ -326,64 +306,15 @@ private void join(Join join) { ViewContext.print(svm, params().digestAlgorithm()), params().member().getId()); return; } - if (log.isTraceEnabled()) { - log.trace("Valid view member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()), - params().member().getId()); - } - var proposed = proposals.computeIfAbsent(mid, k -> new Proposed(join, m)); - if (join.getEndorsementsList().size() == 1) { - proposed.certifications.computeIfAbsent(m, k -> join.getEndorsements(0)); + if (slate.putIfAbsent(m.getId(), join) == null) { + if (log.isTraceEnabled()) { + log.trace("Add view member: {} to slate on: {}", ViewContext.print(svm, params().digestAlgorithm()), + params().member().getId()); + } } } - private Join joinOf(Proposed candidate) { - final List witnesses = candidate.certifications.values() - .stream() - .sorted( - Comparator.comparing(c -> new Digest(c.getId()))) - .collect(Collectors.toList()); - return Join.newBuilder(candidate.join).clearEndorsements().addAllEndorsements(witnesses).build(); - } - private Parameters params() { return view.params(); } - - private void validate(Validate v) { - final var cid = Digest.from(v.getWitness().getId()); - var certifier = view.context().getMember(cid); - if (certifier == null) { - log.warn("Unknown certifier: {} on: {}", cid, params().member().getId()); - return; // do not have the join yet - } - final var vid = Digest.from(v.getHash()); - final var member = nextAssembly.get(vid); - if (member == null) { - return; - } - var proposed = proposals.get(vid); - if (proposed == null) { - log.warn("Invalid certification, unknown view join: {} on: {}", vid, params().member().getId()); - return; // do not have the join yet - } - if (!view.validate(proposed.join.getMember(), v)) { - log.warn("Invalid certification for view join: {} from: {} on: {}", vid, - Digest.from(v.getWitness().getId()), params().member().getId()); - return; - } - var prev = proposed.certifications.put(certifier, v.getWitness()); - if (prev == null) { - log.debug("New validation of view member: {} using certifier: {} witnesses: {} on: {}", member.getId(), - certifier.getId(), proposed.certifications.values().size(), params().member().getId()); - } else { - log.debug("Redundant validation of view member: {} hash: {} using certifier: {} on: {}", member.getId(), - vid, certifier.getId(), params().member().getId()); - } - } - - private record Proposed(Join join, Member member, Map certifications) { - public Proposed(Join join, Member member) { - this(join, member, new HashMap<>()); - } - } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 0ff834cb1..dd6954b33 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -21,6 +21,7 @@ import com.salesforce.apollo.choam.support.TxDataSource; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; +import com.salesforce.apollo.cryptography.JohnHancock; import com.salesforce.apollo.ethereal.Config; import com.salesforce.apollo.ethereal.Config.Builder; import com.salesforce.apollo.ethereal.Ethereal; @@ -30,10 +31,8 @@ import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -53,9 +52,9 @@ public class Producer { private final ChRbcGossip coordinator; private final TxDataSource ds; private final int lastEpoch; - private final Set nextAssembly = new HashSet<>(); + private final Map nextAssembly = new HashMap<>(); private final Map pending = new ConcurrentSkipListMap<>(); - private final BlockingQueue pendingReassembles = new LinkedBlockingQueue<>(); + private final List pendingJoins = new CopyOnWriteArrayList<>(); private final Map> pendingValidations = new ConcurrentSkipListMap<>(); private final AtomicReference previousBlock = new AtomicReference<>(); private final AtomicBoolean reconfigured = new AtomicBoolean(); @@ -161,9 +160,11 @@ public SubmitResult submit(Transaction transaction) { } private void addReassemble(Reassemble r) { - log.trace("Adding reassembly members: {} validations: {} on: {}", r.getMembersCount(), r.getValidationsCount(), - params().member().getId()); - ds.offer(r); + if (ds.offer(r)) { + log.trace("Adding joins: {} on: {}", r.getMembersList(), params().member().getId()); + } else { + log.trace("Cannot add joins: {} on: {}", r.getMembersCount(), params().member().getId()); + } } private void create(List preblock, boolean last) { @@ -188,19 +189,14 @@ private void create(List preblock, boolean last) { .filter(p -> p.witnesses.size() >= params().majority()) .forEach(this::publish); - var reass = Reassemble.newBuilder(); - aggregate.stream() - .flatMap(e -> e.getReassembliesList().stream()) - .forEach(r -> reass.addAllMembers(r.getMembersList()).addAllValidations(r.getValidationsList())); + var joins = aggregate.stream().flatMap(e -> e.getJoinsList().stream()).filter(j -> validate(j)).toList(); final var ass = assembly.get(); if (ass != null) { - log.trace("Consuming reassemblies: {} members: {} validations: {} on: {}", aggregate.size(), - reass.getMembersCount(), reass.getValidationsCount(), params().member().getId()); - ass.inbound().accept(Collections.singletonList(reass.build())); + log.trace("Consuming joins: {} on: {}", aggregate.size(), joins.size(), params().member().getId()); + ass.inbound().accept(joins); } else { - log.trace("Pending reassemblies: {} members: {} validations: {} on: {}", aggregate.size(), - reass.getMembersCount(), reass.getValidationsCount(), params().member().getId()); - pendingReassembles.add(reass.build()); + log.trace("Pending joins: {} on: {}", aggregate.size(), joins.size(), params().member().getId()); + pendingJoins.addAll(joins); } HashedBlock lb = previousBlock.get(); @@ -261,7 +257,9 @@ private void processPendingValidations(HashedBlock block, PendingBlock p) { private void produceAssemble() { final var vlb = previousBlock.get(); nextViewId = vlb.hash; - nextAssembly.addAll(Committee.viewMembersOf(nextViewId, view.pendingView())); + for (var m : Committee.viewMembersOf(nextViewId, view.pendingView())) { + nextAssembly.put(m.getId(), m); + } log.debug("Assembling: {} on: {}", nextViewId, params().member().getId()); final var assemble = new HashedBlock(params().digestAlgorithm(), view.produce(vlb.height().add(1), vlb.hash, Assemble.newBuilder() @@ -293,6 +291,28 @@ private void publish(PendingBlock p) { view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), cb)); } + private boolean validate(SignedJoin join) { + var mid = Digest.from(join.getMember()); + var m = nextAssembly.get(mid); + if (m == null) { + log.trace("Cannot validate join view: {} of: {} signed by: {} on: {}", + Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid, + params().member().getId()); + return false; + } + var validated = m.verify(JohnHancock.from(join.getSignature()), join.getJoin().toByteString()); + if (!validated) { + log.trace("Cannot validate view join: {} of: {} signed by: {} on: {}", + Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid, + params().member().getId()); + } else { + log.trace("Validated view join: {} of: {} signed by: {} on: {}", + Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid, + params().member().getId()); + } + return validated; + } + private PendingBlock validate(Validate v) { Digest hash = Digest.from(v.getHash()); var p = pending.get(hash); @@ -336,8 +356,8 @@ public void assembled() { ds.offer(validation); // controller.completeIt(); log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(), - reconfiguration.hash, reconfiguration.height(), - slate.keySet().stream().map(m -> m.getId()).sorted().toList(), params().member().getId()); + reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(), + params().member().getId()); processPendingValidations(reconfiguration, p); } @@ -420,9 +440,9 @@ public void complete() { }); assembly.get().start(); assembly.get().assembled(); - List reasses = new ArrayList<>(); - pendingReassembles.drainTo(reasses); - assembly.get().inbound().accept(reasses); + var joins = new ArrayList<>(pendingJoins); + pendingJoins.clear(); + assembly.get().inbound().accept(joins); } @Override diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index 105d0d98d..1cfc03a25 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -12,7 +12,10 @@ import com.salesforce.apollo.choam.fsm.Reconfiguration; import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure; import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions; -import com.salesforce.apollo.choam.proto.*; +import com.salesforce.apollo.choam.proto.Join; +import com.salesforce.apollo.choam.proto.Reassemble; +import com.salesforce.apollo.choam.proto.SignedJoin; +import com.salesforce.apollo.choam.proto.SignedViewMember; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.JohnHancock; @@ -26,7 +29,10 @@ import java.security.PublicKey; import java.time.Duration; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -48,10 +54,9 @@ public class ViewAssembly { protected final Transitions transitions; private final AtomicBoolean cancelSlice = new AtomicBoolean(); private final Digest nextViewId; - private final Map proposals = new ConcurrentHashMap<>(); + private final Map proposals = new ConcurrentHashMap<>(); private final Consumer publisher; - private final Map slate = new ConcurrentSkipListMap<>(); - private final Map> unassigned = new ConcurrentHashMap<>(); + private final Map slate = new ConcurrentSkipListMap<>(); private final ViewContext view; private final CommonCommunications comms; private final Set polled = Collections.newSetFromMap( @@ -77,7 +82,7 @@ public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publ nextAssembly.keySet(), params().member().getId()); } - public Map getSlate() { + public Map getSlate() { return slate; } @@ -95,34 +100,21 @@ void assembled() { void complete() { cancelSlice.set(true); - proposals.values() + proposals.entrySet() .stream() - .filter(p -> p.validations.size() >= params().majority()) - .forEach(p -> slate.put(p.member(), joinOf(p))); + .forEach(e -> slate.put(e.getKey(), Join.newBuilder().setMember(e.getValue()).build())); Context pendingContext = view.pendingView(); - if (slate.size() >= pendingContext.majority()) { - log.debug("View Assembly: {} completed with: {} members on: {}", nextViewId, slate.size(), - params().member().getId()); - } else { - log.debug("Failed view assembly completion, election required: {} slate: {} of: {} on: {}", - pendingContext.majority(), proposals.values() - .stream() - .map(p -> String.format("%s:%s", p.member.getId(), - p.validations.size())) - .sorted() - .toList(), nextViewId, params().member().getId()); - transitions.complete(); - } + log.debug("View Assembly: {} completed with: {} members on: {}", nextViewId, slate.size(), + params().member().getId()); } void finalElection() { transitions.complete(); } - Consumer> inbound() { + Consumer> inbound() { return lre -> { - lre.stream().flatMap(re -> re.getMembersList().stream()).forEach(vm -> join(vm, false)); - lre.stream().flatMap(re -> re.getValidationsList().stream()).forEach(this::validate); + lre.forEach(vm -> join(vm.getJoin(), false)); }; } @@ -152,7 +144,6 @@ private boolean consider(Optional futureSailor, Terminal term, } SignedViewMember signedViewMember; signedViewMember = futureSailor.get(); - log.debug("Join reply from: {} on: {}", term.getMember().getId(), params().member().getId()); if (signedViewMember.equals(SignedViewMember.getDefaultInstance())) { log.debug("Empty join response from: {} on: {}", term.getMember().getId(), params().member().getId()); return !gathered(); @@ -163,6 +154,7 @@ private boolean consider(Optional futureSailor, Terminal term, params().member().getId()); return !gathered(); } + log.debug("Join reply from: {} on: {}", term.getMember().getId(), params().member().getId()); join(signedViewMember, true); return !gathered(); } @@ -176,14 +168,6 @@ private boolean gathered() { return false; } - private Reassemble getMemberProposal() { - return Reassemble.newBuilder() - .addAllMembers(proposals.values().stream().map(p -> p.vm).toList()) - .addAllValidations( - proposals.values().stream().flatMap(p -> p.validations.values().stream()).toList()) - .build(); - } - private void join(SignedViewMember svm, boolean direct) { final var mid = Digest.from(svm.getVm().getId()); final var m = nextAssembly.get(mid); @@ -233,117 +217,38 @@ private void join(SignedViewMember svm, boolean direct) { } return; } - AtomicBoolean newJoin = new AtomicBoolean(); - - var proposed = proposals.computeIfAbsent(mid, k -> { - newJoin.set(true); - return new Proposed(svm, m, new ConcurrentSkipListMap<>()); - }); - - var builder = Reassemble.newBuilder(); - proposed.validations.computeIfAbsent(params().member(), k -> { - var validate = view.generateValidation(svm); - builder.addValidations(validate); - return validate; - }); - - if (newJoin.get()) { + if (proposals.putIfAbsent(mid, svm) == null) { if (log.isTraceEnabled()) { log.trace("Adding view member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()), params().member().getId()); } if (direct) { - builder.addMembers(svm); - } - var validations = unassigned.remove(mid); - if (validations != null) { - validations.forEach(this::validate); + publisher.accept(Reassemble.newBuilder().addMembers(svm).build()); } } polled.add(mid); - var reass = builder.build(); - if (reass.isInitialized()) { - publisher.accept(reass); - } - } - - private Join joinOf(Proposed candidate) { - final List witnesses = candidate.validations.values() - .stream() - .map(Validate::getWitness) - .sorted( - Comparator.comparing(c -> new Digest(c.getId()))) - .toList(); - return Join.newBuilder().setMember(candidate.vm).addAllEndorsements(witnesses).build(); } private Parameters params() { return view.params(); } - private void validate(Validate v) { - final var cid = Digest.from(v.getWitness().getId()); - var certifier = view.context().getMember(cid); - if (certifier == null) { - log.warn("Unknown certifier: {} on: {}", cid, params().member().getId()); - return; - } - final var digest = Digest.from(v.getHash()); - final var member = nextAssembly.get(digest); - if (member == null) { - log.warn("Unknown next view member: {} on: {}", digest, params().member().getId()); - return; - } - var proposed = proposals.get(digest); - if (proposed == null) { - log.warn("Unassigned certification, unknown view join: {} on: {}", digest, params().member().getId()); - unassigned.computeIfAbsent(digest, d -> new CopyOnWriteArrayList<>()).add(v); - return; - } - if (!view.validate(proposed.vm, v)) { - log.warn("Invalid certification for view join: {} from: {} on: {}", digest, - Digest.from(v.getWitness().getId()), params().member().getId()); - return; - } - var newCertifier = new AtomicBoolean(); - proposed.validations.computeIfAbsent(certifier, k -> { - log.debug("Validation of view member: {}:{} using certifier: {} on: {}", member.getId(), digest, - certifier.getId(), params().member().getId()); - newCertifier.set(true); - return v; - }); - if (newCertifier.get()) { - transitions.validation(); - } - } - - private record Proposed(SignedViewMember vm, Member member, Map validations) { + private record Proposed(SignedViewMember vm, Member member) { } private class Recon implements Reconfiguration { @Override public void certify() { - var certified = proposals.entrySet() - .stream() - .filter(p -> p.getValue().validations.size() >= params().majority()) - .map(Map.Entry::getKey) - .sorted() - .toList(); - Context memberContext = view.pendingView(); - var required = memberContext.majority(); - if (certified.size() >= required) { + if (proposals.size() == nextAssembly.size()) { cancelSlice.set(true); - log.debug("Certifying: {} required: {} of: {} slate: {} on: {}", certified.size(), required, - nextViewId, certified, params().member().getId()); + log.debug("Certifying: {} required: {} of: {} slate: {} on: {}", proposals.size(), nextAssembly.size(), + nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.certified(); } else { - log.debug("Not certifying: {} required: {} slate: {} of: {} on: {}", certified.size(), required, - proposals.entrySet() - .stream() - .map(e -> String.format("%s:%s", e.getKey(), e.getValue().validations.size())) - .sorted() - .toList(), nextViewId, params().member().getId()); + log.debug("Not certifying: {} required: {} slate: {} of: {} on: {}", proposals.size(), + nextAssembly.size(), proposals.entrySet().stream().sorted().toList(), nextViewId, + params().member().getId()); } } @@ -354,33 +259,17 @@ public void complete() { @Override public void elect() { - proposals.values() - .stream() - .filter(p -> p.validations.size() >= params().majority()) - .sorted(Comparator.comparing(p -> p.member.getId())) - .forEach(p -> slate.put(p.member(), joinOf(p))); - if (slate.size() >= view.pendingView().majority()) { + proposals.entrySet().stream().forEach(e -> slate.put(e.getKey(), joinOf(e.getValue()))); + if (slate.size() == view.pendingView().getRingCount()) { cancelSlice.set(true); log.debug("Electing: {} of: {} slate: {} proposals: {} on: {}", slate.size(), nextViewId, - slate.keySet().stream().map(Member::getId).sorted().toList(), proposals.values() - .stream() - .map( - p -> String.format( - "%s:%s", - p.member.getId(), - p.validations.size())) - .sorted() - .toList(), + slate.keySet().stream().sorted().toList(), proposals.keySet().stream().sorted().toList(), params().member().getId()); transitions.complete(); } else { Context memberContext = view.pendingView(); log.error("Failed election, required: {} slate: {} of: {} on: {}", memberContext.majority(), - proposals.values() - .stream() - .map(p -> String.format("%s:%s", p.member.getId(), p.validations.size())) - .sorted() - .toList(), nextViewId, params().member().getId()); + proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId()); } } @@ -415,8 +304,12 @@ public void gather() { @Override public void nominate() { - publisher.accept(getMemberProposal()); + // publisher.accept(getMemberProposal()); transitions.nominated(); } + + private Join joinOf(SignedViewMember vm) { + return Join.newBuilder().setMember(vm).build(); + } } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index c06aedb9a..1a64d79ae 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -47,7 +47,7 @@ public ViewContext(Context context, Parameters params, Supplier m.getId()).toList()); short pid = 0; for (Digest d : remapped.keySet().stream().sorted().toList()) { roster.put(remapped.get(d).getId(), pid++); @@ -121,7 +121,7 @@ public Validate generateValidation(SignedViewMember svm) { return validation; } - public Block genesis(Map slate, Digest nextViewId, HashedBlock previous) { + public Block genesis(Map slate, Digest nextViewId, HashedBlock previous) { return blockProducer.genesis(slate, nextViewId, previous); } @@ -156,7 +156,7 @@ public void publish(HashedCertifiedBlock block) { blockProducer.publish(block.hash, block.certifiedBlock); } - public Block reconfigure(Map aggregate, Digest nextViewId, HashedBlock lastBlock, + public Block reconfigure(Map aggregate, Digest nextViewId, HashedBlock lastBlock, HashedBlock checkpoint) { return blockProducer.reconfigure(aggregate, nextViewId, lastBlock, checkpoint); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java index c8323c15c..0cdf3ac47 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java @@ -24,8 +24,6 @@ public interface Genesis { void gather(List preblock, boolean last); - void nominate(); - void nominations(List preblock, boolean last); void publish(); @@ -52,7 +50,7 @@ public void gather() { @Override public Transitions nextEpoch(Integer epoch) { - return epoch.equals(0) ? null : NOMINATION; + return epoch.equals(0) ? null : CERTIFICATION; } @@ -61,22 +59,6 @@ public Transitions process(List preblock, boolean last) { context().gather(preblock, last); return null; } - }, NOMINATION { - @Override - public Transitions nextEpoch(Integer epoch) { - return CERTIFICATION; - } - - @Entry - public void nominate() { - context().nominate(); - } - - @Override - public Transitions process(List preblock, boolean last) { - context().nominations(preblock, last); - return null; - } }, PUBLISH { @Entry public void publish() { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java index 775a01a5a..5acb7c4cb 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java @@ -156,8 +156,8 @@ public int getRemainingValidations() { return validations.size(); } - public void offer(Reassemble reassembly) { - reassemblies.offer(reassembly); + public boolean offer(Reassemble reassembly) { + return reassemblies.offer(reassembly); } public boolean offer(Transaction txn) { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 974363d66..83d441916 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -137,7 +137,7 @@ public Block checkpoint() { } @Override - public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { + public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { return CHOAM.genesis(viewId, joining, previous, committee, previous, built, previous, Collections.emptyList()); } @@ -163,7 +163,7 @@ public void publish(Digest hash, CertifiedBlock cb) { } @Override - public Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, + public Block reconfigure(Map joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint) { return null; } diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto index 1cc17b0d0..5c5c7416c 100644 --- a/grpc/src/main/proto/choam.proto +++ b/grpc/src/main/proto/choam.proto @@ -104,12 +104,18 @@ message UnitData { repeated Validate validations = 1; repeated Transaction transactions = 2; repeated Reassemble reassemblies = 3; + repeated SignedJoin joins = 4; } message Join { SignedViewMember member = 1; - repeated Certification endorsements = 2; - stereotomy.KERL_ kerl = 3; + stereotomy.KERL_ kerl = 2; +} + +message SignedJoin { + crypto.Digeste member = 1; + SignedViewMember join = 2; + crypto.Sig signature = 3; } message ViewMember {