From f61ea276af1af5a673c0c752ef69fce55c310f89 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Wed, 1 Nov 2023 15:48:34 -0700 Subject: [PATCH] further thread pool clean up. Fix BootstrappingTest --- .../apollo/gorgoneion/Gorgoneion.java | 203 ++++++++-------- .../apollo/archipelago/Enclave.java | 9 +- .../apollo/archipelago/LocalServer.java | 5 +- .../apollo/archipelago/RoutableService.java | 42 ++-- .../apollo/archipelago/RouterImpl.java | 218 ++++++++---------- .../apollo/thoth/DirectPublisher.java | 11 +- .../com/salesforce/apollo/thoth/Maat.java | 40 ++-- .../apollo/thoth/AbstractDhtTest.java | 3 + .../apollo/thoth/BootstrappingTest.java | 26 +-- thoth/src/test/resources/logback-test.xml | 10 +- 10 files changed, 269 insertions(+), 298 deletions(-) diff --git a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java index 640107ac3..c47a4fc35 100644 --- a/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java +++ b/gorgoneion/src/main/java/com/salesforce/apollo/gorgoneion/Gorgoneion.java @@ -35,6 +35,7 @@ import com.salesforce.apollo.ring.SliceIterator; import com.salesforce.apollo.stereotomy.EventCoordinates; import com.salesforce.apollo.stereotomy.event.EstablishmentEvent; +import com.salesforce.apollo.stereotomy.event.InceptionEvent; import com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory; import com.salesforce.apollo.stereotomy.identifier.Identifier; import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier; @@ -50,7 +51,10 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import static com.salesforce.apollo.stereotomy.event.protobuf.ProtobufEventFactory.digestOf; @@ -58,15 +62,16 @@ * @author hal.hildebrand */ public class Gorgoneion { - public static final Logger log = LoggerFactory.getLogger(Gorgoneion.class); + public static final Logger log = LoggerFactory.getLogger( + Gorgoneion.class); @SuppressWarnings("unused") - private final CommonCommunications admissionsComm; - private final Context context; - private final CommonCommunications endorsementComm; - private final ControlledIdentifierMember member; - private final ProtoEventObserver observer; - private final Parameters parameters; - private final ScheduledExecutorService scheduler; + private final CommonCommunications admissionsComm; + private final Context context; + private final CommonCommunications endorsementComm; + private final ControlledIdentifierMember member; + private final ProtoEventObserver observer; + private final Parameters parameters; + private final ScheduledExecutorService scheduler; public Gorgoneion(Parameters parameters, ControlledIdentifierMember member, Context context, ProtoEventObserver observer, Router router, ScheduledExecutorService scheduler, @@ -84,15 +89,15 @@ public Gorgoneion(Parameters parameters, ControlledIdentifierMember member, Cont this.observer = observer; admissionsComm = admissionsRouter.create(member, context.getId(), new Admit(), ":admissions", - r -> new AdmissionsServer(admissionsRouter.getClientIdentityProvider(), - r, metrics)); + r -> new AdmissionsServer(admissionsRouter.getClientIdentityProvider(), + r, metrics)); final var service = new Endorse(); endorsementComm = endorsementRouter.create(member, context.getId(), service, ":endorsement", - r -> new EndorsementServer(admissionsRouter.getClientIdentityProvider(), - r, metrics), - EndorsementClient.getCreate(metrics), - Endorsement.getLocalLoopback(member, service)); + r -> new EndorsementServer( + admissionsRouter.getClientIdentityProvider(), r, metrics), + EndorsementClient.getCreate(metrics), + Endorsement.getLocalLoopback(member, service)); } private boolean completeEndorsement(Optional futureSailor, Member from, @@ -105,8 +110,7 @@ private boolean completeEndorsement(Optional futureSailor, Memb return true; } - private boolean completeEnrollment(Optional futureSailor, Member m, - HashSet completed) { + private boolean completeEnrollment(Optional futureSailor, Member m, HashSet completed) { if (futureSailor.isEmpty()) { return true; } @@ -127,9 +131,9 @@ private boolean completeVerification(Optional futureSailor, Member private MemberSignature endorse(Nonce request) { return MemberSignature.newBuilder() - .setId(member.getId().toDigeste()) - .setSignature(member.sign(request.toByteString()).toSig()) - .build(); + .setId(member.getId().toDigeste()) + .setSignature(member.sign(request.toByteString()).toSig()) + .build(); } private void enroll(Notarization request) { @@ -145,39 +149,39 @@ private SignedNonce generateNonce(KERL_ application) { var now = parameters.clock().instant(); final var ident = identifier.toIdent(); var nonce = Nonce.newBuilder() - .setMember(ident) - .setIssuer(member.getId().toDigeste()) - .setNoise(parameters.digestAlgorithm().random().toDigeste()) - .setTimestamp(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).setNanos(now.getNano())) - .build(); + .setMember(ident) + .setIssuer(member.getId().toDigeste()) + .setNoise(parameters.digestAlgorithm().random().toDigeste()) + .setTimestamp(Timestamp.newBuilder().setSeconds(now.getEpochSecond()).setNanos(now.getNano())) + .build(); var successors = context.totalCount() == 1 ? Collections.singletonList(member) - : Context.uniqueSuccessors(context, - digestOf(ident, - parameters.digestAlgorithm())); + : Context.uniqueSuccessors(context, digestOf(ident, + parameters.digestAlgorithm())); final var majority = context.totalCount() == 1 ? 1 : context.majority(); final var redirecting = new SliceIterator<>("Nonce Endorsement", member, successors, endorsementComm); Set endorsements = Collections.newSetFromMap(new ConcurrentHashMap<>()); var generated = new CompletableFuture(); redirecting.iterate((link, m) -> { log.info("Request signing nonce for: {} contacting: {} on: {}", identifier, link.getMember().getId(), - member.getId()); + member.getId()); return link.endorse(nonce, parameters.registrationTimeout()); }, (futureSailor, link, m) -> completeEndorsement(futureSailor, m, endorsements), () -> { if (endorsements.size() < majority) { - generated.completeExceptionally(new StatusRuntimeException(Status.ABORTED.withDescription("Cannot gather required nonce endorsements"))); + generated.completeExceptionally(new StatusRuntimeException( + Status.ABORTED.withDescription("Cannot gather required nonce endorsements"))); } else { generated.complete(SignedNonce.newBuilder() - .addSignatures(MemberSignature.newBuilder() - .setId(member.getId().toDigeste()) - .setSignature(member.sign(nonce.toByteString()) - .toSig()) - .build()) - .setNonce(nonce) - .addAllSignatures(endorsements) - .build()); + .addSignatures(MemberSignature.newBuilder() + .setId(member.getId().toDigeste()) + .setSignature( + member.sign(nonce.toByteString()).toSig()) + .build()) + .setNonce(nonce) + .addAllSignatures(endorsements) + .build()); log.info("Generated nonce for: {} signatures: {} on: {}", identifier, endorsements.size(), - member.getId()); + member.getId()); } }, scheduler, parameters.frequency()); try { @@ -192,7 +196,7 @@ private SignedNonce generateNonce(KERL_ application) { private Identifier identifier(KERL_ kerl) { if (ProtobufEventFactory.from(kerl.getEvents(kerl.getEventsCount() - 1)) - .event() instanceof EstablishmentEvent establishment) { + .event() instanceof EstablishmentEvent establishment) { return establishment.getIdentifier(); } return null; @@ -206,12 +210,12 @@ private void notarize(Credentials credentials, Validations validations) { } var notarization = Notarization.newBuilder() - .setKerl(credentials.getAttestation().getAttestation().getKerl()) - .setValidations(validations) - .build(); + .setKerl(credentials.getAttestation().getAttestation().getKerl()) + .setValidations(validations) + .build(); var successors = Context.uniqueSuccessors(context, - digestOf(identifier.toIdent(), parameters.digestAlgorithm())); + digestOf(identifier.toIdent(), parameters.digestAlgorithm())); final var majority = context.activeCount() == 1 ? 0 : context.majority(); SliceIterator redirecting = new SliceIterator<>("Enrollment", member, successors, endorsementComm); var completed = new HashSet(); @@ -233,30 +237,34 @@ private Validations register(Credentials request) { throw new IllegalArgumentException("No identifier"); } log.debug("Validating credentials for: {} nonce signatures: {} on: {}", identifier, - request.getNonce().getSignaturesCount(), member.getId()); + request.getNonce().getSignaturesCount(), member.getId()); var validated = new CompletableFuture(); var successors = Context.uniqueSuccessors(context, - digestOf(identifier.toIdent(), parameters.digestAlgorithm())); + digestOf(identifier.toIdent(), parameters.digestAlgorithm())); final var majority = context.activeCount() == 1 ? 0 : context.majority(); final var redirecting = new SliceIterator<>("Credential verification", member, successors, endorsementComm); var verifications = new HashSet(); redirecting.iterate((link, m) -> { log.debug("Validating credentials for: {} contacting: {} on: {}", identifier, link.getMember().getId(), - member.getId()); + member.getId()); return link.validate(request, parameters.registrationTimeout()); }, (futureSailor, link, m) -> completeVerification(futureSailor, m, verifications), () -> { if (verifications.size() < majority) { - throw new StatusRuntimeException(Status.ABORTED.withDescription("Cannot gather required credential validations")); + throw new StatusRuntimeException( + Status.ABORTED.withDescription("Cannot gather required credential validations")); } else { validated.complete(Validations.newBuilder() - .setCoordinates(ProtobufEventFactory.from(kerl.getEvents(kerl.getEventsCount() - - 1)).event().getCoordinates().toEventCoords()) - .addAllValidations(verifications) - .build()); + .setCoordinates( + ProtobufEventFactory.from(kerl.getEvents(kerl.getEventsCount() - 1)) + .event() + .getCoordinates() + .toEventCoords()) + .addAllValidations(verifications) + .build()); log.debug("Validated credentials for: {} verifications: {} on: {}", identifier, verifications.size(), - member.getId()); + member.getId()); } }, scheduler, parameters.frequency()); try { @@ -273,17 +281,14 @@ private Validations register(Credentials request) { } private Validation_ validate(Credentials credentials) { - var event = (com.salesforce.apollo.stereotomy.event.InceptionEvent) ProtobufEventFactory.from(credentials.getAttestation() - .getAttestation() - .getKerl() - .getEvents(0)) - .event(); + var event = (InceptionEvent) ProtobufEventFactory.from( + credentials.getAttestation().getAttestation().getKerl().getEvents(0)).event(); log.info("Validating credentials for: {} on: {}", event.getIdentifier(), member.getId()); Signer signer = member.getIdentifier().getSigner(); return Validation_.newBuilder() - .setValidator(member.getIdentifier().getCoordinates().toEventCoords()) - .setSignature(signer.sign(event.toKeyEvent_().toByteString()).toSig()) - .build(); + .setValidator(member.getIdentifier().getCoordinates().toEventCoords()) + .setSignature(signer.sign(event.toKeyEvent_().toByteString()).toSig()) + .build(); } private Validation_ verificationOf(Credentials credentials) { @@ -300,12 +305,14 @@ public void apply(KERL_ request, Digest from, StreamObserver respon Timer.Context time) { if (!validate(request, from)) { log.warn("Invalid application from: {} on: {}", from, member.getId()); - responseObserver.onError(new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application"))); + responseObserver.onError( + new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application"))); return; } SignedNonce sn = generateNonce(request); if (sn == null) { - responseObserver.onError(new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application"))); + responseObserver.onError( + new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application"))); } else { responseObserver.onNext(sn); responseObserver.onCompleted(); @@ -317,13 +324,15 @@ public void register(Credentials request, Digest from, StreamObserver= (context.activeCount() == 1 ? 1 : context.majority()); + final var majority = count >= (context.activeCount() == 1 ? 1 : context.majority()); if (!majority) { log.warn("Invalid notarization, no majority: {} required: {} for: {} from: {} on: {}", count, - context.majority(), identifier, from, member.getId()); + context.majority(), identifier, from, member.getId()); } return majority; } else { log.warn("Invalid notarization, invalid kerl for: {} from: {} on: {}", identifier, from, - member.getId()); + member.getId()); return false; } } @@ -469,7 +476,7 @@ private boolean validateCredentials(Credentials credentials, Digest from) { final var issuer = Digest.from(sn.getNonce().getIssuer()); if (!context.isMember(issuer)) { log.warn("Invalid credential nonce, non existent issuer: {} from: {} on: {}", issuer, from, - member.getId()); + member.getId()); return false; } if (!from.equals(issuer)) { @@ -481,11 +488,11 @@ private boolean validateCredentials(Credentials credentials, Digest from) { return false; } var nInstant = Instant.ofEpochSecond(sn.getNonce().getTimestamp().getSeconds(), - sn.getNonce().getTimestamp().getNanos()); + sn.getNonce().getTimestamp().getNanos()); final var now = Instant.now(); if (now.isBefore(nInstant) || nInstant.plus(parameters.maxDuration()).isBefore(now)) { log.warn("Invalid credential nonce, invalid timestamp: {} from: {} on: {}", nInstant, from, - member.getId()); + member.getId()); return false; } @@ -511,12 +518,12 @@ private boolean validateCredentials(Credentials credentials, Digest from) { if (count < context.majority()) { log.warn("Invalid credential nonce, no majority signature: {} required > {} from: {} on: {}", count, - context.majority(), from, member.getId()); + context.majority(), from, member.getId()); return false; } log.info("Valid credential nonce for: {} from: {} on: {}", Identifier.from(sn.getNonce().getMember()), from, - member.getId()); + member.getId()); var sa = credentials.getAttestation(); final var kerl = sa.getAttestation().getKerl(); @@ -528,34 +535,34 @@ private boolean validateCredentials(Credentials credentials, Digest from) { var m = Identifier.from(sn.getNonce().getMember()); if (!m.equals(identifier)) { log.warn("Invalid credential attestation, identifier: {} not equal to nonce member: {} from: {} on: {}", - identifier, m, from, member.getId()); + identifier, m, from, member.getId()); return false; } if (identifier instanceof SelfAddressingIdentifier sai) { } else { log.warn("Invalid credential attestation, invalid identifier: {} from: {} on: {}", identifier, from, - member.getId()); + member.getId()); return false; } var aInstant = Instant.ofEpochSecond(sa.getAttestation().getTimestamp().getSeconds(), - sa.getAttestation().getTimestamp().getNanos()); - if (now.isBefore(aInstant) || aInstant.plus(parameters.maxDuration()).isBefore(now) || - aInstant.isBefore(nInstant)) { + sa.getAttestation().getTimestamp().getNanos()); + if (now.isBefore(aInstant) || aInstant.plus(parameters.maxDuration()).isBefore(now) || aInstant.isBefore( + nInstant)) { log.warn("Invalid credential attestation, invalid timestamp: {} for: {} from: {} on: {}", aInstant, - identifier, from, member.getId()); + identifier, from, member.getId()); return false; } if (ProtobufEventFactory.from(kerl.getEvents(kerl.getEventsCount() - 1)) - .event() instanceof EstablishmentEvent establishment) { + .event() instanceof EstablishmentEvent establishment) { final var verifier = new Verifier.DefaultVerifier(establishment.getKeys()); if (!verifier.verify(JohnHancock.from(sa.getAttestation().getNonce()), sn.toByteString())) { log.warn("Invalid credential attestation, invalid nonce signature for: {} from: {} on: {}", - identifier, from, member.getId()); + identifier, from, member.getId()); return false; } } else { log.warn("Invalid credential attestation, invalid kerl for: {} from: {} on: {}", identifier, from, - member.getId()); + member.getId()); return false; } log.info("Valid credential attestation for: {} from: {} on: {}", identifier, from, member.getId()); diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index d871ed700..2cf277808 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -39,10 +39,9 @@ * @author hal.hildebrand */ public class Enclave implements RouterSupplier { - protected final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); - private final static Class channelType = getChannelType(); - private static final Logger log = LoggerFactory.getLogger( - Enclave.class); + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final static Class channelType = getChannelType(); + private static final Logger log = LoggerFactory.getLogger(Enclave.class); private final DomainSocketAddress bridge; private final Consumer contextRegistration; @@ -103,7 +102,7 @@ public Digest getAgent() { public Digest getFrom() { return Router.SERVER_CLIENT_ID_KEY.get(); } - }, contextRegistration, executor); + }, contextRegistration); } private ManagedChannel connectTo(Member to) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java index cc6489b24..9a061727e 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java @@ -34,7 +34,7 @@ * @author hal.hildebrand */ public class LocalServer implements RouterSupplier { - private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor(); private static final Logger log = LoggerFactory.getLogger(LocalServer.class); private static final String NAME_TEMPLATE = "%s-%s"; @@ -87,7 +87,8 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • { + }); } private ManagedChannel connectTo(Member to) { diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java index c0c583b37..2bb993f52 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RoutableService.java @@ -6,37 +6,27 @@ */ package com.salesforce.apollo.archipelago; -import static com.salesforce.apollo.archipelago.Router.SERVER_CONTEXT_KEY; +import com.salesforce.apollo.crypto.Digest; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.salesforce.apollo.crypto.Digest; - -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; +import static com.salesforce.apollo.archipelago.Router.SERVER_CONTEXT_KEY; /** * Service implementation routable by Digest context * * @author hal.hildebrand - * */ public class RoutableService { - private static final Logger log = LoggerFactory.getLogger(RoutableService.class); - - private final Executor executor; - private final Map services = new ConcurrentHashMap<>(); - - public RoutableService(Executor executor) { - this.executor = executor; - } + private static final Logger log = LoggerFactory.getLogger(RoutableService.class); + private final Map services = new ConcurrentHashMap<>(); public void bind(Digest context, Service service) { services.put(context, service); @@ -54,14 +44,12 @@ public void evaluate(StreamObserver responseObserver, Consumer c) { log.trace("No service for context {}", context); responseObserver.onError(new StatusRuntimeException(Status.NOT_FOUND)); } else { - executor.execute(() -> { - try { - c.accept(service); - } catch (Throwable t) { - log.error("Uncaught exception in service evaluation for context: {}", context, t); - responseObserver.onError(t); - } - }); + try { + c.accept(service); + } catch (Throwable t) { + log.error("Uncaught exception in service evaluation for context: {}", context, t); + responseObserver.onError(t); + } } } } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java index 32b3f953c..8e5e9fcfe 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/RouterImpl.java @@ -6,111 +6,58 @@ */ package com.salesforce.apollo.archipelago; -import static com.salesforce.apollo.crypto.QualifiedBase64.digest; -import static com.salesforce.apollo.crypto.QualifiedBase64.qb64; +import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.limit.AIMDLimit; +import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; +import com.salesforce.apollo.crypto.Digest; +import com.salesforce.apollo.membership.Member; +import com.salesforce.apollo.protocols.ClientIdentity; +import io.grpc.*; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.util.MutableHandlerRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.concurrency.limits.Limit; -import com.netflix.concurrency.limits.limit.AIMDLimit; -import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications; -import com.salesforce.apollo.crypto.Digest; -import com.salesforce.apollo.membership.Member; -import com.salesforce.apollo.protocols.ClientIdentity; - -import io.grpc.BindableService; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.Context; -import io.grpc.Contexts; -import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.util.MutableHandlerRegistry; +import static com.salesforce.apollo.crypto.QualifiedBase64.digest; +import static com.salesforce.apollo.crypto.QualifiedBase64.qb64; /** * Context based GRPC routing * * @author hal.hildebrand - * */ public class RouterImpl implements Router { - public class CommonCommunications implements Router.ClientConnector { - public static Client vanilla(Member from) { - @SuppressWarnings("unchecked") - Client client = (Client) new Link() { - - @Override - public void close() throws IOException { - } - - @Override - public Member getMember() { - return from; - } - }; - return client; - } - - private final Digest context; - private final CreateClientCommunications createFunction; - private final Member from; - private final Client localLoopback; - - private final RoutableService routing; - - public CommonCommunications(Digest context, Member from, RoutableService routing) { - this(context, from, routing, m -> vanilla(from), vanilla(from)); - - } - - public CommonCommunications(Digest context, Member from, RoutableService routing, - CreateClientCommunications createFunction, - Client localLoopback) { - this.context = context; - this.routing = routing; - this.createFunction = createFunction; - this.localLoopback = localLoopback; - this.from = from; - } - - @Override - public Client connect(Member to) { - if (to == null) { - return null; - } - return started.get() ? (to.equals(from) ? localLoopback : cache.borrow(context, to, createFunction)) : null; - } - - public void deregister(Digest context) { - routing.unbind(context); - } - - public void register(Digest context, Service service) { - routing.bind(context, service); - } - } - private final static Logger log = LoggerFactory.getLogger(RouterImpl.class); + private final ServerConnectionCache cache; + private final ClientIdentity clientIdentityProvider; + private final Consumer contextRegistration; + private final Member from; + private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); + private final Server server; + private final Map> services = new ConcurrentHashMap<>(); + private final AtomicBoolean started = new AtomicBoolean(); + public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, + ClientIdentity clientIdentityProvider) { + this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> {}); + } + public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, + ClientIdentity clientIdentityProvider, Consumer contextRegistration) { + this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build(); + this.cache = cacheBuilder.build(); + this.clientIdentityProvider = clientIdentityProvider; + this.contextRegistration = contextRegistration; + this.from = from; + } public static ClientInterceptor clientInterceptor(Digest ctx) { return new ClientInterceptor() { @@ -151,37 +98,6 @@ public ServerCall.Listener interceptCall(ServerCall contextRegistration; - private final Executor executor; - private final Member from; - private final MutableHandlerRegistry registry = new MutableHandlerRegistry(); - private final Server server; - private final Map> services = new ConcurrentHashMap<>(); - private final AtomicBoolean started = new AtomicBoolean(); - - public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, - ClientIdentity clientIdentityProvider) { - this(from, serverBuilder, cacheBuilder, clientIdentityProvider, r -> r.run()); - } - - public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, - ClientIdentity clientIdentityProvider, Consumer contextRegistration, Executor executor) { - this.server = serverBuilder.fallbackHandlerRegistry(registry).intercept(serverInterceptor()).build(); - this.cache = cacheBuilder.build(); - this.clientIdentityProvider = clientIdentityProvider; - this.contextRegistration = contextRegistration; - this.executor = executor; - this.from = from; - } - - public RouterImpl(Member from, ServerBuilder serverBuilder, ServerConnectionCache.Builder cacheBuilder, - ClientIdentity clientIdentityProvider, Executor executor) { - this(from, serverBuilder, cacheBuilder, clientIdentityProvider, d -> { - }, executor); - } - @Override public void close(Duration await) { if (!started.compareAndSet(true, false)) { @@ -197,12 +113,9 @@ public void close(Duration await) { } @Override - public CommonCommunications create(Member member, - Digest context, - Service service, - Function, BindableService> factory, - CreateClientCommunications createFunction, - Client localLoopback) { + public CommonCommunications create( + Member member, Digest context, Service service, Function, BindableService> factory, + CreateClientCommunications createFunction, Client localLoopback) { return create(member, context, service, service.routing(), factory, createFunction, localLoopback); } @@ -213,7 +126,7 @@ public CommonCommunications crea Function, BindableService> factory) { @SuppressWarnings("unchecked") RoutableService routing = (RoutableService) services.computeIfAbsent(routingLabel, c -> { - var route = new RoutableService(executor); + var route = new RoutableService(); BindableService bindableService = factory.apply(route); registry.addService(bindableService); return route; @@ -233,7 +146,7 @@ public CommonCommunications crea Client localLoopback) { @SuppressWarnings("unchecked") RoutableService routing = (RoutableService) services.computeIfAbsent(routingLabel, c -> { - var route = new RoutableService(executor); + var route = new RoutableService(); BindableService bindableService = factory.apply(route); registry.addService(bindableService); return route; @@ -266,4 +179,59 @@ public void start() { } log.info("Started router: {}", server.getListenSockets()); } + + public class CommonCommunications implements Router.ClientConnector { + private final Digest context; + private final CreateClientCommunications createFunction; + private final Member from; + private final Client localLoopback; + private final RoutableService routing; + + public CommonCommunications(Digest context, Member from, RoutableService routing) { + this(context, from, routing, m -> vanilla(from), vanilla(from)); + + } + + public CommonCommunications(Digest context, Member from, RoutableService routing, + CreateClientCommunications createFunction, + Client localLoopback) { + this.context = context; + this.routing = routing; + this.createFunction = createFunction; + this.localLoopback = localLoopback; + this.from = from; + } + + public static Client vanilla(Member from) { + @SuppressWarnings("unchecked") + Client client = (Client) new Link() { + + @Override + public void close() throws IOException { + } + + @Override + public Member getMember() { + return from; + } + }; + return client; + } + + @Override + public Client connect(Member to) { + if (to == null) { + return null; + } + return started.get() ? (to.equals(from) ? localLoopback : cache.borrow(context, to, createFunction)) : null; + } + + public void deregister(Digest context) { + routing.unbind(context); + } + + public void register(Digest context, Service service) { + routing.bind(context, service); + } + } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java b/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java index aa71d15f9..35b574d43 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/DirectPublisher.java @@ -12,6 +12,8 @@ import com.salesfoce.apollo.stereotomy.event.proto.Validations; import com.salesforce.apollo.stereotomy.services.proto.ProtoEventObserver; import com.salesforce.apollo.stereotomy.services.proto.ProtoKERLAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; @@ -19,27 +21,34 @@ * @author hal.hildebrand */ public class DirectPublisher implements ProtoEventObserver { + private final static Logger log = LoggerFactory.getLogger(DirectPublisher.class); + private final ProtoKERLAdapter kerl; public DirectPublisher(ProtoKERLAdapter kerl) { - super(); this.kerl = kerl; } @Override public void publish(KERL_ kerl_, List validations) { + log.info("publishing KERL[{}] and validations[{}]", kerl_.getEventsCount(), validations.size()); validations.stream().forEach(v -> kerl.appendValidations(v)); + log.info("published KERL[{}] and validations[{}]", kerl_.getEventsCount(), validations.size()); kerl.append(kerl_); } @Override public void publishAttachments(List attachments) { + log.info("Publishing attachments[{}]", attachments.size()); kerl.appendAttachments(attachments); + log.info("Published attachments[{}]", attachments.size()); } @Override public void publishEvents(List events, List validations) { + log.info("Publishing events[{}], validations[{}]", events.size(), validations.size()); validations.forEach(v -> kerl.appendValidations(v)); kerl.append(events); + log.info("Published events[{}], validations[{}]", events.size(), validations.size()); } } diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java index cd643557c..0e331b3c5 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/Maat.java @@ -50,8 +50,7 @@ public Maat(Context context, KERL delegate, KERL validators) { @Override public KeyState append(KeyEvent event) { - var l = append(Collections.singletonList(event), - Collections.emptyList()); + var l = append(Collections.singletonList(event), Collections.emptyList()); return l.isEmpty() ? null : l.get(0); } @@ -63,13 +62,15 @@ public List append(KeyEvent... events) { @Override public List append(List events, List attachments) { final List filtered = events.stream().filter(e -> { - if (e instanceof EstablishmentEvent est && - est.getCoordinates().getSequenceNumber().equals(ULong.valueOf(0))) { + if (e instanceof EstablishmentEvent est && est.getCoordinates() + .getSequenceNumber() + .equals(ULong.valueOf(0))) { return validate(est); } return true; }).toList(); - return filtered.isEmpty() && attachments.isEmpty() ? Collections.emptyList() : super.append(filtered, attachments); + return filtered.isEmpty() && attachments.isEmpty() ? Collections.emptyList() + : super.append(filtered, attachments); } public boolean validate(EstablishmentEvent event) { @@ -81,9 +82,9 @@ public boolean validate(EstablishmentEvent event) { } final Context ctx = context; var successors = Context.uniqueSuccessors(ctx, digestOf(event.getIdentifier().toIdent(), digest.getAlgorithm())) - .stream() - .map(m -> m.getId()) - .collect(Collectors.toSet()); + .stream() + .map(m -> m.getId()) + .collect(Collectors.toSet()); record validator(EstablishmentEvent validating, JohnHancock signature) { } @@ -91,29 +92,26 @@ record validator(EstablishmentEvent validating, JohnHancock signature) { final var serialized = event.toKeyEvent_().toByteString(); Map validations = delegate.getValidations(event.getCoordinates()); - - var futures = validations.entrySet().stream().map(e -> { + validations.entrySet().forEach(e -> { KeyEvent ev = validators.getKeyEvent(e.getKey()); if (ev == null) { - return null; + return; } var signer = (EstablishmentEvent) ev; if ((signer.getIdentifier() instanceof SelfAddressingIdentifier sai)) { if (!successors.contains(sai.getDigest())) { - log.warn("Signature: {} not successor of: {} ", signer.getCoordinates(), - event.getCoordinates()); + log.warn("Signature: {} not successor of: {} ", signer.getCoordinates(), event.getCoordinates()); } mapped.add(new validator(signer, e.getValue())); log.trace("Signature: {} valid for: {}", signer.getCoordinates(), event.getCoordinates()); } else { log.warn("Signature not SAI: {} for: {}", signer.getCoordinates(), event.getCoordinates(), - event.getCoordinates()); + event.getCoordinates()); } - return event; - }).toList(); + }); - log.trace("Evaluating validation of: {} validations: {} mapped: {}", event.getCoordinates(), - validations.size(), mapped.size()); + log.trace("Evaluating validation of: {} validations: {} mapped: {}", event.getCoordinates(), validations.size(), + mapped.size()); if (mapped.size() == 0) { log.warn("No validations of: {} ", event.getCoordinates()); return false; @@ -126,13 +124,13 @@ record validator(EstablishmentEvent validating, JohnHancock signature) { verified++; } else { log.trace("Cannot verify sig: {} of: {} by: {}", r.signature, event.getCoordinates(), - r.validating.getIdentifier()); + r.validating.getIdentifier()); } } var validated = verified >= context.majority(); - log.trace("Validated: {} valid: {} out of: {} required: {} for: {} ", validated, verified, - mapped.size(), ctx.majority(), event.getCoordinates()); + log.trace("Validated: {} valid: {} out of: {} required: {} for: {} ", validated, verified, mapped.size(), + ctx.majority(), event.getCoordinates()); return validated; } } diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java index 0210721ff..db4e5321c 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/AbstractDhtTest.java @@ -41,6 +41,7 @@ import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -57,6 +58,8 @@ public class AbstractDhtTest { protected static final double PBYZ = 0.25; protected final Map dhts = new HashMap<>(); protected final Map routers = new HashMap<>(); + protected final AtomicBoolean gate = new AtomicBoolean( + false); protected Context context; protected Map> identities; protected MemKERL kerl; diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java index f2ce29a5c..9b848f30f 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/BootstrappingTest.java @@ -26,14 +26,13 @@ import com.salesforce.apollo.stereotomy.mem.MemKERL; import com.salesforce.apollo.stereotomy.mem.MemKeyStore; import com.salesforce.apollo.stereotomy.services.proto.ProtoKERLAdapter; -import org.junit.jupiter.api.BeforeEach; +import com.salesforce.apollo.utils.Utils; import org.junit.jupiter.api.Test; import java.security.SecureRandom; import java.time.Clock; import java.time.Duration; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Function; @@ -45,25 +44,16 @@ */ public class BootstrappingTest extends AbstractDhtTest { - private AtomicBoolean gate; - - @BeforeEach - public void beforeIt() { - gate = new AtomicBoolean(false); - } - @Test public void smokin() throws Exception { routers.values().forEach(r -> r.start()); - dhts.values() - .forEach(dht -> dht.start(LARGE_TESTS ? Duration.ofSeconds(100) : Duration.ofMillis(10))); + dhts.values().forEach(dht -> dht.start(LARGE_TESTS ? Duration.ofSeconds(100) : Duration.ofMillis(10))); identities.entrySet() .forEach(e -> dhts.get(e.getKey()).asKERL().append(e.getValue().getLastEstablishingEvent())); gate.set(true); - @SuppressWarnings("unused") - final var gorgons = routers.values().stream().map(r -> { + var gorgoneions = routers.values().stream().map(r -> { var k = dhts.get(r.getFrom()).asKERL(); return new Gorgoneion(Parameters.newBuilder().setKerl(k).build(), (ControlledIdentifierMember) r.getFrom(), context, new DirectPublisher(new ProtoKERLAdapter(k)), r, @@ -81,8 +71,7 @@ context, new DirectPublisher(new ProtoKERLAdapter(k)), r, var client = new ControlledIdentifierMember(clientStereotomy.newIdentifier()); // Registering client comms - var clientRouter = new LocalServer(prefix, client).router(ServerConnectionCache.newBuilder().setTarget(2) - ); + var clientRouter = new LocalServer(prefix, client).router(ServerConnectionCache.newBuilder().setTarget(2)); AdmissionsService admissions = mock(AdmissionsService.class); var clientComminications = clientRouter.create(client, context.getId(), admissions, ":admissions-client", r -> new AdmissionsServer( @@ -113,16 +102,17 @@ context, new DirectPublisher(new ProtoKERLAdapter(k)), r, assertNotNull(invitation); assertNotEquals(Validations.getDefaultInstance(), invitation); assertTrue(invitation.getValidationsCount() >= context.majority()); - - Thread.sleep(3000); // Verify client KERL published + Utils.waitForCondition(30_000, 1000, () -> testKerl.getKeyEvent(client.getEvent().getCoordinates()) != null); var keyS = testKerl.getKeyEvent(client.getEvent().getCoordinates()); + assertNotNull(keyS); admin.close(); } @Override protected BiFunction wrap() { - return (t, k) -> gate.get() ? new Maat(context, k, t.asKERL()) : k; + // This allows us to have the core member keys trusted for this test, as we're testing the bootstrapping of the client, not the entire system + return (t, k) -> gate.get() ? new Maat(context, k, k) : k; } } diff --git a/thoth/src/test/resources/logback-test.xml b/thoth/src/test/resources/logback-test.xml index 07bb88f65..c44f6e324 100644 --- a/thoth/src/test/resources/logback-test.xml +++ b/thoth/src/test/resources/logback-test.xml @@ -35,11 +35,19 @@ + + + + - + + + + +