Skip to content

Commit

Permalink
firm up contracts
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jan 20, 2024
1 parent d77ea37 commit 609504c
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,18 @@ public static JohnHancock of(Sig signature) {
return new JohnHancock(signature);
}

public static JohnHancock nullSignature(SignatureAlgorithm algorithm) {
return new JohnHancock(algorithm, new byte[0][], ULong.valueOf(0));
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof JohnHancock)) {
if (!(obj instanceof JohnHancock other)) {
return false;
}
JohnHancock other = (JohnHancock) obj;
return algorithm == other.algorithm && Arrays.equals(bytes, other.bytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ private static SignatureAlgorithm lookupEd(NamedParameterSpec params) {

abstract public KeyPair generateKeyPair(SecureRandom secureRandom);

public JohnHancock nullSignature() {
return JohnHancock.nullSignature(this);
}

abstract public PublicKey publicKey(byte[] bytes);

abstract public int publicKeyLength();
Expand Down
24 changes: 19 additions & 5 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ Digest currentView() {
* Finalize the view change
*/
void finalizeViewChange() {
if (!started.get()) {
return;
}
viewChange(() -> {
final var cardinality = context.memberCount();
final var superMajority = cardinality - ((cardinality - 1) / 4);
Expand Down Expand Up @@ -499,6 +502,9 @@ void scheduleFinalizeViewChange() {
void scheduleFinalizeViewChange(final int finalizeViewRounds) {
// log.trace("View change finalization scheduled: {} rounds for: {} joining: {} leaving: {} on: {}",
// finalizeViewRounds, currentView(), joins.size(), context.getOffline().size(), node.getId());
if (!started.get()) {
return;
}
timers.put(FINALIZE_VIEW_CHANGE,
roundTimers.schedule(FINALIZE_VIEW_CHANGE, this::finalizeViewChange, finalizeViewRounds));
}
Expand All @@ -510,6 +516,9 @@ void scheduleViewChange() {
void scheduleViewChange(final int viewChangeRounds) {
// log.trace("Schedule view change: {} rounds for: {} on: {}", viewChangeRounds, currentView(),
// node.getId());
if (!started.get()) {
return;
}
timers.put(SCHEDULED_VIEW_CHANGE,
roundTimers.schedule(SCHEDULED_VIEW_CHANGE, viewManagement::maybeViewChange, viewChangeRounds));
}
Expand Down Expand Up @@ -620,15 +629,18 @@ protected Gossip gossip(Fireflies link, int ring) {
node.getId());
break;
case RESOURCE_EXHAUSTED:
log.trace("Unavailable for gossip: {} view: {} from: {} on: {}", sre.getStatus(), currentView(),
p.getId(), node.getId());
log.trace("Resource exhausted for gossip: {} view: {} from: {} on: {}", sre.getStatus(),
currentView(), p.getId(), node.getId());
break;
case CANCELLED:
log.trace("Communication cancelled for gossip view: {} from: {} on: {}", currentView(), p.getId(),
node.getId());
break;
case UNAVAILABLE:
accuse(p, ring, sre);
break;
default:
log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), p.getId(), currentView(),
log.debug("Error gossiping: {} view: {} from: {} on: {}", sre.getStatus(), currentView(), p.getId(),
node.getId());
accuse(p, ring, sre);
break;
Expand Down Expand Up @@ -811,11 +823,13 @@ private boolean add(SignedViewChange observation) {
}
var currentObservation = observations.get(observer);
if (currentObservation != null) {
if (observation.getChange().getAttempt() <= currentObservation.getChange().getAttempt()) {
if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) {
log.trace("Stale observation: {} current: {} view change: {} current: {} offline: {} on: {}",
observation.getChange().getAttempt(), currentObservation.getChange().getAttempt(), inView,
currentView(), observer, node.getId());
return false;
} else if (observation.getChange().getAttempt() < currentObservation.getChange().getAttempt()) {
return false;
}
}
final var member = context.getActiveMember(observer);
Expand Down Expand Up @@ -1890,7 +1904,7 @@ public void join(Join join, Digest from, StreamObserver<Gateway> responseObserve
@Override
public Gossip rumors(SayWhat request, Digest from) {
if (!introduced.get()) {
log.trace("Not introduced!, ring: {} from: {} on: {}", request.getRing(), from, node.getId());
log.trace("Not introduced!, ring: {} on: {}", request.getRing(), node.getId());
throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription(
"Not introduced!, ring: %s from: %s on: %s".formatted(request.getRing(), from, node.getId())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class MtlsTest {
}

private final List<Router> communications = new ArrayList<>();
private final Executor executor = Executors.newFixedThreadPool(10);
private List<View> views;

@BeforeAll
Expand Down Expand Up @@ -128,8 +131,8 @@ public void smoke() throws Exception {
CertificateValidator.NONE, resolver);
builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry));
CertificateWithPrivateKey certWithKey = certs.get(node.getId());
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router(
builder);
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey),
executor).router(builder);
communications.add(comms);
return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms,
parameters, DigestAlgorithm.DEFAULT, metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
import io.netty.handler.ssl.ClientAuth;

import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
* @author hal.hildebrand
*/
public class MtlsClient {
private final Executor exec = Executors.newVirtualThreadPerTaskExecutor();

private final ManagedChannel channel;

Expand All @@ -34,7 +31,6 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl

Limiter<GrpcClientRequestContext> limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build();
channel = NettyChannelBuilder.forAddress(address)
.executor(exec)
.sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3))
.intercept(new ConcurrencyLimitClientInterceptor(limiter,
() -> Status.RESOURCE_EXHAUSTED.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.security.Security;
import java.security.cert.X509Certificate;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -55,13 +55,15 @@ public class MtlsServer implements RouterSupplier {
private final Member from;
private final Context.Key<SSLSession> sslSessionContext = Context.key("SSLSession");
private final ServerContextSupplier supplier;
private final Executor executor;

public MtlsServer(Member from, EndpointProvider epProvider, Function<Member, ClientContextSupplier> contextSupplier,
ServerContextSupplier supplier) {
ServerContextSupplier supplier, Executor executor) {
this.from = from;
this.epProvider = epProvider;
this.contextSupplier = contextSupplier;
this.supplier = supplier;
this.executor = executor;
cachedMembership = CacheBuilder.newBuilder().build(new CacheLoader<X509Certificate, Digest>() {
@Override
public Digest load(X509Certificate key) throws Exception {
Expand Down Expand Up @@ -137,7 +139,6 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Li
limitsBuilder.metricRegistry(limitsRegistry);
}
NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(epProvider.getBindAddress())
.executor(Executors.newVirtualThreadPerTaskExecutor())
.withOption(ChannelOption.SO_REUSEADDR, true)
.sslContext(supplier.forServer(ClientAuth.REQUIRE,
epProvider.getAlias(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
*/
package com.salesforce.apollo.membership.stereotomy;

import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import com.salesforce.apollo.cryptography.*;
import com.salesforce.apollo.cryptography.cert.CertificateWithPrivateKey;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.stereotomy.ControlledIdentifier;
import com.salesforce.apollo.stereotomy.KERL.EventWithAttachments;
import com.salesforce.apollo.stereotomy.event.EstablishmentEvent;
import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.time.Duration;
Expand All @@ -36,8 +37,7 @@ public ControlledIdentifierMember(ControlledIdentifier<SelfAddressingIdentifier>

@Override
public SignatureAlgorithm algorithm() {
Signer signer = identifier.getSigner();
return signer.algorithm();
return identifier.algorithm();
}

@Override
Expand Down Expand Up @@ -95,6 +95,10 @@ public KERL_ kerl() {
@Override
public JohnHancock sign(InputStream message) {
Signer signer = identifier.getSigner();
if (signer == null) {
LoggerFactory.getLogger(ControlledIdentifierMember.class).warn("Null signer for: {}", getId());
return algorithm().nullSignature();
}
return signer.sign(message);
}

Expand Down
42 changes: 30 additions & 12 deletions model/src/main/java/com/salesforce/apollo/model/ProcessDomain.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@
* @author hal.hildebrand
*/
public class ProcessDomain extends Domain {
private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class);

protected final KerlDHT dht;
protected final View foundation;
private final UUID listener;

public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters parameters,
private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class);
protected final KerlDHT dht;
protected final View foundation;
private final UUID listener;
private final EventValidation.DelegatedValidation validations;
private final Verifiers.DelegatedVerifiers verifiers;
private final ProcessDomainParameters parameters;

public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams,
Builder builder, Parameters.RuntimeParameters.Builder runtime, InetSocketAddress endpoint,
com.salesforce.apollo.fireflies.Parameters.Builder ff, StereotomyMetrics stereotomyMetrics) {
super(member, builder, parameters.dbURL, parameters.checkpointBaseDir, runtime);
super(member, builder, pdParams.dbURL, pdParams.checkpointBaseDir, runtime);
parameters = pdParams;
var base = Context.<Participant>newBuilder()
.setBias(parameters.dhtBias)
.setpByz(parameters.dhtPbyz)
Expand All @@ -61,10 +64,9 @@ public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDom
dht = new KerlDHT(parameters.dhtOpsFrequency, params.context(), member, connectionPool,
params.digestAlgorithm(), params.communications(), parameters.dhtOperationsTimeout,
parameters.dhtFpr, stereotomyMetrics);
var mock = true;
var validation = mock ? EventValidation.NONE : dht.getAni().eventValidation(parameters.dhtEventValidTO);
var verifiers = mock ? Verifiers.NONE : dht.getVerifiers();
this.foundation = new View(base, getMember(), endpoint, validation, verifiers, params.communications(),
validations = new EventValidation.DelegatedValidation(EventValidation.NONE);
verifiers = new Verifiers.DelegatedVerifiers(Verifiers.NONE);
this.foundation = new View(base, getMember(), endpoint, validations, verifiers, params.communications(),
ff.build(), DigestAlgorithm.DEFAULT, null);
listener = foundation.register(listener());
}
Expand All @@ -81,6 +83,22 @@ public CertificateWithPrivateKey provision(Duration duration, SignatureAlgorithm
return member.getIdentifier().provision(Instant.now(), duration, signatureAlgorithm);
}

public void setAniValidations() {
validations.setDelegate(dht.getAni().eventValidation(parameters.dhtEventValidTO));
}

public void setDhtVerifiers() {
verifiers.setDelegate(dht.getVerifiers());
}

public void setValidationsNONE() {
validations.setDelegate(EventValidation.NONE);
}

public void setVerifiersNONE() {
verifiers.setDelegate(Verifiers.NONE);
}

@Override
public void start() {
startServices();
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
<jooq.version>3.17.2</jooq.version>
<bc.version>1.74</bc.version>
<logback.version>1.4.12</logback.version>
<grpc.version>1.60.1</grpc.version>
<protobuf.version>3.25.1</protobuf.version>
<grpc.version>1.61.0</grpc.version>
<protobuf.version>3.25.2</protobuf.version>
<liquibase.version>4.8.0</liquibase.version>
<netty.version>4.1.100.Final</netty.version>
<native.maven.plugin.version>0.9.27</native.maven.plugin.version>
Expand Down
4 changes: 4 additions & 0 deletions protocols/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>com.salesforce.apollo</groupId>
<artifactId>cryptography</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions stereotomy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-metrics</artifactId>
<optional>true</optional>
</dependency>

<!-- Test only deps below this line -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
* @author hal.hildebrand
*/
public interface ControlledIdentifier<D extends Identifier> extends BoundIdentifier<D> {
SignatureAlgorithm algorithm();

/**
* @return the binding of the identifier to the current key state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,26 @@ public boolean validate(Identifier identifier) {
boolean validate(EstablishmentEvent event);

boolean validate(Identifier identifier);

class DelegatedValidation implements EventValidation {
private volatile EventValidation delegate;

public DelegatedValidation(EventValidation delegate) {
this.delegate = delegate;
}

public void setDelegate(EventValidation delegate) {
this.delegate = delegate;
}

@Override
public boolean validate(Identifier identifier) {
return delegate.validate(identifier);
}

@Override
public boolean validate(EstablishmentEvent event) {
return delegate.validate(event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,12 @@ private EstablishmentEvent getLastEstablishingEvent(KeyState state) {
}

private Signer getSigner(KeyState state) {
var identifier = state.getIdentifier();
var signers = new PrivateKey[state.getKeys().size()];
EstablishmentEvent e = getLastEstablishingEvent(state);
for (int i = 0; i < signers.length; i++) {
Optional<KeyPair> keyPair = getKeyPair(i, e);
if (keyPair.isEmpty()) {
log.warn("Last establishment event not found in KEL: {} : {} missing: {}", identifier,
state.getCoordinates(), state.getLastEstablishmentEvent());
log.warn("Key pair: {} is unavailable: {}", state.getCoordinates(), state.getLastEstablishmentEvent());
return null;
}
signers[i] = keyPair.get().getPrivate();
Expand Down Expand Up @@ -475,6 +473,12 @@ public ControlledIdentifierImpl(KeyState state) {
super(state);
}

@Override
public SignatureAlgorithm algorithm() {
EstablishmentEvent e = getLastEstablishingEvent();
return SignatureAlgorithm.lookup(e.getKeys().getFirst());
}

@Override
public BoundIdentifier<D> bind() {
return new BoundControllableIdentifier<>(getState());
Expand Down
Loading

0 comments on commit 609504c

Please sign in to comment.