Skip to content

Commit

Permalink
add new option
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jul 30, 2024
1 parent 784ba5f commit 9202db4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
BulkProcessor esBulkProcessor;
private final RequestTracker requestTracker;
private int retryOnConflict;

public ElasticBulkProcessorAdapter(
RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
ElasticRestHighLevelClientAdapter client,
int retryOnConflict) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -60,6 +63,7 @@ public ElasticBulkProcessorAdapter(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = retryOnConflict;
}

@Override
Expand All @@ -75,7 +79,10 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, "_doc", key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,19 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
} else {
this.requestTracker = new RequestTracker(config.getMaxTaskNum());
}

int retryOnConflict = config.getRetryOnConflict() == null ? 3 : config.getRetryOnConflict();
// ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever.
if (config.getConnector().equals("elasticsearch")) {
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new ElasticBulkProcessorAdapter(this.requestTracker, client, retryOnConflict);
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter client =
new OpensearchRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
this.bulkProcessor =
new OpensearchBulkProcessorAdapter(
this.requestTracker, client, retryOnConflict);
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private final RequestTracker requestTracker;
BulkProcessor opensearchBulkProcessor;
private int retryOnConflict;

public OpensearchBulkProcessorAdapter(
RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
RequestTracker requestTracker,
OpensearchRestHighLevelClientAdapter client,
int retryOnConflict) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -60,6 +63,7 @@ public OpensearchBulkProcessorAdapter(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.opensearchBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = retryOnConflict;
}

@Override
Expand All @@ -75,7 +79,10 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
@Override
public void addRow(String index, String key, String doc) throws InterruptedException {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
updateRequest =
new UpdateRequest(index, key)
.doc(doc, XContentType.JSON)
.retryOnConflict(this.retryOnConflict);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
Expand Down

0 comments on commit 9202db4

Please sign in to comment.