diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
new file mode 100644
index 0000000000000..e4fe775f6306c
--- /dev/null
+++ b/src/connector/src/sink/encoder/json.rs
@@ -0,0 +1,309 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use base64::engine::general_purpose;
+use base64::Engine as _;
+use chrono::{Datelike, Timelike};
+use risingwave_common::array::{ArrayError, ArrayResult};
+use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::row::Row;
+use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, ToText};
+use risingwave_common::util::iter_util::ZipEqDebug;
+use serde_json::{json, Map, Value};
+
+use super::{Result, RowEncoder, SerTo, TimestampHandlingMode};
+use crate::sink::SinkError;
+
+pub struct JsonEncoder<'a> {
+    schema: &'a Schema,
+    col_indices: Option<&'a [usize]>,
+    timestamp_handling_mode: TimestampHandlingMode,
+}
+
+impl<'a> JsonEncoder<'a> {
+    pub fn new(
+        schema: &'a Schema,
+        col_indices: Option<&'a [usize]>,
+        timestamp_handling_mode: TimestampHandlingMode,
+    ) -> Self {
+        Self {
+            schema,
+            col_indices,
+            timestamp_handling_mode,
+        }
+    }
+}
+
+impl<'a> RowEncoder for JsonEncoder<'a> {
+    type Output = Map<String, Value>;
+
+    fn schema(&self) -> &Schema {
+        self.schema
+    }
+
+    fn col_indices(&self) -> Option<&[usize]> {
+        self.col_indices
+    }
+
+    fn encode_cols(
+        &self,
+        row: impl Row,
+        col_indices: impl Iterator<Item = usize>,
+    ) -> Result<Self::Output> {
+        let mut mappings = Map::with_capacity(self.schema.len());
+        for idx in col_indices {
+            let field = &self.schema[idx];
+            let key = field.name.clone();
+            let value =
+                datum_to_json_object(field, row.datum_at(idx), self.timestamp_handling_mode)
+                    .map_err(|e| SinkError::JsonParse(e.to_string()))?;
+            mappings.insert(key, value);
+        }
+        Ok(mappings)
+    }
+}
+
+impl SerTo<String> for Map<String, Value> {
+    fn ser_to(self) -> Result<String> {
+        Value::Object(self).ser_to()
+    }
+}
+
+impl SerTo<String> for Value {
+    fn ser_to(self) -> Result<String> {
+        Ok(self.to_string())
+    }
+}
+
+fn datum_to_json_object(
+    field: &Field,
+    datum: DatumRef<'_>,
+    timestamp_handling_mode: TimestampHandlingMode,
+) -> ArrayResult<Value> {
+    let scalar_ref = match datum {
+        None => return Ok(Value::Null),
+        Some(datum) => datum,
+    };
+
+    let data_type = field.data_type();
+
+    tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
+
+    let value = match (data_type, scalar_ref) {
+        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
+            json!(v)
+        }
+        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
+            json!(v)
+        }
+        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
+            json!(v)
+        }
+        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
+            json!(v)
+        }
+        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
+            json!(f32::from(v))
+        }
+        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
+            json!(f64::from(v))
+        }
+        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
+            json!(v)
+        }
+        (DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
+            json!(v.to_text())
+        }
+        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
+            // RisingWave's timestamp with time zone is stored in UTC, does not retain
+            // the original time zone, and has microsecond precision.
+            let parsed = v.to_datetime_utc().naive_utc();
+            let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
+            json!(v)
+        }
+        (DataType::Time, ScalarRefImpl::Time(v)) => {
+            // TODO: ignore the nanosecond part to avoid leap-second complexity
+            json!(v.0.num_seconds_from_midnight() as i64 * 1000)
+        }
+        (DataType::Date, ScalarRefImpl::Date(v)) => {
+            json!(v.0.num_days_from_ce())
+        }
+        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode {
+            TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()),
+            TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()),
+        },
+        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
+            json!(general_purpose::STANDARD_NO_PAD.encode(v))
+        }
+        // ISO 8601 duration: PnYnMnDTnHnMnS
+        (DataType::Interval, ScalarRefImpl::Interval(v)) => {
+            json!(v.as_iso_8601())
+        }
+        (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => {
+            json!(jsonb_ref.to_string())
+        }
+        (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
+            let elems = list_ref.iter();
+            let mut vec = Vec::with_capacity(elems.len());
+            let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
+            for sub_datum_ref in elems {
+                let value =
+                    datum_to_json_object(&inner_field, sub_datum_ref, timestamp_handling_mode)?;
+                vec.push(value);
+            }
+            json!(vec)
+        }
+        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
+            let mut map = Map::with_capacity(st.len());
+            for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
+                st.iter()
+                    .map(|(name, dt)| Field::with_name(dt.clone(), name)),
+            ) {
+                let value =
+                    datum_to_json_object(&sub_field, sub_datum_ref, timestamp_handling_mode)?;
+                map.insert(sub_field.name.clone(), value);
+            }
+            json!(map)
+        }
+        (data_type, scalar_ref) => {
+            return Err(ArrayError::internal(
+                format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref),
+            ));
+        }
+    };
+
+    Ok(value)
+}
+
+#[cfg(test)]
+mod tests {
+
+    use risingwave_common::types::{DataType, Interval, ScalarImpl, Time, Timestamp};
+
+    use super::*;
+    #[test]
+    fn test_to_json_basic_type() {
+        let mock_field = Field {
+            data_type: DataType::Boolean,
+            name: Default::default(),
+            sub_fields: Default::default(),
+            type_name: Default::default(),
+        };
+        let boolean_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Boolean,
+                ..mock_field.clone()
+            },
+            Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
+            TimestampHandlingMode::String,
+        )
+        .unwrap();
+        assert_eq!(boolean_value, json!(false));
+
+        let int16_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Int16,
+                ..mock_field.clone()
+            },
+            Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
+            TimestampHandlingMode::String,
+        )
+        .unwrap();
+        assert_eq!(int16_value, json!(16));
+
+        let int64_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Int64,
+                ..mock_field.clone()
+            },
+            Some(ScalarImpl::Int64(std::i64::MAX).as_scalar_ref_impl()),
+            TimestampHandlingMode::String,
+        )
+        .unwrap();
+        assert_eq!(
+            serde_json::to_string(&int64_value).unwrap(),
+            std::i64::MAX.to_string()
+        );
+
+        // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
+        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
+        let tstz_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Timestamptz,
+                ..mock_field.clone()
+            },
+            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
+            TimestampHandlingMode::String,
+        )
+        .unwrap();
+        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");
+
+        let ts_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Timestamp,
+                ..mock_field.clone()
+            },
+            Some(
+                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
+                    .as_scalar_ref_impl(),
+            ),
+            TimestampHandlingMode::Milli,
+        )
+        .unwrap();
+        assert_eq!(ts_value, json!(1000 * 1000));
+
+        let ts_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Timestamp,
+                ..mock_field.clone()
+            },
+            Some(
+                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
+                    .as_scalar_ref_impl(),
+            ),
+            TimestampHandlingMode::String,
+        )
+        .unwrap();
+        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_string()));
+
+        // Represents the number of milliseconds past midnight, io.debezium.time.Time
+        let time_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Time,
+                ..mock_field.clone()
+            },
+            Some(
+                ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
+                    .as_scalar_ref_impl(),
+            ),
+            TimestampHandlingMode::String,
+        )
+        .unwrap();
+        assert_eq!(time_value, json!(1000 * 1000));
+
+        let interval_value = datum_to_json_object(
+            &Field {
+                data_type: DataType::Interval,
+                ..mock_field
+            },
+            Some(
+                ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
+                    .as_scalar_ref_impl(),
+            ),
+            TimestampHandlingMode::String,
+        )
+        .unwrap();
+        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
+    }
+}
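For orientation, here is a minimal usage sketch of the new encoder (not part of the diff). It assumes `Schema::new`, `OwnedRow`, and `impl Row for &OwnedRow` behave as in `risingwave_common` at this revision; the two-column schema and the `demo` wrapper are made up for illustration.

```rust
// Hypothetical example, sketched under the assumptions stated above.
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};

use crate::sink::encoder::{JsonEncoder, RowEncoder, SerTo, TimestampHandlingMode};

fn demo() -> crate::sink::Result<()> {
    let schema = Schema::new(vec![
        Field::with_name(DataType::Int32, "id"),
        Field::with_name(DataType::Varchar, "name"),
    ]);
    let row = OwnedRow::new(vec![
        Some(ScalarImpl::Int32(1)),
        Some(ScalarImpl::Utf8("alice".into())),
    ]);

    // Key encoder: pk columns only. Value encoder: all columns.
    let key_encoder = JsonEncoder::new(&schema, Some(&[0]), TimestampHandlingMode::Milli);
    let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);

    // `encode` produces a `Map<String, Value>`; `SerTo` turns it into a string.
    let key: String = key_encoder.encode(&row)?.ser_to()?;
    let value: String = val_encoder.encode(&row)?.ser_to()?;
    assert_eq!(key, r#"{"id":1}"#);
    assert_eq!(value, r#"{"id":1,"name":"alice"}"#);
    Ok(())
}
```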
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
new file mode 100644
index 0000000000000..83f185935a44e
--- /dev/null
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -0,0 +1,80 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_common::catalog::Schema;
+use risingwave_common::row::Row;
+
+use crate::sink::Result;
+
+mod json;
+
+pub use json::JsonEncoder;
+
+/// Encode a row of a relation into
+/// * an object in json
+/// * a message in protobuf
+/// * a record in avro
+pub trait RowEncoder {
+    type Output: SerTo<Vec<u8>>;
+
+    fn encode_cols(
+        &self,
+        row: impl Row,
+        col_indices: impl Iterator<Item = usize>,
+    ) -> Result<Self::Output>;
+    fn schema(&self) -> &Schema;
+    fn col_indices(&self) -> Option<&[usize]>;
+
+    fn encode(&self, row: impl Row) -> Result<Self::Output> {
+        assert_eq!(row.len(), self.schema().len());
+        match self.col_indices() {
+            Some(col_indices) => self.encode_cols(row, col_indices.iter().copied()),
+            None => self.encode_cols(row, 0..self.schema().len()),
+        }
+    }
+}
+
+/// Do the actual encoding from
+/// * a json object
+/// * a protobuf message
+/// * an avro record
+/// into
+/// * string (required by kinesis key)
+/// * bytes
+///
+/// This is like `TryInto` but allows us to `impl<T: SerTo<String>> SerTo<Vec<u8>> for T`.
+///
+/// Shall we consider `impl serde::Serialize` in the future?
+pub trait SerTo<T> {
+    fn ser_to(self) -> Result<T>;
+}
+
+impl<T: SerTo<String>> SerTo<Vec<u8>> for T {
+    fn ser_to(self) -> Result<Vec<u8>> {
+        self.ser_to().map(|s: String| s.into_bytes())
+    }
+}
+
+impl<T> SerTo<T> for T {
+    fn ser_to(self) -> Result<T> {
+        Ok(self)
+    }
+}
+
+/// Useful for both json and protobuf
+#[derive(Clone, Copy)]
+pub enum TimestampHandlingMode {
+    Milli,
+    String,
+}
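The two blanket impls are the point of `SerTo`: any encoder output that can serialize to `String` gets `Vec<u8>` for free, and every type trivially serializes to itself. A sketch of what that buys a call site (assuming it compiles inside this crate; the `demo` function is made up):

```rust
// Hypothetical illustration of the SerTo blanket impls in encoder/mod.rs.
use serde_json::{Map, Value};

use crate::sink::encoder::SerTo;
use crate::sink::Result;

fn demo(map: Map<String, Value>) -> Result<()> {
    // Via `impl SerTo<String> for Map<String, Value>` in json.rs.
    let as_string: String = map.clone().ser_to()?;
    // Via the blanket `impl<T: SerTo<String>> SerTo<Vec<u8>> for T`.
    let as_bytes: Vec<u8> = map.ser_to()?;
    assert_eq!(as_bytes, as_string.into_bytes());
    Ok(())
}
```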
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index aee99da2b74bb..72234bcbbb907 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -33,6 +33,7 @@ use serde_derive::{Deserialize, Serialize};
 use serde_json::Value;
 use serde_with::{serde_as, DisplayFromStr};
 
+use super::encoder::{JsonEncoder, TimestampHandlingMode};
 use super::{
     Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
     SINK_TYPE_UPSERT,
@@ -546,10 +547,17 @@ impl KafkaSinkWriter {
         // TODO: Remove the clones here, only to satisfy borrow checker at present
         let schema = self.schema.clone();
         let pk_indices = self.pk_indices.clone();
+        let key_encoder =
+            JsonEncoder::new(&schema, Some(&pk_indices), TimestampHandlingMode::Milli);
+        let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);
 
         // Initialize the upsert_stream
-        let upsert_stream =
-            gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default());
+        let upsert_stream = gen_upsert_message_stream(
+            chunk,
+            UpsertAdapterOpts::default(),
+            key_encoder,
+            val_encoder,
+        );
 
         #[for_await]
         for msg in upsert_stream {
@@ -564,13 +572,16 @@ impl KafkaSinkWriter {
         // TODO: Remove the clones here, only to satisfy borrow checker at present
         let schema = self.schema.clone();
         let pk_indices = self.pk_indices.clone();
+        let key_encoder =
+            JsonEncoder::new(&schema, Some(&pk_indices), TimestampHandlingMode::Milli);
+        let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);
 
         // Initialize the append_only_stream
         let append_only_stream = gen_append_only_message_stream(
-            &schema,
-            &pk_indices,
             chunk,
             AppendOnlyAdapterOpts::default(),
+            key_encoder,
+            val_encoder,
         );
 
         #[for_await]
diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs
index 7ef608b6c6372..b73b48771c2bd 100644
--- a/src/connector/src/sink/kinesis.rs
+++ b/src/connector/src/sink/kinesis.rs
@@ -31,6 +31,7 @@ use tokio_retry::Retry;
 
 use super::SinkParam;
 use crate::common::KinesisCommon;
+use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
 use crate::sink::utils::{
     gen_append_only_message_stream, gen_debezium_message_stream, gen_upsert_message_stream,
     AppendOnlyAdapterOpts, DebeziumAdapterOpts, UpsertAdapterOpts,
@@ -202,11 +203,17 @@ impl KinesisSinkWriter {
     }
 
     async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
-        let upsert_stream = gen_upsert_message_stream(
+        let key_encoder = JsonEncoder::new(
             &self.schema,
-            &self.pk_indices,
+            Some(&self.pk_indices),
+            TimestampHandlingMode::Milli,
+        );
+        let val_encoder = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::Milli);
+        let upsert_stream = gen_upsert_message_stream(
             chunk,
             UpsertAdapterOpts::default(),
+            key_encoder,
+            val_encoder,
         );
 
         crate::impl_load_stream_write_record!(upsert_stream, self.put_record);
@@ -214,11 +221,17 @@ impl KinesisSinkWriter {
 
     async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
-        let append_only_stream = gen_append_only_message_stream(
+        let key_encoder = JsonEncoder::new(
             &self.schema,
-            &self.pk_indices,
+            Some(&self.pk_indices),
+            TimestampHandlingMode::Milli,
+        );
+        let val_encoder = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::Milli);
+        let append_only_stream = gen_append_only_message_stream(
             chunk,
             AppendOnlyAdapterOpts::default(),
+            key_encoder,
+            val_encoder,
         );
 
         crate::impl_load_stream_write_record!(append_only_stream, self.put_record);
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index c2a0f3019c942..3e785edc7a9e9 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -16,6 +16,7 @@ pub mod boxed;
 pub mod catalog;
 pub mod clickhouse;
 pub mod coordinate;
+pub mod encoder;
 pub mod iceberg;
 pub mod kafka;
 pub mod kinesis;
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 403ee4c7b73e5..046d29bc687b5 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -38,9 +38,9 @@ use tokio::sync::mpsc::{Sender, UnboundedReceiver};
 use tonic::Status;
 use tracing::{error, warn};
 
+use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
 use crate::sink::coordinate::CoordinatedSinkWriter;
 use crate::sink::iceberg::REMOTE_ICEBERG_SINK;
-use crate::sink::utils::{record_to_json, TimestampHandlingMode};
 use crate::sink::SinkError::Remote;
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkParam,
@@ -351,12 +351,9 @@ where
         let payload = match self.payload_format {
             SinkPayloadFormat::Json => {
                 let mut row_ops = vec![];
+                let enc = JsonEncoder::new(&self.schema, None, TimestampHandlingMode::String);
                 for (op, row_ref) in chunk.rows() {
-                    let map = record_to_json(
-                        row_ref,
-                        &self.schema.fields,
-                        TimestampHandlingMode::String,
-                    )?;
+                    let map = enc.encode(row_ref)?;
                     let row_op = RowOp {
                         op_type: op.to_protobuf() as i32,
                         line: serde_json::to_string(&map)
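Note the split in modes: the Kafka and Kinesis call sites above pick `TimestampHandlingMode::Milli`, while the remote sink keeps `TimestampHandlingMode::String`. A standalone chrono sketch of the two renderings, using the same calls and format string as `datum_to_json_object` (the example value of 1000 seconds matches the unit test):

```rust
// Runnable with only the chrono crate; illustrates Milli vs String handling.
use chrono::NaiveDateTime;

fn main() {
    let ts = NaiveDateTime::from_timestamp_opt(1000, 0).unwrap();
    // TimestampHandlingMode::Milli -> a JSON number of epoch milliseconds.
    assert_eq!(ts.timestamp_millis(), 1_000_000);
    // TimestampHandlingMode::String -> a JSON string with microsecond precision.
    assert_eq!(
        ts.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
        "1970-01-01 00:16:40.000000"
    );
}
```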
diff --git a/src/connector/src/sink/utils.rs b/src/connector/src/sink/utils.rs
index 1eb2f91e9af22..cb3023b9e4f9b 100644
--- a/src/connector/src/sink/utils.rs
+++ b/src/connector/src/sink/utils.rs
@@ -12,19 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use base64::engine::general_purpose;
-use base64::Engine as _;
-use chrono::{Datelike, Timelike};
 use futures_async_stream::try_stream;
 use risingwave_common::array::stream_chunk::Op;
-use risingwave_common::array::{ArrayError, ArrayResult, RowRef, StreamChunk};
+use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::{Field, Schema};
-use risingwave_common::row::Row;
-use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, ToText};
-use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
 use serde_json::{json, Map, Value};
 use tracing::warn;
 
+use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
 use crate::sink::{Result, SinkError};
 
 const DEBEZIUM_NAME_FIELD_PREFIX: &str = "RisingWave";
@@ -62,6 +57,9 @@ pub async fn gen_debezium_message_stream<'a>(
 
     let mut update_cache: Option<Map<String, Value>> = None;
 
+    let key_encoder = JsonEncoder::new(schema, Some(pk_indices), TimestampHandlingMode::Milli);
+    let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
+
     for (op, row) in chunk.rows() {
         let event_key_object: Option<Value> = Some(json!({
             "schema": json!({
@@ -70,14 +68,14 @@ pub async fn gen_debezium_message_stream<'a>(
                 "optional": false,
                 "name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
             }),
-            "payload": pk_to_json(row, &schema.fields, pk_indices)?,
+            "payload": key_encoder.encode(row)?,
         }));
         let event_object: Option<Value> = match op {
             Op::Insert => Some(json!({
                 "schema": schema_to_json(schema, db_name, sink_from_name),
                 "payload": {
                     "before": null,
-                    "after": record_to_json(row, &schema.fields, TimestampHandlingMode::Milli)?,
+                    "after": val_encoder.encode(row)?,
                     "op": "c",
                     "ts_ms": ts_ms,
                     "source": source_field,
@@ -87,7 +85,7 @@ pub async fn gen_debezium_message_stream<'a>(
                 let value_obj = Some(json!({
                     "schema": schema_to_json(schema, db_name, sink_from_name),
                     "payload": {
-                        "before": record_to_json(row, &schema.fields, TimestampHandlingMode::Milli)?,
+                        "before": val_encoder.encode(row)?,
                         "after": null,
                         "op": "d",
                         "ts_ms": ts_ms,
@@ -105,11 +103,7 @@ pub async fn gen_debezium_message_stream<'a>(
                 continue;
             }
             Op::UpdateDelete => {
-                update_cache = Some(record_to_json(
-                    row,
-                    &schema.fields,
-                    TimestampHandlingMode::Milli,
-                )?);
+                update_cache = Some(val_encoder.encode(row)?);
                 continue;
             }
             Op::UpdateInsert => {
@@ -118,7 +112,7 @@ pub async fn gen_debezium_message_stream<'a>(
                     "schema": schema_to_json(schema, db_name, sink_from_name),
                     "payload": {
                         "before": before,
-                        "after": record_to_json(row, &schema.fields, TimestampHandlingMode::Milli)?,
+                        "after": val_encoder.encode(row)?,
                         "op": "u",
                         "ts_ms": ts_ms,
                         "source": source_field,
@@ -245,186 +239,38 @@ pub(crate) fn field_to_json(field: &Field) -> Value {
     })
 }
 
-pub(crate) fn pk_to_json(
-    row: RowRef<'_>,
-    schema: &[Field],
-    pk_indices: &[usize],
-) -> Result<Map<String, Value>> {
-    let mut mappings = Map::with_capacity(schema.len());
-    for idx in pk_indices {
-        let field = &schema[*idx];
-        let key = field.name.clone();
-        let value = datum_to_json_object(field, row.datum_at(*idx), TimestampHandlingMode::Milli)
-            .map_err(|e| SinkError::JsonParse(e.to_string()))?;
-        mappings.insert(key, value);
-    }
-    Ok(mappings)
-}
-
 pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result<Vec<String>> {
+    let encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
     let mut records: Vec<String> = Vec::with_capacity(chunk.capacity());
     for (_, row) in chunk.rows() {
-        let record = Value::Object(record_to_json(
-            row,
-            &schema.fields,
-            TimestampHandlingMode::Milli,
-        )?);
+        let record = Value::Object(encoder.encode(row)?);
         records.push(record.to_string());
     }
 
     Ok(records)
 }
 
-#[derive(Clone, Copy)]
-pub enum TimestampHandlingMode {
-    Milli,
-    String,
-}
-
-pub fn record_to_json(
-    row: RowRef<'_>,
-    schema: &[Field],
-    timestamp_handling_mode: TimestampHandlingMode,
-) -> Result<Map<String, Value>> {
-    let mut mappings = Map::with_capacity(schema.len());
-    for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) {
-        let key = field.name.clone();
-        let value = datum_to_json_object(field, datum_ref, timestamp_handling_mode)
-            .map_err(|e| SinkError::JsonParse(e.to_string()))?;
-        mappings.insert(key, value);
-    }
-    Ok(mappings)
-}
-
-fn datum_to_json_object(
-    field: &Field,
-    datum: DatumRef<'_>,
-    timestamp_handling_mode: TimestampHandlingMode,
-) -> ArrayResult<Value> {
-    let scalar_ref = match datum {
-        None => return Ok(Value::Null),
-        Some(datum) => datum,
-    };
-
-    let data_type = field.data_type();
-
-    tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
-
-    let value = match (data_type, scalar_ref) {
-        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
-            json!(v)
-        }
-        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
-            json!(v)
-        }
-        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
-            json!(v)
-        }
-        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
-            json!(v)
-        }
-        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
-            json!(f32::from(v))
-        }
-        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
-            json!(f64::from(v))
-        }
-        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
-            json!(v)
-        }
-        (DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
-            json!(v.to_text())
-        }
-        (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => {
-            // risingwave's timestamp with timezone is stored in UTC and does not maintain the
-            // timezone info and the time is in microsecond.
-            let parsed = v.to_datetime_utc().naive_utc();
-            let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string();
-            json!(v)
-        }
-        (DataType::Time, ScalarRefImpl::Time(v)) => {
-            // todo: just ignore the nanos part to avoid leap second complex
-            json!(v.0.num_seconds_from_midnight() as i64 * 1000)
-        }
-        (DataType::Date, ScalarRefImpl::Date(v)) => {
-            json!(v.0.num_days_from_ce())
-        }
-        (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode {
-            TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()),
-            TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()),
-        },
-        (DataType::Bytea, ScalarRefImpl::Bytea(v)) => {
-            json!(general_purpose::STANDARD_NO_PAD.encode(v))
-        }
-        // PYMDTHMS
-        (DataType::Interval, ScalarRefImpl::Interval(v)) => {
-            json!(v.as_iso_8601())
-        }
-        (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => {
-            json!(jsonb_ref.to_string())
-        }
-        (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
-            let elems = list_ref.iter();
-            let mut vec = Vec::with_capacity(elems.len());
-            let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
-            for sub_datum_ref in elems {
-                let value =
-                    datum_to_json_object(&inner_field, sub_datum_ref, timestamp_handling_mode)?;
-                vec.push(value);
-            }
-            json!(vec)
-        }
-        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
-            let mut map = Map::with_capacity(st.len());
-            for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
-                st.iter()
-                    .map(|(name, dt)| Field::with_name(dt.clone(), name)),
-            ) {
-                let value =
-                    datum_to_json_object(&sub_field, sub_datum_ref, timestamp_handling_mode)?;
-                map.insert(sub_field.name.clone(), value);
-            }
-            json!(map)
-        }
-        (data_type, scalar_ref) => {
-            return Err(ArrayError::internal(
-                format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref),
-            ));
-        }
-    };
-
-    Ok(value)
-}
-
 #[derive(Debug, Clone, Default)]
 pub struct UpsertAdapterOpts {}
 
 #[try_stream(ok = (Option<Value>, Option<Value>), error = SinkError)]
 pub async fn gen_upsert_message_stream<'a>(
-    schema: &'a Schema,
-    pk_indices: &'a [usize],
     chunk: StreamChunk,
     _opts: UpsertAdapterOpts,
+    key_encoder: JsonEncoder<'a>,
+    val_encoder: JsonEncoder<'a>,
 ) {
     for (op, row) in chunk.rows() {
-        let event_key_object = Some(Value::Object(pk_to_json(row, &schema.fields, pk_indices)?));
+        let event_key_object = Some(Value::Object(key_encoder.encode(row)?));
 
         let event_object = match op {
-            Op::Insert => Some(Value::Object(record_to_json(
-                row,
-                &schema.fields,
-                TimestampHandlingMode::Milli,
-            )?)),
+            Op::Insert => Some(Value::Object(val_encoder.encode(row)?)),
             Op::Delete => Some(Value::Null),
             Op::UpdateDelete => {
                 // upsert semantic does not require update delete event
                 continue;
             }
-            Op::UpdateInsert => Some(Value::Object(record_to_json(
-                row,
-                &schema.fields,
-                TimestampHandlingMode::Milli,
-            )?)),
+            Op::UpdateInsert => Some(Value::Object(val_encoder.encode(row)?)),
         };
 
         yield (event_key_object, event_object);
@@ -436,144 +282,18 @@ pub struct AppendOnlyAdapterOpts {}
 
 #[try_stream(ok = (Option<Value>, Option<Value>), error = SinkError)]
 pub async fn gen_append_only_message_stream<'a>(
-    schema: &'a Schema,
-    pk_indices: &'a [usize],
     chunk: StreamChunk,
     _opts: AppendOnlyAdapterOpts,
+    key_encoder: JsonEncoder<'a>,
+    val_encoder: JsonEncoder<'a>,
 ) {
     for (op, row) in chunk.rows() {
         if op != Op::Insert {
             continue;
         }
-        let event_key_object = Some(Value::Object(pk_to_json(row, &schema.fields, pk_indices)?));
-        let event_object = Some(Value::Object(record_to_json(
-            row,
-            &schema.fields,
-            TimestampHandlingMode::Milli,
-        )?));
+        let event_key_object = Some(Value::Object(key_encoder.encode(row)?));
+        let event_object = Some(Value::Object(val_encoder.encode(row)?));
 
         yield (event_key_object, event_object);
     }
 }
-
-#[cfg(test)]
-mod tests {
-
-    use risingwave_common::types::{DataType, Interval, ScalarImpl, Time, Timestamp};
-
-    use super::*;
-    #[test]
-    fn test_to_json_basic_type() {
-        let mock_field = Field {
-            data_type: DataType::Boolean,
-            name: Default::default(),
-            sub_fields: Default::default(),
-            type_name: Default::default(),
-        };
-        let boolean_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Boolean,
-                ..mock_field.clone()
-            },
-            Some(ScalarImpl::Bool(false).as_scalar_ref_impl()),
-            TimestampHandlingMode::String,
-        )
-        .unwrap();
-        assert_eq!(boolean_value, json!(false));
-
-        let int16_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Int16,
-                ..mock_field.clone()
-            },
-            Some(ScalarImpl::Int16(16).as_scalar_ref_impl()),
-            TimestampHandlingMode::String,
-        )
-        .unwrap();
-        assert_eq!(int16_value, json!(16));
-
-        let int64_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Int64,
-                ..mock_field.clone()
-            },
-            Some(ScalarImpl::Int64(std::i64::MAX).as_scalar_ref_impl()),
-            TimestampHandlingMode::String,
-        )
-        .unwrap();
-        assert_eq!(
-            serde_json::to_string(&int64_value).unwrap(),
-            std::i64::MAX.to_string()
-        );
-
-        // https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
-        let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
-        let tstz_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Timestamptz,
-                ..mock_field.clone()
-            },
-            Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()),
-            TimestampHandlingMode::String,
-        )
-        .unwrap();
-        assert_eq!(tstz_value, "2018-01-26 18:30:09.453000");
-
-        let ts_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Timestamp,
-                ..mock_field.clone()
-            },
-            Some(
-                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
-                    .as_scalar_ref_impl(),
-            ),
-            TimestampHandlingMode::Milli,
-        )
-        .unwrap();
-        assert_eq!(ts_value, json!(1000 * 1000));
-
-        let ts_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Timestamp,
-                ..mock_field.clone()
-            },
-            Some(
-                ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0))
-                    .as_scalar_ref_impl(),
-            ),
-            TimestampHandlingMode::String,
-        )
-        .unwrap();
-        assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_string()));
-
-        // Represents the number of microseconds past midnigh, io.debezium.time.Time
-        let time_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Time,
-                ..mock_field.clone()
-            },
-            Some(
-                ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0))
-                    .as_scalar_ref_impl(),
-            ),
-            TimestampHandlingMode::String,
-        )
-        .unwrap();
-        assert_eq!(time_value, json!(1000 * 1000));
-
-        let interval_value = datum_to_json_object(
-            &Field {
-                data_type: DataType::Interval,
-                ..mock_field
-            },
-            Some(
-                ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000))
-                    .as_scalar_ref_impl(),
-            ),
-            TimestampHandlingMode::String,
-        )
-        .unwrap();
-        assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S"));
-    }
-}
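Finally, a sketch of the extension point this refactor opens up: per the `encoder/mod.rs` doc comment, protobuf and avro encoders can later plug in by implementing only `encode_cols`; the default `encode` and the blanket `SerTo` impls supply the rest. The `TextEncoder` below is hypothetical (not part of this PR) and assumes `ScalarRefImpl: ToText` as used elsewhere in the codebase.

```rust
// Hypothetical plain-text encoder, sketched under the assumptions stated above.
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::ToText;

use crate::sink::encoder::{RowEncoder, SerTo};
use crate::sink::Result;

pub struct TextEncoder<'a> {
    schema: &'a Schema,
}

impl<'a> RowEncoder for TextEncoder<'a> {
    // `String` already satisfies `SerTo<Vec<u8>>` through the blanket impls.
    type Output = String;

    fn schema(&self) -> &Schema {
        self.schema
    }

    fn col_indices(&self) -> Option<&[usize]> {
        None // always encode every column
    }

    fn encode_cols(
        &self,
        row: impl Row,
        col_indices: impl Iterator<Item = usize>,
    ) -> Result<Self::Output> {
        // Render each datum with `ToText`, emitting an empty cell for NULL.
        Ok(col_indices
            .map(|idx| {
                row.datum_at(idx)
                    .map(|scalar| scalar.to_text())
                    .unwrap_or_default()
            })
            .collect::<Vec<_>>()
            .join(","))
    }
}
```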