Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

virtual threading #151

Merged
merged 44 commits into from
Nov 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f160699
First form, thread per task iterator
Hellblazer Sep 23, 2023
b9b6db8
Fix some crap
Hellblazer Sep 23, 2023
27a3726
Smoke test for synchronous ring iterator
Hellblazer Sep 23, 2023
e7c661d
correct test
Hellblazer Sep 23, 2023
6dbbe23
SyncSliceIterator + Test
Hellblazer Sep 23, 2023
e7f7b00
fix
Hellblazer Sep 23, 2023
780ae9b
add sync version of ring comms + test + refactor iterator.
Hellblazer Sep 24, 2023
7744e92
ReliableBroadcast uses synchronous version of ring comms
Hellblazer Sep 24, 2023
ac258f1
Fireflies uses synchronous version of ring comms
Hellblazer Sep 24, 2023
cb0c126
No more async usage of RingComm
Hellblazer Sep 24, 2023
f03806b
mass refactoring to eliminate completable/listenable future
Hellblazer Sep 24, 2023
21294cb
Renaming and eliminating more futures. Gorgoneion module passing
Hellblazer Sep 24, 2023
40b88b7
Fix corgoneion bootstrapping for only one active member.
Hellblazer Sep 24, 2023
668bd88
run to CHOAM. 1 failure, 2 errors ;)
Hellblazer Sep 25, 2023
4f13506
trace rather than info
Hellblazer Sep 25, 2023
d95388a
trace rather than info
Hellblazer Sep 25, 2023
d225253
remove class cast mock madness
Hellblazer Oct 23, 2023
5dc7fa9
correctly handle StatusRuntimeException
Hellblazer Oct 23, 2023
655e29e
back to info
Hellblazer Oct 23, 2023
3ab46bd
scheduler per
Hellblazer Oct 23, 2023
74785ff
Remove schedulers, just hard-wire where needed
Hellblazer Oct 24, 2023
d19ad6e
remove some parameterized thread pools
Hellblazer Oct 25, 2023
959d85d
handle status runtime exception
Hellblazer Oct 25, 2023
5ef5682
whoops
Hellblazer Oct 25, 2023
de5ca4f
handle another issue with statusruntimeex.
Hellblazer Oct 27, 2023
bbf9b27
timout?
Hellblazer Oct 27, 2023
c58e869
amp up logging
Hellblazer Oct 27, 2023
72a169d
stereotomy logging
Hellblazer Oct 28, 2023
e23e134
catch remote runtime excep
Hellblazer Oct 28, 2023
93259ed
squelch unikerl
Hellblazer Oct 28, 2023
5bd89c5
remove even moar thread pools.
Hellblazer Oct 28, 2023
944d98d
ensure shutdown
Hellblazer Oct 28, 2023
ba0321a
further thread pool clean up
Hellblazer Oct 28, 2023
f61ea27
further thread pool clean up. Fix BootstrappingTest
Hellblazer Nov 1, 2023
862db5c
fix liquibase error on missing dir
Hellblazer Nov 1, 2023
2864dcc
amp up logging
Hellblazer Nov 3, 2023
d9ec92f
better logging
Hellblazer Nov 3, 2023
54232db
cleaner testing
Hellblazer Nov 3, 2023
f96e25e
cleaner testing
Hellblazer Nov 3, 2023
edc38fb
cleaner testing
Hellblazer Nov 3, 2023
77fa8f0
squelch logging
Hellblazer Nov 3, 2023
5034ee4
squelch logging
Hellblazer Nov 3, 2023
e105b6f
minor cleanup and fix ReliableBroadcaster - was not executing finally…
Hellblazer Nov 5, 2023
6a91c8f
set explicit executors to netty client/server builders
Hellblazer Nov 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,281 changes: 1,116 additions & 1,165 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java

Large diffs are not rendered by default.

116 changes: 55 additions & 61 deletions choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,10 @@
*/
package com.salesforce.apollo.choam;

import static com.salesforce.apollo.crypto.QualifiedBase64.publicKey;
import static com.salesforce.apollo.crypto.QualifiedBase64.signature;

import java.security.PublicKey;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chiralbehaviors.tron.Fsm;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.salesfoce.apollo.choam.proto.Certification;
import com.salesfoce.apollo.choam.proto.CertifiedBlock;
import com.salesfoce.apollo.choam.proto.Join;
import com.salesfoce.apollo.choam.proto.Validate;
import com.salesfoce.apollo.choam.proto.Validations;
import com.salesfoce.apollo.choam.proto.ViewMember;
import com.salesfoce.apollo.choam.proto.*;
import com.salesfoce.apollo.utils.proto.PubKey;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.choam.comm.Terminal;
Expand All @@ -49,39 +28,46 @@
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
import com.salesforce.apollo.membership.Member;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.PublicKey;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.salesforce.apollo.crypto.QualifiedBase64.publicKey;
import static com.salesforce.apollo.crypto.QualifiedBase64.signature;

/**
* Construction of the genesis block
*
* @author hal.hildebrand
*
* @author hal.hildebrand
*/
public class GenesisAssembly implements Genesis {
private record Proposed(Join join, Member member, Map<Member, Certification> certifications) {
public Proposed(Join join, Member member) {
this(join, member, new HashMap<>());
}
}

private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);

private volatile Thread blockingThread;
private final Ethereal controller;
private final ChRbcGossip coordinator;
private volatile OneShot ds;
private final ViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final Map<Digest, Proposed> proposals = new ConcurrentHashMap<>();
private final AtomicBoolean published = new AtomicBoolean();
private volatile HashedBlock reconfiguration;
private final Map<Member, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);
private final Ethereal controller;
private final ChRbcGossip coordinator;
private final ViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final Map<Digest, Proposed> proposals = new ConcurrentHashMap<>();
private final AtomicBoolean published = new AtomicBoolean();
private final Map<Member, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private volatile Thread blockingThread;
private volatile OneShot ds;
private volatile HashedBlock reconfiguration;

public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms, ViewMember genesisMember,
ThreadPoolExecutor consumer) {
ThreadPoolExecutor executor) {
view = vc;
ds = new OneShot();
nextAssembly = Committee.viewMembersOf(view.context().getId(), params().context())
Expand Down Expand Up @@ -118,9 +104,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
(preblock, last) -> transitions.process(preblock, last),
epoch -> transitions.nextEpoch(epoch), consumer);
epoch -> transitions.nextEpoch(epoch), executor);
coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(),
params().exec(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
reContext.getId(), nextAssembly.keySet(), params().member().getId());
Expand All @@ -132,9 +117,9 @@ public void certify() {
.stream()
.filter(p -> p.certifications.size() >= params().majority())
.forEach(p -> slate.put(p.member(), joinOf(p)));
reconfiguration = new HashedBlock(params().digestAlgorithm(),
view.genesis(slate, view.context().getId(),
new NullBlock(params().digestAlgorithm())));
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
params().digestAlgorithm())));
var validate = view.generateValidation(reconfiguration);
log.trace("Certifying genesis block: {} for: {} count: {} on: {}", reconfiguration.hash, view.context().getId(),
slate.size(), params().member().getId());
Expand Down Expand Up @@ -167,7 +152,7 @@ public void gather() {
proposals.put(params().member().getId(), proposed);

ds.setValue(join.toByteString());
coordinator.start(params().producer().gossipDuration(), params().scheduler());
coordinator.start(params().producer().gossipDuration());
controller.start();
}

Expand Down Expand Up @@ -196,13 +181,15 @@ public void nominate() {

@Override
public void nominations(PreBlock preblock, boolean last) {
preblock.data().stream().map(bs -> {
try {
return Validations.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
return null;
}
})
preblock.data()
.stream()
.map(bs -> {
try {
return Validations.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
return null;
}
})
.filter(v -> v != null)
.flatMap(vs -> vs.getValidationsList().stream())
.filter(v -> !v.equals(Validate.getDefaultInstance()))
Expand Down Expand Up @@ -326,7 +313,8 @@ private void join(Join join) {
private Join joinOf(Proposed candidate) {
final List<Certification> witnesses = candidate.certifications.values()
.stream()
.sorted(Comparator.comparing(c -> new Digest(c.getId())))
.sorted(
Comparator.comparing(c -> new Digest(c.getId())))
.collect(Collectors.toList());
return Join.newBuilder(candidate.join).clearEndorsements().addAllEndorsements(witnesses).build();
}
Expand Down Expand Up @@ -361,4 +349,10 @@ private void validate(Validate v) {
log.debug("Validation of view member: {}:{} using certifier: {} on: {}", member.getId(), hash,
certifier.getId(), params().member().getId());
}

private record Proposed(Join join, Member member, Map<Member, Certification> certifications) {
public Proposed(Join join, Member member) {
this(join, member, new HashMap<>());
}
}
}
Loading