Skip to content

Commit

Permalink
Refactor away sched exec from Session
Browse files Browse the repository at this point in the history
and downstream Mutator, etc.  add retry logic for cancelled and other exceptional transaction submissions
  • Loading branch information
Hellblazer committed Nov 12, 2023
1 parent 2ac0354 commit 360bcc9
Show file tree
Hide file tree
Showing 23 changed files with 317 additions and 293 deletions.
44 changes: 35 additions & 9 deletions choam/src/main/java/com/salesforce/apollo/choam/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* @author hal.hildebrand
Expand All @@ -45,7 +46,10 @@ public class Session {
private final Function<SubmittedTransaction, SubmitResult> service;
private final Map<Digest, SubmittedTransaction> submitted = new ConcurrentHashMap<>();
private final AtomicReference<HashedCertifiedBlock> view = new AtomicReference<>();
private AtomicInteger nonce = new AtomicInteger();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
Thread.ofVirtual()
.factory());
private final AtomicInteger nonce = new AtomicInteger();

public Session(Parameters params, Function<SubmittedTransaction, SubmitResult> service) {
this.params = params;
Expand Down Expand Up @@ -81,6 +85,16 @@ public static boolean verify(Transaction transaction, Verifier verifier) {
transaction.getContent().asReadOnlyByteBuffer());
}

public static <T> CompletableFuture<T> retryNesting(Supplier<CompletableFuture<T>> supplier, int maxRetries) {
CompletableFuture<T> cf = supplier.get();
for (int i = 0; i < maxRetries; i++) {
cf = cf.thenApply(CompletableFuture::completedFuture)
.exceptionally(__ -> supplier.get())
.thenCompose(java.util.function.Function.identity());
}
return cf;
}

/**
* Cancel all pending transactions
*/
Expand All @@ -92,13 +106,31 @@ public void cancelAll() {
* Submit a transaction.
*
* @param transaction - the Message to submit as a transaction
* @param retries - the number of retries for Cancelled transaction submissions
* @param timeout - non-null timeout of the transaction
* @param scheduler
* @return onCompletion - the future result of the submitted transaction
* @throws InvalidTransaction - if the submitted transaction is invalid in any way
*/
public <T> CompletableFuture<T> submit(Message transaction, Duration timeout, ScheduledExecutorService scheduler)
public <T> CompletableFuture<T> submit(Message transaction, int retries, Duration timeout)
throws InvalidTransaction {
return retryNesting(() -> {
try {
return submit(transaction, timeout);
} catch (InvalidTransaction e) {
throw new IllegalStateException("Invalid txn", e);
}
}, retries);
}

/**
* Submit a transaction.
*
* @param transaction - the Message to submit as a transaction
* @param timeout - non-null timeout of the transaction
* @return onCompletion - the future result of the submitted transaction
* @throws InvalidTransaction - if the submitted transaction is invalid in any way
*/
public <T> CompletableFuture<T> submit(Message transaction, Duration timeout) throws InvalidTransaction {
final var txnView = view.get();
if (txnView == null) {
throw new InvalidTransaction("No view available");
Expand Down Expand Up @@ -127,7 +159,6 @@ public <T> CompletableFuture<T> submit(Message transaction, Duration timeout, Sc
submitted.put(stxn.hash(), stxn);

var backoff = params.submitPolicy().build();
boolean submitted = false;
var target = Instant.now().plus(timeout);
int i = 0;

Expand Down Expand Up @@ -167,28 +198,24 @@ public <T> CompletableFuture<T> submit(Message transaction, Duration timeout, Sc
if (params.metrics() != null) {
params.metrics().transactionSubmitRateLimited();
}
break;
}
case BUFFER_FULL -> {
if (params.metrics() != null) {
params.metrics().transactionSubmittedBufferFull();
}
submit.limiter.get().onDropped();
break;
}
case INACTIVE, NO_COMMITTEE -> {
if (params.metrics() != null) {
params.metrics().transactionSubmittedInvalidCommittee();
}
submit.limiter.get().onDropped();
break;
}
case UNAVAILABLE -> {
if (params.metrics() != null) {
params.metrics().transactionSubmittedUnavailable();
}
submit.limiter.get().onIgnore();
break;
}
case INVALID_SUBMIT, ERROR_SUBMITTING -> {
if (params.metrics() != null) {
Expand All @@ -197,7 +224,6 @@ public <T> CompletableFuture<T> submit(Message transaction, Duration timeout, Sc
result.completeExceptionally(
new TransactionFailed("Invalid submission: " + submit.result.getErrorMsg()));
submit.limiter.get().onIgnore();
break;
}
case UNRECOGNIZED, INVALID_RESULT -> {
if (params.metrics() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -103,10 +106,7 @@ public void genesisBootstrap() throws Exception {
.toList());

final var countdown = new CountDownLatch(1);
var transactioneer = new Transactioneer(txneer.getSession(), timeout, 1,
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()),
countdown,
Executors.newSingleThreadExecutor(Thread.ofVirtual().factory()));
var transactioneer = new Transactioneer(txneer.getSession(), timeout, 1, countdown);

transactioneer.start();
assertTrue(countdown.await(30, TimeUnit.SECONDS), "Could not submit transaction");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class SessionTest {

@Test
public void func() throws Exception {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory());
Context<Member> context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getOrigin(), 9, 0.2, 2);
var entropy = SecureRandom.getInstance("SHA1PRNG");
entropy.setSeed(new byte[] { 6, 6, 6 });
Expand Down Expand Up @@ -95,7 +94,7 @@ public void func() throws Exception {
.build()));
final String content = "Give me food or give me slack or kill me";
Message tx = ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8(content)).build();
var result = session.submit(tx, null, exec);
var result = session.submit(tx, null);
assertEquals(1, session.submitted());
gate.countDown();
assertEquals(content, result.get(1, TimeUnit.SECONDS));
Expand Down Expand Up @@ -152,7 +151,7 @@ public void scalingTest() throws Exception {
Message tx = ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8(content)).build();
CompletableFuture<Object> result;
try {
result = session.submit(tx, null, scheduler).whenComplete((r, t) -> {
result = session.submit(tx, null).whenComplete((r, t) -> {
if (t != null) {
if (t instanceof CompletionException ce) {
reg.counter(t.getCause().getClass().getSimpleName()).inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,9 @@ public void submitMultiplTxn() throws Exception {
final var clientCount = LARGE_TESTS ? 1_500 : 50;
final var max = LARGE_TESTS ? 100 : 10;
final var countdown = new CountDownLatch(clientCount * choams.size());

var txExec = Executors.newVirtualThreadPerTaskExecutor();
choams.values().forEach(c -> {
for (int i = 0; i < clientCount; i++) {
transactioneers.add(new Transactioneer(c.getSession(), timeout, max, Executors.newScheduledThreadPool(5,
Thread.ofVirtual()
.factory()),
countdown, txExec));
transactioneers.add(new Transactioneer(c.getSession(), timeout, max, countdown));
}
});

Expand Down
75 changes: 34 additions & 41 deletions choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,28 @@
import java.util.concurrent.atomic.AtomicReference;

class Transactioneer {
private final static Random entropy = new Random();
private final static Logger log = LoggerFactory.getLogger(Transactioneer.class);
private final static Random entropy = new Random();
private final static Logger log = LoggerFactory.getLogger(Transactioneer.class);
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual()
.factory());
private final AtomicInteger completed = new AtomicInteger();
private final CountDownLatch countdown;
private final List<CompletableFuture<?>> inFlight = new CopyOnWriteArrayList<>();
private final int max;
private final Session session;
private final Duration timeout;
private final ByteMessage tx = ByteMessage.newBuilder()
.setContents(ByteString.copyFromUtf8(
"Give me food or give me slack or kill me"))
.build();
private final AtomicBoolean finished = new AtomicBoolean();

private final AtomicInteger completed = new AtomicInteger();
private final CountDownLatch countdown;
private final List<CompletableFuture<?>> inFlight = new CopyOnWriteArrayList<>();
private final int max;
private final ScheduledExecutorService scheduler;
private final Session session;
private final Duration timeout;
private final ByteMessage tx = ByteMessage.newBuilder()
.setContents(ByteString.copyFromUtf8(
"Give me food or give me slack or kill me"))
.build();
private final Executor txnExecutor;
private final AtomicBoolean finished = new AtomicBoolean();

Transactioneer(Session session, Duration timeout, int max, ScheduledExecutorService scheduler,
CountDownLatch countdown, Executor txnScheduler) {
Transactioneer(Session session, Duration timeout, int max, CountDownLatch countdown) {
this.session = session;
this.timeout = timeout;
this.max = max;
this.scheduler = scheduler;
this.countdown = countdown;
this.txnExecutor = txnScheduler;
}

public int getCompleted() {
Expand All @@ -59,44 +56,40 @@ void decorate(CompletableFuture<?> fs) {
inFlight.remove(futureSailor.get());
if (t != null) {
if (completed.get() < max) {
scheduler.schedule(() -> {
txnExecutor.execute(Utils.wrapped(() -> {
try {
decorate(session.submit(tx, timeout, scheduler));
} catch (InvalidTransaction e) {
throw new IllegalStateException(e);
}
}, log));
}, entropy.nextInt(100), TimeUnit.MILLISECONDS);
scheduler.schedule(() -> executor.execute(Utils.wrapped(() -> {
try {
decorate(session.submit(tx, timeout));
} catch (InvalidTransaction e) {
throw new IllegalStateException(e);
}
}, log)), entropy.nextInt(100), TimeUnit.MILLISECONDS);
}
} else {
if (completed.incrementAndGet() >= max) {
if (finished.compareAndSet(false, true)) {
countdown.countDown();
}
} else {
txnExecutor.execute(Utils.wrapped(() -> {
executor.execute(Utils.wrapped(() -> {
try {
decorate(session.submit(tx, timeout, scheduler));
decorate(session.submit(tx, timeout));
} catch (InvalidTransaction e) {
throw new IllegalStateException(e);
}
}, log));
}
}
}, txnExecutor));
}, executor));
inFlight.add(futureSailor.get());
}

void start() {
scheduler.schedule(() -> {
txnExecutor.execute(Utils.wrapped(() -> {
try {
decorate(session.submit(tx, timeout, scheduler));
} catch (InvalidTransaction e) {
throw new IllegalStateException(e);
}
}, log));
}, 2, TimeUnit.SECONDS);
scheduler.schedule(() -> executor.execute(Utils.wrapped(() -> {
try {
decorate(session.submit(tx, timeout));
} catch (InvalidTransaction e) {
throw new IllegalStateException(e);
}
}, log)), 2, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@
*
*/
public class DemesneIsolate {
private static final Class<? extends Channel> channelType = getChannelType();
private static final AtomicReference<DemesneImpl> demesne = new AtomicReference<>();
private static final EventLoopGroup eventLoopGroup = getEventLoopGroup();
private static final Lock lock = new ReentrantLock();
private static final Logger log = LoggerFactory.getLogger(DemesneIsolate.class);
static {
Expand Down Expand Up @@ -207,13 +205,7 @@ private static boolean launch(JNIEnvironment jniEnv, JClass clazz, @CEntryPoint.
try {
launch(jniEnv, parametersBuff, clazz);
return true;
} catch (InvalidProtocolBufferException e) {
log.error("Cannot launch demesne", e);
return false;
} catch (GeneralSecurityException e) {
log.error("Cannot launch demesne", e);
return false;
} catch (IOException e) {
} catch (GeneralSecurityException | IOException e) {
log.error("Cannot launch demesne", e);
return false;
}
Expand Down Expand Up @@ -285,8 +277,8 @@ private static boolean viewChange(JNIEnvironment jniEnv, JClass clazz,
return false;
}
current.viewChange(Digest.from(viewChange.getView()),
viewChange.getJoiningList().stream().map(j -> EventCoordinates.from(j)).toList(),
viewChange.getLeavingList().stream().map(d -> Digest.from(d)).toList());
viewChange.getJoiningList().stream().map(EventCoordinates::from).toList(),
viewChange.getLeavingList().stream().map(Digest::from).toList());
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
import com.salesforce.apollo.membership.stereotomy.ControlledIdentifierMember;
import com.salesforce.apollo.model.Domain.TransactionConfiguration;
import com.salesforce.apollo.model.ProcessDomain;
import com.salesforce.apollo.stereotomy.EventCoordinates;
import com.salesforce.apollo.stereotomy.EventValidation;
Expand Down Expand Up @@ -200,8 +199,6 @@ public void before() throws Exception {
var foundation = Foundation.newBuilder();
identities.keySet().forEach(d -> foundation.addMembership(d.toDigeste()));
var sealed = FoundationSeal.newBuilder().setFoundation(foundation).build();
TransactionConfiguration txnConfig = new TransactionConfiguration(
Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()));
identities.forEach((digest, id) -> {
var context = new ContextImpl<>(DigestAlgorithm.DEFAULT.getLast(), CARDINALITY, 0.2, 3);
final var member = new ControlledIdentifierMember(id);
Expand All @@ -211,7 +208,7 @@ public void before() throws Exception {
.setFoundation(sealed)
.setContext(context)
.setCommunications(localRouter), new InetSocketAddress(0),
commsDirectory, ffParams, txnConfig, EventValidation.NONE,
commsDirectory, ffParams, EventValidation.NONE,
IdentifierSpecification.newBuilder());
domains.add(node);
routers.put(node, localRouter);
Expand Down
Loading

0 comments on commit 360bcc9

Please sign in to comment.