Skip to content

Commit

Permalink
BloomWindow replaces DigestWindow.
Browse files Browse the repository at this point in the history
Remove DigestWindow.
  • Loading branch information
Hellblazer committed Nov 25, 2023
1 parent 021f06f commit 65b17fa
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ private BloomWindow(BloomFilter<T> active1, int capacity, BloomFilter<T> active2
this.capacity = capacity;
}

public static <Q> BloomWindow<Q> create(int n, double p, Biff.Type type) {
return create(Entropy.nextBitsStreamLong(), Entropy.nextBitsStreamLong(), n, p, type);
public static <Q> BloomWindow<Q> create(int capacity, double fpr, Biff.Type type) {
return create(Entropy.nextBitsStreamLong(), Entropy.nextBitsStreamLong(), capacity, fpr, type);
}

public static <Q> BloomWindow<Q> create(long seed1, long seed2, int n, double p, Biff.Type type) {
return new BloomWindow<>(BloomFilter.create(seed1, n, p, type), n, BloomFilter.create(seed2, n, p, type));
public static <Q> BloomWindow<Q> create(long seed1, long seed2, int capacity, double fpr, Biff.Type type) {
return new BloomWindow<>(BloomFilter.create(seed1, capacity, fpr, type), capacity,
BloomFilter.create(seed2, capacity, fpr, type));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public void smokin() throws Exception {
var algo = DigestAlgorithm.DEFAULT;

var window = 1 << 16;
var seen = BloomWindow.create(entropy.nextLong(), entropy.nextLong(), window, 0.000000001, Biff.Type.DIGEST);
var seen = BloomWindow.create(entropy.nextLong(), entropy.nextLong(), window, Math.pow(10, -9),
Biff.Type.DIGEST);
var inserted = new TreeSet<Digest>();
var falsePositives = new TreeSet<Digest>();
var decayed = new TreeSet<Digest>();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
package com.salesforce.apollo.membership.messaging.rbc;

import com.codahale.metrics.Timer;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.salesfoce.apollo.cryptography.proto.Biff;
import com.salesfoce.apollo.messaging.proto.*;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.DigestBloomFilter;
import com.salesforce.apollo.bloomFilters.BloomWindow;
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.crypto.JohnHancock;
Expand Down Expand Up @@ -295,20 +296,24 @@ public record Msg(List<Digest> source, ByteString content, Digest hash) {
}

public record Parameters(int bufferSize, int maxMessages, DigestAlgorithm digestAlgorithm, double falsePositiveRate,
int deliveredCacheSize) {
int dedupBufferSize, double dedupFpr) {
public static Parameters.Builder newBuilder() {
return new Builder();
}

public static class Builder implements Cloneable {
private int bufferSize = 1500;
private int bufferSize = 1500;

private int dedupBufferSize = 100;
private double dedupFpr = Math.pow(10, -9);
private int deliveredCacheSize = 100;
private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT;
private double falsePositiveRate = 0.00125;
private int maxMessages = 500;

public Parameters build() {
return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate, deliveredCacheSize);
return new Parameters(bufferSize, maxMessages, digestAlgorithm, falsePositiveRate, dedupBufferSize,
dedupFpr);
}

@Override
Expand All @@ -329,6 +334,24 @@ public Parameters.Builder setBufferSize(int bufferSize) {
return this;
}

public int getDedupBufferSize() {
return dedupBufferSize;
}

public Builder setDedupBufferSize(int dedupBufferSize) {
this.dedupBufferSize = dedupBufferSize;
return this;
}

public double getDedupFpr() {
return dedupFpr;
}

public Builder setDedupFpr(double dedupFpr) {
this.dedupFpr = dedupFpr;
return this;
}

public int getDeliveredCacheSize() {
return deliveredCacheSize;
}
Expand Down Expand Up @@ -400,18 +423,18 @@ public void update(ReconcileContext reconcile, Digest from) {
}

private class Buffer {
private final DigestWindow delivered;
private final BloomWindow delivered;
private final Semaphore garbageCollecting = new Semaphore(1);
private final int highWaterMark;
private final int maxAge;
private final AtomicInteger round = new AtomicInteger();
private final Map<Digest, state> state = new ConcurrentHashMap<>();
private final Semaphore tickGate = new Semaphore(1);

public Buffer(int maxAge) {
private Buffer(int maxAge) {
this.maxAge = maxAge;
highWaterMark = (params.bufferSize - (int) (params.bufferSize + ((params.bufferSize) * 0.1)));
delivered = new DigestWindow(params.deliveredCacheSize, 3);
delivered = BloomWindow.create(params.dedupBufferSize, params.dedupFpr, Biff.Type.DIGEST);
}

public void clear() {
Expand All @@ -437,7 +460,7 @@ public void receive(List<AgedMessage> messages) {
.map(s -> state.merge(s.hash, s, (a, b) -> a.msg.getAge() >= b.msg.getAge() ? a : b))
.map(s -> new Msg(adapter.source.apply(s.msg.getContent()), adapter.extractor.apply(s.msg),
s.hash))
.filter(m -> delivered.add(m.hash, null))
.filter(m -> delivered.add(m.hash))
.toList());
gc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@
*/
public class RbcTest {

private static final boolean LARGE_TESTS = Boolean.getBoolean("large_tests");
private static final Parameters.Builder parameters = Parameters.newBuilder()
.setMaxMessages(100)
.setFalsePositiveRate(0.0125)
.setBufferSize(500);
.setBufferSize(500)
.setDedupBufferSize(
LARGE_TESTS ? 100 * 100 : 50 * 50)
.setDedupFpr(Math.pow(10, -9));
final AtomicReference<CountDownLatch> round = new AtomicReference<>();
private final List<Router> communications = new ArrayList<>();
private final AtomicInteger totalReceived = new AtomicInteger(0);
Expand Down Expand Up @@ -111,7 +115,7 @@ public void broadcast() throws Exception {
view.registerHandler(receiver);
receivers.put(view.getMember(), receiver);
}
int rounds = Boolean.getBoolean("large_tests") ? 100 : 5;
int rounds = LARGE_TESTS ? 100 : 50;
for (int r = 0; r < rounds; r++) {
CountDownLatch latch = new CountDownLatch(messengers.size());
round.set(latch);
Expand Down

0 comments on commit 65b17fa

Please sign in to comment.