diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index 3b58878295311..d2873fac9d216 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -29,6 +29,7 @@ import com.risingwave.proto.Data.Op; import java.io.IOException; import java.util.Map; +import org.apache.http.HttpHost; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; @@ -52,15 +53,14 @@ static TableSchema getTestTableSchema() { public void testEsSink(ElasticsearchContainer container, String username, String password) throws IOException { - EsSink sink = - new EsSink( - new EsSinkConfig(container.getHttpHostAddress()) - .setConnector("elasticsearch") - .withIndex("test") - .withDelimiter("$") - .withUsername(username) - .withPassword(password), - getTestTableSchema()); + EsSinkConfig config = + new EsSinkConfig(container.getHttpHostAddress()) + .withIndex("test") + .withDelimiter("$") + .withUsername(username) + .withPassword(password); + config.setConnector("elasticsearch"); + EsSink sink = new EsSink(config, getTestTableSchema()); sink.write( Iterators.forArray( new ArraySinkRow( @@ -75,8 +75,9 @@ public void testEsSink(ElasticsearchContainer container, String username, String fail(e.getMessage()); } + HttpHost host = HttpHost.create(config.getUrl()); ElasticRestHighLevelClientAdapter client = - (ElasticRestHighLevelClientAdapter) sink.getClient(); + new ElasticRestHighLevelClientAdapter(host, config); SearchRequest searchRequest = new SearchRequest("test"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery()); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java new file mode 100644 index 0000000000000..4ce1165ba1baf --- /dev/null +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/BulkListener.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.risingwave.connector.EsSink.RequestTracker; +import org.elasticsearch.action.bulk.BulkRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BulkListener + implements org.elasticsearch.action.bulk.BulkProcessor.Listener, + org.opensearch.action.bulk.BulkProcessor.Listener { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); + private final RequestTracker requestTracker; + + public BulkListener(RequestTracker requestTracker) { + this.requestTracker = requestTracker; + } + + @Override + public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.elasticsearch.action.bulk.BulkRequest request, + org.elasticsearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); + } + } + + /** This method is called when the bulk failed and raised a Throwable */ + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } + + @Override + public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) { + LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); + } + + @Override + public void afterBulk( + long executionId, + org.opensearch.action.bulk.BulkRequest request, + org.opensearch.action.bulk.BulkResponse response) { + if (response.hasFailures()) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), response.buildFailureMessage()); + this.requestTracker.addErrResult(errMessage); + } else { + this.requestTracker.addOkResult(request.numberOfActions()); + LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions()); + } + } + + @Override + public void afterBulk( + long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) { + String errMessage = + String.format( + "Bulk of %d actions failed. Failure: %s", + request.numberOfActions(), failure.getMessage()); + this.requestTracker.addErrResult(errMessage); + } +} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java index 8e3c57911118f..de6ab3414f65a 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticBulkProcessorAdapter.java @@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; @@ -90,78 +89,3 @@ public void deleteRow(String index, String key) { this.esBulkProcessor.add(deleteRequest); } } - -class BulkListener - implements org.elasticsearch.action.bulk.BulkProcessor.Listener, - org.opensearch.action.bulk.BulkProcessor.Listener { - private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); - private final RequestTracker requestTracker; - - public BulkListener(RequestTracker requestTracker) { - this.requestTracker = requestTracker; - } - - @Override - public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - - @Override - public void afterBulk( - long executionId, - org.elasticsearch.action.bulk.BulkRequest request, - org.elasticsearch.action.bulk.BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions()); - } - } - - /** This method is called when the bulk failed and raised a Throwable */ - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } - - @Override - public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) { - LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions()); - } - - @Override - public void afterBulk( - long executionId, - org.opensearch.action.bulk.BulkRequest request, - org.opensearch.action.bulk.BulkResponse response) { - if (response.hasFailures()) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), response.buildFailureMessage()); - this.requestTracker.addErrResult(errMessage); - } else { - this.requestTracker.addOkResult(request.numberOfActions()); - LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions()); - } - } - - @Override - public void afterBulk( - long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) { - String errMessage = - String.format( - "Bulk of %d actions failed. Failure: %s", - request.numberOfActions(), failure.getMessage()); - this.requestTracker.addErrResult(errMessage); - } -} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java index 65e531bf8ea13..c64def3bef8a7 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/ElasticRestHighLevelClientAdapter.java @@ -34,7 +34,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClientBuilder; -public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter { +public class ElasticRestHighLevelClientAdapter implements AutoCloseable { RestHighLevelClient esClient; private static RestClientBuilder configureRestClientBuilder( diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 3b03e24899056..315fc800a2ef0 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -49,7 +49,6 @@ public class EsSink extends SinkWriterBase { private final EsSinkConfig config; private BulkProcessorAdapter bulkProcessor; - private final RestHighLevelClientAdapter client; // Used to handle the return message of ES and throw errors private final RequestTracker requestTracker; @@ -154,12 +153,10 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { ElasticRestHighLevelClientAdapter client = new ElasticRestHighLevelClientAdapter(host, config); this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client); - this.client = client; } else if (config.getConnector().equals("opensearch")) { OpensearchRestHighLevelClientAdapter client = new OpensearchRestHighLevelClientAdapter(host, config); this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client); - this.client = client; } else { throw new RuntimeException("Sink type must be elasticsearch or opensearch"); } @@ -218,15 +215,10 @@ public void sync() { public void drop() { try { bulkProcessor.awaitClose(100, TimeUnit.SECONDS); - client.close(); } catch (Exception e) { throw io.grpc.Status.INTERNAL .withDescription(String.format(ERROR_REPORT_TEMPLATE, e.getMessage())) .asRuntimeException(); } } - - public RestHighLevelClientAdapter getClient() { - return this.client; - } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java index f962564c3f2a8..5f3773b0a91aa 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/OpensearchRestHighLevelClientAdapter.java @@ -30,7 +30,7 @@ import org.opensearch.client.RestHighLevelClient; import org.opensearch.core.action.ActionListener; -public class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter { +public class OpensearchRestHighLevelClientAdapter implements AutoCloseable { RestHighLevelClient opensearchClient; private static RestClientBuilder configureRestClientBuilder( diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java deleted file mode 100644 index dacddaf3704d2..0000000000000 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/RestHighLevelClientAdapter.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.risingwave.connector; - -import java.io.IOException; - -public interface RestHighLevelClientAdapter { - public void close() throws IOException; -} diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index c45d729bec6be..cdfb0322658ec 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -191,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe | DataType::Jsonb | DataType::Bytea => Ok(()), DataType::List(list) => { - if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + if is_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ Ok(()) } else{ Err(SinkError::Remote(anyhow!( @@ -202,7 +202,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe } }, DataType::Struct(_) => { - if sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME{ + if is_es_sink(sink_name){ Ok(()) }else{ Err(SinkError::Remote(anyhow!(