Skip to content

Commit

Permalink
misc clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed May 21, 2024
1 parent 999350a commit ce9f397
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
import static com.salesforce.apollo.cryptography.proto.Biff.Type.*;

/**
* Simplified Bloom filter for multiple types, with setable seeds and other parameters.
* Simplified Bloom filter for multiple types, with settable seeds and other parameters.
*
* @author hal.hildebrand
*/
abstract public class BloomFilter<T> {
private final BitSet bits;
private final Hash<T> h;
final BitSet bits;
final Hash<T> h;

private BloomFilter(Hash<T> h) {
this(h, new BitSet(h.getM()));
Expand All @@ -34,42 +34,28 @@ private BloomFilter(Hash<T> h, BitSet bits) {

@SuppressWarnings("unchecked")
public static <Q> BloomFilter<Q> create(long seed, int n, double p, Biff.Type type) {
switch (type) {
case DIGEST:
return (BloomFilter<Q>) new DigestBloomFilter(seed, n, p);
case INT:
return (BloomFilter<Q>) new IntBloomFilter(seed, n, p);
case LONG:
return (BloomFilter<Q>) new LongBloomFilter(seed, n, p);
case BYTES:
return (BloomFilter<Q>) new BytesBloomFilter(seed, n, p);
case STRING:
return (BloomFilter<Q>) new StringBloomFilter(seed, n, p);
case ULONG:
return (BloomFilter<Q>) new ULongBloomFilter(seed, n, p);
default:
throw new IllegalArgumentException("Invalid type: " + type);
}
return switch (type) {
case DIGEST -> (BloomFilter<Q>) new DigestBloomFilter(seed, n, p);
case INT -> (BloomFilter<Q>) new IntBloomFilter(seed, n, p);
case LONG -> (BloomFilter<Q>) new LongBloomFilter(seed, n, p);
case BYTES -> (BloomFilter<Q>) new BytesBloomFilter(seed, n, p);
case STRING -> (BloomFilter<Q>) new StringBloomFilter(seed, n, p);
case ULONG -> (BloomFilter<Q>) new ULongBloomFilter(seed, n, p);
default -> throw new IllegalArgumentException("Invalid type: " + type);
};
}

@SuppressWarnings("unchecked")
public static <Q> BloomFilter<Q> create(long seed, int m, int k, long[] bits, Biff.Type type) {
switch (type) {
case DIGEST:
return (BloomFilter<Q>) new DigestBloomFilter(seed, m, k, bits);
case INT:
return (BloomFilter<Q>) new IntBloomFilter(seed, m, k, bits);
case LONG:
return (BloomFilter<Q>) new LongBloomFilter(seed, m, k, bits);
case BYTES:
return (BloomFilter<Q>) new BytesBloomFilter(seed, m, k, bits);
case STRING:
return (BloomFilter<Q>) new StringBloomFilter(seed, m, k, bits);
case ULONG:
return (BloomFilter<Q>) new ULongBloomFilter(seed, m, k, bits);
default:
throw new IllegalArgumentException("Invalid type: " + type);
}
return switch (type) {
case DIGEST -> (BloomFilter<Q>) new DigestBloomFilter(seed, m, k, bits);
case INT -> (BloomFilter<Q>) new IntBloomFilter(seed, m, k, bits);
case LONG -> (BloomFilter<Q>) new LongBloomFilter(seed, m, k, bits);
case BYTES -> (BloomFilter<Q>) new BytesBloomFilter(seed, m, k, bits);
case STRING -> (BloomFilter<Q>) new StringBloomFilter(seed, m, k, bits);
case ULONG -> (BloomFilter<Q>) new ULongBloomFilter(seed, m, k, bits);
default -> throw new IllegalArgumentException("Invalid type: " + type);
};
}

public static <Q> BloomFilter<Q> from(Biff bff) {
Expand Down Expand Up @@ -106,6 +92,8 @@ public void clear() {
bits.clear();
}

public abstract BloomFilter<T> clone();

public boolean contains(T element) {
for (int hash : h.hashes(element)) {
if (!bits.get(hash)) {
Expand All @@ -116,8 +104,7 @@ public boolean contains(T element) {
}

public boolean equivalent(BloomFilter<T> other) {
var equiv = h.equivalent(other.h) && bits.equals(other.bits);
return equiv;
return h.equivalent(other.h) && bits.equals(other.bits);
}

public double fpp(int n) {
Expand All @@ -126,9 +113,9 @@ public double fpp(int n) {

/**
* Estimates the current population of the Bloom filter (see:
* http://en.wikipedia.org/wiki/Bloom_filter#Approximating_the_number_of_items_in_a_Bloom_filter
* <a href="http://en.wikipedia.org/wiki/Bloom_filter#Approximating_the_number_of_items_in_a_Bloom_filter">...</a>
*
* @return the estimated amount of elements in the filter
* @return the estimated number of elements in the filter
*/
public double getEstimatedPopulation() {
return population(bits, h.getK(), h.getM());
Expand All @@ -148,7 +135,7 @@ public Biff toBff() {
public static class BytesBloomFilter extends BloomFilter<byte[]> {

public BytesBloomFilter(long seed, int n, double p) {
super(new Hash<byte[]>(seed, n, p) {
super(new Hash<>(seed, n, p) {
@Override
protected Hasher<byte[]> newHasher() {
return new BytesHasher();
Expand All @@ -157,14 +144,23 @@ protected Hasher<byte[]> newHasher() {
}

public BytesBloomFilter(long seed, int m, int k, long[] bytes) {
super(new Hash<byte[]>(seed, k, m) {
super(new Hash<>(seed, k, m) {
@Override
protected Hasher<byte[]> newHasher() {
return new BytesHasher();
}
}, BitSet.valueOf(bytes));
}

public BytesBloomFilter(Hash<byte[]> hash, BitSet bitSet) {
super(hash, bitSet);
}

@Override
public BloomFilter<byte[]> clone() {
return new BytesBloomFilter(h.clone(), (BitSet) bits.clone());
}

@Override
protected Biff.Type getType() {
return BYTES;
Expand All @@ -173,8 +169,12 @@ protected Biff.Type getType() {

public static class DigestBloomFilter extends BloomFilter<Digest> {

public DigestBloomFilter(Hash<Digest> hash, BitSet bitSet) {
super(hash, bitSet);
}

public DigestBloomFilter(long seed, int n, double p) {
super(new Hash<Digest>(seed, n, p) {
super(new Hash<>(seed, n, p) {
@Override
protected Hasher<Digest> newHasher() {
return new DigestHasher();
Expand All @@ -183,14 +183,19 @@ protected Hasher<Digest> newHasher() {
}

public DigestBloomFilter(long seed, int m, int k, long[] bytes) {
super(new Hash<Digest>(seed, k, m) {
super(new Hash<>(seed, k, m) {
@Override
protected Hasher<Digest> newHasher() {
return new DigestHasher();
}
}, BitSet.valueOf(bytes));
}

@Override
public BloomFilter<Digest> clone() {
return new DigestBloomFilter(h.clone(), (BitSet) bits.clone());
}

@Override
protected Biff.Type getType() {
return DIGEST;
Expand All @@ -200,8 +205,12 @@ protected Biff.Type getType() {

public static class IntBloomFilter extends BloomFilter<Integer> {

public IntBloomFilter(Hash<Integer> hash, BitSet bitSet) {
super(hash, bitSet);
}

public IntBloomFilter(long seed, int n, double p) {
super(new Hash<Integer>(seed, n, p) {
super(new Hash<>(seed, n, p) {
@Override
protected Hasher<Integer> newHasher() {
return new IntHasher();
Expand All @@ -210,14 +219,19 @@ protected Hasher<Integer> newHasher() {
}

public IntBloomFilter(long seed, int m, int k, long[] bits) {
super(new Hash<Integer>(seed, k, m) {
super(new Hash<>(seed, k, m) {
@Override
protected Hasher<Integer> newHasher() {
return new IntHasher();
}
}, BitSet.valueOf(bits));
}

@Override
public BloomFilter<Integer> clone() {
return new IntBloomFilter(h.clone(), (BitSet) bits.clone());
}

@Override
protected Biff.Type getType() {
return INT;
Expand All @@ -226,8 +240,13 @@ protected Biff.Type getType() {
}

public static class LongBloomFilter extends BloomFilter<Long> {

public LongBloomFilter(Hash<Long> hash, BitSet bitSet) {
super(hash, bitSet);
}

public LongBloomFilter(long seed, int n, double p) {
super(new Hash<Long>(seed, n, p) {
super(new Hash<>(seed, n, p) {
@Override
protected Hasher<Long> newHasher() {
return new LongHasher();
Expand All @@ -236,14 +255,19 @@ protected Hasher<Long> newHasher() {
}

public LongBloomFilter(long seed, int m, int k, long[] bits) {
super(new Hash<Long>(seed, k, m) {
super(new Hash<>(seed, k, m) {
@Override
protected Hasher<Long> newHasher() {
return new LongHasher();
}
}, BitSet.valueOf(bits));
}

@Override
public BloomFilter<Long> clone() {
return new LongBloomFilter(h.clone(), (BitSet) bits.clone());
}

@Override
protected Biff.Type getType() {
return LONG;
Expand All @@ -253,8 +277,12 @@ protected Biff.Type getType() {

public static class StringBloomFilter extends BloomFilter<String> {

public StringBloomFilter(Hash<String> hash, BitSet bitSet) {
super(hash, bitSet);
}

public StringBloomFilter(long seed, int n, double p) {
super(new Hash<String>(seed, n, p) {
super(new Hash<>(seed, n, p) {
@Override
protected Hasher<String> newHasher() {
return new StringHasher();
Expand All @@ -263,23 +291,33 @@ protected Hasher<String> newHasher() {
}

public StringBloomFilter(long seed, int m, int k, long[] bytes) {
super(new Hash<String>(seed, k, m) {
super(new Hash<>(seed, k, m) {
@Override
protected Hasher<String> newHasher() {
return new StringHasher();
}
}, BitSet.valueOf(bytes));
}

@Override
public BloomFilter<String> clone() {
return new StringBloomFilter(h.clone(), (BitSet) bits.clone());
}

@Override
protected Biff.Type getType() {
return STRING;
}
}

public static class ULongBloomFilter extends BloomFilter<ULong> {

public ULongBloomFilter(Hash<ULong> hash, BitSet bitSet) {
super(hash, bitSet);
}

public ULongBloomFilter(long seed, int n, double p) {
super(new Hash<ULong>(seed, n, p) {
super(new Hash<>(seed, n, p) {
@Override
protected Hasher<ULong> newHasher() {
return new ULongHasher();
Expand All @@ -288,14 +326,19 @@ protected Hasher<ULong> newHasher() {
}

public ULongBloomFilter(long seed, int m, int k, long[] bits) {
super(new Hash<ULong>(seed, k, m) {
super(new Hash<>(seed, k, m) {
@Override
protected Hasher<ULong> newHasher() {
return new ULongHasher();
}
}, BitSet.valueOf(bits));
}

@Override
public BloomFilter<ULong> clone() {
return new ULongBloomFilter(h.clone(), (BitSet) bits.clone());
}

@Override
protected Biff.Type getType() {
return ULONG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* @author hal.hildebrand
*/
public class HexBloom {

public static final double DEFAULT_FPR = 0.0001;
public static final long DEFAULT_SEED = Primes.PRIMES[666];
private static final Function<Digest, Digest> IDENTITY = d -> d;
Expand Down Expand Up @@ -288,6 +289,25 @@ public static List<Function<Digest, Digest>> hashWraps(int crowns) {
return IntStream.range(0, crowns).mapToObj(i -> hashWrap(i)).toList();
}

public HexBloom add(Digest d, List<Function<Digest, Digest>> hashes) {
return addAll(Collections.singletonList(d), hashes);
}

public HexBloom addAll(List<Digest> added, List<Function<Digest, Digest>> hashes) {
var nextCard = cardinality + added.size();
var nextMembership = membership.clone();
var crwns = Arrays.stream(crowns).map(AtomicReference::new).toList();

added.forEach(d -> {
for (int i = 0; i < crwns.size(); i++) {
crwns.get(i).accumulateAndGet(hashes.get(i).apply(d), Digest::xor);
}
nextMembership.add(d);
});

return new HexBloom(nextCard, crwns.stream().map(AtomicReference::get).toList(), nextMembership);
}

public Digest compact() {
if (crowns.length == 1) {
return crowns[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private void join(Redirect redirect, Digest v, Duration duration) {
return;
}
redirecting.iterate((link, m) -> {
if (!view.started.get()) {
if (gateway.isDone() || !view.started.get()) {
return null;
}
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,6 @@ protected Gossip gossip(Fireflies link, int ring) {
return link.gossip(gossip);
} catch (Throwable e) {
final var p = (Participant) link.getMember();
if (!viewManagement.joined()) {
log.debug("Exception: {} bootstrap gossiping with:S {} view: {} on: {}", e.getMessage(), p.getId(),
currentView(), node.getId());
return null;
}
if (e instanceof StatusRuntimeException sre) {
switch (sre.getStatus().getCode()) {
case PERMISSION_DENIED:
Expand Down Expand Up @@ -653,7 +648,8 @@ protected Gossip gossip(Fireflies link, int ring) {

}
} else {
log.debug("Exception gossiping with: {} view: {} on: {}", p.getId(), currentView(), node.getId(), e);
log.debug("Exception gossiping joined: {} with: {} view: {} on: {}", viewManagement.joined(), p.getId(),
currentView(), node.getId(), e);
accuse(p, ring, e);
}
return null;
Expand Down Expand Up @@ -1298,7 +1294,7 @@ private void recover(Participant member) {
return;
}
if (context.activate(member)) {
log.debug("Recovering: {} cardinality: {} count: {} on: {}", member.getId(), viewManagement.cardinality(),
log.trace("Recovering: {} cardinality: {} count: {} on: {}", member.getId(), viewManagement.cardinality(),
context.totalCount(), node.getId());
}
}
Expand Down

0 comments on commit ce9f397

Please sign in to comment.