Skip to content

Commit

Permalink
Merge branch 'master' into thoth-2
Browse files Browse the repository at this point in the history
# Conflicts:
#	model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java
  • Loading branch information
Hellblazer committed Feb 4, 2024
2 parents 1eee755 + 7395477 commit d67dec0
Show file tree
Hide file tree
Showing 21 changed files with 258 additions and 206 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ jobs:
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Set up Maven
uses: stCarolas/[email protected]
with:
maven-version: 3.9.4
- uses: actions/checkout@v3
- uses: graalvm/setup-graalvm@v1
with:
Expand All @@ -27,4 +23,4 @@ jobs:
cache: 'maven'
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: mvn -batch-mode clean install -Ppre --file pom.xml
run: ./mvnw -batch-mode clean install -Ppre --file pom.xml
89 changes: 63 additions & 26 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ public class CHOAM {
private final Combine.Transitions transitions;
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final AtomicReference<Digest> diadem = new AtomicReference<>();

public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
diadem.set(params.digestAlgorithm().getLast());
executions = Executors.newVirtualThreadPerTaskExecutor();

nextView();
Expand Down Expand Up @@ -173,10 +175,10 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen
return cp;
}

public static Block genesis(Digest id, Map<Member, Join> joins, HashedBlock head, Context<Member> context,
HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint,
Iterable<Transaction> initialization) {
var reconfigure = reconfigure(id, joins, context, params, params.checkpointBlockDelta());
public static Block genesis(Digest id, Digest diadem, Map<Member, Join> joins, HashedBlock head,
Context<Member> context, HashedBlock lastViewChange, Parameters params,
HashedBlock lastCheckpoint, Iterable<Transaction> initialization) {
var reconfigure = reconfigure(id, diadem, joins, context, params, params.checkpointBlockDelta());
return Block.newBuilder()
.setHeader(buildHeader(params.digestAlgorithm(), reconfigure, head.hash, ULong.valueOf(0),
lastCheckpoint.height(), lastCheckpoint.hash, lastViewChange.height(),
Expand All @@ -194,9 +196,12 @@ public static String print(Join join, DigestAlgorithm da) {
+ "certifications: " + join.getEndorsementsList().stream().map(c -> ViewContext.print(c, da)).toList() + "]";
}

public static Reconfigure reconfigure(Digest nextViewId, Map<Member, Join> joins, Context<Member> context,
Parameters params, int checkpointTarget) {
var builder = Reconfigure.newBuilder().setCheckpointTarget(checkpointTarget).setId(nextViewId.toDigeste());
public static Reconfigure reconfigure(Digest nextViewId, Digest diadem, Map<Member, Join> joins,
Context<Member> context, Parameters params, int checkpointTarget) {
var builder = Reconfigure.newBuilder()
.setCheckpointTarget(checkpointTarget)
.setId(nextViewId.toDigeste())
.setView(diadem.toDigeste());

// Canonical labeling of the view members for Ethereal
var remapped = rosterMap(context, joins.keySet());
Expand All @@ -207,14 +212,14 @@ public static Reconfigure reconfigure(Digest nextViewId, Map<Member, Join> joins
return reconfigure;
}

public static Block reconfigure(Digest nextViewId, Map<Member, Join> joins, HashedBlock head,
public static Block reconfigure(Digest nextViewId, Digest diadem, Map<Member, Join> joins, HashedBlock head,
Context<Member> context, HashedBlock lastViewChange, Parameters params,
HashedBlock lastCheckpoint) {
final Block lvc = lastViewChange.block;
int lastTarget = lvc.hasGenesis() ? lvc.getGenesis().getInitialView().getCheckpointTarget()
: lvc.getReconfigure().getCheckpointTarget();
int checkpointTarget = lastTarget == 0 ? params.checkpointBlockDelta() : lastTarget - 1;
var reconfigure = reconfigure(nextViewId, joins, context, params, checkpointTarget);
var reconfigure = reconfigure(nextViewId, diadem, joins, context, params, checkpointTarget);
return Block.newBuilder()
.setHeader(buildHeader(params.digestAlgorithm(), reconfigure, head.hash, head.height().add(1),
lastCheckpoint.height(), lastCheckpoint.hash, lastViewChange.height(),
Expand Down Expand Up @@ -294,11 +299,17 @@ public String logState() {
params.member().getId());
}

public void setDiadem(Digest diadem) {
log.info("Set diadem: {} on: {}", diadem, params.member().getId());
this.diadem.set(diadem);
}

public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
log.info("CHOAM startup, majority: {} on: {}", params.majority(), params.member().getId());
log.info("CHOAM startup: {} diadem: {}, majority: {} on: {}", params.context().getId(), diadem.get(),
params.majority(), params.member().getId());
combine.start(params.producer().gossipDuration());
transitions.fsm().enterStartState();
transitions.start();
Expand Down Expand Up @@ -463,33 +474,40 @@ public Block checkpoint() {
return CHOAM.this.checkpoint();
}

@Override
public Digest diadem() {
return diadem.get();
}

@Override
public Block genesis(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous) {
final HashedCertifiedBlock cp = checkpoint.get();
final HashedCertifiedBlock v = view.get();
return CHOAM.genesis(nextViewId, joining, previous, params.context(), v, params, cp,
var current = diadem.get();
log.info("Create genesis: {}", diadem.get());
return CHOAM.genesis(nextViewId, current, joining, previous, params.context(), v, params, cp,
params.genesisData().apply(joining));
}

@Override
public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) {
public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
return Block.newBuilder()
.setHeader(
buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(),
buildHeader(params.digestAlgorithm(), executions, prev, height, checkpoint.height(),
checkpoint.hash, v.height(), v.hash))
.setAssemble(assemble)
.setExecutions(executions)
.build();
}

@Override
public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) {
public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
return Block.newBuilder()
.setHeader(
buildHeader(params.digestAlgorithm(), executions, prev, height, checkpoint.height(),
buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(),
checkpoint.hash, v.height(), v.hash))
.setExecutions(executions)
.setAssemble(assemble)
.build();
}

Expand All @@ -504,7 +522,8 @@ public void publish(CertifiedBlock cb) {
public Block reconfigure(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous,
HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
return CHOAM.reconfigure(nextViewId, joining, previous, params.context(), v, params, checkpoint);
return CHOAM.reconfigure(nextViewId, diadem.get(), joining, previous, params.context(), v, params,
checkpoint);
}
};
}
Expand Down Expand Up @@ -612,9 +631,10 @@ private void nextView() {
log.error("Unable to generate and sign consensus key on: {}", params.member().getId());
return;
}
log.trace("Generated next view consensus key: {} sig: {} on: {}",
var current = diadem.get();
log.trace("Generated next view consensus key: {} sig: {} diadem: {} on: {}",
params.digestAlgorithm().digest(pubKey.getEncoded()),
params.digestAlgorithm().digest(signed.toSig().toByteString()), params.member().getId());
params.digestAlgorithm().digest(signed.toSig().toByteString()), current, params.member().getId());
next.set(new nextView(ViewMember.newBuilder()
.setId(params.member().getId().toDigeste())
.setConsensusKey(pubKey)
Expand All @@ -631,7 +651,13 @@ private void process() {
case ASSEMBLE: {
params.processor().beginBlock(h.height(), h.hash);
nextViewId.set(Digest.from(h.block.getAssemble().getNextView()));
log.info("Next view id: {} on: {}", nextViewId.get(), params.member().getId());
var ass = Digest.from(h.block.getAssemble().getDiadem());
var current = diadem.getAndSet(ass);
if (!current.equals(ass)) {
log.info("Next view id: {} diadem: {} does match current: {} on: {}", nextViewId.get(), ass, current,
params.member().getId());
}
log.info("Next view id: {} diadem: {} on: {}", nextViewId.get(), diadem.get(), params.member().getId());
c.assembled();
break;
}
Expand Down Expand Up @@ -935,12 +961,14 @@ private void synchronizedProcess(CertifiedBlock certifiedBlock) {
public interface BlockProducer {
Block checkpoint();

Block genesis(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous);
Digest diadem();

Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint);
Block genesis(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous);

Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint);

Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint);

void publish(CertifiedBlock cb);

Block reconfigure(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint);
Expand Down Expand Up @@ -1129,7 +1157,7 @@ public ViewMember join(Digest nextView, Digest from) {
log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from,
ViewContext.print(c.member, params.digestAlgorithm()), params.member().getId());
}
return c.member;
return ViewMember.newBuilder(c.member).setDiadem(diadem.get().toDigeste()).build();
}

@Override
Expand All @@ -1144,6 +1172,14 @@ public Parameters params() {

@Override
public SubmitResult submitTxn(Transaction transaction) {
if (!servers.hasNext()) {
log.trace("Failed submitting txn: {} no servers available in: {} on: {}",
hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId());
return SubmitResult.newBuilder()
.setResult(Result.ERROR_SUBMITTING)
.setErrorMsg("no servers available")
.build();
}
Member target = servers.next();
try (var link = submissionComm.connect(target)) {
if (link == null) {
Expand Down Expand Up @@ -1268,11 +1304,12 @@ public ViewMember join(Digest nextView, Digest from) {
return ViewMember.getDefaultInstance();
}
final var c = next.get();
var cd = diadem.get();
if (log.isDebugEnabled()) {
log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from,
log.debug("Joining view: {} diadem: {} from: {} view member: {} on: {}", nextView, cd, from,
ViewContext.print(c.member, params.digestAlgorithm()), params.member().getId());
}
return c.member;
return ViewMember.newBuilder(c.member).setDiadem(cd.toDigeste()).build();
}

@Override
Expand Down
16 changes: 11 additions & 5 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,14 @@ private void produceAssemble() {
final var vlb = previousBlock.get();
nextViewId = vlb.hash;
nextAssembly.addAll(Committee.viewMembersOf(nextViewId, params().context()));
var diadem = view.diadem();
log.debug("Assembling: {} diadem: {} on: {}", nextViewId, diadem, params().member().getId());
final var assemble = new HashedBlock(params().digestAlgorithm(), view.produce(vlb.height().add(1), vlb.hash,
Assemble.newBuilder()
.setNextView(
vlb.hash.toDigeste())
.setDiadem(
diadem.toDigeste())
.build(),
checkpoint.get()));
previousBlock.set(assemble);
Expand All @@ -255,8 +259,9 @@ private void produceAssemble() {
pending.put(assemble.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.debug("View assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId());
log.debug("View assembly: {} diadem: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, diadem,
assemble.hash, assemble.height(), assemble.block.getBodyCase(), getViewId(),
params().member().getId());
}

private void publish(PendingBlock p) {
Expand Down Expand Up @@ -368,13 +373,14 @@ public void produceAssemble() {

@Override
public void reconfigure() {
log.debug("Starting view reconfiguration for: {} on: {}", nextViewId, params().member().getId());
log.debug("Starting view reconfiguration: {} diadem: {} on: {}", nextViewId, view.diadem(),
params().member().getId());
assembly.set(new ViewAssembly(nextViewId, view, Producer.this::addReassemble, comms) {
@Override
public void complete() {
super.complete();
log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(),
params().member().getId());
log.debug("View reconfiguration: {} diadem: {} gathered: {} complete on: {}", nextViewId,
view.diadem(), getSlate().size(), params().member().getId());
assembled.set(true);
Producer.this.transitions.viewComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,13 @@ private Reassemble join(ViewMember vm) {
}
return null;
}

final var hex = Digest.from(vm.getDiadem());
var diadem = view.diadem();
if (!diadem.equals(hex)) {
log.warn("Invalid diadem: {} not equivalent to: {} vm: {} on: {}", hex, diadem,
ViewContext.print(vm, params().digestAlgorithm()), params().member().getId());
return null;
}
PubKey encoded = vm.getConsensusKey();

if (!m.verify(signature(vm.getSignature()), encoded.toByteString())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public Context<Member> context() {
return context;
}

public Digest diadem() {
return blockProducer == null ? DigestAlgorithm.DEFAULT.getLast() : blockProducer.diadem();
}

public Validate generateValidation(HashedBlock block) {
log.trace("Signing block: {} height: {} on: {}", block.hash, block.height(), params.member().getId());
JohnHancock signature = signer.sign(block.block.getHeader().toByteString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {
if (!cps.validate(diadem, Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()))) {
throw new IllegalStateException("Cannot validate checkpoint: " + checkpoint.height());
}
log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem, params.member().getId());
log.info("Restored checkpoint: {} diadem: {} on: {}", checkpoint.height(), diadem.compact(),
params.member().getId());
checkpointState = cps;
});
// reconstruct chain to genesis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
*/
package com.salesforce.apollo.choam;

import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
Expand All @@ -18,10 +16,12 @@
import com.salesforce.apollo.choam.comm.Terminal;
import com.salesforce.apollo.choam.comm.TerminalClient;
import com.salesforce.apollo.choam.comm.TerminalServer;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.choam.support.HashedBlock;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.Signer;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
import com.salesforce.apollo.membership.Member;
Expand Down Expand Up @@ -136,10 +136,15 @@ public Block checkpoint() {
return null;
}

@Override
public Digest diadem() {
return DigestAlgorithm.DEFAULT.getLast();
}

@Override
public Block genesis(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous) {
return CHOAM.genesis(viewId, joining, previous, committee, previous, built, previous,
Collections.emptyList());
return CHOAM.genesis(viewId, DigestAlgorithm.DEFAULT.getLast(), joining, previous, committee,
previous, built, previous, Collections.emptyList());
}

@Override
Expand Down
Loading

0 comments on commit d67dec0

Please sign in to comment.