From c93159712742466d3e5528515c5de892ffdfd9c0 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 22 Dec 2023 18:27:18 +0800 Subject: [PATCH 01/11] save --- .../connector/SinkWriterStreamObserver.java | 10 + .../java/com/risingwave/connector/EsSink.java | 207 ++++++++++-------- src/connector/src/sink/encoder/json.rs | 3 +- src/connector/src/sink/remote.rs | 63 +++++- 4 files changed, 184 insertions(+), 99 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 8625b6298ffb2..22bd7c5b87546 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -23,7 +23,10 @@ import com.risingwave.metrics.ConnectorNodeMetrics; import com.risingwave.metrics.MonitoredRowIterable; import com.risingwave.proto.ConnectorServiceProto; +import com.risingwave.proto.Data; +import com.risingwave.proto.Data.DataType.TypeName; import io.grpc.stub.StreamObserver; +import java.util.List; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,6 +210,13 @@ private void bindSink( String connectorName = getConnectorName(sinkParam); SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName); sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap()); + if (connectorName.equals("elasticsearch")) { + tableSchema = + new TableSchema( + List.of("test"), + List.of(Data.DataType.newBuilder().setTypeName(TypeName.JSONB).build()), + List.of()); + } switch (format) { case FORMAT_UNSPECIFIED: case UNRECOGNIZED: diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 3e69fa184baea..4efd21498b487 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -15,8 +15,8 @@ package com.risingwave.connector; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; @@ -47,6 +47,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +69,9 @@ public class EsSink extends SinkWriterBase { private static final String ERROR_REPORT_TEMPLATE = "Error message %s"; private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC"); - private final SimpleDateFormat tDfm; - private final SimpleDateFormat tsDfm; - private final SimpleDateFormat tstzDfm; + // private final SimpleDateFormat tDfm; + // private final SimpleDateFormat tsDfm; + // private final SimpleDateFormat tstzDfm; private final EsSinkConfig config; private BulkProcessor bulkProcessor; @@ -198,13 
+199,13 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { this.bulkProcessor = createBulkProcessor(this.requestTracker); primaryKeyIndexes = new ArrayList(); - for (String primaryKey : tableSchema.getPrimaryKeys()) { - primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey)); + for (String primaryKey : getTableSchema().getPrimaryKeys()) { + primaryKeyIndexes.add(getTableSchema().getColumnIndex(primaryKey)); } - tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone); - tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone); - tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone); + // tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone); + // tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone); + // tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone); } private static RestClientBuilder configureRestClientBuilder( @@ -297,98 +298,116 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } - /** - * The api accepts doc in map form. - * - * @param row - * @return Map from Field name to Value - * @throws JsonProcessingException - * @throws JsonMappingException - */ - private Map buildDoc(SinkRow row) - throws JsonMappingException, JsonProcessingException { - Map doc = new HashMap(); - var tableSchema = getTableSchema(); - var columnDescs = tableSchema.getColumnDescs(); - for (int i = 0; i < row.size(); i++) { - var type = columnDescs.get(i).getDataType().getTypeName(); - Object col = row.get(i); - switch (type) { - // es client doesn't natively support java.sql.Timestamp/Time/Date - // so we need to convert Date/Time/Timestamp type into a string as suggested in - // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 - case DATE: - col = col.toString(); - break; - // construct java.sql.Time/Timestamp with milliseconds time value. - // it will use system timezone by default, so we have to set timezone manually - case TIME: - col = tDfm.format(col); - break; - case TIMESTAMP: - col = tsDfm.format(col); - break; - case TIMESTAMPTZ: - col = tstzDfm.format(col); - break; - case JSONB: - ObjectMapper mapper = new ObjectMapper(); - JsonNode jsonNode = mapper.readTree((String) col); - col = convertJsonNode(jsonNode); - break; - default: - break; - } - - doc.put(getTableSchema().getColumnDesc(i).getName(), col); - } - return doc; - } - - private static Object convertJsonNode(JsonNode jsonNode) { - if (jsonNode.isObject()) { - Map resultMap = new HashMap<>(); - jsonNode.fields() - .forEachRemaining( - entry -> { - resultMap.put(entry.getKey(), convertJsonNode(entry.getValue())); - }); - return resultMap; - } else if (jsonNode.isArray()) { - List resultList = new ArrayList<>(); - jsonNode.elements() - .forEachRemaining( - element -> { - resultList.add(convertJsonNode(element)); - }); - return resultList; - } else if (jsonNode.isNumber()) { - return jsonNode.numberValue(); - } else if (jsonNode.isTextual()) { - return jsonNode.textValue(); - } else if (jsonNode.isBoolean()) { - return jsonNode.booleanValue(); - } else if (jsonNode.isNull()) { - return null; - } else { - throw new IllegalArgumentException("Unsupported JSON type"); - } - } + // /** + // * The api accepts doc in map form. 
+ // * + // * @param row + // * @return Map from Field name to Value + // * @throws JsonProcessingException + // * @throws JsonMappingException + // */ + // private Map buildDoc(SinkRow row) + // throws JsonMappingException, JsonProcessingException { + // Map doc = new HashMap(); + // var tableSchema = getTableSchema(); + // var columnDescs = tableSchema.getColumnDescs(); + // for (int i = 0; i < row.size(); i++) { + // var type = columnDescs.get(i).getDataType().getTypeName(); + // Object col = row.get(i); + // switch (type) { + // // es client doesn't natively support java.sql.Timestamp/Time/Date + // // so we need to convert Date/Time/Timestamp type into a string as suggested + // in + // // + // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 + // case DATE: + // col = col.toString(); + // break; + // // construct java.sql.Time/Timestamp with milliseconds time value. + // // it will use system timezone by default, so we have to set timezone + // manually + // case TIME: + // col = tDfm.format(col); + // break; + // case TIMESTAMP: + // col = tsDfm.format(col); + // break; + // case TIMESTAMPTZ: + // col = tstzDfm.format(col); + // break; + // case JSONB: + // ObjectMapper mapper = new ObjectMapper(); + // JsonNode jsonNode = mapper.readTree((String) col); + // col = convertJsonNode(jsonNode); + // break; + // default: + // break; + // } + + // doc.put(getTableSchema().getColumnDesc(i).getName(), col); + // } + // return doc; + // } + + // private static Object convertJsonNode(JsonNode jsonNode) { + // if (jsonNode.isObject()) { + // Map resultMap = new HashMap<>(); + // jsonNode.fields() + // .forEachRemaining( + // entry -> { + // resultMap.put(entry.getKey(), convertJsonNode(entry.getValue())); + // }); + // return resultMap; + // } else if (jsonNode.isArray()) { + // List resultList = new ArrayList<>(); + // jsonNode.elements() + // .forEachRemaining( + // element -> { + // resultList.add(convertJsonNode(element)); + // }); + // return resultList; + // } else if (jsonNode.isNumber()) { + // return jsonNode.numberValue(); + // } else if (jsonNode.isTextual()) { + // return jsonNode.textValue(); + // } else if (jsonNode.isBoolean()) { + // return jsonNode.booleanValue(); + // } else if (jsonNode.isNull()) { + // return null; + // } else { + // throw new IllegalArgumentException("Unsupported JSON type"); + // } + // } /** * use primary keys as id concatenated by a specific delimiter. 
* * @param row * @return + * @throws JsonProcessingException + * @throws JsonMappingException */ - private String buildId(SinkRow row) { + private String buildId(SinkRow row) throws JsonMappingException, JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + Map col = + mapper.readValue((String) row.get(0), new TypeReference>() {}); String id; + var tableSchema = getTableSchema(); if (primaryKeyIndexes.isEmpty()) { - id = row.get(0).toString(); + if (col.get(tableSchema.getColumnDesc(0).getName()) != null) { + id = col.get(tableSchema.getColumnDesc(0).getName()).toString(); + } else { + throw Status.INVALID_ARGUMENT + .withDescription("No primary key find in row") + .asRuntimeException(); + } } else { List keys = primaryKeyIndexes.stream() - .map(index -> row.get(primaryKeyIndexes.get(index)).toString()) + .map( + index -> + col.get(tableSchema.getColumnDesc(index).getName()) + .toString()) .collect(Collectors.toList()); id = String.join(config.getDelimiter(), keys); } @@ -396,17 +415,18 @@ private String buildId(SinkRow row) { } private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { - Map doc = buildDoc(row); + String doc = (String) row.get(0); final String key = buildId(row); UpdateRequest updateRequest = - new UpdateRequest(config.getIndex(), "doc", key).doc(doc).upsert(doc); + new UpdateRequest(config.getIndex(), "doc", key).doc(doc, XContentType.JSON); + updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); bulkProcessor.add(updateRequest); } - private void processDelete(SinkRow row) { - final String key = buildId(row); + private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException { + String key = buildId(row); DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key); this.requestTracker.addWriteTask(); bulkProcessor.add(deleteRequest); @@ -436,6 +456,7 @@ public void write(Iterator rows) { try { writeRow(row); } catch (Exception ex) { + ex.printStackTrace(); throw new RuntimeException(ex); } } diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 1cf64983e9a26..e432d6006c264 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -261,7 +261,8 @@ fn datum_to_json_object( json!(v.as_iso_8601()) } (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => { - json!(jsonb_ref.to_string()) + let asad: Map = serde_json::from_str(&jsonb_ref.to_string()).unwrap(); + json!(asad) } (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { let elems = list_ref.iter(); diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 9e7965ed73bbf..86add556e47c4 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -25,9 +25,10 @@ use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use jni::JavaVM; use prost::Message; -use risingwave_common::array::StreamChunk; +use risingwave_common::array::{StreamChunk, ArrayImpl, JsonbArrayBuilder}; +use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, JsonbVal, Scalar}; use risingwave_common::util::drop_either_future; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{ @@ -47,6 +48,7 @@ use risingwave_rpc_client::{ BidiStreamReceiver, BidiStreamSender, SinkCoordinatorStreamHandle, SinkWriterStreamHandle, DEFAULT_BUFFER_SIZE, }; +use 
serde_json::Value; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio::task::spawn_blocking; @@ -63,6 +65,8 @@ use crate::sink::{ }; use crate::ConnectorParams; +use super::encoder::{JsonEncoder, DateHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, RowEncoder}; + macro_rules! def_remote_sink { () => { def_remote_sink! { @@ -145,7 +149,7 @@ impl Sink for RemoteSink { } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - RemoteLogSinker::new(self.param.clone(), writer_param).await + RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME, self.param.schema()).await } async fn validate(&self) -> Result<()> { @@ -174,11 +178,12 @@ async fn validate_remote_sink(param: &SinkParam) -> Result<()> { | DataType::Jsonb | DataType::Bytea | DataType::List(_) + | DataType::Struct(_) ) { Ok(()) } else { Err(SinkError::Remote(anyhow_error!( - "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, List, Bytea and Varchar, got {:?}: {:?}", + "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, List, Bytea, Struct and Varchar, got {:?}: {:?}", col.name, col.data_type, ))) @@ -228,14 +233,60 @@ async fn validate_remote_sink(param: &SinkParam) -> Result<()> { .map_err(|e| anyhow!("unable to validate: {:?}", e))? } +enum StreamChunkConverter{ + ES(ESStreamChunkConverter), + Other, +} +impl StreamChunkConverter{ + fn new(sink_name: &str, schema:Schema) -> Self{ + if sink_name.eq("elasticsearch") { + StreamChunkConverter::ES(ESStreamChunkConverter::new(schema)) + }else{ + StreamChunkConverter::Other + } + } + + fn convert_chunk(&self, chunk: StreamChunk) -> Result{ + match self { + StreamChunkConverter::ES(es) => { + es.convert_chunk(chunk) + } + _ => { Ok(chunk) } + } + } +} +struct ESStreamChunkConverter{ + json_encoder: JsonEncoder, +} +impl ESStreamChunkConverter{ + fn new(schema:Schema) -> Self{ + let json_encoder = JsonEncoder::new(schema, None, DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcWithoutSuffix); + Self{ + json_encoder, + } + } + + fn convert_chunk(&self, chunk: StreamChunk) -> Result{ + let mut ops = vec![]; + let mut json_builder = ::new(chunk.capacity()); + for (op,row) in chunk.rows(){ + ops.push(op); + let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); + risingwave_common::array::ArrayBuilder::append(&mut json_builder, Some(json.as_scalar_ref())); + } + let a = risingwave_common::array::ArrayBuilder::finish(json_builder); + Ok(StreamChunk::new(ops,vec![std::sync::Arc::new(ArrayImpl::Jsonb(a))])) + } +} pub struct RemoteLogSinker { request_sender: BidiStreamSender, response_stream: BidiStreamReceiver, sink_metrics: SinkMetrics, + stream_chunk_converter: StreamChunkConverter, } impl RemoteLogSinker { - async fn new(sink_param: SinkParam, writer_param: SinkWriterParam) -> Result { + async fn new(sink_param: SinkParam, writer_param: SinkWriterParam, sink_name: &str, schema: Schema) -> Result { let SinkWriterStreamHandle { request_sender, response_stream, @@ -248,6 +299,7 @@ impl RemoteLogSinker { request_sender, response_stream, sink_metrics, + stream_chunk_converter: StreamChunkConverter::new(sink_name, schema), }) } } @@ -391,6 +443,7 @@ impl LogSinker for RemoteLogSinker { .connector_sink_rows_received .inc_by(cardinality as _); + let chunk = 
self.stream_chunk_converter.convert_chunk(chunk)?; request_tx .send_request(JniSinkWriterStreamRequest::Chunk { epoch, From 70c48a968d1713329f87df7c799b83e9dd4e1f43 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Mon, 25 Dec 2023 15:24:07 +0800 Subject: [PATCH 02/11] fix jsonb --- src/connector/src/sink/big_query.rs | 8 ++- src/connector/src/sink/encoder/json.rs | 42 +++++++--------- src/connector/src/sink/encoder/mod.rs | 4 +- src/connector/src/sink/remote.rs | 69 +++++++++++++++----------- 4 files changed, 68 insertions(+), 55 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 1f2e4bbb46f1c..450802bc52333 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -33,7 +33,9 @@ use url::Url; use with_options::WithOptions; use yup_oauth2::ServiceAccountKey; -use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use super::encoder::{ + DateHandlingMode, JsonEncoder, RowEncoder, TimestampHandlingMode, TimestamptzHandlingMode, +}; use super::writer::LogSinkerOf; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use crate::aws_utils::load_file_descriptor_from_s3; @@ -306,10 +308,12 @@ impl BigQuerySinkWriter { client, is_append_only, insert_request: TableDataInsertAllRequest::new(), - row_encoder: JsonEncoder::new_with_big_query( + row_encoder: JsonEncoder::new( schema, None, + DateHandlingMode::String, TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, ), }) } diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index e432d6006c264..4052aebbf60f0 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -23,7 +23,7 @@ use itertools::Itertools; use risingwave_common::array::{ArrayError, ArrayResult}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row; -use risingwave_common::types::{DataType, DatumRef, Decimal, ScalarRefImpl, ToText}; +use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_json::{json, Map, Value}; @@ -62,6 +62,18 @@ impl JsonEncoder { } } + pub fn new_with_es(schema: Schema, col_indices: Option>) -> Self { + Self { + schema, + col_indices, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, + custom_json_type: CustomJsonType::Es, + kafka_connect: None, + } + } + pub fn new_with_doris( schema: Schema, col_indices: Option>, @@ -85,22 +97,6 @@ impl JsonEncoder { ..self } } - - pub fn new_with_big_query( - schema: Schema, - col_indices: Option>, - timestamp_handling_mode: TimestampHandlingMode, - ) -> Self { - Self { - schema, - col_indices, - date_handling_mode: DateHandlingMode::String, - timestamp_handling_mode, - timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, - custom_json_type: CustomJsonType::Bigquery, - kafka_connect: None, - } - } } impl RowEncoder for JsonEncoder { @@ -216,7 +212,7 @@ fn datum_to_json_object( } json!(v_string) } - CustomJsonType::None | CustomJsonType::Bigquery => { + _ => { json!(v.to_text()) } }, @@ -260,10 +256,10 @@ fn datum_to_json_object( (DataType::Interval, ScalarRefImpl::Interval(v)) => { json!(v.as_iso_8601()) } - (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => { - let asad: Map = 
serde_json::from_str(&jsonb_ref.to_string()).unwrap(); - json!(asad) - } + (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type { + CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(), + _ => json!(jsonb_ref.to_string()), + }, (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { let elems = list_ref.iter(); let mut vec = Vec::with_capacity(elems.len()); @@ -304,7 +300,7 @@ fn datum_to_json_object( ArrayError::internal(format!("Json to string err{:?}", err)) })?) } - CustomJsonType::None | CustomJsonType::Bigquery => { + _ => { let mut map = Map::with_capacity(st.len()); for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( st.iter() diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 35a9f85e26d98..e77cc1e171dbe 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -134,8 +134,8 @@ pub enum CustomJsonType { // The internal order of the struct should follow the insertion order. // The decimal needs verification and calibration. Doris(HashMap), - // Bigquery's json need date is string. - Bigquery, + // Es's json need jsonb is struct + Es, None, } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 86add556e47c4..e2bc94f327ab4 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -25,7 +25,7 @@ use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use jni::JavaVM; use prost::Message; -use risingwave_common::array::{StreamChunk, ArrayImpl, JsonbArrayBuilder}; +use risingwave_common::array::{ArrayImpl, JsonbArrayBuilder, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; use risingwave_common::types::{DataType, JsonbVal, Scalar}; @@ -55,6 +55,7 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; +use super::encoder::{JsonEncoder, RowEncoder}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; @@ -65,8 +66,6 @@ use crate::sink::{ }; use crate::ConnectorParams; -use super::encoder::{JsonEncoder, DateHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, RowEncoder}; - macro_rules! def_remote_sink { () => { def_remote_sink! { @@ -149,7 +148,13 @@ impl Sink for RemoteSink { } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME, self.param.schema()).await + RemoteLogSinker::new( + self.param.clone(), + writer_param, + Self::SINK_NAME, + self.param.schema(), + ) + .await } async fn validate(&self) -> Result<()> { @@ -233,49 +238,52 @@ async fn validate_remote_sink(param: &SinkParam) -> Result<()> { .map_err(|e| anyhow!("unable to validate: {:?}", e))? 
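// A minimal usage sketch of the converter shown just below, assuming a
// multi-column upstream chunk: for the "elasticsearch" sink every row is
// re-encoded into a single JSONB column before it crosses the JNI boundary,
// so the Java EsSink only ever sees one document-shaped column, while every
// other sink passes its chunk through untouched.
//
//     let converter = StreamChunkConverter::new("elasticsearch", schema);
//     let converted = converter.convert_chunk(chunk)?;
//     assert_eq!(converted.columns().len(), 1); // one JSONB column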
} -enum StreamChunkConverter{ - ES(ESStreamChunkConverter), +enum StreamChunkConverter { + Es(EsStreamChunkConverter), Other, } -impl StreamChunkConverter{ - fn new(sink_name: &str, schema:Schema) -> Self{ +impl StreamChunkConverter { + fn new(sink_name: &str, schema: Schema) -> Self { if sink_name.eq("elasticsearch") { - StreamChunkConverter::ES(ESStreamChunkConverter::new(schema)) - }else{ + StreamChunkConverter::Es(EsStreamChunkConverter::new(schema)) + } else { StreamChunkConverter::Other } } - fn convert_chunk(&self, chunk: StreamChunk) -> Result{ + fn convert_chunk(&self, chunk: StreamChunk) -> Result { match self { - StreamChunkConverter::ES(es) => { - es.convert_chunk(chunk) - } - _ => { Ok(chunk) } + StreamChunkConverter::Es(es) => es.convert_chunk(chunk), + _ => Ok(chunk), } } } -struct ESStreamChunkConverter{ +struct EsStreamChunkConverter { json_encoder: JsonEncoder, } -impl ESStreamChunkConverter{ - fn new(schema:Schema) -> Self{ - let json_encoder = JsonEncoder::new(schema, None, DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcWithoutSuffix); - Self{ - json_encoder, - } +impl EsStreamChunkConverter { + fn new(schema: Schema) -> Self { + let json_encoder = JsonEncoder::new_with_es(schema, None); + Self { json_encoder } } - fn convert_chunk(&self, chunk: StreamChunk) -> Result{ + fn convert_chunk(&self, chunk: StreamChunk) -> Result { let mut ops = vec![]; - let mut json_builder = ::new(chunk.capacity()); - for (op,row) in chunk.rows(){ + let mut json_builder = + ::new(chunk.capacity()); + for (op, row) in chunk.rows() { ops.push(op); let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); - risingwave_common::array::ArrayBuilder::append(&mut json_builder, Some(json.as_scalar_ref())); + risingwave_common::array::ArrayBuilder::append( + &mut json_builder, + Some(json.as_scalar_ref()), + ); } let a = risingwave_common::array::ArrayBuilder::finish(json_builder); - Ok(StreamChunk::new(ops,vec![std::sync::Arc::new(ArrayImpl::Jsonb(a))])) + Ok(StreamChunk::new( + ops, + vec![std::sync::Arc::new(ArrayImpl::Jsonb(a))], + )) } } pub struct RemoteLogSinker { @@ -286,7 +294,12 @@ pub struct RemoteLogSinker { } impl RemoteLogSinker { - async fn new(sink_param: SinkParam, writer_param: SinkWriterParam, sink_name: &str, schema: Schema) -> Result { + async fn new( + sink_param: SinkParam, + writer_param: SinkWriterParam, + sink_name: &str, + schema: Schema, + ) -> Result { let SinkWriterStreamHandle { request_sender, response_stream, From f7ebc497aa3b2c4bb1a803f9bf8d805583b842b9 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 27 Dec 2023 15:26:58 +0800 Subject: [PATCH 03/11] add build id remove sout --- .../connector/SinkWriterStreamObserver.java | 8 +- .../java/com/risingwave/connector/EsSink.java | 136 +----------------- src/connector/src/sink/remote.rs | 75 ++++++++-- 3 files changed, 76 insertions(+), 143 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 22bd7c5b87546..923f2f3be387b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -213,8 +213,12 @@ private void bindSink( if 
(connectorName.equals("elasticsearch")) { tableSchema = new TableSchema( - List.of("test"), - List.of(Data.DataType.newBuilder().setTypeName(TypeName.JSONB).build()), + List.of("id", "json_result"), + List.of( + Data.DataType.newBuilder() + .setTypeName(TypeName.VARCHAR) + .build(), + Data.DataType.newBuilder().setTypeName(TypeName.JSONB).build()), List.of()); } switch (format) { diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 4efd21498b487..efe2796c42577 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -15,9 +15,7 @@ package com.risingwave.connector; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.connector.api.sink.SinkWriterBase; @@ -27,7 +25,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -68,11 +65,6 @@ public class EsSink extends SinkWriterBase { private static final Logger LOG = LoggerFactory.getLogger(EsSink.class); private static final String ERROR_REPORT_TEMPLATE = "Error message %s"; - private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC"); - // private final SimpleDateFormat tDfm; - // private final SimpleDateFormat tsDfm; - // private final SimpleDateFormat tstzDfm; - private final EsSinkConfig config; private BulkProcessor bulkProcessor; private final RestHighLevelClient client; @@ -202,10 +194,6 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { for (String primaryKey : getTableSchema().getPrimaryKeys()) { primaryKeyIndexes.add(getTableSchema().getColumnIndex(primaryKey)); } - - // tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone); - // tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone); - // tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone); } private static RestClientBuilder configureRestClientBuilder( @@ -298,125 +286,9 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } - // /** - // * The api accepts doc in map form. 
- // * - // * @param row - // * @return Map from Field name to Value - // * @throws JsonProcessingException - // * @throws JsonMappingException - // */ - // private Map buildDoc(SinkRow row) - // throws JsonMappingException, JsonProcessingException { - // Map doc = new HashMap(); - // var tableSchema = getTableSchema(); - // var columnDescs = tableSchema.getColumnDescs(); - // for (int i = 0; i < row.size(); i++) { - // var type = columnDescs.get(i).getDataType().getTypeName(); - // Object col = row.get(i); - // switch (type) { - // // es client doesn't natively support java.sql.Timestamp/Time/Date - // // so we need to convert Date/Time/Timestamp type into a string as suggested - // in - // // - // https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292 - // case DATE: - // col = col.toString(); - // break; - // // construct java.sql.Time/Timestamp with milliseconds time value. - // // it will use system timezone by default, so we have to set timezone - // manually - // case TIME: - // col = tDfm.format(col); - // break; - // case TIMESTAMP: - // col = tsDfm.format(col); - // break; - // case TIMESTAMPTZ: - // col = tstzDfm.format(col); - // break; - // case JSONB: - // ObjectMapper mapper = new ObjectMapper(); - // JsonNode jsonNode = mapper.readTree((String) col); - // col = convertJsonNode(jsonNode); - // break; - // default: - // break; - // } - - // doc.put(getTableSchema().getColumnDesc(i).getName(), col); - // } - // return doc; - // } - - // private static Object convertJsonNode(JsonNode jsonNode) { - // if (jsonNode.isObject()) { - // Map resultMap = new HashMap<>(); - // jsonNode.fields() - // .forEachRemaining( - // entry -> { - // resultMap.put(entry.getKey(), convertJsonNode(entry.getValue())); - // }); - // return resultMap; - // } else if (jsonNode.isArray()) { - // List resultList = new ArrayList<>(); - // jsonNode.elements() - // .forEachRemaining( - // element -> { - // resultList.add(convertJsonNode(element)); - // }); - // return resultList; - // } else if (jsonNode.isNumber()) { - // return jsonNode.numberValue(); - // } else if (jsonNode.isTextual()) { - // return jsonNode.textValue(); - // } else if (jsonNode.isBoolean()) { - // return jsonNode.booleanValue(); - // } else if (jsonNode.isNull()) { - // return null; - // } else { - // throw new IllegalArgumentException("Unsupported JSON type"); - // } - // } - - /** - * use primary keys as id concatenated by a specific delimiter. 
- * - * @param row - * @return - * @throws JsonProcessingException - * @throws JsonMappingException - */ - private String buildId(SinkRow row) throws JsonMappingException, JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - Map col = - mapper.readValue((String) row.get(0), new TypeReference>() {}); - String id; - var tableSchema = getTableSchema(); - if (primaryKeyIndexes.isEmpty()) { - if (col.get(tableSchema.getColumnDesc(0).getName()) != null) { - id = col.get(tableSchema.getColumnDesc(0).getName()).toString(); - } else { - throw Status.INVALID_ARGUMENT - .withDescription("No primary key find in row") - .asRuntimeException(); - } - } else { - List keys = - primaryKeyIndexes.stream() - .map( - index -> - col.get(tableSchema.getColumnDesc(index).getName()) - .toString()) - .collect(Collectors.toList()); - id = String.join(config.getDelimiter(), keys); - } - return id; - } - private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { - String doc = (String) row.get(0); - final String key = buildId(row); + final String key = (String) row.get(0); + String doc = (String) row.get(1); UpdateRequest updateRequest = new UpdateRequest(config.getIndex(), "doc", key).doc(doc, XContentType.JSON); @@ -426,7 +298,8 @@ private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcess } private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException { - String key = buildId(row); + final String key = (String) row.get(0); + DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "doc", key); this.requestTracker.addWriteTask(); bulkProcessor.add(deleteRequest); @@ -456,7 +329,6 @@ public void write(Iterator rows) { try { writeRow(row); } catch (Exception ex) { - ex.printStackTrace(); throw new RuntimeException(ex); } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index e2bc94f327ab4..822e9a27e45f2 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -25,10 +25,13 @@ use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use jni::JavaVM; use prost::Message; -use risingwave_common::array::{ArrayImpl, JsonbArrayBuilder, StreamChunk}; +use risingwave_common::array::{ + ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, +}; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; -use risingwave_common::types::{DataType, JsonbVal, Scalar}; +use risingwave_common::row::Row; +use risingwave_common::types::{DataType, JsonbVal, Scalar, ToText}; use risingwave_common::util::drop_either_future; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{ @@ -153,6 +156,8 @@ impl Sink for RemoteSink { writer_param, Self::SINK_NAME, self.param.schema(), + self.param.downstream_pk.clone(), + self.param.properties.clone(), ) .await } @@ -243,9 +248,18 @@ enum StreamChunkConverter { Other, } impl StreamChunkConverter { - fn new(sink_name: &str, schema: Schema) -> Self { + fn new( + sink_name: &str, + schema: Schema, + pk_indices: Vec, + properties: HashMap, + ) -> Self { if sink_name.eq("elasticsearch") { - StreamChunkConverter::Es(EsStreamChunkConverter::new(schema)) + StreamChunkConverter::Es(EsStreamChunkConverter::new( + schema, + pk_indices, + properties.get("delimiter").cloned(), + )) } else { StreamChunkConverter::Other } @@ -260,31 +274,70 @@ impl StreamChunkConverter { } struct EsStreamChunkConverter { json_encoder: JsonEncoder, + pk_indices: Vec, + delimiter: Option, } 
impl EsStreamChunkConverter { - fn new(schema: Schema) -> Self { + fn new(schema: Schema, pk_indices: Vec, delimiter: Option) -> Self { let json_encoder = JsonEncoder::new_with_es(schema, None); - Self { json_encoder } + Self { + json_encoder, + pk_indices, + delimiter, + } } fn convert_chunk(&self, chunk: StreamChunk) -> Result { let mut ops = vec![]; + let mut id_string_builder = + ::new(chunk.capacity()); let mut json_builder = ::new(chunk.capacity()); for (op, row) in chunk.rows() { ops.push(op); let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); + risingwave_common::array::ArrayBuilder::append( + &mut id_string_builder, + Some(&self.build_id(row)?), + ); risingwave_common::array::ArrayBuilder::append( &mut json_builder, Some(json.as_scalar_ref()), ); } - let a = risingwave_common::array::ArrayBuilder::finish(json_builder); + let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder); + let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder); Ok(StreamChunk::new( ops, - vec![std::sync::Arc::new(ArrayImpl::Jsonb(a))], + vec![ + std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)), + std::sync::Arc::new(ArrayImpl::Jsonb(json_array)), + ], )) } + + fn build_id(&self, row: RowRef<'_>) -> Result { + if self.pk_indices.is_empty() { + Ok(row + .datum_at(0) + .ok_or_else(|| anyhow!("No value find in row, index is 0"))? + .to_text()) + } else { + let mut keys = vec![]; + for index in &self.pk_indices { + keys.push( + row.datum_at(*index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? + .to_text(), + ); + } + Ok(keys.join( + self.delimiter + .as_ref() + .ok_or_else(|| anyhow!("Please set delimiter in with option"))?, + )) + } + } } pub struct RemoteLogSinker { request_sender: BidiStreamSender, @@ -299,6 +352,8 @@ impl RemoteLogSinker { writer_param: SinkWriterParam, sink_name: &str, schema: Schema, + pk_indices: Vec, + properties: HashMap, ) -> Result { let SinkWriterStreamHandle { request_sender, @@ -312,7 +367,9 @@ impl RemoteLogSinker { request_sender, response_stream, sink_metrics, - stream_chunk_converter: StreamChunkConverter::new(sink_name, schema), + stream_chunk_converter: StreamChunkConverter::new( + sink_name, schema, pk_indices, properties, + ), }) } } From 9f5ae311aec1d6e6cb2c3354a55f07162bba98f5 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 27 Dec 2023 16:40:07 +0800 Subject: [PATCH 04/11] fix test --- .../risingwave/connector/sink/elasticsearch/EsSinkTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index af0ea7190f946..4cdb18071d735 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -61,8 +61,8 @@ public void testEsSink(ElasticsearchContainer container, String username, String getTestTableSchema()); sink.write( Iterators.forArray( - new ArraySinkRow(Op.INSERT, 1, "Alice"), - new ArraySinkRow(Op.INSERT, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"), + new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}"))); sink.sync(); // container 
is slow here, but our default flush time is 5s, // so 3s is enough for sync test From 116dff8f1143226f54875d00a08ce3a05b14f688 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 28 Dec 2023 10:47:51 +0800 Subject: [PATCH 05/11] save --- src/connector/src/sink/encoder/json.rs | 3 ++- src/connector/src/sink/encoder/mod.rs | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 4052aebbf60f0..a5b86d7fcfe1f 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -29,7 +29,7 @@ use serde_json::{json, Map, Value}; use super::{ CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result, - RowEncoder, SerTo, TimestampHandlingMode, TimestamptzHandlingMode, + RowEncoder, SerTo, TimestampHandlingMode, TimestamptzHandlingMode, TimeHandlingMode, }; use crate::sink::SinkError; @@ -162,6 +162,7 @@ fn datum_to_json_object( date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, + time_mode: TimeHandlingMode, custom_json_type: &CustomJsonType, ) -> ArrayResult { let scalar_ref = match datum { diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index e77cc1e171dbe..6973e198ce18d 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -97,6 +97,12 @@ pub enum TimestampHandlingMode { String, } +#[derive(Clone, Copy)] +pub enum TimeHandlingMode { + Milli, + String, +} + #[derive(Clone, Copy, Default)] pub enum TimestamptzHandlingMode { #[default] From 9365a08c698ca128e5640faacaf6f09486ee9f8e Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 28 Dec 2023 17:51:43 +0800 Subject: [PATCH 06/11] add time --- src/connector/src/sink/big_query.rs | 4 +- src/connector/src/sink/encoder/json.rs | 41 ++++++++++++++++--- .../src/sink/formatter/debezium_json.rs | 6 ++- src/connector/src/sink/formatter/mod.rs | 8 +++- src/connector/src/sink/kafka.rs | 4 +- src/connector/src/sink/nats.rs | 3 +- src/connector/src/sink/remote.rs | 6 +++ 7 files changed, 61 insertions(+), 11 deletions(-) diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs index 450802bc52333..f540d52fba1f2 100644 --- a/src/connector/src/sink/big_query.rs +++ b/src/connector/src/sink/big_query.rs @@ -34,7 +34,8 @@ use with_options::WithOptions; use yup_oauth2::ServiceAccountKey; use super::encoder::{ - DateHandlingMode, JsonEncoder, RowEncoder, TimestampHandlingMode, TimestamptzHandlingMode, + DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, + TimestamptzHandlingMode, }; use super::writer::LogSinkerOf; use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; @@ -314,6 +315,7 @@ impl BigQuerySinkWriter { DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ), }) } diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index a5b86d7fcfe1f..ae04aafaca55d 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -29,13 +29,14 @@ use serde_json::{json, Map, Value}; use super::{ CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result, - RowEncoder, SerTo, TimestampHandlingMode, TimestamptzHandlingMode, TimeHandlingMode, + RowEncoder, SerTo, 
TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, }; use crate::sink::SinkError; pub struct JsonEncoder { schema: Schema, col_indices: Option>, + time_handling_mode: TimeHandlingMode, date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, @@ -50,10 +51,12 @@ impl JsonEncoder { date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, + time_handling_mode: TimeHandlingMode, ) -> Self { Self { schema, col_indices, + time_handling_mode, date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, @@ -66,6 +69,7 @@ impl JsonEncoder { Self { schema, col_indices, + time_handling_mode: TimeHandlingMode::String, date_handling_mode: DateHandlingMode::String, timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, @@ -83,6 +87,7 @@ impl JsonEncoder { Self { schema, col_indices, + time_handling_mode: TimeHandlingMode::Milli, date_handling_mode: DateHandlingMode::String, timestamp_handling_mode, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, @@ -126,6 +131,7 @@ impl RowEncoder for JsonEncoder { self.date_handling_mode, self.timestamp_handling_mode, self.timestamptz_handling_mode, + self.time_handling_mode, &self.custom_json_type, ) .map_err(|e| SinkError::Encode(e.to_string()))?; @@ -162,7 +168,7 @@ fn datum_to_json_object( date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, - time_mode: TimeHandlingMode, + time_handling_mode: TimeHandlingMode, custom_json_type: &CustomJsonType, ) -> ArrayResult { let scalar_ref = match datum { @@ -231,10 +237,16 @@ fn datum_to_json_object( TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()), TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()), }, - (DataType::Time, ScalarRefImpl::Time(v)) => { - // todo: just ignore the nanos part to avoid leap second complex - json!(v.0.num_seconds_from_midnight() as i64 * 1000) - } + (DataType::Time, ScalarRefImpl::Time(v)) => match time_handling_mode { + TimeHandlingMode::Milli => { + // todo: just ignore the nanos part to avoid leap second complex + json!(v.0.num_seconds_from_midnight() as i64 * 1000) + } + TimeHandlingMode::String => { + let a = v.0.format("%H:%M:%S%.6f").to_string(); + json!(a) + } + }, (DataType::Date, ScalarRefImpl::Date(v)) => match date_handling_mode { DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()), DateHandlingMode::FromEpoch => { @@ -272,6 +284,7 @@ fn datum_to_json_object( date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, + time_handling_mode, custom_json_type, )?; vec.push(value); @@ -293,6 +306,7 @@ fn datum_to_json_object( date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, + time_handling_mode, custom_json_type, )?; map.insert(sub_field.name.clone(), value); @@ -313,6 +327,7 @@ fn datum_to_json_object( date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, + time_handling_mode, custom_json_type, )?; map.insert(sub_field.name.clone(), value); @@ -431,6 +446,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -445,6 +461,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + 
TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -459,6 +476,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -478,6 +496,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -493,6 +512,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcWithoutSuffix, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -510,6 +530,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -527,6 +548,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -545,6 +567,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -562,6 +585,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -579,6 +603,7 @@ mod tests { DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::Doris(map), ) .unwrap(); @@ -593,6 +618,7 @@ mod tests { DateHandlingMode::FromCe, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -607,6 +633,7 @@ mod tests { DateHandlingMode::FromEpoch, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::None, ) .unwrap(); @@ -621,6 +648,7 @@ mod tests { DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::Doris(HashMap::default()), ) .unwrap(); @@ -645,6 +673,7 @@ mod tests { DateHandlingMode::String, TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, &CustomJsonType::Doris(HashMap::default()), ) .unwrap(); diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index f3e877c44829e..03e6de67b6419 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -21,7 +21,8 @@ use tracing::warn; use super::{Result, SinkFormatter, StreamChunk}; use crate::sink::encoder::{ - DateHandlingMode, JsonEncoder, RowEncoder, TimestampHandlingMode, TimestamptzHandlingMode, + DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, + TimestamptzHandlingMode, }; use crate::tri; @@ -67,6 +68,7 @@ impl DebeziumJsonFormatter { DateHandlingMode::FromEpoch, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ); let val_encoder = JsonEncoder::new( schema.clone(), @@ -74,6 +76,7 @@ impl DebeziumJsonFormatter { DateHandlingMode::FromEpoch, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ); Self { schema, @@ -393,6 +396,7 @@ mod tests { DateHandlingMode::FromEpoch, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ); let 
json_chunk = chunk_to_json(chunk, &encoder).unwrap(); let schema_json = schema_to_json(&schema, "test_db", "test_table"); diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 1abf451fb061e..a9a7ca59c4dfc 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -28,7 +28,9 @@ pub use upsert::UpsertFormatter; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::template::TemplateEncoder; -use super::encoder::{DateHandlingMode, KafkaConnectParams, TimestamptzHandlingMode}; +use super::encoder::{ + DateHandlingMode, KafkaConnectParams, TimeHandlingMode, TimestamptzHandlingMode, +}; use super::redis::{KEY_FORMAT, VALUE_FORMAT}; use crate::sink::encoder::{ AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, TimestampHandlingMode, @@ -102,6 +104,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ) }); @@ -113,6 +116,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ); let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::AppendOnlyJson(formatter)) @@ -175,6 +179,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ); let mut val_encoder = JsonEncoder::new( schema, @@ -182,6 +187,7 @@ impl SinkFormatterImpl { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, + TimeHandlingMode::Milli, ); if let Some(s) = format_desc.options.get("schemas.enable") { diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 76bde538f27b0..aed8ef5b05d14 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -560,7 +560,8 @@ mod test { use super::*; use crate::sink::encoder::{ - DateHandlingMode, JsonEncoder, TimestampHandlingMode, TimestamptzHandlingMode, + DateHandlingMode, JsonEncoder, TimeHandlingMode, TimestampHandlingMode, + TimestamptzHandlingMode, }; use crate::sink::formatter::AppendOnlyFormatter; @@ -732,6 +733,7 @@ mod test { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, + TimeHandlingMode::Milli, ), )), ) diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index d1dd0f6eb9ea2..8ed2376c5e2cd 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -25,7 +25,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use with_options::WithOptions; -use super::encoder::{DateHandlingMode, TimestamptzHandlingMode}; +use super::encoder::{DateHandlingMode, TimeHandlingMode, TimestamptzHandlingMode}; use super::utils::chunk_to_json; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::common::NatsCommon; @@ -146,6 +146,7 @@ impl NatsSinkWriter { DateHandlingMode::FromCe, TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcWithoutSuffix, + TimeHandlingMode::Milli, ), }) } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 822e9a27e45f2..7505be7784622 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -322,6 +322,12 @@ impl EsStreamChunkConverter { .datum_at(0) .ok_or_else(|| anyhow!("No value find in row, index is 0"))? 
.to_text()) + } else if self.pk_indices.len() == 1 { + let index = self.pk_indices.get(0).unwrap(); + Ok(row + .datum_at(*index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? + .to_text()) } else { let mut keys = vec![]; for index in &self.pk_indices { From 12a164d792c50509ea4ac531a7be13b7628f3f99 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 2 Jan 2024 15:21:14 +0800 Subject: [PATCH 07/11] fix --- .../connector/SinkWriterStreamObserver.java | 39 ++-- .../java/com/risingwave/connector/EsSink.java | 8 - proto/connector_service.proto | 19 +- src/compute/src/server.rs | 9 +- src/connector/src/lib.rs | 13 +- src/connector/src/sink/encoder/json.rs | 6 +- src/connector/src/sink/remote.rs | 185 ++++++++++-------- src/rpc_client/src/connector_client.rs | 3 +- src/stream/src/task/env.rs | 8 +- 9 files changed, 162 insertions(+), 128 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 923f2f3be387b..0c330225d99b7 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -23,10 +23,7 @@ import com.risingwave.metrics.ConnectorNodeMetrics; import com.risingwave.metrics.MonitoredRowIterable; import com.risingwave.proto.ConnectorServiceProto; -import com.risingwave.proto.Data; -import com.risingwave.proto.Data.DataType.TypeName; import io.grpc.stub.StreamObserver; -import java.util.List; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +101,7 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) { .asRuntimeException(); } sinkId = sinkTask.getStart().getSinkParam().getSinkId(); - bindSink(sinkTask.getStart().getSinkParam(), sinkTask.getStart().getFormat()); + bindSink(sinkTask.getStart()); currentEpoch = null; currentBatchId = null; epochStarted = false; @@ -204,35 +201,35 @@ private void cleanup() { } private void bindSink( - ConnectorServiceProto.SinkParam sinkParam, - ConnectorServiceProto.SinkPayloadFormat format) { + com.risingwave.proto.ConnectorServiceProto.SinkWriterStreamRequest.StartSink + startSink) { + var sinkParam = startSink.getSinkParam(); + var format = startSink.getSinkPayloadFormatCase(); tableSchema = TableSchema.fromProto(sinkParam.getTableSchema()); String connectorName = getConnectorName(sinkParam); SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName); sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap()); - if (connectorName.equals("elasticsearch")) { - tableSchema = - new TableSchema( - List.of("id", "json_result"), - List.of( - Data.DataType.newBuilder() - .setTypeName(TypeName.VARCHAR) - .build(), - Data.DataType.newBuilder().setTypeName(TypeName.JSONB).build()), - List.of()); - } + switch (format) { - case FORMAT_UNSPECIFIED: - case UNRECOGNIZED: + case UNSPECIFIED_FORMAT: + case SINKPAYLOADFORMAT_NOT_SET: throw INVALID_ARGUMENT .withDescription("should specify payload format in request") .asRuntimeException(); - case JSON: + case JSON_FORMAT: deserializer = new JsonDeserializer(tableSchema); break; - case STREAM_CHUNK: + case STREAM_CHUNK_FORMAT: deserializer = new StreamChunkDeserializer(tableSchema); break; + case 
STREAM_CHUNK_WITH_SCHEMA_FORMAT: + deserializer = + new StreamChunkDeserializer( + TableSchema.fromProto( + startSink + .getStreamChunkWithSchemaFormat() + .getTableSchema())); + break; } this.connectorName = connectorName.toUpperCase(); ConnectorNodeMetrics.incActiveSinkConnections(connectorName, "node1"); diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index efe2796c42577..447eefe8f1240 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -72,9 +72,6 @@ public class EsSink extends SinkWriterBase { // Used to handle the return message of ES and throw errors private final RequestTracker requestTracker; - // For bulk listener - private final List primaryKeyIndexes; - class RequestTracker { // Used to save the return results of es asynchronous writes. The capacity is Integer.Max private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); @@ -189,11 +186,6 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) { throw Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException(); } this.bulkProcessor = createBulkProcessor(this.requestTracker); - - primaryKeyIndexes = new ArrayList(); - for (String primaryKey : getTableSchema().getPrimaryKeys()) { - primaryKeyIndexes.add(getTableSchema().getColumnIndex(primaryKey)); - } } private static RestClientBuilder configureRestClientBuilder( diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 7bc46cab9c187..b9849cf531b4a 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -30,16 +30,25 @@ message SinkParam { optional uint32 target_table = 8; } -enum SinkPayloadFormat { - FORMAT_UNSPECIFIED = 0; - JSON = 1; - STREAM_CHUNK = 2; +message JsonFormat{} + +message StreamChunkFormat{} + +message UnspecifiedFormat{} + +message StreamChunkWithSchemaFormat{ + TableSchema table_schema = 1; } message SinkWriterStreamRequest { message StartSink { SinkParam sink_param = 1; - SinkPayloadFormat format = 2; + oneof sink_payload_format { + JsonFormat json_format = 11; + StreamChunkFormat stream_chunk_format = 12; + UnspecifiedFormat unspecified_format = 13; + StreamChunkWithSchemaFormat stream_chunk_with_schema_format = 14; + } } message WriteBatch { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 76c7f251aa334..430b6a71a850c 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -40,7 +40,8 @@ use risingwave_common_service::tracing::TracingExtractLayer; use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS; use risingwave_pb::common::WorkerType; use risingwave_pb::compute::config_service_server::ConfigServiceServer; -use risingwave_pb::connector_service::SinkPayloadFormat; +use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; +use risingwave_pb::connector_service::{JsonFormat, StreamChunkFormat}; use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer; @@ -332,8 +333,10 @@ pub async fn compute_node_serve( let connector_params = risingwave_connector::ConnectorParams { connector_client, sink_payload_format: match 
opts.connector_rpc_sink_payload_format.as_deref() { - None | Some("stream_chunk") => SinkPayloadFormat::StreamChunk, - Some("json") => SinkPayloadFormat::Json, + None | Some("stream_chunk") => { + SinkPayloadFormat::StreamChunkFormat(StreamChunkFormat {}) + } + Some("json") => SinkPayloadFormat::JsonFormat(JsonFormat {}), _ => { unreachable!( "invalid sink payload format: {:?}. Should be either json or stream_chunk", diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 85e49a5599541..e33000799a266 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -38,7 +38,8 @@ use std::time::Duration; use duration_str::parse_std; -use risingwave_pb::connector_service::SinkPayloadFormat; +use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; +use risingwave_pb::connector_service::UnspecifiedFormat; use risingwave_rpc_client::ConnectorClient; use serde::de; @@ -58,11 +59,19 @@ pub use paste::paste; #[cfg(test)] mod with_options_test; -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct ConnectorParams { pub connector_client: Option, pub sink_payload_format: SinkPayloadFormat, } +impl Default for ConnectorParams { + fn default() -> Self { + Self { + connector_client: Default::default(), + sink_payload_format: SinkPayloadFormat::UnspecifiedFormat(UnspecifiedFormat {}), + } + } +} impl ConnectorParams { pub fn new( diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index ae04aafaca55d..45900fa285300 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -219,7 +219,7 @@ fn datum_to_json_object( } json!(v_string) } - _ => { + CustomJsonType::Es | CustomJsonType::None => { json!(v.to_text()) } }, @@ -271,7 +271,7 @@ fn datum_to_json_object( } (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type { CustomJsonType::Es => JsonbVal::from(jsonb_ref).take(), - _ => json!(jsonb_ref.to_string()), + CustomJsonType::Doris(_) | CustomJsonType::None => json!(jsonb_ref.to_string()), }, (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { let elems = list_ref.iter(); @@ -315,7 +315,7 @@ fn datum_to_json_object( ArrayError::internal(format!("Json to string err{:?}", err)) })?) 
} - _ => { + CustomJsonType::Es | CustomJsonType::None => { let mut map = Map::with_capacity(st.len()); for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( st.iter() diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 7505be7784622..91144dcc598f0 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -28,7 +28,7 @@ use prost::Message; use risingwave_common::array::{ ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, }; -use risingwave_common::catalog::Schema; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; use risingwave_common::error::anyhow_error; use risingwave_common::row::Row; use risingwave_common::types::{DataType, JsonbVal, Scalar, ToText}; @@ -38,13 +38,15 @@ use risingwave_jni_core::{ call_static_method, gen_class_name, JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, }; use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator; +use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; use risingwave_pb::connector_service::sink_writer_stream_request::{ Request as SinkRequest, StartSink, }; use risingwave_pb::connector_service::{ sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response, - SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, SinkPayloadFormat, - SinkWriterStreamRequest, SinkWriterStreamResponse, ValidateSinkRequest, ValidateSinkResponse, + SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, + SinkWriterStreamRequest, SinkWriterStreamResponse, StreamChunkFormat, + StreamChunkWithSchemaFormat, TableSchema, ValidateSinkRequest, ValidateSinkResponse, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::{ @@ -69,6 +71,8 @@ use crate::sink::{ }; use crate::ConnectorParams; +const ES_OPTION_DELIMITER: &str = "delimiter"; + macro_rules! def_remote_sink { () => { def_remote_sink! 
{ @@ -151,28 +155,19 @@ impl Sink for RemoteSink { } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> { - RemoteLogSinker::new( - self.param.clone(), - writer_param, - Self::SINK_NAME, - self.param.schema(), - self.param.downstream_pk.clone(), - self.param.properties.clone(), - ) - .await + RemoteLogSinker::new(self.param.clone(), writer_param, Self::SINK_NAME).await } async fn validate(&self) -> Result<()> { - validate_remote_sink(&self.param).await + validate_remote_sink(&self.param, Self::SINK_NAME).await } } -async fn validate_remote_sink(param: &SinkParam) -> Result<()> { +async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> Result<()> { // FIXME: support struct and array in stream sink param.columns.iter().map(|col| { - if matches!( - col.data_type, - DataType::Int16 + match col.data_type { + DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 @@ -186,19 +181,23 @@ async fn validate_remote_sink(param: &SinkParam) -> Result<()> { | DataType::Time | DataType::Interval | DataType::Jsonb - | DataType::Bytea - | DataType::List(_) - | DataType::Struct(_) - ) { - Ok(()) - } else { - Err(SinkError::Remote(anyhow_error!( - "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, List, Bytea, Struct and Varchar, got {:?}: {:?}", - col.name, - col.data_type, - ))) - } - }).try_collect()?; + | DataType::Bytea => Ok(()), + DataType::List(_) | DataType::Struct(_) => { + if sink_name.eq(ElasticSearchSink::SINK_NAME){ + Ok(()) + }else{ + Err(SinkError::Remote(anyhow_error!( + "Only Es sink support list and struct, got {:?}: {:?}", + col.name, + col.data_type, + ))) + } + }, + DataType::Serial | DataType::Int256 => Err(SinkError::Remote(anyhow_error!( + "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, and Varchar, (Es sink support Struct and List) got {:?}: {:?}", + col.name, + col.data_type, + )))}}).try_collect()?; let jvm = JVM.get_or_init()?; let sink_param = param.to_proto(); @@ -251,40 +250,71 @@ impl StreamChunkConverter { fn new( sink_name: &str, schema: Schema, - pk_indices: Vec<usize>, - properties: HashMap<String, String>, - ) -> Self { - if sink_name.eq("elasticsearch") { - StreamChunkConverter::Es(EsStreamChunkConverter::new( + pk_indices: &Vec<usize>, + properties: &HashMap<String, String>, + ) -> Result<Self> { + if sink_name.eq(ElasticSearchSink::SINK_NAME) { + Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new( schema, - pk_indices, - properties.get("delimiter").cloned(), - )) + pk_indices.clone(), + properties.get(ES_OPTION_DELIMITER).cloned(), + )?)) } else { - StreamChunkConverter::Other + Ok(StreamChunkConverter::Other) } } fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { match self { StreamChunkConverter::Es(es) => es.convert_chunk(chunk), - _ => Ok(chunk), + StreamChunkConverter::Other => Ok(chunk), } } } struct EsStreamChunkConverter { json_encoder: JsonEncoder, - pk_indices: Vec<usize>, - delimiter: Option<String>, + fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>, } impl EsStreamChunkConverter { - fn new(schema: Schema, pk_indices: Vec<usize>, delimiter: Option<String>) -> Self { + fn new(schema: Schema, pk_indices: Vec<usize>, delimiter: Option<String>) -> Result<Self> { + let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty() + { + Box::new(|row: RowRef<'_>| { + Ok(row + .datum_at(0) + .ok_or_else(|| anyhow!("No value find in row, index is 0"))?
+ .to_text()) + }) + } else if pk_indices.len() == 1 { + let index = *pk_indices.get(0).unwrap(); + Box::new(move |row: RowRef<'_>| { + Ok(row + .datum_at(index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? + .to_text()) + }) + } else { + let delimiter = delimiter + .as_ref() + .ok_or_else(|| anyhow!("Please set delimiter in with option"))? + .clone(); + Box::new(move |row: RowRef<'_>| { + let mut keys = vec![]; + for index in &pk_indices { + keys.push( + row.datum_at(*index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? + .to_text(), + ); + } + Ok(keys.join(&delimiter)) + }) + }; let json_encoder = JsonEncoder::new_with_es(schema, None); - Self { + Ok(Self { json_encoder, - pk_indices, - delimiter, - } + fn_build_id, + }) } fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { @@ -317,32 +347,7 @@ impl EsStreamChunkConverter { } fn build_id(&self, row: RowRef<'_>) -> Result<String> { - if self.pk_indices.is_empty() { - Ok(row - .datum_at(0) - .ok_or_else(|| anyhow!("No value find in row, index is 0"))? - .to_text()) - } else if self.pk_indices.len() == 1 { - let index = self.pk_indices.get(0).unwrap(); - Ok(row - .datum_at(*index) - .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? - .to_text()) - } else { - let mut keys = vec![]; - for index in &self.pk_indices { - keys.push( - row.datum_at(*index) - .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? - .to_text(), - ); - } - Ok(keys.join( - self.delimiter - .as_ref() - .ok_or_else(|| anyhow!("Please set delimiter in with option"))?, - )) - } + (self.fn_build_id)(row) } } pub struct RemoteLogSinker { @@ -357,15 +362,26 @@ impl RemoteLogSinker { sink_param: SinkParam, writer_param: SinkWriterParam, sink_name: &str, - schema: Schema, - pk_indices: Vec<usize>, - properties: HashMap<String, String>, ) -> Result { + let sink_payload_format = if sink_name.eq(ElasticSearchSink::SINK_NAME) { + let columns = vec![ + ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(4), DataType::Jsonb).to_protobuf(), + ]; + SinkPayloadFormat::StreamChunkWithSchemaFormat(StreamChunkWithSchemaFormat { + table_schema: Some(TableSchema { + columns, + pk_indices: vec![], + }), + }) + } else { + SinkPayloadFormat::StreamChunkFormat(StreamChunkFormat {}) + }; let SinkWriterStreamHandle { request_sender, response_stream, } = EmbeddedConnectorClient::new()?
- .start_sink_writer_stream(param.clone(), connector_params.sink_payload_format) + .start_sink_writer_stream(¶m, connector_params.sink_payload_format) .await?; Ok(Self { @@ -767,14 +786,14 @@ impl EmbeddedConnectorClient { async fn start_sink_writer_stream( &self, - sink_param: SinkParam, + sink_param: &SinkParam, sink_payload_format: SinkPayloadFormat, ) -> Result> { let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { sink_param: Some(sink_param.to_proto()), - format: sink_payload_format as i32, + sink_payload_format: Some(sink_payload_format), })), }, |rx| async move { diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index 795581f71ce47..1aa64fc4006e5 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -24,6 +24,7 @@ use risingwave_pb::connector_service::connector_service_client::ConnectorService use risingwave_pb::connector_service::sink_coordinator_stream_request::{ CommitMetadata, StartCoordinator, }; +use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload; use risingwave_pb::connector_service::sink_writer_stream_request::{ Barrier, Request as SinkRequest, StartSink, WriteBatch, @@ -273,7 +274,7 @@ impl ConnectorClient { SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { sink_param: Some(sink_param), - format: sink_payload_format as i32, + sink_payload_format: Some(sink_payload_format), })), }, |rx| async move { diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index 75b074e477f37..302be6a0b386a 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -21,7 +21,7 @@ use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::monitor::SourceMetrics; use risingwave_connector::ConnectorParams; #[cfg(test)] -use risingwave_pb::connector_service::SinkPayloadFormat; +use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; use risingwave_rpc_client::MetaClient; use risingwave_source::dml_manager::DmlManagerRef; use risingwave_storage::StateStoreImpl; @@ -94,11 +94,15 @@ impl StreamEnvironment { #[cfg(test)] pub fn for_test() -> Self { use risingwave_common::system_param::local_manager::LocalSystemParamsManager; + use risingwave_pb::connector_service::PbJsonFormat; use risingwave_source::dml_manager::DmlManager; use risingwave_storage::monitor::MonitoredStorageMetrics; StreamEnvironment { server_addr: "127.0.0.1:5688".parse().unwrap(), - connector_params: ConnectorParams::new(None, SinkPayloadFormat::Json), + connector_params: ConnectorParams::new( + None, + SinkPayloadFormat::JsonFormat(PbJsonFormat {}), + ), config: Arc::new(StreamingConfig::default()), worker_id: WorkerNodeId::default(), state_store: StateStoreImpl::shared_in_memory_store(Arc::new( From f24254c78bdd985ee7ac71755c056d8bf1e4ee17 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 2 Jan 2024 18:07:00 +0800 Subject: [PATCH 08/11] fix ci --- .../elasticsearch/elasticsearch_sink.result | 2 +- .../sink/elasticsearch/elasticsearch_sink.slt | 17 +++---- .../elasticsearch_with_pk_sink.result | 2 +- .../connector/SinkWriterStreamObserver.java | 26 +++-------- proto/connector_service.proto | 20 +++----- src/compute/src/server.rs | 9 ++-- src/connector/src/lib.rs | 13 +----- src/connector/src/sink/remote.rs 
| 46 ++++++++++--------- src/rpc_client/src/connector_client.rs | 4 +- src/stream/src/task/env.rs | 9 +--- 10 files changed, 57 insertions(+), 91 deletions(-) diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_sink.result index 7f832258c9595..390b8e5415786 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.result @@ -1 +1 @@ -{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}} +{"took":34,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt index 049d111713d6f..70b7e911c0f17 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.slt +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.slt @@ -6,7 +6,8 @@ CREATE TABLE t7 ( d date, t time, ts timestamp, 
- tz timestamptz + tz timestamptz, + st struct<st1 int, st2 int>, ); statement ok @@ -31,18 +32,18 @@ CREATE SINK s8 from t7 WITH ( statement ok INSERT INTO t7 VALUES - (1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z'), - (5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'), - (13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z'); + (1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(1,1)), + (2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(2,2)), + (3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z',(3,3)), + (5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(5,5)), + (8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z',(8,8)), + (13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z',(13,13)); statement ok FLUSH; statement ok -INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z'); +INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z',(1,1)); statement ok FLUSH; diff --git a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result index 12ac1c8370376..46fc2052ef14f 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result @@ -1 +1 @@ -{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
+{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index b148992019e16..7997172b9f1f3 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -200,36 +200,24 @@ private void cleanup() { ConnectorNodeMetrics.decActiveSinkConnections(connectorName, "node1"); } - private void bindSink( - com.risingwave.proto.ConnectorServiceProto.SinkWriterStreamRequest.StartSink - startSink) { + private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink) { var sinkParam = startSink.getSinkParam(); - var format = startSink.getSinkPayloadFormatCase(); - tableSchema = TableSchema.fromProto(sinkParam.getTableSchema()); + tableSchema = TableSchema.fromProto(startSink.getTableSchema()); String connectorName = getConnectorName(sinkParam); SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName); sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap()); - - switch (format) { - case UNSPECIFIED_FORMAT: - case SINKPAYLOADFORMAT_NOT_SET: + switch (startSink.getFormat()) { + case FORMAT_UNSPECIFIED: + case UNRECOGNIZED: throw INVALID_ARGUMENT .withDescription("should specify payload format in request") .asRuntimeException(); - case JSON_FORMAT: + case JSON: deserializer = new JsonDeserializer(tableSchema); break; - case STREAM_CHUNK_FORMAT: + case STREAM_CHUNK: deserializer = new StreamChunkDeserializer(tableSchema); break; - case STREAM_CHUNK_WITH_SCHEMA_FORMAT: - deserializer = - new StreamChunkDeserializer( - TableSchema.fromProto( - startSink - .getStreamChunkWithSchemaFormat() - .getTableSchema())); - break; } this.connectorName = connectorName.toUpperCase(); ConnectorNodeMetrics.incActiveSinkConnections(connectorName, 
"node1"); diff --git a/proto/connector_service.proto b/proto/connector_service.proto index b9849cf531b4a..585e3530b552b 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -30,25 +30,17 @@ message SinkParam { optional uint32 target_table = 8; } -message JsonFormat{} - -message StreamChunkFormat{} - -message UnspecifiedFormat{} - -message StreamChunkWithSchemaFormat{ - TableSchema table_schema = 1; +enum SinkPayloadFormat { + FORMAT_UNSPECIFIED = 0; + JSON = 1; + STREAM_CHUNK = 2; } message SinkWriterStreamRequest { message StartSink { SinkParam sink_param = 1; - oneof sink_payload_format { - JsonFormat json_format = 11; - StreamChunkFormat stream_chunk_format = 12; - UnspecifiedFormat unspecified_format = 13; - StreamChunkWithSchemaFormat stream_chunk_with_schema_format = 14; - } + SinkPayloadFormat format = 2; + TableSchema table_schema = 3; } message WriteBatch { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index dbaec708225bf..5f500f0ad919e 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -40,8 +40,7 @@ use risingwave_common_service::tracing::TracingExtractLayer; use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS; use risingwave_pb::common::WorkerType; use risingwave_pb::compute::config_service_server::ConfigServiceServer; -use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; -use risingwave_pb::connector_service::{JsonFormat, StreamChunkFormat}; +use risingwave_pb::connector_service::SinkPayloadFormat; use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer; @@ -333,10 +332,8 @@ pub async fn compute_node_serve( let connector_params = risingwave_connector::ConnectorParams { connector_client, sink_payload_format: match opts.connector_rpc_sink_payload_format.as_deref() { - None | Some("stream_chunk") => { - SinkPayloadFormat::StreamChunkFormat(StreamChunkFormat {}) - } - Some("json") => SinkPayloadFormat::JsonFormat(JsonFormat {}), + None | Some("stream_chunk") => SinkPayloadFormat::StreamChunk, + Some("json") => SinkPayloadFormat::Json, _ => { unreachable!( "invalid sink payload format: {:?}. 
Should be either json or stream_chunk", diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 85950fada9d39..ae478b9c100ee 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -38,8 +38,7 @@ use std::time::Duration; use duration_str::parse_std; -use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; -use risingwave_pb::connector_service::UnspecifiedFormat; +use risingwave_pb::connector_service::SinkPayloadFormat; use risingwave_rpc_client::ConnectorClient; use serde::de; @@ -61,19 +60,11 @@ mod with_options; #[cfg(test)] mod with_options_test; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct ConnectorParams { pub connector_client: Option, pub sink_payload_format: SinkPayloadFormat, } -impl Default for ConnectorParams { - fn default() -> Self { - Self { - connector_client: Default::default(), - sink_payload_format: SinkPayloadFormat::UnspecifiedFormat(UnspecifiedFormat {}), - } - } -} impl ConnectorParams { pub fn new( diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 362601232d464..7857f6933c807 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -38,15 +38,14 @@ use risingwave_jni_core::{ call_static_method, gen_class_name, JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, }; use risingwave_pb::connector_service::sink_coordinator_stream_request::StartCoordinator; -use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; use risingwave_pb::connector_service::sink_writer_stream_request::{ Request as SinkRequest, StartSink, }; use risingwave_pb::connector_service::{ sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response, - SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, - SinkWriterStreamRequest, SinkWriterStreamResponse, StreamChunkFormat, - StreamChunkWithSchemaFormat, TableSchema, ValidateSinkRequest, ValidateSinkResponse, + SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, SinkPayloadFormat, + SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest, + ValidateSinkResponse, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::{ @@ -363,25 +362,11 @@ impl RemoteLogSinker { writer_param: SinkWriterParam, sink_name: &str, ) -> Result { - let sink_payload_format = if sink_name.eq(ElasticSearchSink::SINK_NAME) { - let columns = vec![ - ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(), - ColumnDesc::unnamed(ColumnId::from(4), DataType::Jsonb).to_protobuf(), - ]; - SinkPayloadFormat::StreamChunkWithSchemaFormat(StreamChunkWithSchemaFormat { - table_schema: Some(TableSchema { - columns, - pk_indices: vec![], - }), - }) - } else { - SinkPayloadFormat::StreamChunkFormat(StreamChunkFormat {}) - }; let SinkWriterStreamHandle { request_sender, response_stream, } = EmbeddedConnectorClient::new()? 
- .start_sink_writer_stream(&sink_param, sink_payload_format) + .start_sink_writer_stream(&sink_param, SinkPayloadFormat::StreamChunk, sink_name) .await?; let sink_metrics = writer_param.sink_metrics; @@ -625,6 +610,7 @@ impl Sink for CoordinatedRemoteSink { self.param.clone(), writer_param.connector_params, writer_param.sink_metrics.clone(), + Self::SINK_NAME, ) .await?, ) @@ -650,9 +636,10 @@ impl CoordinatedRemoteSinkWriter { param: SinkParam, connector_params: ConnectorParams, sink_metrics: SinkMetrics, + sink_name: &str, ) -> Result { let stream_handle = EmbeddedConnectorClient::new()? - .start_sink_writer_stream(¶m, connector_params.sink_payload_format) + .start_sink_writer_stream(¶m, connector_params.sink_payload_format, sink_name) .await?; Ok(Self { @@ -788,12 +775,27 @@ impl EmbeddedConnectorClient { &self, sink_param: &SinkParam, sink_payload_format: SinkPayloadFormat, + sink_name: &str, ) -> Result> { + let sink_proto = sink_param.to_proto(); + let table_schema = if sink_name.eq(ElasticSearchSink::SINK_NAME) { + let columns = vec![ + ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(4), DataType::Jsonb).to_protobuf(), + ]; + Some(TableSchema { + columns, + pk_indices: vec![], + }) + } else { + sink_proto.table_schema.clone() + }; let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { - sink_param: Some(sink_param.to_proto()), - sink_payload_format: Some(sink_payload_format), + sink_param: Some(sink_proto), + format: sink_payload_format as i32, + table_schema, })), }, |rx| async move { diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index 73441cbc6516e..0175bdf66d9a1 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -24,7 +24,6 @@ use risingwave_pb::connector_service::connector_service_client::ConnectorService use risingwave_pb::connector_service::sink_coordinator_stream_request::{ CommitMetadata, StartCoordinator, }; -use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::Payload; use risingwave_pb::connector_service::sink_writer_stream_request::{ Barrier, Request as SinkRequest, StartSink, WriteBatch, @@ -273,8 +272,9 @@ impl ConnectorClient { let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { + table_schema: sink_param.table_schema.clone(), sink_param: Some(sink_param), - sink_payload_format: Some(sink_payload_format), + format: sink_payload_format as i32, })), }, |rx| async move { diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index a4914c7216dca..0083e8e02ff28 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -20,8 +20,6 @@ use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::monitor::SourceMetrics; use risingwave_connector::ConnectorParams; -#[cfg(test)] -use risingwave_pb::connector_service::sink_writer_stream_request::start_sink::SinkPayloadFormat; use risingwave_rpc_client::MetaClient; use risingwave_source::dml_manager::DmlManagerRef; use risingwave_storage::StateStoreImpl; @@ -94,15 +92,12 @@ impl StreamEnvironment { #[cfg(test)] pub fn for_test() -> Self { use 
risingwave_common::system_param::local_manager::LocalSystemParamsManager; - use risingwave_pb::connector_service::PbJsonFormat; + use risingwave_pb::connector_service::SinkPayloadFormat; use risingwave_source::dml_manager::DmlManager; use risingwave_storage::monitor::MonitoredStorageMetrics; StreamEnvironment { server_addr: "127.0.0.1:5688".parse().unwrap(), - connector_params: ConnectorParams::new( - None, - SinkPayloadFormat::JsonFormat(PbJsonFormat {}), - ), + connector_params: ConnectorParams::new(None, SinkPayloadFormat::Json), config: Arc::new(StreamingConfig::default()), worker_id: WorkerNodeId::default(), state_store: StateStoreImpl::shared_in_memory_store(Arc::new( From 2dcd7ebacaf557c6c1619dd37fb23fe6144af734 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 2 Jan 2024 22:12:08 +0800 Subject: [PATCH 09/11] fix --- .../connector/SinkWriterStreamObserver.java | 2 +- proto/connector_service.proto | 2 +- src/connector/src/sink/elasticsearch.rs | 138 ++++++++++++ src/connector/src/sink/mod.rs | 1 + src/connector/src/sink/remote.rs | 201 +++++------------- src/rpc_client/src/connector_client.rs | 2 +- 6 files changed, 197 insertions(+), 149 deletions(-) create mode 100644 src/connector/src/sink/elasticsearch.rs diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java index 7997172b9f1f3..cd61da38d6cb5 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkWriterStreamObserver.java @@ -202,7 +202,7 @@ private void cleanup() { private void bindSink(ConnectorServiceProto.SinkWriterStreamRequest.StartSink startSink) { var sinkParam = startSink.getSinkParam(); - tableSchema = TableSchema.fromProto(startSink.getTableSchema()); + tableSchema = TableSchema.fromProto(startSink.getPayloadSchema()); String connectorName = getConnectorName(sinkParam); SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName); sink = sinkFactory.createWriter(tableSchema, sinkParam.getPropertiesMap()); diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 97c90bd3f1819..465af0d2a55a8 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -39,7 +39,7 @@ message SinkWriterStreamRequest { message StartSink { SinkParam sink_param = 1; SinkPayloadFormat format = 2; - TableSchema table_schema = 3; + TableSchema payload_schema = 3; } message WriteBatch { diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs new file mode 100644 index 0000000000000..f6ddbeb7e35d2 --- /dev/null +++ b/src/connector/src/sink/elasticsearch.rs @@ -0,0 +1,138 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
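+
+//! Converts stream chunks into the two-column layout expected by the
+//! Elasticsearch sink: a varchar document id plus a jsonb document body.
+//! `EsStreamChunkConverter` encodes every row with `JsonEncoder::new_with_es`
+//! and derives the id from the first column when no primary key is declared,
+//! from the single primary-key column, or from all primary-key columns joined
+//! with the `delimiter` sink option (`ES_OPTION_DELIMITER`) for composite keys.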
+ +use std::collections::HashMap; + +use anyhow::anyhow; +use risingwave_common::array::{ + ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, +}; +use risingwave_common::catalog::Schema; +use risingwave_common::row::Row; +use risingwave_common::types::{JsonbVal, Scalar, ToText}; +use serde_json::Value; + +use super::encoder::{JsonEncoder, RowEncoder}; +use super::remote::ElasticSearchSink; +use crate::sink::{Result, Sink}; +pub const ES_OPTION_DELIMITER: &str = "delimiter"; + +pub enum StreamChunkConverter { + Es(EsStreamChunkConverter), + Other, +} +impl StreamChunkConverter { + pub fn new( + sink_name: &str, + schema: Schema, + pk_indices: &Vec<usize>, + properties: &HashMap<String, String>, + ) -> Result<Self> { + if sink_name == ElasticSearchSink::SINK_NAME { + Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new( + schema, + pk_indices.clone(), + properties.get(ES_OPTION_DELIMITER).cloned(), + )?)) + } else { + Ok(StreamChunkConverter::Other) + } + } + + pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { + match self { + StreamChunkConverter::Es(es) => es.convert_chunk(chunk), + StreamChunkConverter::Other => Ok(chunk), + } + } +} +pub struct EsStreamChunkConverter { + json_encoder: JsonEncoder, + fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>, +} +impl EsStreamChunkConverter { + fn new(schema: Schema, pk_indices: Vec<usize>, delimiter: Option<String>) -> Result<Self> { + let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty() + { + Box::new(|row: RowRef<'_>| { + Ok(row + .datum_at(0) + .ok_or_else(|| anyhow!("No value find in row, index is 0"))? + .to_text()) + }) + } else if pk_indices.len() == 1 { + let index = *pk_indices.get(0).unwrap(); + Box::new(move |row: RowRef<'_>| { + Ok(row + .datum_at(index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? + .to_text()) + }) + } else { + let delimiter = delimiter + .as_ref() + .ok_or_else(|| anyhow!("Please set delimiter in with option"))? + .clone(); + Box::new(move |row: RowRef<'_>| { + let mut keys = vec![]; + for index in &pk_indices { + keys.push( + row.datum_at(*index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))?
+ .to_text(), + ); + } + Ok(keys.join(&delimiter)) + }) + }; + let json_encoder = JsonEncoder::new_with_es(schema, None); + Ok(Self { + json_encoder, + fn_build_id, + }) + } + + fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { + let mut ops = vec![]; + let mut id_string_builder = + <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity()); + let mut json_builder = + <JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity()); + for (op, row) in chunk.rows() { + ops.push(op); + let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); + risingwave_common::array::ArrayBuilder::append( + &mut id_string_builder, + Some(&self.build_id(row)?), + ); + risingwave_common::array::ArrayBuilder::append( + &mut json_builder, + Some(json.as_scalar_ref()), + ); + } + let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder); + let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder); + Ok(StreamChunk::new( + ops, + vec![ + std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)), + std::sync::Arc::new(ArrayImpl::Jsonb(json_array)), + ], + )) + } + + fn build_id(&self, row: RowRef<'_>) -> Result<String> { + (self.fn_build_id)(row) + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7bbd42edcd338..e5f9df155c735 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -20,6 +20,7 @@ pub mod coordinate; pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; +pub mod elasticsearch; pub mod encoder; pub mod formatter; pub mod iceberg; diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 7857f6933c807..82896d6c76c4a 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -25,13 +25,10 @@ use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use jni::JavaVM; use prost::Message; -use risingwave_common::array::{ - ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, -}; -use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; +use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::anyhow_error; -use risingwave_common::row::Row; -use risingwave_common::types::{DataType, JsonbVal, Scalar, ToText}; +use risingwave_common::types::DataType; use risingwave_common::util::drop_either_future; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{ @@ -43,23 +40,22 @@ use risingwave_pb::connector_service::sink_writer_stream_request::{ }; use risingwave_pb::connector_service::{ sink_coordinator_stream_request, sink_coordinator_stream_response, sink_writer_stream_response, - SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, SinkPayloadFormat, - SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, ValidateSinkRequest, - ValidateSinkResponse, + PbSinkParam, SinkCoordinatorStreamRequest, SinkCoordinatorStreamResponse, SinkMetadata, + SinkPayloadFormat, SinkWriterStreamRequest, SinkWriterStreamResponse, TableSchema, + ValidateSinkRequest, ValidateSinkResponse, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::{ BidiStreamReceiver, BidiStreamSender, SinkCoordinatorStreamHandle, SinkWriterStreamHandle, DEFAULT_BUFFER_SIZE, }; -use serde_json::Value; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use tracing::warn; -use super::encoder::{JsonEncoder, RowEncoder}; +use
super::elasticsearch::{StreamChunkConverter, ES_OPTION_DELIMITER}; use crate::sink::catalog::desc::SinkDesc; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; @@ -70,8 +66,6 @@ use crate::sink::{ }; use crate::ConnectorParams; -const ES_OPTION_DELIMITER: &str = "delimiter"; - macro_rules! def_remote_sink { () => { def_remote_sink! { @@ -163,9 +157,17 @@ impl Sink for RemoteSink { } async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> Result<()> { + if sink_name == ElasticSearchSink::SINK_NAME + && param.downstream_pk.len() > 1 + && param.properties.get(ES_OPTION_DELIMITER).is_none() + { + return Err(SinkError::Remote(anyhow_error!( + "Es sink only support single pk or pk with delimiter option" + ))); + } // FIXME: support struct and array in stream sink param.columns.iter().map(|col| { - match col.data_type { + match &col.data_type { DataType::Int16 | DataType::Int32 | DataType::Int64 @@ -181,19 +183,30 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> Result<()> | DataType::Interval | DataType::Jsonb | DataType::Bytea => Ok(()), - DataType::List(_) | DataType::Struct(_) => { - if sink_name.eq(ElasticSearchSink::SINK_NAME){ + DataType::List(list) => { + if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_list(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + Ok(()) + } else{ + Err(SinkError::Remote(anyhow_error!( + "Remote sink only support list, got {:?}: {:?}", + col.name, + col.data_type, + ))) + } + }, + DataType::Struct(_) => { + if sink_name==ElasticSearchSink::SINK_NAME{ Ok(()) }else{ Err(SinkError::Remote(anyhow_error!( - "Only Es sink support list and struct, got {:?}: {:?}", + "Only Es sink support struct, got {:?}: {:?}", col.name, col.data_type, ))) } }, DataType::Serial | DataType::Int256 => Err(SinkError::Remote(anyhow_error!( - "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, and Varchar, (Es sink support Struct and List) got {:?}: {:?}", + "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}", col.name, col.data_type, )))}}).try_collect()?; @@ -241,114 +254,6 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> Result<()> .map_err(|e| anyhow!("unable to validate: {:?}", e))? 
} -enum StreamChunkConverter { - Es(EsStreamChunkConverter), - Other, -} -impl StreamChunkConverter { - fn new( - sink_name: &str, - schema: Schema, - pk_indices: &Vec<usize>, - properties: &HashMap<String, String>, - ) -> Result<Self> { - if sink_name.eq(ElasticSearchSink::SINK_NAME) { - Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new( - schema, - pk_indices.clone(), - properties.get(ES_OPTION_DELIMITER).cloned(), - )?)) - } else { - Ok(StreamChunkConverter::Other) - } - } - - fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { - match self { - StreamChunkConverter::Es(es) => es.convert_chunk(chunk), - StreamChunkConverter::Other => Ok(chunk), - } - } -} -struct EsStreamChunkConverter { - json_encoder: JsonEncoder, - fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send>, -} -impl EsStreamChunkConverter { - fn new(schema: Schema, pk_indices: Vec<usize>, delimiter: Option<String>) -> Result<Self> { - let fn_build_id: Box<dyn Fn(RowRef<'_>) -> Result<String> + Send> = if pk_indices.is_empty() - { - Box::new(|row: RowRef<'_>| { - Ok(row - .datum_at(0) - .ok_or_else(|| anyhow!("No value find in row, index is 0"))? - .to_text()) - }) - } else if pk_indices.len() == 1 { - let index = *pk_indices.get(0).unwrap(); - Box::new(move |row: RowRef<'_>| { - Ok(row - .datum_at(index) - .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? - .to_text()) - }) - } else { - let delimiter = delimiter - .as_ref() - .ok_or_else(|| anyhow!("Please set delimiter in with option"))? - .clone(); - Box::new(move |row: RowRef<'_>| { - let mut keys = vec![]; - for index in &pk_indices { - keys.push( - row.datum_at(*index) - .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? - .to_text(), - ); - } - Ok(keys.join(&delimiter)) - }) - }; - let json_encoder = JsonEncoder::new_with_es(schema, None); - Ok(Self { - json_encoder, - fn_build_id, - }) - } - - fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { - let mut ops = vec![]; - let mut id_string_builder = - <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity()); - let mut json_builder = - <JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity()); - for (op, row) in chunk.rows() { - ops.push(op); - let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); - risingwave_common::array::ArrayBuilder::append( - &mut id_string_builder, - Some(&self.build_id(row)?), - ); - risingwave_common::array::ArrayBuilder::append( - &mut json_builder, - Some(json.as_scalar_ref()), - ); - } - let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder); - let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder); - Ok(StreamChunk::new( - ops, - vec![ - std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)), - std::sync::Arc::new(ArrayImpl::Jsonb(json_array)), - ], - )) - } - - fn build_id(&self, row: RowRef<'_>) -> Result<String> { - (self.fn_build_id)(row) - } -} pub struct RemoteLogSinker { request_sender: BidiStreamSender<JniSinkWriterStreamRequest>, response_stream: BidiStreamReceiver<SinkWriterStreamResponse>, @@ -362,11 +267,25 @@ impl RemoteLogSinker { writer_param: SinkWriterParam, sink_name: &str, ) -> Result { + let sink_proto = sink_param.to_proto(); + let table_schema = if sink_name == ElasticSearchSink::SINK_NAME { + let columns = vec![ + ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(4), DataType::Jsonb).to_protobuf(), + ]; + Some(TableSchema { + columns, + pk_indices: vec![], + }) + } else { + sink_proto.table_schema.clone() + }; + let SinkWriterStreamHandle { request_sender, response_stream, } = EmbeddedConnectorClient::new()?
- .start_sink_writer_stream(&sink_param, SinkPayloadFormat::StreamChunk, sink_name) + .start_sink_writer_stream(table_schema, sink_proto, SinkPayloadFormat::StreamChunk) .await?; let sink_metrics = writer_param.sink_metrics; @@ -610,7 +529,6 @@ impl Sink for CoordinatedRemoteSink { self.param.clone(), writer_param.connector_params, writer_param.sink_metrics.clone(), - Self::SINK_NAME, ) .await?, ) @@ -636,10 +554,14 @@ impl CoordinatedRemoteSinkWriter { param: SinkParam, connector_params: ConnectorParams, sink_metrics: SinkMetrics, - sink_name: &str, ) -> Result { + let sink_proto = param.to_proto(); let stream_handle = EmbeddedConnectorClient::new()? - .start_sink_writer_stream(¶m, connector_params.sink_payload_format, sink_name) + .start_sink_writer_stream( + sink_proto.table_schema.clone(), + sink_proto, + connector_params.sink_payload_format, + ) .await?; Ok(Self { @@ -773,29 +695,16 @@ impl EmbeddedConnectorClient { async fn start_sink_writer_stream( &self, - sink_param: &SinkParam, + payload_schema: Option, + sink_proto: PbSinkParam, sink_payload_format: SinkPayloadFormat, - sink_name: &str, ) -> Result> { - let sink_proto = sink_param.to_proto(); - let table_schema = if sink_name.eq(ElasticSearchSink::SINK_NAME) { - let columns = vec![ - ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(), - ColumnDesc::unnamed(ColumnId::from(4), DataType::Jsonb).to_protobuf(), - ]; - Some(TableSchema { - columns, - pk_indices: vec![], - }) - } else { - sink_proto.table_schema.clone() - }; let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { sink_param: Some(sink_proto), format: sink_payload_format as i32, - table_schema, + payload_schema, })), }, |rx| async move { diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index 0175bdf66d9a1..d0e9ad7d794db 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -272,7 +272,7 @@ impl ConnectorClient { let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { - table_schema: sink_param.table_schema.clone(), + payload_schema: sink_param.table_schema.clone(), sink_param: Some(sink_param), format: sink_payload_format as i32, })), From 61d09996777909ea8afb69c9ad3d126885748da5 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 3 Jan 2024 14:00:08 +0800 Subject: [PATCH 10/11] fix --- src/connector/src/sink/remote.rs | 8 ++++---- src/rpc_client/src/connector_client.rs | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 82896d6c76c4a..c69cf81aeaa2c 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -268,10 +268,10 @@ impl RemoteLogSinker { sink_name: &str, ) -> Result { let sink_proto = sink_param.to_proto(); - let table_schema = if sink_name == ElasticSearchSink::SINK_NAME { + let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME { let columns = vec![ - ColumnDesc::unnamed(ColumnId::from(3), DataType::Varchar).to_protobuf(), - ColumnDesc::unnamed(ColumnId::from(4), DataType::Jsonb).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(1), DataType::Jsonb).to_protobuf(), ]; Some(TableSchema { columns, @@ -285,7 +285,7 @@ impl RemoteLogSinker { request_sender, response_stream, } = 
EmbeddedConnectorClient::new()? - .start_sink_writer_stream(table_schema, sink_proto, SinkPayloadFormat::StreamChunk) + .start_sink_writer_stream(payload_schema, sink_proto, SinkPayloadFormat::StreamChunk) .await?; let sink_metrics = writer_param.sink_metrics; diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index d0e9ad7d794db..cd53cd019ea64 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -265,15 +265,16 @@ impl ConnectorClient { pub async fn start_sink_writer_stream( &self, - sink_param: SinkParam, + payload_schema: Option, + sink_proto: PbSinkParam, sink_payload_format: SinkPayloadFormat, ) -> Result { let mut rpc_client = self.rpc_client.clone(); let (handle, first_rsp) = SinkWriterStreamHandle::initialize( SinkWriterStreamRequest { request: Some(SinkRequest::Start(StartSink { - payload_schema: sink_param.table_schema.clone(), - sink_param: Some(sink_param), + payload_schema, + sink_param: Some(sink_proto), format: sink_payload_format as i32, })), }, From bd4a8a9d194a9d48c7cb499b3c5c9bcafdfe5498 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 4 Jan 2024 15:42:04 +0800 Subject: [PATCH 11/11] fix ci fix ci --- e2e_test/sink/elasticsearch/elasticsearch_sink.result | 2 +- e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result | 2 +- src/connector/src/sink/remote.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e_test/sink/elasticsearch/elasticsearch_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_sink.result index 390b8e5415786..b67b65495804f 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_sink.result @@ -1 +1 @@ -{"took":34,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file +{"took":34,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"_doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 
00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test","_type":"_doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test","_type":"_doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test","_type":"_doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test","_type":"_doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test","_type":"_doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result index 46fc2052ef14f..3e03f5a511c0a 100644 --- a/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result +++ b/e2e_test/sink/elasticsearch/elasticsearch_with_pk_sink.result @@ -1 +1 @@ -{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file +{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test1","_type":"_doc","_id":"5_5-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":5,"st2":5},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":5,"v2":2,"v3":"5-2"}},{"_index":"test1","_type":"_doc","_id":"2_2-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":2,"st2":2},"t":"00:00:00.000000","ts":"1970-01-01 
00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":2,"v2":2,"v3":"2-2"}},{"_index":"test1","_type":"_doc","_id":"13_13-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":13,"st2":13},"t":"20:00:00.123456","ts":"1970-01-01 20:00:00.123456","tz":"1970-01-01 20:00:00.123456","v1":13,"v2":2,"v3":"13-2"}},{"_index":"test1","_type":"_doc","_id":"3_3-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":3,"st2":3},"t":"00:00:00.123456","ts":"1970-01-01 00:00:00.123456","tz":"1970-01-01 00:00:00.123456","v1":3,"v2":2,"v3":"3-2"}},{"_index":"test1","_type":"_doc","_id":"8_8-2","_score":1.0,"_source":{"d":"1970-01-01","st":{"st1":8,"st2":8},"t":"20:00:00.000000","ts":"1970-01-01 00:00:00.000000","tz":"1970-01-01 00:00:00.000000","v1":8,"v2":2,"v3":"8-2"}},{"_index":"test1","_type":"_doc","_id":"1_1-50","_score":1.0,"_source":{"d":"2000-01-01","st":{"st1":1,"st2":1},"t":"00:00:00.123456","ts":"2000-01-01 00:00:00.123456","tz":"2000-01-01 00:00:00.123456","v1":1,"v2":50,"v3":"1-50"}}]}} \ No newline at end of file diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index b164f720cf7b8..320f77c6a47ba 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -185,7 +185,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> anyhow::Res | DataType::Jsonb | DataType::Bytea => Ok(()), DataType::List(list) => { - if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_list(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ + if (sink_name==ElasticSearchSink::SINK_NAME) | matches!(list.as_ref(), DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Float32 | DataType::Float64 | DataType::Varchar){ Ok(()) } else{ Err(SinkError::Remote(anyhow_error!(