From f03d0545df57bc56db15cbd536f040297abb8675 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Mon, 4 Dec 2023 15:07:31 -0800 Subject: [PATCH] Provide action upon Thoth inception. clean up parameters/etc dep clean up for kq/ep madness, yet again --- memberships/pom.xml | 68 ++- model/pom.xml | 40 ++ protocols/pom.xml | 22 +- .../com/salesforce/apollo/thoth/KerlDHT.java | 432 +++++++++--------- .../com/salesforce/apollo/thoth/Thoth.java | 37 +- .../apollo/thoth/grpc/ThothServer.java | 8 + .../apollo/thoth/ThothServerTest.java | 8 +- 7 files changed, 369 insertions(+), 246 deletions(-) diff --git a/memberships/pom.xml b/memberships/pom.xml index 154f335a6..d1d312b63 100644 --- a/memberships/pom.xml +++ b/memberships/pom.xml @@ -15,6 +15,16 @@ org.slf4j slf4j-api + + com.salesforce.apollo + domain-kqueue + provided + + + com.salesforce.apollo + domain-epoll + provided + com.salesforce.apollo protocols @@ -40,13 +50,10 @@ native-lib-loader - com.salesforce.apollo - domain-epoll - provided - - - com.salesforce.apollo - domain-kqueue + io.netty + netty-transport-native-unix-common + ${netty.version} + ${os.detected.classifier} provided @@ -77,4 +84,51 @@ test + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + + + + mac-domain + + + mac + + + + + io.netty + netty-transport-native-kqueue + ${netty.version} + ${os.detected.classifier} + provided + + + + + linux-domain + + + linux + + + + + io.netty + netty-transport-native-epoll + ${netty.version} + ${os.detected.classifier} + provided + + + + diff --git a/model/pom.xml b/model/pom.xml index 8ca8bbc73..742499992 100644 --- a/model/pom.xml +++ b/model/pom.xml @@ -40,7 +40,9 @@ org.scijava native-lib-loader + + org.hamcrest hamcrest @@ -279,4 +281,42 @@ + + + + + mac-domain + + + mac + + + + + io.netty + netty-transport-native-kqueue + ${netty.version} + ${os.detected.classifier} + test + + + + + linux-domain + + + linux + + + + + io.netty + netty-transport-native-epoll + ${netty.version} + ${os.detected.classifier} + test + + + + diff --git a/protocols/pom.xml b/protocols/pom.xml index 604caacf8..4bfabe235 100644 --- a/protocols/pom.xml +++ b/protocols/pom.xml @@ -37,6 +37,12 @@ io.grpc grpc-netty + + io.netty + netty-transport-native-unix-common + ${os.detected.classifier} + provided + io.perfmark perfmark-api @@ -114,13 +120,6 @@ - - io.netty - netty-transport-native-unix-common - ${netty.version} - ${os.detected.classifier} - provided - io.netty netty-transport-native-kqueue @@ -138,19 +137,12 @@ - - io.netty - netty-transport-native-unix-common - ${netty.version} - ${os.detected.classifier} - test - io.netty netty-transport-native-epoll ${netty.version} ${os.detected.classifier} - optional + provided diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index dea7518ab..ea1cd7ff9 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -96,7 +96,7 @@ public class KerlDHT implements ProtoKERLService { private final Context context; private final CommonCommunications dhtComms; private final double fpr; - private final Duration frequency; + private final Duration operationsFrequency; private final CachingKERL kerl; private final UniKERLDirectPooled kerlPool; private final KerlSpace kerlSpace; @@ -107,19 +107,19 @@ public class KerlDHT implements ProtoKERLService { private final ScheduledExecutorService scheduler; private final Service service = new Service(); private final AtomicBoolean started = new AtomicBoolean(); - private final TemporalAmount timeout; + private final TemporalAmount operationTimeout; - public KerlDHT(Duration frequency, Context context, SigningMember member, + public KerlDHT(Duration operationsFrequency, Context context, SigningMember member, BiFunction wrap, JdbcConnectionPool connectionPool, - DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount timeout, + DigestAlgorithm digestAlgorithm, Router communications, TemporalAmount operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { @SuppressWarnings("unchecked") final var casting = (Context) context; this.context = casting; this.member = member; - this.timeout = timeout; + this.operationTimeout = operationTimeout; this.fpr = falsePositiveRate; - this.frequency = frequency; + this.operationsFrequency = operationsFrequency; this.scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); this.cache = new CachingKERL(f -> { try { @@ -156,11 +156,11 @@ public KerlDHT(Duration frequency, Context context, SigningMem this.ani = new Ani(member.getId(), asKERL()); } - public KerlDHT(Duration frequency, Context context, SigningMember member, + public KerlDHT(Duration operationsFrequency, Context context, SigningMember member, JdbcConnectionPool connectionPool, DigestAlgorithm digestAlgorithm, Router communications, - TemporalAmount timeout, double falsePositiveRate, StereotomyMetrics metrics) { - this(frequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, timeout, - falsePositiveRate, metrics); + TemporalAmount operationTimeout, double falsePositiveRate, StereotomyMetrics metrics) { + this(operationsFrequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, + operationTimeout, falsePositiveRate, metrics); } public static void updateLocationHash(Identifier identifier, DigestAlgorithm digestAlgorithm, DSLContext dsl) { @@ -195,21 +195,23 @@ public KeyState_ append(AttachmentEvent event) { if (identifier == null) { return null; } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(identifier, null, - (link, r) -> link.append( - Collections.emptyList(), - Collections.singletonList(event)), - null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, identifier, - isTimedOut, tally, destination, - "append events"), - t -> completeIt(result, gathered)); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.append( + Collections.emptyList(), + Collections.singletonList( + event)), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append events"), + t -> completeIt(result, + gathered)); try { List s = result.get().getKeyStatesList(); return s.isEmpty() ? null : s.getFirst(); @@ -231,19 +233,21 @@ public List append(KERL_ kerl) { if (identifier == null) { return completeIt(Collections.emptyList()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(identifier, null, - (link, r) -> link.append(kerl), - null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, identifier, - isTimedOut, tally, destination, - "append kerl"), - t -> completeIt(result, gathered)); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.append( + kerl), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append kerl"), + t -> completeIt(result, + gathered)); try { return result.get().getKeyStatesList(); } catch (InterruptedException e) { @@ -259,20 +263,22 @@ public KeyState_ append(KeyEvent_ event) { if (identifier == null) { return null; } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(identifier, null, - (link, r) -> link.append( - Collections.singletonList(event)), - null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, identifier, - isTimedOut, tally, destination, - "append events"), - t -> completeIt(result, gathered)); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.append( + Collections.singletonList( + event)), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append events"), + t -> completeIt(result, + gathered)); try { var ks = result.get(); return ks.getKeyStatesCount() == 0 ? KeyState_.getDefaultInstance() : ks.getKeyStatesList().get(0); @@ -320,19 +326,21 @@ public Empty appendAttachments(List events) { if (identifier == null) { return completeIt(Empty.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(identifier, null, - (link, r) -> link.appendAttachments( - events), null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, identifier, - isTimedOut, tally, destination, - "append attachments"), - t -> completeIt(result, gathered)); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.appendAttachments( + events), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append attachments"), + t -> completeIt(result, + gathered)); return Empty.getDefaultInstance(); } @@ -345,19 +353,21 @@ public Empty appendValidations(Validations validations) { if (identifier == null) { return completeIt(null); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(identifier, null, - (link, r) -> link.appendValidations( - validations), null, - (tally, futureSailor, destination) -> mutate( - gathered, futureSailor, identifier, - isTimedOut, tally, destination, - "append validations"), - t -> completeIt(result, gathered)); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.appendValidations( + validations), null, + (tally, futureSailor, destination) -> mutate( + gathered, futureSailor, + identifier, isTimedOut, + tally, destination, + "append validations"), + t -> completeIt(result, + gathered)); try { return result.get(); } catch (InterruptedException e) { @@ -396,26 +406,26 @@ public Attachment getAttachment(EventCoords coordinates) { if (identifier == null) { return completeIt(Attachment.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(identifier, null, - (link, r) -> link.getAttachment( - coordinates), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, identifier, - isTimedOut, destination, - "get attachment", - Attachment.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(identifier, null, + (link, r) -> link.getAttachment( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, identifier, + isTimedOut, destination, + "get attachment", + Attachment.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -435,25 +445,26 @@ public KERL_ getKERL(Ident identifier) { if (digest == null) { return completeIt(KERL_.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(digest, null, - (link, r) -> link.getKERL( - identifier), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, digest, isTimedOut, - destination, "get kerl", - KERL_.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(digest, null, + (link, r) -> link.getKERL( + identifier), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get kerl", + KERL_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -474,25 +485,26 @@ public KeyEvent_ getKeyEvent(EventCoords coordinates) { if (digest == null) { return completeIt(KeyEvent_.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(digest, null, - (link, r) -> link.getKeyEvent( - coordinates), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, digest, isTimedOut, - destination, "get key event", - KeyEvent_.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(digest, null, + (link, r) -> link.getKeyEvent( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key event", + KeyEvent_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -513,26 +525,26 @@ public KeyState_ getKeyState(EventCoords coordinates) { if (digest == null) { return completeIt(KeyState_.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(digest, null, - (link, r) -> link.getKeyState( - coordinates), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, digest, isTimedOut, - destination, - "get key state for coordinates", - KeyState_.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(digest, null, + (link, r) -> link.getKeyState( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key state for coordinates", + KeyState_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -557,26 +569,26 @@ public KeyState_ getKeyState(Ident identifier, long sequenceNumber) { return completeIt(KeyState_.getDefaultInstance()); } var identAndSeq = IdentAndSeq.newBuilder().setIdentifier(identifier).setSequenceNumber(sequenceNumber).build(); - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).noDuplicates() - .iterate(digest, null, - (link, r) -> link.getKeyState( - identAndSeq), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, digest, isTimedOut, - destination, - "get key state for coordinates", - KeyState_.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).noDuplicates() + .iterate(digest, null, + (link, r) -> link.getKeyState( + identAndSeq), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key state for coordinates", + KeyState_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -597,25 +609,25 @@ public KeyState_ getKeyState(Ident identifier) { if (digest == null) { return completeIt(KeyState_.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(digest, null, - (link, r) -> link.getKeyState( - identifier), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, digest, isTimedOut, - destination, - "get current key state", - KeyState_.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(digest, null, + (link, r) -> link.getKeyState( + identifier), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get current key state", + KeyState_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -636,25 +648,25 @@ public KeyStateWithAttachments_ getKeyStateWithAttachments(EventCoords coordinat if (digest == null) { return completeIt(KeyStateWithAttachments_.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(digest, null, - (link, r) -> link.getKeyStateWithAttachments( - coordinates), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, digest, isTimedOut, - destination, - "get key state with attachments", - KeyStateWithAttachments_.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(digest, null, + (link, r) -> link.getKeyStateWithAttachments( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key state with attachments", + KeyStateWithAttachments_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -676,25 +688,25 @@ public KeyStateWithEndorsementsAndValidations_ getKeyStateWithEndorsementsAndVal if (digest == null) { return completeIt(KeyStateWithEndorsementsAndValidations_.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(digest, null, - (link, r) -> link.getKeyStateWithEndorsementsAndValidations( - coordinates), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, digest, isTimedOut, - destination, - "get key state with endorsements", - KeyStateWithEndorsementsAndValidations_.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(digest, null, + (link, r) -> link.getKeyStateWithEndorsementsAndValidations( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, digest, + isTimedOut, destination, + "get key state with endorsements", + KeyStateWithEndorsementsAndValidations_.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { @@ -715,25 +727,25 @@ public Validations getValidations(EventCoords coordinates) { if (identifier == null) { return completeIt(Validations.getDefaultInstance()); } - Instant timedOut = Instant.now().plus(timeout); + Instant timedOut = Instant.now().plus(operationTimeout); Supplier isTimedOut = () -> Instant.now().isAfter(timedOut); var result = new CompletableFuture(); HashMultiset gathered = HashMultiset.create(); - new RingIterator<>(frequency, context, member, scheduler, dhtComms).iterate(identifier, null, - (link, r) -> link.getValidations( - coordinates), - () -> failedMajority(result, - maxCount( - gathered)), - (tally, futureSailor, destination) -> read( - result, gathered, tally, - futureSailor, identifier, - isTimedOut, destination, - "get validations", - Validations.getDefaultInstance()), - t -> failedMajority(result, - maxCount( - gathered))); + new RingIterator<>(operationsFrequency, context, member, scheduler, dhtComms).iterate(identifier, null, + (link, r) -> link.getValidations( + coordinates), + () -> failedMajority( + result, + maxCount(gathered)), + (tally, futureSailor, destination) -> read( + result, gathered, tally, + futureSailor, identifier, + isTimedOut, destination, + "get validations", + Validations.getDefaultInstance()), + t -> failedMajority( + result, + maxCount(gathered))); try { return result.get(); } catch (InterruptedException e) { diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java index 9f435a0ad..435627d76 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Thoth.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; /** @@ -29,14 +30,23 @@ * @author hal.hildebrand */ public class Thoth { - private static final Logger log = LoggerFactory.getLogger(Thoth.class); - private final Stereotomy stereotomy; - private volatile SelfAddressingIdentifier controller; - private volatile ControlledIdentifier identifier; - private volatile Consumer pending; + private static final Logger log = LoggerFactory.getLogger( + Thoth.class); + private final Stereotomy stereotomy; + private final Consumer> onInception; + private volatile SelfAddressingIdentifier controller; + private volatile ControlledIdentifier identifier; + private volatile Consumer pending; + private AtomicBoolean initialized = new AtomicBoolean(); public Thoth(Stereotomy stereotomy) { + this(stereotomy, identifier -> { + }); + } + + public Thoth(Stereotomy stereotomy, Consumer> onInception) { this.stereotomy = stereotomy; + this.onInception = onInception; } public void commit(EventCoordinates coords) { @@ -50,14 +60,18 @@ public void commit(EventCoordinates coords) { } public SelfAddressingIdentifier identifier() { - if (identifier == null) { + final var current = identifier; + if (current == null) { throw new IllegalStateException("Identifier has not been established"); } - return identifier.getIdentifier(); + return current.getIdentifier(); } public DelegatedInceptionEvent inception(SelfAddressingIdentifier controller, IdentifierSpecification.Builder specification) { + if (initialized.get()) { + throw new IllegalStateException("Already initialized for: " + identifier); + } final var inception = stereotomy.newDelegatedIdentifier(controller, specification); pending = inception(inception); return inception; @@ -85,6 +99,9 @@ public DelegatedRotationEvent rotate(RotationSpecification.Builder specification private Consumer inception(DelegatedInceptionEvent incp) { return coordinates -> { + if (!initialized.compareAndSet(false, true)) { + return; + } var commitment = ProtobufEventFactory.INSTANCE.attachment(incp, new AttachmentImpl( Seal.EventSeal.construct(coordinates.getIdentifier(), coordinates.getDigest(), coordinates.getSequenceNumber().longValue()))); @@ -92,6 +109,12 @@ private Consumer inception(DelegatedInceptionEvent incp) { identifier = cid; controller = (SelfAddressingIdentifier) identifier.getDelegatingIdentifier().get(); pending = null; + Thread.ofVirtual().factory().newThread(() -> { + log.info("Notifying inception complete for: {} controller: {}", identifier.getIdentifier(), controller); + if (onInception != null) { + onInception.accept(identifier); + } + }).start(); log.info("Created delegated identifier: {} controller: {}", identifier.getIdentifier(), controller); }; } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java index bf5edafbd..523eacd54 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/ThothServer.java @@ -28,6 +28,10 @@ public class ThothServer extends Thoth_Grpc.Thoth_ImplBase { private final RotationSpecification.Builder rotation; private final Thoth thoth; + public ThothServer(Thoth thoth) { + this(IdentifierSpecification.newBuilder(), RotationSpecification.newBuilder(), thoth); + } + public ThothServer(IdentifierSpecification.Builder inception, RotationSpecification.Builder rotation, Thoth thoth) { this.inception = inception; @@ -48,6 +52,10 @@ public void commit(EventCoords request, StreamObserver responseObserver) } } + public Thoth getThoth() { + return thoth; + } + @Override public void identifier(Empty request, StreamObserver responseObserver) { try { diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java index 7dba13264..93b3e2bf6 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/ThothServerTest.java @@ -14,9 +14,7 @@ import com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory; import com.salesforce.apollo.stereotomy.identifier.Identifier; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; -import com.salesforce.apollo.stereotomy.identifier.spec.IdentifierSpecification; import com.salesforce.apollo.stereotomy.identifier.spec.InteractionSpecification; -import com.salesforce.apollo.stereotomy.identifier.spec.RotationSpecification; import com.salesforce.apollo.stereotomy.mem.MemKERL; import com.salesforce.apollo.stereotomy.mem.MemKeyStore; import com.salesforce.apollo.thoth.grpc.ThothServer; @@ -55,14 +53,10 @@ public void smokin() throws Exception { var localId = UUID.randomUUID().toString(); ServerBuilder serverBuilder = InProcessServerBuilder.forName(localId) - .addService( - new ThothServer(IdentifierSpecification.newBuilder(), - RotationSpecification.newBuilder(), - new Thoth(stereotomy))); + .addService(new ThothServer(new Thoth(stereotomy))); var server = serverBuilder.build(); server.start(); try { - var channel = InProcessChannelBuilder.forName(localId).usePlaintext().build(); var thoth = new ThothClient(channel);