Skip to content

Commit

Permalink
interim.
Browse files Browse the repository at this point in the history
Invert join protocol. Future contacts past. Revive ASSEMBLE block.

Dear lord.
  • Loading branch information
Hellblazer committed Apr 23, 2024
1 parent 66a9729 commit d97f389
Show file tree
Hide file tree
Showing 16 changed files with 461 additions and 436 deletions.
154 changes: 132 additions & 22 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.chiralbehaviors.tron.Fsm;
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
Expand Down Expand Up @@ -61,6 +62,8 @@
import static com.salesforce.apollo.choam.support.HashedBlock.height;
import static com.salesforce.apollo.cryptography.QualifiedBase64.bs;
import static com.salesforce.apollo.cryptography.QualifiedBase64.digest;
import static io.grpc.Status.FAILED_PRECONDITION;
import static io.grpc.Status.INVALID_ARGUMENT;

/**
* Combine Honnete Ober Advancer Mercantiles.
Expand Down Expand Up @@ -94,6 +97,7 @@ public class CHOAM {
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private final PendingViews pendingViews = new PendingViews();
private final AtomicReference<CompletableFuture<Void>> join = new AtomicReference<>();

public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
Expand Down Expand Up @@ -246,6 +250,17 @@ public static List<Transaction> toGenesisData(List<? extends Message> initializa
.toList();
}

private static Block assembly(AtomicReference<Digest> nextViewId, View view, HashedBlock head,
HashedBlock lastViewChange, Parameters params, HashedBlock lastCheckpoint) {
var body = Assemble.newBuilder().setView(view).build();
return Block.newBuilder()
.setHeader(
buildHeader(params.digestAlgorithm(), body, head.hash, ULong.valueOf(0), lastCheckpoint.height(),
lastCheckpoint.hash, lastViewChange.height(), lastViewChange.hash))
.setAssemble(body)
.build();
}

public boolean active() {
final var c = current.get();
HashedCertifiedBlock h = head.get();
Expand Down Expand Up @@ -464,7 +479,6 @@ private void combine(Msg m) {

private BlockProducer constructBlock() {
return new BlockProducer() {

@Override
public Block checkpoint() {
return CHOAM.this.checkpoint();
Expand Down Expand Up @@ -497,6 +511,17 @@ public void onFailure() {
transitions.fail();
}

@Override
public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
return Block.newBuilder()
.setHeader(
buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(),
checkpoint.hash, v.height(), v.hash))
.setAssemble(assemble)
.build();
}

@Override
public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
Expand Down Expand Up @@ -609,18 +634,16 @@ private boolean isNext(HashedBlock next) {
return isNext;
}

private SignedViewMember join(Digest nextView, Digest from) {
final var c = next.get();
var inView = ViewMember.newBuilder(c.member).setView(nextView.toDigeste()).build();

if (log.isDebugEnabled()) {
log.debug("Joining view: {} from: {} view member: {} on: {}", nextView, from,
ViewContext.print(inView, params.digestAlgorithm()), params.member().getId());
private Empty join(SignedViewMember nextView, Digest from) {
var c = current.get();
if (c == null) {
log.trace("No committee for: {} to join: {} diadem: {} on: {}", from,
Digest.from(nextView.getVm().getView()), Digest.from(nextView.getVm().getDiadem()),
params.member().getId());
throw new StatusRuntimeException(FAILED_PRECONDITION);
}
return SignedViewMember.newBuilder()
.setVm(inView)
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
c.join(nextView, from);
return Empty.getDefaultInstance();
}

private Supplier<PendingViews> pendingViews() {
Expand All @@ -645,6 +668,11 @@ private void process() {
reconfigure(h.hash, h.block.getGenesis().getInitialView());
break;
}
case ASSEMBLE: {
params.processor().beginBlock(h.height(), h.hash);
c.assemble(h.block.getAssemble());
break;
}
case EXECUTIONS: {
params.processor().beginBlock(h.height(), h.hash);
execute(h.block.getExecutions().getExecutionsList());
Expand All @@ -666,6 +694,10 @@ private void process() {

private void reconfigure(Digest hash, Reconfigure reconfigure) {
log.info("Setting next view id: {} on: {}", hash, params.member().getId());
var j = join.getAndSet(null);
if (j != null) {
j.cancel(true);
}
nextViewId.set(hash);
var pv = pendingViews.advance();
if (pv != null) {
Expand All @@ -683,8 +715,9 @@ private void reconfigure(Digest hash, Reconfigure reconfigure) {
if (Dag.validate(validators.size())) {
current.set(new Associate(h, validators, currentView));
} else {
log.warn("Reconfiguration to associate failed: {} in view: {} on:{}", validators.size(),
new Digest(reconfigure.getId()), params.member().getId());
log.warn("Reconfiguration to associate failed: {} committee: {} in view: {} on:{}", validators.size(),
new Digest(reconfigure.getId()), current.get().getClass().getSimpleName(),
params.member().getId());
transitions.fail();
}
} else {
Expand Down Expand Up @@ -745,8 +778,8 @@ private void restore() throws IllegalStateException {
view.set(lastView);
var validators = validatorsOf(reconfigure, params.context(), params.member().getId(), log);
current.set(new Synchronizer(validators));
log.info("Reconfigured to checkpoint view: {} on: {}", new Digest(reconfigure.getId()),
params.member().getId());
log.info("Reconfigured to checkpoint view: {} committee: {} on: {}", new Digest(reconfigure.getId()),
current.get().getClass().getSimpleName(), params.member().getId());
}

log.info("Restored to: {} lastView: {} lastCheckpoint: {} lastBlock: {} on: {}", geni.hash, view.get().hash,
Expand Down Expand Up @@ -980,6 +1013,8 @@ public interface BlockProducer {

void onFailure();

Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint);

Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint);

void publish(Digest hash, CertifiedBlock cb);
Expand Down Expand Up @@ -1073,7 +1108,7 @@ public record PendingView(Digest diadem, Context<Member> context) {
* @return the View determined by this Context and the supplied hash value
*/
public View getView(Digest hash) {
var builder = View.newBuilder().setDiadem(diadem.toDigeste());
var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority());
Committee.viewMembersOf(hash, context).forEach(d -> builder.addCommittee(d.getId().toDigeste()));
return builder.build();
}
Expand Down Expand Up @@ -1163,9 +1198,9 @@ public void fail() {

@Override
public void recover(HashedCertifiedBlock anchor) {
log.info("Anchor discovered: {} hash: {} height: {} on: {}", anchor.block.getBodyCase(), anchor.hash,
anchor.height(), params.member().getId());
current.set(new Formation());
log.info("Anchor discovered: {} hash: {} height: {} committee: {} on: {}", anchor.block.getBodyCase(),
anchor.hash, anchor.height(), current.get().getClass().getSimpleName(), params.member().getId());
CHOAM.this.recover(anchor);
}

Expand Down Expand Up @@ -1220,8 +1255,9 @@ public Blocks fetchViewChain(BlockReplication request, Digest from) {
}

@Override
public SignedViewMember join(Digest nextView, Digest from) {
return CHOAM.this.join(nextView, from);
public Empty join(SignedViewMember nextView, Digest from) {
CHOAM.this.join(nextView, from);
return Empty.getDefaultInstance();
}

@Override
Expand All @@ -1248,6 +1284,19 @@ public void accept(HashedCertifiedBlock hb) {
process();
}

@Override
public void assemble(Assemble assemble) {
var mid = params.member().getId();
var view = assemble.getView();
if (view.getCommitteeList().stream().map(Digest::from).noneMatch(mid::equals)) {
log.info("Assemble view: {}; Not associate: {} in diadem: {} on: {}", viewId,
getClass().getSimpleName(), Digest.from(view.getDiadem()), mid);
return;
}
log.info("Assemble view: {}; Associate in diadem: {} on: {}", viewId, Digest.from(view.getDiadem()), mid);
join(view);
}

@Override
public void complete() {
}
Expand Down Expand Up @@ -1314,6 +1363,56 @@ public SubmitResult submitTxn(Transaction transaction) {
public boolean validate(HashedCertifiedBlock hb) {
return validate(hb, validators);
}

private void join(View view) {
var joining = new CompletableFuture<Void>();
if (!join.compareAndSet(null, joining)) {
log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()),
params.member().getId());
transitions.fail();
return;
}
var servers = new GroupIterator(validators.keySet());
var joined = new HashSet<Member>();
Thread.ofVirtual().start(Utils.wrapped(() -> {
while (!joining.isDone() && joined.size() < view.getMajority() && servers.hasNext()) {
Member target = servers.next();
try (var link = comm.connect(target)) {
if (link == null) {
log.debug("No link for: {} for joining: {} on: {}", target.getId(),
Digest.from(view.getDiadem()), params.member().getId());
continue;
}
log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()),
params.member().getId());
final var c = next.get();
var inView = ViewMember.newBuilder(c.member)
.setDiadem(view.getDiadem())
.setView(nextViewId.get().toDigeste())
.build();
var svm = SignedViewMember.newBuilder()
.setVm(inView)
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
try {
link.join(svm);
joined.add(target);
} catch (Throwable t) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId,
Digest.from(view.getDiadem()), params.member().getId(), t);
}
} catch (StatusRuntimeException e) {
log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target,
nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId());
} catch (Throwable e) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId,
Digest.from(view.getDiadem()), params.member().getId(), e);
}
}
joining.complete(null);
log.info("Finishing join of: {} on: {}", Digest.from(view.getDiadem()), params.member().getId());
}, log));
}
}

/** a member of the current committee */
Expand All @@ -1335,7 +1434,7 @@ private class Associate extends Administration {
var pv = pendingViews();
producer = new Producer(nextViewId.get(),
new ViewContext(context, params, pv, signer, validators, constructBlock()),
head.get(), checkpoint.get(), comm, getLabel());
head.get(), checkpoint.get(), getLabel());
producer.start();
}

Expand All @@ -1344,6 +1443,17 @@ public void complete() {
producer.stop();
}

@Override
public void join(SignedViewMember nextView, Digest from) {
if (!from.equals(Digest.from(nextView.getVm().getId()))) {
log.trace("Join from: {} does not match {} from join: {} diadem: {} on: {}", from,
Digest.from(nextView.getVm().getId()), Digest.from(nextView.getVm().getView()),
Digest.from(nextView.getVm().getDiadem()), params.member().getId());
throw new StatusRuntimeException(INVALID_ARGUMENT);
}
producer.join(nextView);
}

@Override
public SubmitResult submit(Transaction request) {
return producer.submit(request);
Expand Down
20 changes: 14 additions & 6 deletions choam/src/main/java/com/salesforce/apollo/choam/Committee.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
*/
package com.salesforce.apollo.choam;

import com.salesforce.apollo.choam.proto.Certification;
import com.salesforce.apollo.choam.proto.Reconfigure;
import com.salesforce.apollo.choam.proto.SubmitResult;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.choam.proto.SubmitResult.Result;
import com.salesforce.apollo.choam.proto.Transaction;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.StaticContext;
Expand All @@ -20,14 +17,17 @@
import com.salesforce.apollo.cryptography.Verifier.DefaultVerifier;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.MockMember;
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey;
import static io.grpc.Status.ABORTED;

/**
* @author hal.hildebrand
Expand Down Expand Up @@ -82,10 +82,17 @@ static Set<Member> viewMembersOf(Digest hash, Context<? super Member> baseContex

void accept(HashedCertifiedBlock next);

default void assemble(Assemble assemble) {
}

void complete();

boolean isMember();

default void join(SignedViewMember nextView, Digest from) {
throw new StatusRuntimeException(ABORTED);
}

Logger log();

void nextView(Digest diadem, Context<Member> pendingView);
Expand Down Expand Up @@ -153,17 +160,18 @@ default boolean validate(HashedCertifiedBlock hb, Map<Member, Verifier> validato
}
}
final int toleranceLevel = params.context().toleranceLevel();
log().trace("Validate: {} height: {} count: {} needed: {} on: {}}", hb.hash, hb.height(), valid, toleranceLevel,
log().trace("Validate: {} height: {} count: {} needed: {} on: {}", hb.hash, hb.height(), valid, toleranceLevel,
params.member().getId());
return valid > toleranceLevel;
}

default boolean validateRegeneration(HashedCertifiedBlock hb) {
if (!hb.block.hasGenesis()) {
if (!Objects.requireNonNull(hb.block).hasGenesis()) {
return false;
}
var reconfigure = hb.block.getGenesis().getInitialView();
var validators = validatorsOf(reconfigure, params().context(), params().member().getId(), log());
return !validators.isEmpty() && validate(hb, validators);
}

}
Loading

0 comments on commit d97f389

Please sign in to comment.