Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Aug 29, 2024
1 parent c01e4b0 commit c4770d6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ public ElasticBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(config.getBulkActions());
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(config.getBulkSize(), ByteSizeUnit.KB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` Kb
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "retry_on_conflict")
private Integer retryOnConflict;

@JsonProperty(value = "bulk_actions")
private Integer bulkActions;
@JsonProperty(value = "batch_num_messages")
private Integer batchNumMessages;

@JsonProperty(value = "bulk_size")
private Integer bulkSize;
@JsonProperty(value = "batch_size_kb")
private Integer batchSizeKb;

@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;
Expand Down Expand Up @@ -107,21 +107,21 @@ public EsSinkConfig withIndexColumn(String indexColumn) {
return this;
}

public Integer getBulkActions() {
return this.bulkActions == null ? 1000 : this.bulkActions;
public Integer getBatchNumMessages() {
return this.batchNumMessages == null ? 1000 : this.batchNumMessages;
}

public EsSinkConfig withBulkActions(Integer bulkActions) {
this.bulkActions = bulkActions;
public EsSinkConfig withBatchNumMessages(Integer batchNumMessages) {
this.batchNumMessages = batchNumMessages;
return this;
}

public Integer getBulkSize() {
return this.bulkSize == null ? 5 * 1024 : this.bulkSize;
public Integer getBatchSizeKb() {
return this.batchSizeKb == null ? 5 * 1024 : this.batchSizeKb;
}

public EsSinkConfig withBulkSize(Integer bulkSize) {
this.bulkSize = bulkSize;
public EsSinkConfig withBatchSizeKb(Integer batchSizeKb) {
this.batchSizeKb = batchSizeKb;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ public OpensearchBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(config.getBulkActions());
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(config.getBulkSize(), ByteSizeUnit.KB));
// flush the bulk every `batchNumMessages` rows
builder.setBulkActions(config.getBatchNumMessages());
// flush the bulk every `batchSizeKb` Kb
builder.setBulkSize(new ByteSizeValue(config.getBatchSizeKb(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
Expand Down

0 comments on commit c4770d6

Please sign in to comment.