diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index a58a0ed1424ab..77caae8bff8e8 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -34,9 +34,12 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); BulkProcessor esBulkProcessor; private final RequestTracker requestTracker; + private int retryOnConflict; public ElasticBulkProcessorAdapter( - RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) { + RequestTracker requestTracker, + ElasticRestHighLevelClientAdapter client, + EsSinkConfig config) { BulkProcessor.Builder builder = BulkProcessor.builder( (bulkRequest, bulkResponseActionListener) -> @@ -45,21 +48,21 @@ public ElasticBulkProcessorAdapter( RequestOptions.DEFAULT, bulkResponseActionListener), new BulkListener(requestTracker)); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(1000); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every `batchNumMessages` rows + builder.setBulkActions(config.getBatchNumMessages()); + // flush the bulk every `batchSizeKb` Kb + builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB)); // flush the bulk every 5 seconds whatever the number of requests builder.setFlushInterval(TimeValue.timeValueSeconds(5)); // Set the number of concurrent requests - builder.setConcurrentRequests(1); + builder.setConcurrentRequests(config.getConcurrentRequests()); // Set a custom backoff policy which will initially wait for 100ms, increase exponentially // and retries up to three times. builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.esBulkProcessor = builder.build(); this.requestTracker = requestTracker; + this.retryOnConflict = config.getRetryOnConflict(); } @Override @@ -75,7 +78,10 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException @Override public void addRow(String index, String key, String doc) throws InterruptedException { UpdateRequest updateRequest; - updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); + updateRequest = + new UpdateRequest(index, "_doc", key) + .doc(doc, XContentType.JSON) + .retryOnConflict(this.retryOnConflict); updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); this.esBulkProcessor.add(updateRequest); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 90d9e799bb12c..985c27639926d 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -60,12 +60,6 @@ class RequestTracker { // Count of write tasks in progress private int taskCount = 0; - private Integer maxTaskNum; - - public RequestTracker(Integer maxTaskNum) { - this.maxTaskNum = maxTaskNum; - } - void addErrResult(String errorMsg) { blockingQueue.add(new EsWriteResultResp(errorMsg)); } @@ -74,17 +68,12 @@ void addOkResult(int numberOfActions) { blockingQueue.add(new EsWriteResultResp(numberOfActions)); } - void addWriteTask() throws InterruptedException { + void addWriteTask() { taskCount++; EsWriteResultResp esWriteResultResp; while (true) { - if ((esWriteResultResp = this.blockingQueue.poll(10, TimeUnit.MILLISECONDS)) - != null) { + if ((esWriteResultResp = this.blockingQueue.poll()) != null) { checkEsWriteResultResp(esWriteResultResp); - } - - if (taskCount >= maxTaskNum) { - continue; } else { return; } @@ -157,21 +146,18 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { } this.config = config; - if (config.getMaxTaskNum() == null) { - this.requestTracker = new RequestTracker(Integer.MAX_VALUE); - } else { - this.requestTracker = new RequestTracker(config.getMaxTaskNum()); - } - + this.requestTracker = new RequestTracker(); // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever. if (config.getConnector().equals("elasticsearch")) { ElasticRestHighLevelClientAdapter client = new ElasticRestHighLevelClientAdapter(host, config); - this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client); + this.bulkProcessor = + new ElasticBulkProcessorAdapter(this.requestTracker, client, config); } else if (config.getConnector().equals("opensearch")) { OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config); - this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client); + this.bulkProcessor = + new OpensearchBulkProcessorAdapter(this.requestTracker, client, config); } else { throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java index c1b42342a2810..36dfed37dbfd2 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java @@ -41,8 +41,17 @@ public class EsSinkConfig extends CommonSinkConfig { @JsonProperty(value = "index_column") private String indexColumn; - @JsonProperty(value = "max_task_num") - private Integer maxTaskNum; + @JsonProperty(value = "retry_on_conflict") + private Integer retryOnConflict; + + @JsonProperty(value = "batch_num_messages") + private Integer batchNumMessages; + + @JsonProperty(value = "batch_size_kb") + private Integer batchSizeKb; + + @JsonProperty(value = "concurrent_requests") + private Integer concurrentRequests; @JsonCreator public EsSinkConfig(@JsonProperty(value = "url") String url) { @@ -98,12 +107,39 @@ public EsSinkConfig withIndexColumn(String indexColumn) { return this; } - public Integer getMaxTaskNum() { - return maxTaskNum; + public Integer getBatchNumMessages() { + return this.batchNumMessages == null ? 1000 : this.batchNumMessages; + } + + public EsSinkConfig withBatchNumMessages(Integer batchNumMessages) { + this.batchNumMessages = batchNumMessages; + return this; + } + + public Integer getBatchSizeKb() { + return this.batchSizeKb == null ? 5 * 1024 : this.batchSizeKb; + } + + public EsSinkConfig withBatchSizeKb(Integer batchSizeKb) { + this.batchSizeKb = batchSizeKb; + return this; + } + + public Integer getRetryOnConflict() { + return this.retryOnConflict == null ? 3 : this.retryOnConflict; + } + + public EsSinkConfig withRetryOnConflict(Integer retryOnConflict) { + this.retryOnConflict = retryOnConflict; + return this; + } + + public Integer getConcurrentRequests() { + return this.concurrentRequests == null ? 1 : this.concurrentRequests; } - public EsSinkConfig withMaxTaskNum(Integer maxTaskNum) { - this.maxTaskNum = maxTaskNum; + public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) { + this.concurrentRequests = concurrentRequests; return this; } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index 93d48899beddf..40375b9324601 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -34,9 +34,12 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); private final RequestTracker requestTracker; BulkProcessor opensearchBulkProcessor; + private int retryOnConflict; public OpensearchBulkProcessorAdapter( - RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) { + RequestTracker requestTracker, + OpensearchRestHighLevelClientAdapter client, + EsSinkConfig config) { BulkProcessor.Builder builder = BulkProcessor.builder( (bulkRequest, bulkResponseActionListener) -> @@ -45,21 +48,21 @@ public OpensearchBulkProcessorAdapter( RequestOptions.DEFAULT, bulkResponseActionListener), new BulkListener(requestTracker)); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(1000); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every `batchNumMessages` rows + builder.setBulkActions(config.getBatchNumMessages()); + // flush the bulk every `batchSizeKb` Kb + builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB)); // flush the bulk every 5 seconds whatever the number of requests builder.setFlushInterval(TimeValue.timeValueSeconds(5)); // Set the number of concurrent requests - builder.setConcurrentRequests(1); + builder.setConcurrentRequests(config.getConcurrentRequests()); // Set a custom backoff policy which will initially wait for 100ms, increase exponentially // and retries up to three times. builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.opensearchBulkProcessor = builder.build(); this.requestTracker = requestTracker; + this.retryOnConflict = config.getRetryOnConflict(); } @Override @@ -75,7 +78,10 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException @Override public void addRow(String index, String key, String doc) throws InterruptedException { UpdateRequest updateRequest; - updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); + updateRequest = + new UpdateRequest(index, key) + .doc(doc, XContentType.JSON) + .retryOnConflict(this.retryOnConflict); updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(updateRequest);