Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): auto schema change for mysql cdc #17876

Merged
merged 93 commits into from
Aug 20, 2024
Merged
Changes from 1 commit
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
606b1d0
enable ddl event
StrikeW Jun 16, 2024
99c0785
split schema event to a chunk
StrikeW Jul 8, 2024
245ddbe
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 11, 2024
dbb18a5
skeleton to parse cdc message
StrikeW Jul 11, 2024
bd123ff
Merge branch 'siyuan/cdc-schema-change' of github.com:risingwavelabs/…
StrikeW Jul 11, 2024
5971ea2
minor
StrikeW Jul 11, 2024
1d60a6a
impl mysql schema event parser
StrikeW Jul 16, 2024
da9d689
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 17, 2024
7d2a408
meta->frontend rpc skeleton
StrikeW Jul 21, 2024
e1cc54a
call frontend to generate new table plan
StrikeW Jul 22, 2024
58e1817
minor
StrikeW Jul 22, 2024
35f9b4f
launch rpc service
StrikeW Jul 22, 2024
532c31e
refactor frontend service
StrikeW Jul 23, 2024
f507196
add cdc table name to PbTable
StrikeW Jul 23, 2024
9b85887
wip: meta->frontend rpc implementation
StrikeW Jul 23, 2024
33857bb
wip: rpc source parser->meta
StrikeW Jul 24, 2024
850f1d0
refine source -> meta -> frontend rpc
StrikeW Jul 24, 2024
6ffeb3b
submit schema change to spawned task in source exec
StrikeW Jul 24, 2024
44f736f
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 26, 2024
266354b
meta->frontend rpc skeleton
StrikeW Jul 21, 2024
32b9377
call frontend to generate new table plan
StrikeW Jul 22, 2024
253ea41
minor
StrikeW Jul 22, 2024
e6cfb50
launch rpc service
StrikeW Jul 22, 2024
f1c41b0
refactor frontend service
StrikeW Jul 23, 2024
dcb359e
add cdc table name to PbTable
StrikeW Jul 23, 2024
0539172
wip: meta->frontend rpc implementation
StrikeW Jul 23, 2024
f1fff64
wip: rpc source parser->meta
StrikeW Jul 24, 2024
5d97d73
refine source -> meta -> frontend rpc
StrikeW Jul 24, 2024
ead027b
submit schema change to spawned task in source exec
StrikeW Jul 24, 2024
7d1fe85
Merge branch 'siyuan/cdc-handle-schema-event' of github.com:risingwav…
StrikeW Jul 27, 2024
1295374
wip: debuging rpc
StrikeW Jul 27, 2024
82815d1
finish auto replace workflow
StrikeW Jul 28, 2024
7be0555
register frontend rpc addr to meta
StrikeW Jul 29, 2024
78088c9
add e2e test
StrikeW Jul 29, 2024
a2b2a2b
minor
StrikeW Jul 29, 2024
bd3cbaf
minor
StrikeW Jul 29, 2024
73878f4
refine
StrikeW Jul 29, 2024
09b7979
refine parsing
StrikeW Jul 29, 2024
950a09d
add
StrikeW Jul 30, 2024
27a7017
clean
StrikeW Jul 30, 2024
158ff10
format
StrikeW Jul 30, 2024
a7c00d6
clippy
StrikeW Jul 30, 2024
7cd2788
clean code
StrikeW Jul 31, 2024
702d146
minor
StrikeW Jul 31, 2024
12778aa
clean again
StrikeW Jul 31, 2024
6452745
fix comment
StrikeW Jul 31, 2024
c50856a
minor
StrikeW Jul 31, 2024
a8f52e4
fix comments
StrikeW Aug 1, 2024
4ce2c47
Merge branch 'siyuan/cdc-schema-change' into siyuan/cdc-handle-schema…
StrikeW Aug 1, 2024
b85a604
remove fullTableName
StrikeW Aug 1, 2024
3532e74
Merge remote-tracking branch 'origin/siyuan/cdc-schema-change' into s…
StrikeW Aug 1, 2024
22e67e8
minor
StrikeW Aug 1, 2024
70c9299
fix type cast
StrikeW Aug 2, 2024
2c435d9
Merge remote-tracking branch 'origin/siyuan/cdc-schema-change' into s…
StrikeW Aug 2, 2024
3012b57
Merge remote-tracking branch 'origin/siyuan/cdc-handle-schema-event' …
StrikeW Aug 2, 2024
af8bca9
clean
StrikeW Aug 2, 2024
5b1d8d6
clippy
StrikeW Aug 2, 2024
da9ee1b
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 2, 2024
6d18dce
fix comments
StrikeW Aug 6, 2024
d1711c9
fix check
StrikeW Aug 7, 2024
3038bf2
add license check
StrikeW Aug 7, 2024
717f1af
Merge branch 'main' into siyuan/cdc-handle-schema-event
StrikeW Aug 7, 2024
6ac586b
fix ut
StrikeW Aug 7, 2024
331409a
Merge branch 'siyuan/cdc-handle-schema-event' of github.com:risingwav…
StrikeW Aug 7, 2024
0dee439
minor
StrikeW Aug 7, 2024
90bc49e
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 7, 2024
f17d1c2
fix sim test
StrikeW Aug 8, 2024
40ccdd4
fix sim test
StrikeW Aug 8, 2024
ed91f7e
refactor frontend rpc addr impl
StrikeW Aug 12, 2024
259c055
fix
StrikeW Aug 12, 2024
df65235
add feature guard flag
StrikeW Aug 14, 2024
766313c
retry with grpc errors
StrikeW Aug 14, 2024
df80dd6
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 14, 2024
756d840
Revert "chore: bump `tonic` to v0.12 (#17889)"
StrikeW Aug 14, 2024
219a77e
minor
StrikeW Aug 14, 2024
ddd667a
Reapply "chore: bump `tonic` to v0.12 (#17889)"
StrikeW Aug 14, 2024
deefc9a
fix
StrikeW Aug 14, 2024
08d355b
fix
StrikeW Aug 14, 2024
5d238d2
fix
StrikeW Aug 14, 2024
1b3ee00
fix
StrikeW Aug 14, 2024
864fae2
rename cdc_table_name -> cdc_table_id
StrikeW Aug 16, 2024
a7fc383
fix
StrikeW Aug 16, 2024
7609748
fix
StrikeW Aug 16, 2024
52ddfeb
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 19, 2024
844cbf0
fix ut
StrikeW Aug 19, 2024
b853f20
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 19, 2024
27fa578
minor
StrikeW Aug 19, 2024
e0fd50c
minor
StrikeW Aug 19, 2024
ec742de
fix
StrikeW Aug 20, 2024
31162c8
try fix
StrikeW Aug 20, 2024
d68aca5
init logger for test_sink_scale
StrikeW Aug 20, 2024
d003d2b
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 20, 2024
533cf22
unblock ci
StrikeW Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix comments
StrikeW committed Aug 1, 2024
commit a8f52e4585631cadb88699de907c17a814e0f3da
6 changes: 1 addition & 5 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
@@ -161,14 +161,10 @@
string offset = 3;
string full_table_name = 4;
int64 source_ts_ms = 5;

// Deprecated: use `msg_type` instead
reserved "is_transaction_meta";
reserved 6;
CdcMessageType msg_type = 6;

Check failure on line 164 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "msg_type" on message "CdcMessage" changed option "json_name" from "isTransactionMeta" to "msgType".

Check failure on line 164 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "msg_type" on message "CdcMessage" changed type from "bool" to "enum". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

Check failure on line 164 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "6" on message "CdcMessage" changed name from "is_transaction_meta" to "msg_type".

// The key of the Debezium message, which is only used by the `mongodb-cdc` connector.
string key = 7;
CdcMessageType msg_type = 8;
}

enum SourceType {
6 changes: 3 additions & 3 deletions src/connector/codec/src/decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -44,9 +44,9 @@ pub enum AccessError {

#[error(transparent)]
NotImplemented(#[from] NotImplemented),

#[error(transparent)]
Other(#[from] anyhow::Error),
// NOTE: We intentionally don't embed `anyhow::Error` in `AccessError`, since these errors
// occur at the record level and capturing the backtrace might be too heavy
// when creating a new `anyhow::Error`.
}

pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
7 changes: 5 additions & 2 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::types::{
DataType, Date, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_connector_codec::decoder::{AccessError, AccessResult};
use rust_decimal::Decimal as RustDecimal;
use thiserror_ext::AsReport;

@@ -141,7 +142,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
OwnedRow::new(datums)
}

pub fn mysql_typename_to_rw_type(type_name: &str) -> anyhow::Result<DataType> {
pub fn mysql_typename_to_rw_type(type_name: &str) -> AccessResult<DataType> {
match type_name.to_lowercase().as_str() {
"tinyint" | "smallint" => Ok(DataType::Int16),
"int" => Ok(DataType::Int32),
@@ -156,7 +157,9 @@ pub fn mysql_typename_to_rw_type(type_name: &str) -> anyhow::Result<DataType> {
"datetime" => Ok(DataType::Timestamp),
"json" => Ok(DataType::Jsonb),
"binary" | "varbinary" | "blob" | "mediumblob" | "longblob" => Ok(DataType::Bytea),
_ => Err(anyhow::anyhow!("unsupported type: {}", type_name)),
_ => Err(AccessError::UnsupportedType {
ty: type_name.to_string(),
}),
}
}

77 changes: 64 additions & 13 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
@@ -202,6 +202,7 @@ mod tests {
use std::ops::Deref;
use std::sync::Arc;

use expect_test::expect;
use futures::executor::block_on;
use futures::StreamExt;
use futures_async_stream::try_stream;
@@ -210,7 +211,6 @@ mod tests {
use risingwave_pb::connector_service::cdc_message;

use super::*;
use crate::parser::schema_change::TableChangeType;
use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
use crate::source::cdc::DebeziumCdcMeta;
use crate::source::{ConnectorProperties, DataType, SourceMessage, SplitId};
@@ -503,17 +503,68 @@ mod tests {
.await;

let res = res.unwrap();
match res {
ParseResult::SchemaChange(schema_change) => {
assert_eq!(schema_change.table_changes.len(), 1);
let table_change = &schema_change.table_changes[0];
assert_eq!(table_change.cdc_table_name, "mydb.test");
assert_eq!(table_change.change_type, TableChangeType::Alter);
assert_eq!(table_change.columns.len(), 3);
let column_names = table_change.columns.iter().map(|c| c.name()).collect_vec();
assert_eq!(column_names, vec!["id", "v1", "v2"]);
}
_ => panic!("unexpected parse result: {:?}", res),
}
expect![[r#"
SchemaChange(
SchemaChangeEnvelope {
table_changes: [
TableSchemaChange {
cdc_table_name: "mydb.test",
columns: [
ColumnCatalog {
column_desc: ColumnDesc {
data_type: Int32,
column_id: #2147483646,
name: "id",
field_descs: [],
type_name: "",
generated_or_default_column: None,
description: None,
additional_column: AdditionalColumn {
column_type: None,
},
version: Pr13707,
},
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc {
data_type: Timestamptz,
column_id: #2147483646,
name: "v1",
field_descs: [],
type_name: "",
generated_or_default_column: None,
description: None,
additional_column: AdditionalColumn {
column_type: None,
},
version: Pr13707,
},
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc {
data_type: Varchar,
column_id: #2147483646,
name: "v2",
field_descs: [],
type_name: "",
generated_or_default_column: None,
description: None,
additional_column: AdditionalColumn {
column_type: None,
},
version: Pr13707,
},
is_hidden: false,
},
],
change_type: Alter,
},
],
},
)
"#]]
.assert_debug_eq(&res);
}
}