diff --git a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java index b27b770c4..c14594ed1 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java +++ b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java @@ -6,22 +6,143 @@ */ package com.salesforce.apollo.bloomFilters; -import java.util.BitSet; -import java.util.function.Consumer; - -import org.joou.ULong; - import com.salesfoce.apollo.cryptography.proto.Biff; import com.salesforce.apollo.crypto.Digest; +import org.joou.ULong; + +import java.util.BitSet; + +import static com.salesfoce.apollo.cryptography.proto.Biff.Type.*; /** - * Simplified Bloom filter for multiple types, with setable seeds and other - * parameters. - * - * @author hal.hildebrand + * Simplified Bloom filter for multiple types, with setable seeds and other parameters. * + * @author hal.hildebrand */ abstract public class BloomFilter { + private final BitSet bits; + private final Hash h; + + private BloomFilter(Hash h) { + this(h, new BitSet(h.getM())); + } + + private BloomFilter(Hash h, BitSet bits) { + this.h = h; + this.bits = bits; + } + + @SuppressWarnings("unchecked") + public static BloomFilter create(long seed, int n, double p, Biff.Type type) { + switch (type) { + case DIGEST: + return (BloomFilter) new DigestBloomFilter(seed, n, p); + case INT: + return (BloomFilter) new IntBloomFilter(seed, n, p); + case LONG: + return (BloomFilter) new LongBloomFilter(seed, n, p); + case BYTES: + return (BloomFilter) new BytesBloomFilter(seed, n, p); + case STRING: + return (BloomFilter) new StringBloomFilter(seed, n, p); + case ULONG: + return (BloomFilter) new ULongBloomFilter(seed, n, p); + default: + throw new IllegalArgumentException("Invalid type: " + type); + } + } + + @SuppressWarnings("unchecked") + public static BloomFilter create(long seed, int m, int k, long[] bits, Biff.Type type) { + switch (type) { + case DIGEST: + return (BloomFilter) new DigestBloomFilter(seed, m, k, bits); + case INT: + return (BloomFilter) new IntBloomFilter(seed, m, k, bits); + case LONG: + return (BloomFilter) new LongBloomFilter(seed, m, k, bits); + case BYTES: + return (BloomFilter) new BytesBloomFilter(seed, m, k, bits); + case STRING: + return (BloomFilter) new StringBloomFilter(seed, m, k, bits); + case ULONG: + return (BloomFilter) new ULongBloomFilter(seed, m, k, bits); + default: + throw new IllegalArgumentException("Invalid type: " + type); + } + } + + public static BloomFilter from(Biff bff) { + long[] bits = new long[bff.getBitsCount()]; + int i = 0; + for (long l : bff.getBitsList()) { + bits[i++] = l; + } + return create(bff.getSeed(), bff.getM(), bff.getK(), bits, bff.getType()); + } + + private static double population(BitSet bitSet, int k, int m) { + int oneBits = bitSet.cardinality(); + return -m / ((double) k) * Math.log(1 - oneBits / ((double) m)); + } + + public boolean add(T element) { + final var hashes = h.hashes(element); + var contains = true; + for (int hash : hashes) { + if (!bits.get(hash)) { + contains = false; + } + bits.set(hash); + } + return !contains; + } + + public String biffString() { + return bits.toString(); + } + + public void clear() { + bits.clear(); + } + + public boolean contains(T element) { + for (int hash : h.hashes(element)) { + if (!bits.get(hash)) { + return false; + } + } + return true; + } + + public boolean equivalent(BloomFilter other) { + return h.equivalent(other.h) && bits.equals(other.bits); + } + + public double fpp(int n) { + return h.fpp(n); + } + + /** + * Estimates the current population of the Bloom filter (see: + * http://en.wikipedia.org/wiki/Bloom_filter#Approximating_the_number_of_items_in_a_Bloom_filter + * + * @return the estimated amount of elements in the filter + */ + public double getEstimatedPopulation() { + return population(bits, h.getK(), h.getM()); + } + + public Biff toBff() { + Biff.Builder builder = Biff.newBuilder().setSeed(h.getSeed()).setM(h.getM()).setK(h.getK()).setType(getType()); + + for (long l : bits.toLongArray()) { + builder.addBits(l); + } + return builder.build(); + } + + protected abstract Biff.Type getType(); public static class BytesBloomFilter extends BloomFilter { @@ -44,7 +165,7 @@ protected Hasher newHasher() { } @Override - protected int getType() { + protected Biff.Type getType() { return BYTES; } } @@ -70,7 +191,7 @@ protected Hasher newHasher() { } @Override - protected int getType() { + protected Biff.Type getType() { return DIGEST; } @@ -97,7 +218,7 @@ protected Hasher newHasher() { } @Override - protected int getType() { + protected Biff.Type getType() { return INT; } @@ -123,7 +244,7 @@ protected Hasher newHasher() { } @Override - protected int getType() { + protected Biff.Type getType() { return LONG; } @@ -150,7 +271,7 @@ protected Hasher newHasher() { } @Override - protected int getType() { + protected Biff.Type getType() { return STRING; } } @@ -175,152 +296,9 @@ protected Hasher newHasher() { } @Override - protected int getType() { + protected Biff.Type getType() { return ULONG; } } - - private static final int BYTES = 3; - private static final int DIGEST = 0; - private static final int INT = 1; - private static final int LONG = 2; - private static final int STRING = 4; - private static final int ULONG = 5; - - @SuppressWarnings("unchecked") - public static BloomFilter create(long seed, int n, double p, int type) { - switch (type) { - case DIGEST: - return (BloomFilter) new DigestBloomFilter(seed, n, p); - case INT: - return (BloomFilter) new IntBloomFilter(seed, n, p); - case LONG: - return (BloomFilter) new LongBloomFilter(seed, n, p); - case BYTES: - return (BloomFilter) new BytesBloomFilter(seed, n, p); - case STRING: - return (BloomFilter) new StringBloomFilter(seed, n, p); - case ULONG: - return (BloomFilter) new ULongBloomFilter(seed, n, p); - default: - throw new IllegalArgumentException("Invalid type: " + type); - } - } - - @SuppressWarnings("unchecked") - public static BloomFilter create(long seed, int m, int k, long[] bits, int type) { - switch (type) { - case DIGEST: - return (BloomFilter) new DigestBloomFilter(seed, m, k, bits); - case INT: - return (BloomFilter) new IntBloomFilter(seed, m, k, bits); - case LONG: - return (BloomFilter) new LongBloomFilter(seed, m, k, bits); - case BYTES: - return (BloomFilter) new BytesBloomFilter(seed, m, k, bits); - case STRING: - return (BloomFilter) new StringBloomFilter(seed, m, k, bits); - case ULONG: - return (BloomFilter) new ULongBloomFilter(seed, m, k, bits); - default: - throw new IllegalArgumentException("Invalid type: " + type); - } - } - - public static BloomFilter from(Biff bff) { - long[] bits = new long[bff.getBitsCount()]; - int i = 0; - for (long l : bff.getBitsList()) { - bits[i++] = l; - } - return create(bff.getSeed(), bff.getM(), bff.getK(), bits, bff.getType()); - } - - private static double population(BitSet bitSet, int k, int m) { - int oneBits = bitSet.cardinality(); - return -m / ((double) k) * Math.log(1 - oneBits / ((double) m)); - } - - private final BitSet bits; - private final Hash h; - - private BloomFilter(Hash h) { - this(h, new BitSet(h.getM())); - } - - private BloomFilter(Hash h, BitSet bits) { - this.h = h; - this.bits = bits; - } - - public void add(T element) { - for (int hash : h.hashes(element)) { - bits.set(hash); - } - } - - public boolean add(T element, Consumer ifAbsent) { - final var hashes = h.hashes(element); - var contains = true; - for (int hash : hashes) { - if (!bits.get(hash)) { - contains = false; - break; - } - } - if (!contains) { - ifAbsent.accept(element); - for (int hash : hashes) { - bits.set(hash); - } - } - return !contains; - } - - public String biffString() { - return bits.toString(); - } - - public void clear() { - bits.clear(); - } - - public boolean contains(T element) { - for (int hash : h.hashes(element)) { - if (!bits.get(hash)) { - return false; - } - } - return true; - } - - public boolean equivalent(BloomFilter other) { - return h.equivalent(other.h) && bits.equals(other.bits); - } - - public double fpp(int n) { - return h.fpp(n); - } - - /** - * Estimates the current population of the Bloom filter (see: - * http://en.wikipedia.org/wiki/Bloom_filter#Approximating_the_number_of_items_in_a_Bloom_filter - * - * @return the estimated amount of elements in the filter - */ - public double getEstimatedPopulation() { - return population(bits, h.getK(), h.getM()); - } - - public Biff toBff() { - Biff.Builder builder = Biff.newBuilder().setSeed(h.getSeed()).setM(h.getM()).setK(h.getK()).setType(getType()); - - for (long l : bits.toLongArray()) { - builder.addBits(l); - } - return builder.build(); - } - - protected abstract int getType(); } diff --git a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java index 8684249a5..8aa10724f 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java +++ b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomWindow.java @@ -6,76 +6,94 @@ */ package com.salesforce.apollo.bloomFilters; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicBoolean; +import com.salesfoce.apollo.cryptography.proto.Biff; +import com.salesforce.apollo.utils.Entropy; + import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; -import java.util.function.Supplier; /** - * @author hal.hildebrand + * Provide a windowed deduplication mechanism. The implementation is based on the most excellent paper (); + var falsePositives = new TreeSet(); + var decayed = new TreeSet(); + + IntStream.range(0, window).mapToObj(i -> algo.random(entropy)).forEach(d -> { + if (!seen.add(d)) { + falsePositives.add(d); + } else { + inserted.add(d); + } + }); + assertEquals(0, falsePositives.size(), falsePositives.toString()); + + for (Digest digest : inserted) { + if (!seen.contains(digest)) { + decayed.add(digest); + } + } + assertEquals(0, decayed.size(), decayed.toString()); + + IntStream.range(0, window).mapToObj(i -> algo.random(entropy)).forEach(d -> { + if (seen.contains(d)) { + falsePositives.add(d); + } + }); + assertEquals(0, falsePositives.size(), falsePositives.toString()); + + var overflow = new ArrayList(); + + IntStream.range(0, window).mapToObj(i -> algo.random(entropy)).forEach(d -> { + if (!seen.add(d)) { + falsePositives.add(d); + } else { + overflow.add(d); + } + }); + assertEquals(1, falsePositives.size(), falsePositives.toString()); + + for (Digest digest : overflow) { + if (!seen.contains(digest)) { + decayed.add(digest); + } + } + assertEquals(0, decayed.size(), decayed.toString()); + + for (Digest digest : inserted) { + if (!seen.contains(digest)) { + decayed.add(digest); + } + } + assertEquals(0, decayed.size(), decayed.toString()); + + IntStream.range(0, window).mapToObj(i -> algo.random(entropy)).forEach(d -> { + if (seen.contains(d)) { + falsePositives.add(d); + } + }); + assertEquals(12, falsePositives.size(), falsePositives.toString()); + } +} diff --git a/grpc/src/main/proto/crypto.proto b/grpc/src/main/proto/crypto.proto index e3dd21717..50011ec05 100644 --- a/grpc/src/main/proto/crypto.proto +++ b/grpc/src/main/proto/crypto.proto @@ -10,10 +10,19 @@ import "google/protobuf/timestamp.proto"; package crypto; message Biff { + enum Type { + invalid = 0; + DIGEST = 1; + INT = 2; + LONG = 3; + BYTES = 4; + STRING = 5; + ULONG = 6; + } int32 m = 1; int32 k = 2; int64 seed = 3; - int32 type = 4; + Type type = 4; repeated uint64 bits = 5; } diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/DigestWindow.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/DigestWindow.java deleted file mode 100644 index 76a0233d5..000000000 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/DigestWindow.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2022, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.salesforce.apollo.membership.messaging.rbc; - -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -import com.salesforce.apollo.crypto.Digest; - -/** - * @author hal.hildebrand - * - */ -public class DigestWindow { - private final AtomicInteger count = new AtomicInteger(0); - private final BlockingDeque> segments = new LinkedBlockingDeque<>(); - private final int windowSize; - - public DigestWindow(int windowSize, int segments) { - this.windowSize = windowSize; - for (int i = 0; i < segments; i++) { - this.segments.add(new ConcurrentSkipListSet<>()); - } - } - - public void add(Digest element) { - if (count.incrementAndGet() % windowSize == 0) { - segments.removeLast(); - segments.addFirst(new ConcurrentSkipListSet<>()); - } - segments.getFirst().add(element); - } - - public boolean add(Digest element, Consumer ifAbsent) { - if (count.incrementAndGet() % windowSize == 0) { - segments.removeLast(); - segments.addFirst(new ConcurrentSkipListSet<>()); - } - if (segments.getFirst().add(element)) { - if (ifAbsent != null) { - ifAbsent.accept(element); - } - return true; - } - return false; - } - - public boolean contains(Digest element) { - for (var biff : segments) { - if (biff.contains(element)) { - return true; - } - } - return false; - } -} diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 18c518686..0e38cc67e 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -7,15 +7,16 @@ package com.salesforce.apollo.membership.messaging.rbc; import com.codahale.metrics.Timer; -import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import com.salesfoce.apollo.cryptography.proto.Biff; import com.salesfoce.apollo.messaging.proto.*; import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.bloomFilters.BloomFilter; import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter; +import com.salesforce.apollo.bloomFilters.BloomWindow; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.crypto.JohnHancock; @@ -295,20 +296,24 @@ public record Msg(List source, ByteString content, Digest hash) { } public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm, double falsePositiveRate, - int deliveredCacheSize) { + int dedupBufferSize, double dedupFpr) { public static Parameters.Builder newBuilder() { return new Builder(); } public static class Builder implements Cloneable { - private int bufferSize = 1500; + private int bufferSize = 1500; + + private int dedupBufferSize = 100; + private double dedupFpr = Math.pow(10, -9); private int deliveredCacheSize = 100; private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT; private double falsePositiveRate = 0.00125; private int maxMessages = 500; public Parameters build() { - return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate, deliveredCacheSize); + return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate, dedupBufferSize, + dedupFpr); } @Override @@ -329,6 +334,24 @@ public Parameters.Builder setBufferSize(int bufferSize) { return this; } + public int getDedupBufferSize() { + return dedupBufferSize; + } + + public Builder setDedupBufferSize(int dedupBufferSize) { + this.dedupBufferSize = dedupBufferSize; + return this; + } + + public double getDedupFpr() { + return dedupFpr; + } + + public Builder setDedupFpr(double dedupFpr) { + this.dedupFpr = dedupFpr; + return this; + } + public int getDeliveredCacheSize() { return deliveredCacheSize; } @@ -400,7 +423,7 @@ public void update(ReconcileContext reconcile, Digest from) { } private class Buffer { - private final DigestWindow delivered; + private final BloomWindow delivered; private final Semaphore garbageCollecting = new Semaphore(1); private final int highWaterMark; private final int maxAge; @@ -408,10 +431,10 @@ private class Buffer { private final Map state = new ConcurrentHashMap<>(); private final Semaphore tickGate = new Semaphore(1); - public Buffer(int maxAge) { + private Buffer(int maxAge) { this.maxAge = maxAge; highWaterMark = (params.bufferSize - (int) (params.bufferSize + ((params.bufferSize) * 0.1))); - delivered = new DigestWindow(params.deliveredCacheSize, 3); + delivered = BloomWindow.create(params.dedupBufferSize, params.dedupFpr, Biff.Type.DIGEST); } public void clear() { @@ -437,7 +460,7 @@ public void receive(List messages) { .map(s -> state.merge(s.hash, s, (a, b) -> a.msg.getAge() >= b.msg.getAge() ? a : b)) .map(s -> new Msg(adapter.source.apply(s.msg.getContent()), adapter.extractor.apply(s.msg), s.hash)) - .filter(m -> delivered.add(m.hash, null)) + .filter(m -> delivered.add(m.hash)) .toList()); gc(); } diff --git a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java index 9d6b8ea7f..460c4cb61 100644 --- a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java @@ -53,10 +53,14 @@ */ public class RbcTest { + private static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests"); private static final Parameters.Builder parameters = Parameters.newBuilder() .setMaxMessages(100) .setFalsePositiveRate(0.0125) - .setBufferSize(500); + .setBufferSize(500) + .setDedupBufferSize( + LARGE_TESTS ? 100 * 100 : 50 * 50) + .setDedupFpr(Math.pow(10, -9)); final AtomicReference round = new AtomicReference<>(); private final List communications = new ArrayList<>(); private final AtomicInteger totalReceived = new AtomicInteger(0); @@ -111,7 +115,7 @@ public void broadcast() throws Exception { view.registerHandler(receiver); receivers.put(view.getMember(), receiver); } - int rounds = Boolean.getBoolean("large_tests") ? 100 : 5; + int rounds = LARGE_TESTS ? 100 : 50; for (int r = 0; r < rounds; r++) { CountDownLatch latch = new CountDownLatch(messengers.size()); round.set(latch);