From ebc2848d255ff616be5e58f62c17b115c7a6756f Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 15 Apr 2024 15:56:14 +0800
Subject: [PATCH 1/8] save
---
.../com/risingwave/connector/SinkUtils.java | 1 +
.../risingwave-sink-es-7/pom.xml | 17 +++
.../connector/BulkProcessorAdapter.java | 11 ++
.../connector/BulkRequestConsumerFactory.java | 8 +-
.../ElasticBulkProcessorAdapter.java | 101 +++++++++++++
.../java/com/risingwave/connector/EsSink.java | 143 ++----------------
.../risingwave/connector/EsSinkFactory.java | 31 ++--
.../OpensearchBulkProcessorAdapter.java | 101 +++++++++++++
.../connector/RestHighLevelClientAdapter.java | 126 +++++++++++++++
src/connector/src/sink/elasticsearch.rs | 4 +-
src/connector/src/sink/mod.rs | 1 +
src/connector/src/sink/remote.rs | 9 +-
12 files changed, 397 insertions(+), 156 deletions(-)
create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
index 679deedebcabf..73f0799e44d1d 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
@@ -40,6 +40,7 @@ public static SinkFactory getSinkFactory(String sinkName) {
case "jdbc":
return new JDBCSinkFactory();
case "elasticsearch":
+ case "opensearch":
return new EsSinkFactory();
case "cassandra":
return new CassandraFactory();
diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml
index 9c8515098d7d8..b6da9a49a0e8b 100644
--- a/java/connector-node/risingwave-sink-es-7/pom.xml
+++ b/java/connector-node/risingwave-sink-es-7/pom.xml
@@ -51,6 +51,23 @@
org.elasticsearch.client
elasticsearch-rest-high-level-client
+
+ org.opensearch
+ opensearch
+ 2.12.0
+
+
+
+ org.opensearch.client
+ opensearch-rest-high-level-client
+ 2.12.0
+
+
+ org.apache.httpcomponents
+ httpcore-nio
+
+
+
org.apache.httpcomponents
httpclient
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
new file mode 100644
index 0000000000000..6b27e28b647a3
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
@@ -0,0 +1,11 @@
+package com.risingwave.connector;
+
+import java.util.concurrent.TimeUnit;
+
+public interface BulkProcessorAdapter {
+ public void add(Object request);
+
+ public void flush();
+
+ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
index e26248b5fef74..6da55bd5feb8f 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
@@ -25,5 +25,11 @@
* {@link BulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API calls
* across different Elasticsearch versions.
*/
-interface BulkRequestConsumerFactory
+interface ElasticBulkRequestConsumerFactory
extends BiConsumer> {}
+
+interface OpenSearchBulkRequestConsumerFactory
+ extends BiConsumer<
+ org.opensearch.action.bulk.BulkRequest,
+ org.opensearch.core.action.ActionListener<
+ org.opensearch.action.bulk.BulkResponse>> {}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
new file mode 100644
index 0000000000000..9538d6007ce50
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
@@ -0,0 +1,101 @@
+package com.risingwave.connector;
+
+import com.risingwave.connector.EsSink.RequestTracker;
+import java.util.concurrent.TimeUnit;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
+ private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
+ BulkProcessor esBulkProcessor;
+
+ private class BulkListener implements BulkProcessor.Listener {
+ private final RequestTracker requestTracker;
+
+ public BulkListener(RequestTracker requestTracker) {
+ this.requestTracker = requestTracker;
+ }
+
+ /** This method is called just before bulk is executed. */
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
+ }
+
+ /** This method is called after bulk execution. */
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ if (response.hasFailures()) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), response.buildFailureMessage());
+ this.requestTracker.addErrResult(errMessage);
+ } else {
+ this.requestTracker.addOkResult(request.numberOfActions());
+ LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
+ }
+ }
+
+ /** This method is called when the bulk failed and raised a Throwable */
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), failure.getMessage());
+ this.requestTracker.addErrResult(errMessage);
+ }
+ }
+
+ public ElasticBulkProcessorAdapter(
+ RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
+ BulkProcessor.Builder builder =
+ BulkProcessor.builder(
+ (ElasticBulkRequestConsumerFactory)
+ (bulkRequest, bulkResponseActionListener) ->
+ client.bulkAsync(
+ bulkRequest,
+ RequestOptions.DEFAULT,
+ bulkResponseActionListener),
+ new BulkListener(requestTracker));
+ // Possible feature: move these to config
+ // execute the bulk every 10 000 requests
+ builder.setBulkActions(1000);
+ // flush the bulk every 5mb
+ builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
+ // flush the bulk every 5 seconds whatever the number of requests
+ builder.setFlushInterval(TimeValue.timeValueSeconds(5));
+ // Set the number of concurrent requests
+ builder.setConcurrentRequests(1);
+ // Set a custom backoff policy which will initially wait for 100ms, increase exponentially
+ // and retries up to three times.
+ builder.setBackoffPolicy(
+ BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
+ this.esBulkProcessor = builder.build();
+ }
+
+ @Override
+ public void add(Object request) {
+ esBulkProcessor.add((IndexRequest) request);
+ }
+
+ @Override
+ public void flush() {
+ esBulkProcessor.flush();
+ }
+
+ @Override
+ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+ esBulkProcessor.awaitClose(timeout, unit);
+ }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
index cc5977a9c208c..88f807c61e403 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
@@ -25,24 +25,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.RestHighLevelClientBuilder;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,8 +49,8 @@ public class EsSink extends SinkWriterBase {
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";
private final EsSinkConfig config;
- private BulkProcessor bulkProcessor;
- private final RestHighLevelClient client;
+ private BulkProcessorAdapter bulkProcessor;
+ private final RestHighLevelClientAdapter client;
// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;
@@ -167,113 +151,18 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
this.requestTracker = new RequestTracker();
// ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever.
- this.client =
- new RestHighLevelClientBuilder(
- configureRestClientBuilder(RestClient.builder(host), config)
- .build())
- .setApiCompatibilityMode(true)
- .build();
- // Test connection
- try {
- boolean isConnected = this.client.ping(RequestOptions.DEFAULT);
- if (!isConnected) {
- throw Status.INVALID_ARGUMENT
- .withDescription("Cannot connect to " + config.getUrl())
- .asRuntimeException();
- }
- } catch (Exception e) {
- throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
- }
- this.bulkProcessor = createBulkProcessor(this.requestTracker);
- }
-
- private static RestClientBuilder configureRestClientBuilder(
- RestClientBuilder builder, EsSinkConfig config) {
- // Possible config:
- // 1. Connection path prefix
- // 2. Username and password
- if (config.getPassword() != null && config.getUsername() != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
- builder.setHttpClientConfigCallback(
- httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- }
- // 3. Timeout
- return builder;
- }
-
- private BulkProcessor.Builder applyBulkConfig(
- RestHighLevelClient client, EsSinkConfig config, BulkProcessor.Listener listener) {
- BulkProcessor.Builder builder =
- BulkProcessor.builder(
- (BulkRequestConsumerFactory)
- (bulkRequest, bulkResponseActionListener) ->
- client.bulkAsync(
- bulkRequest,
- RequestOptions.DEFAULT,
- bulkResponseActionListener),
- listener);
- // Possible feature: move these to config
- // execute the bulk every 10 000 requests
- builder.setBulkActions(1000);
- // flush the bulk every 5mb
- builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
- // flush the bulk every 5 seconds whatever the number of requests
- builder.setFlushInterval(TimeValue.timeValueSeconds(5));
- // Set the number of concurrent requests
- builder.setConcurrentRequests(1);
- // Set a custom backoff policy which will initially wait for 100ms, increase exponentially
- // and retries up to three times.
- builder.setBackoffPolicy(
- BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
- return builder;
- }
-
- private BulkProcessor createBulkProcessor(RequestTracker requestTracker) {
- BulkProcessor.Builder builder =
- applyBulkConfig(this.client, this.config, new BulkListener(requestTracker));
- return builder.build();
- }
-
- private class BulkListener implements BulkProcessor.Listener {
- private final RequestTracker requestTracker;
-
- public BulkListener(RequestTracker requestTracker) {
- this.requestTracker = requestTracker;
- }
-
- /** This method is called just before bulk is executed. */
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
-
- /** This method is called after bulk execution. */
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- if (response.hasFailures()) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), response.buildFailureMessage());
- this.requestTracker.addErrResult(errMessage);
- } else {
- this.requestTracker.addOkResult(request.numberOfActions());
- LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
- }
-
- /** This method is called when the bulk failed and raised a Throwable */
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), failure.getMessage());
- this.requestTracker.addErrResult(errMessage);
+ if (config.getConnector().equals("elasticsearch")) {
+ ElasticRestHighLevelClientAdapter client =
+ new ElasticRestHighLevelClientAdapter(host, config);
+ this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
+ this.client = client;
+ } else if (config.getConnector().equals("opensearch")) {
+ OpensearchRestHighLevelClientAdapter client =
+ new OpensearchRestHighLevelClientAdapter(host, config);
+ this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
+ this.client = client;
+ } else {
+ throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
}
@@ -360,8 +249,4 @@ public void drop() {
.asRuntimeException();
}
}
-
- public RestHighLevelClient getClient() {
- return client;
- }
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
index f3fa3bfa16c3b..11888204e0b1f 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
@@ -26,14 +26,7 @@
import java.io.IOException;
import java.util.Map;
import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,21 +82,19 @@ public void validate(
}
}
+ RestHighLevelClientAdapter client;
// 2. check connection
- RestClientBuilder builder = RestClient.builder(host);
- if (config.getPassword() != null && config.getUsername() != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
- builder.setHttpClientConfigCallback(
- httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- }
- RestHighLevelClient client = new RestHighLevelClient(builder);
- // Test connection
try {
- boolean isConnected = client.ping(RequestOptions.DEFAULT);
+ boolean isConnected;
+ if (config.getConnector().equals("elasticsearch")) {
+ client = new ElasticRestHighLevelClientAdapter(host, config);
+ isConnected = client.ping(RequestOptions.DEFAULT);
+ } else if (config.getConnector().equals("opensearch")) {
+ client = new OpensearchRestHighLevelClientAdapter(host, config);
+ isConnected = client.ping(org.opensearch.client.RequestOptions.DEFAULT);
+ } else {
+ throw new RuntimeException("Sink type must be elasticsearch or opensearch");
+ }
if (!isConnected) {
throw Status.INVALID_ARGUMENT
.withDescription("Cannot connect to " + config.getUrl())
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
new file mode 100644
index 0000000000000..8c16299711e2f
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
@@ -0,0 +1,101 @@
+package com.risingwave.connector;
+
+import com.risingwave.connector.EsSink.RequestTracker;
+import java.util.concurrent.TimeUnit;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
+ private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
+ BulkProcessor opensearchBulkProcessor;
+
+ private class BulkListener implements BulkProcessor.Listener {
+ private final RequestTracker requestTracker;
+
+ public BulkListener(RequestTracker requestTracker) {
+ this.requestTracker = requestTracker;
+ }
+
+ /** This method is called just before bulk is executed. */
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
+ }
+
+ /** This method is called after bulk execution. */
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ if (response.hasFailures()) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), response.buildFailureMessage());
+ this.requestTracker.addErrResult(errMessage);
+ } else {
+ this.requestTracker.addOkResult(request.numberOfActions());
+ LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
+ }
+ }
+
+ /** This method is called when the bulk failed and raised a Throwable */
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), failure.getMessage());
+ this.requestTracker.addErrResult(errMessage);
+ }
+ }
+
+ public OpensearchBulkProcessorAdapter(
+ RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
+ BulkProcessor.Builder builder =
+ BulkProcessor.builder(
+ (OpenSearchBulkRequestConsumerFactory)
+ (bulkRequest, bulkResponseActionListener) ->
+ client.bulkAsync(
+ bulkRequest,
+ RequestOptions.DEFAULT,
+ bulkResponseActionListener),
+ new BulkListener(requestTracker));
+ // Possible feature: move these to config
+ // execute the bulk every 10 000 requests
+ builder.setBulkActions(1000);
+ // flush the bulk every 5mb
+ builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
+ // flush the bulk every 5 seconds whatever the number of requests
+ builder.setFlushInterval(TimeValue.timeValueSeconds(5));
+ // Set the number of concurrent requests
+ builder.setConcurrentRequests(1);
+ // Set a custom backoff policy which will initially wait for 100ms, increase exponentially
+ // and retries up to three times.
+ builder.setBackoffPolicy(
+ BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
+ this.opensearchBulkProcessor = builder.build();
+ }
+
+ @Override
+ public void add(Object request) {
+ opensearchBulkProcessor.add((IndexRequest) request);
+ }
+
+ @Override
+ public void flush() {
+ opensearchBulkProcessor.flush();
+ }
+
+ @Override
+ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+ opensearchBulkProcessor.awaitClose(timeout, unit);
+ }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
new file mode 100644
index 0000000000000..f656537798819
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
@@ -0,0 +1,126 @@
+package com.risingwave.connector;
+
+import java.io.IOException;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestHighLevelClientBuilder;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.Cancellable;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.core.action.ActionListener;
+
+public interface RestHighLevelClientAdapter {
+ public void close() throws IOException;
+
+ public boolean ping(Object options) throws IOException;
+}
+
+class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
+ org.elasticsearch.client.RestHighLevelClient esClient;
+
+ private static org.elasticsearch.client.RestClientBuilder configureRestClientBuilder(
+ org.elasticsearch.client.RestClientBuilder builder, EsSinkConfig config) {
+ // Possible config:
+ // 1. Connection path prefix
+ // 2. Username and password
+ if (config.getPassword() != null && config.getUsername() != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+ builder.setHttpClientConfigCallback(
+ httpClientBuilder ->
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ }
+ // 3. Timeout
+ return builder;
+ }
+
+ public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
+ this.esClient =
+ new RestHighLevelClientBuilder(
+ configureRestClientBuilder(
+ org.elasticsearch.client.RestClient.builder(host),
+ config)
+ .build())
+ .setApiCompatibilityMode(true)
+ .build();
+ }
+
+ @Override
+ public void close() throws IOException {
+ esClient.close();
+ }
+
+ @Override
+ public boolean ping(Object options) throws IOException {
+ boolean flag = esClient.ping((org.elasticsearch.client.RequestOptions) options);
+ return flag;
+ }
+
+ public org.elasticsearch.client.Cancellable bulkAsync(
+ org.elasticsearch.action.bulk.BulkRequest bulkRequest, org.elasticsearch.client.RequestOptions options, org.elasticsearch.action.ActionListener<
+ org.elasticsearch.action.bulk.BulkResponse> listener) {
+ org.elasticsearch.client.Cancellable cancellable =
+ esClient.bulkAsync(
+ bulkRequest,
+ options,
+ listener);
+ return cancellable;
+ }
+}
+
+class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
+ RestHighLevelClient opensearchClient;
+
+ private static RestClientBuilder configureRestClientBuilder(
+ RestClientBuilder builder, EsSinkConfig config) {
+ // Possible config:
+ // 1. Connection path prefix
+ // 2. Username and password
+ if (config.getPassword() != null && config.getUsername() != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+ builder.setHttpClientConfigCallback(
+ httpClientBuilder ->
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ }
+ // 3. Timeout
+ return builder;
+ }
+
+ public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
+ this.opensearchClient =
+ new RestHighLevelClient(
+ configureRestClientBuilder(RestClient.builder(host), config));
+ }
+
+ @Override
+ public void close() throws IOException {
+ opensearchClient.close();
+ }
+
+ @Override
+ public boolean ping(Object options) throws IOException {
+ boolean flag = opensearchClient.ping((RequestOptions) options);
+ return flag;
+ }
+
+ public Cancellable bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener) {
+ Cancellable cancellable =
+ opensearchClient.bulkAsync(
+ bulkRequest,
+ options,
+ listener);
+ return cancellable;
+ }
+}
diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs
index 578c768f1ce30..24f15e2537fdc 100644
--- a/src/connector/src/sink/elasticsearch.rs
+++ b/src/connector/src/sink/elasticsearch.rs
@@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText};
use serde_json::Value;
use super::encoder::{JsonEncoder, RowEncoder};
-use super::remote::ElasticSearchSink;
+use super::remote::{ElasticSearchSink, OpensearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
@@ -40,7 +40,7 @@ impl StreamChunkConverter {
pk_indices: &Vec,
properties: &HashMap,
) -> Result {
- if sink_name == ElasticSearchSink::SINK_NAME {
+ if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME {
let index_column = properties
.get(ES_OPTION_INDEX_COLUMN)
.cloned()
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index c430b4303f1e9..395829fc8d501 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -89,6 +89,7 @@ macro_rules! for_all_sinks {
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
+ { Opensearch, $crate::sink::remote::OpensearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 4d3c48b6ec5c7..31bf0a92a86ac 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -73,6 +73,7 @@ macro_rules! def_remote_sink {
() => {
def_remote_sink! {
{ ElasticSearch, ElasticSearchSink, "elasticsearch" }
+ { Opensearch, OpensearchSink, "opensearch"}
{ Cassandra, CassandraSink, "cassandra" }
{ Jdbc, JdbcSink, "jdbc", |desc| {
desc.sink_type.is_append_only()
@@ -165,7 +166,7 @@ impl Sink for RemoteSink {
}
async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
- if sink_name == ElasticSearchSink::SINK_NAME
+ if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
&& param.downstream_pk.len() > 1
&& param.properties.get(ES_OPTION_DELIMITER).is_none()
{
@@ -190,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
| DataType::Jsonb
| DataType::Bytea => Ok(()),
DataType::List(list) => {
- if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
+ if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
Ok(())
} else{
Err(SinkError::Remote(anyhow!(
@@ -201,7 +202,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
}
},
DataType::Struct(_) => {
- if sink_name==ElasticSearchSink::SINK_NAME{
+ if sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME{
Ok(())
}else{
Err(SinkError::Remote(anyhow!(
@@ -264,7 +265,7 @@ impl RemoteLogSinker {
sink_name: &str,
) -> Result {
let sink_proto = sink_param.to_proto();
- let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME {
+ let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME {
let columns = vec![
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
From 1746451b28f11630dc065acc09952cd4ed36a680 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 16 Apr 2024 15:02:42 +0800
Subject: [PATCH 2/8] support
---
.../sink/elasticsearch/EsSinkTest.java | 7 +--
.../risingwave-sink-es-7/pom.xml | 9 ----
.../connector/BulkProcessorAdapter.java | 6 ++-
.../connector/BulkRequestConsumerFactory.java | 10 +++--
.../ElasticBulkProcessorAdapter.java | 42 +++++++++++++++---
.../java/com/risingwave/connector/EsSink.java | 42 +++---------------
.../OpensearchBulkProcessorAdapter.java | 44 ++++++++++++++++---
.../connector/RestHighLevelClientAdapter.java | 38 +++++++++++-----
java/pom.xml | 19 +++++++-
src/connector/src/sink/remote.rs | 8 ++--
10 files changed, 144 insertions(+), 81 deletions(-)
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
index 509f71ec1e569..f2894d2e790a2 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.Lists;
import com.risingwave.connector.EsSink;
import com.risingwave.connector.EsSinkConfig;
+import com.risingwave.connector.RestHighLevelClientAdapter;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.ArraySinkRow;
import com.risingwave.proto.Data;
@@ -31,7 +32,6 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -74,12 +74,13 @@ public void testEsSink(ElasticsearchContainer container, String username, String
fail(e.getMessage());
}
- RestHighLevelClient client = sink.getClient();
+ RestHighLevelClientAdapter client = sink.getClient();
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
- SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+ SearchResponse searchResponse =
+ (SearchResponse) client.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
assertEquals(2, hits.getHits().length);
diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml
index b6da9a49a0e8b..4ff4bd76ef109 100644
--- a/java/connector-node/risingwave-sink-es-7/pom.xml
+++ b/java/connector-node/risingwave-sink-es-7/pom.xml
@@ -54,19 +54,10 @@
org.opensearch
opensearch
- 2.12.0
-
org.opensearch.client
opensearch-rest-high-level-client
- 2.12.0
-
-
- org.apache.httpcomponents
- httpcore-nio
-
-
org.apache.httpcomponents
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
index 6b27e28b647a3..5cb743f05344e 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
@@ -1,9 +1,13 @@
package com.risingwave.connector;
+import com.risingwave.connector.EsSink.RequestTracker;
+import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;
public interface BulkProcessorAdapter {
- public void add(Object request);
+ public void addRow(SinkRow row, String indexName, RequestTracker requestTracker);
+
+ public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker);
public void flush();
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
index 6da55bd5feb8f..ac7fc45c6b030 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
@@ -22,13 +22,17 @@
import org.elasticsearch.action.bulk.BulkResponse;
/**
- * {@link BulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API calls
- * across different Elasticsearch versions.
+ * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API
+ * calls across different Elasticsearch versions.
*/
interface ElasticBulkRequestConsumerFactory
extends BiConsumer> {}
-interface OpenSearchBulkRequestConsumerFactory
+/**
+ * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API
+ * calls across different Elasticsearch versions.
+ */
+interface OpensearchBulkRequestConsumerFactory
extends BiConsumer<
org.opensearch.action.bulk.BulkRequest,
org.opensearch.core.action.ActionListener<
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
index 9538d6007ce50..e9a362a2773fe 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
@@ -1,16 +1,19 @@
package com.risingwave.connector;
import com.risingwave.connector.EsSink.RequestTracker;
+import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,11 +87,6 @@ public ElasticBulkProcessorAdapter(
this.esBulkProcessor = builder.build();
}
- @Override
- public void add(Object request) {
- esBulkProcessor.add((IndexRequest) request);
- }
-
@Override
public void flush() {
esBulkProcessor.flush();
@@ -98,4 +96,36 @@ public void flush() {
public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
esBulkProcessor.awaitClose(timeout, unit);
}
+
+ @Override
+ public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) {
+ final String index = (String) row.get(0);
+ final String key = (String) row.get(1);
+ String doc = (String) row.get(2);
+
+ UpdateRequest updateRequest;
+ if (indexName != null) {
+ updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON);
+ } else {
+ updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON);
+ }
+ updateRequest.docAsUpsert(true);
+ requestTracker.addWriteTask();
+ this.esBulkProcessor.add(updateRequest);
+ }
+
+ @Override
+ public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) {
+ final String index = (String) row.get(0);
+ final String key = (String) row.get(1);
+
+ DeleteRequest deleteRequest;
+ if (indexName != null) {
+ deleteRequest = new DeleteRequest(indexName, "_doc", key);
+ } else {
+ deleteRequest = new DeleteRequest(index, "_doc", key);
+ }
+ requestTracker.addWriteTask();
+ this.esBulkProcessor.add(deleteRequest);
+ }
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
index 88f807c61e403..b29e74902be45 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
@@ -25,9 +25,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,46 +163,15 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
}
}
- private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
- final String index = (String) row.get(0);
- final String key = (String) row.get(1);
- String doc = (String) row.get(2);
-
- UpdateRequest updateRequest;
- if (config.getIndex() != null) {
- updateRequest =
- new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
- } else {
- updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
- }
- updateRequest.docAsUpsert(true);
- this.requestTracker.addWriteTask();
- bulkProcessor.add(updateRequest);
- }
-
- private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
- final String index = (String) row.get(0);
- final String key = (String) row.get(1);
-
- DeleteRequest deleteRequest;
- if (config.getIndex() != null) {
- deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
- } else {
- deleteRequest = new DeleteRequest(index, "_doc", key);
- }
- this.requestTracker.addWriteTask();
- bulkProcessor.add(deleteRequest);
- }
-
private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
- processUpsert(row);
+ this.bulkProcessor.addRow(row, config.getIndex(), requestTracker);
break;
case DELETE:
case UPDATE_DELETE:
- processDelete(row);
+ this.bulkProcessor.deleteRow(row, config.getIndex(), requestTracker);
break;
default:
throw Status.INVALID_ARGUMENT
@@ -249,4 +215,8 @@ public void drop() {
.asRuntimeException();
}
}
+
+ public RestHighLevelClientAdapter getClient() {
+ return this.client;
+ }
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
index 8c16299711e2f..9bf0b90d9bcc8 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
@@ -1,14 +1,17 @@
package com.risingwave.connector;
import com.risingwave.connector.EsSink.RequestTracker;
+import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.slf4j.Logger;
@@ -61,7 +64,7 @@ public OpensearchBulkProcessorAdapter(
RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
- (OpenSearchBulkRequestConsumerFactory)
+ (OpensearchBulkRequestConsumerFactory)
(bulkRequest, bulkResponseActionListener) ->
client.bulkAsync(
bulkRequest,
@@ -84,11 +87,6 @@ public OpensearchBulkProcessorAdapter(
this.opensearchBulkProcessor = builder.build();
}
- @Override
- public void add(Object request) {
- opensearchBulkProcessor.add((IndexRequest) request);
- }
-
@Override
public void flush() {
opensearchBulkProcessor.flush();
@@ -98,4 +96,36 @@ public void flush() {
public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
opensearchBulkProcessor.awaitClose(timeout, unit);
}
+
+ @Override
+ public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) {
+ final String index = (String) row.get(0);
+ final String key = (String) row.get(1);
+ String doc = (String) row.get(2);
+
+ UpdateRequest updateRequest;
+ if (indexName != null) {
+ updateRequest = new UpdateRequest(indexName, key).doc(doc, XContentType.JSON);
+ } else {
+ updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
+ }
+ updateRequest.docAsUpsert(true);
+ requestTracker.addWriteTask();
+ this.opensearchBulkProcessor.add(updateRequest);
+ }
+
+ @Override
+ public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) {
+ final String index = (String) row.get(0);
+ final String key = (String) row.get(1);
+
+ DeleteRequest deleteRequest;
+ if (indexName != null) {
+ deleteRequest = new DeleteRequest(indexName, key);
+ } else {
+ deleteRequest = new DeleteRequest(index, key);
+ }
+ requestTracker.addWriteTask();
+ this.opensearchBulkProcessor.add(deleteRequest);
+ }
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
index f656537798819..ec35b14dea32a 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
@@ -20,6 +20,8 @@ public interface RestHighLevelClientAdapter {
public void close() throws IOException;
public boolean ping(Object options) throws IOException;
+
+ public Object search(Object searchRequest, Object options) throws IOException;
}
class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
@@ -66,15 +68,21 @@ public boolean ping(Object options) throws IOException {
}
public org.elasticsearch.client.Cancellable bulkAsync(
- org.elasticsearch.action.bulk.BulkRequest bulkRequest, org.elasticsearch.client.RequestOptions options, org.elasticsearch.action.ActionListener<
- org.elasticsearch.action.bulk.BulkResponse> listener) {
+ org.elasticsearch.action.bulk.BulkRequest bulkRequest,
+ org.elasticsearch.client.RequestOptions options,
+ org.elasticsearch.action.ActionListener
+ listener) {
org.elasticsearch.client.Cancellable cancellable =
- esClient.bulkAsync(
- bulkRequest,
- options,
- listener);
+ esClient.bulkAsync(bulkRequest, options, listener);
return cancellable;
}
+
+ @Override
+ public Object search(Object searchRequest, Object options) throws IOException {
+ return this.esClient.search(
+ (org.elasticsearch.action.search.SearchRequest) searchRequest,
+ (org.elasticsearch.client.RequestOptions) options);
+ }
}
class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
@@ -115,12 +123,18 @@ public boolean ping(Object options) throws IOException {
return flag;
}
- public Cancellable bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener) {
- Cancellable cancellable =
- opensearchClient.bulkAsync(
- bulkRequest,
- options,
- listener);
+ public Cancellable bulkAsync(
+ BulkRequest bulkRequest,
+ RequestOptions options,
+ ActionListener listener) {
+ Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener);
return cancellable;
}
+
+ @Override
+ public Object search(Object searchRequest, Object options) throws IOException {
+ return this.opensearchClient.search(
+ (org.opensearch.action.search.SearchRequest) searchRequest,
+ (org.opensearch.client.RequestOptions) options);
+ }
}
diff --git a/java/pom.xml b/java/pom.xml
index 89df5b870077b..822bfea2fbe24 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -77,10 +77,11 @@
1.10.0
3.12.0
2.4.2.Final
- 2.13.5
+ 2.15.0
3.3.1
3.3.3
7.17.19
+ 2.11.1
4.15.0
1.18.0
1.17.6
@@ -197,6 +198,22 @@
elasticsearch-rest-high-level-client
${elasticsearch.version}
+
+ org.opensearch
+ opensearch
+ ${opensearch.version}
+
+
+ org.opensearch.client
+ opensearch-rest-high-level-client
+ ${opensearch.version}
+
+
+ org.apache.httpcomponents
+ httpcore-nio
+
+
+
io.grpc
grpc-netty-shaded
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 31bf0a92a86ac..cd666242af06a 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -166,7 +166,7 @@ impl Sink for RemoteSink {
}
async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
- if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
+ if (sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME)
&& param.downstream_pk.len() > 1
&& param.properties.get(ES_OPTION_DELIMITER).is_none()
{
@@ -191,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
| DataType::Jsonb
| DataType::Bytea => Ok(()),
DataType::List(list) => {
- if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
+ if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
Ok(())
} else{
Err(SinkError::Remote(anyhow!(
@@ -265,7 +265,9 @@ impl RemoteLogSinker {
sink_name: &str,
) -> Result {
let sink_proto = sink_param.to_proto();
- let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME {
+ let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME
+ || sink_name == OpensearchSink::SINK_NAME
+ {
let columns = vec![
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
From f4caee5bd13473b003c8e8e4a235b8ccabc69712 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Wed, 17 Apr 2024 17:43:26 +0800
Subject: [PATCH 3/8] fmt
---
.../connector/BulkProcessorAdapter.java | 16 ++++++++++
.../ElasticBulkProcessorAdapter.java | 16 ++++++++++
.../java/com/risingwave/connector/EsSink.java | 29 ++++++++++--------
.../risingwave/connector/EsSinkFactory.java | 29 ++++++++++--------
.../OpensearchBulkProcessorAdapter.java | 16 ++++++++++
.../connector/RestHighLevelClientAdapter.java | 16 ++++++++++
java/pom.xml | 30 +++++++++----------
7 files changed, 111 insertions(+), 41 deletions(-)
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
index 5cb743f05344e..a4acc5e1e7e7d 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
import com.risingwave.connector.EsSink.RequestTracker;
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
index e9a362a2773fe..16037caf4ca76 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
import com.risingwave.connector.EsSink.RequestTracker;
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
index b29e74902be45..613e01206a323 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
@@ -1,16 +1,19 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
index 11888204e0b1f..7cf741a66f87c 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
@@ -1,16 +1,19 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
index 9bf0b90d9bcc8..4ae1c430071d1 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
import com.risingwave.connector.EsSink.RequestTracker;
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
index ec35b14dea32a..d441719efe3c6 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.risingwave.connector;
import java.io.IOException;
diff --git a/java/pom.xml b/java/pom.xml
index 822bfea2fbe24..67d33b9800a00 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -199,21 +199,21 @@
${elasticsearch.version}
- org.opensearch
- opensearch
- ${opensearch.version}
-
-
- org.opensearch.client
- opensearch-rest-high-level-client
- ${opensearch.version}
-
-
- org.apache.httpcomponents
- httpcore-nio
-
-
-
+ org.opensearch
+ opensearch
+ ${opensearch.version}
+
+
+ org.opensearch.client
+ opensearch-rest-high-level-client
+ ${opensearch.version}
+
+
+ org.apache.httpcomponents
+ httpcore-nio
+
+
+
io.grpc
grpc-netty-shaded
From e6b07a74d07607d76357c868c29bd7ebfdecbcbd Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 24 May 2024 14:20:19 +0800
Subject: [PATCH 4/8] fix com
fix
fix
---
.../sink/elasticsearch/EsSinkTest.java | 8 +-
.../connector/BulkProcessorAdapter.java | 6 +-
.../connector/BulkRequestConsumerFactory.java | 39 -----
.../ElasticBulkProcessorAdapter.java | 154 ++++++++++--------
.../ElasticRestHighLevelClientAdapter.java | 89 ++++++++++
.../java/com/risingwave/connector/EsSink.java | 13 +-
.../risingwave/connector/EsSinkFactory.java | 37 ++---
.../OpensearchBulkProcessorAdapter.java | 82 ++--------
.../OpensearchRestHighLevelClientAdapter.java | 78 +++++++++
.../connector/RestHighLevelClientAdapter.java | 133 ---------------
src/connector/src/sink/elasticsearch.rs | 6 +-
src/connector/src/sink/remote.rs | 4 +-
12 files changed, 306 insertions(+), 343 deletions(-)
delete mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
index f2894d2e790a2..79636d5590ca6 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
@@ -19,9 +19,9 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.risingwave.connector.ElasticRestHighLevelClientAdapter;
import com.risingwave.connector.EsSink;
import com.risingwave.connector.EsSinkConfig;
-import com.risingwave.connector.RestHighLevelClientAdapter;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.ArraySinkRow;
import com.risingwave.proto.Data;
@@ -74,13 +74,13 @@ public void testEsSink(ElasticsearchContainer container, String username, String
fail(e.getMessage());
}
- RestHighLevelClientAdapter client = sink.getClient();
+ ElasticRestHighLevelClientAdapter client =
+ (ElasticRestHighLevelClientAdapter) sink.getClient();
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
- SearchResponse searchResponse =
- (SearchResponse) client.search(searchRequest, RequestOptions.DEFAULT);
+ SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
assertEquals(2, hits.getHits().length);
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
index a4acc5e1e7e7d..d72ebe2833953 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
@@ -16,14 +16,12 @@
package com.risingwave.connector;
-import com.risingwave.connector.EsSink.RequestTracker;
-import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;
public interface BulkProcessorAdapter {
- public void addRow(SinkRow row, String indexName, RequestTracker requestTracker);
+ public void addRow(String index, String key, String doc);
- public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker);
+ public void deleteRow(String index, String key);
public void flush();
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
deleted file mode 100644
index ac7fc45c6b030..0000000000000
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2024 RisingWave Labs
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.risingwave.connector;
-
-import java.util.function.BiConsumer;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-
-/**
- * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API
- * calls across different Elasticsearch versions.
- */
-interface ElasticBulkRequestConsumerFactory
- extends BiConsumer> {}
-
-/**
- * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API
- * calls across different Elasticsearch versions.
- */
-interface OpensearchBulkRequestConsumerFactory
- extends BiConsumer<
- org.opensearch.action.bulk.BulkRequest,
- org.opensearch.core.action.ActionListener<
- org.opensearch.action.bulk.BulkResponse>> {}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
index 16037caf4ca76..8e3c57911118f 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
@@ -17,12 +17,10 @@
package com.risingwave.connector;
import com.risingwave.connector.EsSink.RequestTracker;
-import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
@@ -36,56 +34,17 @@
public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
BulkProcessor esBulkProcessor;
-
- private class BulkListener implements BulkProcessor.Listener {
- private final RequestTracker requestTracker;
-
- public BulkListener(RequestTracker requestTracker) {
- this.requestTracker = requestTracker;
- }
-
- /** This method is called just before bulk is executed. */
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
-
- /** This method is called after bulk execution. */
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- if (response.hasFailures()) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), response.buildFailureMessage());
- this.requestTracker.addErrResult(errMessage);
- } else {
- this.requestTracker.addOkResult(request.numberOfActions());
- LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
- }
-
- /** This method is called when the bulk failed and raised a Throwable */
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), failure.getMessage());
- this.requestTracker.addErrResult(errMessage);
- }
- }
+ private final RequestTracker requestTracker;
public ElasticBulkProcessorAdapter(
RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
- (ElasticBulkRequestConsumerFactory)
- (bulkRequest, bulkResponseActionListener) ->
- client.bulkAsync(
- bulkRequest,
- RequestOptions.DEFAULT,
- bulkResponseActionListener),
+ (bulkRequest, bulkResponseActionListener) ->
+ client.bulkAsync(
+ bulkRequest,
+ RequestOptions.DEFAULT,
+ bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
@@ -101,6 +60,7 @@ public ElasticBulkProcessorAdapter(
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
+ this.requestTracker = requestTracker;
}
@Override
@@ -114,34 +74,94 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}
@Override
- public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) {
- final String index = (String) row.get(0);
- final String key = (String) row.get(1);
- String doc = (String) row.get(2);
-
+ public void addRow(String index, String key, String doc) {
UpdateRequest updateRequest;
- if (indexName != null) {
- updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON);
- } else {
- updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON);
- }
+ updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
updateRequest.docAsUpsert(true);
- requestTracker.addWriteTask();
+ this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}
@Override
- public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) {
- final String index = (String) row.get(0);
- final String key = (String) row.get(1);
-
+ public void deleteRow(String index, String key) {
DeleteRequest deleteRequest;
- if (indexName != null) {
- deleteRequest = new DeleteRequest(indexName, "_doc", key);
+ deleteRequest = new DeleteRequest(index, "_doc", key);
+ this.requestTracker.addWriteTask();
+ this.esBulkProcessor.add(deleteRequest);
+ }
+}
+
+class BulkListener
+ implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
+ org.opensearch.action.bulk.BulkProcessor.Listener {
+ private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
+ private final RequestTracker requestTracker;
+
+ public BulkListener(RequestTracker requestTracker) {
+ this.requestTracker = requestTracker;
+ }
+
+ @Override
+ public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) {
+ LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
+ }
+
+ @Override
+ public void afterBulk(
+ long executionId,
+ org.elasticsearch.action.bulk.BulkRequest request,
+ org.elasticsearch.action.bulk.BulkResponse response) {
+ if (response.hasFailures()) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), response.buildFailureMessage());
+ this.requestTracker.addErrResult(errMessage);
} else {
- deleteRequest = new DeleteRequest(index, "_doc", key);
+ this.requestTracker.addOkResult(request.numberOfActions());
+ LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
- requestTracker.addWriteTask();
- this.esBulkProcessor.add(deleteRequest);
+ }
+
+ /** This method is called when the bulk failed and raised a Throwable */
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), failure.getMessage());
+ this.requestTracker.addErrResult(errMessage);
+ }
+
+ @Override
+ public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
+ LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
+ }
+
+ @Override
+ public void afterBulk(
+ long executionId,
+ org.opensearch.action.bulk.BulkRequest request,
+ org.opensearch.action.bulk.BulkResponse response) {
+ if (response.hasFailures()) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), response.buildFailureMessage());
+ this.requestTracker.addErrResult(errMessage);
+ } else {
+ this.requestTracker.addOkResult(request.numberOfActions());
+ LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
+ }
+ }
+
+ @Override
+ public void afterBulk(
+ long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), failure.getMessage());
+ this.requestTracker.addErrResult(errMessage);
}
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
new file mode 100644
index 0000000000000..65e531bf8ea13
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import java.io.IOException;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.RestHighLevelClientBuilder;
+
+public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
+ RestHighLevelClient esClient;
+
+ private static RestClientBuilder configureRestClientBuilder(
+ RestClientBuilder builder, EsSinkConfig config) {
+ // Possible config:
+ // 1. Connection path prefix
+ // 2. Username and password
+ if (config.getPassword() != null && config.getUsername() != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+ builder.setHttpClientConfigCallback(
+ httpClientBuilder ->
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ }
+ // 3. Timeout
+ return builder;
+ }
+
+ public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
+ this.esClient =
+ new RestHighLevelClientBuilder(
+ configureRestClientBuilder(RestClient.builder(host), config)
+ .build())
+ .setApiCompatibilityMode(true)
+ .build();
+ }
+
+ @Override
+ public void close() throws IOException {
+ esClient.close();
+ }
+
+ public boolean ping(RequestOptions options) throws IOException {
+ boolean flag = esClient.ping(options);
+ return flag;
+ }
+
+ public Cancellable bulkAsync(
+ BulkRequest bulkRequest,
+ RequestOptions options,
+ ActionListener listener) {
+ Cancellable cancellable = esClient.bulkAsync(bulkRequest, options, listener);
+ return cancellable;
+ }
+
+ public SearchResponse search(SearchRequest searchRequest, RequestOptions options)
+ throws IOException {
+ return this.esClient.search(searchRequest, options);
+ }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
index 613e01206a323..3b03e24899056 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
@@ -14,7 +14,6 @@
* limitations under the License.
*/
-
package com.risingwave.connector;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -167,14 +166,22 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
}
private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
+ final String key = (String) row.get(1);
+ String doc = (String) row.get(2);
+ final String index;
+ if (config.getIndex() == null) {
+ index = (String) row.get(0);
+ } else {
+ index = config.getIndex();
+ }
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
- this.bulkProcessor.addRow(row, config.getIndex(), requestTracker);
+ this.bulkProcessor.addRow(index, key, doc);
break;
case DELETE:
case UPDATE_DELETE:
- this.bulkProcessor.deleteRow(row, config.getIndex(), requestTracker);
+ this.bulkProcessor.deleteRow(index, key);
break;
default:
throw Status.INVALID_ARGUMENT
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
index 7cf741a66f87c..f6bd427b3aa17 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
@@ -14,7 +14,6 @@
* limitations under the License.
*/
-
package com.risingwave.connector;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -26,10 +25,8 @@
import com.risingwave.proto.Catalog;
import com.risingwave.proto.Data;
import io.grpc.Status;
-import java.io.IOException;
import java.util.Map;
import org.apache.http.HttpHost;
-import org.elasticsearch.client.RequestOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,33 +82,31 @@ public void validate(
}
}
- RestHighLevelClientAdapter client;
// 2. check connection
try {
- boolean isConnected;
if (config.getConnector().equals("elasticsearch")) {
- client = new ElasticRestHighLevelClientAdapter(host, config);
- isConnected = client.ping(RequestOptions.DEFAULT);
+ ElasticRestHighLevelClientAdapter esClient =
+ new ElasticRestHighLevelClientAdapter(host, config);
+ if (esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+ esClient.close();
} else if (config.getConnector().equals("opensearch")) {
- client = new OpensearchRestHighLevelClientAdapter(host, config);
- isConnected = client.ping(org.opensearch.client.RequestOptions.DEFAULT);
+ OpensearchRestHighLevelClientAdapter opensearchClient =
+ new OpensearchRestHighLevelClientAdapter(host, config);
+ if (opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+ opensearchClient.close();
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
- if (!isConnected) {
- throw Status.INVALID_ARGUMENT
- .withDescription("Cannot connect to " + config.getUrl())
- .asRuntimeException();
- }
} catch (Exception e) {
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
-
- // 3. close client
- try {
- client.close();
- } catch (IOException e) {
- throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
- }
}
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
index 4ae1c430071d1..d5d8cdc3d237d 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
@@ -17,12 +17,9 @@
package com.risingwave.connector;
import com.risingwave.connector.EsSink.RequestTracker;
-import com.risingwave.connector.api.sink.SinkRow;
import java.util.concurrent.TimeUnit;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.bulk.BulkProcessor;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
@@ -35,57 +32,18 @@
public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
+ private final RequestTracker requestTracker;
BulkProcessor opensearchBulkProcessor;
- private class BulkListener implements BulkProcessor.Listener {
- private final RequestTracker requestTracker;
-
- public BulkListener(RequestTracker requestTracker) {
- this.requestTracker = requestTracker;
- }
-
- /** This method is called just before bulk is executed. */
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
-
- /** This method is called after bulk execution. */
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- if (response.hasFailures()) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), response.buildFailureMessage());
- this.requestTracker.addErrResult(errMessage);
- } else {
- this.requestTracker.addOkResult(request.numberOfActions());
- LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
- }
-
- /** This method is called when the bulk failed and raised a Throwable */
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), failure.getMessage());
- this.requestTracker.addErrResult(errMessage);
- }
- }
-
public OpensearchBulkProcessorAdapter(
RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
- (OpensearchBulkRequestConsumerFactory)
- (bulkRequest, bulkResponseActionListener) ->
- client.bulkAsync(
- bulkRequest,
- RequestOptions.DEFAULT,
- bulkResponseActionListener),
+ (bulkRequest, bulkResponseActionListener) ->
+ client.bulkAsync(
+ bulkRequest,
+ RequestOptions.DEFAULT,
+ bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
@@ -101,6 +59,7 @@ public OpensearchBulkProcessorAdapter(
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.opensearchBulkProcessor = builder.build();
+ this.requestTracker = requestTracker;
}
@Override
@@ -114,34 +73,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException
}
@Override
- public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) {
- final String index = (String) row.get(0);
- final String key = (String) row.get(1);
- String doc = (String) row.get(2);
-
+ public void addRow(String index, String key, String doc) {
UpdateRequest updateRequest;
- if (indexName != null) {
- updateRequest = new UpdateRequest(indexName, key).doc(doc, XContentType.JSON);
- } else {
- updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
- }
+ updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
updateRequest.docAsUpsert(true);
- requestTracker.addWriteTask();
+ this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(updateRequest);
}
@Override
- public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) {
- final String index = (String) row.get(0);
- final String key = (String) row.get(1);
-
+ public void deleteRow(String index, String key) {
DeleteRequest deleteRequest;
- if (indexName != null) {
- deleteRequest = new DeleteRequest(indexName, key);
- } else {
- deleteRequest = new DeleteRequest(index, key);
- }
- requestTracker.addWriteTask();
+ deleteRequest = new DeleteRequest(index, key);
+ this.requestTracker.addWriteTask();
this.opensearchBulkProcessor.add(deleteRequest);
}
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
new file mode 100644
index 0000000000000..f962564c3f2a8
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import java.io.IOException;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.Cancellable;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.core.action.ActionListener;
+
+public class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
+ RestHighLevelClient opensearchClient;
+
+ private static RestClientBuilder configureRestClientBuilder(
+ RestClientBuilder builder, EsSinkConfig config) {
+ // Possible config:
+ // 1. Connection path prefix
+ // 2. Username and password
+ if (config.getPassword() != null && config.getUsername() != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+ builder.setHttpClientConfigCallback(
+ httpClientBuilder ->
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ }
+ // 3. Timeout
+ return builder;
+ }
+
+ public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
+ this.opensearchClient =
+ new org.opensearch.client.RestHighLevelClient(
+ configureRestClientBuilder(
+ org.opensearch.client.RestClient.builder(host), config));
+ }
+
+ @Override
+ public void close() throws IOException {
+ opensearchClient.close();
+ }
+
+ public boolean ping(org.opensearch.client.RequestOptions options) throws IOException {
+ boolean flag = opensearchClient.ping(options);
+ return flag;
+ }
+
+ public Cancellable bulkAsync(
+ BulkRequest bulkRequest,
+ RequestOptions options,
+ ActionListener listener) {
+ Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener);
+ return cancellable;
+ }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
index d441719efe3c6..dacddaf3704d2 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
@@ -17,140 +17,7 @@
package com.risingwave.connector;
import java.io.IOException;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.client.RestHighLevelClientBuilder;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.client.Cancellable;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestClientBuilder;
-import org.opensearch.client.RestHighLevelClient;
-import org.opensearch.core.action.ActionListener;
public interface RestHighLevelClientAdapter {
public void close() throws IOException;
-
- public boolean ping(Object options) throws IOException;
-
- public Object search(Object searchRequest, Object options) throws IOException;
-}
-
-class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
- org.elasticsearch.client.RestHighLevelClient esClient;
-
- private static org.elasticsearch.client.RestClientBuilder configureRestClientBuilder(
- org.elasticsearch.client.RestClientBuilder builder, EsSinkConfig config) {
- // Possible config:
- // 1. Connection path prefix
- // 2. Username and password
- if (config.getPassword() != null && config.getUsername() != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
- builder.setHttpClientConfigCallback(
- httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- }
- // 3. Timeout
- return builder;
- }
-
- public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
- this.esClient =
- new RestHighLevelClientBuilder(
- configureRestClientBuilder(
- org.elasticsearch.client.RestClient.builder(host),
- config)
- .build())
- .setApiCompatibilityMode(true)
- .build();
- }
-
- @Override
- public void close() throws IOException {
- esClient.close();
- }
-
- @Override
- public boolean ping(Object options) throws IOException {
- boolean flag = esClient.ping((org.elasticsearch.client.RequestOptions) options);
- return flag;
- }
-
- public org.elasticsearch.client.Cancellable bulkAsync(
- org.elasticsearch.action.bulk.BulkRequest bulkRequest,
- org.elasticsearch.client.RequestOptions options,
- org.elasticsearch.action.ActionListener
- listener) {
- org.elasticsearch.client.Cancellable cancellable =
- esClient.bulkAsync(bulkRequest, options, listener);
- return cancellable;
- }
-
- @Override
- public Object search(Object searchRequest, Object options) throws IOException {
- return this.esClient.search(
- (org.elasticsearch.action.search.SearchRequest) searchRequest,
- (org.elasticsearch.client.RequestOptions) options);
- }
-}
-
-class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
- RestHighLevelClient opensearchClient;
-
- private static RestClientBuilder configureRestClientBuilder(
- RestClientBuilder builder, EsSinkConfig config) {
- // Possible config:
- // 1. Connection path prefix
- // 2. Username and password
- if (config.getPassword() != null && config.getUsername() != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
- builder.setHttpClientConfigCallback(
- httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- }
- // 3. Timeout
- return builder;
- }
-
- public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
- this.opensearchClient =
- new RestHighLevelClient(
- configureRestClientBuilder(RestClient.builder(host), config));
- }
-
- @Override
- public void close() throws IOException {
- opensearchClient.close();
- }
-
- @Override
- public boolean ping(Object options) throws IOException {
- boolean flag = opensearchClient.ping((RequestOptions) options);
- return flag;
- }
-
- public Cancellable bulkAsync(
- BulkRequest bulkRequest,
- RequestOptions options,
- ActionListener listener) {
- Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener);
- return cancellable;
- }
-
- @Override
- public Object search(Object searchRequest, Object options) throws IOException {
- return this.opensearchClient.search(
- (org.opensearch.action.search.SearchRequest) searchRequest,
- (org.opensearch.client.RequestOptions) options);
- }
}
diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs
index 24f15e2537fdc..b3cfe3b9a801b 100644
--- a/src/connector/src/sink/elasticsearch.rs
+++ b/src/connector/src/sink/elasticsearch.rs
@@ -40,7 +40,7 @@ impl StreamChunkConverter {
pk_indices: &Vec,
properties: &HashMap,
) -> Result {
- if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME {
+ if is_es_sink(sink_name) {
let index_column = properties
.get(ES_OPTION_INDEX_COLUMN)
.cloned()
@@ -170,3 +170,7 @@ impl EsStreamChunkConverter {
(self.fn_build_id)(row)
}
}
+
+pub fn is_es_sink(sink_name: &str) -> bool {
+ sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME
+}
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index cd666242af06a..c45d729bec6be 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -57,7 +57,7 @@ use tokio::task::spawn_blocking;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;
-use super::elasticsearch::{StreamChunkConverter, ES_OPTION_DELIMITER};
+use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER};
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::coordinate::CoordinatedSinkWriter;
@@ -166,7 +166,7 @@ impl Sink for RemoteSink {
}
async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
- if (sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME)
+ if is_es_sink(sink_name)
&& param.downstream_pk.len() > 1
&& param.properties.get(ES_OPTION_DELIMITER).is_none()
{
From c57df293e7bec5d385023f4c41cf071d6f4531d4 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 24 May 2024 15:00:49 +0800
Subject: [PATCH 5/8] fix ci
---
java/connector-node/risingwave-connector-test/pom.xml | 4 ++--
.../risingwave/connector/sink/elasticsearch/EsSinkTest.java | 1 +
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml
index 14b1c7bd65fc0..d3d47b0bc4571 100644
--- a/java/connector-node/risingwave-connector-test/pom.xml
+++ b/java/connector-node/risingwave-connector-test/pom.xml
@@ -128,13 +128,13 @@
com.fasterxml.jackson.core
jackson-databind
- ${jackson.version}
+ 2.13.5
test
com.fasterxml.jackson.core
jackson-core
- ${jackson.version}
+ 2.13.5
test
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
index 79636d5590ca6..3b58878295311 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
@@ -55,6 +55,7 @@ public void testEsSink(ElasticsearchContainer container, String username, String
EsSink sink =
new EsSink(
new EsSinkConfig(container.getHttpHostAddress())
+ .setConnector("elasticsearch")
.withIndex("test")
.withDelimiter("$")
.withUsername(username)
From 5848441be1b4fddfefd9d388a65749a433ffd723 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Mon, 27 May 2024 17:01:06 +0800
Subject: [PATCH 6/8] fix comm
fix
fix ci
fix ci
---
.../sink/elasticsearch/EsSinkTest.java | 21 ++--
.../risingwave/connector/BulkListener.java | 97 +++++++++++++++++++
.../ElasticBulkProcessorAdapter.java | 76 ---------------
.../ElasticRestHighLevelClientAdapter.java | 2 +-
.../java/com/risingwave/connector/EsSink.java | 8 --
.../OpensearchRestHighLevelClientAdapter.java | 2 +-
.../connector/RestHighLevelClientAdapter.java | 23 -----
src/connector/src/sink/remote.rs | 4 +-
8 files changed, 112 insertions(+), 121 deletions(-)
create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java
delete mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
index 3b58878295311..d2873fac9d216 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
@@ -29,6 +29,7 @@
import com.risingwave.proto.Data.Op;
import java.io.IOException;
import java.util.Map;
+import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
@@ -52,15 +53,14 @@ static TableSchema getTestTableSchema() {
public void testEsSink(ElasticsearchContainer container, String username, String password)
throws IOException {
- EsSink sink =
- new EsSink(
- new EsSinkConfig(container.getHttpHostAddress())
- .setConnector("elasticsearch")
- .withIndex("test")
- .withDelimiter("$")
- .withUsername(username)
- .withPassword(password),
- getTestTableSchema());
+ EsSinkConfig config =
+ new EsSinkConfig(container.getHttpHostAddress())
+ .withIndex("test")
+ .withDelimiter("$")
+ .withUsername(username)
+ .withPassword(password);
+ config.setConnector("elasticsearch");
+ EsSink sink = new EsSink(config, getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(
@@ -75,8 +75,9 @@ public void testEsSink(ElasticsearchContainer container, String username, String
fail(e.getMessage());
}
+ HttpHost host = HttpHost.create(config.getUrl());
ElasticRestHighLevelClientAdapter client =
- (ElasticRestHighLevelClientAdapter) sink.getClient();
+ new ElasticRestHighLevelClientAdapter(host, config);
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java
new file mode 100644
index 0000000000000..4ce1165ba1baf
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.risingwave.connector.EsSink.RequestTracker;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BulkListener
+ implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
+ org.opensearch.action.bulk.BulkProcessor.Listener {
+ private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
+ private final RequestTracker requestTracker;
+
+ public BulkListener(RequestTracker requestTracker) {
+ this.requestTracker = requestTracker;
+ }
+
+ @Override
+ public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) {
+ LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
+ }
+
+ @Override
+ public void afterBulk(
+ long executionId,
+ org.elasticsearch.action.bulk.BulkRequest request,
+ org.elasticsearch.action.bulk.BulkResponse response) {
+ if (response.hasFailures()) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), response.buildFailureMessage());
+ this.requestTracker.addErrResult(errMessage);
+ } else {
+ this.requestTracker.addOkResult(request.numberOfActions());
+ LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
+ }
+ }
+
+ /** This method is called when the bulk failed and raised a Throwable */
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), failure.getMessage());
+ this.requestTracker.addErrResult(errMessage);
+ }
+
+ @Override
+ public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
+ LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
+ }
+
+ @Override
+ public void afterBulk(
+ long executionId,
+ org.opensearch.action.bulk.BulkRequest request,
+ org.opensearch.action.bulk.BulkResponse response) {
+ if (response.hasFailures()) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), response.buildFailureMessage());
+ this.requestTracker.addErrResult(errMessage);
+ } else {
+ this.requestTracker.addOkResult(request.numberOfActions());
+ LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
+ }
+ }
+
+ @Override
+ public void afterBulk(
+ long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
+ String errMessage =
+ String.format(
+ "Bulk of %d actions failed. Failure: %s",
+ request.numberOfActions(), failure.getMessage());
+ this.requestTracker.addErrResult(errMessage);
+ }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
index 8e3c57911118f..de6ab3414f65a 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
@@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
@@ -90,78 +89,3 @@ public void deleteRow(String index, String key) {
this.esBulkProcessor.add(deleteRequest);
}
}
-
-class BulkListener
- implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
- org.opensearch.action.bulk.BulkProcessor.Listener {
- private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
- private final RequestTracker requestTracker;
-
- public BulkListener(RequestTracker requestTracker) {
- this.requestTracker = requestTracker;
- }
-
- @Override
- public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) {
- LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
-
- @Override
- public void afterBulk(
- long executionId,
- org.elasticsearch.action.bulk.BulkRequest request,
- org.elasticsearch.action.bulk.BulkResponse response) {
- if (response.hasFailures()) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), response.buildFailureMessage());
- this.requestTracker.addErrResult(errMessage);
- } else {
- this.requestTracker.addOkResult(request.numberOfActions());
- LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
- }
-
- /** This method is called when the bulk failed and raised a Throwable */
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), failure.getMessage());
- this.requestTracker.addErrResult(errMessage);
- }
-
- @Override
- public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
- LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
- }
-
- @Override
- public void afterBulk(
- long executionId,
- org.opensearch.action.bulk.BulkRequest request,
- org.opensearch.action.bulk.BulkResponse response) {
- if (response.hasFailures()) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), response.buildFailureMessage());
- this.requestTracker.addErrResult(errMessage);
- } else {
- this.requestTracker.addOkResult(request.numberOfActions());
- LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
- }
- }
-
- @Override
- public void afterBulk(
- long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), failure.getMessage());
- this.requestTracker.addErrResult(errMessage);
- }
-}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
index 65e531bf8ea13..c64def3bef8a7 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
@@ -34,7 +34,7 @@
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
-public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
+public class ElasticRestHighLevelClientAdapter implements AutoCloseable {
RestHighLevelClient esClient;
private static RestClientBuilder configureRestClientBuilder(
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
index 3b03e24899056..315fc800a2ef0 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
@@ -49,7 +49,6 @@ public class EsSink extends SinkWriterBase {
private final EsSinkConfig config;
private BulkProcessorAdapter bulkProcessor;
- private final RestHighLevelClientAdapter client;
// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;
@@ -154,12 +153,10 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
- this.client = client;
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter client =
new OpensearchRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
- this.client = client;
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
@@ -218,15 +215,10 @@ public void sync() {
public void drop() {
try {
bulkProcessor.awaitClose(100, TimeUnit.SECONDS);
- client.close();
} catch (Exception e) {
throw io.grpc.Status.INTERNAL
.withDescription(String.format(ERROR_REPORT_TEMPLATE, e.getMessage()))
.asRuntimeException();
}
}
-
- public RestHighLevelClientAdapter getClient() {
- return this.client;
- }
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
index f962564c3f2a8..5f3773b0a91aa 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
@@ -30,7 +30,7 @@
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.core.action.ActionListener;
-public class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
+public class OpensearchRestHighLevelClientAdapter implements AutoCloseable {
RestHighLevelClient opensearchClient;
private static RestClientBuilder configureRestClientBuilder(
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
deleted file mode 100644
index dacddaf3704d2..0000000000000
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2024 RisingWave Labs
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.risingwave.connector;
-
-import java.io.IOException;
-
-public interface RestHighLevelClientAdapter {
- public void close() throws IOException;
-}
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index c45d729bec6be..cdfb0322658ec 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -191,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
| DataType::Jsonb
| DataType::Bytea => Ok(()),
DataType::List(list) => {
- if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
+ if is_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
Ok(())
} else{
Err(SinkError::Remote(anyhow!(
@@ -202,7 +202,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
}
},
DataType::Struct(_) => {
- if sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME{
+ if is_es_sink(sink_name){
Ok(())
}else{
Err(SinkError::Remote(anyhow!(
From 85d9870d3cc66b0d12774c89469a389757a58c96 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 18 Jun 2024 16:14:59 +0800
Subject: [PATCH 7/8] fix comm
---
src/connector/src/sink/remote.rs | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index a3faf1310c12d..863ca02f69d59 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -264,9 +264,7 @@ impl RemoteLogSinker {
sink_name: &str,
) -> Result {
let sink_proto = sink_param.to_proto();
- let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME
- || sink_name == OpensearchSink::SINK_NAME
- {
+ let payload_schema = if is_es_sink(sink_name) {
let columns = vec![
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),
From 8eca24a36d474ba8e66a814d8a012d2726c4b0f8 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 18 Jun 2024 16:44:12 +0800
Subject: [PATCH 8/8] fix ci
---
.../src/main/java/com/risingwave/connector/EsSinkFactory.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
index f6bd427b3aa17..03e888a892df3 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
@@ -87,7 +87,7 @@ public void validate(
if (config.getConnector().equals("elasticsearch")) {
ElasticRestHighLevelClientAdapter esClient =
new ElasticRestHighLevelClientAdapter(host, config);
- if (esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) {
+ if (!esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) {
throw Status.INVALID_ARGUMENT
.withDescription("Cannot connect to " + config.getUrl())
.asRuntimeException();
@@ -96,7 +96,7 @@ public void validate(
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter opensearchClient =
new OpensearchRestHighLevelClientAdapter(host, config);
- if (opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) {
+ if (!opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) {
throw Status.INVALID_ARGUMENT
.withDescription("Cannot connect to " + config.getUrl())
.asRuntimeException();