Skip to content

Commit

Permalink
fix(sink): Set default es.type is '_doc', And fix pk bug (#14273)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Dec 29, 2023
1 parent 9ec5812 commit 64aed91
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 4 deletions.
2 changes: 2 additions & 0 deletions ci/scripts/e2e-elasticsearch-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ sleep 5

echo "--- checking elasticsearch sink result"
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result
curl -XGET -u elastic:risingwave "http://elasticsearch:9200/test1/_search" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}}' > ./e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result
python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_sink.result e2e_test/sink/elasticsearch/elasticsearch_sink.tmp.result
python3 e2e_test/sink/elasticsearch/elasticsearch.py e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.tmp.result
if [ $? -ne 0 ]; then
echo "The output is not as expected."
exit 1
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/sink/elasticsearch/elasticsearch_sink.result
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
14 changes: 14 additions & 0 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ CREATE SINK s7 from t7 WITH (
password = 'risingwave'
);

statement ok
CREATE SINK s8 from t7 WITH (
connector = 'elasticsearch',
index = 'test1',
primary_key = 'v1,v3',
url = 'http://elasticsearch:9200',
username = 'elastic',
password = 'risingwave',
delimiter = '_'
);

statement ok
INSERT INTO t7 VALUES
(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
Expand All @@ -39,5 +50,8 @@ FLUSH;
statement ok
DROP SINK s7;

statement ok
DROP SINK s8;

statement ok
DROP TABLE t7;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ private String buildId(SinkRow row) {
} else {
List<String> keys =
primaryKeyIndexes.stream()
.map(index -> row.get(primaryKeyIndexes.get(index)).toString())
.map(index -> row.get(index).toString())
.collect(Collectors.toList());
id = String.join(config.getDelimiter(), keys);
}
Expand All @@ -400,14 +400,14 @@ private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcess
final String key = buildId(row);

UpdateRequest updateRequest =
new UpdateRequest(config.getIndex(), "doc", key).doc(doc).upsert(doc);
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc).upsert(doc);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
}

private void processDelete(SinkRow row) {
final String key = buildId(row);
DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key);
DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
}
Expand Down

0 comments on commit 64aed91

Please sign in to comment.