From 7761e9a88e1f8bf7925394d37a1e14eaf467f4fa Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 1 Nov 2024 10:18:49 +0800 Subject: [PATCH] move conversion to connector --- Cargo.lock | 2 - src/batch/src/executor/mysql_query.rs | 2 +- src/common/Cargo.toml | 4 - src/common/src/lib.rs | 1 - src/common/src/mysql.rs | 191 -------------------------- src/connector/src/parser/mod.rs | 2 +- src/connector/src/parser/mysql.rs | 176 +++++++++++++++++++++++- 7 files changed, 177 insertions(+), 201 deletions(-) delete mode 100644 src/common/src/mysql.rs diff --git a/Cargo.lock b/Cargo.lock index c131441badd78..bcd371c54a448 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10632,8 +10632,6 @@ dependencies = [ "madsim-tokio", "memcomparable", "more-asserts", - "mysql_async", - "mysql_common", "num-integer", "num-traits", "number_prefix", diff --git a/src/batch/src/executor/mysql_query.rs b/src/batch/src/executor/mysql_query.rs index 35c55eaf59497..721c9c5e55bf1 100644 --- a/src/batch/src/executor/mysql_query.rs +++ b/src/batch/src/executor/mysql_query.rs @@ -18,9 +18,9 @@ use futures_util::stream::StreamExt; use mysql_async; use mysql_async::prelude::*; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::mysql::mysql_datum_to_rw_datum; use risingwave_common::row::OwnedRow; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_connector::parser::mysql_datum_to_rw_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, BatchExternalSystemError}; diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 7facfde900cc3..8b341fb621fe6 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -59,10 +59,6 @@ itoa = "1.0" jsonbb = { workspace = true } lru = { workspace = true } memcomparable = { version = "0.2", features = ["decimal"] } -mysql_async = { workspace = true } -mysql_common = { version = "0.32", default-features = false, features = [ - "chrono", -] } num-integer = "0.1" num-traits = "0.2" number_prefix = "0.4.0" diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 9c8374060df9a..a62b7314d9bda 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -82,7 +82,6 @@ pub use { risingwave_license as license, }; pub mod lru; -pub mod mysql; pub mod opts; pub mod range; pub mod row; diff --git a/src/common/src/mysql.rs b/src/common/src/mysql.rs deleted file mode 100644 index c3ed3d9903764..0000000000000 --- a/src/common/src/mysql.rs +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use anyhow::anyhow; -use chrono::NaiveDate; -use mysql_async::Row as MysqlRow; -use risingwave_common::types::{ - DataType, Date, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, -}; -use rust_decimal::Decimal as RustDecimal; - -use crate::types::Datum; - -macro_rules! handle_data_type { - ($row:expr, $i:expr, $name:expr, $type:ty) => {{ - match $row.take_opt::, _>($i) { - None => bail!("no value found at column: {}, index: {}", $name, $i), - Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(v))), - Some(Err(e)) => Err(anyhow::Error::new(e.clone()) - .context("failed to deserialize MySQL value into rust value") - .context(format!( - "column: {}, index: {}, rust_type: {}", - $name, - $i, - stringify!($type), - ))), - } - }}; - ($row:expr, $i:expr, $name:expr, $type:ty, $rw_type:ty) => {{ - match $row.take_opt::, _>($i) { - None => bail!("no value found at column: {}, index: {}", $name, $i), - Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))), - Some(Err(e)) => Err(anyhow::Error::new(e.clone()) - .context("failed to deserialize MySQL value into rust value") - .context(format!( - "column: {}, index: {}, rust_type: {}", - $name, - $i, - stringify!($ty), - ))), - } - }}; -} - -/// The decoding result can be interpreted as follows: -/// Ok(value) => The value was found and successfully decoded. -/// Err(error) => The value was found but could not be decoded, -/// either because it was not supported, -/// or there was an error during conversion. -pub fn mysql_datum_to_rw_datum( - mysql_row: &mut MysqlRow, - mysql_datum_index: usize, - column_name: &str, - rw_data_type: &DataType, -) -> Result { - match rw_data_type { - DataType::Boolean => { - // Bit(1) - match mysql_row.take_opt::>, _>(mysql_datum_index) { - None => bail!( - "no value found at column: {}, index: {}", - column_name, - mysql_datum_index - ), - Some(Ok(val)) => match val { - None => Ok(None), - Some(val) => match val.as_slice() { - [0] => Ok(Some(ScalarImpl::from(false))), - [1] => Ok(Some(ScalarImpl::from(true))), - _ => Err(anyhow!("invalid value for boolean: {:?}", val)), - }, - }, - Some(Err(e)) => Err(anyhow::Error::new(e.clone()) - .context("failed to deserialize MySQL value into rust value") - .context(format!( - "column: {}, index: {}, rust_type: Vec", - column_name, mysql_datum_index, - ))), - } - } - DataType::Int16 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i16) - } - DataType::Int32 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i32) - } - DataType::Int64 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, i64) - } - DataType::Float32 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, f32) - } - DataType::Float64 => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, f64) - } - DataType::Decimal => { - handle_data_type!( - mysql_row, - mysql_datum_index, - column_name, - RustDecimal, - Decimal - ) - } - DataType::Varchar => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, String) - } - DataType::Date => { - handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date) - } - DataType::Time => { - handle_data_type!( - mysql_row, - mysql_datum_index, - column_name, - chrono::NaiveTime, - Time - ) - } - DataType::Timestamp => { - handle_data_type!( - mysql_row, - mysql_datum_index, - column_name, - chrono::NaiveDateTime, - Timestamp - ) - } - DataType::Timestamptz => { - match mysql_row.take_opt::, _>(mysql_datum_index) { - None => bail!( - "no value found at column: {}, index: {}", - column_name, - mysql_datum_index - ), - Some(Ok(val)) => Ok(val.map(|v| { - ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros())) - })), - Some(Err(err)) => Err(anyhow::Error::new(err.clone()) - .context("failed to deserialize MySQL value into rust value") - .context(format!( - "column: {}, index: {}, rust_type: chrono::NaiveDateTime", - column_name, mysql_datum_index, - ))), - } - } - DataType::Bytea => match mysql_row.take_opt::>, _>(mysql_datum_index) { - None => bail!( - "no value found at column: {}, index: {}", - column_name, - mysql_datum_index - ), - Some(Ok(val)) => Ok(val.map(ScalarImpl::from)), - Some(Err(err)) => Err(anyhow::Error::new(err.clone()) - .context("failed to deserialize MySQL value into rust value") - .context(format!( - "column: {}, index: {}, rust_type: Vec", - column_name, mysql_datum_index, - ))), - }, - DataType::Jsonb => { - handle_data_type!( - mysql_row, - mysql_datum_index, - column_name, - serde_json::Value, - JsonbVal - ) - } - DataType::Interval - | DataType::Struct(_) - | DataType::List(_) - | DataType::Int256 - | DataType::Serial - | DataType::Map(_) => Err(anyhow!( - "unsupported data type: {}, set to null", - rw_data_type - )), - } -} diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 2142914aa2503..d0a0ca9973af9 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -44,7 +44,7 @@ use thiserror_ext::AsReport; use self::avro::AvroAccessBuilder; use self::bytes_parser::BytesAccessBuilder; -pub use self::mysql::mysql_row_to_owned_row; +pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row}; use self::plain_parser::PlainParser; pub use self::postgres::postgres_row_to_owned_row; use self::simd_json_parser::DebeziumJsonAccessBuilder; diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 6aabe9d3ccafe..e9a8eeba70cb3 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -23,7 +23,181 @@ use thiserror_ext::AsReport; use crate::parser::util::log_error; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); -use risingwave_common::mysql::mysql_datum_to_rw_datum; +use anyhow::anyhow; +use chrono::NaiveDate; +use risingwave_common::bail; +use risingwave_common::types::{ + DataType, Date, Datum, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, +}; +use rust_decimal::Decimal as RustDecimal; + +macro_rules! handle_data_type { + ($row:expr, $i:expr, $name:expr, $type:ty) => {{ + match $row.take_opt::, _>($i) { + None => bail!("no value found at column: {}, index: {}", $name, $i), + Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(v))), + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: {}", + $name, + $i, + stringify!($type), + ))), + } + }}; + ($row:expr, $i:expr, $name:expr, $type:ty, $rw_type:ty) => {{ + match $row.take_opt::, _>($i) { + None => bail!("no value found at column: {}, index: {}", $name, $i), + Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))), + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: {}", + $name, + $i, + stringify!($ty), + ))), + } + }}; +} + +/// The decoding result can be interpreted as follows: +/// Ok(value) => The value was found and successfully decoded. +/// Err(error) => The value was found but could not be decoded, +/// either because it was not supported, +/// or there was an error during conversion. +pub fn mysql_datum_to_rw_datum( + mysql_row: &mut MysqlRow, + mysql_datum_index: usize, + column_name: &str, + rw_data_type: &DataType, +) -> Result { + match rw_data_type { + DataType::Boolean => { + // Bit(1) + match mysql_row.take_opt::>, _>(mysql_datum_index) { + None => bail!( + "no value found at column: {}, index: {}", + column_name, + mysql_datum_index + ), + Some(Ok(val)) => match val { + None => Ok(None), + Some(val) => match val.as_slice() { + [0] => Ok(Some(ScalarImpl::from(false))), + [1] => Ok(Some(ScalarImpl::from(true))), + _ => Err(anyhow!("invalid value for boolean: {:?}", val)), + }, + }, + Some(Err(e)) => Err(anyhow::Error::new(e.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: Vec", + column_name, mysql_datum_index, + ))), + } + } + DataType::Int16 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, i16) + } + DataType::Int32 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, i32) + } + DataType::Int64 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, i64) + } + DataType::Float32 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, f32) + } + DataType::Float64 => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, f64) + } + DataType::Decimal => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + RustDecimal, + Decimal + ) + } + DataType::Varchar => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, String) + } + DataType::Date => { + handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date) + } + DataType::Time => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + chrono::NaiveTime, + Time + ) + } + DataType::Timestamp => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + chrono::NaiveDateTime, + Timestamp + ) + } + DataType::Timestamptz => { + match mysql_row.take_opt::, _>(mysql_datum_index) { + None => bail!( + "no value found at column: {}, index: {}", + column_name, + mysql_datum_index + ), + Some(Ok(val)) => Ok(val.map(|v| { + ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros())) + })), + Some(Err(err)) => Err(anyhow::Error::new(err.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: chrono::NaiveDateTime", + column_name, mysql_datum_index, + ))), + } + } + DataType::Bytea => match mysql_row.take_opt::>, _>(mysql_datum_index) { + None => bail!( + "no value found at column: {}, index: {}", + column_name, + mysql_datum_index + ), + Some(Ok(val)) => Ok(val.map(ScalarImpl::from)), + Some(Err(err)) => Err(anyhow::Error::new(err.clone()) + .context("failed to deserialize MySQL value into rust value") + .context(format!( + "column: {}, index: {}, rust_type: Vec", + column_name, mysql_datum_index, + ))), + }, + DataType::Jsonb => { + handle_data_type!( + mysql_row, + mysql_datum_index, + column_name, + serde_json::Value, + JsonbVal + ) + } + DataType::Interval + | DataType::Struct(_) + | DataType::List(_) + | DataType::Int256 + | DataType::Serial + | DataType::Map(_) => Err(anyhow!( + "unsupported data type: {}, set to null", + rw_data_type + )), + } +} pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow { let mut datums = vec![];