Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support mysql_query for mysql batch ingestion #19071

Merged
merged 48 commits into from
Nov 1, 2024
Merged
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
250c4aa
add mysql_query
kwannoel Oct 22, 2024
82935fd
fmt
kwannoel Oct 23, 2024
12ce8f0
use mysql_async in binder
kwannoel Oct 23, 2024
63202d9
add mysql table type
kwannoel Oct 24, 2024
db4062b
bind types
kwannoel Oct 24, 2024
a8d3efc
add optimizer rule
kwannoel Oct 24, 2024
8f6e323
minor
kwannoel Oct 25, 2024
546370e
add error context
kwannoel Oct 25, 2024
8258dff
add mysql plan nodes + batch executor skeleton
kwannoel Oct 25, 2024
2486af4
link rule to logical plan node
kwannoel Oct 25, 2024
23cd0ee
instantiate connection in mysql executor
kwannoel Oct 25, 2024
c3715b6
handle MySql serde
kwannoel Oct 28, 2024
8c68e2f
introduce macro
kwannoel Oct 28, 2024
ee9f146
convert all mysql types to rw types
kwannoel Oct 28, 2024
f5925d5
add e2e mysql_query slt test
kwannoel Oct 28, 2024
fb18cac
fix warn
kwannoel Oct 28, 2024
86d8d45
rename db
kwannoel Oct 28, 2024
e1002df
missing comma?
kwannoel Oct 28, 2024
07aa7a6
add varchar len
kwannoel Oct 28, 2024
90085a8
fix
kwannoel Oct 29, 2024
522ad34
fix
kwannoel Oct 29, 2024
b50c38c
fix
kwannoel Oct 29, 2024
406f3b0
fix
kwannoel Oct 29, 2024
03bf907
add TableFunctionToMySqlQueryRule
kwannoel Oct 29, 2024
e420c8c
fix
kwannoel Oct 29, 2024
d3923af
handle more types
kwannoel Oct 29, 2024
90958d5
try again
kwannoel Oct 29, 2024
ea78a3d
add more context to error
kwannoel Oct 29, 2024
2e4c15c
refine
kwannoel Oct 29, 2024
e909dd4
try fix
kwannoel Oct 29, 2024
30908a0
fix test
kwannoel Oct 30, 2024
2606251
explicitly bind unsupported types
kwannoel Oct 30, 2024
20b224f
support more types
kwannoel Oct 30, 2024
adcf79b
extract common logic
kwannoel Oct 30, 2024
90f7d71
reuse common parsing logic
kwannoel Oct 30, 2024
48503b0
add risedev profile for local inline tests
kwannoel Oct 30, 2024
38d447c
make handling of tinyint straightforward
kwannoel Oct 30, 2024
f3ec0ff
handle more types
kwannoel Oct 30, 2024
02d983d
test all types
kwannoel Oct 30, 2024
e7d6070
test jsonb
kwannoel Oct 30, 2024
5f31f3d
add chrono feature
kwannoel Oct 30, 2024
bbbca68
test null
kwannoel Oct 30, 2024
5b54d1b
cleanup docs
kwannoel Oct 30, 2024
db7d3bd
fix source test
kwannoel Oct 30, 2024
aff5786
use chunk_size instead of magic value
kwannoel Nov 1, 2024
0aa4fba
safely parse port
kwannoel Nov 1, 2024
f389b4d
fix typo + handling parse error
kwannoel Nov 1, 2024
e84ce45
move conversion to connector
kwannoel Nov 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
extract common logic
kwannoel committed Nov 1, 2024
commit adcf79b25b631ec9f0c09c1cce936a56ec5ca85c
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ itoa = "1.0"
jsonbb = { workspace = true }
lru = { workspace = true }
memcomparable = { version = "0.2", features = ["decimal"] }
mysql_async = { workspace = true }
num-integer = "0.1"
num-traits = "0.2"
number_prefix = "0.4.0"
2 changes: 2 additions & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -80,6 +80,7 @@ pub use {
risingwave_license as license,
};
pub mod lru;
pub mod mysql;
pub mod opts;
pub mod range;
pub mod row;
@@ -91,6 +92,7 @@ pub mod test_utils;
pub mod transaction;
pub mod types;
pub mod vnode_mapping;

pub mod test_prelude {
pub use super::array::{DataChunkTestExt, StreamChunkTestExt};
pub use super::catalog::test_utils::ColumnDescTestExt;
164 changes: 164 additions & 0 deletions src/common/src/mysql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use chrono::NaiveDate;
use mysql_async::Row as MysqlRow;
use risingwave_common::types::{
DataType, Date, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use rust_decimal::Decimal as RustDecimal;

use crate::types::Datum;

macro_rules! handle_data_type {
($row:expr, $i:expr, $name:expr, $type:ty) => {{
let datum_opt = $row.take_opt::<Option<$type>, _>($i);
datum_opt.map(|res| match res {
Ok(val) => Ok(val.map(|v| ScalarImpl::from(v))),
Err(e) => Err(anyhow::Error::new(e.clone())
.context("failed to deserialize MySQL value into rust value")
.context(format!(
"column: {}, index: {}, rust_type: {}",
$name,
$i,
stringify!($ty),
))),
})
}};
($row:expr, $i:expr, $name:expr, $type:ty, $rw_type:ty) => {{
let datum_opt = $row.take_opt::<Option<$type>, _>($i);
datum_opt.map(|res| match res {
Ok(val) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))),
Err(e) => Err(anyhow::Error::new(e.clone())
.context("failed to deserialize MySQL value into rust value")
.context(format!(
"column: {}, index: {}, rust_type: {}",
$name,
$i,
stringify!($ty),
))),
})
}};
}

/// The decoding result can be interpreted as follows:
/// None => No value was found in the column.
/// Some(Ok(value)) => The value was found and successfully decoded.
/// Some(Err(error)) => The value was found but could not be decoded,
/// either because it was not supported,
/// or there was an error during conversion.
/// This error is wrapped via anyhow, so it is opaque.
pub fn mysql_datum_to_rw_datum(
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
mysql_row: &mut MysqlRow,
mysql_datum_index: usize,
column_name: &str,
rw_data_type: DataType,
) -> Option<Result<Datum, anyhow::Error>> {
match rw_data_type {
DataType::Boolean => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, bool)
}
DataType::Int16 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
}
DataType::Int32 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
}
DataType::Int64 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, i64)
}
DataType::Float32 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, f32)
}
DataType::Float64 => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, f64)
}
DataType::Decimal => {
handle_data_type!(
mysql_row,
mysql_datum_index,
column_name,
RustDecimal,
Decimal
)
}
DataType::Varchar => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, String)
}
DataType::Date => {
handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date)
}
DataType::Time => {
handle_data_type!(
mysql_row,
mysql_datum_index,
column_name,
chrono::NaiveTime,
Time
)
}
DataType::Timestamp => {
handle_data_type!(
mysql_row,
mysql_datum_index,
column_name,
chrono::NaiveDateTime,
Timestamp
)
}
DataType::Timestamptz => mysql_row
.take_opt::<Option<chrono::NaiveDateTime>, _>(mysql_datum_index)
.map(|res| match res {
Ok(val) => Ok(val.map(|v| {
ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros()))
})),
Err(err) => Err(anyhow::Error::new(err.clone())
.context("failed to deserialize MySQL value into rust value")
.context(format!(
"column: {}, index: {}, rust_type: chrono::NaiveDateTime",
column_name, mysql_datum_index,
))),
}),
DataType::Bytea => mysql_row
.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index)
.map(|res| match res {
Ok(val) => Ok(val.map(ScalarImpl::from)),
Err(err) => Err(anyhow::Error::new(err.clone())
.context("failed to deserialize MySQL value into rust value")
.context(format!(
"column: {}, index: {}, rust_type: Vec<u8>",
column_name, mysql_datum_index,
))),
}),
DataType::Jsonb => {
handle_data_type!(
mysql_row,
mysql_datum_index,
column_name,
serde_json::Value,
JsonbVal
)
}
DataType::Interval
| DataType::Struct(_)
| DataType::List(_)
| DataType::Int256
| DataType::Serial
| DataType::Map(_) => Some(Err(anyhow!(
"unsupported data type: {}, set to null",
rw_data_type
))),
}
}