feat(sink): support es sink struct and refactor es sink #14231

Merged · 17 commits · Jan 4, 2024
2 changes: 1 addition & 1 deletion e2e_test/sink/elasticsearch/elasticsearch_sink.result
@@ -1 +1 @@
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
{"took":34,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}}
17 changes: 9 additions & 8 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
@@ -6,7 +6,8 @@ CREATE TABLE t7 (
d date,
t time,
ts timestamp,
tz timestamptz
tz timestamptz,
st struct<st1 int, st2 int>
);

statement ok
@@ -31,18 +32,18 @@ CREATE SINK s8 from t7 WITH (

statement ok
INSERT INTO t7 VALUES
(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z'),
(5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z');
(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(1,1)),
(2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(2,2)),
(3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z',(3,3)),
(5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(5,5)),
(8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(8,8)),
(13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z',(13,13));

statement ok
FLUSH;

statement ok
INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z');
INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z',(1,1));

statement ok
FLUSH;
@@ -1 +1 @@
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}}
@@ -101,7 +101,7 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
.asRuntimeException();
}
sinkId = sinkTask.getStart().getSinkParam().getSinkId();
bindSink(sinkTask.getStart().getSinkParam(), sinkTask.getStart().getFormat());
bindSink(sinkTask.getStart());
currentEpoch = null;
currentBatchId = null;
epochStarted = false;
@@ -200,14 +200,13 @@ private void cleanup() {
ConnectorNodeMetrics.decActiveSinkConnections(connectorName, "node1");
}

private void bindSink(
ConnectorServiceProto.SinkParam sinkParam,
ConnectorServiceProto.SinkPayloadFormat format) {
tableSchema = TableSchema.fromProto(sinkParam.getTableSchema());
private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink) {
var sinkParam = startSink.getSinkParam();
tableSchema = TableSchema.fromProto(startSink.getPayloadSchema());
String connectorName = getConnectorName(sinkParam);
SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
switch (format) {
switch (startSink.getFormat()) {
case FORMAT_UNSPECIFIED:
case UNRECOGNIZED:
throw INVALID_ARGUMENT
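Since the hunk above interleaves removed and added lines, the post-change shape of `bindSink` is easier to read reconstructed in one piece. This is a sketch, not the verified file: it relies on the enclosing class's fields and imports, the exception message is an assumption, and the format cases folded by the diff are stubbed out.

```java
// Reconstruction of the new bindSink from the diff above. Uses the enclosing
// class's fields (tableSchema, sink) and helpers; the throw description and
// the default branch are assumptions, since the diff folds the remaining cases.
private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink) {
    var sinkParam = startSink.getSinkParam();
    // The writer's schema now comes from the payload schema sent at stream
    // start, not from the user-facing table schema inside sinkParam.
    tableSchema = TableSchema.fromProto(startSink.getPayloadSchema());
    String connectorName = getConnectorName(sinkParam);
    SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
    sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap());
    switch (startSink.getFormat()) {
        case FORMAT_UNSPECIFIED:
        case UNRECOGNIZED:
            throw INVALID_ARGUMENT
                    .withDescription("invalid sink payload format") // assumed message
                    .asRuntimeException();
        default:
            break; // remaining format cases are folded in the diff
    }
}
```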
@@ -61,8 +61,8 @@ public void testEsSink(ElasticsearchContainer container, String username, String
getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(Op.INSERT, 1, "Alice"),
new ArraySinkRow(Op.INSERT, 2, "Bob")));
new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"),
new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}")));
sink.sync();
// container is slow here, but our default flush time is 5s,
// so 3s is enough for sync test
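The rewritten test pins down the sink's new input contract: instead of typed columns, every row reaching the ES sink carries exactly two strings, the document id (primary keys joined by the configured delimiter, `$` in this test) and the document body already encoded as JSON on the RisingWave side. A minimal sketch of that contract; the package for `Op` is an assumption based on the connector-api imports visible elsewhere in this PR:

```java
import com.risingwave.connector.api.sink.ArraySinkRow;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.proto.Data.Op; // assumed package for Op

public class EsRowContractSketch {
    public static void main(String[] args) {
        // Column 0: document id ("1$Alice" = pk values joined by the delimiter).
        // Column 1: the whole document, pre-encoded as JSON upstream.
        SinkRow row = new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}");
        String id = (String) row.get(0);  // becomes the Elasticsearch _id
        String doc = (String) row.get(1); // passed through verbatim as _source
        System.out.println(id + " -> " + doc);
    }
}
```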
@@ -16,8 +16,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
@@ -27,7 +25,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -47,6 +44,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,21 +65,13 @@ public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error message %s";

private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC");
private final SimpleDateFormat tDfm;
private final SimpleDateFormat tsDfm;
private final SimpleDateFormat tstzDfm;

private final EsSinkConfig config;
private BulkProcessor bulkProcessor;
private final RestHighLevelClient client;

// Used to handle the return message of ES and throw errors
private final RequestTracker requestTracker;

// For bulk listener
private final List<Integer> primaryKeyIndexes;

class RequestTracker {
// Used to save the return results of es asynchronous writes. The capacity is Integer.Max
private final BlockingQueue<EsWriteResultResp> blockingQueue = new LinkedBlockingQueue<>();
@@ -196,15 +186,6 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException();
}
this.bulkProcessor = createBulkProcessor(this.requestTracker);

primaryKeyIndexes = new ArrayList<Integer>();
for (String primaryKey : tableSchema.getPrimaryKeys()) {
primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey));
}

tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone);
tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone);
tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone);
}

private static RestClientBuilder configureRestClientBuilder(
@@ -297,116 +278,20 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}

/**
* The api accepts doc in map form.
*
* @param row
* @return Map from Field name to Value
* @throws JsonProcessingException
* @throws JsonMappingException
*/
private Map<String, Object> buildDoc(SinkRow row)
throws JsonMappingException, JsonProcessingException {
Map<String, Object> doc = new HashMap();
var tableSchema = getTableSchema();
var columnDescs = tableSchema.getColumnDescs();
for (int i = 0; i < row.size(); i++) {
var type = columnDescs.get(i).getDataType().getTypeName();
Object col = row.get(i);
switch (type) {
// es client doesn't natively support java.sql.Timestamp/Time/Date
// so we need to convert Date/Time/Timestamp type into a string as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
case DATE:
col = col.toString();
break;
// construct java.sql.Time/Timestamp with milliseconds time value.
// it will use system timezone by default, so we have to set timezone manually
case TIME:
col = tDfm.format(col);
break;
case TIMESTAMP:
col = tsDfm.format(col);
break;
case TIMESTAMPTZ:
col = tstzDfm.format(col);
break;
case JSONB:
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree((String) col);
col = convertJsonNode(jsonNode);
break;
default:
break;
}

doc.put(getTableSchema().getColumnDesc(i).getName(), col);
}
return doc;
}

private static Object convertJsonNode(JsonNode jsonNode) {
if (jsonNode.isObject()) {
Map<String, Object> resultMap = new HashMap<>();
jsonNode.fields()
.forEachRemaining(
entry -> {
resultMap.put(entry.getKey(), convertJsonNode(entry.getValue()));
});
return resultMap;
} else if (jsonNode.isArray()) {
List<Object> resultList = new ArrayList<>();
jsonNode.elements()
.forEachRemaining(
element -> {
resultList.add(convertJsonNode(element));
});
return resultList;
} else if (jsonNode.isNumber()) {
return jsonNode.numberValue();
} else if (jsonNode.isTextual()) {
return jsonNode.textValue();
} else if (jsonNode.isBoolean()) {
return jsonNode.booleanValue();
} else if (jsonNode.isNull()) {
return null;
} else {
throw new IllegalArgumentException("Unsupported JSON type");
}
}

/**
* use primary keys as id concatenated by a specific delimiter.
*
* @param row
* @return
*/
private String buildId(SinkRow row) {
String id;
if (primaryKeyIndexes.isEmpty()) {
id = row.get(0).toString();
} else {
List<String> keys =
primaryKeyIndexes.stream()
.map(index -> row.get(index).toString())
.collect(Collectors.toList());
id = String.join(config.getDelimiter(), keys);
}
return id;
}

private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
Map<String, Object> doc = buildDoc(row);
final String key = buildId(row);
final String key = (String) row.get(0);
String doc = (String) row.get(1);
Review comment (Contributor) on lines +282 to +283: Concern about the forward compatibility

UpdateRequest updateRequest =
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc).upsert(doc);
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
}

private void processDelete(SinkRow row) {
final String key = buildId(row);
private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(0);

DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
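With documents arriving pre-encoded, the write path collapses to handing strings to the Elasticsearch high-level REST client: the server no longer inspects column types, so new type support (like the struct column in the e2e test) only needs changes to the Rust-side encoder. A condensed sketch of the flow the diff implements; the class and method names of the sketch itself are illustrative, while `UpdateRequest.doc(String, XContentType)`, `docAsUpsert`, and the bulk-processor calls are standard 7.x client API:

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.xcontent.XContentType;

// Condensed sketch of the refactored write path: no per-column conversion
// remains, only request construction from (id, json) pairs.
class EsWritePathSketch {
    private final BulkProcessor bulkProcessor;
    private final String index;

    EsWritePathSketch(BulkProcessor bulkProcessor, String index) {
        this.bulkProcessor = bulkProcessor;
        this.index = index;
    }

    void upsert(String key, String jsonDoc) {
        // XContentType.JSON tells the client the body is already-serialized JSON.
        UpdateRequest updateRequest =
                new UpdateRequest(index, "_doc", key).doc(jsonDoc, XContentType.JSON);
        updateRequest.docAsUpsert(true); // insert when the id does not exist yet
        bulkProcessor.add(updateRequest);
    }

    void delete(String key) {
        bulkProcessor.add(new DeleteRequest(index, "_doc", key));
    }
}
```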
1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -39,6 +39,7 @@ message SinkWriterStreamRequest {
message StartSink {
SinkParam sink_param = 1;
SinkPayloadFormat format = 2;
TableSchema payload_schema = 3;
}

message WriteBatch {
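The new `payload_schema` field lets the sender describe the rows it actually ships (for the ES sink, the two-string id/document shape) separately from the user-facing table schema carried inside `sink_param`. A sketch of populating it from Java; the `com.risingwave.proto` package and exact message locations are assumptions, while the setters are the standard builders protobuf generates for the fields above:

```java
import com.risingwave.proto.ConnectorServiceProto; // assumed package

class StartSinkSketch {
    static ConnectorServiceProto.SinkWriterStreamRequest.StartSink buildStartSink(
            ConnectorServiceProto.SinkParam sinkParam,
            ConnectorServiceProto.SinkPayloadFormat format,
            ConnectorServiceProto.TableSchema payloadSchema) {
        return ConnectorServiceProto.SinkWriterStreamRequest.StartSink.newBuilder()
                .setSinkParam(sinkParam)         // user-facing schema + sink properties
                .setFormat(format)               // payload encoding for the stream
                .setPayloadSchema(payloadSchema) // shape of the rows actually shipped
                .build();
    }
}
```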
10 changes: 8 additions & 2 deletions src/connector/src/sink/big_query.rs
@@ -33,7 +33,10 @@ use url::Url;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use super::encoder::{
DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
};
use super::writer::LogSinkerOf;
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::aws_utils::load_file_descriptor_from_s3;
@@ -309,10 +312,13 @@ impl BigQuerySinkWriter {
client,
is_append_only,
insert_request: TableDataInsertAllRequest::new(),
row_encoder: JsonEncoder::new_with_big_query(
row_encoder: JsonEncoder::new(
schema,
None,
DateHandlingMode::String,
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
),
})
}