Skip to content

Commit

Permalink
Further simplification of View Reassembly. Remove ASSEMBLE block type.
Browse files Browse the repository at this point in the history
Further clean up of view assembly
  • Loading branch information
Hellblazer committed Apr 15, 2024
1 parent 77d99b5 commit 45399c0
Show file tree
Hide file tree
Showing 16 changed files with 175 additions and 240 deletions.
4 changes: 4 additions & 0 deletions choam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
<groupId>org.jooq</groupId>
<artifactId>joou</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
</dependency>

<!-- Test only deps below this line -->
<dependency>
Expand Down
59 changes: 23 additions & 36 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.choam.comm.*;
Expand Down Expand Up @@ -95,11 +96,15 @@ public class CHOAM {
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final AtomicReference<Context<Member>> pendingView = new AtomicReference<>();
private final ConcurrentLinkedHashMap<Digest, Context<Member>> pendingViews;

public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
executions = Executors.newVirtualThreadPerTaskExecutor();
pendingViews = new ConcurrentLinkedHashMap.Builder<Digest, Context<Member>>().maximumWeightedCapacity(100)
.initialCapacity(10)
.build();

nextView();
var bContext = new DelegatedContext<>(params.context());
Expand Down Expand Up @@ -321,10 +326,15 @@ public void nextView(Context<Member> context, Digest diadem) {
if (c != null) {
c.nextView(context);
} else {
log.info("Acquiring new view, diadem: {} size: {} on: {}", diadem, context.size(), params.member().getId());
log.info("Acquiring new view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
params.member().getId());
params.context().setContext(context);
pendingView.set(null);
}

log.info("Pushing pending view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
params.member().getId());
pendingViews.putIfAbsent(diadem, context);
}

public void start() {
Expand Down Expand Up @@ -512,20 +522,6 @@ public void onFailure() {
transitions.fail();
}

@Override
public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
var block = Block.newBuilder()
.setHeader(
buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(),
checkpoint.hash, v.height(), v.hash))
.setAssemble(assemble)
.build();
log.trace("Produced block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(),
params.member().getId());
return block;
}

@Override
public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
Expand Down Expand Up @@ -674,30 +670,27 @@ private Supplier<Context<Member>> pendingView() {
};
}

private Supplier<ConcurrentLinkedHashMap<Digest, Context<Member>>> pendingViews() {
return () -> pendingViews;
}

private void process() {
final var c = current.get();
final HashedCertifiedBlock h = head.get();
log.info("Begin block: {} hash: {} height: {} committee: {} on: {}", h.block.getBodyCase(), h.hash, h.height(),
c.getClass().getSimpleName(), params.member().getId());
switch (h.block.getBodyCase()) {
case ASSEMBLE: {
params.processor().beginBlock(h.height(), h.hash);
nextViewId.set(Digest.from(h.block.getAssemble().getNextView()));
log.info("Assembled next view id: {} on: {}", nextViewId.get(), params.member().getId());
c.assembled();
break;
}
case RECONFIGURE: {
params.processor().beginBlock(h.height(), h.hash);
reconfigure(h.block.getReconfigure());
reconfigure(h.hash, h.block.getReconfigure());
break;
}
case GENESIS: {
cancelSynchronization();
cancelBootstrap();
transitions.regenerated();
genesisInitialization(h, h.block.getGenesis().getInitializeList());
reconfigure(h.block.getGenesis().getInitialView());
reconfigure(h.hash, h.block.getGenesis().getInitialView());
break;
}
case EXECUTIONS: {
Expand All @@ -719,9 +712,9 @@ private void process() {
params.member().getId());
}

private void reconfigure(Reconfigure reconfigure) {
log.info("Clearing next view id on: {}", params.member().getId());
nextViewId.set(null);
private void reconfigure(Digest hash, Reconfigure reconfigure) {
log.info("Setting next view id: {} on: {}", hash, params.member().getId());
nextViewId.set(hash);
var pv = pendingView.getAndSet(null);
if (pv != null) {
// always advance view.
Expand All @@ -741,7 +734,7 @@ private void reconfigure(Reconfigure reconfigure) {
} else {
log.warn("Reconfiguration to associate failed: {} in view: {} on:{}", validators.size(),
new Digest(reconfigure.getId()), params.member().getId());
current.set(new Client(validators, getViewId()));
transitions.fail();
}
} else {
current.set(new Client(validators, getViewId()));
Expand Down Expand Up @@ -1013,8 +1006,6 @@ public interface BlockProducer {

void onFailure();

Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint);

Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint);

void publish(Digest hash, CertifiedBlock cb);
Expand Down Expand Up @@ -1306,16 +1297,12 @@ private class Associate extends Administration {
params.member().getId());
Signer signer = new SignerImpl(nextView.consensusKeyPair.getPrivate(), ULong.MIN);
Supplier<Context<Member>> pv = pendingView();
producer = new Producer(new ViewContext(context, params, pv, signer, validators, constructBlock()),
producer = new Producer(nextViewId.get(),
new ViewContext(context, params, pv, signer, validators, constructBlock()),
head.get(), checkpoint.get(), comm, getLabel());
producer.start();
}

@Override
public void assembled() {
producer.assembled();
}

@Override
public void complete() {
producer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ static Set<Member> viewMembersOf(Digest hash, Context<? super Member> baseContex

void accept(HashedCertifiedBlock next);

default void assembled() {
}

void complete();

boolean isMember();
Expand Down
130 changes: 35 additions & 95 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.salesforce.apollo.choam.support.TxDataSource;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.ethereal.Config;
import com.salesforce.apollo.ethereal.Config.Builder;
import com.salesforce.apollo.ethereal.Ethereal;
Expand All @@ -45,7 +44,6 @@ public class Producer {

private static final Logger log = LoggerFactory.getLogger(Producer.class);
private final AtomicBoolean assembled = new AtomicBoolean();
private final AtomicReference<ViewAssembly> assembly = new AtomicReference<>();
private final AtomicReference<HashedBlock> checkpoint = new AtomicReference<>();
private final CommonCommunications<Terminal, ?> comms;
private final Ethereal controller;
Expand All @@ -61,15 +59,17 @@ public class Producer {
private final AtomicBoolean started = new AtomicBoolean(false);
private final Transitions transitions;
private final ViewContext view;
private volatile Digest nextViewId;
private final Digest nextViewId;
private ViewAssembly assembly;

public Producer(ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
CommonCommunications<Terminal, ?> comms, String label) {
assert view != null;
this.view = view;
this.previousBlock.set(lastBlock);
this.comms = comms;
this.checkpoint.set(checkpoint);
this.nextViewId = nextViewId;

final Parameters params = view.params();
final var producerParams = params.producer();
Expand Down Expand Up @@ -112,22 +112,14 @@ public Producer(ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());
}

public void assembled() {
transitions.assembled();
}

public Digest getNextViewId() {
final Digest current = nextViewId;
return current;
}

public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
reconfigure();
final Block prev = previousBlock.get().block;
if (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0) { // genesis block won't ever be
// 0
// genesis block won't ever be 0
if (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0) {
transitions.checkpoint();
} else {
transitions.start();
Expand All @@ -141,7 +133,7 @@ public void stop() {
log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId());
controller.stop();
coordinator.stop();
final var c = assembly.get();
final var c = assembly;
if (c != null) {
c.stop();
}
Expand All @@ -159,11 +151,11 @@ public SubmitResult submit(Transaction transaction) {
}
}

private void addReassemble(Reassemble r) {
if (ds.offer(r)) {
log.trace("Adding joins: {} on: {}", r.getMembersList(), params().member().getId());
private void addJoin(SignedJoin signedJoin) {
if (ds.offer(signedJoin)) {
log.trace("Adding on: {}", params().member().getId());
} else {
log.trace("Cannot add joins: {} on: {}", r.getMembersCount(), params().member().getId());
log.trace("Cannot add join on: {}", params().member().getId());
}
}

Expand All @@ -189,13 +181,13 @@ private void create(List<ByteString> preblock, boolean last) {
.filter(p -> p.witnesses.size() >= params().majority())
.forEach(this::publish);

var joins = aggregate.stream().flatMap(e -> e.getJoinsList().stream()).filter(j -> validate(j)).toList();
final var ass = assembly.get();
var joins = aggregate.stream().flatMap(e -> e.getJoinsList().stream()).filter(view::validate).toList();
final var ass = assembly;
if (ass != null) {
log.trace("Consuming joins: {} on: {}", aggregate.size(), joins.size(), params().member().getId());
log.trace("Consuming {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId());
ass.inbound().accept(joins);
} else {
log.trace("Pending joins: {} on: {}", aggregate.size(), joins.size(), params().member().getId());
log.trace("Pending {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId());
pendingJoins.addAll(joins);
}

Expand Down Expand Up @@ -254,30 +246,6 @@ private void processPendingValidations(HashedBlock block, PendingBlock p) {
}
}

private void produceAssemble() {
final var vlb = previousBlock.get();
nextViewId = vlb.hash;
for (var m : Committee.viewMembersOf(nextViewId, view.pendingView())) {
nextAssembly.put(m.getId(), m);
}
log.debug("Assembling: {} on: {}", nextViewId, params().member().getId());
final var assemble = new HashedBlock(params().digestAlgorithm(), view.produce(vlb.height().add(1), vlb.hash,
Assemble.newBuilder()
.setNextView(
vlb.hash.toDigeste())
.build(),
checkpoint.get()));
previousBlock.set(assemble);
final var validation = view.generateValidation(assemble);
final var p = new PendingBlock(assemble, new HashMap<>(), new AtomicBoolean());
pending.put(assemble.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.debug("Produced block: {} hash: {} height: {} from: {} on: {}", assemble.block.getBodyCase(), assemble.hash,
assemble.height(), getViewId(), params().member().getId());
processPendingValidations(assemble, p);
}

private void publish(PendingBlock p) {
assert p.witnesses.size() >= params().majority() : "Publishing non majority block";
log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
Expand All @@ -291,26 +259,23 @@ private void publish(PendingBlock p) {
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), cb));
}

private boolean validate(SignedJoin join) {
var mid = Digest.from(join.getMember());
var m = nextAssembly.get(mid);
if (m == null) {
log.trace("Cannot validate join view: {} of: {} signed by: {} on: {}",
Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid,
params().member().getId());
return false;
}
var validated = m.verify(JohnHancock.from(join.getSignature()), join.getJoin().toByteString());
if (!validated) {
log.trace("Cannot validate view join: {} of: {} signed by: {} on: {}",
Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid,
params().member().getId());
} else {
log.trace("Validated view join: {} of: {} signed by: {} on: {}",
Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid,
params().member().getId());
}
return validated;
private void reconfigure() {
log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId());
assembly = new ViewAssembly(nextViewId, view, Producer.this::addJoin, comms) {
@Override
public void complete() {
super.complete();
log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(),
params().member().getId());
assembled.set(true);
Producer.this.transitions.viewComplete();
}
};
assembly.start();
assembly.assembled();
var joins = new ArrayList<>(pendingJoins);
pendingJoins.clear();
assembly.inbound().accept(joins);
}

private PendingBlock validate(Validate v) {
Expand Down Expand Up @@ -345,7 +310,7 @@ public void assembled() {
log.debug("assembly already complete on: {}", params().member().getId());
return;
}
final var slate = assembly.get().getSlate();
final var slate = assembly.getSlate();
var reconfiguration = new HashedBlock(params().digestAlgorithm(),
view.reconfigure(slate, nextViewId, previousBlock.get(),
checkpoint.get()));
Expand All @@ -368,7 +333,7 @@ public void checkAssembly() {
if (dropped != 0) {
log.warn("Dropped txns: {} on: {}", dropped, params().member().getId());
}
final var viewAssembly = assembly.get();
final var viewAssembly = assembly;
if (viewAssembly == null) {
log.error("Assemble block never processed on: {}", params().member().getId());
transitions.failed();
Expand Down Expand Up @@ -420,31 +385,6 @@ public void fail() {
view.onFailure();
}

@Override
public void produceAssemble() {
Producer.this.produceAssemble();
}

@Override
public void reconfigure() {
log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId());
assembly.set(new ViewAssembly(nextViewId, view, Producer.this::addReassemble, comms) {
@Override
public void complete() {
super.complete();
log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(),
params().member().getId());
assembled.set(true);
Producer.this.transitions.viewComplete();
}
});
assembly.get().start();
assembly.get().assembled();
var joins = new ArrayList<>(pendingJoins);
pendingJoins.clear();
assembly.get().inbound().accept(joins);
}

@Override
public void startProduction() {
log.debug("Starting production for: {} on: {}", getViewId(), params().member().getId());
Expand Down
Loading

0 comments on commit 45399c0

Please sign in to comment.