Skip to content

Commit

Permalink
cherry-pick: cherry-pick 17867 to wangzheng/release-1.10-es-fix (#18341)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Aug 30, 2024
1 parent 9081ebb commit e9c22f1
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
BulkProcessor esBulkProcessor;
private final RequestTracker requestTracker;
private int retryOnConflict;

public ElasticBulkProcessorAdapter(
RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
ElasticRestHighLevelClientAdapter client,
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -45,21 +48,21 @@ public ElasticBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 1000 requests
builder.setBulkActions(1000);
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` Kb
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy that initially waits 100ms, increases the delay exponentially,
// and retries up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
Expand All @@ -75,7 +78,10 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, "_doc", key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ class RequestTracker {
// Count of write tasks in progress
private int taskCount = 0;

private Integer maxTaskNum;

public RequestTracker(Integer maxTaskNum) {
this.maxTaskNum = maxTaskNum;
}

/**
 * Enqueues an {@code EsWriteResultResp} built from the given error message onto
 * {@code blockingQueue}.
 *
 * @param errorMsg error message wrapped in the enqueued result
 */
void addErrResult(String errorMsg) {
    final EsWriteResultResp failure = new EsWriteResultResp(errorMsg);
    this.blockingQueue.add(failure);
}
Expand All @@ -74,17 +68,12 @@ void addOkResult(int numberOfActions) {
blockingQueue.add(new EsWriteResultResp(numberOfActions));
}

void addWriteTask() throws InterruptedException {
void addWriteTask() {
taskCount++;
EsWriteResultResp esWriteResultResp;
while (true) {
if ((esWriteResultResp = this.blockingQueue.poll(10, TimeUnit.MILLISECONDS))
!= null) {
if ((esWriteResultResp = this.blockingQueue.poll()) != null) {
checkEsWriteResultResp(esWriteResultResp);
}

if (taskCount >= maxTaskNum) {
continue;
} else {
return;
}
Expand Down Expand Up @@ -157,21 +146,18 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
}

this.config = config;
if (config.getMaxTaskNum() == null) {
this.requestTracker = new RequestTracker(Integer.MAX_VALUE);
} else {
this.requestTracker = new RequestTracker(config.getMaxTaskNum());
}

this.requestTracker = new RequestTracker();
// ApiCompatibilityMode is enabled to ensure the client can talk to newer versions of the ES server.
if (config.getConnector().equals("elasticsearch")) {
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new ElasticBulkProcessorAdapter(this.requestTracker, client, config);
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter client =
new OpensearchRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new OpensearchBulkProcessorAdapter(this.requestTracker, client, config);
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,17 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "index_column")
private String indexColumn;

@JsonProperty(value = "max_task_num")
private Integer maxTaskNum;
@JsonProperty(value = "retry_on_conflict")
private Integer retryOnConflict;

@JsonProperty(value = "batch_num_messages")
private Integer batchNumMessages;

@JsonProperty(value = "batch_size_kb")
private Integer batchSizeKb;

@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;

@JsonCreator
public EsSinkConfig(@JsonProperty(value = "url") String url) {
Expand Down Expand Up @@ -98,12 +107,39 @@ public EsSinkConfig withIndexColumn(String indexColumn) {
return this;
}

public Integer getMaxTaskNum() {
return maxTaskNum;
/**
 * Returns the {@code batch_num_messages} setting, falling back to 1000 when it is unset.
 *
 * @return the configured value, or 1000 if the field is {@code null}; never {@code null}
 */
public Integer getBatchNumMessages() {
    final Integer configured = this.batchNumMessages;
    if (configured == null) {
        return 1000;
    }
    // Unbox explicitly so the result is re-boxed, as with the mixed int/Integer conditional.
    return configured.intValue();
}

/**
 * Sets the {@code batch_num_messages} option.
 *
 * @param batchNumMessages value to store; may be {@code null}, in which case
 *     {@link #getBatchNumMessages()} returns its default
 * @return this config instance, to allow call chaining
 */
public EsSinkConfig withBatchNumMessages(Integer batchNumMessages) {
this.batchNumMessages = batchNumMessages;
return this;
}

/**
 * Returns the {@code batch_size_kb} setting, falling back to {@code 5 * 1024} when it is
 * unset.
 *
 * @return the configured value, or 5120 if the field is {@code null}; never {@code null}
 */
public Integer getBatchSizeKb() {
    final Integer configured = this.batchSizeKb;
    if (configured == null) {
        return 5 * 1024;
    }
    // Unbox explicitly so the result is re-boxed, as with the mixed int/Integer conditional.
    return configured.intValue();
}

/**
 * Sets the {@code batch_size_kb} option.
 *
 * @param batchSizeKb value to store; may be {@code null}, in which case
 *     {@link #getBatchSizeKb()} returns its default
 * @return this config instance, to allow call chaining
 */
public EsSinkConfig withBatchSizeKb(Integer batchSizeKb) {
this.batchSizeKb = batchSizeKb;
return this;
}

/**
 * Returns the {@code retry_on_conflict} setting, falling back to 3 when it is unset.
 *
 * @return the configured value, or 3 if the field is {@code null}; never {@code null}
 */
public Integer getRetryOnConflict() {
    final Integer configured = this.retryOnConflict;
    if (configured == null) {
        return 3;
    }
    // Unbox explicitly so the result is re-boxed, as with the mixed int/Integer conditional.
    return configured.intValue();
}

/**
 * Sets the {@code retry_on_conflict} option.
 *
 * @param retryOnConflict value to store; may be {@code null}, in which case
 *     {@link #getRetryOnConflict()} returns its default
 * @return this config instance, to allow call chaining
 */
public EsSinkConfig withRetryOnConflict(Integer retryOnConflict) {
this.retryOnConflict = retryOnConflict;
return this;
}

/**
 * Returns the {@code concurrent_requests} setting, falling back to 1 when it is unset.
 *
 * @return the configured value, or 1 if the field is {@code null}; never {@code null}
 */
public Integer getConcurrentRequests() {
    final Integer configured = this.concurrentRequests;
    if (configured == null) {
        return 1;
    }
    // Unbox explicitly so the result is re-boxed, as with the mixed int/Integer conditional.
    return configured.intValue();
}

public EsSinkConfig withMaxTaskNum(Integer maxTaskNum) {
this.maxTaskNum = maxTaskNum;
public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private final RequestTracker requestTracker;
BulkProcessor opensearchBulkProcessor;
private int retryOnConflict;

public OpensearchBulkProcessorAdapter(
RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
OpensearchRestHighLevelClientAdapter client,
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -45,21 +48,21 @@ public OpensearchBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 1000 requests
builder.setBulkActions(1000);
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` Kb
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy that initially waits 100ms, increases the delay exponentially,
// and retries up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.opensearchBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
Expand All @@ -75,7 +78,10 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
Expand Down

0 comments on commit e9c22f1

Please sign in to comment.