diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java index d72ebe283395..11fe576d76eb 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -19,9 +19,9 @@ import java.util.concurrent.TimeUnit; public interface BulkProcessorAdapter { - public void addRow(String index, String key, String doc); + public void addRow(String index, String key, String doc) throws InterruptedException; - public void deleteRow(String index, String key); + public void deleteRow(String index, String key) throws InterruptedException; public void flush(); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index de6ab3414f65..77caae8bff8e 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -34,9 +34,12 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); BulkProcessor esBulkProcessor; private final RequestTracker requestTracker; + private int retryOnConflict; public ElasticBulkProcessorAdapter( - RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) { + RequestTracker requestTracker, + ElasticRestHighLevelClientAdapter client, + EsSinkConfig config) { BulkProcessor.Builder builder = BulkProcessor.builder( (bulkRequest, bulkResponseActionListener) -> @@ 
-45,21 +48,21 @@ public ElasticBulkProcessorAdapter( RequestOptions.DEFAULT, bulkResponseActionListener), new BulkListener(requestTracker)); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(1000); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every `batchNumMessages` rows + builder.setBulkActions(config.getBatchNumMessages()); + // flush the bulk once it reaches `batchSizeKb` KB + builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB)); // flush the bulk every 5 seconds whatever the number of requests builder.setFlushInterval(TimeValue.timeValueSeconds(5)); // Set the number of concurrent requests - builder.setConcurrentRequests(1); + builder.setConcurrentRequests(config.getConcurrentRequests()); // Set a custom backoff policy which will initially wait for 100ms, increase exponentially // and retries up to three times. builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.esBulkProcessor = builder.build(); this.requestTracker = requestTracker; + this.retryOnConflict = config.getRetryOnConflict(); } @Override @@ -73,16 +76,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(String index, String key, String doc) { + public void addRow(String index, String key, String doc) throws InterruptedException { UpdateRequest updateRequest; - updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); + updateRequest = + new UpdateRequest(index, "_doc", key) + .doc(doc, XContentType.JSON) + .retryOnConflict(this.retryOnConflict); updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); this.esBulkProcessor.add(updateRequest); } @Override - public void deleteRow(String index, String key) { + public void deleteRow(String index, String key) throws InterruptedException { DeleteRequest 
deleteRequest; deleteRequest = new DeleteRequest(index, "_doc", key); this.requestTracker.addWriteTask(); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 315fc800a2ef..985c27639926 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -147,22 +147,24 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { this.config = config; this.requestTracker = new RequestTracker(); - // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever. if (config.getConnector().equals("elasticsearch")) { ElasticRestHighLevelClientAdapter client = new ElasticRestHighLevelClientAdapter(host, config); - this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client); + this.bulkProcessor = + new ElasticBulkProcessorAdapter(this.requestTracker, client, config); } else if (config.getConnector().equals("opensearch")) { OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config); - this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client); + this.bulkProcessor = + new OpensearchBulkProcessorAdapter(this.requestTracker, client, config); } else { throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } } - private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { + private void writeRow(SinkRow row) + throws JsonMappingException, JsonProcessingException, InterruptedException { final String key = (String) row.get(1); String doc = (String) row.get(2); final String index; diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java 
b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java index 4ee49efca10a..36dfed37dbfd 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java @@ -41,6 +41,18 @@ public class EsSinkConfig extends CommonSinkConfig { @JsonProperty(value = "index_column") private String indexColumn; + @JsonProperty(value = "retry_on_conflict") + private Integer retryOnConflict; + + @JsonProperty(value = "batch_num_messages") + private Integer batchNumMessages; + + @JsonProperty(value = "batch_size_kb") + private Integer batchSizeKb; + + @JsonProperty(value = "concurrent_requests") + private Integer concurrentRequests; + @JsonCreator public EsSinkConfig(@JsonProperty(value = "url") String url) { this.url = url; @@ -94,4 +106,40 @@ public EsSinkConfig withIndexColumn(String indexColumn) { this.indexColumn = indexColumn; return this; } + + public Integer getBatchNumMessages() { + return this.batchNumMessages == null ? 1000 : this.batchNumMessages; + } + + public EsSinkConfig withBatchNumMessages(Integer batchNumMessages) { + this.batchNumMessages = batchNumMessages; + return this; + } + + public Integer getBatchSizeKb() { + return this.batchSizeKb == null ? 5 * 1024 : this.batchSizeKb; + } + + public EsSinkConfig withBatchSizeKb(Integer batchSizeKb) { + this.batchSizeKb = batchSizeKb; + return this; + } + + public Integer getRetryOnConflict() { + return this.retryOnConflict == null ? 3 : this.retryOnConflict; + } + + public EsSinkConfig withRetryOnConflict(Integer retryOnConflict) { + this.retryOnConflict = retryOnConflict; + return this; + } + + public Integer getConcurrentRequests() { + return this.concurrentRequests == null ? 
1 : this.concurrentRequests; + } + + public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) { + this.concurrentRequests = concurrentRequests; + return this; + } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index d5d8cdc3d237..40375b932460 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -34,9 +34,12 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); private final RequestTracker requestTracker; BulkProcessor opensearchBulkProcessor; + private int retryOnConflict; public OpensearchBulkProcessorAdapter( - RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) { + RequestTracker requestTracker, + OpensearchRestHighLevelClientAdapter client, + EsSinkConfig config) { BulkProcessor.Builder builder = BulkProcessor.builder( (bulkRequest, bulkResponseActionListener) -> @@ -45,21 +48,21 @@ public OpensearchBulkProcessorAdapter( RequestOptions.DEFAULT, bulkResponseActionListener), new BulkListener(requestTracker)); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(1000); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every `batchNumMessages` rows + builder.setBulkActions(config.getBatchNumMessages()); + // flush the bulk once it reaches `batchSizeKb` KB + builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB)); // flush the bulk every 5 seconds whatever the number of requests 
builder.setFlushInterval(TimeValue.timeValueSeconds(5)); // Set the number of concurrent requests - builder.setConcurrentRequests(1); + builder.setConcurrentRequests(config.getConcurrentRequests()); // Set a custom backoff policy which will initially wait for 100ms, increase exponentially // and retries up to three times. builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.opensearchBulkProcessor = builder.build(); this.requestTracker = requestTracker; + this.retryOnConflict = config.getRetryOnConflict(); } @Override @@ -73,16 +76,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(String index, String key, String doc) { + public void addRow(String index, String key, String doc) throws InterruptedException { UpdateRequest updateRequest; - updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); + updateRequest = + new UpdateRequest(index, key) + .doc(doc, XContentType.JSON) + .retryOnConflict(this.retryOnConflict); updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(updateRequest); } @Override - public void deleteRow(String index, String key) { + public void deleteRow(String index, String key) throws InterruptedException { DeleteRequest deleteRequest; deleteRequest = new DeleteRequest(index, key); this.requestTracker.addWriteTask();