diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 6e3ffe1ea..45d3a39c5 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -781,7 +781,7 @@ public void start() { return; } log.info("CHOAM startup, majority: {} on: {}", params.majority(), params.member().getId()); - combine.start(params.producer().gossipDuration(), params.scheduler()); + combine.start(params.producer().gossipDuration()); transitions.fsm().enterStartState(); transitions.start(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index 9cd6a9308..5b0056b26 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -167,7 +167,7 @@ public void gather() { proposals.put(params().member().getId(), proposed); ds.setValue(join.toByteString()); - coordinator.start(params().producer().gossipDuration(), params().scheduler()); + coordinator.start(params().producer().gossipDuration()); controller.start(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java index bba67798f..27662c492 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java @@ -6,23 +6,6 @@ */ package com.salesforce.apollo.choam; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.h2.mvstore.MVStore; -import org.h2.mvstore.OffHeapStore; -import org.joou.ULong; - import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.MetricRegistry; import com.netflix.concurrency.limits.limit.AIMDLimit; @@ -46,10 +29,24 @@ import com.salesforce.apollo.membership.Member; import com.salesforce.apollo.membership.SigningMember; import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster; +import org.h2.mvstore.MVStore; +import org.h2.mvstore.OffHeapStore; +import org.joou.ULong; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; /** * @author hal.hildebrand - * */ public record Parameters(Parameters.RuntimeParameters runtime, ReliableBroadcaster.Parameters combine, Duration gossipDuration, int maxCheckpointSegments, Duration submitTimeout, @@ -60,10 +57,54 @@ public record Parameters(Parameters.RuntimeParameters runtime, ReliableBroadcast ExponentialBackoffPolicy.Builder submitPolicy, int checkpointSegmentSize, ExponentialBackoffPolicy.Builder drainPolicy) { + public static Builder newBuilder() { + return new Builder(); + } + public int majority() { return runtime.context.majority(); } + public SigningMember member() { + return runtime.member; + } + + public 
Context context() { + return runtime.context; + } + + public Router communications() { + return runtime.communications; + } + + public ChoamMetrics metrics() { + return runtime.metrics; + } + + public Function checkpointer() { + return runtime.checkpointer; + } + + public Function, List> genesisData() { + return runtime.genesisData; + } + + public TransactionExecutor processor() { + return runtime.processor; + } + + public BiConsumer restorer() { + return runtime.restorer; + } + + public Executor exec() { + return runtime.exec; + } + + public Supplier kerl() { + return runtime.kerl; + } + public static class MvStoreBuilder implements Cloneable { private int autoCommitBufferSize = -1; private int autoCompactFillRate = -1; @@ -142,68 +183,67 @@ public int getAutoCommitBufferSize() { return autoCommitBufferSize; } - public int getAutoCompactFillRate() { - return autoCompactFillRate; - } - - public int getCachConcurrency() { - return cachConcurrency; + public MvStoreBuilder setAutoCommitBufferSize(int autoCommitBufferSize) { + this.autoCommitBufferSize = autoCommitBufferSize; + return this; } - public int getCachSize() { - return cachSize; + public int getAutoCompactFillRate() { + return autoCompactFillRate; } - public File getFileName() { - return fileName; + public MvStoreBuilder setAutoCompactFillRate(int autoCompactFillRate) { + this.autoCompactFillRate = autoCompactFillRate; + return this; } - public int getKeysPerPage() { - return keysPerPage; + public int getCachConcurrency() { + return cachConcurrency; } - public int getPageSplitSize() { - return pageSplitSize; + public MvStoreBuilder setCachConcurrency(int cachConcurrency) { + this.cachConcurrency = cachConcurrency; + return this; } - public boolean isCompress() { - return compress; + public int getCachSize() { + return cachSize; } - public boolean isCompressHigh() { - return compressHigh; + public MvStoreBuilder setCachSize(int cachSize) { + this.cachSize = cachSize; + return this; } - public boolean isOffHeap() { - return offHeap; + public File getFileName() { + return fileName; } - public boolean isReadOnly() { - return readOnly; + public MvStoreBuilder setFileName(File fileName) { + this.fileName = fileName; + return this; } - public boolean isRecoveryMode() { - return recoveryMode; + public int getKeysPerPage() { + return keysPerPage; } - public MvStoreBuilder setAutoCommitBufferSize(int autoCommitBufferSize) { - this.autoCommitBufferSize = autoCommitBufferSize; + public MvStoreBuilder setKeysPerPage(int keysPerPage) { + this.keysPerPage = keysPerPage; return this; } - public MvStoreBuilder setAutoCompactFillRate(int autoCompactFillRate) { - this.autoCompactFillRate = autoCompactFillRate; - return this; + public int getPageSplitSize() { + return pageSplitSize; } - public MvStoreBuilder setCachConcurrency(int cachConcurrency) { - this.cachConcurrency = cachConcurrency; + public MvStoreBuilder setPageSplitSize(int pageSplitSize) { + this.pageSplitSize = pageSplitSize; return this; } - public MvStoreBuilder setCachSize(int cachSize) { - this.cachSize = cachSize; - return this; + public boolean isCompress() { + return compress; } public MvStoreBuilder setCompress(boolean compress) { @@ -211,19 +251,17 @@ public MvStoreBuilder setCompress(boolean compress) { return this; } - public MvStoreBuilder setCompressHigh(boolean compressHigh) { - this.compressHigh = compressHigh; - return this; + public boolean isCompressHigh() { + return compressHigh; } - public MvStoreBuilder setFileName(File fileName) { - this.fileName = fileName; + public 
MvStoreBuilder setCompressHigh(boolean compressHigh) { + this.compressHigh = compressHigh; return this; } - public MvStoreBuilder setKeysPerPage(int keysPerPage) { - this.keysPerPage = keysPerPage; - return this; + public boolean isOffHeap() { + return offHeap; } public MvStoreBuilder setOffHeap(boolean offHeap) { @@ -231,9 +269,8 @@ public MvStoreBuilder setOffHeap(boolean offHeap) { return this; } - public MvStoreBuilder setPageSplitSize(int pageSplitSize) { - this.pageSplitSize = pageSplitSize; - return this; + public boolean isReadOnly() { + return readOnly; } public MvStoreBuilder setReadOnly(boolean readOnly) { @@ -241,6 +278,10 @@ public MvStoreBuilder setReadOnly(boolean readOnly) { return this; } + public boolean isRecoveryMode() { + return recoveryMode; + } + public MvStoreBuilder setRecoveryMode(boolean recoveryMode) { this.recoveryMode = recoveryMode; return this; @@ -248,13 +289,17 @@ public MvStoreBuilder setRecoveryMode(boolean recoveryMode) { } public record RuntimeParameters(Context context, Router communications, SigningMember member, - ScheduledExecutorService scheduler, Function, List> genesisData, TransactionExecutor processor, BiConsumer restorer, Function checkpointer, ChoamMetrics metrics, Executor exec, Supplier kerl, FoundationSeal foundation) { + public static Builder newBuilder() { + return new Builder(); + } + public static class Builder implements Cloneable { private final static Function NULL_CHECKPOINTER; + static { NULL_CHECKPOINTER = h -> { File cp; @@ -270,6 +315,7 @@ public static class Builder implements Cloneable { return cp; }; } + private Function checkpointer = NULL_CHECKPOINTER; private Router communications; private Context context; @@ -280,14 +326,13 @@ public static class Builder implements Cloneable { private SigningMember member; private ChoamMetrics metrics; private TransactionExecutor processor = (i, h, t, f, exec) -> { - }; + }; private BiConsumer restorer = (height, checkpointState) -> { - }; - private ScheduledExecutorService scheduler; + }; public RuntimeParameters build() { - return new RuntimeParameters(context, communications, member, scheduler, genesisData, processor, - restorer, checkpointer, metrics, exec, kerl, foundation); + return new RuntimeParameters(context, communications, member, genesisData, processor, restorer, + checkpointer, metrics, exec, kerl, foundation); } @Override @@ -305,115 +350,102 @@ public Function getCheckpointer() { return checkpointer; } - public Router getCommunications() { - return communications; - } - - public Context getContext() { - return context; - } - - public Executor getExec() { - return exec; - } - - public FoundationSeal getFoundation() { - return foundation; - } - - public Function, List> getGenesisData() { - return genesisData; - } - - public Supplier getKerl() { - return kerl; - } - - public SigningMember getMember() { - return member; - } - - public ChoamMetrics getMetrics() { - return metrics; - } - - public TransactionExecutor getProcessor() { - return processor; - } - - public BiConsumer getRestorer() { - return restorer; - } - - public ScheduledExecutorService getScheduler() { - return scheduler; - } - public Builder setCheckpointer(Function checkpointer) { this.checkpointer = checkpointer; return this; } + public Router getCommunications() { + return communications; + } + public Builder setCommunications(Router communications) { this.communications = communications; return this; } + public Context getContext() { + return context; + } + @SuppressWarnings("unchecked") public Builder 
setContext(Context context) { this.context = (Context) context; return this; } + public Executor getExec() { + return exec; + } + public Builder setExec(Executor exec) { this.exec = exec; return this; } + public FoundationSeal getFoundation() { + return foundation; + } + public Builder setFoundation(FoundationSeal foundation) { this.foundation = foundation; return this; } + public Function, List> getGenesisData() { + return genesisData; + } + public Builder setGenesisData(Function, List> genesisData) { this.genesisData = genesisData; return this; } + public Supplier getKerl() { + return kerl; + } + public Builder setKerl(Supplier kerl) { this.kerl = kerl; return this; } + public SigningMember getMember() { + return member; + } + public Builder setMember(SigningMember member) { this.member = member; return this; } + public ChoamMetrics getMetrics() { + return metrics; + } + public Builder setMetrics(ChoamMetrics metrics) { this.metrics = metrics; return this; } + public TransactionExecutor getProcessor() { + return processor; + } + public Builder setProcessor(TransactionExecutor processor) { this.processor = processor; return this; } - public Builder setRestorer(BiConsumer biConsumer) { - this.restorer = biConsumer; - return this; + public BiConsumer getRestorer() { + return restorer; } - public Builder setScheduler(ScheduledExecutorService scheduler) { - this.scheduler = scheduler; + public Builder setRestorer(BiConsumer biConsumer) { + this.restorer = biConsumer; return this; } } - - public static Builder newBuilder() { - return new Builder(); - } } public record BootstrapParameters(Duration gossipDuration, int maxViewBlocks, int maxSyncBlocks) { @@ -421,6 +453,7 @@ public record BootstrapParameters(Duration gossipDuration, int maxViewBlocks, in public static Builder newBuilder() { return new Builder(); } + public static class Builder { private Duration gossipDuration = Duration.ofSeconds(1); private int maxSyncBlocks = 100; @@ -434,24 +467,24 @@ public Duration getGossipDuration() { return gossipDuration; } - public int getMaxSyncBlocks() { - return maxSyncBlocks; - } - - public int getMaxViewBlocks() { - return maxViewBlocks; - } - public Builder setGossipDuration(Duration gossipDuration) { this.gossipDuration = gossipDuration; return this; } + public int getMaxSyncBlocks() { + return maxSyncBlocks; + } + public Builder setMaxSyncBlocks(int maxSyncBlocks) { this.maxSyncBlocks = maxSyncBlocks; return this; } + public int getMaxViewBlocks() { + return maxViewBlocks; + } + public Builder setMaxViewBlocks(int maxViewBlocks) { this.maxViewBlocks = maxViewBlocks; return this; @@ -483,51 +516,51 @@ public Duration getBatchInterval() { return batchInterval; } - public Config.Builder getEthereal() { - return ethereal; - } - - public Duration getGossipDuration() { - return gossipDuration; - } - - public int getMaxBatchByteSize() { - return maxBatchByteSize; - } - - public int getMaxBatchCount() { - return maxBatchCount; - } - - public Duration getMaxGossipDelay() { - return maxGossipDelay; - } - public Builder setBatchInterval(Duration batchInterval) { this.batchInterval = batchInterval; return this; } + public Config.Builder getEthereal() { + return ethereal; + } + public Builder setEthereal(Config.Builder ethereal) { this.ethereal = ethereal; return this; } + public Duration getGossipDuration() { + return gossipDuration; + } + public Builder setGossipDuration(Duration gossipDuration) { this.gossipDuration = gossipDuration; return this; } + public int getMaxBatchByteSize() { + return 
maxBatchByteSize; + } + public Builder setMaxBatchByteSize(int maxBatchByteSize) { this.maxBatchByteSize = maxBatchByteSize; return this; } + public int getMaxBatchCount() { + return maxBatchCount; + } + public Builder setMaxBatchCount(int maxBatchCount) { this.maxBatchCount = maxBatchCount; return this; } + public Duration getMaxGossipDelay() { + return maxGossipDelay; + } + public Builder setMaxGossipDelay(Duration maxGossipDelay) { this.maxGossipDelay = maxGossipDelay; return this; @@ -535,10 +568,6 @@ public Builder setMaxGossipDelay(Duration maxGossipDelay) { } } - public static Builder newBuilder() { - return new Builder(); - } - public static class LimiterBuilder { private Duration backlogDuration = Duration.ofSeconds(1); private int backlogSize = 1_000; @@ -574,60 +603,60 @@ public int getBacklogSize() { return backlogSize; } - public double getBackoffRatio() { - return backoffRatio; - } - - public int getInitialLimit() { - return initialLimit; - } - - public int getMaxLimit() { - return maxLimit; - } - - public int getMinLimit() { - return minLimit; - } - - public Duration getTimeout() { - return timeout; - } - - public LimiterBuilder setBacklogDuration(Duration backlogDuration) { - this.backlogDuration = backlogDuration; - return this; - } - public LimiterBuilder setBacklogSize(int backlogSize) { this.backlogSize = backlogSize; return this; } + public double getBackoffRatio() { + return backoffRatio; + } + public LimiterBuilder setBackoffRatio(double backoffRatio) { this.backoffRatio = backoffRatio; return this; } + public int getInitialLimit() { + return initialLimit; + } + public LimiterBuilder setInitialLimit(int initialLimit) { this.initialLimit = initialLimit; return this; } + public int getMaxLimit() { + return maxLimit; + } + public LimiterBuilder setMaxLimit(int maxLimit) { this.maxLimit = maxLimit; return this; } + public int getMinLimit() { + return minLimit; + } + public LimiterBuilder setMinLimit(int minLimit) { this.minLimit = minLimit; return this; } + public Duration getTimeout() { + return timeout; + } + public LimiterBuilder setTimeout(Duration timeout) { this.timeout = timeout; return this; } + + public LimiterBuilder setBacklogDuration(Duration backlogDuration) { + this.backlogDuration = backlogDuration; + return this; + } } public static class Builder implements Cloneable { @@ -639,10 +668,13 @@ public static class Builder implements Cloneable { .build(); private DigestAlgorithm digestAlgorithm = DigestAlgorithm.DEFAULT; private ExponentialBackoffPolicy.Builder drainPolicy = ExponentialBackoffPolicy.newBuilder() - .setInitialBackoff(Duration.ofMillis(5)) + .setInitialBackoff( + Duration.ofMillis(5)) .setJitter(0.2) .setMultiplier(1.2) - .setMaxBackoff(Duration.ofMillis(500)); + .setMaxBackoff( + Duration.ofMillis( + 500)); private Digest genesisViewId; private Duration gossipDuration = Duration.ofSeconds(1); private int maxCheckpointSegments = 200; @@ -650,10 +682,13 @@ public static class Builder implements Cloneable { private ProducerParameters producer = ProducerParameters.newBuilder().build(); private int regenerationCycles = 20; private ExponentialBackoffPolicy.Builder submitPolicy = ExponentialBackoffPolicy.newBuilder() - .setInitialBackoff(Duration.ofMillis(10)) + .setInitialBackoff( + Duration.ofMillis(10)) .setJitter(0.2) .setMultiplier(1.6) - .setMaxBackoff(Duration.ofMillis(500)); + .setMaxBackoff( + Duration.ofMillis( + 500)); private Duration submitTimeout = Duration.ofSeconds(30); private int synchronizationCycles = 10; private LimiterBuilder 
txnLimiterBuilder = new LimiterBuilder(); @@ -679,198 +714,154 @@ public BootstrapParameters getBootstrap() { return bootstrap; } - public int getCheckpointBlockDelta() { - return checkpointBlockDelta; - } - - public int getCheckpointSegmentSize() { - return checkpointSegmentSize; - } - - public ReliableBroadcaster.Parameters getCombine() { - return combine; - } - - public DigestAlgorithm getDigestAlgorithm() { - return digestAlgorithm; - } - - public ExponentialBackoffPolicy.Builder getDrainPolicy() { - return drainPolicy; - } - - public Digest getGenesisViewId() { - return genesisViewId; - } - - public Duration getGossipDuration() { - return gossipDuration; - } - - public int getMaxCheckpointSegments() { - return maxCheckpointSegments; - } - - public MvStoreBuilder getMvBuilder() { - return mvBuilder; - } - - public ProducerParameters getProducer() { - return producer; - } - - public int getRegenerationCycles() { - return regenerationCycles; - } - - public ExponentialBackoffPolicy.Builder getSubmitPolicy() { - return submitPolicy; - } - - public Duration getSubmitTimeout() { - return submitTimeout; - } - - public int getSynchronizationCycles() { - return synchronizationCycles; - } - - public LimiterBuilder getTxnLimiterBuilder() { - return txnLimiterBuilder; - } - - public SignatureAlgorithm getViewSigAlgorithm() { - return viewSigAlgorithm; - } - public Builder setBootstrap(BootstrapParameters bootstrap) { this.bootstrap = bootstrap; return this; } + public int getCheckpointBlockDelta() { + return checkpointBlockDelta; + } + public Builder setCheckpointBlockDelta(int checkpointBlockDelta) { this.checkpointBlockDelta = checkpointBlockDelta; return this; } + public int getCheckpointSegmentSize() { + return checkpointSegmentSize; + } + public Builder setCheckpointSegmentSize(int checkpointSegmentSize) { this.checkpointSegmentSize = checkpointSegmentSize; return this; } + public ReliableBroadcaster.Parameters getCombine() { + return combine; + } + public Builder setCombine(ReliableBroadcaster.Parameters combine) { this.combine = combine; return this; } + public DigestAlgorithm getDigestAlgorithm() { + return digestAlgorithm; + } + public Builder setDigestAlgorithm(DigestAlgorithm digestAlgorithm) { this.digestAlgorithm = digestAlgorithm; return this; } + public ExponentialBackoffPolicy.Builder getDrainPolicy() { + return drainPolicy; + } + public Builder setDrainPolicy(ExponentialBackoffPolicy.Builder drainPolicy) { this.drainPolicy = drainPolicy; return this; } + public Digest getGenesisViewId() { + return genesisViewId; + } + public Builder setGenesisViewId(Digest genesisViewId) { this.genesisViewId = genesisViewId; return this; } + public Duration getGossipDuration() { + return gossipDuration; + } + public Parameters.Builder setGossipDuration(Duration gossipDuration) { this.gossipDuration = gossipDuration; return this; } + public int getMaxCheckpointSegments() { + return maxCheckpointSegments; + } + public Builder setMaxCheckpointSegments(int maxCheckpointSegments) { this.maxCheckpointSegments = maxCheckpointSegments; return this; } + public MvStoreBuilder getMvBuilder() { + return mvBuilder; + } + public Builder setMvBuilder(MvStoreBuilder mvBuilder) { this.mvBuilder = mvBuilder; return this; } + public ProducerParameters getProducer() { + return producer; + } + public Builder setProducer(ProducerParameters producer) { this.producer = producer; return this; } + public int getRegenerationCycles() { + return regenerationCycles; + } + public Builder setRegenerationCycles(int 
regenerationCycles) { this.regenerationCycles = regenerationCycles; return this; } + public ExponentialBackoffPolicy.Builder getSubmitPolicy() { + return submitPolicy; + } + public Builder setSubmitPolicy(ExponentialBackoffPolicy.Builder submitPolicy) { this.submitPolicy = submitPolicy; return this; } + public Duration getSubmitTimeout() { + return submitTimeout; + } + public Builder setSubmitTimeout(Duration submitTimeout) { this.submitTimeout = submitTimeout; return this; } + public int getSynchronizationCycles() { + return synchronizationCycles; + } + public Builder setSynchronizationCycles(int synchronizationCycles) { this.synchronizationCycles = synchronizationCycles; return this; } + public LimiterBuilder getTxnLimiterBuilder() { + return txnLimiterBuilder; + } + public Builder setTxnLimiterBuilder(LimiterBuilder txnLimiterBuilder) { this.txnLimiterBuilder = txnLimiterBuilder; return this; } + public SignatureAlgorithm getViewSigAlgorithm() { + return viewSigAlgorithm; + } + public Builder setViewSigAlgorithm(SignatureAlgorithm viewSigAlgorithm) { this.viewSigAlgorithm = viewSigAlgorithm; return this; } } - public SigningMember member() { - return runtime.member; - } - - public Context context() { - return runtime.context; - } - - public Router communications() { - return runtime.communications; - } - - public ChoamMetrics metrics() { - return runtime.metrics; - } - - public ScheduledExecutorService scheduler() { - return runtime.scheduler; - } - - public Function checkpointer() { - return runtime.checkpointer; - } - - public Function, List> genesisData() { - return runtime.genesisData; - } - - public TransactionExecutor processor() { - return runtime.processor; - } - - public BiConsumer restorer() { - return runtime.restorer; - } - - public Executor exec() { - return runtime.exec; - } - - public Supplier kerl() { - return runtime.kerl; - } - } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 537129666..ae1d27b62 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -163,7 +163,7 @@ public void complete() { public void startProduction() { log.debug("Starting production for: {} on: {}", getViewId(), params().member().getId()); controller.start(); - coordinator.start(params().producer().gossipDuration(), params().scheduler()); + coordinator.start(params().producer().gossipDuration()); } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index e27ffb653..b41c84427 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -25,6 +25,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -35,26 +36,25 @@ import static com.salesforce.apollo.crypto.QualifiedBase64.signature; /** - * View reconfiguration. Attempts to create a new view reconfiguration. View - * reconfiguration needs at least 2f+1 certified members from the next view. The - * protol finishes with a list of at least 2f+1 Joins with at least 2f+1 + * View reconfiguration. 
Attempts to create a new view reconfiguration. View reconfiguration needs at least 2f+1 + * certified members from the next view. The protol finishes with a list of at least 2f+1 Joins with at least 2f+1 * certifications from the current view, or fails * * @author hal.hildebrand */ public class ViewAssembly { - private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class); - protected final Transitions transitions; - private final AtomicBoolean cancelSlice = new AtomicBoolean(); - private final SliceIterator committee; - private final Map nextAssembly; - private final Digest nextViewId; - private final Map proposals = new ConcurrentHashMap<>(); - private final Consumer publisher; - private final Map slate = new ConcurrentHashMap<>(); - private final Map> unassigned = new ConcurrentHashMap<>(); - private final ViewContext view; + private final static Logger log = LoggerFactory.getLogger(ViewAssembly.class); + protected final Transitions transitions; + private final AtomicBoolean cancelSlice = new AtomicBoolean(); + private final SliceIterator committee; + private final Map nextAssembly; + private final Digest nextViewId; + private final Map proposals = new ConcurrentHashMap<>(); + private final Consumer publisher; + private final Map slate = new ConcurrentHashMap<>(); + private final Map> unassigned = new ConcurrentHashMap<>(); + private final ViewContext view; public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher, CommonCommunications comms) { @@ -62,19 +62,19 @@ public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publ this.nextViewId = nextViewId; this.publisher = publisher; nextAssembly = Committee.viewMembersOf(nextViewId, params().context()) - .stream() - .collect(Collectors.toMap(m -> m.getId(), m -> m)); + .stream() + .collect(Collectors.toMap(m -> m.getId(), m -> m)); var slice = new ArrayList<>(nextAssembly.values()); committee = new SliceIterator("Committee for " + nextViewId, params().member(), slice, comms, - params().exec()); + params().exec()); final Fsm fsm = Fsm.construct(new Recon(), Transitions.class, - Reconfigure.AWAIT_ASSEMBLY, true); + Reconfigure.AWAIT_ASSEMBLY, true); this.transitions = fsm.getTransitions(); fsm.setName("View Recon" + params().member().getId()); log.debug("View reconfiguration from: {} to: {}, next assembly: {} on: {}", view.context().getId(), nextViewId, - nextAssembly.keySet(), params().member().getId()); + nextAssembly.keySet(), params().member().getId()); } public Map getSlate() { @@ -98,25 +98,24 @@ void complete() { cancelSlice.set(true); if (slate.size() < params().context().majority()) { proposals.values() - .stream() - .filter(p -> p.validations.size() >= view.context().majority()) - .sorted(Comparator.comparing(p -> p.member.getId())) - .forEach(p -> slate.put(p.member(), joinOf(p))); + .stream() + .filter(p -> p.validations.size() >= view.context().majority()) + .sorted(Comparator.comparing(p -> p.member.getId())) + .forEach(p -> slate.put(p.member(), joinOf(p))); if (slate.size() >= params().context().majority()) { log.debug("Complete. 
Electing slate: {} of: {} on: {}", slate.size(), nextViewId, params().member()); } else { log.error("Failed completion, election required: {} slate: {} of: {} on: {}", - params().context().majority() + 1, - proposals.values() - .stream() - .map(p -> String.format("%s:%s", p.member.getId(), p.validations.size())) - .toList(), - nextViewId, params().member()); + params().context().majority() + 1, proposals.values() + .stream() + .map(p -> String.format("%s:%s", p.member.getId(), + p.validations.size())) + .toList(), nextViewId, params().member()); transitions.failed(); } } log.debug("View Assembly: {} completed with: {} members on: {}", nextViewId, slate.size(), - params().member().getId()); + params().member().getId()); } void finalElection() { @@ -126,14 +125,14 @@ void finalElection() { Consumer> inbound() { return lre -> { lre.stream() - .flatMap(re -> re.getMembersList().stream()) - .map(e -> join(e)) - .filter(r -> r != null) - .reduce((a, b) -> Reassemble.newBuilder(a) - .addAllMembers(b.getMembersList()) - .addAllValidations(b.getValidationsList()) - .build()) - .ifPresent(publisher); + .flatMap(re -> re.getMembersList().stream()) + .map(e -> join(e)) + .filter(r -> r != null) + .reduce((a, b) -> Reassemble.newBuilder(a) + .addAllMembers(b.getMembersList()) + .addAllValidations(b.getValidationsList()) + .build()) + .ifPresent(publisher); lre.stream().flatMap(re -> re.getValidationsList().stream()).forEach(e -> validate(e)); }; } @@ -149,9 +148,10 @@ private void completeSlice(AtomicReference retryDelay, AtomicReference } log.trace("Proposal incomplete of: {} gathered: {} desired: {}, retrying: {} on: {}", nextViewId, - proposals.keySet().stream().toList(), nextAssembly.size(), delay, params().member().getId()); + proposals.keySet().stream().toList(), nextAssembly.size(), delay, params().member().getId()); if (!cancelSlice.get()) { - params().scheduler().schedule(() -> reiterate.get().run(), delay.toMillis(), TimeUnit.MILLISECONDS); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) + .schedule(() -> reiterate.get().run(), delay.toMillis(), TimeUnit.MILLISECONDS); } } @@ -169,7 +169,7 @@ private boolean consider(Optional futureSailor, Terminal term, Membe var vm = new Digest(member.getId()); if (!m.getId().equals(vm)) { log.debug("Invalid join response from: {} expected: {} on: {}", term.getMember().getId(), vm, - params().member().getId()); + params().member().getId()); return !gathered(); } var reassemble = join(member); @@ -193,7 +193,7 @@ private Reassemble join(ViewMember vm) { if (m == null) { if (log.isTraceEnabled()) { log.trace("Invalid view member: {} on: {}", ViewContext.print(vm, params().digestAlgorithm()), - params().member().getId()); + params().member().getId()); } return null; } @@ -203,7 +203,7 @@ private Reassemble join(ViewMember vm) { if (!m.verify(signature(vm.getSignature()), encoded.toByteString())) { if (log.isTraceEnabled()) { log.trace("Could not verify consensus key from view member: {} on: {}", - ViewContext.print(vm, params().digestAlgorithm()), params().member().getId()); + ViewContext.print(vm, params().digestAlgorithm()), params().member().getId()); } return null; } @@ -212,7 +212,7 @@ private Reassemble join(ViewMember vm) { if (consensusKey == null) { if (log.isTraceEnabled()) { log.trace("Could not deserialize consensus key from view member: {} on: {}", - ViewContext.print(vm, params().digestAlgorithm()), params().member().getId()); + ViewContext.print(vm, params().digestAlgorithm()), params().member().getId()); } return null; } @@ 
-227,7 +227,7 @@ private Reassemble join(ViewMember vm) { if (newJoin.get()) { if (log.isTraceEnabled()) { log.trace("Adding view member: {} on: {}", ViewContext.print(vm, params().digestAlgorithm()), - params().member().getId()); + params().member().getId()); } var validations = unassigned.remove(mid); if (validations != null) { @@ -237,24 +237,25 @@ private Reassemble join(ViewMember vm) { transitions.gathered(); } return Reassemble.newBuilder() - .addMembers(vm) - .addValidations(proposed.validations.get(params().member())) - .build(); + .addMembers(vm) + .addValidations(proposed.validations.get(params().member())) + .build(); } return null; } private Join joinOf(Proposed candidate) { final List witnesses = candidate.validations.values() - .stream() - .map(v -> v.getWitness()) - .sorted(Comparator.comparing(c -> new Digest(c.getId()))) - .collect(Collectors.toList()); + .stream() + .map(v -> v.getWitness()) + .sorted( + Comparator.comparing(c -> new Digest(c.getId()))) + .collect(Collectors.toList()); return Join.newBuilder() - .setMember(candidate.vm) - .setView(nextViewId.toDigeste()) - .addAllEndorsements(witnesses) - .build(); + .setMember(candidate.vm) + .setView(nextViewId.toDigeste()) + .addAllEndorsements(witnesses) + .build(); } private Parameters params() { @@ -282,13 +283,13 @@ private void validate(Validate v) { } if (!view.validate(proposed.vm, v)) { log.warn("Invalid cetification for view join: {} from: {} on: {}", digest, - Digest.from(v.getWitness().getId()), params().member().getId()); + Digest.from(v.getWitness().getId()), params().member().getId()); return; } var newCertifier = new AtomicBoolean(); proposed.validations.computeIfAbsent(certifier, k -> { log.debug("Validation of view member: {}:{} using certifier: {} on: {}", member.getId(), digest, - certifier.getId(), params().member().getId()); + certifier.getId(), params().member().getId()); newCertifier.set(true); return v; }); @@ -307,20 +308,17 @@ private class Recon implements Reconfiguration { @Override public void certify() { - if (proposals.values() - .stream() - .filter(p -> p.validations.size() == nextAssembly.size()) - .count() == nextAssembly.size()) { + if (proposals.values().stream().filter(p -> p.validations.size() == nextAssembly.size()).count() + == nextAssembly.size()) { cancelSlice.set(true); log.debug("Certifying slate: {} of: {} on: {}", proposals.size(), nextViewId, params().member()); transitions.certified(); } - log.debug("Not certifying slate: {} of: {} on: {}", - proposals.entrySet() - .stream() - .map(e -> String.format("%s:%s", e.getKey(), e.getValue().validations.size())) - .toList(), - nextViewId, params().member()); + log.debug("Not certifying slate: {} of: {} on: {}", proposals.entrySet() + .stream() + .map(e -> String.format("%s:%s", e.getKey(), + e.getValue().validations.size())) + .toList(), nextViewId, params().member()); } @Override @@ -331,21 +329,20 @@ public void complete() { @Override public void elect() { proposals.values() - .stream() - .filter(p -> p.validations.size() >= view.context().majority()) - .sorted(Comparator.comparing(p -> p.member.getId())) - .forEach(p -> slate.put(p.member(), joinOf(p))); + .stream() + .filter(p -> p.validations.size() >= view.context().majority()) + .sorted(Comparator.comparing(p -> p.member.getId())) + .forEach(p -> slate.put(p.member(), joinOf(p))); if (slate.size() >= params().context().majority()) { cancelSlice.set(true); log.debug("Electing slate: {} of: {} on: {}", slate.size(), nextViewId, params().member()); transitions.complete(); } 
else { log.error("Failed election, required: {} slate: {} of: {} on: {}", params().context().majority() + 1, - proposals.values() - .stream() - .map(p -> String.format("%s:%s", p.member.getId(), p.validations.size())) - .toList(), - nextViewId, params().member()); + proposals.values() + .stream() + .map(p -> String.format("%s:%s", p.member.getId(), p.validations.size())) + .toList(), nextViewId, params().member()); } } @@ -361,25 +358,26 @@ public void gather() { AtomicReference reiterate = new AtomicReference<>(); AtomicReference retryDelay = new AtomicReference<>(Duration.ofMillis(10)); reiterate.set(() -> committee.iterate((term, m) -> { - if (proposals.containsKey(m.getId())) { - return null; - } - log.trace("Requesting Join from: {} on: {}", term.getMember().getId(), params().member().getId()); - return term.join(nextViewId); - }, (futureSailor, term, m) -> consider(futureSailor, term, m), () -> completeSlice(retryDelay, reiterate), - params().scheduler(), params().gossipDuration())); + if (proposals.containsKey(m.getId())) { + return null; + } + log.trace("Requesting Join from: {} on: {}", term.getMember().getId(), params().member().getId()); + return term.join(nextViewId); + }, (futureSailor, term, m) -> consider(futureSailor, term, m), () -> completeSlice(retryDelay, reiterate), + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), + params().gossipDuration())); reiterate.get().run(); } @Override public void nominate() { publisher.accept(Reassemble.newBuilder() - .addAllMembers(proposals.values().stream().map(p -> p.vm).toList()) - .addAllValidations(proposals.values() - .stream() - .flatMap(p -> p.validations.values().stream()) - .toList()) - .build()); + .addAllMembers(proposals.values().stream().map(p -> p.vm).toList()) + .addAllValidations(proposals.values() + .stream() + .flatMap(p -> p.validations.values().stream()) + .toList()) + .build()); transitions.nominated(); } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index 5993ccb51..caa80f148 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -39,20 +41,22 @@ * @author hal.hildebrand */ public class Bootstrapper { - private static final Logger log = LoggerFactory.getLogger(Bootstrapper.class); - private final HashedCertifiedBlock anchor; - private final CompletableFuture anchorSynchronized = new CompletableFuture<>(); - private final CommonCommunications comms; - private final ULong lastCheckpoint; - private final Parameters params; - private final Store store; - private final CompletableFuture sync = new CompletableFuture<>(); - private final CompletableFuture viewChainSynchronized = new CompletableFuture<>(); - private volatile HashedCertifiedBlock checkpoint; - private volatile CompletableFuture checkpointAssembled; - private volatile CheckpointState checkpointState; - private volatile HashedCertifiedBlock checkpointView; - private volatile HashedCertifiedBlock genesis; + private static final Logger log = LoggerFactory.getLogger( + 
Bootstrapper.class); + private final HashedCertifiedBlock anchor; + private final CompletableFuture anchorSynchronized = new CompletableFuture<>(); + private final CommonCommunications comms; + private final ULong lastCheckpoint; + private final Parameters params; + private final Store store; + private final CompletableFuture sync = new CompletableFuture<>(); + private final CompletableFuture viewChainSynchronized = new CompletableFuture<>(); + private final ScheduledExecutorService scheduler; + private volatile HashedCertifiedBlock checkpoint; + private volatile CompletableFuture checkpointAssembled; + private volatile CheckpointState checkpointState; + private volatile HashedCertifiedBlock checkpointView; + private volatile HashedCertifiedBlock genesis; public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store, CommonCommunications bootstrapComm) { @@ -70,6 +74,7 @@ public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store, log.info("Restore using no prior state on: {}", params.member().getId()); lastCheckpoint = null; } + scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); } public static Digest randomCut(DigestAlgorithm algo) { @@ -89,27 +94,26 @@ private void anchor(AtomicReference start, ULong end) { final var randomCut = randomCut(params.digestAlgorithm()); log.trace("Anchoring from: {} to: {} cut: {} on: {}", start.get(), end, randomCut, params.member().getId()); new RingIterator<>(params.gossipDuration(), params.context(), params.member(), comms, params.exec(), true, - params.scheduler()).iterate(randomCut, (link, ring) -> anchor(link, start, end), - (tally, futureSailor, - destination) -> completeAnchor(futureSailor, start, end, - destination), - t -> scheduleAnchorCompletion(start, end)); + scheduler).iterate(randomCut, (link, ring) -> anchor(link, start, end), + (tally, futureSailor, destination) -> completeAnchor(futureSailor, start, + end, destination), + t -> scheduleAnchorCompletion(start, end)); } private Blocks anchor(Terminal link, AtomicReference start, ULong end) { log.debug("Attempting Anchor completion ({} to {}) with: {} on: {}", start, end, link.getMember().getId(), - params.member().getId()); + params.member().getId()); long seed = Entropy.nextBitsStreamLong(); BloomFilter blocksBff = new BloomFilter.ULongBloomFilter(seed, params.bootstrap().maxViewBlocks() * 2, - params.combine().falsePositiveRate()); + params.combine().falsePositiveRate()); start.set(store.firstGap(start.get(), end)); store.blocksFrom(start.get(), end, params.bootstrap().maxSyncBlocks()).forEachRemaining(h -> blocksBff.add(h)); BlockReplication replication = BlockReplication.newBuilder() - .setBlocksBff(blocksBff.toBff()) - .setFrom(start.get().longValue()) - .setTo(end.longValue()) - .build(); + .setBlocksBff(blocksBff.toBff()) + .setFrom(start.get().longValue()) + .setTo(end.longValue()) + .build(); return link.fetchBlocks(replication); } @@ -120,38 +124,38 @@ private void checkpointCompletion(int threshold, Initial mostRecent) { checkpointView = new HashedCertifiedBlock(params.digestAlgorithm(), mostRecent.getCheckpointView()); store.put(checkpointView); assert !checkpointView.height() - .equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis"; + .equals(Unsigned.ulong(0)) : "Should not attempt when bootstrapping from genesis"; log.info("Assembling from checkpoint: {}:{} on: {}", checkpoint.height(), checkpoint.hash, - params.member().getId()); + params.member().getId()); CheckpointAssembler 
assembler = new CheckpointAssembler(params.gossipDuration(), checkpoint.height(), - checkpoint.block.getCheckpoint(), params.member(), - store, comms, params.context(), threshold, - params.digestAlgorithm()); + checkpoint.block.getCheckpoint(), params.member(), + store, comms, params.context(), threshold, + params.digestAlgorithm()); // assemble the checkpoint - checkpointAssembled = assembler.assemble(params.scheduler(), params.gossipDuration(), params.exec()) - .whenComplete((cps, t) -> { - log.info("Restored checkpoint: {} on: {}", checkpoint.height(), - params.member().getId()); - checkpointState = cps; - }); + checkpointAssembled = assembler.assemble(scheduler, params.gossipDuration(), params.exec()) + .whenComplete((cps, t) -> { + log.info("Restored checkpoint: {} on: {}", checkpoint.height(), + params.member().getId()); + checkpointState = cps; + }); // reconstruct chain to genesis mostRecent.getViewChainList() - .stream() - .filter(cb -> cb.getBlock().hasReconfigure()) - .map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb)) - .forEach(reconfigure -> { - store.put(reconfigure); - }); + .stream() + .filter(cb -> cb.getBlock().hasReconfigure()) + .map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb)) + .forEach(reconfigure -> { + store.put(reconfigure); + }); scheduleViewChainCompletion(new AtomicReference<>(checkpointView.height()), ULong.valueOf(0)); } - private boolean completeAnchor(Optional futureSailor, AtomicReference start, - ULong end, RingCommunications.Destination destination) { + private boolean completeAnchor(Optional futureSailor, AtomicReference start, ULong end, + RingCommunications.Destination destination) { if (sync.isDone() || anchorSynchronized.isDone()) { log.trace("Anchor synchronized isDone: {} anchor sync: {} on: {}", sync.isDone(), - anchorSynchronized.isDone(), params.member().getId()); + anchorSynchronized.isDone(), params.member().getId()); return false; } if (futureSailor.isEmpty()) { @@ -159,13 +163,13 @@ private boolean completeAnchor(Optional futureSailor, AtomicReference
    new HashedCertifiedBlock(params.digestAlgorithm(), cb)) - .peek(cb -> log.trace("Adding anchor completion: {} block[{}] from: {} on: {}", cb.height(), cb.hash, - destination.member().getId(), params.member().getId())) - .forEach(cb -> store.put(cb)); + .stream() + .map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb)) + .peek(cb -> log.trace("Adding anchor completion: {} block[{}] from: {} on: {}", cb.height(), cb.hash, + destination.member().getId(), params.member().getId())) + .forEach(cb -> store.put(cb)); if (store.firstGap(start.get(), end).equals(end)) { validateAnchor(); return false; @@ -174,20 +178,19 @@ private boolean completeAnchor(Optional futureSailor, AtomicReference
      start, ULong end) { - new RingIterator<>(params.gossipDuration(), params.context(), params.member(), params.scheduler(), comms, - params.exec()).iterate(randomCut(params.digestAlgorithm()), - (link, ring) -> completeViewChain(link, start, end), - (tally, result, destination) -> completeViewChain(result, - start, end, - destination), - t -> scheduleViewChainCompletion(start, end)); + new RingIterator<>(params.gossipDuration(), params.context(), params.member(), scheduler, comms, + params.exec()).iterate(randomCut(params.digestAlgorithm()), + (link, ring) -> completeViewChain(link, start, end), + (tally, result, destination) -> completeViewChain(result, start, end, + destination), + t -> scheduleViewChainCompletion(start, end)); } - private boolean completeViewChain(Optional futureSailor, AtomicReference start, - ULong end, RingCommunications.Destination destination) { + private boolean completeViewChain(Optional futureSailor, AtomicReference start, ULong end, + RingCommunications.Destination destination) { if (sync.isDone() || viewChainSynchronized.isDone()) { log.trace("View chain synchronized isDone: {} sync: {} on: {}", sync.isDone(), - viewChainSynchronized.isDone(), params.member().getId()); + viewChainSynchronized.isDone(), params.member().getId()); return false; } if (futureSailor.isEmpty()) { @@ -196,17 +199,17 @@ private boolean completeViewChain(Optional futureSailor, AtomicReference Blocks blocks = futureSailor.get(); log.debug("View chain completion reply ({} to {}) from: {} on: {}", start.get(), end, - destination.member().getId(), params.member().getId()); + destination.member().getId(), params.member().getId()); blocks.getBlocksList() - .stream() - .map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb)) - .peek(cb -> log.trace("Adding view completion: {} block[{}] from: {} on: {}", cb.height(), cb.hash, - destination.member().getId(), params.member().getId())) - .forEach(cb -> store.put(cb)); + .stream() + .map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb)) + .peek(cb -> log.trace("Adding view completion: {} block[{}] from: {} on: {}", cb.height(), cb.hash, + destination.member().getId(), params.member().getId())) + .forEach(cb -> store.put(cb)); if (store.completeFrom(start.get())) { validateViewChain(); log.debug("View chain complete ({} to {}) from: {} on: {}", start.get(), end, destination.member().getId(), - params.member().getId()); + params.member().getId()); return false; } return true; @@ -214,67 +217,64 @@ private boolean completeViewChain(Optional futureSailor, AtomicReference private Blocks completeViewChain(Terminal link, AtomicReference start, ULong end) { log.debug("Attempting view chain completion ({} to {}) with: {} on: {}", start.get(), end, - link.getMember().getId(), params.member().getId()); + link.getMember().getId(), params.member().getId()); long seed = Entropy.nextBitsStreamLong(); ULongBloomFilter blocksBff = new BloomFilter.ULongBloomFilter(seed, params.bootstrap().maxViewBlocks() * 2, - params.combine().falsePositiveRate()); + params.combine().falsePositiveRate()); start.set(store.lastViewChainFrom(start.get())); store.viewChainFrom(start.get(), end).forEachRemaining(h -> blocksBff.add(h)); BlockReplication replication = BlockReplication.newBuilder() - .setBlocksBff(blocksBff.toBff()) - .setFrom(start.get().longValue()) - .setTo(end.longValue()) - .build(); + .setBlocksBff(blocksBff.toBff()) + .setFrom(start.get().longValue()) + .setTo(end.longValue()) + .build(); return link.fetchViewChain(replication); } private 
void computeGenesis(Map votes) { log.info("Computing genesis with {} votes, required: {} on: {}", votes.size(), params.majority(), - params.member().getId()); + params.member().getId()); Multiset tally = TreeMultiset.create(); Map valid = votes.entrySet() - .stream() - .filter(e -> e.getValue().hasGenesis()) // Has a genesis - .filter(e -> genesis == null ? true : genesis.hash.equals(e.getKey())) // If - // restoring - // from - // known - // genesis... - .filter(e -> { - if (e.getValue().hasGenesis()) { - if (lastCheckpoint != null && - lastCheckpoint.compareTo(ULong.valueOf(0)) > 0) { - log.trace("Rejecting genesis: {} last checkpoint: {} > 0 on: {}", - e.getKey(), lastCheckpoint, params.member().getId()); - return false; - } - log.trace("Accepting genesis: {} on: {}", e.getKey(), - params.member().getId()); - return true; - } - if (!e.getValue().hasCheckpoint()) { - log.trace("Rejecting: {} has no checkpoint. last checkpoint: {} > 0 on: {}", - e.getKey(), lastCheckpoint, params.member().getId()); - return false; - } - - ULong checkpointViewHeight = HashedBlock.height(e.getValue() - .getCheckpointView() - .getBlock()); - ULong recordedCheckpointViewHeight = ULong.valueOf(e.getValue() - .getCheckpoint() - .getBlock() - .getHeader() - .getLastReconfig()); - // checkpoint's view should match - log.trace("Accepting checkpoint: {} on: {}", e.getKey(), - params.member().getId()); - return checkpointViewHeight.equals(recordedCheckpointViewHeight); - }) - .peek(e -> tally.add(new HashedCertifiedBlock(params.digestAlgorithm(), - e.getValue().getGenesis()))) - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + .stream() + .filter(e -> e.getValue().hasGenesis()) // Has a genesis + .filter(e -> genesis == null ? true : genesis.hash.equals(e.getKey())) // If + // restoring + // from + // known + // genesis... + .filter(e -> { + if (e.getValue().hasGenesis()) { + if (lastCheckpoint != null + && lastCheckpoint.compareTo(ULong.valueOf(0)) > 0) { + log.trace("Rejecting genesis: {} last checkpoint: {} > 0 on: {}", + e.getKey(), lastCheckpoint, params.member().getId()); + return false; + } + log.trace("Accepting genesis: {} on: {}", e.getKey(), + params.member().getId()); + return true; + } + if (!e.getValue().hasCheckpoint()) { + log.trace( + "Rejecting: {} has no checkpoint. last checkpoint: {} > 0 on: {}", + e.getKey(), lastCheckpoint, params.member().getId()); + return false; + } + + ULong checkpointViewHeight = HashedBlock.height( + e.getValue().getCheckpointView().getBlock()); + ULong recordedCheckpointViewHeight = ULong.valueOf( + e.getValue().getCheckpoint().getBlock().getHeader().getLastReconfig()); + // checkpoint's view should match + log.trace("Accepting checkpoint: {} on: {}", e.getKey(), + params.member().getId()); + return checkpointViewHeight.equals(recordedCheckpointViewHeight); + }) + .peek(e -> tally.add(new HashedCertifiedBlock(params.digestAlgorithm(), + e.getValue().getGenesis()))) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); int threshold = params.majority(); if (genesis == null) { @@ -302,24 +302,23 @@ private void computeGenesis(Map votes) { // get the most recent checkpoint. Initial mostRecent = valid.values() - .stream() - .filter(i -> i.hasGenesis()) - .filter(i -> genesis.hash.equals(new HashedCertifiedBlock(params.digestAlgorithm(), - i.getGenesis()).hash)) - .filter(i -> i.hasCheckpoint()) - .filter(i -> lastCheckpoint != null ? true - : lastCheckpoint != null - ? 
HashedBlock.height(i.getCheckpoint()) - .compareTo(lastCheckpoint) > 0 - : true) - .max((a, b) -> Long.compare(a.getCheckpoint().getBlock().getHeader().getHeight(), - b.getCheckpoint().getBlock().getHeader().getHeight())) - .orElse(null); + .stream() + .filter(i -> i.hasGenesis()) + .filter(i -> genesis.hash.equals( + new HashedCertifiedBlock(params.digestAlgorithm(), i.getGenesis()).hash)) + .filter(i -> i.hasCheckpoint()) + .filter(i -> lastCheckpoint != null ? true : lastCheckpoint != null ? + HashedBlock.height(i.getCheckpoint()) + .compareTo(lastCheckpoint) > 0 + : true) + .max((a, b) -> Long.compare(a.getCheckpoint().getBlock().getHeader().getHeight(), + b.getCheckpoint().getBlock().getHeader().getHeight())) + .orElse(null); store.put(genesis); ULong anchorTo; - boolean genesisBootstrap = mostRecent == null || - mostRecent.getCheckpointView().getBlock().getHeader().getHeight() == 0; + boolean genesisBootstrap = + mostRecent == null || mostRecent.getCheckpointView().getBlock().getHeader().getHeight() == 0; if (!genesisBootstrap) { checkpointCompletion(threshold, mostRecent); anchorTo = checkpoint.height(); @@ -331,27 +330,26 @@ private void computeGenesis(Map votes) { // Checkpoint must be assembled, view chain synchronized, and blocks spanning // the anchor block to the checkpoint must be filled - CompletableFuture completion = !genesisBootstrap ? CompletableFuture.allOf(checkpointAssembled, - viewChainSynchronized, - anchorSynchronized) - : CompletableFuture.allOf(anchorSynchronized); + CompletableFuture completion = + !genesisBootstrap ? CompletableFuture.allOf(checkpointAssembled, viewChainSynchronized, anchorSynchronized) + : CompletableFuture.allOf(anchorSynchronized); completion.whenComplete((v, t) -> { if (t == null) { log.info("Synchronized to: {} from: {} last view: {} on: {}", genesis.hash, - checkpoint == null ? genesis.hash : checkpoint.hash, - checkpointView == null ? genesis.hash : checkpoint.hash, params.member().getId()); + checkpoint == null ? genesis.hash : checkpoint.hash, + checkpointView == null ? genesis.hash : checkpoint.hash, params.member().getId()); sync.complete(new SynchronizedState(genesis, checkpointView, checkpoint, checkpointState)); } else { log.error("Failed synchronizing to {} from: {} last view: {} on: {}", genesis.hash, - checkpoint == null ? genesis.hash : checkpoint.hash, - checkpointView == null ? genesis.hash : checkpoint.hash, t); + checkpoint == null ? genesis.hash : checkpoint.hash, + checkpointView == null ? genesis.hash : checkpoint.hash, t); sync.completeExceptionally(t); } }).exceptionally(t -> { log.error("Failed synchronizing to {} from: {} last view: {} on: {}", genesis.hash, - checkpoint == null ? genesis.hash : checkpoint.hash, - checkpointView == null ? genesis.hash : checkpoint.hash, t); + checkpoint == null ? genesis.hash : checkpoint.hash, + checkpointView == null ? genesis.hash : checkpoint.hash, t); sync.completeExceptionally(t); return null; }); @@ -361,28 +359,28 @@ private void sample() { final HashedCertifiedBlock established = genesis; if (sync.isDone() || established != null) { log.trace("Synchronization isDone: {} genesis: {} on: {}", sync.isDone(), - established == null ? null : established.hash, params.member().getId()); + established == null ? 
null : established.hash, params.member().getId()); return; } HashMap votes = new HashMap<>(); Synchronize s = Synchronize.newBuilder().setHeight(anchor.height().longValue()).build(); final var randomCut = randomCut(params.digestAlgorithm()); new RingIterator<>(params.gossipDuration(), params.context(), params.member(), comms, params.exec(), true, - params.scheduler()).iterate(randomCut, (link, ring) -> synchronize(s, link), - (tally, futureSailor, - destination) -> synchronize(futureSailor, votes, destination), - t -> computeGenesis(votes)); + scheduler).iterate(randomCut, (link, ring) -> synchronize(s, link), + (tally, futureSailor, destination) -> synchronize(futureSailor, votes, + destination), + t -> computeGenesis(votes)); } private void scheduleAnchorCompletion(AtomicReference start, ULong anchorTo) { assert !start.get().equals(anchorTo) : "Should not schedule anchor completion on an empty interval: [" - + start.get() + ":" + anchorTo + "]"; + + start.get() + ":" + anchorTo + "]"; if (sync.isDone()) { return; } log.info("Scheduling Anchor completion ({} to {}) duration: {} on: {}", start, anchorTo, - params.gossipDuration(), params.member().getId()); - params.scheduler().schedule(() -> { + params.gossipDuration(), params.member().getId()); + scheduler.schedule(() -> { try { anchor(start, anchorTo); } catch (Throwable e) { @@ -397,32 +395,33 @@ private void scheduleSample() { return; } log.info("Scheduling state sample on: {}", params.member().getId()); - params.scheduler().schedule(() -> { + scheduler.schedule(() -> { final HashedCertifiedBlock established = genesis; if (sync.isDone() || established != null) { log.trace("Synchronization isDone: {} genesis: {} on: {}", sync.isDone(), - established == null ? null : established.hash, params.member().getId()); + established == null ? 
null : established.hash, params.member().getId()); return; } try { sample(); } catch (Throwable e) { log.error("Unable to sample sync state on: {}", params.member().getId(), e); - sync.completeExceptionally(e);e.printStackTrace(); + sync.completeExceptionally(e); + e.printStackTrace(); } }, params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS); } private void scheduleViewChainCompletion(AtomicReference start, ULong to) { assert !start.get().equals(to) : "Should not schedule view chain completion on an empty interval: [" - + start.get() + ":" + to + "]"; + + start.get() + ":" + to + "]"; if (sync.isDone()) { log.trace("View chain complete on: {}", params.member().getId()); return; } log.info("Scheduling view chain completion ({} to {}) duration: {} on: {}", start, to, params.gossipDuration(), - params.member().getId()); - params.scheduler().schedule(() -> { + params.member().getId()); + scheduler.schedule(() -> { try { completeViewChain(start, to); } catch (Throwable e) { @@ -437,13 +436,13 @@ private boolean synchronize(Optional futureSailor, HashMap futureSailor, HashMap assemblies = new HashMap<>(); - private Map communications = new HashMap<>(); - private CountDownLatch complete; - private Context context; - private List controllers = new ArrayList<>(); - private Map dataSources; - private List gossipers = new ArrayList<>(); - private List members; - private Digest nextViewId; + private static short CARDINALITY = 4; + private Map assemblies = new HashMap<>(); + private Map communications = new HashMap<>(); + private CountDownLatch complete; + private Context context; + private List controllers = new ArrayList<>(); + private Map dataSources; + private List gossipers = new ArrayList<>(); + private List members; + private Digest nextViewId; @AfterEach public void after() { @@ -72,16 +72,19 @@ public void after() { public void before() throws Exception { var entropy = SecureRandom.getInstance("SHA1PRNG"); - entropy.setSeed(new byte[]{6, 6, 6}); + entropy.setSeed(new byte[] { 6, 6, 6 }); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); - members = IntStream.range(0, CARDINALITY).mapToObj(i -> stereotomy.newIdentifier()).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (SigningMember) e).toList(); + members = IntStream.range(0, CARDINALITY) + .mapToObj(i -> stereotomy.newIdentifier()) + .map(cpk -> new ControlledIdentifierMember(cpk)) + .map(e -> (SigningMember) e) + .toList(); final var prefix = UUID.randomUUID().toString(); members.forEach(m -> { - var com = new LocalServer(prefix, m, - Executors.newSingleThreadExecutor()).router(ServerConnectionCache.newBuilder(), - Executors.newFixedThreadPool(2)); + var com = new LocalServer(prefix, m, Executors.newSingleThreadExecutor()).router( + ServerConnectionCache.newBuilder(), Executors.newFixedThreadPool(2)); communications.put(m, com); }); context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin().prefix(2), members.size(), 0.1, 3); @@ -104,16 +107,16 @@ public void testIt() throws Exception { assemblies.values().forEach(assembly -> assembly.assembled()); controllers.forEach(e -> e.start()); communications.values().forEach(e -> e.start()); - gossipers.forEach(e -> e.start(gossipPeriod, Executors.newSingleThreadScheduledExecutor())); + gossipers.forEach(e -> e.start(gossipPeriod)); assertTrue(complete.await(60, TimeUnit.SECONDS), "Failed to reconfigure"); } private void buildAssemblies() { Parameters.Builder params = Parameters.newBuilder() - 
.setProducer(ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(10)) - .build()) - .setGossipDuration(Duration.ofMillis(10)); + .setProducer(ProducerParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(10)) + .build()) + .setGossipDuration(Duration.ofMillis(10)); Map servers = members.stream().collect(Collectors.toMap(m -> m, m -> mock(Concierge.class))); Map consensusPairs = new HashMap<>(); servers.forEach((m, s) -> { @@ -124,47 +127,49 @@ private void buildAssemblies() { @Override public ViewMember answer(InvocationOnMock invocation) throws Throwable { return ViewMember.newBuilder() - .setId(m.getId().toDigeste()) - .setConsensusKey(consensus) - .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) - .build(); + .setId(m.getId().toDigeste()) + .setConsensusKey(consensus) + .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) + .build(); } }); }); var comms = members.stream() - .collect(Collectors.toMap(m -> m, - m -> communications.get(m) - .create(m, context.getId(), servers.get(m), - servers.get(m) - .getClass() - .getCanonicalName(), - r -> { - Router router = communications.get(m); - return new TerminalServer(router.getClientIdentityProvider(), - null, r); - }, TerminalClient.getCreate(null), - Terminal.getLocalLoopback(m, - servers.get(m))))); + .collect(Collectors.toMap(m -> m, m -> communications.get(m) + .create(m, context.getId(), + servers.get(m), servers.get(m) + .getClass() + .getCanonicalName(), + r -> { + Router router = communications.get( + m); + return new TerminalServer( + router.getClientIdentityProvider(), + null, r); + }, + TerminalClient.getCreate(null), + Terminal.getLocalLoopback(m, + servers.get( + m))))); Map validators = consensusPairs.entrySet() - .stream() - .collect(Collectors.toMap(e -> e.getKey(), - e -> new Verifier.DefaultVerifier(e.getValue() - .getPublic()))); + .stream() + .collect(Collectors.toMap(e -> e.getKey(), + e -> new Verifier.DefaultVerifier( + e.getValue().getPublic()))); Map views = new HashMap<>(); context.active().forEach(m -> { SigningMember sm = (SigningMember) m; Router router = communications.get(m); - ViewContext view = new ViewContext(context, - params.build(RuntimeParameters.newBuilder() - .setExec(Executors.newFixedThreadPool(2)) - .setScheduler(Executors.newSingleThreadScheduledExecutor()) - .setContext(context) - .setMember(sm) - .setCommunications(router) - .build()), - new Signer.SignerImpl(consensusPairs.get(m).getPrivate()), validators, - null); + ViewContext view = new ViewContext(context, params.build(RuntimeParameters.newBuilder() + .setExec( + Executors.newFixedThreadPool(2)) + .setContext(context) + .setMember(sm) + .setCommunications(router) + .build()), + new Signer.SignerImpl(consensusPairs.get(m).getPrivate()), validators, + null); views.put(m, view); var ds = dataSources.get(m); var com = comms.get(m); @@ -180,8 +185,8 @@ public void complete() { private void initEthereals() { var builder = Config.newBuilder() - .setnProc(CARDINALITY) - .setVerifiers(members.toArray(new Verifier[members.size()])); + .setnProc(CARDINALITY) + .setVerifiers(members.toArray(new Verifier[members.size()])); for (short i = 0; i < CARDINALITY; i++) { final short pid = i; @@ -191,11 +196,11 @@ private void initEthereals() { assembly.inbound().accept(process(pb, last)); }; var controller = new Ethereal(builder.setSigner(members.get(i)).setPid(pid).build(), 1024 * 1024, - dataSources.get(member), blocker, ep -> { + dataSources.get(member), blocker, ep -> { }, 
Ethereal.consumer(Integer.toString(i))); var gossiper = new ChRbcGossip(context, member, controller.processor(), communications.get(member), - Executors.newFixedThreadPool(2), null); + Executors.newFixedThreadPool(2), null); gossipers.add(gossiper); controllers.add(controller); } diff --git a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java index dc735604b..376fdfdba 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java @@ -103,7 +103,6 @@ public void smoke() throws Exception { .build(RuntimeParameters.newBuilder() .setContext(context) .setMember(member) - .setScheduler(Executors.newSingleThreadScheduledExecutor()) .build()), store, comms); diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java index ea4f4a825..aa2d7e285 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/memberships/ChRbcGossip.java @@ -31,32 +31,30 @@ import java.time.Duration; import java.util.NoSuchElementException; import java.util.Optional; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import static com.salesforce.apollo.ethereal.memberships.comm.GossiperClient.getCreate; /** - * Handles the gossip propigation of proposals, commits and preVotes from this - * node, as well as the notification of the adder of such from other nodes. + * Handles the gossip propigation of proposals, commits and preVotes from this node, as well as the notification of the + * adder of such from other nodes. 
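As the GenesisAssemblyTest hunk above and the ChRbcGossip hunks below show, the gossiper now owns its scheduling: callers construct it with a Router and an Executor and then call start(Duration) with no ScheduledExecutorService. A minimal sketch of the resulting call site, assuming context, member, processor and communications are already configured (the variable names are illustrative, not part of the patch):

    // Sketch only: after this change the scheduler is created inside start().
    // Assumes java.time.Duration and java.util.concurrent.Executors are imported.
    ChRbcGossip gossip = new ChRbcGossip(context, member, processor, communications,
                                         Executors.newFixedThreadPool(2), null);
    gossip.start(Duration.ofMillis(10));   // was: gossip.start(duration, scheduler)
    // ... gossip rounds now run on the gossiper's internal virtual-thread scheduler ...
    gossip.stop();

ReliableBroadcaster.start(Duration) in the later hunks follows the same pattern.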
* * @author hal.hildebrand */ public class ChRbcGossip { - private static final Logger log = LoggerFactory.getLogger(ChRbcGossip.class); - private final CommonCommunications comm; - private final Context context; - private final Executor exec; - private final SigningMember member; - private final EtherealMetrics metrics; - private final Processor processor; - private final RingCommunications ring; - private final AtomicBoolean started = new AtomicBoolean(); - private volatile ScheduledFuture scheduled; + private static final Logger log = LoggerFactory.getLogger( + ChRbcGossip.class); + private final CommonCommunications comm; + private final Context context; + private final Executor exec; + private final SigningMember member; + private final EtherealMetrics metrics; + private final Processor processor; + private final RingCommunications ring; + private final AtomicBoolean started = new AtomicBoolean(); + private volatile ScheduledFuture scheduled; public ChRbcGossip(Context context, SigningMember member, Processor processor, Router communications, Executor exec, EtherealMetrics m) { @@ -66,8 +64,8 @@ public ChRbcGossip(Context context, SigningMember member, Processor proc this.metrics = m; this.exec = exec; comm = communications.create((Member) member, context.getId(), new Terminal(), getClass().getCanonicalName(), - r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r), - getCreate(metrics), Gossiper.getLocalLoopback(member)); + r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r), + getCreate(metrics), Gossiper.getLocalLoopback(member)); ring = new RingCommunications<>(context, member, this.comm, exec); } @@ -78,13 +76,14 @@ public Context getContext() { /** * Start the receiver's gossip */ - public void start(Duration duration, ScheduledExecutorService scheduler) { + public void start(Duration duration) { if (!started.compareAndSet(false, true)) { return; } Duration initialDelay = duration.plusMillis(Entropy.nextBitsStreamLong(duration.toMillis())); log.trace("Starting GossipService[{}] on: {}", context.getId(), member.getId()); comm.register(context.getId(), new Terminal()); + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); scheduler.schedule(() -> { try { oneRound(duration, scheduler); @@ -111,8 +110,7 @@ public void stop() { } /** - * Perform the first phase of the gossip. Send our partner the Have state of the - * receiver + * Perform the first phase of the gossip. 
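The start(Duration) hunk above replaces the injected scheduler with Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()). A small self-contained sketch of that idiom, outside the patch, for readers unfamiliar with virtual-thread factories:

    // Standalone sketch of the scheduling idiom adopted above (not part of the patch).
    import java.time.Duration;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class VirtualSchedulerSketch {
        public static void main(String[] args) throws Exception {
            Duration gossipDuration = Duration.ofMillis(10);
            // Single-worker scheduler whose worker thread is a virtual thread, as in start() above.
            var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
            scheduler.schedule(() -> System.out.println("one gossip round"),
                               gossipDuration.toMillis(), TimeUnit.MILLISECONDS);
            Thread.sleep(100);   // let the round fire before shutting down
            scheduler.shutdown();
        }
    }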
Send our partner the Have state of the receiver */ private Update gossipRound(Gossiper link, int ring) { if (!started.get()) { @@ -123,11 +121,11 @@ private Update gossipRound(Gossiper link, int ring) { return link.gossip(processor.gossip(context.getId(), ring)); } catch (StatusRuntimeException e) { log.debug("gossiping[{}] failed with: {} with {} ring: {} on {}", context.getId(), e.getMessage(), - member.getId(), ring, link.getMember().getId(), member.getId(), e); + member.getId(), ring, link.getMember().getId(), member.getId(), e); return null; } catch (Throwable e) { log.warn("gossiping[{}] failed from {} with {} ring: {} on {}", context.getId(), member.getId(), ring, - link.getMember().getId(), ring, member.getId(), e); + link.getMember().getId(), ring, member.getId(), e); return null; } } @@ -163,17 +161,17 @@ private void handle(Optional result, RingCommunications.Destination oneRound(duration, scheduler), duration.toMillis(), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); } } } @@ -188,7 +186,7 @@ private void oneRound(Duration duration, ScheduledExecutorService scheduler) { exec.execute(Utils.wrapped(() -> { var timer = metrics == null ? null : metrics.gossipRoundDuration().time(); ring.execute((link, ring) -> gossipRound(link, ring), - (result, destination) -> handle(result, destination, duration, scheduler, timer)); + (result, destination) -> handle(result, destination, duration, scheduler, timer)); }, log)); } @@ -201,12 +199,12 @@ public Update gossip(Gossip request, Digest from) { Member predecessor = context.ring(request.getRing()).predecessor(member); if (predecessor == null || !from.equals(predecessor.getId())) { log.debug("Invalid inbound gossip on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), - member, from, request.getRing(), predecessor.getId()); + member, from, request.getRing(), predecessor.getId()); return Update.getDefaultInstance(); } final var update = processor.gossip(request); log.trace("GossipService received from: {} missing: {} on: {}", from, update.getMissingCount(), - member.getId()); + member.getId()); return update; } @@ -215,7 +213,7 @@ public void update(ContextUpdate request, Digest from) { Member predecessor = context.ring(request.getRing()).predecessor(member); if (predecessor == null || !from.equals(predecessor.getId())) { log.debug("Invalid inbound update on {}:{} from: {} on ring: {} - not predecessor: {}", context.getId(), - member.getId(), from, request.getRing(), predecessor.getId()); + member.getId(), from, request.getRing(), predecessor.getId()); return; } log.trace("gossip update with {} on: {}", from, member); diff --git a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java index 4ab18c8d7..042513dc5 100644 --- a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java +++ b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java @@ -162,9 +162,7 @@ private void one(int iteration, List consumers) throws NoSuc controllers.forEach(e -> e.start()); comms.forEach(e -> e.start()); gossipers.forEach(e -> { - final var sched = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()); - executors.add(sched); - e.start(gossipPeriod, sched); + e.start(gossipPeriod); }); finished.await(LARGE_TESTS ? 
90 : 10, TimeUnit.SECONDS); } finally { diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 5dbccdca3..79843efe5 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -180,13 +180,14 @@ public void removeRoundListener(UUID registration) { roundListeners.remove(registration); } - public void start(Duration duration, ScheduledExecutorService scheduler) { + public void start(Duration duration) { if (!started.compareAndSet(false, true)) { return; } var initialDelay = Entropy.nextBitsStreamLong(duration.toMillis()); log.info("Starting Reliable Broadcaster[{}] for {}", context.getId(), member.getId()); comm.register(context.getId(), new Service()); + var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); scheduler.schedule(() -> oneRound(duration, scheduler), initialDelay, TimeUnit.MILLISECONDS); } diff --git a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java index 9084ce58e..26959995f 100644 --- a/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/membership/messaging/RbcTest.java @@ -99,7 +99,7 @@ public void broadcast() throws Exception { }).collect(Collectors.toList()); System.out.println("Messaging with " + messengers.size() + " members"); - messengers.forEach(view -> view.start(Duration.ofMillis(10), Executors.newScheduledThreadPool(3))); + messengers.forEach(view -> view.start(Duration.ofMillis(10))); Map receivers = new HashMap<>(); AtomicInteger current = new AtomicInteger(-1); diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index 22952321e..b71da5f73 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -63,15 +63,12 @@ import static com.salesforce.apollo.crypto.QualifiedBase64.qb64; /** - * The logical domain of the current "Process" - OS and Simulation defined, - * 'natch. + * The logical domain of the current "Process" - OS and Simulation defined, 'natch. *
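In the ProcessDomain constructor below, KerlDHT is now built without a caller-supplied ScheduledExecutorService, and the KerlDHT hunks further down reduce start(ScheduledExecutorService, Duration) to start(Duration). A sketch of the reduced call site, using the same arguments as the hunk (foundation, member, connectionPool and params assumed configured as in ProcessDomain):

    // Sketch: scheduling is internal to KerlDHT after this change.
    var dht = new KerlDHT(Duration.ofMillis(10), foundation.getContext(), member, connectionPool,
                          params.digestAlgorithm(), params.communications(), params.exec(),
                          Duration.ofSeconds(1), 0.00125, null);
    dht.start(Duration.ofMillis(10));   // was: dht.start(scheduler, duration)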

      - * The ProcessDomain represents a member node in the top level domain and - * represents the top level container model for the distributed system. This top - * level domain contains every sub domain as decendents. The membership of this - * domain is the entirety of all process members in the system. The Context of - * this domain is also the foundational fireflies membership domain of the - * entire system. + * The ProcessDomain represents a member node in the top level domain and represents the top level container model for + * the distributed system. This top level domain contains every sub domain as decendents. The membership of this domain + * is the entirety of all process members in the system. The Context of this domain is also the foundational fireflies + * membership domain of the entire system. * * @author hal.hildebrand */ @@ -81,20 +78,20 @@ public class ProcessDomain extends Domain { private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class); - private final DomainSocketAddress bridge; - private final EventLoopGroup clientEventLoopGroup = getEventLoopGroup(); - private final Path communicationsDirectory; - private final EventLoopGroup contextEventLoopGroup = getEventLoopGroup(); - private final KerlDHT dht; - private final View foundation; - private final Map hostedDomains = new ConcurrentHashMap<>(); - private final UUID listener; - private final DomainSocketAddress outerContextEndpoint; - private final Server outerContextService; - private final Portal portal; - private final DomainSocketAddress portalEndpoint; - private final EventLoopGroup portalEventLoopGroup = getEventLoopGroup(); - private final Map routes = new HashMap<>(); + private final DomainSocketAddress bridge; + private final EventLoopGroup clientEventLoopGroup = getEventLoopGroup(); + private final Path communicationsDirectory; + private final EventLoopGroup contextEventLoopGroup = getEventLoopGroup(); + private final KerlDHT dht; + private final View foundation; + private final Map hostedDomains = new ConcurrentHashMap<>(); + private final UUID listener; + private final DomainSocketAddress outerContextEndpoint; + private final Server outerContextService; + private final Portal portal; + private final DomainSocketAddress portalEndpoint; + private final EventLoopGroup portalEventLoopGroup = getEventLoopGroup(); + private final Map routes = new HashMap<>(); private final IdentifierSpecification.Builder subDomainSpecification; public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder builder, String dbURL, @@ -106,40 +103,39 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, Builder bu super(member, builder, dbURL, checkpointBaseDir, runtime, txnConfig); communicationsDirectory = commDirectory; var base = Context.newBuilder() - .setId(group) - .setCardinality(params.runtime().foundation().getFoundation().getMembershipCount()) - .build(); + .setId(group) + .setCardinality(params.runtime().foundation().getFoundation().getMembershipCount()) + .build(); this.foundation = new View(base, getMember(), endpoint, eventValidation, params.communications(), ff.build(), - DigestAlgorithm.DEFAULT, null, params.exec()); + DigestAlgorithm.DEFAULT, null, params.exec()); final var url = String.format("jdbc:h2:mem:%s-%s;DB_CLOSE_DELAY=-1", member.getId(), ""); JdbcConnectionPool connectionPool = JdbcConnectionPool.create(url, "", ""); connectionPool.setMaxConnections(10); dht = new KerlDHT(Duration.ofMillis(10), foundation.getContext(), member, connectionPool, - 
params.digestAlgorithm(), params.communications(), params.exec(), Duration.ofSeconds(1), - params.runtime().scheduler(), 0.00125, null); + params.digestAlgorithm(), params.communications(), params.exec(), Duration.ofSeconds(1), + 0.00125, null); listener = foundation.register(listener()); bridge = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); - portalEndpoint = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString()) - .toFile()); - portal = new Portal(member.getId(), - NettyServerBuilder.forAddress(portalEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(getServerDomainSocketChannelClass()) - .workerEventLoopGroup(portalEventLoopGroup) - .bossEventLoopGroup(portalEventLoopGroup) - .intercept(new DomainSocketServerInterceptor()), - s -> handler(portalEndpoint), bridge, runtime.getExec(), Duration.ofMillis(1), - s -> routes.get(s)); - outerContextEndpoint = new DomainSocketAddress(communicationsDirectory.resolve(UUID.randomUUID().toString()) - .toFile()); + portalEndpoint = new DomainSocketAddress( + communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); + portal = new Portal(member.getId(), NettyServerBuilder.forAddress(portalEndpoint) + .protocolNegotiator(new DomainSocketNegotiator()) + .channelType(getServerDomainSocketChannelClass()) + .workerEventLoopGroup(portalEventLoopGroup) + .bossEventLoopGroup(portalEventLoopGroup) + .intercept(new DomainSocketServerInterceptor()), + s -> handler(portalEndpoint), bridge, runtime.getExec(), Duration.ofMillis(1), + s -> routes.get(s)); + outerContextEndpoint = new DomainSocketAddress( + communicationsDirectory.resolve(UUID.randomUUID().toString()).toFile()); outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint) - .protocolNegotiator(new DomainSocketNegotiator()) - .channelType(getServerDomainSocketChannelClass()) - .addService(new DemesneKERLServer(dht, null)) - .addService(outerContextService()) - .workerEventLoopGroup(contextEventLoopGroup) - .bossEventLoopGroup(contextEventLoopGroup) - .build(); + .protocolNegotiator(new DomainSocketNegotiator()) + .channelType(getServerDomainSocketChannelClass()) + .addService(new DemesneKERLServer(dht, null)) + .addService(outerContextService()) + .workerEventLoopGroup(contextEventLoopGroup) + .bossEventLoopGroup(contextEventLoopGroup) + .build(); this.subDomainSpecification = subDomainSpecification; } @@ -147,8 +143,7 @@ public View getFoundation() { return foundation; } - public CertificateWithPrivateKey provision(Duration duration, - SignatureAlgorithm signatureAlgorithm) { + public CertificateWithPrivateKey provision(Duration duration, SignatureAlgorithm signatureAlgorithm) { return member.getIdentifier().provision(Instant.now(), duration, signatureAlgorithm); } @@ -156,9 +151,9 @@ public SelfAddressingIdentifier spawn(DemesneParameters.Builder prototype) { final var witness = member.getIdentifier().newEphemeral().get(); final var cloned = prototype.clone(); var parameters = cloned.setCommDirectory(communicationsDirectory.toString()) - .setPortal(portalEndpoint.path()) - .setParent(outerContextEndpoint.path()) - .build(); + .setPortal(portalEndpoint.path()) + .setParent(outerContextEndpoint.path()) + .build(); var ctxId = Digest.from(parameters.getContext()); final AtomicBoolean added = new AtomicBoolean(); final var demesne = new JniBridge(parameters); @@ -177,12 +172,12 @@ public SelfAddressingIdentifier spawn(DemesneParameters.Builder prototype) { sigs.put(0, 
new Signer.SignerImpl(witness.getPrivate()).sign(incp.toKeyEvent_().toByteString())); var attached = new com.salesforce.apollo.stereotomy.event.AttachmentEvent.AttachmentImpl(sigs); var seal = Seal.EventSeal.construct(incp.getIdentifier(), incp.hash(dht.digestAlgorithm()), - incp.getSequenceNumber().longValue()); + incp.getSequenceNumber().longValue()); var builder = InteractionSpecification.newBuilder().addAllSeals(Collections.singletonList(seal)); KeyState_ ks = dht.append(AttachmentEvent.newBuilder() - .setCoordinates(incp.getCoordinates().toEventCoords()) - .setAttachment(attached.toAttachemente()) - .build()); + .setCoordinates(incp.getCoordinates().toEventCoords()) + .setAttachment(attached.toAttachemente()) + .build()); var coords = member.getIdentifier().seal(builder); demesne.commit(coords.toEventCoords()); demesne.start(); @@ -238,11 +233,11 @@ public void stop() { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) - .eventLoopGroup(clientEventLoopGroup) - .channelType(channelType) - .keepAliveTime(1, TimeUnit.SECONDS) - .usePlaintext() - .build(); + .eventLoopGroup(clientEventLoopGroup) + .channelType(channelType) + .keepAliveTime(1, TimeUnit.SECONDS) + .usePlaintext() + .build(); } private ViewLifecycleListener listener() { @@ -271,7 +266,7 @@ public void viewChange(Context context, Digest id, List delegates; + private final MVMap delegates; @SuppressWarnings("unused") - private final Map delegations = new HashMap<>(); - private final double fpr; - private final Duration gossipInterval; - private final int maxTransfer; + private final Map delegations = new HashMap<>(); + private final double fpr; + private final Duration gossipInterval; + private final int maxTransfer; private final RingCommunications ring; - private final AtomicBoolean started = new AtomicBoolean(); - private final MVStore store; - private ScheduledFuture scheduled; + private final AtomicBoolean started = new AtomicBoolean(); + private final MVStore store; + private ScheduledFuture scheduled; public SubDomain(ControlledIdentifierMember member, Builder params, Path checkpointBaseDir, RuntimeParameters.Builder runtime, TransactionConfiguration txnConfig, int maxTransfer, @@ -82,11 +83,11 @@ public SubDomain(ControlledIdentifierMember member, Builder prm, String dbURL, P store = builder.build(); delegates = store.openMap(DELEGATES_MAP_TEMPLATE.formatted(identifier)); CommonCommunications comms = params.communications() - .create(member, params.context().getId(), delegation(), - "delegates", - r -> new DelegationServer((RoutingClientIdentity) params.communications() - .getClientIdentityProvider(), - r, null)); + .create(member, params.context().getId(), delegation(), + "delegates", r -> new DelegationServer( + (RoutingClientIdentity) params.communications() + .getClientIdentityProvider(), r, + null)); ring = new RingCommunications(params.context(), member, comms, params.exec()); this.gossipInterval = gossipInterval; @@ -95,7 +96,7 @@ public SubDomain(ControlledIdentifierMember member, Builder prm, String dbURL, P public SubDomain(ControlledIdentifierMember member, Builder params, String dbURL, RuntimeParameters.Builder runtime, TransactionConfiguration txnConfig, int maxTransfer, Duration gossipInterval, double fpr) { this(member, params, dbURL, tempDirOf(member.getIdentifier()), runtime, txnConfig, maxTransfer, gossipInterval, - fpr); + fpr); } @Override @@ -106,7 +107,8 @@ public void start() { super.start(); Duration initialDelay = 
gossipInterval.plusMillis(Entropy.nextBitsStreamLong(gossipInterval.toMillis())); log.trace("Starting SubDomain[{}:{}]", params.context().getId(), member.getId()); - params.runtime().scheduler().schedule(() -> oneRound(), initialDelay.toMillis(), TimeUnit.MILLISECONDS); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) + .schedule(() -> oneRound(), initialDelay.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -140,11 +142,11 @@ public void update(DelegationUpdate update, Digest from) { }; } - private DelegationUpdate gossipRound(Delegation link, Integer ring) { + private DelegationUpdate gossipRound(Delegation link, Integer ring) { return link.gossip(have()); } - private void handle(Optional< DelegationUpdate> result, + private void handle(Optional result, RingCommunications.Destination destination, Timer.Context timer) { if (!started.get() || destination.link() == null) { if (timer != null) { @@ -166,17 +168,16 @@ private void handle(Optional< DelegationUpdate> result, } log.trace("gossip update with {} on: {}", destination.member().getId(), member.getId()); destination.link() - .update(update(update, DelegationUpdate.newBuilder() - .setRing(destination.ring()) - .setHave(have())).build()); + .update(update(update, DelegationUpdate.newBuilder() + .setRing(destination.ring()) + .setHave(have())).build()); } finally { if (timer != null) { timer.stop(); } if (started.get()) { - scheduled = params.runtime() - .scheduler() - .schedule(() -> oneRound(), gossipInterval.toMillis(), TimeUnit.MILLISECONDS); + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()) + .schedule(() -> oneRound(), gossipInterval.toMillis(), TimeUnit.MILLISECONDS); } } } @@ -191,7 +192,7 @@ private void oneRound() { Timer.Context timer = null; try { ring.execute((link, ring) -> gossipRound(link, ring), - (result, destination) -> handle(result, destination, timer)); + (result, destination) -> handle(result, destination, timer)); } catch (Throwable e) { log.error("Error in delegation gossip in SubDomain[{}:{}]", params.context().getId(), member.getId(), e); } @@ -201,10 +202,10 @@ private DelegationUpdate.Builder update(DelegationUpdate update, DelegationUpdat update.getUpdateList().forEach(sd -> delegates.putIfAbsent(sd.getDelegate().getDelegate(), sd)); BloomFilter bff = BloomFilter.from(update.getHave()); delegates.entrySet() - .stream() - .filter(e -> !bff.contains(Digest.from(e.getKey()))) - .limit(maxTransfer) - .forEach(e -> builder.addUpdate(e.getValue())); + .stream() + .filter(e -> !bff.contains(Digest.from(e.getKey()))) + .limit(maxTransfer) + .forEach(e -> builder.addUpdate(e.getValue())); return builder; } } diff --git a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java index 4219792e5..a2a2bdeab 100644 --- a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java +++ b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java @@ -248,9 +248,6 @@ private RuntimeParameters.Builder runtimeParameters(DemesneParameters parameters return RuntimeParameters.newBuilder() .setCommunications(current.router(exec)) .setExec(exec) - .setScheduler(Executors.newScheduledThreadPool(parameters.getVirtualThreads() == 0 ? 
DEFAULT_VIRTUAL_THREADS - : parameters.getVirtualThreads(), - Thread.ofVirtual().factory())) .setKerl(() -> { return member.kerl(); }) diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 51f8502dd..3c575866a 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -227,9 +227,6 @@ public void before() throws Exception { var domain = new ProcessDomain(group, member, params, "jdbc:h2:mem:", checkpointDirBase, RuntimeParameters.newBuilder() .setFoundation(sealed) - .setScheduler(Executors.newScheduledThreadPool(5, - Thread.ofVirtual() - .factory())) .setContext(context) .setExec(exec) .setCommunications(localRouter), diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java index f82dda9bc..a693b8a23 100644 --- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java @@ -106,9 +106,6 @@ public void before() throws Exception { var node = new ProcessDomain(group, member, params, "jdbc:h2:mem:", checkpointDirBase, RuntimeParameters.newBuilder() .setFoundation(sealed) - .setScheduler(Executors.newScheduledThreadPool(5, - Thread.ofVirtual() - .factory())) .setContext(context) .setExec(exec) .setCommunications(localRouter), diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java index dedb41a44..ee4e2b273 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/AbstractLifecycleTest.java @@ -335,8 +335,6 @@ private CHOAM createChoam(Random entropy, Builder params, SigningMember m, boole .setContext(context) .setExec(exec) .setGenesisData(view -> GENESIS_DATA) - .setScheduler(Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual() - .factory())) .setMember(m) .setCommunications(routers.get(m.getId())) .setCheckpointer(wrap(up)) diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index d8d2e26ad..7cd258ce7 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -295,7 +295,6 @@ private CHOAM createCHOAM(Random entropy, Builder params, SigningMember m, Conte return new CHOAM(params.build(RuntimeParameters.newBuilder() .setContext(context) .setGenesisData(view -> GENESIS_DATA) - .setScheduler(scheduler) .setMember(m) .setCommunications(routers.get(m.getId())) .setExec(exec) diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 4f7c53473..2cf9885fe 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -88,35 +88,40 @@ * @author hal.hildebrand */ public class KerlDHT implements ProtoKERLService { - private final static Logger log = LoggerFactory.getLogger(KerlDHT.class); - private final Ani ani; - private final CachingKERL cache; - private final JdbcConnectionPool connectionPool; - private final Context context; - private final CommonCommunications dhtComms; - 
private final Executor executor; - private final double fpr; - private final Duration frequency; - private final CachingKERL kerl; - private final UniKERLDirectPooled kerlPool; - private final KerlSpace kerlSpace; - private final SigningMember member; - private final RingCommunications reconcile; - private final CommonCommunications reconcileComms; - private final Reconcile reconciliation = new Reconcile(); - private final ScheduledExecutorService scheduler; - private final Service service = new Service(); - private final AtomicBoolean started = new AtomicBoolean(); - private final TemporalAmount timeout; - - public KerlDHT(Duration frequency, Context context, SigningMember member, BiFunction wrap, JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications, Executor executor, TemporalAmount timeout, ScheduledExecutorService scheduler, double falsePositiveRate, StereotomyMetrics metrics) { - @SuppressWarnings("unchecked") final var casting = (Context) context; + private final static Logger log = LoggerFactory.getLogger( + KerlDHT.class); + private final Ani ani; + private final CachingKERL cache; + private final JdbcConnectionPool connectionPool; + private final Context context; + private final CommonCommunications dhtComms; + private final Executor executor; + private final double fpr; + private final Duration frequency; + private final CachingKERL kerl; + private final UniKERLDirectPooled kerlPool; + private final KerlSpace kerlSpace; + private final SigningMember member; + private final RingCommunications reconcile; + private final CommonCommunications reconcileComms; + private final Reconcile reconciliation = new Reconcile(); + private final ScheduledExecutorService scheduler; + private final Service service = new Service(); + private final AtomicBoolean started = new AtomicBoolean(); + private final TemporalAmount timeout; + + public KerlDHT(Duration frequency, Context context, SigningMember member, + BiFunction wrap, JdbcConnectionPool connectionPool, + DigestAlgorithm digestAlgorithm, Router communications, Executor executor, TemporalAmount timeout, + double falsePositiveRate, StereotomyMetrics metrics) { + @SuppressWarnings("unchecked") + final var casting = (Context) context; this.context = casting; this.member = member; this.timeout = timeout; this.fpr = falsePositiveRate; this.frequency = frequency; - this.scheduler = scheduler; + this.scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); this.cache = new CachingKERL(f -> { try { return f.apply(new KERLAdapter(this, digestAlgorithm())); @@ -125,8 +130,16 @@ public KerlDHT(Duration frequency, Context context, SigningMem return null; } }); - dhtComms = communications.create(member, context.getId(), service, service.getClass().getCanonicalName(), r -> new DhtServer(r, metrics), DhtClient.getCreate(metrics), DhtClient.getLocalLoopback(service, member)); - reconcileComms = communications.create(member, context.getId(), reconciliation, reconciliation.getClass().getCanonicalName(), r -> new ReconciliationServer(r, communications.getClientIdentityProvider(), metrics), ReconciliationClient.getCreate(context.getId(), metrics), ReconciliationClient.getLocalLoopback(reconciliation, member)); + dhtComms = communications.create(member, context.getId(), service, service.getClass().getCanonicalName(), + r -> new DhtServer(r, metrics), DhtClient.getCreate(metrics), + DhtClient.getLocalLoopback(service, member)); + reconcileComms = communications.create(member, context.getId(), 
reconciliation, + reconciliation.getClass().getCanonicalName(), + r -> new ReconciliationServer(r, + communications.getClientIdentityProvider(), + metrics), + ReconciliationClient.getCreate(context.getId(), metrics), + ReconciliationClient.getLocalLoopback(reconciliation, member)); this.connectionPool = connectionPool; kerlPool = new UniKERLDirectPooled(connectionPool, digestAlgorithm); this.executor = executor; @@ -145,8 +158,11 @@ public KerlDHT(Duration frequency, Context context, SigningMem this.ani = new Ani(member.getId(), asKERL()); } - public KerlDHT(Duration frequency, Context context, SigningMember member, JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications, Executor executor, TemporalAmount timeout, ScheduledExecutorService scheduler, double falsePositiveRate, StereotomyMetrics metrics) { - this(frequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, executor, timeout, scheduler, falsePositiveRate, metrics); + public KerlDHT(Duration frequency, Context context, SigningMember member, + JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications, + Executor executor, TemporalAmount timeout, double falsePositiveRate, StereotomyMetrics metrics) { + this(frequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, executor, + timeout, falsePositiveRate, metrics); } public static void updateLocationHash(Identifier identifier, DigestAlgorithm digestAlgorithm, DSLContext dsl) { @@ -160,7 +176,11 @@ public static void updateLocationHash(Identifier identifier, DigestAlgorithm dig } var hashed = digestAlgorithm.digest(identBytes); - context.insertInto(IDENTIFIER_LOCATION_HASH, IDENTIFIER_LOCATION_HASH.IDENTIFIER, IDENTIFIER_LOCATION_HASH.DIGEST).values(id.value1(), hashed.getBytes()).onDuplicateKeyIgnore().execute(); + context.insertInto(IDENTIFIER_LOCATION_HASH, IDENTIFIER_LOCATION_HASH.IDENTIFIER, + IDENTIFIER_LOCATION_HASH.DIGEST) + .values(id.value1(), hashed.getBytes()) + .onDuplicateKeyIgnore() + .execute(); }); } @@ -188,7 +208,19 @@ public KeyState_ append(AttachmentEvent event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(identifier, null, (link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, tally, destination, "append events"), t -> completeIt(result, gathered)); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.append( + Collections.emptyList(), + Collections.singletonList( + event)), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append events"), + t -> completeIt(result, + gathered)); try { List s = result.get().getKeyStatesList(); return s.isEmpty() ? 
null : s.getFirst(); @@ -214,7 +246,17 @@ public List append(KERL_ kerl) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(identifier, null, (link, r) -> link.append(kerl), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, tally, destination, "append kerl"), t -> completeIt(result, gathered)); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.append( + kerl), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append kerl"), + t -> completeIt(result, + gathered)); try { return result.get().getKeyStatesList(); } catch (InterruptedException e) { @@ -234,7 +276,18 @@ public KeyState_ append(KeyEvent_ event) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(identifier, null, (link, r) -> link.append(Collections.singletonList(event)), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, tally, destination, "append events"), t -> completeIt(result, gathered)); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.append( + Collections.singletonList( + event)), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append events"), + t -> completeIt(result, + gathered)); try { var ks = result.get(); return ks.getKeyStatesCount() == 0 ? 
KeyState_.getDefaultInstance() : ks.getKeyStatesList().get(0); @@ -286,7 +339,17 @@ public Empty appendAttachments(List events) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(identifier, null, (link, r) -> link.appendAttachments(events), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, tally, destination, "append attachments"), t -> completeIt(result, gathered)); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.appendAttachments( + events), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append attachments"), + t -> completeIt(result, + gathered)); return Empty.getDefaultInstance(); } @@ -303,7 +366,17 @@ public Empty appendValidations(Validations validations) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(identifier, null, (link, r) -> link.appendValidations(validations), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, tally, destination, "append validations"), t -> completeIt(result, gathered)); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.appendValidations( + validations), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append validations"), + t -> completeIt(result, + gathered)); try { return result.get(); } catch (InterruptedException e) { @@ -339,13 +412,22 @@ public Attachment getAttachment(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor) - .noDuplicates() - .iterate(identifier, null, - (link, r) -> link.getAttachment(coordinates), - () -> failedMajority(result, maxCount(gathered)), - (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier, isTimedOut, destination, "get attachment", Attachment.getDefaultInstance()), - t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.getAttachment( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, identifier, + isTimedOut, destination, + "get attachment", + Attachment.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -369,7 +451,22 @@ public KERL_ getKERL(Ident identifier) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, 
executor).noDuplicates().iterate(digest, null, (link, r) -> link.getKERL(identifier), () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, isTimedOut, destination, "get kerl", KERL_.getDefaultInstance()), t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(digest, null, + (link, r) -> link.getKERL( + identifier), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get kerl", + KERL_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -394,7 +491,22 @@ public KeyEvent_ getKeyEvent(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(digest, null, (link, r) -> link.getKeyEvent(coordinates), () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, isTimedOut, destination, "get key event", KeyEvent_.getDefaultInstance()), t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(digest, null, + (link, r) -> link.getKeyEvent( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key event", + KeyEvent_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -419,7 +531,22 @@ public KeyState_ getKeyState(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(digest, null, (link, r) -> link.getKeyState(coordinates), () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, isTimedOut, destination, "get key state for coordinates", KeyState_.getDefaultInstance()), t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates() + .iterate(digest, null, + (link, r) -> link.getKeyState( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key state for coordinates", + KeyState_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -444,7 +571,21 @@ public KeyState_ getKeyState(Ident identifier) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(digest, null, (link, r) 
-> link.getKeyState(identifier), () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, isTimedOut, destination, "get current key state", KeyState_.getDefaultInstance()), t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(digest, null, + (link, r) -> link.getKeyState( + identifier), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get current key state", + KeyState_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -469,7 +610,21 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(digest, null, (link, r) -> link.getKeyStateWithAttachments(coordinates), () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, isTimedOut, destination, "get key state with attachments", KeyStateWithAttachments_.getDefaultInstance()), t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(digest, null, + (link, r) -> link.getKeyStateWithAttachments( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key state with attachments", + KeyStateWithAttachments_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -482,7 +637,8 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat @Override public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndValidations(EventCoords coordinates) { - log.info("Get key state with endorsements and validations: {} on: {}", EventCoordinates.from(coordinates), member.getId()); + log.info("Get key state with endorsements and validations: {} on: {}", EventCoordinates.from(coordinates), + member.getId()); if (coordinates == null) { return completeIt(KeyStateWithEndorsementsAndValidations_.getDefaultInstance()); } @@ -494,7 +650,21 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(digest, null, (link, r) -> link.getKeyStateWithEndorsementsAndValidations(coordinates), () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, digest, isTimedOut, destination, "get key state with endorsements", KeyStateWithEndorsementsAndValidations_.getDefaultInstance()), t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(digest, null, + (link, r) -> 
link.getKeyStateWithEndorsementsAndValidations( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key state with endorsements", + KeyStateWithEndorsementsAndValidations_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -519,7 +689,21 @@ public Validations getValidations(EventCoords coordinates) { Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(identifier, null, (link, r) -> link.getValidations(coordinates), () -> failedMajority(result, maxCount(gathered)), (tally, futureSailor, destination) -> read(result, gathered, tally, futureSailor, identifier, isTimedOut, destination, "get validations", Validations.getDefaultInstance()), t -> failedMajority(result, maxCount(gathered))); + new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).iterate(identifier, null, + (link, r) -> link.getValidations( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, identifier, + isTimedOut, destination, + "get validations", + Validations.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -539,7 +723,7 @@ public int maxCount(HashMultiset gathered) { return max.isEmpty() ? 0 : max.get().getCount(); } - public void start(ScheduledExecutorService scheduler, Duration duration) { + public void start(Duration duration) { if (!started.compareAndSet(false, true)) { return; } @@ -566,7 +750,10 @@ private T complete(Function func) { } private void completeIt(CompletableFuture result, HashMultiset gathered) { - var max = gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null); + var max = gathered.entrySet() + .stream() + .max(Ordering.natural().onResultOf(Multiset.Entry::getCount)) + .orElse(null); if (max != null) { if (max.getCount() >= context.majority()) { try { @@ -577,20 +764,26 @@ private void completeIt(CompletableFuture result, HashMultiset gathere return; } } - result.completeExceptionally(new CompletionException("Unable to achieve majority, max: " + (max == null ? 0 : max.getCount()) + " required: " + context.majority() + " on: " + member.getId())); + result.completeExceptionally(new CompletionException( + "Unable to achieve majority, max: " + (max == null ? 
+            + " on: " + member.getId()));
     }

     private boolean failedMajority(CompletableFuture result, int maxAgree) {
-        return result.completeExceptionally(new CompletionException("Unable to achieve majority read, max: " + maxAgree + " required: " + context.majority() + " on: " + member.getId()));
+        return result.completeExceptionally(new CompletionException(
+            "Unable to achieve majority read, max: " + maxAgree + " required: " + context.majority() + " on: "
+            + member.getId()));
     }

     private void initializeSchema() {
         ConsoleUIService service = (ConsoleUIService) Scope.getCurrentScope().get(Attr.ui, UIService.class);
-        service.setOutputStream(new PrintStream(new LoggingOutputStream(LoggerFactory.getLogger("liquibase"), LogLevel.INFO)));
+        service.setOutputStream(
+            new PrintStream(new LoggingOutputStream(LoggerFactory.getLogger("liquibase"), LogLevel.INFO)));
         var database = new H2Database();
         try (var connection = connectionPool.getConnection()) {
             database.setConnection(new liquibase.database.jvm.JdbcConnection(connection));
-            try (Liquibase liquibase = new Liquibase("/initialize-thoth.xml", new ClassLoaderResourceAccessor(), database)) {
+            try (Liquibase liquibase = new Liquibase("/initialize-thoth.xml", new ClassLoaderResourceAccessor(),
+                                                     database)) {
                 liquibase.update((String) null);
             } catch (LiquibaseException e) {
                 throw new IllegalStateException(e);
@@ -622,7 +815,9 @@ private CombinedIntervals keyIntervals() {
         return new CombinedIntervals(intervals);
     }

-    private boolean mutate(HashMultiset gathered, Optional futureSailor, Digest identifier, Supplier isTimedOut, AtomicInteger tally, RingCommunications.Destination destination, String action) {
+    private boolean mutate(HashMultiset gathered, Optional futureSailor, Digest identifier,
+                           Supplier isTimedOut, AtomicInteger tally,
+                           RingCommunications.Destination destination, String action) {
         if (futureSailor.isEmpty()) {
             return !isTimedOut.get();
         }
@@ -630,18 +825,24 @@ private boolean mutate(HashMultiset gathered, Optional futureSailor, D
         if (content != null) {
             log.trace("{}: {} from: {} on: {}", action, identifier, destination.member().getId(), member.getId());
             gathered.add(content);
-            var max = gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null);
+            var max = gathered.entrySet()
+                              .stream()
+                              .max(Ordering.natural().onResultOf(Multiset.Entry::getCount))
+                              .orElse(null);
             if (max != null) {
                 tally.set(max.getCount());
             }
             return !isTimedOut.get();
         } else {
-            log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(), member.getId());
+            log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(),
+                      member.getId());
             return !isTimedOut.get();
         }
     }

-    private boolean read(CompletableFuture result, HashMultiset gathered, AtomicInteger tally, Optional futureSailor, Digest identifier, Supplier isTimedOut, RingCommunications.Destination destination, String action, T empty) {
+    private boolean read(CompletableFuture result, HashMultiset gathered, AtomicInteger tally,
+                         Optional futureSailor, Digest identifier, Supplier isTimedOut,
+                         RingCommunications.Destination destination, String action, T empty) {
         if (futureSailor.isEmpty()) {
             return !isTimedOut.get();
         }
@@ -654,28 +855,34 @@ private boolean read(CompletableFuture result, HashMultiset gathered,
                 tally.set(max.getCount());
                 if (max.getCount() > context.toleranceLevel()) {
                     result.complete(max.getElement());
-                    log.debug("Majority: {} achieved: {}: {} on: {}", max.getCount(), action, identifier, member.getId());
+                    log.debug("Majority: {} achieved: {}: {} on: {}", max.getCount(), action, identifier,
+                              member.getId());
                     return false;
                 }
             }
             return !isTimedOut.get();
         } else {
-            log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(), member.getId());
+            log.debug("Failed {}: {} from: {} on: {}", action, identifier, destination.member().getId(),
+                      member.getId());
             return !isTimedOut.get();
         }
     }

-    private void reconcile(Optional result, RingCommunications.Destination destination, ScheduledExecutorService scheduler, Duration duration) {
+    private void reconcile(Optional result,
+                           RingCommunications.Destination destination,
+                           ScheduledExecutorService scheduler, Duration duration) {
         if (!started.get()) {
             return;
         }
         if (!result.isEmpty()) {
             try {
                 Update update = result.get();
-                log.trace("Received: {} events in interval reconciliation from: {} on: {}", update.getEventsCount(), destination.member().getId(), member.getId());
+                log.trace("Received: {} events in interval reconciliation from: {} on: {}", update.getEventsCount(),
+                          destination.member().getId(), member.getId());
                 kerlSpace.update(update.getEventsList(), kerl);
             } catch (NoSuchElementException e) {
-                log.debug("null interval reconciliation with {} : {} on: {}", destination.member().getId(), member.getId(), e.getCause());
+                log.debug("null interval reconciliation with {} : {} on: {}", destination.member().getId(),
+                          member.getId(), e.getCause());
             }
         }
         if (started.get()) {
@@ -685,15 +892,21 @@ private void reconcile(Optional result, RingCommunications.Destination
-            reconcile.execute((link, ring) -> reconcile(link, ring), (futureSailor, destination) -> reconcile(futureSailor, destination, scheduler, duration));
+            reconcile.execute((link, ring) -> reconcile(link, ring),
+                              (futureSailor, destination) -> reconcile(futureSailor, destination, scheduler, duration));
         }
@@ -703,7 +916,8 @@ private void updateLocationHash(Identifier identifier) {
             updateLocationHash(identifier, kerl.getDigestAlgorithm(), dsl);
         } catch (SQLException e) {
             log.error("Cannot update location hash for: {} on: {}", identifier, member.getId());
-            throw new IllegalStateException("Cannot update location hash S for: %s on: %s".formatted(identifier, member.getId()));
+            throw new IllegalStateException(
+                "Cannot update location hash S for: %s on: %s".formatted(identifier, member.getId()));
         }
     }
@@ -748,7 +962,8 @@ public List append(KeyEvent... events) {
        }

        @Override
-       public List append(List events, List attachments) {
+       public List append(List events,
+                          List attachments) {
            List lks = super.append(events, attachments);
            if (lks.size() > 0) {
                updateLocationHash(lks.get(0).getCoordinates().getIdentifier());
@@ -778,7 +993,8 @@ public Update reconcile(Intervals intervals, Digest from) {
            try (var k = kerlPool.create()) {
                final var builder = KerlDHT.this.kerlSpace.reconcile(intervals, k);
                CombinedIntervals keyIntervals = keyIntervals();
-               builder.addAllIntervals(keyIntervals.toIntervals()).setHave(kerlSpace.populate(Entropy.nextBitsStreamLong(), keyIntervals, fpr));
+               builder.addAllIntervals(keyIntervals.toIntervals())
+                      .setHave(kerlSpace.populate(Entropy.nextBitsStreamLong(), keyIntervals, fpr));
                return builder.build();
            } catch (IOException | SQLException e) {
                throw new IllegalStateException("Cannot acquire KERL", e);
@@ -873,14 +1089,22 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coords) {
        }

        @Override
-       public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndValidations(EventCoords coordinates) {
+       public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndValidations(
+           EventCoords coordinates) {
            log.trace("get key state with endorsements and attachments for coordinates on: {}", member.getId());
            return complete(k -> {
                final var fs = new CompletableFuture();
                KeyStateWithAttachments_ ksa = k.getKeyStateWithAttachments(coordinates);
                var validations = complete(ks -> ks.getValidations(coordinates));
-               return ksa == null ? KeyStateWithEndorsementsAndValidations_.getDefaultInstance() : KeyStateWithEndorsementsAndValidations_.newBuilder().setState(ksa.getState()).putAllEndorsements(ksa.getAttachment().getEndorsementsMap()).addAllValidations(validations.getValidationsList()).build();
+               return ksa == null ? KeyStateWithEndorsementsAndValidations_.getDefaultInstance()
+                                  : KeyStateWithEndorsementsAndValidations_.newBuilder()
+                                                                           .setState(ksa.getState())
+                                                                           .putAllEndorsements(
+                                                                               ksa.getAttachment().getEndorsementsMap())
+                                                                           .addAllValidations(
+                                                                               validations.getValidationsList())
+                                                                           .build();
            });
        }
    }
diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java
index a960076fc..4f07713af 100644
--- a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java
+++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java
@@ -140,8 +140,7 @@ protected void instantiate(SigningMember member, Context context,
         routers.put(member, router);
         dhts.put(member, new KerlDHT(Duration.ofMillis(5), context, member, wrap(), connectionPool,
                                      DigestAlgorithm.DEFAULT,
-                                     router, exec, Duration.ofSeconds(10),
-                                     Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()), 0.0125, null));
+                                     router, exec, Duration.ofSeconds(10), 0.0125, null));
     }

     protected BiFunction wrap() {
diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java
index 1549732bf..2834e31b6 100644
--- a/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java
+++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AniTest.java
@@ -33,7 +33,7 @@ public void smokin() throws Exception {
         routers.values().forEach(lr -> lr.start());
         dhts.values()
-            .forEach(e -> e.start(Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().factory()),
+            .forEach(e -> e.start(
                                   Duration.ofSeconds(1)));

         var dht = dhts.values().stream().findFirst().get();
diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java
index fe7717d60..934b7624f 100644
--- a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java
+++ b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java
@@ -56,8 +55,7 @@ public void beforeIt() {
     public void smokin() throws Exception {
         routers.values().forEach(r -> r.start());
         dhts.values()
-            .forEach(dht -> dht.start(Executors.newScheduledThreadPool(getCardinality(), Thread.ofVirtual().factory()),
-                                      LARGE_TESTS ? Duration.ofSeconds(100) : Duration.ofMillis(10)));
+            .forEach(dht -> dht.start(LARGE_TESTS ? Duration.ofSeconds(100) : Duration.ofMillis(10)));

         identities.entrySet()
                   .forEach(e -> dhts.get(e.getKey()).asKERL().append(e.getValue().getLastEstablishingEvent()));
diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java
index a8068c5f2..f3e16e494 100644
--- a/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java
+++ b/thoth/src/test/java/com/salesforce/apollo/thoth/DhtRebalanceTest.java
@@ -6,16 +6,6 @@
  */
 package com.salesforce.apollo.thoth;

-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.security.SecureRandom;
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.Executors;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
 import com.salesforce.apollo.crypto.DigestAlgorithm;
 import com.salesforce.apollo.stereotomy.EventCoordinates;
 import com.salesforce.apollo.stereotomy.KERL;
@@ -27,10 +17,17 @@
 import com.salesforce.apollo.stereotomy.identifier.spec.InteractionSpecification;
 import com.salesforce.apollo.stereotomy.identifier.spec.RotationSpecification;
 import com.salesforce.apollo.stereotomy.mem.MemKeyStore;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.security.SecureRandom;
+import java.time.Duration;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;

 /**
  * @author hal.hildebrand
- *
  */
 public class DhtRebalanceTest extends AbstractDhtTest {
     private SecureRandom secureRandom;
@@ -44,23 +41,21 @@ public void beforeIt() throws Exception {
     @Test
     public void lifecycle() throws Exception {
         routers.values().forEach(r -> r.start());
-        dhts.values()
-            .forEach(dht -> dht.start(Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()),
-                                      Duration.ofSeconds(1)));
+        dhts.values().forEach(dht -> dht.start(Duration.ofSeconds(1)));
         KERL kerl = dhts.values().stream().findFirst().get().asKERL();
         Stereotomy controller = new StereotomyImpl(new MemKeyStore(), kerl, secureRandom);

-        var i = controller.newIdentifier() ;
+        var i = controller.newIdentifier();

         var digest = DigestAlgorithm.BLAKE3_256.digest("digest seal".getBytes());
-        var event = EventCoordinates.of(kerl.getKeyEvent(i.getLastEstablishmentEvent()) );
+        var event = EventCoordinates.of(kerl.getKeyEvent(i.getLastEstablishmentEvent()));
         var seals = List.of(DigestSeal.construct(digest), DigestSeal.construct(digest),
                             CoordinatesSeal.construct(event));

         i.rotate();
-        i.seal(InteractionSpecification.newBuilder()) ;
+        i.seal(InteractionSpecification.newBuilder());
         i.rotate(RotationSpecification.newBuilder().addAllSeals(seals));
         i.seal(InteractionSpecification.newBuilder().addAllSeals(seals));
         i.rotate();
diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java
index 54512e054..3cedc94f8 100644
--- a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java
+++ b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlDhtTest.java
@@ -13,7 +13,6 @@
 import java.security.SecureRandom;
 import java.time.Duration;
 import java.util.Collections;
-import java.util.concurrent.Executors;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -26,11 +25,9 @@ public class KerlDhtTest extends AbstractDhtTest {
     @Test
     public void smokin() throws Exception {
         var entropy = SecureRandom.getInstance("SHA1PRNG");
-        entropy.setSeed(new byte[]{6, 6, 6});
+        entropy.setSeed(new byte[] { 6, 6, 6 });
         routers.values().forEach(r -> r.start());
-        dhts.values()
-            .forEach(dht -> dht.start(Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()),
-                                      Duration.ofMillis(10)));
+        dhts.values().forEach(dht -> dht.start(Duration.ofMillis(10)));

         // inception
         var specification = IdentifierSpecification.newBuilder();
diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java
index 9a7ed7f38..5bdb0b016 100644
--- a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java
+++ b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlTest.java
@@ -6,31 +6,10 @@
  */
 package com.salesforce.apollo.thoth;

-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.security.SecureRandom;
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.joou.ULong;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
 import com.salesforce.apollo.crypto.DigestAlgorithm;
 import com.salesforce.apollo.crypto.SigningThreshold;
 import com.salesforce.apollo.crypto.SigningThreshold.Unweighted;
-import com.salesforce.apollo.stereotomy.ControlledIdentifier;
-import com.salesforce.apollo.stereotomy.EventCoordinates;
-import com.salesforce.apollo.stereotomy.KERL;
-import com.salesforce.apollo.stereotomy.KeyCoordinates;
-import com.salesforce.apollo.stereotomy.Stereotomy;
-import com.salesforce.apollo.stereotomy.StereotomyImpl;
+import com.salesforce.apollo.stereotomy.*;
 import com.salesforce.apollo.stereotomy.event.EstablishmentEvent;
 import com.salesforce.apollo.stereotomy.event.KeyEvent;
 import com.salesforce.apollo.stereotomy.event.Seal.CoordinatesSeal;
@@ -43,10 +22,18 @@
 import com.salesforce.apollo.stereotomy.identifier.spec.InteractionSpecification;
 import com.salesforce.apollo.stereotomy.identifier.spec.RotationSpecification;
 import com.salesforce.apollo.stereotomy.mem.MemKeyStore;
 import com.salesforce.apollo.utils.Hex;
+import org.joou.ULong;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.security.SecureRandom;
+import java.time.Duration;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;

 /**
  * @author hal.hildebrand
- *
 */
 public class KerlTest extends AbstractDhtTest {
     private SecureRandom secureRandom;
@@ -59,10 +46,8 @@ public void beforeIt() throws Exception {

     @Test
     public void delegated() throws Exception {
-        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(getCardinality(),
-                                                                              Thread.ofVirtual().factory());
         routers.values().forEach(r -> r.start());
-        dhts.values().forEach(dht -> dht.start(scheduler, Duration.ofSeconds(1)));
+        dhts.values().forEach(dht -> dht.start(Duration.ofSeconds(1)));

         KERL kerl = dhts.values().stream().findFirst().get().asKERL();

@@ -87,7 +72,8 @@ public void delegated() throws Exception {
         assertEquals(1, delegated.getKeys().size());
         assertNotNull(delegated.getKeys().get(0));

-        EstablishmentEvent lastEstablishmentEvent = (EstablishmentEvent) kerl.getKeyEvent(delegated.getLastEstablishmentEvent());
+        EstablishmentEvent lastEstablishmentEvent = (EstablishmentEvent) kerl.getKeyEvent(
+            delegated.getLastEstablishmentEvent());
         assertEquals(delegated.getKeys().get(0), lastEstablishmentEvent.getKeys().get(0));

         var keyCoordinates = KeyCoordinates.of(lastEstablishmentEvent, 0);
@@ -119,7 +105,7 @@ public void delegated() throws Exception {
         assertEquals(lastEstablishmentEvent.hash(DigestAlgorithm.DEFAULT), delegated.getDigest());

         // lastEvent
-        assertNull(kerl.getKeyEvent(delegated.getLastEvent()) );
+        assertNull(kerl.getKeyEvent(delegated.getLastEvent()));

         // delegation
         assertTrue(delegated.getDelegatingIdentifier().isPresent());
@@ -127,36 +113,34 @@ public void delegated() throws Exception {
         assertTrue(delegated.isDelegated());

         var digest = DigestAlgorithm.BLAKE3_256.digest("digest seal".getBytes());
-        var event = EventCoordinates.of(kerl.getKeyEvent(delegated.getLastEstablishmentEvent()) );
+        var event = EventCoordinates.of(kerl.getKeyEvent(delegated.getLastEstablishmentEvent()));
         var seals = List.of(DigestSeal.construct(digest), DigestSeal.construct(digest),
                             CoordinatesSeal.construct(event));

-        delegated.rotate() ;
-        delegated.seal(InteractionSpecification.newBuilder()) ;
-        delegated.rotate(RotationSpecification.newBuilder().addAllSeals(seals)) ;
-        delegated.seal(InteractionSpecification.newBuilder().addAllSeals(seals)) ;
+        delegated.rotate();
+        delegated.seal(InteractionSpecification.newBuilder());
+        delegated.rotate(RotationSpecification.newBuilder().addAllSeals(seals));
+        delegated.seal(InteractionSpecification.newBuilder().addAllSeals(seals));
     }

     @Test
     public void direct() throws Exception {
         routers.values().forEach(r -> r.start());
-        dhts.values()
-            .forEach(dht -> dht.start(Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()),
-                                      Duration.ofSeconds(1)));
+        dhts.values().forEach(dht -> dht.start(Duration.ofSeconds(1)));
         KERL kerl = dhts.values().stream().findFirst().get().asKERL();
         Stereotomy controller = new StereotomyImpl(new MemKeyStore(), kerl, secureRandom);
-        var i = controller.newIdentifier() ;
+        var i = controller.newIdentifier();
         var digest = DigestAlgorithm.BLAKE3_256.digest("digest seal".getBytes());
-        var event = EventCoordinates.of(kerl.getKeyEvent(i.getLastEstablishmentEvent()) );
+        var event = EventCoordinates.of(kerl.getKeyEvent(i.getLastEstablishmentEvent()));
         var seals = List.of(DigestSeal.construct(digest), DigestSeal.construct(digest),
                             CoordinatesSeal.construct(event));
         i.rotate();
-        i.seal(InteractionSpecification.newBuilder()) ;
+        i.seal(InteractionSpecification.newBuilder());
         i.rotate(RotationSpecification.newBuilder().addAllSeals(seals));
         i.seal(InteractionSpecification.newBuilder().addAllSeals(seals));
         i.rotate();