Skip to content

Commit

Permalink
fxi
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Aug 28, 2024
1 parent 9202db4 commit d0032e4
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
public ElasticBulkProcessorAdapter(
RequestTracker requestTracker,
ElasticRestHighLevelClientAdapter client,
int retryOnConflict) {
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -48,22 +48,31 @@ public ElasticBulkProcessorAdapter(
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
System.out.println(
"BulkProcessor.Builder: "
+ config.getBulkActions()
+ " "
+ config.getBulkSize()
+ " "
+ config.getConcurrentRequests()
+ " "
+ config.getRetryOnConflict());
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(1000);
builder.setBulkActions(config.getBulkActions());
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
builder.setBulkSize(new ByteSizeValue(config.getBulkSize(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy which will initially wait for 100ms, increase exponentially
// and retries up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = retryOnConflict;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ class RequestTracker {
// Count of write tasks in progress
private int taskCount = 0;

private Integer maxTaskNum;

public RequestTracker(Integer maxTaskNum) {
this.maxTaskNum = maxTaskNum;
}

void addErrResult(String errorMsg) {
blockingQueue.add(new EsWriteResultResp(errorMsg));
}
Expand All @@ -74,17 +68,12 @@ void addOkResult(int numberOfActions) {
blockingQueue.add(new EsWriteResultResp(numberOfActions));
}

void addWriteTask() throws InterruptedException {
void addWriteTask() {
taskCount++;
EsWriteResultResp esWriteResultResp;
while (true) {
if ((esWriteResultResp = this.blockingQueue.poll(10, TimeUnit.MILLISECONDS))
!= null) {
if ((esWriteResultResp = this.blockingQueue.poll()) != null) {
checkEsWriteResultResp(esWriteResultResp);
}

if (taskCount >= maxTaskNum) {
continue;
} else {
return;
}
Expand Down Expand Up @@ -157,24 +146,18 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
}

this.config = config;
if (config.getMaxTaskNum() == null) {
this.requestTracker = new RequestTracker(Integer.MAX_VALUE);
} else {
this.requestTracker = new RequestTracker(config.getMaxTaskNum());
}
int retryOnConflict = config.getRetryOnConflict() == null ? 3 : config.getRetryOnConflict();
this.requestTracker = new RequestTracker();
// ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever.
if (config.getConnector().equals("elasticsearch")) {
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
this.bulkProcessor =
new ElasticBulkProcessorAdapter(this.requestTracker, client, retryOnConflict);
new ElasticBulkProcessorAdapter(this.requestTracker, client, config);
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter client =
new OpensearchRestHighLevelClientAdapter(host, config);
this.bulkProcessor =
new OpensearchBulkProcessorAdapter(
this.requestTracker, client, retryOnConflict);
new OpensearchBulkProcessorAdapter(this.requestTracker, client, config);
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "index_column")
private String indexColumn;

@JsonProperty(value = "max_task_num")
private Integer maxTaskNum;

@JsonProperty(value = "retry_on_conflict")
private Integer retryOnConflict;

@JsonProperty(value = "bulk_actions")
private Integer bulkActions;

@JsonProperty(value = "bulk_size")
private Integer bulkSize;

@JsonProperty(value = "concurrent_requests")
private Integer concurrentRequests;

@JsonCreator
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
Expand Down Expand Up @@ -101,21 +107,39 @@ public EsSinkConfig withIndexColumn(String indexColumn) {
return this;
}

public Integer getMaxTaskNum() {
return maxTaskNum;
public Integer getBulkActions() {
return this.bulkActions == null ? 1000 : this.bulkActions;
}

public EsSinkConfig withBulkActions(Integer bulkActions) {
this.bulkActions = bulkActions;
return this;
}

public Integer getBulkSize() {
return this.bulkSize == null ? 5 * 1024 : this.bulkSize;
}

public EsSinkConfig withMaxTaskNum(Integer maxTaskNum) {
this.maxTaskNum = maxTaskNum;
public EsSinkConfig withBulkSize(Integer bulkSize) {
this.bulkSize = bulkSize;
return this;
}

public Integer getRetryOnConflict() {
return retryOnConflict;
return this.retryOnConflict == null ? 3 : this.retryOnConflict;
}

public EsSinkConfig withRetryOnConflict(Integer retryOnConflict) {
this.retryOnConflict = retryOnConflict;
return this;
}

public Integer getConcurrentRequests() {
return this.concurrentRequests == null ? 1 : this.concurrentRequests;
}

public EsSinkConfig withConcurrentRequests(Integer concurrentRequests) {
this.concurrentRequests = concurrentRequests;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
public OpensearchBulkProcessorAdapter(
RequestTracker requestTracker,
OpensearchRestHighLevelClientAdapter client,
int retryOnConflict) {
EsSinkConfig config) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
Expand All @@ -50,20 +50,20 @@ public OpensearchBulkProcessorAdapter(
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(1000);
builder.setBulkActions(config.getBulkActions());
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
builder.setBulkSize(new ByteSizeValue(config.getBulkSize(), ByteSizeUnit.KB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
builder.setConcurrentRequests(config.getConcurrentRequests());
// Set a custom backoff policy which will initially wait for 100ms, increase exponentially
// and retries up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.opensearchBulkProcessor = builder.build();
this.requestTracker = requestTracker;
this.retryOnConflict = retryOnConflict;
this.retryOnConflict = config.getRetryOnConflict();
}

@Override
Expand Down

0 comments on commit d0032e4

Please sign in to comment.