Skip to content

Commit

Permalink
Bootstrapping Fireflies.
Browse files Browse the repository at this point in the history
Revert back to olden daze where the EstablishmentEvent was used in ye Note. Use this as the verifier for the Participant who's Note this is.

This locks the View into the current Note of the Participant, as there's only that event as the Verifier.

Validation is used in the Gateway admission, to ensure only valid member IDs join.
  • Loading branch information
Hellblazer committed Dec 30, 2023
1 parent cbdd46b commit d0a54a9
Show file tree
Hide file tree
Showing 44 changed files with 448 additions and 517 deletions.
10 changes: 5 additions & 5 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,27 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.choam.proto.SubmitResult.Result;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.choam.comm.*;
import com.salesforce.apollo.choam.fsm.Combine;
import com.salesforce.apollo.choam.fsm.Combine.Merchantile;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.choam.proto.SubmitResult.Result;
import com.salesforce.apollo.choam.support.*;
import com.salesforce.apollo.choam.support.Bootstrapper.SynchronizedState;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock.NullBlock;
import com.salesforce.apollo.cryptography.*;
import com.salesforce.apollo.cryptography.Signer.SignerImpl;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.GroupIterator;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.RoundScheduler;
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster;
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter;
import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg;
import com.salesforce.apollo.messaging.proto.AgedMessageOrBuilder;
import com.salesforce.apollo.utils.Utils;
import io.grpc.StatusRuntimeException;
import org.h2.mvstore.MVMap;
Expand Down Expand Up @@ -91,7 +91,7 @@ public class CHOAM {
public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
executions = Executors.newVirtualThreadPerTaskExecutor();
executions = Executors.newCachedThreadPool(Thread.ofVirtual().factory());

nextView();
combine = new ReliableBroadcaster(params.context(), params.member(), params.combine(), params.communications(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void func() throws Exception {

@Test
public void scalingTest() throws Exception {
var exec = Executors.newVirtualThreadPerTaskExecutor();
var exec = Executors.newCachedThreadPool(Thread.ofVirtual().factory());
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
Context<Member> context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin(), 9, 0.2, 3);
var entropy = SecureRandom.getInstance("SHA1PRNG");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
class Transactioneer {
private final static Random entropy = new Random();
private final static Logger log = LoggerFactory.getLogger(Transactioneer.class);
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static Executor executor = Executors.newCachedThreadPool(
Thread.ofVirtual().factory());
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual()
.factory());
private final AtomicInteger completed = new AtomicInteger();
Expand Down
8 changes: 5 additions & 3 deletions fireflies/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.salesforce.apollo</groupId>
Expand All @@ -7,7 +8,8 @@
</parent>
<artifactId>fireflies</artifactId>
<name>Fireflies</name>
<description>Byzantine fault tolerant, virtually synchronous membership service and secure communications ovelay</description>
<description>Byzantine fault-tolerant, virtually synchronous membership service and secure communications ovelay
</description>
<dependencies>
<dependency>
<groupId>com.salesforce.apollo</groupId>
Expand Down Expand Up @@ -44,4 +46,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,30 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
return true;
}

private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, AtomicInteger abandon) {
if (sre.getStatus().getCode().equals(Status.OUT_OF_RANGE.getCode())) {
log.debug("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.FAILED_PRECONDITION.getCode())) {
log.debug("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.PERMISSION_DENIED.getCode())) {
log.debug("Gateway view: {} permission denied: {} from: {} on: {}", v, sre.getMessage(),
link.getMember().getId(), node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.RESOURCE_EXHAUSTED.getCode())) {
log.debug("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else {
log.debug("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
}
;
}

private Join join(Digest v) {
return Join.newBuilder().setView(v.toDigeste()).setNote(node.getNote().getWrapped()).build();
}
Expand Down Expand Up @@ -243,16 +267,15 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu
redirecting.iterate((link, m) -> {
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
try {
return link.join(join, params.seedingTimeout());
} catch (StatusRuntimeException sre) {
if (sre.getStatus().getCode().equals(Status.OUT_OF_RANGE.getCode())) {
log.debug("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(),
link.getMember().getId(), node.getId());
var g = link.join(join, params.seedingTimeout());
if (g.equals(Gateway.getDefaultInstance())) {
log.debug("Gateway view: {} empty from: {} on: {}", v, link.getMember().getId(), node.getId());
abandon.incrementAndGet();
} else {
log.debug("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(),
link.getMember().getId(), node.getId());
return null;
}
return g;
} catch (StatusRuntimeException sre) {
gatewaySRE(v, link, sre, abandon);
return null;
}
}, (futureSailor, link, m) -> completeGateway((Participant) m, gateway, futureSailor, trusts,
Expand Down Expand Up @@ -295,7 +318,7 @@ private NoteWrapper seedFor(Seed seed) {
.setNote(Note.newBuilder()
.setHost(seed.endpoint().getHostName())
.setPort(seed.endpoint().getPort())
.setCoordinates(seed.coordinates().toEventCoords())
.setEstablishment(seed.establishment().toKeyEvent_())
.setEpoch(-1)
.setMask(ByteString.copyFrom(
Node.createInitialMask(context).toByteArray())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.collect.HashMultiset;
import com.google.protobuf.ByteString;
import com.salesforce.apollo.archipelago.RouterImpl;
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.cryptography.SigningThreshold;
Expand Down Expand Up @@ -70,8 +69,8 @@ public <S extends SigningMember, M extends Member> Bootstrapper(S member, Durati
this.operationTimeout = operationTimeout;
this.operationsFrequency = operationsFrequency;
ksCoords = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterWrite(Duration.ofMinutes(10))
.maximumSize(100)
.expireAfterWrite(Duration.ofMinutes(1))
.removalListener((EventCoordinates coords, KeyState ks, RemovalCause cause) -> log.trace(
"KeyState {} was removed ({})", coords, cause))
.build(new CacheLoader<EventCoordinates, KeyState>() {
Expand All @@ -82,8 +81,8 @@ public <S extends SigningMember, M extends Member> Bootstrapper(S member, Durati
}
});
ksSeq = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterWrite(Duration.ofMinutes(10))
.maximumSize(100)
.expireAfterWrite(Duration.ofMinutes(1))
.removalListener((IdentifierSequence seq, KeyState ks, RemovalCause cause) -> log.trace(
"KeyState {} was removed ({})", seq, cause))
.build(new CacheLoader<IdentifierSequence, KeyState>() {
Expand All @@ -100,7 +99,7 @@ public EventValidation getValidator() {
@Override
public Verifier.Filtered filtered(EventCoordinates coordinates, SigningThreshold threshold,
JohnHancock signature, InputStream message) {
log.trace("Filtering for: {} on: {}", coordinates, member);
log.trace("Filtering for: {} on: {}", coordinates, member.getId());
var keyState = getKeyState(coordinates);
if (keyState.isEmpty()) {
return new Verifier.Filtered(false, 0, null);
Expand All @@ -112,40 +111,27 @@ public Verifier.Filtered filtered(EventCoordinates coordinates, SigningThreshold

@Override
public Optional<KeyState> getKeyState(EventCoordinates coordinates) {
log.trace("Get key state: {} on: {}", coordinates, member);
return Optional.of(Bootstrapper.this.getKeyState(coordinates));
log.trace("Get key state: {} on: {}", coordinates, member.getId());
return Optional.of(Bootstrapper.this.delegate(coordinates));
}

@Override
public Optional<KeyState> getKeyState(Identifier identifier, ULong seqNum) {
log.trace("Get key state: {}:{} on: {}", identifier, seqNum, member);
return Optional.of(Bootstrapper.this.getKeyState(identifier, seqNum));
log.trace("Get key state: {}:{} on: {}", identifier, seqNum, member.getId());
return Optional.of(Bootstrapper.this.delegate(new IdentifierSequence(identifier, seqNum)));
}

@Override
public boolean validate(EstablishmentEvent event) {
log.trace("Validate event: {} on: {}", event, member);
log.trace("Validate event: {} on: {}", event, member.getId());
return Bootstrapper.this.validate(event.getCoordinates());
}

@Override
public boolean validate(EventCoordinates coordinates) {
log.trace("Validating coordinates: {} on: {}", coordinates, member);
log.trace("Validating coordinates: {} on: {}", coordinates, member.getId());
return Bootstrapper.this.validate(coordinates);
}

@Override
public boolean verify(EventCoordinates coordinates, JohnHancock signature, InputStream message) {
log.trace("Verify coordinates: {} on: {}", coordinates, member);
return new BootstrapVerifier(coordinates.getIdentifier()).verify(signature, message);
}

@Override
public boolean verify(EventCoordinates coordinates, SigningThreshold threshold, JohnHancock signature,
InputStream message) {
log.trace("Verify coordinates: {} on: {}", coordinates, member);
return new BootstrapVerifier(coordinates.getIdentifier()).verify(threshold, signature, message);
}
};
}

Expand Down Expand Up @@ -226,7 +212,7 @@ private KeyState delegate(EventCoordinates coordinates) {
return link.getKeyState(coords);
}, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> {
if (!ks.isDone()) {
log.warn("Failed to retrieve key state: {} from slice on: {}", coordinates, member);
log.warn("Failed to retrieve key state: {} from slice on: {}", coordinates, member.getId());
ks.complete(null);
}
}, scheduler, operationsFrequency);
Expand All @@ -241,7 +227,7 @@ private KeyState delegate(EventCoordinates coordinates) {
}

private KeyState delegate(IdentifierSequence idSeq) {
log.info("Get key state: {} from slice on: {}", idSeq, member);
log.info("Get key state: {} from slice on: {}", idSeq, member.getId());
var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications);
final var identifierSeq = idSeq.toIdSeq();
var ks = new CompletableFuture<KeyState>();
Expand All @@ -251,7 +237,7 @@ private KeyState delegate(IdentifierSequence idSeq) {
return link.getKeyState(identifierSeq);
}, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> {
if (!ks.isDone()) {
log.warn("Failed to retrieve key state: {} from slice on: {}", idSeq, member);
log.warn("Failed to retrieve key state: {} from slice on: {}", idSeq, member.getId());
ks.complete(null);
}
}, scheduler, operationsFrequency);
Expand All @@ -270,7 +256,7 @@ private KeyState getKeyState(EventCoordinates coordinates) {
}

private boolean validate(EventCoordinates coordinates) {
log.info("Validate event: {} from slice on: {}", coordinates, member);
log.info("Validate event: {} from slice on: {}", coordinates, member.getId());
var succ = successors.stream().filter(m -> coordinates.getIdentifier().equals(m.getId())).findFirst();
if (succ.isPresent()) {
return true;
Expand All @@ -284,7 +270,7 @@ private boolean validate(EventCoordinates coordinates) {
return link.validate(coordinates.toEventCoords());
}, (futureSailor, link, m) -> completeValidation(valid, futureSailor, validations, m), () -> {
if (!valid.isDone()) {
log.warn("Failed to validate: {} from slice on: {}", coordinates, member);
log.warn("Failed to validate: {} from slice on: {}", coordinates, member.getId());
valid.complete(Validation.newBuilder().setResult(false).build());
}
}, scheduler, operationsFrequency);
Expand All @@ -298,60 +284,6 @@ private boolean validate(EventCoordinates coordinates) {
return false;
}

public static class ViewEventValidation implements EventValidation {
private EventValidation delegate;

public ViewEventValidation(EventValidation delegate) {
this.delegate = delegate;
}

@Override
public Verifier.Filtered filtered(EventCoordinates coordinates, SigningThreshold threshold,
JohnHancock signature, InputStream message) {
return delegate.filtered(coordinates, threshold, signature, message);
}

@Override
public Optional<KeyState> getKeyState(EventCoordinates coordinates) {
return delegate.getKeyState(coordinates);
}

@Override
public Optional<KeyState> getKeyState(Identifier identifier, ULong seqNum) {
return delegate.getKeyState(identifier, seqNum);
}

@Override
public boolean validate(EstablishmentEvent event) {
return delegate.validate(event);
}

@Override
public boolean validate(EventCoordinates coordinates) {
return delegate.validate(coordinates);
}

@Override
public boolean verify(EventCoordinates coordinates, SigningThreshold threshold, JohnHancock signature,
InputStream message) {
return delegate.verify(coordinates, threshold, signature, message);
}

@Override
public boolean verify(EventCoordinates coordinates, JohnHancock signature, ByteString byteString) {
return delegate.verify(coordinates, signature, byteString);
}

@Override
public boolean verify(EventCoordinates coordinates, JohnHancock signature, InputStream message) {
return delegate.verify(coordinates, signature, message);
}

void setDelegate(EventValidation delegate) {
this.delegate = delegate;
}
}

private record IdentifierSequence(Identifier identifier, ULong seqNum) {
public IdentAndSeq toIdSeq() {
return IdentAndSeq.newBuilder()
Expand All @@ -375,13 +307,13 @@ public BootstrapVerifier(Identifier identifier) {
@Override
protected KeyState getKeyState(ULong sequenceNumber) {
var key = new IdentifierSequence(identifier, sequenceNumber);
log.info("Get key state: {} on: {}", key, member);
log.info("Get key state: {} on: {}", key, member.getId());
return ksSeq.get(key);
}

@Override
protected KeyState getKeyState(EventCoordinates coordinates) {
log.info("Get key state: {} on: {}", coordinates, member);
log.info("Get key state: {} on: {}", coordinates, member.getId());
return ksCoords.get(coordinates);
}
}
Expand Down
Loading

0 comments on commit d0a54a9

Please sign in to comment.