diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt index 2728f33da04e5..cecf9a47cf94a 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt @@ -3,7 +3,7 @@ CREATE TABLE t7 (v1 int primary key, v2 bigint, v3 varchar); statement ok CREATE SINK s7 AS select t7.v1 as v1, t7.v2 as v2, t7.v3 as v3 from t7 WITH ( - connector = 'elasticsearch-7', + connector = 'elasticsearch', index = 'test', url = 'http://elasticsearch:9200', username = 'elastic', diff --git a/integration_tests/elasticsearch-sink/README.md b/integration_tests/elasticsearch-sink/README.md new file mode 100644 index 0000000000000..b114e8132024a --- /dev/null +++ b/integration_tests/elasticsearch-sink/README.md @@ -0,0 +1,41 @@ +# Demo: Sinking to ElasticSearch + +In this demo, we want to showcase how RisingWave is able to sink data to ElasticSearch. + +1. Set the compose profile accordingly: +Demo with elasticsearch 7: +``` +export COMPOSE_PROFILES=es7 +``` + +Demo with elasticsearch 8 +``` +export COMPOSE_PROFILES=es8 +``` + +2. Launch the cluster: + +```sh +docker-compose up -d +``` + +The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a single-node elasticsearch for sink. + +3. Execute the SQL queries in sequence: + +- create_source.sql +- create_mv.sql +- create_es[7/8]_sink.sql + +4. Check the contents in ES: + +```sh +# Check the document counts +curl -XGET -u elastic:risingwave "http://localhost:9200/test/_count" -H 'Content-Type: application/json' + +# Check the content of a document by user_id +curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search" -H 'Content-Type: application/json' -d '{"query":{"term": {"user_id":2}}' | jq + +# Get the first 10 documents sort by user_id +curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search?size=10" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}, "sort": ["user_id"]}' | jq +``` \ No newline at end of file diff --git a/integration_tests/elasticsearch-sink/create_es7_sink.sql b/integration_tests/elasticsearch-sink/create_es7_sink.sql new file mode 100644 index 0000000000000..997c238b90344 --- /dev/null +++ b/integration_tests/elasticsearch-sink/create_es7_sink.sql @@ -0,0 +1,9 @@ +CREATE SINK bhv_es_sink +FROM + bhv_mv WITH ( + connector = 'elasticsearch', + index = 'test', + url = 'http://elasticsearch8:9200', + username = 'elastic', + password = 'risingwave' +); \ No newline at end of file diff --git a/integration_tests/elasticsearch-sink/create_es8_sink.sql b/integration_tests/elasticsearch-sink/create_es8_sink.sql new file mode 100644 index 0000000000000..997c238b90344 --- /dev/null +++ b/integration_tests/elasticsearch-sink/create_es8_sink.sql @@ -0,0 +1,9 @@ +CREATE SINK bhv_es_sink +FROM + bhv_mv WITH ( + connector = 'elasticsearch', + index = 'test', + url = 'http://elasticsearch8:9200', + username = 'elastic', + password = 'risingwave' +); \ No newline at end of file diff --git a/integration_tests/elasticsearch-sink/create_mv.sql b/integration_tests/elasticsearch-sink/create_mv.sql new file mode 100644 index 0000000000000..0a803f8a2762d --- /dev/null +++ b/integration_tests/elasticsearch-sink/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp +FROM + user_behaviors; \ No newline at end of file diff --git a/integration_tests/elasticsearch-sink/create_source.sql b/integration_tests/elasticsearch-sink/create_source.sql new file mode 100644 index 0000000000000..c28c10f3616da --- /dev/null +++ b/integration_tests/elasticsearch-sink/create_source.sql @@ -0,0 +1,18 @@ +CREATE table user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMP, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +) WITH ( + connector = 'datagen', + fields.user_id.kind = 'sequence', + fields.user_id.start = '1', + fields.user_id.end = '1000', + fields.user_name.kind = 'random', + fields.user_name.length = '10', + datagen.rows.per.second = '10' +) FORMAT PLAIN ENCODE JSON; \ No newline at end of file diff --git a/integration_tests/elasticsearch-sink/docker-compose.yml b/integration_tests/elasticsearch-sink/docker-compose.yml new file mode 100644 index 0000000000000..47d314d1f57e2 --- /dev/null +++ b/integration_tests/elasticsearch-sink/docker-compose.yml @@ -0,0 +1,73 @@ +--- +version: "3" +services: + elasticsearch7: + image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0 + environment: + - xpack.security.enabled=true + - discovery.type=single-node + - ELASTIC_PASSWORD=risingwave + ports: + - 9200:9200 + profiles: + - es7 + elasticsearch8: + image: docker.elastic.co/elasticsearch/elasticsearch:8.10.0 + environment: + - xpack.security.enabled=true + - discovery.type=single-node + - ELASTIC_PASSWORD=risingwave + ports: + - 9200:9200 + profiles: + - es8 + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + connector-node: + extends: + file: ../../docker/docker-compose.yml + service: connector-node + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 +volumes: + compute-node-0: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false + message_queue: + external: false +name: risingwave-compose diff --git a/java/connector-node/python-client/integration_tests.py b/java/connector-node/python-client/integration_tests.py index 962f2a658018a..64fa949f48ce2 100644 --- a/java/connector-node/python-client/integration_tests.py +++ b/java/connector-node/python-client/integration_tests.py @@ -272,7 +272,7 @@ def test_jdbc_sink(input_file, param): def test_elasticsearch_sink(param): prop = { - "connector": "elasticsearch-7", + "connector": "elasticsearch", "url": "http://127.0.0.1:9200", "index": "test", } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java index ab3ac84346fa6..944d529a02d8d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java @@ -41,8 +41,8 @@ public static SinkFactory getSinkFactory(String sinkName) { return new IcebergSinkFactory(); case "deltalake": return new DeltaLakeSinkFactory(); - case "elasticsearch-7": - return new EsSink7Factory(); + case "elasticsearch": + return new EsSinkFactory(); case "cassandra": return new CassandraFactory(); default: diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSink7Test.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java similarity index 94% rename from java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSink7Test.java rename to java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index e3024ff09b26e..af0ea7190f946 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSink7Test.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -19,8 +19,8 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.risingwave.connector.EsSink7; -import com.risingwave.connector.EsSink7Config; +import com.risingwave.connector.EsSink; +import com.risingwave.connector.EsSinkConfig; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.proto.Data; @@ -39,7 +39,7 @@ import org.junit.Test; import org.testcontainers.elasticsearch.ElasticsearchContainer; -public class EsSink7Test { +public class EsSinkTest { static TableSchema getTestTableSchema() { return new TableSchema( @@ -52,9 +52,9 @@ static TableSchema getTestTableSchema() { public void testEsSink(ElasticsearchContainer container, String username, String password) throws IOException { - EsSink7 sink = - new EsSink7( - new EsSink7Config(container.getHttpHostAddress(), "test") + EsSink sink = + new EsSink( + new EsSinkConfig(container.getHttpHostAddress(), "test") .withDelimiter("$") .withUsername(username) .withPassword(password), diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java similarity index 89% rename from java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7.java rename to java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 44f1610ceaaba..7c1727f4a82f3 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -36,6 +36,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.RestHighLevelClientBuilder; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; @@ -55,17 +56,17 @@ * * 4. bulkprocessor and high-level-client are deprecated in es 8 java api. */ -public class EsSink7 extends SinkWriterBase { - private static final Logger LOG = LoggerFactory.getLogger(EsSink7.class); +public class EsSink extends SinkWriterBase { + private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s"; - private final EsSink7Config config; + private final EsSinkConfig config; private final BulkProcessor bulkProcessor; private final RestHighLevelClient client; // For bulk listener private final List primaryKeyIndexes; - public EsSink7(EsSink7Config config, TableSchema tableSchema) { + public EsSink(EsSinkConfig config, TableSchema tableSchema) { super(tableSchema); HttpHost host; try { @@ -75,9 +76,14 @@ public EsSink7(EsSink7Config config, TableSchema tableSchema) { } this.config = config; + + // ApiCompatibilityMode is enabled to ensure the client can talk to newer version es sever. this.client = - new RestHighLevelClient( - configureRestClientBuilder(RestClient.builder(host), config)); + new RestHighLevelClientBuilder( + configureRestClientBuilder(RestClient.builder(host), config) + .build()) + .setApiCompatibilityMode(true) + .build(); // Test connection try { boolean isConnected = this.client.ping(RequestOptions.DEFAULT); @@ -98,7 +104,7 @@ public EsSink7(EsSink7Config config, TableSchema tableSchema) { } private static RestClientBuilder configureRestClientBuilder( - RestClientBuilder builder, EsSink7Config config) { + RestClientBuilder builder, EsSinkConfig config) { // Possible config: // 1. Connection path prefix // 2. Username and password @@ -116,7 +122,7 @@ private static RestClientBuilder configureRestClientBuilder( } private BulkProcessor.Builder applyBulkConfig( - RestHighLevelClient client, EsSink7Config config, BulkProcessor.Listener listener) { + RestHighLevelClient client, EsSinkConfig config, BulkProcessor.Listener listener) { BulkProcessor.Builder builder = BulkProcessor.builder( (BulkRequestConsumerFactory) @@ -181,7 +187,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) private Map buildDoc(SinkRow row) { Map doc = new HashMap(); for (int i = 0; i < getTableSchema().getNumColumns(); i++) { - doc.put(getTableSchema().getColumnDesc(i).getName(), row.get(i)); + Object col = row.get(i); + if (col instanceof Date) { + // es client doesn't natively support java.sql.Timestamp/Time/Date + // so we need to convert Date type into a string as suggested in + // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 + col = col.toString(); + } + doc.put(getTableSchema().getColumnDesc(i).getName(), col); } return doc; } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7Config.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java similarity index 87% rename from java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7Config.java rename to java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java index 6d45b3ee52d87..e053dfed77b63 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7Config.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java @@ -20,7 +20,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.risingwave.connector.api.sink.CommonSinkConfig; -public class EsSink7Config extends CommonSinkConfig { +public class EsSinkConfig extends CommonSinkConfig { /** Required */ private String url; @@ -38,7 +38,7 @@ public class EsSink7Config extends CommonSinkConfig { private String password; @JsonCreator - public EsSink7Config( + public EsSinkConfig( @JsonProperty(value = "url") String url, @JsonProperty(value = "index") String index) { this.url = url; this.index = index; @@ -64,17 +64,17 @@ public String getPassword() { return password; } - public EsSink7Config withDelimiter(String delimiter) { + public EsSinkConfig withDelimiter(String delimiter) { this.delimiter = delimiter; return this; } - public EsSink7Config withUsername(String username) { + public EsSinkConfig withUsername(String username) { this.username = username; return this; } - public EsSink7Config withPassword(String password) { + public EsSinkConfig withPassword(String password) { this.password = password; return this; } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7Factory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java similarity index 91% rename from java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7Factory.java rename to java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 4bc6fa6cc1990..a31826a45a5ab 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink7Factory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -36,13 +36,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EsSink7Factory implements SinkFactory { - private static final Logger LOG = LoggerFactory.getLogger(EsSink7Factory.class); +public class EsSinkFactory implements SinkFactory { + private static final Logger LOG = LoggerFactory.getLogger(EsSinkFactory.class); public SinkWriter createWriter(TableSchema tableSchema, Map tableProperties) { ObjectMapper mapper = new ObjectMapper(); - EsSink7Config config = mapper.convertValue(tableProperties, EsSink7Config.class); - return new SinkWriterV1.Adapter(new EsSink7(config, tableSchema)); + EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class); + return new SinkWriterV1.Adapter(new EsSink(config, tableSchema)); } @Override @@ -52,7 +52,7 @@ public void validate( Catalog.SinkType sinkType) { ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true); - EsSink7Config config = mapper.convertValue(tableProperties, EsSink7Config.class); + EsSinkConfig config = mapper.convertValue(tableProperties, EsSinkConfig.class); // 1. check url HttpHost host; diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 046d29bc687b5..851c81b8916d0 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -52,7 +52,7 @@ pub const VALID_REMOTE_SINKS: [&str; 5] = [ "jdbc", REMOTE_ICEBERG_SINK, "deltalake", - "elasticsearch-7", + "elasticsearch", "cassandra", ];