From 4e97f977deeab878e6f2e6fdcc42c5ce71afd497 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 8 Jan 2025 16:01:07 -0800 Subject: [PATCH] shaded jar and configurable failure threshold Signed-off-by: Sean Kao --- build.sbt | 1 + .../opensearch/flint/core/FlintOptions.java | 16 ++++++--- .../core/storage/BulkRequestRateLimiter.java | 29 ++++++++++++--- .../storage/OpenSearchBulkRetryWrapper.java | 9 ++--- .../OpenSearchBulkRetryWrapperTest.java | 35 ++++++++++++++++++- .../sql/flint/config/FlintSparkConf.scala | 9 +++++ .../flint/config/FlintSparkConfSuite.scala | 13 ++++--- 7 files changed, 90 insertions(+), 22 deletions(-) diff --git a/build.sbt b/build.sbt index 55c5d6a4e..c1d877522 100644 --- a/build.sbt +++ b/build.sbt @@ -58,6 +58,7 @@ val packagesToShade = Seq( "com.fasterxml.jackson.core.**", "com.fasterxml.jackson.dataformat.**", "com.fasterxml.jackson.databind.**", + "com.google.**", "com.sun.jna.**", "com.thoughtworks.paranamer.**", "javax.annotation.**", diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index b522bea5c..7be937333 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -111,16 +111,18 @@ public class FlintOptions implements Serializable { private static final String UNKNOWN = "UNKNOWN"; - public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "bulk.rateLimitPerNode.enabled"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "bulk.rate_limit_per_node.enabled"; public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "false"; - public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "bulk.rateLimitPerNode.min"; + public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.min"; public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "10"; - public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "bulk.rateLimitPerNode.max"; + public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.max"; public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "100"; - public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "bulk.rateLimitPerNode.increaseStep"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "bulk.rate_limit_per_node.increase_step"; public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "10"; - public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rateLimitPerNode.decreaseRatio"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rate_limit_per_node.decrease_ratio"; public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "bulk.rate_limit_per_node.partial_failure_threshold"; + public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "0.2"; public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes"; @@ -263,6 +265,10 @@ public double getBulkRequestRateLimitPerNodeDecreaseRatio() { return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO)); } + public double getBulkRequestRateLimitPerNodePartialFailureThreshold() { + return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD)); + } + public String getCustomAsyncQuerySchedulerClass() { return options.getOrDefault(CUSTOM_FLINT_SCHEDULER_CLASS, ""); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java index ea5cce4be..84003d389 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java @@ -19,15 +19,16 @@ public class BulkRequestRateLimiter { private final double maxRate; private final double increaseStep; private final double decreaseRatio; + private final double partialFailureThreshold; public BulkRequestRateLimiter(FlintOptions flintOptions) { - // TODO: instead of leaving rateLimiter as null, use a default no-op impl for BulkRequestRateLimiter + // TODO: instead of leaving rateLimiter as null, use a default no-op impl for BulkRequestRateLimiter? - // TODO: validate values? minRate = flintOptions.getBulkRequestMinRateLimitPerNode(); maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode(); increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep(); decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio(); + partialFailureThreshold = flintOptions.getBulkRequestRateLimitPerNodePartialFailureThreshold(); if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) { LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec"); @@ -42,15 +43,35 @@ public BulkRequestRateLimiter(FlintOptions flintOptions) { public void acquirePermit() { if (rateLimiter != null) { this.rateLimiter.acquire(); + LOG.info("Acquired 1 permit"); } } public void acquirePermit(int permits) { if (rateLimiter != null) { this.rateLimiter.acquire(permits); + LOG.info("Acquired " + permits + " permits"); } } + /** + * Notify rate limiter of the failure rate of a bulk request. Additive-increase or multiplicative-decrease + * rate limit based on the failure rate. Does nothing if rate limit is not set. + * @param failureRate failure rate of the bulk request between 0 and 1 + */ + public void reportFailure(double failureRate) { + if (rateLimiter != null) { + if (failureRate > partialFailureThreshold) { + decreaseRate(); + } else { + increaseRate(); + } + } + } + + /** + * Rate getter and setter are public for test purpose only + */ public double getRate() { if (rateLimiter != null) { return this.rateLimiter.getRate(); @@ -68,13 +89,13 @@ public void setRate(double permitsPerSecond) { } } - public void increaseRate() { + private void increaseRate() { if (rateLimiter != null) { setRate(getRate() + increaseStep); } } - public void decreaseRate() { + private void decreaseRate() { if (rateLimiter != null) { setRate(getRate() * decreaseRatio); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java index 40678d843..f95439536 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java @@ -79,19 +79,14 @@ private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkReques // decrease rate if retryable result exceeds threshold; otherwise increase rate if (!bulkItemRetryableResultPredicate.test(response)) { MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, 0); - rateLimiter.increaseRate(); + rateLimiter.reportFailure(0); } else { BulkRequest retryableRequest = getRetryableRequest(nextRequest.get(), response); double retryablePercentage = (double) retryableRequest.requests().size() / response.getItems().length; // TODO: long type metric MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, (long) (retryablePercentage * 100)); - // TODO: magic number - if (retryablePercentage > 0.4) { - rateLimiter.decreaseRate(); - } else { - rateLimiter.increaseRate(); - } + rateLimiter.reportFailure(retryablePercentage); if (retryPolicy.getConfig().allowsRetries()) { nextRequest.set(retryableRequest); diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java index 3ee4707b5..ac2a62fd1 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapperTest.java @@ -74,7 +74,8 @@ class OpenSearchBulkRetryWrapperTest { FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2", FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20", FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1", - FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5")); + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5", + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.2")); FlintOptions optionsWithoutRateLimit = new FlintOptions(Map.of( FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "false")); @@ -284,6 +285,38 @@ public void decreaseRateLimitWhenAllCallFail() throws Exception { }); } + @Test + public void testRateLimitFailureThreshold() throws Exception { + FlintOptions optionsHighFailureThreshold = new FlintOptions(Map.of( + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "true", + FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2", + FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20", + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1", + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5", + FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.8")); + + MetricsTestUtil.withMetricEnv(verifier -> { + BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsHighFailureThreshold); + OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper( + retryOptionsWithRetry, rateLimiter); + when(client.bulk(any(), eq(options))) + .thenReturn(failureResponse) + .thenReturn(retriedResponse); + when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1)); + when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES); + mockFailureResponse(); + mockRetriedResponse(); + + rateLimiter.setRate(19); + + bulkRetryWrapper.bulk(client, bulkRequest, options); + + // First request has 50% failure, not exceeding threshold, increasing rate + // Second and third requests has 100% failure, decreasing rate + assertEquals(rateLimiter.getRate(), 5); + }); + } + private void mockFailureResponse() { when(failureResponse.hasFailures()).thenReturn(true); when(failureResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, failureItem}); diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 18b8ddbcf..9968b0f46 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -170,6 +170,14 @@ object FlintSparkConf { .doc("[Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1.") .createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO) + val BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = + FlintConfig( + s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD}") + .datasourceOption() + .doc("[Experimental] Partial failure threshold for updating bulk request rate limit per worker node, if rate limit enabled. Must be between 0 and 1.") + .createWithDefault( + FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD) + val RETRYABLE_HTTP_STATUS_CODES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}") .datasourceOption() @@ -371,6 +379,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, + BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, SERVICE_NAME, diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 878945508..ef431cdcf 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -71,22 +71,25 @@ class FlintSparkConfSuite extends FlintSuite { options.getBulkRequestMaxRateLimitPerNode shouldBe 100 options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 10 options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.8 + options.getBulkRequestRateLimitPerNodePartialFailureThreshold shouldBe 0.2 } test("test specified bulk request rate limit options") { val options = FlintSparkConf( Map( - "bulk.rateLimitPerNode.enabled" -> "true", - "bulk.rateLimitPerNode.min" -> "20", - "bulk.rateLimitPerNode.max" -> "200", - "bulk.rateLimitPerNode.increaseStep" -> "20", - "bulk.rateLimitPerNode.decreaseRatio" -> "0.5").asJava) + "bulk.rate_limit_per_node.enabled" -> "true", + "bulk.rate_limit_per_node.min" -> "20", + "bulk.rate_limit_per_node.max" -> "200", + "bulk.rate_limit_per_node.increase_step" -> "20", + "bulk.rate_limit_per_node.decrease_ratio" -> "0.5", + "bulk.rate_limit_per_node.partial_failure_threshold" -> "0.5").asJava) .flintOptions() options.getBulkRequestRateLimitPerNodeEnabled shouldBe true options.getBulkRequestMinRateLimitPerNode shouldBe 20 options.getBulkRequestMaxRateLimitPerNode shouldBe 200 options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 20 options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.5 + options.getBulkRequestRateLimitPerNodePartialFailureThreshold shouldBe 0.5 } test("test metadata access AWS credentials provider option") {