Skip to content

Commit

Permalink
interim.
Browse files Browse the repository at this point in the history
moar
  • Loading branch information
Hellblazer committed Apr 27, 2024
1 parent d97f389 commit ace496a
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 216 deletions.
108 changes: 72 additions & 36 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter;
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg;
import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import io.grpc.StatusRuntimeException;
import org.h2.mvstore.MVMap;
Expand All @@ -47,6 +48,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyPair;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1105,7 +1107,7 @@ public record PendingView(Digest diadem, Context<Member> context) {
*
* @param hash - the "cut" across the rings of the context, determining the successors and thus the committee
* members of the view
* @return the View determined by this Context and the supplied hash value
* @return the Vue determined by this Context and the supplied hash value
*/
public View getView(Digest hash) {
var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority());
Expand Down Expand Up @@ -1326,6 +1328,11 @@ public Parameters params() {

@Override
public SubmitResult submitTxn(Transaction transaction) {
if (!started.get()) {
log.trace("Failed submitting txn: {} no servers available in: {} on: {}",
hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId());
return SubmitResult.newBuilder().setResult(Result.ERROR_SUBMITTING).setErrorMsg("Shutdown").build();
}
if (!servers.hasNext()) {
log.trace("Failed submitting txn: {} no servers available in: {} on: {}",
hashOf(transaction, params.digestAlgorithm()), viewId, params.member().getId());
Expand Down Expand Up @@ -1365,6 +1372,8 @@ public boolean validate(HashedCertifiedBlock hb) {
}

private void join(View view) {
log.info("Joining view: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
var joining = new CompletableFuture<Void>();
if (!join.compareAndSet(null, joining)) {
log.info("Ongoing join of: {} should have been cancelled on: {}", Digest.from(view.getDiadem()),
Expand All @@ -1374,44 +1383,71 @@ private void join(View view) {
}
var servers = new GroupIterator(validators.keySet());
var joined = new HashSet<Member>();
Thread.ofVirtual().start(Utils.wrapped(() -> {
while (!joining.isDone() && joined.size() < view.getMajority() && servers.hasNext()) {
Member target = servers.next();
try (var link = comm.connect(target)) {
if (link == null) {
log.debug("No link for: {} for joining: {} on: {}", target.getId(),
Digest.from(view.getDiadem()), params.member().getId());
continue;
}
log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()),
params.member().getId());
final var c = next.get();
var inView = ViewMember.newBuilder(c.member)
.setDiadem(view.getDiadem())
.setView(nextViewId.get().toDigeste())
.build();
var svm = SignedViewMember.newBuilder()
.setVm(inView)
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
try {
link.join(svm);
joined.add(target);
} catch (Throwable t) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId,
Digest.from(view.getDiadem()), params.member().getId(), t);
}
} catch (StatusRuntimeException e) {
log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target,
nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId());
} catch (Throwable e) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target, nextViewId,
Digest.from(view.getDiadem()), params.member().getId(), e);

var delay = Duration.ofMillis(Entropy.nextSecureInt(100));

Thread.ofVirtual().start(() -> {
log.error("Starting join of: {} diadem {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
while (!joining.isDone() && joined.size() < view.getMajority()) {
try {
Thread.sleep(delay.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
join(view, servers, joined);
}
log.info("Finishing join of: {} diadem: {} on: {}", nextViewId.get(), Digest.from(view.getDiadem()),
params.member().getId());
joining.complete(null);
log.info("Finishing join of: {} on: {}", Digest.from(view.getDiadem()), params.member().getId());
}, log));
});
}

private void join(View view, GroupIterator servers, HashSet<Member> joined) {
Member target = servers.next();
if (joined.contains(target)) {
log.trace("Already joined with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId.get(),
Digest.from(view.getDiadem()), params.member().getId());
return;
}
try (var link = comm.connect(target)) {
join(view, link, target, joined);
} catch (StatusRuntimeException e) {
log.trace("Failed join attempt with: {} view: {} diadem: {} status:{} on: {}", target.getId(),
nextViewId, Digest.from(view.getDiadem()), e.getStatus(), params.member().getId());
} catch (Throwable e) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId,
Digest.from(view.getDiadem()), params.member().getId(), e);
}
}

private void join(View view, Terminal link, Member target, HashSet<Member> joined) {
if (link == null) {
log.debug("No link for: {} for joining: {} on: {}", target.getId(), Digest.from(view.getDiadem()),
params.member().getId());
return;
}
log.trace("Joining view: {} diadem: {} on: {}", viewId, Digest.from(view.getDiadem()),
params.member().getId());
final var c = next.get();
var inView = ViewMember.newBuilder(c.member)
.setDiadem(view.getDiadem())
.setView(nextViewId.get().toDigeste())
.build();
var svm = SignedViewMember.newBuilder()
.setVm(inView)
.setSignature(params.member().sign(inView.toByteString()).toSig())
.build();
try {
link.join(svm);
joined.add(target);
log.trace("Joined with: {} view: {} diadem: {} on: {}", target.getId(), viewId,
Digest.from(view.getDiadem()), params.member().getId());
} catch (Throwable t) {
log.trace("Failed join attempt with: {} view: {} diadem: {} on: {}", target.getId(), nextViewId,
Digest.from(view.getDiadem()), params.member().getId(), t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ default void assemble(Assemble assemble) {
boolean isMember();

default void join(SignedViewMember nextView, Digest from) {
log().trace("Error joining by: {} view: {} diadem: {} invalid committee: {} on: {}", from,
Digest.from(nextView.getVm().getView()), Digest.from(nextView.getVm().getView()),
this.getClass().getSimpleName(), params().member().getId());
throw new StatusRuntimeException(ABORTED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public void stop() {
private void certify(Validate v) {
if (reconfiguration == null) {
pendingValidations.add(v);
return;
}
log.trace("Validating reconfiguration block: {} height: {} on: {}", reconfiguration.hash,
reconfiguration.height(), params().member().getId());
Expand Down
46 changes: 22 additions & 24 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -48,7 +51,6 @@ public class Producer {
private final ChRbcGossip coordinator;
private final TxDataSource ds;
private final Map<Digest, PendingBlock> pending = new ConcurrentSkipListMap<>();
private final List<Assemblies> pendingAssemblies = new CopyOnWriteArrayList<>();
private final Map<Digest, List<Validate>> pendingValidations = new ConcurrentSkipListMap<>();
private final AtomicReference<HashedBlock> previousBlock = new AtomicReference<>();
private final AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -105,7 +107,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
params().communications(), producerMetrics);
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());

var onConsensus = new CompletableFuture<ViewAssembly.View>();
var onConsensus = new CompletableFuture<ViewAssembly.Vue>();
onConsensus.whenComplete((v, throwable) -> {
if (throwable == null) {
produceAssemble(v);
Expand All @@ -115,11 +117,14 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
});
assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, onConsensus) {
@Override
public void complete() {
super.complete();
log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(),
params().member().getId());
assembled = true;
public boolean complete() {
if (super.complete()) {
log.debug("Vue reconfiguration: {} gathered: {} complete on: {}", nextViewId,
getSlate().keySet().stream().sorted().toList(), params().member().getId());
assembled = true;
return true;
}
return false;
}
};
}
Expand Down Expand Up @@ -212,10 +217,13 @@ private Digest getViewId() {

private void newEpoch(Integer epoch) {
log.trace("new epoch: {} on: {}", epoch, params().member().getId());
assembly.newEpoch();
var last = epoch >= maxEpoch && assembled;
if (last) {
controller.completeIt();
Producer.this.transitions.viewComplete();
} else {
ds.reset();
}
transitions.newEpoch(epoch, last);
}
Expand All @@ -225,17 +233,10 @@ private Parameters params() {
}

private void processAssemblies(List<UnitData> aggregate) {
var joins = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList();
final var ass = assembly;
if (ass != null) {
log.trace("Consuming {} units, {} assemblies on: {}", aggregate.size(), joins.size(),
params().member().getId());
ass.inbound().accept(joins);
} else {
log.trace("Pending {} units, {} assemblies on: {}", aggregate.size(), joins.size(),
params().member().getId());
pendingAssemblies.addAll(joins);
}
var aggs = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList();
log.trace("Consuming {} assemblies from {} units on: {}", aggregate.size(), aggs.size(),
params().member().getId());
assembly.assemble(aggs);
}

private void processPendingValidations(HashedBlock block, PendingBlock p) {
Expand Down Expand Up @@ -277,7 +278,7 @@ private void processTransactions(boolean last, List<UnitData> aggregate) {
}
}

private void produceAssemble(ViewAssembly.View v) {
private void produceAssemble(ViewAssembly.Vue v) {
final var vlb = previousBlock.get();
var ass = Assemble.newBuilder()
.setView(View.newBuilder()
Expand All @@ -294,7 +295,7 @@ private void produceAssemble(ViewAssembly.View v) {
pending.put(assemble.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.debug("View assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
log.debug("Vue assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId());
transitions.assembled();
}
Expand Down Expand Up @@ -385,10 +386,7 @@ private class DriveIn implements Driven {
public void assemble() {
log.debug("Starting view diadem consensus for: {} on: {}", nextViewId, params().member().getId());
startProduction();
var joins = new ArrayList<>(pendingAssemblies);
pendingAssemblies.clear();
assembly.start();
assembly.inbound().accept(joins);
}

@Override
Expand Down
Loading

0 comments on commit ace496a

Please sign in to comment.