diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index 82d338e23903..77caae8bff8e 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -48,11 +48,10 @@ public ElasticBulkProcessorAdapter( RequestOptions.DEFAULT, bulkResponseActionListener), new BulkListener(requestTracker)); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(config.getBulkActions()); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(config.getBulkSize(), ByteSizeUnit.KB)); + // flush the bulk every `batchNumMessages` rows + builder.setBulkActions(config.getBatchNumMessages()); + // flush the bulk every `batchSizeKb` Kb + builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB)); // flush the bulk every 5 seconds whatever the number of requests builder.setFlushInterval(TimeValue.timeValueSeconds(5)); // Set the number of concurrent requests diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java index c222e7eee33c..36dfed37dbfd 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java @@ -44,11 +44,11 @@ public class EsSinkConfig extends CommonSinkConfig { @JsonProperty(value = "retry_on_conflict") private Integer retryOnConflict; - @JsonProperty(value = "bulk_actions") - private Integer bulkActions; + @JsonProperty(value = "batch_num_messages") + private Integer batchNumMessages; - @JsonProperty(value = "bulk_size") - private Integer bulkSize; + @JsonProperty(value = "batch_size_kb") + private Integer batchSizeKb; @JsonProperty(value = "concurrent_requests") private Integer concurrentRequests; @@ -107,21 +107,21 @@ public EsSinkConfig withIndexColumn(String indexColumn) { return this; } - public Integer getBulkActions() { - return this.bulkActions == null ? 1000 : this.bulkActions; + public Integer getBatchNumMessages() { + return this.batchNumMessages == null ? 1000 : this.batchNumMessages; } - public EsSinkConfig withBulkActions(Integer bulkActions) { - this.bulkActions = bulkActions; + public EsSinkConfig withBatchNumMessages(Integer batchNumMessages) { + this.batchNumMessages = batchNumMessages; return this; } - public Integer getBulkSize() { - return this.bulkSize == null ? 5 * 1024 : this.bulkSize; + public Integer getBatchSizeKb() { + return this.batchSizeKb == null ? 5 * 1024 : this.batchSizeKb; } - public EsSinkConfig withBulkSize(Integer bulkSize) { - this.bulkSize = bulkSize; + public EsSinkConfig withBatchSizeKb(Integer batchSizeKb) { + this.batchSizeKb = batchSizeKb; return this; } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index 2d60ebe73207..40375b932460 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -48,11 +48,10 @@ public OpensearchBulkProcessorAdapter( RequestOptions.DEFAULT, bulkResponseActionListener), new BulkListener(requestTracker)); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(config.getBulkActions()); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(config.getBulkSize(), ByteSizeUnit.KB)); + // flush the bulk every `batchNumMessages` rows + builder.setBulkActions(config.getBatchNumMessages()); + // flush the bulk every `batchSizeKb` Kb + builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB)); // flush the bulk every 5 seconds whatever the number of requests builder.setFlushInterval(TimeValue.timeValueSeconds(5)); // Set the number of concurrent requests