From 35d3f492483c821ac43353a6e89ee696e280b9d7 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Tue, 18 Jun 2024 17:27:04 +0800 Subject: [PATCH] feat(sink): refator es sink to support opensearch (#16330) --- .../com/risingwave/connector/SinkUtils.java | 1 + .../risingwave-connector-test/pom.xml | 4 +- .../sink/elasticsearch/EsSinkTest.java | 23 +- .../risingwave-sink-es-7/pom.xml | 8 + .../risingwave/connector/BulkListener.java | 97 ++++++++ ...Factory.java => BulkProcessorAdapter.java} | 20 +- .../ElasticBulkProcessorAdapter.java | 91 ++++++++ .../ElasticRestHighLevelClientAdapter.java | 89 ++++++++ .../java/com/risingwave/connector/EsSink.java | 209 +++--------------- .../risingwave/connector/EsSinkFactory.java | 81 +++---- .../OpensearchBulkProcessorAdapter.java | 91 ++++++++ .../OpensearchRestHighLevelClientAdapter.java | 78 +++++++ java/pom.xml | 19 +- src/connector/src/sink/elasticsearch.rs | 8 +- src/connector/src/sink/mod.rs | 1 + src/connector/src/sink/remote.rs | 11 +- 16 files changed, 579 insertions(+), 252 deletions(-) create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java rename java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/{BulkRequestConsumerFactory.java => BulkProcessorAdapter.java} (58%) create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java create mode 100644 java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index 679deedebcabf..73f0799e44d1d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -40,6 +40,7 @@ public static SinkFactory getSinkFactory(String sinkName) { case "jdbc": return new JDBCSinkFactory(); case "elasticsearch": + case "opensearch": return new EsSinkFactory(); case "cassandra": return new CassandraFactory(); diff --git a/java/connector-node/risingwave-connector-test/pom.xml b/java/connector-node/risingwave-connector-test/pom.xml index 14b1c7bd65fc0..d3d47b0bc4571 100644 --- a/java/connector-node/risingwave-connector-test/pom.xml +++ b/java/connector-node/risingwave-connector-test/pom.xml @@ -128,13 +128,13 @@ com.fasterxml.jackson.core jackson-databind - ${jackson.version} + 2.13.5 test com.fasterxml.jackson.core jackson-core - ${jackson.version} + 2.13.5 test diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index 509f71ec1e569..d2873fac9d216 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.risingwave.connector.ElasticRestHighLevelClientAdapter; import com.risingwave.connector.EsSink; import com.risingwave.connector.EsSinkConfig; import com.risingwave.connector.api.TableSchema; @@ -28,10 +29,10 @@ import com.risingwave.proto.Data.Op; import java.io.IOException; import java.util.Map; +import org.apache.http.HttpHost; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -52,14 +53,14 @@ static TableSchema getTestTableSchema() { public void testEsSink(ElasticsearchContainer container, String username, String password) throws IOException { - EsSink sink = - new EsSink( - new EsSinkConfig(container.getHttpHostAddress()) - .withIndex("test") - .withDelimiter("$") - .withUsername(username) - .withPassword(password), - getTestTableSchema()); + EsSinkConfig config = + new EsSinkConfig(container.getHttpHostAddress()) + .withIndex("test") + .withDelimiter("$") + .withUsername(username) + .withPassword(password); + config.setConnector("elasticsearch"); + EsSink sink = new EsSink(config, getTestTableSchema()); sink.write( Iterators.forArray( new ArraySinkRow( @@ -74,7 +75,9 @@ public void testEsSink(ElasticsearchContainer container, String username, String fail(e.getMessage()); } - RestHighLevelClient client = sink.getClient(); + HttpHost host = HttpHost.create(config.getUrl()); + ElasticRestHighLevelClientAdapter client = + new ElasticRestHighLevelClientAdapter(host, config); SearchRequest searchRequest = new SearchRequest("test"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); diff --git a/java/connector-node/risingwave-sink-es-7/pom.xml b/java/connector-node/risingwave-sink-es-7/pom.xml index 9c8515098d7d8..4ff4bd76ef109 100644 --- a/java/connector-node/risingwave-sink-es-7/pom.xml +++ b/java/connector-node/risingwave-sink-es-7/pom.xml @@ -51,6 +51,14 @@ org.elasticsearch.client elasticsearch-rest-high-level-client + + org.opensearch + opensearch + + + org.opensearch.client + opensearch-rest-high-level-client + org.apache.httpcomponents httpclient diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java new file mode 100644 index 0000000000000..4ce1165ba1baf --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.risingwave.connector.EsSink.RequestTracker; +import org.elasticsearch.action.bulk.BulkRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BulkListener + implements org.elasticsearch.action.bulk.BulkProcessor.Listener, + org.opensearch.action.bulk.BulkProcessor.Listener { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; + + public BulkListener(RequestTracker requestTracker) { + this.requestTracker = requestTracker; + } + + @Override + public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.elasticsearch.action.bulk.BulkRequest request, + org.elasticsearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + } + + /** This method is called when the bulk failed and raised a Throwable */ + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } + + @Override + public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.opensearch.action.bulk.BulkRequest request, + org.opensearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions()); + } + } + + @Override + public void afterBulk( + long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java similarity index 58% rename from java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java rename to java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java index e26248b5fef74..d72ebe2833953 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -16,14 +16,14 @@ package com.risingwave.connector; -import java.util.function.BiConsumer; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; +import java.util.concurrent.TimeUnit; -/** - * {@link BulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API calls - * across different Elasticsearch versions. - */ -interface BulkRequestConsumerFactory - extends BiConsumer> {} +public interface BulkProcessorAdapter { + public void addRow(String index, String key, String doc); + + public void deleteRow(String index, String key); + + public void flush(); + + public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException; +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java new file mode 100644 index 0000000000000..de6ab3414f65a --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.risingwave.connector.EsSink.RequestTracker; +import java.util.concurrent.TimeUnit; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.XContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + BulkProcessor esBulkProcessor; + private final RequestTracker requestTracker; + + public ElasticBulkProcessorAdapter( + RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) { + BulkProcessor.Builder builder = + BulkProcessor.builder( + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), + new BulkListener(requestTracker)); + // Possible feature: move these to config + // execute the bulk every 10 000 requests + builder.setBulkActions(1000); + // flush the bulk every 5mb + builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every 5 seconds whatever the number of requests + builder.setFlushInterval(TimeValue.timeValueSeconds(5)); + // Set the number of concurrent requests + builder.setConcurrentRequests(1); + // Set a custom backoff policy which will initially wait for 100ms, increase exponentially + // and retries up to three times. + builder.setBackoffPolicy( + BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); + this.esBulkProcessor = builder.build(); + this.requestTracker = requestTracker; + } + + @Override + public void flush() { + esBulkProcessor.flush(); + } + + @Override + public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException { + esBulkProcessor.awaitClose(timeout, unit); + } + + @Override + public void addRow(String index, String key, String doc) { + UpdateRequest updateRequest; + updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); + updateRequest.docAsUpsert(true); + this.requestTracker.addWriteTask(); + this.esBulkProcessor.add(updateRequest); + } + + @Override + public void deleteRow(String index, String key) { + DeleteRequest deleteRequest; + deleteRequest = new DeleteRequest(index, "_doc", key); + this.requestTracker.addWriteTask(); + this.esBulkProcessor.add(deleteRequest); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java new file mode 100644 index 0000000000000..c64def3bef8a7 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.RestHighLevelClientBuilder; + +public class ElasticRestHighLevelClientAdapter implements AutoCloseable { + RestHighLevelClient esClient; + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.esClient = + new RestHighLevelClientBuilder( + configureRestClientBuilder(RestClient.builder(host), config) + .build()) + .setApiCompatibilityMode(true) + .build(); + } + + @Override + public void close() throws IOException { + esClient.close(); + } + + public boolean ping(RequestOptions options) throws IOException { + boolean flag = esClient.ping(options); + return flag; + } + + public Cancellable bulkAsync( + BulkRequest bulkRequest, + RequestOptions options, + ActionListener listener) { + Cancellable cancellable = esClient.bulkAsync(bulkRequest, options, listener); + return cancellable; + } + + public SearchResponse search(SearchRequest searchRequest, RequestOptions options) + throws IOException { + return this.esClient.search(searchRequest, options); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index cc5977a9c208c..315fc800a2ef0 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -1,16 +1,18 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.risingwave.connector; @@ -25,25 +27,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.RestHighLevelClientBuilder; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +48,7 @@ public class EsSink extends SinkWriterBase { private static final String ERROR_REPORT_TEMPLATE = "Error message %s"; private final EsSinkConfig config; - private BulkProcessor bulkProcessor; - private final RestHighLevelClient client; + private BulkProcessorAdapter bulkProcessor; // Used to handle the return message of ES and throw errors private final RequestTracker requestTracker; @@ -167,156 +149,36 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { this.requestTracker = new RequestTracker(); // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever. - this.client = - new RestHighLevelClientBuilder( - configureRestClientBuilder(RestClient.builder(host), config) - .build()) - .setApiCompatibilityMode(true) - .build(); - // Test connection - try { - boolean isConnected = this.client.ping(RequestOptions.DEFAULT); - if (!isConnected) { - throw Status.INVALID_ARGUMENT - .withDescription("Cannot connect to " + config.getUrl()) - .asRuntimeException(); - } - } catch (Exception e) { - throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); - } - this.bulkProcessor = createBulkProcessor(this.requestTracker); - } - - private static RestClientBuilder configureRestClientBuilder( - RestClientBuilder builder, EsSinkConfig config) { - // Possible config: - // 1. Connection path prefix - // 2. Username and password - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - // 3. Timeout - return builder; - } - - private BulkProcessor.Builder applyBulkConfig( - RestHighLevelClient client, EsSinkConfig config, BulkProcessor.Listener listener) { - BulkProcessor.Builder builder = - BulkProcessor.builder( - (BulkRequestConsumerFactory) - (bulkRequest, bulkResponseActionListener) -> - client.bulkAsync( - bulkRequest, - RequestOptions.DEFAULT, - bulkResponseActionListener), - listener); - // Possible feature: move these to config - // execute the bulk every 10 000 requests - builder.setBulkActions(1000); - // flush the bulk every 5mb - builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); - // flush the bulk every 5 seconds whatever the number of requests - builder.setFlushInterval(TimeValue.timeValueSeconds(5)); - // Set the number of concurrent requests - builder.setConcurrentRequests(1); - // Set a custom backoff policy which will initially wait for 100ms, increase exponentially - // and retries up to three times. - builder.setBackoffPolicy( - BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); - return builder; - } - - private BulkProcessor createBulkProcessor(RequestTracker requestTracker) { - BulkProcessor.Builder builder = - applyBulkConfig(this.client, this.config, new BulkListener(requestTracker)); - return builder.build(); - } - - private class BulkListener implements BulkProcessor.Listener { - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - /** This method is called just before bulk is executed. */ - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - /** This method is called after bulk execution. */ - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } - } - - private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - String doc = (String) row.get(2); - - UpdateRequest updateRequest; - if (config.getIndex() != null) { - updateRequest = - new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON); + if (config.getConnector().equals("elasticsearch")) { + ElasticRestHighLevelClientAdapter client = + new ElasticRestHighLevelClientAdapter(host, config); + this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client); + } else if (config.getConnector().equals("opensearch")) { + OpensearchRestHighLevelClientAdapter client = + new OpensearchRestHighLevelClientAdapter(host, config); + this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client); } else { - updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); + throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } - updateRequest.docAsUpsert(true); - this.requestTracker.addWriteTask(); - bulkProcessor.add(updateRequest); } - private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException { - final String index = (String) row.get(0); + private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { final String key = (String) row.get(1); - - DeleteRequest deleteRequest; - if (config.getIndex() != null) { - deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key); + String doc = (String) row.get(2); + final String index; + if (config.getIndex() == null) { + index = (String) row.get(0); } else { - deleteRequest = new DeleteRequest(index, "_doc", key); + index = config.getIndex(); } - this.requestTracker.addWriteTask(); - bulkProcessor.add(deleteRequest); - } - - private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { switch (row.getOp()) { case INSERT: case UPDATE_INSERT: - processUpsert(row); + this.bulkProcessor.addRow(index, key, doc); break; case DELETE: case UPDATE_DELETE: - processDelete(row); + this.bulkProcessor.deleteRow(index, key); break; default: throw Status.INVALID_ARGUMENT @@ -353,15 +215,10 @@ public void sync() { public void drop() { try { bulkProcessor.awaitClose(100, TimeUnit.SECONDS); - client.close(); } catch (Exception e) { throw io.grpc.Status.INTERNAL .withDescription(String.format(ERROR_REPORT_TEMPLATE, e.getMessage())) .asRuntimeException(); } } - - public RestHighLevelClient getClient() { - return client; - } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index f3fa3bfa16c3b..03e888a892df3 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -1,16 +1,18 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.risingwave.connector; @@ -23,17 +25,8 @@ import com.risingwave.proto.Catalog; import com.risingwave.proto.Data; import io.grpc.Status; -import java.io.IOException; import java.util.Map; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,34 +83,30 @@ public void validate( } // 2. check connection - RestClientBuilder builder = RestClient.builder(host); - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - RestHighLevelClient client = new RestHighLevelClient(builder); - // Test connection try { - boolean isConnected = client.ping(RequestOptions.DEFAULT); - if (!isConnected) { - throw Status.INVALID_ARGUMENT - .withDescription("Cannot connect to " + config.getUrl()) - .asRuntimeException(); + if (config.getConnector().equals("elasticsearch")) { + ElasticRestHighLevelClientAdapter esClient = + new ElasticRestHighLevelClientAdapter(host, config); + if (!esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) { + throw Status.INVALID_ARGUMENT + .withDescription("Cannot connect to " + config.getUrl()) + .asRuntimeException(); + } + esClient.close(); + } else if (config.getConnector().equals("opensearch")) { + OpensearchRestHighLevelClientAdapter opensearchClient = + new OpensearchRestHighLevelClientAdapter(host, config); + if (!opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) { + throw Status.INVALID_ARGUMENT + .withDescription("Cannot connect to " + config.getUrl()) + .asRuntimeException(); + } + opensearchClient.close(); + } else { + throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } } catch (Exception e) { throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); } - - // 3. close client - try { - client.close(); - } catch (IOException e) { - throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); - } } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java new file mode 100644 index 0000000000000..d5d8cdc3d237d --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.risingwave.connector.EsSink.RequestTracker; +import java.util.concurrent.TimeUnit; +import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.action.bulk.BulkProcessor; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.client.RequestOptions; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; + BulkProcessor opensearchBulkProcessor; + + public OpensearchBulkProcessorAdapter( + RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) { + BulkProcessor.Builder builder = + BulkProcessor.builder( + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), + new BulkListener(requestTracker)); + // Possible feature: move these to config + // execute the bulk every 10 000 requests + builder.setBulkActions(1000); + // flush the bulk every 5mb + builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); + // flush the bulk every 5 seconds whatever the number of requests + builder.setFlushInterval(TimeValue.timeValueSeconds(5)); + // Set the number of concurrent requests + builder.setConcurrentRequests(1); + // Set a custom backoff policy which will initially wait for 100ms, increase exponentially + // and retries up to three times. + builder.setBackoffPolicy( + BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); + this.opensearchBulkProcessor = builder.build(); + this.requestTracker = requestTracker; + } + + @Override + public void flush() { + opensearchBulkProcessor.flush(); + } + + @Override + public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException { + opensearchBulkProcessor.awaitClose(timeout, unit); + } + + @Override + public void addRow(String index, String key, String doc) { + UpdateRequest updateRequest; + updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); + updateRequest.docAsUpsert(true); + this.requestTracker.addWriteTask(); + this.opensearchBulkProcessor.add(updateRequest); + } + + @Override + public void deleteRow(String index, String key) { + DeleteRequest deleteRequest; + deleteRequest = new DeleteRequest(index, key); + this.requestTracker.addWriteTask(); + this.opensearchBulkProcessor.add(deleteRequest); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java new file mode 100644 index 0000000000000..5f3773b0a91aa --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.Cancellable; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.core.action.ActionListener; + +public class OpensearchRestHighLevelClientAdapter implements AutoCloseable { + RestHighLevelClient opensearchClient; + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.opensearchClient = + new org.opensearch.client.RestHighLevelClient( + configureRestClientBuilder( + org.opensearch.client.RestClient.builder(host), config)); + } + + @Override + public void close() throws IOException { + opensearchClient.close(); + } + + public boolean ping(org.opensearch.client.RequestOptions options) throws IOException { + boolean flag = opensearchClient.ping(options); + return flag; + } + + public Cancellable bulkAsync( + BulkRequest bulkRequest, + RequestOptions options, + ActionListener listener) { + Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener); + return cancellable; + } +} diff --git a/java/pom.xml b/java/pom.xml index 5f0327bf8ffc9..44ff36fe8de0a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -75,10 +75,11 @@ 1.10.0 3.12.0 2.4.2.Final - 2.13.5 + 2.15.0 3.3.1 3.3.3 7.17.19 + 2.11.1 4.15.0 1.18.0 1.17.6 @@ -195,6 +196,22 @@ elasticsearch-rest-high-level-client ${elasticsearch.version} + + org.opensearch + opensearch + ${opensearch.version} + + + org.opensearch.client + opensearch-rest-high-level-client + ${opensearch.version} + + + org.apache.httpcomponents + httpcore-nio + + + io.grpc grpc-netty-shaded diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs index 236f90823c505..3d51e48201c94 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch.rs @@ -24,7 +24,7 @@ use risingwave_common::types::{JsonbVal, Scalar, ToText}; use serde_json::Value; use super::encoder::{JsonEncoder, RowEncoder}; -use super::remote::ElasticSearchSink; +use super::remote::{ElasticSearchSink, OpensearchSink}; use crate::sink::{Result, Sink}; pub const ES_OPTION_DELIMITER: &str = "delimiter"; pub const ES_OPTION_INDEX_COLUMN: &str = "index_column"; @@ -40,7 +40,7 @@ impl StreamChunkConverter { pk_indices: &Vec, properties: &BTreeMap, ) -> Result { - if sink_name == ElasticSearchSink::SINK_NAME { + if is_es_sink(sink_name) { let index_column = properties .get(ES_OPTION_INDEX_COLUMN) .cloned() @@ -170,3 +170,7 @@ impl EsStreamChunkConverter { (self.fn_build_id)(row) } } + +pub fn is_es_sink(sink_name: &str) -> bool { + sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 8102f03355e8f..cd9fdcd5ef7cd 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -93,6 +93,7 @@ macro_rules! for_all_sinks { { Nats, $crate::sink::nats::NatsSink }, { Jdbc, $crate::sink::remote::JdbcSink }, { ElasticSearch, $crate::sink::remote::ElasticSearchSink }, + { Opensearch, $crate::sink::remote::OpensearchSink }, { Cassandra, $crate::sink::remote::CassandraSink }, { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index f8b84fc64eb86..863ca02f69d59 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -58,7 +58,7 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; -use super::elasticsearch::{StreamChunkConverter, ES_OPTION_DELIMITER}; +use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER}; use crate::error::ConnectorResult; use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; @@ -73,6 +73,7 @@ macro_rules! def_remote_sink { () => { def_remote_sink! { { ElasticSearch, ElasticSearchSink, "elasticsearch" } + { Opensearch, OpensearchSink, "opensearch"} { Cassandra, CassandraSink, "cassandra" } { Jdbc, JdbcSink, "jdbc", |desc| { desc.sink_type.is_append_only() @@ -164,7 +165,7 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> { - if sink_name == ElasticSearchSink::SINK_NAME + if is_es_sink(sink_name) && param.downstream_pk.len() > 1 && !param.properties.contains_key(ES_OPTION_DELIMITER) { @@ -189,7 +190,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe | DataType::Jsonb | DataType::Bytea => Ok(()), DataType::List(list) => { - if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + if is_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ Ok(()) } else{ Err(SinkError::Remote(anyhow!( @@ -200,7 +201,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe } }, DataType::Struct(_) => { - if sink_name==ElasticSearchSink::SINK_NAME{ + if is_es_sink(sink_name){ Ok(()) }else{ Err(SinkError::Remote(anyhow!( @@ -263,7 +264,7 @@ impl RemoteLogSinker { sink_name: &str, ) -> Result { let sink_proto = sink_param.to_proto(); - let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME { + let payload_schema = if is_es_sink(sink_name) { let columns = vec![ ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(),