diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java index 08c8f597e..961fa217f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Producer.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Producer.java @@ -52,7 +52,7 @@ public class Producer { private final int lastEpoch; private final Map nextAssembly = new HashMap<>(); private final Map pending = new ConcurrentSkipListMap<>(); - private final List pendingJoins = new CopyOnWriteArrayList<>(); + private final List pendingAssemblies = new CopyOnWriteArrayList<>(); private final Map> pendingValidations = new ConcurrentSkipListMap<>(); private final AtomicReference previousBlock = new AtomicReference<>(); private final AtomicBoolean reconfigured = new AtomicBoolean(); @@ -151,8 +151,8 @@ public SubmitResult submit(Transaction transaction) { } } - private void addJoin(SignedJoin signedJoin) { - if (ds.offer(signedJoin)) { + private void addAssembly(Assemblies assemblies) { + if (ds.offer(assemblies)) { log.trace("Adding on: {}", params().member().getId()); } else { log.trace("Cannot add join on: {}", params().member().getId()); @@ -181,14 +181,14 @@ private void create(List preblock, boolean last) { .filter(p -> p.witnesses.size() >= params().majority()) .forEach(this::publish); - var joins = aggregate.stream().flatMap(e -> e.getJoinsList().stream()).filter(view::validate).toList(); + var joins = aggregate.stream().flatMap(e -> e.getAssembliesList().stream()).toList(); final var ass = assembly; if (ass != null) { log.trace("Consuming {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId()); ass.inbound().accept(joins); } else { log.trace("Pending {} units, {} joins on: {}", aggregate.size(), joins.size(), params().member().getId()); - pendingJoins.addAll(joins); + pendingAssemblies.addAll(joins); } HashedBlock lb = previousBlock.get(); @@ -261,7 +261,7 @@ private void publish(PendingBlock p) { private void reconfigure() { log.debug("Starting view reconfiguration: {} on: {}", nextViewId, params().member().getId()); - assembly = new ViewAssembly(nextViewId, view, Producer.this::addJoin, comms) { + assembly = new ViewAssembly(nextViewId, view, Producer.this::addAssembly, comms) { @Override public void complete() { super.complete(); @@ -273,8 +273,8 @@ public void complete() { }; assembly.start(); assembly.assembled(); - var joins = new ArrayList<>(pendingJoins); - pendingJoins.clear(); + var joins = new ArrayList<>(pendingAssemblies); + pendingAssemblies.clear(); assembly.inbound().accept(joins); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index ec33dabea..2db8e9d6f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -12,6 +12,7 @@ import com.salesforce.apollo.choam.fsm.Reconfiguration; import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure; import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions; +import com.salesforce.apollo.choam.proto.Assemblies; import com.salesforce.apollo.choam.proto.Join; import com.salesforce.apollo.choam.proto.SignedJoin; import com.salesforce.apollo.choam.proto.SignedViewMember; @@ -54,7 +55,7 @@ public class ViewAssembly { private final AtomicBoolean cancelSlice = new AtomicBoolean(); private final Digest nextViewId; private final Map proposals = new ConcurrentHashMap<>(); - private final Consumer publisher; + private final Consumer publisher; private final Map slate = new ConcurrentSkipListMap<>(); private final ViewContext view; private final CommonCommunications comms; @@ -62,7 +63,7 @@ public class ViewAssembly { new ConcurrentSkipListMap<>()); private volatile Map nextAssembly; - public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher, + public ViewAssembly(Digest nextViewId, ViewContext vc, Consumer publisher, CommonCommunications comms) { view = vc; this.nextViewId = nextViewId; @@ -111,12 +112,17 @@ void finalElection() { transitions.complete(); } - Consumer> inbound() { + Consumer> inbound() { return lre -> { - lre.forEach(vm -> join(vm.getJoin(), false)); + lre.forEach(ass -> assemble(ass)); }; } + private void assemble(Assemblies ass) { + ass.getJoinsList().stream().filter(sj -> view.validate(sj)).forEach(sj -> join(sj.getJoin(), false)); + ass.getViewsList().stream().filter(sv -> view.validate(sv)); + } + private void completeSlice(Duration retryDelay, AtomicReference reiterate) { if (gathered()) { return; @@ -214,10 +220,12 @@ private void join(SignedViewMember svm, boolean direct) { if (proposals.putIfAbsent(mid, svm) == null) { if (direct) { var signature = view.sign(svm); - publisher.accept(SignedJoin.newBuilder() - .setJoin(svm) - .setMember(params().member().getId().toDigeste()) - .setSignature(signature.toSig()) + publisher.accept(Assemblies.newBuilder() + .addJoins(SignedJoin.newBuilder() + .setJoin(svm) + .setMember(params().member().getId().toDigeste()) + .setSignature(signature.toSig()) + .build()) .build()); if (log.isTraceEnabled()) { log.trace("Publishing view member: {} sig: {} on: {}", @@ -306,10 +314,14 @@ public void gather() { @Override public void nominate() { - // publisher.accept(getMemberProposal()); transitions.nominated(); } + @Override + public void viewAgreement() { + + } + private Join joinOf(SignedViewMember vm) { return Join.newBuilder().setMember(vm).build(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java index 59a55e0e7..2af557f0a 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewContext.java @@ -223,35 +223,44 @@ public boolean validate(SignedJoin join) { return validated; } - protected Verifier verifierOf(Validate validate) { - final var mid = Digest.from(validate.getWitness().getId()); - var m = context.getMember(mid); - if (m == null) { + public boolean validate(SignedViews sv) { + Verifier v = verifierOf(sv); + if (v == null) { if (log.isDebugEnabled()) { - log.debug("Unable to get verifier by non existent member: [{}] on: {}", - print(validate, params.digestAlgorithm()), params.member().getId()); + log.debug("no verifier: {} for signed view on: {}", Digest.from(sv.getViews().getMember()), + params.member().getId()); } - return null; + return false; } - Verifier v = validators.get(m); - if (v == null) { - if (log.isDebugEnabled()) { - log.debug("Unable to get verifier by non existent validator: [{}] on: {}", - print(validate, params.digestAlgorithm()), params.member().getId()); + var validated = v.verify(JohnHancock.from(sv.getSignature()), sv.getViews().toByteString()); + if (!validated) { + if (log.isTraceEnabled()) { + log.trace("Cannot validate views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), + params().member().getId()); } - return null; + } else if (log.isTraceEnabled()) { + log.trace("Validated views signed by: {} on: {}", Digest.from(sv.getViews().getMember()), + params().member().getId()); } - return v; + return validated; + } + + protected Verifier verifierOf(Validate validate) { + return getVerifier(context.getMember(Digest.from(validate.getWitness().getId()))); } protected Verifier verifierOf(SignedJoin sj) { - final var mid = Digest.from(sj.getMember()); - var m = context.getMember(mid); + return getVerifier(context.getMember(Digest.from(sj.getMember()))); + } + + protected Verifier verifierOf(SignedViews sv) { + return getVerifier(context.getMember(Digest.from(sv.getViews().getMember()))); + } + + private Verifier getVerifier(Member m) { if (m == null) { if (log.isDebugEnabled()) { - log.debug("Unable to get verifier by non existent member: [{}] on: {}", - String.format("id: %s sig: %s", Digest.from(sj.getMember()), - params.digestAlgorithm().digest(sj.getSignature().toByteString())), + log.debug("Unable to get verifier by non existent member: {} on: {}", m.getId(), params.member().getId()); } return null; @@ -259,9 +268,7 @@ protected Verifier verifierOf(SignedJoin sj) { Verifier v = validators.get(m); if (v == null) { if (log.isDebugEnabled()) { - log.debug("Unable to validate key by non existent validator: [{}] on: {}", - String.format("id: %s sig: %s", Digest.from(sj.getMember()), - params.digestAlgorithm().digest(sj.getSignature().toByteString())), + log.debug("Unable to validate key by non existent validator: {} on: {}", m.getId(), params.member().getId()); } return null; diff --git a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java index 66b4e51e4..df6705942 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/fsm/Reconfiguration.java @@ -25,6 +25,8 @@ public interface Reconfiguration { void nominate(); + void viewAgreement(); + enum Reconfigure implements Transitions { AWAIT_ASSEMBLY { @Override @@ -131,6 +133,16 @@ public Transitions complete() { public void completion() { context().complete(); } + }, VIEW_AGREEMENT { + @Entry + public void viewConsensus() { + context().viewAgreement(); + } + + @Override + public Transitions viewDetermined() { + return GATHER; + } } } @@ -162,5 +174,9 @@ default Transitions nominated() { default Transitions validation() { return null; } + + default Transitions viewDetermined() { + throw fsm().invalidTransitionOn(); + } } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java index 4a34fc326..350908e90 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/TxDataSource.java @@ -7,7 +7,7 @@ package com.salesforce.apollo.choam.support; import com.google.protobuf.ByteString; -import com.salesforce.apollo.choam.proto.SignedJoin; +import com.salesforce.apollo.choam.proto.Assemblies; import com.salesforce.apollo.choam.proto.Transaction; import com.salesforce.apollo.choam.proto.UnitData; import com.salesforce.apollo.choam.proto.Validate; @@ -42,7 +42,7 @@ public class TxDataSource implements DataSource { private final Member member; private final ChoamMetrics metrics; private final BatchingQueue processing; - private final BlockingQueue joins = new LinkedBlockingQueue<>(); + private final BlockingQueue assemblies = new LinkedBlockingQueue<>(); private final BlockingQueue validations = new LinkedBlockingQueue<>(); private volatile Thread blockingThread; @@ -63,10 +63,10 @@ public void close() { } blockingThread = null; if (metrics != null) { - metrics.dropped(processing.size(), validations.size(), joins.size()); + metrics.dropped(processing.size(), validations.size(), assemblies.size()); } - log.trace("Closing with remaining txns: {}({}:{}) validations: {} reassemblies: {} on: {}", processing.size(), - processing.added(), processing.taken(), validations.size(), joins.size(), member.getId()); + log.trace("Closing with remaining txns: {}({}:{}) validations: {} assemblies: {} on: {}", processing.size(), + processing.added(), processing.taken(), validations.size(), assemblies.size(), member.getId()); } public void drain() { @@ -81,23 +81,23 @@ public ByteString getData() { log.trace("Requesting unit data on: {}", member.getId()); blockingThread = Thread.currentThread(); try { - var r = new ArrayList(); + var r = new ArrayList(); var v = new ArrayList(); if (draining.get()) { var target = Instant.now().plus(drainPolicy.nextBackoff()); - while (target.isAfter(Instant.now()) && builder.getJoinsCount() == 0 + while (target.isAfter(Instant.now()) && builder.getAssembliesCount() == 0 && builder.getValidationsCount() == 0) { // rinse and repeat r = new ArrayList<>(); - joins.drainTo(r); - builder.addAllJoins(r); + assemblies.drainTo(r); + builder.addAllAssemblies(r); v = new ArrayList(); validations.drainTo(v); builder.addAllValidations(v); - if (builder.getJoinsCount() != 0 || builder.getValidationsCount() != 0) { + if (builder.getAssembliesCount() != 0 || builder.getValidationsCount() != 0) { break; } @@ -123,8 +123,8 @@ public ByteString getData() { // One more time into ye breech r = new ArrayList<>(); - joins.drainTo(r); - builder.addAllJoins(r); + assemblies.drainTo(r); + builder.addAllAssemblies(r); v = new ArrayList(); validations.drainTo(v); @@ -133,11 +133,11 @@ public ByteString getData() { ByteString bs = builder.build().toByteString(); if (metrics != null) { metrics.publishedBatch(builder.getTransactionsCount(), bs.size(), builder.getValidationsCount(), - builder.getJoinsCount()); + builder.getAssembliesCount()); } - log.trace("Unit data: {} txns, {} validations, {} joins totalling: {} bytes on: {}", - builder.getTransactionsCount(), builder.getValidationsCount(), builder.getJoinsCount(), bs.size(), - member.getId()); + log.trace("Unit data: {} txns, {} validations, {} assemblies totalling: {} bytes on: {}", + builder.getTransactionsCount(), builder.getValidationsCount(), builder.getAssembliesCount(), + bs.size(), member.getId()); return bs; } finally { blockingThread = null; @@ -145,7 +145,7 @@ public ByteString getData() { } public int getRemainingReassemblies() { - return joins.size(); + return assemblies.size(); } public int getRemainingTransactions() { @@ -156,8 +156,8 @@ public int getRemainingValidations() { return validations.size(); } - public boolean offer(SignedJoin signedJoin) { - return joins.offer(signedJoin); + public boolean offer(Assemblies assemblies) { + return this.assemblies.offer(assemblies); } public boolean offer(Transaction txn) { diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto index 21d653cb8..0d301c1e1 100644 --- a/grpc/src/main/proto/choam.proto +++ b/grpc/src/main/proto/choam.proto @@ -98,7 +98,12 @@ message Transaction { message UnitData { repeated Validate validations = 1; repeated Transaction transactions = 2; - repeated SignedJoin joins = 3; + repeated Assemblies assemblies = 3; +} + +message Assemblies { + repeated SignedJoin joins = 1; + repeated SignedViews views = 2; } message Join { @@ -112,6 +117,22 @@ message SignedJoin { crypto.Sig signature = 3; } +message SignedViews { + Views views = 1; + crypto.Sig signature = 2; +} + +message Views { + crypto.Digeste member = 1; + crypto.Digeste vid = 2; + repeated View views = 3; +} + +message View { + crypto.Digeste diadem = 1; + repeated crypto.Digeste committee = 2; +} + message ViewMember { crypto.Digeste id = 1; crypto.Digeste view = 2;