Skip to content

Commit

Permalink
whole lotta stabilizing, clarifying, etc
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Apr 1, 2024
1 parent 211c02d commit 29fef4b
Show file tree
Hide file tree
Showing 29 changed files with 817 additions and 359 deletions.
210 changes: 101 additions & 109 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java

Large diffs are not rendered by default.

21 changes: 12 additions & 9 deletions choam/src/main/java/com/salesforce/apollo/choam/Committee.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
public interface Committee {

static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Member> context) {
return reconfigure.getJoinsList()
.stream()
.collect(Collectors.toMap(e -> context.getMember(new Digest(e.getMember().getId())),
e -> new DefaultVerifier(
publicKey(e.getMember().getConsensusKey()))));
var validators = reconfigure.getJoinsList()
.stream()
.collect(Collectors.toMap(e -> context.getMember(new Digest(e.getMember().getId())),
e -> (Verifier) new DefaultVerifier(
publicKey(e.getMember().getConsensusKey()))));
assert !validators.isEmpty() : "No validators in this reconfiguration of: " + context.getId();
return validators;
}

/**
Expand Down Expand Up @@ -73,7 +75,7 @@ default void assembled() {

Logger log();

void nextView(CHOAM.PendingView pendingView);
void nextView(Context<Member> pendingView);

Parameters params();

Expand Down Expand Up @@ -137,17 +139,18 @@ default boolean validate(HashedCertifiedBlock hb, Map<Member, Verifier> validato
valid++;
}
}
final int toleranceLevel = params.majority();
final int toleranceLevel = params.context().toleranceLevel();
log().trace("Validate: {} height: {} count: {} needed: {} on: {}}", hb.hash, hb.height(), valid, toleranceLevel,
params.member().getId());
return valid >= toleranceLevel;
return valid > toleranceLevel;
}

default boolean validateRegeneration(HashedCertifiedBlock hb) {
if (!hb.block.hasGenesis()) {
return false;
}
var reconfigure = hb.block.getGenesis().getInitialView();
return validate(hb, validatorsOf(reconfigure, params().context()));
var validators = validatorsOf(reconfigure, params().context());
return !validators.isEmpty() && validate(hb, validators);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,

final Fsm<Genesis, Transitions> fsm = Fsm.construct(this, Transitions.class, BrickLayer.INITIAL, true);
this.transitions = fsm.getTransitions();
fsm.setName("Genesis:" + view.context().getId() + ":" + params().member().getId());

fsm.setName("Genesis%s on: %s".formatted(view.context().getId(), params().member().getId()));

Config.Builder config = params().producer().ethereal().clone();

Expand All @@ -105,13 +106,14 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
public void certify() {
proposals.values()
.stream()
.filter(p -> p.certifications.size() >= params().majority())
.filter(p -> p.certifications.size() > params().majority())
.forEach(p -> slate.put(p.member(), joinOf(p)));
assert !slate.isEmpty() : "Slate is empty, no certifications";
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
params().digestAlgorithm())));
var validate = view.generateValidation(reconfiguration);
log.trace("Certifying genesis block: {} for: {} count: {} on: {}", reconfiguration.hash, view.context().getId(),
log.debug("Certifying genesis block: {} for: {} count: {} on: {}", reconfiguration.hash, view.context().getId(),
slate.size(), params().member().getId());
ds = new OneShot();
ds.setValue(validate.toByteString());
Expand Down Expand Up @@ -203,9 +205,9 @@ public void publish() {
.map(Map.Entry::getValue)
.forEach(v -> b.addCertifications(v.getWitness()));
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), b.build()));
controller.completeIt();
log.debug("Genesis block: {} published for: {} on: {}", reconfiguration.hash, view.context().getId(),
params().member().getId());
// controller.completeIt();
log.info("Genesis block: {} published for: {} on: {}", reconfiguration.hash, view.context().getId(),
params().member().getId());
}

public void start() {
Expand Down Expand Up @@ -240,7 +242,7 @@ private void certify(Validate v) {
var member = view.context().getMember(Digest.from(v.getWitness().getId()));
if (member != null) {
witnesses.put(member, v);
if (witnesses.size() >= params().majority()) {
if (witnesses.size() > params().majority()) {
if (published.compareAndSet(false, true)) {
publish();
}
Expand Down Expand Up @@ -337,13 +339,19 @@ private void validate(Validate v) {
return; // do not have the join yet
}
if (!view.validate(proposed.join.getMember(), v)) {
log.warn("Invalid cetification for view join: {} from: {} on: {}", hash,
log.warn("Invalid certification for view join: {} from: {} on: {}", hash,
Digest.from(v.getWitness().getId()), params().member().getId());
return;
}
proposed.certifications.put(certifier, v.getWitness());
log.debug("Validation of view member: {}:{} using certifier: {} on: {}", member.getId(), hash,
certifier.getId(), params().member().getId());
var prev = proposed.certifications.put(certifier, v.getWitness());
if (prev == null) {
log.debug("New validation of view member: {} hash: {} using certifier: {} witnesses: {} on: {}",
member.getId(), hash, certifier.getId(), proposed.certifications.values().size(),
params().member().getId());
} else {
log.debug("Redundant validation of view member: {} hash: {} using certifier: {} on: {}", member.getId(),
hash, certifier.getId(), params().member().getId());
}
}

private record Proposed(Join join, Member member, Map<Member, Certification> certifications) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
public class GenesisContext extends ViewContext {

public GenesisContext(Context<Member> context, Supplier<CHOAM.PendingView> pendingView, Parameters params,
public GenesisContext(Context<Member> context, Supplier<Context<Member>> pendingView, Parameters params,
Signer signer, BlockProducer blockProducer) {
super(context, params, pendingView, signer, Collections.emptyMap(), blockProducer);
}
Expand Down
22 changes: 19 additions & 3 deletions choam/src/main/java/com/salesforce/apollo/choam/Parameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -82,7 +83,7 @@ public Supplier<KERL_> kerl() {
}

public int majority() {
return runtime.context.majority();
return runtime.context.majority(true);
}

public SigningMember member() {
Expand All @@ -93,6 +94,10 @@ public ChoamMetrics metrics() {
return runtime.metrics;
}

public CompletableFuture<Void> onFailure() {
return runtime().onFailure();
}

public TransactionExecutor processor() {
return runtime.processor;
}
Expand Down Expand Up @@ -288,7 +293,7 @@ public record RuntimeParameters(DelegatedContext<Member> context, Router communi
Function<Map<Member, Join>, List<Transaction>> genesisData,
TransactionExecutor processor, BiConsumer<HashedBlock, CheckpointState> restorer,
Function<ULong, File> checkpointer, ChoamMetrics metrics, Supplier<KERL_> kerl,
FoundationSeal foundation) {
FoundationSeal foundation, CompletableFuture<Void> onFailure) {
public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -325,9 +330,11 @@ public static class Builder implements Cloneable {
private BiConsumer<HashedBlock, CheckpointState> restorer = (height, checkpointState) -> {
};

private CompletableFuture<Void> onFailure = new CompletableFuture<>();

public RuntimeParameters build() {
return new RuntimeParameters(new DelegatedContext<Member>(context), communications, member, genesisData,
processor, restorer, checkpointer, metrics, kerl, foundation);
processor, restorer, checkpointer, metrics, kerl, foundation, onFailure);
}

@Override
Expand Down Expand Up @@ -414,6 +421,15 @@ public Builder setMetrics(ChoamMetrics metrics) {
return this;
}

public CompletableFuture<Void> getOnFailure() {
return onFailure;
}

public Builder setOnFailure(CompletableFuture<Void> onFailure) {
this.onFailure = onFailure;
return this;
}

public TransactionExecutor getProcessor() {
return processor;
}
Expand Down
39 changes: 19 additions & 20 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Producer(ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
// Number of rounds we can provide data for
final var blocks = ep.getEpochLength() - 2;
final int maxElements = blocks * lastEpoch;
final var name = "Producer" + getViewId() + params().member().getId().toString();

ds = new TxDataSource(params.member(), maxElements, params.metrics(), producerParams.maxBatchByteSize(),
producerParams.batchInterval(), producerParams.maxBatchCount(),
params().drainPolicy().build());
Expand All @@ -87,7 +87,7 @@ public Producer(ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
params.member().getId());

var fsm = Fsm.construct(new DriveIn(), Transitions.class, Earner.INITIAL, true);
fsm.setName(name);
fsm.setName("Producer%s on: %s".formatted(getViewId(), params.member().getId()));
transitions = fsm.getTransitions();

Config.Builder config = params().producer().ethereal().clone();
Expand Down Expand Up @@ -176,7 +176,7 @@ private void create(List<ByteString> preblock, boolean last) {
.map(this::validate)
.filter(Objects::nonNull)
.filter(p -> !p.published.get())
.filter(p -> p.witnesses.size() >= params().majority())
.filter(p -> p.witnesses.size() > params().majority())
.forEach(this::publish);

var reass = Reassemble.newBuilder();
Expand Down Expand Up @@ -218,8 +218,8 @@ private void create(List<ByteString> preblock, boolean last) {
final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean());
pending.put(next.hash, p);
p.witnesses.put(params().member(), validation);
log.debug("Created block: {} height: {} prev: {} last: {} on: {}", next.hash, next.height(), lb.hash, last,
params().member().getId());
log.debug("Created block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(),
next.hash, next.height(), lb.hash, last, params().member().getId());
}
if (last) {
started.set(true);
Expand All @@ -244,14 +244,11 @@ private void produceAssemble() {
final var vlb = previousBlock.get();
nextViewId = vlb.hash;
nextAssembly.addAll(Committee.viewMembersOf(nextViewId, params().context()));
var diadem = view.pendingView().diadem();
log.debug("Assembling: {} diadem: {} on: {}", nextViewId, diadem, params().member().getId());
log.debug("Assembling: {} on: {}", nextViewId, params().member().getId());
final var assemble = new HashedBlock(params().digestAlgorithm(), view.produce(vlb.height().add(1), vlb.hash,
Assemble.newBuilder()
.setNextView(
vlb.hash.toDigeste())
.setDiadem(
diadem.toDigeste())
.build(),
checkpoint.get()));
previousBlock.set(assemble);
Expand All @@ -260,13 +257,13 @@ private void produceAssemble() {
pending.put(assemble.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.debug("View assembly: {} diadem: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, diadem,
assemble.hash, assemble.height(), assemble.block.getBodyCase(), getViewId(),
params().member().getId());
log.debug("Produced view assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId());
}

private void publish(PendingBlock p) {
log.debug("Published pending: {} height: {} on: {}", p.block.hash, p.block.height(), params().member().getId());
log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId());
p.published.set(true);
pending.remove(p.block.hash);
final var cb = CertifiedBlock.newBuilder()
Expand All @@ -284,7 +281,8 @@ private PendingBlock validate(Validate v) {
return null;
}
if (!view.validate(p.block, v)) {
log.trace("Invalid validate for: {} on: {}", hash, params().member().getId());
log.trace("Invalid validate for: {} hash: {} on: {}", p.block.block.getBodyCase(), hash,
params().member().getId());
return null;
}
p.witnesses.put(view.context().getMember(Digest.from(v.getWitness().getId())), v);
Expand All @@ -311,7 +309,7 @@ public void assembled() {
pending.put(reconfiguration.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
controller.completeIt();
// controller.completeIt();
log.info("Reconfiguration block: {} height: {} produced on: {}", reconfiguration.hash,
reconfiguration.height(), params().member().getId());
}
Expand All @@ -325,7 +323,8 @@ public void checkAssembly() {
}
final var viewAssembly = assembly.get();
if (viewAssembly == null) {
log.warn("Assemble block never processed on: {}", params().member().getId());
log.error("Assemble block never processed on: {}", params().member().getId());
transitions.failed();
return;
}
viewAssembly.finalElection();
Expand Down Expand Up @@ -369,6 +368,7 @@ public void create(List<ByteString> preblock, boolean last) {
@Override
public void fail() {
stop();
view.onFailure();
}

@Override
Expand All @@ -378,14 +378,13 @@ public void produceAssemble() {

@Override
public void reconfigure() {
log.debug("Starting view reconfiguration: {} diadem: {} on: {}", nextViewId, view.pendingView().diadem(),
params().member().getId());
log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId());
assembly.set(new ViewAssembly(nextViewId, view, Producer.this::addReassemble, comms) {
@Override
public void complete() {
super.complete();
log.debug("View reconfiguration: {} diadem: {} gathered: {} complete on: {}", nextViewId,
view.pendingView().diadem(), getSlate().size(), params().member().getId());
log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(),
params().member().getId());
assembled.set(true);
Producer.this.transitions.viewComplete();
}
Expand Down
Loading

0 comments on commit 29fef4b

Please sign in to comment.