Skip to content

Commit

Permalink
use SliceIterator for Ethereal.simplify RingCommunications.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jun 2, 2024
1 parent 91e0d86 commit deacf1a
Show file tree
Hide file tree
Showing 16 changed files with 179 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public GenesisAssembly(ViewContext vc, CommonCommunications<Terminal, ?> comms,
config.setLabel("Genesis Assembly" + view.context().getId() + " on: " + params().member().getId());
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize(), dataSource(),
transitions::process, transitions::nextEpoch, label);
coordinator = new ChRbcGossip(reContext, params().member(), controller.processor(), params().communications(),
coordinator = new ChRbcGossip(reContext.getId(), params().member(), nextAssembly.values(),
controller.processor(), params().communications(),
params().metrics() == null ? null : params().metrics().getGensisMetrics());
log.debug("Genesis Assembly: {} recontext: {} next assembly: {} on: {}", view.context().getId(),
reContext.getId(), nextAssembly.keySet(), params().member().getId());
Expand Down
8 changes: 4 additions & 4 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, Hash

config.setLabel("Producer" + getViewId() + " on: " + params().member().getId());
var producerMetrics = params().metrics() == null ? null : params().metrics().getProducerMetrics();
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds,
(preblock, last) -> serial(preblock, last), this::newEpoch, label);
coordinator = new ChRbcGossip(view.context(), params().member(), controller.processor(),
params().communications(), producerMetrics);
controller = new Ethereal(config.build(), params().producer().maxBatchByteSize() + (8 * 1024), ds, this::serial,
this::newEpoch, label);
coordinator = new ChRbcGossip(view.context().getId(), params().member(), view.membership(),
controller.processor(), params().communications(), producerMetrics);
log.debug("Roster for: {} is: {} on: {}", getViewId(), view.roster(), params().member().getId());

var onConsensus = new CompletableFuture<ViewAssembly.Vue>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.salesforce.apollo.cryptography.QualifiedBase64.publicKey;
Expand Down Expand Up @@ -135,6 +136,10 @@ public Signer getSigner() {
return signer;
}

public Set<Member> membership() {
return validators.keySet();
}

/**
* The process has failed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ private void sample() {
}
var randomCut = member.getId();
log.info("Random cut: {} on: {}", randomCut, params.member().getId());
var iterator = new RingIterator<Member, Terminal>(params.gossipDuration(), params.context(), params.member(),
comms, true, scheduler);
var iterator = new RingIterator<>(params.gossipDuration(), params.context(), params.member(), comms, true,
scheduler);
iterator.allowDuplicates();
iterator.iterate(randomCut, (link, _) -> synchronize(s, link),
(_, futureSailor, destination) -> synchronize(futureSailor, votes, destination),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ public String dump() {
public Processor processor() {
return new Processor() {
@Override
public Gossip gossip(Digest context, int ring) {
final var builder = Gossip.newBuilder().setRing(ring);
public Gossip gossip(Digest context) {
final var builder = Gossip.newBuilder();
final var current = currentEpoch.get();
epochs.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

package com.salesforce.apollo.ethereal;

import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.ethereal.proto.Gossip;
import com.salesforce.apollo.ethereal.proto.Update;
import com.salesforce.apollo.cryptography.Digest;

/**
* @author hal.hildebrand
Expand All @@ -20,10 +20,9 @@ public interface Processor {
* First phase request. Answer the gossip for the current state of the receiver
*
* @param context - the digest id of the context for routing
* @param ring - the ring we're gossiping on
* @return the Gossip
*/
Gossip gossip(Digest context, int ring);
Gossip gossip(Digest context);

/**
* First phase reply. Answer the Update from the receiver's state, based on the suppled Have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
*/
package com.salesforce.apollo.ethereal.memberships;

import com.codahale.metrics.Timer;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.archipelago.server.FernetServerInterceptor;
import com.salesforce.apollo.context.Context;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.ethereal.Processor;
import com.salesforce.apollo.ethereal.memberships.comm.EtherealMetrics;
Expand All @@ -22,14 +20,17 @@
import com.salesforce.apollo.ethereal.proto.Update;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.ring.RingCommunications;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -51,29 +52,28 @@ public class ChRbcGossip {
private static final Logger log = LoggerFactory.getLogger(
ChRbcGossip.class);
private final CommonCommunications<Gossiper, GossiperService> comm;
private final Context<Member> context;
private final Digest id;
private final SigningMember member;
private final EtherealMetrics metrics;
private final Processor processor;
private final RingCommunications<Member, Gossiper> ring;
private final SliceIterator<Gossiper> ring;
private final AtomicBoolean started = new AtomicBoolean();
private final Terminal terminal = new Terminal();
private final List<Member> membership;
private volatile ScheduledFuture<?> scheduled;

public ChRbcGossip(Context<Member> context, SigningMember member, Processor processor, Router communications,
EtherealMetrics m) {
public ChRbcGossip(Digest id, SigningMember member, Collection<Member> membership, Processor processor,
Router communications, EtherealMetrics m) {
this.processor = processor;
this.context = context;
this.member = member;
this.metrics = m;
comm = communications.create(member, context.getId(), terminal, getClass().getCanonicalName(),
this.id = id;
this.membership = new ArrayList<>(membership);
comm = communications.create(member, id, terminal, getClass().getCanonicalName(),
r -> new GossiperServer(communications.getClientIdentityProvider(), metrics, r),
getCreate(metrics), Gossiper.getLocalLoopback(member));
ring = new RingCommunications<>(context, member, this.comm);
}

public Context<Member> getContext() {
return context;
ring = new SliceIterator<Gossiper>("ChRbcGossip[%s on: %s]".formatted(id, member.getId()), member, membership,
comm);
}

/**
Expand All @@ -91,16 +91,20 @@ public void start(Duration duration, Predicate<FernetServerInterceptor.HashedTok
return;
}
Duration initialDelay = duration.plusMillis(Entropy.nextBitsStreamLong(duration.toMillis()));
log.trace("Starting GossipService[{}] on: {}", context.getId(), member.getId());
comm.register(context.getId(), terminal, validator);
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
oneRound(duration, scheduler);
} catch (Throwable e) {
log.error("Error in gossip on: {}", member.getId(), e);
}
}, log)), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
log.trace("Starting GossipService[{}] on: {}", id, member.getId());
comm.register(id, terminal, validator);
try {
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
gossip(duration, scheduler);
} catch (Throwable e) {
log.error("Error in gossip on: {}", member.getId(), e);
}
}, log)), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
} catch (Throwable e) {
log.error("Error in gossip on: {}", member.getId(), e);
}
}

/**
Expand All @@ -110,94 +114,85 @@ public void stop() {
if (!started.compareAndSet(true, false)) {
return;
}
log.trace("Stopping GossipService [{}] for {}", context.getId(), member.getId());
comm.deregister(context.getId());
log.trace("Stopping GossipService [{}] for {}", id, member.getId());
comm.deregister(id);
final var current = scheduled;
scheduled = null;
if (current != null) {
current.cancel(true);
}
}

private void gossip(Duration frequency, ScheduledExecutorService scheduler) {
if (!started.get()) {
return;
}
var timer = metrics == null ? null : metrics.gossipRoundDuration().time();
ring.iterate((link, _) -> gossipRound(link), (result, link, _) -> {
handle(result, link);
return true;
}, () -> {
if (timer != null) {
timer.stop();
}
if (started.get()) {
scheduled = scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(frequency, scheduler), log)),
frequency.toNanos(), TimeUnit.NANOSECONDS);
}
}, frequency);
}

/**
* Perform the first phase of the gossip. Send our partner the Have state of the receiver
*/
private Update gossipRound(Gossiper link, int ring) {
private Update gossipRound(Gossiper link) {
if (!started.get()) {
return null;
}
log.trace("gossiping[{}] with {} ring: {} on {}", context.getId(), link.getMember(), ring, member);
log.trace("gossiping[{}] with {} on {}", id, link.getMember(), member);
try {
return link.gossip(processor.gossip(context.getId(), ring));
return link.gossip(processor.gossip(id));
} catch (StatusRuntimeException e) {
log.debug("gossiping[{}] failed: {} with: {} with {} ring: {} on: {}", context.getId(), e.getMessage(),
member.getId(), ring, link.getMember().getId(), member.getId());
log.debug("gossiping[{}] failed: {} with: {} with {} on: {}", id, e.getMessage(), member.getId(),
link.getMember().getId(), member.getId());
return null;
} catch (Throwable e) {
log.warn("gossiping[{}] failed: {} from {} with {} ring: {} on: {}", context.getId(), member.getId(), ring,
link.getMember().getId(), ring, member.getId(), e);
log.warn("gossiping:{} with: {} failed on: {}", id, link.getMember().getId(), member.getId(), e);
return null;
}
}

/**
* The second phase of the gossip. Handle the update from our gossip partner
*/
private void handle(Optional<Update> result, RingCommunications.Destination<Member, Gossiper> destination,
Duration duration, ScheduledExecutorService scheduler, Timer.Context timer) {
if (!started.get() || destination == null || destination.link() == null) {
if (timer != null) {
timer.stop();
}
private void handle(Optional<Update> result, Gossiper link) {
if (!started.get()) {
return;
}
try {
if (result.isEmpty()) {
if (timer != null) {
timer.stop();
}
return;
}
Update update = result.get();
if (update.equals(Update.getDefaultInstance())) {
return;
}
try {
var u = processor.update(update);
if (!Update.getDefaultInstance().equals(u)) {
log.trace("Gossip update with: {} on: {}", destination.member().getId(), member.getId());
destination.link()
.update(ContextUpdate.newBuilder().setRing(destination.ring()).setUpdate(u).build());
}
} catch (StatusRuntimeException e) {
log.debug("gossiping[{}] failed: {} with: {} with {} ring: {} on: {}", context.getId(), e.getMessage(),
member.getId(), ring, destination.member().getId(), member.getId());
} catch (Throwable e) {
log.warn("gossiping[{}] failed: {} with: {} with {} ring: {} on: {}", context.getId(), e.getMessage(),
member.getId(), ring, destination.member().getId(), member.getId(), e);
}
} finally {
if (timer != null) {
timer.stop();
}
if (started.get()) {
scheduled = scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)),
duration.toMillis(), TimeUnit.MILLISECONDS);
}
if (link == null) {
return;
}
}

/**
* Perform one round of gossip
*/
private void oneRound(Duration duration, ScheduledExecutorService scheduler) {
if (!started.get()) {
if (result.isEmpty()) {
return;
}
var timer = metrics == null ? null : metrics.gossipRoundDuration().time();
ring.execute(this::gossipRound,
(result, destination) -> handle(result, destination, duration, scheduler, timer));
Update update = result.get();
if (update.equals(Update.getDefaultInstance())) {
return;
}
try {
var u = processor.update(update);
if (!Update.getDefaultInstance().equals(u)) {
log.trace("Gossip update with: {} on: {}", link.getMember().getId(), member.getId());
link.update(ContextUpdate.newBuilder().setUpdate(u).build());
}
} catch (StatusRuntimeException e) {
log.debug("gossiping[{}] failed: {} with: {} with {} on: {}", id, e.getMessage(), member.getId(),
link.getMember().getId(), member.getId());
} catch (Throwable e) {
log.warn("gossiping[{}] failed: {} with: {} with {} on: {}", id, e.getMessage(), member.getId(),
link.getMember().getId(), member.getId(), e);
}
}

/**
Expand All @@ -206,13 +201,6 @@ private void oneRound(Duration duration, ScheduledExecutorService scheduler) {
private class Terminal implements GossiperService, Router.ServiceRouting {
@Override
public Update gossip(Gossip request, Digest from) {
Member predecessor = context.predecessor(request.getRing(), member);
if (predecessor == null || !from.equals(predecessor.getId())) {
log.debug("Invalid inbound gossip context: {} from: {} on ring: {} - not predecessor: {} on: {}",
context.getId(), from, request.getRing(),
predecessor == null ? "<null>" : predecessor.getId(), member.getId());
return Update.getDefaultInstance();
}
final var update = processor.gossip(request);
log.trace("GossipService received from: {} missing: {} on: {}", from, update.getMissingCount(),
member.getId());
Expand All @@ -221,13 +209,6 @@ public Update gossip(Gossip request, Digest from) {

@Override
public void update(ContextUpdate request, Digest from) {
Member predecessor = context.predecessor(request.getRing(), member);
if (predecessor == null || !from.equals(predecessor.getId())) {
log.debug("Invalid inbound update context:{} from: {} on ring: {} - not predecessor: {} on: {}",
context.getId(), from, request.getRing(),
predecessor == null ? "<null>" : predecessor.getId(), member.getId());
return;
}
log.trace("gossip update with {} on: {}", from, member.getId());
processor.updateFrom(request.getUpdate());
}
Expand Down
Loading

0 comments on commit deacf1a

Please sign in to comment.