From 8cf2dc11bce1a32b6f10853028c65a43b5bcd590 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Sun, 5 Nov 2023 17:12:45 -0800 Subject: [PATCH 1/2] don't force update snapshots --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index a1d29f77b..6c7966b99 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -20,4 +20,4 @@ jobs: cache: 'maven' github-token: ${{ secrets.GITHUB_TOKEN }} - name: Build with Maven - run: mvn -batch-mode --update-snapshots clean install -Ppre --file pom.xml + run: mvn -batch-mode clean install -Ppre --file pom.xml From 5c5e5cf461edab2f48c30416847c73445941a0b0 Mon Sep 17 00:00:00 2001 From: Hellblazer Date: Thu, 9 Nov 2023 12:02:10 -0800 Subject: [PATCH 2/2] proactive transaction cancellation instead of timeouts --- .../com/salesforce/apollo/choam/CHOAM.java | 4 +- .../salesforce/apollo/choam/Parameters.java | 13 +- .../com/salesforce/apollo/choam/Session.java | 253 +++++--- .../apollo/choam/support/ChoamMetrics.java | 14 +- .../choam/support/ChoamMetricsImpl.java | 78 ++- .../choam/support/SubmittedTransaction.java | 11 +- .../choam/support/TransactionCancelled.java | 27 - .../salesforce/apollo/choam/SessionTest.java | 18 + .../apollo/choam/Transactioneer.java | 29 +- grpc/src/main/proto/choam.proto | 1 + .../salesforce/apollo/model/DomainTest.java | 63 +- .../limits/limiter/AbstractLimiter.java | 96 ++- .../com/salesforce/apollo/state/Emulator.java | 44 +- .../apollo/state/SqlStateMachine.java | 547 ++++++++---------- .../salesforce/apollo/state/CHOAMTest.java | 6 +- .../apollo/state/Transactioneer.java | 63 +- sql-state/src/test/resources/logback-test.xml | 4 +- 17 files changed, 688 insertions(+), 583 deletions(-) delete mode 100644 choam/src/main/java/com/salesforce/apollo/choam/support/TransactionCancelled.java diff --git a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java index c3c2e1718..9d6f77f5a 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java @@ -33,6 +33,7 @@ import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.MessageAdapter; import com.salesforce.apollo.membership.messaging.rbc.ReliableBroadcaster.Msg; import com.salesforce.apollo.utils.RoundScheduler; +import com.salesforce.apollo.utils.Utils; import com.salesforce.apollo.utils.bloomFilters.BloomFilter; import io.grpc.StatusRuntimeException; import org.h2.mvstore.MVMap; @@ -107,7 +108,7 @@ public CHOAM(Parameters params) { Thread.ofVirtual().name("Linear " + params.member().getId()).factory()); combine.registerHandler((ctx, messages) -> { try { - linear.execute(() -> combine(messages)); + linear.execute(Utils.wrapped(() -> combine(messages), log)); } catch (RejectedExecutionException e) { // ignore } @@ -683,6 +684,7 @@ private void reconfigure(Reconfigure reconfigure) { } else { current.set(new Client(validators, getViewId())); } + session.setView(h); log.info("Reconfigured to view: {} validators: {} on: {}", new Digest(reconfigure.getId()), validators.entrySet() .stream() diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java index b63ca8a90..2c78c068a 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Parameters.java @@ -556,11 +556,11 @@ public Builder setMaxGossipDelay(Duration maxGossipDelay) { public static class LimiterBuilder { private Duration backlogDuration = Duration.ofSeconds(1); private int backlogSize = 1_000; - private double backoffRatio = 0.9; + private double backoffRatio = 0.5; private int initialLimit = 1_000; - private int maxLimit = 5_000; - private int minLimit = 1_000; - private Duration timeout = Duration.ofMillis(100); + private int maxLimit = 10_000; + private int minLimit = 1_00; + private Duration timeout = Duration.ofSeconds(1); public Limiter build(String name, MetricRegistry metrics) { final SimpleLimiter limiter = SimpleLimiter.newBuilder() @@ -668,12 +668,11 @@ public static class Builder implements Cloneable { private int regenerationCycles = 20; private ExponentialBackoffPolicy.Builder submitPolicy = ExponentialBackoffPolicy.newBuilder() .setInitialBackoff( - Duration.ofMillis(10)) + Duration.ofMillis(500)) .setJitter(0.2) .setMultiplier(1.6) .setMaxBackoff( - Duration.ofMillis( - 500)); + Duration.ofSeconds(5)); private Duration submitTimeout = Duration.ofSeconds(30); private int synchronizationCycles = 10; private LimiterBuilder txnLimiterBuilder = new LimiterBuilder(); diff --git a/choam/src/main/java/com/salesforce/apollo/choam/Session.java b/choam/src/main/java/com/salesforce/apollo/choam/Session.java index f8bdc44ed..57485943b 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/Session.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/Session.java @@ -6,44 +6,56 @@ */ package com.salesforce.apollo.choam; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.time.Instant; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.codahale.metrics.Timer; import com.google.common.base.Function; import com.google.protobuf.Message; import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; import com.salesfoce.apollo.choam.proto.SubmitResult; -import com.salesfoce.apollo.choam.proto.SubmitResult.Result; import com.salesfoce.apollo.choam.proto.Transaction; +import com.salesforce.apollo.choam.support.HashedCertifiedBlock; import com.salesforce.apollo.choam.support.InvalidTransaction; import com.salesforce.apollo.choam.support.SubmittedTransaction; -import com.salesforce.apollo.choam.support.TransactionCancelled; import com.salesforce.apollo.choam.support.TransactionFailed; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.JohnHancock; import com.salesforce.apollo.crypto.Signer; import com.salesforce.apollo.crypto.Verifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * @author hal.hildebrand - * */ public class Session { - private final static Logger log = LoggerFactory.getLogger(Session.class); + private final static Logger log = LoggerFactory.getLogger( + Session.class); + private final Limiter limiter; + private final Parameters params; + private final Function service; + private final Map submitted = new ConcurrentHashMap<>(); + private final AtomicReference view = new AtomicReference<>(); + private AtomicInteger nonce = new AtomicInteger(); + + public Session(Parameters params, Function service) { + this.params = params; + this.service = service; + final var metrics = params.metrics(); + this.limiter = params.txnLimiterBuilder() + .build(params.member().getId().shortString(), + metrics == null ? EmptyMetricRegistry.INSTANCE : metrics.getMetricRegistry( + params.context().getId().shortString() + ".txnLimiter")); + } public static Transaction transactionOf(Digest source, int nonce, Message message, Signer signer) { ByteBuffer buff = ByteBuffer.allocate(4); @@ -69,45 +81,28 @@ public static boolean verify(Transaction transaction, Verifier verifier) { transaction.getContent().asReadOnlyByteBuffer()); } - private final Limiter limiter; - private AtomicInteger nonce = new AtomicInteger(); - private final Parameters params; - private final Function service; - private final Map submitted = new ConcurrentHashMap<>(); - - public Session(Parameters params, Function service) { - this.params = params; - this.service = service; - final var metrics = params.metrics(); - this.limiter = params.txnLimiterBuilder() - .build(params.member().getId().shortString(), - metrics == null ? EmptyMetricRegistry.INSTANCE - : metrics.getMetricRegistry(params.context().getId().shortString() - + ".txnLimiter")); - } - /** * Cancel all pending transactions */ public void cancelAll() { - submitted.values() - .forEach(stx -> stx.onCompletion() - .completeExceptionally(new TransactionCancelled("Transaction cancelled"))); + submitted.values().forEach(stx -> stx.onCompletion().cancel(true)); } /** * Submit a transaction. - * + * * @param transaction - the Message to submit as a transaction - * @param timeout - non null timeout of the transaction + * @param timeout - non-null timeout of the transaction * @param scheduler - * * @return onCompletion - the future result of the submitted transaction - * @throws InvalidTransaction - if the submitted transaction is invalid in any - * way + * @throws InvalidTransaction - if the submitted transaction is invalid in any way */ - public CompletableFuture submit(Message transaction, Duration timeout, - ScheduledExecutorService scheduler) throws InvalidTransaction { + public CompletableFuture submit(Message transaction, Duration timeout, ScheduledExecutorService scheduler) + throws InvalidTransaction { + final var txnView = view.get(); + if (txnView == null) { + throw new InvalidTransaction("No view available"); + } final int n = nonce.getAndIncrement(); final var txn = transactionOf(params.member().getId(), n, transaction, params.member()); @@ -117,66 +112,150 @@ public CompletableFuture submit(Message transaction, Duration timeout, var hash = CHOAM.hashOf(txn, params.digestAlgorithm()); final var timer = params.metrics() == null ? null : params.metrics().transactionLatency().time(); - var result = new CompletableFuture(); + var result = new CompletableFuture().whenComplete((r, t) -> { + if (params.metrics() != null) { + if (t instanceof CancellationException) { + params.metrics().transactionCancelled(); + } + } + }); if (timeout == null) { timeout = params.submitTimeout(); } - var stxn = new SubmittedTransaction(hash, txn, result, timer); + var stxn = new SubmittedTransaction(txnView.height(), hash, txn, result, timer); submitted.put(stxn.hash(), stxn); var backoff = params.submitPolicy().build(); boolean submitted = false; var target = Instant.now().plus(timeout); int i = 0; - while (Instant.now().isBefore(target)) { - log.debug("Submitting: {} retry: {} on: {}", stxn.hash(), i, params.member().getId()); - if (submit(stxn)) { - submitted = true; + + while (!result.isDone() && Instant.now().isBefore(target)) { + if (i > 0) { + if (params.metrics() != null) { + params.metrics().transactionSubmitRetry(); + } + } + log.trace("Submitting: {} retry: {} on: {}", stxn.hash(), i, params.member().getId()); + var submit = submit(stxn); + switch (submit.result.getResult()) { + case PUBLISHED -> { + submit.limiter.get().onSuccess(); + log.trace("Transaction submitted: {} on: {}", stxn.hash(), params.member().getId()); + if (params.metrics() != null) { + params.metrics().transactionSubmittedSuccess(); + } + var futureTimeout = scheduler.schedule(() -> { + if (result.isDone()) { + return; + } + log.debug("Timeout of txn: {} on: {}", hash, params.member().getId()); + final var to = new TimeoutException("Transaction timeout"); + result.completeExceptionally(to); + if (params.metrics() != null) { + params.metrics().transactionComplete(to); + } + }, timeout.toMillis(), TimeUnit.MILLISECONDS); + + return result.whenComplete((r, t) -> { + futureTimeout.cancel(true); + complete(hash, timer, t); + }); + } + case RATE_LIMITED -> { + if (params.metrics() != null) { + params.metrics().transactionSubmitRateLimited(); + } + break; + } + case BUFFER_FULL -> { + if (params.metrics() != null) { + params.metrics().transactionSubmittedBufferFull(); + } + submit.limiter.get().onDropped(); break; } + case INACTIVE, NO_COMMITTEE -> { + if (params.metrics() != null) { + params.metrics().transactionSubmittedInvalidCommittee(); + } + submit.limiter.get().onDropped(); + break; + } + case UNAVAILABLE -> { + if (params.metrics() != null) { + params.metrics().transactionSubmittedUnavailable(); + } + submit.limiter.get().onIgnore(); + break; + } + case INVALID_SUBMIT, ERROR_SUBMITTING -> { + if (params.metrics() != null) { + params.metrics().transactionSubmissionError(); + } + result.completeExceptionally( + new TransactionFailed("Invalid submission: " + submit.result.getErrorMsg())); + submit.limiter.get().onIgnore(); + break; + } + case UNRECOGNIZED, INVALID_RESULT -> { + if (params.metrics() != null) { + params.metrics().transactionSubmittedInvalidResult(); + } + var ex = new TransactionFailed("Unrecognized or invalid result: " + submit.result.getErrorMsg()); + result.completeExceptionally(ex); + submit.limiter.get().onIgnore(); + return result; + } + default -> { + if (params.metrics() != null) { + params.metrics().transactionSubmittedInvalidResult(); + } + var ex = new TransactionFailed("Illegal result: " + submit.result.getErrorMsg()); + result.completeExceptionally(ex); + submit.limiter.get().onIgnore(); + return result; + } + } try { final var delay = backoff.nextBackoff(); - log.debug("Failed submitting: {} retry: {} delay: {}ms on: {}", stxn.hash(), i, delay.toMillis(), - params.member().getId()); + log.debug("Failed submitting: {} result: {} retry: {} delay: {}ms on: {}", stxn.hash(), submit.result, + i, delay.toMillis(), params.member().getId()); Thread.sleep(delay.toMillis()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } - if (params.metrics() != null) { - params.metrics().transactionSubmitRetry(); + return null; } i++; } - if (!submitted) { - if (params.metrics() != null) { - params.metrics().transactionSubmittedBufferFull(); - } - result.completeExceptionally(new TransactionFailed("Buffer Full")); + if (result.isDone()) { return result; } - var futureTimeout = scheduler.schedule(() -> { - if (result.isDone()) { - return; - } - log.debug("Timeout of txn: {} on: {}", hash, params.member().getId()); - final var to = new TimeoutException("Transaction timeout"); - result.completeExceptionally(to); - if (params.metrics() != null) { - params.metrics().transactionComplete(to); - } - }, timeout.toMillis(), TimeUnit.MILLISECONDS); - return result.whenComplete((r, t) -> { - futureTimeout.cancel(true); - complete(hash, timer, t); - }); + if (params.metrics() != null) { + params.metrics().transactionSubmitRetriesExhausted(); + } + result.completeExceptionally(new TransactionFailed("Submission retries exhausted")); + return result; } public int submitted() { return submitted.size(); } + public void setView(HashedCertifiedBlock v) { + view.set(v); + var currentHeight = v.height(); + for (var it = submitted.entrySet().iterator(); it.hasNext(); ) { + var e = it.next(); + if (e.getValue().view().compareTo(currentHeight) < 0) { + e.getValue().onCompletion().cancel(true); + it.remove(); + } + } + } + SubmittedTransaction complete(Digest hash) { final SubmittedTransaction stxn = submitted.remove(hash); if (stxn != null) { @@ -194,28 +273,20 @@ private void complete(Digest hash, final Timer.Context timer, Throwable t) { } } - private boolean submit(SubmittedTransaction stx) { + private Submission submit(SubmittedTransaction stx) { var listener = limiter.acquire(null); if (listener.isEmpty()) { log.debug("Transaction submission: {} rejected on: {}", stx.hash(), params.member().getId()); if (params.metrics() != null) { params.metrics().transactionSubmittedFail(); } - stx.onCompletion().completeExceptionally(new TransactionFailed("Transaction submission rejected")); - return false; + stx.onCompletion().completeExceptionally(new TransactionFailed("Transaction rate limited")); + return new Submission(SubmitResult.newBuilder().setResult(SubmitResult.Result.RATE_LIMITED).build(), + listener); } - var result = service.apply(stx); + return new Submission(service.apply(stx), listener); + } - if (result.getResult() == Result.PUBLISHED) { - listener.get().onSuccess(); - log.trace("Transaction submitted: {} on: {}", stx.hash(), params.member().getId()); - if (params.metrics() != null) { - params.metrics().transactionSubmittedSuccess(); - } - } else { - listener.get().onDropped(); - return false; - } - return true; + private record Submission(SubmitResult result, Optional limiter) { } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java index e3241979f..f02d5acf4 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetrics.java @@ -14,7 +14,6 @@ /** * @author hal.hildebrand - * */ public interface ChoamMetrics extends EndpointMetrics { @@ -44,4 +43,17 @@ public interface ChoamMetrics extends EndpointMetrics { void transactionTimeout(); + void transactionSubmittedInvalidCommittee(); + + void transactionSubmittedUnavailable(); + + void transactionSubmissionError(); + + void transactionSubmittedInvalidResult(); + + void transactionSubmitRetriesExhausted(); + + void transactionSubmitRateLimited(); + + void transactionCancelled(); } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java index 48de6b706..e762e225f 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/ChoamMetricsImpl.java @@ -6,15 +6,7 @@ */ package com.salesforce.apollo.choam.support; -import static com.codahale.metrics.MetricRegistry.name; - -import java.util.concurrent.TimeoutException; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; +import com.codahale.metrics.*; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.ethereal.memberships.comm.EtherealMetrics; import com.salesforce.apollo.ethereal.memberships.comm.EtherealMetricsImpl; @@ -23,13 +15,18 @@ import com.salesforce.apollo.protocols.EndpointMetricsImpl; import com.salesforce.apollo.protocols.LimitsRegistry; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeoutException; + +import static com.codahale.metrics.MetricRegistry.name; + /** * @author hal.hildebrand - * */ public class ChoamMetricsImpl extends EndpointMetricsImpl implements ChoamMetrics { private final RbcMetrics combineMetrics; + private final Meter cancelledTransactions; private final Meter completedTransactions; private final Counter droppedReassemblies; private final Counter droppedTransactions; @@ -47,7 +44,14 @@ public class ChoamMetricsImpl extends EndpointMetricsImpl implements ChoamMetric private final Meter transactionSubmitRetry; private final Meter transactionSubmitSuccess; private final Meter transactionSubmittedBufferFull; + private final Meter transactionSubmittedInvalidCommittee; private final Meter transactionTimeout; + private final Meter transactionSubmittedUnavailable; + private final Meter transactionSubmissionError; + private final Meter transactionSubmittedInvalidResult; + private final Meter transactionSubmitRetriesExhausted; + private final Meter transactionSubmitRateLimited; + private final Meter transactionCancelled; public ChoamMetricsImpl(Digest context, MetricRegistry registry) { super(registry); @@ -59,6 +63,8 @@ public ChoamMetricsImpl(Digest context, MetricRegistry registry) { droppedTransactions = registry.counter(name(context.shortString(), "transactions.dropped")); droppedReassemblies = registry.counter(name(context.shortString(), "reassemblies.dropped")); droppedValidations = registry.counter(name(context.shortString(), "validations.dropped")); + + cancelledTransactions = registry.meter(name(context.shortString(), "transactions.cancelled")); publishedTransactions = registry.meter(name(context.shortString(), "transactions.published")); publishedBytes = registry.histogram(name(context.shortString(), "unit.bytes")); publishedReassemblies = registry.meter(name(context.shortString(), "reassemblies.published")); @@ -71,6 +77,18 @@ public ChoamMetricsImpl(Digest context, MetricRegistry registry) { completedTransactions = registry.meter(name(context.shortString(), "transactions.completed")); failedTransactions = registry.meter(name(context.shortString(), "transactions.failed")); transactionSubmittedBufferFull = registry.meter(name(context.shortString(), "transaction.submit.buffer.full")); + transactionSubmittedInvalidCommittee = registry.meter( + name(context.shortString(), "transaction.submit.invalid.committee")); + transactionSubmittedUnavailable = registry.meter(name(context.shortString(), "transaction.submit.unavailable")); + transactionSubmissionError = registry.meter(name(context.shortString(), "transaction.submit.error")); + transactionSubmittedInvalidResult = registry.meter( + name(context.shortString(), "transaction.submit.invalid.result")); + transactionSubmitRetriesExhausted = registry.meter( + name(context.shortString(), "transaction.submit.retries.exhausted")); + transactionSubmitRateLimited = registry.meter( + name(context.shortString(), "transaction.submit.rate.limited")); + transactionCancelled = registry.meter( + name(context.shortString(), "transaction.submit.cancelled")); } @Override @@ -113,9 +131,8 @@ public void transactionComplete(Throwable t) { if (t != null) { if (t instanceof TimeoutException) { transactionTimeout.mark(); - - } else if (t instanceof TransactionCancelled) { - // ignore + } else if (t instanceof CancellationException) { + cancelledTransactions.mark(); } else { failedTransactions.mark(); } @@ -153,4 +170,39 @@ public void transactionSubmittedSuccess() { public void transactionTimeout() { transactionTimeout.mark(); } + + @Override + public void transactionSubmittedInvalidCommittee() { + transactionSubmittedInvalidCommittee.mark(); + } + + @Override + public void transactionSubmittedUnavailable() { + transactionSubmittedUnavailable.mark(); + } + + @Override + public void transactionSubmissionError() { + transactionSubmissionError.mark(); + } + + @Override + public void transactionSubmittedInvalidResult() { + transactionSubmittedInvalidResult.mark(); + } + + @Override + public void transactionSubmitRetriesExhausted() { + transactionSubmitRetriesExhausted.mark(); + } + + @Override + public void transactionSubmitRateLimited() { + transactionSubmitRateLimited.mark(); + } + + @Override + public void transactionCancelled() { + transactionCancelled.mark(); + } } diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/SubmittedTransaction.java b/choam/src/main/java/com/salesforce/apollo/choam/support/SubmittedTransaction.java index c40c9c721..1a9474bda 100644 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/SubmittedTransaction.java +++ b/choam/src/main/java/com/salesforce/apollo/choam/support/SubmittedTransaction.java @@ -6,16 +6,17 @@ */ package com.salesforce.apollo.choam.support; -import java.util.concurrent.CompletableFuture; - import com.codahale.metrics.Timer; import com.salesfoce.apollo.choam.proto.Transaction; import com.salesforce.apollo.crypto.Digest; +import org.joou.ULong; + +import java.util.concurrent.CompletableFuture; /** * @author hal.hildebrand - * */ @SuppressWarnings("rawtypes") -public record SubmittedTransaction(Digest hash, Transaction transaction, CompletableFuture onCompletion, - Timer.Context timer) {} +public record SubmittedTransaction(ULong view, Digest hash, Transaction transaction, CompletableFuture onCompletion, + Timer.Context timer) { +} diff --git a/choam/src/main/java/com/salesforce/apollo/choam/support/TransactionCancelled.java b/choam/src/main/java/com/salesforce/apollo/choam/support/TransactionCancelled.java deleted file mode 100644 index 0c5429588..000000000 --- a/choam/src/main/java/com/salesforce/apollo/choam/support/TransactionCancelled.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2021, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.salesforce.apollo.choam.support; - -/** - * @author hal.hildebrand - * - */ -public class TransactionCancelled extends Exception { - - private static final long serialVersionUID = 1L; - - public TransactionCancelled() { - } - - public TransactionCancelled(String message, Throwable cause) { - super(message, cause); - } - - public TransactionCancelled(String message) { - super(message); - } -} diff --git a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java index bcaf8bdb2..f74b5cb00 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/SessionTest.java @@ -13,10 +13,14 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import com.salesfoce.apollo.choam.proto.Block; +import com.salesfoce.apollo.choam.proto.CertifiedBlock; +import com.salesfoce.apollo.choam.proto.Header; import com.salesfoce.apollo.choam.proto.SubmitResult; import com.salesfoce.apollo.choam.proto.SubmitResult.Result; import com.salesfoce.apollo.test.proto.ByteMessage; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; +import com.salesforce.apollo.choam.support.HashedCertifiedBlock; import com.salesforce.apollo.choam.support.InvalidTransaction; import com.salesforce.apollo.choam.support.SubmittedTransaction; import com.salesforce.apollo.crypto.DigestAlgorithm; @@ -82,6 +86,13 @@ public void func() throws Exception { return SubmitResult.newBuilder().setResult(Result.PUBLISHED).build(); }; Session session = new Session(params, service); + session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder() + .setBlock(Block.newBuilder() + .setHeader( + Header.newBuilder() + .setHeight( + 100))) + .build())); final String content = "Give me food or give me slack or kill me"; Message tx = ByteMessage.newBuilder().setContents(ByteString.copyFromUtf8(content)).build(); var result = session.submit(tx, null, exec); @@ -127,6 +138,13 @@ public void scalingTest() throws Exception { Timer latency = reg.timer("Transaction latency"); Session session = new Session(params, service); + session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder() + .setBlock(Block.newBuilder() + .setHeader( + Header.newBuilder() + .setHeight( + 100))) + .build())); List> futures = new ArrayList<>(); IntStream.range(0, 10000).forEach(i -> { final var time = latency.time(); diff --git a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java index 32f4d2b71..93a16d7f4 100644 --- a/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java +++ b/choam/src/test/java/com/salesforce/apollo/choam/Transactioneer.java @@ -6,26 +6,21 @@ */ package com.salesforce.apollo.choam; +import com.google.protobuf.ByteString; +import com.salesfoce.apollo.test.proto.ByteMessage; +import com.salesforce.apollo.choam.support.InvalidTransaction; +import com.salesforce.apollo.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; import java.util.List; import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; -import com.salesfoce.apollo.test.proto.ByteMessage; -import com.salesforce.apollo.choam.support.InvalidTransaction; -import com.salesforce.apollo.utils.Utils; - class Transactioneer { private final static Random entropy = new Random(); private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); @@ -38,9 +33,11 @@ class Transactioneer { private final Session session; private final Duration timeout; private final ByteMessage tx = ByteMessage.newBuilder() - .setContents(ByteString.copyFromUtf8("Give me food or give me slack or kill me")) + .setContents(ByteString.copyFromUtf8( + "Give me food or give me slack or kill me")) .build(); private final Executor txnExecutor; + private final AtomicBoolean finished = new AtomicBoolean(); Transactioneer(Session session, Duration timeout, int max, ScheduledExecutorService scheduler, CountDownLatch countdown, Executor txnScheduler) { @@ -74,7 +71,7 @@ void decorate(CompletableFuture fs) { } } else { if (completed.incrementAndGet() >= max) { - if (inFlight.size() == 0) { + if (finished.compareAndSet(false, true)) { countdown.countDown(); } } else { diff --git a/grpc/src/main/proto/choam.proto b/grpc/src/main/proto/choam.proto index b52ff9c8b..657a6aee6 100644 --- a/grpc/src/main/proto/choam.proto +++ b/grpc/src/main/proto/choam.proto @@ -34,6 +34,7 @@ message SubmitResult { UNAVAILABLE = 6; INVALID_SUBMIT = 7; ERROR_SUBMITTING = 8; + RATE_LIMITED = 9; } Result result = 1; string errorMsg = 2; diff --git a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java index 96c4a332d..787c5553e 100644 --- a/model/src/test/java/com/salesforce/apollo/model/DomainTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/DomainTest.java @@ -41,8 +41,9 @@ import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -58,6 +59,16 @@ public class DomainTest { private final ArrayList domains = new ArrayList<>(); private final ArrayList routers = new ArrayList<>(); + static CompletableFuture retryNesting(Supplier> supplier, int maxRetries) { + CompletableFuture cf = supplier.get(); + for (int i = 0; i < maxRetries; i++) { + cf = cf.thenApply(CompletableFuture::completedFuture) + .exceptionally(__ -> supplier.get()) + .thenCompose(Function.identity()); + } + return cf; + } + public static void smoke(Oracle oracle) throws Exception { // Namespace var ns = Oracle.namespace("my-org"); @@ -90,16 +101,23 @@ public static void smoke(Oracle oracle) throws Exception { var burcu = ns.subject("Burcu"); // Map direct edges. Transitive edges added as a side effect - CompletableFuture.allOf(oracle.map(helpDeskMembers, adminMembers), oracle.map(ali, adminMembers), - oracle.map(ali, userMembers), oracle.map(burcu, userMembers), - oracle.map(can, userMembers), oracle.map(managerMembers, userMembers), - oracle.map(technicianMembers, userMembers), oracle.map(demet, helpDeskMembers), - oracle.map(egin, helpDeskMembers), oracle.map(egin, userMembers), - oracle.map(fuat, managerMembers), oracle.map(gl, managerMembers), - oracle.map(hakan, technicianMembers), oracle.map(irmak, technicianMembers), - oracle.map(abcTechMembers, technicianMembers), - oracle.map(flaggedTechnicianMembers, technicianMembers), - oracle.map(jale, abcTechMembers)).get(); + retryNesting(() -> oracle.map(helpDeskMembers, adminMembers), 3).get(); + retryNesting(() -> oracle.map(ali, adminMembers), 3).get(); + retryNesting(() -> oracle.map(ali, userMembers), 3).get(); + retryNesting(() -> oracle.map(burcu, userMembers), 3).get(); + retryNesting(() -> oracle.map(can, userMembers), 3).get(); + retryNesting(() -> oracle.map(managerMembers, userMembers), 3).get(); + retryNesting(() -> oracle.map(technicianMembers, userMembers), 3).get(); + retryNesting(() -> oracle.map(demet, helpDeskMembers), 3).get(); + retryNesting(() -> oracle.map(egin, helpDeskMembers), 3).get(); + retryNesting(() -> oracle.map(egin, userMembers), 3).get(); + retryNesting(() -> oracle.map(fuat, managerMembers), 3).get(); + retryNesting(() -> oracle.map(gl, managerMembers), 3).get(); + retryNesting(() -> oracle.map(hakan, technicianMembers), 3).get(); + retryNesting(() -> oracle.map(irmak, technicianMembers), 3).get(); + retryNesting(() -> oracle.map(abcTechMembers, technicianMembers), 3).get(); + retryNesting(() -> oracle.map(flaggedTechnicianMembers, technicianMembers), 3).get(); + retryNesting(() -> oracle.map(jale, abcTechMembers), 3).get(); // Protected resource namespace var docNs = Oracle.namespace("Document"); @@ -110,7 +128,7 @@ public static void smoke(Oracle oracle) throws Exception { // Users can View Document 123 Assertion tuple = userMembers.assertion(object123View); - oracle.add(tuple).get(); + retryNesting(() -> oracle.add(tuple), 3).get(); // Direct subjects that can View the document var viewers = oracle.read(object123View); @@ -124,7 +142,7 @@ public static void smoke(Oracle oracle) throws Exception { // Assert flagged technicians can directly view the document Assertion grantTechs = flaggedTechnicianMembers.assertion(object123View); - oracle.add(grantTechs).get(); + retryNesting(() -> oracle.add(grantTechs), 3).get(); // Now have 2 direct subjects that can view the doc viewers = oracle.read(object123View); @@ -163,22 +181,22 @@ public static void smoke(Oracle oracle) throws Exception { assertFalse(oracle.check(object123View.assertion(helpDeskMembers))); // Remove them - oracle.remove(abcTechMembers, technicianMembers).get(); + retryNesting(() -> oracle.remove(abcTechMembers, technicianMembers), 3).get(); assertFalse(oracle.check(object123View.assertion(jale))); assertTrue(oracle.check(object123View.assertion(egin))); assertFalse(oracle.check(object123View.assertion(helpDeskMembers))); // Remove our assertion - oracle.delete(tuple).get(); + retryNesting(() -> oracle.delete(tuple), 3).get(); assertFalse(oracle.check(object123View.assertion(jale))); assertFalse(oracle.check(object123View.assertion(egin))); assertFalse(oracle.check(object123View.assertion(helpDeskMembers))); // Some deletes - oracle.delete(abcTechMembers).get(); - oracle.delete(flaggedTechnicianMembers).get(); + retryNesting(() -> oracle.delete(abcTechMembers), 3).get(); + retryNesting(() -> oracle.delete(flaggedTechnicianMembers), 3).get(); } @AfterEach @@ -214,13 +232,11 @@ public void before() throws Exception { identities.keySet().forEach(d -> foundation.addMembership(d.toDigeste())); var sealed = FoundationSeal.newBuilder().setFoundation(foundation).build(); final var group = DigestAlgorithm.DEFAULT.getOrigin(); - TransactionConfiguration txnConfig = new TransactionConfiguration( Executors.newScheduledThreadPool(1, - Thread.ofVirtual() - .factory())); + TransactionConfiguration txnConfig = new TransactionConfiguration( + Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())); identities.forEach((d, id) -> { final var member = new ControlledIdentifierMember(id); - var localRouter = new LocalServer(prefix, member).router( - ServerConnectionCache.newBuilder().setTarget(30)); + var localRouter = new LocalServer(prefix, member).router(ServerConnectionCache.newBuilder().setTarget(30)); routers.add(localRouter); var domain = new ProcessDomain(group, member, params, "jdbc:h2:mem:", checkpointDirBase, RuntimeParameters.newBuilder() @@ -239,8 +255,7 @@ public void before() throws Exception { @Test public void smoke() throws Exception { domains.forEach(Domain::start); - final var activated = Utils.waitForCondition(60_000, 1_000, - () -> domains.stream().allMatch(Domain::active)); + final var activated = Utils.waitForCondition(60_000, 1_000, () -> domains.stream().allMatch(Domain::active)); assertTrue(activated, "Domains did not fully activate: " + (domains.stream() .filter(c -> !c.active()) .map(Domain::logState) diff --git a/protocols/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java b/protocols/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java index d370a5c03..acb21228e 100644 --- a/protocols/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java +++ b/protocols/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java @@ -1,24 +1,17 @@ /** * Copyright 2018 Netflix, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.netflix.concurrency.limits.limiter; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - import com.netflix.concurrency.limits.Limit; import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.MetricIds; @@ -26,42 +19,13 @@ import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; import com.netflix.concurrency.limits.limit.VegasLimit; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + public abstract class AbstractLimiter implements Limiter { public static final String ID_TAG = "id"; public static final String STATUS_TAG = "status"; - - public abstract static class Builder> { - private static final AtomicInteger idCounter = new AtomicInteger(); - - private Limit limit = VegasLimit.newDefault(); - private Supplier clock = System::nanoTime; - - protected String name = "unnamed-" + idCounter.incrementAndGet(); - protected MetricRegistry registry = EmptyMetricRegistry.INSTANCE; - - public BuilderT named(String name) { - this.name = name; - return self(); - } - - public BuilderT limit(Limit limit) { - this.limit = limit; - return self(); - } - - public BuilderT clock(Supplier clock) { - this.clock = clock; - return self(); - } - - public BuilderT metricRegistry(MetricRegistry registry) { - this.registry = registry == null ? EmptyMetricRegistry.INSTANCE : registry; - return self(); - } - - protected abstract BuilderT self(); - } - private final AtomicInteger inFlight = new AtomicInteger(); private final Supplier clock; private final Limit limitAlgorithm; @@ -69,7 +33,6 @@ public BuilderT metricRegistry(MetricRegistry registry) { private final MetricRegistry.Counter droppedCounter; private final MetricRegistry.Counter ignoredCounter; private final MetricRegistry.Counter rejectedCounter; - private volatile int limit; protected AbstractLimiter(Builder builder) { @@ -100,7 +63,8 @@ protected Listener createListener() { return new Listener() { @Override public void onSuccess() { - inFlight.decrementAndGet(); + var result = inFlight.decrementAndGet(); + assert result >= 0 : "result < 0: " + result; successCounter.increment(); limitAlgorithm.onSample(startTime, clock.get() - startTime, currentInflight, false); @@ -108,13 +72,15 @@ public void onSuccess() { @Override public void onIgnore() { - inFlight.decrementAndGet(); + var result = inFlight.decrementAndGet(); + assert result >= 0 : "result < 0: " + result; ignoredCounter.increment(); } @Override public void onDropped() { - inFlight.decrementAndGet(); + var result = inFlight.decrementAndGet(); + assert result >= 0 : "result < 0: " + result; droppedCounter.increment(); limitAlgorithm.onSample(startTime, clock.get() - startTime, currentInflight, true); @@ -134,4 +100,34 @@ protected void onNewLimit(int newLimit) { limit = newLimit; } + public abstract static class Builder> { + private static final AtomicInteger idCounter = new AtomicInteger(); + protected String name = "unnamed-" + idCounter.incrementAndGet(); + protected MetricRegistry registry = EmptyMetricRegistry.INSTANCE; + private Limit limit = VegasLimit.newDefault(); + private Supplier clock = System::nanoTime; + + public BuilderT named(String name) { + this.name = name; + return self(); + } + + public BuilderT limit(Limit limit) { + this.limit = limit; + return self(); + } + + public BuilderT clock(Supplier clock) { + this.clock = clock; + return self(); + } + + public BuilderT metricRegistry(MetricRegistry registry) { + this.registry = registry == null ? EmptyMetricRegistry.INSTANCE : registry; + return self(); + } + + protected abstract BuilderT self(); + } + } diff --git a/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java b/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java index dba871703..a2358cc62 100644 --- a/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java +++ b/sql-state/src/main/java/com/salesforce/apollo/state/Emulator.java @@ -6,15 +6,15 @@ */ package com.salesforce.apollo.state; -import com.salesfoce.apollo.choam.proto.SubmitResult; +import com.salesfoce.apollo.choam.proto.*; import com.salesfoce.apollo.choam.proto.SubmitResult.Result; -import com.salesfoce.apollo.choam.proto.Transaction; import com.salesfoce.apollo.state.proto.Txn; import com.salesforce.apollo.choam.CHOAM; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Parameters; import com.salesforce.apollo.choam.Parameters.RuntimeParameters; import com.salesforce.apollo.choam.Session; +import com.salesforce.apollo.choam.support.HashedCertifiedBlock; import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.DigestAlgorithm; import com.salesforce.apollo.membership.ContextImpl; @@ -48,14 +48,14 @@ public class Emulator { private final AtomicReference hash; - private final AtomicLong height = new AtomicLong(0); - private final ReentrantLock lock = new ReentrantLock(); - private final Mutator mutator; - private final Parameters params; - private final SqlStateMachine ssm; - private final AtomicBoolean started = new AtomicBoolean(); - private final TransactionExecutor txnExec; - private final AtomicInteger txnIndex = new AtomicInteger(0); + private final AtomicLong height = new AtomicLong(0); + private final ReentrantLock lock = new ReentrantLock(); + private final Mutator mutator; + private final Parameters params; + private final SqlStateMachine ssm; + private final AtomicBoolean started = new AtomicBoolean(); + private final TransactionExecutor txnExec; + private final AtomicInteger txnIndex = new AtomicInteger(0); public Emulator() throws IOException { this(DigestAlgorithm.DEFAULT.getOrigin().prefix(Entropy.nextBitsStreamLong())); @@ -63,8 +63,7 @@ public Emulator() throws IOException { public Emulator(Digest base) throws IOException { this(new SqlStateMachine(String.format("jdbc:h2:mem:emulation-%s-%s", base, Entropy.nextBitsStreamLong()), - new Properties(), Files.createTempDirectory("emulation").toFile()), - base); + new Properties(), Files.createTempDirectory("emulation").toFile()), base); } public Emulator(SqlStateMachine ssm, Digest base) { @@ -77,27 +76,34 @@ public Emulator(SqlStateMachine ssm, Digest base) { } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e); } - entropy.setSeed(new byte[]{6, 6, 6}); + entropy.setSeed(new byte[] { 6, 6, 6 }); ControlledIdentifier identifier; identifier = new StereotomyImpl(new MemKeyStore(), new MemKERL(DigestAlgorithm.DEFAULT), - entropy).newIdentifier(); + entropy).newIdentifier(); params = Parameters.newBuilder() - .build(RuntimeParameters.newBuilder() - .setMember(new ControlledIdentifierMember(identifier)) - .setContext(new ContextImpl<>(base, 5, 0.01, 3)) - .build()); + .build(RuntimeParameters.newBuilder() + .setMember(new ControlledIdentifierMember(identifier)) + .setContext(new ContextImpl<>(base, 5, 0.01, 3)) + .build()); var algorithm = base.getAlgorithm(); Session session = new Session(params, st -> { lock.lock(); try { Transaction txn = st.transaction(); txnExec.execute(txnIndex.incrementAndGet(), CHOAM.hashOf(txn, algorithm), txn, st.onCompletion(), - r -> r.run()); + r -> r.run()); return SubmitResult.newBuilder().setResult(Result.PUBLISHED).build(); } finally { lock.unlock(); } }); + session.setView(new HashedCertifiedBlock(DigestAlgorithm.DEFAULT, CertifiedBlock.newBuilder() + .setBlock(Block.newBuilder() + .setHeader( + Header.newBuilder() + .setHeight( + 100))) + .build())); mutator = ssm.getMutator(session); } diff --git a/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java b/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java index 7df638193..6a896ff1f 100644 --- a/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java +++ b/sql-state/src/main/java/com/salesforce/apollo/state/SqlStateMachine.java @@ -6,56 +6,13 @@ */ package com.salesforce.apollo.state; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.lang.reflect.Method; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.sql.CallableStatement; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.zip.GZIPOutputStream; - -import javax.sql.rowset.CachedRowSet; -import javax.sql.rowset.RowSetFactory; -import javax.sql.rowset.RowSetProvider; - -import org.joou.ULong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.InvalidProtocolBufferException; import com.salesfoce.apollo.choam.proto.Transaction; -import com.salesfoce.apollo.state.proto.Arguments; -import com.salesfoce.apollo.state.proto.Batch; -import com.salesfoce.apollo.state.proto.BatchUpdate; -import com.salesfoce.apollo.state.proto.BatchedTransaction; -import com.salesfoce.apollo.state.proto.Call; -import com.salesfoce.apollo.state.proto.ChangeLog; -import com.salesfoce.apollo.state.proto.Drop; -import com.salesfoce.apollo.state.proto.Migration; -import com.salesfoce.apollo.state.proto.Script; import com.salesfoce.apollo.state.proto.Statement; -import com.salesfoce.apollo.state.proto.Txn; +import com.salesfoce.apollo.state.proto.*; import com.salesforce.apollo.choam.CHOAM.TransactionExecutor; import com.salesforce.apollo.choam.Session; import com.salesforce.apollo.choam.support.CheckpointState; @@ -63,26 +20,18 @@ import com.salesforce.apollo.crypto.Digest; import com.salesforce.apollo.crypto.QualifiedBase64; import com.salesforce.apollo.state.Mutator.BatchedTransactionException; -import com.salesforce.apollo.state.liquibase.LiquibaseConnection; -import com.salesforce.apollo.state.liquibase.MigrationAccessor; -import com.salesforce.apollo.state.liquibase.NullResourceAccessor; -import com.salesforce.apollo.state.liquibase.ReplicatedChangeLogHistoryService; -import com.salesforce.apollo.state.liquibase.ThreadLocalScopeManager; +import com.salesforce.apollo.state.liquibase.*; import com.salesforce.apollo.utils.DelegatingJdbcConnector; import com.salesforce.apollo.utils.Entropy; +import com.salesforce.apollo.utils.Utils; import com.salesforce.apollo.utils.bloomFilters.Hash.DigestHasher; - import deterministic.org.h2.api.ErrorCode; import deterministic.org.h2.engine.SessionLocal; import deterministic.org.h2.jdbc.JdbcConnection; import deterministic.org.h2.jdbc.JdbcSQLNonTransientConnectionException; import deterministic.org.h2.jdbc.JdbcSQLNonTransientException; import deterministic.org.h2.message.DbException; -import deterministic.org.h2.util.BlockClock; -import deterministic.org.h2.util.CloseWatcher; -import deterministic.org.h2.util.DateTimeUtils; -import deterministic.org.h2.util.JdbcUtils; -import deterministic.org.h2.util.MathUtils; +import deterministic.org.h2.util.*; import deterministic.org.h2.value.Value; import liquibase.CatalogAndSchema; import liquibase.Contexts; @@ -93,197 +42,51 @@ import liquibase.exception.LiquibaseException; import liquibase.resource.ClassLoaderResourceAccessor; import liquibase.util.StringUtil; +import org.joou.ULong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.rowset.CachedRowSet; +import javax.sql.rowset.RowSetFactory; +import javax.sql.rowset.RowSetProvider; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.sql.*; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.zip.GZIPOutputStream; /** - * This is ye Jesus Nut of sql state via distribute linear logs. We use H2 as a - * the local materialized view that is constructed by SQL DML and DDL embedded - * in the log as SQL statements. Checkpointing is accomplished by SCRIPT command - * that will output SQL to recreate the state of the DB at the block height - * (i.e. the checkpoint). Mutation is interactive with the submitter of the - * transaction statements - i.e. result sets n' multiple result sets n' out - * params (in calls). This provides a "reasonable" asynchronous interaction with - * this log based mutation (through consensus). + * This is ye Jesus Nut of sql state via distribute linear logs. We use H2 as a the local materialized view that is + * constructed by SQL DML and DDL embedded in the log as SQL statements. Checkpointing is accomplished by SCRIPT command + * that will output SQL to recreate the state of the DB at the block height (i.e. the checkpoint). Mutation is + * interactive with the submitter of the transaction statements - i.e. result sets n' multiple result sets n' out params + * (in calls). This provides a "reasonable" asynchronous interaction with this log based mutation (through consensus). *

- * Batch oriented, but low enough latency to make it worth the wait (with the - * right system wide consensus/distribution, 'natch). + * Batch oriented, but low enough latency to make it worth the wait (with the right system wide consensus/distribution, + * 'natch). * * @author hal.hildebrand - * */ public class SqlStateMachine { - public static class CallResult { - public final List outValues; - public final List results; - - public CallResult(List out, List results) { - this.outValues = out; - this.results = results; - } - - public ValueType get(int index) { - @SuppressWarnings("unchecked") - ValueType v = (ValueType) outValues.get(index); - return v; - } - } - - public record Current(ULong height, Digest blkHash) {} - - public record Event(String discriminator, JsonNode body) {} - - public static class ReadOnlyConnector extends DelegatingJdbcConnector { - - private final CloseWatcher watcher; - - public ReadOnlyConnector(Connection wrapped, deterministic.org.h2.engine.Session session) throws SQLException { - super(wrapped); - wrapped.setReadOnly(true); - wrapped.setAutoCommit(false); - this.watcher = CloseWatcher.register(this, session, false); - } - - @Override - public void close() throws SQLException { - CloseWatcher.unregister(watcher); - } - - @Override - public boolean getAutoCommit() throws SQLException { - return false; - } - - @Override - public boolean isWrapperFor(Class iface) throws SQLException { - return false; - } - - @Override - public void setAutoCommit(boolean autoCommit) throws SQLException { - if (autoCommit) { - throw new SQLException("Cannot set autocommit on this connection"); - } - } - - @Override - public void setReadOnly(boolean readOnly) throws SQLException { - if (!readOnly) { - throw new SQLException("This is a read only connection"); - } - } - - @Override - public void setTransactionIsolation(int level) throws SQLException { - throw new SQLException("Cannot set transaction isolation level on this connection"); - } - - @Override - public T unwrap(Class iface) throws SQLException { - throw new SQLException("Cannot unwrap: " + iface.getCanonicalName() + "on th connection"); - } - } - - public class TxnExec implements TransactionExecutor { - @Override - public void beginBlock(ULong height, Digest hash) { - SqlStateMachine.this.beginBlock(height, hash); - } - - @Override - public void endBlock(ULong height, Digest hash) { - SqlStateMachine.this.endBlock(height, hash); - } - - @Override - public void execute(int index, Digest txnHash, Transaction tx, - @SuppressWarnings("rawtypes") CompletableFuture onComplete, Executor executor) { - boolean closed; - try { - closed = connection().isClosed(); - } catch (SQLException e) { - return; - } - if (closed) { - return; - } - Txn txn; - try { - txn = Txn.parseFrom(tx.getContent()); - } catch (InvalidProtocolBufferException e) { - log.warn("invalid txn: {}", tx, e); - onComplete.completeExceptionally(e); - return; - } - withContext(() -> { - SqlStateMachine.this.execute(index, txnHash, txn, onComplete, executor); - }); - - } - - @Override - public void genesis(Digest hash, List initialization) { - begin(ULong.valueOf(0), hash); - withContext(() -> { - initializeState(); - updateCurrent(ULong.valueOf(0), hash, -1, Digest.NONE); - }); - int i = 0; - for (Transaction txn : initialization) { - execute(i, Digest.NONE, txn, null, r -> r.run()); - } - log.debug("Genesis executed on: {}", url); - } - } - - @FunctionalInterface - private interface CheckedFunction { - B apply(A a) throws SQLException; - } - - private static class EventTrampoline { - private volatile Consumer> handler; - private volatile List pending = new ArrayList<>(); - - public void deregister() { - handler = null; - } - - private void evaluate() { - try { - if (handler != null) { - try { - handler.accept(pending); - } catch (Throwable e) { - log.trace("handler failed for {}", e); - } - } - } finally { - pending = new ArrayList<>(); - } - } - - private void publish(Event event) { - pending.add(event); - } - - private void register(Consumer> handler) { - this.handler = handler; - } - } - - private record baseAndAccessor(Liquibase liquibase, MigrationAccessor ra) implements AutoCloseable { - @Override - public void close() throws LiquibaseException { - ra.clone(); - liquibase.close(); - } - } - - private static final String CREATE_ALIAS_APOLLO_INTERNAL_PUBLISH = String.format("CREATE ALIAS apollo_internal.publish FOR \"%s.publish\"", - SqlStateMachine.class.getCanonicalName()); + private static final String CREATE_ALIAS_APOLLO_INTERNAL_PUBLISH = String.format( + "CREATE ALIAS apollo_internal.publish FOR \"%s.publish\"", SqlStateMachine.class.getCanonicalName()); private static final String DELETE_FROM_APOLLO_INTERNAL_TRAMPOLINE = "DELETE FROM apollo_internal.trampoline"; private static final RowSetFactory factory; - private static final Logger log = LoggerFactory.getLogger(SqlStateMachine.class); + private static final Logger log = LoggerFactory.getLogger( + SqlStateMachine.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String PUBLISH_INSERT = "INSERT INTO apollo_internal.trampoline(channel, body) VALUES(?1, ?2 FORMAT JSON)"; private static final String SELECT_FROM_APOLLO_INTERNAL_TRAMPOLINE = "SELECT * FROM apollo_internal.trampoline"; @@ -294,6 +97,7 @@ public void close() throws LiquibaseException { ThreadLocalScopeManager.initialize(); ChangeLogHistoryServiceFactory.getInstance().register(new ReplicatedChangeLogHistoryService()); } + static { try { factory = RowSetProvider.newFactory(); @@ -302,31 +106,20 @@ public void close() throws LiquibaseException { } } - public static boolean publish(Connection connection, String channel, String jsonBody) { - try (PreparedStatement statement = connection.prepareStatement(PUBLISH_INSERT)) { - statement.setString(1, channel); - statement.setString(2, jsonBody); - statement.execute(); - } catch (SQLException e) { - throw new IllegalStateException("Unable to publish: " + channel, e); - } - return true; - } - private final File checkpointDirectory; private final BlockClock clock = new BlockClock(); private final ScriptCompiler compiler = new ScriptCompiler(); private final JdbcConnection connection; private final AtomicReference currentBlock = new AtomicReference<>(); - private PreparedStatement deleteEvents; private final AtomicReference entropy = new AtomicReference<>(); private final AtomicReference executingBlock = new AtomicReference<>(); private final TxnExec executor = new TxnExec(); - private PreparedStatement getEvents; private final SecureRandom secureEntropy; private final EventTrampoline trampoline = new EventTrampoline(); - private PreparedStatement updateCurrent; private final String url; + private PreparedStatement deleteEvents; + private PreparedStatement getEvents; + private PreparedStatement updateCurrent; { try { @@ -345,8 +138,8 @@ public SqlStateMachine(String url, Properties info, File cpDir) { } } else { if (!checkpointDirectory.mkdirs()) { - throw new IllegalArgumentException("Cannot create checkpoint directory: " - + checkpointDirectory.getAbsolutePath()); + throw new IllegalArgumentException( + "Cannot create checkpoint directory: " + checkpointDirectory.getAbsolutePath()); } } connection = withContext(() -> { @@ -365,6 +158,17 @@ public SqlStateMachine(String url, Properties info, File cpDir) { }); } + public static boolean publish(Connection connection, String channel, String jsonBody) { + try (PreparedStatement statement = connection.prepareStatement(PUBLISH_INSERT)) { + statement.setString(1, channel); + statement.setString(2, jsonBody); + statement.execute(); + } catch (SQLException e) { + throw new IllegalStateException("Unable to publish: " + channel, e); + } + return true; + } + public void close() { try { connection().rollback(); @@ -429,11 +233,11 @@ public Function getCheckpointer() { log.error("Written file does not exist: {}", temp.getAbsolutePath()); return null; } -// try (FileInputStream fis = new FileInputStream(temp)) { -// System.out.println(Utils.getDocument(fis)); -// } catch (IOException e1) { -// throw new IllegalStateException(e1); -// } + // try (FileInputStream fis = new FileInputStream(temp)) { + // System.out.println(Utils.getDocument(fis)); + // } catch (IOException e1) { + // throw new IllegalStateException(e1); + // } File checkpoint = new File(checkpointDirectory, String.format("checkpoint-%s--%s.gzip", height, rndm)); try (FileInputStream fis = new FileInputStream(temp); FileOutputStream fos = new FileOutputStream(checkpoint); @@ -506,26 +310,18 @@ SessionLocal getSession() { // Test accessible void initializeState() { - java.sql.Statement statement = null; ChangeLogHistoryServiceFactory.getInstance().register(new ReplicatedChangeLogHistoryService()); final var database = new H2Database(); database.setConnection(new liquibase.database.jvm.JdbcConnection(new LiquibaseConnection(connection()))); - try (Liquibase liquibase = new Liquibase(SQL_STATE_INTERNAL, new ClassLoaderResourceAccessor(), database)) { + try (var statement = connection().createStatement(); + Liquibase liquibase = new Liquibase(SQL_STATE_INTERNAL, new ClassLoaderResourceAccessor(), database)) { liquibase.update((String) null); - statement = connection().createStatement(); statement.execute(CREATE_ALIAS_APOLLO_INTERNAL_PUBLISH); initializeStatements(); } catch (SQLException e) { throw new IllegalStateException("unable to initialize db state", e); } catch (LiquibaseException e) { throw new IllegalStateException("unable to initialize db state", e); - } finally { - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - } - } } log.debug("Initialized state on: {}", url); } @@ -581,8 +377,8 @@ private CallResult acceptCall(Call call) throws SQLException { for (int t : call.getOutParametersList()) { exec.registerOutParameter(p++, t); } - for (Value v : new StreamTransfer(call.getArgs().getVersion(), getSession()).read(call.getArgs() - .getArgs())) { + for (Value v : new StreamTransfer(call.getArgs().getVersion(), getSession()).read( + call.getArgs().getArgs())) { setArgument(exec, p++, v); } List out = new ArrayList<>(); @@ -722,9 +518,8 @@ private Object acceptScript(Script script) throws SQLException { } if (call == null) { - throw DbException.get(ErrorCode.SYNTAX_ERROR_1, - new IllegalArgumentException("Must contain invocation method named: " + callName - + "(...)"), script.getSource()); + throw DbException.get(ErrorCode.SYNTAX_ERROR_1, new IllegalArgumentException( + "Must contain invocation method named: " + callName + "(...)"), script.getSource()); } Object returnValue = new JavaMethod(call).getValue(instance, getSession(), args); @@ -834,14 +629,14 @@ private void execute(int index, Digest txnHash, Txn tx, try { Object results = switch (tx.getExecutionCase()) { - case BATCH -> SqlStateMachine.this.acceptBatch(tx.getBatch()); - case CALL -> acceptCall(tx.getCall()); - case BATCHUPDATE -> SqlStateMachine.this.acceptBatchUpdate(tx.getBatchUpdate()); - case STATEMENT -> SqlStateMachine.this.acceptPreparedStatement(tx.getStatement()); - case SCRIPT -> SqlStateMachine.this.acceptScript(tx.getScript()); - case BATCHED -> acceptBatchTransaction(tx.getBatched()); - case MIGRATION -> acceptMigration(tx.getMigration()); - default -> null; + case BATCH -> SqlStateMachine.this.acceptBatch(tx.getBatch()); + case CALL -> acceptCall(tx.getCall()); + case BATCHUPDATE -> SqlStateMachine.this.acceptBatchUpdate(tx.getBatchUpdate()); + case STATEMENT -> SqlStateMachine.this.acceptPreparedStatement(tx.getStatement()); + case SCRIPT -> SqlStateMachine.this.acceptScript(tx.getScript()); + case BATCHED -> acceptBatchTransaction(tx.getBatched()); + case MIGRATION -> acceptMigration(tx.getMigration()); + default -> null; }; this.complete(onCompletion, results, executor); } catch (JdbcSQLNonTransientConnectionException e) { @@ -878,14 +673,14 @@ private T execute(String sql, CheckedFunction executio private Object execute(Txn txn) throws Exception { return switch (txn.getExecutionCase()) { - case BATCH -> acceptBatch(txn.getBatch()); - case BATCHUPDATE -> acceptBatchUpdate(txn.getBatchUpdate()); - case CALL -> acceptCall(txn.getCall()); - case SCRIPT -> acceptScript(txn.getScript()); - case STATEMENT -> acceptPreparedStatement(txn.getStatement()); - case BATCHED -> acceptBatchTransaction(txn.getBatched()); - case MIGRATION -> acceptMigration(txn.getMigration()); - default -> null; + case BATCH -> acceptBatch(txn.getBatch()); + case BATCHUPDATE -> acceptBatchUpdate(txn.getBatchUpdate()); + case CALL -> acceptCall(txn.getCall()); + case SCRIPT -> acceptScript(txn.getScript()); + case STATEMENT -> acceptPreparedStatement(txn.getStatement()); + case BATCHED -> acceptBatchTransaction(txn.getBatched()); + case MIGRATION -> acceptMigration(txn.getMigration()); + default -> null; }; } @@ -999,6 +794,9 @@ private void updateCurrent(ULong height, Digest blkHash, int txn, Digest txnHash updateCurrent.setLong(3, txn); updateCurrent.setString(4, QualifiedBase64.qb64(txnHash)); updateCurrent.execute(); + } catch (JdbcSQLNonTransientConnectionException e) { + log.trace("Connection closed, failure to update current block: {} hash: {} txn: {} hash: {} on: {}", height, + blkHash, txn, txnHash, url); } catch (SQLException e) { log.debug("Failure to update current block: {} hash: {} txn: {} hash: {} on: {}", height, blkHash, txn, txnHash, url); @@ -1023,9 +821,178 @@ private T withContext(Callable action) { } private void withContext(Runnable action) { - withContext(() -> { + withContext(Utils.wrapped(() -> { action.run(); return null; - }); + }, log)); + } + + @FunctionalInterface + private interface CheckedFunction { + B apply(A a) throws SQLException; + } + + public static class CallResult { + public final List outValues; + public final List results; + + public CallResult(List out, List results) { + this.outValues = out; + this.results = results; + } + + public ValueType get(int index) { + @SuppressWarnings("unchecked") + ValueType v = (ValueType) outValues.get(index); + return v; + } + } + + public record Current(ULong height, Digest blkHash) { + } + + public record Event(String discriminator, JsonNode body) { + } + + public static class ReadOnlyConnector extends DelegatingJdbcConnector { + + private final CloseWatcher watcher; + + public ReadOnlyConnector(Connection wrapped, deterministic.org.h2.engine.Session session) throws SQLException { + super(wrapped); + wrapped.setReadOnly(true); + wrapped.setAutoCommit(false); + this.watcher = CloseWatcher.register(this, session, false); + } + + @Override + public void close() throws SQLException { + CloseWatcher.unregister(watcher); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return false; + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + if (autoCommit) { + throw new SQLException("Cannot set autocommit on this connection"); + } + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + if (!readOnly) { + throw new SQLException("This is a read only connection"); + } + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + throw new SQLException("Cannot set transaction isolation level on this connection"); + } + + @Override + public T unwrap(Class iface) throws SQLException { + throw new SQLException("Cannot unwrap: " + iface.getCanonicalName() + "on th connection"); + } + } + + private static class EventTrampoline { + private volatile Consumer> handler; + private volatile List pending = new ArrayList<>(); + + public void deregister() { + handler = null; + } + + private void evaluate() { + try { + if (handler != null) { + try { + handler.accept(pending); + } catch (Throwable e) { + log.trace("handler failed for {}", e); + } + } + } finally { + pending = new ArrayList<>(); + } + } + + private void publish(Event event) { + pending.add(event); + } + + private void register(Consumer> handler) { + this.handler = handler; + } + } + + private record baseAndAccessor(Liquibase liquibase, MigrationAccessor ra) implements AutoCloseable { + @Override + public void close() throws LiquibaseException { + ra.clone(); + liquibase.close(); + } + } + + public class TxnExec implements TransactionExecutor { + @Override + public void beginBlock(ULong height, Digest hash) { + SqlStateMachine.this.beginBlock(height, hash); + } + + @Override + public void endBlock(ULong height, Digest hash) { + SqlStateMachine.this.endBlock(height, hash); + } + + @Override + public void execute(int index, Digest txnHash, Transaction tx, + @SuppressWarnings("rawtypes") CompletableFuture onComplete, Executor executor) { + boolean closed; + try { + closed = connection().isClosed(); + } catch (SQLException e) { + return; + } + if (closed) { + return; + } + Txn txn; + try { + txn = Txn.parseFrom(tx.getContent()); + } catch (InvalidProtocolBufferException e) { + log.warn("invalid txn: {}", tx, e); + onComplete.completeExceptionally(e); + return; + } + withContext(() -> { + SqlStateMachine.this.execute(index, txnHash, txn, onComplete, executor); + }); + + } + + @Override + public void genesis(Digest hash, List initialization) { + begin(ULong.valueOf(0), hash); + withContext(() -> { + initializeState(); + updateCurrent(ULong.valueOf(0), hash, -1, Digest.NONE); + }); + int i = 0; + for (Transaction txn : initialization) { + execute(i, Digest.NONE, txn, null, r -> r.run()); + } + log.debug("Genesis executed on: {}", url); + } } } diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java index d6c35e3d6..5a7920747 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/CHOAMTest.java @@ -71,7 +71,7 @@ public class CHOAMTest { var txns = MigrationTest.initializeBookSchema(); txns.add(initialInsert()); GENESIS_DATA = CHOAM.toGenesisData(txns); - CARDINALITY = LARGE_TESTS ? 10 : 5; + CARDINALITY = LARGE_TESTS ? 20 : 5; } private final Map updaters = new ConcurrentHashMap<>(); @@ -134,7 +134,7 @@ public void before() throws Exception { .setProducer(ProducerParameters.newBuilder() .setGossipDuration(Duration.ofMillis(10)) .setBatchInterval(Duration.ofMillis(15)) - .setMaxBatchByteSize(10 * 1024 * 1024) + .setMaxBatchByteSize(100 * 1024) .setMaxBatchCount(10_000) .build()) .setCheckpointBlockDelta(2); @@ -160,7 +160,7 @@ public void before() throws Exception { public void submitMultiplTxn() throws Exception { var exec = Executors.newVirtualThreadPerTaskExecutor(); final Random entropy = new Random(); - final Duration timeout = Duration.ofSeconds(12); + final Duration timeout = Duration.ofSeconds(30); var transactioneers = new ArrayList(); final int clientCount = LARGE_TESTS ? 1_000 : 2; final int max = LARGE_TESTS ? 50 : 10; diff --git a/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java b/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java index 9c1212ac2..7b7a13e96 100644 --- a/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java +++ b/sql-state/src/test/java/com/salesforce/apollo/state/Transactioneer.java @@ -6,38 +6,33 @@ */ package com.salesforce.apollo.state; +import com.salesfoce.apollo.state.proto.Txn; +import com.salesforce.apollo.choam.support.InvalidTransaction; +import com.salesforce.apollo.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; -import java.util.List; import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.salesfoce.apollo.state.proto.Txn; -import com.salesforce.apollo.choam.support.InvalidTransaction; -import com.salesforce.apollo.utils.Utils; - class Transactioneer { - private final static Random entropy = new Random(); - private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); - private final AtomicInteger completed = new AtomicInteger(); - private final CountDownLatch countdown; - private final Executor executor; - private final List> inFlight = new CopyOnWriteArrayList<>(); - private final int max; - private final Mutator mutator; - private final ScheduledExecutorService scheduler; - private final Duration timeout; - private final Supplier update; + private final static Random entropy = new Random(); + private final static Logger log = LoggerFactory.getLogger(Transactioneer.class); + private final AtomicInteger completed = new AtomicInteger(); + private final CountDownLatch countdown; + private final Executor executor; + private final AtomicReference inFlight = new AtomicReference<>(); + private final int max; + private final Mutator mutator; + private final ScheduledExecutorService scheduler; + private final Duration timeout; + private final Supplier update; + private final AtomicBoolean finished = new AtomicBoolean(); public Transactioneer(Supplier update, Mutator mutator, Duration timeout, int max, Executor executor, CountDownLatch countdown, ScheduledExecutorService txScheduler) { @@ -55,13 +50,13 @@ public int completed() { } public int inFlight() { - return inFlight.size(); + return inFlight.get() != null ? 1 : 0; } void decorate(CompletableFuture fs) { final var futureSailor = new AtomicReference>(); - futureSailor.set(fs.whenCompleteAsync((o, t) -> { - inFlight.remove(futureSailor.get()); + futureSailor.set(fs.whenComplete((o, t) -> { + inFlight.set(null); if (t != null) { if (completed.get() < max) { scheduler.schedule(() -> { @@ -76,6 +71,8 @@ void decorate(CompletableFuture fs) { } } else { final var complete = completed.incrementAndGet(); + final var finish = finished; + if (complete < max) { scheduler.schedule(() -> { executor.execute(Utils.wrapped(() -> { @@ -86,14 +83,12 @@ void decorate(CompletableFuture fs) { } }, log)); }, entropy.nextInt(100), TimeUnit.MILLISECONDS); - } else if (complete >= max) { - if (inFlight.size() == 0) { - countdown.countDown(); - } + } else if (finish.compareAndSet(false, true)) { + countdown.countDown(); } } - }, executor)); - inFlight.add(futureSailor.get()); + })); + inFlight.set(futureSailor.get()); } void start() { diff --git a/sql-state/src/test/resources/logback-test.xml b/sql-state/src/test/resources/logback-test.xml index da12a7fe7..1223b88dc 100644 --- a/sql-state/src/test/resources/logback-test.xml +++ b/sql-state/src/test/resources/logback-test.xml @@ -47,7 +47,7 @@ - + @@ -59,4 +59,4 @@ - \ No newline at end of file +