From ebc2848d255ff616be5e58f62c17b115c7a6756f Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 15 Apr 2024 15:56:14 +0800 Subject: [PATCH 1/8] save --- .../com/risingwave/connector/SinkUtils.java | 1 + .../risingwave-sink-es-7/pom.xml | 17 +++ .../connector/BulkProcessorAdapter.java | 11 ++ .../connector/BulkRequestConsumerFactory.java | 8 +- .../ElasticBulkProcessorAdapter.java | 101 +++++++++++++ .../java/com/risingwave/connector/EsSink.java | 143 ++---------------- .../risingwave/connector/EsSinkFactory.java | 31 ++-- .../OpensearchBulkProcessorAdapter.java | 101 +++++++++++++ .../connector/RestHighLevelClientAdapter.java | 126 +++++++++++++++ src/connector/src/sink/elasticsearch.rs | 4 +- src/connector/src/sink/mod.rs | 1 + src/connector/src/sink/remote.rs | 9 +- 12 files changed, 397 insertions(+), 156 deletions(-) create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index 679deedebcabf..73f0799e44d1d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -40,6 +40,7 @@ public static SinkFactory getSinkFactory(String sinkName) { case "jdbc": return new JDBCSinkFactory(); case "elasticsearch": + case "opensearch": return new EsSinkFactory(); case "cassandra": return new CassandraFactory(); diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml index 9c8515098d7d8..b6da9a49a0e8b 100644 --- a/java/connector-node/risingwave-sink-es-7/pom.xml +++ b/java/connector-node/risingwave-sink-es-7/pom.xml @@ -51,6 +51,23 @@ org.elasticsearch.client elasticsearch-rest-high-level-client + + org.opensearch + opensearch + 2.12.0 + + + + org.opensearch.client + opensearch-rest-high-level-client + 2.12.0 + + + org.apache.httpcomponents + httpcore-nio + + + org.apache.httpcomponents httpclient diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java new file mode 100644 index 0000000000000..6b27e28b647a3 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -0,0 +1,11 @@ +package com.risingwave.connector; + +import java.util.concurrent.TimeUnit; + +public interface BulkProcessorAdapter { + public void add(Object request); + + public void flush(); + + public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException; +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java index e26248b5fef74..6da55bd5feb8f 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java @@ -25,5 +25,11 @@ * {@link BulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API calls * across different Elasticsearch versions. */ -interface BulkRequestConsumerFactory +interface ElasticBulkRequestConsumerFactory extends BiConsumer> {} + +interface OpenSearchBulkRequestConsumerFactory + extends BiConsumer< + org.opensearch.action.bulk.BulkRequest, + org.opensearch.core.action.ActionListener< + org.opensearch.action.bulk.BulkResponse>> {} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java new file mode 100644 index 0000000000000..9538d6007ce50 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -0,0 +1,101 @@ +package com.risingwave.connector; + +import com.risingwave.connector.EsSink.RequestTracker; +import java.util.concurrent.TimeUnit; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + BulkProcessor esBulkProcessor; + + private class BulkListener implements BulkProcessor.Listener { + private final RequestTracker requestTracker; + + public BulkListener(RequestTracker requestTracker) { + this.requestTracker = requestTracker; + } + + /** This method is called just before bulk is executed. */ + @Override + public void beforeBulk(long executionId, BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + + /** This method is called after bulk execution. */ + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + } + + /** This method is called when the bulk failed and raised a Throwable */ + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } + } + + public ElasticBulkProcessorAdapter( + RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) { + BulkProcessor.Builder builder = + BulkProcessor.builder( + (ElasticBulkRequestConsumerFactory) + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), + new BulkListener(requestTracker)); + // Possible feature: move these to config + // execute the bulk every 10 000 requests + builder.setBulkActions(1000); + // flush the bulk every 5mb + builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every 5 seconds whatever the number of requests + builder.setFlushInterval(TimeValue.timeValueSeconds(5)); + // Set the number of concurrent requests + builder.setConcurrentRequests(1); + // Set a custom backoff policy which will initially wait for 100ms, increase exponentially + // and retries up to three times. + builder.setBackoffPolicy( + BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); + this.esBulkProcessor = builder.build(); + } + + @Override + public void add(Object request) { + esBulkProcessor.add((IndexRequest) request); + } + + @Override + public void flush() { + esBulkProcessor.flush(); + } + + @Override + public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException { + esBulkProcessor.awaitClose(timeout, unit); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index cc5977a9c208c..88f807c61e403 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -25,24 +25,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.RestHighLevelClientBuilder; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +49,8 @@ public class EsSink extends SinkWriterBase { private static final String ERROR_REPORT_TEMPLATE = "Error message %s"; private final EsSinkConfig config; - private BulkProcessor bulkProcessor; - private final RestHighLevelClient client; + private BulkProcessorAdapter bulkProcessor; + private final RestHighLevelClientAdapter client; // Used to handle the return message of ES and throw errors private final RequestTracker requestTracker; @@ -167,113 +151,18 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { this.requestTracker = new RequestTracker(); // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever. - this.client = - new RestHighLevelClientBuilder( - configureRestClientBuilder(RestClient.builder(host), config) - .build()) - .setApiCompatibilityMode(true) - .build(); - // Test connection - try { - boolean isConnected = this.client.ping(RequestOptions.DEFAULT); - if (!isConnected) { - throw Status.INVALID_ARGUMENT - .withDescription("Cannot connect to " + config.getUrl()) - .asRuntimeException(); - } - } catch (Exception e) { - throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); - } - this.bulkProcessor = createBulkProcessor(this.requestTracker); - } - - private static RestClientBuilder configureRestClientBuilder( - RestClientBuilder builder, EsSinkConfig config) { - // Possible config: - // 1. Connection path prefix - // 2. Username and password - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - // 3. Timeout - return builder; - } - - private BulkProcessor.Builder applyBulkConfig( - RestHighLevelClient client, EsSinkConfig config, BulkProcessor.Listener listener) { - BulkProcessor.Builder builder = - BulkProcessor.builder( - (BulkRequestConsumerFactory) - (bulkRequest, bulkResponseActionListener) -> - client.bulkAsync( - bulkRequest, - RequestOptions.DEFAULT, - bulkResponseActionListener), - listener); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(1000); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); - // flush the bulk every 5 seconds whatever the number of requests - builder.setFlushInterval(TimeValue.timeValueSeconds(5)); - // Set the number of concurrent requests - builder.setConcurrentRequests(1); - // Set a custom backoff policy which will initially wait for 100ms, increase exponentially - // and retries up to three times. - builder.setBackoffPolicy( - BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); - return builder; - } - - private BulkProcessor createBulkProcessor(RequestTracker requestTracker) { - BulkProcessor.Builder builder = - applyBulkConfig(this.client, this.config, new BulkListener(requestTracker)); - return builder.build(); - } - - private class BulkListener implements BulkProcessor.Listener { - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - /** This method is called just before bulk is executed. */ - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - /** This method is called after bulk execution. */ - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); + if (config.getConnector().equals("elasticsearch")) { + ElasticRestHighLevelClientAdapter client = + new ElasticRestHighLevelClientAdapter(host, config); + this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client); + this.client = client; + } else if (config.getConnector().equals("opensearch")) { + OpensearchRestHighLevelClientAdapter client = + new OpensearchRestHighLevelClientAdapter(host, config); + this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client); + this.client = client; + } else { + throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } } @@ -360,8 +249,4 @@ public void drop() { .asRuntimeException(); } } - - public RestHighLevelClient getClient() { - return client; - } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index f3fa3bfa16c3b..11888204e0b1f 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -26,14 +26,7 @@ import java.io.IOException; import java.util.Map; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,21 +82,19 @@ public void validate( } } + RestHighLevelClientAdapter client; // 2. check connection - RestClientBuilder builder = RestClient.builder(host); - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - RestHighLevelClient client = new RestHighLevelClient(builder); - // Test connection try { - boolean isConnected = client.ping(RequestOptions.DEFAULT); + boolean isConnected; + if (config.getConnector().equals("elasticsearch")) { + client = new ElasticRestHighLevelClientAdapter(host, config); + isConnected = client.ping(RequestOptions.DEFAULT); + } else if (config.getConnector().equals("opensearch")) { + client = new OpensearchRestHighLevelClientAdapter(host, config); + isConnected = client.ping(org.opensearch.client.RequestOptions.DEFAULT); + } else { + throw new RuntimeException("Sink type must be elasticsearch or opensearch"); + } if (!isConnected) { throw Status.INVALID_ARGUMENT .withDescription("Cannot connect to " + config.getUrl()) diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java new file mode 100644 index 0000000000000..8c16299711e2f --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -0,0 +1,101 @@ +package com.risingwave.connector; + +import com.risingwave.connector.EsSink.RequestTracker; +import java.util.concurrent.TimeUnit; +import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.action.bulk.BulkProcessor; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.RequestOptions; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + BulkProcessor opensearchBulkProcessor; + + private class BulkListener implements BulkProcessor.Listener { + private final RequestTracker requestTracker; + + public BulkListener(RequestTracker requestTracker) { + this.requestTracker = requestTracker; + } + + /** This method is called just before bulk is executed. */ + @Override + public void beforeBulk(long executionId, BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + + /** This method is called after bulk execution. */ + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + } + + /** This method is called when the bulk failed and raised a Throwable */ + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } + } + + public OpensearchBulkProcessorAdapter( + RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) { + BulkProcessor.Builder builder = + BulkProcessor.builder( + (OpenSearchBulkRequestConsumerFactory) + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), + new BulkListener(requestTracker)); + // Possible feature: move these to config + // execute the bulk every 10 000 requests + builder.setBulkActions(1000); + // flush the bulk every 5mb + builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every 5 seconds whatever the number of requests + builder.setFlushInterval(TimeValue.timeValueSeconds(5)); + // Set the number of concurrent requests + builder.setConcurrentRequests(1); + // Set a custom backoff policy which will initially wait for 100ms, increase exponentially + // and retries up to three times. + builder.setBackoffPolicy( + BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); + this.opensearchBulkProcessor = builder.build(); + } + + @Override + public void add(Object request) { + opensearchBulkProcessor.add((IndexRequest) request); + } + + @Override + public void flush() { + opensearchBulkProcessor.flush(); + } + + @Override + public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException { + opensearchBulkProcessor.awaitClose(timeout, unit); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java new file mode 100644 index 0000000000000..f656537798819 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java @@ -0,0 +1,126 @@ +package com.risingwave.connector; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestHighLevelClientBuilder; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.Cancellable; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.core.action.ActionListener; + +public interface RestHighLevelClientAdapter { + public void close() throws IOException; + + public boolean ping(Object options) throws IOException; +} + +class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { + org.elasticsearch.client.RestHighLevelClient esClient; + + private static org.elasticsearch.client.RestClientBuilder configureRestClientBuilder( + org.elasticsearch.client.RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.esClient = + new RestHighLevelClientBuilder( + configureRestClientBuilder( + org.elasticsearch.client.RestClient.builder(host), + config) + .build()) + .setApiCompatibilityMode(true) + .build(); + } + + @Override + public void close() throws IOException { + esClient.close(); + } + + @Override + public boolean ping(Object options) throws IOException { + boolean flag = esClient.ping((org.elasticsearch.client.RequestOptions) options); + return flag; + } + + public org.elasticsearch.client.Cancellable bulkAsync( + org.elasticsearch.action.bulk.BulkRequest bulkRequest, org.elasticsearch.client.RequestOptions options, org.elasticsearch.action.ActionListener< + org.elasticsearch.action.bulk.BulkResponse> listener) { + org.elasticsearch.client.Cancellable cancellable = + esClient.bulkAsync( + bulkRequest, + options, + listener); + return cancellable; + } +} + +class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { + RestHighLevelClient opensearchClient; + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.opensearchClient = + new RestHighLevelClient( + configureRestClientBuilder(RestClient.builder(host), config)); + } + + @Override + public void close() throws IOException { + opensearchClient.close(); + } + + @Override + public boolean ping(Object options) throws IOException { + boolean flag = opensearchClient.ping((RequestOptions) options); + return flag; + } + + public Cancellable bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener) { + Cancellable cancellable = + opensearchClient.bulkAsync( + bulkRequest, + options, + listener); + return cancellable; + } +} diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs index 578c768f1ce30..24f15e2537fdc 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch.rs @@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText}; use serde_json::Value; use super::encoder::{JsonEncoder, RowEncoder}; -use super::remote::ElasticSearchSink; +use super::remote::{ElasticSearchSink, OpensearchSink}; use crate::sink::{Result, Sink}; pub const ES_OPTION_DELIMITER: &str = "delimiter"; pub const ES_OPTION_INDEX_COLUMN: &str = "index_column"; @@ -40,7 +40,7 @@ impl StreamChunkConverter { pk_indices: &Vec, properties: &HashMap, ) -> Result { - if sink_name == ElasticSearchSink::SINK_NAME { + if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME { let index_column = properties .get(ES_OPTION_INDEX_COLUMN) .cloned() diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index c430b4303f1e9..395829fc8d501 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -89,6 +89,7 @@ macro_rules! for_all_sinks { { Nats, $crate::sink::nats::NatsSink }, { Jdbc, $crate::sink::remote::JdbcSink }, { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, + { Opensearch, $crate::sink::remote::OpensearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 4d3c48b6ec5c7..31bf0a92a86ac 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -73,6 +73,7 @@ macro_rules! def_remote_sink { () => { def_remote_sink! { { ElasticSearch, ElasticSearchSink, "elasticsearch" } + { Opensearch, OpensearchSink, "opensearch"} { Cassandra, CassandraSink, "cassandra" } { Jdbc, JdbcSink, "jdbc", |desc| { desc.sink_type.is_append_only() @@ -165,7 +166,7 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> { - if sink_name == ElasticSearchSink::SINK_NAME + if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME && param.downstream_pk.len() > 1 && param.properties.get(ES_OPTION_DELIMITER).is_none() { @@ -190,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe | DataType::Jsonb | DataType::Bytea => Ok(()), DataType::List(list) => { - if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ Ok(()) } else{ Err(SinkError::Remote(anyhow!( @@ -201,7 +202,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe } }, DataType::Struct(_) => { - if sink_name==ElasticSearchSink::SINK_NAME{ + if sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME{ Ok(()) }else{ Err(SinkError::Remote(anyhow!( @@ -264,7 +265,7 @@ impl RemoteLogSinker { sink_name: &str, ) -> Result { let sink_proto = sink_param.to_proto(); - let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME { + let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME { let columns = vec![ ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(), From 1746451b28f11630dc065acc09952cd4ed36a680 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 16 Apr 2024 15:02:42 +0800 Subject: [PATCH 2/8] support --- .../sink/elasticsearch/EsSinkTest.java | 7 +-- .../risingwave-sink-es-7/pom.xml | 9 ---- .../connector/BulkProcessorAdapter.java | 6 ++- .../connector/BulkRequestConsumerFactory.java | 10 +++-- .../ElasticBulkProcessorAdapter.java | 42 +++++++++++++++--- .../java/com/risingwave/connector/EsSink.java | 42 +++--------------- .../OpensearchBulkProcessorAdapter.java | 44 ++++++++++++++++--- .../connector/RestHighLevelClientAdapter.java | 38 +++++++++++----- java/pom.xml | 19 +++++++- src/connector/src/sink/remote.rs | 8 ++-- 10 files changed, 144 insertions(+), 81 deletions(-) diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index 509f71ec1e569..f2894d2e790a2 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import com.risingwave.connector.EsSink; import com.risingwave.connector.EsSinkConfig; +import com.risingwave.connector.RestHighLevelClientAdapter; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.proto.Data; @@ -31,7 +32,6 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -74,12 +74,13 @@ public void testEsSink(ElasticsearchContainer container, String username, String fail(e.getMessage()); } - RestHighLevelClient client = sink.getClient(); + RestHighLevelClientAdapter client = sink.getClient(); SearchRequest searchRequest = new SearchRequest("test"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + SearchResponse searchResponse = + (SearchResponse) client.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = searchResponse.getHits(); assertEquals(2, hits.getHits().length); diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml index b6da9a49a0e8b..4ff4bd76ef109 100644 --- a/java/connector-node/risingwave-sink-es-7/pom.xml +++ b/java/connector-node/risingwave-sink-es-7/pom.xml @@ -54,19 +54,10 @@ org.opensearch opensearch - 2.12.0 - org.opensearch.client opensearch-rest-high-level-client - 2.12.0 - - - org.apache.httpcomponents - httpcore-nio - - org.apache.httpcomponents diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java index 6b27e28b647a3..5cb743f05344e 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -1,9 +1,13 @@ package com.risingwave.connector; +import com.risingwave.connector.EsSink.RequestTracker; +import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; public interface BulkProcessorAdapter { - public void add(Object request); + public void addRow(SinkRow row, String indexName, RequestTracker requestTracker); + + public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker); public void flush(); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java index 6da55bd5feb8f..ac7fc45c6b030 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java @@ -22,13 +22,17 @@ import org.elasticsearch.action.bulk.BulkResponse; /** - * {@link BulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API calls - * across different Elasticsearch versions. + * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API + * calls across different Elasticsearch versions. */ interface ElasticBulkRequestConsumerFactory extends BiConsumer> {} -interface OpenSearchBulkRequestConsumerFactory +/** + * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API + * calls across different Elasticsearch versions. + */ +interface OpensearchBulkRequestConsumerFactory extends BiConsumer< org.opensearch.action.bulk.BulkRequest, org.opensearch.core.action.ActionListener< diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index 9538d6007ce50..e9a362a2773fe 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -1,16 +1,19 @@ package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; +import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,11 +87,6 @@ public ElasticBulkProcessorAdapter( this.esBulkProcessor = builder.build(); } - @Override - public void add(Object request) { - esBulkProcessor.add((IndexRequest) request); - } - @Override public void flush() { esBulkProcessor.flush(); @@ -98,4 +96,36 @@ public void flush() { public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException { esBulkProcessor.awaitClose(timeout, unit); } + + @Override + public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) { + final String index = (String) row.get(0); + final String key = (String) row.get(1); + String doc = (String) row.get(2); + + UpdateRequest updateRequest; + if (indexName != null) { + updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON); + } else { + updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON); + } + updateRequest.docAsUpsert(true); + requestTracker.addWriteTask(); + this.esBulkProcessor.add(updateRequest); + } + + @Override + public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) { + final String index = (String) row.get(0); + final String key = (String) row.get(1); + + DeleteRequest deleteRequest; + if (indexName != null) { + deleteRequest = new DeleteRequest(indexName, "_doc", key); + } else { + deleteRequest = new DeleteRequest(index, "_doc", key); + } + requestTracker.addWriteTask(); + this.esBulkProcessor.add(deleteRequest); + } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 88f807c61e403..b29e74902be45 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -25,9 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.http.HttpHost; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,46 +163,15 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { } } - private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - String doc = (String) row.get(2); - - UpdateRequest updateRequest; - if (config.getIndex() != null) { - updateRequest = - new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON); - } else { - updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); - } - updateRequest.docAsUpsert(true); - this.requestTracker.addWriteTask(); - bulkProcessor.add(updateRequest); - } - - private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - - DeleteRequest deleteRequest; - if (config.getIndex() != null) { - deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key); - } else { - deleteRequest = new DeleteRequest(index, "_doc", key); - } - this.requestTracker.addWriteTask(); - bulkProcessor.add(deleteRequest); - } - private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { switch (row.getOp()) { case INSERT: case UPDATE_INSERT: - processUpsert(row); + this.bulkProcessor.addRow(row, config.getIndex(), requestTracker); break; case DELETE: case UPDATE_DELETE: - processDelete(row); + this.bulkProcessor.deleteRow(row, config.getIndex(), requestTracker); break; default: throw Status.INVALID_ARGUMENT @@ -249,4 +215,8 @@ public void drop() { .asRuntimeException(); } } + + public RestHighLevelClientAdapter getClient() { + return this.client; + } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index 8c16299711e2f..9bf0b90d9bcc8 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -1,14 +1,17 @@ package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; +import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.bulk.BulkProcessor; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.slf4j.Logger; @@ -61,7 +64,7 @@ public OpensearchBulkProcessorAdapter( RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) { BulkProcessor.Builder builder = BulkProcessor.builder( - (OpenSearchBulkRequestConsumerFactory) + (OpensearchBulkRequestConsumerFactory) (bulkRequest, bulkResponseActionListener) -> client.bulkAsync( bulkRequest, @@ -84,11 +87,6 @@ public OpensearchBulkProcessorAdapter( this.opensearchBulkProcessor = builder.build(); } - @Override - public void add(Object request) { - opensearchBulkProcessor.add((IndexRequest) request); - } - @Override public void flush() { opensearchBulkProcessor.flush(); @@ -98,4 +96,36 @@ public void flush() { public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException { opensearchBulkProcessor.awaitClose(timeout, unit); } + + @Override + public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) { + final String index = (String) row.get(0); + final String key = (String) row.get(1); + String doc = (String) row.get(2); + + UpdateRequest updateRequest; + if (indexName != null) { + updateRequest = new UpdateRequest(indexName, key).doc(doc, XContentType.JSON); + } else { + updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); + } + updateRequest.docAsUpsert(true); + requestTracker.addWriteTask(); + this.opensearchBulkProcessor.add(updateRequest); + } + + @Override + public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) { + final String index = (String) row.get(0); + final String key = (String) row.get(1); + + DeleteRequest deleteRequest; + if (indexName != null) { + deleteRequest = new DeleteRequest(indexName, key); + } else { + deleteRequest = new DeleteRequest(index, key); + } + requestTracker.addWriteTask(); + this.opensearchBulkProcessor.add(deleteRequest); + } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java index f656537798819..ec35b14dea32a 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java @@ -20,6 +20,8 @@ public interface RestHighLevelClientAdapter { public void close() throws IOException; public boolean ping(Object options) throws IOException; + + public Object search(Object searchRequest, Object options) throws IOException; } class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { @@ -66,15 +68,21 @@ public boolean ping(Object options) throws IOException { } public org.elasticsearch.client.Cancellable bulkAsync( - org.elasticsearch.action.bulk.BulkRequest bulkRequest, org.elasticsearch.client.RequestOptions options, org.elasticsearch.action.ActionListener< - org.elasticsearch.action.bulk.BulkResponse> listener) { + org.elasticsearch.action.bulk.BulkRequest bulkRequest, + org.elasticsearch.client.RequestOptions options, + org.elasticsearch.action.ActionListener + listener) { org.elasticsearch.client.Cancellable cancellable = - esClient.bulkAsync( - bulkRequest, - options, - listener); + esClient.bulkAsync(bulkRequest, options, listener); return cancellable; } + + @Override + public Object search(Object searchRequest, Object options) throws IOException { + return this.esClient.search( + (org.elasticsearch.action.search.SearchRequest) searchRequest, + (org.elasticsearch.client.RequestOptions) options); + } } class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { @@ -115,12 +123,18 @@ public boolean ping(Object options) throws IOException { return flag; } - public Cancellable bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener) { - Cancellable cancellable = - opensearchClient.bulkAsync( - bulkRequest, - options, - listener); + public Cancellable bulkAsync( + BulkRequest bulkRequest, + RequestOptions options, + ActionListener listener) { + Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener); return cancellable; } + + @Override + public Object search(Object searchRequest, Object options) throws IOException { + return this.opensearchClient.search( + (org.opensearch.action.search.SearchRequest) searchRequest, + (org.opensearch.client.RequestOptions) options); + } } diff --git a/java/pom.xml b/java/pom.xml index 89df5b870077b..822bfea2fbe24 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -77,10 +77,11 @@ 1.10.0 3.12.0 2.4.2.Final - 2.13.5 + 2.15.0 3.3.1 3.3.3 7.17.19 + 2.11.1 4.15.0 1.18.0 1.17.6 @@ -197,6 +198,22 @@ elasticsearch-rest-high-level-client ${elasticsearch.version} + + org.opensearch + opensearch + ${opensearch.version} + + + org.opensearch.client + opensearch-rest-high-level-client + ${opensearch.version} + + + org.apache.httpcomponents + httpcore-nio + + + io.grpc grpc-netty-shaded diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 31bf0a92a86ac..cd666242af06a 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -166,7 +166,7 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> { - if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME + if (sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) && param.downstream_pk.len() > 1 && param.properties.get(ES_OPTION_DELIMITER).is_none() { @@ -191,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe | DataType::Jsonb | DataType::Bytea => Ok(()), DataType::List(list) => { - if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ Ok(()) } else{ Err(SinkError::Remote(anyhow!( @@ -265,7 +265,9 @@ impl RemoteLogSinker { sink_name: &str, ) -> Result { let sink_proto = sink_param.to_proto(); - let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME { + let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME + || sink_name == OpensearchSink::SINK_NAME + { let columns = vec![ ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(), From f4caee5bd13473b003c8e8e4a235b8ccabc69712 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 17 Apr 2024 17:43:26 +0800 Subject: [PATCH 3/8] fmt --- .../connector/BulkProcessorAdapter.java | 16 ++++++++++ .../ElasticBulkProcessorAdapter.java | 16 ++++++++++ .../java/com/risingwave/connector/EsSink.java | 29 ++++++++++-------- .../risingwave/connector/EsSinkFactory.java | 29 ++++++++++-------- .../OpensearchBulkProcessorAdapter.java | 16 ++++++++++ .../connector/RestHighLevelClientAdapter.java | 16 ++++++++++ java/pom.xml | 30 +++++++++---------- 7 files changed, 111 insertions(+), 41 deletions(-) diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java index 5cb743f05344e..a4acc5e1e7e7d 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index e9a362a2773fe..16037caf4ca76 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index b29e74902be45..613e01206a323 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -1,16 +1,19 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector; diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 11888204e0b1f..7cf741a66f87c 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -1,16 +1,19 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector; diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index 9bf0b90d9bcc8..4ae1c430071d1 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java index ec35b14dea32a..d441719efe3c6 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector; import java.io.IOException; diff --git a/java/pom.xml b/java/pom.xml index 822bfea2fbe24..67d33b9800a00 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -199,21 +199,21 @@ ${elasticsearch.version} - org.opensearch - opensearch - ${opensearch.version} - - - org.opensearch.client - opensearch-rest-high-level-client - ${opensearch.version} - - - org.apache.httpcomponents - httpcore-nio - - - + org.opensearch + opensearch + ${opensearch.version} + + + org.opensearch.client + opensearch-rest-high-level-client + ${opensearch.version} + + + org.apache.httpcomponents + httpcore-nio + + + io.grpc grpc-netty-shaded From e6b07a74d07607d76357c868c29bd7ebfdecbcbd Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 24 May 2024 14:20:19 +0800 Subject: [PATCH 4/8] fix com fix fix --- .../sink/elasticsearch/EsSinkTest.java | 8 +- .../connector/BulkProcessorAdapter.java | 6 +- .../connector/BulkRequestConsumerFactory.java | 39 ----- .../ElasticBulkProcessorAdapter.java | 154 ++++++++++-------- .../ElasticRestHighLevelClientAdapter.java | 89 ++++++++++ .../java/com/risingwave/connector/EsSink.java | 13 +- .../risingwave/connector/EsSinkFactory.java | 37 ++--- .../OpensearchBulkProcessorAdapter.java | 82 ++-------- .../OpensearchRestHighLevelClientAdapter.java | 78 +++++++++ .../connector/RestHighLevelClientAdapter.java | 133 --------------- src/connector/src/sink/elasticsearch.rs | 6 +- src/connector/src/sink/remote.rs | 4 +- 12 files changed, 306 insertions(+), 343 deletions(-) delete mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index f2894d2e790a2..79636d5590ca6 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -19,9 +19,9 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.risingwave.connector.ElasticRestHighLevelClientAdapter; import com.risingwave.connector.EsSink; import com.risingwave.connector.EsSinkConfig; -import com.risingwave.connector.RestHighLevelClientAdapter; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.proto.Data; @@ -74,13 +74,13 @@ public void testEsSink(ElasticsearchContainer container, String username, String fail(e.getMessage()); } - RestHighLevelClientAdapter client = sink.getClient(); + ElasticRestHighLevelClientAdapter client = + (ElasticRestHighLevelClientAdapter) sink.getClient(); SearchRequest searchRequest = new SearchRequest("test"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = - (SearchResponse) client.search(searchRequest, RequestOptions.DEFAULT); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = searchResponse.getHits(); assertEquals(2, hits.getHits().length); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java index a4acc5e1e7e7d..d72ebe2833953 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -16,14 +16,12 @@ package com.risingwave.connector; -import com.risingwave.connector.EsSink.RequestTracker; -import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; public interface BulkProcessorAdapter { - public void addRow(SinkRow row, String indexName, RequestTracker requestTracker); + public void addRow(String index, String key, String doc); - public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker); + public void deleteRow(String index, String key); public void flush(); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java deleted file mode 100644 index ac7fc45c6b030..0000000000000 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.risingwave.connector; - -import java.util.function.BiConsumer; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; - -/** - * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API - * calls across different Elasticsearch versions. - */ -interface ElasticBulkRequestConsumerFactory - extends BiConsumer> {} - -/** - * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API - * calls across different Elasticsearch versions. - */ -interface OpensearchBulkRequestConsumerFactory - extends BiConsumer< - org.opensearch.action.bulk.BulkRequest, - org.opensearch.core.action.ActionListener< - org.opensearch.action.bulk.BulkResponse>> {} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index 16037caf4ca76..8e3c57911118f 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -17,12 +17,10 @@ package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; -import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; @@ -36,56 +34,17 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); BulkProcessor esBulkProcessor; - - private class BulkListener implements BulkProcessor.Listener { - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - /** This method is called just before bulk is executed. */ - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - /** This method is called after bulk execution. */ - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } - } + private final RequestTracker requestTracker; public ElasticBulkProcessorAdapter( RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) { BulkProcessor.Builder builder = BulkProcessor.builder( - (ElasticBulkRequestConsumerFactory) - (bulkRequest, bulkResponseActionListener) -> - client.bulkAsync( - bulkRequest, - RequestOptions.DEFAULT, - bulkResponseActionListener), + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), new BulkListener(requestTracker)); // Possible feature: move these to config // execute the bulk every 10 000 requests @@ -101,6 +60,7 @@ public ElasticBulkProcessorAdapter( builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.esBulkProcessor = builder.build(); + this.requestTracker = requestTracker; } @Override @@ -114,34 +74,94 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - String doc = (String) row.get(2); - + public void addRow(String index, String key, String doc) { UpdateRequest updateRequest; - if (indexName != null) { - updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON); - } else { - updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON); - } + updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); updateRequest.docAsUpsert(true); - requestTracker.addWriteTask(); + this.requestTracker.addWriteTask(); this.esBulkProcessor.add(updateRequest); } @Override - public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - + public void deleteRow(String index, String key) { DeleteRequest deleteRequest; - if (indexName != null) { - deleteRequest = new DeleteRequest(indexName, "_doc", key); + deleteRequest = new DeleteRequest(index, "_doc", key); + this.requestTracker.addWriteTask(); + this.esBulkProcessor.add(deleteRequest); + } +} + +class BulkListener + implements org.elasticsearch.action.bulk.BulkProcessor.Listener, + org.opensearch.action.bulk.BulkProcessor.Listener { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; + + public BulkListener(RequestTracker requestTracker) { + this.requestTracker = requestTracker; + } + + @Override + public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.elasticsearch.action.bulk.BulkRequest request, + org.elasticsearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); } else { - deleteRequest = new DeleteRequest(index, "_doc", key); + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); } - requestTracker.addWriteTask(); - this.esBulkProcessor.add(deleteRequest); + } + + /** This method is called when the bulk failed and raised a Throwable */ + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } + + @Override + public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.opensearch.action.bulk.BulkRequest request, + org.opensearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions()); + } + } + + @Override + public void afterBulk( + long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java new file mode 100644 index 0000000000000..65e531bf8ea13 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.RestHighLevelClientBuilder; + +public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { + RestHighLevelClient esClient; + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.esClient = + new RestHighLevelClientBuilder( + configureRestClientBuilder(RestClient.builder(host), config) + .build()) + .setApiCompatibilityMode(true) + .build(); + } + + @Override + public void close() throws IOException { + esClient.close(); + } + + public boolean ping(RequestOptions options) throws IOException { + boolean flag = esClient.ping(options); + return flag; + } + + public Cancellable bulkAsync( + BulkRequest bulkRequest, + RequestOptions options, + ActionListener listener) { + Cancellable cancellable = esClient.bulkAsync(bulkRequest, options, listener); + return cancellable; + } + + public SearchResponse search(SearchRequest searchRequest, RequestOptions options) + throws IOException { + return this.esClient.search(searchRequest, options); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 613e01206a323..3b03e24899056 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -14,7 +14,6 @@ * limitations under the License. */ - package com.risingwave.connector; import com.fasterxml.jackson.core.JsonProcessingException; @@ -167,14 +166,22 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { } private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { + final String key = (String) row.get(1); + String doc = (String) row.get(2); + final String index; + if (config.getIndex() == null) { + index = (String) row.get(0); + } else { + index = config.getIndex(); + } switch (row.getOp()) { case INSERT: case UPDATE_INSERT: - this.bulkProcessor.addRow(row, config.getIndex(), requestTracker); + this.bulkProcessor.addRow(index, key, doc); break; case DELETE: case UPDATE_DELETE: - this.bulkProcessor.deleteRow(row, config.getIndex(), requestTracker); + this.bulkProcessor.deleteRow(index, key); break; default: throw Status.INVALID_ARGUMENT diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 7cf741a66f87c..f6bd427b3aa17 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -14,7 +14,6 @@ * limitations under the License. */ - package com.risingwave.connector; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -26,10 +25,8 @@ import com.risingwave.proto.Catalog; import com.risingwave.proto.Data; import io.grpc.Status; -import java.io.IOException; import java.util.Map; import org.apache.http.HttpHost; -import org.elasticsearch.client.RequestOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,33 +82,31 @@ public void validate( } } - RestHighLevelClientAdapter client; // 2. check connection try { - boolean isConnected; if (config.getConnector().equals("elasticsearch")) { - client = new ElasticRestHighLevelClientAdapter(host, config); - isConnected = client.ping(RequestOptions.DEFAULT); + ElasticRestHighLevelClientAdapter esClient = + new ElasticRestHighLevelClientAdapter(host, config); + if (esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) { + throw Status.INVALID_ARGUMENT + .withDescription("Cannot connect to " + config.getUrl()) + .asRuntimeException(); + } + esClient.close(); } else if (config.getConnector().equals("opensearch")) { - client = new OpensearchRestHighLevelClientAdapter(host, config); - isConnected = client.ping(org.opensearch.client.RequestOptions.DEFAULT); + OpensearchRestHighLevelClientAdapter opensearchClient = + new OpensearchRestHighLevelClientAdapter(host, config); + if (opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) { + throw Status.INVALID_ARGUMENT + .withDescription("Cannot connect to " + config.getUrl()) + .asRuntimeException(); + } + opensearchClient.close(); } else { throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } - if (!isConnected) { - throw Status.INVALID_ARGUMENT - .withDescription("Cannot connect to " + config.getUrl()) - .asRuntimeException(); - } } catch (Exception e) { throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); } - - // 3. close client - try { - client.close(); - } catch (IOException e) { - throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); - } } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index 4ae1c430071d1..d5d8cdc3d237d 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -17,12 +17,9 @@ package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; -import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.bulk.BulkProcessor; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; @@ -35,57 +32,18 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; BulkProcessor opensearchBulkProcessor; - private class BulkListener implements BulkProcessor.Listener { - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - /** This method is called just before bulk is executed. */ - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - /** This method is called after bulk execution. */ - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } - } - public OpensearchBulkProcessorAdapter( RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) { BulkProcessor.Builder builder = BulkProcessor.builder( - (OpensearchBulkRequestConsumerFactory) - (bulkRequest, bulkResponseActionListener) -> - client.bulkAsync( - bulkRequest, - RequestOptions.DEFAULT, - bulkResponseActionListener), + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), new BulkListener(requestTracker)); // Possible feature: move these to config // execute the bulk every 10 000 requests @@ -101,6 +59,7 @@ public OpensearchBulkProcessorAdapter( builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.opensearchBulkProcessor = builder.build(); + this.requestTracker = requestTracker; } @Override @@ -114,34 +73,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - String doc = (String) row.get(2); - + public void addRow(String index, String key, String doc) { UpdateRequest updateRequest; - if (indexName != null) { - updateRequest = new UpdateRequest(indexName, key).doc(doc, XContentType.JSON); - } else { - updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); - } + updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); updateRequest.docAsUpsert(true); - requestTracker.addWriteTask(); + this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(updateRequest); } @Override - public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - + public void deleteRow(String index, String key) { DeleteRequest deleteRequest; - if (indexName != null) { - deleteRequest = new DeleteRequest(indexName, key); - } else { - deleteRequest = new DeleteRequest(index, key); - } - requestTracker.addWriteTask(); + deleteRequest = new DeleteRequest(index, key); + this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(deleteRequest); } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java new file mode 100644 index 0000000000000..f962564c3f2a8 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.Cancellable; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.core.action.ActionListener; + +public class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { + RestHighLevelClient opensearchClient; + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.opensearchClient = + new org.opensearch.client.RestHighLevelClient( + configureRestClientBuilder( + org.opensearch.client.RestClient.builder(host), config)); + } + + @Override + public void close() throws IOException { + opensearchClient.close(); + } + + public boolean ping(org.opensearch.client.RequestOptions options) throws IOException { + boolean flag = opensearchClient.ping(options); + return flag; + } + + public Cancellable bulkAsync( + BulkRequest bulkRequest, + RequestOptions options, + ActionListener listener) { + Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener); + return cancellable; + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java index d441719efe3c6..dacddaf3704d2 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java @@ -17,140 +17,7 @@ package com.risingwave.connector; import java.io.IOException; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.client.RestHighLevelClientBuilder; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.client.Cancellable; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestClient; -import org.opensearch.client.RestClientBuilder; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.core.action.ActionListener; public interface RestHighLevelClientAdapter { public void close() throws IOException; - - public boolean ping(Object options) throws IOException; - - public Object search(Object searchRequest, Object options) throws IOException; -} - -class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { - org.elasticsearch.client.RestHighLevelClient esClient; - - private static org.elasticsearch.client.RestClientBuilder configureRestClientBuilder( - org.elasticsearch.client.RestClientBuilder builder, EsSinkConfig config) { - // Possible config: - // 1. Connection path prefix - // 2. Username and password - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - // 3. Timeout - return builder; - } - - public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { - this.esClient = - new RestHighLevelClientBuilder( - configureRestClientBuilder( - org.elasticsearch.client.RestClient.builder(host), - config) - .build()) - .setApiCompatibilityMode(true) - .build(); - } - - @Override - public void close() throws IOException { - esClient.close(); - } - - @Override - public boolean ping(Object options) throws IOException { - boolean flag = esClient.ping((org.elasticsearch.client.RequestOptions) options); - return flag; - } - - public org.elasticsearch.client.Cancellable bulkAsync( - org.elasticsearch.action.bulk.BulkRequest bulkRequest, - org.elasticsearch.client.RequestOptions options, - org.elasticsearch.action.ActionListener - listener) { - org.elasticsearch.client.Cancellable cancellable = - esClient.bulkAsync(bulkRequest, options, listener); - return cancellable; - } - - @Override - public Object search(Object searchRequest, Object options) throws IOException { - return this.esClient.search( - (org.elasticsearch.action.search.SearchRequest) searchRequest, - (org.elasticsearch.client.RequestOptions) options); - } -} - -class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { - RestHighLevelClient opensearchClient; - - private static RestClientBuilder configureRestClientBuilder( - RestClientBuilder builder, EsSinkConfig config) { - // Possible config: - // 1. Connection path prefix - // 2. Username and password - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - // 3. Timeout - return builder; - } - - public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { - this.opensearchClient = - new RestHighLevelClient( - configureRestClientBuilder(RestClient.builder(host), config)); - } - - @Override - public void close() throws IOException { - opensearchClient.close(); - } - - @Override - public boolean ping(Object options) throws IOException { - boolean flag = opensearchClient.ping((RequestOptions) options); - return flag; - } - - public Cancellable bulkAsync( - BulkRequest bulkRequest, - RequestOptions options, - ActionListener listener) { - Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener); - return cancellable; - } - - @Override - public Object search(Object searchRequest, Object options) throws IOException { - return this.opensearchClient.search( - (org.opensearch.action.search.SearchRequest) searchRequest, - (org.opensearch.client.RequestOptions) options); - } } diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs index 24f15e2537fdc..b3cfe3b9a801b 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch.rs @@ -40,7 +40,7 @@ impl StreamChunkConverter { pk_indices: &Vec, properties: &HashMap, ) -> Result { - if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME { + if is_es_sink(sink_name) { let index_column = properties .get(ES_OPTION_INDEX_COLUMN) .cloned() @@ -170,3 +170,7 @@ impl EsStreamChunkConverter { (self.fn_build_id)(row) } } + +pub fn is_es_sink(sink_name: &str) -> bool { + sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME +} diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index cd666242af06a..c45d729bec6be 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -57,7 +57,7 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; -use super::elasticsearch::{StreamChunkConverter, ES_OPTION_DELIMITER}; +use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER}; use crate::error::ConnectorResult; use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; @@ -166,7 +166,7 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> { - if (sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) + if is_es_sink(sink_name) && param.downstream_pk.len() > 1 && param.properties.get(ES_OPTION_DELIMITER).is_none() { From c57df293e7bec5d385023f4c41cf071d6f4531d4 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 24 May 2024 15:00:49 +0800 Subject: [PATCH 5/8] fix ci --- java/connector-node/risingwave-connector-test/pom.xml | 4 ++-- .../risingwave/connector/sink/elasticsearch/EsSinkTest.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml index 14b1c7bd65fc0..d3d47b0bc4571 100644 --- a/java/connector-node/risingwave-connector-test/pom.xml +++ b/java/connector-node/risingwave-connector-test/pom.xml @@ -128,13 +128,13 @@ com.fasterxml.jackson.core jackson-databind - ${jackson.version} + 2.13.5 test com.fasterxml.jackson.core jackson-core - ${jackson.version} + 2.13.5 test diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index 79636d5590ca6..3b58878295311 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -55,6 +55,7 @@ public void testEsSink(ElasticsearchContainer container, String username, String EsSink sink = new EsSink( new EsSinkConfig(container.getHttpHostAddress()) + .setConnector("elasticsearch") .withIndex("test") .withDelimiter("$") .withUsername(username) From 5848441be1b4fddfefd9d388a65749a433ffd723 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 27 May 2024 17:01:06 +0800 Subject: [PATCH 6/8] fix comm fix fix ci fix ci --- .../sink/elasticsearch/EsSinkTest.java | 21 ++-- .../risingwave/connector/BulkListener.java | 97 +++++++++++++++++++ .../ElasticBulkProcessorAdapter.java | 76 --------------- .../ElasticRestHighLevelClientAdapter.java | 2 +- .../java/com/risingwave/connector/EsSink.java | 8 -- .../OpensearchRestHighLevelClientAdapter.java | 2 +- .../connector/RestHighLevelClientAdapter.java | 23 ----- src/connector/src/sink/remote.rs | 4 +- 8 files changed, 112 insertions(+), 121 deletions(-) create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java delete mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index 3b58878295311..d2873fac9d216 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -29,6 +29,7 @@ import com.risingwave.proto.Data.Op; import java.io.IOException; import java.util.Map; +import org.apache.http.HttpHost; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; @@ -52,15 +53,14 @@ static TableSchema getTestTableSchema() { public void testEsSink(ElasticsearchContainer container, String username, String password) throws IOException { - EsSink sink = - new EsSink( - new EsSinkConfig(container.getHttpHostAddress()) - .setConnector("elasticsearch") - .withIndex("test") - .withDelimiter("$") - .withUsername(username) - .withPassword(password), - getTestTableSchema()); + EsSinkConfig config = + new EsSinkConfig(container.getHttpHostAddress()) + .withIndex("test") + .withDelimiter("$") + .withUsername(username) + .withPassword(password); + config.setConnector("elasticsearch"); + EsSink sink = new EsSink(config, getTestTableSchema()); sink.write( Iterators.forArray( new ArraySinkRow( @@ -75,8 +75,9 @@ public void testEsSink(ElasticsearchContainer container, String username, String fail(e.getMessage()); } + HttpHost host = HttpHost.create(config.getUrl()); ElasticRestHighLevelClientAdapter client = - (ElasticRestHighLevelClientAdapter) sink.getClient(); + new ElasticRestHighLevelClientAdapter(host, config); SearchRequest searchRequest = new SearchRequest("test"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java new file mode 100644 index 0000000000000..4ce1165ba1baf --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.risingwave.connector.EsSink.RequestTracker; +import org.elasticsearch.action.bulk.BulkRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BulkListener + implements org.elasticsearch.action.bulk.BulkProcessor.Listener, + org.opensearch.action.bulk.BulkProcessor.Listener { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; + + public BulkListener(RequestTracker requestTracker) { + this.requestTracker = requestTracker; + } + + @Override + public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.elasticsearch.action.bulk.BulkRequest request, + org.elasticsearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + } + + /** This method is called when the bulk failed and raised a Throwable */ + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } + + @Override + public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.opensearch.action.bulk.BulkRequest request, + org.opensearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions()); + } + } + + @Override + public void afterBulk( + long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index 8e3c57911118f..de6ab3414f65a 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; @@ -90,78 +89,3 @@ public void deleteRow(String index, String key) { this.esBulkProcessor.add(deleteRequest); } } - -class BulkListener - implements org.elasticsearch.action.bulk.BulkProcessor.Listener, - org.opensearch.action.bulk.BulkProcessor.Listener { - private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - @Override - public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - @Override - public void afterBulk( - long executionId, - org.elasticsearch.action.bulk.BulkRequest request, - org.elasticsearch.action.bulk.BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } - - @Override - public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); - } - - @Override - public void afterBulk( - long executionId, - org.opensearch.action.bulk.BulkRequest request, - org.opensearch.action.bulk.BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions()); - } - } - - @Override - public void afterBulk( - long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } -} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java index 65e531bf8ea13..c64def3bef8a7 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java @@ -34,7 +34,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClientBuilder; -public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { +public class ElasticRestHighLevelClientAdapter implements AutoCloseable { RestHighLevelClient esClient; private static RestClientBuilder configureRestClientBuilder( diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 3b03e24899056..315fc800a2ef0 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -49,7 +49,6 @@ public class EsSink extends SinkWriterBase { private final EsSinkConfig config; private BulkProcessorAdapter bulkProcessor; - private final RestHighLevelClientAdapter client; // Used to handle the return message of ES and throw errors private final RequestTracker requestTracker; @@ -154,12 +153,10 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { ElasticRestHighLevelClientAdapter client = new ElasticRestHighLevelClientAdapter(host, config); this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client); - this.client = client; } else if (config.getConnector().equals("opensearch")) { OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config); this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client); - this.client = client; } else { throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } @@ -218,15 +215,10 @@ public void sync() { public void drop() { try { bulkProcessor.awaitClose(100, TimeUnit.SECONDS); - client.close(); } catch (Exception e) { throw io.grpc.Status.INTERNAL .withDescription(String.format(ERROR_REPORT_TEMPLATE, e.getMessage())) .asRuntimeException(); } } - - public RestHighLevelClientAdapter getClient() { - return this.client; - } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java index f962564c3f2a8..5f3773b0a91aa 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java @@ -30,7 +30,7 @@ import org.opensearch.client.RestHighLevelClient; import org.opensearch.core.action.ActionListener; -public class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { +public class OpensearchRestHighLevelClientAdapter implements AutoCloseable { RestHighLevelClient opensearchClient; private static RestClientBuilder configureRestClientBuilder( diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java deleted file mode 100644 index dacddaf3704d2..0000000000000 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.risingwave.connector; - -import java.io.IOException; - -public interface RestHighLevelClientAdapter { - public void close() throws IOException; -} diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index c45d729bec6be..cdfb0322658ec 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -191,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe | DataType::Jsonb | DataType::Bytea => Ok(()), DataType::List(list) => { - if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + if is_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ Ok(()) } else{ Err(SinkError::Remote(anyhow!( @@ -202,7 +202,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe } }, DataType::Struct(_) => { - if sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME{ + if is_es_sink(sink_name){ Ok(()) }else{ Err(SinkError::Remote(anyhow!( From 85d9870d3cc66b0d12774c89469a389757a58c96 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 18 Jun 2024 16:14:59 +0800 Subject: [PATCH 7/8] fix comm --- src/connector/src/sink/remote.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index a3faf1310c12d..863ca02f69d59 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -264,9 +264,7 @@ impl RemoteLogSinker { sink_name: &str, ) -> Result { let sink_proto = sink_param.to_proto(); - let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME - || sink_name == OpensearchSink::SINK_NAME - { + let payload_schema = if is_es_sink(sink_name) { let columns = vec![ ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(), From 8eca24a36d474ba8e66a814d8a012d2726c4b0f8 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 18 Jun 2024 16:44:12 +0800 Subject: [PATCH 8/8] fix ci --- .../src/main/java/com/risingwave/connector/EsSinkFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index f6bd427b3aa17..03e888a892df3 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -87,7 +87,7 @@ public void validate( if (config.getConnector().equals("elasticsearch")) { ElasticRestHighLevelClientAdapter esClient = new ElasticRestHighLevelClientAdapter(host, config); - if (esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) { + if (!esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) { throw Status.INVALID_ARGUMENT .withDescription("Cannot connect to " + config.getUrl()) .asRuntimeException(); @@ -96,7 +96,7 @@ public void validate( } else if (config.getConnector().equals("opensearch")) { OpensearchRestHighLevelClientAdapter opensearchClient = new OpensearchRestHighLevelClientAdapter(host, config); - if (opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) { + if (!opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) { throw Status.INVALID_ARGUMENT .withDescription("Cannot connect to " + config.getUrl()) .asRuntimeException();