diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index e014625c7..f0970605d 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -1114,8 +1114,8 @@ public record PendingView(Digest diadem, Context context) { */ public View getView(Digest hash) { var builder = View.newBuilder().setDiadem(diadem.toDigeste()).setMajority(context.majority()); - ((Context) context).bftSubset(hash).forEach( - d -> builder.addCommittee(d.getId().toDigeste())); + ((Context) context).bftSubset(hash) + .forEach(d -> builder.addCommittee(d.getId().toDigeste())); return builder.build(); } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java index baf5250ee..96d5cbd37 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/ViewAssembly.java @@ -23,10 +23,7 @@ import org.slf4j.LoggerFactory; import java.security.PublicKey; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -269,6 +266,7 @@ private Map assemblyOf(List committee) { var last = view.pendingViews().last(); return committee.stream() .map(d -> last.context().getMember(Digest.from(d))) + .filter(Objects::nonNull) .collect(Collectors.toMap(Member::getId, m -> m)); } diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java index 74403835d..fcee2a5af 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/LocalServer.java @@ -26,7 +26,6 @@ import java.lang.reflect.Method; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.function.Predicate; import java.util.function.Supplier; @@ -37,10 +36,10 @@ * @author hal.hildebrand */ public class LocalServer implements RouterSupplier { - private static final Logger log = LoggerFactory.getLogger(LocalServer.class); - private static final String NAME_TEMPLATE = "%s-%s"; - private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); + private static final Logger log = LoggerFactory.getLogger(LocalServer.class); + private static final String NAME_TEMPLATE = "%s-%s"; + private final Executor executor = UnsafeExecutors.newVirtualThreadPerTaskExecutor(); private final ClientInterceptor clientInterceptor; private final Member from; private final String prefix; @@ -78,7 +77,8 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • serverBuilder = InProcessServerBuilder.forName(name) - .executor(Executors.newVirtualThreadPerTaskExecutor()) + .executor( + UnsafeExecutors.newVirtualThreadPerTaskExecutor()) .intercept(ConcurrencyLimitServerInterceptor.newBuilder( limitsBuilder.build()) .statusSupplier( diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java new file mode 100644 index 000000000..1b0e35cb6 --- /dev/null +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/UnsafeExecutors.java @@ -0,0 +1,125 @@ +package com.salesforce.apollo.archipelago; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.lang.invoke.MethodHandles.insertArguments; +import static java.lang.invoke.MethodType.methodType; + +@SuppressWarnings("unused") +public class UnsafeExecutors { + private static final MethodHandle SET_EXECUTOR; + + static { + try { + var unsafeClass = Class.forName("sun.misc.Unsafe"); + var unsafeField = unsafeClass.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + var unsafe = unsafeField.get(null); + var objectFieldOffset = unsafeClass.getMethod("objectFieldOffset", Field.class); + var executorField = VTB.class.getDeclaredField("executor"); + executorField.setAccessible(true); + var executorOffset = (long) objectFieldOffset.invoke(unsafe, executorField); + var putObject = MethodHandles.lookup() + .findVirtual(unsafeClass, "putObject", + methodType(void.class, Object.class, long.class, Object.class)); + var setExecutor = insertArguments(insertArguments(putObject, 2, executorOffset), 0, unsafe); + SET_EXECUTOR = setExecutor; + } catch (ClassNotFoundException | NoSuchFieldException | NoSuchMethodException | IllegalAccessException | + InvocationTargetException e) { + throw new AssertionError(e); + } + } + + public static ExecutorService newVirtualThreadPerTaskExecutor() { + return virtualThreadExecutor(new ForkJoinPool()); + } + + public static B configureBuilderExecutor(B builder, Executor executor) { + if (executor != null) { + setExecutor(builder, executor); + } + return builder; + } + + public static ExecutorService virtualThreadExecutor(ExecutorService executor) { + Objects.requireNonNull(executor); + return new VirtualThreadExecutor(executor); + } + + private static void setExecutor(Object builder, Object executor) { + try { + SET_EXECUTOR.invokeExact(builder, executor); + } catch (Throwable e) { + throw new AssertionError(e); + } + } + + private static class BTB { + private int characteristics; + private long counter; + private String name; + private UncaughtExceptionHandler uhe; + } + + private static class VirtualThreadExecutor extends AbstractExecutorService { + private final ExecutorService executor; + private final AtomicBoolean started = new AtomicBoolean(true); + + public VirtualThreadExecutor(ExecutorService executor) { + this.executor = executor; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return true; + } + + @Override + public void execute(Runnable command) { + if (!started.get()) { + throw new RejectedExecutionException("Executor shutdown"); + } + var builder = Thread.ofVirtual(); + setExecutor(builder, executor); + builder.start(command); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return !executor.isTerminated(); + } + + @Override + public void shutdown() { + if (!started.compareAndSet(true, false)) { + return; + } + executor.shutdown(); + } + + @Override + public List shutdownNow() { + if (!started.compareAndSet(true, false)) { + return List.of(); + } + return executor.shutdownNow(); + } + } + + private static class VTB extends BTB { + private Executor executor; + } +} diff --git a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java index 4b255669c..137db0a44 100644 --- a/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java +++ b/memberships/src/main/java/com/salesforce/apollo/ring/SliceIterator.java @@ -18,10 +18,7 @@ import java.io.IOException; import java.time.Duration; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -51,6 +48,7 @@ public SliceIterator(String label, SigningMember member, Collection s, CommonCommunications comm, ScheduledExecutorService scheduler) { assert member != null && s != null && comm != null; + assert !s.stream().filter(Objects::nonNull).toList().isEmpty() : "All elements must be non-null: " + s; this.label = label; this.member = member; this.slice = new CopyOnWriteArrayList<>(s); @@ -58,7 +56,7 @@ public SliceIterator(String label, SigningMember member, Collection is: {} on: {}", label, slice.stream().map(m -> m.getId()).toList(), member.getId()); + log.debug("Slice for: <{}> is: {} on: {}", label, slice.stream().map(Member::getId).toList(), member.getId()); } public void iterate(BiFunction round, SlicePredicateHandler handler, diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/UnsafeExecutorsTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/UnsafeExecutorsTest.java new file mode 100644 index 000000000..a26a3c7cd --- /dev/null +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/UnsafeExecutorsTest.java @@ -0,0 +1,90 @@ +package com.salesforce.apollo.archipelago; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class UnsafeExecutorsTest { + private static String carrierThreadName() { + var name = Thread.currentThread().toString(); + var index = name.lastIndexOf('@'); + if (index == -1) { + throw new AssertionError(); + } + return name.substring(index + 1); + } + + @Test + public void virtualThreadExecutorSingleThreadExecutor() throws InterruptedException { + var executor = Executors.newSingleThreadExecutor(); + var virtualExecutor = UnsafeExecutors.virtualThreadExecutor(executor); + var carrierThreadNames = new CopyOnWriteArraySet(); + for (var i = 0; i < 10; i++) { + virtualExecutor.execute(() -> carrierThreadNames.add(carrierThreadName())); + } + executor.shutdown(); + executor.awaitTermination(1, TimeUnit.DAYS); + assertEquals(1, carrierThreadNames.size()); + } + + @Test + void testVirtualThread() { + Queue executor = new ArrayDeque<>(); + var virtualExecutor = UnsafeExecutors.virtualThreadExecutor(wrap(executor::add)); + + Lock lock = new ReentrantLock(); + lock.lock(); + virtualExecutor.execute(lock::lock); + assertEquals(1, executor.size(), "runnable for vthread has not been submitted"); + executor.poll().run(); + assertEquals(0, executor.size(), "vthread has not blocked"); + lock.unlock(); + assertEquals(1, executor.size(), "vthread is not schedulable"); + executor.poll().run(); + assertFalse(lock.tryLock(), "the virtual thread does not hold the lock"); + } + + private ExecutorService wrap(Executor ex) { + return new AbstractExecutorService() { + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public void execute(Runnable command) { + System.out.println("Yes!"); + ex.execute(command); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public void shutdown() { + + } + + @Override + public List shutdownNow() { + return List.of(); + } + }; + } +}