Skip to content

Commit

Permalink
interim.
Browse files Browse the repository at this point in the history
moar
  • Loading branch information
Hellblazer committed Apr 27, 2024
1 parent ace496a commit b530290
Show file tree
Hide file tree
Showing 19 changed files with 173 additions and 127 deletions.
13 changes: 9 additions & 4 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen
}
var crown = accumulator.build();
log.info("Checkpoint length: {} segment size: {} count: {} crown: {} initial: {} on: {}", length, segmentSize,
builder.getCount(), crown, initial, id);
builder.getCount(), crown.compactWrapped(), initial, id);
var cp = builder.setCrown(crown.toHexBloome()).build();

var deserialized = HexBloom.from(cp.getCrown());
log.info("Deserialized checkpoint crown: {} initial: {} on: {}", deserialized, initial, id);
log.info("Deserialized checkpoint crown: {} initial: {} on: {}", deserialized.compactWrapped(), initial, id);
return cp;
}

Expand Down Expand Up @@ -490,6 +490,8 @@ public Block checkpoint() {
public Block genesis(Map<Digest, Join> joining, Digest nextViewId, HashedBlock previous) {
final HashedCertifiedBlock cp = checkpoint.get();
final HashedCertifiedBlock v = view.get();
log.trace("Genesis cp: {} view: {} previous: {} on: {}", cp.hash, v.hash, previous.hash,
params.member().getId());
var g = CHOAM.genesis(nextViewId, joining, previous, v, params, cp, params.genesisData()
.apply(joining.keySet()
.stream()
Expand Down Expand Up @@ -1372,15 +1374,15 @@ public boolean validate(HashedCertifiedBlock hb) {
}

private void join(View view) {
log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
var joining = new CompletableFuture<Void>();
if (!join.compareAndSet(null, joining)) {
log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()),
params.member().getId());
transitions.fail();
return;
}
log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
var servers = new GroupIterator(validators.keySet());
var joined = new HashSet<Member>();

Expand Down Expand Up @@ -1444,6 +1446,9 @@ private void join(View view, Terminal link, Member target, HashSet<Member> joine
joined.add(target);
log.trace("Joined with: {} view: {} diadem: {} on: {}", target.getId(), viewId,
Digest.from(view.getDiadem()), params.member().getId());
} catch (StatusRuntimeException sre) {
log.trace("Failed join attempt: {} with: {} view: {} diadem: {} on: {}", sre.getStatus(),
target.getId(), nextViewId, Digest.from(view.getDiadem()), params.member().getId(), sre);
} catch (Throwable t) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId,
Digest.from(view.getDiadem()), params.member().getId(), t);
Expand Down
71 changes: 39 additions & 32 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -60,6 +57,7 @@ public class Producer {
private final Semaphore serialize = new Semaphore(1);
private final ViewAssembly assembly;
private final int maxEpoch;
private volatile int preblocks = 0;
private volatile boolean assembled = false;

public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) {
Expand Down Expand Up @@ -119,7 +117,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
@Override
public boolean complete() {
if (super.complete()) {
log.debug("Vue reconfiguration: {} gathered: {} complete on: {}", nextViewId,
log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId,
getSlate().keySet().stream().sorted().toList(), params().member().getId());
assembled = true;
return true;
Expand All @@ -139,9 +137,11 @@ public void start() {
}
final Block prev = previousBlock.get().block;
// genesis block won't ever be 0
if (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0) {
if (prev.hasGenesis() || (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0)) {
transitions.checkpoint();
} else {
log.trace("Checkpoint target: {} for: {} on: {}", prev.getReconfigure().getCheckpointTarget(),
params().context().getId(), params().member().getId());
transitions.start();
}
}
Expand Down Expand Up @@ -234,7 +234,7 @@ private Parameters params() {

private void processAssemblies(List<UnitData> aggregate) {
var aggs = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList();
log.trace("Consuming {} assemblies from {} units on: {}", aggregate.size(), aggs.size(),
log.trace("Consuming {} assemblies from {} units on: {}", aggs.size(), aggregate.size(),
params().member().getId());
assembly.assemble(aggs);
}
Expand All @@ -253,29 +253,34 @@ private void processTransactions(boolean last, List<UnitData> aggregate) {
HashedBlock lb = previousBlock.get();
final var txns = aggregate.stream().flatMap(e -> e.getTransactionsList().stream()).toList();

if (!txns.isEmpty()) {
log.trace("transactions: {} combined hash: {} height: {} on: {}", txns.size(), txns.stream()
.map(t -> CHOAM.hashOf(t,
params().digestAlgorithm()))
.reduce(Digest::xor)
.orElse(null),
lb.height().add(1), params().member().getId());
var builder = Executions.newBuilder();
txns.forEach(builder::addExecutions);

var next = new HashedBlock(params().digestAlgorithm(),
view.produce(lb.height().add(1), lb.hash, builder.build(), checkpoint.get()));
previousBlock.set(next);

final var validation = view.generateValidation(next);
ds.offer(validation);
final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean());
pending.put(next.hash, p);
p.witnesses.put(params().member(), validation);
log.debug("Produced block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(),
next.hash, next.height(), lb.hash, last, params().member().getId());
processPendingValidations(next, p);
if (txns.isEmpty()) {
if (preblocks % 5 == 0) {
pending.values()
.stream()
.filter(pb -> pb.published.get())
.max(Comparator.comparing(pb -> pb.block.height()))
.ifPresent(this::publish);
}
return;
}
log.trace("transactions: {} combined hash: {} height: {} on: {}", txns.size(),
txns.stream().map(t -> CHOAM.hashOf(t, params().digestAlgorithm())).reduce(Digest::xor).orElse(null),
lb.height().add(1), params().member().getId());
var builder = Executions.newBuilder();
txns.forEach(builder::addExecutions);

var next = new HashedBlock(params().digestAlgorithm(),
view.produce(lb.height().add(1), lb.hash, builder.build(), checkpoint.get()));
previousBlock.set(next);

final var validation = view.generateValidation(next);
ds.offer(validation);
final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean());
pending.put(next.hash, p);
p.witnesses.put(params().member(), validation);
log.debug("Produced block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(),
next.hash, next.height(), lb.hash, last, params().member().getId());
processPendingValidations(next, p);
}

private void produceAssemble(ViewAssembly.Vue v) {
Expand All @@ -295,7 +300,7 @@ private void produceAssemble(ViewAssembly.Vue v) {
pending.put(assemble.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.debug("Vue assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
log.debug("View assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId());
transitions.assembled();
}
Expand Down Expand Up @@ -347,6 +352,7 @@ private void serial(List<ByteString> preblock, Boolean last) {
return;
}
try {
preblocks++;
transitions.create(preblock, last);
} catch (Throwable t) {
log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t);
Expand All @@ -367,12 +373,13 @@ private PendingBlock validate(Validate v) {
}

private PendingBlock validate(Validate v, PendingBlock p, Digest hash) {
var from = Digest.from(v.getWitness().getId());
if (!view.validate(p.block, v)) {
log.trace("Invalid validate for: {} hash: {} on: {}", p.block.block.getBodyCase(), hash,
log.trace("Invalid validate from: {} for: {} hash: {} on: {}", from, p.block.block.getBodyCase(), hash,
params().member().getId());
return null;
}
p.witnesses.put(view.context().getMember(Digest.from(v.getWitness().getId())), v);
p.witnesses.put(view.context().getMember(from), v);
return p;
}

Expand Down
43 changes: 13 additions & 30 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,19 @@ void assemble(List<Assemblies> asses) {
svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
Digest.from(svs.getViews().getMember()), params().member().getId());
viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews());
if (viewProposals.size() == params().context().getRingCount()) {
transitions.certified();
countdown.set(-1);
}
} else {
log.info("Invalid views: {} from: {} on: {}",
svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
Digest.from(svs.getViews().getMember()), params().member().getId());
}
});
if (viewProposals.size() >= params().majority()) {
transitions.proposed();
} else {
log.trace("Incomplete view proposals: {} is less than majority: {} views: {} on: {}", viewProposals.size(),
params().majority(), viewProposals.keySet().stream().sorted().toList(),
params().member().getId());
}
}

boolean complete() {
Expand All @@ -144,7 +147,7 @@ boolean complete() {
}
if (proposals.size() < selected.majority) {
log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId,
proposals.keySet().stream().toList(), selected.majority, params().member().getId());
proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId());
transitions.failed();
return false;
}
Expand All @@ -164,8 +167,8 @@ boolean complete() {
}
assert slate.size() == selected.assembly.size() : "Invalid slate: " + slate.size() + " expected: "
+ selected.assembly.size();
log.debug("View Assembly: {} completed with: {} members on: {}", nextViewId, slate.size(),
params().member().getId());
log.debug("View Assembly: {} completed assembly: {} on: {}", nextViewId,
slate.keySet().stream().sorted().toList(), params().member().getId());
transitions.complete();
return true;
}
Expand Down Expand Up @@ -258,7 +261,7 @@ void join(SignedViewMember svm, boolean direct) {
void newEpoch() {
var current = countdown.decrementAndGet();
if (current == 0) {
transitions.certified();
transitions.countdownCompleted();
}
}

Expand All @@ -271,8 +274,7 @@ private Map<Digest, Member> assemblyOf(List<Digeste> committee) {

private void checkAssembly() {
if (proposals.size() >= selected.majority) {
countdown.set(-1);
transitions.certified();
transitions.gathered();
}
}

Expand All @@ -293,7 +295,6 @@ private void propose() {
.addViews(
SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig()))
.build());
countdown.set(2);
}

private void propose(Views vs, List<View> majorities, Multiset<View> consensus) {
Expand Down Expand Up @@ -357,7 +358,7 @@ private void vote() {
log.debug("Selected: {} on: {}", selected, params().member().getId());
}
onConsensus.complete(selected);
transitions.certified();
transitions.viewAcquired();
pendingJoins.forEach(svm -> join(svm, false));
pendingJoins.clear();
}
Expand All @@ -375,8 +376,6 @@ public void certify() {
if (proposals.size() == selected.majority) {
log.debug("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority,
nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId());

proposals.forEach((key, value) -> slate.put(key, joinOf(value)));
transitions.certified();
} else {
countdown.set(2);
Expand All @@ -386,12 +385,10 @@ public void certify() {
}

public void checkAssembly() {
countdown.set(2);
ViewAssembly.this.checkAssembly();
}

public void checkViews() {
countdown.set(2);
vote();
}

Expand All @@ -400,20 +397,6 @@ public void complete() {
ViewAssembly.this.complete();
}

@Override
public void elect() {
if (selected != null && proposals.size() >= selected.majority) {
log.debug("Electing view: {} required: {} proposed: {} on: {}", nextViewId, selected.majority,
proposals.keySet().stream().sorted().toList(), params().member().getId());
transitions.complete();
} else {
log.error("Failed election selected: {} required: {} proposed: {} of: {} on: {}", selected != null,
selected != null ? -1 : selected.majority, proposals.keySet().stream().sorted().toList(),
nextViewId, params().member().getId());
transitions.failed();
}
}

@Override
public void failed() {
view.onFailure();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public Block checkpoint() {
return blockProducer.checkpoint();
}

public int committeeSize() {
return validators.size();
}

public Context<Member> context() {
return context;
}
Expand Down
Loading

0 comments on commit b530290

Please sign in to comment.