diff --git a/gorgoneion-client/src/main/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClient.java b/gorgoneion-client/src/main/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClient.java index f448a304d..834e8c407 100644 --- a/gorgoneion-client/src/main/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClient.java +++ b/gorgoneion-client/src/main/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClient.java @@ -21,7 +21,6 @@ import java.time.Clock; import java.time.Duration; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; /** @@ -44,7 +43,6 @@ public GorgoneionClient(ControlledIdentifierMember member, Function(); KERL_ application = member.kerl(); var fs = client.apply(application, timeout); Credentials credentials = credentials(fs); diff --git a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java index 6687b42c8..dd232d81a 100644 --- a/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java +++ b/gorgoneion-client/src/test/java/com/salesforce/apollo/gorgoneion/client/GorgoneionClientTest.java @@ -26,12 +26,18 @@ import com.salesforce.apollo.stereotomy.mem.MemKeyStore; import com.salesforce.apollo.stereotomy.services.proto.ProtoEventObserver; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.security.SecureRandom; import java.time.Duration; import java.util.Collections; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.IntStream; @@ -119,8 +125,15 @@ public void multiSmoke() throws Exception { final var prefix = UUID.randomUUID().toString(); final var members = IntStream.range(0, 10).mapToObj(i -> new ControlledIdentifierMember(stereotomy.newIdentifier())).toList(); + var countdown = new CountDownLatch(3); // The kerl observer to publish admitted client KERLs to var observer = mock(ProtoEventObserver.class); + doAnswer(new Answer() { + public Void answer(InvocationOnMock invocation) { + countdown.countDown(); + return null; + } + }).when(observer).publish(Mockito.any(), Mockito.anyList()); var context = Context.newBuilder().setCardinality(members.size()).build(); for (ControlledIdentifierMember member : members) { @@ -128,7 +141,7 @@ public void multiSmoke() throws Exception { } final var parameters = Parameters.newBuilder().setKerl(kerl).build(); final var exec = Executors.newVirtualThreadPerTaskExecutor(); - @SuppressWarnings("unused") final var gorgons = members.stream().map(m -> { + members.stream().map(m -> { final var router = new LocalServer(prefix, m, exec).router(ServerConnectionCache.newBuilder().setTarget(2), exec); router.start(); @@ -170,7 +183,6 @@ public void multiSmoke() throws Exception { assertNotEquals(Validations.getDefaultInstance(), invitation); assertTrue(invitation.getValidationsCount() >= context.majority()); - // Verify client KERL published - verify(observer, times(3)).publish(client.kerl(), Collections.singletonList(invitation)); + assertTrue(countdown.await(1, TimeUnit.SECONDS)); } } diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java index e0e7930f3..4fe417c52 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java @@ -262,10 +262,17 @@ private Validations register(Credentials request) { member.getId()); } }, scheduler, parameters.frequency()); - return validated.thenApply(v -> { - notarize(request, v); - return v; - }).getNow(null); + try { + return validated.thenApply(v -> { + notarize(request, v); + return v; + }).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } private Validation_ validate(Credentials credentials) { diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Parameters.java b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Parameters.java index 796583a96..237359686 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Parameters.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Parameters.java @@ -12,8 +12,6 @@ import java.time.Clock; import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import java.util.function.Predicate; /** diff --git a/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java b/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java index 15c27fbb1..226ca5bf9 100644 --- a/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java +++ b/stereotomy-services/src/main/java/com/salesforce/apollo/stereotomy/services/grpc/kerl/KERLAdapter.java @@ -40,7 +40,13 @@ public KERLAdapter(ProtoKERLService kerl, DigestAlgorithm algorithm) { @Override public KeyState append(KeyEvent event) { - return new KeyStateImpl(kerl.append(Collections.singletonList(event.toKeyEvent_())).getFirst()); + List appended = kerl.append(Collections.singletonList(event.toKeyEvent_())); + if (appended.isEmpty()) { + return null; + } + KeyState_ published = appended.getFirst(); + return published.equals(KeyState_.getDefaultInstance()) + ? null : new KeyStateImpl(published); } @Override diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java index 41d608c1d..24776b6f9 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java @@ -50,8 +50,7 @@ public CachingKEL(Function, ?> kelSupplier) { this(kelSupplier, defaultKsCoordsBuilder(), defaultEventCoordsBuilder()); } - public CachingKEL(Function, ?> kelSupplier, Caffeine builder, - Caffeine eventBuilder) { + public CachingKEL(Function, ?> kelSupplier, Caffeine builder, Caffeine eventBuilder) { ksCoords = builder.build(new CacheLoader() { @@ -71,26 +70,19 @@ public CachingKEL(Function, ?> kelSupplier, Caffeine defaultEventCoordsBuilder() { - return Caffeine.newBuilder() - .maximumSize(10_000) - .expireAfterWrite(Duration.ofMinutes(10)) - .removalListener((EventCoordinates coords, KeyEvent e, - RemovalCause cause) -> log.trace("KeyEvent {} was removed ({})", coords, - cause)); + return Caffeine.newBuilder().maximumSize(10_000).expireAfterWrite(Duration.ofMinutes(10)).removalListener((EventCoordinates coords, KeyEvent e, RemovalCause cause) -> log.trace("KeyEvent {} was removed ({})", coords, cause)); } public static Caffeine defaultKsCoordsBuilder() { - return Caffeine.newBuilder() - .maximumSize(10_000) - .expireAfterWrite(Duration.ofMinutes(10)) - .removalListener((EventCoordinates coords, KeyState ks, - RemovalCause cause) -> log.trace("KeyState {} was removed ({})", coords, - cause)); + return Caffeine.newBuilder().maximumSize(10_000).expireAfterWrite(Duration.ofMinutes(10)).removalListener((EventCoordinates coords, KeyState ks, RemovalCause cause) -> log.trace("KeyState {} was removed ({})", coords, cause)); } public KeyState append(KeyEvent event) { try { return complete(kel -> kel.append(event)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; } finally { keyCoords.invalidate(event.getCoordinates()); } @@ -103,6 +95,12 @@ public List append(KeyEvent... events) { } try { return complete(kel -> kel.append(events)); + } catch (ClassCastException e) { + log.error("Cannot complete append", e); + return null; + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; } finally { for (var event : events) { keyCoords.invalidate(event.getCoordinates()); @@ -115,17 +113,32 @@ public List append(List events, List attach if (events.isEmpty() && attachments.isEmpty()) { return Collections.emptyList(); } - return complete(kel -> kel.append(events, attachments)); + try { + return complete(kel -> kel.append(events, attachments)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } } @Override public Attachment getAttachment(EventCoordinates coordinates) { - return complete(kel -> kel.getAttachment(coordinates)); + try { + return complete(kel -> kel.getAttachment(coordinates)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } } @Override public DigestAlgorithm getDigestAlgorithm() { - return complete(kel -> kel.getDigestAlgorithm()); + try { + return complete(kel -> kel.getDigestAlgorithm()); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } } @Override @@ -140,21 +153,46 @@ public KeyState getKeyState(EventCoordinates coordinates) { @Override public KeyState getKeyState(Identifier identifier) { - return complete(kel -> kel.getKeyState(identifier)); + try { + return complete(kel -> kel.getKeyState(identifier)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } } @Override public KeyStateWithAttachments getKeyStateWithAttachments(EventCoordinates coordinates) { - return complete(kel -> kel.getKeyStateWithAttachments(coordinates)); + try { + return complete(kel -> kel.getKeyStateWithAttachments(coordinates)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } } @Override public Verifier.DefaultVerifier getVerifier(KeyCoordinates coordinates) { - return complete(kel -> kel.getVerifier(coordinates)); + try { + return complete(kel -> kel.getVerifier(coordinates)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } } protected T complete(Function func) { - @SuppressWarnings("unchecked") final var result = (T) kelSupplier.apply(func); - return result; + try { + @SuppressWarnings("unchecked") final var result = (T) kelSupplier.apply(func); + return result; + } catch (Throwable t) { + log.error("Error completing cache", t); + return null; + } + } + + public void clear() { + keyCoords.invalidateAll(); + ksCoords.invalidateAll(); } } diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKERL.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKERL.java index 80a3491d6..efa033073 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKERL.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKERL.java @@ -15,6 +15,8 @@ import com.salesforce.apollo.stereotomy.event.AttachmentEvent; import com.salesforce.apollo.stereotomy.event.KeyEvent; import com.salesforce.apollo.stereotomy.identifier.Identifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; @@ -24,6 +26,7 @@ * @author hal.hildebrand */ public class CachingKERL extends CachingKEL implements KERL { + private static final Logger log = LoggerFactory.getLogger(CachingKERL.class); public CachingKERL(Function, ?> kelSupplier) { super(kelSupplier); @@ -36,24 +39,43 @@ public CachingKERL(Function, ?> kelSupplier, Caffeine event) { - complete(kerl -> kerl.append(event)); + try { + complete(kerl -> kerl.append(event)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } return null; } @Override public Void appendValidations(EventCoordinates coordinates, Map validations) { - return complete(kerl -> kerl.appendValidations(coordinates, validations)); + try { + return complete(kerl -> kerl.appendValidations(coordinates, validations)); + } catch (Throwable e) { + log.error("Cannot complete append", e); + return null; + } } @Override public Map getValidations(EventCoordinates coordinates) { - return complete(kerl -> kerl.getValidations(coordinates)); + try { + return complete(kerl -> kerl.getValidations(coordinates)); + } catch (Throwable e) { + log.error("Cannot complete getValidations", e); + return null; + } } @Override public List kerl(Identifier identifier) { - return complete(kerl -> kerl.kerl(identifier)); + try { + return complete(kerl -> kerl.kerl(identifier)); + } catch (Throwable e) { + log.error("Cannot complete kerl", e); + return null; + } } - } diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java index 237181271..4f386b520 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java @@ -26,6 +26,7 @@ import com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory; import com.salesforce.apollo.stereotomy.identifier.Identifier; import com.salesforce.apollo.stereotomy.processing.KeyEventProcessor; +import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException; import org.jooq.DSLContext; import org.jooq.Record1; import org.jooq.exception.DataAccessException; @@ -163,12 +164,20 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState, final var identBytes = event.getIdentifier().toIdent().toByteArray(); - context.mergeInto(IDENTIFIER) - .using(context.selectOne()) - .on(IDENTIFIER.PREFIX.eq(identBytes)) - .whenNotMatchedThenInsert(IDENTIFIER.PREFIX) - .values(identBytes) - .execute(); + try { + context.mergeInto(IDENTIFIER) + .using(context.selectOne()) + .on(IDENTIFIER.PREFIX.eq(identBytes)) + .whenNotMatchedThenInsert(IDENTIFIER.PREFIX) + .values(identBytes) + .execute(); + } catch (DataAccessException e) { + if (e.getCause() instanceof JdbcSQLIntegrityConstraintViolationException icv) { + log.info("Constraint violation ignored: {}", icv.toString()); + } else { + throw e; + } + } var identifierId = context.select(IDENTIFIER.ID) .from(IDENTIFIER) @@ -186,6 +195,7 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState, .fetchOne() .value1(); } catch (DataAccessException e) { + log.info("already published: {} : {}", event.getCoordinates(), e.toString()); // Already exists var coordinates = event.getCoordinates(); id = context.select(COORDINATES.ID) @@ -210,8 +220,8 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState, .execute(); } catch (DataAccessException e) { // ignore + log.info("already inserted event: {} : {}",e, e.toString()); } - log.trace("Inserted event: {}", event); context.mergeInto(CURRENT_KEY_STATE) .using(context.selectOne()) .on(CURRENT_KEY_STATE.IDENTIFIER.eq(identifierId.value1())) @@ -221,7 +231,7 @@ public static void append(DSLContext context, KeyEvent event, KeyState newState, .set(CURRENT_KEY_STATE.IDENTIFIER, identifierId.value1()) .set(CURRENT_KEY_STATE.CURRENT, id) .execute(); - log.trace("Inserted key state: {}", event); + log.info("Inserted key state: {}", event); } public static void appendAttachments(Connection connection, List attachments) { diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/services/proto/ProtoKERLAdapter.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/services/proto/ProtoKERLAdapter.java index 96a7047c9..6fb78d88f 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/services/proto/ProtoKERLAdapter.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/services/proto/ProtoKERLAdapter.java @@ -23,6 +23,7 @@ import com.salesforce.apollo.stereotomy.identifier.Identifier; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -56,7 +57,8 @@ public List append(List keyEventList) { for (KeyEvent event : keyEventList.stream().map(ke -> ProtobufEventFactory.from(ke)).toList()) { events[i++] = event; } - return kerl.append(events).stream().map(ks -> ks == null ? KeyState_.getDefaultInstance() : ks.toKeyState_()).toList(); + List keyStates = kerl.append(events); + return keyStates == null ? Collections.emptyList() : (keyStates.stream().map(ks -> ks == null ? KeyState_.getDefaultInstance() : ks.toKeyState_()).toList()); } @Override diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java b/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java index 1541331d4..aa71d15f9 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java @@ -29,6 +29,7 @@ public DirectPublisher(ProtoKERLAdapter kerl) { @Override public void publish(KERL_ kerl_, List validations) { validations.stream().forEach(v -> kerl.appendValidations(v)); + kerl.append(kerl_); } @Override diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index eb1b038c6..4f7c53473 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -90,7 +90,7 @@ public class KerlDHT implements ProtoKERLService { private final static Logger log = LoggerFactory.getLogger(KerlDHT.class); private final Ani ani; - private final KERL cache; + private final CachingKERL cache; private final JdbcConnectionPool connectionPool; private final Context context; private final CommonCommunications dhtComms; @@ -117,7 +117,14 @@ public KerlDHT(Duration frequency, Context context, SigningMem this.fpr = falsePositiveRate; this.frequency = frequency; this.scheduler = scheduler; - this.cache = new CachingKERL(f -> f.apply(new KERLAdapter(this, digestAlgorithm()))); + this.cache = new CachingKERL(f -> { + try { + return f.apply(new KERLAdapter(this, digestAlgorithm())); + } catch (Throwable t) { + log.error("error applying cache", t); + return null; + } + }); dhtComms = communications.create(member, context.getId(), service, service.getClass().getCanonicalName(), r -> new DhtServer(r, metrics), DhtClient.getCreate(metrics), DhtClient.getLocalLoopback(service, member)); reconcileComms = communications.create(member, context.getId(), reconciliation, reconciliation.getClass().getCanonicalName(), r -> new ReconciliationServer(r, communications.getClientIdentityProvider(), metrics), ReconciliationClient.getCreate(context.getId(), metrics), ReconciliationClient.getLocalLoopback(reconciliation, member)); this.connectionPool = connectionPool; @@ -131,7 +138,8 @@ public KerlDHT(Duration frequency, Context context, SigningMem try (var k = kerlPool.create()) { return f.apply(wrap.apply(this, wrap(k))); } catch (Throwable e) { - return completeExceptionally(e); + log.error("Cannot apply kerl", e); + return null; } }); this.ani = new Ani(member.getId(), asKERL()); @@ -141,12 +149,6 @@ public KerlDHT(Duration frequency, Context context, SigningMem this(frequency, context, member, (t, k) -> k, connectionPool, digestAlgorithm, communications, executor, timeout, scheduler, falsePositiveRate, metrics); } - public static CompletableFuture completeExceptionally(Throwable t) { - var fs = new CompletableFuture(); - fs.completeExceptionally(t); - return fs; - } - public static void updateLocationHash(Identifier identifier, DigestAlgorithm digestAlgorithm, DSLContext dsl) { dsl.transaction(config -> { var context = DSL.using(config); @@ -166,6 +168,13 @@ static T completeIt(T result) { return result; } + /** + * Clear the caches of the receiver + */ + public void clearCache() { + cache.clear(); + } + public KeyState_ append(AttachmentEvent event) { if (event == null) { return null; @@ -181,7 +190,8 @@ public KeyState_ append(AttachmentEvent event) { HashMultiset gathered = HashMultiset.create(); new RingIterator<>(frequency, context, member, scheduler, dhtComms, executor).noDuplicates().iterate(identifier, null, (link, r) -> link.append(Collections.emptyList(), Collections.singletonList(event)), null, (tally, futureSailor, destination) -> mutate(gathered, futureSailor, identifier, isTimedOut, tally, destination, "append events"), t -> completeIt(result, gathered)); try { - return result.get().getKeyStatesList().getFirst(); + List s = result.get().getKeyStatesList(); + return s.isEmpty() ? null : s.getFirst(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; @@ -547,14 +557,23 @@ public void stop() { } private T complete(Function func) { - return func.apply(new ProtoKERLAdapter(kerl)); + try { + return func.apply(new ProtoKERLAdapter(kerl)); + } catch (Throwable t) { + log.error("Error completing", t); + return null; + } } private void completeIt(CompletableFuture result, HashMultiset gathered) { var max = gathered.entrySet().stream().max(Ordering.natural().onResultOf(Multiset.Entry::getCount)).orElse(null); if (max != null) { if (max.getCount() >= context.majority()) { - result.complete(max.getElement()); + try { + result.complete(max.getElement()); + } catch (Throwable t) { + log.error("Unable to complete it", t); + } return; } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java index 2b1060192..cd643557c 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java @@ -27,9 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory.digestOf; @@ -67,28 +65,19 @@ public List append(List events, List attach final List filtered = events.stream().filter(e -> { if (e instanceof EstablishmentEvent est && est.getCoordinates().getSequenceNumber().equals(ULong.valueOf(0))) { - try { - return validate(est).get(); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } catch (ExecutionException e1) { - log.error("error validating: {}", est.getCoordinates(), e1.getCause()); - return false; - } + return validate(est); } return true; }).toList(); return filtered.isEmpty() && attachments.isEmpty() ? Collections.emptyList() : super.append(filtered, attachments); } - public CompletableFuture validate(EstablishmentEvent event) { + public boolean validate(EstablishmentEvent event) { Digest digest; if (event.getIdentifier() instanceof SelfAddressingIdentifier said) { digest = said.getDigest(); } else { - final CompletableFuture fs = new CompletableFuture(); - fs.complete(false); - return fs; + return false; } final Context ctx = context; var successors = Context.uniqueSuccessors(ctx, digestOf(event.getIdentifier().toIdent(), digest.getAlgorithm())) @@ -122,30 +111,29 @@ record validator(EstablishmentEvent validating, JohnHancock signature) { } return event; }).toList(); - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(e -> { - log.trace("Evaluating validation of: {} validations: {} mapped: {}", event.getCoordinates(), - validations.size(), mapped.size()); - if (mapped.size() == 0) { - log.warn("No validations of: {} ", event.getCoordinates()); - return false; - } - var verified = 0; - for (var r : mapped) { - var verifier = new DefaultVerifier(r.validating.getKeys().get(0)); - if (verifier.verify(r.signature, serialized)) { - verified++; - } else { - log.trace("Cannot verify sig: {} of: {} by: {}", r.signature, event.getCoordinates(), - r.validating.getIdentifier()); - } + log.trace("Evaluating validation of: {} validations: {} mapped: {}", event.getCoordinates(), + validations.size(), mapped.size()); + if (mapped.size() == 0) { + log.warn("No validations of: {} ", event.getCoordinates()); + return false; + } + + var verified = 0; + for (var r : mapped) { + var verifier = new DefaultVerifier(r.validating.getKeys().get(0)); + if (verifier.verify(r.signature, serialized)) { + verified++; + } else { + log.trace("Cannot verify sig: {} of: {} by: {}", r.signature, event.getCoordinates(), + r.validating.getIdentifier()); } - var validated = verified >= context.majority(); + } + var validated = verified >= context.majority(); - log.trace("Validated: {} valid: {} out of: {} required: {} for: {} ", validated, verified, - mapped.size(), ctx.majority(), event.getCoordinates()); - return validated; - }); + log.trace("Validated: {} valid: {} out of: {} required: {} for: {} ", validated, verified, + mapped.size(), ctx.majority(), event.getCoordinates()); + return validated; } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java index 224aed0cf..c429d66cf 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/grpc/dht/DhtServer.java @@ -206,7 +206,7 @@ public void getKERL(Ident request, StreamObserver responseObserver) { var kerl = response == null ? KERL_.getDefaultInstance() : response; responseObserver.onNext(kerl); responseObserver.onCompleted(); - if (metrics == null) { + if (metrics != null) { final var serializedSize = kerl.getSerializedSize(); metrics.outboundBandwidth().mark(serializedSize); metrics.outboundGetKERLResponse().mark(serializedSize); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java index 017c2314d..68e023e03 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java @@ -21,6 +21,7 @@ import com.salesforce.apollo.gorgoneion.comm.admissions.AdmissionsService; import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember; import com.salesforce.apollo.stereotomy.KERL; +import com.salesforce.apollo.stereotomy.KeyState; import com.salesforce.apollo.stereotomy.StereotomyImpl; import com.salesforce.apollo.stereotomy.mem.MemKERL; import com.salesforce.apollo.stereotomy.mem.MemKeyStore; @@ -33,7 +34,6 @@ import java.time.Duration; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Function; @@ -72,7 +72,8 @@ context, new DirectPublisher(new ProtoKERLAdapter(k)), r, Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory()), null, exec); }).toList(); - final KERL testKerl = dhts.values().stream().findFirst().get().asKERL(); + final var dht = (KerlDHT) dhts.values().stream().findFirst().get(); + final KERL testKerl = dht.asKERL(); var entropy = SecureRandom.getInstance("SHA1PRNG"); entropy.setSeed(new byte[]{7, 7, 7}); var clientKerl = new MemKERL(DigestAlgorithm.DEFAULT); @@ -104,7 +105,9 @@ context, new DirectPublisher(new ProtoKERLAdapter(k)), r, testKerl.getKeyEvent(client.getEvent().getCoordinates()); // Verify we can't publish without correct validation - testKerl.append(client.getEvent()); + KeyState ks = testKerl.append(client.getEvent()); + assertNull(ks); + dht.clearCache(); var gorgoneionClient = new GorgoneionClient(client, attester, Clock.systemUTC(), admin); @@ -113,10 +116,10 @@ context, new DirectPublisher(new ProtoKERLAdapter(k)), r, assertNotEquals(Validations.getDefaultInstance(), invitation); assertTrue(invitation.getValidationsCount() >= context.majority()); -// Thread.sleep(1000); + Thread.sleep(3000); // Verify client KERL published - var ks = testKerl.getKeyEvent(client.getEvent().getCoordinates()); - assertNotNull(ks); + var keyS = testKerl.getKeyEvent(client.getEvent().getCoordinates()); + assertNotNull(keyS); admin.close(); }