feat(sink): support es sink struct and refactor es sink (#14231)

xxhZs authored and Li0k committed Jan 10, 2024
1 parent f3eafad commit 5bf6c46
Showing 19 changed files with 343 additions and 206 deletions.
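In brief: document construction moves out of the Java EsSink. Each row now arrives as two pre-encoded string columns, the Elasticsearch document id and the JSON document body, and the Java writer simply forwards them, as the updated EsSinkTest below shows. A minimal sketch of the new row shape under that assumed contract (the class and helper names here are illustrative, not from this commit):

    import java.util.List;

    public class EsRowShape {
        // Hypothetical pair mirroring what EsSink now reads as row.get(0) / row.get(1).
        record EsRow(String id, String jsonDoc) {}

        // Assumed contract: the id is the primary-key values joined by the sink's
        // configured delimiter; the document arrives as already-serialized JSON.
        static EsRow encode(List<String> pkValues, String delimiter, String jsonDoc) {
            return new EsRow(String.join(delimiter, pkValues), jsonDoc);
        }

        public static void main(String[] args) {
            // Matches the updated EsSinkTest below, which uses "$" as the delimiter.
            EsRow row = encode(List.of("1", "Alice"), "$", "{\"id\":1,\"name\":\"Alice\"}");
            System.out.println(row.id() + " -> " + row.jsonDoc()); // 1$Alice -> {"id":1,"name":"Alice"}
        }
    }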
2 changes: 1 addition & 1 deletion e2e_test/sink/elasticsearch/elasticsearch_sink.result
@@ -1 +1 @@
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
{"took":34,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}}
17 changes: 9 additions & 8 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
@@ -6,7 +6,8 @@ CREATE TABLE t7 (
d date,
t time,
ts timestamp,
-tz timestamptz
+tz timestamptz,
+st struct<st1 int, st2 int>,
);

statement ok
@@ -31,18 +32,18 @@ CREATE SINK s8 from t7 WITH (

statement ok
INSERT INTO t7 VALUES
-(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
-(2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
-(3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z'),
-(5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
-(8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
-(13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z');
+(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(1,1)),
+(2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(2,2)),
+(3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z',(3,3)),
+(5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(5,5)),
+(8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(8,8)),
+(13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z',(13,13));

statement ok
FLUSH;

statement ok
-INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z');
+INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z',(1,1));

statement ok
FLUSH;
@@ -1 +1 @@
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}}
@@ -101,7 +101,7 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
.asRuntimeException();
}
sinkId = sinkTask.getStart().getSinkParam().getSinkId();
-bindSink(sinkTask.getStart().getSinkParam(), sinkTask.getStart().getFormat());
+bindSink(sinkTask.getStart());
currentEpoch = null;
currentBatchId = null;
epochStarted = false;
@@ -200,14 +200,13 @@ private void cleanup() {
ConnectorNodeMetrics.decActiveSinkConnections(connectorName, "node1");
}

-    private void bindSink(
-            ConnectorServiceProto.SinkParam sinkParam,
-            ConnectorServiceProto.SinkPayloadFormat format) {
-        tableSchema = TableSchema.fromProto(sinkParam.getTableSchema());
+    private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink) {
+        var sinkParam = startSink.getSinkParam();
+        tableSchema = TableSchema.fromProto(startSink.getPayloadSchema());
String connectorName = getConnectorName(sinkParam);
SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
-        switch (format) {
+        switch (startSink.getFormat()) {
case FORMAT_UNSPECIFIED:
case UNRECOGNIZED:
throw INVALID_ARGUMENT
@@ -61,8 +61,8 @@ public void testEsSink(ElasticsearchContainer container, String username, String
getTestTableSchema());
sink.write(
Iterators.forArray(
-                        new ArraySinkRow(Op.INSERT, 1, "Alice"),
-                        new ArraySinkRow(Op.INSERT, 2, "Bob")));
+                        new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"),
+                        new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}")));
sink.sync();
// container is slow here, but our default flush time is 5s,
// so 3s is enough for sync test
@@ -16,8 +16,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
@@ -27,7 +25,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -47,6 +44,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,21 +65,13 @@ public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";

-    private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC");
-    private final SimpleDateFormat tDfm;
-    private final SimpleDateFormat tsDfm;
-    private final SimpleDateFormat tstzDfm;

private final EsSinkConfig config;
private BulkProcessor bulkProcessor;
private final RestHighLevelClient client;

// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;

-    // For bulk listener
-    private final List<Integer> primaryKeyIndexes;

class RequestTracker {
// Used to save the return results of es asynchronous writes. The capacity is Integer.Max
private final BlockingQueue<EsWriteResultResp> blockingQueue = new LinkedBlockingQueue<>();
@@ -196,15 +186,6 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
this.bulkProcessor = createBulkProcessor(this.requestTracker);

-        primaryKeyIndexes = new ArrayList<Integer>();
-        for (String primaryKey : tableSchema.getPrimaryKeys()) {
-            primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey));
-        }
-
-        tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone);
-        tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone);
-        tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone);
}

private static RestClientBuilder configureRestClientBuilder(
@@ -297,116 +278,20 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}

-    /**
-     * The api accepts doc in map form.
-     *
-     * @param row
-     * @return Map from Field name to Value
-     * @throws JsonProcessingException
-     * @throws JsonMappingException
-     */
-    private Map<String, Object> buildDoc(SinkRow row)
-            throws JsonMappingException, JsonProcessingException {
-        Map<String, Object> doc = new HashMap();
-        var tableSchema = getTableSchema();
-        var columnDescs = tableSchema.getColumnDescs();
-        for (int i = 0; i < row.size(); i++) {
-            var type = columnDescs.get(i).getDataType().getTypeName();
-            Object col = row.get(i);
-            switch (type) {
-                    // es client doesn't natively support java.sql.Timestamp/Time/Date
-                    // so we need to convert Date/Time/Timestamp type into a string as suggested in
-                    // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
-                case DATE:
-                    col = col.toString();
-                    break;
-                    // construct java.sql.Time/Timestamp with milliseconds time value.
-                    // it will use system timezone by default, so we have to set timezone manually
-                case TIME:
-                    col = tDfm.format(col);
-                    break;
-                case TIMESTAMP:
-                    col = tsDfm.format(col);
-                    break;
-                case TIMESTAMPTZ:
-                    col = tstzDfm.format(col);
-                    break;
-                case JSONB:
-                    ObjectMapper mapper = new ObjectMapper();
-                    JsonNode jsonNode = mapper.readTree((String) col);
-                    col = convertJsonNode(jsonNode);
-                    break;
-                default:
-                    break;
-            }
-
-            doc.put(getTableSchema().getColumnDesc(i).getName(), col);
-        }
-        return doc;
-    }
-
-    private static Object convertJsonNode(JsonNode jsonNode) {
-        if (jsonNode.isObject()) {
-            Map<String, Object> resultMap = new HashMap<>();
-            jsonNode.fields()
-                    .forEachRemaining(
-                            entry -> {
-                                resultMap.put(entry.getKey(), convertJsonNode(entry.getValue()));
-                            });
-            return resultMap;
-        } else if (jsonNode.isArray()) {
-            List<Object> resultList = new ArrayList<>();
-            jsonNode.elements()
-                    .forEachRemaining(
-                            element -> {
-                                resultList.add(convertJsonNode(element));
-                            });
-            return resultList;
-        } else if (jsonNode.isNumber()) {
-            return jsonNode.numberValue();
-        } else if (jsonNode.isTextual()) {
-            return jsonNode.textValue();
-        } else if (jsonNode.isBoolean()) {
-            return jsonNode.booleanValue();
-        } else if (jsonNode.isNull()) {
-            return null;
-        } else {
-            throw new IllegalArgumentException("Unsupported JSON type");
-        }
-    }
-
-    /**
-     * use primary keys as id concatenated by a specific delimiter.
-     *
-     * @param row
-     * @return
-     */
-    private String buildId(SinkRow row) {
-        String id;
-        if (primaryKeyIndexes.isEmpty()) {
-            id = row.get(0).toString();
-        } else {
-            List<String> keys =
-                    primaryKeyIndexes.stream()
-                            .map(index -> row.get(index).toString())
-                            .collect(Collectors.toList());
-            id = String.join(config.getDelimiter(), keys);
-        }
-        return id;
-    }
-
private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
-        Map<String, Object> doc = buildDoc(row);
-        final String key = buildId(row);
+        final String key = (String) row.get(0);
+        String doc = (String) row.get(1);

UpdateRequest updateRequest =
-                new UpdateRequest(config.getIndex(), "_doc", key).doc(doc).upsert(doc);
+                new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
+        updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
}

-    private void processDelete(SinkRow row) {
-        final String key = buildId(row);
+    private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
+        final String key = (String) row.get(0);

DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
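Because the payload is already JSON, the upsert no longer ships the document twice: .doc(doc, XContentType.JSON) plus docAsUpsert(true) tells Elasticsearch to use the partial doc as the full document when the id does not exist yet, which is what the removed .doc(doc).upsert(doc) pair achieved. A self-contained sketch of the requests this path produces (index name and values are examples, not from the commit):

    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.xcontent.XContentType;

    public class EsRequests {
        public static void main(String[] args) {
            // Upsert: one JSON payload serves as both the update and the insert body.
            UpdateRequest upsert =
                    new UpdateRequest("test", "_doc", "1$Alice")
                            .doc("{\"id\":1,\"name\":\"Alice\"}", XContentType.JSON)
                            .docAsUpsert(true);
            // Delete: only the pre-computed id is needed, no document body.
            DeleteRequest delete = new DeleteRequest("test", "_doc", "1$Alice");
            System.out.println(upsert + "\n" + delete);
        }
    }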
1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -39,6 +39,7 @@ message SinkWriterStreamRequest {
message StartSink {
SinkParam sink_param = 1;
SinkPayloadFormat format = 2;
+TableSchema payload_schema = 3;
}

message WriteBatch {
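The new payload_schema field lets the sender describe the schema of the rows it will actually stream, which can differ from the sink's logical table schema (for the Elasticsearch sink it is now just the two string columns); bindSink in the Java observer above decodes with it. A sketch of populating the field through the generated protobuf builder (the Java package name is an assumption; the setter names follow standard protobuf codegen):

    import com.risingwave.proto.ConnectorServiceProto;

    public class StartSinkExample {
        static ConnectorServiceProto.SinkWriterStreamRequest.StartSink buildStart(
                ConnectorServiceProto.SinkParam sinkParam,
                ConnectorServiceProto.SinkPayloadFormat format,
                ConnectorServiceProto.TableSchema payloadSchema) {
            return ConnectorServiceProto.SinkWriterStreamRequest.StartSink.newBuilder()
                    .setSinkParam(sinkParam)
                    .setFormat(format)
                    .setPayloadSchema(payloadSchema) // new field 3 defined above
                    .build();
        }
    }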
10 changes: 8 additions & 2 deletions src/connector/src/sink/big_query.rs
@@ -33,7 +33,10 @@ use url::Url;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

-use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
+use super::encoder::{
+    DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
+    TimestamptzHandlingMode,
+};
use super::writer::LogSinkerOf;
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_utils::load_file_descriptor_from_s3;
@@ -309,10 +312,13 @@ impl BigQuerySinkWriter {
client,
is_append_only,
insert_request: TableDataInsertAllRequest::new(),
-            row_encoder: JsonEncoder::new_with_big_query(
+            row_encoder: JsonEncoder::new(
                 schema,
                 None,
+                DateHandlingMode::String,
                 TimestampHandlingMode::String,
+                TimestamptzHandlingMode::UtcString,
+                TimeHandlingMode::Milli,
),
})
}