Skip to content

Commit

Permalink
Rename to Bootstrapper, add EventValidation functionality. Basically …
Browse files Browse the repository at this point in the history
…Ani for bootstrapping the view member's join
  • Loading branch information
Hellblazer committed Dec 27, 2023
1 parent 71a61d9 commit b984a14
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
import com.google.common.collect.HashMultiset;
import com.google.common.util.concurrent.ListenableFuture;
import com.salesforce.apollo.archipelago.RouterImpl;
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.cryptography.SigningThreshold;
import com.salesforce.apollo.cryptography.Verifier;
import com.salesforce.apollo.fireflies.comm.entrance.Entrance;
import com.salesforce.apollo.fireflies.proto.Validation;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.KeyState;
import com.salesforce.apollo.stereotomy.KeyStateVerifier;
import com.salesforce.apollo.stereotomy.Verifiers;
import com.salesforce.apollo.stereotomy.*;
import com.salesforce.apollo.stereotomy.event.EstablishmentEvent;
import com.salesforce.apollo.stereotomy.event.proto.IdentAndSeq;
import com.salesforce.apollo.stereotomy.event.proto.KeyState_;
import com.salesforce.apollo.stereotomy.event.protobuf.KeyStateImpl;
Expand All @@ -26,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
Expand All @@ -42,9 +44,8 @@
*
* @author hal.hildebrand
**/
public class BootstrapVerifiers implements Verifiers {
private final static Logger log = LoggerFactory.getLogger(
BootstrapVerifiers.class);
public class Bootstrapper implements Verifiers {
private final static Logger log = LoggerFactory.getLogger(Bootstrapper.class);
private final List<? extends Member> successors;
private final SigningMember member;
private final int majority;
Expand All @@ -54,10 +55,10 @@ public class BootstrapVerifiers implements Verifiers {
private final Duration operationTimeout;
private final Duration operationsFrequency;

public <S extends SigningMember, M extends Member> BootstrapVerifiers(S member, Duration operationTimeout,
List<M> successors, int majority,
Duration operationsFrequency,
RouterImpl.CommonCommunications<Entrance, ?> communications) {
public <S extends SigningMember, M extends Member> Bootstrapper(S member, Duration operationTimeout,
List<M> successors, int majority,
Duration operationsFrequency,
RouterImpl.CommonCommunications<Entrance, ?> communications) {
this.member = member;
this.successors = successors;
this.majority = majority;
Expand Down Expand Up @@ -90,16 +91,70 @@ public <S extends SigningMember, M extends Member> BootstrapVerifiers(S member,
});
}

@Override
public Optional<Verifier> verifierFor(EventCoordinates coordinates) {
return Optional.of(new BootstrapVerifier(coordinates.getIdentifier()));
public EventValidation getValidator() {
return new EventValidation() {
@Override
public Verifier.Filtered filtered(EventCoordinates coordinates, SigningThreshold threshold,
JohnHancock signature, InputStream message) {
log.trace("Filtering for: {} on: {}", coordinates, member);
var keyState = getKeyState(coordinates);
if (keyState.isEmpty()) {
return new Verifier.Filtered(false, 0, null);
}
KeyState ks = keyState.get();
var v = new Verifier.DefaultVerifier(ks.getKeys());
return v.filtered(threshold, signature, message);
}

@Override
public Optional<KeyState> getKeyState(EventCoordinates coordinates) {
log.trace("Get key state: {} on: {}", coordinates, member);
return getKeyState(coordinates);
}

@Override
public Optional<KeyState> getKeyState(Identifier identifier, ULong seqNum) {
log.trace("Get key state: {}:{} on: {}", identifier, seqNum, member);
return getKeyState(identifier, seqNum);
}

@Override
public boolean validate(EstablishmentEvent event) {
log.trace("Validate event: {} on: {}", event, member);
return Bootstrapper.this.validate(event.getCoordinates());
}

@Override
public boolean validate(EventCoordinates coordinates) {
log.trace("Validating coordinates: {} on: {}", coordinates, member);
return Bootstrapper.this.validate(coordinates);
}

@Override
public boolean verify(EventCoordinates coordinates, JohnHancock signature, InputStream message) {
log.trace("Verify coordinates: {} on: {}", coordinates, member);
return new BootstrapVerifier(coordinates.getIdentifier()).verify(signature, message);
}

@Override
public boolean verify(EventCoordinates coordinates, SigningThreshold threshold, JohnHancock signature,
InputStream message) {
log.trace("Verify coordinates: {} on: {}", coordinates, member);
return new BootstrapVerifier(coordinates.getIdentifier()).verify(threshold, signature, message);
}
};
}

@Override
public Optional<Verifier> verifierFor(Identifier identifier) {
return Optional.of(new BootstrapVerifier(identifier));
}

@Override
public Optional<Verifier> verifierFor(EventCoordinates coordinates) {
return Optional.of(new BootstrapVerifier(coordinates.getIdentifier()));
}

private <T> boolean complete(CompletableFuture<KeyState> ksFuture,
Optional<ListenableFuture<KeyState_>> futureSailor, HashMultiset<KeyState_> keystates,
Member m) {
Expand All @@ -111,15 +166,9 @@ private <T> boolean complete(CompletableFuture<KeyState> ksFuture,
keystates.add(ks);
} catch (ExecutionException ex) {
if (ex.getCause() instanceof StatusRuntimeException sre) {
switch (sre.getStatus().getCode()) {
case RESOURCE_EXHAUSTED:
log.trace("SRE in redirect: {} on: {}", sre.getStatus(), member.getId());
break;
default:
log.trace("SRE in redirect: {} on: {}", sre.getStatus(), member.getId());
}
log.trace("SRE in get key state: {} on: {}", sre.getStatus(), member.getId());
} else {
log.error("Error in redirect: {} on: {}", ex.getCause(), member.getId());
log.error("Error in key state: {} on: {}", ex.getCause(), member.getId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -143,52 +192,109 @@ private <T> boolean complete(CompletableFuture<KeyState> ksFuture,
return true;
}

private KeyState delegate(IdentifierSequence idSeq) {
private boolean completeValidation(CompletableFuture<Validation> valid,
Optional<ListenableFuture<Validation>> futureSailor,
HashMultiset<Validation> validations, Member m) {
if (futureSailor.isEmpty()) {
return true;
}
try {
final var v = futureSailor.get().get();
validations.add(v);
} catch (ExecutionException ex) {
if (ex.getCause() instanceof StatusRuntimeException sre) {
log.trace("SRE in validate: {} on: {}", sre.getStatus(), member.getId());
} else {
log.error("Error in validate: {} on: {}", ex.getCause(), member.getId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException e) {
// noop
}

var validation = validations.entrySet()
.stream()
.filter(e -> e.getCount() >= majority)
.map(e -> e.getElement())
.findFirst();
if (!validation.isEmpty()) {
if (valid.complete(validation.get())) {
log.debug("Validation: {} received majority on: {}", validation.get().getResult(), member.getId());
return false;
}
}
return true;
}

private KeyState delegate(EventCoordinates coordinates) {
var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications);
final var identifierSeq = idSeq.toIdSeq();
final var coords = coordinates.toEventCoords();
var ks = new CompletableFuture<KeyState>();
HashMultiset<KeyState_> keystates = HashMultiset.create();
iterator.iterate((link, m) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId());
return link.getKeyState(identifierSeq);
return link.getKeyState(coords);
}, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> {
if (!ks.isDone()) {

ks.complete(null);
}
}, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10));
try {
return ks.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.warn("Unable to retrieve key state: {} on: {}", idSeq, member.getId());
log.warn("Unable to retrieve key state: {} on: {}", coordinates, member.getId());
}
return null;
}

private KeyState delegate(EventCoordinates coordinates) {
private KeyState delegate(IdentifierSequence idSeq) {
var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications);
final var coords = coordinates.toEventCoords();
final var identifierSeq = idSeq.toIdSeq();
var ks = new CompletableFuture<KeyState>();
HashMultiset<KeyState_> keystates = HashMultiset.create();
iterator.iterate((link, m) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId());
return link.getKeyState(coords);
return link.getKeyState(identifierSeq);
}, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> {
if (!ks.isDone()) {

ks.complete(null);
}
}, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10));
try {
return ks.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.warn("Unable to retrieve key state: {} on: {}", coordinates, member.getId());
log.warn("Unable to retrieve key state: {} on: {}", idSeq, member.getId());
}
return null;
}

private boolean validate(EventCoordinates coordinates) {
var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications);
var valid = new CompletableFuture<Validation>();
HashMultiset<Validation> validations = HashMultiset.create();
iterator.iterate((link, m) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId());
return link.validate(coordinates.toEventCoords());
}, (futureSailor, link, m) -> completeValidation(valid, futureSailor, validations, m), () -> {
if (!valid.isDone()) {
valid.complete(Validation.newBuilder().setResult(false).build());
}
}, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10));
try {
return valid.get().getResult();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.warn("Unable to validate: {} on: {}", coordinates, member.getId());
}
return false;
}

private record IdentifierSequence(Identifier identifier, ULong seqNum) {
public IdentAndSeq toIdSeq() {
return IdentAndSeq.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.EventValidation;
import com.salesforce.apollo.stereotomy.KeyState;
import com.salesforce.apollo.stereotomy.event.proto.EventCoords;
import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import com.salesforce.apollo.stereotomy.event.proto.KeyState_;
import com.salesforce.apollo.stereotomy.identifier.Identifier;
Expand Down Expand Up @@ -1985,5 +1986,10 @@ public void update(State request, Digest from) {
}
});
}

@Override
public Validation validateCoords(EventCoords request, Digest from) {
return Validation.newBuilder().setResult(validation.validate(EventCoordinates.from(request))).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.fireflies.View.Node;
import com.salesforce.apollo.fireflies.proto.Gateway;
import com.salesforce.apollo.fireflies.proto.Join;
import com.salesforce.apollo.fireflies.proto.Redirect;
import com.salesforce.apollo.fireflies.proto.Registration;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.stereotomy.event.proto.EventCoords;
import com.salesforce.apollo.stereotomy.event.proto.IdentAndSeq;
Expand Down Expand Up @@ -57,6 +54,11 @@ public ListenableFuture<Gateway> join(Join join, Duration timeout) {
public ListenableFuture<Redirect> seed(Registration registration) {
return null;
}

@Override
public ListenableFuture<Validation> validate(EventCoords coords) {
return null;
}
};
}

Expand All @@ -67,4 +69,6 @@ public ListenableFuture<Redirect> seed(Registration registration) {
ListenableFuture<Gateway> join(Join join, Duration timeout);

ListenableFuture<Redirect> seed(Registration registration);

ListenableFuture<Validation> validate(EventCoords coords);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ public ListenableFuture<Redirect> seed(Registration registration) {
}, r -> r.run());
return result;
}

@Override
public ListenableFuture<Validation> validate(EventCoords coords) {
return client.validate(coords);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import com.salesforce.apollo.fireflies.FireflyMetrics;
import com.salesforce.apollo.fireflies.View.Service;
import com.salesforce.apollo.fireflies.proto.EntranceGrpc.EntranceImplBase;
import com.salesforce.apollo.fireflies.proto.Gateway;
import com.salesforce.apollo.fireflies.proto.Join;
import com.salesforce.apollo.fireflies.proto.Redirect;
import com.salesforce.apollo.fireflies.proto.Registration;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.protocols.ClientIdentity;
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.event.proto.EventCoords;
Expand All @@ -32,7 +29,7 @@ public class EntranceServer extends EntranceImplBase {

private final FireflyMetrics metrics;
private final RoutableService<Service> router;
private ClientIdentity identity;
private final ClientIdentity identity;

public EntranceServer(ClientIdentity identity, RoutableService<Service> r, FireflyMetrics metrics) {
this.metrics = metrics;
Expand Down Expand Up @@ -113,4 +110,18 @@ public void seed(Registration request, StreamObserver<Redirect> responseObserver
}
});
}

@Override
public void validate(EventCoords request, StreamObserver<Validation> responseObserver) {
Digest from = identity.getFrom();
if (from == null) {
responseObserver.onError(new IllegalStateException("Member has been removed"));
return;
}
router.evaluate(responseObserver, s -> {
var r = s.validateCoords(request, from);
responseObserver.onNext(r);
responseObserver.onCompleted();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@

import com.codahale.metrics.Timer.Context;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.fireflies.proto.Gateway;
import com.salesforce.apollo.fireflies.proto.Join;
import com.salesforce.apollo.fireflies.proto.Redirect;
import com.salesforce.apollo.fireflies.proto.Registration;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.KeyState;
import com.salesforce.apollo.stereotomy.event.proto.EventCoords;
import com.salesforce.apollo.stereotomy.identifier.Identifier;
import io.grpc.stub.StreamObserver;
import org.joou.ULong;
Expand All @@ -30,4 +28,6 @@ public interface EntranceService {
void join(Join request, Digest from, StreamObserver<Gateway> responseObserver, Context timer);

Redirect seed(Registration request, Digest from);

Validation validateCoords(EventCoords request, Digest from);
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public Member answer(InvocationOnMock invocation) throws Throwable {
when(comm.connect(any())).thenReturn(client);

RouterImpl.CommonCommunications<EntranceClient, ?> communications = null;
var verifiers = new BootstrapVerifiers(member, Duration.ofSeconds(1), members, majority, Duration.ofMillis(10),
comm);
var verifiers = new Bootstrapper(member, Duration.ofSeconds(1), members, majority, Duration.ofMillis(10), comm);

var verifier = verifiers.verifierFor(member.getIdentifier().getIdentifier());
assertFalse(verifier.isEmpty());
Expand Down
Loading

0 comments on commit b984a14

Please sign in to comment.