From 3ce0e66ea6c36832e807794dbda1d575354889f2 Mon Sep 17 00:00:00 2001 From: Constantine Date: Sat, 15 Jun 2024 15:55:42 -0700 Subject: [PATCH] misc-7 (#210) * fix invalid transition * use tolerance + 1 for majority, rather than 2/3+1 in DHT/iterations. * limit bootstrapping to min of 4. fix RingIterator logic. * generic gate on view bootstrapping * prevent state transition errors on failure * gate view change in KerlDHT * tighten up the screws * use majority for observer's view change. utility for listing network interfaces * better ViewChange * better view change, bug fixes, serialize Gorgoneion endorsement * provide RingIterator functionality on the SliceIterator. virtual synchrony using SI for KerlDHT * emit last block as beacon if no transactions to publish * cleanup * cleanup --- .../com/salesforce/apollo/choam/CHOAM.java | 22 +- .../apollo/choam/GenesisAssembly.java | 23 +- .../salesforce/apollo/choam/Parameters.java | 22 +- .../com/salesforce/apollo/choam/Producer.java | 20 +- .../salesforce/apollo/choam/ViewAssembly.java | 4 + .../salesforce/apollo/choam/fsm/Combine.java | 65 ++++ .../salesforce/apollo/choam/fsm/Genesis.java | 11 +- .../apollo/choam/fsm/Reconfiguration.java | 10 + .../choam/support/CheckpointAssembler.java | 4 +- .../apollo/choam/support/TxDataSource.java | 8 +- .../salesforce/apollo/choam/DynamicTest.java | 6 +- .../apollo/choam/MembershipTests.java | 1 - .../choam/support/TxDataSourceTest.java | 17 +- .../cryptography/SignatureAlgorithm.java | 2 +- .../ethereal/memberships/ChRbcGossip.java | 15 +- .../salesforce/apollo/fireflies/Binding.java | 8 +- .../com/salesforce/apollo/fireflies/View.java | 192 ++++++------ .../apollo/fireflies/ViewManagement.java | 39 +-- .../apollo/gorgoneion/Gorgoneion.java | 29 +- .../comm/endorsement/Endorsement.java | 9 +- .../apollo/archipelago/EndpointProvider.java | 14 +- .../archipelago/StandardEpProvider.java | 7 +- .../salesforce/apollo/context/ViewChange.java | 8 + 
.../salesforce/apollo/ring/RingIterator.java | 9 +- .../salesforce/apollo/ring/SliceIterator.java | 82 ++++-- .../apollo/ring/SliceIteratorTest.java | 7 +- .../apollo/model/ProcessDomain.java | 19 +- .../apollo/model/ContainmentDomainTest.java | 42 ++- .../salesforce/apollo/model/DomainTest.java | 6 +- .../apollo/model/FireFliesTest.java | 50 ++-- .../salesforce/apollo/protocols/ListNIFs.java | 41 +++ .../apollo/state/AbstractLifecycleTest.java | 1 - .../services/grpc/kerl/KERLAdapter.java | 11 +- .../com/salesforce/apollo/thoth/KerlDHT.java | 277 ++++++++++-------- 34 files changed, 612 insertions(+), 469 deletions(-) create mode 100644 memberships/src/main/java/com/salesforce/apollo/context/ViewChange.java create mode 100644 protocols/src/main/java/com/salesforce/apollo/protocols/ListNIFs.java diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index c50ed31b1..104d086bc 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -25,6 +25,7 @@ import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DelegatedContext; import com.salesforce.apollo.context.StaticContext; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.*; import com.salesforce.apollo.cryptography.Signer.SignerImpl; import com.salesforce.apollo.cryptography.proto.PubKey; @@ -322,15 +323,14 @@ public String logState() { /** * A view change has occurred - * - * @param context - the new membership context - * @param diadem - the compact HexBloom of the context view */ - public void rotateViewKeys(Context context, Digest diadem) { + public void rotateViewKeys(ViewChange viewChange) { + var context = viewChange.context(); + var diadem = viewChange.diadem(); ((DelegatedContext) combine.getContext()).setContext(context); var c = current.get(); if (c != null) { - c.nextView(diadem, 
context); + c.nextView(viewChange.diadem(), context); } else { log.info("Acquiring new view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(), params.member().getId()); @@ -361,11 +361,11 @@ public void stop() { } session.cancelAll(); try { - linear.shutdown(); + linear.shutdownNow(); } catch (Throwable e) { } try { - executions.shutdown(); + executions.shutdownNow(); } catch (Throwable e) { } final var c = current.get(); @@ -1239,16 +1239,16 @@ private void synchronizationFailed() { cancelSynchronization(); Context memberContext = context(); var activeCount = memberContext.size(); - var majority = params.majority(); - if (params.generateGenesis() && activeCount >= majority) { + var count = context().getRingCount(); + if (params.generateGenesis() && activeCount >= context().getRingCount()) { if (current.get() == null && current.compareAndSet(null, new Formation())) { log.info( "Quorum achieved, triggering regeneration. members: {} required: {} forming Genesis committee on: {}", - activeCount, majority, params.member().getId()); + activeCount, count, params.member().getId()); transitions.regenerate(); } else { log.info("Quorum achieved, members: {} required: {} existing committee: {} on: {}", activeCount, - majority, current.get().getClass().getSimpleName(), params.member().getId()); + count, current.get().getClass().getSimpleName(), params.member().getId()); } } else { final var c = current.get(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index a5ee566f6..a13861270 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -93,7 +93,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, } else { config.setPid(pid).setnProc((short) view.roster().size()); } - config.setEpochLength(7).setNumberOfEpochs(3); + 
config.setEpochLength(33).setNumberOfEpochs(-1); config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId()); controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(), transitions::process, transitions::nextEpoch, label); @@ -106,13 +106,11 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, @Override public void certify() { - if (slate.size() < params().majority()) { - log.info("Not certifying genesis for: {} need: {} slate incomplete: {} on: {}", view.context().getId(), - params().majority(), slate.keySet().stream().sorted().toList(), params().member().getId()); + if (slate.size() != nextAssembly.size()) { + log.info("Not certifying genesis for: {} slate incomplete: {} on: {}", view.context().getId(), + slate.keySet().stream().sorted().toList(), params().member().getId()); return; } - assert slate.size() >= params().majority() : "Expected: %s members, slate: %s".formatted(params().majority(), - slate.size()); reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(), new NullBlock( params().digestAlgorithm()))); @@ -130,7 +128,7 @@ public void certify(List preblock, boolean last) { try { return Validate.parseFrom(bs); } catch (InvalidProtocolBufferException e) { - log.warn("Unable to parse preblock: {} on: {}", bs, params().member().getId(), e); + log.trace("Unable to parse preblock: {} on: {}", bs, params().member().getId(), e); return null; } }).filter(Objects::nonNull).filter(v -> !v.equals(Validate.getDefaultInstance())).forEach(this::certify); @@ -154,7 +152,7 @@ public void gather(List preblock, boolean last) { try { return Join.parseFrom(bs); } catch (InvalidProtocolBufferException e) { - log.warn("error parsing join: {} on: {}", bs, params().member().getId(), e); + log.trace("error parsing join: {} on: {}", bs, params().member().getId(), e); return null; } }) @@ -163,6 +161,9 @@ public void gather(List preblock, 
boolean last) { .peek(j -> log.info("Gathering: {} on: {}", Digest.from(j.getMember().getVm().getId()), params().member().getId())) .forEach(this::join); + if (slate.size() == nextAssembly.size()) { + transitions.gathered(); + } } @Override @@ -172,7 +173,7 @@ public void nominations(List preblock, boolean last) { try { return Validations.parseFrom(bs); } catch (InvalidProtocolBufferException e) { - log.warn("error parsing validations: {} on: {}", bs, params().member().getId(), e); + log.trace("error parsing validations: {} on: {}", bs, params().member().getId(), e); return null; } }) @@ -187,12 +188,12 @@ public void publish() { log.trace("Cannot publish genesis, reconfiguration is NULL on: {}", params().member().getId()); return; } - if (witnesses.size() < params().majority()) { + if (witnesses.size() < nextAssembly.size()) { log.trace("Cannot publish genesis: {} with: {} witnesses on: {}", reconfiguration.hash, witnesses.size(), params().member().getId()); return; } - if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < params().majority()) { + if (reconfiguration.block.getGenesis().getInitialView().getJoinsCount() < nextAssembly.size()) { log.trace("Cannot publish genesis: {} with: {} joins on: {}", reconfiguration.hash, reconfiguration.block.getGenesis().getInitialView().getJoinsCount(), params().member().getId()); return; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java index 2a3449d64..6241fb072 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java @@ -56,7 +56,7 @@ public record Parameters(Parameters.RuntimeParameters runtime, ReliableBroadcast Parameters.BootstrapParameters bootstrap, Parameters.ProducerParameters producer, Parameters.MvStoreBuilder mvBuilder, Parameters.LimiterBuilder txnLimiterBuilder, ExponentialBackoffPolicy.Builder submitPolicy, 
int checkpointSegmentSize, - ExponentialBackoffPolicy.Builder drainPolicy, boolean generateGenesis) { + boolean generateGenesis) { public static Builder newBuilder() { return new Builder(); @@ -677,14 +677,6 @@ public static class Builder implements Cloneable { private ReliableBroadcaster.Parameters combine = ReliableBroadcaster.Parameters.newBuilder() .build(); private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT; - private ExponentialBackoffPolicy.Builder drainPolicy = ExponentialBackoffPolicy.newBuilder() - .setInitialBackoff( - Duration.ofMillis(5)) - .setJitter(0.2) - .setMultiplier(1.2) - .setMaxBackoff( - Duration.ofMillis( - 500)); private Digest genesisViewId; private Duration gossipDuration = Duration.ofSeconds(1); private int maxCheckpointSegments = 200; @@ -709,7 +701,7 @@ public Parameters build(RuntimeParameters runtime) { return new Parameters(runtime, combine, gossipDuration, maxCheckpointSegments, submitTimeout, genesisViewId, checkpointBlockDelta, crowns, digestAlgorithm, viewSigAlgorithm, synchronizationCycles, regenerationCycles, bootstrap, producer, mvBuilder, - txnLimiterBuilder, submitPolicy, checkpointSegmentSize, drainPolicy, generateGenesis); + txnLimiterBuilder, submitPolicy, checkpointSegmentSize, generateGenesis); } @Override @@ -726,7 +718,6 @@ public Builder clone() { producer.batchInterval, producer.maxBatchCount(), producer.maxGossipDelay)); clone.setTxnLimiterBuilder(txnLimiterBuilder.clone()); clone.setSubmitPolicy(submitPolicy.clone()); - clone.setDrainPolicy(drainPolicy.clone()); return clone; } @@ -783,15 +774,6 @@ public Builder setDigestAlgorithm(DigestAlgorithm digestAlgorithm) { return this; } - public ExponentialBackoffPolicy.Builder getDrainPolicy() { - return drainPolicy; - } - - public Builder setDrainPolicy(ExponentialBackoffPolicy.Builder drainPolicy) { - this.drainPolicy = drainPolicy; - return this; - } - public Digest getGenesisViewId() { return genesisViewId; } diff --git 
a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 19bc5580f..d6dbc33f8 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -57,7 +57,6 @@ public class Producer { private final Semaphore serialize = new Semaphore(1); private final ViewAssembly assembly; private final int maxEpoch; - private volatile int emptyPreBlocks = 0; private volatile boolean assembled = false; public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label) { @@ -76,8 +75,7 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash maxEpoch = ep.getEpochLength(); ds = new TxDataSource(params.member(), blocks, params.metrics(), producerParams.maxBatchByteSize(), - producerParams.batchInterval(), producerParams.maxBatchCount(), - params().drainPolicy().build()); + producerParams.batchInterval(), producerParams.maxBatchCount()); log.debug("Producer max elements: {} reconfiguration epoch: {} on: {}", blocks, maxEpoch, params.member().getId()); @@ -206,7 +204,7 @@ private void create(List preblock, boolean last) { processAssemblies(aggregate); processTransactions(last, aggregate); if (last) { - started.set(true); + started.set(false); transitions.lastBlock(); } } @@ -255,15 +253,11 @@ private void processTransactions(boolean last, List aggregate) { final var txns = aggregate.stream().flatMap(e -> e.getTransactionsList().stream()).toList(); if (txns.isEmpty()) { - var empty = emptyPreBlocks + 1; - emptyPreBlocks = empty; - if (empty % 5 == 0) { - pending.values() - .stream() - .filter(pb -> pb.published.get()) - .max(Comparator.comparing(pb -> pb.block.height())) - .ifPresent(pb -> publish(pb, true)); - } + pending.values() + .stream() + .filter(pb -> pb.published.get()) + .max(Comparator.comparing(pb -> pb.block.height())) + .ifPresent(pb -> 
publish(pb, true)); return; } log.trace("transactions: {} combined hash: {} height: {} on: {}", txns.size(), diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index c6b854426..cf31987be 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -401,6 +401,7 @@ public String toString() { private class Recon implements Reconfiguration { @Override public void certify() { + countdown.set(-1); if (proposals.size() == selected.assembly.size()) { log.info("Certifying: {} majority: {} of: {} slate: {} on: {}", nextViewId, selected.majority, nextViewId, proposals.keySet().stream().sorted().toList(), params().member().getId()); @@ -440,6 +441,7 @@ public void chill() { @Override public void complete() { + countdown.set(-1); ViewAssembly.this.complete(); } @@ -454,12 +456,14 @@ public void convened() { @Override public void failed() { + countdown.set(-1); view.onFailure(); log.debug("Failed view assembly for: {} on: {}", nextViewId, params().member().getId()); } @Override public void finish() { + countdown.set(-1); started.set(false); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java index 93dcbe4a1..788b0db61 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Combine.java @@ -110,6 +110,71 @@ public Transitions rotateViewKeys() { public void failIt() { context().fail(); } + + @Override + public Transitions beginCheckpoint() { + return null; + } + + @Override + public Transitions bootstrap(HashedCertifiedBlock anchor) { + return null; + } + + @Override + public Transitions combine() { + return null; + } + + @Override + public Transitions fail() { + return null; + } + + @Override + public Transitions 
finishCheckpoint() { + return null; + } + + @Override + public Transitions nextView() { + return null; + } + + @Override + public Transitions regenerate() { + return null; + } + + @Override + public Transitions regenerated() { + return null; + } + + @Override + public Transitions rotateViewKeys() { + return null; + } + + @Override + public Transitions start() { + return null; + } + + @Override + public Transitions synchd() { + return null; + } + + @Override + public Transitions synchronizationFailed() { + return null; + } + + @Override + public Transitions synchronizing() { + return null; + } }, RECOVERING { @Override public Transitions bootstrap(HashedCertifiedBlock anchor) { diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java index e917bc716..e2b5a2532 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java @@ -55,8 +55,12 @@ public void gather() { @Override public Transitions nextEpoch(Integer epoch) { - return epoch.equals(0) ? 
null : CERTIFICATION; + return null; + } + @Override + public Transitions gathered() { + return CERTIFICATION; } @Override @@ -80,11 +84,14 @@ public Transitions process(List preblock, boolean last) { return null; } } - } interface Transitions extends FsmExecutor { + default Transitions gathered() { + throw fsm().invalidTransitionOn(); + } + default Transitions nextEpoch(Integer epoch) { throw fsm().invalidTransitionOn(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index 41532259b..8e9dc9a49 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -75,6 +75,11 @@ public Transitions countdownCompleted() { return RECONFIGURED; } + @Override + public Transitions checkAssembly() { + return null; + } + // See if we already have a full complement of Joins of the next committee // if not set a deadline @Entry @@ -160,6 +165,11 @@ public Transitions complete() { return null; } + @Override + public Transitions countdownCompleted() { + return null; + } + // Complete the configuration protocol // The slate of the ViewAssembly now contains // the SignedViewMembers of the next committee diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java index e382e8f5b..881c8af6f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/CheckpointAssembler.java @@ -119,10 +119,10 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) { var ringer = new SliceIterator<>("Assembly[%s:%s]".formatted(diadem.compactWrapped(), member.getId()), member, committee, comms); - ringer.iterate((link, m) -> { + ringer.iterate((link) -> { 
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId()); return gossip(link); - }, (result, link, m) -> gossip(result), () -> { + }, (result, _, _) -> gossip(result), () -> { if (!assembled.isDone()) { scheduler.schedule( () -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(scheduler, duration), log)), diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java index 8053d4878..de81a7712 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java @@ -38,7 +38,6 @@ public class TxDataSource implements DataSource { private final Duration batchInterval; private final AtomicBoolean draining = new AtomicBoolean(); - private final ExponentialBackoffPolicy drainPolicy; private final Member member; private final ChoamMetrics metrics; private final BatchingQueue processing; @@ -47,10 +46,9 @@ public class TxDataSource implements DataSource { private volatile Thread blockingThread; public TxDataSource(Member member, int maxElements, ChoamMetrics metrics, int maxBatchByteSize, - Duration batchInterval, int maxBatchCount, ExponentialBackoffPolicy drainPolicy) { + Duration batchInterval, int maxBatchCount) { this.member = member; this.batchInterval = batchInterval; - this.drainPolicy = drainPolicy; processing = new BatchingQueue(maxElements, maxBatchCount, tx -> tx.toByteString().size(), maxBatchByteSize); this.metrics = metrics; @@ -85,7 +83,7 @@ public ByteString getData() { var v = new ArrayList(); if (draining.get()) { - var target = Instant.now().plus(drainPolicy.nextBackoff()); + var target = Instant.now().plus(batchInterval); while (target.isAfter(Instant.now()) && builder.getAssembliesCount() == 0 && builder.getValidationsCount() == 0) { // rinse and repeat @@ -103,7 +101,7 @@ public ByteString getData() { // sleep waiting 
for input try { - Thread.sleep(drainPolicy.getInitialBackoff().dividedBy(2).toMillis()); + Thread.sleep(batchInterval.dividedBy(2).toMillis()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return ByteString.EMPTY; diff --git a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java index 7cb99d6d1..332da3e8b 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/DynamicTest.java @@ -4,7 +4,6 @@ import com.salesforce.apollo.archipelago.Router; import com.salesforce.apollo.archipelago.ServerConnectionCache; import com.salesforce.apollo.archipelago.UnsafeExecutors; -import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.cryptography.DigestAlgorithm; @@ -86,10 +85,7 @@ public void setUp() throws Exception { .setNumberOfEpochs(3) .setEpochLength(7)) .build()) - .setCheckpointBlockDelta(checkpointBlockSize) - .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() - .setInitialBackoff(Duration.ofMillis(1)) - .setMaxBackoff(Duration.ofMillis(1))); + .setCheckpointBlockDelta(checkpointBlockSize); members.subList(0, 4).forEach(m -> { var context = (DynamicContext) contextBuilder.build(); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java index effd854f5..f9324e628 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/MembershipTests.java @@ -152,7 +152,6 @@ public SigningMember initialize(int checkpointBlockSize, int cardinality) throws .build()) .setGenerateGenesis(true) .setCheckpointBlockDelta(checkpointBlockSize); - 
params.getDrainPolicy().setInitialBackoff(Duration.ofMillis(1)).setMaxBackoff(Duration.ofMillis(1)); params.getProducer().ethereal().setNumberOfEpochs(2).setEpochLength(20); var entropy = SecureRandom.getInstance("SHA1PRNG"); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/support/TxDataSourceTest.java b/choam/src/test/java/com/salesforce/apollo/choam/support/TxDataSourceTest.java index 4f89b2f7f..2583a2bc9 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/support/TxDataSourceTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/support/TxDataSourceTest.java @@ -6,15 +6,6 @@ */ package com.salesforce.apollo.choam.support; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import java.security.SecureRandom; -import java.time.Duration; - -import org.junit.jupiter.api.Test; - import com.google.protobuf.ByteString; import com.salesforce.apollo.choam.proto.Transaction; import com.salesforce.apollo.cryptography.DigestAlgorithm; @@ -22,6 +13,12 @@ import com.salesforce.apollo.stereotomy.StereotomyImpl; import com.salesforce.apollo.stereotomy.mem.MemKERL; import com.salesforce.apollo.stereotomy.mem.MemKeyStore; +import org.junit.jupiter.api.Test; + +import java.security.SecureRandom; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.*; /** * @author hal.hildebrand @@ -34,7 +31,7 @@ public void func() throws Exception { entropy.setSeed(new byte[] { 6, 6, 6 }); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); TxDataSource ds = new TxDataSource(new ControlledIdentifierMember(stereotomy.newIdentifier()), 100, null, 1024, - Duration.ofMillis(100), 100, ExponentialBackoffPolicy.newBuilder().build()); + Duration.ofMillis(100), 100); Transaction tx = Transaction.newBuilder() .setContent(ByteString.copyFromUtf8("Give me food or give 
me slack or kill me")) .build(); diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java index 1e6f868a0..c69097ecb 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java @@ -251,7 +251,7 @@ JohnHancock sign(ULong sequenceNumber, PrivateKey[] privateKeys, InputStream mes public static SignatureAlgorithm fromSignatureCode(int i) { return switch (i) { case 0: - throw new IllegalArgumentException("Unknown signature code: " + i); + yield NULL_SIGNATURE; case 1: yield NULL_SIGNATURE; case 2: diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java index 829b7e2de..60a8efb9a 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java @@ -28,9 +28,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,8 +40,8 @@ import static com.salesforce.apollo.ethereal.memberships.comm.GossiperClient.getCreate; /** - * Handles the gossip propigation of proposals, commits and preVotes from this node, as well as the notification of the - * adder of such from other nodes. + * Handles the gossip propagation of proposals, the commits and preVotes from this node, and the notification of the + * Adder of such events from other nodes. 
* * @author hal.hildebrand */ @@ -59,7 +57,6 @@ public class ChRbcGossip { private final SliceIterator ring; private final AtomicBoolean started = new AtomicBoolean(); private final Terminal terminal = new Terminal(); - private final List membership; private volatile ScheduledFuture scheduled; public ChRbcGossip(Digest id, SigningMember member, Collection membership, Processor processor, @@ -68,12 +65,10 @@ public ChRbcGossip(Digest id, SigningMember member, Collection membershi this.member = member; this.metrics = m; this.id = id; - this.membership = new ArrayList<>(membership); comm = communications.create(member, id, terminal, getClass().getCanonicalName(), r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r), getCreate(metrics), Gossiper.getLocalLoopback(member)); - ring = new SliceIterator("ChRbcGossip[%s on: %s]".formatted(id, member.getId()), member, membership, - comm); + ring = new SliceIterator<>("ChRbcGossip[%s on: %s]".formatted(id, member.getId()), member, membership, comm); } /** @@ -128,7 +123,7 @@ private void gossip(Duration frequency, ScheduledExecutorService scheduler) { return; } var timer = metrics == null ? 
null : metrics.gossipRoundDuration().time(); - ring.iterate((link, _) -> gossipRound(link), (result, link, _) -> { + ring.iterate((link) -> gossipRound(link), (result, _, link) -> { handle(result, link); return true; }, () -> { @@ -150,7 +145,7 @@ private Update gossipRound(Gossiper link) { if (!started.get()) { return null; } - log.trace("gossiping[{}] with {} on {}", id, link.getMember(), member); + log.trace("gossiping[{}] with {} on {}", id, link.getMember(), member.getId()); try { return link.gossip(processor.gossip(id)); } catch (StatusRuntimeException e) { diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index adaf5837c..6ac384ac8 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -97,10 +97,10 @@ void seeding() { var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); reseed.set(() -> { final var registration = registration(); - seedlings.iterate((link, m) -> { + seedlings.iterate((link) -> { log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), node.getId()); return link.seed(registration); - }, (futureSailor, link, m) -> complete(redirect, futureSailor, m), () -> { + }, (futureSailor, _, link) -> complete(redirect, futureSailor, link.getMember()), () -> { if (!redirect.isDone()) { scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reseed.get(), log)), params.retryDelay().toNanos(), TimeUnit.NANOSECONDS); @@ -273,7 +273,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { if (!view.started.get()) { return; } - redirecting.iterate((link, m) -> { + redirecting.iterate((link) -> { if (gateway.isDone() || !view.started.get()) { return null; } @@ -295,7 +295,7 @@ private void join(Redirect redirect, Digest v, Duration duration) { abandon.incrementAndGet(); return null; } - 
}, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts, + }, (futureSailor, _, link) -> completeGateway((Participant) link.getMember(), gateway, futureSailor, trusts, initialSeedSet, v, majority), () -> { if (!view.started.get() || gateway.isDone()) { return; diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index b8f0c2f83..c1bbc5aad 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -16,9 +16,9 @@ import com.salesforce.apollo.archipelago.Router.ServiceRouting; import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.bloomFilters.BloomFilter; -import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; import com.salesforce.apollo.context.DynamicContextImpl; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.*; import com.salesforce.apollo.cryptography.proto.Biff; import com.salesforce.apollo.fireflies.Binding.Bound; @@ -58,6 +58,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -82,34 +83,32 @@ * @since 220 */ public class View { - private static final String FINALIZE_VIEW_CHANGE = "FINALIZE VIEW CHANGE"; - private static final Logger log = LoggerFactory.getLogger( - View.class); - private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change"; - final CommonCommunications comm; - final AtomicBoolean started = new AtomicBoolean(); - private final CommonCommunications approaches; - private final DynamicContext context; - private final DigestAlgorithm digestAlgo; - private final 
RingCommunications gossiper; - private final AtomicBoolean introduced = new AtomicBoolean(); - private final List> viewChangeListeners = new CopyOnWriteArrayList<>(); - private final Executor viewNotificationQueue = Executors.newSingleThreadExecutor( - Thread.ofVirtual().factory()); - private final FireflyMetrics metrics; - private final Node node; - private final Map observations = new ConcurrentSkipListMap<>(); - private final Parameters params; - private final ConcurrentMap pendingRebuttals = new ConcurrentSkipListMap<>(); - private final RoundScheduler roundTimers; - private final Set shunned = new ConcurrentSkipListSet<>(); - private final Map timers = new HashMap<>(); - private final ReadWriteLock viewChange = new ReentrantReadWriteLock( - true); - private final ViewManagement viewManagement; - private final EventValidation validation; - private final Verifiers verifiers; - private volatile ScheduledFuture futureGossip; + private static final String FINALIZE_VIEW_CHANGE = "FINALIZE VIEW CHANGE"; + private static final Logger log = LoggerFactory.getLogger(View.class); + private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change"; + + final CommonCommunications comm; + final AtomicBoolean started = new AtomicBoolean(); + private final CommonCommunications approaches; + private final DynamicContext context; + private final DigestAlgorithm digestAlgo; + private final RingCommunications gossiper; + private final AtomicBoolean introduced = new AtomicBoolean(); + private final Map> viewChangeListeners = new HashMap<>(); + private final Executor viewNotificationQueue; + private final FireflyMetrics metrics; + private final Node node; + private final Map observations = new ConcurrentSkipListMap<>(); + private final Parameters params; + private final ConcurrentMap pendingRebuttals = new ConcurrentSkipListMap<>(); + private final RoundScheduler roundTimers; + private final Set shunned = new ConcurrentSkipListSet<>(); + private final Map timers = new 
HashMap<>(); + private final ReadWriteLock viewChange; + private final ViewManagement viewManagement; + private final EventValidation validation; + private final Verifiers verifiers; + private volatile ScheduledFuture futureGossip; public View(DynamicContext context, ControlledIdentifierMember member, String endpoint, EventValidation validation, Verifiers verifiers, Router communications, Parameters params, @@ -141,6 +140,8 @@ public View(DynamicContext context, ControlledIdentifierMember memb gossiper.ignoreSelf(); this.validation = validation; this.verifiers = verifiers; + viewNotificationQueue = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory()); + viewChange = new ReentrantReadWriteLock(true); } /** @@ -170,7 +171,7 @@ public static boolean isValidMask(BitSet mask, DynamicContext context) { /** * Deregister the listener */ - public void deregister(BiConsumer listener) { + public void deregister(Consumer listener) { viewChangeListeners.remove(listener); } @@ -191,8 +192,8 @@ public Digest getNodeId() { /** * Register the listener to receive view changes */ - public void register(BiConsumer listener) { - viewChangeListeners.add(listener); + public void register(String key, Consumer listener) { + viewChangeListeners.put(key, listener); } /** @@ -343,17 +344,16 @@ void finalizeViewChange() { return; } viewChange(() -> { - final var superMajority = - context.size() == 1 ? 1 : context.getRingCount() - ((context.getRingCount() - 1) / 4); - if (observations.size() < superMajority) { - log.trace("Do not have superMajority: {} required: {} observers: {} for: {} on: {}", - observations.size(), superMajority, viewManagement.observersList(), currentView(), - node.getId()); + final var supermajority = (context.getRingCount() * 3 / 4) + 1; + final var majority = context.size() == 1 ? 
1 : supermajority; + if (observations.size() < majority) { + log.trace("Do not have majority: {} required: {} observers: {} for: {} on: {}", observations.size(), + majority, viewManagement.observersList(), currentView(), node.getId()); scheduleFinalizeViewChange(2); return; } - log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), - superMajority, viewManagement.observersList(), currentView(), node.getId()); + log.info("Finalizing view change: {} required: {} observers: {} for: {} on: {}", context.getId(), majority, + viewManagement.observersList(), currentView(), node.getId()); HashMultiset ballots = HashMultiset.create(); final var current = currentView(); observations.values() @@ -374,15 +374,15 @@ void finalizeViewChange() { .stream() .max(Ordering.natural().onResultOf(Multiset.Entry::getCount)) .orElse(null); - if (max != null && max.getCount() >= superMajority) { - log.info("View consensus successful: {} required: {} cardinality: {} for: {} on: {}", max, - superMajority, viewManagement.cardinality(), currentView(), node.getId()); + if (max != null && max.getCount() >= majority) { + log.info("View consensus successful: {} required: {} cardinality: {} for: {} on: {}", max, majority, + viewManagement.cardinality(), currentView(), node.getId()); viewManagement.install(max.getElement()); } else { @SuppressWarnings("unchecked") final var reversed = Comparator.comparing(e -> ((Entry) e).getCount()).reversed(); log.info("View consensus failed: {}, required: {} cardinality: {} ballots: {} for: {} on: {}", - observations.size(), superMajority, viewManagement.cardinality(), + observations.size(), majority, viewManagement.cardinality(), ballots.entrySet().stream().sorted(reversed).toList(), currentView(), node.getId()); } @@ -418,16 +418,18 @@ void introduced() { } void notifyListeners(List joining, List leaving) { - final var current = currentView(); - var sc = context.asStatic(); + final var viewChange = new 
ViewChange(context.asStatic(), currentView(), + joining.stream().map(SelfAddressingIdentifier::getDigest).toList(), + Collections.unmodifiableList(leaving)); viewNotificationQueue.execute(Utils.wrapped(() -> { - viewChangeListeners.forEach(listener -> { + viewChangeListeners.entrySet().forEach(entry -> { try { - log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", listener, - currentView(), context.size(), joining.size(), leaving.size(), node.getId()); - listener.accept(sc, current); + log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", + entry.getKey(), currentView(), context.size(), joining.size(), leaving.size(), + node.getId()); + entry.getValue().accept(viewChange); } catch (Throwable e) { - log.error("error in view change listener: {} on: {} ", listener, node.getId(), e); + log.error("error in view change listener: {} on: {} ", entry.getKey(), node.getId(), e); } }); }, log)); @@ -609,14 +611,13 @@ void viewChange(Runnable r) { protected Gossip gossip(Fireflies link, int ring) { tick(); if (shunned.contains(link.getMember().getId())) { - log.trace("Shunning gossip view: {} with: {} on: {}", currentView(), link.getMember().getId(), - node.getId()); if (metrics != null) { metrics.shunnedGossip().mark(); } return null; } + final var p = (Participant) link.getMember(); final SayWhat gossip = stable(() -> SayWhat.newBuilder() .setView(currentView().toDigeste()) .setNote(node.getNote().getWrapped()) @@ -625,42 +626,42 @@ protected Gossip gossip(Fireflies link, int ring) { .build()); try { return link.gossip(gossip); - } catch (Throwable e) { - final var p = (Participant) link.getMember(); - if (e instanceof StatusRuntimeException sre) { - switch (sre.getStatus().getCode()) { - case PERMISSION_DENIED: - log.trace("Rejected gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), - node.getId()); - break; - case FAILED_PRECONDITION: - log.trace("Failed gossip: {} view: {} 
from: {} on: {}", sre.getStatus(), currentView(), p.getId(), - node.getId()); - break; - case RESOURCE_EXHAUSTED: - log.trace("Resource exhausted for gossip: {} view: {} from: {} on: {}", sre.getStatus(), - currentView(), p.getId(), node.getId()); - break; - case CANCELLED: - log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(), - node.getId()); - break; - case UNAVAILABLE: - accuse(p, ring, sre); - break; - default: - log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), - node.getId()); - accuse(p, ring, sre); - break; + } catch (StatusRuntimeException sre) { + switch (sre.getStatus().getCode()) { + case PERMISSION_DENIED: + log.trace("Rejected gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), + node.getId()); + break; + case FAILED_PRECONDITION: + log.trace("Failed gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), + node.getId()); + break; + case RESOURCE_EXHAUSTED: + log.trace("Resource exhausted for gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), + p.getId(), node.getId()); + break; + case CANCELLED: + log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(), + node.getId()); + break; + case UNAVAILABLE: + log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(), + node.getId(), sre); + accuse(p, ring, sre); + break; + default: + log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), + node.getId()); + accuse(p, ring, sre); + break; - } - } else { - log.debug("Exception gossiping joined: {} with: {} view: {} on: {}", viewManagement.joined(), p.getId(), - currentView(), node.getId(), e); - accuse(p, ring, e); } return null; + } catch (Throwable e) { + log.debug("Exception gossiping joined: {} with: {} view: {} on: {}", viewManagement.joined(), p.getId(), + 
currentView(), node.getId(), e); + accuse(p, ring, e); + return null; } } @@ -678,7 +679,7 @@ private void accuse(Participant member, int ring, Throwable e) { member.addAccusation(node.accuse(member, ring)); pendingRebuttals.computeIfAbsent(member.getId(), d -> roundTimers.schedule(() -> gc(member), params.rebuttalTimeout())); - log.debug("Accuse {} on ring {} view: {} (timer started): {} on: {}", member.getId(), ring, currentView(), + log.debug("Accuse: {} on ring: {} view: {} (timer started): {} on: {}", member.getId(), ring, currentView(), e.getMessage(), node.getId()); } @@ -1102,6 +1103,8 @@ private void gossip(Optional result, RingCommunications.Destination value.verify(signature, message)).orElse(false); } private boolean verify(SelfAddressingIdentifier id, SigningThreshold threshold, JohnHancock signature, InputStream message) { - var verifier = verifiers.verifierFor(id); - if (verifier.isEmpty()) { - return false; - } - return verifier.get().verify(threshold, signature, message); + return verifiers.verifierFor(id).map(value -> value.verify(threshold, signature, message)).orElse(false); } public record Seed(SelfAddressingIdentifier identifier, String endpoint) { @@ -1469,6 +1464,7 @@ public Node(ControlledIdentifierMember wrapped, String endpoint) { .setSignature(wrapped.sign(n.toByteString()).toSig()) .build(); note = new NoteWrapper(signedNote, digestAlgo); + log.info("Endpoint: {} on: {}", endpoint, wrapped.getId()); } /** @@ -1858,7 +1854,7 @@ public void join(Join join, Digest from, StreamObserver responseObserve /** * The first message in the anti-entropy protocol. Process any digests from the inbound gossip digest. Respond * with the Gossip that represents the digests newer or not known in this view, as well as updates from this - * node based on out of date information in the supplied digests. + * node based on out-of-date information in the supplied digests. 
* * @param request - the Gossip from our partner * @return Teh response for Moar gossip - updates this node has which the sender is out of touch with, and diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java index 3104ab20d..8055e32ff 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/ViewManagement.java @@ -15,12 +15,10 @@ import com.salesforce.apollo.fireflies.Binding.Bound; import com.salesforce.apollo.fireflies.View.Node; import com.salesforce.apollo.fireflies.View.Participant; -import com.salesforce.apollo.fireflies.comm.gossip.Fireflies; import com.salesforce.apollo.fireflies.proto.*; import com.salesforce.apollo.fireflies.proto.Update.Builder; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.ReservoirSampler; -import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; import com.salesforce.apollo.utils.Entropy; import com.salesforce.apollo.utils.Utils; @@ -32,7 +30,10 @@ import java.time.Duration; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -374,6 +375,12 @@ boolean joined() { * start a view change if there are any offline members or joining members */ void maybeViewChange() { + if (context.size() == 1 && joins.size() < context.getRingCount() - 1) { + log.info("Do not have minimum cluster size: {} required: {} for: {} on: {}", joins.size() + context.size(), + 4, currentView(), node.getId()); + 
view.scheduleViewChange(); + return; + } if ((context.offlineCount() > 0 || !joins.isEmpty())) { if (isObserver()) { log.trace("Initiating view change: {} (non observer) joins: {} leaves: {} on: {}", currentView(), @@ -403,31 +410,6 @@ List observersList() { return observers().stream().toList(); } - void populate(List sample) { - var populate = new SliceIterator("Populate: " + context.getId(), node, sample, view.comm); - var repopulate = new AtomicReference(); - var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - repopulate.set(() -> populate.iterate((link, m) -> { - return view.gossip(link, 0); - }, (futureSailor, link, m) -> { - futureSailor.ifPresent(g -> { - if (!g.getRedirect().equals(SignedNote.getDefaultInstance())) { - final Participant member = (Participant) link.getMember(); - view.stable(() -> view.redirect(member, g, 0)); - } else { - view.stable(() -> view.processUpdates(g)); - } - }); - return !joined(); - }, () -> { - if (!joined()) { - scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(repopulate.get(), log)), - params.populateDuration().toNanos(), TimeUnit.NANOSECONDS); - } - }, params.populateDuration())); - repopulate.get().run(); - } - JoinGossip.Builder processJoins(BloomFilter bff) { JoinGossip.Builder builder = JoinGossip.newBuilder(); @@ -609,7 +591,6 @@ private void resetObservers() { if (observers.size() > 1 && observers.size() < context.getRingCount()) { log.debug("Incomplete observers: {} cardinality: {} view: {} context: {} on: {}", observers.size(), context.cardinality(), currentView(), context.getId(), node.getId()); - assert observers.size() > 1 && observers.size() < context.getRingCount(); } log.trace("Reset observers: {} cardinality: {} view: {} context: {} on: {}", observers.size(), context.cardinality(), currentView(), context.getId(), node.getId()); diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java 
b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java index 937c584b5..3c977ab52 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java @@ -163,11 +163,11 @@ private SignedNonce generateNonce(KERL_ application) { final var redirecting = new SliceIterator<>("Nonce Endorsement", member, successors, endorsementComm); Set endorsements = Collections.newSetFromMap(new ConcurrentHashMap<>()); var generated = new CompletableFuture(); - redirecting.iterate((link, m) -> { + redirecting.iterate((link) -> { log.info("Request signing nonce for: {} contacting: {} on: {}", identifier, link.getMember().getId(), member.getId()); return link.endorse(nonce, parameters.registrationTimeout()); - }, (futureSailor, link, m) -> completeEndorsement(futureSailor, m, endorsements), () -> { + }, (futureSailor, _, link) -> completeEndorsement(futureSailor, link.getMember(), endorsements), () -> { if (endorsements.size() < majority) { generated.completeExceptionally(new StatusRuntimeException(Status.ABORTED.withDescription( "Cannot gather required nonce endorsements: %s required: %s on: %s".formatted(endorsements.size(), @@ -208,7 +208,7 @@ private Identifier identifier(KERL_ kerl) { return null; } - private void notarize(Credentials credentials, Validations validations) { + private CompletableFuture notarize(Credentials credentials, Validations validations) { final var kerl = credentials.getAttestation().getAttestation().getKerl(); final var identifier = identifier(kerl); if (identifier == null) { @@ -224,15 +224,21 @@ private void notarize(Credentials credentials, Validations validations) { final var majority = context.size() == 1 ? 
1 : context.majority(); SliceIterator redirecting = new SliceIterator<>("Enrollment", member, successors, endorsementComm); var completed = new HashSet(); - redirecting.iterate((link, m) -> { + var result = new CompletableFuture(); + redirecting.iterate((link) -> { log.info("Enrolling: {} contacting: {} on: {}", identifier, link.getMember().getId(), member.getId()); link.enroll(notarization, parameters.registrationTimeout()); return Empty.getDefaultInstance(); - }, (futureSailor, link, m) -> completeEnrollment(futureSailor, m, completed), () -> { + }, (futureSailor, _, link) -> completeEnrollment(futureSailor, link.getMember(), completed), () -> { if (completed.size() < majority) { - throw new StatusRuntimeException(Status.ABORTED.withDescription("Cannot complete enrollment")); + var sre = new StatusRuntimeException(Status.ABORTED.withDescription("Cannot complete enrollment")); + result.completeExceptionally(sre); + throw sre; + } else { + result.complete(validations); } }, parameters.frequency()); + return result; } private Validations register(Credentials request) { @@ -250,11 +256,11 @@ private Validations register(Credentials request) { final var majority = context.size() == 1 ? 
1 : context.majority(); final var redirecting = new SliceIterator<>("Credential verification", member, successors, endorsementComm); var verifications = new HashSet(); - redirecting.iterate((link, m) -> { + redirecting.iterate((link) -> { log.debug("Validating credentials for: {} contacting: {} on: {}", identifier, link.getMember().getId(), member.getId()); return link.validate(request, parameters.registrationTimeout()); - }, (futureSailor, link, m) -> completeVerification(futureSailor, m, verifications), () -> { + }, (futureSailor, _, link) -> completeVerification(futureSailor, link.getMember(), verifications), () -> { if (verifications.size() < majority) { throw new StatusRuntimeException( Status.ABORTED.withDescription("Cannot gather required credential validations")); @@ -272,15 +278,12 @@ private Validations register(Credentials request) { } }, parameters.frequency()); try { - return validated.thenApply(v -> { - notarize(request, v); - return v; - }).get(); + return validated.thenCompose(v -> notarize(request, v)).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { - throw new RuntimeException(e); + throw new StatusRuntimeException(Status.INTERNAL.withCause(e.getCause())); } } diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/comm/endorsement/Endorsement.java b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/comm/endorsement/Endorsement.java index 24cad162e..c4bd93f0a 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/comm/endorsement/Endorsement.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/comm/endorsement/Endorsement.java @@ -6,15 +6,13 @@ */ package com.salesforce.apollo.gorgoneion.comm.endorsement; -import com.google.common.util.concurrent.SettableFuture; -import com.google.protobuf.Empty; +import com.salesforce.apollo.archipelago.Link; import com.salesforce.apollo.gorgoneion.proto.Credentials; import 
com.salesforce.apollo.gorgoneion.proto.MemberSignature; import com.salesforce.apollo.gorgoneion.proto.Nonce; import com.salesforce.apollo.gorgoneion.proto.Notarization; -import com.salesforce.apollo.stereotomy.event.proto.Validation_; -import com.salesforce.apollo.archipelago.Link; import com.salesforce.apollo.membership.Member; +import com.salesforce.apollo.stereotomy.event.proto.Validation_; import java.io.IOException; import java.time.Duration; @@ -32,13 +30,11 @@ public void close() throws IOException { @Override public MemberSignature endorse(Nonce nonce, Duration timer) { - SettableFuture f = SettableFuture.create(); return service.endorse(nonce, member.getId()); } @Override public void enroll(Notarization notarization, Duration timeout) { - SettableFuture f = SettableFuture.create(); service.enroll(notarization, member.getId()); } @@ -49,7 +45,6 @@ public Member getMember() { @Override public Validation_ validate(Credentials credentials, Duration timeout) { - SettableFuture f = SettableFuture.create(); return service.validate(credentials, member.getId()); } }; diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/EndpointProvider.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/EndpointProvider.java index b3fb1b0b8..b5190c6bb 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/EndpointProvider.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/EndpointProvider.java @@ -11,22 +11,32 @@ import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.utils.Utils; import io.netty.handler.ssl.ClientAuth; +import org.slf4j.LoggerFactory; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.UnknownHostException; /** * @author hal.hildebrand */ public interface EndpointProvider { static String allocatePort() { - var addr = new InetSocketAddress(Utils.allocatePort()); + InetSocketAddress addr = null; + try { + addr = 
new InetSocketAddress(InetAddress.getLocalHost(), Utils.allocatePort(InetAddress.getLocalHost())); + } catch (UnknownHostException e) { + throw new IllegalStateException("Cannot resolve localhost!", e); + } return HostAndPort.fromParts(addr.getHostName(), addr.getPort()).toString(); } static T reify(String encoded) { var hnp = HostAndPort.fromString(encoded); - return (T) new InetSocketAddress(hnp.getHost(), hnp.getPort()); + var inetSocketAddress = new InetSocketAddress(hnp.getHost(), hnp.getPort()); + LoggerFactory.getLogger(EndpointProvider.class).trace("Resolved host: {}", inetSocketAddress); + return (T) inetSocketAddress; } SocketAddress addressFor(Member to); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/StandardEpProvider.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/StandardEpProvider.java index 6cd96f10e..eafc818f5 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/StandardEpProvider.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/StandardEpProvider.java @@ -25,7 +25,12 @@ public class StandardEpProvider implements EndpointProvider { public StandardEpProvider(String bindAddress, ClientAuth clientAuth, CertificateValidator validator, Function resolver) { - this.bindAddress = EndpointProvider.reify(bindAddress); + this(EndpointProvider.reify(bindAddress), clientAuth, validator, resolver); + } + + public StandardEpProvider(SocketAddress bindAddress, ClientAuth clientAuth, CertificateValidator validator, + Function resolver) { + this.bindAddress = bindAddress; this.clientAuth = clientAuth; this.validator = validator; this.resolver = resolver; diff --git a/memberships/src/main/java/com/salesforce/apollo/context/ViewChange.java b/memberships/src/main/java/com/salesforce/apollo/context/ViewChange.java new file mode 100644 index 000000000..b1cdc4d8c --- /dev/null +++ b/memberships/src/main/java/com/salesforce/apollo/context/ViewChange.java @@ -0,0 +1,8 @@ +package 
com.salesforce.apollo.context; + +import com.salesforce.apollo.cryptography.Digest; + +import java.util.Collection; + +public record ViewChange(Context context, Digest diadem, Collection joining, Collection leaving) { +} diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java index c582c09e3..4ee531173 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/RingIterator.java @@ -171,16 +171,17 @@ private void proceed(Digest key, final boolean allow, Runnable onMajority, Runna if (!finalIteration) { log.trace( "Determining: {} continuation of: {} for digest: {} tally: {} majority: {} final itr: {} allow: {} on: {}", - current, key, context.getId(), tally.get(), context.majority(), finalIteration, allow, member.getId()); + current, key, context.getId(), tally.get(), context.toleranceLevel() + 1, finalIteration, allow, + member.getId()); } if (finalIteration && allow) { log.trace("Completing iteration: {} of: {} for digest: {} tally: {} on: {}", iteration(), key, context.getId(), tally.get(), member.getId()); if (failedMajority != null && !majorityFailed) { - if (tally.get() < context.majority()) { + if (tally.get() < context.toleranceLevel() + 1) { majorityFailed = true; log.debug("Failed to obtain majority of: {} for digest: {} tally: {} required: {} on: {}", key, - context.getId(), tally.get(), context.majority(), member.getId()); + context.getId(), tally.get(), context.toleranceLevel() + 1, member.getId()); failedMajority.run(); } } @@ -192,7 +193,7 @@ private void proceed(Digest key, final boolean allow, Runnable onMajority, Runna member.getId()); } else { if (onMajority != null && !majoritySucceed) { - if (tally.get() >= context.majority()) { + if (tally.get() > context.toleranceLevel()) { majoritySucceed = true; log.debug("Obtained: {} majority of: {} for digest: {} tally: {} 
on: {}", current, key, context.getId(), tally.get(), member.getId()); diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java index ba283f25f..bc7733cda 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java @@ -23,8 +23,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.function.Function; /** * @author hal.hildebrand @@ -37,8 +38,12 @@ public class SliceIterator { private final SigningMember member; private final List slice; private final ScheduledExecutorService scheduler; + private final int majority; private volatile Member current; private volatile Iterator currentIteration; + private volatile int i; + private volatile boolean majoritySucceed = false; + private volatile boolean majorityFailed; public SliceIterator(String label, SigningMember member, Collection slice, CommonCommunications comm) { @@ -47,6 +52,11 @@ public SliceIterator(String label, SigningMember member, Collection s, CommonCommunications comm, ScheduledExecutorService scheduler) { + this(label, member, s, comm, scheduler, -1); + } + + public SliceIterator(String label, SigningMember member, Collection s, + CommonCommunications comm, ScheduledExecutorService scheduler, int majority) { assert member != null && s != null && comm != null; assert !s.stream().filter(Objects::nonNull).toList().isEmpty() : "All elements must be non-null: " + s; this.label = label; @@ -54,46 +64,57 @@ public SliceIterator(String label, SigningMember member, Collection(s); this.comm = comm; this.scheduler = scheduler; + this.majority = majority; Entropy.secureShuffle(this.slice); 
this.currentIteration = slice.iterator(); log.debug("Slice for: <{}> is: {} on: {}", label, slice.stream().map(Member::getId).toList(), member.getId()); } - public void iterate(BiFunction round, SlicePredicateHandler handler, - Runnable onComplete, Duration frequency) { + public void iterate(Function round, SlicePredicateHandler handler, Runnable onComplete, + Duration frequency) { + iterate(null, round, handler, onComplete, frequency, null); + } + + public void iterate(Runnable onMajority, Function round, SlicePredicateHandler handler, + Runnable onComplete, Duration frequency, Runnable failedMajority) { log.trace("Starting iteration of: <{}> on: {}", label, member.getId()); - Thread.ofVirtual().start(Utils.wrapped(() -> internalIterate(round, handler, onComplete, frequency), log)); + var tally = new AtomicInteger(0); + Thread.ofVirtual() + .start(Utils.wrapped( + () -> internalIterate(round, onMajority, handler, onComplete, tally, failedMajority, frequency), log)); } - public void iterate(BiFunction round, SlicePredicateHandler handler, - Duration frequency) { + public void iterate(Function round, SlicePredicateHandler handler, Duration frequency) { iterate(round, handler, null, frequency); } - private void internalIterate(BiFunction round, SlicePredicateHandler handler, - Runnable onComplete, Duration frequency) { - Runnable proceed = () -> internalIterate(round, handler, onComplete, frequency); + private void internalIterate(Function round, Runnable onMajority, + SlicePredicateHandler handler, Runnable onComplete, AtomicInteger tally, + Runnable failedMajority, Duration frequency) { + Runnable proceed = () -> internalIterate(round, onMajority, handler, onComplete, tally, onMajority, frequency); - Consumer allowed = allow -> proceed(allow, proceed, onComplete, frequency); + var c = i + 1; + i = c; + Consumer allowed = allow -> proceed(onMajority, allow, proceed, tally, onComplete, frequency, + failedMajority); try (Comm link = next()) { if (link == null || 
link.getMember() == null) { - log.trace("No link for iteration of: <{}> on: {}", label, member.getId()); - allowed.accept( - handler.handle(Optional.empty(), link, slice.isEmpty() ? null : slice.get(slice.size() - 1))); + log.trace("No link for iteration: {} of: <{}> on: {}", c, label, member.getId()); + allowed.accept(handler.handle(Optional.empty(), tally, link)); return; } - log.trace("Iteration of: <{}> to: {} on: {}", label, link.getMember().getId(), member.getId()); + log.trace("Iteration: {} of: <{}> to: {} on: {}", c, label, link.getMember().getId(), member.getId()); T result = null; try { - result = round.apply(link, link.getMember()); + result = round.apply(link); } catch (StatusRuntimeException e) { - log.trace("Error: {} applying: <{}> slice to: {} on: {}", e, label, link.getMember().getId(), - member.getId()); + log.trace("Error: {} applying: <{}> slice to: {} iteration: {} on: {}", e, label, + link.getMember().getId(), c, member.getId()); } catch (Throwable e) { - log.error("Unhandled: {} applying: <{}> slice to: {} on: {}", e, label, link.getMember().getId(), - member.getId()); + log.error("Unhandled: {} applying: <{}> slice to: {} iteration: {} on: {}", e, label, + link.getMember().getId(), c, member.getId()); } - allowed.accept(handler.handle(Optional.ofNullable(result), link, link.getMember())); + allowed.accept(handler.handle(Optional.ofNullable(result), tally, link)); } catch (IOException e) { log.debug("Error closing", e); } @@ -121,16 +142,33 @@ private Comm next() { return linkFor(current); } - private void proceed(final boolean allow, Runnable proceed, Runnable onComplete, Duration frequency) { + private void proceed(Runnable onMajority, final boolean allow, Runnable proceed, AtomicInteger tally, + Runnable onComplete, Duration frequency, Runnable failedMajority) { log.trace("Determining continuation for: <{}> final itr: {} allow: {} on: {}", label, !currentIteration.hasNext(), allow, member.getId()); if (!currentIteration.hasNext() && 
allow) { + if (failedMajority != null && !majorityFailed) { + if (tally.get() < majority) { + majorityFailed = true; + log.debug("Failed to obtain majority for: {} tally: {} required: {} on: {}", label, tally.get(), + majority, member.getId()); + failedMajority.run(); + } + } log.trace("Final iteration of: <{}> on: {}", label, member.getId()); if (onComplete != null) { log.trace("Completing iteration for: {} on: {}", label, member.getId()); onComplete.run(); } } else if (allow) { + if (onMajority != null && !majoritySucceed) { + if (tally.get() >= majority) { + majoritySucceed = true; + log.debug("Obtained: {} majority of: {} tally: {} on: {}", current, label, tally.get(), + member.getId()); + onMajority.run(); + } + } log.trace("Proceeding for: <{}> on: {}", label, member.getId()); scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(proceed, log)), frequency.toNanos(), TimeUnit.NANOSECONDS); @@ -141,6 +179,6 @@ private void proceed(final boolean allow, Runnable proceed, Runnable onComplete, @FunctionalInterface public interface SlicePredicateHandler { - boolean handle(Optional result, Comm communications, Member member); + boolean handle(Optional result, AtomicInteger tally, Comm communications); } } diff --git a/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java b/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java index b0abd0790..048980797 100644 --- a/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/ring/SliceIteratorTest.java @@ -94,10 +94,9 @@ public Any ping(Any request) { var slice = new SliceIterator("Test Me", serverMember1, Arrays.asList(serverMember1, serverMember2), commsA); var countdown = new CountDownLatch(1); - slice.iterate((link, member) -> link.ping(Any.getDefaultInstance()), (result, comms, member) -> true, - () -> { - countdown.countDown(); - }, Duration.ofMillis(1)); + slice.iterate((link) -> 
link.ping(Any.getDefaultInstance()), (_, _, _) -> true, () -> { + countdown.countDown(); + }, Duration.ofMillis(1)); boolean finished = countdown.await(3, TimeUnit.SECONDS); assertTrue(finished, "completed: " + countdown.getCount()); assertTrue(pinged1.get()); diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index 51645eded..28bd6f841 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -10,6 +10,7 @@ import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContext; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.SignatureAlgorithm; @@ -32,6 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.RejectedExecutionException; import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * The logical domain of the current "Process" - OS and Simulation defined, 'natch. 
@@ -51,7 +53,7 @@ public class ProcessDomain extends Domain { private final Verifiers.DelegatedVerifiers verifiers; private final ProcessDomainParameters parameters; private final List> lifecycleListeners = new CopyOnWriteArrayList<>(); - private final BiConsumer listener = listener(); + private final Consumer listener = listener(); public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams, Builder builder, Parameters.RuntimeParameters.Builder runtime, String endpoint, @@ -70,7 +72,7 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDom verifiers = new Verifiers.DelegatedVerifiers(Verifiers.NONE); this.foundation = new View(base, getMember(), endpoint, validations, verifiers, params.communications(), ff.build(), DigestAlgorithm.DEFAULT, null); - foundation.register(listener); + foundation.register("ProcessDomain[%s]".formatted(member.getId()), listener); } public KerlDHT getDht() { @@ -118,12 +120,15 @@ public void stop() { } } - protected BiConsumer listener() { - return (context, diadem) -> { - choam.rotateViewKeys(context, diadem); + protected Consumer listener() { + return (viewChange) -> { + log.info("Start view change: {} for: {} cardinality: {} on: {}", viewChange.diadem(), + params.context().getId(), viewChange.context().size(), params.member().getId()); + choam.rotateViewKeys(viewChange); + dht.nextView(viewChange); - log.info("View change: {} for: {} cardinality: {} on: {}", diadem, params.context().getId(), context.size(), - params.member().getId()); + log.info("Finished view change: {} for: {} cardinality: {} on: {}", viewChange.diadem(), + params.context().getId(), viewChange.context().size(), params.member().getId()); }; } diff --git a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java index 641c211c8..e8b6982c2 100644 --- 
a/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/ContainmentDomainTest.java @@ -11,7 +11,6 @@ import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.FoundationSeal; -import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.DynamicContextImpl; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; @@ -116,33 +115,28 @@ public void smoke() throws Exception { .filter(c -> !c.active()) .map(Domain::logState) .toList())); - var oracle = domains.get(0).getDelphi(); + var oracle = domains.getFirst().getDelphi(); oracle.add(new Oracle.Namespace("test")).get(); DomainTest.smoke(oracle); } private Builder params() { - var template = Parameters.newBuilder() - .setGenerateGenesis(true) - .setGenesisViewId(GENESIS_VIEW_ID) - .setBootstrap(Parameters.BootstrapParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(20)) - .build()) - .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) - .setGossipDuration(Duration.ofMillis(20)) - .setProducer(Parameters.ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(20)) - .setBatchInterval(Duration.ofMillis(50)) - .setMaxBatchByteSize(1024 * 1024) - .setMaxBatchCount(10_000) - .setEthereal(Config.newBuilder() - .setNumberOfEpochs(12) - .setEpochLength(33)) - .build()) - .setCheckpointBlockDelta(200) - .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() - .setInitialBackoff(Duration.ofMillis(1)) - .setMaxBackoff(Duration.ofMillis(1))); - return template; + return Parameters.newBuilder() + .setGenerateGenesis(true) + .setGenesisViewId(GENESIS_VIEW_ID) + .setBootstrap( + Parameters.BootstrapParameters.newBuilder().setGossipDuration(Duration.ofMillis(20)).build()) + .setGenesisViewId(DigestAlgorithm.DEFAULT.getOrigin()) + 
.setGossipDuration(Duration.ofMillis(20)) + .setProducer(Parameters.ProducerParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(20)) + .setBatchInterval(Duration.ofMillis(50)) + .setMaxBatchByteSize(1024 * 1024) + .setMaxBatchCount(10_000) + .setEthereal(Config.newBuilder() + .setNumberOfEpochs(12) + .setEpochLength(33)) + .build()) + .setCheckpointBlockDelta(200); } } diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 122956188..ff25f7d56 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -11,7 +11,6 @@ import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.FoundationSeal; -import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; import com.salesforce.apollo.context.DynamicContextImpl; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; @@ -297,10 +296,7 @@ private Builder params() { .setNumberOfEpochs(12) .setEpochLength(33)) .build()) - .setCheckpointBlockDelta(200) - .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() - .setInitialBackoff(Duration.ofMillis(1)) - .setMaxBackoff(Duration.ofMillis(1))); + .setCheckpointBlockDelta(200); return template; } } diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index 867d8d4d0..a11187f26 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -11,9 +11,8 @@ import com.salesforce.apollo.choam.Parameters.Builder; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.proto.FoundationSeal; -import 
com.salesforce.apollo.choam.support.ExponentialBackoffPolicy; -import com.salesforce.apollo.context.Context; import com.salesforce.apollo.context.DynamicContextImpl; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.delphinius.Oracle; @@ -37,7 +36,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -57,7 +56,7 @@ public class FireFliesTest { @AfterEach public void after() { - domains.forEach(n -> n.stop()); + domains.forEach(ProcessDomain::stop); domains.clear(); routers.values().forEach(r -> r.close(Duration.ofSeconds(0))); routers.clear(); @@ -78,9 +77,10 @@ public void before() throws Exception { var params = params(); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(params.getDigestAlgorithm()), entropy); - var identities = IntStream.range(0, CARDINALITY).mapToObj(i -> { - return stereotomy.newIdentifier(); - }).collect(Collectors.toMap(controlled -> controlled.getIdentifier().getDigest(), controlled -> controlled)); + var identities = IntStream.range(0, CARDINALITY) + .mapToObj(i -> stereotomy.newIdentifier()) + .collect(Collectors.toMap(controlled -> controlled.getIdentifier().getDigest(), + controlled -> controlled)); Digest group = DigestAlgorithm.DEFAULT.getOrigin(); var sealed = FoundationSeal.newBuilder().build(); @@ -115,17 +115,17 @@ public void smokin() throws Exception { final var seeds = Collections.singletonList( new Seed(domains.getFirst().getMember().getIdentifier().getIdentifier(), EndpointProvider.allocatePort())); domains.forEach(d -> { - BiConsumer c = (context, viewId) -> { - if (context.cardinality() == CARDINALITY) { - System.out.printf("Full view: %s 
members: %s on: %s%n", viewId, context.cardinality(), - d.getMember().getId()); + Consumer c = viewChange -> { + if (viewChange.context().cardinality() == CARDINALITY) { + System.out.printf("Full view: %s members: %s on: %s%n", viewChange.diadem(), + viewChange.context().cardinality(), d.getMember().getId()); countdown.countDown(); } else { - System.out.printf("Members joining: %s members: %s on: %s%n", viewId, context.cardinality(), - d.getMember().getId()); + System.out.printf("Members joining: %s members: %s on: %s%n", viewChange.diadem(), + viewChange.context().cardinality(), d.getMember().getId()); } }; - d.foundation.register(c); + d.foundation.register("FFTest", c); }); // start seed final var started = new AtomicReference<>(new CountDownLatch(1)); @@ -144,10 +144,10 @@ public void smokin() throws Exception { assertTrue(countdown.await(30, TimeUnit.SECONDS), "Could not join all members in all views"); - assertTrue(Utils.waitForCondition(60_000, 1_000, () -> { - return domains.stream().filter(d -> d.getFoundation().getContext().activeCount() != domains.size()).count() - == 0; - })); + assertTrue(Utils.waitForCondition(60_000, 1_000, () -> domains.stream() + .noneMatch( + d -> d.getFoundation().getContext().activeCount() + != domains.size()))); System.out.println(); System.out.println("******"); System.out.println( @@ -155,12 +155,11 @@ public void smokin() throws Exception { + " members"); System.out.println("******"); System.out.println(); - domains.forEach(n -> n.start()); - final var activated = Utils.waitForCondition(20_000, 1_000, - () -> domains.stream().filter(c -> !c.active()).count() == 0); + domains.forEach(ProcessDomain::start); + final var activated = Utils.waitForCondition(20_000, 1_000, () -> domains.stream().allMatch(Domain::active)); assertTrue(activated, "Domains did not become active : " + (domains.stream() .filter(c -> !c.active()) - .map(d -> d.logState()) + .map(Domain::logState) .toList())); System.out.println(); 
System.out.println("******"); @@ -169,7 +168,7 @@ public void smokin() throws Exception { + " members"); System.out.println("******"); System.out.println(); - var oracle = domains.get(0).getDelphi(); + var oracle = domains.getFirst().getDelphi(); oracle.add(new Oracle.Namespace("test")).get(); DomainTest.smoke(oracle); } @@ -192,10 +191,7 @@ private Builder params() { .setNumberOfEpochs(12) .setEpochLength(33)) .build()) - .setCheckpointBlockDelta(200) - .setDrainPolicy(ExponentialBackoffPolicy.newBuilder() - .setInitialBackoff(Duration.ofMillis(1)) - .setMaxBackoff(Duration.ofMillis(1))); + .setCheckpointBlockDelta(200); return template; } } diff --git a/protocols/src/main/java/com/salesforce/apollo/protocols/ListNIFs.java b/protocols/src/main/java/com/salesforce/apollo/protocols/ListNIFs.java new file mode 100644 index 000000000..82f6918e6 --- /dev/null +++ b/protocols/src/main/java/com/salesforce/apollo/protocols/ListNIFs.java @@ -0,0 +1,41 @@ +package com.salesforce.apollo.protocols; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Collections; +import java.util.Enumeration; + +import static java.lang.System.out; + +/** + * @author hal.hildebrand + **/ +public class ListNIFs { + public static void main(String[] args) throws SocketException { + Enumeration nets = NetworkInterface.getNetworkInterfaces(); + + for (NetworkInterface netIf : Collections.list(nets)) { + out.printf("Display name: %s\n", netIf.getDisplayName()); + out.printf("Name: %s\n", netIf.getName()); + var addresses = netIf.getInterfaceAddresses(); + for (var add : addresses) { + out.printf("Address: %s\n", add); + } + displaySubInterfaces(netIf); + out.print("\n"); + } + } + + static void displaySubInterfaces(NetworkInterface netIf) throws SocketException { + Enumeration subIfs = netIf.getSubInterfaces(); + for (NetworkInterface subIf : Collections.list(subIfs)) { + out.printf("\tSub Interface Display name: %s\n", 
subIf.getDisplayName()); + out.printf("\tSub Interface Name: %s\n", subIf.getName()); + Enumeration inetAddresses = subIf.getInetAddresses(); + for (InetAddress add : Collections.list(inetAddresses)) { + out.printf("\t Sub Interface Address: %s\n", add.getHostAddress()); + } + } + } +} diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index de2960e19..15948e7ca 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -380,7 +380,6 @@ private Builder parameters(Context context) { .setCheckpointBlockDelta(checkpointBlockSize()) .setCheckpointSegmentSize(128); - params.getDrainPolicy().setInitialBackoff(Duration.ofMillis(1)).setMaxBackoff(Duration.ofMillis(1)); params.getProducer().ethereal().setNumberOfEpochs(2).setEpochLength(20); return params; diff --git a/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java b/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java index 91183ff75..fd6d4c254 100644 --- a/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java +++ b/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java @@ -59,7 +59,8 @@ public Void append(List events) { public List append(List events, List attachments) { var l = kerl.append(events.stream().map(d -> d.toKeyEvent_()).toList(), attachments.stream().map(ae -> ae.toEvent_()).toList()); - return l.stream().map(ks -> new KeyStateImpl(ks)).map(ks -> (KeyState) ks).toList(); + return l == null ? 
Collections.emptyList() + : l.stream().map(ks -> new KeyStateImpl(ks)).map(ks -> (KeyState) ks).toList(); } @Override @@ -132,10 +133,8 @@ public Map getValidations(EventCoordinates coordi @Override public List kerl(Identifier identifier) { - return kerl.getKERL(identifier.toIdent()) - .getEventsList() - .stream() - .map(kwa -> ProtobufEventFactory.from(kwa)) - .toList(); + var k = kerl.getKERL(identifier.toIdent()); + return k == null ? Collections.emptyList() + : k.getEventsList().stream().map(kwa -> ProtobufEventFactory.from(kwa)).toList(); } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 1939e0a47..8242fa21c 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -16,13 +16,16 @@ import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications; import com.salesforce.apollo.archipelago.server.FernetServerInterceptor; import com.salesforce.apollo.context.Context; +import com.salesforce.apollo.context.DelegatedContext; +import com.salesforce.apollo.context.StaticContext; +import com.salesforce.apollo.context.ViewChange; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.cryptography.DigestAlgorithm; import com.salesforce.apollo.cryptography.Verifier; import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.ring.RingCommunications; -import com.salesforce.apollo.ring.RingIterator; +import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.stereotomy.*; import com.salesforce.apollo.stereotomy.caching.CachingKERL; import com.salesforce.apollo.stereotomy.db.UniKERLDirectPooled; @@ -70,7 +73,6 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; -import java.time.temporal.TemporalAmount; import java.util.*; import 
java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -96,7 +98,7 @@ public class KerlDHT implements ProtoKERLService { private final Ani ani; private final CachingKERL cache; private final JdbcConnectionPool connectionPool; - private final Context context; + private final DelegatedContext context; private final CommonCommunications dhtComms; private final double fpr; private final Duration operationsFrequency; @@ -110,15 +112,13 @@ public class KerlDHT implements ProtoKERLService { private final ScheduledExecutorService scheduler; private final Service service = new Service(); private final AtomicBoolean started = new AtomicBoolean(); - private final TemporalAmount operationTimeout; + private final Duration operationTimeout; public KerlDHT(Duration operationsFrequency, Context context, SigningMember member, BiFunction wrap, JdbcConnectionPool connectionPool, - DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount operationTimeout, + DigestAlgorithm digestAlgorithm, Router communications, Duration operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { - @SuppressWarnings("unchecked") - final var casting = (Context) context; - this.context = casting; + this.context = new DelegatedContext<>((Context) new StaticContext<>(context)); this.member = member; this.operationTimeout = operationTimeout; this.fpr = falsePositiveRate; @@ -162,7 +162,7 @@ public KerlDHT(Duration operationsFrequency, Context context, public KerlDHT(Duration operationsFrequency, Context context, SigningMember member, JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications, - TemporalAmount operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { + Duration operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { this(operationsFrequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, operationTimeout, falsePositiveRate, metrics); } @@ -199,14 
+199,13 @@ public KeyState_ append(AttachmentEvent event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, - (link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append events"), - t -> completeIt(result, gathered)); + var slice = context.bftSubset(identifier); + var iterator = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); try { + iterator.iterate((link) -> link.append(Collections.emptyList(), Collections.singletonList(event)), + (futureSailor, tally, link) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, link, "append events"), + () -> completeIt(result, gathered), operationsFrequency); List s = result.get().getKeyStatesList(); return s.isEmpty() ? 
null : s.getFirst(); } catch (InterruptedException e) { @@ -235,13 +234,13 @@ public List append(KERL_ kerl) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.append(kerl), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append kerl"), - t -> completeIt(result, gathered)); + var slice = context.bftSubset(identifier); + var iterator = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); try { + iterator.iterate((link) -> link.append(kerl), + (futureSailor, tally, link) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, link, "append kerl"), + () -> completeIt(result, gathered), operationsFrequency); return result.get().getKeyStatesList(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -264,13 +263,13 @@ public KeyState_ append(KeyEvent_ event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.append(Collections.singletonList(event)), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append event"), - t -> completeIt(result, gathered)); + var slice = context.bftSubset(identifier); + var iterator = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); try { + iterator.iterate((link) -> link.append(Collections.singletonList(event)), + (futureSailor, tally, link) -> 
mutate(gathered, futureSailor, identifier, isTimedOut, + tally, link, "append kerl"), + () -> completeIt(result, gathered), operationsFrequency); var ks = result.get(); return ks.getKeyStatesCount() == 0 ? KeyState_.getDefaultInstance() : ks.getKeyStatesList().getFirst(); } catch (InterruptedException e) { @@ -321,14 +320,13 @@ public Empty appendAttachments(List events) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.appendAttachments(events), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append attachments"), - t -> completeIt(result, gathered)); - + var slice = context.bftSubset(identifier); + var iterator = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); try { + iterator.iterate((link) -> link.appendAttachments(events), + (futureSailor, tally, link) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, link, "append kerl"), + () -> completeIt(result, gathered), operationsFrequency); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -355,13 +353,13 @@ public Empty appendValidations(Validations validations) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.dontIgnoreSelf(); - iterator.iterate(identifier, null, (link, r) -> link.appendValidations(validations), null, - (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, - tally, destination, "append validations"), - t -> 
completeIt(result, gathered)); + var slice = context.bftSubset(identifier); + var iterator = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); try { + iterator.iterate((link) -> link.appendValidations(validations), + (futureSailor, tally, link) -> mutate(gathered, futureSailor, identifier, isTimedOut, + tally, link, "append kerl"), + () -> completeIt(result, gathered), operationsFrequency); return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -408,13 +406,12 @@ public Attachment getAttachment(EventCoords coordinates) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var operation = "getAttachment(%s)".formatted(EventCoordinates.from(coordinates)); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(identifier, null, (link, r) -> link.getAttachment(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier, - isTimedOut, destination, "get attachment", - Attachment.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(identifier); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getAttachment(coordinates), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, identifier, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -443,13 +440,12 @@ public KERL_ getKERL(Ident identifier) { var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); var operation = "getKerl(%s)".formatted(Identifier.from(identifier)); - var iterator = new 
RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKERL(identifier), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KERL_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(digest); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getKERL(identifier), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -482,13 +478,12 @@ public KeyEvent_ getKeyEvent(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyEvent(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyEvent_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(digest); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getKeyEvent(coordinates), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); 
try { return result.get(); } catch (InterruptedException e) { @@ -518,13 +513,12 @@ public KeyState_ getKeyState(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyState(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyState_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(digest); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getKeyState(coordinates), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -555,13 +549,12 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyState(identAndSeq), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyState_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(digest); + var iter = new 
SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getKeyState(identAndSeq), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -591,13 +584,12 @@ public KeyState_ getKeyState(Ident identifier) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyState(identifier), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyState_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(digest); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getKeyState(identifier), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -627,13 +619,12 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithAttachments(coordinates), 
- () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyStateWithAttachments_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(digest); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getKeyStateWithAttachments(coordinates), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -663,13 +654,12 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(digest, null, (link, r) -> link.getKeyStateWithEndorsementsAndValidations(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, - isTimedOut, destination, operation, - KeyStateWithEndorsementsAndValidations_.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(digest); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getKeyStateWithEndorsementsAndValidations(coordinates), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, digest, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), 
operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -699,13 +689,12 @@ public Validations getValidations(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - var iterator = new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms); - iterator.iterate(identifier, null, (link, r) -> link.getValidations(coordinates), - () -> failedMajority(result, maxCount(gathered), operation), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier, - isTimedOut, destination, operation, - Validations.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered), operation)); + var slice = context.bftSubset(identifier); + var iter = new SliceIterator<>(context.getId().toString(), member, slice, dhtComms, scheduler); + iter.iterate(link -> link.getValidations(coordinates), + (futureSailor, tally, destination) -> read(result, gathered, tally, futureSailor, identifier, + isTimedOut, destination, operation), + () -> failedMajority(result, maxCount(gathered), operation), operationsFrequency); try { return result.get(); } catch (InterruptedException e) { @@ -734,13 +723,9 @@ public Optional verifierFor(Identifier identifier) { }; } - public Entry max(HashMultiset gathered) { - return gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null); - } - - public int maxCount(HashMultiset gathered) { - final var max = gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)); - return max.map(Entry::getCount).orElse(0); + public void nextView(ViewChange viewChange) { + log.info("Next view: {} context: {} on: {}", viewChange.diadem(), viewChange.context().getId(), member.getId()); + context.setContext(viewChange.context()); } public void start(Duration duration) { @@ -778,7 +763,7 @@ private void 
completeIt(CompletableFuture result, HashMultiset gathere .stream() .max(Ordering.natural().onResultOf(Multiset.Entry::getCount)) .orElse(null); - var majority = context.size() == 1 ? 1 : context.majority(); + var majority = context.size() == 1 ? 1 : context.toleranceLevel() + 1; if (max != null) { if (max.getCount() >= majority) { try { @@ -788,6 +773,8 @@ private void completeIt(CompletableFuture result, HashMultiset gathere } return; } + } else { + log.warn("Unable to achieve majority, max agree: 0 required: {} on: {}", majority, member.getId()); } result.completeExceptionally(new CompletionException( "Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + majority + " on: " @@ -795,11 +782,11 @@ private void completeIt(CompletableFuture result, HashMultiset gathere } private boolean failedMajority(CompletableFuture result, int maxAgree, String operation) { - log.debug("Unable to achieve majority read: {}, max: {} required: {} on: {}", operation, maxAgree, - context.majority(), member.getId()); + log.debug("Unable to achieve majority read: {}, max agree: {} required: {} on: {}", operation, maxAgree, + context.toleranceLevel() + 1, member.getId()); return result.completeExceptionally(new CompletionException( - "Unable to achieve majority read: " + operation + ", max: " + maxAgree + " required: " + context.majority() - + " on: " + member.getId())); + "Unable to achieve majority read: " + operation + ", max agree: " + maxAgree + " required: " + + (context.toleranceLevel() + 1) + " on: " + member.getId())); } private void initializeSchema() { @@ -843,12 +830,21 @@ private CombinedIntervals keyIntervals() { return new CombinedIntervals(intervals); } + private Entry max(HashMultiset gathered) { + return gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null); + } + + private int maxCount(HashMultiset gathered) { + final var max = 
gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)); + return max.map(Entry::getCount).orElse(0); + } + private boolean mutate(HashMultiset gathered, Optional futureSailor, Digest identifier, - Supplier isTimedOut, AtomicInteger tally, - RingCommunications.Destination destination, String action) { + Supplier isTimedOut, AtomicInteger tally, DhtService destination, + String action) { if (futureSailor.isEmpty()) { log.debug("Failed {}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), - destination.member() == null ? "" : destination.member().getId(), member.getId()); + destination.getMember() == null ? "" : destination.getMember().getId(), member.getId()); return !isTimedOut.get(); } T content = futureSailor.get(); @@ -857,38 +853,71 @@ private boolean mutate(HashMultiset gathered, Optional futureSailor, D .stream() .max(Ordering.natural().onResultOf(Entry::getCount)) .ifPresent(max -> tally.set(max.getCount())); - log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(), + log.warn("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.getMember().getId(), member.getId()); return !isTimedOut.get(); } private boolean read(CompletableFuture result, HashMultiset gathered, AtomicInteger tally, Optional futureSailor, Digest identifier, Supplier isTimedOut, - RingCommunications.Destination destination, String action, T empty) { + DhtService destination, String action) { if (futureSailor.isEmpty()) { log.debug("Failed {}: {} tally: {} from: {} on: {}", action, identifier, tally, - destination.member() == null ? "" : destination.member().getId(), member.getId()); + destination.getMember() == null ? 
"" : destination.getMember().getId(), member.getId()); return !isTimedOut.get(); } T content = futureSailor.get(); - log.trace("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(), + log.trace("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.getMember().getId(), member.getId()); gathered.add(content); var max = max(gathered); if (max != null) { tally.set(max.getCount()); - var ctxMajority = context.size() == 1 ? 1 : context.majority(); + var ctxMajority = context.size() == 1 ? 1 : context.toleranceLevel() + 1; final var majority = tally.get() >= ctxMajority; if (majority) { result.complete(max.getElement()); log.debug("Majority: {} achieved: {}: {} tally: {} on: {}", max.getCount(), action, identifier, tally.get(), member.getId()); return false; + } else { + log.info("Majority: {} required: {} not achieved: {}: {} tally: {} on: {}", max.getCount(), ctxMajority, + action, identifier, tally.get(), member.getId()); } } return !isTimedOut.get(); } + // private boolean read(CompletableFuture result, HashMultiset gathered, AtomicInteger tally, + // Optional futureSailor, Digest identifier, Supplier isTimedOut, + // RingCommunications.Destination destination, String action, T empty) { + // if (futureSailor.isEmpty()) { + // log.debug("Failed {}: {} tally: {} from: {} on: {}", action, identifier, tally, + // destination.member() == null ? "" : destination.member().getId(), member.getId()); + // return !isTimedOut.get(); + // } + // T content = futureSailor.get(); + // log.trace("{}: {} tally: {} from: {} on: {}", action, identifier, tally.get(), destination.member().getId(), + // member.getId()); + // gathered.add(content); + // var max = max(gathered); + // if (max != null) { + // tally.set(max.getCount()); + // var ctxMajority = context.size() == 1 ? 
1 : context.toleranceLevel() + 1; + // final var majority = tally.get() >= ctxMajority; + // if (majority) { + // result.complete(max.getElement()); + // log.debug("Majority: {} achieved: {}: {} tally: {} on: {}", max.getCount(), action, identifier, + // tally.get(), member.getId()); + // return false; + // } else { + // log.info("Majority: {} required: {} not achieved: {}: {} tally: {} on: {}", max.getCount(), ctxMajority, + // action, identifier, tally.get(), member.getId()); + // } + // } + // return !isTimedOut.get(); + // } + private void reconcile(Optional result, RingCommunications.Destination destination, ScheduledExecutorService scheduler, Duration duration) {