feat(sink): add es retry_on_conflict and max_task_num (#17867) #18411

Merged
BulkProcessorAdapter.java
@@ -19,9 +19,9 @@
import java.util.concurrent.TimeUnit;

public interface BulkProcessorAdapter {
public void addRow(String index, String key, String doc);
public void addRow(String index, String key, String doc) throws InterruptedException;

public void deleteRow(String index, String key);
public void deleteRow(String index, String key) throws InterruptedException;

public void flush();

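Both interface methods now declare `throws InterruptedException`, presumably because enqueueing a row can block while the request tracker limits the number of in-flight bulk tasks (the `max_task_num` side of this PR). A minimal caller-side sketch, using only the interface above; the helper name and error handling are illustrative, not part of the PR:

// Hypothetical helper, not part of this PR: addRow/deleteRow may now block,
// so restore the interrupt flag and surface the failure to the sink runtime.
static void writeOrDelete(
        BulkProcessorAdapter processor, String index, String key, String doc) {
    try {
        if (doc == null) {
            processor.deleteRow(index, key);
        } else {
            processor.addRow(index, key, doc);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("interrupted while enqueueing bulk request", e);
    }
}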
ElasticBulkProcessorAdapter.java
@@ -34,9 +34,12 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
BulkProcessor esBulkProcessor;
private final RequestTracker requestTracker;
private int retryOnConflict;

public ElasticBulkProcessorAdapter(
RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
ElasticRestHighLevelClientAdapter client,
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
@@ -45,21 +48,21 @@ public ElasticBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(1000);
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` KB
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds regardless of the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy which will initially wait for 100ms, increase exponentially,
// and retry up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
@@ -73,16 +76,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) {
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, "_doc", key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) {
public void deleteRow(String index, String key) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, "_doc", key);
this.requestTracker.addWriteTask();
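For context on the new `retryOnConflict` setting: with `docAsUpsert(true)`, concurrent partial updates to the same document id can fail with Elasticsearch version conflicts, and `retry_on_conflict` asks Elasticsearch to retry the update that many times on the server before reporting the failure. A standalone sketch of the request `addRow` now builds, assuming the Elasticsearch 7 high-level client used in this file; the index, key, and payload are illustrative:

// Sketch only; mirrors the UpdateRequest assembled in addRow above.
static UpdateRequest buildUpsert(String index, String key, String doc, int retryOnConflict) {
    return new UpdateRequest(index, "_doc", key)
            .doc(doc, XContentType.JSON)
            .retryOnConflict(retryOnConflict) // retry server-side on version conflicts
            .docAsUpsert(true);               // insert the document if it does not exist yet
}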
EsSink.java
@@ -147,22 +147,24 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {

this.config = config;
this.requestTracker = new RequestTracker();

// ApiCompatibilityMode is enabled to ensure the client can talk to a newer-version ES server.
if (config.getConnector().equals("elasticsearch")) {
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new ElasticBulkProcessorAdapter(this.requestTracker, client, config);
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter client =
new OpensearchRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new OpensearchBulkProcessorAdapter(this.requestTracker, client, config);
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
}

private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
private void writeRow(SinkRow row)
throws JsonMappingException, JsonProcessingException, InterruptedException {
final String key = (String) row.get(1);
String doc = (String) row.get(2);
final String index;
EsSinkConfig.java
@@ -41,6 +41,18 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "index_column")
private String indexColumn;

@JsonProperty(value = "retry_on_conflict")
private Integer retryOnConflict;

@JsonProperty(value = "batch_num_messages")
private Integer batchNumMessages;

@JsonProperty(value = "batch_size_kb")
private Integer batchSizeKb;

@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;

@JsonCreator
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
@@ -94,4 +106,40 @@ public EsSinkConfig withIndexColumn(String indexColumn) {
this.indexColumn = indexColumn;
return this;
}

public Integer getBatchNumMessages() {
return this.batchNumMessages == null ? 1000 : this.batchNumMessages;
}

public EsSinkConfig withBatchNumMessages(Integer batchNumMessages) {
this.batchNumMessages = batchNumMessages;
return this;
}

public Integer getBatchSizeKb() {
return this.batchSizeKb == null ? 5 * 1024 : this.batchSizeKb;
}

public EsSinkConfig withBatchSizeKb(Integer batchSizeKb) {
this.batchSizeKb = batchSizeKb;
return this;
}

public Integer getRetryOnConflict() {
return this.retryOnConflict == null ? 3 : this.retryOnConflict;
}

public EsSinkConfig withRetryOnConflict(Integer retryOnConflict) {
this.retryOnConflict = retryOnConflict;
return this;
}

public Integer getConcurrentRequests() {
return this.concurrentRequests == null ? 1 : this.concurrentRequests;
}

public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}
}
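With the getters above, the new options default to retry_on_conflict=3, batch_num_messages=1000, batch_size_kb=5120 (5 MB), and concurrent_requests=1; the batching and concurrency defaults match the previously hard-coded values, while updates that used to fail immediately on a version conflict are now retried up to three times. A minimal sketch of overriding them programmatically with the fluent setters added in this file (the URL and values are examples; in deployments these `@JsonProperty` fields are presumably filled from the sink's WITH options):

// Sketch, using only the constructor and setters shown in this diff; values are illustrative.
EsSinkConfig config =
        new EsSinkConfig("http://localhost:9200") // illustrative endpoint
                .withRetryOnConflict(5)           // more update retries under contention
                .withBatchNumMessages(2000)       // flush every 2000 rows
                .withBatchSizeKb(10 * 1024)       // ...or every 10 MB, whichever comes first
                .withConcurrentRequests(2);       // allow two bulk requests in flight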
OpensearchBulkProcessorAdapter.java
@@ -34,9 +34,12 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private final RequestTracker requestTracker;
BulkProcessor opensearchBulkProcessor;
private int retryOnConflict;

public OpensearchBulkProcessorAdapter(
RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
OpensearchRestHighLevelClientAdapter client,
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
@@ -45,21 +48,21 @@ public OpensearchBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(1000);
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` KB
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds regardless of the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy which will initially wait for 100ms, increase exponentially,
// and retry up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.opensearchBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
@@ -73,16 +76,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(String index, String key, String doc) {
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) {
public void deleteRow(String index, String key) throws InterruptedException {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, key);
this.requestTracker.addWriteTask();