From 5bf6c462d36ee312b1ba0edeed06a6586cf3352e Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Thu, 4 Jan 2024 16:30:52 +0800 Subject: [PATCH] feat(sink): support es sink struct and refactor es sink (#14231) --- .../elasticsearch/elasticsearch_sink.result | 2 +- .../sink/elasticsearch/elasticsearch_sink.slt | 17 ++- .../elasticsearch_with_pk_sink.result | 2 +- .../connector/SinkWriterStreamObserver.java | 11 +- .../sink/elasticsearch/EsSinkTest.java | 4 +- .../java/com/risingwave/connector/EsSink.java | 131 +---------------- proto/connector_service.proto | 1 + src/connector/src/sink/big_query.rs | 10 +- src/connector/src/sink/elasticsearch.rs | 138 ++++++++++++++++++ src/connector/src/sink/encoder/json.rs | 81 ++++++---- src/connector/src/sink/encoder/mod.rs | 10 +- .../src/sink/formatter/debezium_json.rs | 6 +- src/connector/src/sink/formatter/mod.rs | 8 +- src/connector/src/sink/kafka.rs | 4 +- src/connector/src/sink/mod.rs | 1 + src/connector/src/sink/nats.rs | 3 +- src/connector/src/sink/remote.rs | 111 ++++++++++---- src/rpc_client/src/connector_client.rs | 6 +- src/stream/src/task/env.rs | 3 +- 19 files changed, 343 insertions(+), 206 deletions(-) create mode 100644 src/connector/src/sink/elasticsearch.rs diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_sink.result index 7f832258c959..b67b65495804 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.result @@ -1 +1 @@ -{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}} +{"took":34,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 
20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt index 049d111713d6..70b7e911c0f1 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt @@ -6,7 +6,8 @@ CREATE TABLE t7 ( d date, t time, ts timestamp, - tz timestamptz + tz timestamptz, + st struct, ); statement ok @@ -31,18 +32,18 @@ CREATE SINK s8 from t7 WITH ( statement ok INSERT INTO t7 VALUES - (1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z'), - (5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z'); + (1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(1,1)), + (2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(2,2)), + (3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z',(3,3)), + (5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(5,5)), + (8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(8,8)), + (13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z',(13,13)); statement ok FLUSH; statement ok -INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z'); +INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z',(1,1)); statement ok FLUSH; diff --git a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result index 12ac1c837037..3e03f5a511c0 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result @@ -1 +1 @@ 
-{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}} +{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index a8b53f5c7e48..cd61da38d6cb 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -101,7 +101,7 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) { 
.asRuntimeException(); } sinkId = sinkTask.getStart().getSinkParam().getSinkId(); - bindSink(sinkTask.getStart().getSinkParam(), sinkTask.getStart().getFormat()); + bindSink(sinkTask.getStart()); currentEpoch = null; currentBatchId = null; epochStarted = false; @@ -200,14 +200,13 @@ private void cleanup() { ConnectorNodeMetrics.decActiveSinkConnections(connectorName, "node1"); } - private void bindSink( - ConnectorServiceProto.SinkParam sinkParam, - ConnectorServiceProto.SinkPayloadFormat format) { - tableSchema = TableSchema.fromProto(sinkParam.getTableSchema()); + private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink) { + var sinkParam = startSink.getSinkParam(); + tableSchema = TableSchema.fromProto(startSink.getPayloadSchema()); String connectorName = getConnectorName(sinkParam); SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName); sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap()); - switch (format) { + switch (startSink.getFormat()) { case FORMAT_UNSPECIFIED: case UNRECOGNIZED: throw INVALID_ARGUMENT diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index 8a61c4e8fba0..e7e74de2af6f 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -61,8 +61,8 @@ public void testEsSink(ElasticsearchContainer container, String username, String getTestTableSchema()); sink.write( Iterators.forArray( - new ArraySinkRow(Op.INSERT, 1, "Alice"), - new ArraySinkRow(Op.INSERT, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"), + new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}"))); sink.sync(); // container is slow here, but our default flush time is 5s, // so 3s is enough for sync test diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index dd579aafb603..7a2e5742bb79 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -16,8 +16,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.connector.api.sink.SinkWriterBase; @@ -27,7 +25,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -47,6 +44,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-67,11 +65,6 @@ public class EsSink extends SinkWriterBase { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); private static final String ERROR_REPORT_TEMPLATE = "Error message %s"; - private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC"); - private final SimpleDateFormat tDfm; - private final SimpleDateFormat tsDfm; - private final SimpleDateFormat tstzDfm; - private final EsSinkConfig config; private BulkProcessor bulkProcessor; private final RestHighLevelClient client; @@ -79,9 +72,6 @@ public class EsSink extends SinkWriterBase { // Used to handle the return message of ES and throw errors private final RequestTracker requestTracker; - // For bulk listener - private final List primaryKeyIndexes; - class RequestTracker { // Used to save the return results of es asynchronous writes. The capacity is Integer.Max private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); @@ -196,15 +186,6 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); } this.bulkProcessor = createBulkProcessor(this.requestTracker); - - primaryKeyIndexes = new ArrayList(); - for (String primaryKey : tableSchema.getPrimaryKeys()) { - primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey)); - } - - tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone); - tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone); - tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone); } private static RestClientBuilder configureRestClientBuilder( @@ -297,116 +278,20 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } - /** - * The api accepts doc in map form. - * - * @param row - * @return Map from Field name to Value - * @throws JsonProcessingException - * @throws JsonMappingException - */ - private Map buildDoc(SinkRow row) - throws JsonMappingException, JsonProcessingException { - Map doc = new HashMap(); - var tableSchema = getTableSchema(); - var columnDescs = tableSchema.getColumnDescs(); - for (int i = 0; i < row.size(); i++) { - var type = columnDescs.get(i).getDataType().getTypeName(); - Object col = row.get(i); - switch (type) { - // es client doesn't natively support java.sql.Timestamp/Time/Date - // so we need to convert Date/Time/Timestamp type into a string as suggested in - // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 - case DATE: - col = col.toString(); - break; - // construct java.sql.Time/Timestamp with milliseconds time value. 
- // it will use system timezone by default, so we have to set timezone manually - case TIME: - col = tDfm.format(col); - break; - case TIMESTAMP: - col = tsDfm.format(col); - break; - case TIMESTAMPTZ: - col = tstzDfm.format(col); - break; - case JSONB: - ObjectMapper mapper = new ObjectMapper(); - JsonNode jsonNode = mapper.readTree((String) col); - col = convertJsonNode(jsonNode); - break; - default: - break; - } - - doc.put(getTableSchema().getColumnDesc(i).getName(), col); - } - return doc; - } - - private static Object convertJsonNode(JsonNode jsonNode) { - if (jsonNode.isObject()) { - Map resultMap = new HashMap<>(); - jsonNode.fields() - .forEachRemaining( - entry -> { - resultMap.put(entry.getKey(), convertJsonNode(entry.getValue())); - }); - return resultMap; - } else if (jsonNode.isArray()) { - List resultList = new ArrayList<>(); - jsonNode.elements() - .forEachRemaining( - element -> { - resultList.add(convertJsonNode(element)); - }); - return resultList; - } else if (jsonNode.isNumber()) { - return jsonNode.numberValue(); - } else if (jsonNode.isTextual()) { - return jsonNode.textValue(); - } else if (jsonNode.isBoolean()) { - return jsonNode.booleanValue(); - } else if (jsonNode.isNull()) { - return null; - } else { - throw new IllegalArgumentException("Unsupported JSON type"); - } - } - - /** - * use primary keys as id concatenated by a specific delimiter. - * - * @param row - * @return - */ - private String buildId(SinkRow row) { - String id; - if (primaryKeyIndexes.isEmpty()) { - id = row.get(0).toString(); - } else { - List keys = - primaryKeyIndexes.stream() - .map(index -> row.get(index).toString()) - .collect(Collectors.toList()); - id = String.join(config.getDelimiter(), keys); - } - return id; - } - private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { - Map doc = buildDoc(row); - final String key = buildId(row); + final String key = (String) row.get(0); + String doc = (String) row.get(1); UpdateRequest updateRequest = - new UpdateRequest(config.getIndex(), "_doc", key).doc(doc).upsert(doc); + new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON); + updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); bulkProcessor.add(updateRequest); } - private void processDelete(SinkRow row) { - final String key = buildId(row); + private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException { + final String key = (String) row.get(0); + DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key); this.requestTracker.addWriteTask(); bulkProcessor.add(deleteRequest); diff --git a/proto/connector_service.proto b/proto/connector_service.proto index ddda62c6aace..465af0d2a55a 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -39,6 +39,7 @@ message SinkWriterStreamRequest { message StartSink { SinkParam sink_param = 1; SinkPayloadFormat format = 2; + TableSchema payload_schema = 3; } message WriteBatch { diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 081b5814b01e..41cfe7783819 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -33,7 +33,10 @@ use url::Url; use with_options::WithOptions; use yup_oauth2::ServiceAccountKey; -use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use super::encoder::{ + DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, + TimestamptzHandlingMode, +}; use 
super::writer::LogSinkerOf; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::aws_utils::load_file_descriptor_from_s3; @@ -309,10 +312,13 @@ impl BigQuerySinkWriter { client, is_append_only, insert_request: TableDataInsertAllRequest::new(), - row_encoder: JsonEncoder::new_with_big_query( + row_encoder: JsonEncoder::new( schema, None, + DateHandlingMode::String, TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ), }) } diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs new file mode 100644 index 000000000000..f6ddbeb7e35d --- /dev/null +++ b/src/connector/src/sink/elasticsearch.rs @@ -0,0 +1,138 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use anyhow::anyhow; +use risingwave_common::array::{ + ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, +}; +use risingwave_common::catalog::Schema; +use risingwave_common::row::Row; +use risingwave_common::types::{JsonbVal, Scalar, ToText}; +use serde_json::Value; + +use super::encoder::{JsonEncoder, RowEncoder}; +use super::remote::ElasticSearchSink; +use crate::sink::{Result, Sink}; +pub const ES_OPTION_DELIMITER: &str = "delimiter"; + +pub enum StreamChunkConverter { + Es(EsStreamChunkConverter), + Other, +} +impl StreamChunkConverter { + pub fn new( + sink_name: &str, + schema: Schema, + pk_indices: &Vec, + properties: &HashMap, + ) -> Result { + if sink_name == ElasticSearchSink::SINK_NAME { + Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new( + schema, + pk_indices.clone(), + properties.get(ES_OPTION_DELIMITER).cloned(), + )?)) + } else { + Ok(StreamChunkConverter::Other) + } + } + + pub fn convert_chunk(&self, chunk: StreamChunk) -> Result { + match self { + StreamChunkConverter::Es(es) => es.convert_chunk(chunk), + StreamChunkConverter::Other => Ok(chunk), + } + } +} +pub struct EsStreamChunkConverter { + json_encoder: JsonEncoder, + fn_build_id: Box) -> Result + Send>, +} +impl EsStreamChunkConverter { + fn new(schema: Schema, pk_indices: Vec, delimiter: Option) -> Result { + let fn_build_id: Box) -> Result + Send> = if pk_indices.is_empty() + { + Box::new(|row: RowRef<'_>| { + Ok(row + .datum_at(0) + .ok_or_else(|| anyhow!("No value find in row, index is 0"))? + .to_text()) + }) + } else if pk_indices.len() == 1 { + let index = *pk_indices.get(0).unwrap(); + Box::new(move |row: RowRef<'_>| { + Ok(row + .datum_at(index) + .ok_or_else(|| anyhow!("No value find in row, index is 0"))? + .to_text()) + }) + } else { + let delimiter = delimiter + .as_ref() + .ok_or_else(|| anyhow!("Please set delimiter in with option"))? + .clone(); + Box::new(move |row: RowRef<'_>| { + let mut keys = vec![]; + for index in &pk_indices { + keys.push( + row.datum_at(*index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? 
+ .to_text(), + ); + } + Ok(keys.join(&delimiter)) + }) + }; + let json_encoder = JsonEncoder::new_with_es(schema, None); + Ok(Self { + json_encoder, + fn_build_id, + }) + } + + fn convert_chunk(&self, chunk: StreamChunk) -> Result { + let mut ops = vec![]; + let mut id_string_builder = + ::new(chunk.capacity()); + let mut json_builder = + ::new(chunk.capacity()); + for (op, row) in chunk.rows() { + ops.push(op); + let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); + risingwave_common::array::ArrayBuilder::append( + &mut id_string_builder, + Some(&self.build_id(row)?), + ); + risingwave_common::array::ArrayBuilder::append( + &mut json_builder, + Some(json.as_scalar_ref()), + ); + } + let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder); + let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder); + Ok(StreamChunk::new( + ops, + vec![ + std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)), + std::sync::Arc::new(ArrayImpl::Jsonb(json_array)), + ], + )) + } + + fn build_id(&self, row: RowRef<'_>) -> Result { + (self.fn_build_id)(row) + } +} diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 473db379341b..b4d2de84c006 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -23,19 +23,20 @@ use itertools::Itertools; use risingwave_common::array::{ArrayError, ArrayResult}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row; -use risingwave_common::types::{DataType, DatumRef, Decimal, ScalarRefImpl, ToText}; +use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_json::{json, Map, Value}; use super::{ CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result, - RowEncoder, SerTo, TimestampHandlingMode, TimestamptzHandlingMode, + RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, }; use crate::sink::SinkError; pub struct JsonEncoder { schema: Schema, col_indices: Option>, + time_handling_mode: TimeHandlingMode, date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, @@ -50,10 +51,12 @@ impl JsonEncoder { date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, + time_handling_mode: TimeHandlingMode, ) -> Self { Self { schema, col_indices, + time_handling_mode, date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, @@ -62,6 +65,19 @@ impl JsonEncoder { } } + pub fn new_with_es(schema: Schema, col_indices: Option>) -> Self { + Self { + schema, + col_indices, + time_handling_mode: TimeHandlingMode::String, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, + custom_json_type: CustomJsonType::Es, + kafka_connect: None, + } + } + pub fn new_with_doris( schema: Schema, col_indices: Option>, @@ -71,6 +87,7 @@ impl JsonEncoder { Self { schema, col_indices, + time_handling_mode: TimeHandlingMode::Milli, date_handling_mode: DateHandlingMode::String, timestamp_handling_mode, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, @@ -85,22 +102,6 @@ impl JsonEncoder { ..self } } - - pub fn new_with_big_query( - schema: Schema, - 
col_indices: Option>, - timestamp_handling_mode: TimestampHandlingMode, - ) -> Self { - Self { - schema, - col_indices, - date_handling_mode: DateHandlingMode::String, - timestamp_handling_mode, - timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, - custom_json_type: CustomJsonType::Bigquery, - kafka_connect: None, - } - } } impl RowEncoder for JsonEncoder { @@ -130,6 +131,7 @@ impl RowEncoder for JsonEncoder { self.date_handling_mode, self.timestamp_handling_mode, self.timestamptz_handling_mode, + self.time_handling_mode, &self.custom_json_type, ) .map_err(|e| SinkError::Encode(e.to_string()))?; @@ -166,6 +168,7 @@ fn datum_to_json_object( date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, + time_handling_mode: TimeHandlingMode, custom_json_type: &CustomJsonType, ) -> ArrayResult { let scalar_ref = match datum { @@ -216,7 +219,7 @@ fn datum_to_json_object( } json!(v_string) } - CustomJsonType::None | CustomJsonType::Bigquery => { + CustomJsonType::Es | CustomJsonType::None => { json!(v.to_text()) } }, @@ -234,10 +237,16 @@ fn datum_to_json_object( TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()), TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()), }, - (DataType::Time, ScalarRefImpl::Time(v)) => { - // todo: just ignore the nanos part to avoid leap second complex - json!(v.0.num_seconds_from_midnight() as i64 * 1000) - } + (DataType::Time, ScalarRefImpl::Time(v)) => match time_handling_mode { + TimeHandlingMode::Milli => { + // todo: just ignore the nanos part to avoid leap second complex + json!(v.0.num_seconds_from_midnight() as i64 * 1000) + } + TimeHandlingMode::String => { + let a = v.0.format("%H:%M:%S%.6f").to_string(); + json!(a) + } + }, (DataType::Date, ScalarRefImpl::Date(v)) => match date_handling_mode { DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()), DateHandlingMode::FromEpoch => { @@ -260,9 +269,10 @@ fn datum_to_json_object( (DataType::Interval, ScalarRefImpl::Interval(v)) => { json!(v.as_iso_8601()) } - (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => { - json!(jsonb_ref.to_string()) - } + (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type { + CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(), + CustomJsonType::Doris(_) | CustomJsonType::None => json!(jsonb_ref.to_string()), + }, (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { let elems = list_ref.iter(); let mut vec = Vec::with_capacity(elems.len()); @@ -274,6 +284,7 @@ fn datum_to_json_object( date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, + time_handling_mode, custom_json_type, )?; vec.push(value); @@ -295,6 +306,7 @@ fn datum_to_json_object( date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, + time_handling_mode, custom_json_type, )?; map.insert(sub_field.name.clone(), value); @@ -303,7 +315,7 @@ fn datum_to_json_object( ArrayError::internal(format!("Json to string err{:?}", err)) })?) 
} - CustomJsonType::None | CustomJsonType::Bigquery => { + CustomJsonType::Es | CustomJsonType::None => { let mut map = Map::with_capacity(st.len()); for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( st.iter() @@ -315,6 +327,7 @@ fn datum_to_json_object( date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, + time_handling_mode, custom_json_type, )?; map.insert(sub_field.name.clone(), value); @@ -433,6 +446,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -447,6 +461,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -461,6 +476,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -480,6 +496,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -495,6 +512,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcWithoutSuffix, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -512,6 +530,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -529,6 +548,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -547,6 +567,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -564,6 +585,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -581,6 +603,7 @@ mod tests { DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::Doris(map), ) .unwrap(); @@ -595,6 +618,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -609,6 +633,7 @@ mod tests { DateHandlingMode::FromEpoch, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -623,6 +648,7 @@ mod tests { DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::Doris(HashMap::default()), ) .unwrap(); @@ -647,6 +673,7 @@ mod tests { DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::Doris(HashMap::default()), ) .unwrap(); diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index ad32e78aa48c..83c28ce4b4a5 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -97,6 +97,12 @@ pub enum TimestampHandlingMode { String, } +#[derive(Clone, Copy)] +pub enum TimeHandlingMode { + Milli, + String, +} + #[derive(Clone, Copy, Default)] pub enum TimestamptzHandlingMode { 
#[default] @@ -134,8 +140,8 @@ pub enum CustomJsonType { // The internal order of the struct should follow the insertion order. // The decimal needs verification and calibration. Doris(HashMap), - // Bigquery's json need date is string. - Bigquery, + // Es's json need jsonb is struct + Es, None, } diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index ae98e7b51e62..6fff15058bd6 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -21,7 +21,8 @@ use tracing::warn; use super::{Result, SinkFormatter, StreamChunk}; use crate::sink::encoder::{ - DateHandlingMode, JsonEncoder, RowEncoder, TimestampHandlingMode, TimestamptzHandlingMode, + DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, + TimestamptzHandlingMode, }; use crate::tri; @@ -67,6 +68,7 @@ impl DebeziumJsonFormatter { DateHandlingMode::FromEpoch, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ); let val_encoder = JsonEncoder::new( schema.clone(), @@ -74,6 +76,7 @@ impl DebeziumJsonFormatter { DateHandlingMode::FromEpoch, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ); Self { schema, @@ -393,6 +396,7 @@ mod tests { DateHandlingMode::FromEpoch, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ); let json_chunk = chunk_to_json(chunk, &encoder).unwrap(); let schema_json = schema_to_json(&schema, "test_db", "test_table"); diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 91025bf3b401..74c6d5415113 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -28,7 +28,9 @@ pub use upsert::UpsertFormatter; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::template::TemplateEncoder; -use super::encoder::{DateHandlingMode, KafkaConnectParams, TimestamptzHandlingMode}; +use super::encoder::{ + DateHandlingMode, KafkaConnectParams, TimeHandlingMode, TimestamptzHandlingMode, +}; use super::redis::{KEY_FORMAT, VALUE_FORMAT}; use crate::sink::encoder::{ AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, TimestampHandlingMode, @@ -102,6 +104,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ) }); @@ -113,6 +116,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ); let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::AppendOnlyJson(formatter)) @@ -174,6 +178,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ); let mut val_encoder = JsonEncoder::new( schema, @@ -181,6 +186,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ); if let Some(s) = format_desc.options.get("schemas.enable") { diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index fd9935a12f5d..0c59780916cb 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -565,7 +565,8 @@ mod test { use super::*; use crate::sink::encoder::{ - DateHandlingMode, JsonEncoder, TimestampHandlingMode, TimestamptzHandlingMode, + DateHandlingMode, JsonEncoder, 
TimeHandlingMode, TimestampHandlingMode, + TimestamptzHandlingMode, }; use crate::sink::formatter::AppendOnlyFormatter; @@ -738,6 +739,7 @@ mod test { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ), )), ) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7bbd42edcd33..e5f9df155c73 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -20,6 +20,7 @@ pub mod coordinate; pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; +pub mod elasticsearch; pub mod encoder; pub mod formatter; pub mod iceberg; diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index a51ab42050fd..01bac5e6b048 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -25,7 +25,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use with_options::WithOptions; -use super::encoder::{DateHandlingMode, TimestamptzHandlingMode}; +use super::encoder::{DateHandlingMode, TimeHandlingMode, TimestamptzHandlingMode}; use super::utils::chunk_to_json; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::common::NatsCommon; @@ -146,6 +146,7 @@ impl NatsSinkWriter { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcWithoutSuffix, + TimeHandlingMode::Milli, ), }) } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index f76b685f96c1..320f77c6a47b 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -26,6 +26,7 @@ use itertools::Itertools; use jni::JavaVM; use prost::Message; use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; use risingwave_common::util::drop_either_future; @@ -39,8 +40,9 @@ use risingwave_pb::connector_service::sink_writer_stream_request::{ }; use risingwave_pb::connector_service::{ sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response, - SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, SinkPayloadFormat, - SinkWriterStreamRequest, SinkWriterStreamResponse, ValidateSinkRequest, ValidateSinkResponse, + PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, + SinkPayloadFormat, SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, + ValidateSinkRequest, ValidateSinkResponse, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::{ @@ -53,6 +55,7 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; +use super::elasticsearch::{StreamChunkConverter, ES_OPTION_DELIMITER}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; @@ -145,21 +148,28 @@ impl Sink for RemoteSink { } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - RemoteLogSinker::new(self.param.clone(), writer_param).await + RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await } async fn validate(&self) -> Result<()> { - validate_remote_sink(&self.param).await?; + validate_remote_sink(&self.param, Self::SINK_NAME).await?; Ok(()) } } -async fn validate_remote_sink(param: &SinkParam) -> anyhow::Result<()> { +async fn validate_remote_sink(param: &SinkParam, sink_name: 
&str) -> anyhow::Result<()> { + if sink_name == ElasticSearchSink::SINK_NAME + && param.downstream_pk.len() > 1 + && param.properties.get(ES_OPTION_DELIMITER).is_none() + { + return Err(anyhow_error!( + "Es sink only support single pk or pk with delimiter option" + )); + } // FIXME: support struct and array in stream sink param.columns.iter().map(|col| { - if matches!( - col.data_type, - DataType::Int16 + match &col.data_type { + DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 @@ -173,18 +183,34 @@ async fn validate_remote_sink(param: &SinkParam) -> anyhow::Result<()> { | DataType::Time | DataType::Interval | DataType::Jsonb - | DataType::Bytea - | DataType::List(_) - ) { - Ok(()) - } else { - Err(SinkError::Remote(anyhow_error!( - "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, List, Bytea and Varchar, got {:?}: {:?}", - col.name, - col.data_type, - ))) - } - }).try_collect()?; + | DataType::Bytea => Ok(()), + DataType::List(list) => { + if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + Ok(()) + } else{ + Err(SinkError::Remote(anyhow_error!( + "Remote sink only support list, got {:?}: {:?}", + col.name, + col.data_type, + ))) + } + }, + DataType::Struct(_) => { + if sink_name==ElasticSearchSink::SINK_NAME{ + Ok(()) + }else{ + Err(SinkError::Remote(anyhow_error!( + "Only Es sink support struct, got {:?}: {:?}", + col.name, + col.data_type, + ))) + } + }, + DataType::Serial | DataType::Int256 => Err(SinkError::Remote(anyhow_error!( + "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}", + col.name, + col.data_type, + )))}}).try_collect()?; let jvm = JVM.get_or_init()?; let sink_param = param.to_proto(); @@ -226,15 +252,34 @@ pub struct RemoteLogSinker { request_sender: BidiStreamSender, response_stream: BidiStreamReceiver, sink_metrics: SinkMetrics, + stream_chunk_converter: StreamChunkConverter, } impl RemoteLogSinker { - async fn new(sink_param: SinkParam, writer_param: SinkWriterParam) -> Result { + async fn new( + sink_param: SinkParam, + writer_param: SinkWriterParam, + sink_name: &str, + ) -> Result { + let sink_proto = sink_param.to_proto(); + let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME { + let columns = vec![ + ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(1), DataType::Jsonb).to_protobuf(), + ]; + Some(TableSchema { + columns, + pk_indices: vec![], + }) + } else { + sink_proto.table_schema.clone() + }; + let SinkWriterStreamHandle { request_sender, response_stream, } = EmbeddedConnectorClient::new()? 
- .start_sink_writer_stream(sink_param, SinkPayloadFormat::StreamChunk) + .start_sink_writer_stream(payload_schema, sink_proto, SinkPayloadFormat::StreamChunk) .await?; let sink_metrics = writer_param.sink_metrics; @@ -242,6 +287,12 @@ impl RemoteLogSinker { request_sender, response_stream, sink_metrics, + stream_chunk_converter: StreamChunkConverter::new( + sink_name, + sink_param.schema(), + &sink_param.downstream_pk, + &sink_param.properties, + )?, }) } } @@ -385,6 +436,7 @@ impl LogSinker for RemoteLogSinker { .connector_sink_rows_received .inc_by(cardinality as _); + let chunk = self.stream_chunk_converter.convert_chunk(chunk)?; request_tx .send_request(JniSinkWriterStreamRequest::Chunk { epoch, @@ -451,7 +503,7 @@ impl Sink for CoordinatedRemoteSink { const SINK_NAME: &'static str = R::SINK_NAME; async fn validate(&self) -> Result<()> { - validate_remote_sink(&self.param).await?; + validate_remote_sink(&self.param, Self::SINK_NAME).await?; Ok(()) } @@ -498,8 +550,13 @@ impl CoordinatedRemoteSinkWriter { connector_params: ConnectorParams, sink_metrics: SinkMetrics, ) -> Result { + let sink_proto = param.to_proto(); let stream_handle = EmbeddedConnectorClient::new()? - .start_sink_writer_stream(param.clone(), connector_params.sink_payload_format) + .start_sink_writer_stream( + sink_proto.table_schema.clone(), + sink_proto, + connector_params.sink_payload_format, + ) .await?; Ok(Self { @@ -635,14 +692,16 @@ impl EmbeddedConnectorClient { async fn start_sink_writer_stream( &self, - sink_param: SinkParam, + payload_schema: Option, + sink_proto: PbSinkParam, sink_payload_format: SinkPayloadFormat, ) -> Result> { let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { - sink_param: Some(sink_param.to_proto()), + sink_param: Some(sink_proto), format: sink_payload_format as i32, + payload_schema, })), }, |rx| async move { diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index 85d9b6c9f6ec..cd53cd019ea6 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -265,14 +265,16 @@ impl ConnectorClient { pub async fn start_sink_writer_stream( &self, - sink_param: SinkParam, + payload_schema: Option, + sink_proto: PbSinkParam, sink_payload_format: SinkPayloadFormat, ) -> Result { let mut rpc_client = self.rpc_client.clone(); let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { - sink_param: Some(sink_param), + payload_schema, + sink_param: Some(sink_proto), format: sink_payload_format as i32, })), }, diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index 63e610ad69ef..0083e8e02ff2 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -20,8 +20,6 @@ use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::monitor::SourceMetrics; use risingwave_connector::ConnectorParams; -#[cfg(test)] -use risingwave_pb::connector_service::SinkPayloadFormat; use risingwave_rpc_client::MetaClient; use risingwave_source::dml_manager::DmlManagerRef; use risingwave_storage::StateStoreImpl; @@ -94,6 +92,7 @@ impl StreamEnvironment { #[cfg(test)] pub fn for_test() -> Self { use risingwave_common::system_param::local_manager::LocalSystemParamsManager; + use risingwave_pb::connector_service::SinkPayloadFormat; use 
risingwave_source::dml_manager::DmlManager; use risingwave_storage::monitor::MonitoredStorageMetrics; StreamEnvironment {
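
Note on the refactor above: document construction for Elasticsearch moves from the Java `EsSink` into the Rust connector. The new `EsStreamChunkConverter` re-encodes every stream chunk into two columns, a `Varchar` document id and a `Jsonb` document body, and the Java writer now simply reads `row.get(0)` as the `_id` and `row.get(1)` as the JSON source for its `UpdateRequest`/`DeleteRequest`. The sketch below illustrates only the id-building rule from the new `elasticsearch.rs` (no downstream pk: first column; single pk column: that column; composite pk: values joined by the `delimiter` sink option, which `validate_remote_sink` now requires for multi-column pks). It is a simplified stand-in that uses plain `String` slices instead of RisingWave's `RowRef`/`Datum` types; the function and variable names here are illustrative only, not part of the actual API.

    /// Simplified sketch of the `_id` rule in the new `elasticsearch.rs` converter.
    /// A plain `&[String]` stands in for RisingWave's `RowRef`.
    fn build_es_id(
        row: &[String],
        pk_indices: &[usize],
        delimiter: Option<&str>,
    ) -> Result<String, String> {
        match pk_indices {
            // No downstream pk: fall back to the first column, as the converter does.
            [] => row.first().cloned().ok_or_else(|| "empty row".to_string()),
            // Single pk column: use it directly.
            [i] => row
                .get(*i)
                .cloned()
                .ok_or_else(|| format!("no value at index {i}")),
            // Composite pk: the `delimiter` option is mandatory and joins the key parts.
            _ => {
                let delimiter = delimiter
                    .ok_or_else(|| "composite pk requires the `delimiter` option".to_string())?;
                let keys = pk_indices
                    .iter()
                    .map(|i| {
                        row.get(*i)
                            .cloned()
                            .ok_or_else(|| format!("no value at index {i}"))
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                Ok(keys.join(delimiter))
            }
        }
    }

    fn main() {
        // Row (v1, v2, v3) = (1, 50, '1-50') with pk (v1, v3) and delimiter '_'
        // yields "1_1-50", matching the `_id` values in elasticsearch_with_pk_sink.result.
        let row = vec!["1".to_string(), "50".to_string(), "1-50".to_string()];
        assert_eq!(build_es_id(&row, &[0, 2], Some("_")).unwrap(), "1_1-50");
    }

With this split, the Java side no longer needs its own `buildDoc`/`buildId` logic or the date/time formatters, which is why those blocks are deleted from `EsSink.java`, and the `payload_schema` field added to `StartSink` carries the two-column (Varchar key, Jsonb doc) schema that the remote sink writer now sends instead of the original table schema.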