Skip to content

Commit

Permalink
Adaptive rate limiting for OpenSearch bulk requests (#1011)
Browse files Browse the repository at this point in the history
* adaptive rate limiting

- replace fail safe rate limiter for google guava's
- move rate limiter from RestHighLevelClientWrapper to
  OpenSearchBulkRetryWrapper
- add metrics for rate limit (now convert rate from double to int)
- add spark conf for rate limit parameters
- adjust rate limit based on retryable result percentage

Signed-off-by: Sean Kao <[email protected]>

* metrics and test cases WIP

Signed-off-by: Sean Kao <[email protected]>

* test case

Signed-off-by: Sean Kao <[email protected]>

* shaded jar and configurable failure threshold

Signed-off-by: Sean Kao <[email protected]>

* update default values; add doc

Signed-off-by: Sean Kao <[email protected]>

* rename OpenSearchBulkRetryWrapper (remove Retry)

Signed-off-by: Sean Kao <[email protected]>

* remove failure threshold

Signed-off-by: Sean Kao <[email protected]>

* update metric name suffix to comply with setting

Signed-off-by: Sean Kao <[email protected]>

* remove bulk failure percentage metric

Signed-off-by: Sean Kao <[email protected]>

* change rate from double to long

Signed-off-by: Sean Kao <[email protected]>

* fix spark conf name

Signed-off-by: Sean Kao <[email protected]>

* change default value

Signed-off-by: Sean Kao <[email protected]>

* address comments

- swap parameter for test case asserts
- remove excessive null check (create noop impl for rate limiter)

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az authored Jan 13, 2025
1 parent 974d7d4 commit f054022
Show file tree
Hide file tree
Showing 15 changed files with 412 additions and 96 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ val packagesToShade = Seq(
"com.fasterxml.jackson.core.**",
"com.fasterxml.jackson.dataformat.**",
"com.fasterxml.jackson.databind.**",
"com.google.**",
"com.sun.jna.**",
"com.thoughtworks.paranamer.**",
"javax.annotation.**",
Expand Down Expand Up @@ -120,6 +121,7 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("org.apache.httpcomponents.client5", "httpclient5"),
"org.opensearch" % "opensearch-job-scheduler-spi" % opensearchMavenVersion,
"dev.failsafe" % "failsafe" % "3.3.2",
"com.google.guava" % "guava" % "33.3.1-jre",
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
Expand Down
6 changes: 5 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,11 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.write.batch_size`: "The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximately amount of data in bytes written to Flint in a single batch request. The actual data write to OpenSearch may more than it. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: [Experimental] Rate limit(request/sec) for bulk request per worker node. Only accept integer value. To reduce the traffic less than 1 req/sec, batch_bytes or batch_size should be reduced. Default value is 0, which disables rate limit.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.enabled`: [Experimental] Enable rate limit for bulk request per worker node. Default is false.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 50000.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 500.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8.
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;
import org.opensearch.flint.core.storage.OpenSearchBulkWrapper;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_CREATE_OP_METRIC_PREFIX;
Expand All @@ -54,8 +53,7 @@
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;
private final BulkRequestRateLimiter rateLimiter;
private final OpenSearchBulkRetryWrapper bulkRetryWrapper;
private final OpenSearchBulkWrapper bulkRetryWrapper;

private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper();

Expand All @@ -64,22 +62,15 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient {
*
* @param client the RestHighLevelClient instance to wrap
*/
public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLimiter rateLimiter, OpenSearchBulkRetryWrapper bulkRetryWrapper) {
public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkWrapper bulkRetryWrapper) {
this.client = client;
this.rateLimiter = rateLimiter;
this.bulkRetryWrapper = bulkRetryWrapper;
}

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(() -> {
try {
rateLimiter.acquirePermit();
return bulkRetryWrapper.bulkWithPartialRetry(client, bulkRequest, options);
} catch (InterruptedException e) {
throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e);
}
}, OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
return execute(() -> bulkRetryWrapper.bulk(client, bulkRequest, options),
OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public final class MetricConstants {
public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count";
public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count";

/**
* Metric name for opensearch bulk request rate limit
*/
public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit.documentsPerSecond.count";

/**
* Metric name for counting the errors encountered with Amazon S3 operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,17 @@ public class FlintOptions implements Serializable {

private static final String UNKNOWN = "UNKNOWN";

public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "write.bulk.rate_limit_per_node.enabled";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "false";
public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "write.bulk.rate_limit_per_node.min";
public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "5000";
public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "write.bulk.rate_limit_per_node.max";
public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "50000";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "write.bulk.rate_limit_per_node.increase_step";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "500";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "write.bulk.rate_limit_per_node.decrease_ratio";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8";

public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";

public FlintOptions(Map<String, String> options) {
Expand Down Expand Up @@ -234,8 +243,24 @@ public boolean supportShard() {
DEFAULT_SUPPORT_SHARD);
}

public long getBulkRequestRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE));
public boolean getBulkRequestRateLimitPerNodeEnabled() {
return Boolean.parseBoolean(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED));
}

public long getBulkRequestMinRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE));
}

public long getBulkRequestMaxRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE));
}

public long getBulkRequestRateLimitPerNodeIncreaseStep() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP));
}

public double getBulkRequestRateLimitPerNodeDecreaseRatio() {
return Double.parseDouble(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO));
}

public String getCustomAsyncQuerySchedulerClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,16 @@

package org.opensearch.flint.core.storage;

import dev.failsafe.RateLimiter;
import java.time.Duration;
import java.util.logging.Logger;
import org.opensearch.flint.core.FlintOptions;
public interface BulkRequestRateLimiter {
void acquirePermit();

public class BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiter.class.getName());
private RateLimiter<Void> rateLimiter;
void acquirePermit(int permits);

public BulkRequestRateLimiter(FlintOptions flintOptions) {
long bulkRequestRateLimitPerNode = flintOptions.getBulkRequestRateLimitPerNode();
if (bulkRequestRateLimitPerNode > 0) {
LOG.info("Setting rate limit for bulk request to " + bulkRequestRateLimitPerNode + "/sec");
this.rateLimiter = RateLimiter.<Void>smoothBuilder(
flintOptions.getBulkRequestRateLimitPerNode(),
Duration.ofSeconds(1)).build();
} else {
LOG.info("Rate limit for bulk request was not set.");
}
}
void increaseRate();

// Wait so it won't exceed rate limit. Does nothing if rate limit is not set.
public void acquirePermit() throws InterruptedException {
if (rateLimiter != null) {
this.rateLimiter.acquirePermit();
}
}
void decreaseRate();

long getRate();

void setRate(long permitsPerSecond);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ private BulkRequestRateLimiterHolder() {}
public synchronized static BulkRequestRateLimiter getBulkRequestRateLimiter(
FlintOptions flintOptions) {
if (instance == null) {
instance = new BulkRequestRateLimiter(flintOptions);
if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) {
instance = new BulkRequestRateLimiterImpl(flintOptions);
} else {
instance = new BulkRequestRateLimiterNoop();
}
}
return instance;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import com.google.common.util.concurrent.RateLimiter;
import java.util.logging.Logger;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;

public class BulkRequestRateLimiterImpl implements BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterImpl.class.getName());
private RateLimiter rateLimiter;

private final long minRate;
private final long maxRate;
private final long increaseStep;
private final double decreaseRatio;

public BulkRequestRateLimiterImpl(FlintOptions flintOptions) {
minRate = flintOptions.getBulkRequestMinRateLimitPerNode();
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode();
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep();
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio();

LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
this.rateLimiter = RateLimiter.create(minRate);
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, minRate);
}

// Wait so it won't exceed rate limit. Does nothing if rate limit is not set.
@Override
public void acquirePermit() {
this.rateLimiter.acquire();
LOG.info("Acquired 1 permit");
}

@Override
public void acquirePermit(int permits) {
this.rateLimiter.acquire(permits);
LOG.info("Acquired " + permits + " permits");
}

/**
* Increase rate limit additively.
*/
@Override
public void increaseRate() {
setRate(getRate() + increaseStep);
}

/**
* Decrease rate limit multiplicatively.
*/
@Override
public void decreaseRate() {
setRate((long) (getRate() * decreaseRatio));
}

@Override
public long getRate() {
return (long) this.rateLimiter.getRate();
}

/**
* Set rate limit to the given value, clamped by minRate and maxRate. Non-positive maxRate means
* there's no maximum rate restriction, and the rate can be set to any value greater than
* minRate.
*/
@Override
public void setRate(long permitsPerSecond) {
if (maxRate > 0) {
permitsPerSecond = Math.min(permitsPerSecond, maxRate);
}
permitsPerSecond = Math.max(minRate, permitsPerSecond);
LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec");
this.rateLimiter.setRate(permitsPerSecond);
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, permitsPerSecond);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import java.util.logging.Logger;

public class BulkRequestRateLimiterNoop implements BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterNoop.class.getName());

public BulkRequestRateLimiterNoop() {
LOG.info("Rate limit for bulk request was not set.");
}

@Override
public void acquirePermit() {}

@Override
public void acquirePermit(int permits) {}

@Override
public void increaseRate() {}

@Override
public void decreaseRate() {}

@Override
public long getRate() {
return 0;
}

@Override
public void setRate(long permitsPerSecond) {}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,36 @@
import org.opensearch.flint.core.metrics.MetricsUtil;
import org.opensearch.rest.RestStatus;

public class OpenSearchBulkRetryWrapper {
/**
* Wrapper class for OpenSearch bulk API with retry and rate limiting capability.
*/
public class OpenSearchBulkWrapper {

private static final Logger LOG = Logger.getLogger(OpenSearchBulkRetryWrapper.class.getName());
private static final Logger LOG = Logger.getLogger(OpenSearchBulkWrapper.class.getName());

private final RetryPolicy<BulkResponse> retryPolicy;
private final BulkRequestRateLimiter rateLimiter;

public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions) {
public OpenSearchBulkWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) {
this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate);
this.rateLimiter = rateLimiter;
}

/**
* Delegate bulk request to the client, and retry the request if the response contains retryable
* failure. It won't retry when bulk call thrown exception.
* Bulk request with retry and rate limiting. Delegate bulk request to the client, and retry the
* request if the response contains retryable failure. It won't retry when bulk call thrown
* exception. In addition, adjust rate limit based on the responses.
* @param client used to call bulk API
* @param bulkRequest requests passed to bulk method
* @param options options passed to bulk method
* @return Last result
*/
public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
public BulkResponse bulk(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) {
rateLimiter.acquirePermit(bulkRequest.requests().size());
return bulkWithPartialRetry(client, bulkRequest, options);
}

private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
RequestOptions options) {
final AtomicInteger requestCount = new AtomicInteger(0);
try {
Expand All @@ -59,9 +70,14 @@ public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest
.get(() -> {
requestCount.incrementAndGet();
BulkResponse response = client.bulk(nextRequest.get(), options);
if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test(
response)) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));

if (!bulkItemRetryableResultPredicate.test(response)) {
rateLimiter.increaseRate();
} else {
rateLimiter.decreaseRate();
if (retryPolicy.getConfig().allowsRetries()) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));
}
}
return response;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public static RestHighLevelClient createRestHighLevelClient(FlintOptions options

public static IRestHighLevelClient createClient(FlintOptions options) {
return new RestHighLevelClientWrapper(createRestHighLevelClient(options),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options),
new OpenSearchBulkRetryWrapper(options.getRetryOptions()));
new OpenSearchBulkWrapper(options.getRetryOptions(),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options)));
}

/**
Expand Down
Loading

0 comments on commit f054022

Please sign in to comment.