From e5ee68b38791e62c42a534de6a1f48c9fd99a4f9 Mon Sep 17 00:00:00 2001
From: Kexiang Wang
Date: Wed, 10 Jul 2024 22:09:21 -0400
Subject: [PATCH] update grammar

---
 proto/catalog.proto                                |  1 +
 .../src/parser/debezium/debezium_parser.rs         |  3 +
 .../src/parser/debezium/simd_json_parser.rs        |  1 +
 .../src/parser/dynamodb/cdc_json_parser.rs         |  7 +-
 .../src/parser/dynamodb/change_event.rs            | 41 +++++++---
 .../src/parser/dynamodb/json_parser.rs             | 22 +++--
 src/connector/src/parser/dynamodb/mod.rs           | 13 ++-
 .../src/parser/maxwell/simd_json_parser.rs         |  1 +
 src/connector/src/parser/mod.rs                    | 18 ++++-
 .../src/source/datagen/source/generator.rs         |  1 +
 src/frontend/src/handler/create_source.rs          | 80 +++++++++++++------
 .../src/executor/backfill/cdc/cdc_backfill.rs      |  1 +
 12 files changed, 138 insertions(+), 51 deletions(-)

diff --git a/proto/catalog.proto b/proto/catalog.proto
index 8a360418159b3..78214e9d68068 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -87,6 +87,7 @@ message StreamSourceInfo {
   // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type.
   // For format and encode options.
   map<string, secret.SecretRef> format_encode_secret_refs = 16;
+  string json_single_blob_column = 17;
 }
 
 message Source {
diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs
index bc43e556cb4f6..09200cfe3593f 100644
--- a/src/connector/src/parser/debezium/debezium_parser.rs
+++ b/src/connector/src/parser/debezium/debezium_parser.rs
@@ -116,6 +116,7 @@ impl DebeziumParser {
             encoding_config: EncodingProperties::Json(JsonProperties {
                 use_schema_registry: false,
                 timestamptz_handling: None,
+                single_blob_column: None,
             }),
             protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
         };
@@ -226,6 +227,7 @@ mod tests {
             encoding_config: EncodingProperties::Json(JsonProperties {
                 use_schema_registry: false,
                 timestamptz_handling: None,
+                single_blob_column: None,
             }),
             protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
         };
@@ -298,6 +300,7 @@ mod tests {
             encoding_config: EncodingProperties::Json(JsonProperties {
                 use_schema_registry: false,
                 timestamptz_handling: None,
+                single_blob_column: None,
             }),
             protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
         };
diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs
index f9738eb9e357e..c3ef61c0a5bb5 100644
--- a/src/connector/src/parser/debezium/simd_json_parser.rs
+++ b/src/connector/src/parser/debezium/simd_json_parser.rs
@@ -133,6 +133,7 @@ mod tests {
             encoding_config: EncodingProperties::Json(JsonProperties {
                 use_schema_registry: false,
                 timestamptz_handling: None,
+                single_blob_column: None,
             }),
             protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
         };
diff --git a/src/connector/src/parser/dynamodb/cdc_json_parser.rs b/src/connector/src/parser/dynamodb/cdc_json_parser.rs
index b7209d2b4a0f7..6b55b0f623107 100644
--- a/src/connector/src/parser/dynamodb/cdc_json_parser.rs
+++ b/src/connector/src/parser/dynamodb/cdc_json_parser.rs
@@ -27,6 +27,7 @@ pub struct DynamodbCdcJsonParser {
     payload_builder: AccessBuilderImpl,
     pub(crate) rw_columns: Vec<SourceColumnDesc>,
     source_ctx: SourceContextRef,
+    single_blob_column: String,
 }
 
 impl DynamodbCdcJsonParser {
@@ -36,11 +37,13 @@ impl DynamodbCdcJsonParser {
         source_ctx: SourceContextRef,
     ) -> ConnectorResult<Self> {
         // the key of Dynamodb CDC are embedded value of primary key and partition key, which is not used here.
-        let payload_builder = build_dynamodb_json_accessor_builder(props.encoding_config).await?;
+        let (payload_builder, single_blob_column) =
+            build_dynamodb_json_accessor_builder(props.encoding_config).await?;
         Ok(Self {
             payload_builder,
             rw_columns,
             source_ctx,
+            single_blob_column,
         })
     }
 
@@ -50,7 +53,7 @@ impl DynamodbCdcJsonParser {
         mut writer: SourceStreamChunkRowWriter<'_>,
     ) -> ConnectorResult<()> {
         let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
-        let row_op = DynamodbChangeEvent::new(payload_accessor);
+        let row_op = DynamodbChangeEvent::new(payload_accessor, self.single_blob_column.clone());
         match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
             Ok(_) => Ok(()),
             Err(err) => Err(err)?,
diff --git a/src/connector/src/parser/dynamodb/change_event.rs b/src/connector/src/parser/dynamodb/change_event.rs
index a875d780c1d98..8780328ffc9ed 100644
--- a/src/connector/src/parser/dynamodb/change_event.rs
+++ b/src/connector/src/parser/dynamodb/change_event.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use assert_matches::assert_matches;
 use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl, ToDatumRef};
 
 use crate::parser::dynamodb::map_rw_type_to_dynamodb_type;
@@ -85,6 +86,7 @@ use crate::source::SourceColumnDesc;
 // }
 pub struct DynamodbChangeEvent<A> {
     value_accessor: A,
+    single_blob_column: String,
 }
 
 const OLD_IMAGE: &str = "OldImage";
@@ -101,8 +103,11 @@ impl<A> DynamodbChangeEvent<A>
 where
     A: Access,
 {
-    pub fn new(value_accessor: A) -> Self {
-        Self { value_accessor }
+    pub fn new(value_accessor: A, single_blob_column: String) -> Self {
+        Self {
+            value_accessor,
+            single_blob_column,
+        }
     }
 }
 
@@ -111,16 +116,28 @@ where
     A: Access,
 {
     fn access_field(&self, desc: &SourceColumnDesc) -> crate::parser::AccessResult<DatumCow<'_>> {
-        let dynamodb_type = map_rw_type_to_dynamodb_type(&desc.data_type)?;
-        match self.op()? {
-            ChangeEventOperation::Delete => self.value_accessor.access(
-                &[DYNAMODB, OLD_IMAGE, &desc.name, dynamodb_type.as_str()],
-                &desc.data_type,
-            ),
-            ChangeEventOperation::Upsert => self.value_accessor.access(
-                &[DYNAMODB, NEW_IMAGE, &desc.name, dynamodb_type.as_str()],
-                &desc.data_type,
-            ),
+        if desc.name == self.single_blob_column {
+            assert_matches!(desc.data_type, DataType::Jsonb);
+            match self.op()? {
+                ChangeEventOperation::Delete => self
+                    .value_accessor
+                    .access(&[DYNAMODB, OLD_IMAGE], &desc.data_type),
+                ChangeEventOperation::Upsert => self
+                    .value_accessor
+                    .access(&[DYNAMODB, NEW_IMAGE], &desc.data_type),
+            }
+        } else {
+            let dynamodb_type = map_rw_type_to_dynamodb_type(&desc.data_type)?;
+            match self.op()? {
+                ChangeEventOperation::Delete => self.value_accessor.access(
+                    &[DYNAMODB, OLD_IMAGE, &desc.name, dynamodb_type.as_str()],
+                    &desc.data_type,
+                ),
+                ChangeEventOperation::Upsert => self.value_accessor.access(
+                    &[DYNAMODB, NEW_IMAGE, &desc.name, dynamodb_type.as_str()],
+                    &desc.data_type,
+                ),
+            }
         }
     }
 
diff --git a/src/connector/src/parser/dynamodb/json_parser.rs b/src/connector/src/parser/dynamodb/json_parser.rs
index a878cb051faba..1b85611349796 100644
--- a/src/connector/src/parser/dynamodb/json_parser.rs
+++ b/src/connector/src/parser/dynamodb/json_parser.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use assert_matches::assert_matches;
+use risingwave_common::types::DataType;
 use risingwave_connector_codec::decoder::Access;
 
 use crate::error::ConnectorResult;
@@ -53,6 +55,7 @@ pub struct DynamodbJsonParser {
     payload_builder: AccessBuilderImpl,
     pub(crate) rw_columns: Vec<SourceColumnDesc>,
     source_ctx: SourceContextRef,
+    single_blob_column: String,
 }
 
 impl DynamodbJsonParser {
@@ -61,11 +64,13 @@ impl DynamodbJsonParser {
         rw_columns: Vec<SourceColumnDesc>,
         source_ctx: SourceContextRef,
     ) -> ConnectorResult<Self> {
-        let payload_builder = build_dynamodb_json_accessor_builder(props.encoding_config).await?;
+        let (payload_builder, single_blob_column) =
+            build_dynamodb_json_accessor_builder(props.encoding_config).await?;
         Ok(Self {
             payload_builder,
             rw_columns,
             source_ctx,
+            single_blob_column,
         })
     }
 
@@ -76,11 +81,16 @@ impl DynamodbJsonParser {
     ) -> ConnectorResult<()> {
         let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
         writer.do_insert(|column| {
-            let dynamodb_type = map_rw_type_to_dynamodb_type(&column.data_type)?;
-            payload_accessor.access(
-                &[ITEM, &column.name, dynamodb_type.as_str()],
-                &column.data_type,
-            )
+            if column.name == self.single_blob_column {
+                assert_matches!(column.data_type, DataType::Jsonb);
+                payload_accessor.access(&[ITEM], &column.data_type)
+            } else {
+                let dynamodb_type = map_rw_type_to_dynamodb_type(&column.data_type)?;
+                payload_accessor.access(
+                    &[ITEM, &column.name, dynamodb_type.as_str()],
+                    &column.data_type,
+                )
+            }
         })?;
         Ok(())
     }
diff --git a/src/connector/src/parser/dynamodb/mod.rs b/src/connector/src/parser/dynamodb/mod.rs
index 604f98dba9892..9f531799be4d2 100644
--- a/src/connector/src/parser/dynamodb/mod.rs
+++ b/src/connector/src/parser/dynamodb/mod.rs
@@ -27,11 +27,16 @@ use crate::parser::{AccessBuilderImpl, EncodingProperties, JsonAccessBuilder};
 
 pub(crate) async fn build_dynamodb_json_accessor_builder(
     config: EncodingProperties,
-) -> ConnectorResult<AccessBuilderImpl> {
+) -> ConnectorResult<(AccessBuilderImpl, String)> {
     match config {
-        EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::Json(
-            JsonAccessBuilder::new_for_dynamodb(json_config)?,
-        )),
+        EncodingProperties::Json(json_config) => {
+            assert!(json_config.single_blob_column.is_some());
+            let single_blob_column = json_config.single_blob_column.clone().unwrap();
+            Ok((
+                AccessBuilderImpl::Json(JsonAccessBuilder::new_for_dynamodb(json_config)?),
+                single_blob_column,
+            ))
+        }
         _ => bail!("unsupported encoding for Dynamodb"),
     }
 }
diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs b/src/connector/src/parser/maxwell/simd_json_parser.rs
index e5bc7291ccf07..becfef388fe4c 100644
--- a/src/connector/src/parser/maxwell/simd_json_parser.rs
+++ b/src/connector/src/parser/maxwell/simd_json_parser.rs
@@ -38,6 +38,7 @@ mod tests {
             encoding_config: EncodingProperties::Json(JsonProperties {
                 use_schema_registry: false,
                 timestamptz_handling: None,
+                single_blob_column: None,
             }),
             protocol_config: ProtocolProperties::Maxwell,
         };
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index a899c9be7b1be..443f28cfb659b 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -1080,6 +1080,7 @@ impl SpecificParserConfig {
         encoding_config: EncodingProperties::Json(JsonProperties {
             use_schema_registry: false,
             timestamptz_handling: None,
+            single_blob_column: None,
         }),
         protocol_config: ProtocolProperties::Plain,
     };
@@ -1122,6 +1123,7 @@ pub struct CsvProperties {
 pub struct JsonProperties {
     pub use_schema_registry: bool,
     pub timestamptz_handling: Option<TimestamptzHandling>,
+    pub single_blob_column: Option<String>,
 }
 
 #[derive(Debug, Default, Clone)]
@@ -1277,20 +1279,30 @@ impl SpecificParserConfig {
                 | SourceFormat::Debezium
                 | SourceFormat::Maxwell
                 | SourceFormat::Canal
-                | SourceFormat::Upsert
-                | SourceFormat::Dynamodb
-                | SourceFormat::DynamodbCdc,
+                | SourceFormat::Upsert,
                 SourceEncode::Json,
             ) => EncodingProperties::Json(JsonProperties {
                 use_schema_registry: info.use_schema_registry,
                 timestamptz_handling: TimestamptzHandling::from_options(
                     &info.format_encode_options,
                 )?,
+                single_blob_column: None,
             }),
+
+            (SourceFormat::Dynamodb | SourceFormat::DynamodbCdc, SourceEncode::Json) => {
+                EncodingProperties::Json(JsonProperties {
+                    use_schema_registry: info.use_schema_registry,
+                    timestamptz_handling: TimestamptzHandling::from_options(
+                        &info.format_encode_options,
+                    )?,
+                    single_blob_column: Some(info.json_single_blob_column.clone()),
+                })
+            }
             (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
                 EncodingProperties::Json(JsonProperties {
                     use_schema_registry: false,
                     timestamptz_handling: None,
+                    single_blob_column: None,
                 })
             }
             (SourceFormat::Plain, SourceEncode::Bytes) => {
diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs
index 600efda2f6255..07af655229834 100644
--- a/src/connector/src/source/datagen/source/generator.rs
+++ b/src/connector/src/source/datagen/source/generator.rs
@@ -277,6 +277,7 @@ mod tests {
                 encoding_config: EncodingProperties::Json(crate::parser::JsonProperties {
                     use_schema_registry: false,
                     timestamptz_handling: None,
+                    single_blob_column: None,
                 }),
             },
             data_types,
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index dd7815d9a4268..93d12ae91989b 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -302,6 +302,7 @@ pub(crate) async fn bind_columns_from_source(
     const MESSAGE_NAME_KEY: &str = "message";
     const KEY_MESSAGE_NAME_KEY: &str = "key.message";
     const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
+    const JSON_SINGLE_BLOB_COLUMN_KEY: &str = "single_blob_column";
 
     let is_kafka: bool = with_properties.is_kafka_connector();
     let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner();
@@ -468,6 +469,13 @@ pub(crate) async fn bind_columns_from_source(
                 );
             }
 
+            if let Some(ast_string) = try_consume_string_from_options(
+                &mut format_encode_options_to_consume,
+                JSON_SINGLE_BLOB_COLUMN_KEY,
+            ) {
+                stream_source_info.json_single_blob_column = ast_string.0;
+            }
+
             let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
             stream_source_info.use_schema_registry =
                 json_schema_infer_use_schema_registry(&schema_config);
@@ -847,18 +855,7 @@ pub(crate) async fn bind_source_pk(
                     additional_column_names
                 ))));
             }
-            if !sql_defined_pk {
-                return Err(RwError::from(ProtocolError(
-                    "Primary key must be specified when creating source with FORMAT DYNAMODB."
-                        .to_string(),
-                )));
-            }
-            if sql_defined_pk_names.len() < 2 {
-                return Err(RwError::from(ProtocolError(
-                    "Primary key must include at least two columns when creating source with FORMAT DYNAMODB."
-                        .to_string(),
-                )));
-            }
+            validate_dynamodb_source(source_info, columns, &sql_defined_pk_names, "DYNAMODB")?;
             sql_defined_pk_names
         }
         (Format::DynamodbCdc, Encode::Json) => {
@@ -868,18 +865,7 @@ pub(crate) async fn bind_source_pk(
                     additional_column_names
                 ))));
             }
-            if !sql_defined_pk {
-                return Err(RwError::from(ProtocolError(
-                    "Primary key must be specified when creating source with FORMAT DYNAMODB_CDC."
-                        .to_string(),
-                )));
-            }
-            if sql_defined_pk_names.len() < 2 {
-                return Err(RwError::from(ProtocolError(
-                    "Primary key must include at least two columns when creating source with FORMAT DYNAMODB_CDC."
-                        .to_string(),
-                )));
-            }
+            validate_dynamodb_source(source_info, columns, &sql_defined_pk_names, "DYNAMODB_CDC")?;
             sql_defined_pk_names
         }
         (Format::Debezium, Encode::Avro) => {
@@ -1265,6 +1251,52 @@ pub(super) fn check_nexmark_schema(
     Ok(())
 }
 
+fn validate_dynamodb_source(
+    source_info: &StreamSourceInfo,
+    columns: &mut [ColumnCatalog],
+    sql_defined_pk_names: &Vec<String>,
+    format_string: &str,
+) -> Result<()> {
+    if sql_defined_pk_names.is_empty() {
+        return Err(RwError::from(ProtocolError(format!(
+            "Primary key must be specified when creating source with FORMAT {}.",
+            format_string
+        ))));
+    }
+    if source_info.json_single_blob_column.is_empty() {
+        return Err(RwError::from(ProtocolError(format!(
+            "Single blob column must be specified when creating source with FORMAT {}.",
+            format_string
+        ))));
+    }
+    if sql_defined_pk_names.len() + 1 != columns.len() {
+        return Err(RwError::from(ProtocolError(
+            format!("Primary key must include all columns except single blob column when creating source with FORMAT {}.", format_string
+            ),
+        )));
+    }
+    let single_blob_columns = columns
+        .iter()
+        .filter_map(|col| {
+            if sql_defined_pk_names.contains(&col.column_desc.name) {
+                None
+            } else {
+                Some((col.name().to_string(), col.data_type().clone()))
+            }
+        })
+        .collect_vec();
+    if single_blob_columns.len() != 1
+        || single_blob_columns[0].0 != source_info.json_single_blob_column
+        || single_blob_columns[0].1 != DataType::Jsonb
+    {
+        return Err(RwError::from(ProtocolError(
+            format!("Single blob column must be a jsonb column and not a part of primary keys when creating source with FORMAT {}.", format_string
+            ),
+        )));
+    }
+    Ok(())
+}
+
 pub async fn extract_iceberg_columns(
     with_properties: &BTreeMap<String, String>,
 ) -> anyhow::Result<Vec<ColumnCatalog>> {
diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
index 59686f4bb8fdd..d6dcbc3f9c258 100644
--- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -689,6 +689,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[
         encoding_config: EncodingProperties::Json(JsonProperties {
             use_schema_registry: false,
             timestamptz_handling: None,
+            single_blob_column: None,
         }),
         // the cdc message is generated internally so the key must exist.
         protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),