diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index dd1c678ef..d392f68c5 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -16,4 +16,4 @@ jobs: cache: 'maven' github-token: ${{ secrets.GITHUB_TOKEN }} - name: Build with Maven - run: ./mvnw -batch-mode clean install -Dforks=4 -Ppre --file pom.xml + run: ./mvnw -batch-mode clean install -Dforks=1 -Ppre --file pom.xml diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 877c4a896..242f46e96 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -7,7 +7,6 @@ package com.salesforce.apollo.choam; import com.chiralbehaviors.tron.Fsm; -import com.google.common.base.Function; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.InvalidProtocolBufferException; @@ -56,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -109,10 +109,9 @@ public CHOAM(Parameters params) { rotateViewKeys(); var bContext = new DelegatedContext<>(params.context()); - var adapter = new MessageAdapter(_ -> true, (Function) this::signatureHash, - (Function>) _ -> Collections.emptyList(), - (_, any) -> any, - (Function) AgedMessageOrBuilder::getContent); + var adapter = new MessageAdapter(_ -> true, this::signatureHash, + _ -> Collections.emptyList(), + (_, any) -> any, AgedMessageOrBuilder::getContent); combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(), params.metrics() == null ? null : params.metrics().getCombineMetrics(), @@ -234,7 +233,9 @@ public static Block reconfigure(Digest nextViewId, Map joins, Hash } public static Map rosterMap(Context baseContext, Collection members) { - return members.stream().collect(Collectors.toMap(m -> m, baseContext::getMember)); + return members.stream() + .map(baseContext::getMember) + .collect(Collectors.toMap(Member::getId, Function.identity())); } public static List toGenesisData(List initializationData) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Session.java b/choam/src/main/java/com/salesforce/apollo/choam/Session.java index 31c09603e..b680d0737 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Session.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Session.java @@ -7,7 +7,6 @@ package com.salesforce.apollo.choam; import com.codahale.metrics.Timer; -import com.google.common.base.Function; import com.google.protobuf.Message; import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; @@ -33,6 +32,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Supplier; /** diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index cfd225050..d3d583f35 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -103,7 +103,14 @@ public void setUp() throws Exception { @Test public void smokin() throws Exception { + var bootstrap = members.subList(0, 4); + var kernel = bootstrap.get(0); + contexts.get(kernel).activate(kernel); + routers.get(kernel).start(); + choams.get(kernel).start(); + + bootstrap.forEach(member -> bootstrap.forEach(m -> contexts.get(member).activate(m))); bootstrap.forEach(member -> bootstrap.forEach(m -> contexts.get(member).activate(m))); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java index 549a7087d..4efedc695 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java @@ -102,11 +102,11 @@ public void before() throws Exception { var params = Parameters.newBuilder() .setGenerateGenesis(true) .setGenesisViewId(origin.prefix(entropy.nextLong())) - .setGossipDuration(Duration.ofMillis(20)) + .setGossipDuration(Duration.ofMillis(30)) .setProducer(ProducerParameters.newBuilder() .setMaxBatchCount(15_000) .setMaxBatchByteSize(200 * 1024 * 1024) - .setGossipDuration(Duration.ofMillis(10)) + .setGossipDuration(Duration.ofMillis(30)) .setBatchInterval(Duration.ofMillis(50)) .setEthereal(Config.newBuilder() .setNumberOfEpochs(3) @@ -191,7 +191,7 @@ public void submitMultiplTxn() throws Exception { transactioneers.stream().forEach(e -> e.start()); try { - final var complete = countdown.await(LARGE_TESTS ? 3200 : 60, TimeUnit.SECONDS); + final var complete = countdown.await(LARGE_TESTS ? 3200 : 120, TimeUnit.SECONDS); assertTrue(complete, "All clients did not complete: " + transactioneers.stream() .map(t -> t.getCompleted()) .filter(i -> i < max) diff --git a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java index d452b0774..d2f898823 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java +++ b/cryptography/src/main/java/com/salesforce/apollo/bloomFilters/BloomFilter.java @@ -15,13 +15,13 @@ import static com.salesforce.apollo.cryptography.proto.Biff.Type.*; /** - * Simplified Bloom filter for multiple types, with setable seeds and other parameters. + * Simplified Bloom filter for multiple types, with settable seeds and other parameters. * * @author hal.hildebrand */ abstract public class BloomFilter { - private final BitSet bits; - private final Hash h; + final BitSet bits; + final Hash h; private BloomFilter(Hash h) { this(h, new BitSet(h.getM())); @@ -34,42 +34,28 @@ private BloomFilter(Hash h, BitSet bits) { @SuppressWarnings("unchecked") public static BloomFilter create(long seed, int n, double p, Biff.Type type) { - switch (type) { - case DIGEST: - return (BloomFilter) new DigestBloomFilter(seed, n, p); - case INT: - return (BloomFilter) new IntBloomFilter(seed, n, p); - case LONG: - return (BloomFilter) new LongBloomFilter(seed, n, p); - case BYTES: - return (BloomFilter) new BytesBloomFilter(seed, n, p); - case STRING: - return (BloomFilter) new StringBloomFilter(seed, n, p); - case ULONG: - return (BloomFilter) new ULongBloomFilter(seed, n, p); - default: - throw new IllegalArgumentException("Invalid type: " + type); - } + return switch (type) { + case DIGEST -> (BloomFilter) new DigestBloomFilter(seed, n, p); + case INT -> (BloomFilter) new IntBloomFilter(seed, n, p); + case LONG -> (BloomFilter) new LongBloomFilter(seed, n, p); + case BYTES -> (BloomFilter) new BytesBloomFilter(seed, n, p); + case STRING -> (BloomFilter) new StringBloomFilter(seed, n, p); + case ULONG -> (BloomFilter) new ULongBloomFilter(seed, n, p); + default -> throw new IllegalArgumentException("Invalid type: " + type); + }; } @SuppressWarnings("unchecked") public static BloomFilter create(long seed, int m, int k, long[] bits, Biff.Type type) { - switch (type) { - case DIGEST: - return (BloomFilter) new DigestBloomFilter(seed, m, k, bits); - case INT: - return (BloomFilter) new IntBloomFilter(seed, m, k, bits); - case LONG: - return (BloomFilter) new LongBloomFilter(seed, m, k, bits); - case BYTES: - return (BloomFilter) new BytesBloomFilter(seed, m, k, bits); - case STRING: - return (BloomFilter) new StringBloomFilter(seed, m, k, bits); - case ULONG: - return (BloomFilter) new ULongBloomFilter(seed, m, k, bits); - default: - throw new IllegalArgumentException("Invalid type: " + type); - } + return switch (type) { + case DIGEST -> (BloomFilter) new DigestBloomFilter(seed, m, k, bits); + case INT -> (BloomFilter) new IntBloomFilter(seed, m, k, bits); + case LONG -> (BloomFilter) new LongBloomFilter(seed, m, k, bits); + case BYTES -> (BloomFilter) new BytesBloomFilter(seed, m, k, bits); + case STRING -> (BloomFilter) new StringBloomFilter(seed, m, k, bits); + case ULONG -> (BloomFilter) new ULongBloomFilter(seed, m, k, bits); + default -> throw new IllegalArgumentException("Invalid type: " + type); + }; } public static BloomFilter from(Biff bff) { @@ -106,6 +92,8 @@ public void clear() { bits.clear(); } + public abstract BloomFilter clone(); + public boolean contains(T element) { for (int hash : h.hashes(element)) { if (!bits.get(hash)) { @@ -116,8 +104,7 @@ public boolean contains(T element) { } public boolean equivalent(BloomFilter other) { - var equiv = h.equivalent(other.h) && bits.equals(other.bits); - return equiv; + return h.equivalent(other.h) && bits.equals(other.bits); } public double fpp(int n) { @@ -126,9 +113,9 @@ public double fpp(int n) { /** * Estimates the current population of the Bloom filter (see: - * http://en.wikipedia.org/wiki/Bloom_filter#Approximating_the_number_of_items_in_a_Bloom_filter + * ... * - * @return the estimated amount of elements in the filter + * @return the estimated number of elements in the filter */ public double getEstimatedPopulation() { return population(bits, h.getK(), h.getM()); @@ -148,7 +135,7 @@ public Biff toBff() { public static class BytesBloomFilter extends BloomFilter { public BytesBloomFilter(long seed, int n, double p) { - super(new Hash(seed, n, p) { + super(new Hash<>(seed, n, p) { @Override protected Hasher newHasher() { return new BytesHasher(); @@ -157,7 +144,7 @@ protected Hasher newHasher() { } public BytesBloomFilter(long seed, int m, int k, long[] bytes) { - super(new Hash(seed, k, m) { + super(new Hash<>(seed, k, m) { @Override protected Hasher newHasher() { return new BytesHasher(); @@ -165,6 +152,15 @@ protected Hasher newHasher() { }, BitSet.valueOf(bytes)); } + public BytesBloomFilter(Hash hash, BitSet bitSet) { + super(hash, bitSet); + } + + @Override + public BloomFilter clone() { + return new BytesBloomFilter(h.clone(), (BitSet) bits.clone()); + } + @Override protected Biff.Type getType() { return BYTES; @@ -173,8 +169,12 @@ protected Biff.Type getType() { public static class DigestBloomFilter extends BloomFilter { + public DigestBloomFilter(Hash hash, BitSet bitSet) { + super(hash, bitSet); + } + public DigestBloomFilter(long seed, int n, double p) { - super(new Hash(seed, n, p) { + super(new Hash<>(seed, n, p) { @Override protected Hasher newHasher() { return new DigestHasher(); @@ -183,7 +183,7 @@ protected Hasher newHasher() { } public DigestBloomFilter(long seed, int m, int k, long[] bytes) { - super(new Hash(seed, k, m) { + super(new Hash<>(seed, k, m) { @Override protected Hasher newHasher() { return new DigestHasher(); @@ -191,6 +191,11 @@ protected Hasher newHasher() { }, BitSet.valueOf(bytes)); } + @Override + public BloomFilter clone() { + return new DigestBloomFilter(h.clone(), (BitSet) bits.clone()); + } + @Override protected Biff.Type getType() { return DIGEST; @@ -200,8 +205,12 @@ protected Biff.Type getType() { public static class IntBloomFilter extends BloomFilter { + public IntBloomFilter(Hash hash, BitSet bitSet) { + super(hash, bitSet); + } + public IntBloomFilter(long seed, int n, double p) { - super(new Hash(seed, n, p) { + super(new Hash<>(seed, n, p) { @Override protected Hasher newHasher() { return new IntHasher(); @@ -210,7 +219,7 @@ protected Hasher newHasher() { } public IntBloomFilter(long seed, int m, int k, long[] bits) { - super(new Hash(seed, k, m) { + super(new Hash<>(seed, k, m) { @Override protected Hasher newHasher() { return new IntHasher(); @@ -218,6 +227,11 @@ protected Hasher newHasher() { }, BitSet.valueOf(bits)); } + @Override + public BloomFilter clone() { + return new IntBloomFilter(h.clone(), (BitSet) bits.clone()); + } + @Override protected Biff.Type getType() { return INT; @@ -226,8 +240,13 @@ protected Biff.Type getType() { } public static class LongBloomFilter extends BloomFilter { + + public LongBloomFilter(Hash hash, BitSet bitSet) { + super(hash, bitSet); + } + public LongBloomFilter(long seed, int n, double p) { - super(new Hash(seed, n, p) { + super(new Hash<>(seed, n, p) { @Override protected Hasher newHasher() { return new LongHasher(); @@ -236,7 +255,7 @@ protected Hasher newHasher() { } public LongBloomFilter(long seed, int m, int k, long[] bits) { - super(new Hash(seed, k, m) { + super(new Hash<>(seed, k, m) { @Override protected Hasher newHasher() { return new LongHasher(); @@ -244,6 +263,11 @@ protected Hasher newHasher() { }, BitSet.valueOf(bits)); } + @Override + public BloomFilter clone() { + return new LongBloomFilter(h.clone(), (BitSet) bits.clone()); + } + @Override protected Biff.Type getType() { return LONG; @@ -253,8 +277,12 @@ protected Biff.Type getType() { public static class StringBloomFilter extends BloomFilter { + public StringBloomFilter(Hash hash, BitSet bitSet) { + super(hash, bitSet); + } + public StringBloomFilter(long seed, int n, double p) { - super(new Hash(seed, n, p) { + super(new Hash<>(seed, n, p) { @Override protected Hasher newHasher() { return new StringHasher(); @@ -263,7 +291,7 @@ protected Hasher newHasher() { } public StringBloomFilter(long seed, int m, int k, long[] bytes) { - super(new Hash(seed, k, m) { + super(new Hash<>(seed, k, m) { @Override protected Hasher newHasher() { return new StringHasher(); @@ -271,6 +299,11 @@ protected Hasher newHasher() { }, BitSet.valueOf(bytes)); } + @Override + public BloomFilter clone() { + return new StringBloomFilter(h.clone(), (BitSet) bits.clone()); + } + @Override protected Biff.Type getType() { return STRING; @@ -278,8 +311,13 @@ protected Biff.Type getType() { } public static class ULongBloomFilter extends BloomFilter { + + public ULongBloomFilter(Hash hash, BitSet bitSet) { + super(hash, bitSet); + } + public ULongBloomFilter(long seed, int n, double p) { - super(new Hash(seed, n, p) { + super(new Hash<>(seed, n, p) { @Override protected Hasher newHasher() { return new ULongHasher(); @@ -288,7 +326,7 @@ protected Hasher newHasher() { } public ULongBloomFilter(long seed, int m, int k, long[] bits) { - super(new Hash(seed, k, m) { + super(new Hash<>(seed, k, m) { @Override protected Hasher newHasher() { return new ULongHasher(); @@ -296,6 +334,11 @@ protected Hasher newHasher() { }, BitSet.valueOf(bits)); } + @Override + public BloomFilter clone() { + return new ULongBloomFilter(h.clone(), (BitSet) bits.clone()); + } + @Override protected Biff.Type getType() { return ULONG; diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java index 90714cf0d..c2480c1fe 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/HexBloom.java @@ -26,6 +26,7 @@ * @author hal.hildebrand */ public class HexBloom { + public static final double DEFAULT_FPR = 0.0001; public static final long DEFAULT_SEED = Primes.PRIMES[666]; private static final Function IDENTITY = d -> d; @@ -288,6 +289,25 @@ public static List> hashWraps(int crowns) { return IntStream.range(0, crowns).mapToObj(i -> hashWrap(i)).toList(); } + public HexBloom add(Digest d, List> hashes) { + return addAll(Collections.singletonList(d), hashes); + } + + public HexBloom addAll(List added, List> hashes) { + var nextCard = cardinality + added.size(); + var nextMembership = membership.clone(); + var crwns = Arrays.stream(crowns).map(AtomicReference::new).toList(); + + added.forEach(d -> { + for (int i = 0; i < crwns.size(); i++) { + crwns.get(i).accumulateAndGet(hashes.get(i).apply(d), Digest::xor); + } + nextMembership.add(d); + }); + + return new HexBloom(nextCard, crwns.stream().map(AtomicReference::get).toList(), nextMembership); + } + public Digest compact() { if (crowns.length == 1) { return crowns[0]; diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 91597bb46..4e107d8a9 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -274,7 +274,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { return; } redirecting.iterate((link, m) -> { - if (!view.started.get()) { + if (gateway.isDone() || !view.started.get()) { return null; } log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId()); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java index 803dcfa44..ec5aa1df1 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Parameters.java @@ -13,7 +13,7 @@ */ public record Parameters(int joinRetries, int minimumBiffCardinality, int rebuttalTimeout, int viewChangeRounds, int finalizeViewRounds, double fpr, int maximumTxfr, Duration retryDelay, int maxPending, - Duration seedingTimeout, int validationRetries, int crowns) { + Duration seedingTimeout, int validationRetries, int crowns, Duration populateDuration) { public static Builder newBuilder() { return new Builder(); @@ -68,11 +68,12 @@ public static class Builder { * Minimum number of rounds to check for view change */ private int viewChangeRounds = 7; + private Duration populateDuration = Duration.ofMillis(20); public Parameters build() { return new Parameters(joinRetries, minimumBiffCardinality, rebuttalTimeout, viewChangeRounds, finalizeViewRounds, fpr, maximumTxfr, retryDelay, maxPending, seedingTimout, - validationRetries, crowns); + validationRetries, crowns, populateDuration); } public int getCrowns() { @@ -138,6 +139,15 @@ public Builder setMinimumBiffCardinality(int minimumBiffCardinality) { return this; } + public Duration getPopulateDuration() { + return populateDuration; + } + + public Builder setPopulateDuration(Duration populateDuration) { + this.populateDuration = populateDuration; + return this; + } + public int getRebuttalTimeout() { return rebuttalTimeout; } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 7b2b499b2..e337a5dff 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -137,6 +137,7 @@ public View(DynamicContext context, ControlledIdentifierMember memb r -> new EntranceServer(gateway.getClientIdentityProvider(), r, metrics), EntranceClient.getCreate(metrics), Entrance.getLocalLoopback(node, service)); gossiper = new RingCommunications<>(context, node, comm); + gossiper.allowDuplicates(); this.validation = validation; this.verifiers = verifiers; } @@ -443,7 +444,7 @@ void processUpdates(Gossip gossip) { * @param ring */ boolean redirect(Participant member, Gossip gossip, int ring) { - if (!gossip.hasRedirect()) { + if (gossip.getRedirect().equals(SignedNote.getDefaultInstance())) { log.warn("Redirect from: {} on ring: {} did not contain redirect member note on: {}", member.getId(), ring, node.getId()); return false; @@ -615,15 +616,11 @@ protected Gossip gossip(Fireflies link, int ring) { .setRing(ring) .setGossip(commonDigests()) .build()); + log.info("gossiping with: {} on: {}", link.getMember().getId(), node.getId()); try { return link.gossip(gossip); } catch (Throwable e) { final var p = (Participant) link.getMember(); - if (!viewManagement.joined()) { - log.debug("Exception: {} bootstrap gossiping with:S {} view: {} on: {}", e.getMessage(), p.getId(), - currentView(), node.getId()); - return null; - } if (e instanceof StatusRuntimeException sre) { switch (sre.getStatus().getCode()) { case PERMISSION_DENIED: @@ -653,7 +650,8 @@ protected Gossip gossip(Fireflies link, int ring) { } } else { - log.debug("Exception gossiping with: {} view: {} on: {}", p.getId(), currentView(), node.getId(), e); + log.debug("Exception gossiping joined: {} with: {} view: {} on: {}", viewManagement.joined(), p.getId(), + currentView(), node.getId(), e); accuse(p, ring, e); } return null; @@ -1064,7 +1062,7 @@ private void gossip(Optional result, RingCommunications.Destination redirect(member, gossip, destination.ring())); } else if (viewManagement.joined()) { try { @@ -1298,7 +1296,7 @@ private void recover(Participant member) { return; } if (context.activate(member)) { - log.debug("Recovering: {} cardinality: {} count: {} on: {}", member.getId(), viewManagement.cardinality(), + log.trace("Recovering: {} cardinality: {} count: {} on: {}", member.getId(), viewManagement.cardinality(), context.totalCount(), node.getId()); } } @@ -1334,7 +1332,7 @@ private Gossip redirectTo(Participant member, int ring, Participant successor, D .setObservations(processObservations(BloomFilter.from(digests.getObservationBff()))) .setJoins(viewManagement.processJoins(BloomFilter.from(digests.getJoinBiff()))) .build(); - log.trace("Redirecting: {} to: {} on ring: {} notes: {} acc: {} obv: {} joins: {} on: {}", member.getId(), + log.trace("Redirect: {} to: {} on ring: {} notes: {} acc: {} obv: {} joins: {} on: {}", member.getId(), successor.getId(), ring, gossip.getNotes().getUpdatesCount(), gossip.getAccusations().getUpdatesCount(), gossip.getObservations().getUpdatesCount(), gossip.getJoins().getUpdatesCount(), node.getId()); @@ -1932,6 +1930,7 @@ public Gossip rumors(SayWhat request, Digest from) { final var digests = request.getGossip(); if (!successor.equals(node)) { g = redirectTo(member, ring, successor, digests); + log.info("Redirected: {} on: {}", member.getId(), node.getId()); } else { g = Gossip.newBuilder() .setNotes(processNotes(from, BloomFilter.from(digests.getNoteBff()), params.fpr())) diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 7124e4fd3..5a62ea8d0 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -386,7 +386,7 @@ void populate(List sample) { return view.gossip(link, 0); }, (futureSailor, link, m) -> { futureSailor.ifPresent(g -> { - if (g.hasRedirect()) { + if (!g.getRedirect().equals(SignedNote.getDefaultInstance())) { final Participant member = (Participant) link.getMember(); view.stable(() -> view.redirect(member, g, 0)); } else { @@ -396,10 +396,10 @@ void populate(List sample) { return !joined(); }, () -> { if (!joined()) { - scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(repopulate.get(), log)), 500, - TimeUnit.MILLISECONDS); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(repopulate.get(), log)), + params.populateDuration().toNanos(), TimeUnit.NANOSECONDS); } - }, Duration.ofMillis(500))); + }, params.populateDuration())); repopulate.get().run(); } diff --git a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java index ddb713b19..5b1167d8d 100644 --- a/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java +++ b/fireflies/src/test/java/com/salesforce/apollo/fireflies/SwarmTest.java @@ -180,6 +180,17 @@ public void swarm() throws Exception { } assertTrue(testGraph.isSC()); } + + var ringRef = views.get(0).getContext().rings().toList(); + for (var v : views) { + var tested = v.getContext().rings().toList(); + for (int i = 0; i < ringRef.size(); i++) { + var r = ringRef.get(i); + var t = tested.get(i); + assertEquals(r.getRing(), t.getRing()); + assertEquals(r.getRing(), t.getRing()); + } + } } communications.forEach(e -> e.close(Duration.ofSeconds(1))); views.forEach(view -> view.stop()); diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java index 52a37ec43..3f13824cf 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java @@ -415,7 +415,7 @@ public void enroll(Notarization request, Digest from) { log.warn("Invalid notarization for: {} from: {} on: {}", identifier, from, member.getId()); throw new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid notarization")); } - log.info("Enrolling notorization for: {} from: {} on: {}", identifier, from, member.getId()); + log.info("Enrolling notarization for: {} from: {} on: {}", identifier, from, member.getId()); Gorgoneion.this.enroll(request); } diff --git a/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java b/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java index fe6a7d963..dce34e277 100644 --- a/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/context/DynamicContextImpl.java @@ -16,6 +16,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; @@ -41,7 +42,7 @@ public class DynamicContextImpl implements DynamicContext { private final Map> members = new ConcurrentSkipListMap<>(); private final Map> membershipListeners = new ConcurrentHashMap<>(); private final double pByz; - private final List> rings = new ArrayList<>(); + private final List> rings = new CopyOnWriteArrayList<>(); private volatile int cardinality; public DynamicContextImpl(Digest id, int cardinality, double pbyz, int bias) { diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java index 68ede41e4..91286415a 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingCommunications.java @@ -36,11 +36,11 @@ public class RingCommunications { final SigningMember member; private final CommonCommunications comm; private final Direction direction; - private final boolean ignoreSelf; private final Lock lock = new ReentrantLock(); private final List> traversalOrder = new ArrayList<>(); protected boolean noDuplicates = true; volatile int currentIndex = -1; + private boolean ignoreSelf; public RingCommunications(Context context, SigningMember member, CommonCommunications comm) { this(context, member, comm, false); @@ -66,6 +66,10 @@ public RingCommunications allowDuplicates() { return this; } + public void dontIgnoreSelf() { + this.ignoreSelf = false; + } + public void execute(BiFunction round, SyncHandler handler) { final var next = next(member.getId()); if (next == null || next.member == null) { @@ -81,13 +85,17 @@ public void execute(BiFunction round, SyncHandler noDuplicates() { noDuplicates = true; return this; } public void reset() { - currentIndex = 0; + currentIndex = -1; traversalOrder.clear(); log.trace("Reset on: {}", member.getId()); } @@ -102,6 +110,10 @@ List> calculateTraversal(Digest digest) { var traversal = new ArrayList>(); var traversed = new TreeSet(); for (int ring = 0; ring < context.getRingCount(); ring++) { + if (context.size() == 1) { + traversal.add(new iteration<>((T) member, ring)); + continue; + } T successor = direction.retrieve(context, ring, digest, m -> { if (ignoreSelf && m.equals(member)) { return Context.IterateResult.CONTINUE; diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java index 48313ec80..1fa04a7e9 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java @@ -114,7 +114,7 @@ private void internalIterate(Digest digest, Runnable onMajority, BiFunction< var next = next(digest); log.trace("Iteration: {} tally: {} for digest: {} on: {} ring: {} complete: false on: {}", iteration(), - tally.get(), digest, context.getId(), next.ring(), member.getId()); + tally.get(), digest, context.getId(), next == null ? "" : next.ring(), member.getId()); if (next == null || next.link() == null) { log.trace("No successor found for digest: {} on: {} iteration: {} traversed: {} ring: {} on: {}", digest, context.getId(), iteration(), traversed, currentIndex, member.getId()); @@ -125,8 +125,8 @@ private void internalIterate(Digest digest, Runnable onMajority, BiFunction< digest, context.getId(), tally.get(), member.getId()); schedule(proceed); } else { - log.trace("Completed on iteration: {} on: {} for digest: {} for: {} tally: {} on: {}", iteration(), - digest, context.getId(), tally.get(), member.getId()); + log.trace("Completed on iteration: {} for digest: {} for: {} tally: {} on: {}", iteration(), digest, + context.getId(), tally.get(), member.getId()); } return; } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index b029eee4a..291995bde 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -198,18 +198,13 @@ public KeyState_ append(AttachmentEvent event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null, - (link, r) -> link.append( - Collections.emptyList(), - Collections.singletonList( - event)), null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, - identifier, isTimedOut, - tally, destination, - "append events"), - t -> completeIt(result, - gathered)); + var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + iterator.dontIgnoreSelf(); + iterator.iterate(identifier, null, + (link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, destination, "append events"), + t -> completeIt(result, gathered)); try { List s = result.get().getKeyStatesList(); return s.isEmpty() ? null : s.getFirst(); @@ -239,16 +234,12 @@ public List append(KERL_ kerl) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null, - (link, r) -> link.append( - kerl), null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, - identifier, isTimedOut, - tally, destination, - "append kerl"), - t -> completeIt(result, - gathered)); + var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + iterator.dontIgnoreSelf(); + iterator.iterate(identifier, null, (link, r) -> link.append(kerl), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, destination, "append kerl"), + t -> completeIt(result, gathered)); try { return result.get().getKeyStatesList(); } catch (InterruptedException e) { @@ -272,17 +263,12 @@ public KeyState_ append(KeyEvent_ event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null, - (link, r) -> link.append( - Collections.singletonList( - event)), null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, - identifier, isTimedOut, - tally, destination, - "append events"), - t -> completeIt(result, - gathered)); + var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + iterator.dontIgnoreSelf(); + iterator.iterate(identifier, null, (link, r) -> link.append(Collections.singletonList(event)), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, destination, "append event"), + t -> completeIt(result, gathered)); try { var ks = result.get(); return ks.getKeyStatesCount() == 0 ? KeyState_.getDefaultInstance() : ks.getKeyStatesList().getFirst(); @@ -334,16 +320,12 @@ public Empty appendAttachments(List events) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null, - (link, r) -> link.appendAttachments( - events), null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, - identifier, isTimedOut, - tally, destination, - "append attachments"), - t -> completeIt(result, - gathered)); + var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + iterator.dontIgnoreSelf(); + iterator.iterate(identifier, null, (link, r) -> link.appendAttachments(events), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, destination, "append attachments"), + t -> completeIt(result, gathered)); try { return result.get(); @@ -372,16 +354,12 @@ public Empty appendValidations(Validations validations) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null, - (link, r) -> link.appendValidations( - validations), null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, - identifier, isTimedOut, - tally, destination, - "append validations"), - t -> completeIt(result, - gathered)); + var iterator = new RingIterator(operationsFrequency, context, member, scheduler, dhtComms); + iterator.dontIgnoreSelf(); + iterator.iterate(identifier, null, (link, r) -> link.appendValidations(validations), null, + (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, destination, "append validations"), + t -> completeIt(result, gathered)); try { return result.get(); } catch (InterruptedException e) { @@ -799,8 +777,9 @@ private void completeIt(CompletableFuture result, HashMultiset gathere .stream() .max(Ordering.natural().onResultOf(Multiset.Entry::getCount)) .orElse(null); + var majority = context.size() == 1 ? 1 : context.majority(); if (max != null) { - if (max.getCount() >= context.majority()) { + if (max.getCount() >= majority) { try { result.complete(max.getElement()); } catch (Throwable t) { @@ -810,8 +789,8 @@ private void completeIt(CompletableFuture result, HashMultiset gathere } } result.completeExceptionally(new CompletionException( - "Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + context.majority() - + " on: " + member.getId())); + "Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + majority + " on: " + + member.getId())); } private boolean failedMajority(CompletableFuture result, int maxAgree, String operation) { @@ -867,17 +846,18 @@ private boolean mutate(HashMultiset gathered, Optional futureSailor, D Supplier isTimedOut, AtomicInteger tally, RingCommunications.Destination destination, String action) { if (futureSailor.isEmpty()) { - log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(), - member.getId()); + log.debug("Failed {}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), + destination.member().getId(), member.getId()); return !isTimedOut.get(); } T content = futureSailor.get(); - log.trace("{}: {} from: {} on: {}", action, identifier, destination.member().getId(), member.getId()); gathered.add(content); gathered.entrySet() .stream() .max(Ordering.natural().onResultOf(Entry::getCount)) .ifPresent(max -> tally.set(max.getCount())); + log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(), + member.getId()); return !isTimedOut.get(); } @@ -885,20 +865,23 @@ private boolean read(CompletableFuture result, HashMultiset gathered, Optional futureSailor, Digest identifier, Supplier isTimedOut, RingCommunications.Destination destination, String action, T empty) { if (futureSailor.isEmpty()) { - log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(), - member.getId()); + log.debug("Failed {}: {} tally: {} from: {} on: {}", action, identifier, tally, + destination.member().getId(), member.getId()); return !isTimedOut.get(); } T content = futureSailor.get(); - log.trace("{}: {} from: {} on: {}", action, identifier, destination.member().getId(), member.getId()); + log.trace("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(), + member.getId()); gathered.add(content); var max = max(gathered); if (max != null) { tally.set(max.getCount()); - final var majority = tally.get() >= context.majority(); + var ctxMajority = context.size() == 1 ? 1 : context.majority(); + final var majority = tally.get() >= ctxMajority; if (majority) { result.complete(max.getElement()); - log.debug("Majority: {} achieved: {}: {} on: {}", max.getCount(), action, identifier, member.getId()); + log.debug("Majority: {} achieved: {}: {} tally: {} on: {}", max.getCount(), action, identifier, + tally.get(), member.getId()); return false; } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java index 6e788824e..7b25f6ad9 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java @@ -128,7 +128,8 @@ record validator(EstablishmentEvent validating, JohnHancock signature) { r.validating.getIdentifier()); } } - var validated = verified >= context.majority(); + var ctxMajority = context.size() == 1 ? 1 : context.majority(); + var validated = verified >= ctxMajority; log.trace("Validated: {} valid: {} out of: {} required: {} for: {} ", validated, verified, mapped.size(), ctx.majority(), event.getCoordinates());