diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 474811660970..d657b8b9c585 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -35,15 +35,19 @@ use super::{ }; use crate::sink::SinkError; -pub struct JsonEncoder { - schema: Schema, - col_indices: Option>, +pub struct JsonEncoderConfig { time_handling_mode: TimeHandlingMode, date_handling_mode: DateHandlingMode, timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, custom_json_type: CustomJsonType, +} + +pub struct JsonEncoder { + schema: Schema, + col_indices: Option>, kafka_connect: Option, + config: JsonEncoderConfig, } impl JsonEncoder { @@ -55,28 +59,34 @@ impl JsonEncoder { timestamptz_handling_mode: TimestamptzHandlingMode, time_handling_mode: TimeHandlingMode, ) -> Self { - Self { - schema, - col_indices, + let config = JsonEncoderConfig { time_handling_mode, date_handling_mode, timestamp_handling_mode, timestamptz_handling_mode, custom_json_type: CustomJsonType::None, + }; + Self { + schema, + col_indices, kafka_connect: None, + config, } } pub fn new_with_es(schema: Schema, col_indices: Option>) -> Self { - Self { - schema, - col_indices, + let config = JsonEncoderConfig { time_handling_mode: TimeHandlingMode::String, date_handling_mode: DateHandlingMode::String, timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::Es, + }; + Self { + schema, + col_indices, kafka_connect: None, + config, } } @@ -85,28 +95,34 @@ impl JsonEncoder { col_indices: Option>, map: HashMap, ) -> Self { - Self { - schema, - col_indices, + let config = JsonEncoderConfig { time_handling_mode: TimeHandlingMode::Milli, date_handling_mode: DateHandlingMode::String, timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::Doris(map), + }; + Self { + schema, + col_indices, kafka_connect: None, + config, } } pub fn new_with_starrocks(schema: Schema, col_indices: Option>) -> Self { - Self { - schema, - col_indices, + let config = JsonEncoderConfig { time_handling_mode: TimeHandlingMode::Milli, date_handling_mode: DateHandlingMode::String, timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::StarRocks, + }; + Self { + schema, + col_indices, kafka_connect: None, + config, } } @@ -139,16 +155,8 @@ impl RowEncoder for JsonEncoder { for idx in &col_indices { let field = &self.schema[*idx]; let key = field.name.clone(); - let value = datum_to_json_object( - field, - row.datum_at(*idx), - self.date_handling_mode, - self.timestamp_handling_mode, - self.timestamptz_handling_mode, - self.time_handling_mode, - &self.custom_json_type, - ) - .map_err(|e| SinkError::Encode(e.to_report_string()))?; + let value = datum_to_json_object(field, row.datum_at(*idx), &self.config) + .map_err(|e| SinkError::Encode(e.to_report_string()))?; mappings.insert(key, value); } @@ -179,11 +187,7 @@ impl SerTo for Value { fn datum_to_json_object( field: &Field, datum: DatumRef<'_>, - date_handling_mode: DateHandlingMode, - timestamp_handling_mode: TimestampHandlingMode, - timestamptz_handling_mode: TimestamptzHandlingMode, - time_handling_mode: TimeHandlingMode, - custom_json_type: &CustomJsonType, + config: &JsonEncoderConfig, ) -> ArrayResult { let scalar_ref = match datum { None => { @@ -223,7 +227,7 @@ fn datum_to_json_object( json!(v) } // Doris/Starrocks will convert out-of-bounds decimal and -INF, INF, NAN to NULL - (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match custom_json_type { + (DataType::Decimal, ScalarRefImpl::Decimal(mut v)) => match &config.custom_json_type { CustomJsonType::Doris(map) => { let s = map.get(&field.name).unwrap(); v.rescale(*s as u32); @@ -233,21 +237,23 @@ fn datum_to_json_object( json!(v.to_text()) } }, - (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => match timestamptz_handling_mode { - TimestamptzHandlingMode::UtcString => { - let parsed = v.to_datetime_utc(); - let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); - json!(v) - } - TimestamptzHandlingMode::UtcWithoutSuffix => { - let parsed = v.to_datetime_utc().naive_utc(); - let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string(); - json!(v) + (DataType::Timestamptz, ScalarRefImpl::Timestamptz(v)) => { + match config.timestamptz_handling_mode { + TimestamptzHandlingMode::UtcString => { + let parsed = v.to_datetime_utc(); + let v = parsed.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); + json!(v) + } + TimestamptzHandlingMode::UtcWithoutSuffix => { + let parsed = v.to_datetime_utc().naive_utc(); + let v = parsed.format("%Y-%m-%d %H:%M:%S%.6f").to_string(); + json!(v) + } + TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()), + TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()), } - TimestamptzHandlingMode::Micro => json!(v.timestamp_micros()), - TimestamptzHandlingMode::Milli => json!(v.timestamp_millis()), - }, - (DataType::Time, ScalarRefImpl::Time(v)) => match time_handling_mode { + } + (DataType::Time, ScalarRefImpl::Time(v)) => match config.time_handling_mode { TimeHandlingMode::Milli => { // todo: just ignore the nanos part to avoid leap second complex json!(v.0.num_seconds_from_midnight() as i64 * 1000) @@ -257,7 +263,7 @@ fn datum_to_json_object( json!(a) } }, - (DataType::Date, ScalarRefImpl::Date(v)) => match date_handling_mode { + (DataType::Date, ScalarRefImpl::Date(v)) => match config.date_handling_mode { DateHandlingMode::FromCe => json!(v.0.num_days_from_ce()), DateHandlingMode::FromEpoch => { let duration = v.0 - NaiveDateTime::UNIX_EPOCH.date(); @@ -268,10 +274,14 @@ fn datum_to_json_object( json!(a) } }, - (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode { - TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()), - TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()), - }, + (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => { + match config.timestamp_handling_mode { + TimestampHandlingMode::Milli => json!(v.0.and_utc().timestamp_millis()), + TimestampHandlingMode::String => { + json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()) + } + } + } (DataType::Bytea, ScalarRefImpl::Bytea(v)) => { json!(general_purpose::STANDARD.encode(v)) } @@ -279,7 +289,7 @@ fn datum_to_json_object( (DataType::Interval, ScalarRefImpl::Interval(v)) => { json!(v.as_iso_8601()) } - (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type { + (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match &config.custom_json_type { CustomJsonType::Es | CustomJsonType::StarRocks => JsonbVal::from(jsonb_ref).take(), CustomJsonType::Doris(_) | CustomJsonType::None => { json!(jsonb_ref.to_string()) @@ -290,21 +300,13 @@ fn datum_to_json_object( let mut vec = Vec::with_capacity(elems.len()); let inner_field = Field::unnamed(Box::::into_inner(datatype)); for sub_datum_ref in elems { - let value = datum_to_json_object( - &inner_field, - sub_datum_ref, - date_handling_mode, - timestamp_handling_mode, - timestamptz_handling_mode, - time_handling_mode, - custom_json_type, - )?; + let value = datum_to_json_object(&inner_field, sub_datum_ref, config)?; vec.push(value); } json!(vec) } (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => { - match custom_json_type { + match config.custom_json_type { CustomJsonType::Doris(_) => { // We need to ensure that the order of elements in the json matches the insertion order. let mut map = IndexMap::with_capacity(st.len()); @@ -312,15 +314,7 @@ fn datum_to_json_object( st.iter() .map(|(name, dt)| Field::with_name(dt.clone(), name)), ) { - let value = datum_to_json_object( - &sub_field, - sub_datum_ref, - date_handling_mode, - timestamp_handling_mode, - timestamptz_handling_mode, - time_handling_mode, - custom_json_type, - )?; + let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?; map.insert(sub_field.name.clone(), value); } Value::String( @@ -338,15 +332,7 @@ fn datum_to_json_object( st.iter() .map(|(name, dt)| Field::with_name(dt.clone(), name)), ) { - let value = datum_to_json_object( - &sub_field, - sub_datum_ref, - date_handling_mode, - timestamp_handling_mode, - timestamptz_handling_mode, - time_handling_mode, - custom_json_type, - )?; + let value = datum_to_json_object(&sub_field, sub_datum_ref, config)?; map.insert(sub_field.name.clone(), value); } json!(map) @@ -454,17 +440,21 @@ mod tests { type_name: Default::default(), }; + let config = JsonEncoderConfig { + time_handling_mode: TimeHandlingMode::Milli, + date_handling_mode: DateHandlingMode::FromCe, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, + custom_json_type: CustomJsonType::None, + }; + let boolean_value = datum_to_json_object( &Field { data_type: DataType::Boolean, ..mock_field.clone() }, Some(ScalarImpl::Bool(false).as_scalar_ref_impl()), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!(boolean_value, json!(false)); @@ -475,11 +465,7 @@ mod tests { ..mock_field.clone() }, Some(ScalarImpl::Int16(16).as_scalar_ref_impl()), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!(int16_value, json!(16)); @@ -490,11 +476,7 @@ mod tests { ..mock_field.clone() }, Some(ScalarImpl::Int64(i64::MAX).as_scalar_ref_impl()), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!( @@ -508,11 +490,7 @@ mod tests { ..mock_field.clone() }, Some(ScalarImpl::Serial(i64::MAX.into()).as_scalar_ref_impl()), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!( @@ -528,15 +506,19 @@ mod tests { ..mock_field.clone() }, Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!(tstz_value, "2018-01-26T18:30:09.453000Z"); + let unix_wo_suffix_config = JsonEncoderConfig { + time_handling_mode: TimeHandlingMode::Milli, + date_handling_mode: DateHandlingMode::FromCe, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, + custom_json_type: CustomJsonType::None, + }; + let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap(); let tstz_value = datum_to_json_object( &Field { @@ -544,15 +526,18 @@ mod tests { ..mock_field.clone() }, Some(ScalarImpl::Timestamptz(tstz_inner).as_scalar_ref_impl()), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcWithoutSuffix, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &unix_wo_suffix_config, ) .unwrap(); assert_eq!(tstz_value, "2018-01-26 18:30:09.453000"); + let timestamp_milli_config = JsonEncoderConfig { + time_handling_mode: TimeHandlingMode::String, + date_handling_mode: DateHandlingMode::FromCe, + timestamp_handling_mode: TimestampHandlingMode::Milli, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, + custom_json_type: CustomJsonType::None, + }; let ts_value = datum_to_json_object( &Field { data_type: DataType::Timestamp, @@ -562,11 +547,7 @@ mod tests { ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0)) .as_scalar_ref_impl(), ), - DateHandlingMode::FromCe, - TimestampHandlingMode::Milli, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + ×tamp_milli_config, ) .unwrap(); assert_eq!(ts_value, json!(1000 * 1000)); @@ -580,11 +561,7 @@ mod tests { ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(1000, 0)) .as_scalar_ref_impl(), ), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!(ts_value, json!("1970-01-01 00:16:40.000000".to_string())); @@ -599,11 +576,7 @@ mod tests { ScalarImpl::Time(Time::from_num_seconds_from_midnight_uncheck(1000, 0)) .as_scalar_ref_impl(), ), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!(time_value, json!(1000 * 1000)); @@ -617,17 +590,20 @@ mod tests { ScalarImpl::Interval(Interval::from_month_day_usec(13, 2, 1000000)) .as_scalar_ref_impl(), ), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!(interval_value, json!("P1Y1M2DT0H0M1S")); let mut map = HashMap::default(); map.insert("aaa".to_string(), 5_u8); + let doris_config = JsonEncoderConfig { + time_handling_mode: TimeHandlingMode::String, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, + custom_json_type: CustomJsonType::Doris(map), + }; let decimal = datum_to_json_object( &Field { data_type: DataType::Decimal, @@ -635,11 +611,7 @@ mod tests { ..mock_field.clone() }, Some(ScalarImpl::Decimal(Decimal::try_from(1.1111111).unwrap()).as_scalar_ref_impl()), - DateHandlingMode::String, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::Doris(map), + &doris_config, ) .unwrap(); assert_eq!(decimal, json!("1.11111")); @@ -650,41 +622,43 @@ mod tests { ..mock_field.clone() }, Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()), - DateHandlingMode::FromCe, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &config, ) .unwrap(); assert_eq!(date_value, json!(719163)); + let from_epoch_config = JsonEncoderConfig { + time_handling_mode: TimeHandlingMode::Milli, + date_handling_mode: DateHandlingMode::FromEpoch, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, + custom_json_type: CustomJsonType::None, + }; let date_value = datum_to_json_object( &Field { data_type: DataType::Date, ..mock_field.clone() }, Some(ScalarImpl::Date(Date::from_ymd_uncheck(1970, 1, 1)).as_scalar_ref_impl()), - DateHandlingMode::FromEpoch, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::None, + &from_epoch_config, ) .unwrap(); assert_eq!(date_value, json!(0)); + let doris_config = JsonEncoderConfig { + time_handling_mode: TimeHandlingMode::String, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, + custom_json_type: CustomJsonType::Doris(HashMap::default()), + }; let date_value = datum_to_json_object( &Field { data_type: DataType::Date, ..mock_field.clone() }, Some(ScalarImpl::Date(Date::from_ymd_uncheck(2010, 10, 10)).as_scalar_ref_impl()), - DateHandlingMode::String, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::Doris(HashMap::default()), + &doris_config, ) .unwrap(); assert_eq!(date_value, json!("2010-10-10")); @@ -705,11 +679,7 @@ mod tests { ..mock_field.clone() }, Some(ScalarRefImpl::Struct(StructRef::ValueRef { val: &value })), - DateHandlingMode::String, - TimestampHandlingMode::String, - TimestamptzHandlingMode::UtcString, - TimeHandlingMode::Milli, - &CustomJsonType::Doris(HashMap::default()), + &doris_config, ) .unwrap(); assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));