Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 8, 2024
1 parent a453f4f commit 6d9b042
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContext;
Expand All @@ -25,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -41,6 +43,7 @@ public class DynamicTest {
private Map<Member, Router> routers;
private Map<Member, CHOAM> choams;
private Map<Member, DynamicContext<Member>> contexts;
private ExecutorService executor;

@BeforeEach
public void setUp() throws Exception {
Expand All @@ -61,10 +64,11 @@ public void setUp() throws Exception {
.map(ControlledIdentifierMember::new)
.map(e -> (Member) e)
.toList();
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
final var prefix = UUID.randomUUID().toString();
routers = members.stream()
.collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder().setTarget(cardinality * 2))));
ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor)));

var template = Parameters.newBuilder()
.setGenerateGenesis(true)
Expand Down Expand Up @@ -215,6 +219,9 @@ public void tearDown() throws Exception {
routers = null;
}
members = null;
if (executor != null) {
executor.shutdown();
}
}

private CHOAM constructCHOAM(SigningMember m, Parameters.Builder params, Context<Member> context) {
Expand Down
14 changes: 8 additions & 6 deletions choam/src/test/java/com/salesforce/apollo/choam/TestCHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand Down Expand Up @@ -71,6 +68,7 @@ public class TestCHOAM {
private MetricRegistry registry;
private Map<Digest, Router> routers;
private ScheduledExecutorService scheduler;
private ExecutorService executor;

@AfterEach
public void after() throws Exception {
Expand All @@ -85,13 +83,17 @@ public void after() throws Exception {
if (scheduler != null) {
scheduler.shutdown();
}
if (executor != null) {
executor.shutdown();
}
members = null;
registry = null;
}

@BeforeEach
public void before() throws Exception {
scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory());
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
var origin = DigestAlgorithm.DEFAULT.getOrigin();
registry = new MetricRegistry();
var metrics = new ChoamMetricsImpl(origin, registry);
Expand All @@ -101,7 +103,7 @@ public void before() throws Exception {

var params = Parameters.newBuilder()
.setGenerateGenesis(true)
.setGenesisViewId(origin.prefix("Slack"))
.setGenesisViewId(origin.prefix(entropy.nextLong()))
.setGossipDuration(Duration.ofMillis(20))
.setProducer(ProducerParameters.newBuilder()
.setMaxBatchCount(15_000)
Expand All @@ -128,7 +130,7 @@ public void before() throws Exception {
.collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder()
.setMetrics(new ServerConnectionCacheMetricsImpl(registry))
.setTarget(CARDINALITY))));
.setTarget(CARDINALITY), executor)));
choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
var recording = new AtomicInteger();
blocks.put(m.getId(), recording);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -127,7 +128,7 @@ public void smoke() throws Exception {
builder.setMetrics(new ServerConnectionCacheMetricsImpl(frist.getAndSet(false) ? node0Registry : registry));
CertificateWithPrivateKey certWithKey = certs.get(node.getId());
Router comms = new MtlsServer(node, ep, clientContextSupplier, serverContextSupplier(certWithKey)).router(
builder);
builder, Executors.newVirtualThreadPerTaskExecutor());
communications.add(comms);
return new View(context, node, endpoints.get(node.getId()), EventValidation.NONE, Verifiers.NONE, comms,
parameters, DigestAlgorithm.DEFAULT, metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,14 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* @author hal.hildebrand
*/
public interface RouterSupplier {
static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return newCachedThreadPool(corePoolSize, threadFactory, true);
}

static ExecutorService newCachedThreadPool(int corePoolSize, ThreadFactory threadFactory, boolean preStart) {
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
if (preStart) {
threadPoolExecutor.prestartAllCoreThreads();
}
return threadPoolExecutor;
}

default Router router() {
return router(ServerConnectionCache.newBuilder(), RouterImpl::defaultServerLimit, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
*/
package com.salesforce.apollo.model;

import com.salesforce.apollo.archipelago.EndpointProvider;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand All @@ -36,6 +33,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -50,17 +48,22 @@ public class ContainmentDomainTest {
"Give me food or give me slack or kill me".getBytes());
private final ArrayList<Domain> domains = new ArrayList<>();
private final ArrayList<Router> routers = new ArrayList<>();
private ExecutorService executor;

@AfterEach
public void after() {
domains.forEach(Domain::stop);
domains.clear();
routers.forEach(r -> r.close(Duration.ofSeconds(0)));
routers.clear();
if (executor != null) {
executor.shutdown();
}
}

@BeforeEach
public void before() throws Exception {
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
final var commsDirectory = Path.of("target/comms");
commsDirectory.toFile().mkdirs();

Expand All @@ -83,7 +86,8 @@ public void before() throws Exception {
final var group = DigestAlgorithm.DEFAULT.getOrigin();
identities.forEach((d, id) -> {
final var member = new ControlledIdentifierMember(id);
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
routers.add(localRouter);
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1),
Expand Down
14 changes: 9 additions & 5 deletions model/src/test/java/com/salesforce/apollo/model/DomainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
*/
package com.salesforce.apollo.model;

import com.salesforce.apollo.archipelago.EndpointProvider;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand Down Expand Up @@ -38,6 +35,7 @@
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -55,6 +53,7 @@ public class DomainTest {
"Give me food or give me slack or kill me".getBytes());
private final ArrayList<Domain> domains = new ArrayList<>();
private final ArrayList<Router> routers = new ArrayList<>();
private ExecutorService executor;

public static void smoke(Oracle oracle) throws Exception {
// Namespace
Expand Down Expand Up @@ -217,10 +216,14 @@ public void after() {
domains.clear();
routers.forEach(r -> r.close(Duration.ofSeconds(0)));
routers.clear();
if (executor != null) {
executor.shutdown();
}
}

@BeforeEach
public void before() throws Exception {
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder();
var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[] { 6, 6, 6 });
Expand All @@ -240,7 +243,8 @@ public void before() throws Exception {
final var group = DigestAlgorithm.DEFAULT.getOrigin();
identities.forEach((d, id) -> {
final var member = new ControlledIdentifierMember(id);
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
routers.add(localRouter);
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
*/
package com.salesforce.apollo.model;

import com.salesforce.apollo.archipelago.EndpointProvider;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand Down Expand Up @@ -37,6 +34,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand All @@ -55,17 +53,22 @@ public class FireFliesTest {

private final List<ProcessDomain> domains = new ArrayList<>();
private final Map<ProcessDomain, Router> routers = new HashMap<>();
private ExecutorService executor;

@AfterEach
public void after() {
domains.forEach(n -> n.stop());
domains.clear();
routers.values().forEach(r -> r.close(Duration.ofSeconds(0)));
routers.clear();
if (executor != null) {
executor.shutdown();
}
}

@BeforeEach
public void before() throws Exception {
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
var ffParams = com.salesforce.apollo.fireflies.Parameters.newBuilder();
var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[] { 6, 6, 6 });
Expand All @@ -84,7 +87,8 @@ public void before() throws Exception {
identities.forEach((digest, id) -> {
var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3);
final var member = new ControlledIdentifierMember(id);
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5),
"jdbc:h2:mem:%s-state".formatted(digest),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.CHOAM;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters;
Expand Down Expand Up @@ -82,6 +83,7 @@ public class CHOAMTest {
private MetricRegistry registry;
private Map<Digest, Router> routers;
private ScheduledExecutorService scheduler;
private ExecutorService executor;

private static Txn initialInsert() {
return Txn.newBuilder()
Expand All @@ -107,6 +109,9 @@ public void after() throws Exception {
scheduler.shutdownNow();
scheduler = null;
}
if (executor != null) {
executor.shutdown();
}
updaters.values().forEach(up -> up.close());
updaters.clear();
members = null;
Expand All @@ -123,6 +128,7 @@ public void after() throws Exception {
@BeforeEach
public void before() throws Exception {
scheduler = Executors.newScheduledThreadPool(10, Thread.ofVirtual().factory());
executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
registry = new MetricRegistry();
checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong());
Utils.clean(checkpointDirBase);
Expand Down Expand Up @@ -155,7 +161,8 @@ public void before() throws Exception {
members.forEach(m -> context.activate(m));
final var prefix = UUID.randomUUID().toString();
routers = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30));
var localRouter = new LocalServer(prefix, m).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
return localRouter;
}));
choams = members.stream()
Expand Down

0 comments on commit 6d9b042

Please sign in to comment.