diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/JohnHancock.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/JohnHancock.java index 809839b4d..b0a3fbf00 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/JohnHancock.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/JohnHancock.java @@ -59,15 +59,18 @@ public static JohnHancock of(Sig signature) { return new JohnHancock(signature); } + public static JohnHancock nullSignature(SignatureAlgorithm algorithm) { + return new JohnHancock(algorithm, new byte[0][], ULong.valueOf(0)); + } + @Override public boolean equals(Object obj) { if (this == obj) { return true; } - if (!(obj instanceof JohnHancock)) { + if (!(obj instanceof JohnHancock other)) { return false; } - JohnHancock other = (JohnHancock) obj; return algorithm == other.algorithm && Arrays.equals(bytes, other.bytes); } diff --git a/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java b/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java index 5965ff6e0..1e6f868a0 100644 --- a/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java +++ b/cryptography/src/main/java/com/salesforce/apollo/cryptography/SignatureAlgorithm.java @@ -300,6 +300,10 @@ private static SignatureAlgorithm lookupEd(NamedParameterSpec params) { abstract public KeyPair generateKeyPair(SecureRandom secureRandom); + public JohnHancock nullSignature() { + return JohnHancock.nullSignature(this); + } + abstract public PublicKey publicKey(byte[] bytes); abstract public int publicKeyLength(); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java index 30b290fdf..6c23dfbec 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/Binding.java @@ -265,47 +265,57 @@ private void join(Redirect redirect, Digest v, Duration duration) { final var join = join(v); final var abandon = new AtomicInteger(); var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); - regate.set(() -> redirecting.iterate((link, m) -> { - log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId()); - try { - var g = link.join(join, params.seedingTimeout()); - if (g == null || g.equals(Gateway.getDefaultInstance())) { - log.info("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId()); + regate.set(() -> { + if (!view.started.get()) { + return; + } + redirecting.iterate((link, m) -> { + if (!view.started.get()) { + return null; + } + log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId()); + try { + var g = link.join(join, params.seedingTimeout()); + if (g == null || g.equals(Gateway.getDefaultInstance())) { + log.info("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId()); + abandon.incrementAndGet(); + return null; + } + return g; + } catch (StatusRuntimeException sre) { + gatewaySRE(v, link, sre, abandon); + return null; + } catch (Throwable t) { + log.info("Gateway view: {} error: {} from: {} on: {}", v, t, link.getMember().getId(), + node.getId()); abandon.incrementAndGet(); return null; } - return g; - } catch (StatusRuntimeException sre) { - gatewaySRE(v, link, sre, abandon); - return null; - } catch (Throwable t) { - log.info("Gateway view: {} error: {} from: {} on: {}", v, t, link.getMember().getId(), node.getId()); - abandon.incrementAndGet(); - return null; - } - }, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts, initialSeedSet, v, - majority), () -> { - if (gateway.isDone()) { - return; - } - if (abandon.get() >= majority) { - log.info("Abandoning Gateway view: {} reseeding on: {}", v, node.getId()); - seeding(); - } else { - abandon.set(0); - if (retries.get() < params.joinRetries()) { - log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(), - params.joinRetries(), node.getId()); - trusts.clear(); - initialSeedSet.clear(); - scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)), - Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS); + }, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts, + initialSeedSet, v, majority), () -> { + if (!view.started.get() || gateway.isDone()) { + return; + } + if (abandon.get() >= majority) { + log.info("Abandoning Gateway view: {} reseeding on: {}", v, node.getId()); + seeding(); } else { - log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId()); - view.stop(); + abandon.set(0); + if (retries.get() < params.joinRetries()) { + log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(), + params.joinRetries(), node.getId()); + trusts.clear(); + initialSeedSet.clear(); + scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)), + Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), + TimeUnit.NANOSECONDS); + } else { + log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId()); + view.stop(); + } } - } - }, scheduler, params.retryDelay())); + }, scheduler, params.retryDelay()); + }); regate.get().run(); } diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 006cd320c..e82e8bfd4 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -83,6 +83,7 @@ public class View { View.class); private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change"; final CommonCommunications comm; + final AtomicBoolean started = new AtomicBoolean(); private final CommonCommunications approaches; private final Context context; private final DigestAlgorithm digestAlgo; @@ -96,7 +97,6 @@ public class View { private final ConcurrentMap pendingRebuttals = new ConcurrentSkipListMap<>(); private final RoundScheduler roundTimers; private final Set shunned = new ConcurrentSkipListSet<>(); - private final AtomicBoolean started = new AtomicBoolean(); private final Map timers = new HashMap<>(); private final ReadWriteLock viewChange = new ReentrantReadWriteLock( true); @@ -338,6 +338,9 @@ Digest currentView() { * Finalize the view change */ void finalizeViewChange() { + if (!started.get()) { + return; + } viewChange(() -> { final var cardinality = context.memberCount(); final var superMajority = cardinality - ((cardinality - 1) / 4); @@ -499,6 +502,9 @@ void scheduleFinalizeViewChange() { void scheduleFinalizeViewChange(final int finalizeViewRounds) { // log.trace("View change finalization scheduled: {} rounds for: {} joining: {} leaving: {} on: {}", // finalizeViewRounds, currentView(), joins.size(), context.getOffline().size(), node.getId()); + if (!started.get()) { + return; + } timers.put(FINALIZE_VIEW_CHANGE, roundTimers.schedule(FINALIZE_VIEW_CHANGE, this::finalizeViewChange, finalizeViewRounds)); } @@ -510,6 +516,9 @@ void scheduleViewChange() { void scheduleViewChange(final int viewChangeRounds) { // log.trace("Schedule view change: {} rounds for: {} on: {}", viewChangeRounds, currentView(), // node.getId()); + if (!started.get()) { + return; + } timers.put(SCHEDULED_VIEW_CHANGE, roundTimers.schedule(SCHEDULED_VIEW_CHANGE, viewManagement::maybeViewChange, viewChangeRounds)); } @@ -620,15 +629,18 @@ protected Gossip gossip(Fireflies link, int ring) { node.getId()); break; case RESOURCE_EXHAUSTED: - log.trace("Unavailable for gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), - p.getId(), node.getId()); + log.trace("Resource exhausted for gossip: {} view: {} from: {} on: {}", sre.getStatus(), + currentView(), p.getId(), node.getId()); break; case CANCELLED: log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(), node.getId()); break; + case UNAVAILABLE: + accuse(p, ring, sre); + break; default: - log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), p.getId(), currentView(), + log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(), node.getId()); accuse(p, ring, sre); break; @@ -811,11 +823,13 @@ private boolean add(SignedViewChange observation) { } var currentObservation = observations.get(observer); if (currentObservation != null) { - if (observation.getChange().getAttempt() <= currentObservation.getChange().getAttempt()) { + if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) { log.trace("Stale observation: {} current: {} view change: {} current: {} offline: {} on: {}", observation.getChange().getAttempt(), currentObservation.getChange().getAttempt(), inView, currentView(), observer, node.getId()); return false; + } else if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) { + return false; } } final var member = context.getActiveMember(observer); @@ -1890,7 +1904,7 @@ public void join(Join join, Digest from, StreamObserver responseObserve @Override public Gossip rumors(SayWhat request, Digest from) { if (!introduced.get()) { - log.trace("Not introduced!, ring: {} from: {} on: {}", request.getRing(), from, node.getId()); + log.trace("Not introduced!, ring: {} on: {}", request.getRing(), node.getId()); throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription( "Not introduced!, ring: %s from: %s on: %s".formatted(request.getRing(), from, node.getId()))); } diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java index 028927e5e..f49810ef1 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java @@ -195,6 +195,9 @@ private SignedNonce generateNonce(KERL_ application) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { + if (e.getCause() instanceof StatusRuntimeException sre) { + throw sre; + } throw new RuntimeException(e.getCause()); } } @@ -314,7 +317,14 @@ public void apply(KERL_ request, Digest from, StreamObserver respon new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application"))); return; } - SignedNonce sn = generateNonce(request); + SignedNonce sn; + try { + sn = generateNonce(request); + } catch (StatusRuntimeException sre) { + responseObserver.onError(sre); + return; + } + if (sn == null) { responseObserver.onError( new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application"))); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java index 01cc45e07..aa799d4d9 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/ManagedServerChannel.java @@ -7,7 +7,6 @@ package com.salesforce.apollo.archipelago; import com.google.common.base.MoreObjects; -import com.salesforce.apollo.archipelago.ServerConnectionCache.ReleasableManagedChannel; import com.salesforce.apollo.cryptography.Digest; import com.salesforce.apollo.membership.Member; import io.grpc.*; @@ -22,10 +21,10 @@ public class ManagedServerChannel extends ManagedChannel { private final static Logger log = LoggerFactory.getLogger(ManagedServerChannel.class); - private final Digest context; - private final ReleasableManagedChannel delegate; + private final Digest context; + private final Releasable delegate; - ManagedServerChannel(Digest context, ReleasableManagedChannel delegate) { + ManagedServerChannel(Digest context, Releasable delegate) { this.context = context; this.delegate = delegate; } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index 8d1eeff41..5363d3d38 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -18,14 +18,11 @@ import io.netty.handler.ssl.ClientAuth; import java.net.SocketAddress; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; /** * @author hal.hildebrand */ public class MtlsClient { - private final Executor exec = Executors.newVirtualThreadPerTaskExecutor(); private final ManagedChannel channel; @@ -34,7 +31,6 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl Limiter limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build(); channel = NettyChannelBuilder.forAddress(address) - .executor(exec) .sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3)) .intercept(new ConcurrencyLimitClientInterceptor(limiter, () -> Status.RESOURCE_EXHAUSTED.withDescription( diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java index cc61fee23..24b9a227b 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsServer.java @@ -39,6 +39,7 @@ import java.security.Security; import java.security.cert.X509Certificate; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.function.Function; import java.util.function.Supplier; @@ -55,6 +56,7 @@ public class MtlsServer implements RouterSupplier { private final Member from; private final Context.Key sslSessionContext = Context.key("SSLSession"); private final ServerContextSupplier supplier; + private final Executor executor; public MtlsServer(Member from, EndpointProvider epProvider, Function contextSupplier, ServerContextSupplier supplier) { @@ -62,6 +64,7 @@ public MtlsServer(Member from, EndpointProvider epProvider, Function() { @Override public Digest load(X509Certificate key) throws Exception { @@ -137,7 +140,6 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • { - private final AtomicInteger borrowed = new AtomicInteger(); + class ReleasableManagedChannel implements Comparable, Releasable { + private final AtomicInteger borrowed = new AtomicInteger(); private final ManagedChannel channel; private final Instant created; private final Member member; - private final AtomicInteger usageCount = new AtomicInteger(); private final Digest from; private volatile Instant lastUsed; @@ -306,7 +305,7 @@ public ReleasableManagedChannel(Member member, ManagedChannel channel, Digest fr @Override public int compareTo(ReleasableManagedChannel o) { - return Integer.compare(usageCount.get(), o.usageCount.get()); + return Integer.compare(borrowed.get(), o.borrowed.get()); } @Override @@ -318,14 +317,17 @@ public boolean equals(Object obj) { return member.equals(((ReleasableManagedChannel) obj).member); } + @Override public ManagedChannel getChannel() { return channel; } + @Override public Digest getFrom() { return from; } + @Override public Member getMember() { return member; } @@ -339,15 +341,18 @@ public boolean isCloseable() { return lastUsed.plus(minIdle).isBefore(Instant.now(clock)); } + @Override public void release() { log.trace("Release connection to: {} on: {}", getMember().getId(), getFrom()); ServerConnectionCache.this.release(this); } + @Override public ManagedChannel shutdown() { throw new IllegalStateException("Should not be called"); } + @Override public ManagedChannel shutdownNow() { throw new IllegalStateException("Should not be called"); } @@ -361,7 +366,6 @@ private boolean decrementBorrow() { } private boolean incrementBorrow() { - usageCount.incrementAndGet(); return borrowed.incrementAndGet() == 1; } } diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/stereotomy/ControlledIdentifierMember.java b/memberships/src/main/java/com/salesforce/apollo/membership/stereotomy/ControlledIdentifierMember.java index 5ff68b7b1..af815d58c 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/stereotomy/ControlledIdentifierMember.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/stereotomy/ControlledIdentifierMember.java @@ -6,7 +6,6 @@ */ package com.salesforce.apollo.membership.stereotomy; -import com.salesforce.apollo.stereotomy.event.proto.KERL_; import com.salesforce.apollo.cryptography.*; import com.salesforce.apollo.cryptography.cert.CertificateWithPrivateKey; import com.salesforce.apollo.membership.Member; @@ -14,7 +13,9 @@ import com.salesforce.apollo.stereotomy.ControlledIdentifier; import com.salesforce.apollo.stereotomy.KERL.EventWithAttachments; import com.salesforce.apollo.stereotomy.event.EstablishmentEvent; +import com.salesforce.apollo.stereotomy.event.proto.KERL_; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; +import org.slf4j.LoggerFactory; import java.io.InputStream; import java.time.Duration; @@ -36,8 +37,7 @@ public ControlledIdentifierMember(ControlledIdentifier @Override public SignatureAlgorithm algorithm() { - Signer signer = identifier.getSigner(); - return signer.algorithm(); + return identifier.algorithm(); } @Override @@ -95,6 +95,10 @@ public KERL_ kerl() { @Override public JohnHancock sign(InputStream message) { Signer signer = identifier.getSigner(); + if (signer == null) { + LoggerFactory.getLogger(ControlledIdentifierMember.class).warn("Null signer for: {}", getId()); + return algorithm().nullSignature(); + } return signer.sign(message); } diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java index ea90e9102..f6e229396 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java @@ -41,16 +41,19 @@ * @author hal.hildebrand */ public class ProcessDomain extends Domain { - private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class); - - protected final KerlDHT dht; - protected final View foundation; - private final UUID listener; - - public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters parameters, + private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class); + protected final KerlDHT dht; + protected final View foundation; + private final UUID listener; + private final EventValidation.DelegatedValidation validations; + private final Verifiers.DelegatedVerifiers verifiers; + private final ProcessDomainParameters parameters; + + public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams, Builder builder, Parameters.RuntimeParameters.Builder runtime, InetSocketAddress endpoint, com.salesforce.apollo.fireflies.Parameters.Builder ff, StereotomyMetrics stereotomyMetrics) { - super(member, builder, parameters.dbURL, parameters.checkpointBaseDir, runtime); + super(member, builder, pdParams.dbURL, pdParams.checkpointBaseDir, runtime); + parameters = pdParams; var base = Context.newBuilder() .setBias(parameters.dhtBias) .setpByz(parameters.dhtPbyz) @@ -61,10 +64,9 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDom dht = new KerlDHT(parameters.dhtOpsFrequency, params.context(), member, connectionPool, params.digestAlgorithm(), params.communications(), parameters.dhtOperationsTimeout, parameters.dhtFpr, stereotomyMetrics); - var mock = true; - var validation = mock ? EventValidation.NONE : dht.getAni().eventValidation(parameters.dhtEventValidTO); - var verifiers = mock ? Verifiers.NONE : dht.getVerifiers(); - this.foundation = new View(base, getMember(), endpoint, validation, verifiers, params.communications(), + validations = new EventValidation.DelegatedValidation(EventValidation.NONE); + verifiers = new Verifiers.DelegatedVerifiers(Verifiers.NONE); + this.foundation = new View(base, getMember(), endpoint, validations, verifiers, params.communications(), ff.build(), DigestAlgorithm.DEFAULT, null); listener = foundation.register(listener()); } @@ -81,6 +83,22 @@ public CertificateWithPrivateKey provision(Duration duration, SignatureAlgorithm return member.getIdentifier().provision(Instant.now(), duration, signatureAlgorithm); } + public void setAniValidations() { + validations.setDelegate(dht.getAni().eventValidation(parameters.dhtEventValidTO)); + } + + public void setDhtVerifiers() { + verifiers.setDelegate(dht.getVerifiers()); + } + + public void setValidationsNONE() { + validations.setDelegate(EventValidation.NONE); + } + + public void setVerifiersNONE() { + verifiers.setDelegate(Verifiers.NONE); + } + @Override public void start() { startServices(); @@ -118,6 +136,7 @@ protected void startServices() { protected void stopServices() { dht.stop(); + foundation.stop(); } public record ProcessDomainParameters(String dbURL, Duration dhtOperationsTimeout, String dhtDbUrl, diff --git a/pom.xml b/pom.xml index 0fe5dc06d..e4dbd549a 100644 --- a/pom.xml +++ b/pom.xml @@ -36,13 +36,13 @@ 1.9.5 2.15.2 - 4.0.0-beta.3 + 4.0.5 2.2.220 3.17.2 1.74 1.4.12 - 1.60.1 - 3.25.1 + 1.61.0 + 3.25.2 4.8.0 4.1.100.Final 0.9.27 diff --git a/protocols/pom.xml b/protocols/pom.xml index 4bfabe235..2d5f68740 100644 --- a/protocols/pom.xml +++ b/protocols/pom.xml @@ -33,6 +33,10 @@ com.salesforce.apollo cryptography + + io.grpc + grpc-core + io.grpc grpc-netty diff --git a/stereotomy/pom.xml b/stereotomy/pom.xml index 7f99a7040..708f03d66 100644 --- a/stereotomy/pom.xml +++ b/stereotomy/pom.xml @@ -43,6 +43,11 @@ com.github.ben-manes.caffeine caffeine + + io.dropwizard + dropwizard-metrics + true + diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/ControlledIdentifier.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/ControlledIdentifier.java index 074fef3ab..867212c7d 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/ControlledIdentifier.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/ControlledIdentifier.java @@ -31,6 +31,8 @@ * @author hal.hildebrand */ public interface ControlledIdentifier extends BoundIdentifier { + SignatureAlgorithm algorithm(); + /** * @return the binding of the identifier to the current key state */ diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java index c02ddf89e..3227d7841 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/EventValidation.java @@ -48,4 +48,26 @@ public boolean validate(Identifier identifier) { boolean validate(EstablishmentEvent event); boolean validate(Identifier identifier); + + class DelegatedValidation implements EventValidation { + private volatile EventValidation delegate; + + public DelegatedValidation(EventValidation delegate) { + this.delegate = delegate; + } + + public void setDelegate(EventValidation delegate) { + this.delegate = delegate; + } + + @Override + public boolean validate(Identifier identifier) { + return delegate.validate(identifier); + } + + @Override + public boolean validate(EstablishmentEvent event) { + return delegate.validate(event); + } + } } diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/StereotomyImpl.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/StereotomyImpl.java index b4340b101..39fdbe946 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/StereotomyImpl.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/StereotomyImpl.java @@ -148,14 +148,12 @@ private EstablishmentEvent getLastEstablishingEvent(KeyState state) { } private Signer getSigner(KeyState state) { - var identifier = state.getIdentifier(); var signers = new PrivateKey[state.getKeys().size()]; EstablishmentEvent e = getLastEstablishingEvent(state); for (int i = 0; i < signers.length; i++) { Optional keyPair = getKeyPair(i, e); if (keyPair.isEmpty()) { - log.warn("Last establishment event not found in KEL: {} : {} missing: {}", identifier, - state.getCoordinates(), state.getLastEstablishmentEvent()); + log.warn("Key pair: {} is unavailable: {}", state.getCoordinates(), state.getLastEstablishmentEvent()); return null; } signers[i] = keyPair.get().getPrivate(); @@ -475,6 +473,12 @@ public ControlledIdentifierImpl(KeyState state) { super(state); } + @Override + public SignatureAlgorithm algorithm() { + EstablishmentEvent e = getLastEstablishingEvent(); + return SignatureAlgorithm.lookup(e.getKeys().getFirst()); + } + @Override public BoundIdentifier bind() { return new BoundControllableIdentifier<>(getState()); diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java index 33eb64c41..d095a612f 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/CachingKEL.java @@ -48,7 +48,7 @@ public class CachingKEL implements KEL.AppendKEL { private final LoadingCache ksCoords; public CachingKEL(Function, ?> kelSupplier) { - this(kelSupplier, defaultKsCoordsBuilder(), defaultEventCoordsBuilder()); + this(kelSupplier, defaultKsCoordsBuilder(null), defaultEventCoordsBuilder(null)); } public CachingKEL(Function, ?> kelSupplier, Caffeine builder, @@ -70,20 +70,28 @@ public CachingKEL(Function, ?> kelSupplier, Caffeine defaultEventCoordsBuilder() { - return Caffeine.newBuilder() - .maximumSize(10_000) - .expireAfterWrite(Duration.ofMinutes(10)) - .removalListener((EventCoordinates coords, KeyEvent e, RemovalCause cause) -> log.trace( - "KeyEvent {} was removed ({})", coords, cause)); + public static Caffeine defaultEventCoordsBuilder(MetricsStatsCounter metrics) { + var builder = Caffeine.newBuilder() + .maximumSize(10_000) + .expireAfterWrite(Duration.ofMinutes(10)) + .removalListener((EventCoordinates coords, KeyEvent e, RemovalCause cause) -> log.trace( + "KeyEvent {} was removed ({})", coords, cause)); + if (metrics != null) { + builder.recordStats(() -> metrics); + } + return builder; } - public static Caffeine defaultKsCoordsBuilder() { - return Caffeine.newBuilder() - .maximumSize(10_000) - .expireAfterWrite(Duration.ofMinutes(10)) - .removalListener((EventCoordinates coords, KeyState ks, RemovalCause cause) -> log.trace( - "KeyState {} was removed ({})", coords, cause)); + public static Caffeine defaultKsCoordsBuilder(MetricsStatsCounter metrics) { + var builder = Caffeine.newBuilder() + .maximumSize(10_000) + .expireAfterWrite(Duration.ofMinutes(10)) + .removalListener((EventCoordinates coords, KeyState ks, RemovalCause cause) -> log.trace( + "KeyState {} was removed ({})", coords, cause)); + if (metrics != null) { + builder.recordStats(() -> metrics); + } + return builder; } public KeyState append(KeyEvent event) { diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/MetricsStatsCounter.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/MetricsStatsCounter.java new file mode 100644 index 000000000..8bc3136ec --- /dev/null +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/caching/MetricsStatsCounter.java @@ -0,0 +1,111 @@ +/* + * Copyright 2016 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.salesforce.apollo.stereotomy.caching; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.github.benmanes.caffeine.cache.stats.StatsCounter; +import org.checkerframework.checker.index.qual.NonNegative; + +import java.util.EnumMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import static java.util.Objects.requireNonNull; + +/** + * A {@link StatsCounter} instrumented with Dropwizard Metrics. + * + * @author ben.manes@gmail.com (Ben Manes) + * @author John Karp + */ +public final class MetricsStatsCounter implements StatsCounter { + private final Counter hitCount; + private final Counter missCount; + private final Timer loadSuccess; + private final Timer loadFailure; + private final Histogram evictions; + private final Counter evictionWeight; + private final EnumMap evictionsWithCause; + + // for implementing snapshot() + private final LongAdder totalLoadTime = new LongAdder(); + + /** + * Constructs an instance for use by a single cache. + * + * @param registry the registry of metric instances + * @param metricsPrefix the prefix name for the metrics + */ + public MetricsStatsCounter(MetricRegistry registry, String metricsPrefix) { + requireNonNull(metricsPrefix); + hitCount = registry.counter(MetricRegistry.name(metricsPrefix, "hits")); + missCount = registry.counter(MetricRegistry.name(metricsPrefix, "misses")); + loadSuccess = registry.timer(MetricRegistry.name(metricsPrefix, "loads-success")); + loadFailure = registry.timer(MetricRegistry.name(metricsPrefix, "loads-failure")); + evictions = registry.histogram(MetricRegistry.name(metricsPrefix, "evictions")); + evictionWeight = registry.counter(MetricRegistry.name(metricsPrefix, "evictions-weight")); + + evictionsWithCause = new EnumMap<>(RemovalCause.class); + for (RemovalCause cause : RemovalCause.values()) { + evictionsWithCause.put(cause, + registry.histogram(MetricRegistry.name(metricsPrefix, "evictions", cause.name()))); + } + } + + @Override + public void recordEviction(@NonNegative int weight, RemovalCause cause) { + evictionsWithCause.get(cause).update(weight); + evictionWeight.inc(weight); + } + + @Override + public void recordHits(int count) { + hitCount.inc(count); + } + + @Override + public void recordLoadFailure(long loadTime) { + loadFailure.update(loadTime, TimeUnit.NANOSECONDS); + totalLoadTime.add(loadTime); + } + + @Override + public void recordLoadSuccess(long loadTime) { + loadSuccess.update(loadTime, TimeUnit.NANOSECONDS); + totalLoadTime.add(loadTime); + } + + @Override + public void recordMisses(int count) { + missCount.inc(count); + } + + @Override + public CacheStats snapshot() { + return CacheStats.of(hitCount.getCount(), missCount.getCount(), loadSuccess.getCount(), loadFailure.getCount(), + totalLoadTime.sum(), evictions.getCount(), evictionWeight.getCount()); + } + + @Override + public String toString() { + return snapshot().toString(); + } +} diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/jks/JksKeyStore.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/jks/JksKeyStore.java index 2b23f9238..00dd1e65d 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/jks/JksKeyStore.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/jks/JksKeyStore.java @@ -140,14 +140,16 @@ private Optional get(String alias, KeyCoordinates keyCoordinates) { return Optional.empty(); } } catch (KeyStoreException e) { - log.error("Unable to query keystore for: {}", keyCoordinates != null ? keyCoordinates : alias, e); + log.error("Unable to query keystore for: {} : {}", keyCoordinates != null ? keyCoordinates : alias, + e.getMessage()); return Optional.empty(); } Certificate cert; try { cert = keyStore.getCertificate(alias); } catch (KeyStoreException e) { - log.error("Unable to retrieve certificate for: {}", keyCoordinates != null ? keyCoordinates : alias, e); + log.error("Unable to retrieve certificate for: {} : {}", keyCoordinates != null ? keyCoordinates : alias, + e.getMessage()); return Optional.empty(); } var publicKey = cert.getPublicKey(); @@ -155,7 +157,8 @@ private Optional get(String alias, KeyCoordinates keyCoordinates) { try { privateKey = (PrivateKey) keyStore.getKey(alias, passwordProvider.get()); } catch (UnrecoverableKeyException | KeyStoreException | NoSuchAlgorithmException e) { - log.error("Unable to retrieve certificate for: {}", keyCoordinates != null ? keyCoordinates : alias, e); + log.error("Unable to retrieve certificate for: {} : {}", keyCoordinates != null ? keyCoordinates : alias, + e.getMessage()); return Optional.empty(); } return Optional.of(new KeyPair(publicKey, privateKey)); diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 466566385..65ce18665 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -122,11 +122,12 @@ public KerlDHT(Duration operationsFrequency, Context context, this.fpr = falsePositiveRate; this.operationsFrequency = operationsFrequency; this.scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); + var kerlAdapter = new KERLAdapter(this, digestAlgorithm); this.cache = new CachingKERL(f -> { try { - return f.apply(new KERLAdapter(this, digestAlgorithm())); + return f.apply(kerlAdapter); } catch (Throwable t) { - log.error("error applying cache", t); + log.error("error applying cache on: {}", member.getId(), t); return null; } }); @@ -150,7 +151,7 @@ public KerlDHT(Duration operationsFrequency, Context context, try (var k = kerlPool.create()) { return f.apply(wrap.apply(this, wrap(k))); } catch (Throwable e) { - log.error("Cannot apply kerl", e); + log.error("Cannot apply kerl on: {}", member.getId(), e); return null; } });