Skip to content

Commit

Permalink
Better logging. Bootstrap sampling via successor cuts of ring 0 iteration.
Browse files Browse the repository at this point in the history

Checkpoint assembly now gossips only with reconfiguration committee members.

Some general tidying up
  • Loading branch information
Hellblazer committed May 27, 2024
1 parent 04a0afa commit 975d644
Show file tree
Hide file tree
Showing 19 changed files with 415 additions and 143 deletions.
14 changes: 4 additions & 10 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,6 @@ private SubmitResult submit(Transaction request, Digest from) {
}

private Initial sync(Synchronize request, Digest from) {
if (from == null) {
return Initial.getDefaultInstance();
}
final HashedCertifiedBlock g = genesis.get();
if (g != null) {
Initial.Builder initial = Initial.newBuilder();
Expand All @@ -897,13 +894,10 @@ private Initial sync(Synchronize request, Digest from) {
}
final ULong lastReconfig = ULong.valueOf(cp.block.getHeader().getLastReconfig());
HashedCertifiedBlock lastView = null;
if (lastReconfig.equals(ULong.valueOf(0))) {
lastView = cp;
} else {
var stored = store.getCertifiedBlock(lastReconfig);
if (stored != null) {
lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored);
}

var stored = store.getCertifiedBlock(lastReconfig);
if (stored != null) {
lastView = new HashedCertifiedBlock(params.digestAlgorithm(), stored);
}
if (lastView == null) {
lastView = g;
Expand Down
10 changes: 5 additions & 5 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash
producerParams.batchInterval(), producerParams.maxBatchCount(),
params().drainPolicy().build());

log.info("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());
log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch,
params.member().getId());

var fsm = Fsm.construct(new DriveIn(), Transitions.class, Earner.INITIAL, true);
fsm.setName("Producer%s on: %s".formatted(getViewId(), params.member().getId()));
Expand Down Expand Up @@ -343,9 +343,9 @@ private void reconfigure() {
pending.put(reconfiguration.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(),
reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(),
params().member().getId());
log.trace("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(),
reconfiguration.hash, reconfiguration.height(), slate.keySet().stream().sorted().toList(),
params().member().getId());
processPendingValidations(reconfiguration, p);

log.trace("Draining on: {}", params().member().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void assemble(List<Assemblies> asses) {
.toList();
var views = asses.stream().flatMap(a -> a.getViewsList().stream()).filter(SignedViews::hasViews).toList();

log.info("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId());
log.debug("Assembling joins: {} views: {} on: {}", joins.size(), views.size(), params().member().getId());

joins.forEach(sj -> join(sj.getJoin(), false));
if (selected != null) {
Expand All @@ -124,7 +124,7 @@ void assemble(List<Assemblies> asses) {
Digest.from(svs.getViews().getMember()), params().member().getId());
viewProposals.put(Digest.from(svs.getViews().getMember()), svs.getViews());
} else {
log.info("Invalid views: {} from: {} on: {}",
log.warn("Invalid views: {} from: {} on: {}",
svs.getViews().getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
Digest.from(svs.getViews().getMember()), params().member().getId());
}
Expand All @@ -140,14 +140,14 @@ void assemble(List<Assemblies> asses) {

boolean complete() {
if (selected == null) {
log.info("Cannot complete view assembly: {} as selected is null on: {}", nextViewId,
params().member().getId());
log.error("Cannot complete view assembly: {} as selected is null on: {}", nextViewId,
params().member().getId());
transitions.failed();
return false;
}
if (proposals.size() < selected.majority) {
log.info("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId,
proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId());
log.error("Cannot complete view assembly: {} proposed: {} required: {} on: {}", nextViewId,
proposals.keySet().stream().sorted().toList(), selected.majority, params().member().getId());
transitions.failed();
return false;
}
Expand Down Expand Up @@ -290,9 +290,9 @@ private void propose() {
.setMember(params().member().getId().toDigeste())
.setVid(nextViewId.toDigeste())
.build();
log.info("Proposing for: {} views: {} on: {}", nextViewId,
views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
params().member().getId());
log.debug("Proposing for: {} views: {} on: {}", nextViewId,
views.getViewsList().stream().map(v -> Digest.from(v.getDiadem())).toList(),
params().member().getId());
publisher.accept(Assemblies.newBuilder()
.addViews(
SignedViews.newBuilder().setViews(views).setSignature(view.sign(views).toSig()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,16 @@ public boolean validate(SignedViews sv) {
Verifier v = verifierOf(sv);
if (v == null) {
if (log.isDebugEnabled()) {
log.debug("no verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()),
params.member().getId());
log.warn("No verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()),
params.member().getId());
}
return false;
}
var validated = v.verify(JohnHancock.from(sv.getSignature()), sv.getViews().toByteString());
if (!validated) {
if (log.isTraceEnabled()) {
log.trace("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()),
params().member().getId());
log.warn("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()),
params().member().getId());
}
} else if (log.isTraceEnabled()) {
log.trace("Validated views signed by: {} on: {}", Digest.from(sv.getViews().getMember()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,31 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* @author hal.hildebrand
*/
public class Bootstrapper {
private static final Logger log = LoggerFactory.getLogger(
Bootstrapper.class);
private final HashedCertifiedBlock anchor;
private final CompletableFuture<Boolean> anchorSynchronized = new CompletableFuture<>();
private final CommonCommunications<Terminal, Concierge> comms;
private final ULong lastCheckpoint;
private final Parameters params;
private final Store store;
private final CompletableFuture<SynchronizedState> sync = new CompletableFuture<>();
private final CompletableFuture<Boolean> viewChainSynchronized = new CompletableFuture<>();
private final ScheduledExecutorService scheduler;
private volatile HashedCertifiedBlock checkpoint;
private volatile CompletableFuture<CheckpointState> checkpointAssembled;
private volatile CheckpointState checkpointState;
private volatile HashedCertifiedBlock checkpointView;
private volatile HashedCertifiedBlock genesis;
private static final Logger log = LoggerFactory.getLogger(Bootstrapper.class);

private final HashedCertifiedBlock anchor;
private final CompletableFuture<Boolean> anchorSynchronized = new CompletableFuture<>();
private final CommonCommunications<Terminal, Concierge> comms;
private final ULong lastCheckpoint;
private final Parameters params;
private final Store store;
private final CompletableFuture<SynchronizedState> sync = new CompletableFuture<>();
private final CompletableFuture<Boolean> viewChainSynchronized = new CompletableFuture<>();
private final ScheduledExecutorService scheduler;
private final AtomicInteger sampleIndex = new AtomicInteger();
private volatile HashedCertifiedBlock checkpoint;
private volatile CompletableFuture<CheckpointState> checkpointAssembled;
private volatile CheckpointState checkpointState;
private volatile HashedCertifiedBlock checkpointView;
private volatile HashedCertifiedBlock genesis;

public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store,
CommonCommunications<Terminal, Concierge> bootstrapComm) {
Expand Down Expand Up @@ -122,8 +124,16 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {
log.info("Assembling from checkpoint: {}:{} crown: {} last cp: {} on: {}", checkpoint.height(), checkpoint.hash,
crown.compactWrapped(), Digest.from(checkpoint.block.getHeader().getLastCheckpointHash()),
params.member().getId());

CheckpointAssembler assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(),
var committee = checkpointView.certifiedBlock.getBlock()
.getReconfigure()
.getJoinsList()
.stream()
.map(j -> j.getMember().getVm().getId())
.map(Digest::from)
.map(d -> params.context().getMember(d))
.filter(m -> m != null)
.toList();
CheckpointAssembler assembler = new CheckpointAssembler(committee, params.gossipDuration(), checkpoint.height(),
checkpoint.block.getCheckpoint(), params.member(),
store, comms, params.context(), threshold,
params.digestAlgorithm());
Expand Down Expand Up @@ -366,11 +376,21 @@ private void sample() {
}
HashMap<Digest, Initial> votes = new HashMap<>();
Synchronize s = Synchronize.newBuilder().setHeight(anchor.height().longValue()).build();
final var randomCut = params.digestAlgorithm().random();
new RingIterator<>(params.gossipDuration(), params.context(), params.member(), comms, true, scheduler).iterate(
randomCut, (link, ring) -> synchronize(s, link),
(tally, futureSailor, destination) -> synchronize(futureSailor, votes, destination),
t -> computeGenesis(votes));
var si = sampleIndex.getAndIncrement();
var member = params.context().getMember(si, Entropy.nextBitsStreamInt(params.context().getRingCount()));
if (member == null) {
log.warn("No members: {} to sample on: {}", params.context().size(), params.member().getId());
computeGenesis(votes);
return;
}
var randomCut = member.getId();
log.info("Random cut: {} on: {}", randomCut, params.member().getId());
var iterator = new RingIterator<Member, Terminal>(params.gossipDuration(), params.context(), params.member(),
comms, true, scheduler);
iterator.allowDuplicates();
iterator.iterate(randomCut, (link, _) -> synchronize(s, link),
(_, futureSailor, destination) -> synchronize(futureSailor, votes, destination),
t -> computeGenesis(votes));
}

private void scheduleAnchorCompletion(AtomicReference<ULong> start, ULong anchorTo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.salesforce.apollo.cryptography.HexBloom;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.ring.RingIterator;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import org.h2.mvstore.MVMap;
Expand All @@ -28,6 +28,8 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -51,10 +53,12 @@ public class CheckpointAssembler {
private final SigningMember member;
private final MVMap<Integer, byte[]> state;
private final HexBloom diadem;
private final List<Member> committee;

public CheckpointAssembler(Duration frequency, ULong height, Checkpoint checkpoint, SigningMember member,
Store store, CommonCommunications<Terminal, Concierge> comms, Context<Member> context,
double falsePositiveRate, DigestAlgorithm digestAlgorithm) {
public CheckpointAssembler(List<Member> committee, Duration frequency, ULong height, Checkpoint checkpoint,
SigningMember member, Store store, CommonCommunications<Terminal, Concierge> comms,
Context<Member> context, double falsePositiveRate, DigestAlgorithm digestAlgorithm) {
this.committee = new ArrayList<>(committee);
this.height = height;
this.member = member;
this.checkpoint = checkpoint;
Expand Down Expand Up @@ -112,11 +116,19 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) {
}
log.info("Assembly of checkpoint: {} segments: {} crown: {} on: {}", height, checkpoint.getCount(),
diadem.compactWrapped(), member.getId());
var ringer = new RingIterator<>(frequency, context, member, comms, true, scheduler);
ringer.iterate(digestAlgorithm.random(), (link, ring) -> gossip(link),
(tally, result, destination) -> gossip(result), t -> scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(scheduler, duration), log)), duration.toMillis(),
TimeUnit.MILLISECONDS));

var ringer = new SliceIterator<>("Assembly[%s:%s]".formatted(diadem.compactWrapped(), member.getId()), member,
committee, comms);
ringer.iterate((link, m) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId());
return gossip(link);
}, (result, link, m) -> gossip(result), () -> {
if (!assembled.isDone()) {
scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(scheduler, duration), log)),
duration.toMillis(), TimeUnit.MILLISECONDS);
}
}, duration);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,18 @@ public CheckpointSegments answer(InvocationOnMock invocation) throws Throwable {
return CheckpointSegments.newBuilder().addAllSegments(fetched).build();
}
});
when(client.getMember()).then(new Answer<>() {
@Override
public Member answer(InvocationOnMock invocation) {
return members.get(1);
}
});
@SuppressWarnings("unchecked")
CommonCommunications<Terminal, Concierge> comm = mock(CommonCommunications.class);
when(comm.connect(any())).thenReturn(client);

Store store2 = new Store(DigestAlgorithm.DEFAULT, new MVStore.Builder().open());
CheckpointAssembler boot = new CheckpointAssembler(Duration.ofMillis(10), ULong.valueOf(0), checkpoint,
CheckpointAssembler boot = new CheckpointAssembler(members, Duration.ofMillis(10), ULong.valueOf(0), checkpoint,
bootstrapping, store2, comm, context, 0.00125,
DigestAlgorithm.DEFAULT);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
Expand Down
6 changes: 5 additions & 1 deletion choam/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.ring.SliceIterator" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.membership.messaging.rbc" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
Expand All @@ -33,7 +37,7 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam.CHOAM" level="info" additivity="false">
<logger name="com.salesforce.apollo.choam" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

Expand Down
Loading

0 comments on commit 975d644

Please sign in to comment.