Skip to content

Commit

Permalink
vsync 3 (#194)
Browse files Browse the repository at this point in the history
* v sync 3

* Properly handle committee members who did not supply their view signing key in the CHOAM view reconfiguration.

Some clean up on builders, adding missing clone()

* try 2 forks again

* 4 forks

* interim.

Save progress before refactoring Join protocol.

* interim.

Invert join protocol. Future contacts past. Revive ASSEMBLE block.

Dear lord.

* interim.

moar

* interim.

moar

* hallelujah! build runs green

View change protocol now stable.

* 2 forks

* 4

* info logging for CHOAM

* no forks

* fix test params

* trace instead of info

* prioritize joins

* more efficient joins.
  • Loading branch information
Hellblazer authored Apr 28, 2024
1 parent ea24040 commit 11e71e6
Show file tree
Hide file tree
Showing 47 changed files with 1,380 additions and 796 deletions.
355 changes: 221 additions & 134 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java

Large diffs are not rendered by default.

53 changes: 40 additions & 13 deletions choam/src/main/java/com/salesforce/apollo/choam/Committee.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,45 @@
import com.salesforce.apollo.cryptography.Verifier;
import com.salesforce.apollo.cryptography.Verifier.DefaultVerifier;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.MockMember;
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey;
import static io.grpc.Status.ABORTED;

/**
* @author hal.hildebrand
*/
public interface Committee {

static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Member> context) {
var validators = reconfigure.getJoinsList()
.stream()
.collect(
Collectors.toMap(e -> context.getMember(new Digest(e.getMember().getVm().getId())),
e -> (Verifier) new DefaultVerifier(
publicKey(e.getMember().getVm().getConsensusKey()))));
static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Member> context, Digest member,
Logger log) {
var validators = reconfigure.getJoinsList().stream().collect(Collectors.toMap(e -> {
var id = new Digest(e.getMember().getVm().getId());
var m = context.getMember(id);
if (m == null) {
log.info("No member for validator: {}, returning mock on: {}", id, member);
return new MockMember(id);
} else {
return m;
}
}, e -> {
var vm = e.getMember().getVm();
if (vm.hasConsensusKey()) {

return new DefaultVerifier(publicKey(vm.getConsensusKey()));
} else {
log.info("No member for validator: {}, returning mock on: {}", Digest.from(vm.getId()), member);
return Verifier.NO_VERIFIER;
}
}));
assert !validators.isEmpty() : "No validators in this reconfiguration of: " + context.getId();
return validators;
}
Expand Down Expand Up @@ -64,11 +82,19 @@ static Set<Member> viewMembersOf(Digest hash, Context<? super Member> baseContex

void accept(HashedCertifiedBlock next);

default void assemble(Assemble assemble) {
}

void complete();

boolean isMember();

SignedViewMember join(Digest nextView, Digest from);
default void join(SignedViewMember nextView, Digest from) {
log().trace("Error joining by: {} view: {} diadem: {} invalid committee: {} on: {}", from,
Digest.from(nextView.getVm().getView()), Digest.from(nextView.getVm().getView()),
this.getClass().getSimpleName(), params().member().getId());
throw new StatusRuntimeException(ABORTED);
}

Logger log();

Expand Down Expand Up @@ -123,8 +149,8 @@ default boolean validate(HashedCertifiedBlock hb, Certification c, Map<Member, V

default boolean validate(HashedCertifiedBlock hb, Map<Member, Verifier> validators) {
Parameters params = params();

log().trace("Validating block: {} height: {} certs: {} on: {}", hb.hash, hb.height(),
log().trace("Validating block: {} hash: {} height: {} certs: {} on: {}", hb.block.getBodyCase(), hb.hash,
hb.height(),
hb.certifiedBlock.getCertificationsList().stream().map(c -> new Digest(c.getId())).toList(),
params.member().getId());
int valid = 0;
Expand All @@ -137,17 +163,18 @@ default boolean validate(HashedCertifiedBlock hb, Map<Member, Verifier> validato
}
}
final int toleranceLevel = params.context().toleranceLevel();
log().trace("Validate: {} height: {} count: {} needed: {} on: {}}", hb.hash, hb.height(), valid, toleranceLevel,
log().trace("Validate: {} height: {} count: {} needed: {} on: {}", hb.hash, hb.height(), valid, toleranceLevel,
params.member().getId());
return valid > toleranceLevel;
}

default boolean validateRegeneration(HashedCertifiedBlock hb) {
if (!hb.block.hasGenesis()) {
if (!Objects.requireNonNull(hb.block).hasGenesis()) {
return false;
}
var reconfigure = hb.block.getGenesis().getInitialView();
var validators = validatorsOf(reconfigure, params().context());
var validators = validatorsOf(reconfigure, params().context(), params().member().getId(), log());
return !validators.isEmpty() && validate(hb, validators);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public void stop() {
private void certify(Validate v) {
if (reconfiguration == null) {
pendingValidations.add(v);
return;
}
log.trace("Validating reconfiguration block: {} height: {} on: {}", reconfiguration.hash,
reconfiguration.height(), params().member().getId());
Expand Down
21 changes: 19 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/Parameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ public Builder setMaxGossipDelay(Duration maxGossipDelay) {
}
}

public static class LimiterBuilder {
public static class LimiterBuilder implements Cloneable {
private Duration backlogDuration = Duration.ofSeconds(1);
private int backlogSize = 1_000;
private double backoffRatio = 0.5;
Expand All @@ -597,6 +597,14 @@ public Limiter<Void> build(String name, MetricRegistry metrics) {
.build();
}

public LimiterBuilder clone() {
try {
return (LimiterBuilder) super.clone();
} catch (CloneNotSupportedException e) {
throw new IllegalStateException(e);
}
}

public int getBacklogSize() {
return backlogSize;
}
Expand Down Expand Up @@ -706,11 +714,20 @@ public Parameters build(RuntimeParameters runtime) {

@Override
public Builder clone() {
Builder clone;
try {
return (Builder) super.clone();
clone = (Builder) super.clone();
} catch (CloneNotSupportedException e) {
throw new IllegalStateException("well, that was unexpected");
}
clone.setMvBuilder(mvBuilder.clone());
clone.setProducer(
new ProducerParameters(producer.ethereal.clone(), producer.gossipDuration, producer.maxBatchByteSize(),
producer.batchInterval, producer.maxBatchCount(), producer.maxGossipDelay));
clone.setTxnLimiterBuilder(txnLimiterBuilder.clone());
clone.setSubmitPolicy(submitPolicy.clone());
clone.setDrainPolicy(drainPolicy.clone());
return clone;
}

public BootstrapParameters getBootstrap() {
Expand Down
Loading

0 comments on commit 11e71e6

Please sign in to comment.