Skip to content

Commit

Permalink
Networking (#199)
Browse files Browse the repository at this point in the history
* Change endpoints to opaque string encoding

* cleanup + 4 forks
  • Loading branch information
Hellblazer authored May 19, 2024
1 parent 36fcde6 commit 999350a
Show file tree
Hide file tree
Showing 18 changed files with 91 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ jobs:
cache: 'maven'
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: ./mvnw -batch-mode clean install -Ppre --file pom.xml
run: ./mvnw -batch-mode clean install -Dforks=4 -Ppre --file pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,7 @@ private Registration registration() {
private NoteWrapper seedFor(Seed seed) {
SignedNote seedNote = SignedNote.newBuilder()
.setNote(Note.newBuilder()
.setHost(seed.endpoint().getHostName())
.setPort(seed.endpoint().getPort())
.setEndpoint(seed.endpoint())
.setIdentifier(seed.identifier().toIdent())
.setEpoch(-1)
.setMask(ByteString.copyFrom(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public Digest currentView() {
return currentView;
}

public String getEndpoint() {
return note.getNote().getEndpoint();
}

public long getEpoch() {
return note.getNote().getEpoch();
}
Expand All @@ -48,10 +52,6 @@ public Digest getHash() {
return hash;
}

public String getHost() {
return note.getNote().getHost();
}

public Digest getId() {
return getIdentifier().getDigest();
}
Expand All @@ -64,10 +64,6 @@ public BitSet getMask() {
return mask;
}

public int getPort() {
return note.getNote().getPort();
}

public JohnHancock getSignature() {
return signature(note.getSignature());
}
Expand Down
20 changes: 8 additions & 12 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
Expand Down Expand Up @@ -113,14 +111,14 @@ public class View {
private final Verifiers verifiers;
private volatile ScheduledFuture<?> futureGossip;

public View(DynamicContext<Participant> context, ControlledIdentifierMember member, InetSocketAddress endpoint,
public View(DynamicContext<Participant> context, ControlledIdentifierMember member, String endpoint,
EventValidation validation, Verifiers verifiers, Router communications, Parameters params,
DigestAlgorithm digestAlgo, FireflyMetrics metrics) {
this(context, member, endpoint, validation, verifiers, communications, params, communications, digestAlgo,
metrics);
}

public View(DynamicContext<Participant> context, ControlledIdentifierMember member, InetSocketAddress endpoint,
public View(DynamicContext<Participant> context, ControlledIdentifierMember member, String endpoint,
EventValidation validation, Verifiers verifiers, Router communications, Parameters params,
Router gateway, DigestAlgorithm digestAlgo, FireflyMetrics metrics) {
this.metrics = metrics;
Expand Down Expand Up @@ -1481,19 +1479,18 @@ private boolean verify(SelfAddressingIdentifier id, SigningThreshold threshold,
return verifier.get().verify(threshold, signature, message);
}

public record Seed(SelfAddressingIdentifier identifier, InetSocketAddress endpoint) {
public record Seed(SelfAddressingIdentifier identifier, String endpoint) {
}

public class Node extends Participant implements SigningMember {
private final ControlledIdentifierMember wrapped;

public Node(ControlledIdentifierMember wrapped, InetSocketAddress endpoint) {
public Node(ControlledIdentifierMember wrapped, String endpoint) {
super(wrapped.getId());
this.wrapped = wrapped;
var n = Note.newBuilder()
.setEpoch(0)
.setHost(endpoint.getHostName())
.setPort(endpoint.getPort())
.setEndpoint(endpoint)
.setIdentifier(wrapped.getIdentifier().getIdentifier().toIdent())
.setMask(ByteString.copyFrom(nextMask().toByteArray()))
.build();
Expand Down Expand Up @@ -1656,8 +1653,7 @@ void reset() {
var n = Note.newBuilder()
.setEpoch(0)
.setCurrentView(currentView().toDigeste())
.setHost(current.getHost())
.setPort(current.getPort())
.setEndpoint(current.getEndpoint())
.setIdentifier(current.getIdentifier().toIdent())
.setMask(ByteString.copyFrom(nextMask().toByteArray()))
.build();
Expand Down Expand Up @@ -1693,12 +1689,12 @@ public int compareTo(Member o) {
return id.compareTo(o.getId());
}

public SocketAddress endpoint() {
public String endpoint() {
final var current = note;
if (current == null) {
return null;
}
return new InetSocketAddress(current.getHost(), current.getPort());
return current.getEndpoint();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
Expand All @@ -27,7 +24,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.*;
Expand Down Expand Up @@ -93,10 +89,9 @@ public void churn() throws Exception {
System.out.println();
System.out.println("Starting views");
System.out.println();

var seeds = members.values()
.stream()
.map(m -> new Seed(m.getIdentifier().getIdentifier(), new InetSocketAddress(0)))
.map(m -> new Seed(m.getIdentifier().getIdentifier(), EndpointProvider.allocatePort()))
.limit(25)
.toList();

Expand Down Expand Up @@ -289,8 +284,8 @@ private void initialize() {

gateway.start();
gateways.add(comms);
return new View(context, node, new InetSocketAddress(0), EventValidation.NONE, Verifiers.from(kerl), comms,
parameters, gateway, DigestAlgorithm.DEFAULT, metrics);
return new View(context, node, EndpointProvider.allocatePort(), EventValidation.NONE, Verifiers.from(kerl),
comms, parameters, gateway, DigestAlgorithm.DEFAULT, metrics);
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
Expand All @@ -27,7 +24,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.*;
Expand Down Expand Up @@ -102,7 +98,7 @@ public void smokin() throws Exception {

final var seeds = members.values()
.stream()
.map(m -> new Seed(m.getIdentifier().getIdentifier(), new InetSocketAddress(0)))
.map(m -> new Seed(m.getIdentifier().getIdentifier(), EndpointProvider.allocatePort()))
.limit(largeTests ? 10 : 1)
.toList();
final var bootstrapSeed = seeds.subList(0, 1);
Expand Down Expand Up @@ -206,8 +202,8 @@ private void initialize() {

gateway.start();
gateways.add(comms);
return new View(context, node, new InetSocketAddress(0), EventValidation.NONE, Verifiers.from(kerl), comms,
parameters, gateway, DigestAlgorithm.DEFAULT, metrics);
return new View(context, node, EndpointProvider.allocatePort(), EventValidation.NONE, Verifiers.from(kerl),
comms, parameters, gateway, DigestAlgorithm.DEFAULT, metrics);
}).collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Provider;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
Expand All @@ -59,7 +56,7 @@
public class MtlsTest {
private static final int CARDINALITY;
private static final Map<Digest, CertificateWithPrivateKey> certs = new HashMap<>();
private static final Map<Digest, InetSocketAddress> endpoints = new HashMap<>();
private static final Map<Digest, String> endpoints = new HashMap<>();
private static final boolean LARGE_TESTS = Boolean.getBoolean(
"large_tests");
private static Map<Digest, ControlledIdentifier<SelfAddressingIdentifier>> identities;
Expand All @@ -75,19 +72,21 @@ public class MtlsTest {
public static void beforeClass() throws Exception {
var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[] { 6, 6, 6 });
String localhost = InetAddress.getLoopbackAddress().getHostName();
var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy);
identities = IntStream.range(0, CARDINALITY).mapToObj(i -> {
return stereotomy.newIdentifier();
}).collect(Collectors.toMap(controlled -> controlled.getIdentifier().getDigest(), controlled -> controlled));
identities.entrySet().forEach(e -> {
InetSocketAddress endpoint = new InetSocketAddress(localhost, Utils.allocatePort());
certs.put(e.getKey(),
e.getValue().provision(Instant.now(), Duration.ofDays(1), SignatureAlgorithm.DEFAULT));
endpoints.put(e.getKey(), endpoint);
endpoints.put(e.getKey(), EndpointProvider.allocatePort());
});
}

private static String endpoint(Member m) {
return ((Participant) m).endpoint();
}

@AfterEach
public void after() {
if (views != null) {
Expand Down Expand Up @@ -117,15 +116,14 @@ public void smoke() throws Exception {

var builder = ServerConnectionCache.newBuilder().setTarget(30);
var frist = new AtomicBoolean(true);
Function<Member, SocketAddress> resolver = m -> ((Participant) m).endpoint();

var clientContextSupplier = clientContextSupplier();
views = members.stream().map(node -> {
DynamicContext<Participant> context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
frist.getAndSet(false) ? node0Registry : registry);
EndpointProvider ep = new StandardEpProvider(endpoints.get(node.getId()), ClientAuth.REQUIRE,
CertificateValidator.NONE, resolver);
CertificateValidator.NONE, MtlsTest::endpoint);
builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry));
CertificateWithPrivateKey certWithKey = certs.get(node.getId());
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
Expand All @@ -27,7 +24,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.*;
Expand Down Expand Up @@ -103,7 +99,7 @@ public void swarm() throws Exception {

final var seeds = members.values()
.stream()
.map(m -> new Seed(m.getIdentifier().getIdentifier(), new InetSocketAddress(0)))
.map(m -> new Seed(m.getIdentifier().getIdentifier(), EndpointProvider.allocatePort()))
.limit(largeTests ? 100 : 10)
.toList();
final var bootstrapSeed = seeds.subList(0, 1);
Expand Down Expand Up @@ -241,8 +237,8 @@ private void initialize() {

gateway.start();
gateways.add(comms);
return new View(context, node, new InetSocketAddress(0), EventValidation.NONE, Verifiers.from(kerl), comms,
parameters, gateway, DigestAlgorithm.DEFAULT, metrics);
return new View(context, node, EndpointProvider.allocatePort(), EventValidation.NONE, Verifiers.from(kerl),
comms, parameters, gateway, DigestAlgorithm.DEFAULT, metrics);
}).collect(Collectors.toList());
}
}
3 changes: 1 addition & 2 deletions grpc/src/main/proto/fireflies.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ message Note {
crypto.Digeste currentView = 2;
stereotomy.Ident identifier = 3;
bytes mask = 4;
string host = 5;
int32 port = 6;
string endpoint = 5;
}

message ViewChange {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package com.salesforce.apollo.demesnes;

import com.salesforce.apollo.archipelago.EndpointProvider;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
Expand All @@ -31,7 +32,6 @@
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.security.SecureRandom;
import java.time.Duration;
Expand Down Expand Up @@ -208,7 +208,7 @@ public void before() throws Exception {
.setContext(context)
.setCommunications(
localRouter),
new InetSocketAddress(0), commsDirectory, ffParams,
EndpointProvider.allocatePort(), commsDirectory, ffParams,
IdentifierSpecification.newBuilder(), null);
domains.add(node);
routers.put(node, localRouter);
Expand All @@ -221,7 +221,7 @@ public void smokin() throws Exception {
long then = System.currentTimeMillis();
final var countdown = new CountDownLatch(domains.size());
final var seeds = Collections.singletonList(
new Seed(domains.getFirst().getMember().getIdentifier().getIdentifier(), new InetSocketAddress(0)));
new Seed(domains.getFirst().getMember().getIdentifier().getIdentifier(), EndpointProvider.allocatePort()));
domains.forEach(d -> {
BiConsumer<Context, Digest> c = (context, viewId) -> {
if (context.cardinality() == CARDINALITY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,28 @@
*/
package com.salesforce.apollo.archipelago;

import java.net.SocketAddress;

import com.google.common.net.HostAndPort;
import com.salesforce.apollo.cryptography.ssl.CertificateValidator;
import com.salesforce.apollo.membership.Member;

import com.salesforce.apollo.utils.Utils;
import io.netty.handler.ssl.ClientAuth;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

/**
* @author hal.hildebrand
*
*/
public interface EndpointProvider {
static String allocatePort() {
var addr = new InetSocketAddress(Utils.allocatePort());
return HostAndPort.fromParts(addr.getHostName(), addr.getPort()).toString();
}

static <T extends SocketAddress> T reify(String encoded) {
var hnp = HostAndPort.fromString(encoded);
return (T) new InetSocketAddress(hnp.getHost(), hnp.getPort());
}

SocketAddress addressFor(Member to);

Expand All @@ -27,6 +37,6 @@ public interface EndpointProvider {

ClientAuth getClientAuth();

CertificateValidator getValiator();
CertificateValidator getValidator();

}
Loading

0 comments on commit 999350a

Please sign in to comment.