From e40b25cb620a20e65ee0bc0d0f374d9a8f501c45 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Thu, 27 Jun 2024 20:07:33 -0700 Subject: [PATCH] consolidate executors - just fork threads - and schedulers. better concurrency control/management --- .../com/salesforce/apollo/choam/CHOAM.java | 9 ++---- .../com/salesforce/apollo/choam/Session.java | 28 ++++++++---------- .../apollo/choam/support/Bootstrapper.java | 5 ++-- .../salesforce/apollo/choam/SessionTest.java | 5 ++-- .../choam/support/BootstrapperTest.java | 3 +- .../com/salesforce/apollo/fireflies/View.java | 19 ++++++++---- .../messaging/rbc/ReliableBroadcaster.java | 2 +- .../com/salesforce/apollo/state/Emulator.java | 29 +++++++++++-------- 8 files changed, 54 insertions(+), 46 deletions(-) diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index 79f3d4e24..f90ad6ca0 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -150,7 +150,7 @@ public CHOAM(Parameters params) { roundScheduler = new RoundScheduler("CHOAM" + params.member().getId() + params.context().getId(), params.context().timeToLive()); combine.register(_ -> roundScheduler.tick()); - session = new Session(params, service()); + session = new Session(params, service(), scheduler); } public static Checkpoint checkpoint(DigestAlgorithm algo, File state, int segmentSize, Digest initial, int crowns, @@ -360,10 +360,6 @@ public void stop() { } catch (Throwable e) { } session.cancelAll(); - try { - session.stop(); - } catch (Throwable e) { - } final var c = current.get(); if (c != null) { try { @@ -760,7 +756,8 @@ private void recover(HashedCertifiedBlock anchor) { log.info("Recovering from: {} height: {} on: {}", anchor.hash, anchor.height(), params.member().getId()); cancelSynchronization(); cancelBootstrap(); - futureBootstrap.set(new Bootstrapper(anchor, params, store, comm).synchronize().whenComplete((s, t) -> { + futureBootstrap.set( + new Bootstrapper(anchor, params, store, comm, scheduler).synchronize().whenComplete((s, t) -> { if (t == null) { try { synchronize(s); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Session.java b/choam/src/main/java/com/salesforce/apollo/choam/Session.java index f21d652ad..dbe6b5969 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Session.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Session.java @@ -40,19 +40,18 @@ */ public class Session { - private final static Logger log = LoggerFactory.getLogger( - Session.class); - private final Limiter limiter; - private final Parameters params; - private final Function service; - private final Map submitted = new ConcurrentHashMap<>(); - private final AtomicReference view = new AtomicReference<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, - Thread.ofVirtual() - .factory()); - private final AtomicInteger nonce = new AtomicInteger(); + private final static Logger log = LoggerFactory.getLogger(Session.class); - public Session(Parameters params, Function service) { + private final Limiter limiter; + private final Parameters params; + private final Function service; + private final Map submitted = new ConcurrentHashMap<>(); + private final AtomicReference view = new AtomicReference<>(); + private final ScheduledExecutorService scheduler; + private final AtomicInteger nonce = new AtomicInteger(); + + public Session(Parameters params, Function service, + ScheduledExecutorService scheduler) { this.params = params; this.service = service; final var metrics = params.metrics(); @@ -60,6 +59,7 @@ public Session(Parameters params, Function s .build(params.member().getId().shortString(), metrics == null ? EmptyMetricRegistry.INSTANCE : metrics.getMetricRegistry( params.context().getId().shortString() + ".txnLimiter")); + this.scheduler = scheduler; } public static Transaction transactionOf(Digest source, int nonce, Message message, Signer signer) { @@ -117,10 +117,6 @@ public void setView(HashedCertifiedBlock v) { } } - public void stop() { - scheduler.shutdown(); - } - /** * Submit a transaction. * diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java index 35155358c..691d332cb 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/Bootstrapper.java @@ -30,7 +30,6 @@ import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -60,11 +59,12 @@ public class Bootstrapper { private volatile HashedCertifiedBlock genesis; public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store, - CommonCommunications bootstrapComm) { + CommonCommunications bootstrapComm, ScheduledExecutorService scheduler) { this.anchor = anchor; this.params = params; this.store = store; this.comms = bootstrapComm; + this.scheduler = scheduler; CertifiedBlock g = store.getCertifiedBlock(ULong.valueOf(0)); store.put(anchor); if (g != null) { @@ -75,7 +75,6 @@ public Bootstrapper(HashedCertifiedBlock anchor, Parameters params, Store store, log.info("Restore using no prior state on: {}", params.member().getId()); lastCheckpoint = null; } - scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); } public CompletableFuture synchronize() { diff --git a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java index 1f8b9405a..315a836a6 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java @@ -84,7 +84,8 @@ public void func() throws Exception { }); return SubmitResult.newBuilder().setResult(Result.PUBLISHED).build(); }; - Session session = new Session(params, service); + Session session = new Session(params, service, + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder() .setBlock(Block.newBuilder() .setHeader( @@ -136,7 +137,7 @@ public void scalingTest() throws Exception { MetricRegistry reg = new MetricRegistry(); Timer latency = reg.timer("Transaction latency"); - Session session = new Session(params, service); + Session session = new Session(params, service, scheduler); session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder() .setBlock(Block.newBuilder() .setHeader( diff --git a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java index e0a2b6233..5c2805533 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/support/BootstrapperTest.java @@ -33,6 +33,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -107,7 +108,7 @@ public void smoke() throws Exception { context) .setMember(member) .build()), store, - comms); + comms, Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); CompletableFuture syncFuture = boot.synchronize(); SynchronizedState state = syncFuture.get(10, TimeUnit.SECONDS); diff --git a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java index 9005abd90..d730d4741 100644 --- a/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java +++ b/fireflies/src/main/java/com/salesforce/apollo/fireflies/View.java @@ -96,7 +96,7 @@ public class View { private final RingCommunications gossiper; private final AtomicBoolean introduced = new AtomicBoolean(); private final Map> viewChangeListeners = new HashMap<>(); - private final Executor viewNotificationQueue; + private final Semaphore viewSerialization = new Semaphore(1); private final FireflyMetrics metrics; private final Node node; private final Map observations = new ConcurrentSkipListMap<>(); @@ -111,7 +111,6 @@ public class View { private final Verifiers verifiers; private final ScheduledExecutorService scheduler; private volatile ScheduledFuture futureGossip; - private volatile boolean boostrap = false; public View(DynamicContext context, ControlledIdentifierMember member, String endpoint, EventValidation validation, Verifiers verifiers, Router communications, Parameters params, @@ -144,7 +143,6 @@ public View(DynamicContext context, ControlledIdentifierMember memb gossiper.ignoreSelf(); this.validation = validation; this.verifiers = verifiers; - viewNotificationQueue = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory()); viewChange = new ReentrantReadWriteLock(true); } @@ -243,6 +241,7 @@ public void stop() { if (!started.compareAndSet(true, false)) { return; } + viewSerialization.release(10000); roundTimers.reset(); comm.deregister(context.getId()); pendingRebuttals.clear(); @@ -328,7 +327,6 @@ boolean addToView(NoteWrapper note) { } void bootstrap(NoteWrapper nw, Duration dur) { - boostrap = true; viewManagement.bootstrap(nw, dur); } @@ -415,13 +413,24 @@ void notifyListeners(List joining, List leavin joining.stream().map(SelfAddressingIdentifier::getDigest).toList(), Collections.unmodifiableList(leaving)); viewChangeListeners.forEach((key, value) -> { - viewNotificationQueue.execute(Utils.wrapped(() -> { + Thread.ofVirtual().start(Utils.wrapped(() -> { + try { + viewSerialization.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + if (!started.get()) { + return; + } try { log.trace("Notifying: {} view change: {} cardinality: {} joins: {} leaves: {} on: {} ", key, currentView(), context.size(), joining.size(), leaving.size(), node.getId()); value.accept(viewChange); } catch (Throwable e) { log.error("error in view change listener: {} on: {} ", key, node.getId(), e); + } finally { + viewSerialization.release(); } }, log)); }); diff --git a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java index 97698686f..ee0455574 100644 --- a/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java +++ b/memberships/src/main/java/com/salesforce/apollo/membership/messaging/rbc/ReliableBroadcaster.java @@ -198,7 +198,7 @@ public void start(Duration duration, Predicate Thread.ofVirtual().start(Utils.wrapped(() -> oneRound(duration, scheduler), log)), initialDelay, TimeUnit.MILLISECONDS); } diff --git a/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java b/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java index 9890d50eb..c9cd2aed6 100644 --- a/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java +++ b/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java @@ -34,6 +34,8 @@ import java.sql.Connection; import java.util.Arrays; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -47,15 +49,16 @@ */ public class Emulator { - private final AtomicReference hash; - private final AtomicLong height = new AtomicLong(0); - private final ReentrantLock lock = new ReentrantLock(); - private final Mutator mutator; - private final Parameters params; - private final SqlStateMachine ssm; - private final AtomicBoolean started = new AtomicBoolean(); - private final TransactionExecutor txnExec; - private final AtomicInteger txnIndex = new AtomicInteger(0); + private final AtomicReference hash; + private final AtomicLong height = new AtomicLong(0); + private final ReentrantLock lock = new ReentrantLock(); + private final Mutator mutator; + private final Parameters params; + private final SqlStateMachine ssm; + private final AtomicBoolean started = new AtomicBoolean(); + private final TransactionExecutor txnExec; + private final AtomicInteger txnIndex = new AtomicInteger(0); + private final ScheduledExecutorService scheduler; public Emulator() throws IOException { this(DigestAlgorithm.DEFAULT.getOrigin().prefix(Entropy.nextBitsStreamLong())); @@ -64,11 +67,13 @@ public Emulator() throws IOException { public Emulator(Digest base) throws IOException { this(new SqlStateMachine(DigestAlgorithm.DEFAULT.getOrigin(), String.format("jdbc:h2:mem:emulation-%s-%s", base, Entropy.nextBitsStreamLong()), - new Properties(), Files.createTempDirectory("emulation").toFile()), base); + new Properties(), Files.createTempDirectory("emulation").toFile()), base, + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); } - public Emulator(SqlStateMachine ssm, Digest base) { + public Emulator(SqlStateMachine ssm, Digest base, ScheduledExecutorService scheduler) throws IOException { this.ssm = ssm; + this.scheduler = scheduler; txnExec = this.ssm.getExecutor(); hash = new AtomicReference<>(base); SecureRandom entropy; @@ -97,7 +102,7 @@ public Emulator(SqlStateMachine ssm, Digest base) { } finally { lock.unlock(); } - }); + }, scheduler); session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder() .setBlock(Block.newBuilder() .setHeader(