From 73878f4b9e87e6c050906fd997cfc79b06cbc213 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 29 Jul 2024 17:03:31 +0800 Subject: [PATCH] refine --- .../src/parser/debezium/schema_change.rs | 2 +- src/connector/src/parser/mysql.rs | 2 +- src/connector/src/parser/unified/debezium.rs | 36 +++++++------------ 3 files changed, 15 insertions(+), 25 deletions(-) diff --git a/src/connector/src/parser/debezium/schema_change.rs b/src/connector/src/parser/debezium/schema_change.rs index 0881cd26cc503..2075745dd110c 100644 --- a/src/connector/src/parser/debezium/schema_change.rs +++ b/src/connector/src/parser/debezium/schema_change.rs @@ -7,6 +7,6 @@ pub struct SchemaChangeEnvelope { #[derive(Debug)] pub struct TableSchemaChange { - pub(crate) up_table_full_name: String, + pub(crate) cdc_table_name: String, pub(crate) columns: Vec, } diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index f27c7edebb5e8..453ab413f88e0 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -153,7 +153,7 @@ pub fn mysql_typename_to_rw_type(type_name: &str) -> anyhow::Result { "date" => Ok(DataType::Date), "time" => Ok(DataType::Time), "timestamp" => Ok(DataType::Timestamptz), - "datetime" => Ok(DataType::Timestamptz), + "datetime" => Ok(DataType::Timestamp), "json" => Ok(DataType::Jsonb), "binary" | "varbinary" | "blob" | "mediumblob" | "longblob" => Ok(DataType::Bytea), _ => Err(anyhow::anyhow!("unsupported type: {}", type_name)), diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index af74a80fc13ab..2c036ff724edd 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -137,6 +137,14 @@ pub fn parse_transaction_meta( }) } +macro_rules! jsonb_access_field { + ($col:expr, $field:expr, $as_type:tt) => { + $crate::paste! { + $col.access_object_field($field).unwrap().[]().unwrap() + } + }; +} + pub fn parse_schema_change( accessor: &impl Access, connector_props: &ConnectorProperties, @@ -153,34 +161,16 @@ pub fn parse_schema_change( _ => unreachable!(""), }; - let id = jsonb - .access_object_field("id") - .unwrap() - .as_string() - .unwrap(); - - println!("id: {}", id); + let id = jsonb_access_field!(jsonb, "id", string); + let ty = jsonb_access_field!(jsonb, "type", string); let mut column_descs: Vec = vec![]; if let Some(table) = jsonb.access_object_field("table") && let Some(columns) = table.access_object_field("columns") { for col in columns.array_elements().unwrap() { - let name = col - .access_object_field("name") - .unwrap() - .as_string() - .unwrap(); - let type_name = col - .access_object_field("typeName") - .unwrap() - .as_string() - .unwrap(); - let position = col - .access_object_field("position") - .unwrap() - .as_number() - .unwrap(); + let name = jsonb_access_field!(col, "name", string); + let type_name = jsonb_access_field!(col, "typeName", string); let data_type = match *connector_props { ConnectorProperties::PostgresCdc(_) => { @@ -198,7 +188,7 @@ pub fn parse_schema_change( } } schema_changes.push(TableSchemaChange { - up_table_full_name: id, + cdc_table_name: id.replace('"', ""), // remove the double quotes columns: column_descs .into_iter() .map(|column_desc| ColumnCatalog {