Skip to content

Commit

Permalink
Moar cleanup wrt scheduling
Browse files Browse the repository at this point in the history
Also remove delegated verifier/validation as I don't believe that will be necessary
  • Loading branch information
Hellblazer committed Jan 1, 2024
1 parent 6dc1ad6 commit 8de73a2
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 51 deletions.
5 changes: 3 additions & 2 deletions choam/src/main/java/com/salesforce/apollo/choam/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.cryptography.Signer;
import com.salesforce.apollo.cryptography.Verifier;
import com.salesforce.apollo.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -169,7 +170,7 @@ public <T> CompletableFuture<T> submit(Message transaction, Duration timeout) th
if (params.metrics() != null) {
params.metrics().transactionSubmittedSuccess();
}
var futureTimeout = scheduler.schedule(() -> {
var futureTimeout = scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
if (result.isDone()) {
return;
}
Expand All @@ -179,7 +180,7 @@ public <T> CompletableFuture<T> submit(Message transaction, Duration timeout) th
if (params.metrics() != null) {
params.metrics().transactionComplete(to);
}
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}, log)), timeout.toMillis(), TimeUnit.MILLISECONDS);

return result.whenComplete((r, t) -> {
futureTimeout.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
package com.salesforce.apollo.choam;

import com.chiralbehaviors.tron.Fsm;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.choam.comm.Terminal;
import com.salesforce.apollo.choam.fsm.Reconfiguration;
import com.salesforce.apollo.choam.fsm.Reconfiguration.Reconfigure;
import com.salesforce.apollo.choam.fsm.Reconfiguration.Transitions;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.proto.PubKey;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.ring.SliceIterator;
import com.salesforce.apollo.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -150,7 +151,8 @@ private void completeSlice(AtomicReference<Duration> retryDelay, AtomicReference
proposals.keySet().stream().toList(), nextAssembly.size(), delay, params().member().getId());
if (!cancelSlice.get()) {
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())
.schedule(() -> reiterate.get().run(), delay.toMillis(), TimeUnit.MILLISECONDS);
.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reiterate.get(), log)), delay.toMillis(),
TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.bloomFilters.BloomFilter.ULongBloomFilter;
import com.salesforce.apollo.choam.Parameters;
import com.salesforce.apollo.choam.comm.Concierge;
import com.salesforce.apollo.choam.comm.Terminal;
import com.salesforce.apollo.choam.proto.*;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.HexBloom;
Expand All @@ -23,6 +23,7 @@
import com.salesforce.apollo.ring.RingIterator;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Pair;
import com.salesforce.apollo.utils.Utils;
import org.joou.ULong;
import org.joou.Unsigned;
import org.slf4j.Logger;
Expand Down Expand Up @@ -377,22 +378,22 @@ private void scheduleAnchorCompletion(AtomicReference<ULong> start, ULong anchor
}
log.info("Scheduling Anchor completion ({} to {}) duration: {} on: {}", start, anchorTo,
params.gossipDuration(), params.member().getId());
scheduler.schedule(() -> {
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
anchor(start, anchorTo);
} catch (Throwable e) {
log.error("Cannot execute completeViewChain on: {}", params.member().getId());
sync.completeExceptionally(e);
}
}, params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS);
}, log)), params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS);
}

private void scheduleSample() {
if (sync.isDone()) {
return;
}
log.info("Scheduling state sample on: {}", params.member().getId());
scheduler.schedule(() -> {
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
final HashedCertifiedBlock established = genesis;
if (sync.isDone() || established != null) {
log.trace("Synchronization isDone: {} genesis: {} on: {}", sync.isDone(),
Expand All @@ -406,7 +407,7 @@ private void scheduleSample() {
sync.completeExceptionally(e);
e.printStackTrace();
}
}, params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS);
}, log)), params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS);
}

private void scheduleViewChainCompletion(AtomicReference<ULong> start, ULong to) {
Expand All @@ -418,14 +419,14 @@ private void scheduleViewChainCompletion(AtomicReference<ULong> start, ULong to)
}
log.info("Scheduling view chain completion ({} to {}) duration: {} on: {}", start, to, params.gossipDuration(),
params.member().getId());
scheduler.schedule(() -> {
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
completeViewChain(start, to);
} catch (Throwable e) {
log.error("Cannot execute completeViewChain on: {}", params.member().getId());
sync.completeExceptionally(e);
}
}, params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS);
}, log)), params.gossipDuration().toNanos(), TimeUnit.NANOSECONDS);
}

private boolean synchronize(Optional<Initial> futureSailor, HashMap<Digest, Initial> votes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
*/
package com.salesforce.apollo.choam.support;

import com.salesforce.apollo.choam.proto.Checkpoint;
import com.salesforce.apollo.choam.proto.CheckpointReplication;
import com.salesforce.apollo.choam.proto.CheckpointSegments;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
import com.salesforce.apollo.choam.comm.Concierge;
import com.salesforce.apollo.choam.comm.Terminal;
import com.salesforce.apollo.choam.proto.Checkpoint;
import com.salesforce.apollo.choam.proto.CheckpointReplication;
import com.salesforce.apollo.choam.proto.CheckpointSegments;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.HexBloom;
Expand All @@ -21,6 +21,7 @@
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.ring.RingIterator;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import org.h2.mvstore.MVMap;
import org.joou.ULong;
import org.slf4j.Logger;
Expand Down Expand Up @@ -115,9 +116,9 @@ private void gossip(ScheduledExecutorService scheduler, Duration duration) {
member.getId());
var ringer = new RingIterator<>(frequency, context, member, comms, true, scheduler);
ringer.iterate(randomCut(digestAlgorithm), (link, ring) -> gossip(link),
(tally, result, destination) -> gossip(result),
t -> scheduler.schedule(() -> gossip(scheduler, duration), duration.toMillis(),
TimeUnit.MILLISECONDS));
(tally, result, destination) -> gossip(result), t -> scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(scheduler, duration), log)), duration.toMillis(),
TimeUnit.MILLISECONDS));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
package com.salesforce.apollo.ethereal.memberships;

import com.codahale.metrics.Timer;
import com.salesforce.apollo.ethereal.proto.ContextUpdate;
import com.salesforce.apollo.ethereal.proto.Gossip;
import com.salesforce.apollo.ethereal.proto.Update;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.cryptography.Digest;
Expand All @@ -18,11 +15,15 @@
import com.salesforce.apollo.ethereal.memberships.comm.Gossiper;
import com.salesforce.apollo.ethereal.memberships.comm.GossiperServer;
import com.salesforce.apollo.ethereal.memberships.comm.GossiperService;
import com.salesforce.apollo.ethereal.proto.ContextUpdate;
import com.salesforce.apollo.ethereal.proto.Gossip;
import com.salesforce.apollo.ethereal.proto.Update;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.ring.RingCommunications;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -84,13 +85,13 @@ public void start(Duration duration) {
log.trace("Starting GossipService[{}] on: {}", context.getId(), member.getId());
comm.register(context.getId(), new Terminal());
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
scheduler.schedule(() -> {
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> {
try {
oneRound(duration, scheduler);
} catch (Throwable e) {
log.error("Error in gossip on: {}", member.getId(), e);
}
}, initialDelay.toMillis(), TimeUnit.MILLISECONDS);
}, log)), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -175,8 +176,9 @@ private void handle(Optional<Update> result, RingCommunications.Destination<Memb
timer.stop();
}
if (started.get()) {
scheduled = scheduler.schedule(() -> oneRound(duration, scheduler), duration.toMillis(),
TimeUnit.MILLISECONDS);
scheduled = scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)),
duration.toMillis(), TimeUnit.MILLISECONDS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ void seeding() {
return link.seed(registration);
}, (futureSailor, link, m) -> complete(redirect, futureSailor, m), () -> {
if (!redirect.isDone()) {
scheduler.schedule(Utils.wrapped(reseed.get(), log), params.retryDelay().toNanos(),
TimeUnit.NANOSECONDS);
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(reseed.get(), log)),
params.retryDelay().toNanos(), TimeUnit.NANOSECONDS);
}
}, scheduler, params.retryDelay());
});
Expand Down Expand Up @@ -299,7 +299,7 @@ private void join(Redirect redirect, Digest v, Duration duration) {
params.joinRetries(), node.getId());
trusts.clear();
initialSeedSet.clear();
scheduler.schedule(Utils.wrapped(regate.get(), log),
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(regate.get(), log)),
Entropy.nextBitsStreamLong(params.retryDelay().toNanos()),
TimeUnit.NANOSECONDS);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
import java.util.concurrent.Executors;

/**
* Verifiers that delegate to the joining member's successors in the full context for key state retrieval
* <p>
* This is used to bootstrap the node via delegated key state resolution of the joined group
* Verifiers that delegate to a majority of the sample for event validation and verification
* </p>
*
* @author hal.hildebrand
Expand Down
25 changes: 14 additions & 11 deletions fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ public class View {
private final ReadWriteLock viewChange = new ReentrantReadWriteLock(
true);
private final ViewManagement viewManagement;
private final EventValidation.DelegatedEventValidation validation;
private final Verifiers.DelegatedVerifiers verifiers;
private final EventValidation validation;
private final Verifiers verifiers;
private volatile ScheduledFuture<?> futureGossip;

public View(Context<Participant> context, ControlledIdentifierMember member, InetSocketAddress endpoint,
Expand Down Expand Up @@ -138,8 +138,8 @@ public View(Context<Participant> context, ControlledIdentifierMember member, Ine
r -> new EntranceServer(gateway.getClientIdentityProvider(), r, metrics),
EntranceClient.getCreate(metrics), Entrance.getLocalLoopback(node, service));
gossiper = new RingCommunications<>(context, node, comm);
this.validation = new EventValidation.DelegatedEventValidation(validation);
this.verifiers = new Verifiers.DelegatedVerifiers(verifiers);
this.validation = validation;
this.verifiers = verifiers;
}

/**
Expand Down Expand Up @@ -222,9 +222,10 @@ public void start(CompletableFuture<Void> onJoin, Duration d, List<Seed> seedpod

var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
var initial = Entropy.nextBitsStreamLong(d.toNanos());
scheduler.schedule(Utils.wrapped(
() -> new Binding(this, seeds, d, context, approaches, node, params, metrics, digestAlgo).seeding(), log),
initial, TimeUnit.NANOSECONDS);
scheduler.schedule(() -> Thread.ofVirtual()
.start(Utils.wrapped(
() -> new Binding(this, seeds, d, context, approaches, node, params, metrics,
digestAlgo).seeding(), log)), initial, TimeUnit.NANOSECONDS);

log.info("{} started on: {}", context.getId(), node.getId());
}
Expand Down Expand Up @@ -495,8 +496,9 @@ void resetBootstrapView() {

void schedule(final Duration duration) {
var scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
futureGossip = scheduler.schedule(Utils.wrapped(() -> gossip(duration, scheduler), log),
Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS);
futureGossip = scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration, scheduler), log)),
Entropy.nextBitsStreamLong(duration.toNanos()), TimeUnit.NANOSECONDS);
}

void scheduleFinalizeViewChange() {
Expand Down Expand Up @@ -1083,8 +1085,9 @@ private void gossip(Optional<Gossip> result, RingCommunications.Destination<Part
}

} finally {
futureGossip = scheduler.schedule(Utils.wrapped(() -> gossip(duration, scheduler), log), duration.toNanos(),
TimeUnit.NANOSECONDS);
futureGossip = scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> gossip(duration, scheduler), log)), duration.toNanos(),
TimeUnit.NANOSECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ void populate(List<Participant> sample) {
return !joined();
}, () -> {
if (!joined()) {
scheduler.schedule(Utils.wrapped(() -> repopulate.get(), log), 500, TimeUnit.MILLISECONDS);
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(repopulate.get(), log)), 500,
TimeUnit.MILLISECONDS);
}
}, scheduler, Duration.ofMillis(500));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.salesforce.apollo.ring.RingIterator;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Hex;
import com.salesforce.apollo.utils.Utils;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.h2.mvstore.MVMap;
Expand Down Expand Up @@ -380,7 +381,8 @@ private void reconcile(Optional<Update> result,
member.getId());
}
if (started.get()) {
scheduler.schedule(() -> reconcile(scheduler, duration), duration.toNanos(), TimeUnit.NANOSECONDS);
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> reconcile(scheduler, duration), log)),
duration.toNanos(), TimeUnit.NANOSECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.salesforce.apollo.cryptography.proto.Biff;
import com.salesforce.apollo.messaging.proto.*;
import com.salesforce.apollo.archipelago.Router;
import com.salesforce.apollo.archipelago.RouterImpl.CommonCommunications;
import com.salesforce.apollo.bloomFilters.BloomFilter;
Expand All @@ -20,13 +18,16 @@
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.cryptography.JohnHancock;
import com.salesforce.apollo.cryptography.proto.Biff;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.Member;
import com.salesforce.apollo.membership.SigningMember;
import com.salesforce.apollo.membership.messaging.rbc.comms.RbcServer;
import com.salesforce.apollo.membership.messaging.rbc.comms.ReliableBroadcast;
import com.salesforce.apollo.messaging.proto.*;
import com.salesforce.apollo.ring.RingCommunications;
import com.salesforce.apollo.utils.Entropy;
import com.salesforce.apollo.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -187,7 +188,8 @@ public void start(Duration duration) {
log.info("Starting Reliable Broadcaster[{}] for {}", context.getId(), member.getId());
comm.register(context.getId(), new Service());
var scheduler = Executors.newScheduledThreadPool(2, Thread.ofVirtual().factory());
scheduler.schedule(() -> oneRound(duration, scheduler), initialDelay, TimeUnit.MILLISECONDS);
scheduler.schedule(() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)),
initialDelay, TimeUnit.MILLISECONDS);
}

public void stop() {
Expand Down Expand Up @@ -251,7 +253,9 @@ private void handle(Optional<Reconcile> result,
}
if (started.get()) {
try {
scheduler.schedule(() -> oneRound(duration, scheduler), duration.toMillis(), TimeUnit.MILLISECONDS);
scheduler.schedule(
() -> Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)),
duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
return;
}
Expand Down
Loading

0 comments on commit 8de73a2

Please sign in to comment.