Skip to content

Commit

Permalink
misc-4 (#203)
Browse files Browse the repository at this point in the history
* fix npe when no member found

* V4 of checkout

* log publish as info

* fix bad bug whereby pending blocks would be removed before fully certified.

Yi

Fix Test subjects in lifecycle tests to not generate genesis.

* use iteration count, rather than relying on currentIndex to advance

* correct iteration count :)

* periodically emit last block produced if no pre no preBlock published

* squelch publishing logging on Producer end

* force beacon publishing

* beacon only emitted based on empty preblock count

* reduce beacon publishing logging

* reduce beacon publishing logging

* cleanup deps, update grpc/proto

* better smoke testing

* better smoke testing

* use vthread exec for smoke

* default repos

* run on mac M1?

* nope

* salesforce-Ubuntu

* revert

* config tweaking
  • Loading branch information
Hellblazer authored May 27, 2024
1 parent e3405aa commit 04a0afa
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 69 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dependency-review.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Source repository: https://github.com/actions/dependency-review-action
# Public documentation: https://docs.github.com/en/code-security/supply-chain-security/understanding-your-software-supply-chain/about-dependency-review#dependency-review-enforcement
name: 'Dependency Review'
on: [pull_request]
on: [ pull_request ]

permissions:
contents: read
Expand All @@ -15,6 +15,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: 'Checkout Repository'
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: 'Dependency Review'
uses: actions/dependency-review-action@v2
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: graalvm/setup-graalvm@v1
with:
java-version: '22'
Expand Down
24 changes: 15 additions & 9 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ public CHOAM(Parameters params) {

rotateViewKeys();
var bContext = new DelegatedContext<>(params.context());
var adapter = new MessageAdapter(_ -> true, this::signatureHash,
_ -> Collections.emptyList(),
(_, any) -> any, AgedMessageOrBuilder::getContent);
var adapter = new MessageAdapter(_ -> true, this::signatureHash, _ -> Collections.emptyList(), (_, any) -> any,
AgedMessageOrBuilder::getContent);

combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(),
params.metrics() == null ? null : params.metrics().getCombineMetrics(),
Expand Down Expand Up @@ -235,6 +234,7 @@ public static Block reconfigure(Digest nextViewId, Map<Digest, Join> joins, Hash
public static Map<Digest, Member> rosterMap(Context<Member> baseContext, Collection<Digest> members) {
return members.stream()
.map(baseContext::getMember)
.filter(m -> m != null)
.collect(Collectors.toMap(Member::getId, Function.identity()));
}

Expand Down Expand Up @@ -542,11 +542,17 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo
}

@Override
public void publish(Digest hash, CertifiedBlock cb) {
log.trace("Publishing: {} hash: {} height: {} certifications: {} on: {}", cb.getBlock().getBodyCase(),
hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()), cb.getCertificationsCount(),
params.member().getId());
combine.publish(cb, true);
public void publish(Digest hash, CertifiedBlock cb, boolean beacon) {
if (beacon) {
log.trace("Publishing beacon: {} hash: {} height: {} certifications: {} on: {}",
cb.getBlock().getBodyCase(), hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()),
cb.getCertificationsCount(), params.member().getId());
} else {
log.info("Publishing: {} hash: {} height: {} certifications: {} on: {}",
cb.getBlock().getBodyCase(), hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()),
cb.getCertificationsCount(), params.member().getId());
}
combine.publish(cb, !beacon);
}

@Override
Expand Down Expand Up @@ -1022,7 +1028,7 @@ public interface BlockProducer {

Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint);

void publish(Digest hash, CertifiedBlock cb);
void publish(Digest hash, CertifiedBlock cb, boolean beacon);

Block reconfigure(Map<Digest, Join> joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void publish() {
.sorted(Comparator.comparing(e -> e.getKey().getId()))
.map(Map.Entry::getValue)
.forEach(v -> b.addCertifications(v.getWitness()));
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), b.build()));
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), b.build()), false);
controller.completeIt();
log.info("Genesis block: {} published with {} witnesses for: {} on: {}", reconfiguration.hash, witnesses.size(),
view.context().getId(), params().member().getId());
Expand Down
30 changes: 21 additions & 9 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class Producer {
private final Semaphore serialize = new Semaphore(1);
private final ViewAssembly assembly;
private final int maxEpoch;
private volatile int preblocks = 0;
private volatile int emptyPreBlocks = 0;
private volatile boolean assembled = false;

public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) {
Expand Down Expand Up @@ -240,11 +240,12 @@ private void processAssemblies(List<UnitData> aggregate) {
}

private void processPendingValidations(HashedBlock block, PendingBlock p) {
var pending = pendingValidations.remove(block.hash);
var pending = pendingValidations.get(block.hash);
if (pending != null) {
pending.forEach(v -> validate(v, p, block.hash));
if (p.witnesses.size() >= params().majority()) {
publish(p);
pendingValidations.remove(block.hash);
}
}
}
Expand All @@ -254,12 +255,14 @@ private void processTransactions(boolean last, List<UnitData> aggregate) {
final var txns = aggregate.stream().flatMap(e -> e.getTransactionsList().stream()).toList();

if (txns.isEmpty()) {
if (preblocks % 5 == 0) {
var empty = emptyPreBlocks + 1;
emptyPreBlocks = empty;
if (empty % 5 == 0) {
pending.values()
.stream()
.filter(pb -> pb.published.get())
.max(Comparator.comparing(pb -> pb.block.height()))
.ifPresent(this::publish);
.ifPresent(pb -> publish(pb, true));
}
return;
}
Expand Down Expand Up @@ -306,16 +309,26 @@ private void produceAssemble(ViewAssembly.Vue v) {
}

private void publish(PendingBlock p) {
this.publish(p, false);
}

private void publish(PendingBlock p, boolean beacon) {
assert p.witnesses.size() >= params().majority() : "Publishing non majority block";
log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId());
p.published.set(true);
var publish = p.published.compareAndSet(false, true);
if (!publish && !beacon) {
log.trace("Already published: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId());
return;
}
log.trace("Publishing {}pending: {} hash: {} height: {} witnesses: {} on: {}", beacon ? "(beacon) " : "",
p.block.block.getBodyCase(), p.block.hash, p.block.height(), p.witnesses.values().size(),
params().member().getId());
final var cb = CertifiedBlock.newBuilder()
.setBlock(p.block.block)
.addAllCertifications(
p.witnesses.values().stream().map(Validate::getWitness).toList())
.build();
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), cb));
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), cb), beacon);
}

private void reconfigure() {
Expand Down Expand Up @@ -352,7 +365,6 @@ private void serial(List<ByteString> preblock, Boolean last) {
return;
}
try {
preblocks++;
transitions.create(preblock, last);
} catch (Throwable t) {
log.error("Error processing preblock last: {} on: {}", last, params().member().getId(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock c
return blockProducer.produce(height, prev, assemble, checkpoint);
}

public void publish(HashedCertifiedBlock block) {
blockProducer.publish(block.hash, block.certifiedBlock);
public void publish(HashedCertifiedBlock block, boolean beacon) {
blockProducer.publish(block.hash, block.certifiedBlock, beacon);
}

public Block reconfigure(Map<Digest, Join> aggregate, Digest nextViewId, HashedBlock lastBlock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo
}

@Override
public void publish(Digest hash, CertifiedBlock cb) {
public void publish(Digest hash, CertifiedBlock cb, boolean beacon) {
complete.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ protected Gossip gossip(Fireflies link, int ring) {
.setRing(ring)
.setGossip(commonDigests())
.build());
log.info("gossiping with: {} on: {}", link.getMember().getId(), node.getId());
log.trace("gossiping with: {} on: {}", link.getMember().getId(), node.getId());
try {
return link.gossip(gossip);
} catch (Throwable e) {
Expand Down Expand Up @@ -1930,7 +1930,7 @@ public Gossip rumors(SayWhat request, Digest from) {
final var digests = request.getGossip();
if (!successor.equals(node)) {
g = redirectTo(member, ring, successor, digests);
log.info("Redirected: {} on: {}", member.getId(), node.getId());
log.debug("Redirected: {} on: {}", member.getId(), node.getId());
} else {
g = Gossip.newBuilder()
.setNotes(processNotes(from, BloomFilter.from(digests.getNoteBff()), params.fpr()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Li
}
ServerBuilder<?> serverBuilder = InProcessServerBuilder.forName(name)
.executor(Executors.newVirtualThreadPerTaskExecutor())
.scheduledExecutorService(
Executors.newScheduledThreadPool(100, Thread.ofVirtual()
.factory()))
.intercept(ConcurrencyLimitServerInterceptor.newBuilder(
limitsBuilder.build())
.statusSupplier(
Expand All @@ -102,6 +105,7 @@ private ManagedChannel connectTo(Member to) {
final var name = String.format(NAME_TEMPLATE, prefix, qb64(to.getId()));
final InProcessChannelBuilder builder = InProcessChannelBuilder.forName(name)
.executor(executor)
.offloadExecutor(executor)
.usePlaintext()
.intercept(clientInterceptor);
disableTrash(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl
Limiter<GrpcClientRequestContext> limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build();
channel = NettyChannelBuilder.forAddress(address)
.executor(executor)
.offloadExecutor(executor)
.withOption(ChannelOption.TCP_NODELAY, true)
.sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3))
.intercept(new ConcurrencyLimitClientInterceptor(limiter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class RingIterator<T extends Member, Comm extends Link> extends RingCommu
private final ScheduledExecutorService scheduler;
private volatile boolean majorityFailed = false;
private volatile boolean majoritySucceed = false;
private volatile int iteration = 0;

public RingIterator(Duration frequency, Context<T> context, SigningMember member,
CommonCommunications<Comm, ?> comm, boolean ignoreSelf, ScheduledExecutorService scheduler) {
Expand Down Expand Up @@ -88,7 +89,7 @@ public <Q> void iterate(Digest digest, BiFunction<Comm, Integer, Q> round, Resul
}

public int iteration() {
return currentIndex + 1;
return iteration;
}

@Override
Expand All @@ -103,7 +104,10 @@ private <Q> void internalIterate(Digest digest, Runnable onMajority, BiFunction<

Runnable proceed = () -> internalIterate(digest, onMajority, round, failedMajority, handler, onComplete, tally,
traversed);
boolean completed = currentIndex == context.getRingCount() - 1;
iteration++;
final var cur = iteration;
var ringCount = context.getRingCount();
boolean completed = cur > ringCount;

Consumer<Boolean> allowed = allow -> proceed(digest, allow, onMajority, failedMajority, tally, completed,
onComplete);
Expand Down
Loading

0 comments on commit 04a0afa

Please sign in to comment.