Skip to content

Commit

Permalink
add BootstrapVerifiers to provide key state resolution via delegation…
Browse files Browse the repository at this point in the history
… to successors in joined group
  • Loading branch information
Hellblazer committed Dec 27, 2023
1 parent f9404b1 commit 2a15db3
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package com.salesforce.apollo.fireflies;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.collect.HashMultiset;
import com.google.common.util.concurrent.ListenableFuture;
import com.salesforce.apollo.archipelago.RouterImpl;
import com.salesforce.apollo.cryptography.Verifier;
import com.salesforce.apollo.fireflies.comm.entrance.EntranceClient;
import com.salesforce.apollo.fireflies.proto.IdentifierSequenceNumber;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.KeyState;
import com.salesforce.apollo.stereotomy.KeyStateVerifier;
import com.salesforce.apollo.stereotomy.Verifiers;
import com.salesforce.apollo.stereotomy.event.proto.KeyState_;
import com.salesforce.apollo.stereotomy.event.protobuf.KeyStateImpl;
import com.salesforce.apollo.stereotomy.identifier.Identifier;
import io.grpc.StatusRuntimeException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joou.ULong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

/**
* Verifiers that delegate to the joining member's successors in the full context for key state retrieval
* <p>
* This is used to bootstrap the node via delegated key state resolution of the joined group
* </p>
*
* @author hal.hildebrand
**/
public class BootstrapVerifiers implements Verifiers {
private final static Logger log = LoggerFactory.getLogger(
BootstrapVerifiers.class);
private final List<View.Participant> successors;
private final View.Node member;
private final int majority;
private final LoadingCache<EventCoordinates, KeyState> ksCoords;
private final LoadingCache<IdentifierSequence, KeyState> ksSeq;
private final RouterImpl.CommonCommunications<EntranceClient, ?> communications;
private final Duration operationTimeout;
private final Duration operationsFrequency;

public BootstrapVerifiers(View.Node member, Duration operationTimeout, List<View.Participant> successors,
int majority, Duration operationsFrequency,
RouterImpl.CommonCommunications<EntranceClient, ?> communications) {
this.member = member;
this.successors = successors;
this.majority = majority;
this.communications = communications;
this.operationTimeout = operationTimeout;
this.operationsFrequency = operationsFrequency;
ksCoords = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterWrite(Duration.ofMinutes(10))
.removalListener((EventCoordinates coords, KeyState ks, RemovalCause cause) -> log.trace(
"KeyState {} was removed ({})", coords, cause))
.build(new CacheLoader<EventCoordinates, KeyState>() {

@Override
public @Nullable KeyState load(EventCoordinates key) throws Exception {
return delegate(key);
}
});
ksSeq = Caffeine.newBuilder()
.maximumSize(10)
.expireAfterWrite(Duration.ofMinutes(10))
.removalListener((IdentifierSequence seq, KeyState ks, RemovalCause cause) -> log.trace(
"KeyState {} was removed ({})", seq, cause))
.build(new CacheLoader<IdentifierSequence, KeyState>() {

@Override
public @Nullable KeyState load(IdentifierSequence key) throws Exception {
return delegate(key);
}
});
}

@Override
public Optional<Verifier> verifierFor(EventCoordinates coordinates) {
return Optional.of(new BootstrapVerifier(coordinates.getIdentifier()));
}

@Override
public Optional<Verifier> verifierFor(Identifier identifier) {
return Optional.of(new BootstrapVerifier(identifier));
}

private <T> boolean complete(CompletableFuture<KeyState> ksFuture,
Optional<ListenableFuture<KeyState_>> futureSailor, HashMultiset<KeyState_> keystates,
Member m) {
if (futureSailor.isEmpty()) {
return true;
}
try {
final var ks = futureSailor.get().get();
keystates.add(ks);
} catch (ExecutionException ex) {
if (ex.getCause() instanceof StatusRuntimeException sre) {
switch (sre.getStatus().getCode()) {
case RESOURCE_EXHAUSTED:
log.trace("SRE in redirect: {} on: {}", sre.getStatus(), member.getId());
break;
default:
log.trace("SRE in redirect: {} on: {}", sre.getStatus(), member.getId());
}
} else {
log.error("Error in redirect: {} on: {}", ex.getCause(), member.getId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException e) {
// noop
}

var vs = keystates.entrySet()
.stream()
.filter(e -> e.getCount() >= majority)
.map(e -> e.getElement())
.findFirst()
.orElse(null);
if (vs != null) {
var keyState = new KeyStateImpl(vs);
if (ksFuture.complete(keyState)) {
log.debug("Key state: {} received majority on: {}", keyState.getCoordinates(), member.getId());
return false;
}
}
return true;
}

private KeyState delegate(IdentifierSequence idSeq) {
var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications);
final var identifierSeq = idSeq.toIdSeq();
var ks = new CompletableFuture<KeyState>();
HashMultiset<KeyState_> keystates = HashMultiset.create();
iterator.iterate((link, m) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId());
return link.getKeyState(identifierSeq);
}, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> {
if (!ks.isDone()) {

}
}, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10));
try {
return ks.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.warn("Unable to retrieve key state: {} on: {}", idSeq, member.getId());
}
return null;
}

private KeyState delegate(EventCoordinates coordinates) {
var iterator = new SliceIterator<>("Retrieve KeyState", member, successors, communications);
final var coords = coordinates.toEventCoords();
var ks = new CompletableFuture<KeyState>();
HashMultiset<KeyState_> keystates = HashMultiset.create();
iterator.iterate((link, m) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), member.getId());
return link.getKeyState(coords);
}, (futureSailor, link, m) -> complete(ks, futureSailor, keystates, m), () -> {
if (!ks.isDone()) {

}
}, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()), Duration.ofMillis(10));
try {
return ks.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.warn("Unable to retrieve key state: {} on: {}", coordinates, member.getId());
}
return null;
}

private record IdentifierSequence(Identifier identifier, ULong seqNum) {
public IdentifierSequenceNumber toIdSeq() {
return IdentifierSequenceNumber.newBuilder()
.setIdentifier(identifier.toIdent())
.setSequenceNumber(seqNum.longValue())
.build();
}

@Override
public String toString() {
return "{" + "identifier=" + identifier + ", seqNum=" + seqNum + '}';
}
}

private class BootstrapVerifier extends KeyStateVerifier {

public BootstrapVerifier(Identifier identifier) {
super(identifier);
}

@Override
protected KeyState getKeyState(ULong sequenceNumber) {
return ksSeq.get(new IdentifierSequence(identifier, sequenceNumber));
}

@Override
protected KeyState getKeyState(EventCoordinates coordinates) {
return ksCoords.get(coordinates);
}
}
}
26 changes: 24 additions & 2 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@
import com.salesforce.apollo.stereotomy.ControlledIdentifier;
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.EventValidation;
import com.salesforce.apollo.stereotomy.KeyState;
import com.salesforce.apollo.stereotomy.event.proto.KERL_;
import com.salesforce.apollo.stereotomy.event.proto.KeyState_;
import com.salesforce.apollo.stereotomy.identifier.Identifier;
import com.salesforce.apollo.stereotomy.identifier.SelfAddressingIdentifier;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.joou.ULong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1841,6 +1844,26 @@ boolean setNote(NoteWrapper next) {

public class Service implements EntranceService, FFService, ServiceRouting {

@Override
public KeyState getKeyState(Identifier identifier, ULong seqNum, Digest from) {
if (!introduced.get()) {
log.trace("Not introduced!, ignoring key state request from: {} on: {}", from, node.getId());
return null;
}
var keyState = validation.getKeyState(identifier, seqNum);
return keyState.isEmpty() ? null : keyState.get();
}

@Override
public KeyState getKeyState(EventCoordinates coordinates, Digest from) {
if (!introduced.get()) {
log.trace("Not introduced!, ignoring key state request from: {} on: {}", from, node.getId());
return null;
}
var keyState = validation.getKeyState(coordinates);
return keyState.isEmpty() ? null : keyState.get();
}

/**
* Asynchronously add a member to the next view
*/
Expand All @@ -1859,7 +1882,6 @@ public void join(Join join, Digest from, StreamObserver<Gateway> responseObserve
* with the Gossip that represents the digests newer or not known in this view, as well as updates from this
* node based on out of date information in the supplied digests.
*
* @param ring - the index of the gossip ring the inbound member is gossiping on
* @param request - the Gossip from our partner
* @return Teh response for Moar gossip - updates this node has which the sender is out of touch with, and
* digests from the sender that this node would like updated.
Expand Down Expand Up @@ -1928,7 +1950,7 @@ public Redirect seed(Registration registration, Digest from) {
/**
* The third and final message in the anti-entropy protocol. Process the inbound update from another member.
*
* @param state - update state
* @param request - update state
* @param from
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@
*/
package com.salesforce.apollo.fireflies.comm.entrance;

import java.io.IOException;
import java.time.Duration;

import com.google.common.util.concurrent.ListenableFuture;
import com.salesforce.apollo.fireflies.proto.Gateway;
import com.salesforce.apollo.fireflies.proto.Join;
import com.salesforce.apollo.fireflies.proto.Redirect;
import com.salesforce.apollo.fireflies.proto.Registration;
import com.salesforce.apollo.archipelago.Link;
import com.salesforce.apollo.fireflies.View.Node;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.stereotomy.event.proto.EventCoords;
import com.salesforce.apollo.stereotomy.event.proto.KeyState_;

import java.io.IOException;
import java.time.Duration;

/**
* @author hal.hildebrand
Expand Down Expand Up @@ -47,6 +46,10 @@ public ListenableFuture<Redirect> seed(Registration registration) {
};
}

ListenableFuture<KeyState_> getKeyState(IdentifierSequenceNumber idSeq);

ListenableFuture<KeyState_> getKeyState(EventCoords coords);

ListenableFuture<Gateway> join(Join join, Duration timeout);

ListenableFuture<Redirect> seed(Registration registration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@
*/
package com.salesforce.apollo.fireflies.comm.entrance;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListenableFuture;
import com.salesforce.apollo.fireflies.proto.EntranceGrpc;
import com.salesforce.apollo.fireflies.proto.EntranceGrpc.EntranceFutureStub;
import com.salesforce.apollo.fireflies.proto.Gateway;
import com.salesforce.apollo.fireflies.proto.Join;
import com.salesforce.apollo.fireflies.proto.Redirect;
import com.salesforce.apollo.fireflies.proto.Registration;
import com.salesforce.apollo.archipelago.ManagedServerChannel;
import com.salesforce.apollo.archipelago.ServerConnectionCache.CreateClientCommunications;
import com.salesforce.apollo.fireflies.FireflyMetrics;
import com.salesforce.apollo.fireflies.proto.*;
import com.salesforce.apollo.fireflies.proto.EntranceGrpc.EntranceFutureStub;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.stereotomy.event.proto.EventCoords;
import com.salesforce.apollo.stereotomy.event.proto.KeyState_;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
* @author hal.hildebrand
Expand All @@ -29,6 +27,7 @@ public class EntranceClient implements Entrance {
private final ManagedServerChannel channel;
private final EntranceFutureStub client;
private final FireflyMetrics metrics;

public EntranceClient(ManagedServerChannel channel, FireflyMetrics metrics) {
this.channel = channel;
this.client = EntranceGrpc.newFutureStub(channel).withCompression("gzip");
Expand All @@ -45,6 +44,16 @@ public void close() {
channel.release();
}

@Override
public ListenableFuture<KeyState_> getKeyState(IdentifierSequenceNumber idSeq) {
return client.getKeyStateIdentifier(idSeq);
}

@Override
public ListenableFuture<KeyState_> getKeyState(EventCoords coords) {
return client.getKeyStateCoords(coords);
}

@Override
public Member getMember() {
return channel.getMember();
Expand Down Expand Up @@ -94,5 +103,4 @@ public ListenableFuture<Redirect> seed(Registration registration) {
}, r -> r.run());
return result;
}

}
Loading

0 comments on commit 2a15db3

Please sign in to comment.