Skip to content

Commit

Permalink
B 5 (#181)
Browse files Browse the repository at this point in the history
* Moar cleanup

Mostly lambda cleanup, but a few minor bugs as well.

* split out verifiers from Parameters to allow de/serialization

* missed

* Configure DHT URL template, rather than hard-wire

* Configure DHT URL template, rather than hard-wire

* add encryption caps, expand keystore to alias storage :(

moar cleanup

* add Key Encapsulation Method to EncryptionAlgorithm

moar cleanup

* forgot about the algo diff.

Weird asymmetry

* error messages

* add context interceptor and provide bootstrapping

* further cleanup of notes/certs/validations

Fallout from the addition of event sequence to signature.  No longer need event coordinates, just the identifier.

* better error handling/logging for KerlDHT.

* bootstrap optimization for context majority

* far better logging.  add support for getKeyState(id, seq)

* ugh.  forgot this

* implement getKS(id, seq) - wow. Gate with empty check on reconcile.

* bootstrapping accommodations. use diadem's cardinality, rather than context bft normalized cardinality. Ring* handle no members

* moar bootstrapping accommodations.  "up to 4"
  • Loading branch information
Hellblazer authored Jan 18, 2024
1 parent b0aeca0 commit f89e8b7
Show file tree
Hide file tree
Showing 113 changed files with 1,713 additions and 2,045 deletions.
42 changes: 12 additions & 30 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmen
length = state.length();
}
int count = (int) (length / segmentSize);
if (length != 0 && count * segmentSize < length) {
if (length != 0 && (long) count * segmentSize < length) {
count++;
}
var accumulator = new HexBloom.HexAccumulator(count, crowns, initial);
Expand Down Expand Up @@ -190,15 +190,8 @@ public static Digest hashOf(Transaction transaction, DigestAlgorithm digestAlgor
}

public static String print(Join join, DigestAlgorithm da) {
StringBuilder builder = new StringBuilder();
builder.append("J[view: ")
.append(Digest.from(join.getView()))
.append(" member: ")
.append(ViewContext.print(join.getMember(), da))
.append("certifications: ")
.append(join.getEndorsementsList().stream().map(c -> ViewContext.print(c, da)).toList())
.append("]");
return builder.toString();
return "J[view: " + Digest.from(join.getView()) + " member: " + ViewContext.print(join.getMember(), da)
+ "certifications: " + join.getEndorsementsList().stream().map(c -> ViewContext.print(c, da)).toList() + "]";
}

public static Reconfigure reconfigure(Digest nextViewId, Map<Member, Join> joins, Context<Member> context,
Expand All @@ -208,7 +201,7 @@ public static Reconfigure reconfigure(Digest nextViewId, Map<Member, Join> joins
// Canonical labeling of the view members for Ethereal
var remapped = rosterMap(context, joins.keySet());

remapped.keySet().stream().sorted().map(d -> remapped.get(d)).forEach(m -> builder.addJoins(joins.get(m)));
remapped.keySet().stream().sorted().map(remapped::get).forEach(m -> builder.addJoins(joins.get(m)));

var reconfigure = builder.build();
return reconfigure;
Expand All @@ -234,7 +227,7 @@ public static Map<Digest, Member> rosterMap(Context<Member> baseContext, Collect

// Canonical labeling of the view members for Ethereal
var ring0 = baseContext.ring(0);
return members.stream().collect(Collectors.toMap(m -> ring0.hash(m), m -> m));
return members.stream().collect(Collectors.toMap(ring0::hash, m -> m));
}

public static List<Transaction> toGenesisData(List<? extends Message> initializationData) {
Expand Down Expand Up @@ -444,7 +437,7 @@ private void combine() {
}

private void combine(List<Msg> messages) {
messages.forEach(m -> combine(m));
messages.forEach(this::combine);
transitions.combine();
}

Expand Down Expand Up @@ -600,10 +593,7 @@ private boolean isNext(HashedBlock next) {
return true;
}
final Digest prev = next.getPrevious();
if (h.hash.equals(prev)) {
return true;
}
return false;
return h.hash.equals(prev);
}

private ViewMember join(Digest nextView, Digest from) {
Expand Down Expand Up @@ -785,7 +775,7 @@ private Digest signatureHash(ByteString any) {
.stream()
.map(cert -> JohnHancock.from(cert.getSignature()))
.map(sig -> sig.toDigest(params.digestAlgorithm()))
.reduce(Digest.from(cb.getBlock().getHeader().getBodyHash()), (a, b) -> a.xor(b));
.reduce(Digest.from(cb.getBlock().getHeader().getBodyHash()), Digest::xor);
}

/**
Expand Down Expand Up @@ -875,7 +865,7 @@ private void synchronize(SynchronizedState state) {
state.lastCheckpoint != null ? state.lastCheckpoint.hash : state.genesis.hash, pending.size(),
params.member().getId());
try {
linear.execute(() -> transitions.regenerated());
linear.execute(transitions::regenerated);
} catch (RejectedExecutionException e) {
// ignore
}
Expand Down Expand Up @@ -982,7 +972,6 @@ public void anchor() {
if (anchor != null) {
log.info("Synchronizing from anchor: {} on: {}", anchor.hash, params.member().getId());
transitions.bootstrap(anchor);
return;
}
}

Expand Down Expand Up @@ -1161,10 +1150,6 @@ public SubmitResult submitTxn(Transaction transaction) {
log.debug("No link for: {} for submitting txn on: {}", target.getId(), params.member().getId());
return SubmitResult.newBuilder().setResult(Result.UNAVAILABLE).build();
}
// if (log.isTraceEnabled()) {
// log.trace("Submitting received txn: {} to: {} in: {} on: {}",
// hashOf(transaction, params.digestAlgorithm()), target.getId(), viewId, params.member().getId());
// }
return link.submit(transaction);
} catch (StatusRuntimeException e) {
log.trace("Failed submitting txn: {} status:{} to: {} in: {} on: {}",
Expand All @@ -1191,8 +1176,7 @@ public boolean validate(HashedCertifiedBlock hb) {
/** a member of the current committee */
private class Associate extends Administration {

private final Producer producer;
private final ViewContext viewContext;
private final Producer producer;

Associate(HashedCertifiedBlock viewChange, Map<Member, Verifier> validators, nextView nextView) {
super(validators, new Digest(
Expand All @@ -1204,8 +1188,8 @@ private class Associate extends Administration {
params.digestAlgorithm().digest(nextView.member.getSignature().toByteString()), viewId,
params.member().getId());
Signer signer = new SignerImpl(nextView.consensusKeyPair.getPrivate(), ULong.MIN);
viewContext = new ViewContext(context, params, signer, validators, constructBlock());
producer = new Producer(viewContext, head.get(), checkpoint.get(), comm, getLabel());
producer = new Producer(new ViewContext(context, params, signer, validators, constructBlock()), head.get(),
checkpoint.get(), comm, getLabel());
producer.start();
}

Expand All @@ -1221,8 +1205,6 @@ public void complete() {

@Override
public SubmitResult submit(Transaction request) {
// log.trace("Submit txn: {} to producer on: {}", hashOf(request.getTransaction(), params.digestAlgorithm()),
// params().member());
return producer.submit(request);
}
}
Expand Down
23 changes: 9 additions & 14 deletions choam/src/main/java/com/salesforce/apollo/choam/Committee.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,8 @@
*/
package com.salesforce.apollo.choam;

import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;

import com.salesforce.apollo.choam.proto.Certification;
import com.salesforce.apollo.choam.proto.Reconfigure;
import com.salesforce.apollo.choam.proto.SubmitResult;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.choam.proto.SubmitResult.Result;
import com.salesforce.apollo.choam.proto.Transaction;
import com.salesforce.apollo.choam.proto.ViewMember;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
Expand All @@ -30,6 +17,14 @@
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
import com.salesforce.apollo.membership.Member;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey;

/**
* @author hal.hildebrand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
import com.chiralbehaviors.tron.Fsm;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.choam.comm.Terminal;
import com.salesforce.apollo.choam.fsm.Genesis;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.choam.support.HashedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock.NullBlock;
import com.salesforce.apollo.choam.support.OneShot;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.ethereal.Config;
import com.salesforce.apollo.ethereal.Dag;
import com.salesforce.apollo.ethereal.DataSource;
Expand All @@ -31,10 +31,7 @@
import org.slf4j.LoggerFactory;

import java.security.PublicKey;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -70,7 +67,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
ds = new OneShot();
nextAssembly = Committee.viewMembersOf(view.context().getId(), params().context())
.stream()
.collect(Collectors.toMap(m -> m.getId(), m -> m));
.collect(Collectors.toMap(Member::getId, m -> m));
if (!Dag.validate(nextAssembly.size())) {
log.error("Invalid cardinality: {} for: {} on: {}", nextAssembly.size(), view.context().getId(),
params().member().getId());
Expand All @@ -80,9 +77,9 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,

// Create a new context for reconfiguration
final Digest reconPrefixed = view.context().getId().prefix("Genesis Assembly");
Context<Member> reContext = new ContextImpl<Member>(reconPrefixed, view.context().memberCount(),
view.context().getProbabilityByzantine(),
view.context().getBias());
Context<Member> reContext = new ContextImpl<>(reconPrefixed, view.context().memberCount(),
view.context().getProbabilityByzantine(),
view.context().getBias());
reContext.activate(view.context().activeMembers());

final Fsm<Genesis, Transitions> fsm = Fsm.construct(this, Transitions.class, BrickLayer.INITIAL, true);
Expand All @@ -101,8 +98,7 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
config.setEpochLength(7).setNumberOfEpochs(3);
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
(preblock, last) -> transitions.process(preblock, last),
epoch -> transitions.nextEpoch(epoch), label);
transitions::process, transitions::nextEpoch, label);
coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
Expand Down Expand Up @@ -133,7 +129,7 @@ public void certify(List<ByteString> preblock, boolean last) {
} catch (InvalidProtocolBufferException e) {
return null;
}
}).filter(v -> v != null).filter(v -> !v.equals(Validate.getDefaultInstance())).forEach(v -> certify(v));
}).filter(Objects::nonNull).filter(v -> !v.equals(Validate.getDefaultInstance())).forEach(this::certify);
}

@Override
Expand Down Expand Up @@ -162,7 +158,7 @@ public void gather(List<ByteString> preblock, boolean last) {
} catch (InvalidProtocolBufferException e) {
return null;
}
}).filter(j -> j != null).filter(j -> !j.equals(Join.getDefaultInstance())).forEach(j -> join(j));
}).filter(Objects::nonNull).filter(j -> !j.equals(Join.getDefaultInstance())).forEach(this::join);
}

@Override
Expand All @@ -173,7 +169,7 @@ public void nominate() {
.stream()
.filter(p -> !p.member.equals(params().member()))
.map(p -> view.generateValidation(p.join.getMember()))
.forEach(v -> validations.addValidations(v));
.forEach(validations::addValidations);
ds.setValue(validations.build().toByteString());
}

Expand All @@ -187,10 +183,10 @@ public void nominations(List<ByteString> preblock, boolean last) {
return null;
}
})
.filter(v -> v != null)
.filter(Objects::nonNull)
.flatMap(vs -> vs.getValidationsList().stream())
.filter(v -> !v.equals(Validate.getDefaultInstance()))
.forEach(v -> validate(v));
.forEach(this::validate);
}

@Override
Expand All @@ -199,7 +195,7 @@ public void publish() {
witnesses.entrySet()
.stream()
.sorted(Comparator.comparing(e -> e.getKey().getId()))
.map(e -> e.getValue())
.map(Map.Entry::getValue)
.forEach(v -> b.addCertifications(v.getWitness()));
view.publish(new HashedCertifiedBlock(params().digestAlgorithm(), b.build()));
log.debug("Genesis block: {} published for: {} on: {}", reconfiguration.hash, view.context().getId(),
Expand Down Expand Up @@ -247,19 +243,16 @@ private void certify(Validate v) {
}

private DataSource dataSource() {
return new DataSource() {
@Override
public ByteString getData() {
if (!started.get()) {
return ByteString.EMPTY;
}
try {
blockingThread = Thread.currentThread();
final var take = ds.get();
return take;
} finally {
blockingThread = null;
}
return () -> {
if (!started.get()) {
return ByteString.EMPTY;
}
try {
blockingThread = Thread.currentThread();
final var take = ds.get();
return take;
} finally {
blockingThread = null;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
*/
package com.salesforce.apollo.choam;

import java.util.Collections;

import com.salesforce.apollo.choam.proto.Validate;
import com.salesforce.apollo.choam.CHOAM.BlockProducer;
import com.salesforce.apollo.choam.proto.Validate;
import com.salesforce.apollo.cryptography.Signer;
import com.salesforce.apollo.cryptography.Verifier;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.Member;

import java.util.Collections;

/**
* @author hal.hildebrand
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
import com.netflix.concurrency.limits.limit.AIMDLimit;
import com.netflix.concurrency.limits.limiter.LifoBlockingLimiter;
import com.netflix.concurrency.limits.limiter.SimpleLimiter;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.proto.FoundationSeal;
import com.salesforce.apollo.choam.proto.Join;
import com.salesforce.apollo.choam.proto.Transaction;
import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.choam.CHOAM.TransactionExecutor;
import com.salesforce.apollo.choam.support.CheckpointState;
import com.salesforce.apollo.choam.support.ChoamMetrics;
import com.salesforce.apollo.choam.support.ExponentialBackoffPolicy;
Expand All @@ -29,6 +28,7 @@
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster;
import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.OffHeapStore;
import org.joou.ULong;
Expand Down Expand Up @@ -574,7 +574,7 @@ public Limiter<Void> build(String name, MetricRegistry metrics) {
.backoffRatio(backoffRatio)
.build())
.build();
return LifoBlockingLimiter.<Void>newBuilder(limiter)
return LifoBlockingLimiter.newBuilder(limiter)
.backlogSize(backlogSize)
.backlogTimeout(backlogDuration)
.build();
Expand Down
Loading

0 comments on commit f89e8b7

Please sign in to comment.