Skip to content

Commit

Permalink
B-6 (#182)
Browse files Browse the repository at this point in the history
* firm up contracts

* firm up contracts

* tidy up old "usage" blunder ;) restore virtual threads to Mtls*

* update dropwizard version
  • Loading branch information
Hellblazer authored Jan 20, 2024
1 parent f89e8b7 commit 557032b
Show file tree
Hide file tree
Showing 22 changed files with 345 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,18 @@ public static JohnHancock of(Sig signature) {
return new JohnHancock(signature);
}

public static JohnHancock nullSignature(SignatureAlgorithm algorithm) {
return new JohnHancock(algorithm, new byte[0][], ULong.valueOf(0));
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof JohnHancock)) {
if (!(obj instanceof JohnHancock other)) {
return false;
}
JohnHancock other = (JohnHancock) obj;
return algorithm == other.algorithm && Arrays.equals(bytes, other.bytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ private static SignatureAlgorithm lookupEd(NamedParameterSpec params) {

abstract public KeyPair generateKeyPair(SecureRandom secureRandom);

public JohnHancock nullSignature() {
return JohnHancock.nullSignature(this);
}

abstract public PublicKey publicKey(byte[] bytes);

abstract public int publicKeyLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,47 +265,57 @@ private void join(Redirect redirect, Digest v, Duration duration) {
final var join = join(v);
final var abandon = new AtomicInteger();
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
regate.set(() -> redirecting.iterate((link, m) -> {
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
try {
var g = link.join(join, params.seedingTimeout());
if (g == null || g.equals(Gateway.getDefaultInstance())) {
log.info("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId());
regate.set(() -> {
if (!view.started.get()) {
return;
}
redirecting.iterate((link, m) -> {
if (!view.started.get()) {
return null;
}
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
try {
var g = link.join(join, params.seedingTimeout());
if (g == null || g.equals(Gateway.getDefaultInstance())) {
log.info("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId());
abandon.incrementAndGet();
return null;
}
return g;
} catch (StatusRuntimeException sre) {
gatewaySRE(v, link, sre, abandon);
return null;
} catch (Throwable t) {
log.info("Gateway view: {} error: {} from: {} on: {}", v, t, link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
return null;
}
return g;
} catch (StatusRuntimeException sre) {
gatewaySRE(v, link, sre, abandon);
return null;
} catch (Throwable t) {
log.info("Gateway view: {} error: {} from: {} on: {}", v, t, link.getMember().getId(), node.getId());
abandon.incrementAndGet();
return null;
}
}, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts, initialSeedSet, v,
majority), () -> {
if (gateway.isDone()) {
return;
}
if (abandon.get() >= majority) {
log.info("Abandoning Gateway view: {} reseeding on: {}", v, node.getId());
seeding();
} else {
abandon.set(0);
if (retries.get() < params.joinRetries()) {
log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(),
params.joinRetries(), node.getId());
trusts.clear();
initialSeedSet.clear();
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)),
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()), TimeUnit.NANOSECONDS);
}, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts,
initialSeedSet, v, majority), () -> {
if (!view.started.get() || gateway.isDone()) {
return;
}
if (abandon.get() >= majority) {
log.info("Abandoning Gateway view: {} reseeding on: {}", v, node.getId());
seeding();
} else {
log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId());
view.stop();
abandon.set(0);
if (retries.get() < params.joinRetries()) {
log.info("Failed to join view: {} retry: {} out of: {} on: {}", v, retries.incrementAndGet(),
params.joinRetries(), node.getId());
trusts.clear();
initialSeedSet.clear();
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)),
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()),
TimeUnit.NANOSECONDS);
} else {
log.error("Failed to join view: {} cannot obtain majority Gateway on: {}", view, node.getId());
view.stop();
}
}
}
}, scheduler, params.retryDelay()));
}, scheduler, params.retryDelay());
});
regate.get().run();
}

Expand Down
26 changes: 20 additions & 6 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class View {
View.class);
private static final String SCHEDULED_VIEW_CHANGE = "Scheduled View Change";
final CommonCommunications<Fireflies, Service> comm;
final AtomicBoolean started = new AtomicBoolean();
private final CommonCommunications<Entrance, Service> approaches;
private final Context<Participant> context;
private final DigestAlgorithm digestAlgo;
Expand All @@ -96,7 +97,6 @@ public class View {
private final ConcurrentMap<Digest, RoundScheduler.Timer> pendingRebuttals = new ConcurrentSkipListMap<>();
private final RoundScheduler roundTimers;
private final Set<Digest> shunned = new ConcurrentSkipListSet<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Map<String, RoundScheduler.Timer> timers = new HashMap<>();
private final ReadWriteLock viewChange = new ReentrantReadWriteLock(
true);
Expand Down Expand Up @@ -338,6 +338,9 @@ Digest currentView() {
* Finalize the view change
*/
void finalizeViewChange() {
if (!started.get()) {
return;
}
viewChange(() -> {
final var cardinality = context.memberCount();
final var superMajority = cardinality - ((cardinality - 1) / 4);
Expand Down Expand Up @@ -499,6 +502,9 @@ void scheduleFinalizeViewChange() {
void scheduleFinalizeViewChange(final int finalizeViewRounds) {
// log.trace("View change finalization scheduled: {} rounds for: {} joining: {} leaving: {} on: {}",
// finalizeViewRounds, currentView(), joins.size(), context.getOffline().size(), node.getId());
if (!started.get()) {
return;
}
timers.put(FINALIZE_VIEW_CHANGE,
roundTimers.schedule(FINALIZE_VIEW_CHANGE, this::finalizeViewChange, finalizeViewRounds));
}
Expand All @@ -510,6 +516,9 @@ void scheduleViewChange() {
void scheduleViewChange(final int viewChangeRounds) {
// log.trace("Schedule view change: {} rounds for: {} on: {}", viewChangeRounds, currentView(),
// node.getId());
if (!started.get()) {
return;
}
timers.put(SCHEDULED_VIEW_CHANGE,
roundTimers.schedule(SCHEDULED_VIEW_CHANGE, viewManagement::maybeViewChange, viewChangeRounds));
}
Expand Down Expand Up @@ -620,15 +629,18 @@ protected Gossip gossip(Fireflies link, int ring) {
node.getId());
break;
case RESOURCE_EXHAUSTED:
log.trace("Unavailable for gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(),
p.getId(), node.getId());
log.trace("Resource exhausted for gossip: {} view: {} from: {} on: {}", sre.getStatus(),
currentView(), p.getId(), node.getId());
break;
case CANCELLED:
log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(),
node.getId());
break;
case UNAVAILABLE:
accuse(p, ring, sre);
break;
default:
log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), p.getId(), currentView(),
log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(),
node.getId());
accuse(p, ring, sre);
break;
Expand Down Expand Up @@ -811,11 +823,13 @@ private boolean add(SignedViewChange observation) {
}
var currentObservation = observations.get(observer);
if (currentObservation != null) {
if (observation.getChange().getAttempt() <= currentObservation.getChange().getAttempt()) {
if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) {
log.trace("Stale observation: {} current: {} view change: {} current: {} offline: {} on: {}",
observation.getChange().getAttempt(), currentObservation.getChange().getAttempt(), inView,
currentView(), observer, node.getId());
return false;
} else if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) {
return false;
}
}
final var member = context.getActiveMember(observer);
Expand Down Expand Up @@ -1890,7 +1904,7 @@ public void join(Join join, Digest from, StreamObserver<Gateway> responseObserve
@Override
public Gossip rumors(SayWhat request, Digest from) {
if (!introduced.get()) {
log.trace("Not introduced!, ring: {} from: {} on: {}", request.getRing(), from, node.getId());
log.trace("Not introduced!, ring: {} on: {}", request.getRing(), node.getId());
throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription(
"Not introduced!, ring: %s from: %s on: %s".formatted(request.getRing(), from, node.getId())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ private SignedNonce generateNonce(KERL_ application) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
if (e.getCause() instanceof StatusRuntimeException sre) {
throw sre;
}
throw new RuntimeException(e.getCause());
}
}
Expand Down Expand Up @@ -314,7 +317,14 @@ public void apply(KERL_ request, Digest from, StreamObserver<SignedNonce> respon
new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application")));
return;
}
SignedNonce sn = generateNonce(request);
SignedNonce sn;
try {
sn = generateNonce(request);
} catch (StatusRuntimeException sre) {
responseObserver.onError(sre);
return;
}

if (sn == null) {
responseObserver.onError(
new StatusRuntimeException(Status.UNAUTHENTICATED.withDescription("Invalid application")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package com.salesforce.apollo.archipelago;

import com.google.common.base.MoreObjects;
import com.salesforce.apollo.archipelago.ServerConnectionCache.ReleasableManagedChannel;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.membership.Member;
import io.grpc.*;
Expand All @@ -22,10 +21,10 @@
public class ManagedServerChannel extends ManagedChannel {
private final static Logger log = LoggerFactory.getLogger(ManagedServerChannel.class);

private final Digest context;
private final ReleasableManagedChannel delegate;
private final Digest context;
private final Releasable delegate;

ManagedServerChannel(Digest context, ReleasableManagedChannel delegate) {
ManagedServerChannel(Digest context, Releasable delegate) {
this.context = context;
this.delegate = delegate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
import io.netty.handler.ssl.ClientAuth;

import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
* @author hal.hildebrand
*/
public class MtlsClient {
private final Executor exec = Executors.newVirtualThreadPerTaskExecutor();

private final ManagedChannel channel;

Expand All @@ -34,7 +31,6 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl

Limiter<GrpcClientRequestContext> limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build();
channel = NettyChannelBuilder.forAddress(address)
.executor(exec)
.sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3))
.intercept(new ConcurrencyLimitClientInterceptor(limiter,
() -> Status.RESOURCE_EXHAUSTED.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.security.Security;
import java.security.cert.X509Certificate;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -55,13 +56,15 @@ public class MtlsServer implements RouterSupplier {
private final Member from;
private final Context.Key<SSLSession> sslSessionContext = Context.key("SSLSession");
private final ServerContextSupplier supplier;
private final Executor executor;

public MtlsServer(Member from, EndpointProvider epProvider, Function<Member, ClientContextSupplier> contextSupplier,
ServerContextSupplier supplier) {
this.from = from;
this.epProvider = epProvider;
this.contextSupplier = contextSupplier;
this.supplier = supplier;
this.executor = Executors.newVirtualThreadPerTaskExecutor();
cachedMembership = CacheBuilder.newBuilder().build(new CacheLoader<X509Certificate, Digest>() {
@Override
public Digest load(X509Certificate key) throws Exception {
Expand Down Expand Up @@ -137,7 +140,6 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Li
limitsBuilder.metricRegistry(limitsRegistry);
}
NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(epProvider.getBindAddress())
.executor(Executors.newVirtualThreadPerTaskExecutor())
.withOption(ChannelOption.SO_REUSEADDR, true)
.sslContext(supplier.forServer(ClientAuth.REQUIRE,
epProvider.getAlias(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.salesforce.apollo.archipelago;

import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.membership.Member;
import io.grpc.ManagedChannel;

public interface Releasable {
ManagedChannel getChannel();

Digest getFrom();

Member getMember();

void release();

ManagedChannel shutdown();

ManagedChannel shutdownNow();
}
Loading

0 comments on commit 557032b

Please sign in to comment.