From a7229942af8274591ad242b9ab123d9af5f62bf1 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Fri, 17 Nov 2023 17:50:10 -0800 Subject: [PATCH] no need to pass prioritized executor. Hides this. less chance for errors. --- .../com/salesforce/apollo/choam/CHOAM.java | 10 +- .../apollo/choam/GenesisAssembly.java | 4 +- .../com/salesforce/apollo/choam/Producer.java | 4 +- .../apollo/choam/GenesisAssemblyTest.java | 70 +++--- .../apollo/choam/ViewAssemblyTest.java | 2 +- .../salesforce/apollo/ethereal/Ethereal.java | 227 ++++++++---------- .../apollo/ethereal/EtherealTest.java | 19 +- grpc/src/main/proto/pal.proto | 29 --- 8 files changed, 155 insertions(+), 210 deletions(-) delete mode 100644 grpc/src/main/proto/pal.proto diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index afd60e7716..d47f05e4ad 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -70,7 +70,6 @@ public class CHOAM { private final AtomicReference checkpoint = new AtomicReference<>(); private final ReliableBroadcaster combine; private final CommonCommunications comm; - private final ThreadPoolExecutor consumer; private final AtomicReference current = new AtomicReference<>(); private final ExecutorService executions; private final AtomicReference> futureBootstrap = new AtomicReference<>(); @@ -136,7 +135,6 @@ public CHOAM(Parameters params) { params.context().timeToLive()); combine.register(i -> roundScheduler.tick()); session = new Session(params, service()); - consumer = Ethereal.consumer("CHOAM" + params.member().getId() + params.context().getId()); } public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize) { @@ -1199,7 +1197,7 @@ private class Associate extends Administration { params.member().getId()); Signer signer = new SignerImpl(nextView.consensusKeyPair.getPrivate()); viewContext = new ViewContext(context, params, signer, validators, constructBlock()); - producer = new Producer(viewContext, head.get(), checkpoint.get(), comm, consumer); + producer = new Producer(viewContext, head.get(), checkpoint.get(), comm, getLabel()); producer.start(); } @@ -1244,7 +1242,7 @@ private Formation() { params.member().getId()); Signer signer = new SignerImpl(c.consensusKeyPair.getPrivate()); ViewContext vc = new GenesisContext(formation, params, signer, constructBlock()); - assembly = new GenesisAssembly(vc, comm, next.get().member, consumer); + assembly = new GenesisAssembly(vc, comm, next.get().member, getLabel()); nextViewId.set(params.genesisViewId()); } else { log.trace("No formation on: {}", params.member().getId()); @@ -1315,6 +1313,10 @@ public boolean validate(HashedCertifiedBlock hb) { } } + private String getLabel() { + return "CHOAM" + params.member().getId() + params.context().getId(); + } + /** a synchronizer of the current committee */ private class Synchronizer implements Committee { private final Map validators; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java index c0febd6a3c..f3f5b0a5a4 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/GenesisAssembly.java @@ -66,7 +66,7 @@ public class GenesisAssembly implements Genesis { private volatile HashedBlock reconfiguration; public GenesisAssembly(ViewContext vc, CommonCommunications comms, ViewMember genesisMember, - ThreadPoolExecutor executor) { + String label) { view = vc; ds = new OneShot(); nextAssembly = Committee.viewMembersOf(view.context().getId(), params().context()) @@ -103,7 +103,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications comms, config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId()); controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(), (preblock, last) -> transitions.process(preblock, last), - epoch -> transitions.nextEpoch(epoch), executor); + epoch -> transitions.nextEpoch(epoch), label); coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(), params().metrics() == null ? null : params().metrics().getGensisMetrics()); log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(), diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 43c9ace344..4bae00fdcd 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -184,7 +184,7 @@ public void startProduction() { private final ViewContext view; public Producer( ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, - CommonCommunications comms, ThreadPoolExecutor consumer) { + CommonCommunications comms, String label) { assert view != null; this.view = view; this.previousBlock.set(lastBlock); @@ -227,7 +227,7 @@ public Producer( ViewContext view, HashedBlock lastBlock, HashedBlock checkpoin var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics(); controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, (preblock, last) -> transitions.create(preblock, last), epoch -> newEpoch(epoch), - consumer); + label); coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(), params().communications(), producerMetrics); log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId()); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java index 02ec4a11fa..235669f0c1 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java @@ -22,7 +22,6 @@ import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.crypto.Signer; -import com.salesforce.apollo.ethereal.Ethereal; import com.salesforce.apollo.membership.Context; import com.salesforce.apollo.membership.ContextImpl; import com.salesforce.apollo.membership.Member; @@ -42,7 +41,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -67,19 +65,23 @@ public void genesis() throws Exception { Digest viewId = DigestAlgorithm.DEFAULT.getOrigin().prefix(2); int cardinality = 5; var entropy = SecureRandom.getInstance("SHA1PRNG"); - entropy.setSeed(new byte[]{6, 6, 6}); + entropy.setSeed(new byte[] { 6, 6, 6 }); var stereotomy = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), entropy); - List members = IntStream.range(0, cardinality).mapToObj(i -> stereotomy.newIdentifier()).map(cpk -> new ControlledIdentifierMember(cpk)).map(e -> (Member) e).toList(); + List members = IntStream.range(0, cardinality) + .mapToObj(i -> stereotomy.newIdentifier()) + .map(cpk -> new ControlledIdentifierMember(cpk)) + .map(e -> (Member) e) + .toList(); Context base = new ContextImpl<>(viewId, members.size(), 0.2, 3); base.activate(members); Context committee = Committee.viewFor(viewId, base); Parameters.Builder params = Parameters.newBuilder() - .setProducer(ProducerParameters.newBuilder() - .setGossipDuration(Duration.ofMillis(100)) - .build()) - .setGossipDuration(Duration.ofMillis(10)); + .setProducer(ProducerParameters.newBuilder() + .setGossipDuration(Duration.ofMillis(100)) + .build()) + .setGossipDuration(Duration.ofMillis(10)); Map genii = new HashMap<>(); @@ -92,10 +94,10 @@ public ViewMember answer(InvocationOnMock invocation) throws Throwable { KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair(); final PubKey consensus = bs(keyPair.getPublic()); return ViewMember.newBuilder() - .setId(m.getId().toDigeste()) - .setConsensusKey(consensus) - .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) - .build(); + .setId(m.getId().toDigeste()) + .setConsensusKey(consensus) + .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) + .build(); } }); @@ -108,27 +110,25 @@ public ViewMember answer(InvocationOnMock invocation) throws Throwable { })); CountDownLatch complete = new CountDownLatch(committee.activeCount()); var comms = members.stream() - .collect(Collectors.toMap(m -> m, - m -> communications.get(m) - .create(m, base.getId(), servers.get(m), - servers.get(m) - .getClass() - .getCanonicalName(), - r -> new TerminalServer(communications.get(m) - .getClientIdentityProvider(), - null, r), - TerminalClient.getCreate(null), - Terminal.getLocalLoopback((SigningMember) m, - servers.get(m))))); + .collect(Collectors.toMap(m -> m, m -> communications.get(m) + .create(m, base.getId(), servers.get(m), + servers.get(m) + .getClass() + .getCanonicalName(), + r -> new TerminalServer( + communications.get(m) + .getClientIdentityProvider(), + null, r), + TerminalClient.getCreate(null), + Terminal.getLocalLoopback( + (SigningMember) m, + servers.get(m))))); committee.active().forEach(m -> { SigningMember sm = (SigningMember) m; Router router = communications.get(m); params.getProducer().ethereal().setSigner(sm); - var built = params.build(RuntimeParameters.newBuilder() - .setContext(base) - .setMember(sm) - .setCommunications(router) - .build()); + var built = params.build( + RuntimeParameters.newBuilder().setContext(base).setMember(sm).setCommunications(router).build()); BlockProducer reconfigure = new BlockProducer() { @Override @@ -139,7 +139,7 @@ public Block checkpoint() { @Override public Block genesis(Map joining, Digest nextViewId, HashedBlock previous) { return CHOAM.genesis(viewId, joining, previous, committee, previous, built, previous, - Collections.emptyList()); + Collections.emptyList()); } @Override @@ -168,11 +168,11 @@ public Block reconfigure(Map joining, Digest nextViewId, HashedBlo KeyPair keyPair = params.getViewSigAlgorithm().generateKeyPair(); final PubKey consensus = bs(keyPair.getPublic()); var vm = ViewMember.newBuilder() - .setId(m.getId().toDigeste()) - .setConsensusKey(consensus) - .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) - .build(); - genii.put(m, new GenesisAssembly(view, comms.get(m), vm, Ethereal.consumer(m.getId().toString()))); + .setId(m.getId().toDigeste()) + .setConsensusKey(consensus) + .setSignature(((Signer) m).sign(consensus.toByteString()).toSig()) + .build(); + genii.put(m, new GenesisAssembly(view, comms.get(m), vm, m.getId().toString())); }); try { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java index 1fc124391f..36d8cf008c 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/ViewAssemblyTest.java @@ -193,7 +193,7 @@ private void initEthereals() { }; var controller = new Ethereal(builder.setSigner(members.get(i)).setPid(pid).build(), 1024 * 1024, dataSources.get(member), blocker, ep -> { - }, Ethereal.consumer(Integer.toString(i))); + }, Integer.toString(i)); var gossiper = new ChRbcGossip(context, member, controller.processor(), communications.get(member), null); gossipers.add(gossiper); diff --git a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java index 4087621dec..3885878616 100644 --- a/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java +++ b/ethereal/src/main/java/com/salesforce/apollo/ethereal/Ethereal.java @@ -6,27 +6,6 @@ */ package com.salesforce.apollo.ethereal; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.protobuf.ByteString; import com.salesfoce.apollo.ethereal.proto.Gossip; import com.salesfoce.apollo.ethereal.proto.Missing; @@ -37,69 +16,67 @@ import com.salesforce.apollo.ethereal.EpochProofBuilder.sharesDB; import com.salesforce.apollo.ethereal.linear.Extender; import com.salesforce.apollo.ethereal.linear.TimingRound; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; /** - * * @author hal.hildebrand - * */ public class Ethereal { - record epoch(int id, Dag dag, Adder adder, AtomicBoolean more) { - - public void close() { - adder.close(); - more.set(false); - } - - public void noMoreUnits() { - more.set(false); - } - } - - private record epochWithNewer(epoch epoch, boolean newer) { - - public void noMoreUnits() { - if (epoch != null) { - epoch.noMoreUnits(); - } - } + private static final Logger log = LoggerFactory.getLogger(Ethereal.class); + private final Config config; + private final ThreadPoolExecutor consumer; + private final Creator creator; + private final AtomicInteger currentEpoch = new AtomicInteger(-1); + private final Map epochs = new ConcurrentHashMap<>(); + private final Set failed = new ConcurrentSkipListSet<>(); + private final Queue lastTiming; + private final int maxSerializedSize; + private final Consumer newEpochAction; + private final AtomicBoolean started = new AtomicBoolean(); + private final Consumer> toPreblock; + public Ethereal(Config config, int maxSerializedSize, DataSource ds, BiConsumer, Boolean> blocker, + Consumer newEpochAction, String label) { + this(config, maxSerializedSize, ds, blocker(blocker, config), newEpochAction, label); } - - private record UnitTask(Unit unit, Consumer consumer) implements Runnable, Comparable { - - @Override - public int compareTo(UnitTask o) { - var comp = Integer.compare(unit.epoch(), o.unit.epoch()); - if (comp < 0 || comp > 0) { - return comp; - } - comp = Integer.compare(unit.height(), o.unit.height()); - if (comp < 0 || comp > 0) { - return comp; - } - return Integer.compare(unit.creator(), o.unit.creator()); - } - - @Override - public void run() { - consumer.accept(unit); + public Ethereal(Config conf, int maxSerializedSize, DataSource ds, Consumer> toPreblock, + Consumer newEpochAction, String label) { + if (!Dag.validate(conf.nProc())) { + throw new IllegalArgumentException("Invalid # of processes, unable to build quorum: " + conf.nProc()); } + this.config = conf; + this.lastTiming = new LinkedBlockingDeque<>(); + this.toPreblock = toPreblock; + this.newEpochAction = newEpochAction; + this.maxSerializedSize = maxSerializedSize; + this.consumer = consumer(label); + creator = new Creator(config, ds, lastTiming, u -> { + assert u.creator() == config.pid(); + log.trace("Sending: {} on: {}", u, config.logLabel()); + insert(u); + }, epoch -> new epochProofImpl(config, epoch, new sharesDB(config, new ConcurrentHashMap<>()))); } - private static final Logger log = LoggerFactory.getLogger(Ethereal.class); - - public static ThreadPoolExecutor consumer(String label) { + private static ThreadPoolExecutor consumer(String label) { return new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), Thread.ofVirtual().name("Ethereal Consumer[" + label + "]").factory(), (r, t) -> log.trace("Shutdown, cannot consume unit", t)); } /** - * return a preblock from a slice of units containing a timing round. It assumes - * that the timing unit is the last unit in the slice, and that random source - * data of the timing unit starts with random bytes from the previous level. + * return a preblock from a slice of units containing a timing round. It assumes that the timing unit is the last + * unit in the slice, and that random source data of the timing unit starts with random bytes from the previous + * level. */ public static List toList(List round) { var data = new ArrayList(); @@ -135,42 +112,6 @@ private static Consumer> blocker(BiConsumer, Boolean }; } - private final Config config; - private final ThreadPoolExecutor consumer; - private final Creator creator; - private final AtomicInteger currentEpoch = new AtomicInteger(-1); - private final Map epochs = new ConcurrentHashMap<>(); - private final Set failed = new ConcurrentSkipListSet<>(); - private final Queue lastTiming; - private final int maxSerializedSize; - private final Consumer newEpochAction; - private final AtomicBoolean started = new AtomicBoolean(); - private final Consumer> toPreblock; - - public Ethereal(Config config, int maxSerializedSize, DataSource ds, BiConsumer, Boolean> blocker, - Consumer newEpochAction, ThreadPoolExecutor consumer) { - this(config, maxSerializedSize, ds, blocker(blocker, config), newEpochAction, consumer); - } - - public Ethereal(Config conf, int maxSerializedSize, DataSource ds, Consumer> toPreblock, - Consumer newEpochAction, ThreadPoolExecutor consumer) { - if (!Dag.validate(conf.nProc())) { - throw new IllegalArgumentException("Invalid # of processes, unable to build quorum: " + conf.nProc()); - } - this.config = conf; - this.lastTiming = new LinkedBlockingDeque<>(); - this.toPreblock = toPreblock; - this.newEpochAction = newEpochAction; - this.maxSerializedSize = maxSerializedSize; - this.consumer = consumer; - - creator = new Creator(config, ds, lastTiming, u -> { - assert u.creator() == config.pid(); - log.trace("Sending: {} on: {}", u, config.logLabel()); - insert(u); - }, epoch -> new epochProofImpl(config, epoch, new sharesDB(config, new ConcurrentHashMap<>()))); - } - public String dump() { var builder = new StringBuffer(); builder.append("****************************").append('\n'); @@ -216,9 +157,8 @@ public Update gossip(Gossip gossip) { .filter(e -> e.getKey() >= current) .filter(e -> !haves.contains(e.getKey())) .forEach(e -> { - builder.addMissings(Missing.newBuilder() - .setEpoch(e.getKey()) - .setHaves(e.getValue().adder().have())); + builder.addMissings( + Missing.newBuilder().setEpoch(e.getKey()).setHaves(e.getValue().adder().have())); }); return builder.build(); } @@ -290,8 +230,8 @@ private epoch createEpoch(int epoch) { final var current = lastTU.get(); final var next = ext.chooseNextTimingUnits(current, handleTimingRounds); if (!lastTU.compareAndSet(current, next)) { - throw new IllegalStateException(String.format("LastTU has been changed underneath us, expected: %s have: %s", - current, next)); + throw new IllegalStateException( + String.format("LastTU has been changed underneath us, expected: %s have: %s", current, next)); } consumer.execute(new UnitTask(u, unit -> { @@ -332,12 +272,10 @@ private epochWithNewer getEpoch(int epoch) { } /** - * Waits for ordered round of units produced by Extenders and produces Preblocks - * based on them. Since Extenders in multiple epochs can supply ordered rounds - * simultaneously, handleTimingRounds needs to ensure that Preblocks are - * produced in ascending order with respect to epochs. For the last ordered - * round of the epoch, the timing unit defining it is sent to the creator (to - * produce signature shares.) + * Waits for ordered round of units produced by Extenders and produces Preblocks based on them. Since Extenders in + * multiple epochs can supply ordered rounds simultaneously, handleTimingRounds needs to ensure that Preblocks are + * produced in ascending order with respect to epochs. For the last ordered round of the epoch, the timing unit + * defining it is sent to the creator (to produce signature shares.) */ private Consumer> handleTimingRounds() { AtomicInteger current = new AtomicInteger(0); @@ -359,9 +297,9 @@ private Consumer> handleTimingRounds() { } /** - * insert puts the provided unit directly into the corresponding epoch. If such - * epoch does not exist, creates it. All correctness checks (epoch proof, adder, - * dag checks) are skipped. This method is meant for our own units only. + * insert puts the provided unit directly into the corresponding epoch. If such epoch does not exist, creates it. + * All correctness checks (epoch proof, adder, dag checks) are skipped. This method is meant for our own units + * only. */ private void insert(Unit unit) { if (unit.creator() != config.pid()) { @@ -379,8 +317,8 @@ private void insert(Unit unit) { } /** - * newEpoch creates and returns a new epoch object with the given EpochID. If - * such epoch already exists, returns it. + * newEpoch creates and returns a new epoch object with the given EpochID. If such epoch already exists, returns + * it. */ private epoch newEpoch(int epoch) { if (epoch >= config.numberOfEpochs()) { @@ -411,8 +349,8 @@ private epoch newEpoch(int epoch) { } /** - * newEpoch creates and returns a new epoch object with the given EpochID. If - * such epoch already exists, returns it. + * newEpoch creates and returns a new epoch object with the given EpochID. If such epoch already exists, returns + * it. */ private epoch retreiveEpoch(int epoch) { final var currentId = currentEpoch.get(); @@ -430,8 +368,8 @@ private epoch retreiveEpoch(int epoch) { } /** - * retrieveEpoch returns an epoch for the given preunit. If the preunit comes - * from a future epoch, it is checked for new epoch proof. + * retrieveEpoch returns an epoch for the given preunit. If the preunit comes from a future epoch, it is checked for + * new epoch proof. */ private epoch retrieveEpoch(PreUnit pu) { var epochId = pu.epoch(); @@ -444,4 +382,47 @@ private epoch retrieveEpoch(PreUnit pu) { } return epoch; } + + record epoch(int id, Dag dag, Adder adder, AtomicBoolean more) { + + public void close() { + adder.close(); + more.set(false); + } + + public void noMoreUnits() { + more.set(false); + } + } + + private record epochWithNewer(epoch epoch, boolean newer) { + + public void noMoreUnits() { + if (epoch != null) { + epoch.noMoreUnits(); + } + } + } + + private record UnitTask(Unit unit, Consumer consumer) implements Runnable, Comparable { + + @Override + public int compareTo(UnitTask o) { + var comp = Integer.compare(unit.epoch(), o.unit.epoch()); + if (comp < 0 || comp > 0) { + return comp; + } + comp = Integer.compare(unit.height(), o.unit.height()); + if (comp < 0 || comp > 0) { + return comp; + } + return Integer.compare(unit.creator(), o.unit.creator()); + } + + @Override + public void run() { + consumer.accept(unit); + } + + } } diff --git a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java index 4c08852a63..ccd4e3fd65 100644 --- a/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java +++ b/ethereal/src/test/java/com/salesforce/apollo/ethereal/EtherealTest.java @@ -59,11 +59,7 @@ public class EtherealTest { @Test public void context() throws Exception { - var consumers = new ArrayList(); - for (var i = 0; i < NPROC; i++) { - consumers.add(Ethereal.consumer(Integer.toString(i))); - } - one(0, consumers); + one(0); } @Test @@ -71,18 +67,14 @@ public void lots() throws Exception { if (!LARGE_TESTS) { return; } - var consumers = new ArrayList(); - for (var i = 0; i < NPROC; i++) { - consumers.add(Ethereal.consumer(Integer.toString(i))); - } for (int i = 0; i < 10; i++) { System.out.println("Iteration: " + i); - one(i, consumers); + one(i); System.out.println(); } } - private void one(int iteration, List consumers) + private void one(int iteration) throws NoSuchAlgorithmException, InterruptedException, InvalidProtocolBufferException { final var gossipPeriod = Duration.ofMillis(5); @@ -143,7 +135,7 @@ private void one(int iteration, List consumers) if (pid == 0) { System.out.println("new epoch: " + ep); } - }, consumers.get(i)); + }, "Test: " + i); var gossiper = new ChRbcGossip(context, member, controller.processor(), com, metrics); gossipers.add(gossiper); @@ -210,8 +202,7 @@ private void one(int iteration, List consumers) "Iteration: " + iteration + ", mismatch at block: " + j + " unit: " + k + " process: " + i + " expected: " + a.get(k) + " received: " + b.get(k)); } - outputOrder.add( - new String(ByteMessage.parseFrom(a.get(k)).getContents().toByteArray())); + outputOrder.add(new String(ByteMessage.parseFrom(a.get(k)).getContents().toByteArray())); } } } diff --git a/grpc/src/main/proto/pal.proto b/grpc/src/main/proto/pal.proto deleted file mode 100644 index 49d760cdb1..0000000000 --- a/grpc/src/main/proto/pal.proto +++ /dev/null @@ -1,29 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "com.salesfoce.apollo.pal.proto"; -option java_outer_classname = "ApolloPalProto"; -option objc_class_prefix = "Ap"; - -import "google/protobuf/any.proto"; - -package pal; - -service Pal { - rpc decrypt ( Encrypted ) returns (Decrypted) {} -} - -message Decrypted { - map secrets = 1; - string error = 2; -} - -message Encrypted { - map secrets = 1; -} - -message Secret { - repeated string labels = 1; - string decryptor = 2; - google.protobuf.Any encrypted = 3; -} \ No newline at end of file