Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
fix

fix ci

fix ci
  • Loading branch information
xxhZs committed May 27, 2024
1 parent c57df29 commit 5848441
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.risingwave.proto.Data.Op;
import java.io.IOException;
import java.util.Map;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
Expand All @@ -52,15 +53,14 @@ static TableSchema getTestTableSchema() {

public void testEsSink(ElasticsearchContainer container, String username, String password)
throws IOException {
EsSink sink =
new EsSink(
new EsSinkConfig(container.getHttpHostAddress())
.setConnector("elasticsearch")
.withIndex("test")
.withDelimiter("$")
.withUsername(username)
.withPassword(password),
getTestTableSchema());
EsSinkConfig config =
new EsSinkConfig(container.getHttpHostAddress())
.withIndex("test")
.withDelimiter("$")
.withUsername(username)
.withPassword(password);
config.setConnector("elasticsearch");
EsSink sink = new EsSink(config, getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(
Expand All @@ -75,8 +75,9 @@ public void testEsSink(ElasticsearchContainer container, String username, String
fail(e.getMessage());
}

HttpHost host = HttpHost.create(config.getUrl());
ElasticRestHighLevelClientAdapter client =
(ElasticRestHighLevelClientAdapter) sink.getClient();
new ElasticRestHighLevelClientAdapter(host, config);
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2024 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;

import com.risingwave.connector.EsSink.RequestTracker;
import org.elasticsearch.action.bulk.BulkRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkListener
implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
org.opensearch.action.bulk.BulkProcessor.Listener {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private final RequestTracker requestTracker;

public BulkListener(RequestTracker requestTracker) {
this.requestTracker = requestTracker;
}

@Override
public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.elasticsearch.action.bulk.BulkRequest request,
org.elasticsearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
}

/** This method is called when the bulk failed and raised a Throwable */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}

@Override
public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.opensearch.action.bulk.BulkRequest request,
org.opensearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
}
}

@Override
public void afterBulk(
long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -90,78 +89,3 @@ public void deleteRow(String index, String key) {
this.esBulkProcessor.add(deleteRequest);
}
}

class BulkListener
implements org.elasticsearch.action.bulk.BulkProcessor.Listener,
org.opensearch.action.bulk.BulkProcessor.Listener {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private final RequestTracker requestTracker;

public BulkListener(RequestTracker requestTracker) {
this.requestTracker = requestTracker;
}

@Override
public void beforeBulk(long executionId, org.elasticsearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.elasticsearch.action.bulk.BulkRequest request,
org.elasticsearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
}

/** This method is called when the bulk failed and raised a Throwable */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}

@Override
public void beforeBulk(long executionId, org.opensearch.action.bulk.BulkRequest request) {
LOG.debug("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
}

@Override
public void afterBulk(
long executionId,
org.opensearch.action.bulk.BulkRequest request,
org.opensearch.action.bulk.BulkResponse response) {
if (response.hasFailures()) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), response.buildFailureMessage());
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Opensearch.", request.numberOfActions());
}
}

@Override
public void afterBulk(
long executionId, org.opensearch.action.bulk.BulkRequest request, Throwable failure) {
String errMessage =
String.format(
"Bulk of %d actions failed. Failure: %s",
request.numberOfActions(), failure.getMessage());
this.requestTracker.addErrResult(errMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;

public class ElasticRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
public class ElasticRestHighLevelClientAdapter implements AutoCloseable {
RestHighLevelClient esClient;

private static RestClientBuilder configureRestClientBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class EsSink extends SinkWriterBase {

private final EsSinkConfig config;
private BulkProcessorAdapter bulkProcessor;
private final RestHighLevelClientAdapter client;

// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;
Expand Down Expand Up @@ -154,12 +153,10 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
ElasticRestHighLevelClientAdapter client =
new ElasticRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new ElasticBulkProcessorAdapter(this.requestTracker, client);
this.client = client;
} else if (config.getConnector().equals("opensearch")) {
OpensearchRestHighLevelClientAdapter client =
new OpensearchRestHighLevelClientAdapter(host, config);
this.bulkProcessor = new OpensearchBulkProcessorAdapter(this.requestTracker, client);
this.client = client;
} else {
throw new RuntimeException("Sink type must be elasticsearch or opensearch");
}
Expand Down Expand Up @@ -218,15 +215,10 @@ public void sync() {
public void drop() {
try {
bulkProcessor.awaitClose(100, TimeUnit.SECONDS);
client.close();
} catch (Exception e) {
throw io.grpc.Status.INTERNAL
.withDescription(String.format(ERROR_REPORT_TEMPLATE, e.getMessage()))
.asRuntimeException();
}
}

public RestHighLevelClientAdapter getClient() {
return this.client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.core.action.ActionListener;

public class OpensearchRestHighLevelClientAdapter implements RestHighLevelClientAdapter {
public class OpensearchRestHighLevelClientAdapter implements AutoCloseable {
RestHighLevelClient opensearchClient;

private static RestClientBuilder configureRestClientBuilder(
Expand Down

This file was deleted.

4 changes: 2 additions & 2 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
| DataType::Jsonb
| DataType::Bytea => Ok(()),
DataType::List(list) => {
if (sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
if is_es_sink(sink_name) || matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){
Ok(())
} else{
Err(SinkError::Remote(anyhow!(
Expand All @@ -202,7 +202,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
}
},
DataType::Struct(_) => {
if sink_name==ElasticSearchSink::SINK_NAME || sink_name == OpensearchSink::SINK_NAME{
if is_es_sink(sink_name){
Ok(())
}else{
Err(SinkError::Remote(anyhow!(
Expand Down

0 comments on commit 5848441

Please sign in to comment.