Commit e5ee68b

update grammar
KeXiangWang committed Jul 11, 2024
1 parent 5e785a1 commit e5ee68b
Showing 12 changed files with 138 additions and 51 deletions.
1 change: 1 addition & 0 deletions proto/catalog.proto
@@ -87,6 +87,7 @@ message StreamSourceInfo {
// Handles any secrets that the source relies on. The key is the property name and the value is the secret id and type.
// For format and encode options.
map<string, secret.SecretRef> format_encode_secret_refs = 16;
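// For DynamoDB sources: the name of the jsonb column that stores the whole document as a single blob.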
string json_single_blob_column = 17;
}

message Source {
3 changes: 3 additions & 0 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -116,6 +116,7 @@ impl DebeziumParser {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
@@ -226,6 +227,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
@@ -298,6 +300,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
1 change: 1 addition & 0 deletions src/connector/src/parser/debezium/simd_json_parser.rs
@@ -133,6 +133,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
7 changes: 5 additions & 2 deletions src/connector/src/parser/dynamodb/cdc_json_parser.rs
@@ -27,6 +27,7 @@ pub struct DynamodbCdcJsonParser {
payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
single_blob_column: String,
}

impl DynamodbCdcJsonParser {
@@ -36,11 +37,13 @@ impl DynamodbCdcJsonParser {
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
// The key of a DynamoDB CDC message is an embedded value of the primary key and partition key, which is not used here.
let payload_builder = build_dynamodb_json_accessor_builder(props.encoding_config).await?;
let (payload_builder, single_blob_column) =
build_dynamodb_json_accessor_builder(props.encoding_config).await?;
Ok(Self {
payload_builder,
rw_columns,
source_ctx,
single_blob_column,
})
}

@@ -50,7 +53,7 @@
mut writer: SourceStreamChunkRowWriter<'_>,
) -> ConnectorResult<()> {
let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
let row_op = DynamodbChangeEvent::new(payload_accessor);
let row_op = DynamodbChangeEvent::new(payload_accessor, self.single_blob_column.clone());
match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
Ok(_) => Ok(()),
Err(err) => Err(err)?,
41 changes: 29 additions & 12 deletions src/connector/src/parser/dynamodb/change_event.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use assert_matches::assert_matches;
use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl, ToDatumRef};

use crate::parser::dynamodb::map_rw_type_to_dynamodb_type;
@@ -85,6 +86,7 @@ use crate::source::SourceColumnDesc;
// }
pub struct DynamodbChangeEvent<A> {
value_accessor: A,
single_blob_column: String,
}

const OLD_IMAGE: &str = "OldImage";
@@ -101,8 +103,11 @@ impl<A> DynamodbChangeEvent<A>
where
A: Access,
{
pub fn new(value_accessor: A) -> Self {
Self { value_accessor }
pub fn new(value_accessor: A, single_blob_column: String) -> Self {
Self {
value_accessor,
single_blob_column,
}
}
}

@@ -111,16 +116,28 @@ where
A: Access,
{
fn access_field(&self, desc: &SourceColumnDesc) -> crate::parser::AccessResult<DatumCow<'_>> {
let dynamodb_type = map_rw_type_to_dynamodb_type(&desc.data_type)?;
match self.op()? {
ChangeEventOperation::Delete => self.value_accessor.access(
&[DYNAMODB, OLD_IMAGE, &desc.name, dynamodb_type.as_str()],
&desc.data_type,
),
ChangeEventOperation::Upsert => self.value_accessor.access(
&[DYNAMODB, NEW_IMAGE, &desc.name, dynamodb_type.as_str()],
&desc.data_type,
),
if desc.name == self.single_blob_column {
assert_matches!(desc.data_type, DataType::Jsonb);
match self.op()? {
ChangeEventOperation::Delete => self
.value_accessor
.access(&[DYNAMODB, OLD_IMAGE], &desc.data_type),
ChangeEventOperation::Upsert => self
.value_accessor
.access(&[DYNAMODB, NEW_IMAGE], &desc.data_type),
}
} else {
let dynamodb_type = map_rw_type_to_dynamodb_type(&desc.data_type)?;
match self.op()? {
ChangeEventOperation::Delete => self.value_accessor.access(
&[DYNAMODB, OLD_IMAGE, &desc.name, dynamodb_type.as_str()],
&desc.data_type,
),
ChangeEventOperation::Upsert => self.value_accessor.access(
&[DYNAMODB, NEW_IMAGE, &desc.name, dynamodb_type.as_str()],
&desc.data_type,
),
}
}
}

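To make the two branches of access_field concrete, here is a minimal sketch of the lookup strategies using plain serde_json rather than the connector's Access API; the record shape follows the DynamoDB stream format, and the column names are illustrative.

use serde_json::{json, Value};

fn main() {
    // A simplified DynamoDB CDC record.
    let record = json!({
        "dynamodb": {
            "NewImage": {
                "id":    { "S": "user-1" }, // typed attribute: string
                "count": { "N": "3" },      // typed attribute: number
            }
        }
    });

    // Ordinary column: descend into ["dynamodb", "NewImage", <name>, <type>]
    // and unwrap the typed attribute.
    assert_eq!(record["dynamodb"]["NewImage"]["id"]["S"], json!("user-1"));

    // Single-blob column: stop at ["dynamodb", "NewImage"] and keep the whole
    // image as one jsonb value.
    let blob: &Value = &record["dynamodb"]["NewImage"];
    assert!(blob.is_object());
    println!("blob = {blob}");
}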
22 changes: 16 additions & 6 deletions src/connector/src/parser/dynamodb/json_parser.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use assert_matches::assert_matches;
use risingwave_common::types::DataType;
use risingwave_connector_codec::decoder::Access;

use crate::error::ConnectorResult;
@@ -53,6 +55,7 @@ pub struct DynamodbJsonParser {
payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
single_blob_column: String,
}

impl DynamodbJsonParser {
@@ -61,11 +64,13 @@ impl DynamodbJsonParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> ConnectorResult<Self> {
let payload_builder = build_dynamodb_json_accessor_builder(props.encoding_config).await?;
let (payload_builder, single_blob_column) =
build_dynamodb_json_accessor_builder(props.encoding_config).await?;
Ok(Self {
payload_builder,
rw_columns,
source_ctx,
single_blob_column,
})
}

@@ -76,11 +81,16 @@
) -> ConnectorResult<()> {
let payload_accessor = self.payload_builder.generate_accessor(payload).await?;
writer.do_insert(|column| {
let dynamodb_type = map_rw_type_to_dynamodb_type(&column.data_type)?;
payload_accessor.access(
&[ITEM, &column.name, dynamodb_type.as_str()],
&column.data_type,
)
if column.name == self.single_blob_column {
assert_matches!(column.data_type, DataType::Jsonb);
payload_accessor.access(&[ITEM], &column.data_type)
} else {
let dynamodb_type = map_rw_type_to_dynamodb_type(&column.data_type)?;
payload_accessor.access(
&[ITEM, &column.name, dynamodb_type.as_str()],
&column.data_type,
)
}
})?;
Ok(())
}
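Again as a minimal serde_json sketch (illustrative names, not the connector API): the non-CDC parser above roots its lookups at Item instead of dynamodb.NewImage/OldImage.

use serde_json::json;

fn main() {
    // A simplified FORMAT DYNAMODB row: attributes live directly under "Item".
    let row = json!({
        "Item": {
            "id": { "S": "user-1" },
        }
    });

    // Ordinary column: ["Item", <name>, <type>].
    assert_eq!(row["Item"]["id"]["S"], json!("user-1"));
    // Single-blob column: the whole "Item" map becomes one jsonb value.
    assert!(row["Item"].is_object());
}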
13 changes: 9 additions & 4 deletions src/connector/src/parser/dynamodb/mod.rs
@@ -27,11 +27,16 @@ use crate::parser::{AccessBuilderImpl, EncodingProperties, JsonAccessBuilder};

pub(crate) async fn build_dynamodb_json_accessor_builder(
config: EncodingProperties,
) -> ConnectorResult<AccessBuilderImpl> {
) -> ConnectorResult<(AccessBuilderImpl, String)> {
match config {
EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::Json(
JsonAccessBuilder::new_for_dynamodb(json_config)?,
)),
EncodingProperties::Json(json_config) => {
assert!(json_config.single_blob_column.is_some());
let single_blob_column = json_config.single_blob_column.clone().unwrap();
Ok((
AccessBuilderImpl::Json(JsonAccessBuilder::new_for_dynamodb(json_config)?),
single_blob_column,
))
}
_ => bail!("unsupported encoding for Dynamodb"),
}
}
1 change: 1 addition & 0 deletions src/connector/src/parser/maxwell/simd_json_parser.rs
@@ -38,6 +38,7 @@ mod tests {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
}),
protocol_config: ProtocolProperties::Maxwell,
};
18 changes: 15 additions & 3 deletions src/connector/src/parser/mod.rs
@@ -1080,6 +1080,7 @@ impl SpecificParserConfig {
encoding_config: EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
}),
protocol_config: ProtocolProperties::Plain,
};
@@ -1122,6 +1123,7 @@ pub struct CsvProperties {
pub struct JsonProperties {
pub use_schema_registry: bool,
pub timestamptz_handling: Option<TimestamptzHandling>,
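/// Name of the column that receives the whole DynamoDB document as one jsonb
/// blob; set only for FORMAT DYNAMODB / DYNAMODB_CDC sources.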
pub single_blob_column: Option<String>,
}

#[derive(Debug, Default, Clone)]
@@ -1277,20 +1279,30 @@ impl SpecificParserConfig {
| SourceFormat::Debezium
| SourceFormat::Maxwell
| SourceFormat::Canal
| SourceFormat::Upsert
| SourceFormat::Dynamodb
| SourceFormat::DynamodbCdc,
| SourceFormat::Upsert,
SourceEncode::Json,
) => EncodingProperties::Json(JsonProperties {
use_schema_registry: info.use_schema_registry,
timestamptz_handling: TimestamptzHandling::from_options(
&info.format_encode_options,
)?,
single_blob_column: None,
}),

(SourceFormat::Dynamodb | SourceFormat::DynamodbCdc, SourceEncode::Json) => {
EncodingProperties::Json(JsonProperties {
use_schema_registry: info.use_schema_registry,
timestamptz_handling: TimestamptzHandling::from_options(
&info.format_encode_options,
)?,
single_blob_column: Some(info.json_single_blob_column.clone()),
})
}
(SourceFormat::DebeziumMongo, SourceEncode::Json) => {
EncodingProperties::Json(JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
})
}
(SourceFormat::Plain, SourceEncode::Bytes) => {
1 change: 1 addition & 0 deletions src/connector/src/source/datagen/source/generator.rs
@@ -277,6 +277,7 @@ mod tests {
encoding_config: EncodingProperties::Json(crate::parser::JsonProperties {
use_schema_registry: false,
timestamptz_handling: None,
single_blob_column: None,
}),
},
data_types,
80 changes: 56 additions & 24 deletions src/frontend/src/handler/create_source.rs
@@ -302,6 +302,7 @@ pub(crate) async fn bind_columns_from_source(
const MESSAGE_NAME_KEY: &str = "message";
const KEY_MESSAGE_NAME_KEY: &str = "key.message";
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
const JSON_SINGLE_BLOB_COLUMN_KEY: &str = "single_blob_column";

let is_kafka: bool = with_properties.is_kafka_connector();
let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner();
@@ -468,6 +469,13 @@
);
}

if let Some(ast_string) = try_consume_string_from_options(
&mut format_encode_options_to_consume,
JSON_SINGLE_BLOB_COLUMN_KEY,
) {
stream_source_info.json_single_blob_column = ast_string.0;
}

let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
stream_source_info.use_schema_registry =
json_schema_infer_use_schema_registry(&schema_config);
@@ -847,18 +855,7 @@ pub(crate) async fn bind_source_pk(
additional_column_names
))));
}
if !sql_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with FORMAT DYNAMODB."
.to_string(),
)));
}
if sql_defined_pk_names.len() < 2 {
return Err(RwError::from(ProtocolError(
"Primary key must include at least two columns when creating source with FORMAT DYNAMODB."
.to_string(),
)));
}
validate_dynamodb_source(source_info, columns, &sql_defined_pk_names, "DYNAMODB")?;
sql_defined_pk_names
}
(Format::DynamodbCdc, Encode::Json) => {
@@ -868,18 +865,7 @@
additional_column_names
))));
}
if !sql_defined_pk {
return Err(RwError::from(ProtocolError(
"Primary key must be specified when creating source with FORMAT DYNAMODB_CDC."
.to_string(),
)));
}
if sql_defined_pk_names.len() < 2 {
return Err(RwError::from(ProtocolError(
"Primary key must include at least two columns when creating source with FORMAT DYNAMODB_CDC."
.to_string(),
)));
}
validate_dynamodb_source(source_info, columns, &sql_defined_pk_names, "DYNAMODB_CDC")?;
sql_defined_pk_names
}
(Format::Debezium, Encode::Avro) => {
@@ -1265,6 +1251,52 @@
Ok(())
}

fn validate_dynamodb_source(
source_info: &StreamSourceInfo,
columns: &mut [ColumnCatalog],
sql_defined_pk_names: &Vec<String>,
format_string: &str,
) -> Result<()> {
if sql_defined_pk_names.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"Primary key must be specified when creating source with FORMAT {}.",
format_string
))));
}
if source_info.json_single_blob_column.is_empty() {
return Err(RwError::from(ProtocolError(format!(
"Single blob column must be specified when creating source with FORMAT {}.",
format_string
))));
}
if sql_defined_pk_names.len() + 1 != columns.len() {
return Err(RwError::from(ProtocolError(
format!("Primary key must include all columns except single blob column when creating source with FORMAT {}.", format_string
),
)));
}
let single_blob_columns = columns
.iter()
.filter_map(|col| {
if sql_defined_pk_names.contains(&col.column_desc.name) {
None
} else {
Some((col.name().to_string(), col.data_type().clone()))
}
})
.collect_vec();
if single_blob_columns.len() != 1
|| single_blob_columns[0].0 != source_info.json_single_blob_column
|| single_blob_columns[0].1 != DataType::Jsonb
{
return Err(RwError::from(ProtocolError(
format!("Single blob column must be a jsonb column and not a part of primary keys when creating source with FORMAT {}.", format_string
),
)));
}
Ok(())
}

pub async fn extract_iceberg_columns(
with_properties: &BTreeMap<String, String>,
) -> anyhow::Result<Vec<ColumnCatalog>> {
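Taken together, validate_dynamodb_source enforces that every column except exactly one belongs to the primary key, and that the one remaining column, the column named by the single_blob_column option consumed in bind_columns_from_source above, is of type jsonb. The standalone toy restatement below walks through the three checks; the schema and names are illustrative, not taken from the source.

fn main() {
    // Hypothetical schema: two key columns plus one jsonb blob column.
    let columns = [("pk", "varchar"), ("sk", "varchar"), ("payload", "jsonb")];
    let pk_names = ["pk", "sk"];
    let single_blob_column = "payload";

    // 1. A primary key must be declared.
    assert!(!pk_names.is_empty());
    // 2. Exactly one column may stay outside the primary key.
    assert_eq!(pk_names.len() + 1, columns.len());
    // 3. That column must be the declared blob column, typed jsonb.
    let rest: Vec<_> = columns
        .iter()
        .filter(|(name, _)| !pk_names.contains(name))
        .collect();
    assert_eq!(rest.len(), 1);
    assert_eq!(rest[0].0, single_blob_column);
    assert_eq!(rest[0].1, "jsonb");
}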