Skip to content

Commit

Permalink
feat(sink): support elasticsearch 8 sink (#12269)
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 authored and Li0k committed Sep 15, 2023
1 parent d8c3b6b commit 9d58b90
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 30 deletions.
2 changes: 1 addition & 1 deletion e2e_test/sink/elasticsearch/elasticsearch_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CREATE TABLE t7 (v1 int primary key, v2 bigint, v3 varchar);

statement ok
CREATE SINK s7 AS select t7.v1 as v1, t7.v2 as v2, t7.v3 as v3 from t7 WITH (
connector = 'elasticsearch-7',
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch:9200',
username = 'elastic',
Expand Down
41 changes: 41 additions & 0 deletions integration_tests/elasticsearch-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Demo: Sinking to ElasticSearch

In this demo, we want to showcase how RisingWave is able to sink data to ElasticSearch.

1. Set the compose profile accordingly:
Demo with elasticsearch 7:
```
export COMPOSE_PROFILES=es7
```

Demo with elasticsearch 8:
```
export COMPOSE_PROFILES=es8
```

2. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, and a single-node Elasticsearch instance that serves as the sink target.

3. Execute the SQL queries in sequence:

- create_source.sql
- create_mv.sql
- create_es[7/8]_sink.sql

4. Check the contents in ES:

```sh
# Check the document counts
curl -XGET -u elastic:risingwave "http://localhost:9200/test/_count" -H 'Content-Type: application/json'

# Check the content of a document by user_id
curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search" -H 'Content-Type: application/json' -d '{"query":{"term": {"user_id":2}}}' | jq

# Get the first 10 documents sorted by user_id
curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search?size=10" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}, "sort": ["user_id"]}' | jq
```
9 changes: 9 additions & 0 deletions integration_tests/elasticsearch-sink/create_es7_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Sinks the rows of the bhv_mv materialized view into the `test` index of
-- the Elasticsearch 7 instance used by the `es7` compose profile.
CREATE SINK bhv_es_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
index = 'test',
-- The host must match the ES 7 service name (`elasticsearch7`) declared in
-- docker-compose.yml; `elasticsearch8` only exists under the `es8` profile.
url = 'http://elasticsearch7:9200',
username = 'elastic',
password = 'risingwave'
);
9 changes: 9 additions & 0 deletions integration_tests/elasticsearch-sink/create_es8_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Sinks the rows of the bhv_mv materialized view into the `test` index of
-- the Elasticsearch 8 instance reachable at host `elasticsearch8`.
-- The credentials match the ELASTIC_PASSWORD set for that service in
-- docker-compose.yml.
CREATE SINK bhv_es_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch8:9200',
username = 'elastic',
password = 'risingwave'
);
7 changes: 7 additions & 0 deletions integration_tests/elasticsearch-sink/create_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Materialized view that projects user_id, target_id and event_timestamp
-- from the user_behaviors table; it is the upstream of the
-- Elasticsearch sink (bhv_es_sink).
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
user_behaviors;
18 changes: 18 additions & 0 deletions integration_tests/elasticsearch-sink/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Demo table backed by the `datagen` connector, keyed by user_id.
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
-- user_id is generated as a sequence from 1 to 1000.
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
-- NOTE(review): `user_name` is not a column declared on this table, so these
-- two options presumably have no effect - confirm and remove, or configure
-- one of the declared VARCHAR columns instead.
fields.user_name.kind = 'random',
fields.user_name.length = '10',
-- Generation rate: 10 rows per second.
datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
73 changes: 73 additions & 0 deletions integration_tests/elasticsearch-sink/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
# Compose file for the Elasticsearch sink demo: two alternative single-node
# Elasticsearch services (selected via compose profiles) plus the RisingWave
# services reused from ../../docker/docker-compose.yml.
version: "3"
services:
# Elasticsearch 7 sink target; only started when the `es7` profile is active.
elasticsearch7:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
environment:
- xpack.security.enabled=true
- discovery.type=single-node
- ELASTIC_PASSWORD=risingwave
ports:
- 9200:9200
profiles:
- es7
# Elasticsearch 8 sink target; only started when the `es8` profile is active.
# NOTE(review): both Elasticsearch services publish host port 9200, so the two
# profiles presumably cannot be enabled at the same time - confirm.
elasticsearch8:
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0
environment:
- xpack.security.enabled=true
- discovery.type=single-node
- ELASTIC_PASSWORD=risingwave
ports:
- 9200:9200
profiles:
- es8
# The services below extend the definitions of the same name in the shared
# ../../docker/docker-compose.yml file.
compactor-0:
extends:
file: ../../docker/docker-compose.yml
service: compactor-0
compute-node-0:
extends:
file: ../../docker/docker-compose.yml
service: compute-node-0
etcd-0:
extends:
file: ../../docker/docker-compose.yml
service: etcd-0
frontend-node-0:
extends:
file: ../../docker/docker-compose.yml
service: frontend-node-0
grafana-0:
extends:
file: ../../docker/docker-compose.yml
service: grafana-0
meta-node-0:
extends:
file: ../../docker/docker-compose.yml
service: meta-node-0
connector-node:
extends:
file: ../../docker/docker-compose.yml
service: connector-node
minio-0:
extends:
file: ../../docker/docker-compose.yml
service: minio-0
prometheus-0:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
# Named volumes managed by compose (external: false).
volumes:
compute-node-0:
external: false
etcd-0:
external: false
grafana-0:
external: false
minio-0:
external: false
prometheus-0:
external: false
# NOTE(review): no service declared in this file is named message_queue;
# verify whether this volume is actually needed.
message_queue:
external: false
name: risingwave-compose
2 changes: 1 addition & 1 deletion java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def test_jdbc_sink(input_file, param):

def test_elasticsearch_sink(param):
prop = {
"connector": "elasticsearch-7",
"connector": "elasticsearch",
"url": "http://127.0.0.1:9200",
"index": "test",
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public static SinkFactory getSinkFactory(String sinkName) {
return new IcebergSinkFactory();
case "deltalake":
return new DeltaLakeSinkFactory();
case "elasticsearch-7":
return new EsSink7Factory();
case "elasticsearch":
return new EsSinkFactory();
case "cassandra":
return new CassandraFactory();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.risingwave.connector.EsSink7;
import com.risingwave.connector.EsSink7Config;
import com.risingwave.connector.EsSink;
import com.risingwave.connector.EsSinkConfig;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.ArraySinkRow;
import com.risingwave.proto.Data;
Expand All @@ -39,7 +39,7 @@
import org.junit.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class EsSink7Test {
public class EsSinkTest {

static TableSchema getTestTableSchema() {
return new TableSchema(
Expand All @@ -52,9 +52,9 @@ static TableSchema getTestTableSchema() {

public void testEsSink(ElasticsearchContainer container, String username, String password)
throws IOException {
EsSink7 sink =
new EsSink7(
new EsSink7Config(container.getHttpHostAddress(), "test")
EsSink sink =
new EsSink(
new EsSinkConfig(container.getHttpHostAddress(), "test")
.withDelimiter("$")
.withUsername(username)
.withPassword(password),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
Expand All @@ -55,17 +56,17 @@
*
* 4. bulkprocessor and high-level-client are deprecated in es 8 java api.
*/
public class EsSink7 extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink7.class);
public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";

private final EsSink7Config config;
private final EsSinkConfig config;
private final BulkProcessor bulkProcessor;
private final RestHighLevelClient client;
// For bulk listener
private final List<Integer> primaryKeyIndexes;

public EsSink7(EsSink7Config config, TableSchema tableSchema) {
public EsSink(EsSinkConfig config, TableSchema tableSchema) {
super(tableSchema);
HttpHost host;
try {
Expand All @@ -75,9 +76,14 @@ public EsSink7(EsSink7Config config, TableSchema tableSchema) {
}

this.config = config;

// ApiCompatibilityMode is enabled to ensure the client can talk to newer version es server.
this.client =
new RestHighLevelClient(
configureRestClientBuilder(RestClient.builder(host), config));
new RestHighLevelClientBuilder(
configureRestClientBuilder(RestClient.builder(host), config)
.build())
.setApiCompatibilityMode(true)
.build();
// Test connection
try {
boolean isConnected = this.client.ping(RequestOptions.DEFAULT);
Expand All @@ -98,7 +104,7 @@ public EsSink7(EsSink7Config config, TableSchema tableSchema) {
}

private static RestClientBuilder configureRestClientBuilder(
RestClientBuilder builder, EsSink7Config config) {
RestClientBuilder builder, EsSinkConfig config) {
// Possible config:
// 1. Connection path prefix
// 2. Username and password
Expand All @@ -116,7 +122,7 @@ private static RestClientBuilder configureRestClientBuilder(
}

private BulkProcessor.Builder applyBulkConfig(
RestHighLevelClient client, EsSink7Config config, BulkProcessor.Listener listener) {
RestHighLevelClient client, EsSinkConfig config, BulkProcessor.Listener listener) {
BulkProcessor.Builder builder =
BulkProcessor.builder(
(BulkRequestConsumerFactory)
Expand Down Expand Up @@ -181,7 +187,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
private Map<String, Object> buildDoc(SinkRow row) {
Map<String, Object> doc = new HashMap();
for (int i = 0; i < getTableSchema().getNumColumns(); i++) {
doc.put(getTableSchema().getColumnDesc(i).getName(), row.get(i));
Object col = row.get(i);
if (col instanceof Date) {
// es client doesn't natively support java.sql.Timestamp/Time/Date
// so we need to convert Date type into a string as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
col = col.toString();
}
doc.put(getTableSchema().getColumnDesc(i).getName(), col);
}
return doc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;

public class EsSink7Config extends CommonSinkConfig {
public class EsSinkConfig extends CommonSinkConfig {
/** Required */
private String url;

Expand All @@ -38,7 +38,7 @@ public class EsSink7Config extends CommonSinkConfig {
private String password;

@JsonCreator
public EsSink7Config(
public EsSinkConfig(
@JsonProperty(value = "url") String url, @JsonProperty(value = "index") String index) {
this.url = url;
this.index = index;
Expand All @@ -64,17 +64,17 @@ public String getPassword() {
return password;
}

public EsSink7Config withDelimiter(String delimiter) {
public EsSinkConfig withDelimiter(String delimiter) {
this.delimiter = delimiter;
return this;
}

public EsSink7Config withUsername(String username) {
public EsSinkConfig withUsername(String username) {
this.username = username;
return this;
}

public EsSink7Config withPassword(String password) {
public EsSinkConfig withPassword(String password) {
this.password = password;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EsSink7Factory implements SinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(EsSink7Factory.class);
public class EsSinkFactory implements SinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(EsSinkFactory.class);

public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties) {
ObjectMapper mapper = new ObjectMapper();
EsSink7Config config = mapper.convertValue(tableProperties, EsSink7Config.class);
return new SinkWriterV1.Adapter(new EsSink7(config, tableSchema));
EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class);
return new SinkWriterV1.Adapter(new EsSink(config, tableSchema));
}

@Override
Expand All @@ -52,7 +52,7 @@ public void validate(
Catalog.SinkType sinkType) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
EsSink7Config config = mapper.convertValue(tableProperties, EsSink7Config.class);
EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class);

// 1. check url
HttpHost host;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub const VALID_REMOTE_SINKS: [&str; 5] = [
"jdbc",
REMOTE_ICEBERG_SINK,
"deltalake",
"elasticsearch-7",
"elasticsearch",
"cassandra",
];

Expand Down

0 comments on commit 9d58b90

Please sign in to comment.