From 30fd4d8ab8995142eed9b185d769791e20e2bdb1 Mon Sep 17 00:00:00 2001 From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com> Date: Wed, 17 Jul 2024 01:52:56 -0500 Subject: [PATCH] feat(sink): support encode jsonb data as dynamic json type in sink (#17693) --- src/connector/src/sink/encoder/json.rs | 44 ++++++++++++++++--- src/connector/src/sink/encoder/mod.rs | 25 +++++++++++ .../src/sink/formatter/debezium_json.rs | 7 ++- src/connector/src/sink/formatter/mod.rs | 5 ++- src/connector/src/sink/kafka.rs | 3 +- src/connector/src/sink/mqtt.rs | 7 +-- src/connector/src/sink/nats.rs | 5 ++- src/connector/src/sink/snowflake.rs | 4 +- 8 files changed, 85 insertions(+), 15 deletions(-) diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index d657b8b9c585..3652f38bacbb 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -30,8 +30,8 @@ use serde_json::{json, Map, Value}; use thiserror_ext::AsReport; use super::{ - CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result, - RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, + CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, + Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, }; use crate::sink::SinkError; @@ -41,6 +41,7 @@ pub struct JsonEncoderConfig { timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, custom_json_type: CustomJsonType, + jsonb_handling_mode: JsonbHandlingMode, } pub struct JsonEncoder { @@ -58,6 +59,7 @@ impl JsonEncoder { timestamp_handling_mode: TimestampHandlingMode, timestamptz_handling_mode: TimestamptzHandlingMode, time_handling_mode: TimeHandlingMode, + jsonb_handling_mode: JsonbHandlingMode, ) -> Self { let config = JsonEncoderConfig { time_handling_mode, @@ -65,6 +67,7 @@ impl JsonEncoder { timestamp_handling_mode, timestamptz_handling_mode, custom_json_type: CustomJsonType::None, + jsonb_handling_mode, }; Self { schema, @@ -81,6 +84,7 @@ impl JsonEncoder { timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::Es, + jsonb_handling_mode: JsonbHandlingMode::Dynamic, }; Self { schema, @@ -101,6 +105,7 @@ impl JsonEncoder { timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::Doris(map), + jsonb_handling_mode: JsonbHandlingMode::String, }; Self { schema, @@ -117,6 +122,7 @@ impl JsonEncoder { timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::StarRocks, + jsonb_handling_mode: JsonbHandlingMode::Dynamic, }; Self { schema, @@ -289,11 +295,12 @@ fn datum_to_json_object( (DataType::Interval, ScalarRefImpl::Interval(v)) => { json!(v.as_iso_8601()) } - (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match &config.custom_json_type { - CustomJsonType::Es | CustomJsonType::StarRocks => JsonbVal::from(jsonb_ref).take(), - CustomJsonType::Doris(_) | CustomJsonType::None => { + + (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode { + JsonbHandlingMode::String => { json!(jsonb_ref.to_string()) } + JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(), }, (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { let elems = list_ref.iter(); @@ -446,6 +453,7 @@ mod tests { timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, custom_json_type: CustomJsonType::None, + jsonb_handling_mode: JsonbHandlingMode::String, }; let boolean_value = datum_to_json_object( @@ -517,6 +525,7 @@ mod tests { timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, custom_json_type: CustomJsonType::None, + jsonb_handling_mode: JsonbHandlingMode::String, }; let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap(); @@ -537,6 +546,7 @@ mod tests { timestamp_handling_mode: TimestampHandlingMode::Milli, timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, custom_json_type: CustomJsonType::None, + jsonb_handling_mode: JsonbHandlingMode::String, }; let ts_value = datum_to_json_object( &Field { @@ -603,6 +613,7 @@ mod tests { timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, custom_json_type: CustomJsonType::Doris(map), + jsonb_handling_mode: JsonbHandlingMode::String, }; let decimal = datum_to_json_object( &Field { @@ -628,11 +639,12 @@ mod tests { assert_eq!(date_value, json!(719163)); let from_epoch_config = JsonEncoderConfig { - time_handling_mode: TimeHandlingMode::Milli, + time_handling_mode: TimeHandlingMode::String, date_handling_mode: DateHandlingMode::FromEpoch, timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, custom_json_type: CustomJsonType::None, + jsonb_handling_mode: JsonbHandlingMode::String, }; let date_value = datum_to_json_object( &Field { @@ -651,6 +663,7 @@ mod tests { timestamp_handling_mode: TimestampHandlingMode::String, timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, custom_json_type: CustomJsonType::Doris(HashMap::default()), + jsonb_handling_mode: JsonbHandlingMode::String, }; let date_value = datum_to_json_object( &Field { @@ -683,6 +696,25 @@ mod tests { ) .unwrap(); assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}")); + + let encode_jsonb_obj_config = JsonEncoderConfig { + time_handling_mode: TimeHandlingMode::String, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcString, + custom_json_type: CustomJsonType::None, + jsonb_handling_mode: JsonbHandlingMode::Dynamic, + }; + let json_value = datum_to_json_object( + &Field { + data_type: DataType::Jsonb, + ..mock_field.clone() + }, + Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()), + &encode_jsonb_obj_config, + ) + .unwrap(); + assert_eq!(json_value, json!([1, 2, 3])); } #[test] diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 889d0162784b..0a8a9e5abce7 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -150,6 +150,31 @@ pub enum CustomJsonType { None, } +/// How the jsonb type is encoded. +/// +/// - `String`: encode jsonb as string. `[1, true, "foo"] -> "[1, true, \"foo\"]"` +/// - `Dynamic`: encode jsonb as json type dynamically. `[1, true, "foo"] -> [1, true, "foo"]` +pub enum JsonbHandlingMode { + String, + Dynamic, +} + +impl JsonbHandlingMode { + pub const OPTION_KEY: &'static str = "jsonb.handling.mode"; + + pub fn from_options(options: &BTreeMap) -> Result { + match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) { + Some("string") | None => Ok(Self::String), + Some("dynamic") => Ok(Self::Dynamic), + Some(v) => Err(super::SinkError::Config(anyhow::anyhow!( + "unrecognized {} value {}", + Self::OPTION_KEY, + v + ))), + } + } +} + #[derive(Debug)] struct FieldEncodeError { message: String, diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index 6fff15058bd6..fd4813e78541 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -21,8 +21,8 @@ use tracing::warn; use super::{Result, SinkFormatter, StreamChunk}; use crate::sink::encoder::{ - DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, - TimestamptzHandlingMode, + DateHandlingMode, JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, + TimestampHandlingMode, TimestamptzHandlingMode, }; use crate::tri; @@ -69,6 +69,7 @@ impl DebeziumJsonFormatter { TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, TimeHandlingMode::Milli, + JsonbHandlingMode::String, ); let val_encoder = JsonEncoder::new( schema.clone(), @@ -77,6 +78,7 @@ impl DebeziumJsonFormatter { TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, TimeHandlingMode::Milli, + JsonbHandlingMode::String, ); Self { schema, @@ -397,6 +399,7 @@ mod tests { TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, TimeHandlingMode::Milli, + JsonbHandlingMode::String, ); let json_chunk = chunk_to_json(chunk, &encoder).unwrap(); let schema_json = schema_to_json(&schema, "test_db", "test_table"); diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 4628a925da98..b2e93cba763e 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -31,7 +31,8 @@ use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::template::TemplateEncoder; use super::encoder::text::TextEncoder; use super::encoder::{ - DateHandlingMode, KafkaConnectParams, TimeHandlingMode, TimestamptzHandlingMode, + DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode, + TimestamptzHandlingMode, }; use super::redis::{KEY_FORMAT, VALUE_FORMAT}; use crate::sink::encoder::{ @@ -113,6 +114,7 @@ pub trait EncoderBuild: Sized { impl EncoderBuild for JsonEncoder { async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?; + let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?; let encoder = JsonEncoder::new( b.schema, pk_indices, @@ -120,6 +122,7 @@ impl EncoderBuild for JsonEncoder { TimestampHandlingMode::Milli, timestamptz_mode, TimeHandlingMode::Milli, + jsonb_handling_mode, ); let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") { match s.to_lowercase().parse::() { diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 617f427ae71f..c6e65fb00c39 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -590,7 +590,7 @@ mod test { use super::*; use crate::sink::encoder::{ - DateHandlingMode, JsonEncoder, TimeHandlingMode, TimestampHandlingMode, + DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, }; use crate::sink::formatter::AppendOnlyFormatter; @@ -778,6 +778,7 @@ mod test { TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcString, TimeHandlingMode::Milli, + JsonbHandlingMode::String, ), )), ) diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 9e86b5ec97e2..072666b01564 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -31,8 +31,8 @@ use with_options::WithOptions; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::{ - DateHandlingMode, JsonEncoder, ProtoEncoder, ProtoHeader, RowEncoder, SerTo, TimeHandlingMode, - TimestampHandlingMode, TimestamptzHandlingMode, + DateHandlingMode, JsonEncoder, JsonbHandlingMode, ProtoEncoder, ProtoHeader, RowEncoder, SerTo, + TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, }; use super::writer::AsyncTruncateSinkWriterExt; use super::{DummySinkCommitCoordinator, SinkWriterParam}; @@ -221,7 +221,7 @@ impl MqttSinkWriter { } let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?; - + let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?; let encoder = match format_desc.format { SinkFormat::AppendOnly => match format_desc.encode { SinkEncode::Json => RowEncoderWrapper::Json(JsonEncoder::new( @@ -231,6 +231,7 @@ impl MqttSinkWriter { TimestampHandlingMode::Milli, timestamptz_mode, TimeHandlingMode::Milli, + jsonb_handling_mode, )), SinkEncode::Protobuf => { let (descriptor, sid) = crate::schema::protobuf::fetch_descriptor( diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 471ce6129841..aee0a5fbc723 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -28,7 +28,9 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; use with_options::WithOptions; -use super::encoder::{DateHandlingMode, TimeHandlingMode, TimestamptzHandlingMode}; +use super::encoder::{ + DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode, +}; use super::utils::chunk_to_json; use super::{DummySinkCommitCoordinator, SinkWriterParam}; use crate::connector_common::NatsCommon; @@ -151,6 +153,7 @@ impl NatsSinkWriter { TimestampHandlingMode::Milli, TimestamptzHandlingMode::UtcWithoutSuffix, TimeHandlingMode::Milli, + JsonbHandlingMode::String, ), }) } diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 1c9d67352247..6c3cc291f58e 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -34,7 +34,8 @@ use uuid::Uuid; use with_options::WithOptions; use super::encoder::{ - JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, + JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode, + TimestamptzHandlingMode, }; use super::writer::LogSinkerOf; use super::{SinkError, SinkParam}; @@ -193,6 +194,7 @@ impl SnowflakeSinkWriter { TimestampHandlingMode::String, TimestamptzHandlingMode::UtcString, TimeHandlingMode::String, + JsonbHandlingMode::String, ), // initial value of `epoch` will be set to 0 epoch: 0,