Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support mysql_query for mysql batch ingestion #19071

Merged
merged 48 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
250c4aa
add mysql_query
kwannoel Oct 22, 2024
82935fd
fmt
kwannoel Oct 23, 2024
12ce8f0
use mysql_async in binder
kwannoel Oct 23, 2024
63202d9
add mysql table type
kwannoel Oct 24, 2024
db4062b
bind types
kwannoel Oct 24, 2024
a8d3efc
add optimizer rule
kwannoel Oct 24, 2024
8f6e323
minor
kwannoel Oct 25, 2024
546370e
add error context
kwannoel Oct 25, 2024
8258dff
add mysql plan nodes + batch executor skeleton
kwannoel Oct 25, 2024
2486af4
link rule to logical plan node
kwannoel Oct 25, 2024
23cd0ee
instantiate connection in mysql executor
kwannoel Oct 25, 2024
c3715b6
handle MySql serde
kwannoel Oct 28, 2024
8c68e2f
introduce macro
kwannoel Oct 28, 2024
ee9f146
convert all mysql types to rw types
kwannoel Oct 28, 2024
f5925d5
add e2e mysql_query slt test
kwannoel Oct 28, 2024
fb18cac
fix warn
kwannoel Oct 28, 2024
86d8d45
rename db
kwannoel Oct 28, 2024
e1002df
missing comma?
kwannoel Oct 28, 2024
07aa7a6
add varchar len
kwannoel Oct 28, 2024
90085a8
fix
kwannoel Oct 29, 2024
522ad34
fix
kwannoel Oct 29, 2024
b50c38c
fix
kwannoel Oct 29, 2024
406f3b0
fix
kwannoel Oct 29, 2024
03bf907
add TableFunctionToMySqlQueryRule
kwannoel Oct 29, 2024
e420c8c
fix
kwannoel Oct 29, 2024
d3923af
handle more types
kwannoel Oct 29, 2024
90958d5
try again
kwannoel Oct 29, 2024
ea78a3d
add more context to error
kwannoel Oct 29, 2024
2e4c15c
refine
kwannoel Oct 29, 2024
e909dd4
try fix
kwannoel Oct 29, 2024
30908a0
fix test
kwannoel Oct 30, 2024
2606251
explicitly bind unsupported types
kwannoel Oct 30, 2024
20b224f
support more types
kwannoel Oct 30, 2024
adcf79b
extract common logic
kwannoel Oct 30, 2024
90f7d71
reuse common parsing logic
kwannoel Oct 30, 2024
48503b0
add risedev profile for local inline tests
kwannoel Oct 30, 2024
38d447c
make handling of tinyint straightforward
kwannoel Oct 30, 2024
f3ec0ff
handle more types
kwannoel Oct 30, 2024
02d983d
test all types
kwannoel Oct 30, 2024
e7d6070
test jsonb
kwannoel Oct 30, 2024
5f31f3d
add chrono feature
kwannoel Oct 30, 2024
bbbca68
test null
kwannoel Oct 30, 2024
5b54d1b
cleanup docs
kwannoel Oct 30, 2024
db7d3bd
fix source test
kwannoel Oct 30, 2024
aff5786
use chunk_size instead of magic value
kwannoel Nov 1, 2024
0aa4fba
safely parse port
kwannoel Nov 1, 2024
f389b4d
fix typo + handling parse error
kwannoel Nov 1, 2024
e84ce45
move conversion to connector
kwannoel Nov 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ deltalake = { version = "0.20.1", features = [
itertools = "0.13.0"
jsonbb = "0.1.4"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
mysql_async = { version = "0.34", default-features = false, features = [
"default",
] }
parquet = { version = "53", features = ["async"] }
thiserror-ext = "0.1.2"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
Expand Down
73 changes: 73 additions & 0 deletions e2e_test/source_inline/tvf/mysql_query.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
control substitution on

system ok
mysql -e "DROP DATABASE IF EXISTS tvf; CREATE DATABASE tvf;"

system ok
mysql -e "
USE tvf;
CREATE TABLE test (
id bigint primary key,
v0 bit,
v1 bool,
v2 tinyint(1),
v3 tinyint(2),
v4 smallint,
v5 mediumint,
v6 integer,
v7 bigint,
v8 float,
v9 double,
v10 numeric(4, 2),
v11 decimal(4, 2),
v12 char(255),
v13 varchar(255),
v14 bit(10),
v15 tinyblob,
v16 blob,
v17 mediumblob,
v18 longblob,
v19 date,
v20 time,
v21 timestamp,
v22 json,
v23 int
);
INSERT INTO test SELECT
1 as id,
true as v0,
true as v1,
2 as v2,
3 as v3,
4 as v4,
5 as v5,
6 as v6,
7 as v7,
1.08 as v8,
1.09 as v9,
1.10 as v10,
1.11 as v11,
'char' as v12,
'varchar' as v13,
b'1010' as v14,
x'16' as v15,
x'17' as v16,
x'18' as v17,
x'19' as v18,
'2021-01-01' as v19,
'12:34:56' as v20,
'2021-01-01 12:34:56' as v21,
JSON_OBJECT('key1', 1, 'key2', 'abc') as v22,
null as v23;
"

query
select * from mysql_query('$MYSQL_HOST', '$MYSQL_TCP_PORT', '$RISEDEV_MYSQL_USER', '$MYSQL_PWD', 'tvf', 'select * from test;');
----
1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL

system ok
mysql -e "
USE tvf;
DROP DATABASE tvf;
"
4 changes: 2 additions & 2 deletions e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ SELECT
c_binary_255
FROM rw_mysql_types_test order by c_boolean;
----
0 NULL NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
1 NULL -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
0 f NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this changed? Could you please note the changes to the "Type mapping table" in the release notes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the release notes; previously I had only mentioned this in the PR description. The expected output changed because we now support bit conversion.

1 t -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000

query TTTTTTTT
SELECT
Expand Down
12 changes: 12 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ message PostgresQueryNode {
string query = 7;
}

// Batch plan node carrying everything needed to run a query against an
// external MySQL server: the connection parameters, the target database and
// the SQL text. Field numbering parallels PostgresQueryNode (`query = 7`).
// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
message MySqlQueryNode {
// Column descriptors for this node; presumably the output schema of the
// query result — TODO confirm against the executor.
repeated plan_common.ColumnDesc columns = 1;
// Host of the MySQL server to connect to.
string hostname = 2;
// Port is transported as a string, not a numeric type, so any validation or
// parsing has to happen on the consuming side.
string port = 3;
// Credentials used for the connection.
string username = 4;
string password = 5;
// Database (schema) on the MySQL server that the query runs in.
string database = 6;
// SQL text to execute on the MySQL server.
string query = 7;
}

message ProjectNode {
repeated expr.ExprNode select_list = 1;
}
Expand Down Expand Up @@ -386,6 +397,7 @@ message PlanNode {
FileScanNode file_scan = 38;
IcebergScanNode iceberg_scan = 39;
PostgresQueryNode postgres_query = 40;
MySqlQueryNode mysql_query = 41;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
Expand Down
2 changes: 2 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ message TableFunction {
FILE_SCAN = 19;
// postgres query
POSTGRES_QUERY = 20;
// mysql query
MYSQL_QUERY = 21;
// User defined table function
USER_DEFINED = 100;
}
Expand Down
19 changes: 19 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,25 @@ profile:
address: schemaregistry
port: 8082

# Profile for running the inline source e2e tests on a local machine.
# Uses the same config file as `ci-inline-source-test` and brings up the
# external systems listed below (pubsub, kafka, schema registry, mysql,
# postgres) next to the core RisingWave components.
local-inline-source-test:
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: sqlite
- use: meta-node
meta-backend: sqlite
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: pubsub
persist-data: true
- use: kafka
persist-data: true
- use: schema-registry
# MySQL and Postgres services made available by this profile.
- use: mysql
- use: postgres

ci-inline-source-test:
config-path: src/config/ci-recovery.toml
steps:
Expand Down
3 changes: 3 additions & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
bytes = "1"
chrono = "0.4"
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -29,6 +30,7 @@ hytra = "0.1.2"
iceberg = { workspace = true }
itertools = { workspace = true }
memcomparable = "0.2"
mysql_async = { workspace = true }
opendal = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
Expand All @@ -45,6 +47,7 @@ risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_storage = { workspace = true }
rust_decimal = "1"
rw_futures_util = { workspace = true }
scopeguard = "1"
serde_json = "1"
Expand Down
9 changes: 6 additions & 3 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use std::sync::Arc;

pub use anyhow::anyhow;
use iceberg::Error as IcebergError;
use mysql_async::Error as MySqlError;
use parquet::errors::ParquetError;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{def_anyhow_newtype, def_anyhow_variant, BoxedError};
Expand All @@ -29,7 +31,7 @@ use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
use risingwave_storage::error::StorageError;
use thiserror::Error;
use thiserror_ext::Construct;
use tokio_postgres;
use tokio_postgres::Error as PostgresError;
use tonic::Status;

use crate::worker_manager::worker_node_manager::FragmentId;
Expand Down Expand Up @@ -192,7 +194,8 @@ def_anyhow_variant! {
pub BatchExternalSystemError,
BatchError ExternalSystemError,

tokio_postgres::Error => "Postgres error",
iceberg::Error => "Iceberg error",
PostgresError => "Postgres error",
IcebergError => "Iceberg error",
ParquetError => "Parquet error",
MySqlError => "MySQL error",
}
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod managed;
mod max_one_row;
mod merge_sort;
mod merge_sort_exchange;
mod mysql_query;
mod order_by;
mod postgres_query;
mod project;
Expand Down Expand Up @@ -65,6 +66,7 @@ pub use managed::*;
pub use max_one_row::*;
pub use merge_sort::*;
pub use merge_sort_exchange::*;
pub use mysql_query::*;
pub use order_by::*;
pub use postgres_query::*;
pub use project::*;
Expand Down Expand Up @@ -247,6 +249,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::FileScan => FileScanExecutorBuilder,
NodeBody::IcebergScan => IcebergScanExecutorBuilder,
NodeBody::PostgresQuery => PostgresQueryExecutorBuilder,
NodeBody::MysqlQuery => MySqlQueryExecutorBuilder,
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
Expand Down
Loading
Loading