Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): auto schema change for mysql cdc #17876

Merged
merged 93 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
606b1d0
enable ddl event
StrikeW Jun 16, 2024
99c0785
split schema event to a chunk
StrikeW Jul 8, 2024
245ddbe
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 11, 2024
dbb18a5
skeleton to parse cdc message
StrikeW Jul 11, 2024
bd123ff
Merge branch 'siyuan/cdc-schema-change' of github.com:risingwavelabs/…
StrikeW Jul 11, 2024
5971ea2
minor
StrikeW Jul 11, 2024
1d60a6a
impl mysql schema event parser
StrikeW Jul 16, 2024
da9d689
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 17, 2024
7d2a408
meta->frontend rpc skeleton
StrikeW Jul 21, 2024
e1cc54a
call frontend to generate new table plan
StrikeW Jul 22, 2024
58e1817
minor
StrikeW Jul 22, 2024
35f9b4f
launch rpc service
StrikeW Jul 22, 2024
532c31e
refactor frontend service
StrikeW Jul 23, 2024
f507196
add cdc table name to PbTable
StrikeW Jul 23, 2024
9b85887
wip: meta->frontend rpc implementation
StrikeW Jul 23, 2024
33857bb
wip: rpc source parser->meta
StrikeW Jul 24, 2024
850f1d0
refine source -> meta -> frontend rpc
StrikeW Jul 24, 2024
6ffeb3b
submit schema change to spawned task in source exec
StrikeW Jul 24, 2024
44f736f
Merge remote-tracking branch 'origin/main' into siyuan/cdc-schema-change
StrikeW Jul 26, 2024
266354b
meta->frontend rpc skeleton
StrikeW Jul 21, 2024
32b9377
call frontend to generate new table plan
StrikeW Jul 22, 2024
253ea41
minor
StrikeW Jul 22, 2024
e6cfb50
launch rpc service
StrikeW Jul 22, 2024
f1c41b0
refactor frontend service
StrikeW Jul 23, 2024
dcb359e
add cdc table name to PbTable
StrikeW Jul 23, 2024
0539172
wip: meta->frontend rpc implementation
StrikeW Jul 23, 2024
f1fff64
wip: rpc source parser->meta
StrikeW Jul 24, 2024
5d97d73
refine source -> meta -> frontend rpc
StrikeW Jul 24, 2024
ead027b
submit schema change to spawned task in source exec
StrikeW Jul 24, 2024
7d1fe85
Merge branch 'siyuan/cdc-handle-schema-event' of github.com:risingwav…
StrikeW Jul 27, 2024
1295374
wip: debuging rpc
StrikeW Jul 27, 2024
82815d1
finish auto replace workflow
StrikeW Jul 28, 2024
7be0555
register frontend rpc addr to meta
StrikeW Jul 29, 2024
78088c9
add e2e test
StrikeW Jul 29, 2024
a2b2a2b
minor
StrikeW Jul 29, 2024
bd3cbaf
minor
StrikeW Jul 29, 2024
73878f4
refine
StrikeW Jul 29, 2024
09b7979
refine parsing
StrikeW Jul 29, 2024
950a09d
add
StrikeW Jul 30, 2024
27a7017
clean
StrikeW Jul 30, 2024
158ff10
format
StrikeW Jul 30, 2024
a7c00d6
clippy
StrikeW Jul 30, 2024
7cd2788
clean code
StrikeW Jul 31, 2024
702d146
minor
StrikeW Jul 31, 2024
12778aa
clean again
StrikeW Jul 31, 2024
6452745
fix comment
StrikeW Jul 31, 2024
c50856a
minor
StrikeW Jul 31, 2024
a8f52e4
fix comments
StrikeW Aug 1, 2024
4ce2c47
Merge branch 'siyuan/cdc-schema-change' into siyuan/cdc-handle-schema…
StrikeW Aug 1, 2024
b85a604
remove fullTableName
StrikeW Aug 1, 2024
3532e74
Merge remote-tracking branch 'origin/siyuan/cdc-schema-change' into s…
StrikeW Aug 1, 2024
22e67e8
minor
StrikeW Aug 1, 2024
70c9299
fix type cast
StrikeW Aug 2, 2024
2c435d9
Merge remote-tracking branch 'origin/siyuan/cdc-schema-change' into s…
StrikeW Aug 2, 2024
3012b57
Merge remote-tracking branch 'origin/siyuan/cdc-handle-schema-event' …
StrikeW Aug 2, 2024
af8bca9
clean
StrikeW Aug 2, 2024
5b1d8d6
clippy
StrikeW Aug 2, 2024
da9ee1b
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 2, 2024
6d18dce
fix comments
StrikeW Aug 6, 2024
d1711c9
fix check
StrikeW Aug 7, 2024
3038bf2
add license check
StrikeW Aug 7, 2024
717f1af
Merge branch 'main' into siyuan/cdc-handle-schema-event
StrikeW Aug 7, 2024
6ac586b
fix ut
StrikeW Aug 7, 2024
331409a
Merge branch 'siyuan/cdc-handle-schema-event' of github.com:risingwav…
StrikeW Aug 7, 2024
0dee439
minor
StrikeW Aug 7, 2024
90bc49e
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 7, 2024
f17d1c2
fix sim test
StrikeW Aug 8, 2024
40ccdd4
fix sim test
StrikeW Aug 8, 2024
ed91f7e
refactor frontend rpc addr impl
StrikeW Aug 12, 2024
259c055
fix
StrikeW Aug 12, 2024
df65235
add feature guard flag
StrikeW Aug 14, 2024
766313c
retry with grpc errors
StrikeW Aug 14, 2024
df80dd6
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 14, 2024
756d840
Revert "chore: bump `tonic` to v0.12 (#17889)"
StrikeW Aug 14, 2024
219a77e
minor
StrikeW Aug 14, 2024
ddd667a
Reapply "chore: bump `tonic` to v0.12 (#17889)"
StrikeW Aug 14, 2024
deefc9a
fix
StrikeW Aug 14, 2024
08d355b
fix
StrikeW Aug 14, 2024
5d238d2
fix
StrikeW Aug 14, 2024
1b3ee00
fix
StrikeW Aug 14, 2024
864fae2
rename cdc_table_name -> cdc_table_id
StrikeW Aug 16, 2024
a7fc383
fix
StrikeW Aug 16, 2024
7609748
fix
StrikeW Aug 16, 2024
52ddfeb
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 19, 2024
844cbf0
fix ut
StrikeW Aug 19, 2024
b853f20
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 19, 2024
27fa578
minor
StrikeW Aug 19, 2024
e0fd50c
minor
StrikeW Aug 19, 2024
ec742de
fix
StrikeW Aug 20, 2024
31162c8
try fix
StrikeW Aug 20, 2024
d68aca5
init logger for test_sink_scale
StrikeW Aug 20, 2024
d003d2b
Merge remote-tracking branch 'origin/main' into siyuan/cdc-handle-sch…
StrikeW Aug 20, 2024
533cf22
unblock ci
StrikeW Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,8 @@ Caused by these errors (recent errors listed first):
2: invalid IPv4 address


statement error
statement error Failed to run the query
create function int_42() returns int as int_42 using link '55.55.55.55:5555';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: failed to check UDF signature
2: failed to send requests to UDF service
3: status: Unknown, message: "transport error", details: [], metadata: MetadataMap { headers: {} }
4: transport error
5: connection error
6: connection reset


statement error
Expand Down
69 changes: 69 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_change_mysql.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
control substitution on

system ok
mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"

system ok
mysql -e "
USE mytest;
DROP TABLE IF EXISTS customers;
CREATE TABLE customers(
id BIGINT PRIMARY KEY,
modified DATETIME,
custinfo JSON
);
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
"

statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'root',
password = '${MYSQL_PWD:}',
database.name = 'mytest',
server.id = '5701',
auto.schema.change = 'true'
);

statement ok
create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL


system ok
mysql -e "
USE mytest;
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
ALTER TABLE customers ADD COLUMN v2 double(5,2);
"

sleep 3s

# Name, Type, Is Hidden, Description
query TTTT
describe rw_customers;
----
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL


statement ok
drop source mysql_source cascade;
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ table.include.list=${database.name}.${table.name:-*}
schema.history.internal.store.only.captured.tables.ddl=true
schema.history.internal.store.only.captured.databases.ddl=true
# default to disable schema change events
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
include.schema.changes=${debezium.include.schema.changes:-false}
include.schema.changes=${auto.schema.change:-false}
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
database.server.id=${server.id}
# default to use unencrypted connection
database.ssl.mode=${ssl.mode:-disabled}
Expand Down
5 changes: 5 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ message Table {
// conflict" operations.
optional uint32 version_column_index = 38;

// The unique identifier of the upstream table if it is a CDC table.
// It will be used in auto schema change to get the Table which mapped to the
// upstream table.
optional string cdc_table_id = 39;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
2 changes: 2 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ message WorkerNode {
bool is_streaming = 1;
bool is_serving = 2;
bool is_unschedulable = 3;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 4;
}
message Resource {
string rw_version = 1;
Expand Down
9 changes: 8 additions & 1 deletion proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -454,14 +454,20 @@
}

TableChangeType change_type = 1;
string cdc_table_name = 2;
string cdc_table_id = 2;

Check failure on line 457 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "cdc_table_id" on message "TableSchemaChange" changed option "json_name" from "cdcTableName" to "cdcTableId".

Check failure on line 457 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" on message "TableSchemaChange" changed name from "cdc_table_name" to "cdc_table_id".
repeated plan_common.ColumnCatalog columns = 3;
}

message SchemaChangeEnvelope {
repeated TableSchemaChange table_changes = 1;
}

message AutoSchemaChangeRequest {
SchemaChangeEnvelope schema_change = 1;
}

message AutoSchemaChangeResponse {}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
Expand Down Expand Up @@ -500,4 +506,5 @@
rpc GetTables(GetTablesRequest) returns (GetTablesResponse);
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
}
23 changes: 23 additions & 0 deletions proto/frontend_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

package frontend_service;

import "ddl_service.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message GetTableReplacePlanRequest {
uint32 database_id = 1;
uint32 owner = 2;
string table_name = 3;
ddl_service.TableSchemaChange table_change = 4;
}

message GetTableReplacePlanResponse {
ddl_service.ReplaceTablePlan replace_plan = 1;
}

service FrontendService {
rpc GetTableReplacePlan(GetTableReplacePlanRequest) returns (GetTableReplacePlanResponse);
}
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ message AddWorkerNodeRequest {
bool is_streaming = 2;
bool is_serving = 3;
bool is_unschedulable = 4;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 5;
}
common.WorkerType worker_type = 1;
common.HostAddress host = 2;
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ impl SourceExecutor {
rate_limit: None,
},
ConnectorProperties::default(),
None,
));
let stream = self
.source
Expand Down
2 changes: 2 additions & 0 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -444,6 +445,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
}),
transactional_id: Some(2),
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ mod test {
],
),
prometheus_listener_addr: "127.0.0.1:1234",
health_check_listener_addr: "127.0.0.1:6786",
frontend_rpc_listener_addr: "127.0.0.1:6786",
config_path: "src/config/test.toml",
metrics_level: None,
enable_barrier_read: None,
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,10 @@ pub struct StreamingDeveloperConfig {
/// If not specified, the value of `server.connection_pool_size` will be used.
#[serde(default = "default::developer::stream_exchange_connection_pool_size")]
pub exchange_connection_pool_size: Option<u16>,

/// A flag to allow disabling the auto schema change handling
#[serde(default = "default::developer::stream_enable_auto_schema_change")]
pub enable_auto_schema_change: bool,
}

/// The subsections `[batch.developer]`.
Expand Down Expand Up @@ -1903,6 +1907,10 @@ pub mod default {
pub fn enable_actor_tokio_metrics() -> bool {
false
}

pub fn stream_enable_auto_schema_change() -> bool {
true
}
}

pub use crate::system_param::default as system;
Expand Down
7 changes: 4 additions & 3 deletions src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
Expand All @@ -231,7 +232,7 @@ mod tests {
let worker_1 = WorkerNode {
id: 1,
parallelism: 1,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};

Expand All @@ -246,7 +247,7 @@ mod tests {
let worker_2 = WorkerNode {
id: 2,
parallelism: 50,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};

Expand All @@ -265,7 +266,7 @@ mod tests {
let worker_3 = WorkerNode {
id: 3,
parallelism: 60,
property: Some(serving_property),
property: Some(serving_property.clone()),
..Default::default()
};
let re_pu_mapping_2 = place_vnode(
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub async fn compute_node_serve(
is_streaming: opts.role.for_streaming(),
is_serving: opts.role.for_serving(),
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
},
&config.meta,
)
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ stream_enable_arrangement_backfill = true
stream_high_join_amplification_threshold = 2048
stream_enable_actor_tokio_metrics = false
stream_exchange_connection_pool_size = 1
stream_enable_auto_schema_change = true

[storage]
share_buffers_sync_parallelism = 1
Expand Down
15 changes: 13 additions & 2 deletions src/connector/src/parser/debezium/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ impl From<&str> for TableChangeType {

#[derive(Debug)]
pub struct TableSchemaChange {
pub(crate) cdc_table_name: String,
pub(crate) cdc_table_id: String,
pub(crate) columns: Vec<ColumnCatalog>,
pub(crate) change_type: TableChangeType,
}

impl SchemaChangeEnvelope {
pub fn is_empty(&self) -> bool {
self.table_changes.is_empty()
}

pub fn to_protobuf(&self) -> PbSchemaChangeEnvelope {
let table_changes = self
.table_changes
Expand All @@ -83,12 +87,19 @@ impl SchemaChangeEnvelope {
.collect();
PbTableSchemaChange {
change_type: table_change.change_type.to_proto() as _,
cdc_table_name: table_change.cdc_table_name.clone(),
cdc_table_id: table_change.cdc_table_id.clone(),
columns,
}
})
.collect();

PbSchemaChangeEnvelope { table_changes }
}

pub fn table_names(&self) -> Vec<String> {
self.table_changes
.iter()
.map(|table_change| table_change.cdc_table_id.clone())
.collect()
}
}
22 changes: 20 additions & 2 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,8 +836,26 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
}
},

Ok(ParseResult::SchemaChange(_)) => {
// TODO
Ok(ParseResult::SchemaChange(schema_change)) => {
if schema_change.is_empty() {
continue;
}

let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
// we bubble up the schema change event to the source executor via channel,
// and wait for the source executor to finish the schema change process before
// parsing the following messages.
if let Some(ref tx) = parser.source_ctx().schema_change_tx {
tx.send((schema_change, oneshot_tx))
.await
.expect("send schema change to executor");
match oneshot_rx.await {
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
Ok(()) => {}
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
Err(e) => {
tracing::error!(error = %e.as_report(), "failed to wait for schema change");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding metrics for auto schema change trigger count, failure count and duration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point.

}
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ mod tests {
SchemaChangeEnvelope {
table_changes: [
TableSchemaChange {
cdc_table_name: "mydb.test",
cdc_table_id: "mydb.test",
columns: [
ColumnCatalog {
column_desc: ColumnDesc {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub fn parse_schema_change(
}
}
schema_changes.push(TableSchemaChange {
cdc_table_name: id.replace('"', ""), // remove the double quotes
cdc_table_id: id.replace('"', ""), // remove the double quotes
columns: column_descs
.into_iter()
.map(|column_desc| ColumnCatalog {
Expand Down
Loading
Loading