Skip to content

Commit

Permalink
feat(sink): refator es sink to support opensearch (#16330) (#17316)
Browse files Browse the repository at this point in the history
Co-authored-by: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and xxhZs authored Jun 27, 2024
1 parent e56299d commit 6e3d70c
Showing 16 changed files with 579 additions and 252 deletions.
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ public static SinkFactory getSinkFactory(String sinkName) {
case "jdbc":
return new JDBCSinkFactory();
case "elasticsearch":
case "opensearch":
return new EsSinkFactory();
case "cassandra":
return new CassandraFactory();
4 changes: 2 additions & 2 deletions java/connector-node/risingwave-connector-test/pom.xml
Original file line number Diff line number Diff line change
@@ -128,13 +128,13 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<version>2.13.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<version>2.13.5</version>
<scope>test</scope>
</dependency>

Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.risingwave.connector.ElasticRestHighLevelClientAdapter;
import com.risingwave.connector.EsSink;
import com.risingwave.connector.EsSinkConfig;
import com.risingwave.connector.api.TableSchema;
@@ -28,10 +29,10 @@
import com.risingwave.proto.Data.Op;
import java.io.IOException;
import java.util.Map;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@@ -52,14 +53,14 @@ static TableSchema getTestTableSchema() {

public void testEsSink(ElasticsearchContainer container, String username, String password)
throws IOException {
EsSink sink =
new EsSink(
new EsSinkConfig(container.getHttpHostAddress())
.withIndex("test")
.withDelimiter("$")
.withUsername(username)
.withPassword(password),
getTestTableSchema());
EsSinkConfig config =
new EsSinkConfig(container.getHttpHostAddress())
.withIndex("test")
.withDelimiter("$")
.withUsername(username)
.withPassword(password);
config.setConnector("elasticsearch");
EsSink sink = new EsSink(config, getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(
@@ -74,7 +75,9 @@ public void testEsSink(ElasticsearchContainer container, String username, String
fail(e.getMessage());
}

RestHighLevelClient client = sink.getClient();
HttpHost host = HttpHost.create(config.getUrl());
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
8 changes: 8 additions & 0 deletions java/connector-node/risingwave-sink-es-7/pom.xml
Original file line number Diff line number Diff line change
@@ -51,6 +51,14 @@
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.opensearch</groupId>
<artifactId>opensearch</artifactId>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2024 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;

import com.risingwave.connector.EsSink.RequestTracker;
import org.elasticsearch.action.bulk.BulkRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkListener
implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
org.opensearch.action.bulk.BulkProcessor.Listener {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private final RequestTracker requestTracker;

public BulkListener(RequestTracker requestTracker) {
this.requestTracker = requestTracker;
}

@Override
public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.elasticsearch.action.bulk.BulkRequest request,
org.elasticsearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
}

/** This method is called when the bulk failed and raised a Throwable */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}

@Override
public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.opensearch.action.bulk.BulkRequest request,
org.opensearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
}
}

@Override
public void afterBulk(
long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}
}
Original file line number Diff line number Diff line change
@@ -16,14 +16,14 @@

package com.risingwave.connector;

import java.util.function.BiConsumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import java.util.concurrent.TimeUnit;

/**
* {@link BulkRequestConsumerFactory} is used to bridge incompatible Elasticsearch Java API calls
* across different Elasticsearch versions.
*/
interface BulkRequestConsumerFactory
extends BiConsumer<BulkRequest, ActionListener<BulkResponse>> {}
public interface BulkProcessorAdapter {
public void addRow(String index, String key, String doc);

public void deleteRow(String index, String key);

public void flush();

public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2024 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;

import com.risingwave.connector.EsSink.RequestTracker;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticBulkProcessorAdapter implements BulkProcessorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
BulkProcessor esBulkProcessor;
private final RequestTracker requestTracker;

public ElasticBulkProcessorAdapter(
RequestTracker requestTracker, ElasticRestHighLevelClientAdapter client) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) ->
client.bulkAsync(
bulkRequest,
RequestOptions.DEFAULT,
bulkResponseActionListener),
new BulkListener(requestTracker));
// Possible feature: move these to config
// execute the bulk every 10 000 requests
builder.setBulkActions(1000);
// flush the bulk every 5mb
builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
// flush the bulk every 5 seconds whatever the number of requests
builder.setFlushInterval(TimeValue.timeValueSeconds(5));
// Set the number of concurrent requests
builder.setConcurrentRequests(1);
// Set a custom backoff policy which will initially wait for 100ms, increase exponentially
// and retries up to three times.
builder.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3));
this.esBulkProcessor = builder.build();
this.requestTracker = requestTracker;
}

@Override
public void flush() {
esBulkProcessor.flush();
}

@Override
public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
esBulkProcessor.awaitClose(timeout, unit);
}

@Override
public void addRow(String index, String key, String doc) {
UpdateRequest updateRequest;
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(updateRequest);
}

@Override
public void deleteRow(String index, String key) {
DeleteRequest deleteRequest;
deleteRequest = new DeleteRequest(index, "_doc", key);
this.requestTracker.addWriteTask();
this.esBulkProcessor.add(deleteRequest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2024 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;

import java.io.IOException;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;

public class ElasticRestHighLevelClientAdapter implements AutoCloseable {
RestHighLevelClient esClient;

private static RestClientBuilder configureRestClientBuilder(
RestClientBuilder builder, EsSinkConfig config) {
// Possible config:
// 1. Connection path prefix
// 2. Username and password
if (config.getPassword() != null && config.getUsername() != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
builder.setHttpClientConfigCallback(
httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
// 3. Timeout
return builder;
}

public ElasticRestHighLevelClientAdapter(HttpHost host, EsSinkConfig config) {
this.esClient =
new RestHighLevelClientBuilder(
configureRestClientBuilder(RestClient.builder(host), config)
.build())
.setApiCompatibilityMode(true)
.build();
}

@Override
public void close() throws IOException {
esClient.close();
}

public boolean ping(RequestOptions options) throws IOException {
boolean flag = esClient.ping(options);
return flag;
}

public Cancellable bulkAsync(
BulkRequest bulkRequest,
RequestOptions options,
ActionListener<BulkResponse> listener) {
Cancellable cancellable = esClient.bulkAsync(bulkRequest, options, listener);
return cancellable;
}

public SearchResponse search(SearchRequest searchRequest, RequestOptions options)
throws IOException {
return this.esClient.search(searchRequest, options);
}
}
Loading

0 comments on commit 6e3d70c

Please sign in to comment.