Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Jul 29, 2024
1 parent 44f736f commit 73878f4
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ pub struct SchemaChangeEnvelope {

#[derive(Debug)]
pub struct TableSchemaChange {
pub(crate) up_table_full_name: String,
pub(crate) cdc_table_name: String,
pub(crate) columns: Vec<ColumnCatalog>,
}
2 changes: 1 addition & 1 deletion src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub fn mysql_typename_to_rw_type(type_name: &str) -> anyhow::Result<DataType> {
"date" => Ok(DataType::Date),
"time" => Ok(DataType::Time),
"timestamp" => Ok(DataType::Timestamptz),
"datetime" => Ok(DataType::Timestamptz),
"datetime" => Ok(DataType::Timestamp),
"json" => Ok(DataType::Jsonb),
"binary" | "varbinary" | "blob" | "mediumblob" | "longblob" => Ok(DataType::Bytea),
_ => Err(anyhow::anyhow!("unsupported type: {}", type_name)),
Expand Down
36 changes: 13 additions & 23 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ pub fn parse_transaction_meta(
})
}

macro_rules! jsonb_access_field {
($col:expr, $field:expr, $as_type:tt) => {
$crate::paste! {
$col.access_object_field($field).unwrap().[<as_ $as_type>]().unwrap()
}
};
}

pub fn parse_schema_change(
accessor: &impl Access,
connector_props: &ConnectorProperties,
Expand All @@ -153,34 +161,16 @@ pub fn parse_schema_change(
_ => unreachable!(""),
};

let id = jsonb
.access_object_field("id")
.unwrap()
.as_string()
.unwrap();

println!("id: {}", id);
let id = jsonb_access_field!(jsonb, "id", string);
let ty = jsonb_access_field!(jsonb, "type", string);

let mut column_descs: Vec<ColumnDesc> = vec![];
if let Some(table) = jsonb.access_object_field("table")
&& let Some(columns) = table.access_object_field("columns")
{
for col in columns.array_elements().unwrap() {
let name = col
.access_object_field("name")
.unwrap()
.as_string()
.unwrap();
let type_name = col
.access_object_field("typeName")
.unwrap()
.as_string()
.unwrap();
let position = col
.access_object_field("position")
.unwrap()
.as_number()
.unwrap();
let name = jsonb_access_field!(col, "name", string);
let type_name = jsonb_access_field!(col, "typeName", string);

let data_type = match *connector_props {
ConnectorProperties::PostgresCdc(_) => {
Expand All @@ -198,7 +188,7 @@ pub fn parse_schema_change(
}
}
schema_changes.push(TableSchemaChange {
up_table_full_name: id,
cdc_table_name: id.replace('"', ""), // remove the double quotes
columns: column_descs
.into_iter()
.map(|column_desc| ColumnCatalog {
Expand Down

0 comments on commit 73878f4

Please sign in to comment.