From 64aed9108e9ec442bfaa1f04b3f6620eec66d2f6 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Fri, 29 Dec 2023 22:22:17 +0800 Subject: [PATCH] fix(sink): Set default es.type is '_doc', And fix pk bug (#14273) --- ci/scripts/e2e-elasticsearch-sink-test.sh | 2 ++ .../sink/elasticsearch/elasticsearch_sink.result | 2 +- e2e_test/sink/elasticsearch/elasticsearch_sink.slt | 14 ++++++++++++++ .../elasticsearch_with_pk_sink.result | 1 + .../main/java/com/risingwave/connector/EsSink.java | 6 +++--- 5 files changed, 21 insertions(+), 4 deletions(-) create mode 100644 e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result diff --git a/ci/scripts/e2e-elasticsearch-sink-test.sh b/ci/scripts/e2e-elasticsearch-sink-test.sh index 29913a7509db8..029cddbf91a90 100644 --- a/ci/scripts/e2e-elasticsearch-sink-test.sh +++ b/ci/scripts/e2e-elasticsearch-sink-test.sh @@ -13,7 +13,9 @@ sleep 5 echo "--- checking elasticsearch sink result" curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result +curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test1/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_sink.result e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result +python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result if [ $? -ne 0 ]; then echo "The output is not as expected." exit 1 diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_sink.result index 6c861feecca76..7f832258c9595 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.result @@ -1 +1 @@ -{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}} +{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}} diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt index fd496283e5f1e..049d111713d6f 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt @@ -18,6 +18,17 @@ CREATE SINK s7 from t7 WITH ( password = 'risingwave' ); +statement ok +CREATE SINK s8 from t7 WITH ( + connector = 'elasticsearch', + index = 'test1', + primary_key = 'v1,v3', + url = 'http://elasticsearch:9200', + username = 'elastic', + password = 'risingwave', + delimiter = '_' +); + statement ok INSERT INTO t7 VALUES (1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), @@ -39,5 +50,8 @@ FLUSH; statement ok DROP SINK s7; +statement ok +DROP SINK s8; + statement ok DROP TABLE t7; diff --git a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result new file mode 100644 index 0000000000000..12ac1c8370376 --- /dev/null +++ b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result @@ -0,0 +1 @@ +{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}} diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 3e69fa184baea..25a1c60f34d80 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -388,7 +388,7 @@ private String buildId(SinkRow row) { } else { List keys = primaryKeyIndexes.stream() - .map(index -> row.get(primaryKeyIndexes.get(index)).toString()) + .map(index -> row.get(index).toString()) .collect(Collectors.toList()); id = String.join(config.getDelimiter(), keys); } @@ -400,14 +400,14 @@ private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcess final String key = buildId(row); UpdateRequest updateRequest = - new UpdateRequest(config.getIndex(), "doc", key).doc(doc).upsert(doc); + new UpdateRequest(config.getIndex(), "_doc", key).doc(doc).upsert(doc); this.requestTracker.addWriteTask(); bulkProcessor.add(updateRequest); } private void processDelete(SinkRow row) { final String key = buildId(row); - DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key); + DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key); this.requestTracker.addWriteTask(); bulkProcessor.add(deleteRequest); }