diff --git a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java index 46766ee79..783bc7854 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java +++ b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java @@ -29,12 +29,13 @@ private BloomWindow(BloomFilter active1, int capacity, BloomFilter active2 this.capacity = capacity; } - public static BloomWindow create(int n, double p, Biff.Type type) { - return create(Entropy.nextBitsStreamLong(), Entropy.nextBitsStreamLong(), n, p, type); + public static BloomWindow create(int capacity, double fpr, Biff.Type type) { + return create(Entropy.nextBitsStreamLong(), Entropy.nextBitsStreamLong(), capacity, fpr, type); } - public static BloomWindow create(long seed1, long seed2, int n, double p, Biff.Type type) { - return new BloomWindow<>(BloomFilter.create(seed1, n, p, type), n, BloomFilter.create(seed2, n, p, type)); + public static BloomWindow create(long seed1, long seed2, int capacity, double fpr, Biff.Type type) { + return new BloomWindow<>(BloomFilter.create(seed1, capacity, fpr, type), capacity, + BloomFilter.create(seed2, capacity, fpr, type)); } /** diff --git a/cryptography/src/test/java/com/salesforce/apollo/bloomFilters/BloomWindowTest.java b/cryptography/src/test/java/com/salesforce/apollo/bloomFilters/BloomWindowTest.java index 6bb61cb19..80a961e25 100644 --- a/cryptography/src/test/java/com/salesforce/apollo/bloomFilters/BloomWindowTest.java +++ b/cryptography/src/test/java/com/salesforce/apollo/bloomFilters/BloomWindowTest.java @@ -24,7 +24,8 @@ public void smokin() throws Exception { var algo = DigestAlgorithm.DEFAULT; var window = 1 << 16; - var seen = BloomWindow.create(entropy.nextLong(), entropy.nextLong(), window, 0.000000001, Biff.Type.DIGEST); + var seen = BloomWindow.create(entropy.nextLong(), entropy.nextLong(), window, Math.pow(10, -9), + Biff.Type.DIGEST); var inserted = new TreeSet(); var falsePositives = new TreeSet(); var decayed = new TreeSet(); diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/DigestWindow.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/DigestWindow.java deleted file mode 100644 index 76a0233d5..000000000 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/DigestWindow.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2022, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.salesforce.apollo.membership.messaging.rbc; - -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -import com.salesforce.apollo.crypto.Digest; - -/** - * @author hal.hildebrand - * - */ -public class DigestWindow { - private final AtomicInteger count = new AtomicInteger(0); - private final BlockingDeque> segments = new LinkedBlockingDeque<>(); - private final int windowSize; - - public DigestWindow(int windowSize, int segments) { - this.windowSize = windowSize; - for (int i = 0; i < segments; i++) { - this.segments.add(new ConcurrentSkipListSet<>()); - } - } - - public void add(Digest element) { - if (count.incrementAndGet() % windowSize == 0) { - segments.removeLast(); - segments.addFirst(new ConcurrentSkipListSet<>()); - } - segments.getFirst().add(element); - } - - public boolean add(Digest element, Consumer ifAbsent) { - if (count.incrementAndGet() % windowSize == 0) { - segments.removeLast(); - segments.addFirst(new ConcurrentSkipListSet<>()); - } - if (segments.getFirst().add(element)) { - if (ifAbsent != null) { - ifAbsent.accept(element); - } - return true; - } - return false; - } - - public boolean contains(Digest element) { - for (var biff : segments) { - if (biff.contains(element)) { - return true; - } - } - return false; - } -} diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 18c518686..0e38cc67e 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -7,15 +7,16 @@ package com.salesforce.apollo.membership.messaging.rbc; import com.codahale.metrics.Timer; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import com.salesfoce.apollo.cryptography.proto.Biff; import com.salesfoce.apollo.messaging.proto.*; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; +import com.salesforce.apollo.bloomFilters.BloomWindow; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.crypto.JohnHancock; @@ -295,20 +296,24 @@ public record Msg(List source, ByteString content, Digest hash) { } public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm, double falsePositiveRate, - int deliveredCacheSize) { + int dedupBufferSize, double dedupFpr) { public static Parameters.Builder newBuilder() { return new Builder(); } public static class Builder implements Cloneable { - private int bufferSize = 1500; + private int bufferSize = 1500; + + private int dedupBufferSize = 100; + private double dedupFpr = Math.pow(10, -9); private int deliveredCacheSize = 100; private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT; private double falsePositiveRate = 0.00125; private int maxMessages = 500; public Parameters build() { - return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate, deliveredCacheSize); + return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate, dedupBufferSize, + dedupFpr); } @Override @@ -329,6 +334,24 @@ public Parameters.Builder setBufferSize(int bufferSize) { return this; } + public int getDedupBufferSize() { + return dedupBufferSize; + } + + public Builder setDedupBufferSize(int dedupBufferSize) { + this.dedupBufferSize = dedupBufferSize; + return this; + } + + public double getDedupFpr() { + return dedupFpr; + } + + public Builder setDedupFpr(double dedupFpr) { + this.dedupFpr = dedupFpr; + return this; + } + public int getDeliveredCacheSize() { return deliveredCacheSize; } @@ -400,7 +423,7 @@ public void update(ReconcileContext reconcile, Digest from) { } private class Buffer { - private final DigestWindow delivered; + private final BloomWindow delivered; private final Semaphore garbageCollecting = new Semaphore(1); private final int highWaterMark; private final int maxAge; @@ -408,10 +431,10 @@ private class Buffer { private final Map state = new ConcurrentHashMap<>(); private final Semaphore tickGate = new Semaphore(1); - public Buffer(int maxAge) { + private Buffer(int maxAge) { this.maxAge = maxAge; highWaterMark = (params.bufferSize - (int) (params.bufferSize + ((params.bufferSize) * 0.1))); - delivered = new DigestWindow(params.deliveredCacheSize, 3); + delivered = BloomWindow.create(params.dedupBufferSize, params.dedupFpr, Biff.Type.DIGEST); } public void clear() { @@ -437,7 +460,7 @@ public void receive(List messages) { .map(s -> state.merge(s.hash, s, (a, b) -> a.msg.getAge() >= b.msg.getAge() ? a : b)) .map(s -> new Msg(adapter.source.apply(s.msg.getContent()), adapter.extractor.apply(s.msg), s.hash)) - .filter(m -> delivered.add(m.hash, null)) + .filter(m -> delivered.add(m.hash)) .toList()); gc(); } diff --git a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java index 9d6b8ea7f..460c4cb61 100644 --- a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java @@ -53,10 +53,14 @@ */ public class RbcTest { + private static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests"); private static final Parameters.Builder parameters = Parameters.newBuilder() .setMaxMessages(100) .setFalsePositiveRate(0.0125) - .setBufferSize(500); + .setBufferSize(500) + .setDedupBufferSize( + LARGE_TESTS ? 100 * 100 : 50 * 50) + .setDedupFpr(Math.pow(10, -9)); final AtomicReference round = new AtomicReference<>(); private final List communications = new ArrayList<>(); private final AtomicInteger totalReceived = new AtomicInteger(0); @@ -111,7 +115,7 @@ public void broadcast() throws Exception { view.registerHandler(receiver); receivers.put(view.getMember(), receiver); } - int rounds = Boolean.getBoolean("large_tests") ? 100 : 5; + int rounds = LARGE_TESTS ? 100 : 50; for (int r = 0; r < rounds; r++) { CountDownLatch latch = new CountDownLatch(messengers.size()); round.set(latch);