Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc 5 #204

Merged
merged 19 commits into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on: [ push ]

jobs:
build:

runs-on: ubuntu-latest

steps:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
**/.classpath
**/.DS_Store
**/bin/
**/.run/
/bin/
*.csv
**/dependency-reduced-pom.xml
Expand Down
27 changes: 12 additions & 15 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,6 @@ private SubmitResult submit(Transaction request, Digest from) {
}

private Initial sync(Synchronize request, Digest from) {
if (from == null) {
return Initial.getDefaultInstance();
}
final HashedCertifiedBlock g = genesis.get();
if (g != null) {
Initial.Builder initial = Initial.newBuilder();
Expand All @@ -897,13 +894,10 @@ private Initial sync(Synchronize request, Digest from) {
}
final ULong lastReconfig = ULong.valueOf(cp.block.getHeader().getLastReconfig());
HashedCertifiedBlock lastView = null;
if (lastReconfig.equals(ULong.valueOf(0))) {
lastView = cp;
} else {
var stored = store.getCertifiedBlock(lastReconfig);
if (stored != null) {
lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored);
}

var stored = store.getCertifiedBlock(lastReconfig);
if (stored != null) {
lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored);
}
if (lastView == null) {
lastView = g;
Expand Down Expand Up @@ -1120,7 +1114,8 @@ public record PendingView(Digest diadem, Context<Member> context) {
*/
public View getView(Digest hash) {
var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority());
Committee.viewMembersOf(hash, context).forEach(d -> builder.addCommittee(d.getId().toDigeste()));
((Context<? super Member>) context).bftSubset(hash).forEach(
d -> builder.addCommittee(d.getId().toDigeste()));
return builder.build();
}
}
Expand All @@ -1134,8 +1129,8 @@ public class Combiner implements Combine {
public void anchor() {
HashedCertifiedBlock anchor = pending.poll();
var pending = pendingViews.last().context;
if (anchor != null && pending.totalCount() >= pending.majority()) {
log.info("Synchronizing from anchor: {} cardinality: {} on: {}", anchor.hash, pending.totalCount(),
if (anchor != null && pending.size() >= pending.majority()) {
log.info("Synchronizing from anchor: {} cardinality: {} on: {}", anchor.hash, pending.size(),
params.member().getId());
transitions.bootstrap(anchor);
}
Expand Down Expand Up @@ -1181,9 +1176,10 @@ public void awaitSynchronization() {
synchronizationFailed();
} catch (IllegalStateException e) {
final var c = current.get();
Context<Member> memberContext = context();
log.debug(
"Synchronization quorum formation failed: {}, members: {} desired: {} required: {}, no anchor to recover from: {} on: {}",
e.getMessage(), context().totalCount(), context().getRingCount(), params.majority(),
e.getMessage(), memberContext.size(), context().getRingCount(), params.majority(),
c == null ? "<no formation>" : c.getClass().getSimpleName(), params.member().getId());
awaitSynchronization();
}
Expand Down Expand Up @@ -1227,7 +1223,8 @@ public void rotateViewKeys() {

private void synchronizationFailed() {
cancelSynchronization();
var activeCount = context().totalCount();
Context<Member> memberContext = context();
var activeCount = memberContext.size();
var majority = params.majority();
if (params.generateGenesis() && activeCount >= majority) {
if (current.get() == null && current.compareAndSet(null, new Formation())) {
Expand Down
14 changes: 1 addition & 13 deletions choam/src/main/java/com/salesforce/apollo/choam/Committee.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -63,23 +62,12 @@ static Map<Member, Verifier> validatorsOf(Reconfigure reconfigure, Context<Membe
* Create a view based on the cut of the supplied hash across the rings of the base context
*/
static Context<Member> viewFor(Digest hash, Context<? super Member> baseContext) {
Set<Member> successors = viewMembersOf(hash, baseContext);
Set<Member> successors = (Set<Member>) baseContext.bftSubset(hash);
var newView = new StaticContext<>(hash, baseContext.getProbabilityByzantine(), 3, successors,
baseContext.getEpsilon(), successors.size());
return newView;
}

static Set<Member> viewMembersOf(Digest hash, Context<? super Member> baseContext) {
Set<Member> successors = new HashSet<>();
baseContext.successors(hash, m -> {
if (successors.size() == baseContext.getRingCount()) {
return false;
}
return successors.add(m);
});
return successors;
}

void accept(HashedCertifiedBlock next);

default void assemble(Assemble assemble) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock.NullBlock;
import com.salesforce.apollo.choam.support.OneShot;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.StaticContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.proto.PubKey;
Expand Down Expand Up @@ -64,9 +65,9 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
String label) {
view = vc;
ds = new OneShot();
nextAssembly = Committee.viewMembersOf(view.context().getId(), view.pendingViews().last().context())
.stream()
.collect(Collectors.toMap(Member::getId, m -> m));
Digest hash = view.context().getId();
nextAssembly = ((Set<Member>) ((Context<? super Member>) view.pendingViews().last().context()).bftSubset(
hash)).stream().collect(Collectors.toMap(Member::getId, m -> m));
if (!Dag.validate(nextAssembly.size())) {
throw new IllegalStateException("Invalid BFT cardinality: " + nextAssembly.size());
}
Expand Down Expand Up @@ -96,7 +97,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
transitions::process, transitions::nextEpoch, label);
coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(),
coordinator = new ChRbcGossip(reContext.getId(), params().member(), nextAssembly.values(),
controller.processor(), params().communications(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
reContext.getId(), nextAssembly.keySet(), params().member().getId());
Expand Down
18 changes: 9 additions & 9 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
producerParams.batchInterval(), producerParams.maxBatchCount(),
params().drainPolicy().build());

log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());
log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());

var fsm = Fsm.construct(new DriveIn(), Transitions.class, Earner.INITIAL, true);
fsm.setName("Producer%s on: %s".formatted(getViewId(), params.member().getId()));
Expand All @@ -99,10 +99,10 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash

config.setLabel("Producer" + getViewId() + " on: " + params().member().getId());
var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics();
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds,
(preblock, last) -> serial(preblock, last), this::newEpoch, label);
coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(),
params().communications(), producerMetrics);
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial,
this::newEpoch, label);
coordinator = new ChRbcGossip(view.context().getId(), params().member(), view.membership(),
controller.processor(), params().communications(), producerMetrics);
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());

var onConsensus = new CompletableFuture<ViewAssembly.Vue>();
Expand Down Expand Up @@ -343,9 +343,9 @@ private void reconfigure() {
pending.put(reconfiguration.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(),
reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(),
params().member().getId());
log.trace("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(),
reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(),
params().member().getId());
processPendingValidations(reconfiguration, p);

log.trace("Draining on: {}", params().member().getId());
Expand Down
4 changes: 2 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public static <T> CompletableFuture<T> retryNesting(Supplier<CompletableFuture<T
CompletableFuture<T> cf = supplier.get();
for (int i = 0; i < maxRetries; i++) {
final var attempt = i;
cf = cf.thenApply(CompletableFuture::completedFuture).exceptionally(__ -> {
log.trace("resubmitting after attempt: {}", attempt + 1);
cf = cf.thenApply(CompletableFuture::completedFuture).exceptionally(e -> {
log.info("resubmitting after attempt: {} exception: {}", attempt + 1, e.toString());
return supplier.get();
}).thenCompose(java.util.function.Function.identity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void assemble(List<Assemblies> asses) {
.toList();
var views = asses.stream().flatMap(a -> a.getViewsList().stream()).filter(SignedViews::hasViews).toList();

log.info("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId());
log.debug("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId());

joins.forEach(sj -> join(sj.getJoin(), false));
if (selected != null) {
Expand All @@ -124,7 +124,7 @@ void assemble(List<Assemblies> asses) {
Digest.from(svs.getViews().getMember()), params().member().getId());
viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews());
} else {
log.info("Invalid views: {} from: {} on: {}",
log.warn("Invalid views: {} from: {} on: {}",
svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
Digest.from(svs.getViews().getMember()), params().member().getId());
}
Expand All @@ -140,14 +140,14 @@ void assemble(List<Assemblies> asses) {

boolean complete() {
if (selected == null) {
log.info("Cannot complete view assembly: {} as selected is null on: {}", nextViewId,
params().member().getId());
log.error("Cannot complete view assembly: {} as selected is null on: {}", nextViewId,
params().member().getId());
transitions.failed();
return false;
}
if (proposals.size() < selected.majority) {
log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId,
proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId());
log.error("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId,
proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId());
transitions.failed();
return false;
}
Expand Down Expand Up @@ -290,9 +290,9 @@ private void propose() {
.setMember(params().member().getId().toDigeste())
.setVid(nextViewId.toDigeste())
.build();
log.info("Proposing for: {} views: {} on: {}", nextViewId,
views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
params().member().getId());
log.debug("Proposing for: {} views: {} on: {}", nextViewId,
views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
params().member().getId());
publisher.accept(Assemblies.newBuilder()
.addViews(
SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey;
Expand Down Expand Up @@ -135,6 +136,10 @@ public Signer getSigner() {
return signer;
}

public Set<Member> membership() {
return validators.keySet();
}

/**
* The process has failed
*/
Expand Down Expand Up @@ -244,7 +249,7 @@ public boolean validate(SignedViews sv) {
Verifier v = verifierOf(sv);
if (v == null) {
if (log.isDebugEnabled()) {
log.debug("no verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()),
log.debug("No verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()),
params.member().getId());
}
return false;
Expand Down
Loading
Loading