Skip to content

Commit

Permalink
This seems to make everything all right again.
Browse files Browse the repository at this point in the history
Fix/revert many of the threading changes that broke things.  Scheduled executors do not work well with virtual threads, so always fork the running of scheduled work.
  • Loading branch information
Hellblazer committed Dec 31, 2023
1 parent 2d8b234 commit d79238b
Show file tree
Hide file tree
Showing 32 changed files with 200 additions and 218 deletions.
2 changes: 1 addition & 1 deletion choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class CHOAM {
public CHOAM(Parameters params) {
this.store = new Store(params.digestAlgorithm(), params.mvBuilder().clone().build());
this.params = params;
executions = Executors.newCachedThreadPool(Thread.ofVirtual().factory());
executions = Executors.newVirtualThreadPerTaskExecutor();

nextView();
combine = new ReliableBroadcaster(params.context(), params.member(), params.combine(), params.communications(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
import com.salesforce.apollo.choam.proto.Block;
import com.salesforce.apollo.choam.proto.CertifiedBlock;
import com.salesforce.apollo.choam.proto.Header;
import com.salesforce.apollo.choam.proto.SubmitResult;
import com.salesforce.apollo.choam.proto.SubmitResult.Result;
import com.salesforce.apollo.test.proto.ByteMessage;
import com.salesforce.apollo.choam.Parameters.RuntimeParameters;
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.InvalidTransaction;
import com.salesforce.apollo.choam.support.SubmittedTransaction;
Expand All @@ -31,6 +30,7 @@
import com.salesforce.apollo.stereotomy.StereotomyImpl;
import com.salesforce.apollo.stereotomy.mem.MemKERL;
import com.salesforce.apollo.stereotomy.mem.MemKeyStore;
import com.salesforce.apollo.test.proto.ByteMessage;
import io.grpc.StatusRuntimeException;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,7 +103,7 @@ public void func() throws Exception {

@Test
public void scalingTest() throws Exception {
var exec = Executors.newCachedThreadPool(Thread.ofVirtual().factory());
var exec = Executors.newVirtualThreadPerTaskExecutor();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
Context<Member> context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin(), 9, 0.2, 3);
var entropy = SecureRandom.getInstance("SHA1PRNG");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
package com.salesforce.apollo.choam;

import com.google.protobuf.ByteString;
import com.salesforce.apollo.test.proto.ByteMessage;
import com.salesforce.apollo.choam.support.InvalidTransaction;
import com.salesforce.apollo.test.proto.ByteMessage;
import com.salesforce.apollo.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,8 +24,7 @@
class Transactioneer {
private final static Random entropy = new Random();
private final static Logger log = LoggerFactory.getLogger(Transactioneer.class);
private final static Executor executor = Executors.newCachedThreadPool(
Thread.ofVirtual().factory());
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual()
.factory());
private final AtomicInteger completed = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.joou.ULong;
import org.slf4j.Logger;
Expand All @@ -35,7 +34,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -56,17 +55,15 @@ class Binding {
private final FireflyMetrics metrics;
private final Node node;
private final Parameters params;
private final ScheduledExecutorService scheduler;
private final List<Seed> seeds;
private final View view;

public Binding(View view, List<Seed> seeds, Duration duration, ScheduledExecutorService scheduler,
Context<Participant> context, CommonCommunications<Entrance, Service> approaches, Node node,
Parameters params, FireflyMetrics metrics, DigestAlgorithm digestAlgo) {
public Binding(View view, List<Seed> seeds, Duration duration, Context<Participant> context,
CommonCommunications<Entrance, Service> approaches, Node node, Parameters params,
FireflyMetrics metrics, DigestAlgorithm digestAlgo) {
this.view = view;
this.duration = duration;
this.seeds = new ArrayList<>(seeds);
this.scheduler = scheduler;
this.context = context;
this.node = node;
this.params = params;
Expand All @@ -84,9 +81,9 @@ void seeding() {
log.info("Seeding view: {} context: {} with seeds: {} started on: {}", view.currentView(), this.context.getId(),
seeds.size(), node.getId());

var seeding = new CompletableFuture<Redirect>();
var redirect = new CompletableFuture<Redirect>();
var timer = metrics == null ? null : metrics.seedDuration().time();
seeding.whenComplete(join(duration, scheduler, timer));
redirect.whenComplete(join(duration, timer));

var bootstrappers = seeds.stream()
.map(this::seedFor)
Expand All @@ -95,13 +92,14 @@ void seeding() {
.collect(Collectors.toList());
var seedlings = new SliceIterator<>("Seedlings", node, bootstrappers, approaches);
AtomicReference<Runnable> reseed = new AtomicReference<>();
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
reseed.set(() -> {
final var registration = registration();
seedlings.iterate((link, m) -> {
log.debug("Requesting Seeding from: {} on: {}", link.getMember().getId(), node.getId());
return link.seed(registration);
}, (futureSailor, link, m) -> complete(seeding, futureSailor, m), () -> {
if (!seeding.isDone()) {
}, (futureSailor, link, m) -> complete(redirect, futureSailor, m), () -> {
if (!redirect.isDone()) {
scheduler.schedule(Utils.wrapped(reseed.get(), log), params.retryDelay().toNanos(),
TimeUnit.NANOSECONDS);
}
Expand All @@ -115,22 +113,23 @@ private void bootstrap() {
node.getId());
var nw = node.getNote();

view.bootstrap(nw, scheduler, duration);
view.bootstrap(nw, duration);
}

private boolean complete(CompletableFuture<Redirect> redirect, Optional<Redirect> futureSailor, Member m) {
if (futureSailor.isEmpty()) {
return true;
}
if (redirect.isDone()) {
return true;
return false;
}
final var r = futureSailor.get();
if (redirect.complete(r)) {
log.info("Redirect to view: {} context: {} from: {} on: {}", Digest.from(r.getView()), this.context.getId(),
m.getId(), node.getId());
log.info("Redirected to view: {} context: {} from: {} on: {}", Digest.from(r.getView()),
this.context.getId(), m.getId(), node.getId());
return false;
}
return false;
return true;
}

private boolean completeGateway(Participant member, CompletableFuture<Bound> gateway,
Expand Down Expand Up @@ -180,34 +179,37 @@ private boolean completeGateway(Participant member, CompletableFuture<Bound> gat
}

private void gatewaySRE(Digest v, Entrance link, StatusRuntimeException sre, AtomicInteger abandon) {
if (sre.getStatus().getCode().equals(Status.OUT_OF_RANGE.getCode())) {
switch (sre.getStatus().getCode()) {
case OUT_OF_RANGE -> {
log.info("Gateway view: {} invalid: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.FAILED_PRECONDITION.getCode())) {
}
case FAILED_PRECONDITION -> {
log.info("Gateway view: {} unavailable: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.PERMISSION_DENIED.getCode())) {
}
case PERMISSION_DENIED -> {
log.info("Gateway view: {} permission denied: {} from: {} on: {}", v, sre.getMessage(),
link.getMember().getId(), node.getId());
abandon.incrementAndGet();
} else if (sre.getStatus().getCode().equals(Status.RESOURCE_EXHAUSTED.getCode())) {
}
case RESOURCE_EXHAUSTED -> {
log.info("Gateway view: {} full: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
abandon.incrementAndGet();
} else {
log.info("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
}
default -> log.info("Join view: {} error: {} from: {} on: {}", v, sre.getMessage(), link.getMember().getId(),
node.getId());
}
}

/**
 * Builds the {@code Join} message for the supplied view digest.
 *
 * @param v the view digest; written into the message via {@code v.toDigeste()}
 * @return a {@code Join} whose view is {@code v} and whose note is this node's wrapped note
 */
private Join join(Digest v) {
return Join.newBuilder().setView(v.toDigeste()).setNote(node.getNote().getWrapped()).build();
}

private BiConsumer<? super Redirect, ? super Throwable> join(Duration duration, ScheduledExecutorService scheduler,
Timer.Context timer) {
private BiConsumer<? super Redirect, ? super Throwable> join(Duration duration, Timer.Context timer) {
return (r, t) -> {
if (t != null) {
log.error("Failed seeding on: {}", node.getId(), t);
Expand All @@ -228,11 +230,11 @@ private Join join(Digest v) {
if (timer != null) {
timer.close();
}
join(r, view, duration, scheduler);
join(r, view, duration);
};
}

private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecutorService scheduler) {
private void join(Redirect redirect, Digest v, Duration duration) {
var sample = redirect.getSampleList()
.stream()
.map(sn -> new NoteWrapper(sn.getNote(), digestAlgo))
Expand All @@ -242,7 +244,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu
node.getId());
var gateway = new CompletableFuture<Bound>();
var timer = metrics == null ? null : metrics.joinDuration().time();
gateway.whenComplete(view.join(scheduler, duration, timer));
gateway.whenComplete(view.join(duration, timer));

var regate = new AtomicReference<Runnable>();
var retries = new AtomicInteger();
Expand All @@ -261,6 +263,7 @@ private void join(Redirect redirect, Digest v, Duration duration, ScheduledExecu
var majority = redirect.getBootstrap() ? 1 : Context.minimalQuorum(redirect.getRings(), this.context.getBias());
final var join = join(v);
final var abandon = new AtomicInteger();
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
regate.set(() -> {
redirecting.iterate((link, m) -> {
log.debug("Joining: {} contacting: {} on: {}", v, link.getMember().getId(), node.getId());
Expand Down
24 changes: 12 additions & 12 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ public UUID register(ViewLifecycleListener listener) {
/**
* Start the View
*/
public void start(CompletableFuture<Void> onJoin, Duration d, List<Seed> seedpods,
ScheduledExecutorService scheduler) {
public void start(CompletableFuture<Void> onJoin, Duration d, List<Seed> seedpods) {
Objects.requireNonNull(onJoin, "Join completion must not be null");
if (!started.compareAndSet(false, true)) {
return;
Expand All @@ -214,23 +213,24 @@ public void start(CompletableFuture<Void> onJoin, Duration d, List<Seed> seedpod
context.clear();
node.reset();

var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
var initial = Entropy.nextBitsStreamLong(d.toNanos());
scheduler.schedule(Utils.wrapped(
() -> new Binding(this, seeds, d, scheduler, context, approaches, node, params, metrics, digestAlgo).seeding(),
log), initial, TimeUnit.NANOSECONDS);
() -> new Binding(this, seeds, d, context, approaches, node, params, metrics, digestAlgo).seeding(), log),
initial, TimeUnit.NANOSECONDS);

log.info("{} started on: {}", context.getId(), node.getId());
}

/**
* Start the View
*/
public void start(Runnable onJoin, Duration d, List<Seed> seedpods, ScheduledExecutorService scheduler) {
public void start(Runnable onJoin, Duration d, List<Seed> seedpods) {
final var futureSailor = new CompletableFuture<Void>();
futureSailor.whenComplete((v, t) -> {
onJoin.run();
});
start(futureSailor, d, seedpods, scheduler);
start(futureSailor, d, seedpods);
}

/**
Expand Down Expand Up @@ -326,8 +326,8 @@ boolean addToView(NoteWrapper note) {
return true;
}

void bootstrap(NoteWrapper nw, ScheduledExecutorService sched, Duration dur) {
viewManagement.bootstrap(nw, sched, dur);
void bootstrap(NoteWrapper nw, Duration dur) {
viewManagement.bootstrap(nw, dur);
}

Digest bootstrapView() {
Expand Down Expand Up @@ -405,9 +405,8 @@ void introduced() {
introduced.set(true);
}

BiConsumer<? super Bound, ? super Throwable> join(ScheduledExecutorService scheduler, Duration duration,
com.codahale.metrics.Timer.Context timer) {
return viewManagement.join(scheduler, duration, timer);
BiConsumer<? super Bound, ? super Throwable> join(Duration duration, com.codahale.metrics.Timer.Context timer) {
return viewManagement.join(duration, timer);
}

void notifyListeners(List<EstablishmentEvent> joining, List<Digest> leaving) {
Expand Down Expand Up @@ -490,7 +489,8 @@ void resetBootstrapView() {
viewManagement.resetBootstrapView();
}

void schedule(final Duration duration, final ScheduledExecutorService scheduler) {
void schedule(final Duration duration) {
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
futureGossip = scheduler.schedule(Utils.wrapped(() -> gossip(duration, scheduler), log),
Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS);
}
Expand Down
Loading

0 comments on commit d79238b

Please sign in to comment.