Skip to content

Commit

Permalink
virtual threading (#151)
Browse files Browse the repository at this point in the history
* First form, thread per task iterator

* Fix some crap

* Smoke test for synchronous ring iterator

* correct test

* SyncSliceIterator + Test

test refactoring

* fix

* add sync version of ring comms + test + refactor iterator.

* ReliableBroadcast uses synchronous version of ring comms

* Fireflies uses synchronous version of ring comms

* No more async usage of RingComm

* mass refactoring to eliminate completable/listenable future

Everything async eliminated and relying on virtual threads.

* Renaming and eliminating more futures.  Gorgoneion module passing

Sounds like a sci fi novel

* Fix corgoneion bootstrapping for only one active member.

* run to CHOAM. 1 failure, 2 errors ;)

progress.

finally.

* trace rather than info

* trace rather than info

* remove class cast mock madness

* correctly handle StatusRuntimeException

* back to info

* scheduler per

* Remove schedulers, just hard-wire where needed

* remove some parameterized thread pools

replace with new v thread per

* handle status runtime exception

* whoops

* handle another issue with statusruntimeex.

Use loopback address for testing, not local host

* timout?

* amp up logging

* stereotomy logging

* catch remote runtime excep

* squelch unikerl

* remove even moar thread pools.

basically drive through schedulers using virtual thread pools.  fix some blocking starts in iterators/etc

* ensure shutdown

* further thread pool clean up

* further thread pool clean up.  Fix BootstrappingTest

* fix liquibase error on missing dir

* amp up logging

* better logging

* cleaner testing

* cleaner testing

* cleaner testing

* squelch logging

* squelch logging

* minor cleanup and fix ReliableBroadcaster - was not executing finally if empty result from partner

* set explicit executors to netty client/server builders
  • Loading branch information
Hellblazer authored Nov 5, 2023
1 parent 07a0fb8 commit 64a8359
Show file tree
Hide file tree
Showing 171 changed files with 11,013 additions and 13,274 deletions.
2,281 changes: 1,116 additions & 1,165 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java

Large diffs are not rendered by default.

116 changes: 55 additions & 61 deletions choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,10 @@
*/
package com.salesforce.apollo.choam;

import static com.salesforce.apollo.crypto.QualifiedBase64.publicKey;
import static com.salesforce.apollo.crypto.QualifiedBase64.signature;

import java.security.PublicKey;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chiralbehaviors.tron.Fsm;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.salesfoce.apollo.choam.proto.Certification;
import com.salesfoce.apollo.choam.proto.CertifiedBlock;
import com.salesfoce.apollo.choam.proto.Join;
import com.salesfoce.apollo.choam.proto.Validate;
import com.salesfoce.apollo.choam.proto.Validations;
import com.salesfoce.apollo.choam.proto.ViewMember;
import com.salesfoce.apollo.choam.proto.*;
import com.salesfoce.apollo.utils.proto.PubKey;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.choam.comm.Terminal;
Expand All @@ -49,39 +28,46 @@
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
import com.salesforce.apollo.membership.Member;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.PublicKey;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.salesforce.apollo.crypto.QualifiedBase64.publicKey;
import static com.salesforce.apollo.crypto.QualifiedBase64.signature;

/**
* Construction of the genesis block
*
* @author hal.hildebrand
*
* @author hal.hildebrand
*/
public class GenesisAssembly implements Genesis {
private record Proposed(Join join, Member member, Map<Member, Certification> certifications) {
public Proposed(Join join, Member member) {
this(join, member, new HashMap<>());
}
}

private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);

private volatile Thread blockingThread;
private final Ethereal controller;
private final ChRbcGossip coordinator;
private volatile OneShot ds;
private final ViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final Map<Digest, Proposed> proposals = new ConcurrentHashMap<>();
private final AtomicBoolean published = new AtomicBoolean();
private volatile HashedBlock reconfiguration;
private final Map<Member, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private static final Logger log = LoggerFactory.getLogger(GenesisAssembly.class);
private final Ethereal controller;
private final ChRbcGossip coordinator;
private final ViewMember genesisMember;
private final Map<Digest, Member> nextAssembly;
private final Map<Digest, Proposed> proposals = new ConcurrentHashMap<>();
private final AtomicBoolean published = new AtomicBoolean();
private final Map<Member, Join> slate = new ConcurrentHashMap<>();
private final AtomicBoolean started = new AtomicBoolean();
private final Transitions transitions;
private final ViewContext view;
private final Map<Member, Validate> witnesses = new ConcurrentHashMap<>();
private volatile Thread blockingThread;
private volatile OneShot ds;
private volatile HashedBlock reconfiguration;

public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms, ViewMember genesisMember,
ThreadPoolExecutor consumer) {
ThreadPoolExecutor executor) {
view = vc;
ds = new OneShot();
nextAssembly = Committee.viewMembersOf(view.context().getId(), params().context())
Expand Down Expand Up @@ -118,9 +104,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
(preblock, last) -> transitions.process(preblock, last),
epoch -> transitions.nextEpoch(epoch), consumer);
epoch -> transitions.nextEpoch(epoch), executor);
coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(),
params().exec(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
reContext.getId(), nextAssembly.keySet(), params().member().getId());
Expand All @@ -132,9 +117,9 @@ public void certify() {
.stream()
.filter(p -> p.certifications.size() >= params().majority())
.forEach(p -> slate.put(p.member(), joinOf(p)));
reconfiguration = new HashedBlock(params().digestAlgorithm(),
view.genesis(slate, view.context().getId(),
new NullBlock(params().digestAlgorithm())));
reconfiguration = new HashedBlock(params().digestAlgorithm(), view.genesis(slate, view.context().getId(),
new NullBlock(
params().digestAlgorithm())));
var validate = view.generateValidation(reconfiguration);
log.trace("Certifying genesis block: {} for: {} count: {} on: {}", reconfiguration.hash, view.context().getId(),
slate.size(), params().member().getId());
Expand Down Expand Up @@ -167,7 +152,7 @@ public void gather() {
proposals.put(params().member().getId(), proposed);

ds.setValue(join.toByteString());
coordinator.start(params().producer().gossipDuration(), params().scheduler());
coordinator.start(params().producer().gossipDuration());
controller.start();
}

Expand Down Expand Up @@ -196,13 +181,15 @@ public void nominate() {

@Override
public void nominations(PreBlock preblock, boolean last) {
preblock.data().stream().map(bs -> {
try {
return Validations.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
return null;
}
})
preblock.data()
.stream()
.map(bs -> {
try {
return Validations.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
return null;
}
})
.filter(v -> v != null)
.flatMap(vs -> vs.getValidationsList().stream())
.filter(v -> !v.equals(Validate.getDefaultInstance()))
Expand Down Expand Up @@ -326,7 +313,8 @@ private void join(Join join) {
private Join joinOf(Proposed candidate) {
final List<Certification> witnesses = candidate.certifications.values()
.stream()
.sorted(Comparator.comparing(c -> new Digest(c.getId())))
.sorted(
Comparator.comparing(c -> new Digest(c.getId())))
.collect(Collectors.toList());
return Join.newBuilder(candidate.join).clearEndorsements().addAllEndorsements(witnesses).build();
}
Expand Down Expand Up @@ -361,4 +349,10 @@ private void validate(Validate v) {
log.debug("Validation of view member: {}:{} using certifier: {} on: {}", member.getId(), hash,
certifier.getId(), params().member().getId());
}

private record Proposed(Join join, Member member, Map<Member, Certification> certifications) {
public Proposed(Join join, Member member) {
this(join, member, new HashMap<>());
}
}
}
Loading

0 comments on commit 64a8359

Please sign in to comment.