Skip to content

Commit

Permalink
feat(sink): add es retry_on_conflict and max_task_num (#17867)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored and xxhZs committed Sep 5, 2024
1 parent c512972 commit 473b7d4
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.util.concurrent.TimeUnit;

public interface BulkProcessorAdapter {
public void addRow(String index, String key, String doc);
public void addRow(String index, String key, String doc) throws InterruptedException;

public void deleteRow(String index, String key);
public void deleteRow(String index, String key) throws InterruptedException;

public void flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
// Logger is named after this adapter (previously EsSink.class) so that log lines are
// attributed to the class that actually emits them.
private static final Logger LOG = LoggerFactory.getLogger(ElasticBulkProcessorAdapter.class);
// Bulk processor that buffers and sends the update/delete requests.
BulkProcessor esBulkProcessor;
// Tracker notified (addWriteTask) for every request handed to the bulk processor.
private final RequestTracker requestTracker;
// Value passed to UpdateRequest#retryOnConflict for every upsert; assigned once in the
// constructor from EsSinkConfig#getRetryOnConflict, hence final.
private final int retryOnConflict;

public ElasticBulkProcessorAdapter(
RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
ElasticRestHighLevelClientAdapter client,
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -45,21 +48,21 @@ public ElasticBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(1000);
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` Kb
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy that initially waits 100ms, increases the delay exponentially,
// and retries up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
Expand All @@ -73,16 +76,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) {
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, "_doc", key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) {
public void deleteRow(String index, String key) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, "_doc", key);
this.requestTracker.addWriteTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,24 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {

this.config = config;
this.requestTracker = new RequestTracker();

// ApiCompatibilityMode is enabled to ensure the client can talk to newer versions of the ES server.
if (config.getConnector().equals("elasticsearch")) {
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new ElasticBulkProcessorAdapter(this.requestTracker, client, config);
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter client =
new OpensearchRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new OpensearchBulkProcessorAdapter(this.requestTracker, client, config);
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
}

private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
private void writeRow(SinkRow row)
throws JsonMappingException, JsonProcessingException, InterruptedException {
final String key = (String) row.get(1);
String doc = (String) row.get(2);
final String index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "index_column")
private String indexColumn;

// Optional "retry_on_conflict" option; when null, getRetryOnConflict() falls back to 3.
@JsonProperty(value = "retry_on_conflict")
private Integer retryOnConflict;

// Optional "batch_num_messages" option; when null, getBatchNumMessages() falls back to 1000.
@JsonProperty(value = "batch_num_messages")
private Integer batchNumMessages;

// Optional "batch_size_kb" option; when null, getBatchSizeKb() falls back to 5 * 1024.
@JsonProperty(value = "batch_size_kb")
private Integer batchSizeKb;

// Optional "concurrent_requests" option; when null, getConcurrentRequests() falls back to 1.
@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;

@JsonCreator
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
Expand Down Expand Up @@ -94,4 +106,40 @@ public EsSinkConfig withIndexColumn(String indexColumn) {
this.indexColumn = indexColumn;
return this;
}

/**
 * Returns the {@code batch_num_messages} option.
 *
 * @return the configured value, or 1000 when the option was not provided
 */
public Integer getBatchNumMessages() {
    final Integer configured = this.batchNumMessages;
    if (configured == null) {
        return 1000;
    }
    return configured;
}

/**
 * Sets the {@code batch_num_messages} option.
 *
 * @param batchNumMessages value to store; {@code null} makes {@link #getBatchNumMessages()}
 *     fall back to 1000
 * @return this config, so calls can be chained
 */
public EsSinkConfig withBatchNumMessages(Integer batchNumMessages) {
this.batchNumMessages = batchNumMessages;
return this;
}

/**
 * Returns the {@code batch_size_kb} option.
 *
 * @return the configured value, or {@code 5 * 1024} when the option was not provided
 */
public Integer getBatchSizeKb() {
    final Integer configured = this.batchSizeKb;
    if (configured == null) {
        return 5 * 1024;
    }
    return configured;
}

/**
 * Sets the {@code batch_size_kb} option.
 *
 * @param batchSizeKb value to store; {@code null} makes {@link #getBatchSizeKb()} fall back
 *     to {@code 5 * 1024}
 * @return this config, so calls can be chained
 */
public EsSinkConfig withBatchSizeKb(Integer batchSizeKb) {
this.batchSizeKb = batchSizeKb;
return this;
}

/**
 * Returns the {@code retry_on_conflict} option.
 *
 * @return the configured value, or 3 when the option was not provided
 */
public Integer getRetryOnConflict() {
    final Integer configured = this.retryOnConflict;
    if (configured == null) {
        return 3;
    }
    return configured;
}

/**
 * Sets the {@code retry_on_conflict} option.
 *
 * @param retryOnConflict value to store; {@code null} makes {@link #getRetryOnConflict()}
 *     fall back to 3
 * @return this config, so calls can be chained
 */
public EsSinkConfig withRetryOnConflict(Integer retryOnConflict) {
this.retryOnConflict = retryOnConflict;
return this;
}

/**
 * Returns the {@code concurrent_requests} option.
 *
 * @return the configured value, or 1 when the option was not provided
 */
public Integer getConcurrentRequests() {
    final Integer configured = this.concurrentRequests;
    if (configured == null) {
        return 1;
    }
    return configured;
}

/**
 * Sets the {@code concurrent_requests} option.
 *
 * @param concurrentRequests value to store; {@code null} makes
 *     {@link #getConcurrentRequests()} fall back to 1
 * @return this config, so calls can be chained
 */
public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
// Logger is named after this adapter (previously EsSink.class) so that log lines are
// attributed to the class that actually emits them.
private static final Logger LOG = LoggerFactory.getLogger(OpensearchBulkProcessorAdapter.class);
// Tracker notified (addWriteTask) for every request handed to the bulk processor.
private final RequestTracker requestTracker;
// Bulk processor that buffers and sends the update/delete requests.
BulkProcessor opensearchBulkProcessor;
// Value passed to UpdateRequest#retryOnConflict for every upsert; assigned once in the
// constructor from EsSinkConfig#getRetryOnConflict, hence final.
private final int retryOnConflict;

public OpensearchBulkProcessorAdapter(
RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
OpensearchRestHighLevelClientAdapter client,
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -45,21 +48,21 @@ public OpensearchBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(1000);
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` Kb
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy that initially waits 100ms, increases the delay exponentially,
// and retries up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.opensearchBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
Expand All @@ -73,16 +76,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) {
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) {
public void deleteRow(String index, String key) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, key);
this.requestTracker.addWriteTask();
Expand Down

0 comments on commit 473b7d4

Please sign in to comment.