From 867de1a72f6dde66057eeff39360006ab9f9854d Mon Sep 17 00:00:00 2001
From: Hellblazer
Date: Wed, 3 Jul 2024 19:32:45 -0700
Subject: [PATCH] subtle

---
 .../com/salesforce/apollo/choam/CHOAM.java    |  5 +-
 .../com/salesforce/apollo/choam/Producer.java | 17 ++--
 .../apollo/choam/support/OneShot.java         | 36 -------
 .../apollo/choam/support/TxDataSource.java    | 93 ++++++-------------
 choam/src/test/resources/logback-test.xml     |  4 -
 .../salesforce/apollo/ethereal/Ethereal.java  |  2 +-
 .../messaging/rbc/ReliableBroadcaster.java    | 66 +++----------
 sql-state/src/test/resources/logback-test.xml |  4 -
 8 files changed, 54 insertions(+), 173 deletions(-)
 delete mode 100644 choam/src/main/java/com/salesforce/apollo/choam/support/OneShot.java

diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
index e17faf827..2a587e0af 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
@@ -320,6 +320,7 @@ public String logState() {
     public void rotateViewKeys(ViewChange viewChange) {
         var context = viewChange.context();
         var diadem = viewChange.diadem();
+        log.trace("Setting RBC Context to: {} on: {}", context, params.member().getId());
         ((DelegatedContext) combine.getContext()).setContext(context);
         var c = current.get();
         if (c != null) {
@@ -331,10 +332,6 @@ public void rotateViewKeys(ViewChange viewChange) {
             pendingViews.clear();
             pendingViews.add(diadem, context);
         }
-
-        log.info("Pushing pending view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
-                 params.member().getId());
-        pendingViews.add(diadem, context);
     }
 
     public void start() {
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java
index def7fd907..f5f71d546 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java
@@ -32,6 +32,7 @@
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -57,6 +58,8 @@ public class Producer {
     private final ViewAssembly  assembly;
     private final int           maxEpoch;
    private final AtomicBoolean assembled = new AtomicBoolean(false);
+    private final AtomicInteger epoch     = new AtomicInteger(-1);
+    private final AtomicInteger preblocks = new AtomicInteger();
 
     public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label,
                     ScheduledExecutorService scheduler) {
@@ -203,9 +206,10 @@ private List aggregate(List preblock) {
     }
 
     private void create(List preblock, boolean last) {
+        var count = preblocks.incrementAndGet();
         if (log.isDebugEnabled()) {
-            log.debug("emit last: {} preblock: {} on: {}", last,
-                      preblock.stream().map(DigestAlgorithm.DEFAULT::digest).toList(), params().member().getId());
+            log.debug("emit #{} epoch: {} hashes: {} last: {} on: {}", count, epoch,
+                      preblock.stream().map(DigestAlgorithm.DEFAULT::digest).toList(), last, params().member().getId());
         }
         var aggregate = aggregate(preblock);
         processAssemblies(aggregate);
@@ -220,18 +224,19 @@ private Digest getViewId() {
         return view.context().getId();
     }
 
-    private void newEpoch(Integer epoch) {
+    private void newEpoch(Integer e) {
         serialize.execute(Utils.wrapped(() -> {
-            log.trace("new epoch: {} on: {}", epoch, params().member().getId());
+            this.epoch.set(e);
+            log.trace("new epoch: {} preblocks: {} on: {}", e, preblocks.get(), params().member().getId());
             assembly.newEpoch();
-            var last = epoch >= maxEpoch && assembled.get();
+            var last = e >= maxEpoch && assembled.get();
             if (last) {
                 controller.completeIt();
                 Producer.this.transitions.viewComplete();
             } else {
                 ds.reset();
             }
-            transitions.newEpoch(epoch, last);
+            transitions.newEpoch(e, last);
         }, log));
     }
 
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/OneShot.java b/choam/src/main/java/com/salesforce/apollo/choam/support/OneShot.java
deleted file mode 100644
index 635104c4d..000000000
--- a/choam/src/main/java/com/salesforce/apollo/choam/support/OneShot.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2021, salesforce.com, inc.
- * All rights reserved.
- * SPDX-License-Identifier: BSD-3-Clause
- * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-package com.salesforce.apollo.choam.support;
-
-import com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Phaser;
-import java.util.function.Supplier;
-
-public class OneShot implements Supplier {
-    private static final Logger log = LoggerFactory.getLogger(OneShot.class);
-
-    private final Phaser        phaser = new Phaser(1);
-    private volatile ByteString value;
-
-    @Override
-    public ByteString get() {
-        phaser.register();
-        final var current = value;
-        log.trace("providing value: " + (current == null ? "null" : String.valueOf(current.size())));
-        value = null;
-        return current == null ? ByteString.EMPTY : current;
-    }
-
-    public void setValue(ByteString value) {
-        log.trace("resetting value: " + (value == null ? "null" : String.valueOf(value.size())));
-        this.value = value;
-        phaser.arriveAndDeregister();
-    }
-}
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java
index de81a7712..670f98ac3 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java
@@ -17,7 +17,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -36,14 +35,13 @@ public class TxDataSource implements DataSource {
 
     private final static Logger log = LoggerFactory.getLogger(TxDataSource.class);
 
-    private final Duration               batchInterval;
-    private final AtomicBoolean          draining    = new AtomicBoolean();
-    private final Member                 member;
-    private final ChoamMetrics           metrics;
-    private final BatchingQueue          processing;
-    private final BlockingQueue          assemblies  = new LinkedBlockingQueue<>();
-    private final BlockingQueue          validations = new LinkedBlockingQueue<>();
-    private volatile Thread              blockingThread;
+    private final Duration      batchInterval;
+    private final AtomicBoolean draining    = new AtomicBoolean();
+    private final Member        member;
+    private final ChoamMetrics  metrics;
+    private final BatchingQueue processing;
+    private final BlockingQueue assemblies  = new LinkedBlockingQueue<>();
+    private final BlockingQueue validations = new LinkedBlockingQueue<>();
 
     public TxDataSource(Member member, int maxElements, ChoamMetrics metrics, int maxBatchByteSize,
                         Duration batchInterval, int maxBatchCount) {
@@ -55,11 +53,6 @@ public TxDataSource(Member member, int maxElements, ChoamMetrics metrics, int ma
     }
 
     public void close() {
-        final var current = blockingThread;
-        if (current != null) {
-            current.interrupt();
-        }
-        blockingThread = null;
         if (metrics != null) {
             metrics.dropped(processing.size(), validations.size(), assemblies.size());
         }
@@ -76,38 +69,15 @@ public void drain() {
     @Override
     public ByteString getData() {
         var builder = UnitData.newBuilder();
-        log.trace("Requesting unit data on: {}", member.getId());
-        blockingThread = Thread.currentThread();
-        try {
-            var r = new ArrayList();
-            var v = new ArrayList();
-
-            if (draining.get()) {
-                var target = Instant.now().plus(batchInterval);
-                while (target.isAfter(Instant.now()) && builder.getAssembliesCount() == 0
-                       && builder.getValidationsCount() == 0) {
-                    // rinse and repeat
-                    r = new ArrayList<>();
-                    assemblies.drainTo(r);
-                    builder.addAllAssemblies(r);
-
-                    v = new ArrayList();
-                    validations.drainTo(v);
-                    builder.addAllValidations(v);
-
-                    if (builder.getAssembliesCount() != 0 || builder.getValidationsCount() != 0) {
-                        break;
-                    }
+        var r = new ArrayList();
+        assemblies.drainTo(r);
+        builder.addAllAssemblies(r);
 
-                    // sleep waiting for input
-                    try {
-                        Thread.sleep(batchInterval.dividedBy(2).toMillis());
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        return ByteString.EMPTY;
-                    }
-                }
-            } else {
+        var v = new ArrayList();
+        validations.drainTo(v);
+        builder.addAllValidations(v);
+        if (!draining.get()) {
+            if (processing.size() > 0 || (validations.isEmpty() || assemblies.isEmpty())) {
                 try {
                     var batch = processing.take(batchInterval);
                     if (batch != null) {
@@ -118,28 +88,17 @@
                         return ByteString.EMPTY;
                     }
                 }
+        }
 
-            // One more time into ye breech
-            r = new ArrayList<>();
-            assemblies.drainTo(r);
-            builder.addAllAssemblies(r);
-
-            v = new ArrayList();
-            validations.drainTo(v);
-            builder.addAllValidations(v);
-
-            ByteString bs = builder.build().toByteString();
-            if (metrics != null) {
-                metrics.publishedBatch(builder.getTransactionsCount(), bs.size(), builder.getValidationsCount(),
-                                       builder.getAssembliesCount());
-            }
-            log.trace("Unit data: {} txns, {} validations, {} assemblies totalling: {} bytes on: {}",
-                      builder.getTransactionsCount(), builder.getValidationsCount(), builder.getAssembliesCount(),
-                      bs.size(), member.getId());
-            return bs;
-        } finally {
-            blockingThread = null;
+        ByteString bs = builder.build().toByteString();
+        if (metrics != null) {
+            metrics.publishedBatch(builder.getTransactionsCount(), bs.size(), builder.getValidationsCount(),
+                                   builder.getAssembliesCount());
         }
+        log.trace("Unit data: {} txns, {} validations, {} assemblies totalling: {} bytes on: {}",
+                  builder.getTransactionsCount(), builder.getValidationsCount(), builder.getAssembliesCount(),
+                  bs.size(), member.getId());
+        return bs;
     }
 
     public int getRemainingReassemblies() {
@@ -166,8 +125,8 @@ public boolean offer(Transaction txn) {
         }
     }
 
-    public void offer(Validate generateValidation) {
-        validations.offer(generateValidation);
+    public boolean offer(Validate generateValidation) {
+        return validations.offer(generateValidation);
     }
 
     public void reset() {
diff --git a/choam/src/test/resources/logback-test.xml b/choam/src/test/resources/logback-test.xml
index d5be6a052..3263880c3 100644
--- a/choam/src/test/resources/logback-test.xml
+++ b/choam/src/test/resources/logback-test.xml
@@ -53,10 +53,6 @@
-
-
-
-
diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java
index cbead8839..406462dfc 100644
--- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java
+++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java
@@ -316,7 +316,7 @@ private void insert(Unit unit) {
         var ep = retrieveEpoch(unit);
         if (ep != null) {
             ep.adder().produce(unit);
-            log.debug("Produced: {} on: {}", unit, config.logLabel());
+            log.debug("Produced: {} {}", unit, config.logLabel());
         } else {
             log.trace("Unable to retrieve epic for Unit creator: {} epoch: {} height: {} level: {} on: {}",
                       unit.creator(), unit.epoch(), unit.height(), unit.level(), config.logLabel());
diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java
index ee0455574..5521c80e1 100644
--- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java
+++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java
@@ -15,7 +15,6 @@
 import com.salesforce.apollo.archipelago.server.FernetServerInterceptor;
 import com.salesforce.apollo.bloomFilters.BloomFilter;
 import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
-import com.salesforce.apollo.bloomFilters.BloomWindow;
 import com.salesforce.apollo.context.Context;
 import com.salesforce.apollo.cryptography.Digest;
 import com.salesforce.apollo.cryptography.DigestAlgorithm;
@@ -298,7 +297,7 @@ private void oneRound(Duration duration, ScheduledExecutorService scheduler) {
         }
 
         var timer = metrics == null ? null : metrics.gossipRoundDuration().time();
-        gossiper.execute((link, ring) -> gossipRound(link, ring),
+        gossiper.execute(this::gossipRound,
                          (futureSailor, destination) -> handle(futureSailor, destination, duration, scheduler, timer));
     }
 
@@ -319,24 +318,20 @@ public record MessageAdapter(Predicate verifier, Function
     public record Msg(Digest source, ByteString content, Digest hash) {
     }
 
-    public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm, double falsePositiveRate,
-                             int dedupBufferSize, double dedupFpr) {
+    public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm,
+                             double falsePositiveRate) {
         public static Parameters.Builder newBuilder() {
             return new Builder();
         }
 
         public static class Builder implements Cloneable {
-            private int             bufferSize         = 1500;
-            private int             dedupBufferSize    = 100;
-            private double          dedupFpr           = Math.pow(10, -6);
-            private int             deliveredCacheSize = 100;
-            private DigestAlgorithm digestAlgorithm    = DigestAlgorithm.DEFAULT;
-            private double          falsePositiveRate  = 0.0000125;
-            private int             maxMessages        = 500;
+            private int             bufferSize        = 1500;
+            private DigestAlgorithm digestAlgorithm   = DigestAlgorithm.DEFAULT;
+            private double          falsePositiveRate = 0.0000125;
+            private int             maxMessages       = 500;
 
             public Parameters build() {
-                return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate, dedupBufferSize,
-                                      dedupFpr);
+                return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate);
             }
 
             @Override
@@ -357,33 +352,6 @@ public Parameters.Builder setBufferSize(int bufferSize) {
                 return this;
             }
 
-            public int getDedupBufferSize() {
-                return dedupBufferSize;
-            }
-
-            public Builder setDedupBufferSize(int dedupBufferSize) {
-                this.dedupBufferSize = dedupBufferSize;
-                return this;
-            }
-
-            public double getDedupFpr() {
-                return dedupFpr;
-            }
-
-            public Builder setDedupFpr(double dedupFpr) {
-                this.dedupFpr = dedupFpr;
-                return this;
-            }
-
-            public int getDeliveredCacheSize() {
-                return deliveredCacheSize;
-            }
-
-            public Builder setDeliveredCacheSize(int deliveredCacheSize) {
-                this.deliveredCacheSize = deliveredCacheSize;
-                return this;
-            }
-
             public DigestAlgorithm getDigestAlgorithm() {
                 return digestAlgorithm;
             }
@@ -446,7 +414,6 @@ public void update(ReconcileContext reconcile, Digest from) {
     }
 
     private class Buffer {
-        private final BloomWindow delivered;
         private final Semaphore   garbageCollecting = new Semaphore(1);
         private final int         highWaterMark;
         private final int         maxAge;
@@ -457,7 +424,6 @@ private class Buffer {
         private Buffer(int maxAge) {
             this.maxAge = maxAge;
             highWaterMark = (params.bufferSize - (int) (params.bufferSize + ((params.bufferSize) * 0.1)));
-            delivered = BloomWindow.create(params.dedupBufferSize, params.dedupFpr, Biff.Type.DIGEST);
         }
 
         public void clear() {
@@ -466,12 +432,12 @@ public void clear() {
 
         public BloomFilter forReconcilliation() {
            var biff = new DigestBloomFilter(Entropy.nextBitsStreamLong(), params.bufferSize, params.falsePositiveRate);
-            state.keySet().stream().collect(Utils.toShuffledList()).forEach(k -> biff.add(k));
+            state.keySet().stream().collect(Utils.toShuffledList()).forEach(biff::add);
             return biff;
         }
 
         public void receive(List messages) {
-            if (messages.size() == 0) {
+            if (messages.isEmpty()) {
                 return;
             }
             log.trace("receiving: {} msgs on: {}", messages.size(), member.getId());
@@ -483,13 +449,13 @@ public void receive(List messages) {
                                .map(s -> state.merge(s.hash, s, (a, b) -> a.msg.getAge() >= b.msg.getAge() ? a : b))
                               .map(s -> new Msg(adapter.source.apply(s.msg.getContent()), adapter.extractor.apply(s.msg),
                                                 s.hash))
-                              .filter(m -> delivered.add(m.hash))
                               .toList());
             gc();
         }
 
         public Iterable reconcile(BloomFilter biff, Digest from) {
-            PriorityQueue mailBox = new PriorityQueue<>(Comparator.comparingInt(s -> s.getAge()));
+            PriorityQueue mailBox = new PriorityQueue<>(
+            Comparator.comparingInt(AgedMessage.Builder::getAge));
             state.values()
                  .stream()
                  .collect(Utils.toShuffledList())
@@ -497,7 +463,7 @@ public Iterable reconcile(BloomFilter biff, Diges
                  .filter(s -> !biff.contains(s.hash))
                  .filter(s -> s.msg.getAge() < maxAge)
                  .forEach(s -> mailBox.add(s.msg));
-            List reconciled = mailBox.stream().map(b -> b.build()).toList();
+            List reconciled = mailBox.stream().map(AgedMessage.Builder::build).toList();
             if (!reconciled.isEmpty()) {
                 log.trace("reconciled: {} for: {} on: {}", reconciled.size(), from, member.getId());
             }
@@ -565,7 +531,7 @@ private boolean dup(state s) {
                 // log.trace("duplicate event: {} on: {}", s.hash, member.getId());
                 return true;
             }
-            return delivered.contains(s.hash);
+            return false;
         }
 
         private void gc() {
@@ -596,9 +562,7 @@ private void purgeTheAged() {
             Queue candidates = new PriorityQueue<>(
            Collections.reverseOrder((a, b) -> Integer.compare(a.msg.getAge(), b.msg.getAge())));
             candidates.addAll(state.values());
-            var processing = candidates.iterator();
-            while (processing.hasNext()) {
-                var m = processing.next();
+            for (ReliableBroadcaster.state m : candidates) {
                 if (m.msg.getAge() > maxAge) {
                     state.remove(m.hash);
                 } else {
diff --git a/sql-state/src/test/resources/logback-test.xml b/sql-state/src/test/resources/logback-test.xml
index 798240567..fc6798bcb 100644
--- a/sql-state/src/test/resources/logback-test.xml
+++ b/sql-state/src/test/resources/logback-test.xml
@@ -56,10 +56,6 @@
-
-
-
-