Skip to content

Commit

Permalink
fix(EsSink): Fix es sink error thrown (#13715)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Dec 7, 2023
1 parent 2f16540 commit 7fd2a0e
Showing 1 changed file with 115 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.grpc.Status;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
Expand Down Expand Up @@ -63,19 +65,106 @@
*/
public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";

private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC");
private final SimpleDateFormat tDfm;
private final SimpleDateFormat tsDfm;
private final SimpleDateFormat tstzDfm;

private final EsSinkConfig config;
private final BulkProcessor bulkProcessor;
private BulkProcessor bulkProcessor;
private final RestHighLevelClient client;

// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;

// For bulk listener
private final List<Integer> primaryKeyIndexes;

class RequestTracker {
// Used to save the return results of es asynchronous writes. The capacity is Integer.Max
private final BlockingQueue<EsWriteResultResp> blockingQueue = new LinkedBlockingQueue<>();

// Count of write tasks in progress
private int taskCount = 0;

void addErrResult(String errorMsg) {
blockingQueue.add(new EsWriteResultResp(errorMsg));
}

void addOkResult(int numberOfActions) {
blockingQueue.add(new EsWriteResultResp(numberOfActions));
}

void addWriteTask() {
taskCount++;
EsWriteResultResp esWriteResultResp;
while (true) {
if ((esWriteResultResp = this.blockingQueue.poll()) != null) {
checkEsWriteResultResp(esWriteResultResp);
} else {
return;
}
}
}

void waitAllFlush() throws InterruptedException {
while (this.taskCount > 0) {
EsWriteResultResp esWriteResultResp = this.blockingQueue.poll(10, TimeUnit.SECONDS);
if (esWriteResultResp == null) {
LOG.warn("EsWriteResultResp is null, try wait again");
} else {
checkEsWriteResultResp(esWriteResultResp);
}
}
}

void checkEsWriteResultResp(EsWriteResultResp esWriteResultResp) {
if (esWriteResultResp.isOk()) {
this.taskCount -= esWriteResultResp.getNumberOfActions();
} else {
throw new RuntimeException(
String.format("Es writer error: %s", esWriteResultResp.getErrorMsg()));
}
if (this.taskCount < 0) {
throw new RuntimeException("The num of task < 0, but blockingQueue is not empty");
}
}
}

class EsWriteResultResp {

private boolean isOK;

private String errorMsg;

// Number of actions included in completed tasks
private Integer numberOfActions;

public boolean isOk() {
return isOK;
}

public EsWriteResultResp(int numberOfActions) {
this.isOK = true;
this.numberOfActions = numberOfActions;
}

public EsWriteResultResp(String errorMsg) {
this.isOK = false;
this.errorMsg = errorMsg;
}

public String getErrorMsg() {
return errorMsg;
}

public int getNumberOfActions() {
return numberOfActions;
}
}

public EsSink(EsSinkConfig config, TableSchema tableSchema) {
super(tableSchema);
HttpHost host;
Expand All @@ -86,6 +175,7 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
}

this.config = config;
this.requestTracker = new RequestTracker();

// ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever.
this.client =
Expand All @@ -105,7 +195,7 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
} catch (Exception e) {
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
this.bulkProcessor = createBulkProcessor();
this.bulkProcessor = createBulkProcessor(this.requestTracker);

primaryKeyIndexes = new ArrayList<Integer>();
for (String primaryKey : tableSchema.getPrimaryKeys()) {
Expand Down Expand Up @@ -162,13 +252,18 @@ private BulkProcessor.Builder applyBulkConfig(
return builder;
}

private BulkProcessor createBulkProcessor() {
private BulkProcessor createBulkProcessor(RequestTracker requestTracker) {
BulkProcessor.Builder builder =
applyBulkConfig(this.client, this.config, new BulkListener());
applyBulkConfig(this.client, this.config, new BulkListener(requestTracker));
return builder.build();
}

private class BulkListener implements BulkProcessor.Listener {
private final RequestTracker requestTracker;

public BulkListener(RequestTracker requestTracker) {
this.requestTracker = requestTracker;
}

/** This method is called just before bulk is executed. */
@Override
Expand All @@ -180,22 +275,25 @@ public void beforeBulk(long executionId, BulkRequest request) {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
LOG.error(
"Bulk of {} actions failed. Failure: {:?}",
request.numberOfActions(),
response.buildFailureMessage());
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.info("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
}

/** This method is called when the bulk failed and raised a Throwable */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
LOG.error(
"Bulk of {} actions failed. Failure: {}",
request.numberOfActions(),
failure.getMessage());
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}
}

Expand Down Expand Up @@ -303,12 +401,14 @@ private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcess

UpdateRequest updateRequest =
new UpdateRequest(config.getIndex(), "doc", key).doc(doc).upsert(doc);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
}

private void processDelete(SinkRow row) {
final String key = buildId(row);
DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key);
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
}

Expand Down Expand Up @@ -344,7 +444,8 @@ public void write(Iterator<SinkRow> rows) {
@Override
public void sync() {
try {
bulkProcessor.flush();
this.bulkProcessor.flush();
this.requestTracker.waitAllFlush();
} catch (Exception e) {
throw io.grpc.Status.INTERNAL
.withDescription(String.format(ERROR_REPORT_TEMPLATE, e.getMessage()))
Expand All @@ -355,8 +456,6 @@ public void sync() {
@Override
public void drop() {
try {
// give processor enough time to finish unfinished work, otherwise we will get an error
// in afterbulk
bulkProcessor.awaitClose(100, TimeUnit.SECONDS);
client.close();
} catch (Exception e) {
Expand Down

0 comments on commit 7fd2a0e

Please sign in to comment.