Skip to content

Commit

Permalink
another attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 6, 2024
1 parent 9142456 commit 1317613
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.UnsafeExecutors;
import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.context.DynamicContext;
Expand Down Expand Up @@ -64,10 +63,9 @@ public void setUp() throws Exception {
.toList();

final var prefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(m -> m, m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder().setTarget(cardinality * 2), executor)));
ServerConnectionCache.newBuilder().setTarget(cardinality * 2))));

var template = Parameters.newBuilder()
.setGenerateGenesis(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.archipelago.ServerConnectionCacheMetricsImpl;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.Parameters.ProducerParameters;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand Down Expand Up @@ -121,12 +124,11 @@ public void before() throws Exception {
.toList();
var context = new StaticContext<>(origin, 0.2, members, 3);
final var prefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
routers = members.stream()
.collect(Collectors.toMap(m -> m.getId(), m -> new LocalServer(prefix, m).router(
ServerConnectionCache.newBuilder()
.setMetrics(new ServerConnectionCacheMetricsImpl(registry))
.setTarget(CARDINALITY), executor)));
.setTarget(CARDINALITY))));
choams = members.stream().collect(Collectors.toMap(m -> m.getId(), m -> {
var recording = new AtomicInteger();
blocks.put(m.getId(), recording);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ private void initialize() {
AtomicBoolean frist = new AtomicBoolean(true);
final var prefix = UUID.randomUUID().toString();
final var gatewayPrefix = UUID.randomUUID().toString();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
views = members.values().stream().map(node -> {
DynamicContext<Participant> context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
Expand All @@ -283,15 +282,14 @@ private void initialize() {
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false) ? node0Registry
: registry)),
executor);
: registry)));
var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder()
.setTarget(200)
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false)
? node0Registry : registry)),
executor);
? node0Registry
: registry)));
comms.start();
communications.add(comms);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,21 +230,19 @@ private void initialize() {
DynamicContext<Participant> context = ctxBuilder.build();
FireflyMetricsImpl metrics = new FireflyMetricsImpl(context.getId(),
frist.getAndSet(false) ? node0Registry : registry);
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
var comms = new LocalServer(prefix, node).router(ServerConnectionCache.newBuilder()
.setTarget(200)
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false) ? node0Registry
: registry)),
executor);
: registry)));
var gateway = new LocalServer(gatewayPrefix, node).router(ServerConnectionCache.newBuilder()
.setTarget(200)
.setMetrics(
new ServerConnectionCacheMetricsImpl(
frist.getAndSet(false)
? node0Registry : registry)),
executor);
? node0Registry
: registry)));
comms.start();
communications.add(comms);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
*/
package com.salesforce.apollo.model;

import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.archipelago.EndpointProvider;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand Down Expand Up @@ -235,11 +238,9 @@ public void before() throws Exception {

var sealed = FoundationSeal.newBuilder().build();
final var group = DigestAlgorithm.DEFAULT.getOrigin();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
identities.forEach((d, id) -> {
final var member = new ControlledIdentifierMember(id);
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
routers.add(localRouter);
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofMinutes(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
*/
package com.salesforce.apollo.model;

import com.salesforce.apollo.archipelago.*;
import com.salesforce.apollo.archipelago.EndpointProvider;
import com.salesforce.apollo.archipelago.LocalServer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.ServerConnectionCache;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.Parameters.Builder;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
Expand Down Expand Up @@ -78,12 +81,10 @@ public void before() throws Exception {

Digest group = DigestAlgorithm.DEFAULT.getOrigin();
var sealed = FoundationSeal.newBuilder().build();
var executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor();
identities.forEach((digest, id) -> {
var context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3);
final var member = new ControlledIdentifierMember(id);
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30),
executor);
var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30));
var dbUrl = String.format("jdbc:h2:mem:sql-%s-%s;DB_CLOSE_DELAY=-1", member.getId(), UUID.randomUUID());
var pdParams = new ProcessDomain.ProcessDomainParameters(dbUrl, Duration.ofSeconds(5),
"jdbc:h2:mem:%s-state".formatted(digest),
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@
<configuration>
<forkCount>${forks}</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Xmx10G -Xms100M</argLine>
<argLine>-Xmx10G -Xms4G</argLine>
<argLine>-Djdk.tracePinnedThreads=full</argLine>
</configuration>
</plugin>
Expand Down

0 comments on commit 1317613

Please sign in to comment.