Skip to content

Commit

Permalink
fix com
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
xxhZs committed May 24, 2024
1 parent f4caee5 commit 8230237
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.risingwave.connector.ElasticRestHighLevelClientAdapter;
import com.risingwave.connector.EsSink;
import com.risingwave.connector.EsSinkConfig;
import com.risingwave.connector.RestHighLevelClientAdapter;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.ArraySinkRow;
import com.risingwave.proto.Data;
Expand Down Expand Up @@ -74,13 +74,13 @@ public void testEsSink(ElasticsearchContainer container, String username, String
fail(e.getMessage());
}

RestHighLevelClientAdapter client = sink.getClient();
ElasticRestHighLevelClientAdapter client =
(ElasticRestHighLevelClientAdapter) sink.getClient();
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse =
(SearchResponse) client.search(searchRequest, RequestOptions.DEFAULT);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

SearchHits hits = searchResponse.getHits();
assertEquals(2, hits.getHits().length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

package com.risingwave.connector;

import com.risingwave.connector.EsSink.RequestTracker;
import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;

public interface BulkProcessorAdapter {
public void addRow(SinkRow row, String indexName, RequestTracker requestTracker);
public void addRow(String index, String key, String doc);

public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker);
public void deleteRow(String index, String key);

public void flush();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package com.risingwave.connector;

import com.risingwave.connector.EsSink.RequestTracker;
import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
Expand All @@ -36,56 +34,17 @@
public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
BulkProcessor esBulkProcessor;

private class BulkListener implements BulkProcessor.Listener {
private final RequestTracker requestTracker;

public BulkListener(RequestTracker requestTracker) {
this.requestTracker = requestTracker;
}

/** This method is called just before bulk is executed. */
@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
}

/** This method is called after bulk execution. */
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
}

/** This method is called when the bulk failed and raised a Throwable */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}
}
private final RequestTracker requestTracker;

public ElasticBulkProcessorAdapter(
RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(ElasticBulkRequestConsumerFactory)
(bulkRequest, bulkResponseActionListener) ->
client.bulkAsync(
bulkRequest,
RequestOptions.DEFAULT,
bulkResponseActionListener),
(bulkRequest, bulkResponseActionListener) ->
client.bulkAsync(
bulkRequest,
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
Expand All @@ -101,6 +60,7 @@ public ElasticBulkProcessorAdapter(
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
this.requestTracker = requestTracker;
}

@Override
Expand All @@ -114,34 +74,94 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}

@Override
public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) {
final String index = (String) row.get(0);
final String key = (String) row.get(1);
String doc = (String) row.get(2);

public void addRow(String index, String key, String doc) {
UpdateRequest updateRequest;
if (indexName != null) {
updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON);
} else {
updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON);
}
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
updateRequest.docAsUpsert(true);
requestTracker.addWriteTask();
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) {
final String index = (String) row.get(0);
final String key = (String) row.get(1);

public void deleteRow(String index, String key) {
DeleteRequest deleteRequest;
if (indexName != null) {
deleteRequest = new DeleteRequest(indexName, "_doc", key);
deleteRequest = new DeleteRequest(index, "_doc", key);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(deleteRequest);
}
}

class BulkListener
implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
org.opensearch.action.bulk.BulkProcessor.Listener {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private final RequestTracker requestTracker;

public BulkListener(RequestTracker requestTracker) {
this.requestTracker = requestTracker;
}

@Override
public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.elasticsearch.action.bulk.BulkRequest request,
org.elasticsearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
deleteRequest = new DeleteRequest(index, "_doc", key);
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
requestTracker.addWriteTask();
this.esBulkProcessor.add(deleteRequest);
}

/** This method is called when the bulk failed and raised a Throwable */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}

@Override
public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.opensearch.action.bulk.BulkRequest request,
org.opensearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
}
}

@Override
public void afterBulk(
long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.risingwave.connector;

import java.io.IOException;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;

public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
RestHighLevelClient esClient;

private static RestClientBuilder configureRestClientBuilder(
RestClientBuilder builder, EsSinkConfig config) {
// Possible config:
// 1. Connection path prefix
// 2. Username and password
if (config.getPassword() != null && config.getUsername() != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
builder.setHttpClientConfigCallback(
httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
// 3. Timeout
return builder;
}

public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
this.esClient =
new RestHighLevelClientBuilder(
configureRestClientBuilder(RestClient.builder(host), config)
.build())
.setApiCompatibilityMode(true)
.build();
}

@Override
public void close() throws IOException {
esClient.close();
}

public boolean ping(RequestOptions options) throws IOException {
boolean flag = esClient.ping(options);
return flag;
}

public Cancellable bulkAsync(
BulkRequest bulkRequest,
RequestOptions options,
ActionListener<BulkResponse> listener) {
Cancellable cancellable = esClient.bulkAsync(bulkRequest, options, listener);
return cancellable;
}

public SearchResponse search(SearchRequest searchRequest, RequestOptions options)
throws IOException {
return this.esClient.search(searchRequest, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/


package com.risingwave.connector;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -167,14 +166,22 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
}

private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(1);
String doc = (String) row.get(2);
final String index;
if (config.getIndex() == null) {
index = (String) row.get(0);
} else {
index = config.getIndex();
}
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
this.bulkProcessor.addRow(row, config.getIndex(), requestTracker);
this.bulkProcessor.addRow(index, key, doc);
break;
case DELETE:
case UPDATE_DELETE:
this.bulkProcessor.deleteRow(row, config.getIndex(), requestTracker);
this.bulkProcessor.deleteRow(index, key);
break;
default:
throw Status.INVALID_ARGUMENT
Expand Down
Loading

0 comments on commit 8230237

Please sign in to comment.