Skip to content

Commit

Permalink
prepare for view synchronization; moar refactoring around view change.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Apr 15, 2024
1 parent 6e1e5be commit 5f9f93a
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 59 deletions.
16 changes: 8 additions & 8 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class Producer {
private final int lastEpoch;
private final Map<Digest, Member> nextAssembly = new HashMap<>();
private final Map<Digest, PendingBlock> pending = new ConcurrentSkipListMap<>();
private final List<SignedJoin> pendingJoins = new CopyOnWriteArrayList<>();
private final List<Assemblies> pendingAssemblies = new CopyOnWriteArrayList<>();
private final Map<Digest, List<Validate>> pendingValidations = new ConcurrentSkipListMap<>();
private final AtomicReference<HashedBlock> previousBlock = new AtomicReference<>();
private final AtomicBoolean reconfigured = new AtomicBoolean();
Expand Down Expand Up @@ -151,8 +151,8 @@ public SubmitResult submit(Transaction transaction) {
}
}

private void addJoin(SignedJoin signedJoin) {
if (ds.offer(signedJoin)) {
private void addAssembly(Assemblies assemblies) {
if (ds.offer(assemblies)) {
log.trace("Adding on: {}", params().member().getId());
} else {
log.trace("Cannot add join on: {}", params().member().getId());
Expand Down Expand Up @@ -181,14 +181,14 @@ private void create(List<ByteString> preblock, boolean last) {
.filter(p -> p.witnesses.size() >= params().majority())
.forEach(this::publish);

var joins = aggregate.stream().flatMap(e -> e.getJoinsList().stream()).filter(view::validate).toList();
var joins = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList();
final var ass = assembly;
if (ass != null) {
log.trace("Consuming {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId());
ass.inbound().accept(joins);
} else {
log.trace("Pending {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId());
pendingJoins.addAll(joins);
pendingAssemblies.addAll(joins);
}

HashedBlock lb = previousBlock.get();
Expand Down Expand Up @@ -261,7 +261,7 @@ private void publish(PendingBlock p) {

private void reconfigure() {
log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId());
assembly = new ViewAssembly(nextViewId, view, Producer.this::addJoin, comms) {
assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, comms) {
@Override
public void complete() {
super.complete();
Expand All @@ -273,8 +273,8 @@ public void complete() {
};
assembly.start();
assembly.assembled();
var joins = new ArrayList<>(pendingJoins);
pendingJoins.clear();
var joins = new ArrayList<>(pendingAssemblies);
pendingAssemblies.clear();
assembly.inbound().accept(joins);
}

Expand Down
30 changes: 21 additions & 9 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.salesforce.apollo.choam.fsm.Reconfiguration;
import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure;
import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions;
import com.salesforce.apollo.choam.proto.Assemblies;
import com.salesforce.apollo.choam.proto.Join;
import com.salesforce.apollo.choam.proto.SignedJoin;
import com.salesforce.apollo.choam.proto.SignedViewMember;
Expand Down Expand Up @@ -54,15 +55,15 @@ public class ViewAssembly {
private final AtomicBoolean cancelSlice = new AtomicBoolean();
private final Digest nextViewId;
private final Map<Digest, SignedViewMember> proposals = new ConcurrentHashMap<>();
private final Consumer<SignedJoin> publisher;
private final Consumer<Assemblies> publisher;
private final Map<Digest, Join> slate = new ConcurrentSkipListMap<>();
private final ViewContext view;
private final CommonCommunications<Terminal, ?> comms;
private final Set<Digest> polled = Collections.newSetFromMap(
new ConcurrentSkipListMap<>());
private volatile Map<Digest, Member> nextAssembly;

public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer<SignedJoin> publisher,
public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer<Assemblies> publisher,
CommonCommunications<Terminal, ?> comms) {
view = vc;
this.nextViewId = nextViewId;
Expand Down Expand Up @@ -111,12 +112,17 @@ void finalElection() {
transitions.complete();
}

Consumer<List<SignedJoin>> inbound() {
Consumer<List<Assemblies>> inbound() {
return lre -> {
lre.forEach(vm -> join(vm.getJoin(), false));
lre.forEach(ass -> assemble(ass));
};
}

private void assemble(Assemblies ass) {
ass.getJoinsList().stream().filter(sj -> view.validate(sj)).forEach(sj -> join(sj.getJoin(), false));
ass.getViewsList().stream().filter(sv -> view.validate(sv));
}

private void completeSlice(Duration retryDelay, AtomicReference<Runnable> reiterate) {
if (gathered()) {
return;
Expand Down Expand Up @@ -214,10 +220,12 @@ private void join(SignedViewMember svm, boolean direct) {
if (proposals.putIfAbsent(mid, svm) == null) {
if (direct) {
var signature = view.sign(svm);
publisher.accept(SignedJoin.newBuilder()
.setJoin(svm)
.setMember(params().member().getId().toDigeste())
.setSignature(signature.toSig())
publisher.accept(Assemblies.newBuilder()
.addJoins(SignedJoin.newBuilder()
.setJoin(svm)
.setMember(params().member().getId().toDigeste())
.setSignature(signature.toSig())
.build())
.build());
if (log.isTraceEnabled()) {
log.trace("Publishing view member: {} sig: {} on: {}",
Expand Down Expand Up @@ -306,10 +314,14 @@ public void gather() {

@Override
public void nominate() {
// publisher.accept(getMemberProposal());
transitions.nominated();
}

@Override
public void viewAgreement() {

}

private Join joinOf(SignedViewMember vm) {
return Join.newBuilder().setMember(vm).build();
}
Expand Down
51 changes: 29 additions & 22 deletions choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,45 +223,52 @@ public boolean validate(SignedJoin join) {
return validated;
}

protected Verifier verifierOf(Validate validate) {
final var mid = Digest.from(validate.getWitness().getId());
var m = context.getMember(mid);
if (m == null) {
public boolean validate(SignedViews sv) {
Verifier v = verifierOf(sv);
if (v == null) {
if (log.isDebugEnabled()) {
log.debug("Unable to get verifier by non existent member: [{}] on: {}",
print(validate, params.digestAlgorithm()), params.member().getId());
log.debug("no verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()),
params.member().getId());
}
return null;
return false;
}
Verifier v = validators.get(m);
if (v == null) {
if (log.isDebugEnabled()) {
log.debug("Unable to get verifier by non existent validator: [{}] on: {}",
print(validate, params.digestAlgorithm()), params.member().getId());
var validated = v.verify(JohnHancock.from(sv.getSignature()), sv.getViews().toByteString());
if (!validated) {
if (log.isTraceEnabled()) {
log.trace("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()),
params().member().getId());
}
return null;
} else if (log.isTraceEnabled()) {
log.trace("Validated views signed by: {} on: {}", Digest.from(sv.getViews().getMember()),
params().member().getId());
}
return v;
return validated;
}

protected Verifier verifierOf(Validate validate) {
return getVerifier(context.getMember(Digest.from(validate.getWitness().getId())));
}

protected Verifier verifierOf(SignedJoin sj) {
final var mid = Digest.from(sj.getMember());
var m = context.getMember(mid);
return getVerifier(context.getMember(Digest.from(sj.getMember())));
}

protected Verifier verifierOf(SignedViews sv) {
return getVerifier(context.getMember(Digest.from(sv.getViews().getMember())));
}

private Verifier getVerifier(Member m) {
if (m == null) {
if (log.isDebugEnabled()) {
log.debug("Unable to get verifier by non existent member: [{}] on: {}",
String.format("id: %s sig: %s", Digest.from(sj.getMember()),
params.digestAlgorithm().digest(sj.getSignature().toByteString())),
log.debug("Unable to get verifier by non existent member: {} on: {}", m.getId(),
params.member().getId());
}
return null;
}
Verifier v = validators.get(m);
if (v == null) {
if (log.isDebugEnabled()) {
log.debug("Unable to validate key by non existent validator: [{}] on: {}",
String.format("id: %s sig: %s", Digest.from(sj.getMember()),
params.digestAlgorithm().digest(sj.getSignature().toByteString())),
log.debug("Unable to validate key by non existent validator: {} on: {}", m.getId(),
params.member().getId());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface Reconfiguration {

void nominate();

void viewAgreement();

enum Reconfigure implements Transitions {
AWAIT_ASSEMBLY {
@Override
Expand Down Expand Up @@ -131,6 +133,16 @@ public Transitions complete() {
public void completion() {
context().complete();
}
}, VIEW_AGREEMENT {
@Entry
public void viewConsensus() {
context().viewAgreement();
}

@Override
public Transitions viewDetermined() {
return GATHER;
}
}
}

Expand Down Expand Up @@ -162,5 +174,9 @@ default Transitions nominated() {
default Transitions validation() {
return null;
}

default Transitions viewDetermined() {
throw fsm().invalidTransitionOn();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
package com.salesforce.apollo.choam.support;

import com.google.protobuf.ByteString;
import com.salesforce.apollo.choam.proto.SignedJoin;
import com.salesforce.apollo.choam.proto.Assemblies;
import com.salesforce.apollo.choam.proto.Transaction;
import com.salesforce.apollo.choam.proto.UnitData;
import com.salesforce.apollo.choam.proto.Validate;
Expand Down Expand Up @@ -42,7 +42,7 @@ public class TxDataSource implements DataSource {
private final Member member;
private final ChoamMetrics metrics;
private final BatchingQueue<Transaction> processing;
private final BlockingQueue<SignedJoin> joins = new LinkedBlockingQueue<>();
private final BlockingQueue<Assemblies> assemblies = new LinkedBlockingQueue<>();
private final BlockingQueue<Validate> validations = new LinkedBlockingQueue<>();
private volatile Thread blockingThread;

Expand All @@ -63,10 +63,10 @@ public void close() {
}
blockingThread = null;
if (metrics != null) {
metrics.dropped(processing.size(), validations.size(), joins.size());
metrics.dropped(processing.size(), validations.size(), assemblies.size());
}
log.trace("Closing with remaining txns: {}({}:{}) validations: {} reassemblies: {} on: {}", processing.size(),
processing.added(), processing.taken(), validations.size(), joins.size(), member.getId());
log.trace("Closing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(),
processing.added(), processing.taken(), validations.size(), assemblies.size(), member.getId());
}

public void drain() {
Expand All @@ -81,23 +81,23 @@ public ByteString getData() {
log.trace("Requesting unit data on: {}", member.getId());
blockingThread = Thread.currentThread();
try {
var r = new ArrayList<SignedJoin>();
var r = new ArrayList<Assemblies>();
var v = new ArrayList<Validate>();

if (draining.get()) {
var target = Instant.now().plus(drainPolicy.nextBackoff());
while (target.isAfter(Instant.now()) && builder.getJoinsCount() == 0
while (target.isAfter(Instant.now()) && builder.getAssembliesCount() == 0
&& builder.getValidationsCount() == 0) {
// rinse and repeat
r = new ArrayList<>();
joins.drainTo(r);
builder.addAllJoins(r);
assemblies.drainTo(r);
builder.addAllAssemblies(r);

v = new ArrayList<Validate>();
validations.drainTo(v);
builder.addAllValidations(v);

if (builder.getJoinsCount() != 0 || builder.getValidationsCount() != 0) {
if (builder.getAssembliesCount() != 0 || builder.getValidationsCount() != 0) {
break;
}

Expand All @@ -123,8 +123,8 @@ public ByteString getData() {

// One more time into ye breech
r = new ArrayList<>();
joins.drainTo(r);
builder.addAllJoins(r);
assemblies.drainTo(r);
builder.addAllAssemblies(r);

v = new ArrayList<Validate>();
validations.drainTo(v);
Expand All @@ -133,19 +133,19 @@ public ByteString getData() {
ByteString bs = builder.build().toByteString();
if (metrics != null) {
metrics.publishedBatch(builder.getTransactionsCount(), bs.size(), builder.getValidationsCount(),
builder.getJoinsCount());
builder.getAssembliesCount());
}
log.trace("Unit data: {} txns, {} validations, {} joins totalling: {} bytes on: {}",
builder.getTransactionsCount(), builder.getValidationsCount(), builder.getJoinsCount(), bs.size(),
member.getId());
log.trace("Unit data: {} txns, {} validations, {} assemblies totalling: {} bytes on: {}",
builder.getTransactionsCount(), builder.getValidationsCount(), builder.getAssembliesCount(),
bs.size(), member.getId());
return bs;
} finally {
blockingThread = null;
}
}

public int getRemainingReassemblies() {
return joins.size();
return assemblies.size();
}

public int getRemainingTransactions() {
Expand All @@ -156,8 +156,8 @@ public int getRemainingValidations() {
return validations.size();
}

public boolean offer(SignedJoin signedJoin) {
return joins.offer(signedJoin);
public boolean offer(Assemblies assemblies) {
return this.assemblies.offer(assemblies);
}

public boolean offer(Transaction txn) {
Expand Down
23 changes: 22 additions & 1 deletion grpc/src/main/proto/choam.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ message Transaction {
message UnitData {
repeated Validate validations = 1;
repeated Transaction transactions = 2;
repeated SignedJoin joins = 3;
repeated Assemblies assemblies = 3;
}

message Assemblies {
repeated SignedJoin joins = 1;
repeated SignedViews views = 2;
}

message Join {
Expand All @@ -112,6 +117,22 @@ message SignedJoin {
crypto.Sig signature = 3;
}

message SignedViews {
Views views = 1;
crypto.Sig signature = 2;
}

message Views {
crypto.Digeste member = 1;
crypto.Digeste vid = 2;
repeated View views = 3;
}

message View {
crypto.Digeste diadem = 1;
repeated crypto.Digeste committee = 2;
}

message ViewMember {
crypto.Digeste id = 1;
crypto.Digeste view = 2;
Expand Down

0 comments on commit 5f9f93a

Please sign in to comment.