Skip to content

Commit

Permalink
tighten up the screws
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 12, 2024
1 parent 1533216 commit aa6d0b2
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
} else {
config.setPid(pid).setnProc((short) view.roster().size());
}
config.setEpochLength(7).setNumberOfEpochs(3);
config.setEpochLength(33).setNumberOfEpochs(-1);
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
transitions::process, transitions::nextEpoch, label);
Expand All @@ -106,13 +106,11 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,

@Override
public void certify() {
if (slate.size() < params().majority()) {
log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(),
params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId());
if (slate.size() != nextAssembly.size()) {
log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(),
slate.keySet().stream().sorted().toList(), params().member().getId());
return;
}
assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(),
slate.size());
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
params().digestAlgorithm())));
Expand Down Expand Up @@ -163,6 +161,9 @@ public void gather(List<ByteString> preblock, boolean last) {
.peek(j -> log.info("Gathering: {} on: {}", Digest.from(j.getMember().getVm().getId()),
params().member().getId()))
.forEach(this::join);
if (slate.size() == nextAssembly.size()) {
transitions.gathered();
}
}

@Override
Expand All @@ -187,12 +188,12 @@ public void publish() {
log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId());
return;
}
if (witnesses.size() < params().majority()) {
if (witnesses.size() < nextAssembly.size()) {
log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(),
params().member().getId());
return;
}
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) {
if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) {
log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash,
reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId());
return;
Expand Down
22 changes: 2 additions & 20 deletions choam/src/main/java/com/salesforce/apollo/choam/Parameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public record Parameters(Parameters.RuntimeParameters runtime, ReliableBroadcast
Parameters.BootstrapParameters bootstrap, Parameters.ProducerParameters producer,
Parameters.MvStoreBuilder mvBuilder, Parameters.LimiterBuilder txnLimiterBuilder,
ExponentialBackoffPolicy.Builder submitPolicy, int checkpointSegmentSize,
ExponentialBackoffPolicy.Builder drainPolicy, boolean generateGenesis) {
boolean generateGenesis) {

public static Builder newBuilder() {
return new Builder();
Expand Down Expand Up @@ -677,14 +677,6 @@ public static class Builder implements Cloneable {
private ReliableBroadcaster.Parameters combine = ReliableBroadcaster.Parameters.newBuilder()
.build();
private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT;
private ExponentialBackoffPolicy.Builder drainPolicy = ExponentialBackoffPolicy.newBuilder()
.setInitialBackoff(
Duration.ofMillis(5))
.setJitter(0.2)
.setMultiplier(1.2)
.setMaxBackoff(
Duration.ofMillis(
500));
private Digest genesisViewId;
private Duration gossipDuration = Duration.ofSeconds(1);
private int maxCheckpointSegments = 200;
Expand All @@ -709,7 +701,7 @@ public Parameters build(RuntimeParameters runtime) {
return new Parameters(runtime, combine, gossipDuration, maxCheckpointSegments, submitTimeout, genesisViewId,
checkpointBlockDelta, crowns, digestAlgorithm, viewSigAlgorithm,
synchronizationCycles, regenerationCycles, bootstrap, producer, mvBuilder,
txnLimiterBuilder, submitPolicy, checkpointSegmentSize, drainPolicy, generateGenesis);
txnLimiterBuilder, submitPolicy, checkpointSegmentSize, generateGenesis);
}

@Override
Expand All @@ -726,7 +718,6 @@ public Builder clone() {
producer.batchInterval, producer.maxBatchCount(), producer.maxGossipDelay));
clone.setTxnLimiterBuilder(txnLimiterBuilder.clone());
clone.setSubmitPolicy(submitPolicy.clone());
clone.setDrainPolicy(drainPolicy.clone());
return clone;
}

Expand Down Expand Up @@ -783,15 +774,6 @@ public Builder setDigestAlgorithm(DigestAlgorithm digestAlgorithm) {
return this;
}

public ExponentialBackoffPolicy.Builder getDrainPolicy() {
return drainPolicy;
}

public Builder setDrainPolicy(ExponentialBackoffPolicy.Builder drainPolicy) {
this.drainPolicy = drainPolicy;
return this;
}

public Digest getGenesisViewId() {
return genesisViewId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
maxEpoch = ep.getEpochLength();

ds = new TxDataSource(params.member(), blocks, params.metrics(), producerParams.maxBatchByteSize(),
producerParams.batchInterval(), producerParams.maxBatchCount(),
params().drainPolicy().build());
producerParams.batchInterval(), producerParams.maxBatchCount());

log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ public String toString() {
private class Recon implements Reconfiguration {
@Override
public void certify() {
countdown.set(-1);
if (proposals.size() == selected.assembly.size()) {
log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());
Expand Down
11 changes: 9 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ public void gather() {

@Override
public Transitions nextEpoch(Integer epoch) {
return epoch.equals(0) ? null : CERTIFICATION;
return null;
}

@Override
public Transitions gathered() {
return CERTIFICATION;
}

@Override
Expand All @@ -80,11 +84,14 @@ public Transitions process(List<ByteString> preblock, boolean last) {
return null;
}
}

}

interface Transitions extends FsmExecutor<Genesis, Genesis.Transitions> {

default Transitions gathered() {
throw fsm().invalidTransitionOn();
}

default Transitions nextEpoch(Integer epoch) {
throw fsm().invalidTransitionOn();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public Transitions countdownCompleted() {
return RECONFIGURED;
}

@Override
public Transitions checkAssembly() {
return null;
}

// See if we already have a full complement of Joins of the next committee
// if not set a deadline
@Entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class TxDataSource implements DataSource {

private final Duration batchInterval;
private final AtomicBoolean draining = new AtomicBoolean();
private final ExponentialBackoffPolicy drainPolicy;
private final Member member;
private final ChoamMetrics metrics;
private final BatchingQueue<Transaction> processing;
Expand All @@ -47,10 +46,9 @@ public class TxDataSource implements DataSource {
private volatile Thread blockingThread;

public TxDataSource(Member member, int maxElements, ChoamMetrics metrics, int maxBatchByteSize,
Duration batchInterval, int maxBatchCount, ExponentialBackoffPolicy drainPolicy) {
Duration batchInterval, int maxBatchCount) {
this.member = member;
this.batchInterval = batchInterval;
this.drainPolicy = drainPolicy;
processing = new BatchingQueue<Transaction>(maxElements, maxBatchCount, tx -> tx.toByteString().size(),
maxBatchByteSize);
this.metrics = metrics;
Expand Down Expand Up @@ -85,7 +83,7 @@ public ByteString getData() {
var v = new ArrayList<Validate>();

if (draining.get()) {
var target = Instant.now().plus(drainPolicy.nextBackoff());
var target = Instant.now().plus(batchInterval);
while (target.isAfter(Instant.now()) && builder.getAssembliesCount() == 0
&& builder.getValidationsCount() == 0) {
// rinse and repeat
Expand All @@ -103,7 +101,7 @@ public ByteString getData() {

// sleep waiting for input
try {
Thread.sleep(drainPolicy.getInitialBackoff().dividedBy(2).toMillis());
Thread.sleep(batchInterval.dividedBy(2).toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ByteString.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
Expand Down Expand Up @@ -86,10 +85,7 @@ public void setUp() throws Exception {
.setNumberOfEpochs(3)
.setEpochLength(7))
.build())
.setCheckpointBlockDelta(checkpointBlockSize)
.setDrainPolicy(ExponentialBackoffPolicy.newBuilder()
.setInitialBackoff(Duration.ofMillis(1))
.setMaxBackoff(Duration.ofMillis(1)));
.setCheckpointBlockDelta(checkpointBlockSize);

members.subList(0, 4).forEach(m -> {
var context = (DynamicContext<Member>) contextBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws
.build())
.setGenerateGenesis(true)
.setCheckpointBlockDelta(checkpointBlockSize);
params.getDrainPolicy().setInitialBackoff(Duration.ofMillis(1)).setMaxBackoff(Duration.ofMillis(1));
params.getProducer().ethereal().setNumberOfEpochs(2).setEpochLength(20);

var entropy = SecureRandom.getInstance("SHA1PRNG");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,19 @@
*/
package com.salesforce.apollo.choam.support;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.security.SecureRandom;
import java.time.Duration;

import org.junit.jupiter.api.Test;

import com.google.protobuf.ByteString;
import com.salesforce.apollo.choam.proto.Transaction;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember;
import com.salesforce.apollo.stereotomy.StereotomyImpl;
import com.salesforce.apollo.stereotomy.mem.MemKERL;
import com.salesforce.apollo.stereotomy.mem.MemKeyStore;
import org.junit.jupiter.api.Test;

import java.security.SecureRandom;
import java.time.Duration;

import static org.junit.jupiter.api.Assertions.*;

/**
* @author hal.hildebrand
Expand All @@ -34,7 +31,7 @@ public void func() throws Exception {
entropy.setSeed(new byte[] { 6, 6, 6 });
var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy);
TxDataSource ds = new TxDataSource(new ControlledIdentifierMember(stereotomy.newIdentifier()), 100, null, 1024,
Duration.ofMillis(100), 100, ExponentialBackoffPolicy.newBuilder().build());
Duration.ofMillis(100), 100);
Transaction tx = Transaction.newBuilder()
.setContent(ByteString.copyFromUtf8("Give me food or give me slack or kill me"))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ JohnHancock sign(ULong sequenceNumber, PrivateKey[] privateKeys, InputStream mes
public static SignatureAlgorithm fromSignatureCode(int i) {
return switch (i) {
case 0:
throw new IllegalArgumentException("Unknown signature code: " + i);
yield NULL_SIGNATURE;
case 1:
yield NULL_SIGNATURE;
case 2:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1858,7 +1858,7 @@ public void join(Join join, Digest from, StreamObserver<Gateway> responseObserve
/**
* The first message in the anti-entropy protocol. Process any digests from the inbound gossip digest. Respond
* with the Gossip that represents the digests newer or not known in this view, as well as updates from this
* node based on out of date information in the supplied digests.
* node based on out-of-date information in the supplied digests.
*
* @param request - the Gossip from our partner
* @return Teh response for Moar gossip - updates this node has which the sender is out of touch with, and
Expand Down
Loading

0 comments on commit aa6d0b2

Please sign in to comment.