Skip to content

Commit

Permalink
View agreement on reassembly. F*ck yea
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Apr 18, 2024
1 parent a58f92e commit 3e406a5
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 19 deletions.
6 changes: 6 additions & 0 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,12 @@ public PendingView get(Digest diadem) {
return views.get(diadem);
}

public Views.Builder getViews(Digest hash) {
var builder = Views.newBuilder();
views.values().stream().map(pv -> pv.getView(hash)).forEach(v -> builder.addViews(v));
return builder;
}

public PendingView last() {
final var l = lock.readLock();
try {
Expand Down
122 changes: 104 additions & 18 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@

import com.chiralbehaviors.tron.Fsm;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.choam.comm.Terminal;
import com.salesforce.apollo.choam.fsm.Reconfiguration;
import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure;
import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.cryptography.proto.Digeste;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.ring.SliceIterator;
Expand Down Expand Up @@ -50,16 +51,17 @@
*/
public class ViewAssembly {

private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class);
private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class);
protected final Transitions transitions;
private final AtomicBoolean cancelSlice = new AtomicBoolean();
private final AtomicBoolean cancelSlice = new AtomicBoolean();
private final Digest nextViewId;
private final Map<Digest, SignedViewMember> proposals = new ConcurrentHashMap<>();
private final Map<Digest, Views> viewProposals = new ConcurrentHashMap<>();
private final Map<Digest, SignedViewMember> proposals = new ConcurrentHashMap<>();
private final Consumer<Assemblies> publisher;
private final Map<Digest, Join> slate = new HashMap<>();
private final Map<Digest, Join> slate = new HashMap<>();
private final ViewContext view;
private final CommonCommunications<Terminal, ?> comms;
private final Set<Digest> polled = Collections.newSetFromMap(
private final Set<Digest> polled = Collections.newSetFromMap(
new ConcurrentSkipListMap<>());
private volatile View selected;

Expand Down Expand Up @@ -108,22 +110,62 @@ void finalElection() {
}

Consumer<List<Assemblies>> inbound() {
return lre -> {
lre.forEach(this::assemble);
};
return lre -> lre.forEach(this::assemble);
}

private void assemble(Assemblies ass) {
log.info("Assembling {} joins and {} views on: {}", ass.getJoinsCount(), ass.getViewsCount(),
params().member().getId());
ass.getJoinsList().stream().filter(view::validate).forEach(sj -> join(sj.getJoin(), false));
var candidates = HashMultiset.create();
var majority = params().context().toleranceLevel() + 1;
if (selected != null) {
log.info("View already selected: {} on: {}", selected.diadem, params().member().getId());
return;
}
for (SignedViews svs : ass.getViewsList()) {
if (view.validate(svs)) {
candidates.addAll(svs.getViews().getViewsList());
viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews());
}
}
candidates.entrySet().stream().filter(e -> e.getCount() >= majority).forEach(e -> {
});
vote();
}

private Map<Digest, Member> assemblyOf(List<Digeste> committee) {
var last = view.pendingViews().last();
return committee.stream()
.map(d -> last.context().getMember(Digest.from(d)))
.collect(Collectors.toMap(m -> m.getId(), m -> m));
}

private void castVote() {
var views = view.pendingViews()
.getViews(nextViewId)
.setMember(params().member().getId().toDigeste())
.setVid(nextViewId.toDigeste())
.build();
log.info("Voting for: {} on: {}", nextViewId, params().member().getId());
publisher.accept(Assemblies.newBuilder()
.addViews(
SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig()))
.build());
}

private void castVote(Views vs, List<com.salesforce.apollo.choam.proto.View> majorities,
Multiset<com.salesforce.apollo.choam.proto.View> consensus) {
var ordered = vs.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList();
var lastIndex = -1;
com.salesforce.apollo.choam.proto.View last = null;
for (var v : majorities) {
var i = ordered.indexOf(Digest.from(v.getDiadem()));
if (i != -1) {
if (i > lastIndex) {
last = v;
lastIndex = i;
}
}
}
if (last != null) {
consensus.add(last);
}
}

private void completeSlice(Duration retryDelay, AtomicReference<Runnable> reiterate) {
Expand Down Expand Up @@ -247,11 +289,53 @@ private Parameters params() {
return view.params();
}

private record View(Digest diadem, Map<Digest, Member> assembly, Context<Member> context) {
private void vote() {
Multiset<com.salesforce.apollo.choam.proto.View> candidates = HashMultiset.create();
viewProposals.values().forEach(v -> candidates.addAll(v.getViewsList()));
var majority = params().majority();
var majorities = candidates.entrySet()
.stream()
.filter(e -> e.getCount() >= majority)
.map(e -> e.getElement())
.toList();
if (majorities.isEmpty()) {
log.info("No majority views on: {}", params().member().getId());
return;
}
if (log.isTraceEnabled()) {
log.trace("Majority views: {} on: {}", majorities.stream().map(v -> Digest.from(v.getDiadem())),
params().member().getId());
}
Multiset<com.salesforce.apollo.choam.proto.View> consensus = HashMultiset.create();
viewProposals.values().forEach(vs -> {
castVote(vs, majorities, consensus);
});
var ratification = consensus.entrySet()
.stream()
.filter(e -> e.getCount() >= majority)
.map(e -> e.getElement())
.collect(Collectors.toList());
if (consensus.isEmpty()) {
log.info("No consensus views on: {}", params().member().getId());
return;
} else if (log.isTraceEnabled()) {
log.trace("Consensus views: {} on: {}", ratification.stream().map(v -> Digest.from(v.getDiadem())),
params().member().getId());
}
ratification.sort(Comparator.comparing(v -> Digest.from(v.getDiadem())));
var winner = ratification.getFirst();
selected = new View(Digest.from(winner.getDiadem()), assemblyOf(winner.getCommitteeList()));
if (log.isDebugEnabled()) {
log.debug("Selected view: {} on: {}", selected, params().member().getId());
}
transitions.viewDetermined();
}

private record View(Digest diadem, Map<Digest, Member> assembly) {
public View(CHOAM.PendingView view, Digest viewId) {
this(view.diadem(), Committee.viewMembersOf(viewId, view.context())
.stream()
.collect(Collectors.toMap(m -> m.getId(), m -> m)), view.context());
.collect(Collectors.toMap(m -> m.getId(), m -> m)));

}
}
Expand Down Expand Up @@ -301,7 +385,9 @@ public void failed() {

@Override
public void gather() {
selected = new View(view.pendingViews().last(), nextViewId);
if (selected == null) {
selected = new View(view.pendingViews().last(), nextViewId);
}
log.trace("Gathering assembly for: {} on: {}", nextViewId, params().member().getId());
AtomicReference<Runnable> reiterate = new AtomicReference<>();
var retryDelay = Duration.ofMillis(10);
Expand All @@ -326,7 +412,7 @@ public void nominate() {

@Override
public void viewAgreement() {
view.pendingViews();
ViewAssembly.this.castVote();
}

private Join joinOf(SignedViewMember vm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ public JohnHancock sign(SignedViewMember svm) {
return signer.sign(svm.toByteString());
}

public JohnHancock sign(Views views) {
if (log.isTraceEnabled()) {
log.trace("Signing views on: {}", params.member().getId());
}
return signer.sign(views.toByteString());
}

public boolean validate(HashedBlock block, Validate validate) {
Verifier v = verifierOf(validate);
if (v == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ enum Reconfigure implements Transitions {
AWAIT_ASSEMBLY {
@Override
public Transitions assembled() {
return GATHER;
return VIEW_AGREEMENT;
}
}, CERTIFICATION {
@Override
Expand Down Expand Up @@ -63,6 +63,11 @@ public void assembly() {
public Transitions gathered() {
return NOMINATION;
}

@Override
public Transitions viewDetermined() {
return null;
}
}, NOMINATION {
@Entry
public void nominate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ public int compareTo(Digest id) {
if (id == this) {
return 0;
}
if (hash.length != id.hash.length) {
throw new IllegalArgumentException("hash length incorrect for algorithm");
}
for (int i = 0; i < hash.length; i++) {
int compare = Long.compareUnsigned(hash[i], id.hash[i]);
if (compare != 0) {
Expand Down

0 comments on commit 3e406a5

Please sign in to comment.