shaded jar and configurable failure threshold
Signed-off-by: Sean Kao <[email protected]>
seankao-az committed Jan 9, 2025
1 parent 0d96b55 commit 4e97f97
Showing 7 changed files with 90 additions and 22 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -58,6 +58,7 @@ val packagesToShade = Seq(
"com.fasterxml.jackson.core.**",
"com.fasterxml.jackson.dataformat.**",
"com.fasterxml.jackson.databind.**",
"com.google.**",
"com.sun.jna.**",
"com.thoughtworks.paranamer.**",
"javax.annotation.**",
@@ -111,16 +111,18 @@ public class FlintOptions implements Serializable {

private static final String UNKNOWN = "UNKNOWN";

public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "bulk.rateLimitPerNode.enabled";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "bulk.rate_limit_per_node.enabled";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "false";
public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "bulk.rateLimitPerNode.min";
public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.min";
public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "10";
public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "bulk.rateLimitPerNode.max";
public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "bulk.rate_limit_per_node.max";
public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "100";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "bulk.rateLimitPerNode.increaseStep";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "bulk.rate_limit_per_node.increase_step";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "10";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rateLimitPerNode.decreaseRatio";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rate_limit_per_node.decrease_ratio";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "bulk.rate_limit_per_node.partial_failure_threshold";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD = "0.2";

public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";

@@ -263,6 +265,10 @@ public double getBulkRequestRateLimitPerNodeDecreaseRatio() {
return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO));
}

public double getBulkRequestRateLimitPerNodePartialFailureThreshold() {
return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD));
}

public String getCustomAsyncQuerySchedulerClass() {
return options.getOrDefault(CUSTOM_FLINT_SCHEDULER_CLASS, "");
}
@@ -19,15 +19,16 @@ public class BulkRequestRateLimiter {
private final double maxRate;
private final double increaseStep;
private final double decreaseRatio;
private final double partialFailureThreshold;

public BulkRequestRateLimiter(FlintOptions flintOptions) {
// TODO: instead of leaving rateLimiter as null, use a default no-op impl for BulkRequestRateLimiter
// TODO: instead of leaving rateLimiter as null, use a default no-op impl for BulkRequestRateLimiter?

// TODO: validate values?
minRate = flintOptions.getBulkRequestMinRateLimitPerNode();
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode();
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep();
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio();
partialFailureThreshold = flintOptions.getBulkRequestRateLimitPerNodePartialFailureThreshold();

if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) {
LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
@@ -42,15 +43,35 @@ public BulkRequestRateLimiter(FlintOptions flintOptions) {
public void acquirePermit() {
if (rateLimiter != null) {
this.rateLimiter.acquire();
LOG.info("Acquired 1 permit");
}
}

public void acquirePermit(int permits) {
if (rateLimiter != null) {
this.rateLimiter.acquire(permits);
LOG.info("Acquired " + permits + " permits");
}
}

/**
* Notify the rate limiter of the failure rate of a bulk request. The rate limit is increased
* additively or decreased multiplicatively based on the failure rate. Does nothing if the rate limit is not set.
* @param failureRate failure rate of the bulk request between 0 and 1
*/
public void reportFailure(double failureRate) {
if (rateLimiter != null) {
if (failureRate > partialFailureThreshold) {
decreaseRate();
} else {
increaseRate();
}
}
}

/**
* Rate getter and setter are public for test purposes only
*/
public double getRate() {
if (rateLimiter != null) {
return this.rateLimiter.getRate();
@@ -68,13 +89,13 @@ public void setRate(double permitsPerSecond) {
}
}

public void increaseRate() {
private void increaseRate() {
if (rateLimiter != null) {
setRate(getRate() + increaseStep);
}
}

public void decreaseRate() {
private void decreaseRate() {
if (rateLimiter != null) {
setRate(getRate() * decreaseRatio);
}
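For illustration only, a minimal Scala sketch of the additive-increase/multiplicative-decrease rule that reportFailure applies, using the FlintOptions defaults (min 10, max 100, increase step 10, decrease ratio 0.8, partial failure threshold 0.2). The nextRate helper is hypothetical, and the clamping assumes setRate bounds the rate to [min, max].

  // Hedged sketch of the AIMD update behind BulkRequestRateLimiter.reportFailure.
  // Constants mirror the FlintOptions defaults; nextRate is an illustrative helper only.
  object RateLimitAimdSketch {
    val MinRate = 10.0
    val MaxRate = 100.0
    val IncreaseStep = 10.0
    val DecreaseRatio = 0.8
    val PartialFailureThreshold = 0.2

    def nextRate(currentRate: Double, failureRate: Double): Double = {
      val updated =
        if (failureRate > PartialFailureThreshold) currentRate * DecreaseRatio // multiplicative decrease
        else currentRate + IncreaseStep                                        // additive increase
      math.min(MaxRate, math.max(MinRate, updated))                            // assumed clamping by setRate
    }
  }

  // Example: nextRate(50, 0.5) == 40.0 (decrease), nextRate(50, 0.1) == 60.0 (increase)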
@@ -79,19 +79,14 @@ private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkReques
// decrease rate if retryable result exceeds threshold; otherwise increase rate
if (!bulkItemRetryableResultPredicate.test(response)) {
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, 0);
rateLimiter.increaseRate();
rateLimiter.reportFailure(0);
} else {
BulkRequest retryableRequest = getRetryableRequest(nextRequest.get(), response);
double retryablePercentage = (double) retryableRequest.requests().size() / response.getItems().length;
// TODO: long type metric
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, (long) (retryablePercentage * 100));

// TODO: magic number
if (retryablePercentage > 0.4) {
rateLimiter.decreaseRate();
} else {
rateLimiter.increaseRate();
}
rateLimiter.reportFailure(retryablePercentage);

if (retryPolicy.getConfig().allowsRetries()) {
nextRequest.set(retryableRequest);
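As a rough, hedged illustration of the value this wrapper now passes to reportFailure: it is the retryable fraction of the bulk response, so under the default 0.2 threshold a mostly-failed bulk lowers the rate and a clean bulk raises it. The helper name below is hypothetical.

  // Illustrative only: the failure rate reported to the rate limiter is the retryable fraction.
  // retryableCount stands in for retryableRequest.requests().size(),
  // totalItems for response.getItems().length.
  def reportedFailureRate(retryableCount: Int, totalItems: Int): Double =
    retryableCount.toDouble / totalItems

  // e.g. reportedFailureRate(3, 5) == 0.6 > 0.2 (default threshold) -> rate decreases;
  //      a bulk with no retryable items reports 0.0 -> rate increases.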
@@ -74,7 +74,8 @@ class OpenSearchBulkRetryWrapperTest {
FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2",
FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5"));
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.2"));
FlintOptions optionsWithoutRateLimit = new FlintOptions(Map.of(
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "false"));

@@ -284,6 +285,38 @@ public void decreaseRateLimitWhenAllCallFail() throws Exception {
});
}

@Test
public void testRateLimitFailureThreshold() throws Exception {
FlintOptions optionsHighFailureThreshold = new FlintOptions(Map.of(
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "true",
FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "2",
FlintOptions.BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, "20",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, "1",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, "0.5",
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD, "0.8"));

MetricsTestUtil.withMetricEnv(verifier -> {
BulkRequestRateLimiter rateLimiter = new BulkRequestRateLimiter(optionsHighFailureThreshold);
OpenSearchBulkRetryWrapper bulkRetryWrapper = new OpenSearchBulkRetryWrapper(
retryOptionsWithRetry, rateLimiter);
when(client.bulk(any(), eq(options)))
.thenReturn(failureResponse)
.thenReturn(retriedResponse);
when(bulkRequest.requests()).thenReturn(ImmutableList.of(indexRequest0, indexRequest1));
when(bulkRequest.estimatedSizeInBytes()).thenReturn(ESTIMATED_SIZE_IN_BYTES);
mockFailureResponse();
mockRetriedResponse();

rateLimiter.setRate(19);

bulkRetryWrapper.bulk(client, bulkRequest, options);

// First request has 50% failure, which does not exceed the 0.8 threshold, so the rate increases: 19 + 1 = 20
// Second and third requests have 100% failure, so the rate decreases twice: 20 * 0.5 = 10, then 10 * 0.5 = 5
assertEquals(rateLimiter.getRate(), 5);
});
}

private void mockFailureResponse() {
when(failureResponse.hasFailures()).thenReturn(true);
when(failureResponse.getItems()).thenReturn(new BulkItemResponse[]{successItem, failureItem});
@@ -170,6 +170,14 @@ object FlintSparkConf {
.doc("[Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1.")
.createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO)

val BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD =
FlintConfig(
s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD}")
.datasourceOption()
.doc("[Experimental] Partial failure threshold for updating bulk request rate limit per worker node, if rate limit enabled. Must be between 0 and 1.")
.createWithDefault(
FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD)

val RETRYABLE_HTTP_STATUS_CODES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}")
.datasourceOption()
@@ -371,6 +379,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE,
BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP,
BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO,
BULK_REQUEST_RATE_LIMIT_PER_NODE_PARTIAL_FAILURE_THRESHOLD,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
SERVICE_NAME,
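As a usage sketch only (not part of this change set), the new threshold can be supplied alongside the other experimental rate limit options through the spark.datasource.flint prefix defined above. The session setup and application name below are hypothetical; values mirror the FlintOptions defaults except the threshold, which is raised to 0.5 for illustration.

  import org.apache.spark.sql.SparkSession

  // Hypothetical example of enabling the per-node bulk rate limit with a custom
  // partial failure threshold via Spark datasource options.
  val spark = SparkSession.builder()
    .appName("flint-rate-limit-example") // hypothetical application name
    .config("spark.datasource.flint.bulk.rate_limit_per_node.enabled", "true")
    .config("spark.datasource.flint.bulk.rate_limit_per_node.min", "10")
    .config("spark.datasource.flint.bulk.rate_limit_per_node.max", "100")
    .config("spark.datasource.flint.bulk.rate_limit_per_node.increase_step", "10")
    .config("spark.datasource.flint.bulk.rate_limit_per_node.decrease_ratio", "0.8")
    .config("spark.datasource.flint.bulk.rate_limit_per_node.partial_failure_threshold", "0.5")
    .getOrCreate()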
@@ -71,22 +71,25 @@ class FlintSparkConfSuite extends FlintSuite {
options.getBulkRequestMaxRateLimitPerNode shouldBe 100
options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 10
options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.8
options.getBulkRequestRateLimitPerNodePartialFailureThreshold shouldBe 0.2
}

test("test specified bulk request rate limit options") {
val options = FlintSparkConf(
Map(
"bulk.rateLimitPerNode.enabled" -> "true",
"bulk.rateLimitPerNode.min" -> "20",
"bulk.rateLimitPerNode.max" -> "200",
"bulk.rateLimitPerNode.increaseStep" -> "20",
"bulk.rateLimitPerNode.decreaseRatio" -> "0.5").asJava)
"bulk.rate_limit_per_node.enabled" -> "true",
"bulk.rate_limit_per_node.min" -> "20",
"bulk.rate_limit_per_node.max" -> "200",
"bulk.rate_limit_per_node.increase_step" -> "20",
"bulk.rate_limit_per_node.decrease_ratio" -> "0.5",
"bulk.rate_limit_per_node.partial_failure_threshold" -> "0.5").asJava)
.flintOptions()
options.getBulkRequestRateLimitPerNodeEnabled shouldBe true
options.getBulkRequestMinRateLimitPerNode shouldBe 20
options.getBulkRequestMaxRateLimitPerNode shouldBe 200
options.getBulkRequestRateLimitPerNodeIncreaseStep shouldBe 20
options.getBulkRequestRateLimitPerNodeDecreaseRatio shouldBe 0.5
options.getBulkRequestRateLimitPerNodePartialFailureThreshold shouldBe 0.5
}

test("test metadata access AWS credentials provider option") {
