Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptive rate limiting for OpenSearch bulk requests #1011

Merged
merged 14 commits into from
Jan 13, 2025
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("org.apache.httpcomponents.client5", "httpclient5"),
"org.opensearch" % "opensearch-job-scheduler-spi" % opensearchMavenVersion,
"dev.failsafe" % "failsafe" % "3.3.2",
"com.google.guava" % "guava" % "33.3.1-jre",
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
import org.opensearch.flint.core.storage.OpenSearchBulkRetryWrapper;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_BULK_OP_METRIC_PREFIX;
Expand All @@ -54,7 +53,6 @@
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;
private final BulkRequestRateLimiter rateLimiter;
private final OpenSearchBulkRetryWrapper bulkRetryWrapper;

private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper();
Expand All @@ -64,22 +62,15 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient {
*
* @param client the RestHighLevelClient instance to wrap
*/
public RestHighLevelClientWrapper(RestHighLevelClient client, OpenSearchBulkRetryWrapper bulkRetryWrapper) {
  this.client = client;
  // Rate limiting is handled inside the bulk retry wrapper now; this class no
  // longer holds a separate BulkRequestRateLimiter reference.
  this.bulkRetryWrapper = bulkRetryWrapper;
}

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
  // Both rate limiting and partial retry are delegated to the wrapper; this
  // method only wraps the call with write/bulk operation metrics.
  return execute(() -> bulkRetryWrapper.bulk(client, bulkRequest, options),
      OS_WRITE_OP_METRIC_PREFIX, OS_BULK_OP_METRIC_PREFIX);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ public final class MetricConstants {
public static final String OPENSEARCH_BULK_RETRY_COUNT_METRIC = "opensearch.bulk.retry.count";
public static final String OPENSEARCH_BULK_ALL_RETRY_FAILED_COUNT_METRIC = "opensearch.bulk.allRetryFailed.count";

/**
* Metric name for opensearch bulk request rate limit
*/
public static final String OS_BULK_RATE_LIMIT_METRIC = "opensearch.bulk.rateLimit";

/**
* Metric name for tracking the percentage of retryable errors in bulk responses
*/
public static final String OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC = "opensearch.bulk.retryableResult.percentage";

/**
* Metric name for counting the errors encountered with Amazon S3 operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,17 @@ public class FlintOptions implements Serializable {

private static final String UNKNOWN = "UNKNOWN";

public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0";
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "bulk.rateLimitPerNode.enabled";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED = "false";
public static final String BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "bulk.rateLimitPerNode.min";
public static final String DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE = "10";
public static final String BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "bulk.rateLimitPerNode.max";
public static final String DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE = "100";
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "bulk.rateLimitPerNode.increaseStep";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP = "10";
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "bulk.rateLimitPerNode.decreaseRatio";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO = "0.8";

public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";

public FlintOptions(Map<String, String> options) {
Expand Down Expand Up @@ -234,8 +243,24 @@ public boolean supportShard() {
DEFAULT_SUPPORT_SHARD);
}

public long getBulkRequestRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE));
public boolean getBulkRequestRateLimitPerNodeEnabled() {
return Boolean.parseBoolean(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED));
}

/** @return lower bound of the bulk request rate limit per node, in documents/sec. */
public double getBulkRequestMinRateLimitPerNode() {
  String configured = options.getOrDefault(BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE);
  return Double.parseDouble(configured);
}

/** @return upper bound of the bulk request rate limit per node, in documents/sec. */
public double getBulkRequestMaxRateLimitPerNode() {
  String configured = options.getOrDefault(BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_MAX_RATE_LIMIT_PER_NODE);
  return Double.parseDouble(configured);
}

/** @return additive step used when increasing the bulk request rate limit. */
public double getBulkRequestRateLimitPerNodeIncreaseStep() {
  String configured = options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_INCREASE_STEP);
  return Double.parseDouble(configured);
}

/** @return multiplicative ratio used when decreasing the bulk request rate limit. */
public double getBulkRequestRateLimitPerNodeDecreaseRatio() {
  String configured = options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE_DECREASE_RATIO);
  return Double.parseDouble(configured);
}

public String getCustomAsyncQuerySchedulerClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,78 @@

package org.opensearch.flint.core.storage;

import dev.failsafe.RateLimiter;
import java.time.Duration;
import com.google.common.util.concurrent.RateLimiter;
import java.util.logging.Logger;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;

public class BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiter.class.getName());
private RateLimiter<Void> rateLimiter;
private RateLimiter rateLimiter;

private final double minRate;
private final double maxRate;
private final double increaseStep;
private final double decreaseRatio;

public BulkRequestRateLimiter(FlintOptions flintOptions) {
long bulkRequestRateLimitPerNode = flintOptions.getBulkRequestRateLimitPerNode();
if (bulkRequestRateLimitPerNode > 0) {
LOG.info("Setting rate limit for bulk request to " + bulkRequestRateLimitPerNode + "/sec");
this.rateLimiter = RateLimiter.<Void>smoothBuilder(
flintOptions.getBulkRequestRateLimitPerNode(),
Duration.ofSeconds(1)).build();
// TODO: instead of leaving rateLimiter as null, use a default no-op impl for BulkRequestRateLimiter

// TODO: validate values?
minRate = flintOptions.getBulkRequestMinRateLimitPerNode();
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode();
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep();
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio();

if (flintOptions.getBulkRequestRateLimitPerNodeEnabled()) {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
this.rateLimiter = RateLimiter.create(minRate);
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) minRate);
} else {
LOG.info("Rate limit for bulk request was not set.");
}
}

// Wait so it won't exceed rate limit. Does nothing if rate limit is not set.
public void acquirePermit() throws InterruptedException {
public void acquirePermit() {
if (rateLimiter != null) {
this.rateLimiter.acquire();
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
}
}

public void acquirePermit(int permits) {
if (rateLimiter != null) {
this.rateLimiter.acquire(permits);
}
}

public double getRate() {
if (rateLimiter != null) {
return this.rateLimiter.getRate();
}
return 0;
}

public void setRate(double permitsPerSecond) {
if (rateLimiter != null) {
permitsPerSecond = Math.max(minRate, Math.min(maxRate, permitsPerSecond));
LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec");
this.rateLimiter.setRate(permitsPerSecond);
// TODO: now it's using long metric
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, (long) permitsPerSecond);
}
}

public void increaseRate() {
if (rateLimiter != null) {
setRate(getRate() + increaseStep);
}
}

public void decreaseRate() {
if (rateLimiter != null) {
this.rateLimiter.acquirePermit();
setRate(getRate() * decreaseRatio);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,43 @@
import org.opensearch.flint.core.metrics.MetricsUtil;
import org.opensearch.rest.RestStatus;

/**
* Wrapper class for OpenSearch bulk API with retry and rate limiting capability.
* TODO: remove Retry from name (also rename variables)
*/
public class OpenSearchBulkRetryWrapper {

private static final Logger LOG = Logger.getLogger(OpenSearchBulkRetryWrapper.class.getName());

private final RetryPolicy<BulkResponse> retryPolicy;
private final BulkRequestRateLimiter rateLimiter;

/**
 * @param retryOptions supplies the bulk retry policy
 * @param rateLimiter adaptive rate limiter applied per bulk request
 */
public OpenSearchBulkRetryWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) {
  this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate);
  this.rateLimiter = rateLimiter;
}

// TODO: need test case using bulk with rate limiter

/**
* Delegate bulk request to the client, and retry the request if the response contains retryable
* failure. It won't retry when bulk call thrown exception.
* Bulk request with retry and rate limiting. Delegate bulk request to the client, and retry the
* request if the response contains retryable failure. It won't retry when bulk call thrown
* exception. In addition, adjust rate limit based on the responses.
* @param client used to call bulk API
* @param bulkRequest requests passed to bulk method
* @param options options passed to bulk method
* @return Last result
*/
public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
public BulkResponse bulk(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) {
rateLimiter.acquirePermit(bulkRequest.requests().size());
return bulkWithPartialRetry(client, bulkRequest, options);
}

private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest bulkRequest,
RequestOptions options) {
final AtomicInteger requestCount = new AtomicInteger(0);
// TODO: notice for metric each retry attempt counts, but rate limit doesn't restrict retries
// could appear weird in dashboards
try {
final AtomicReference<BulkRequest> nextRequest = new AtomicReference<>(bulkRequest);
BulkResponse res = Failsafe
Expand All @@ -59,9 +75,27 @@ public BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkRequest
.get(() -> {
requestCount.incrementAndGet();
BulkResponse response = client.bulk(nextRequest.get(), options);
if (retryPolicy.getConfig().allowsRetries() && bulkItemRetryableResultPredicate.test(
response)) {
nextRequest.set(getRetryableRequest(nextRequest.get(), response));

// decrease rate if retryable result exceeds threshold; otherwise increase rate
if (!bulkItemRetryableResultPredicate.test(response)) {
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, 0);
rateLimiter.increaseRate();
} else {
BulkRequest retryableRequest = getRetryableRequest(nextRequest.get(), response);
double retryablePercentage = (double) retryableRequest.requests().size() / response.getItems().length;
// TODO: long type metric
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RETRYABLE_RESULT_PERCENTAGE_METRIC, (long) (retryablePercentage * 100));

// TODO: magic number
if (retryablePercentage > 0.4) {
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
rateLimiter.decreaseRate();
} else {
rateLimiter.increaseRate();
}

if (retryPolicy.getConfig().allowsRetries()) {
nextRequest.set(retryableRequest);
}
}
return response;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public static RestHighLevelClient createRestHighLevelClient(FlintOptions options

public static IRestHighLevelClient createClient(FlintOptions options) {
return new RestHighLevelClientWrapper(createRestHighLevelClient(options),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options),
new OpenSearchBulkRetryWrapper(options.getRetryOptions()));
new OpenSearchBulkRetryWrapper(options.getRetryOptions(),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,54 @@
package org.opensearch.flint.core.storage;


import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.opensearch.flint.core.FlintOptions;

/**
* These tests are largely dependent on the choice of the underlying rate limiter. While conceptually
* they all distribute permits at some rate, the actual behavior varies based on implementation.
* To avoid flakiness and creating test cases for specific implementation, we measure the time required
* for acquiring several permits, and set lenient thresholds.
*/
class BulkRequestRateLimiterTest {
FlintOptions flintOptionsWithRateLimit = new FlintOptions(ImmutableMap.of(FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE, "1"));
FlintOptions flintOptionsWithoutRateLimit = new FlintOptions(ImmutableMap.of(FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE, "0"));

@Test
void acquirePermitWithRateConfig() throws Exception {
BulkRequestRateLimiter limiter = new BulkRequestRateLimiter(flintOptionsWithRateLimit);
FlintOptions options = new FlintOptions(ImmutableMap.of(
FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "true",
FlintOptions.BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE, "1"));
BulkRequestRateLimiter limiter = new BulkRequestRateLimiter(options);

assertTrue(timer(() -> {
limiter.acquirePermit();
limiter.acquirePermit();
}) >= 1000);
limiter.acquirePermit();
limiter.acquirePermit();
limiter.acquirePermit();
limiter.acquirePermit();
}) >= 4500);
assertTrue(timer(() -> {
limiter.acquirePermit(5);
limiter.acquirePermit();
}) >= 4500);
}

@Test
void acquirePermitWithoutRateConfig() throws Exception {
  FlintOptions options = new FlintOptions(ImmutableMap.of(
      FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED, "false"));
  BulkRequestRateLimiter limiter = new BulkRequestRateLimiter(options);

  // With rate limiting disabled, acquiring permits should be effectively instant.
  assertTrue(timer(() -> {
    limiter.acquirePermit();
    limiter.acquirePermit();
    limiter.acquirePermit();
    limiter.acquirePermit();
    limiter.acquirePermit();
    limiter.acquirePermit();
  }) < 100);
}

Expand Down
Loading
Loading