diff --git a/choam/pom.xml b/choam/pom.xml
index d4c07c80e..1a91f65b4 100644
--- a/choam/pom.xml
+++ b/choam/pom.xml
@@ -30,6 +30,10 @@
org.jooq
joou
+
+ com.googlecode.concurrentlinkedhashmap
+ concurrentlinkedhashmap-lru
+
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
index 436bedeb3..d05256ed5 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
@@ -11,6 +11,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.choam.comm.*;
@@ -95,11 +96,15 @@ public class CHOAM {
private final TransSubmission txnSubmission = new TransSubmission();
private final AtomicReference view = new AtomicReference<>();
private final AtomicReference> pendingView = new AtomicReference<>();
+ private final ConcurrentLinkedHashMap> pendingViews;
public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
executions = Executors.newVirtualThreadPerTaskExecutor();
+ pendingViews = new ConcurrentLinkedHashMap.Builder>().maximumWeightedCapacity(100)
+ .initialCapacity(10)
+ .build();
nextView();
var bContext = new DelegatedContext<>(params.context());
@@ -321,10 +326,15 @@ public void nextView(Context context, Digest diadem) {
if (c != null) {
c.nextView(context);
} else {
- log.info("Acquiring new view, diadem: {} size: {} on: {}", diadem, context.size(), params.member().getId());
+ log.info("Acquiring new view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
+ params.member().getId());
params.context().setContext(context);
pendingView.set(null);
}
+
+ log.info("Pushing pending view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
+ params.member().getId());
+ pendingViews.putIfAbsent(diadem, context);
}
public void start() {
@@ -512,20 +522,6 @@ public void onFailure() {
transitions.fail();
}
- @Override
- public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) {
- final HashedCertifiedBlock v = view.get();
- var block = Block.newBuilder()
- .setHeader(
- buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(),
- checkpoint.hash, v.height(), v.hash))
- .setAssemble(assemble)
- .build();
- log.trace("Produced block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(),
- params.member().getId());
- return block;
- }
-
@Override
public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
@@ -674,22 +670,19 @@ private Supplier> pendingView() {
};
}
+ private Supplier>> pendingViews() {
+ return () -> pendingViews;
+ }
+
private void process() {
final var c = current.get();
final HashedCertifiedBlock h = head.get();
log.info("Begin block: {} hash: {} height: {} committee: {} on: {}", h.block.getBodyCase(), h.hash, h.height(),
c.getClass().getSimpleName(), params.member().getId());
switch (h.block.getBodyCase()) {
- case ASSEMBLE: {
- params.processor().beginBlock(h.height(), h.hash);
- nextViewId.set(Digest.from(h.block.getAssemble().getNextView()));
- log.info("Assembled next view id: {} on: {}", nextViewId.get(), params.member().getId());
- c.assembled();
- break;
- }
case RECONFIGURE: {
params.processor().beginBlock(h.height(), h.hash);
- reconfigure(h.block.getReconfigure());
+ reconfigure(h.hash, h.block.getReconfigure());
break;
}
case GENESIS: {
@@ -697,7 +690,7 @@ private void process() {
cancelBootstrap();
transitions.regenerated();
genesisInitialization(h, h.block.getGenesis().getInitializeList());
- reconfigure(h.block.getGenesis().getInitialView());
+ reconfigure(h.hash, h.block.getGenesis().getInitialView());
break;
}
case EXECUTIONS: {
@@ -719,9 +712,9 @@ private void process() {
params.member().getId());
}
- private void reconfigure(Reconfigure reconfigure) {
- log.info("Clearing next view id on: {}", params.member().getId());
- nextViewId.set(null);
+ private void reconfigure(Digest hash, Reconfigure reconfigure) {
+ log.info("Setting next view id: {} on: {}", hash, params.member().getId());
+ nextViewId.set(hash);
var pv = pendingView.getAndSet(null);
if (pv != null) {
// always advance view.
@@ -741,7 +734,7 @@ private void reconfigure(Reconfigure reconfigure) {
} else {
log.warn("Reconfiguration to associate failed: {} in view: {} on:{}", validators.size(),
new Digest(reconfigure.getId()), params.member().getId());
- current.set(new Client(validators, getViewId()));
+ transitions.fail();
}
} else {
current.set(new Client(validators, getViewId()));
@@ -1013,8 +1006,6 @@ public interface BlockProducer {
void onFailure();
- Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint);
-
Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint);
void publish(Digest hash, CertifiedBlock cb);
@@ -1306,16 +1297,12 @@ private class Associate extends Administration {
params.member().getId());
Signer signer = new SignerImpl(nextView.consensusKeyPair.getPrivate(), ULong.MIN);
Supplier> pv = pendingView();
- producer = new Producer(new ViewContext(context, params, pv, signer, validators, constructBlock()),
+ producer = new Producer(nextViewId.get(),
+ new ViewContext(context, params, pv, signer, validators, constructBlock()),
head.get(), checkpoint.get(), comm, getLabel());
producer.start();
}
- @Override
- public void assembled() {
- producer.assembled();
- }
-
@Override
public void complete() {
producer.stop();
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java
index c7ad7fcf4..a9c86256c 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/Committee.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/Committee.java
@@ -64,9 +64,6 @@ static Set viewMembersOf(Digest hash, Context super Member> baseContex
void accept(HashedCertifiedBlock next);
- default void assembled() {
- }
-
void complete();
boolean isMember();
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java
index dd6954b33..31219e253 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java
@@ -21,7 +21,6 @@
import com.salesforce.apollo.choam.support.TxDataSource;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
-import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.ethereal.Config;
import com.salesforce.apollo.ethereal.Config.Builder;
import com.salesforce.apollo.ethereal.Ethereal;
@@ -45,7 +44,6 @@ public class Producer {
private static final Logger log = LoggerFactory.getLogger(Producer.class);
private final AtomicBoolean assembled = new AtomicBoolean();
- private final AtomicReference assembly = new AtomicReference<>();
private final AtomicReference checkpoint = new AtomicReference<>();
private final CommonCommunications comms;
private final Ethereal controller;
@@ -61,15 +59,17 @@ public class Producer {
private final AtomicBoolean started = new AtomicBoolean(false);
private final Transitions transitions;
private final ViewContext view;
- private volatile Digest nextViewId;
+ private final Digest nextViewId;
+ private ViewAssembly assembly;
- public Producer(ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
+ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
CommonCommunications comms, String label) {
assert view != null;
this.view = view;
this.previousBlock.set(lastBlock);
this.comms = comms;
this.checkpoint.set(checkpoint);
+ this.nextViewId = nextViewId;
final Parameters params = view.params();
final var producerParams = params.producer();
@@ -112,22 +112,14 @@ public Producer(ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint,
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());
}
- public void assembled() {
- transitions.assembled();
- }
-
- public Digest getNextViewId() {
- final Digest current = nextViewId;
- return current;
- }
-
public void start() {
if (!started.compareAndSet(false, true)) {
return;
}
+ reconfigure();
final Block prev = previousBlock.get().block;
- if (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0) { // genesis block won't ever be
- // 0
+ // genesis block won't ever be 0
+ if (prev.hasReconfigure() && prev.getReconfigure().getCheckpointTarget() == 0) {
transitions.checkpoint();
} else {
transitions.start();
@@ -141,7 +133,7 @@ public void stop() {
log.trace("Closing producer for: {} on: {}", getViewId(), params().member().getId());
controller.stop();
coordinator.stop();
- final var c = assembly.get();
+ final var c = assembly;
if (c != null) {
c.stop();
}
@@ -159,11 +151,11 @@ public SubmitResult submit(Transaction transaction) {
}
}
- private void addReassemble(Reassemble r) {
- if (ds.offer(r)) {
- log.trace("Adding joins: {} on: {}", r.getMembersList(), params().member().getId());
+ private void addJoin(SignedJoin signedJoin) {
+ if (ds.offer(signedJoin)) {
+ log.trace("Adding on: {}", params().member().getId());
} else {
- log.trace("Cannot add joins: {} on: {}", r.getMembersCount(), params().member().getId());
+ log.trace("Cannot add join on: {}", params().member().getId());
}
}
@@ -189,13 +181,13 @@ private void create(List preblock, boolean last) {
.filter(p -> p.witnesses.size() >= params().majority())
.forEach(this::publish);
- var joins = aggregate.stream().flatMap(e -> e.getJoinsList().stream()).filter(j -> validate(j)).toList();
- final var ass = assembly.get();
+ var joins = aggregate.stream().flatMap(e -> e.getJoinsList().stream()).filter(view::validate).toList();
+ final var ass = assembly;
if (ass != null) {
- log.trace("Consuming joins: {} on: {}", aggregate.size(), joins.size(), params().member().getId());
+ log.trace("Consuming {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId());
ass.inbound().accept(joins);
} else {
- log.trace("Pending joins: {} on: {}", aggregate.size(), joins.size(), params().member().getId());
+ log.trace("Pending {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId());
pendingJoins.addAll(joins);
}
@@ -254,30 +246,6 @@ private void processPendingValidations(HashedBlock block, PendingBlock p) {
}
}
- private void produceAssemble() {
- final var vlb = previousBlock.get();
- nextViewId = vlb.hash;
- for (var m : Committee.viewMembersOf(nextViewId, view.pendingView())) {
- nextAssembly.put(m.getId(), m);
- }
- log.debug("Assembling: {} on: {}", nextViewId, params().member().getId());
- final var assemble = new HashedBlock(params().digestAlgorithm(), view.produce(vlb.height().add(1), vlb.hash,
- Assemble.newBuilder()
- .setNextView(
- vlb.hash.toDigeste())
- .build(),
- checkpoint.get()));
- previousBlock.set(assemble);
- final var validation = view.generateValidation(assemble);
- final var p = new PendingBlock(assemble, new HashMap<>(), new AtomicBoolean());
- pending.put(assemble.hash, p);
- p.witnesses.put(params().member(), validation);
- ds.offer(validation);
- log.debug("Produced block: {} hash: {} height: {} from: {} on: {}", assemble.block.getBodyCase(), assemble.hash,
- assemble.height(), getViewId(), params().member().getId());
- processPendingValidations(assemble, p);
- }
-
private void publish(PendingBlock p) {
assert p.witnesses.size() >= params().majority() : "Publishing non majority block";
log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
@@ -291,26 +259,23 @@ private void publish(PendingBlock p) {
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), cb));
}
- private boolean validate(SignedJoin join) {
- var mid = Digest.from(join.getMember());
- var m = nextAssembly.get(mid);
- if (m == null) {
- log.trace("Cannot validate join view: {} of: {} signed by: {} on: {}",
- Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid,
- params().member().getId());
- return false;
- }
- var validated = m.verify(JohnHancock.from(join.getSignature()), join.getJoin().toByteString());
- if (!validated) {
- log.trace("Cannot validate view join: {} of: {} signed by: {} on: {}",
- Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid,
- params().member().getId());
- } else {
- log.trace("Validated view join: {} of: {} signed by: {} on: {}",
- Digest.from(join.getJoin().getVm().getView()), Digest.from(join.getJoin().getVm().getId()), mid,
- params().member().getId());
- }
- return validated;
+ private void reconfigure() {
+ log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId());
+ assembly = new ViewAssembly(nextViewId, view, Producer.this::addJoin, comms) {
+ @Override
+ public void complete() {
+ super.complete();
+ log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(),
+ params().member().getId());
+ assembled.set(true);
+ Producer.this.transitions.viewComplete();
+ }
+ };
+ assembly.start();
+ assembly.assembled();
+ var joins = new ArrayList<>(pendingJoins);
+ pendingJoins.clear();
+ assembly.inbound().accept(joins);
}
private PendingBlock validate(Validate v) {
@@ -345,7 +310,7 @@ public void assembled() {
log.debug("assembly already complete on: {}", params().member().getId());
return;
}
- final var slate = assembly.get().getSlate();
+ final var slate = assembly.getSlate();
var reconfiguration = new HashedBlock(params().digestAlgorithm(),
view.reconfigure(slate, nextViewId, previousBlock.get(),
checkpoint.get()));
@@ -368,7 +333,7 @@ public void checkAssembly() {
if (dropped != 0) {
log.warn("Dropped txns: {} on: {}", dropped, params().member().getId());
}
- final var viewAssembly = assembly.get();
+ final var viewAssembly = assembly;
if (viewAssembly == null) {
log.error("Assemble block never processed on: {}", params().member().getId());
transitions.failed();
@@ -420,31 +385,6 @@ public void fail() {
view.onFailure();
}
- @Override
- public void produceAssemble() {
- Producer.this.produceAssemble();
- }
-
- @Override
- public void reconfigure() {
- log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId());
- assembly.set(new ViewAssembly(nextViewId, view, Producer.this::addReassemble, comms) {
- @Override
- public void complete() {
- super.complete();
- log.debug("View reconfiguration: {} gathered: {} complete on: {}", nextViewId, getSlate().size(),
- params().member().getId());
- assembled.set(true);
- Producer.this.transitions.viewComplete();
- }
- });
- assembly.get().start();
- assembly.get().assembled();
- var joins = new ArrayList<>(pendingJoins);
- pendingJoins.clear();
- assembly.get().inbound().accept(joins);
- }
-
@Override
public void startProduction() {
log.debug("Starting production for: {} on: {}", getViewId(), params().member().getId());
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
index 1cfc03a25..847aac3c7 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
@@ -13,7 +13,6 @@
import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure;
import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions;
import com.salesforce.apollo.choam.proto.Join;
-import com.salesforce.apollo.choam.proto.Reassemble;
import com.salesforce.apollo.choam.proto.SignedJoin;
import com.salesforce.apollo.choam.proto.SignedViewMember;
import com.salesforce.apollo.context.Context;
@@ -55,7 +54,7 @@ public class ViewAssembly {
private final AtomicBoolean cancelSlice = new AtomicBoolean();
private final Digest nextViewId;
private final Map proposals = new ConcurrentHashMap<>();
- private final Consumer publisher;
+ private final Consumer publisher;
private final Map slate = new ConcurrentSkipListMap<>();
private final ViewContext view;
private final CommonCommunications comms;
@@ -63,7 +62,7 @@ public class ViewAssembly {
new ConcurrentSkipListMap<>());
private volatile Map nextAssembly;
- public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher,
+ public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher,
CommonCommunications comms) {
view = vc;
this.nextViewId = nextViewId;
@@ -118,23 +117,17 @@ Consumer> inbound() {
};
}
- private void completeSlice(AtomicReference retryDelay, AtomicReference reiterate) {
+ private void completeSlice(Duration retryDelay, AtomicReference reiterate) {
if (gathered()) {
return;
}
- final var delay = retryDelay.get();
- if (delay.compareTo(params().producer().maxGossipDelay()) < 0) {
- retryDelay.accumulateAndGet(Duration.ofMillis(100), Duration::plus);
- }
-
- log.trace("Proposal incomplete of: {} polled: {}, total: {} majority: {}, retrying: {} on: {}", nextViewId,
- polled.stream().sorted().toList(), nextAssembly.size(), params().majority(), delay,
- params().member().getId());
+ log.trace("Proposal incomplete of: {} polled: {}, total: {} retrying: {} on: {}", nextViewId,
+ polled.stream().sorted().toList(), nextAssembly.size(), retryDelay, params().member().getId());
if (!cancelSlice.get()) {
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())
- .schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reiterate.get(), log)), delay.toMillis(),
- TimeUnit.MILLISECONDS);
+ .schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reiterate.get(), log)),
+ retryDelay.toNanos(), TimeUnit.NANOSECONDS);
}
}
@@ -217,16 +210,25 @@ private void join(SignedViewMember svm, boolean direct) {
}
return;
}
+ polled.add(mid);
if (proposals.putIfAbsent(mid, svm) == null) {
- if (log.isTraceEnabled()) {
- log.trace("Adding view member: {} on: {}", ViewContext.print(svm, params().digestAlgorithm()),
- params().member().getId());
- }
if (direct) {
- publisher.accept(Reassemble.newBuilder().addMembers(svm).build());
+ var sig = params().member().sign(svm.toByteString()).toSig();
+ publisher.accept(SignedJoin.newBuilder()
+ .setJoin(svm)
+ .setMember(params().member().getId().toDigeste())
+ .setSignature(sig)
+ .build());
+ if (log.isTraceEnabled()) {
+ log.trace("Publishing view member: {} sig: {} on: {}",
+ ViewContext.print(svm, params().digestAlgorithm()),
+ params().digestAlgorithm().digest(sig.toByteString()), params().member().getId());
+ }
+ } else if (log.isTraceEnabled()) {
+ log.trace("Adding discovered view member: {} on: {}",
+ ViewContext.print(svm, params().digestAlgorithm()), params().member().getId());
}
}
- polled.add(mid);
}
private Parameters params() {
@@ -262,13 +264,12 @@ public void elect() {
proposals.entrySet().stream().forEach(e -> slate.put(e.getKey(), joinOf(e.getValue())));
if (slate.size() == view.pendingView().getRingCount()) {
cancelSlice.set(true);
- log.debug("Electing: {} of: {} slate: {} proposals: {} on: {}", slate.size(), nextViewId,
- slate.keySet().stream().sorted().toList(), proposals.keySet().stream().sorted().toList(),
- params().member().getId());
+ log.debug("Electing: {} of: {} slate: {} on: {}", slate.size(), nextViewId,
+ slate.keySet().stream().sorted().toList(), params().member().getId());
transitions.complete();
} else {
Context memberContext = view.pendingView();
- log.error("Failed election, required: {} slate: {} of: {} on: {}", memberContext.majority(),
+ log.error("Failed election, required: {} slate: {} of: {} on: {}", view.pendingView().getRingCount(),
proposals.keySet().stream().sorted().toList(), nextViewId, params().member().getId());
}
}
@@ -284,7 +285,7 @@ public void failed() {
public void gather() {
log.trace("Gathering assembly for: {} on: {}", nextViewId, params().member().getId());
AtomicReference reiterate = new AtomicReference<>();
- AtomicReference retryDelay = new AtomicReference<>(Duration.ofMillis(10));
+ var retryDelay = Duration.ofMillis(10);
reiterate.set(() -> {
nextAssembly = Committee.viewMembersOf(nextViewId, view.pendingView())
.stream()
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java
index 1a64d79ae..d7aa35adc 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java
@@ -144,10 +144,6 @@ public Context pendingView() {
return pendingView.get();
}
- public Block produce(ULong l, Digest hash, Assemble assemble, HashedBlock checkpoint) {
- return blockProducer.produce(l, hash, assemble, checkpoint);
- }
-
public Block produce(ULong l, Digest hash, Executions executions, HashedBlock checkpoint) {
return blockProducer.produce(l, hash, executions, checkpoint);
}
@@ -191,12 +187,35 @@ public boolean validate(SignedViewMember svm, Validate validate) {
return valid;
}
+ public boolean validate(SignedJoin join) {
+ if (true) {
+ return true;
+ }
+ Verifier v = verifierOf(join);
+ if (v == null) {
+ log.debug("no verifier: {} for join: {} on: {}", Digest.from(join.getMember()),
+ Digest.from(join.getJoin().getVm().getId()), params.member().getId());
+ return false;
+ }
+ var validated = v.verify(JohnHancock.from(join.getSignature()), join.getJoin().toByteString());
+ if (!validated) {
+ log.trace("Cannot validate view join: [{}] sig: {} signed by: {} on: {}",
+ print(join.getJoin(), params.digestAlgorithm()),
+ params.digestAlgorithm().digest(join.getSignature().toByteString()),
+ Digest.from(join.getMember()), params().member().getId());
+ } else {
+ log.trace("Validated view join: [{}] signed by: {} on: {}", print(join.getJoin(), params.digestAlgorithm()),
+ Digest.from(join.getMember()), params().member().getId());
+ }
+ return validated;
+ }
+
protected Verifier verifierOf(Validate validate) {
final var mid = Digest.from(validate.getWitness().getId());
var m = context.getMember(mid);
if (m == null) {
if (log.isDebugEnabled()) {
- log.debug("Unable to validate key by non existent validator: [{}] on: {}",
+ log.debug("Unable to get verifier by non existent member: [{}] on: {}",
print(validate, params.digestAlgorithm()), params.member().getId());
}
return null;
@@ -204,11 +223,36 @@ protected Verifier verifierOf(Validate validate) {
Verifier v = validators.get(m);
if (v == null) {
if (log.isDebugEnabled()) {
- log.debug("Unable to validate key by non existent validator: [{}] on: {}",
+ log.debug("Unable to get verifier by non existent validator: [{}] on: {}",
print(validate, params.digestAlgorithm()), params.member().getId());
}
return null;
}
return v;
}
+
+ protected Verifier verifierOf(SignedJoin sj) {
+ final var mid = Digest.from(sj.getMember());
+ var m = context.getMember(mid);
+ if (m == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Unable to get verifier by non existent member: [{}] on: {}",
+ String.format("id: %s sig: %s", Digest.from(sj.getMember()),
+ params.digestAlgorithm().digest(sj.getSignature().toByteString())),
+ params.member().getId());
+ }
+ return null;
+ }
+ Verifier v = validators.get(m);
+ if (v == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Unable to validate key by non existent validator: [{}] on: {}",
+ String.format("id: %s sig: %s", Digest.from(sj.getMember()),
+ params.digestAlgorithm().digest(sj.getSignature().toByteString())),
+ params.member().getId());
+ }
+ return null;
+ }
+ return v;
+ }
}
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java
index d0f8c43e8..88ae2bada 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java
@@ -33,20 +33,10 @@ public interface Driven {
void fail();
- void produceAssemble();
-
- void reconfigure();
-
void startProduction();
enum Earner implements Driven.Transitions {
AWAIT_VIEW {
- @Override
- public Transitions assembled() {
- context().assembled();
- return null;
- }
-
@Entry
public void checkAssembly() {
context().checkAssembly();
@@ -125,21 +115,9 @@ public void terminate() {
context().fail();
}
}, SPICE {
- @Override
- public Transitions assembled() {
- context().reconfigure();
- return null;
- }
-
@Override
public Transitions newEpoch(int epoch, int lastEpoch) {
- if (lastEpoch == epoch) {
- return AWAIT_VIEW;
- }
- if (epoch == 0) {
- context().produceAssemble();
- }
- return null;
+ return (lastEpoch == epoch) ? AWAIT_VIEW : null;
}
@Entry
@@ -158,10 +136,6 @@ public Transitions viewComplete() {
interface Transitions extends FsmExecutor {
Logger log = LoggerFactory.getLogger(Transitions.class);
- default Transitions assembled() {
- return null;
- }
-
default Transitions checkpoint() {
throw fsm().invalidTransitionOn();
}
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java
index 77b62f177..f1d6129f9 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java
@@ -27,7 +27,7 @@ public interface ChoamMetrics extends EndpointMetrics {
EtherealMetrics getProducerMetrics();
- void publishedBatch(int batchSize, int byteSize, int validations, int reassemblies);
+ void publishedBatch(int batchSize, int byteSize, int validations, int joins);
void transactionCancelled();
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java
index 6ff2e0e84..71b732242 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java
@@ -35,7 +35,7 @@ public class ChoamMetricsImpl extends EndpointMetricsImpl implements ChoamMetric
private final EtherealMetrics genesisMetrics;
private final EtherealMetrics producerMetrics;
private final Histogram publishedBytes;
- private final Meter publishedReassemblies;
+ private final Meter publishedJoins;
private final Meter publishedTransactions;
private final Meter publishedValidations;
private final MetricRegistry registry;
@@ -67,7 +67,7 @@ public ChoamMetricsImpl(Digest context, MetricRegistry registry) {
cancelledTransactions = registry.meter(name(context.shortString(), "transactions.cancelled"));
publishedTransactions = registry.meter(name(context.shortString(), "transactions.published"));
publishedBytes = registry.histogram(name(context.shortString(), "unit.bytes"));
- publishedReassemblies = registry.meter(name(context.shortString(), "reassemblies.published"));
+ publishedJoins = registry.meter(name(context.shortString(), "joins.published"));
publishedValidations = registry.meter(name(context.shortString(), "validations.published"));
transactionLatency = registry.timer(name(context.shortString(), "transaction.latency"));
transactionSubmitRetry = registry.meter(name(context.shortString(), "transaction.submit.retry"));
@@ -117,11 +117,11 @@ public EtherealMetrics getProducerMetrics() {
}
@Override
- public void publishedBatch(int transactions, int byteSize, int validations, int reassemblies) {
+ public void publishedBatch(int transactions, int byteSize, int validations, int joins) {
publishedTransactions.mark(transactions);
publishedBytes.update(byteSize);
publishedValidations.mark(validations);
- publishedReassemblies.mark(reassemblies);
+ publishedJoins.mark(joins);
}
@Override
diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java
index 5acb7c4cb..4a34fc326 100644
--- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java
+++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java
@@ -7,7 +7,7 @@
package com.salesforce.apollo.choam.support;
import com.google.protobuf.ByteString;
-import com.salesforce.apollo.choam.proto.Reassemble;
+import com.salesforce.apollo.choam.proto.SignedJoin;
import com.salesforce.apollo.choam.proto.Transaction;
import com.salesforce.apollo.choam.proto.UnitData;
import com.salesforce.apollo.choam.proto.Validate;
@@ -37,13 +37,13 @@ public class TxDataSource implements DataSource {
private final static Logger log = LoggerFactory.getLogger(TxDataSource.class);
private final Duration batchInterval;
- private final AtomicBoolean draining = new AtomicBoolean();
+ private final AtomicBoolean draining = new AtomicBoolean();
private final ExponentialBackoffPolicy drainPolicy;
private final Member member;
private final ChoamMetrics metrics;
private final BatchingQueue processing;
- private final BlockingQueue reassemblies = new LinkedBlockingQueue<>();
- private final BlockingQueue validations = new LinkedBlockingQueue<>();
+ private final BlockingQueue joins = new LinkedBlockingQueue<>();
+ private final BlockingQueue validations = new LinkedBlockingQueue<>();
private volatile Thread blockingThread;
public TxDataSource(Member member, int maxElements, ChoamMetrics metrics, int maxBatchByteSize,
@@ -63,10 +63,10 @@ public void close() {
}
blockingThread = null;
if (metrics != null) {
- metrics.dropped(processing.size(), validations.size(), reassemblies.size());
+ metrics.dropped(processing.size(), validations.size(), joins.size());
}
log.trace("Closing with remaining txns: {}({}:{}) validations: {} reassemblies: {} on: {}", processing.size(),
- processing.added(), processing.taken(), validations.size(), reassemblies.size(), member.getId());
+ processing.added(), processing.taken(), validations.size(), joins.size(), member.getId());
}
public void drain() {
@@ -81,23 +81,23 @@ public ByteString getData() {
log.trace("Requesting unit data on: {}", member.getId());
blockingThread = Thread.currentThread();
try {
- var r = new ArrayList();
+ var r = new ArrayList();
var v = new ArrayList();
if (draining.get()) {
var target = Instant.now().plus(drainPolicy.nextBackoff());
- while (target.isAfter(Instant.now()) && builder.getReassembliesCount() == 0
+ while (target.isAfter(Instant.now()) && builder.getJoinsCount() == 0
&& builder.getValidationsCount() == 0) {
// rinse and repeat
- r = new ArrayList();
- reassemblies.drainTo(r);
- builder.addAllReassemblies(r);
+ r = new ArrayList<>();
+ joins.drainTo(r);
+ builder.addAllJoins(r);
v = new ArrayList();
validations.drainTo(v);
builder.addAllValidations(v);
- if (builder.getReassembliesCount() != 0 || builder.getValidationsCount() != 0) {
+ if (builder.getJoinsCount() != 0 || builder.getValidationsCount() != 0) {
break;
}
@@ -122,9 +122,9 @@ public ByteString getData() {
}
// One more time into ye breech
- r = new ArrayList();
- reassemblies.drainTo(r);
- builder.addAllReassemblies(r);
+ r = new ArrayList<>();
+ joins.drainTo(r);
+ builder.addAllJoins(r);
v = new ArrayList();
validations.drainTo(v);
@@ -133,11 +133,11 @@ public ByteString getData() {
ByteString bs = builder.build().toByteString();
if (metrics != null) {
metrics.publishedBatch(builder.getTransactionsCount(), bs.size(), builder.getValidationsCount(),
- builder.getReassembliesCount());
+ builder.getJoinsCount());
}
- log.trace("Unit data: {} txns, {} validations, {} reassemblies totalling: {} bytes on: {}",
- builder.getTransactionsCount(), builder.getValidationsCount(), builder.getReassembliesCount(),
- bs.size(), member.getId());
+ log.trace("Unit data: {} txns, {} validations, {} joins totalling: {} bytes on: {}",
+ builder.getTransactionsCount(), builder.getValidationsCount(), builder.getJoinsCount(), bs.size(),
+ member.getId());
return bs;
} finally {
blockingThread = null;
@@ -145,7 +145,7 @@ public ByteString getData() {
}
public int getRemainingReassemblies() {
- return reassemblies.size();
+ return joins.size();
}
public int getRemainingTransactions() {
@@ -156,8 +156,8 @@ public int getRemainingValidations() {
return validations.size();
}
- public boolean offer(Reassemble reassembly) {
- return reassemblies.offer(reassembly);
+ public boolean offer(SignedJoin signedJoin) {
+ return joins.offer(signedJoin);
}
public boolean offer(Transaction txn) {
diff --git a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java
index 83d441916..136b7e37a 100644
--- a/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java
+++ b/choam/src/test/java/com/salesforce/apollo/choam/GenesisAssemblyTest.java
@@ -147,11 +147,6 @@ public void onFailure() {
// do nothing
}
- @Override
- public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) {
- return null;
- }
-
@Override
public Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint) {
return null;
diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto
index 5c5c7416c..21d653cb8 100644
--- a/grpc/src/main/proto/choam.proto
+++ b/grpc/src/main/proto/choam.proto
@@ -47,7 +47,6 @@ message Block {
Reconfigure reconfigure = 3;
Checkpoint checkpoint = 4;
Executions executions = 5;
- Assemble assemble = 6;
}
}
@@ -84,10 +83,6 @@ message Executions {
repeated Transaction executions = 1;
}
-message Assemble {
- crypto.Digeste nextView = 1;
-}
-
message FoundationSeal {
stereotomy.KeyEvent_ foundation = 1;
crypto.Sig signature = 2;
@@ -103,8 +98,7 @@ message Transaction {
message UnitData {
repeated Validate validations = 1;
repeated Transaction transactions = 2;
- repeated Reassemble reassemblies = 3;
- repeated SignedJoin joins = 4;
+ repeated SignedJoin joins = 3;
}
message Join {
@@ -157,12 +151,6 @@ message Validate {
Certification witness = 2;
}
-message Reassemble {
- repeated SignedViewMember members = 1;
- repeated Validate validations = 2;
- repeated crypto.Digeste slate = 3;
-}
-
message Validations {
repeated Validate validations = 1;
}
diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java
index 4c72ce176..45a6e9e47 100644
--- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java
+++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java
@@ -258,7 +258,7 @@ private Builder params() {
.setMaxBatchCount(3000)
.build())
.setCheckpointBlockDelta(200);
- params.getProducer().ethereal().setNumberOfEpochs(4);
+ params.getProducer().ethereal().setEpochLength(7).setNumberOfEpochs(3);
return params;
}
}
diff --git a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java
index 9140f7793..509577801 100644
--- a/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java
+++ b/model/src/test/java/com/salesforce/apollo/model/FireFliesTest.java
@@ -181,7 +181,7 @@ private Builder params() {
.build())
.setCheckpointBlockDelta(200);
- params.getProducer().ethereal().setNumberOfEpochs(5);
+ params.getProducer().ethereal().setEpochLength(7).setNumberOfEpochs(3);
return params;
}
}
diff --git a/model/src/test/resources/logback-test.xml b/model/src/test/resources/logback-test.xml
index 6c5f75706..87a97a0aa 100644
--- a/model/src/test/resources/logback-test.xml
+++ b/model/src/test/resources/logback-test.xml
@@ -62,7 +62,7 @@
-
+
diff --git a/pom.xml b/pom.xml
index 977d4005d..c1a3a9a7e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -558,6 +558,11 @@
org-netbeans-modules-keyring
RELEASE200
+
+ com.googlecode.concurrentlinkedhashmap
+ concurrentlinkedhashmap-lru
+ 1.4.2
+