Skip to content

Commit

Permalink
no need to pass prioritized executor.
Browse files Browse the repository at this point in the history
Hides this. less chance for errors.
  • Loading branch information
Hellblazer committed Nov 18, 2023
1 parent 27f717e commit a722994
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 210 deletions.
10 changes: 6 additions & 4 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class CHOAM {
private final AtomicReference<HashedCertifiedBlock> checkpoint = new AtomicReference<>();
private final ReliableBroadcaster combine;
private final CommonCommunications<Terminal, Concierge> comm;
private final ThreadPoolExecutor consumer;
private final AtomicReference<Committee> current = new AtomicReference<>();
private final ExecutorService executions;
private final AtomicReference<CompletableFuture<SynchronizedState>> futureBootstrap = new AtomicReference<>();
Expand Down Expand Up @@ -136,7 +135,6 @@ public CHOAM(Parameters params) {
params.context().timeToLive());
combine.register(i -> roundScheduler.tick());
session = new Session(params, service());
consumer = Ethereal.consumer("CHOAM" + params.member().getId() + params.context().getId());
}

public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize) {
Expand Down Expand Up @@ -1199,7 +1197,7 @@ private class Associate extends Administration {
params.member().getId());
Signer signer = new SignerImpl(nextView.consensusKeyPair.getPrivate());
viewContext = new ViewContext(context, params, signer, validators, constructBlock());
producer = new Producer(viewContext, head.get(), checkpoint.get(), comm, consumer);
producer = new Producer(viewContext, head.get(), checkpoint.get(), comm, getLabel());
producer.start();
}

Expand Down Expand Up @@ -1244,7 +1242,7 @@ private Formation() {
params.member().getId());
Signer signer = new SignerImpl(c.consensusKeyPair.getPrivate());
ViewContext vc = new GenesisContext(formation, params, signer, constructBlock());
assembly = new GenesisAssembly(vc, comm, next.get().member, consumer);
assembly = new GenesisAssembly(vc, comm, next.get().member, getLabel());
nextViewId.set(params.genesisViewId());
} else {
log.trace("No formation on: {}", params.member().getId());
Expand Down Expand Up @@ -1315,6 +1313,10 @@ public boolean validate(HashedCertifiedBlock hb) {
}
}

private String getLabel() {
return "CHOAM" + params.member().getId() + params.context().getId();
}

/** a synchronizer of the current committee */
private class Synchronizer implements Committee {
private final Map<Member, Verifier> validators;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class GenesisAssembly implements Genesis {
private volatile HashedBlock reconfiguration;

public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms, ViewMember genesisMember,
ThreadPoolExecutor executor) {
String label) {
view = vc;
ds = new OneShot();
nextAssembly = Committee.viewMembersOf(view.context().getId(), params().context())
Expand Down Expand Up @@ -103,7 +103,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
(preblock, last) -> transitions.process(preblock, last),
epoch -> transitions.nextEpoch(epoch), executor);
epoch -> transitions.nextEpoch(epoch), label);
coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
Expand Down
4 changes: 2 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void startProduction() {
private final ViewContext view;

public Producer( ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
CommonCommunications<Terminal, ?> comms, ThreadPoolExecutor consumer) {
CommonCommunications<Terminal, ?> comms, String label) {
assert view != null;
this.view = view;
this.previousBlock.set(lastBlock);
Expand Down Expand Up @@ -227,7 +227,7 @@ public Producer( ViewContext view, HashedBlock lastBlock, HashedBlock checkpoin
var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics();
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds,
(preblock, last) -> transitions.create(preblock, last), epoch -> newEpoch(epoch),
consumer);
label);
coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(),
params().communications(), producerMetrics);
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.salesforce.apollo.crypto.Digest;
import com.salesforce.apollo.crypto.DigestAlgorithm;
import com.salesforce.apollo.crypto.Signer;
import com.salesforce.apollo.ethereal.Ethereal;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
import com.salesforce.apollo.membership.Member;
Expand All @@ -42,7 +41,6 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -67,19 +65,23 @@ public void genesis() throws Exception {
Digest viewId = DigestAlgorithm.DEFAULT.getOrigin().prefix(2);
int cardinality = 5;
var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[]{6, 6, 6});
entropy.setSeed(new byte[] { 6, 6, 6 });
var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy);

List<Member> members = IntStream.range(0, cardinality).mapToObj(i -> stereotomy.newIdentifier()).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (Member) e).toList();
List<Member> members = IntStream.range(0, cardinality)
.mapToObj(i -> stereotomy.newIdentifier())
.map(cpk -> new ControlledIdentifierMember(cpk))
.map(e -> (Member) e)
.toList();
Context<Member> base = new ContextImpl<>(viewId, members.size(), 0.2, 3);
base.activate(members);
Context<Member> committee = Committee.viewFor(viewId, base);

Parameters.Builder params = Parameters.newBuilder()
.setProducer(ProducerParameters.newBuilder()
.setGossipDuration(Duration.ofMillis(100))
.build())
.setGossipDuration(Duration.ofMillis(10));
.setProducer(ProducerParameters.newBuilder()
.setGossipDuration(Duration.ofMillis(100))
.build())
.setGossipDuration(Duration.ofMillis(10));

Map<Member, GenesisAssembly> genii = new HashMap<>();

Expand All @@ -92,10 +94,10 @@ public ViewMember answer(InvocationOnMock invocation) throws Throwable {
KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair();
final PubKey consensus = bs(keyPair.getPublic());
return ViewMember.newBuilder()
.setId(m.getId().toDigeste())
.setConsensusKey(consensus)
.setSignature(((Signer) m).sign(consensus.toByteString()).toSig())
.build();
.setId(m.getId().toDigeste())
.setConsensusKey(consensus)
.setSignature(((Signer) m).sign(consensus.toByteString()).toSig())
.build();

}
});
Expand All @@ -108,27 +110,25 @@ public ViewMember answer(InvocationOnMock invocation) throws Throwable {
}));
CountDownLatch complete = new CountDownLatch(committee.activeCount());
var comms = members.stream()
.collect(Collectors.toMap(m -> m,
m -> communications.get(m)
.create(m, base.getId(), servers.get(m),
servers.get(m)
.getClass()
.getCanonicalName(),
r -> new TerminalServer(communications.get(m)
.getClientIdentityProvider(),
null, r),
TerminalClient.getCreate(null),
Terminal.getLocalLoopback((SigningMember) m,
servers.get(m)))));
.collect(Collectors.toMap(m -> m, m -> communications.get(m)
.create(m, base.getId(), servers.get(m),
servers.get(m)
.getClass()
.getCanonicalName(),
r -> new TerminalServer(
communications.get(m)
.getClientIdentityProvider(),
null, r),
TerminalClient.getCreate(null),
Terminal.getLocalLoopback(
(SigningMember) m,
servers.get(m)))));
committee.active().forEach(m -> {
SigningMember sm = (SigningMember) m;
Router router = communications.get(m);
params.getProducer().ethereal().setSigner(sm);
var built = params.build(RuntimeParameters.newBuilder()
.setContext(base)
.setMember(sm)
.setCommunications(router)
.build());
var built = params.build(
RuntimeParameters.newBuilder().setContext(base).setMember(sm).setCommunications(router).build());
BlockProducer reconfigure = new BlockProducer() {

@Override
Expand All @@ -139,7 +139,7 @@ public Block checkpoint() {
@Override
public Block genesis(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous) {
return CHOAM.genesis(viewId, joining, previous, committee, previous, built, previous,
Collections.emptyList());
Collections.emptyList());
}

@Override
Expand Down Expand Up @@ -168,11 +168,11 @@ public Block reconfigure(Map<Member, Join> joining, Digest nextViewId, HashedBlo
KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair();
final PubKey consensus = bs(keyPair.getPublic());
var vm = ViewMember.newBuilder()
.setId(m.getId().toDigeste())
.setConsensusKey(consensus)
.setSignature(((Signer) m).sign(consensus.toByteString()).toSig())
.build();
genii.put(m, new GenesisAssembly(view, comms.get(m), vm, Ethereal.consumer(m.getId().toString())));
.setId(m.getId().toDigeste())
.setConsensusKey(consensus)
.setSignature(((Signer) m).sign(consensus.toByteString()).toSig())
.build();
genii.put(m, new GenesisAssembly(view, comms.get(m), vm, m.getId().toString()));
});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private void initEthereals() {
};
var controller = new Ethereal(builder.setSigner(members.get(i)).setPid(pid).build(), 1024 * 1024,
dataSources.get(member), blocker, ep -> {
}, Ethereal.consumer(Integer.toString(i)));
}, Integer.toString(i));

var gossiper = new ChRbcGossip(context, member, controller.processor(), communications.get(member), null);
gossipers.add(gossiper);
Expand Down
Loading

0 comments on commit a722994

Please sign in to comment.