diff --git a/memberships/pom.xml b/memberships/pom.xml
index 154f335a6..d1d312b63 100644
--- a/memberships/pom.xml
+++ b/memberships/pom.xml
@@ -15,6 +15,16 @@
org.slf4j
slf4j-api
+
+ com.salesforce.apollo
+ domain-kqueue
+ provided
+
+
+ com.salesforce.apollo
+ domain-epoll
+ provided
+
com.salesforce.apollo
protocols
@@ -40,13 +50,10 @@
native-lib-loader
- com.salesforce.apollo
- domain-epoll
- provided
-
-
- com.salesforce.apollo
- domain-kqueue
+ io.netty
+ netty-transport-native-unix-common
+ ${netty.version}
+ ${os.detected.classifier}
provided
@@ -77,4 +84,51 @@
test
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.7.1
+
+
+
+
+
+
+ mac-domain
+
+
+ mac
+
+
+
+
+ io.netty
+ netty-transport-native-kqueue
+ ${netty.version}
+ ${os.detected.classifier}
+ provided
+
+
+
+
+ linux-domain
+
+
+ linux
+
+
+
+
+ io.netty
+ netty-transport-native-epoll
+ ${netty.version}
+ ${os.detected.classifier}
+ provided
+
+
+
+
diff --git a/model/pom.xml b/model/pom.xml
index 8ca8bbc73..742499992 100644
--- a/model/pom.xml
+++ b/model/pom.xml
@@ -40,7 +40,9 @@
org.scijava
native-lib-loader
+
+
org.hamcrest
hamcrest
@@ -279,4 +281,42 @@
+
+
+
+
+ mac-domain
+
+
+ mac
+
+
+
+
+ io.netty
+ netty-transport-native-kqueue
+ ${netty.version}
+ ${os.detected.classifier}
+ test
+
+
+
+
+ linux-domain
+
+
+ linux
+
+
+
+
+ io.netty
+ netty-transport-native-epoll
+ ${netty.version}
+ ${os.detected.classifier}
+ test
+
+
+
+
diff --git a/protocols/pom.xml b/protocols/pom.xml
index 604caacf8..4bfabe235 100644
--- a/protocols/pom.xml
+++ b/protocols/pom.xml
@@ -37,6 +37,12 @@
io.grpc
grpc-netty
+
+ io.netty
+ netty-transport-native-unix-common
+ ${os.detected.classifier}
+ provided
+
io.perfmark
perfmark-api
@@ -114,13 +120,6 @@
-
- io.netty
- netty-transport-native-unix-common
- ${netty.version}
- ${os.detected.classifier}
- provided
-
io.netty
netty-transport-native-kqueue
@@ -138,19 +137,12 @@
-
- io.netty
- netty-transport-native-unix-common
- ${netty.version}
- ${os.detected.classifier}
- test
-
io.netty
netty-transport-native-epoll
${netty.version}
${os.detected.classifier}
- optional
+ provided
diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java
index dea7518ab..ea1cd7ff9 100644
--- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java
+++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java
@@ -96,7 +96,7 @@ public class KerlDHT implements ProtoKERLService {
private final Context context;
private final CommonCommunications dhtComms;
private final double fpr;
- private final Duration frequency;
+ private final Duration operationsFrequency;
private final CachingKERL kerl;
private final UniKERLDirectPooled kerlPool;
private final KerlSpace kerlSpace;
@@ -107,19 +107,19 @@ public class KerlDHT implements ProtoKERLService {
private final ScheduledExecutorService scheduler;
private final Service service = new Service();
private final AtomicBoolean started = new AtomicBoolean();
- private final TemporalAmount timeout;
+ private final TemporalAmount operationTimeout;
- public KerlDHT(Duration frequency, Context extends Member> context, SigningMember member,
+ public KerlDHT(Duration operationsFrequency, Context extends Member> context, SigningMember member,
BiFunction wrap, JdbcConnectionPool connectionPool,
- DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount timeout,
+ DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount operationTimeout,
double falsePositiveRate, StereotomyMetrics metrics) {
@SuppressWarnings("unchecked")
final var casting = (Context) context;
this.context = casting;
this.member = member;
- this.timeout = timeout;
+ this.operationTimeout = operationTimeout;
this.fpr = falsePositiveRate;
- this.frequency = frequency;
+ this.operationsFrequency = operationsFrequency;
this.scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
this.cache = new CachingKERL(f -> {
try {
@@ -156,11 +156,11 @@ public KerlDHT(Duration frequency, Context extends Member> context, SigningMem
this.ani = new Ani(member.getId(), asKERL());
}
- public KerlDHT(Duration frequency, Context extends Member> context, SigningMember member,
+ public KerlDHT(Duration operationsFrequency, Context extends Member> context, SigningMember member,
JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications,
- TemporalAmount timeout, double falsePositiveRate, StereotomyMetrics metrics) {
- this(frequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, timeout,
- falsePositiveRate, metrics);
+ TemporalAmount operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) {
+ this(operationsFrequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications,
+ operationTimeout, falsePositiveRate, metrics);
}
public static void updateLocationHash(Identifier identifier, DigestAlgorithm digestAlgorithm, DSLContext dsl) {
@@ -195,21 +195,23 @@ public KeyState_ append(AttachmentEvent event) {
if (identifier == null) {
return null;
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(identifier, null,
- (link, r) -> link.append(
- Collections.emptyList(),
- Collections.singletonList(event)),
- null,
- (tally, futureSailor, destination) -> mutate(
- gathered, futureSailor, identifier,
- isTimedOut, tally, destination,
- "append events"),
- t -> completeIt(result, gathered));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(identifier, null,
+ (link, r) -> link.append(
+ Collections.emptyList(),
+ Collections.singletonList(
+ event)), null,
+ (tally, futureSailor, destination) -> mutate(
+ gathered, futureSailor,
+ identifier, isTimedOut,
+ tally, destination,
+ "append events"),
+ t -> completeIt(result,
+ gathered));
try {
List s = result.get().getKeyStatesList();
return s.isEmpty() ? null : s.getFirst();
@@ -231,19 +233,21 @@ public List append(KERL_ kerl) {
if (identifier == null) {
return completeIt(Collections.emptyList());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(identifier, null,
- (link, r) -> link.append(kerl),
- null,
- (tally, futureSailor, destination) -> mutate(
- gathered, futureSailor, identifier,
- isTimedOut, tally, destination,
- "append kerl"),
- t -> completeIt(result, gathered));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(identifier, null,
+ (link, r) -> link.append(
+ kerl), null,
+ (tally, futureSailor, destination) -> mutate(
+ gathered, futureSailor,
+ identifier, isTimedOut,
+ tally, destination,
+ "append kerl"),
+ t -> completeIt(result,
+ gathered));
try {
return result.get().getKeyStatesList();
} catch (InterruptedException e) {
@@ -259,20 +263,22 @@ public KeyState_ append(KeyEvent_ event) {
if (identifier == null) {
return null;
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(identifier, null,
- (link, r) -> link.append(
- Collections.singletonList(event)),
- null,
- (tally, futureSailor, destination) -> mutate(
- gathered, futureSailor, identifier,
- isTimedOut, tally, destination,
- "append events"),
- t -> completeIt(result, gathered));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(identifier, null,
+ (link, r) -> link.append(
+ Collections.singletonList(
+ event)), null,
+ (tally, futureSailor, destination) -> mutate(
+ gathered, futureSailor,
+ identifier, isTimedOut,
+ tally, destination,
+ "append events"),
+ t -> completeIt(result,
+ gathered));
try {
var ks = result.get();
return ks.getKeyStatesCount() == 0 ? KeyState_.getDefaultInstance() : ks.getKeyStatesList().get(0);
@@ -320,19 +326,21 @@ public Empty appendAttachments(List events) {
if (identifier == null) {
return completeIt(Empty.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(identifier, null,
- (link, r) -> link.appendAttachments(
- events), null,
- (tally, futureSailor, destination) -> mutate(
- gathered, futureSailor, identifier,
- isTimedOut, tally, destination,
- "append attachments"),
- t -> completeIt(result, gathered));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(identifier, null,
+ (link, r) -> link.appendAttachments(
+ events), null,
+ (tally, futureSailor, destination) -> mutate(
+ gathered, futureSailor,
+ identifier, isTimedOut,
+ tally, destination,
+ "append attachments"),
+ t -> completeIt(result,
+ gathered));
return Empty.getDefaultInstance();
}
@@ -345,19 +353,21 @@ public Empty appendValidations(Validations validations) {
if (identifier == null) {
return completeIt(null);
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(identifier, null,
- (link, r) -> link.appendValidations(
- validations), null,
- (tally, futureSailor, destination) -> mutate(
- gathered, futureSailor, identifier,
- isTimedOut, tally, destination,
- "append validations"),
- t -> completeIt(result, gathered));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(identifier, null,
+ (link, r) -> link.appendValidations(
+ validations), null,
+ (tally, futureSailor, destination) -> mutate(
+ gathered, futureSailor,
+ identifier, isTimedOut,
+ tally, destination,
+ "append validations"),
+ t -> completeIt(result,
+ gathered));
try {
return result.get();
} catch (InterruptedException e) {
@@ -396,26 +406,26 @@ public Attachment getAttachment(EventCoords coordinates) {
if (identifier == null) {
return completeIt(Attachment.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(identifier, null,
- (link, r) -> link.getAttachment(
- coordinates),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, identifier,
- isTimedOut, destination,
- "get attachment",
- Attachment.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(identifier, null,
+ (link, r) -> link.getAttachment(
+ coordinates),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, identifier,
+ isTimedOut, destination,
+ "get attachment",
+ Attachment.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -435,25 +445,26 @@ public KERL_ getKERL(Ident identifier) {
if (digest == null) {
return completeIt(KERL_.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(digest, null,
- (link, r) -> link.getKERL(
- identifier),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, digest, isTimedOut,
- destination, "get kerl",
- KERL_.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(digest, null,
+ (link, r) -> link.getKERL(
+ identifier),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, digest,
+ isTimedOut, destination,
+ "get kerl",
+ KERL_.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -474,25 +485,26 @@ public KeyEvent_ getKeyEvent(EventCoords coordinates) {
if (digest == null) {
return completeIt(KeyEvent_.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(digest, null,
- (link, r) -> link.getKeyEvent(
- coordinates),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, digest, isTimedOut,
- destination, "get key event",
- KeyEvent_.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(digest, null,
+ (link, r) -> link.getKeyEvent(
+ coordinates),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, digest,
+ isTimedOut, destination,
+ "get key event",
+ KeyEvent_.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -513,26 +525,26 @@ public KeyState_ getKeyState(EventCoords coordinates) {
if (digest == null) {
return completeIt(KeyState_.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(digest, null,
- (link, r) -> link.getKeyState(
- coordinates),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, digest, isTimedOut,
- destination,
- "get key state for coordinates",
- KeyState_.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(digest, null,
+ (link, r) -> link.getKeyState(
+ coordinates),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, digest,
+ isTimedOut, destination,
+ "get key state for coordinates",
+ KeyState_.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -557,26 +569,26 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) {
return completeIt(KeyState_.getDefaultInstance());
}
var identAndSeq = IdentAndSeq.newBuilder().setIdentifier(identifier).setSequenceNumber(sequenceNumber).build();
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates()
- .iterate(digest, null,
- (link, r) -> link.getKeyState(
- identAndSeq),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, digest, isTimedOut,
- destination,
- "get key state for coordinates",
- KeyState_.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates()
+ .iterate(digest, null,
+ (link, r) -> link.getKeyState(
+ identAndSeq),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, digest,
+ isTimedOut, destination,
+ "get key state for coordinates",
+ KeyState_.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -597,25 +609,25 @@ public KeyState_ getKeyState(Ident identifier) {
if (digest == null) {
return completeIt(KeyState_.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(digest, null,
- (link, r) -> link.getKeyState(
- identifier),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, digest, isTimedOut,
- destination,
- "get current key state",
- KeyState_.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(digest, null,
+ (link, r) -> link.getKeyState(
+ identifier),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, digest,
+ isTimedOut, destination,
+ "get current key state",
+ KeyState_.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -636,25 +648,25 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat
if (digest == null) {
return completeIt(KeyStateWithAttachments_.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(digest, null,
- (link, r) -> link.getKeyStateWithAttachments(
- coordinates),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, digest, isTimedOut,
- destination,
- "get key state with attachments",
- KeyStateWithAttachments_.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(digest, null,
+ (link, r) -> link.getKeyStateWithAttachments(
+ coordinates),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, digest,
+ isTimedOut, destination,
+ "get key state with attachments",
+ KeyStateWithAttachments_.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -676,25 +688,25 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal
if (digest == null) {
return completeIt(KeyStateWithEndorsementsAndValidations_.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(digest, null,
- (link, r) -> link.getKeyStateWithEndorsementsAndValidations(
- coordinates),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, digest, isTimedOut,
- destination,
- "get key state with endorsements",
- KeyStateWithEndorsementsAndValidations_.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(digest, null,
+ (link, r) -> link.getKeyStateWithEndorsementsAndValidations(
+ coordinates),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, digest,
+ isTimedOut, destination,
+ "get key state with endorsements",
+ KeyStateWithEndorsementsAndValidations_.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
@@ -715,25 +727,25 @@ public Validations getValidations(EventCoords coordinates) {
if (identifier == null) {
return completeIt(Validations.getDefaultInstance());
}
- Instant timedOut = Instant.now().plus(timeout);
+ Instant timedOut = Instant.now().plus(operationTimeout);
Supplier isTimedOut = () -> Instant.now().isAfter(timedOut);
var result = new CompletableFuture();
HashMultiset gathered = HashMultiset.create();
- new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(identifier, null,
- (link, r) -> link.getValidations(
- coordinates),
- () -> failedMajority(result,
- maxCount(
- gathered)),
- (tally, futureSailor, destination) -> read(
- result, gathered, tally,
- futureSailor, identifier,
- isTimedOut, destination,
- "get validations",
- Validations.getDefaultInstance()),
- t -> failedMajority(result,
- maxCount(
- gathered)));
+ new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null,
+ (link, r) -> link.getValidations(
+ coordinates),
+ () -> failedMajority(
+ result,
+ maxCount(gathered)),
+ (tally, futureSailor, destination) -> read(
+ result, gathered, tally,
+ futureSailor, identifier,
+ isTimedOut, destination,
+ "get validations",
+ Validations.getDefaultInstance()),
+ t -> failedMajority(
+ result,
+ maxCount(gathered)));
try {
return result.get();
} catch (InterruptedException e) {
diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java
index 9f435a0ad..435627d76 100644
--- a/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java
+++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java
@@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
@@ -29,14 +30,23 @@
* @author hal.hildebrand
*/
public class Thoth {
- private static final Logger log = LoggerFactory.getLogger(Thoth.class);
- private final Stereotomy stereotomy;
- private volatile SelfAddressingIdentifier controller;
- private volatile ControlledIdentifier identifier;
- private volatile Consumer pending;
+ private static final Logger log = LoggerFactory.getLogger(
+ Thoth.class);
+ private final Stereotomy stereotomy;
+ private final Consumer> onInception;
+ private volatile SelfAddressingIdentifier controller;
+ private volatile ControlledIdentifier identifier;
+ private volatile Consumer pending;
+ private AtomicBoolean initialized = new AtomicBoolean();
public Thoth(Stereotomy stereotomy) {
+ this(stereotomy, identifier -> {
+ });
+ }
+
+ public Thoth(Stereotomy stereotomy, Consumer> onInception) {
this.stereotomy = stereotomy;
+ this.onInception = onInception;
}
public void commit(EventCoordinates coords) {
@@ -50,14 +60,18 @@ public void commit(EventCoordinates coords) {
}
public SelfAddressingIdentifier identifier() {
- if (identifier == null) {
+ final var current = identifier;
+ if (current == null) {
throw new IllegalStateException("Identifier has not been established");
}
- return identifier.getIdentifier();
+ return current.getIdentifier();
}
public DelegatedInceptionEvent inception(SelfAddressingIdentifier controller,
IdentifierSpecification.Builder specification) {
+ if (initialized.get()) {
+ throw new IllegalStateException("Already initialized for: " + identifier);
+ }
final var inception = stereotomy.newDelegatedIdentifier(controller, specification);
pending = inception(inception);
return inception;
@@ -85,6 +99,9 @@ public DelegatedRotationEvent rotate(RotationSpecification.Builder specification
private Consumer inception(DelegatedInceptionEvent incp) {
return coordinates -> {
+ if (!initialized.compareAndSet(false, true)) {
+ return;
+ }
var commitment = ProtobufEventFactory.INSTANCE.attachment(incp, new AttachmentImpl(
Seal.EventSeal.construct(coordinates.getIdentifier(), coordinates.getDigest(),
coordinates.getSequenceNumber().longValue())));
@@ -92,6 +109,12 @@ private Consumer inception(DelegatedInceptionEvent incp) {
identifier = cid;
controller = (SelfAddressingIdentifier) identifier.getDelegatingIdentifier().get();
pending = null;
+ Thread.ofVirtual().factory().newThread(() -> {
+ log.info("Notifying inception complete for: {} controller: {}", identifier.getIdentifier(), controller);
+ if (onInception != null) {
+ onInception.accept(identifier);
+ }
+ }).start();
log.info("Created delegated identifier: {} controller: {}", identifier.getIdentifier(), controller);
};
}
diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java
index bf5edafbd..523eacd54 100644
--- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java
+++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java
@@ -28,6 +28,10 @@ public class ThothServer extends Thoth_Grpc.Thoth_ImplBase {
private final RotationSpecification.Builder rotation;
private final Thoth thoth;
+ public ThothServer(Thoth thoth) {
+ this(IdentifierSpecification.newBuilder(), RotationSpecification.newBuilder(), thoth);
+ }
+
public ThothServer(IdentifierSpecification.Builder inception,
RotationSpecification.Builder rotation, Thoth thoth) {
this.inception = inception;
@@ -48,6 +52,10 @@ public void commit(EventCoords request, StreamObserver responseObserver)
}
}
+ public Thoth getThoth() {
+ return thoth;
+ }
+
@Override
public void identifier(Empty request, StreamObserver responseObserver) {
try {
diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java
index 7dba13264..93b3e2bf6 100644
--- a/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java
+++ b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java
@@ -14,9 +14,7 @@
import com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory;
import com.salesforce.apollo.stereotomy.identifier.Identifier;
import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier;
-import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification;
import com.salesforce.apollo.stereotomy.identifier.spec.InteractionSpecification;
-import com.salesforce.apollo.stereotomy.identifier.spec.RotationSpecification;
import com.salesforce.apollo.stereotomy.mem.MemKERL;
import com.salesforce.apollo.stereotomy.mem.MemKeyStore;
import com.salesforce.apollo.thoth.grpc.ThothServer;
@@ -55,14 +53,10 @@ public void smokin() throws Exception {
var localId = UUID.randomUUID().toString();
ServerBuilder> serverBuilder = InProcessServerBuilder.forName(localId)
- .addService(
- new ThothServer(IdentifierSpecification.newBuilder(),
- RotationSpecification.newBuilder(),
- new Thoth(stereotomy)));
+ .addService(new ThothServer(new Thoth(stereotomy)));
var server = serverBuilder.build();
server.start();
try {
-
var channel = InProcessChannelBuilder.forName(localId).usePlaintext().build();
var thoth = new ThothClient(channel);