diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index f2894d2e790a2..79636d5590ca6 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -19,9 +19,9 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.risingwave.connector.ElasticRestHighLevelClientAdapter; import com.risingwave.connector.EsSink; import com.risingwave.connector.EsSinkConfig; -import com.risingwave.connector.RestHighLevelClientAdapter; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.proto.Data; @@ -74,13 +74,13 @@ public void testEsSink(ElasticsearchContainer container, String username, String fail(e.getMessage()); } - RestHighLevelClientAdapter client = sink.getClient(); + ElasticRestHighLevelClientAdapter client = + (ElasticRestHighLevelClientAdapter) sink.getClient(); SearchRequest searchRequest = new SearchRequest("test"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = - (SearchResponse) client.search(searchRequest, RequestOptions.DEFAULT); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = searchResponse.getHits(); assertEquals(2, hits.getHits().length); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java index a4acc5e1e7e7d..d72ebe2833953 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkProcessorAdapter.java @@ -16,14 +16,12 @@ package com.risingwave.connector; -import com.risingwave.connector.EsSink.RequestTracker; -import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; public interface BulkProcessorAdapter { - public void addRow(SinkRow row, String indexName, RequestTracker requestTracker); + public void addRow(String index, String key, String doc); - public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker); + public void deleteRow(String index, String key); public void flush(); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java deleted file mode 100644 index ac7fc45c6b030..0000000000000 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkRequestConsumerFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.risingwave.connector; - -import java.util.function.BiConsumer; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; - -/** - * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API - * calls across different Elasticsearch versions. - */ -interface ElasticBulkRequestConsumerFactory - extends BiConsumer> {} - -/** - * {@link ElasticBulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API - * calls across different Elasticsearch versions. - */ -interface OpensearchBulkRequestConsumerFactory - extends BiConsumer< - org.opensearch.action.bulk.BulkRequest, - org.opensearch.core.action.ActionListener< - org.opensearch.action.bulk.BulkResponse>> {} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index 16037caf4ca76..8e3c57911118f 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -17,12 +17,10 @@ package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; -import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; @@ -36,56 +34,17 @@ public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); BulkProcessor esBulkProcessor; - - private class BulkListener implements BulkProcessor.Listener { - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - /** This method is called just before bulk is executed. */ - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - /** This method is called after bulk execution. */ - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } - } + private final RequestTracker requestTracker; public ElasticBulkProcessorAdapter( RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) { BulkProcessor.Builder builder = BulkProcessor.builder( - (ElasticBulkRequestConsumerFactory) - (bulkRequest, bulkResponseActionListener) -> - client.bulkAsync( - bulkRequest, - RequestOptions.DEFAULT, - bulkResponseActionListener), + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), new BulkListener(requestTracker)); // Possible feature: move these to config // execute the bulk every 10 000 requests @@ -101,6 +60,7 @@ public ElasticBulkProcessorAdapter( builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.esBulkProcessor = builder.build(); + this.requestTracker = requestTracker; } @Override @@ -114,34 +74,94 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - String doc = (String) row.get(2); - + public void addRow(String index, String key, String doc) { UpdateRequest updateRequest; - if (indexName != null) { - updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON); - } else { - updateRequest = new UpdateRequest(indexName, "_doc", key).doc(doc, XContentType.JSON); - } + updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); updateRequest.docAsUpsert(true); - requestTracker.addWriteTask(); + this.requestTracker.addWriteTask(); this.esBulkProcessor.add(updateRequest); } @Override - public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - + public void deleteRow(String index, String key) { DeleteRequest deleteRequest; - if (indexName != null) { - deleteRequest = new DeleteRequest(indexName, "_doc", key); + deleteRequest = new DeleteRequest(index, "_doc", key); + this.requestTracker.addWriteTask(); + this.esBulkProcessor.add(deleteRequest); + } +} + +class BulkListener + implements org.elasticsearch.action.bulk.BulkProcessor.Listener, + org.opensearch.action.bulk.BulkProcessor.Listener { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; + + public BulkListener(RequestTracker requestTracker) { + this.requestTracker = requestTracker; + } + + @Override + public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.elasticsearch.action.bulk.BulkRequest request, + org.elasticsearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); } else { - deleteRequest = new DeleteRequest(index, "_doc", key); + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); } - requestTracker.addWriteTask(); - this.esBulkProcessor.add(deleteRequest); + } + + /** This method is called when the bulk failed and raised a Throwable */ + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } + + @Override + public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.opensearch.action.bulk.BulkRequest request, + org.opensearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions()); + } + } + + @Override + public void afterBulk( + long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java new file mode 100644 index 0000000000000..65e531bf8ea13 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.RestHighLevelClientBuilder; + +public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { + RestHighLevelClient esClient; + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.esClient = + new RestHighLevelClientBuilder( + configureRestClientBuilder(RestClient.builder(host), config) + .build()) + .setApiCompatibilityMode(true) + .build(); + } + + @Override + public void close() throws IOException { + esClient.close(); + } + + public boolean ping(RequestOptions options) throws IOException { + boolean flag = esClient.ping(options); + return flag; + } + + public Cancellable bulkAsync( + BulkRequest bulkRequest, + RequestOptions options, + ActionListener listener) { + Cancellable cancellable = esClient.bulkAsync(bulkRequest, options, listener); + return cancellable; + } + + public SearchResponse search(SearchRequest searchRequest, RequestOptions options) + throws IOException { + return this.esClient.search(searchRequest, options); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 613e01206a323..3b03e24899056 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -14,7 +14,6 @@ * limitations under the License. */ - package com.risingwave.connector; import com.fasterxml.jackson.core.JsonProcessingException; @@ -167,14 +166,22 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { } private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingException { + final String key = (String) row.get(1); + String doc = (String) row.get(2); + final String index; + if (config.getIndex() == null) { + index = (String) row.get(0); + } else { + index = config.getIndex(); + } switch (row.getOp()) { case INSERT: case UPDATE_INSERT: - this.bulkProcessor.addRow(row, config.getIndex(), requestTracker); + this.bulkProcessor.addRow(index, key, doc); break; case DELETE: case UPDATE_DELETE: - this.bulkProcessor.deleteRow(row, config.getIndex(), requestTracker); + this.bulkProcessor.deleteRow(index, key); break; default: throw Status.INVALID_ARGUMENT diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 7cf741a66f87c..f6bd427b3aa17 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -14,7 +14,6 @@ * limitations under the License. */ - package com.risingwave.connector; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -26,10 +25,8 @@ import com.risingwave.proto.Catalog; import com.risingwave.proto.Data; import io.grpc.Status; -import java.io.IOException; import java.util.Map; import org.apache.http.HttpHost; -import org.elasticsearch.client.RequestOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,33 +82,31 @@ public void validate( } } - RestHighLevelClientAdapter client; // 2. check connection try { - boolean isConnected; if (config.getConnector().equals("elasticsearch")) { - client = new ElasticRestHighLevelClientAdapter(host, config); - isConnected = client.ping(RequestOptions.DEFAULT); + ElasticRestHighLevelClientAdapter esClient = + new ElasticRestHighLevelClientAdapter(host, config); + if (esClient.ping(org.elasticsearch.client.RequestOptions.DEFAULT)) { + throw Status.INVALID_ARGUMENT + .withDescription("Cannot connect to " + config.getUrl()) + .asRuntimeException(); + } + esClient.close(); } else if (config.getConnector().equals("opensearch")) { - client = new OpensearchRestHighLevelClientAdapter(host, config); - isConnected = client.ping(org.opensearch.client.RequestOptions.DEFAULT); + OpensearchRestHighLevelClientAdapter opensearchClient = + new OpensearchRestHighLevelClientAdapter(host, config); + if (opensearchClient.ping(org.opensearch.client.RequestOptions.DEFAULT)) { + throw Status.INVALID_ARGUMENT + .withDescription("Cannot connect to " + config.getUrl()) + .asRuntimeException(); + } + opensearchClient.close(); } else { throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } - if (!isConnected) { - throw Status.INVALID_ARGUMENT - .withDescription("Cannot connect to " + config.getUrl()) - .asRuntimeException(); - } } catch (Exception e) { throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); } - - // 3. close client - try { - client.close(); - } catch (IOException e) { - throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); - } } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java index 4ae1c430071d1..d5d8cdc3d237d 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchBulkProcessorAdapter.java @@ -17,12 +17,9 @@ package com.risingwave.connector; import com.risingwave.connector.EsSink.RequestTracker; -import com.risingwave.connector.api.sink.SinkRow; import java.util.concurrent.TimeUnit; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.bulk.BulkProcessor; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.client.RequestOptions; @@ -35,57 +32,18 @@ public class OpensearchBulkProcessorAdapter implements BulkProcessorAdapter { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; BulkProcessor opensearchBulkProcessor; - private class BulkListener implements BulkProcessor.Listener { - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - /** This method is called just before bulk is executed. */ - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - /** This method is called after bulk execution. */ - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } - } - public OpensearchBulkProcessorAdapter( RequestTracker requestTracker, OpensearchRestHighLevelClientAdapter client) { BulkProcessor.Builder builder = BulkProcessor.builder( - (OpensearchBulkRequestConsumerFactory) - (bulkRequest, bulkResponseActionListener) -> - client.bulkAsync( - bulkRequest, - RequestOptions.DEFAULT, - bulkResponseActionListener), + (bulkRequest, bulkResponseActionListener) -> + client.bulkAsync( + bulkRequest, + RequestOptions.DEFAULT, + bulkResponseActionListener), new BulkListener(requestTracker)); // Possible feature: move these to config // execute the bulk every 10 000 requests @@ -101,6 +59,7 @@ public OpensearchBulkProcessorAdapter( builder.setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)); this.opensearchBulkProcessor = builder.build(); + this.requestTracker = requestTracker; } @Override @@ -114,34 +73,19 @@ public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException } @Override - public void addRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - String doc = (String) row.get(2); - + public void addRow(String index, String key, String doc) { UpdateRequest updateRequest; - if (indexName != null) { - updateRequest = new UpdateRequest(indexName, key).doc(doc, XContentType.JSON); - } else { - updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); - } + updateRequest = new UpdateRequest(index, key).doc(doc, XContentType.JSON); updateRequest.docAsUpsert(true); - requestTracker.addWriteTask(); + this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(updateRequest); } @Override - public void deleteRow(SinkRow row, String indexName, RequestTracker requestTracker) { - final String index = (String) row.get(0); - final String key = (String) row.get(1); - + public void deleteRow(String index, String key) { DeleteRequest deleteRequest; - if (indexName != null) { - deleteRequest = new DeleteRequest(indexName, key); - } else { - deleteRequest = new DeleteRequest(index, key); - } - requestTracker.addWriteTask(); + deleteRequest = new DeleteRequest(index, key); + this.requestTracker.addWriteTask(); this.opensearchBulkProcessor.add(deleteRequest); } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java new file mode 100644 index 0000000000000..f962564c3f2a8 --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.Cancellable; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.core.action.ActionListener; + +public class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { + RestHighLevelClient opensearchClient; + + private static RestClientBuilder configureRestClientBuilder( + RestClientBuilder builder, EsSinkConfig config) { + // Possible config: + // 1. Connection path prefix + // 2. Username and password + if (config.getPassword() != null && config.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); + builder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } + // 3. Timeout + return builder; + } + + public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { + this.opensearchClient = + new org.opensearch.client.RestHighLevelClient( + configureRestClientBuilder( + org.opensearch.client.RestClient.builder(host), config)); + } + + @Override + public void close() throws IOException { + opensearchClient.close(); + } + + public boolean ping(org.opensearch.client.RequestOptions options) throws IOException { + boolean flag = opensearchClient.ping(options); + return flag; + } + + public Cancellable bulkAsync( + BulkRequest bulkRequest, + RequestOptions options, + ActionListener listener) { + Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener); + return cancellable; + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java index d441719efe3c6..dacddaf3704d2 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java @@ -17,140 +17,7 @@ package com.risingwave.connector; import java.io.IOException; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.client.RestHighLevelClientBuilder; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.client.Cancellable; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestClient; -import org.opensearch.client.RestClientBuilder; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.core.action.ActionListener; public interface RestHighLevelClientAdapter { public void close() throws IOException; - - public boolean ping(Object options) throws IOException; - - public Object search(Object searchRequest, Object options) throws IOException; -} - -class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { - org.elasticsearch.client.RestHighLevelClient esClient; - - private static org.elasticsearch.client.RestClientBuilder configureRestClientBuilder( - org.elasticsearch.client.RestClientBuilder builder, EsSinkConfig config) { - // Possible config: - // 1. Connection path prefix - // 2. Username and password - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - // 3. Timeout - return builder; - } - - public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { - this.esClient = - new RestHighLevelClientBuilder( - configureRestClientBuilder( - org.elasticsearch.client.RestClient.builder(host), - config) - .build()) - .setApiCompatibilityMode(true) - .build(); - } - - @Override - public void close() throws IOException { - esClient.close(); - } - - @Override - public boolean ping(Object options) throws IOException { - boolean flag = esClient.ping((org.elasticsearch.client.RequestOptions) options); - return flag; - } - - public org.elasticsearch.client.Cancellable bulkAsync( - org.elasticsearch.action.bulk.BulkRequest bulkRequest, - org.elasticsearch.client.RequestOptions options, - org.elasticsearch.action.ActionListener - listener) { - org.elasticsearch.client.Cancellable cancellable = - esClient.bulkAsync(bulkRequest, options, listener); - return cancellable; - } - - @Override - public Object search(Object searchRequest, Object options) throws IOException { - return this.esClient.search( - (org.elasticsearch.action.search.SearchRequest) searchRequest, - (org.elasticsearch.client.RequestOptions) options); - } -} - -class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { - RestHighLevelClient opensearchClient; - - private static RestClientBuilder configureRestClientBuilder( - RestClientBuilder builder, EsSinkConfig config) { - // Possible config: - // 1. Connection path prefix - // 2. Username and password - if (config.getPassword() != null && config.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(config.getUsername(), config.getPassword())); - builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - // 3. Timeout - return builder; - } - - public OpensearchRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) { - this.opensearchClient = - new RestHighLevelClient( - configureRestClientBuilder(RestClient.builder(host), config)); - } - - @Override - public void close() throws IOException { - opensearchClient.close(); - } - - @Override - public boolean ping(Object options) throws IOException { - boolean flag = opensearchClient.ping((RequestOptions) options); - return flag; - } - - public Cancellable bulkAsync( - BulkRequest bulkRequest, - RequestOptions options, - ActionListener listener) { - Cancellable cancellable = opensearchClient.bulkAsync(bulkRequest, options, listener); - return cancellable; - } - - @Override - public Object search(Object searchRequest, Object options) throws IOException { - return this.opensearchClient.search( - (org.opensearch.action.search.SearchRequest) searchRequest, - (org.opensearch.client.RequestOptions) options); - } } diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs index 24f15e2537fdc..b3cfe3b9a801b 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch.rs @@ -40,7 +40,7 @@ impl StreamChunkConverter { pk_indices: &Vec, properties: &HashMap, ) -> Result { - if sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME { + if is_es_sink(sink_name) { let index_column = properties .get(ES_OPTION_INDEX_COLUMN) .cloned() @@ -170,3 +170,7 @@ impl EsStreamChunkConverter { (self.fn_build_id)(row) } } + +pub fn is_es_sink(sink_name: &str) -> bool { + sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME +} diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index cd666242af06a..c45d729bec6be 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -57,7 +57,7 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; -use super::elasticsearch::{StreamChunkConverter, ES_OPTION_DELIMITER}; +use super::elasticsearch::{is_es_sink, StreamChunkConverter, ES_OPTION_DELIMITER}; use crate::error::ConnectorResult; use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; @@ -166,7 +166,7 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorResult<()> { - if (sink_name == ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) + if is_es_sink(sink_name) && param.downstream_pk.len() > 1 && param.properties.get(ES_OPTION_DELIMITER).is_none() {