Skip to content

Commit

Permalink
misc-7 (#210)
Browse files Browse the repository at this point in the history
* fix invalid transition

* use tolerance + 1 for majority, rather than 2/3+1 in DHT/iterations.

* limit bootstrapping to min of 4. fix RingIterator logic.

* generic gate on view bootstrapping

* prevent state transition errors on failure

* gate view change in KerlDHT

* tighten up the screws

* use majority for observer's view change.

utility for listing network interfaces

* better ViewChange

* better view change, bug fixes, serialize Gorgoneion endorsement

* provide RingIterator functionality on the SliceIterator. virtual synchrony using SI for KerlDHT

* emit last block as beacon if no transactions to publish

* cleanup

* cleanup
  • Loading branch information
Hellblazer authored Jun 15, 2024
1 parent 84cf5a6 commit 3ce0e66
Show file tree
Hide file tree
Showing 34 changed files with 612 additions and 469 deletions.
22 changes: 11 additions & 11 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DelegatedContext;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.context.ViewChange;
import com.salesforce.apollo.cryptography.*;
import com.salesforce.apollo.cryptography.Signer.SignerImpl;
import com.salesforce.apollo.cryptography.proto.PubKey;
Expand Down Expand Up @@ -322,15 +323,14 @@ public String logState() {

/**
* A view change has occurred
*
* @param context - the new membership context
* @param diadem - the compact HexBloom of the context view
*/
public void rotateViewKeys(Context<Member> context, Digest diadem) {
public void rotateViewKeys(ViewChange viewChange) {
var context = viewChange.context();
var diadem = viewChange.diadem();
((DelegatedContext<Member>) combine.getContext()).setContext(context);
var c = current.get();
if (c != null) {
c.nextView(diadem, context);
c.nextView(viewChange.diadem(), context);
} else {
log.info("Acquiring new view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
params.member().getId());
Expand Down Expand Up @@ -361,11 +361,11 @@ public void stop() {
}
session.cancelAll();
try {
linear.shutdown();
linear.shutdownNow();
} catch (Throwable e) {
}
try {
executions.shutdown();
executions.shutdownNow();
} catch (Throwable e) {
}
final var c = current.get();
Expand Down Expand Up @@ -1239,16 +1239,16 @@ private void synchronizationFailed() {
cancelSynchronization();
Context<Member> memberContext = context();
var activeCount = memberContext.size();
var majority = params.majority();
if (params.generateGenesis() && activeCount >= majority) {
var count = context().getRingCount();
if (params.generateGenesis() && activeCount >= context().getRingCount()) {
if (current.get() == null && current.compareAndSet(null, new Formation())) {
log.info(
"Quorum achieved, triggering regeneration. members: {} required: {} forming Genesis committee on: {}",
activeCount, majority, params.member().getId());
activeCount, count, params.member().getId());
transitions.regenerate();
} else {
log.info("Quorum achieved, members: {} required: {} existing committee: {} on: {}", activeCount,
majority, current.get().getClass().getSimpleName(), params.member().getId());
count, current.get().getClass().getSimpleName(), params.member().getId());
}
} else {
final var c = current.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
} else {
config.setPid(pid).setnProc((short) view.roster().size());
}
config.setEpochLength(7).setNumberOfEpochs(3);
config.setEpochLength(33).setNumberOfEpochs(-1);
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
transitions::process, transitions::nextEpoch, label);
Expand All @@ -106,13 +106,11 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,

@Override
public void certify() {
if (slate.size() < params().majority()) {
log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(),
params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId());
if (slate.size() != nextAssembly.size()) {
log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(),
slate.keySet().stream().sorted().toList(), params().member().getId());
return;
}
assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(),
slate.size());
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
params().digestAlgorithm())));
Expand All @@ -130,7 +128,7 @@ public void certify(List<ByteString> preblock, boolean last) {
try {
return Validate.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
log.warn("Unable to parse preblock: {} on: {}", bs, params().member().getId(), e);
log.trace("Unable to parse preblock: {} on: {}", bs, params().member().getId(), e);
return null;
}
}).filter(Objects::nonNull).filter(v -> !v.equals(Validate.getDefaultInstance())).forEach(this::certify);
Expand All @@ -154,7 +152,7 @@ public void gather(List<ByteString> preblock, boolean last) {
try {
return Join.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
log.warn("error parsing join: {} on: {}", bs, params().member().getId(), e);
log.trace("error parsing join: {} on: {}", bs, params().member().getId(), e);
return null;
}
})
Expand All @@ -163,6 +161,9 @@ public void gather(List<ByteString> preblock, boolean last) {
.peek(j -> log.info("Gathering: {} on: {}", Digest.from(j.getMember().getVm().getId()),
params().member().getId()))
.forEach(this::join);
if (slate.size() == nextAssembly.size()) {
transitions.gathered();
}
}

@Override
Expand All @@ -172,7 +173,7 @@ public void nominations(List<ByteString> preblock, boolean last) {
try {
return Validations.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
log.warn("error parsing validations: {} on: {}", bs, params().member().getId(), e);
log.trace("error parsing validations: {} on: {}", bs, params().member().getId(), e);
return null;
}
})
Expand All @@ -187,12 +188,12 @@ public void publish() {
log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId());
return;
}
if (witnesses.size() < params().majority()) {
if (witnesses.size() < nextAssembly.size()) {
log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(),
params().member().getId());
return;
}
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) {
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) {
log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash,
reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId());
return;
Expand Down
22 changes: 2 additions & 20 deletions choam/src/main/java/com/salesforce/apollo/choam/Parameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public record Parameters(Parameters.RuntimeParameters runtime, ReliableBroadcast
Parameters.BootstrapParameters bootstrap, Parameters.ProducerParameters producer,
Parameters.MvStoreBuilder mvBuilder, Parameters.LimiterBuilder txnLimiterBuilder,
ExponentialBackoffPolicy.Builder submitPolicy, int checkpointSegmentSize,
ExponentialBackoffPolicy.Builder drainPolicy, boolean generateGenesis) {
boolean generateGenesis) {

public static Builder newBuilder() {
return new Builder();
Expand Down Expand Up @@ -677,14 +677,6 @@ public static class Builder implements Cloneable {
private ReliableBroadcaster.Parameters combine = ReliableBroadcaster.Parameters.newBuilder()
.build();
private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT;
private ExponentialBackoffPolicy.Builder drainPolicy = ExponentialBackoffPolicy.newBuilder()
.setInitialBackoff(
Duration.ofMillis(5))
.setJitter(0.2)
.setMultiplier(1.2)
.setMaxBackoff(
Duration.ofMillis(
500));
private Digest genesisViewId;
private Duration gossipDuration = Duration.ofSeconds(1);
private int maxCheckpointSegments = 200;
Expand All @@ -709,7 +701,7 @@ public Parameters build(RuntimeParameters runtime) {
return new Parameters(runtime, combine, gossipDuration, maxCheckpointSegments, submitTimeout, genesisViewId,
checkpointBlockDelta, crowns, digestAlgorithm, viewSigAlgorithm,
synchronizationCycles, regenerationCycles, bootstrap, producer, mvBuilder,
txnLimiterBuilder, submitPolicy, checkpointSegmentSize, drainPolicy, generateGenesis);
txnLimiterBuilder, submitPolicy, checkpointSegmentSize, generateGenesis);
}

@Override
Expand All @@ -726,7 +718,6 @@ public Builder clone() {
producer.batchInterval, producer.maxBatchCount(), producer.maxGossipDelay));
clone.setTxnLimiterBuilder(txnLimiterBuilder.clone());
clone.setSubmitPolicy(submitPolicy.clone());
clone.setDrainPolicy(drainPolicy.clone());
return clone;
}

Expand Down Expand Up @@ -783,15 +774,6 @@ public Builder setDigestAlgorithm(DigestAlgorithm digestAlgorithm) {
return this;
}

public ExponentialBackoffPolicy.Builder getDrainPolicy() {
return drainPolicy;
}

public Builder setDrainPolicy(ExponentialBackoffPolicy.Builder drainPolicy) {
this.drainPolicy = drainPolicy;
return this;
}

public Digest getGenesisViewId() {
return genesisViewId;
}
Expand Down
20 changes: 7 additions & 13 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class Producer {
private final Semaphore serialize = new Semaphore(1);
private final ViewAssembly assembly;
private final int maxEpoch;
private volatile int emptyPreBlocks = 0;
private volatile boolean assembled = false;

public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) {
Expand All @@ -76,8 +75,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
maxEpoch = ep.getEpochLength();

ds = new TxDataSource(params.member(), blocks, params.metrics(), producerParams.maxBatchByteSize(),
producerParams.batchInterval(), producerParams.maxBatchCount(),
params().drainPolicy().build());
producerParams.batchInterval(), producerParams.maxBatchCount());

log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());
Expand Down Expand Up @@ -206,7 +204,7 @@ private void create(List<ByteString> preblock, boolean last) {
processAssemblies(aggregate);
processTransactions(last, aggregate);
if (last) {
started.set(true);
started.set(false);
transitions.lastBlock();
}
}
Expand Down Expand Up @@ -255,15 +253,11 @@ private void processTransactions(boolean last, List<UnitData> aggregate) {
final var txns = aggregate.stream().flatMap(e -> e.getTransactionsList().stream()).toList();

if (txns.isEmpty()) {
var empty = emptyPreBlocks + 1;
emptyPreBlocks = empty;
if (empty % 5 == 0) {
pending.values()
.stream()
.filter(pb -> pb.published.get())
.max(Comparator.comparing(pb -> pb.block.height()))
.ifPresent(pb -> publish(pb, true));
}
pending.values()
.stream()
.filter(pb -> pb.published.get())
.max(Comparator.comparing(pb -> pb.block.height()))
.ifPresent(pb -> publish(pb, true));
return;
}
log.trace("transactions: {} combined hash: {} height: {} on: {}", txns.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ public String toString() {
private class Recon implements Reconfiguration {
@Override
public void certify() {
countdown.set(-1);
if (proposals.size() == selected.assembly.size()) {
log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
Expand Down Expand Up @@ -440,6 +441,7 @@ public void chill() {

@Override
public void complete() {
countdown.set(-1);
ViewAssembly.this.complete();
}

Expand All @@ -454,12 +456,14 @@ public void convened() {

@Override
public void failed() {
countdown.set(-1);
view.onFailure();
log.debug("Failed view assembly for: {} on: {}", nextViewId, params().member().getId());
}

@Override
public void finish() {
countdown.set(-1);
started.set(false);
}

Expand Down
65 changes: 65 additions & 0 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,71 @@ public Transitions rotateViewKeys() {
public void failIt() {
context().fail();
}

@Override
public Transitions beginCheckpoint() {
return null;
}

@Override
public Transitions bootstrap(HashedCertifiedBlock anchor) {
return null;
}

@Override
public Transitions combine() {
return null;
}

@Override
public Transitions fail() {
return null;
}

@Override
public Transitions finishCheckpoint() {
return null;
}

@Override
public Transitions nextView() {
return null;
}

@Override
public Transitions regenerate() {
return null;
}

@Override
public Transitions regenerated() {
return null;
}

@Override
public Transitions rotateViewKeys() {
return null;
}

@Override
public Transitions start() {
return null;
}

@Override
public Transitions synchd() {
return null;
}

@Override
public Transitions synchronizationFailed() {
return null;
}

@Override
public Transitions synchronizing() {
return null;
}
}, RECOVERING {
@Override
public Transitions bootstrap(HashedCertifiedBlock anchor) {
Expand Down
11 changes: 9 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ public void gather() {

@Override
public Transitions nextEpoch(Integer epoch) {
return epoch.equals(0) ? null : CERTIFICATION;
return null;
}

@Override
public Transitions gathered() {
return CERTIFICATION;
}

@Override
Expand All @@ -80,11 +84,14 @@ public Transitions process(List<ByteString> preblock, boolean last) {
return null;
}
}

}

interface Transitions extends FsmExecutor<Genesis, Genesis.Transitions> {

default Transitions gathered() {
throw fsm().invalidTransitionOn();
}

default Transitions nextEpoch(Integer epoch) {
throw fsm().invalidTransitionOn();
}
Expand Down
Loading

0 comments on commit 3ce0e66

Please sign in to comment.