Skip to content

Commit

Permalink
optimize gather, parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Apr 7, 2024
1 parent a7ed6e4 commit 508b3a7
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ jobs:
cache: 'maven'
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: ./mvnw -T 0.5C -batch-mode clean install -Ppre --file pom.xml -Dforks=2
run: ./mvnw -T 0.5C -batch-mode clean install -Ppre --file pom.xml -Dforks=4
11 changes: 6 additions & 5 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,14 @@ public CHOAM(Parameters params) {

nextView();
var bContext = new DelegatedContext<>(params.context());
var adapter = new MessageAdapter(any -> true, (Function<ByteString, Digest>) this::signatureHash,
(Function<ByteString, List<Digest>>) any -> Collections.emptyList(),
(m, any) -> any,
(Function<AgedMessageOrBuilder, ByteString>) AgedMessageOrBuilder::getContent);

combine = new ReliableBroadcaster(bContext, params.member(), params.combine(), params.communications(),
params.metrics() == null ? null : params.metrics().getCombineMetrics(),
new MessageAdapter(any -> true,
(Function<ByteString, Digest>) any -> signatureHash(any),
(Function<ByteString, List<Digest>>) any -> Collections.emptyList(),
(m, any) -> any,
(Function<AgedMessageOrBuilder, ByteString>) am -> am.getContent()));
adapter);
linear = Executors.newSingleThreadExecutor(
Thread.ofVirtual().name("Linear " + params.member().getId()).factory());
combine.registerHandler((ctx, messages) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ private boolean consider(Optional<ViewMember> futureSailor, Terminal term, Membe
params().member().getId());
return !gathered();
}
polled.add(vm);
join(member, true);
return !gathered();
}
Expand Down Expand Up @@ -243,6 +242,7 @@ private void join(ViewMember vm, boolean direct) {
validations.forEach(this::validate);
}
}
polled.add(mid);
var reass = builder.build();
if (reass.isInitialized()) {
publisher.accept(reass);
Expand Down
13 changes: 0 additions & 13 deletions fireflies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,4 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
8 changes: 8 additions & 0 deletions sql-state/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@
* @author hal.hildebrand
*/
abstract public class AbstractLifecycleTest {
protected static final int CARDINALITY = 5;
protected static final Random entropy = new Random();
private static final List<Transaction> GENESIS_DATA;

private static final Digest GENESIS_VIEW_ID = DigestAlgorithm.DEFAULT.digest(
protected static final int CARDINALITY = 5;
private static final Digest GENESIS_VIEW_ID = DigestAlgorithm.DEFAULT.digest(
"Give me food or give me slack or kill me".getBytes());
protected final AtomicReference<ULong> checkpointHeight = new AtomicReference<>();
protected final Map<Member, SqlStateMachine> updaters = new HashMap<>();
// static {
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Session.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CHOAM.class)).setLevel(Level.TRACE);
Expand All @@ -71,16 +70,9 @@ abstract public class AbstractLifecycleTest {
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Fsm.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(TxDataSource.class)).setLevel(Level.TRACE);
// }

static {
var txns = MigrationTest.initializeBookSchema();
txns.add(initialInsert());
GENESIS_DATA = CHOAM.toGenesisData(txns);
}

protected final AtomicReference<ULong> checkpointHeight = new AtomicReference<>();
protected final Map<Member, SqlStateMachine> updaters = new HashMap<>();
private final List<Transaction> GENESIS_DATA;
private final Map<Member, Parameters> parameters = new HashMap<>();
protected SecureRandom entropy;
protected CountDownLatch checkpointOccurred;
protected Map<Digest, CHOAM> choams;
protected List<SigningMember> members;
Expand All @@ -92,6 +84,12 @@ abstract public class AbstractLifecycleTest {
private File checkpointDirBase;
private List<Transactioneer> transactioneers;

{
var txns = MigrationTest.initializeBookSchema();
txns.add(initialInsert());
GENESIS_DATA = CHOAM.toGenesisData(txns);
}

public AbstractLifecycleTest() {
super();
}
Expand Down Expand Up @@ -132,7 +130,7 @@ public void before() throws Exception {
baseDir = new File(System.getProperty("user.dir"), "target/cluster-" + Entropy.nextBitsStreamLong());
Utils.clean(baseDir);
baseDir.mkdirs();
var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[] { 6, 6, 6, disc() });
context = new DynamicContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin(), CARDINALITY, 0.2, 3);
toleranceLevel = context.toleranceLevel();
Expand Down

0 comments on commit 508b3a7

Please sign in to comment.