diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
index 679deedebcabf..73f0799e44d1d 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java
@@ -40,6 +40,7 @@ public static SinkFactory getSinkFactory(String sinkName) {
case "jdbc":
return new JDBCSinkFactory();
case "elasticsearch":
+ case "opensearch":
return new EsSinkFactory();
case "cassandra":
return new CassandraFactory();
diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml
index 14b1c7bd65fc0..d3d47b0bc4571 100644
--- a/java/connector-node/risingwave-connector-test/pom.xml
+++ b/java/connector-node/risingwave-connector-test/pom.xml
@@ -128,13 +128,13 @@
com.fasterxml.jackson.core
jackson-databind
- ${jackson.version}
+ 2.13.5
test
com.fasterxml.jackson.core
jackson-core
- ${jackson.version}
+ 2.13.5
test
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
index 509f71ec1e569..d2873fac9d216 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.risingwave.connector.ElasticRestHighLevelClientAdapter;
import com.risingwave.connector.EsSink;
import com.risingwave.connector.EsSinkConfig;
import com.risingwave.connector.api.TableSchema;
@@ -28,10 +29,10 @@
import com.risingwave.proto.Data.Op;
import java.io.IOException;
import java.util.Map;
+import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -52,14 +53,14 @@ static TableSchema getTestTableSchema() {
public void testEsSink(ElasticsearchContainer container, String username, String password)
throws IOException {
- EsSink sink =
- new EsSink(
- new EsSinkConfig(container.getHttpHostAddress())
- .withIndex("test")
- .withDelimiter("$")
- .withUsername(username)
- .withPassword(password),
- getTestTableSchema());
+ EsSinkConfig config =
+ new EsSinkConfig(container.getHttpHostAddress())
+ .withIndex("test")
+ .withDelimiter("$")
+ .withUsername(username)
+ .withPassword(password);
+ config.setConnector("elasticsearch");
+ EsSink sink = new EsSink(config, getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(
@@ -74,7 +75,9 @@ public void testEsSink(ElasticsearchContainer container, String username, String
fail(e.getMessage());
}
- RestHighLevelClient client = sink.getClient();
+ HttpHost host = HttpHost.create(config.getUrl());
+ ElasticRestHighLevelClientAdapter client =
+ new ElasticRestHighLevelClientAdapter(host, config);
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml
index 9c8515098d7d8..4ff4bd76ef109 100644
--- a/java/connector-node/risingwave-sink-es-7/pom.xml
+++ b/java/connector-node/risingwave-sink-es-7/pom.xml
@@ -51,6 +51,14 @@
org.elasticsearch.client
elasticsearch-rest-high-level-client
+
+ org.opensearch
+ opensearch
+
+
+ org.opensearch.client
+ opensearch-rest-high-level-client
+
org.apache.httpcomponents
httpclient
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java
new file mode 100644
index 0000000000000..4ce1165ba1baf
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.risingwave.connector.EsSink.RequestTracker;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bulk callback that serves both the Elasticsearch and the OpenSearch {@code BulkProcessor}.
+ *
+ * <p>The two client libraries declare structurally identical but unrelated {@code Listener}
+ * interfaces, so every callback exists twice here. Outcomes of finished bulks are forwarded to the
+ * {@link RequestTracker}: {@code addOkResult} with the number of actions on success, {@code
+ * addErrResult} with a formatted message on failure.
+ */
+public class BulkListener
+        implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
+                org.opensearch.action.bulk.BulkProcessor.Listener {
+    // Log under this class' own category rather than the sink's.
+    private static final Logger LOG = LoggerFactory.getLogger(BulkListener.class);
+    private final RequestTracker requestTracker;
+
+    /**
+     * @param requestTracker tracker that receives the result of every completed bulk
+     */
+    public BulkListener(RequestTracker requestTracker) {
+        this.requestTracker = requestTracker;
+    }
+
+    /** Reports a failed bulk of {@code actions} requests to the tracker. */
+    private void recordFailure(int actions, String failure) {
+        String errMessage =
+                String.format("Bulk of %d actions failed. Failure: %s", actions, failure);
+        this.requestTracker.addErrResult(errMessage);
+    }
+
+    // ---------------------------------------------------------------- Elasticsearch callbacks
+
+    /** Called before an Elasticsearch bulk is sent; only logs the bulk size. */
+    @Override
+    public void beforeBulk(long executionId, BulkRequest request) {
+        LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
+    }
+
+    /**
+     * Called when an Elasticsearch bulk returned a response. A response that reports failures is
+     * recorded as an error, otherwise the number of actions is recorded as successful.
+     */
+    @Override
+    public void afterBulk(
+            long executionId,
+            BulkRequest request,
+            org.elasticsearch.action.bulk.BulkResponse response) {
+        if (response.hasFailures()) {
+            recordFailure(request.numberOfActions(), response.buildFailureMessage());
+        } else {
+            this.requestTracker.addOkResult(request.numberOfActions());
+            LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
+        }
+    }
+
+    /** This method is called when the bulk failed and raised a Throwable */
+    @Override
+    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+        recordFailure(request.numberOfActions(), failure.getMessage());
+    }
+
+    // ------------------------------------------------------------------- OpenSearch callbacks
+
+    /** Called before an OpenSearch bulk is sent; only logs the bulk size. */
+    @Override
+    public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
+        LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
+    }
+
+    /**
+     * Called when an OpenSearch bulk returned a response. A response that reports failures is
+     * recorded as an error, otherwise the number of actions is recorded as successful.
+     */
+    @Override
+    public void afterBulk(
+            long executionId,
+            org.opensearch.action.bulk.BulkRequest request,
+            org.opensearch.action.bulk.BulkResponse response) {
+        if (response.hasFailures()) {
+            recordFailure(request.numberOfActions(), response.buildFailureMessage());
+        } else {
+            this.requestTracker.addOkResult(request.numberOfActions());
+            LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
+        }
+    }
+
+    /** Called when an OpenSearch bulk failed and raised a Throwable. */
+    @Override
+    public void afterBulk(
+            long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
+        recordFailure(request.numberOfActions(), failure.getMessage());
+    }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
similarity index 58%
rename from java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
rename to java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
index e26248b5fef74..d72ebe2833953 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java
@@ -16,14 +16,14 @@
package com.risingwave.connector;
-import java.util.function.BiConsumer;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
+import java.util.concurrent.TimeUnit;
-/**
- * {@link BulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API calls
- * across different Elasticsearch versions.
- */
-interface BulkRequestConsumerFactory
- extends BiConsumer> {}
+/**
+ * Backend-neutral view of a bulk processor, letting the sink queue row changes without referring
+ * to the request types of a particular client library.
+ */
+public interface BulkProcessorAdapter {
+    /**
+     * Queues a write of a document.
+     *
+     * @param index name of the target index
+     * @param key id of the document
+     * @param doc document source as a string
+     */
+    void addRow(String index, String key, String doc);
+
+    /**
+     * Queues the deletion of a document.
+     *
+     * @param index name of the target index
+     * @param key id of the document to delete
+     */
+    void deleteRow(String index, String key);
+
+    /** Flushes the underlying processor. */
+    void flush();
+
+    /**
+     * Closes the underlying processor, passing the given wait bound through to it.
+     *
+     * @param timeout maximum time to wait
+     * @param unit unit of {@code timeout}
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    void awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
new file mode 100644
index 0000000000000..de6ab3414f65a
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.risingwave.connector.EsSink.RequestTracker;
+import java.util.concurrent.TimeUnit;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link BulkProcessorAdapter} implementation that forwards rows to an Elasticsearch {@link
+ * BulkProcessor}.
+ *
+ * <p>Each queued request is first registered on the {@link RequestTracker} via {@code
+ * addWriteTask()}; the {@link BulkListener} installed on the processor reports the outcome of
+ * every bulk back to the same tracker.
+ *
+ * <p>The adapter takes over the client it is constructed with and closes it in {@link
+ * #awaitClose(long, TimeUnit)}, because the sink keeps no other reference to it.
+ */
+public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ElasticBulkProcessorAdapter.class);
+    BulkProcessor esBulkProcessor;
+    private final RequestTracker requestTracker;
+    // Kept so that the connection can be released together with the processor.
+    private final ElasticRestHighLevelClientAdapter client;
+
+    /**
+     * Builds the bulk processor on top of {@code client}.
+     *
+     * @param requestTracker tracker that counts queued requests and receives bulk results
+     * @param client client used to send the bulks; closed by {@link #awaitClose(long, TimeUnit)}
+     */
+    public ElasticBulkProcessorAdapter(
+            RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
+        BulkProcessor.Builder builder =
+                BulkProcessor.builder(
+                        (bulkRequest, bulkResponseActionListener) ->
+                                client.bulkAsync(
+                                        bulkRequest,
+                                        RequestOptions.DEFAULT,
+                                        bulkResponseActionListener),
+                        new BulkListener(requestTracker));
+        // Possible feature: move these to config
+        // execute the bulk every 1000 requests
+        builder.setBulkActions(1000);
+        // flush the bulk every 5mb
+        builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
+        // flush the bulk every 5 seconds whatever the number of requests
+        builder.setFlushInterval(TimeValue.timeValueSeconds(5));
+        // Set the number of concurrent requests
+        builder.setConcurrentRequests(1);
+        // Set a custom backoff policy which will initially wait for 100ms, increase exponentially
+        // and retries up to three times.
+        builder.setBackoffPolicy(
+                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
+        this.esBulkProcessor = builder.build();
+        this.requestTracker = requestTracker;
+        this.client = client;
+    }
+
+    /** Flushes the underlying Elasticsearch bulk processor. */
+    @Override
+    public void flush() {
+        esBulkProcessor.flush();
+    }
+
+    /**
+     * Closes the bulk processor with the given wait bound and then closes the client.
+     *
+     * <p>The client is closed in a {@code finally} block so it is released even when waiting is
+     * interrupted.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws java.io.UncheckedIOException if closing the client fails
+     */
+    @Override
+    public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+        try {
+            esBulkProcessor.awaitClose(timeout, unit);
+        } finally {
+            try {
+                client.close();
+            } catch (java.io.IOException e) {
+                // The interface only allows InterruptedException, so rethrow unchecked.
+                throw new java.io.UncheckedIOException(e);
+            }
+        }
+    }
+
+    /**
+     * Queues an update request for document {@code key} in {@code index} whose content is {@code
+     * doc} (sent as JSON); the request is marked doc-as-upsert.
+     */
+    @Override
+    public void addRow(String index, String key, String doc) {
+        UpdateRequest updateRequest =
+                new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
+        updateRequest.docAsUpsert(true);
+        // Register the pending write before it is handed to the processor.
+        this.requestTracker.addWriteTask();
+        this.esBulkProcessor.add(updateRequest);
+    }
+
+    /** Queues a delete request for document {@code key} in {@code index}. */
+    @Override
+    public void deleteRow(String index, String key) {
+        DeleteRequest deleteRequest = new DeleteRequest(index, "_doc", key);
+        // Register the pending write before it is handed to the processor.
+        this.requestTracker.addWriteTask();
+        this.esBulkProcessor.add(deleteRequest);
+    }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
new file mode 100644
index 0000000000000..c64def3bef8a7
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import java.io.IOException;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.RestHighLevelClientBuilder;
+
+/**
+ * Thin wrapper around the Elasticsearch {@link RestHighLevelClient} exposing only the calls used
+ * by the sink: {@code ping}, {@code bulkAsync}, {@code search} and {@code close}.
+ */
+public class ElasticRestHighLevelClientAdapter implements AutoCloseable {
+    RestHighLevelClient esClient;
+
+    /**
+     * Applies the connection settings from {@code config} to {@code builder}.
+     *
+     * <p>Basic-auth credentials are installed only when both a username and a password are set.
+     *
+     * @return the same {@code builder} instance
+     */
+    private static RestClientBuilder configureRestClientBuilder(
+            RestClientBuilder builder, EsSinkConfig config) {
+        // Possible config:
+        // 1. Connection path prefix
+        // 2. Username and password
+        if (config.getPassword() != null && config.getUsername() != null) {
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(
+                    AuthScope.ANY,
+                    new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+            builder.setHttpClientConfigCallback(
+                    httpClientBuilder ->
+                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+        }
+        // 3. Timeout
+        return builder;
+    }
+
+    /**
+     * Creates a client for {@code host}, configured from {@code config}.
+     *
+     * @param host address of the Elasticsearch node
+     * @param config sink configuration supplying the optional credentials
+     */
+    public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
+        // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es
+        // server.
+        this.esClient =
+                new RestHighLevelClientBuilder(
+                                configureRestClientBuilder(RestClient.builder(host), config)
+                                        .build())
+                        .setApiCompatibilityMode(true)
+                        .build();
+    }
+
+    /** Closes the wrapped client. */
+    @Override
+    public void close() throws IOException {
+        esClient.close();
+    }
+
+    /**
+     * Pings the cluster through the wrapped client.
+     *
+     * @return the result of the wrapped client's {@code ping}
+     */
+    public boolean ping(RequestOptions options) throws IOException {
+        return esClient.ping(options);
+    }
+
+    /**
+     * Submits {@code bulkRequest} asynchronously through the wrapped client.
+     *
+     * @param listener notified with the {@link BulkResponse} or the failure
+     * @return the handle returned by the wrapped client's {@code bulkAsync}
+     */
+    public Cancellable bulkAsync(
+            BulkRequest bulkRequest,
+            RequestOptions options,
+            ActionListener<BulkResponse> listener) {
+        return esClient.bulkAsync(bulkRequest, options, listener);
+    }
+
+    /** Runs {@code searchRequest} through the wrapped client and returns its response. */
+    public SearchResponse search(SearchRequest searchRequest, RequestOptions options)
+            throws IOException {
+        return this.esClient.search(searchRequest, options);
+    }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
index cc5977a9c208c..315fc800a2ef0 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java
@@ -1,16 +1,18 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.risingwave.connector;
@@ -25,25 +27,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.RestHighLevelClientBuilder;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,8 +48,7 @@ public class EsSink extends SinkWriterBase {
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";
private final EsSinkConfig config;
- private BulkProcessor bulkProcessor;
- private final RestHighLevelClient client;
+ private BulkProcessorAdapter bulkProcessor;
// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;
@@ -167,156 +149,36 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
this.requestTracker = new RequestTracker();
// ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever.
- this.client =
- new RestHighLevelClientBuilder(
- configureRestClientBuilder(RestClient.builder(host), config)
- .build())
- .setApiCompatibilityMode(true)
- .build();
- // Test connection
- try {
- boolean isConnected = this.client.ping(RequestOptions.DEFAULT);
- if (!isConnected) {
- throw Status.INVALID_ARGUMENT
- .withDescription("Cannot connect to " + config.getUrl())
- .asRuntimeException();
- }
- } catch (Exception e) {
- throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
- }
- this.bulkProcessor = createBulkProcessor(this.requestTracker);
- }
-
- private static RestClientBuilder configureRestClientBuilder(
- RestClientBuilder builder, EsSinkConfig config) {
- // Possible config:
- // 1. Connection path prefix
- // 2. Username and password
- if (config.getPassword() != null && config.getUsername() != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
- builder.setHttpClientConfigCallback(
- httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- }
- // 3. Timeout
- return builder;
- }
-
- private BulkProcessor.Builder applyBulkConfig(
- RestHighLevelClient client, EsSinkConfig config, BulkProcessor.Listener listener) {
- BulkProcessor.Builder builder =
- BulkProcessor.builder(
- (BulkRequestConsumerFactory)
- (bulkRequest, bulkResponseActionListener) ->
- client.bulkAsync(
- bulkRequest,
- RequestOptions.DEFAULT,
- bulkResponseActionListener),
- listener);
- // Possible feature: move these to config
- // execute the bulk every 10 000 requests
- builder.setBulkActions(1000);
- // flush the bulk every 5mb
- builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
- // flush the bulk every 5 seconds whatever the number of requests
- builder.setFlushInterval(TimeValue.timeValueSeconds(5));
- // Set the number of concurrent requests
- builder.setConcurrentRequests(1);
- // Set a custom backoff policy which will initially wait for 100ms, increase exponentially
- // and retries up to three times.
- builder.setBackoffPolicy(
- BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
- return builder;
- }
-
- private BulkProcessor createBulkProcessor(RequestTracker requestTracker) {
- BulkProcessor.Builder builder =
- applyBulkConfig(this.client, this.config, new BulkListener(requestTracker));
- return builder.build();
- }
-
- private class BulkListener implements BulkProcessor.Listener {
- private final RequestTracker requestTracker;
-
- public BulkListener(RequestTracker requestTracker) {
- this.requestTracker = requestTracker;
- }
-
- /** This method is called just before bulk is executed. */
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
-
- /** This method is called after bulk execution. */
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- if (response.hasFailures()) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), response.buildFailureMessage());
- this.requestTracker.addErrResult(errMessage);
- } else {
- this.requestTracker.addOkResult(request.numberOfActions());
- LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
- }
- }
-
- /** This method is called when the bulk failed and raised a Throwable */
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- String errMessage =
- String.format(
- "Bulk of %d actions failed. Failure: %s",
- request.numberOfActions(), failure.getMessage());
- this.requestTracker.addErrResult(errMessage);
- }
- }
-
- private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
- final String index = (String) row.get(0);
- final String key = (String) row.get(1);
- String doc = (String) row.get(2);
-
- UpdateRequest updateRequest;
- if (config.getIndex() != null) {
- updateRequest =
- new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
+ if (config.getConnector().equals("elasticsearch")) {
+ ElasticRestHighLevelClientAdapter client =
+ new ElasticRestHighLevelClientAdapter(host, config);
+ this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
+ } else if (config.getConnector().equals("opensearch")) {
+ OpensearchRestHighLevelClientAdapter client =
+ new OpensearchRestHighLevelClientAdapter(host, config);
+ this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
} else {
- updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
+ throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
- updateRequest.docAsUpsert(true);
- this.requestTracker.addWriteTask();
- bulkProcessor.add(updateRequest);
}
- private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
- final String index = (String) row.get(0);
+ private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(1);
-
- DeleteRequest deleteRequest;
- if (config.getIndex() != null) {
- deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
+ String doc = (String) row.get(2);
+ final String index;
+ if (config.getIndex() == null) {
+ index = (String) row.get(0);
} else {
- deleteRequest = new DeleteRequest(index, "_doc", key);
+ index = config.getIndex();
}
- this.requestTracker.addWriteTask();
- bulkProcessor.add(deleteRequest);
- }
-
- private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException {
switch (row.getOp()) {
case INSERT:
case UPDATE_INSERT:
- processUpsert(row);
+ this.bulkProcessor.addRow(index, key, doc);
break;
case DELETE:
case UPDATE_DELETE:
- processDelete(row);
+ this.bulkProcessor.deleteRow(index, key);
break;
default:
throw Status.INVALID_ARGUMENT
@@ -353,15 +215,10 @@ public void sync() {
public void drop() {
try {
bulkProcessor.awaitClose(100, TimeUnit.SECONDS);
- client.close();
} catch (Exception e) {
throw io.grpc.Status.INTERNAL
.withDescription(String.format(ERROR_REPORT_TEMPLATE, e.getMessage()))
.asRuntimeException();
}
}
-
- public RestHighLevelClient getClient() {
- return client;
- }
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
index f3fa3bfa16c3b..03e888a892df3 100644
--- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java
@@ -1,16 +1,18 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.risingwave.connector;
@@ -23,17 +25,8 @@
import com.risingwave.proto.Catalog;
import com.risingwave.proto.Data;
import io.grpc.Status;
-import java.io.IOException;
import java.util.Map;
import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,34 +83,30 @@ public void validate(
}
// 2. check connection
- RestClientBuilder builder = RestClient.builder(host);
- if (config.getPassword() != null && config.getUsername() != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
- builder.setHttpClientConfigCallback(
- httpClientBuilder ->
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- }
- RestHighLevelClient client = new RestHighLevelClient(builder);
- // Test connection
try {
- boolean isConnected = client.ping(RequestOptions.DEFAULT);
- if (!isConnected) {
- throw Status.INVALID_ARGUMENT
- .withDescription("Cannot connect to " + config.getUrl())
- .asRuntimeException();
+ if (config.getConnector().equals("elasticsearch")) {
+ ElasticRestHighLevelClientAdapter esClient =
+ new ElasticRestHighLevelClientAdapter(host, config);
+ if (!esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+ esClient.close();
+ } else if (config.getConnector().equals("opensearch")) {
+ OpensearchRestHighLevelClientAdapter opensearchClient =
+ new OpensearchRestHighLevelClientAdapter(host, config);
+ if (!opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Cannot connect to " + config.getUrl())
+ .asRuntimeException();
+ }
+ opensearchClient.close();
+ } else {
+ throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
} catch (Exception e) {
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
-
- // 3. close client
- try {
- client.close();
- } catch (IOException e) {
- throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
- }
}
}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
new file mode 100644
index 0000000000000..d5d8cdc3d237d
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import com.risingwave.connector.EsSink.RequestTracker;
+import java.util.concurrent.TimeUnit;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.bulk.BulkProcessor;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link BulkProcessorAdapter} backed by the OpenSearch {@link BulkProcessor}.
+ *
+ * <p>Each row operation is turned into an OpenSearch request, registered with the {@link
+ * RequestTracker} and added to the bulk processor, which sends its batches through {@link
+ * OpensearchRestHighLevelClientAdapter#bulkAsync}.
+ */
+public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter {
+    // Use this class as the logger category rather than EsSink, so log lines are attributed
+    // to the OpenSearch adapter.
+    private static final Logger LOG =
+            LoggerFactory.getLogger(OpensearchBulkProcessorAdapter.class);
+    private final RequestTracker requestTracker;
+    BulkProcessor opensearchBulkProcessor;
+
+    /**
+     * Builds the underlying bulk processor.
+     *
+     * @param requestTracker tracker notified of every request added by this adapter; it is also
+     *     passed to the {@code BulkListener} attached to the processor
+     * @param client client adapter used to submit the bulk requests asynchronously
+     */
+    public OpensearchBulkProcessorAdapter(
+            RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) {
+        BulkProcessor.Builder builder =
+                BulkProcessor.builder(
+                        (bulkRequest, bulkResponseActionListener) ->
+                                client.bulkAsync(
+                                        bulkRequest,
+                                        RequestOptions.DEFAULT,
+                                        bulkResponseActionListener),
+                        new BulkListener(requestTracker));
+        // Possible feature: move these to config
+        // execute the bulk every 1000 requests
+        builder.setBulkActions(1000);
+        // flush the bulk every 5mb
+        builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
+        // flush the bulk every 5 seconds whatever the number of requests
+        builder.setFlushInterval(TimeValue.timeValueSeconds(5));
+        // Set the number of concurrent requests
+        builder.setConcurrentRequests(1);
+        // Set a custom backoff policy which will initially wait for 100ms, increase exponentially
+        // and retries up to three times.
+        builder.setBackoffPolicy(
+                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
+        this.opensearchBulkProcessor = builder.build();
+        this.requestTracker = requestTracker;
+    }
+
+    /** Flushes the requests currently buffered in the bulk processor. */
+    @Override
+    public void flush() {
+        opensearchBulkProcessor.flush();
+    }
+
+    /**
+     * Delegates to {@link BulkProcessor#awaitClose(long, TimeUnit)}; the value returned by the
+     * processor is discarded.
+     *
+     * @param timeout maximum time to wait
+     * @param unit unit of {@code timeout}
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    @Override
+    public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+        opensearchBulkProcessor.awaitClose(timeout, unit);
+    }
+
+    /**
+     * Adds an upsert of the JSON document {@code doc} with id {@code key} in {@code index}.
+     *
+     * @param index target index name
+     * @param key document id
+     * @param doc document body, encoded as JSON
+     */
+    @Override
+    public void addRow(String index, String key, String doc) {
+        UpdateRequest updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON);
+        // Insert the document when it does not exist yet instead of failing the update.
+        updateRequest.docAsUpsert(true);
+        // Register the write with the tracker before handing the request to the processor.
+        this.requestTracker.addWriteTask();
+        this.opensearchBulkProcessor.add(updateRequest);
+    }
+
+    /**
+     * Adds a delete of the document with id {@code key} in {@code index}.
+     *
+     * @param index target index name
+     * @param key document id
+     */
+    @Override
+    public void deleteRow(String index, String key) {
+        DeleteRequest deleteRequest = new DeleteRequest(index, key);
+        // Register the write with the tracker before handing the request to the processor.
+        this.requestTracker.addWriteTask();
+        this.opensearchBulkProcessor.add(deleteRequest);
+    }
+}
diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
new file mode 100644
index 0000000000000..5f3773b0a91aa
--- /dev/null
+++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2024 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.risingwave.connector;
+
+import java.io.IOException;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.Cancellable;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.core.action.ActionListener;
+
+/**
+ * Thin wrapper around the OpenSearch {@link RestHighLevelClient} exposing only the operations
+ * used by the sink: {@code ping}, {@code bulkAsync} and {@code close}.
+ */
+public class OpensearchRestHighLevelClientAdapter implements AutoCloseable {
+    RestHighLevelClient opensearchClient;
+
+    /**
+     * Applies the sink configuration to {@code builder}.
+     *
+     * <p>Only username/password authentication is configured here, and only when both values
+     * are present in {@code config}. A connection path prefix and timeouts are possible future
+     * options and are not applied.
+     *
+     * @param builder REST client builder to configure
+     * @param config sink configuration providing the optional credentials
+     * @return the same {@code builder} instance
+     */
+    private static RestClientBuilder configureRestClientBuilder(
+            RestClientBuilder builder, EsSinkConfig config) {
+        if (config.getPassword() != null && config.getUsername() != null) {
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(
+                    AuthScope.ANY,
+                    new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+            builder.setHttpClientConfigCallback(
+                    httpClientBuilder ->
+                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+        }
+        return builder;
+    }
+
+    /**
+     * Creates a client for {@code host}, configured from {@code config}.
+     *
+     * @param host OpenSearch host to connect to
+     * @param config sink configuration (credentials are read from it when set)
+     */
+    public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
+        this.opensearchClient =
+                new RestHighLevelClient(
+                        configureRestClientBuilder(
+                                org.opensearch.client.RestClient.builder(host), config));
+    }
+
+    /**
+     * Closes the wrapped client.
+     *
+     * @throws IOException if closing the wrapped client fails
+     */
+    @Override
+    public void close() throws IOException {
+        opensearchClient.close();
+    }
+
+    /**
+     * Delegates to {@link RestHighLevelClient#ping(RequestOptions)}.
+     *
+     * @param options request options forwarded to the client
+     * @return the value returned by the wrapped client's {@code ping}
+     * @throws IOException if the request fails
+     */
+    public boolean ping(RequestOptions options) throws IOException {
+        return opensearchClient.ping(options);
+    }
+
+    /**
+     * Submits {@code bulkRequest} asynchronously through the wrapped client.
+     *
+     * @param bulkRequest requests to execute
+     * @param options request options forwarded to the client
+     * @param listener callback handed to the wrapped client for the bulk response
+     * @return the {@link Cancellable} handle returned by the wrapped client
+     */
+    public Cancellable bulkAsync(
+            BulkRequest bulkRequest,
+            RequestOptions options,
+            ActionListener<BulkResponse> listener) {
+        return opensearchClient.bulkAsync(bulkRequest, options, listener);
+    }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 5f0327bf8ffc9..44ff36fe8de0a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -75,10 +75,11 @@
1.10.0
3.12.0
2.4.2.Final
- 2.13.5
+ 2.15.0
3.3.1
3.3.3
7.17.19
+ 2.11.1
4.15.0
1.18.0
1.17.6
@@ -195,6 +196,22 @@
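                 <artifactId>elasticsearch-rest-high-level-client</artifactId>
                 <version>${elasticsearch.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opensearch</groupId>
+                <artifactId>opensearch</artifactId>
+                <version>${opensearch.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.opensearch.client</groupId>
+                <artifactId>opensearch-rest-high-level-client</artifactId>
+                <version>${opensearch.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.httpcomponents</groupId>
+                        <artifactId>httpcore-nio</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <dependency>
                 <groupId>io.grpc</groupId>
                 <artifactId>grpc-netty-shaded</artifactId>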
diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs
index 236f90823c505..3d51e48201c94 100644
--- a/src/connector/src/sink/elasticsearch.rs
+++ b/src/connector/src/sink/elasticsearch.rs
@@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText};
use serde_json::Value;
use super::encoder::{JsonEncoder, RowEncoder};
-use super::remote::ElasticSearchSink;
+use super::remote::{ElasticSearchSink, OpensearchSink};
use crate::sink::{Result, Sink};
pub const ES_OPTION_DELIMITER: &str = "delimiter";
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
@@ -40,7 +40,7 @@ impl StreamChunkConverter {
pk_indices: &Vec,
properties: &BTreeMap,
) -> Result {
- if sink_name == ElasticSearchSink::SINK_NAME {
+ if is_es_sink(sink_name) {
let index_column = properties
.get(ES_OPTION_INDEX_COLUMN)
.cloned()
@@ -170,3 +170,7 @@ impl EsStreamChunkConverter {
(self.fn_build_id)(row)
}
}
+
+/// Returns `true` when `sink_name` equals the sink name of either [`ElasticSearchSink`] or
+/// [`OpensearchSink`].
+pub fn is_es_sink(sink_name: &str) -> bool {
+    [ElasticSearchSink::SINK_NAME, OpensearchSink::SINK_NAME].contains(&sink_name)
+}
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 8102f03355e8f..cd9fdcd5ef7cd 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -93,6 +93,7 @@ macro_rules! for_all_sinks {
{ Nats, $crate::sink::nats::NatsSink },
{ Jdbc, $crate::sink::remote::JdbcSink },
{ ElasticSearch, $crate::sink::remote::ElasticSearchSink },
+ { Opensearch, $crate::sink::remote::OpensearchSink },
{ Cassandra, $crate::sink::remote::CassandraSink },
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index f8b84fc64eb86..863ca02f69d59 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -58,7 +58,7 @@ use tokio::task::spawn_blocking;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;
-use super::elasticsearch::{StreamChunkConverter, ES_OPTION_DELIMITER};
+use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER};
use crate::error::ConnectorResult;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::coordinate::CoordinatedSinkWriter;
@@ -73,6 +73,7 @@ macro_rules! def_remote_sink {
() => {
def_remote_sink! {
{ ElasticSearch, ElasticSearchSink, "elasticsearch" }
+ { Opensearch, OpensearchSink, "opensearch"}
{ Cassandra, CassandraSink, "cassandra" }
{ Jdbc, JdbcSink, "jdbc", |desc| {
desc.sink_type.is_append_only()
@@ -164,7 +165,7 @@ impl Sink for RemoteSink {
}
async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> {
- if sink_name == ElasticSearchSink::SINK_NAME
+ if is_es_sink(sink_name)
&& param.downstream_pk.len() > 1
&& !param.properties.contains_key(ES_OPTION_DELIMITER)
{
@@ -189,7 +190,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
| DataType::Jsonb
| DataType::Bytea => Ok(()),
DataType::List(list) => {
- if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
+ if is_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
Ok(())
} else{
Err(SinkError::Remote(anyhow!(
@@ -200,7 +201,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
}
},
DataType::Struct(_) => {
- if sink_name==ElasticSearchSink::SINK_NAME{
+ if is_es_sink(sink_name){
Ok(())
}else{
Err(SinkError::Remote(anyhow!(
@@ -263,7 +264,7 @@ impl RemoteLogSinker {
sink_name: &str,
) -> Result {
let sink_proto = sink_param.to_proto();
- let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME {
+ let payload_schema = if is_es_sink(sink_name) {
let columns = vec![
ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(),
ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),